Simon Marechal | 31 Jan 16:12 2013

Yet another Conduit question

I am working with bulk sources and sinks, that is with a type like:

Source m [a]
Sink [a] m ()

The problem is that I would like to work on individual values in my
conduit. I can have this:

concat :: (Monad m) => Conduit [a] m a
concat = awaitForever (mapM_ yield)

But how can I do it the other way around? I suppose it involves pattern
matching on the different states my conduit might be in. But is it even
possible to do it in a "non-blocking" way, that is, concatenate data
while there is something to read (up to a certain threshold), and send
it as soon as there is nothing left to read? Or does that not make any
sense in the context of conduits (in the sense that this conduit will be
rechecked for input before the upstream conduits have had a chance to
run)?

Another approach would be to have a map equivalent:

conduitMap :: Conduit i m o -> Conduit [i] m [o]

But I am not sure how to do this either ...
Michael Snoyman | 1 Feb 05:21 2013

Re: Yet another Conduit question

Firstly, what's the use case that you want to deal with lists? If it's for efficiency, you'd probably be better off using a Vector instead.

But I think the inverse of `concat` is `singleton = Data.Conduit.List.map return`, or `awaitForever $ yield . return`, using the list instance for Monad. Your conduitMap could be implemented then as:

    conduitMap conduit = concat =$= conduit =$= singleton
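For readers on a current conduit release, here is a minimal sketch of `concat`, `singleton`, and `conduitMap` as described above. It assumes the conduit >= 1.3 API (`ConduitT` and `.|` instead of the 2013-era `Conduit` and `=$=`); `concatC`, `singletonC`, and `example` are illustrative names chosen to avoid clashing with the Prelude.

```haskell
import Data.Conduit (ConduitT, awaitForever, yield, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Simon's `concat`, renamed to avoid clashing with Prelude.concat:
-- split each incoming list into individual elements.
concatC :: Monad m => ConduitT [a] a m ()
concatC = awaitForever (mapM_ yield)

-- The suggested `singleton`: `return` in the list monad wraps x as [x].
singletonC :: Monad m => ConduitT a [a] m ()
singletonC = CL.map return

-- Run an element-wise conduit over a stream of lists.
-- Every output element ends up in its own one-element list.
conduitMap :: Monad m => ConduitT i o m () -> ConduitT [i] [o] m ()
conduitMap c = concatC .| c .| singletonC

-- Example: increment every element of [[1,2],[3]].
example :: [[Int]]
example = runConduitPure (CL.sourceList [[1, 2], [3]] .| conduitMap (CL.map (+ 1)) .| CL.consume)
```

Evaluating `example` gives `[[2],[3],[4]]`: the original grouping is not preserved, which is exactly the limitation for bulk APIs raised in the reply that follows.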

Michael

Simon Marechal | 1 Feb 07:42 2013

Re: Yet another Conduit question

On 02/01/2013 05:21 AM, Michael Snoyman wrote:
> Firstly, what's the use case that you want to deal with lists? If it's
> for efficiency, you'd probably be better off using a Vector instead.

That is a good point, and I wanted to go that way, but was not sure it
would help me much here. My use case is for services that offer a
"bulk" API, such as Redis pipelining or Elasticsearch bulk inserts. The
gains from saving network round-trips would far exceed those from a
List to Vector conversion.

> But I think the inverse of `concat` is `singleton =
> Data.Conduit.List.map return`, or `awaitForever $ yield . return`, using
> the list instance for Monad. Your conduitMap could be implemented then as:
> 
>     conduitMap conduit = concat =$= conduit =$= singleton

I can see how to do singleton, but that would gain me ... singletons.
That means I could not exploit a bulk API.
Michael Snoyman | 1 Feb 08:21 2013

Re: Yet another Conduit question

So you're saying you want to keep the same grouping that you had originally? Or do you want to batch up a certain number of results? There are lots of ways of approaching this problem, and the types don't imply nearly enough to determine what you're hoping to achieve here.

Michael
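One of the options mentioned, batching up a fixed number of results, might look like the following sketch (conduit >= 1.3 assumed; `batchesOf` is a hypothetical helper, not a library function). Placed after an element-wise conduit, it turns single values back into lists for a bulk Sink; it only counts elements and knows nothing about time.

```haskell
import Data.Conduit (ConduitT, await, yield, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical helper: regroup single values into batches of at most n
-- elements (n < 1 is treated as 1). The last batch may be shorter, and an
-- empty batch is never emitted.
batchesOf :: Monad m => Int -> ConduitT a [a] m ()
batchesOf n = go 0 []
  where
    size = max 1 n
    -- k: elements in the current batch; acc: the batch in reverse order
    go k acc
      | k >= size = yield (reverse acc) >> go 0 []
      | otherwise = do
          mx <- await
          case mx of
            Just x -> go (k + 1) (x : acc)
            Nothing -> if null acc then return () else yield (reverse acc)

-- Example: batch the values 1..5 in groups of two.
example :: [[Int]]
example = runConduitPure (CL.sourceList [1 .. 5] .| batchesOf 2 .| CL.consume)
```

Here `example` evaluates to `[[1,2],[3,4],[5]]`.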
Simon Marechal | 1 Feb 09:28 2013

Re: Yet another Conduit question

On 01/02/2013 08:21, Michael Snoyman wrote:
> So you're saying you want to keep the same grouping that you had
> originally? Or do you want to batch up a certain number of results?
> There are lots of ways of approaching this problem, and the types don't
> imply nearly enough to determine what you're hoping to achieve here.

Sorry for not being clear. I would like to group them "as much as
possible", that is up to a certain limit, and also within a "time
threshold". I believe that the conduit code will be called only when
something happens in the conduit, so an actual timer would be useless
(unless I handle this at the source perhaps, and propagate "ticks").

That is why in my first message I talked about stacking things into the
list until the conduit has no more input available, or a maximum size is
reached, but was not sure this even made sense.
Felipe Almeida Lessa | 3 Feb 16:06 2013

Re: Yet another Conduit question

I guess you could use the Flush datatype [1] depending on how your
data is generated.

Cheers,

[1] http://hackage.haskell.org/packages/archive/conduit/0.5.4.1/doc/html/Data-Conduit.html#t:Flush

--
Felipe.
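A sketch of how the Flush approach could feed a bulk Sink, assuming conduit >= 1.3: `batchOnFlush` (a hypothetical name) accumulates `Chunk` values and emits a list either when a `Flush` marker arrives or when the batch reaches a maximum size, matching the "up to a certain limit" requirement. Whatever produces the `Flush` markers (for example a timer in the Source) decides the time threshold.

```haskell
import Data.Conduit (ConduitT, Flush (..), await, yield, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical helper: accumulate Chunk values into a batch and emit it when
-- a Flush marker arrives or when the batch reaches maxSize elements.
-- A Flush on an empty batch emits nothing, so the bulk sink never sees [].
batchOnFlush :: Monad m => Int -> ConduitT (Flush a) [a] m ()
batchOnFlush maxSize = go 0 []
  where
    limit = max 1 maxSize
    -- Emit the batch (stored in reverse) unless it is empty.
    emit acc = if null acc then return () else yield (reverse acc)
    go k acc
      | k >= limit = emit acc >> go 0 []
      | otherwise = do
          mx <- await
          case mx of
            Nothing -> emit acc
            Just Flush -> emit acc >> go 0 []
            Just (Chunk x) -> go (k + 1) (x : acc)

-- Example: a size limit of 2, with some explicit Flush markers.
example :: [[Int]]
example =
  runConduitPure
    ( CL.sourceList [Chunk 1, Chunk 2, Flush, Chunk 3, Chunk 4, Chunk 5, Flush, Flush, Chunk 6]
        .| batchOnFlush 2
        .| CL.consume
    )
```

Here `example` evaluates to `[[1,2],[3,4],[5],[6]]`: the first and second batches are cut by the size limit, the third by a `Flush`, and the last by the end of the stream.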
Michael Snoyman | 4 Feb 10:25 2013

Re: Yet another Conduit question

I think this is probably the right approach. However, there's something important to point out: flushing based on timing issues must be handled *outside* of the conduit functionality, since by design conduit will not allow you to (for example) run `await` for up to a certain amount of time. You'll probably need to do this outside of your conduit chain, in the initial Source. It might look something like this:

yourSource = do
    mx <- liftIO (timeout somePeriod myAction)
    yield $ maybe Flush Chunk mx
    yourSource
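A self-contained variant of this Source, assuming conduit >= 1.3 and that items arrive through a `Chan` filled by another thread. The `Chan`, `chanSourceWithFlush`, and `demo` are illustrative assumptions, not part of the original suggestion. Because `timeout` from System.Timeout runs in IO, it is lifted with `liftIO` inside the conduit. The source never ends on its own, so the example stops it with `CL.isolate`.

```haskell
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Conduit (ConduitT, Flush (..), runConduit, yield, (.|))
import qualified Data.Conduit.List as CL
import System.Timeout (timeout)

-- Hypothetical source: read items from a Chan filled elsewhere. If nothing
-- arrives within periodMicros microseconds, emit Flush instead of blocking.
-- The timer lives in the source, outside the rest of the conduit chain.
chanSourceWithFlush :: MonadIO m => Int -> Chan a -> ConduitT i (Flush a) m ()
chanSourceWithFlush periodMicros ch = do
  mx <- liftIO (timeout periodMicros (readChan ch))
  yield (maybe Flush Chunk mx)
  chanSourceWithFlush periodMicros ch

-- Example: two queued items, then a quiet 0.1 s period that produces a Flush.
demo :: IO [Flush Int]
demo = do
  ch <- newChan
  mapM_ (writeChan ch) [1, 2]
  runConduit (chanSourceWithFlush 100000 ch .| CL.isolate 3 .| CL.consume)
```

Running `demo` returns `[Chunk 1, Chunk 2, Flush]` after roughly 0.1 seconds.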

Kevin Quick | 4 Feb 15:37 2013

Re: Yet another Conduit question

While on the subject of conduits and timing, I'm using the following  
conduit to add elapsed timing information:

timedConduit :: MonadResource m => forall l o u . Pipe l o o u m (u, NominalDiffTime)
timedConduit = bracketP getCurrentTime (\_ -> return ()) inner
     where inner st = do r <- awaitE
                         case r of
                           Right x -> yield x >> inner st
                           Left  r -> deltaTime st >>= \t -> return (r,t)
           deltaTime st = liftIO $ flip diffUTCTime st <$> getCurrentTime

I'm aware that this is primarily timing the downstream (and ultimately the  
Sink) more than the upstream, and I'm using the bracketP to attempt to  
delay the acquisition of the initial time (st) until the first downstream  
request for data.

I would appreciate any other insights regarding concerns, issues, or  
oddities that I might encounter with the above.

Thanks,
   Kevin

-- 
-KQ
Michael Snoyman | 4 Feb 16:13 2013

Re: Yet another Conduit question

Hmm, that's an interesting trick. I can't say that I ever thought bracketP would be used in that way. The only change I might recommend is using addCleanup[1] instead, which doesn't introduce the MonadResource constraint.
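For comparison, a sketch of the same timing idea against conduit >= 1.3, where `ConduitT` does not expose the upstream result the way `Pipe` and `awaitE` did, so this version only returns the elapsed time. It needs neither bracketP nor MonadResource: since conduit is pull-based, the first `getCurrentTime` should only run when downstream first asks for data. That reflects the usual evaluation order rather than anything stated in the thread. It requires the `time` package; `timedSum` is an illustrative name.

```haskell
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Conduit (ConduitT, awaitForever, fuseBoth, runConduit, yield, (.|))
import qualified Data.Conduit.List as CL
import Data.Time.Clock (NominalDiffTime, diffUTCTime, getCurrentTime)

-- Pass values through unchanged and return the elapsed time as the result.
-- The start time is read when this conduit first runs, i.e. on the first
-- downstream request; the end time is read once upstream is exhausted.
timedConduit :: MonadIO m => ConduitT a a m NominalDiffTime
timedConduit = do
  start <- liftIO getCurrentTime
  awaitForever yield
  end <- liftIO getCurrentTime
  return (diffUTCTime end start)

-- Example: get both the sink's result and the elapsed time.
timedSum :: [Int] -> IO (NominalDiffTime, Int)
timedSum xs = runConduit (CL.sourceList xs .| (timedConduit `fuseBoth` CL.fold (+) 0))
```

As in the original, the measured time includes the downstream (sink) work, since the sink runs interleaved with this conduit.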

Simon Marechal | 4 Feb 14:47 2013

Re: Yet another Conduit question

On 03/02/2013 16:06, Felipe Almeida Lessa wrote:
> I guess you could use the Flush datatype [1] depending on how your
> data is generated.

Thank you for this suggestion. I tried to do exactly this by modifying
my bulk Redis source so that it can timeout and send empty lists [1].
Then I wrote a few helper conduits [2], such as:

concatFlush :: (Monad m) => Integer -> Conduit [a] m (Flush a)

which converts a stream of [a] into a stream of (Flush a), sending a
Flush whenever it encounters an empty list or once it has sent a tunable
amount of data downstream.
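A possible reconstruction of `concatFlush` from the description above, assuming conduit >= 1.3. This is a sketch written from the prose, not the code in Data/Conduit/Misc.hs; here the counter tracks elements sent since the last Flush.

```haskell
import Data.Conduit (ConduitT, Flush (..), await, yield, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- Hypothetical reconstruction: flatten [a] into Chunk values, emitting Flush
-- when an empty list arrives or after `limit` chunks since the last Flush.
concatFlush :: Monad m => Integer -> ConduitT [a] (Flush a) m ()
concatFlush limit = go 0
  where
    -- n counts the chunks sent since the last Flush.
    go n = do
      mxs <- await
      case mxs of
        Nothing -> return ()
        Just [] -> yield Flush >> go 0
        Just xs -> emit n xs >>= go
    -- Send each element as a Chunk, inserting a Flush whenever the limit is
    -- reached; returns the updated counter.
    emit n [] = return n
    emit n (x : xs) = do
      yield (Chunk x)
      if n + 1 >= limit
        then yield Flush >> emit 0 xs
        else emit (n + 1) xs

-- Example: a limit of 2 chunks, with an empty list acting as a timeout marker.
example :: [Flush Int]
example = runConduitPure (CL.sourceList [[1, 2, 3], [], [4]] .| concatFlush 2 .| CL.consume)
```

Here `example` evaluates to `[Chunk 1, Chunk 2, Flush, Chunk 3, Flush, Chunk 4]`.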

I finally modified my examples [3]. I then realized it would be nice to
have fmap for conduits (but I am not sure how to write such a type
signature). Suggestions are welcome!

[1] https://github.com/bartavelle/hslogstash/commit/663bf8f5e6058b476c9ed9b5c9cf087221b79b36
[2] https://github.com/bartavelle/hslogstash/blob/master/Data/Conduit/Misc.hs
[3] https://github.com/bartavelle/hslogstash/blob/master/examples/RedisToElasticsearch.hs
Michael Snoyman | 4 Feb 16:11 2013

Re: Yet another Conduit question

Actually, `fmap` already exists on the Pipe datatype; it just probably doesn't do what you want. It modifies the return value, which is only relevant for Sinks.

What you are probably looking for is mapOutput [1].
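To illustrate the distinction, a short sketch assuming conduit >= 1.3 (`chunked` and `countItems` are illustrative names): `mapOutput` rewrites the values a conduit yields, while `fmap` only rewrites its final result.

```haskell
import Data.Conduit (ConduitT, Flush (..), mapOutput, runConduitPure, (.|))
import qualified Data.Conduit.List as CL

-- mapOutput: adapt an existing list-producing conduit so that every value it
-- yields is wrapped in Chunk.
chunked :: Monad m => ConduitT i [a] m () -> ConduitT i (Flush [a]) m ()
chunked = mapOutput Chunk

-- fmap: post-process a sink's return value (here, the consumed list).
countItems :: Monad m => ConduitT a o m Int
countItems = fmap length CL.consume
```

For instance, `runConduitPure (chunked (CL.sourceList [[1, 2], [3]]) .| CL.consume)` gives `[Chunk [1,2], Chunk [3]]`, and `runConduitPure (CL.sourceList "abc" .| countItems)` gives `3`.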

Michael