EzDevInfo.com

conduit

One processing conduit, 2 IO sources of the same type

In my GHC Haskell application utilizing stm, network-conduit and conduit, I have a strand for each socket which is forked automatically using runTCPServer. Strands can communicate with other strands through the use of a broadcasting TChan.

This showcases how I would like to set up the conduit "chain":

[Image: diagram of the conduit chain; not preserved]

So, what we have here is two sources (each bound to helper conduits) which produce a Packet object that the encoder accepts and turns into a ByteString, which is then sent out over the socket. I've had a great deal of difficulty with fusing the two inputs efficiently (performance is a concern).

I would appreciate if somebody could point me in the right direction.


Since it would be rude of me to post this question without making an attempt, I'll put what I've previously tried here;

I've written/cherrypicked a function which (blocking) produces a Source from a TMChan (closeable channel);

-- | Takes a generic type of STM chan and, given read and close functionality,
--   returns a conduit 'Source' which consumes the elements of the channel.
chanSource 
    :: (MonadIO m, MonadSTM m)
    => a                    -- ^ The channel
    -> (a -> STM (Maybe b)) -- ^ The read function
    -> (a -> STM ())        -- ^ The close/finalizer function
    -> Source m b
chanSource ch readCh closeCh = ConduitM pull
    where close     = liftSTM $ closeCh ch
          pull      = PipeM $ liftSTM $ readCh ch >>= translate
          translate = return . maybe (Done ()) (HaveOutput pull close)

Likewise, a function to transform a Chan into a sink;

-- | Takes a stream and, given write and close functionality, returns a sink
--   which will consume elements and broadcast them into the channel
chanSink
    :: (MonadIO m, MonadSTM m)
    => a                 -- ^ The channel
    -> (a -> b -> STM()) -- ^ The write function
    -> (a -> STM())      -- ^ The close/finalizer function
    -> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
    where close  = const . liftSTM $ closeCh ch
          sink   = NeedInput push close
          write  = liftSTM . writeCh ch
          push x = PipeM $ write x >> return sink

Then mergeSources is straightforward; fork 2 threads (which I really don't want to do, but what the heck) which can put their new items into the one list which I then produce a source of;

-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
--   a source which consumes the elements of the channel.
mergeSources
    :: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
    => [Source (ResourceT m) a]             -- ^ The list of sources
    -> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    where push c s = s $$ chanSink c writeTMChan closeTMChan
          fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
          retn c   = return $ chanSource c readTMChan closeTMChan

While I was successful in making these functions typecheck, I was unsuccessful in getting any utilization of these functions to typecheck;

-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
    mergsrc $$ protocol $= encoder =$ appSink appdata
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
          mergsrc = mergeSources [appSource appdata $= decoder, chansrc]

-- | Structure which holds mutable information for clients
data SessionState = SessionState
    { _ssBroadcast     :: TMChan Packet -- ^ Outbound packet broadcast channel
    }

makeLenses ''SessionState

-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)

-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO

I see this method as being flawed anyhow -- there are many intermediate lists and conversions. This can not be good for performance. Seeking guidance.


PS. From what I can understand, this is not a duplicate of "Fusing conduits with multiple inputs", as in my situation both sources produce the same type and I don't care which source a Packet object comes from, as long as I'm not waiting on one while another has objects ready to be consumed.
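
The requirement in the PS (take a Packet from whichever input has one ready, never block on an empty one) can be sketched with plain STM, independent of conduit. This is only an illustration under assumptions: it uses TChan from the stm package rather than the TMChan used above, and readEither, socketCh and broadcastCh are hypothetical names, not part of the code in the question.

```haskell
import Control.Concurrent.STM

-- | Take the next item from whichever channel has one ready, preferring
--   the first. The transaction retries only when both channels are empty.
readEither :: TChan a -> TChan a -> STM a
readEither a b = readTChan a `orElse` readTChan b

-- | Hypothetical demo: two channels standing in for the two packet inputs.
demo :: IO [Int]
demo = do
  socketCh    <- newTChanIO   -- stand-in for decoded socket packets
  broadcastCh <- newTChanIO   -- stand-in for the broadcast channel
  atomically (writeTChan broadcastCh 2)
  x <- atomically (readEither socketCh broadcastCh)  -- socketCh is empty, so 2
  atomically (writeTChan socketCh 1 >> writeTChan broadcastCh 3)
  y <- atomically (readEither socketCh broadcastCh)  -- socketCh preferred: 1
  z <- atomically (readEither socketCh broadcastCh)  -- then 3
  return [x, y, z]

main :: IO ()
main = demo >>= print
```

Note that orElse always tries the first channel first, so a busy first input could starve the second; a fairer scheme would need extra bookkeeping.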

PPS. I apologize for the usage (and therefore requirement of knowledge) of Lens in example code.


Source: (StackOverflow)

What are the pros and cons of Enumerators vs. Conduits vs. Pipes?

I'd like to hear from someone with a deeper understanding than myself what the fundamental differences are between Enumerators, Conduits, and Pipes as well as the key benefits and drawbacks. Some discussion's already ongoing but it'd be nice to have a high-level overview.


Source: (StackOverflow)

What's the "easier way" that deprecates the need for Data.Conduit.Util's zip?

Getting started with conduit, and I noticed that in Data.Conduit.Util:

Utility functions from older versions of conduit. These should be considered deprecated, as there are now easier ways to handle their use cases. This module is provided solely for backwards compatibility.

Of particular concern to me is zip :: Monad m => Source m a -> Source m b -> Source m (a, b). This seems pretty useful to me - I could independently work on a way of producing as and a way of producing bs, then just zip them together when I need them, rather than have to mix concerns earlier in the process.

But, like I said, I'm new in the ways of conduit, so I'm ignorant. What are these "easier ways to handle their use cases"?
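
One candidate for such an "easier way" is the ZipSource newtype in Data.Conduit, whose Applicative instance zips sources. The sketch below assumes an installed conduit version that exports ZipSource and still provides the older Source and $$ names; whether this is what the deprecation note actually refers to is not confirmed by the text above. zipTwo and demoZip are hypothetical names.

```haskell
import Data.Conduit (Source, ZipSource (..), ($$))
import qualified Data.Conduit.List as CL

-- | Zip two sources by wrapping them in ZipSource and combining them
--   applicatively; the result ends when either source is exhausted.
zipTwo :: Monad m => Source m a -> Source m b -> Source m (a, b)
zipTwo sa sb = getZipSource ((,) <$> ZipSource sa <*> ZipSource sb)

-- | Hypothetical demo: the two producers are written independently.
demoZip :: IO [(Int, Char)]
demoZip = zipTwo (CL.sourceList [1, 2, 3]) (CL.sourceList "ab") $$ CL.consume

main :: IO ()
main = demoZip >>= print
```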


Source: (StackOverflow)

What's the real benefit of conduit's upstream type parameter?

I'm trying to understand the differences between different implementations of the concept of pipes. One of the differences between conduit and pipes is how they fuse pipes together. Conduit has

(>+>) :: Monad m
      => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2

while pipes have

(>->) :: (Monad m, Proxy p)
      => (b' -> p a' a b' b m r) -> (c' -> p b' b c' c m r) -> c' -> p a' a c' c m r

If I understand it correctly, with pipes, when any pipe of the two stops, its result is returned and the other one is stopped. With conduit, if the left pipe finished, its result is sent downstream to the right pipe.

I wonder, what is the benefit of conduit's approach? I'd like to see some example (preferably real-world) which is easy to implement using conduit and >+>, but hard(er) to implement using pipes and >->.


Source: (StackOverflow)

Why does this cause a memory leak in the Haskell Conduit library?

I have a conduit pipeline processing a long file. I want to print a progress report for the user every 1000 records, so I've written this:

-- | Every n records, perform the IO action.
-- Used for progress reports to the user.
progress :: (MonadIO m) => Int -> (Int -> i -> IO ()) -> Conduit i m i
progress n act = skipN n 1
   where
      skipN c t = do
         mv <- await
         case mv of
            Nothing -> return ()
            Just v ->
               if c <= 1
                  then do
                     liftIO $ act t v
                     yield v
                     skipN n (succ t)
                  else do
                     yield v
                     skipN (pred c) (succ t)

No matter what action I call this with, it leaks memory, even if I just tell it to print a full stop.

As far as I can see the function is tail recursive and both counters are regularly forced (I tried putting "seq c" and "seq t" in, to no avail). Any clue?

If I put in an "awaitForever" that prints a report for every record then it works fine.

Update 1: This occurs only when compiled with -O2. Profiling indicates that the leaking memory is allocated in the recursive "skipN" function and being retained by "SYSTEM" (whatever that means).

Update 2: I've managed to cure it, at least in the context of my current program. I've replaced the function above with this. Note that "proc" is of type "Int -> Int -> Maybe i -> m ()": to use it you call "await" and pass it the result. For some reason swapping over the "await" and "yield" solved the problem. So now it awaits the next input before yielding the previous result.

-- | Every n records, perform the monadic action. 
-- Used for progress reports to the user.
progress :: (MonadIO m) => Int -> (Int -> i -> IO ()) -> Conduit i m i
progress n act = await >>= proc n 1
   where
      proc c t = seq c $ seq t $ maybe (return ()) $ \v ->
         if c <= 1
            then {-# SCC "progress.then" #-} do
               liftIO $ act t v
               v1 <- await
               yield v
               proc n (succ t) v1
            else {-# SCC "progress.else" #-} do
               v1 <- await
               yield v
               proc (pred c) (succ t) v1

So if you have a memory leak in a Conduit, try swapping the yield and await actions.


Source: (StackOverflow)

Is there any difference between "MonadIO m" and "MonadBaseControl IO m"?

Function runTCPClient from network-conduit has the following signature:

runTCPClient :: (MonadIO m, MonadBaseControl IO m)
             => ClientSettings m -> Application m -> m ()

MonadIO m provides

liftIO :: IO a -> m a

and MonadBaseControl IO m provides

liftBase :: IO a -> m a

There is no visible difference. Do they provide the same functionality? If yes, why the duplication in the type signature? If not, what's the difference?


Source: (StackOverflow)

Is it safe to reuse a conduit?

Is it safe to perform multiple actions using the same conduit value? Something like

do
  let sink = sinkSocket sock

  something $$ sink
  somethingElse $$ sink

I recall that in the early versions of conduit there were some dirty hacks that made this unsafe. What's the current status?

(Note that sinkSocket doesn't close the socket.)


Source: (StackOverflow)

Http-Conduit frequent connection failures

I am writing an application that will download some files over HTTP. Up to a point I was using the following code snippet to download the page body:

import Network.HTTP
simpleHTTP (getRequest "http://www.haskell.org/") >>= getResponseBody

It was working fine, but it could not establish connections over HTTPS. To fix this I switched to http-conduit, and now I am using the following code:

simpleHttp' :: Manager -> String -> IO (C.Response LBS.ByteString)
simpleHttp' manager url = do
     request <- parseUrl url
     runResourceT $ httpLbs request manager

It can connect over HTTPS, but a new, frustrating problem appeared. About every fifth connection fails with this exception:

getpics.hs: FailedConnectionException "i.imgur.com" 80

I am convinced that this is an http-conduit problem because Network.HTTP was working fine on the same set of pages (excluding HTTPS pages).

Has anybody run into this problem and found a solution, or a better alternative to the conduit library? It should be simple, because this is a simple task that should not take more than a few lines of code.


Source: (StackOverflow)

Haskell http-conduit web-scraping daemon crashes with out of memory error

I've written a daemon in Haskell that scrapes information from a webpage every 5 minutes.

The daemon originally ran fine for about 50 minutes, but then it unexpectedly died with out of memory (requested 1048576 bytes). Every time I ran it it died after the same amount of time. Setting it to sleep only 30 seconds, it instead died after 8 minutes.

I realized the code to scrape the website was incredibly memory inefficient (going from about 30M while sleeping to 250M while parsing 9M of html), so I rewrote it so that now it only uses about 15M extra while parsing. Thinking the problem was fixed, I ran the daemon overnight and when I woke up it was actually using less memory than it was that night. I thought I was done, but roughly 20 hours after it had started, it had crashed with the same error.

I started looking into ghc profiling but I wasn't able to get that to work. Next I started messing with rts options, and I tried setting -H64m to set the default heap size to be larger than my program was using, and also using -Ksize to shrink the maximum size of the stack to see if that would make it crash sooner.

Despite every change I've made, the daemon still seems to crash after a constant number of iterations. Making the parsing more memory efficient made this value higher, but it still crashes. This doesn't make sense to me because none of these runs have even come close to using all of my memory, much less swap space. The heap size is supposed to be unlimited by default, shrinking the stack size didn't make a difference, and all my ulimits are either unlimited or significantly higher than what the daemon is using.

In the original code I pinpointed the crash to somewhere in the html parsing, but I haven't done the same for the more memory efficient version because a 20-hour run takes so long. I don't know if this would even be useful to know, because it doesn't seem like any specific part of the program is broken: it runs successfully for dozens of iterations before crashing.

Out of ideas, I even looked through the ghc source code for this error, and it appears to be a failed call to mmap, which wasn't very helpful to me because I assume that isn't the root of the problem.

(Edit: code rewritten and moved to end of post)

I'm pretty new at Haskell, so I'm hoping this is some quirk of lazy evaluation or something else that has a quick fix. Otherwise, I'm fresh out of ideas.

I'm using GHC version 7.4.2 on FreeBSD 9.1.

Edit:

Replacing the downloading with static html got rid of the problem, so I've narrowed it down to how I'm using http-conduit. I've edited the code above to include my networking code. The hackage docs mention to share a manager so I've done that. And it also says that for http you have to explicitly close connections, but I don't think I need to do that for httpLbs.

Here's my code.

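{-# LANGUAGE OverloadedStrings #-}  -- needed for the ByteString literal in 'pattern'

-- Note: 'url', 'params' and 'doSleep' are not shown in the post.
import Control.Monad.IO.Class (liftIO)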
import qualified Data.Text as T
import qualified Data.ByteString.Lazy as BL
import Text.Regex.PCRE
import Network.HTTP.Conduit

main :: IO ()
main = do
    manager <- newManager def
    daemonLoop manager

daemonLoop :: Manager -> IO ()
daemonLoop manager = do
    rows <- scrapeWebpage manager
    putStrLn $ "number of rows parsed: " ++ (show $ length rows)
    doSleep
    daemonLoop manager

scrapeWebpage :: Manager -> IO [[BL.ByteString]]
scrapeWebpage manager = do
    putStrLn "before makeRequest"
    html <- makeRequest manager
    -- Force evaluation of html.
    putStrLn $ "html length: " ++ (show $ BL.length html)
    putStrLn "after makeRequest"
    -- Breaks ~10M html table into 2d list of bytestrings.
    -- Max memory usage is about 45M, which is about 15M more than when sleeping.
    return $ map tail $ html =~ pattern
    where
        pattern :: BL.ByteString
        pattern = BL.concat $ replicate 12 "<td[^>]*>([^<]+)</td>\\s*"

makeRequest :: Manager -> IO BL.ByteString
makeRequest manager = runResourceT $ do
    defReq <- parseUrl url
    let request = urlEncodedBody params $ defReq
                    -- Don't throw errors for bad statuses.
                    { checkStatus = \_ _ -> Nothing
                    -- 1 minute.
                    , responseTimeout = Just 60000000
                    }
    response <- httpLbs request manager
    return $ responseBody response

and its output:

before makeRequest
html length: 1555212
after makeRequest
number of rows parsed: 3608
...
before makeRequest
html length: 1555212
after makeRequest
bannerstalkerd: out of memory (requested 2097152 bytes)

Getting rid of the regex computations fixed the problem, but it seems that the error happens after the networking and during the regex, presumably because of something I'm doing wrong with http-conduit. Any ideas?

Also, when I try to compile with profiling enabled I get this error:

Could not find module `Network.HTTP.Conduit'
Perhaps you haven't installed the profiling libraries for package `http-conduit-1.8.9'?

Indeed, I have not installed profiling libraries for http-conduit and I don't know how.


Source: (StackOverflow)

Rechunk a conduit into larger chunks using combinators

I am trying to construct a Conduit that receives as input ByteStrings (of around 1kb per chunk in size) and produces as output concatenated ByteStrings of 512kb chunks.

This seems like it should be simple to do, but I'm having a lot of trouble. Most of the strategies I've tried have only succeeded in dividing the chunks into smaller chunks; I haven't succeeded in concatenating them into larger ones.

I started out trying isolate, then takeExactlyE and eventually conduitVector, but to no avail. Eventually I settled on this:

import           Control.Monad              (when)
import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL
import           Data.Monoid                ((<>))

chunksOfAtLeast :: Monad m => Int -> C.Conduit B.ByteString m BL.ByteString
chunksOfAtLeast chunkSize = loop BL.empty chunkSize
  where 
    loop buffer n = do
      mchunk <- C.await
      case mchunk of 
        Nothing -> 
          -- Yield last remaining bytes
          when (n < chunkSize) (C.yield buffer)
        Just chunk -> do
          -- Yield when the buffer has been filled and start over
          let buffer' = buffer <> BL.fromStrict chunk
              l       = B.length chunk
          if n <= l
          then C.yield buffer' >> loop BL.empty chunkSize
          else loop buffer' (n - l)

P.S. I decided not to split larger chunks for this function, but this was just a convenient simplification.

However, this seems very verbose given all the conduit functions that deal with chunking[1,2,3,4]. Please help! There must surely be a better way to do this using combinators, but I am missing some piece of intuition!

P.P.S. Is it ok to use lazy bytestring for the buffer as I've done? I'm a bit unclear about the internal representation for bytestring and whether this will help, especially since I'm using BL.length which I guess might evaluate the thunk anyway?
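
On the representation question in the P.P.S.: a lazy ByteString is a lazy list of strict chunks, so appending to one links chunks together rather than copying their bytes, and BL.length walks the chunk list and sums the chunk lengths. The small sketch below makes that visible using only the bytestring package; chunkSizes and sample are hypothetical names used for illustration.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL

-- | Sizes of the strict chunks that make up a lazy ByteString.
chunkSizes :: BL.ByteString -> [Int]
chunkSizes = map B.length . BL.toChunks

-- | A buffer built the same way as in the loop above: by appending
--   strict chunks converted with BL.fromStrict.
sample :: BL.ByteString
sample = BL.fromStrict (B.pack [1, 2, 3]) `BL.append` BL.fromStrict (B.pack [4, 5])

main :: IO ()
main = do
  print (chunkSizes sample)             -- the two original chunks survive
  print (BL.length sample)              -- total length across chunks
  print (B.length (BL.toStrict sample)) -- toStrict is where bytes get copied
```

The copying cost is paid only if the buffer is later flattened, for example with BL.toStrict.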


Conclusion

Just to elaborate on Michael's answer and comments, I ended up with this conduit:

import qualified Data.Conduit               as C
import qualified Data.Conduit.Combinators   as C
import qualified Data.ByteString            as B
import qualified Data.ByteString.Lazy       as BL

-- | "Strict" rechunk of a chunked conduit
chunksOfE' :: (MonadBase base m, PrimMonad base) 
         => Int 
         -> C.Conduit ByteString m ByteString
chunksOfE' chunkSize = C.vectorBuilder chunkSize C.mapM_E =$= C.map fromByteVector

My understanding is that vectorBuilder will pay the cost for concatenating the smaller chunks early on, producing the aggregated chunks as strict bytestrings.

From what I can tell, an alternative implementation that produces lazy bytestring chunks (i.e. "chunked chunks") might be desirable when the aggregated chunks are very large and/or feed into a naturally streaming interface like a network socket. Here's my best attempt at the "lazy bytestring" version:

import qualified Data.Sequences.Lazy        as SL
import qualified Data.Sequences             as S
import qualified Data.Conduit.List          as CL

-- | "Lazy" rechunk of a chunked conduit
chunksOfE :: (Monad m, SL.LazySequence lazy strict)
          => S.Index lazy
          -> C.Conduit strict m lazy
chunksOfE chunkSize = CL.sequence C.sinkLazy =$= C.takeE chunkSize

Source: (StackOverflow)

Sequential Binary Data Decoding Using Conduits

The goal is to have a conduit with the following type signature

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a

The conduit should repeatedly parse protocol buffers (using the ByteString -> a function) received via TCP/IP (using the network-conduit package).

The wire message format is

{length (32 bits big endian)}{protobuf 1}{length}{protobuf 2}...

(The curly braces are not part of the protocol; they are only used here to separate the entities.)
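
To make the framing concrete, here is a pure sketch of the wire format, independent of conduit: it splits a buffer into every complete length-prefixed message and returns the bytes that do not yet form a complete message. It uses only the bytestring package; decodeLen and splitFrames are hypothetical helper names, not part of any library.

```haskell
import Data.Bits (shiftL, (.|.))
import qualified Data.ByteString as BS
import Data.Word (Word32)

-- | Decode a big-endian unsigned integer from a (4-byte) prefix.
decodeLen :: BS.ByteString -> Word32
decodeLen = BS.foldl' (\acc b -> (acc `shiftL` 8) .|. fromIntegral b) 0

-- | Split a buffer into all complete frames plus the leftover bytes,
--   which must be kept and prepended to the next chunk read.
splitFrames :: BS.ByteString -> ([BS.ByteString], BS.ByteString)
splitFrames buf
  | BS.length buf < 4    = ([], buf)   -- incomplete length prefix
  | BS.length rest < len = ([], buf)   -- incomplete payload
  | otherwise =
      let (frame, rest')     = BS.splitAt len rest
          (frames, leftover) = splitFrames rest'
      in (frame : frames, leftover)
  where
    (hdr, rest) = BS.splitAt 4 buf
    len         = fromIntegral (decodeLen hdr) :: Int

main :: IO ()
main =
  -- One complete 2-byte message followed by a truncated 3-byte message.
  print (splitFrames (BS.pack [0, 0, 0, 2, 104, 105, 0, 0, 0, 3, 97, 98]))
```

The leftover component is the part that matters for streaming: any decoder built on this format has to carry it over to the next read rather than discard it.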

The first idea was to use sequenceSink to repeatedly apply a Sink that is able to parse one ProtoBuf:

[...]
import qualified Data.Binary         as B
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Util   as CU

protobufConduit :: MonadResource m => (ByteString -> a) -> Conduit ByteString m a
protobufConduit protobufDecode =
    CU.sequenceSink () $ \() ->
        do lenBytes <- CB.take 4                                -- read protobuf length
           let len :: Word32
               len = B.decode lenBytes                          -- decode ProtoBuf length
               intLen = fromIntegral len
           protobufBytes <- CB.take intLen                      -- read the ProtoBuf bytes
           return $ CU.Emit () [ protobufDecode protobufBytes ] -- emit decoded ProtoBuf

It doesn't work (it only works for the first protocol buffer) because there seems to be a number of "leftover" bytes already read from the source but not consumed via CB.take that get discarded.

And I found no way of pushing "the rest back into the source".

Did I get the concept entirely wrong?

PS: Even if I use protocol buffers here, the problem is not related to protocol buffers. To debug the problem I always use {length}{UTF8 encoded string}{length}{UTF8 encoded string}... and a conduit similar to the above one (utf8StringConduit :: MonadResource m => Conduit ByteString m Text).

Update:

I just tried to replace the state (no state () in the sample above) by the remaining bytes and replaced the CB.take calls by calls to a function that first consumes the already read bytes (from the state) and calls await only as needed (when the state is not large enough). Unfortunately, that doesn't work either because as soon as the Source has no bytes left, sequenceSink does not execute the code but the state still contains the remaining bytes :-(.

If you should be interested in the code (which isn't optimized or very good but should be enough to test):

utf8StringConduit :: forall m. MonadResource m => Conduit ByteString m Text
utf8StringConduit =
    CU.sequenceSink [] $ \st ->
        do (lengthBytes, st') <- takeWithState BS.empty st 4
           let len :: Word32
               len = B.decode $ BSL.fromChunks [lengthBytes]
               intLength = fromIntegral len
           (textBytes, st'') <- takeWithState BS.empty st' intLength
           return $ CU.Emit st'' [ TE.decodeUtf8 $ textBytes ]

takeWithState :: Monad m
              => ByteString
              -> [ByteString]
              -> Int
              -> Pipe l ByteString o u m (ByteString, [ByteString])
takeWithState acc state 0 = return (acc, state)
takeWithState acc state neededLen =
    let stateLenSum = foldl' (+) 0 $ map BS.length state
     in if stateLenSum >= neededLen
           then do let (firstChunk:state') = state
                       (neededChunk, pushBack) = BS.splitAt neededLen firstChunk
                       acc' = acc `BS.append` neededChunk
                       neededLen' = neededLen - BS.length neededChunk
                       state'' = if BS.null pushBack
                                    then state'
                                    else pushBack:state'
                   takeWithState acc' state'' neededLen'
           else do aM <- await
                   case aM of
                     Just a -> takeWithState acc (state ++ [a]) neededLen
                     Nothing -> error "to be fixed later"

Source: (StackOverflow)

GHC rewrite rules with class constraints

I've added the following rewrite rule to conduit without issue:

{-# RULES "ConduitM: lift x >>= f" forall m f.
    lift m >>= f = ConduitM (PipeM (liftM (unConduitM . f) m))
  #-}

I'm trying to add a similar rewrite rules for liftIO as well

{-# RULES "ConduitM: liftIO x >>= f" forall m f.
    liftIO m >>= f = ConduitM (PipeM (liftM (unConduitM . f) (liftIO m)))
  #-}

However, when I try to do so, I get the following error messages from GHC:

Data/Conduit/Internal/Conduit.hs:1025:84:
    Could not deduce (Monad m) arising from a use of ‘liftM’
    from the context (Monad (ConduitM i o m), MonadIO (ConduitM i o m))
      bound by the RULE "ConduitM: liftIO x >>= f"
      at Data/Conduit/Internal/Conduit.hs:1025:11-118
    Possible fix:
      add (Monad m) to the context of the RULE "ConduitM: liftIO x >>= f"
    In the first argument of ‘PipeM’, namely
      ‘(liftM (unConduitM . f) (liftIO m))’
    In the first argument of ‘ConduitM’, namely
      ‘(PipeM (liftM (unConduitM . f) (liftIO m)))’
    In the expression:
      ConduitM (PipeM (liftM (unConduitM . f) (liftIO m)))

Data/Conduit/Internal/Conduit.hs:1025:108:
    Could not deduce (MonadIO m) arising from a use of ‘liftIO’
    from the context (Monad (ConduitM i o m), MonadIO (ConduitM i o m))
      bound by the RULE "ConduitM: liftIO x >>= f"
      at Data/Conduit/Internal/Conduit.hs:1025:11-118
    Possible fix:
      add (MonadIO m) to the context of
        the RULE "ConduitM: liftIO x >>= f"
    In the second argument of ‘liftM’, namely ‘(liftIO m)’
    In the first argument of ‘PipeM’, namely
      ‘(liftM (unConduitM . f) (liftIO m))’
    In the first argument of ‘ConduitM’, namely
      ‘(PipeM (liftM (unConduitM . f) (liftIO m)))’

I'm unaware of any syntax that would let me specify such context to a rewrite rule. Is there a way to achieve this?


Source: (StackOverflow)

Conduit Broadcast

A few days ago, I asked this question. Now I need a pure single threaded version of this function:

To repeat, I need a function that sends each received value to each sink and collects their results. The type signature of the function should be something like this:

broadcast :: [Sink a m b] -> Sink a m [b]

Best Sven


P.S. It is not sequence, I've tried that:

> C.sourceList [1..100] $$ sequence [C.fold (+) 0, C.fold (+) 0]
[5050, 0]

expected result:

[5050, 5050]

P.P.S. zipSinks gives the desired result, but it works just with tuples:

> C.sourceList [1..100] $$ C.zipSinks (C.fold (+) 0) (C.fold (+) 0)
(5050, 5050)
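
For comparison, a list version of zipSinks can be sketched with sequenceSinks from Data.Conduit, which feeds every input value to each sink in a traversable container. This assumes a conduit version that exports sequenceSinks (it may not have existed when the question was asked) and that still provides the older Sink and $$ names; broadcast and demoBroadcast are just local names for the sketch.

```haskell
import Data.Conduit (Sink, sequenceSinks, ($$))
import qualified Data.Conduit.List as C

-- | Send each received value to every sink and collect the results.
broadcast :: Monad m => [Sink a m b] -> Sink a m [b]
broadcast = sequenceSinks

-- | Same input and folds as in the examples above.
demoBroadcast :: IO [Int]
demoBroadcast = C.sourceList [1 .. 100] $$ broadcast [C.fold (+) 0, C.fold (+) 0]

main :: IO ()
main = demoBroadcast >>= print  -- [5050,5050]
```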

Source: (StackOverflow)

How do I make a "branched" Conduit?

I want the same data to be split in two "branches" to be processed separately, then "joined"...

                                +----------+
                +---------+  -->| doubler  |---   +--------+
   +--------+   |         |--   +----------+   -->|        |   +------+
   | source |-->| splitter|                       | summer |-->| sink |
   +--------+   |         |--   +----------+   -->|        |   +------+
                +---------+  -->| delayer  |---   +--------+
                                +----------+

How should I do this?

My attempt:

import Data.Conduit
import Control.Monad.IO.Class
import qualified Data.Conduit.List as CL
-- import Data.Conduit.Internal (zipSources)
import Control.Arrow ((>>>))

source :: Source IO Int
source = do
    x <- liftIO $ getLine
    yield (read x)
    source

splitter :: Conduit Int IO (Int, Int)
splitter = CL.map $ \x -> (x,x)

doubler = CL.map (* 2)

delayer :: Conduit Int IO Int
delayer = do
    yield 0
    CL.map id

twoConduitBranches :: Monad m => Conduit a m b -> Conduit c m d -> Conduit (a,c) m (b,d)
twoConduitBranches q w = awaitForever $ \(x, y) -> do
    out1 <- undefined q x
    out2 <- undefined w y
    yield (out1, out2)


summer :: Conduit (Int,Int) IO Int
summer = CL.map $ \(x,y) -> x + y

sink :: Sink Int IO ()
sink = CL.mapM_ (show >>> putStrLn) 

-- combosrc = zipSources (source $= delayer) (source $= doubler)
main = source $= splitter $= twoConduitBranches doubler delayer $= summer $$ sink

What shall I write in place of the undefineds?


Source: (StackOverflow)

What is the correct way of cleaning up resources using ResourceT?

I've been playing around with conduit-extra's UNIX package, which basically allows for an easy creation of a server using UNIX domain sockets, specifically using the runUnixServer function.

The problem is that after the function exits it doesn't clean up the socket file, which means it needs to be cleaned up manually. Here's a simple example, which basically creates an echo server.

main :: IO ()
main = do
  let settings = serverSettings "foobar.sock"
  runUnixServer settings (\ad -> (appSource ad) $$ (appSink ad))

I've googled around a bit and found that the correct way to handle the resources here is by using the resourcet package. The problem, though, is that most of the APIs in resourcet expect me to allocate the resource myself, which isn't the case with runUnixServer, which doesn't return anything.

At first I thought I could use register, to register a function that removes the file, such as the following

main :: IO ()
main = runResourceT $ do
  register $ removeLink "foobar.sock"
  let settings = serverSettings "foobar.sock"
  liftIO $ runUnixServer settings (\ad -> (appSource ad) $$ (appSink ad))

There's a problem with this approach though, at least as far as the documentation for allocate says:

This is almost identical to calling the allocation and then registering the release action, but this properly handles masking of asynchronous exceptions.

Does this mean that register in itself doesn't handle asynchronous exceptions? If so, could that be a problem when one of the handlers spawned by the runUnixServer (docs say it spawns a thread for each client) raises an error?

A third and final solution that I came up with is by using allocate, in order to make sure that the asynchronous exceptions are handled properly (I'm not sure if it is really necessary in this case).

main :: IO ()
main = runResourceT $ do
  allocate (return 1) (const $ removeLink "foobar.sock")
  let settings = serverSettings "foobar.sock"
  liftIO $ runUnixServer settings (\ad -> (appSource ad) $$ (appSink ad))

But is this really the best solution? Since I'm creating a value which I'll never use (return 1) and then using a const function to ignore that value in the finalizer.
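
For comparison, the same cleanup can be sketched without ResourceT at all, using finally from Control.Exception, which runs the cleanup whether the action returns normally or is interrupted by an exception. This swaps in a different technique from the ones tried above and is only a sketch: withSocketCleanup is a hypothetical helper, the demo path is made up, and writeFile stands in for the server creating its socket file.

```haskell
import Control.Exception (finally)
import System.Directory (doesFileExist, removeFile)

-- | Run an action and remove the given file afterwards, whether the
--   action returns normally or throws.
withSocketCleanup :: FilePath -> IO a -> IO a
withSocketCleanup path server = server `finally` removeFile path

-- | Hypothetical demo: returns whether the file still exists afterwards.
demoCleanup :: IO Bool
demoCleanup = do
  let path = "foobar-demo.sock"
  withSocketCleanup path (writeFile path "")  -- stand-in for runUnixServer
  doesFileExist path

main :: IO ()
main = demoCleanup >>= print
```

One caveat of this sketch: removeFile itself throws if the file was never created, so a real version would probably want to check for the file or ignore that error.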


Source: (StackOverflow)