Simon Marechal | 31 Jan 10:48 2013

"branching" conduits

Hello,

	I have found the Conduit abstraction to be very well suited to a set of
problems I am facing. I am however wondering how to implement
"branching" conduits, and even conduit pools.

	I am currently in the process of rewriting parts (the simple parts) of
the Logstash tool. There is a sample program that I use here:

https://github.com/bartavelle/hslogstash/blob/deprecateUtils/examples/RedisToElasticsearch.hs

	As can be seen, it uses a "Redis" source, a conduit that decodes the
JSON ByteString into a LogstashMessage, a conduit that stores it into
Elasticsearch and outputs the result of that action as an Either, and
finally a sink that prints the errors.

	My problem is that I would like more complex behaviour. For example, I
would like to route messages to another server instead of putting them
into Elasticsearch when the LogstashMessage has some tag set. But this
is just an example, and it is probable I will want much more complex
behavior soon.

	I am not sure how to proceed from here, but have the following ideas:

 * investigate how the Conduits are made internally to see if I can
create an operator similar to $$, but that would have a signature like:
	Source m (Either a b) -> Sink a m r -> Sink b m r
and would do the branching in a binary fashion. I am not sure this is
even possible.


Michael Snoyman | 31 Jan 13:50 2013

Re: "branching" conduits




On Thu, Jan 31, 2013 at 11:48 AM, Simon Marechal <simon <at> banquise.net> wrote:

 * create an "MVar" connector constructor, which might have a signature
like this:

 Int                           -- ^ branch count
 -> (LogstashMessage -> Int)   -- ^ branching function
 -> (Sink LogstashMessage m (), [Source m LogstashMessage])
 -- ^ a suitable sink, several sources for the other conduits

 It would internally create an MVar (Maybe LogstashMessage) for each
branch, and putMVar into the MVars according to the branching function.
When the Conduit is destroyed, it would putMVar Nothing into all MVars.
 The sources would takeMVar, stop if the value is Nothing, and otherwise
proceed as expected.

 The MVars should guarantee the constant-space property, but there is a
risk of inter-branch blocking when one of the branches is significantly
slower than the others. That doesn't really matter to me anyway. All
the branch Sinks would also need some synchronization mechanism so
that the main thread waits for them (as they are going to be launched
with forkIO).



  This is the simplest scheme I have thought of, and it is probably not
a very good one. I am very interested in suggestions here.
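The MVar scheme described above can be sketched with plain IO and base's Control.Concurrent, leaving conduit out so the moving parts stay visible. This is an illustration under stated assumptions, not a drop-in conduit combinator: `runBranches` is a hypothetical name, each branch simply collects its values into a list, and the routing function is assumed to return an index in [0, n - 1].

```haskell
import Control.Concurrent (forkIO, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM, forM_, replicateM)

-- Hypothetical helper: send each input to branch (route x) through a
-- one-slot MVar (Maybe a), run one consumer thread per branch, and wait
-- for all of them. Each branch here just collects its values into a list.
-- Assumption: route always returns an index in [0, n - 1].
runBranches :: Int -> (a -> Int) -> [a] -> IO [[a]]
runBranches n route xs = do
  slots <- replicateM n newEmptyMVar
  dones <- forM slots $ \slot -> do
    done <- newEmptyMVar
    let loop acc = do
          v <- takeMVar slot
          case v of
            Nothing -> putMVar done (reverse acc) -- terminator: branch ends
            Just x  -> loop (x : acc)
    _ <- forkIO (loop [])
    return done
  -- "sink" side: blocks until the target branch took its previous value
  forM_ xs $ \x -> putMVar (slots !! route x) (Just x)
  -- end of stream: put Nothing into every MVar
  forM_ slots $ \slot -> putMVar slot Nothing
  -- synchronization: the calling thread waits for every branch thread
  mapM takeMVar dones
```

Because each MVar holds at most one value, memory use stays constant, but the producer blocks on `putMVar` whenever the target branch has not yet consumed its previous value, which is the inter-branch blocking mentioned above. The final `mapM takeMVar dones` is one possible way to make the main thread wait for the forked branches.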

_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe <at> haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe


Hi Simon,

For your first approach, I think what you're looking to do is combine two Sinks together, something like:

combine :: Monad m
        => Sink i1 m r1
        -> Sink i2 m r2
        -> Sink (Either i1 i2) m (r1, r2)

Then you'd be able to use the standard $$ and =$ operators on it. I've put up an example implementation here[1]. The majority of the code is simple pattern matching on the different possible combinations, but some things to point out:

* To simplify, we start off with a call to injectLeftovers. This means that we can entirely ignore the Leftover constructor in the main function.
* Since a Sink will never yield values, we can also ignore the HaveOutput constructor.
* As soon as either of the Sinks terminates, we terminate the other one as well and return the results.
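To make the points above concrete, here is a toy model of the same technique in pure Haskell. It is a sketch under simplifying assumptions, not the code from the gist: the `Sink` type below is a stand-in for conduit's internal Pipe type with no leftovers, no outputs and no monadic effects (mirroring the first two points), and `runSink`, `listSink` and `takeSink` are hypothetical helpers.

```haskell
-- Toy model of a Sink (not conduit's real Pipe type): either finished with
-- a result, or waiting for input, together with the result it would give
-- if the stream ended right now.
data Sink i r = Done r | NeedInput (i -> Sink i r) r

-- Route Left values to the first sink and Right values to the second.
-- As soon as either sink finishes, the other is closed (given end of
-- stream) and both results are returned.
combine :: Sink i1 r1 -> Sink i2 r2 -> Sink (Either i1 i2) (r1, r2)
combine (Done r1) (Done r2) = Done (r1, r2)
combine (Done r1) (NeedInput _ end2) = Done (r1, end2)
combine (NeedInput _ end1) (Done r2) = Done (end1, r2)
combine s1@(NeedInput k1 end1) s2@(NeedInput k2 end2) =
  NeedInput (either (\i -> combine (k1 i) s2) (\i -> combine s1 (k2 i))) (end1, end2)

-- Feed a list to a sink, signalling end of stream when the list runs out.
runSink :: [i] -> Sink i r -> r
runSink _ (Done r) = r
runSink [] (NeedInput _ end) = end
runSink (x : xs) (NeedInput k _) = runSink xs (k x)

-- Example sinks: collect everything, or stop after n items.
listSink :: Sink i [i]
listSink = go []
  where go acc = NeedInput (\i -> go (i : acc)) (reverse acc)

takeSink :: Int -> Sink i [i]
takeSink n = go n []
  where
    go 0 acc = Done (reverse acc)
    go k acc = NeedInput (\i -> go (k - 1) (i : acc)) (reverse acc)
```

For example, `runSink [Left 1, Right "a"] (combine (takeSink 1) listSink)` evaluates to `([1], [])`: once the first sink is done, the second one is closed before it ever sees `"a"`.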

You can also consider going the mutable container route if you like. Instead of creating a lot of stuff from scratch with MVars, you could use stm-conduit[2]. In fact, that package already contains some kind of merging behavior for sources; it might make sense to ask the author about including unmerging behavior for Sinks.

Michael

Simon Marechal | 31 Jan 14:24 2013

Re: "branching" conduits

On 31/01/2013 13:50, Michael Snoyman wrote:
> * To simplify, we start off with a call to injectLeftovers. This means
> that we can entirely ignore the Leftover constructor in the main function.
> * Since a Sink will never yield values, we can also ignore the
> HaveOutput constructor.
> * As soon as either of the Sinks terminates, we terminate the other one
> as well and return the results.

Your gist is extremely informative to me. I figured it would be
something along these lines, but was very scared to try it myself. I
have, however, realized that my first approach doesn't cover my needs, as I
will want to feed any value to an arbitrary set of sinks...

I started coding right after I sent that mail and wrote this:
https://github.com/bartavelle/hslogstash/blob/master/Data/Conduit/Branching.hs

It is not very elegant, as the "branching" function outputs [Int].

I haven't tested it yet, but it should branch with any number of sinks.
Another point that might (or might not) be of interest is the
distribution of distinct branches on separate threads.
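One way to pin down what a branching function returning [Int] means is a pure list model: each value is delivered to every branch whose index appears in the returned list. `distribute` is a hypothetical helper for illustration only, not the code in Branching.hs, and it ignores streaming and threads entirely.

```haskell
-- Pure model (not the code in Branching.hs): a branching function that
-- returns [Int] sends each value to every listed branch, so one value can
-- go to zero, one or several of the n branches. Indices outside
-- [0, n - 1] are silently ignored here.
distribute :: Int -> (a -> [Int]) -> [a] -> [[a]]
distribute n route xs = [[x | x <- xs, i `elem` route x] | i <- [0 .. n - 1]]
```

For instance, with `route x = [0 | even x] ++ [1 | x > 2] ++ [2]`, `distribute 3 route [1 .. 4]` gives `[[2,4],[3,4],[1,2,3,4]]`; branch 2 acts as a tap that sees every value. A streaming version would instead push each value once into the channels of the listed branches.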

> You can also consider going the mutable container route if you like.
> Instead of creating a lot of stuff from scratch with MVars, you could
> use stm-conduit[2]. In fact, that package already contains some kind of
> merging behavior for sources, it might make sense to ask the author
> about including unmerging behavior for Sinks.

I did not think of bounded channels. They are indeed a better match
than MVars!

I can see it uses resourceForkIO, which I believe is OK for sources that
will be used in your 'main' thread. But for multiple Sinks, you need a
way to wait for all the Sinks to terminate. I used stuff from
Control.Concurrent.ParallelIO, but I am not sure it is ideal.
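If the only requirement is to wait for every branch thread to finish, base alone may be enough. The sketch below uses `forkFinally` and one MVar per thread; `forkAll` is a hypothetical name, and each action would be, for example, a branch's `sourceTBMChan ch $$ sink` pipeline run in IO.

```haskell
import Control.Concurrent (forkFinally, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (throwIO)
import Control.Monad (forM)

-- Run each action in its own thread and block until they have finished.
-- Results come back in input order. If a thread died with an exception,
-- the first such failure (in list order) is re-thrown in the caller;
-- threads later in the list are then neither waited for nor killed.
forkAll :: [IO a] -> IO [a]
forkAll acts = do
  vars <- forM acts $ \act -> do
    var <- newEmptyMVar
    _ <- forkFinally act (putMVar var)
    return var
  forM vars $ \var -> takeMVar var >>= either throwIO return
```

Unlike a bare forkIO, an exception in a branch is not silently lost: it resurfaces in the waiting thread. A real pipeline might also want to cancel the remaining branches when that happens.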
Alexander V Vershilov | 31 Jan 17:31 2013

Re: "branching" conduits

Hello, Simon.

Sorry, I sent my first email off the list and have no copy to resend.
If you use stm-conduit, you may run into the following problems:

1) Early close: if you use the modern conduit API (yield, await), be very
careful when using $$ with sinkT?Chan, as it will close the channel, and so
the receiver will be closed as well.

2) Late close: if you use resourceForkIO, your channel may live too
long; however, in many cases you'll be safe with forkIO, unless you
share resources that were allocated with resourcet.

3) Pipeline notification: if you map your branches to different threads,
you'll need a way to signal that a branch is closed (closing channels works
well, but you'll need additional steps to check whether they were closed,
and then either close the whole pipeline or just forget that channel). On
the other hand, you can use one pipeline (and thread) for all branches; then
you need the approach that was shown in Michael's gist: run each downstream
pipe and save its result, possibly modifying a list.

I'll paste the concept I mailed to you here, though it may not be accurate:

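import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (closeTBMChan, newTBMChanIO, writeTBMChan)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (ResourceT, allocate)
import Data.Conduit
import Data.Conduit.TMChan (sourceTBMChan)

branch :: [Sink a IO ()] -> Sink a (ResourceT IO) ()  -- fan out to all sinks
branch sinks = do
  chs <- liftIO $ mapM (const (newTBMChanIO 16)) sinks  -- bound of 16 is arbitrary
  -- one thread per branch; all are killed when the enclosing ResourceT block exits
  _ <- allocate (mapM forkIO [sourceTBMChan ch $$ s | (ch, s) <- zip chs sinks])
                (mapM_ killThread)
  awaitForever $ \x -> liftIO $ mapM_ (atomically . flip writeTBMChan x) chs
  liftIO $ mapM_ (atomically . closeTBMChan) chs  -- signal end of input to branches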

As an additional step, you can call isClosedTBMChan on a channel to check
whether its branch is still alive, and filter the channel out if it is not.
(A previous version contained a filter function that allowed sending a value
only to the branches that need it; however, it's less composable.)
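The "filter out closed branches" step can be sketched with only the stm package. Here `Branch` is a hypothetical stand-in for a TBMChan (a bounded TBQueue plus a closed flag that the branch's own thread would set when it stops), and `broadcast` writes a value to every live branch and returns the survivors, so the caller keeps only those for the next value.

```haskell
import Control.Concurrent.STM
import Control.Monad (filterM)

-- Hypothetical stand-in for a closable bounded channel such as TBMChan:
-- a bounded queue plus a "closed" flag set by the branch when it stops.
data Branch a = Branch { queue :: TBQueue a, closed :: TVar Bool }

-- Write x to every branch that is still open, and return only those, so
-- closed branches are dropped from later broadcasts.
broadcast :: a -> [Branch a] -> STM [Branch a]
broadcast x bs = do
  live <- filterM (fmap not . readTVar . closed) bs
  mapM_ (\b -> writeTBQueue (queue b) x) live
  return live
```

Since `writeTBQueue` retries while a queue is full, a slow branch still throttles the broadcaster; dropping closed branches only avoids writing into channels that nobody reads any more.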

You may also like the iochan-conduit package [1]; it can give better
results in some cases, though stm-channels is much more general.

[1] https://github.com/qnikst/iochan-conduit
-- 
Alexander Vershilov
