Nicolas Trangez | 27 Nov 11:57 2012

Conduit and pipelined protocol processing using a threadpool

All,

I've written a library to implement servers for some protocol using
Conduit (I'll announce more details later).

The protocol supports pipelining, i.e. a client can send a 'command'
which contains some opaque 'handle' chosen by the client, the server
processes this command, then returns some reply which contains this
handle. The client is free to send other commands before receiving a
reply for any previous request, and the server can process these
commands in any order, sequentially or concurrently.

The library is based on network-conduit's "Application" style [1], as
such now I write code like (OTOH)

> application :: AppData IO -> IO ()
> application client = appSource client $= handler $$ appSink client
>   where
>     handler = do
>         negotiateResult <- MyLib.negotiate
>         liftIO $ validateNegotiateResult negotiateResult
>         MyLib.sendInformation 123
>         loop
>
>     loop = do
>         command <- MyLib.getCommand
>         case command of
>             CommandA handle arg -> do
>                 result <- liftIO $ doComplexProcessingA arg
>                 MyLib.sendReply handle result

Michael Snoyman | 27 Nov 16:14 2012

Re: Conduit and pipelined protocol processing using a threadpool

I think the stm-conduit package[1] may be helpful for this use case. Each time you get a new command, you can fork a thread and give it the TBMChan to write to, and you can use sourceTBMChan to get a source to send to the client.



On Tue, Nov 27, 2012 at 12:57 PM, Nicolas Trangez <nicolas <at> incubaid.com> wrote:
All,

I've written a library to implement servers for some protocol using
Conduit (I'll announce more details later).

The protocol supports pipelining, i.e. a client can send a 'command'
which contains some opaque 'handle' chosen by the client, the server
processes this command, then returns some reply which contains this
handle. The client is free to send other commands before receiving a
reply for any previous request, and the server can process these
commands in any order, sequentially or concurrently.

The library is based on network-conduit's "Application" style [1], as
such now I write code like (OTOH)

> application :: AppData IO -> IO ()
> application client = appSource client $= handler $$ appSink client
>   where
>     handler = do
>         negotiateResult <- MyLib.negotiate
>         liftIO $ validateNegotiateResult negotiateResult
>         MyLib.sendInformation 123
>         loop
>
>     loop = do
>         command <- MyLib.getCommand
>         case command of
>             CommandA handle arg -> do
>                 result <- liftIO $ doComplexProcessingA arg
>                 MyLib.sendReply handle result
>                 loop
>             Disconnect -> return ()

This approach handles commands in-order, sequentially. Since command
processing can involve quite some IO operations to disk or network, I've
been trying to support pipelining on the server-side, but as of now I
was unable to get things working.

The idea would be to have a pool of worker threads, which receive work
items from some channel, then return any result on some other channel,
which should then be returned to the client.

This means inside "loop" I would have 2 sources: commands coming from
the client (using 'MyLib.getCommand :: MonadIO m => Pipe ByteString
ByteString o u m Command'), as well as command results coming from the
worker threads through the result channel. Whenever the first source
produces something, it should be pushed onto the work queue, and
whenever the second one yields some result it should be sent to the
client using 'MyLib.sendReply :: Monad m => Handle -> Result -> Pipe l i
ByteString u m ()'
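
The worker-pool idea described above can be sketched without any
conduit machinery. This is only an illustration under stated
assumptions: Command, Reply, process and runPool are hypothetical
stand-ins (not part of MyLib), and plain Chan from base replaces
whatever channel type the real server would use.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forever, forM_, replicateM, replicateM_)
import Data.List (sort)

-- Hypothetical stand-ins for the library's types.
type Handle = Int
data Command = Command Handle Int
data Reply = Reply Handle Int deriving (Eq, Ord, Show)

-- Hypothetical stand-in for doComplexProcessingA.
process :: Int -> IO Int
process arg = return (arg * 2)

-- Each worker takes a command off the work queue and puts the reply,
-- tagged with the client's handle, on the result queue.
worker :: Chan Command -> Chan Reply -> IO ()
worker work results = forever $ do
    Command h arg <- readChan work
    r <- process arg
    writeChan results (Reply h r)

-- Start n workers, submit the commands, collect one reply per command.
runPool :: Int -> [Command] -> IO [Reply]
runPool n cmds = do
    work <- newChan
    results <- newChan
    replicateM_ n (forkIO (worker work results))
    forM_ cmds (writeChan work)
    replicateM (length cmds) (readChan results)

main :: IO ()
main = do
    replies <- runPool 4 [Command h (h * 10) | h <- [1 .. 5]]
    -- Replies may arrive in any order; the handle identifies each one,
    -- so they are sorted here only to make the printout stable.
    print (sort replies)
```

Because every reply carries the handle chosen by the client, the
order in which workers finish does not matter to the protocol.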

I've been fighting this for a while and haven't managed to get something
sensible working. Maybe the design of my library is flawed, or maybe I'm
approaching the problem incorrectly, or ...

Has this ever been done before, or would anyone have some pointers how
to tackle this?

Thanks,

Nicolas

[1]
http://hackage.haskell.org/packages/archive/network-conduit/0.6.1.1/doc/html/Data-Conduit-Network.html#g:2



_______________________________________________
Haskell-Cafe mailing list
Haskell-Cafe <at> haskell.org
http://www.haskell.org/mailman/listinfo/haskell-cafe

Nicolas Trangez | 27 Nov 18:25 2012

Re: Conduit and pipelined protocol processing using a threadpool

Michael,

On Tue, 2012-11-27 at 17:14 +0200, Michael Snoyman wrote:
> I think the stm-conduit package[1] may be helpful for this use case.
> Each time you get a new command, you can fork a thread and give it the
> TBMChan to write to, and you can use sourceTBMChan to get a source to
> send to the client.

That's +- what I had in mind. I did find stm-conduit before and did try
to get the thing working using it, but these attempts failed.

I attached an example which might clarify what I intend to do. I'm aware
it contains several potential bugs (leaking threads etc), but that's
beside the question ;-)

If only I could figure out what to put on the 3 lines of comment I left
in there...

Thanks for your help,

Nicolas

Attachment (pool.hs): text/x-haskell, 2408 bytes
Michael Snoyman | 28 Nov 08:17 2012

Re: Conduit and pipelined protocol processing using a threadpool




The issue is that you're trying to put everything into a single Conduit, which forces reading and writing to occur in a single thread of execution. Since you want your writing to be triggered by a separate event (data being available on the Chan), you're running into limitations.

The reason network-conduit provides a Source for the incoming data and a Sink for outgoing data is specifically to address your use case. You want to take the data from the Source and put it into the Chan in one thread, and take the data from the other Chan and put it into the Sink in a separate thread. Something like:

myApp appdata = do
    chan1 <- ...
    chan2 <- ...
    replicateM_ 5 $ forkIO $ worker chan1 chan2
    forkIO $ appSource appdata $$ sinkTBMChan chan1
    sourceTBMChan chan2 $$ appSink appdata

You'll also want to make sure to close chan1 and chan2 to make sure that your threads stop running.
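
A dependency-free sketch of this structure, including the channel
closing, might look as follows. It makes several assumptions: a list
stands in for appSource, the returned list stands in for appSink, and
a Chan carrying Maybe values (Nothing meaning "closed") is swapped in
for the TBMChan from stm-conduit. The worker body (x + 1) is purely
illustrative.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, replicateM_)
import Data.List (sort)

-- Worker: process items until the end-of-input marker arrives,
-- then report completion.
worker :: Chan (Maybe Int) -> Chan (Maybe Int) -> MVar () -> IO ()
worker chan1 chan2 done = loop
  where
    loop = do
        item <- readChan chan1
        case item of
            Nothing -> putMVar done ()
            Just x  -> writeChan chan2 (Just (x + 1)) >> loop

-- Writer side: drain the result channel until it is closed.
drain :: Chan (Maybe a) -> IO [a]
drain chan = do
    item <- readChan chan
    case item of
        Nothing -> return []
        Just x  -> (x :) <$> drain chan

-- 'input' stands in for appSource; the result stands in for appSink.
myApp :: Int -> [Int] -> IO [Int]
myApp nWorkers input = do
    chan1 <- newChan
    chan2 <- newChan
    done  <- newEmptyMVar
    replicateM_ nWorkers $ forkIO $ worker chan1 chan2 done
    -- Reader thread: feed the input, then "close" chan1 once per worker.
    _ <- forkIO $ do
        forM_ input (writeChan chan1 . Just)
        replicateM_ nWorkers (writeChan chan1 Nothing)
    -- Closer thread: "close" chan2 after every worker has finished.
    _ <- forkIO $ do
        replicateM_ nWorkers (takeMVar done)
        writeChan chan2 Nothing
    drain chan2

main :: IO ()
main = myApp 5 [1 .. 10] >>= print . sort
```

With a real TBMChan the explicit Nothing markers would presumably be
replaced by closing the channel, but the ordering concern is the same:
chan2 may only be closed once all workers are done writing to it.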

Michael
Nicolas Trangez | 28 Nov 10:21 2012

Re: Conduit and pipelined protocol processing using a threadpool

On Wed, 2012-11-28 at 09:17 +0200, Michael Snoyman wrote:
> The issue is that you're trying to put everything into a single
> Conduit, which forces reading and writing to occur in a single thread
> of execution. Since you want your writing to be triggered by a
> separate event (data being available on the Chan), you're running into
> limitations.
> 
> 
> The reason network-conduit provides a Source for the incoming data and
> a Sink for outgoing data is specifically to address your use case. You
> want to take the data from the Source and put it into the Chan in one
> thread, and take the data from the other Chan and put it into the Sink
> in a separate thread. Something like:
> 
> 
> myApp appdata = do
>     chan1 <- ...
>     chan2 <- ...
>     replicateM_ 5 $ forkIO $ worker chan1 chan2
>     forkIO $ appSource appdata $$ sinkTBMChan chan1
>     sourceTBMChan chan2 $$ appSink appdata
> 
> 
> You'll also want to make sure to close chan1 and chan2 to make sure
> that your threads stop running.

Thanks, I +- figured that out last night. Only thing left is managing
the handshake before forking off the workers, if possible (otherwise
things become very messy IMHO), but ResumableSources etc might be of use
here.
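
For what it's worth, a rough sketch of that handshake-first idea,
assuming the conduit package's connect-and-resume operators ($$+ and
$$+-). A list source stands in for appSource, and the first element
plays the role of the negotiation message; none of this is MyLib's
actual API.

```haskell
import Data.Conduit (($$+), ($$+-))
import qualified Data.Conduit.List as CL

main :: IO ()
main = do
    -- A list stands in for appSource; "hello" plays the role of the
    -- negotiation message sent by the client.
    let source = CL.sourceList ["hello", "cmd1", "cmd2"]
    -- Run the handshake on the connection thread, consuming one item
    -- and getting back a resumable source for the rest.
    (rest, greeting) <- source $$+ CL.take 1
    print greeting
    -- Only now hand the remaining input to the pipelined stage
    -- (here it is simply collected into a list).
    commands <- rest $$+- CL.consume
    print commands
```

In the real server the last step would instead feed the resumed
source into the work channel after forking the workers.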

Maybe there's a library somewhere in here...

Thanks a bunch,

Nicolas
