diff --git a/src/Streamly/Internal/Data/Fold.hs b/src/Streamly/Internal/Data/Fold.hs index 3ffb083c0f..4881cb654e 100644 --- a/src/Streamly/Internal/Data/Fold.hs +++ b/src/Streamly/Internal/Data/Fold.hs @@ -1261,8 +1261,14 @@ toParallelSVar svar winfo = Fold step initial extract step () x = liftIO $ do -- XXX we can have a separate fold for unlimited buffer case to avoid a -- branch in the step here. - decrementBufferLimit svar - void $ send svar (ChildYield x) + case maxBufferLimit svar of + BufferUnlimited -> + void $ send svar (ChildYield x) + BufferLast -> + void $ sendReplace svar (ChildYield x) + BufferLimited _ policy -> do + decrementBufferLimit svar policy + void $ send svar (ChildYield x) extract () = liftIO $ do sendStop svar winfo @@ -1279,8 +1285,14 @@ toParallelSVarLimited svar winfo = Fold step initial extract yieldLimitOk <- decrementYieldLimit svar if yieldLimitOk then do - decrementBufferLimit svar - void $ send svar (ChildYield x) + case maxBufferLimit svar of + BufferUnlimited -> + void $ send svar (ChildYield x) + BufferLast -> + void $ sendReplace svar (ChildYield x) + BufferLimited _ policy -> do + decrementBufferLimit svar policy + void $ send svar (ChildYield x) return True else do cleanupSVarFromWorker svar diff --git a/src/Streamly/Internal/Data/SVar.hs b/src/Streamly/Internal/Data/SVar.hs index eb5f21cf00..db06078b9b 100644 --- a/src/Streamly/Internal/Data/SVar.hs +++ b/src/Streamly/Internal/Data/SVar.hs @@ -31,12 +31,15 @@ module Streamly.Internal.Data.SVar -- State threaded around the stream , Limit (..) + , BufferStyle (..) , State (streamVar) , defState , adaptState , getMaxThreads , setMaxThreads , getMaxBuffer + , getBufferStyle + , setBufferStyle , setMaxBuffer , getStreamRate , setStreamRate @@ -62,6 +65,7 @@ module Streamly.Internal.Data.SVar , ChildEvent (..) , AheadHeapEntry (..) , send + , sendReplace , sendToProducer , sendYield , sendStop @@ -387,11 +391,27 @@ data SVarStopStyle = -- XXX Maybe we can separate the implementation in two different types instead -- of using a common SVar type. -- -data PushBufferPolicy = +data BufferOverflowPolicy = PushBufferDropNew -- drop the latest element and continue | PushBufferDropOld -- drop the oldest element and continue | PushBufferBlock -- block the thread until space -- becomes available + | PushBufferToFile String -- Append the buffer to a file on disk + -- The String is the filename prefix, two files + -- are used, 1 and 2. While + -- the consumer is consuming from one file the + -- producers are writing to the other file. The + -- current Index ownership is maintained in the + -- SVar. + deriving (Show) + +-- XXX in general, instead of just the last event we can store last N events in +-- the SVar, we can have a BufferLastN case. +data BufferStyle + = BufferUnlimited + | BufferLast -- Buffer only the latest element + | BufferLimited Word BufferOverflowPolicy + deriving (Show) -- IMPORTANT NOTE: we cannot update the SVar after generating it as we have -- references to the original SVar stored in several functions which will keep @@ -441,10 +461,9 @@ data SVar t m a = SVar -- potentially each worker may yield one value to the buffer in the worst -- case exceeding the requested buffer size. , maxWorkerLimit :: Limit - , maxBufferLimit :: Limit - -- These two are valid and used only when maxBufferLimit is Limited. + , maxBufferLimit :: BufferStyle + -- This is valid and used only when maxBufferLimit is BufferLimited. , pushBufferSpace :: IORef Count - , pushBufferPolicy :: PushBufferPolicy -- [LOCKING] The consumer puts this MVar after emptying the buffer, workers -- block on it when the buffer becomes full. No overhead unless the buffer -- becomes full. @@ -505,7 +524,7 @@ data State t m a = State -- persistent configuration, state that remains valid until changed by -- an explicit setting via a combinator. , _threadsHigh :: Limit - , _bufferHigh :: Limit + , _bufferHigh :: BufferStyle -- XXX these two can be collapsed into a single type , _streamLatency :: Maybe NanoSecond64 -- bootstrap latency , _maxStreamRate :: Maybe Rate @@ -523,9 +542,11 @@ data State t m a = State magicMaxBuffer :: Word magicMaxBuffer = 1500 -defaultMaxThreads, defaultMaxBuffer :: Limit +defaultMaxThreads :: Limit defaultMaxThreads = Limited magicMaxBuffer -defaultMaxBuffer = Limited magicMaxBuffer + +defaultMaxBuffer :: BufferStyle +defaultMaxBuffer = BufferLimited magicMaxBuffer PushBufferBlock -- The fields prefixed by an _ are not to be accessed or updated directly but -- via smart accessor APIs. @@ -592,18 +613,27 @@ setMaxThreads n st = getMaxThreads :: State t m a -> Limit getMaxThreads = _threadsHigh +setBufferStyle :: BufferStyle -> State t m a -> State t m a +setBufferStyle style st = st { _bufferHigh = style } + setMaxBuffer :: Int -> State t m a -> State t m a -setMaxBuffer n st = - st { _bufferHigh = - if n < 0 - then Unlimited - else if n == 0 - then defaultMaxBuffer - else Limited (fromIntegral n) - } +setMaxBuffer n = setBufferStyle style + where + style = + if n < 0 + then BufferUnlimited + else if n == 0 + then defaultMaxBuffer + else BufferLimited (fromIntegral n) PushBufferBlock + +getBufferStyle :: State t m a -> BufferStyle +getBufferStyle = _bufferHigh getMaxBuffer :: State t m a -> Limit -getMaxBuffer = _bufferHigh +getMaxBuffer st = + case getBufferStyle st of + BufferLimited n _ -> Limited n + _ -> Unlimited setStreamRate :: Maybe Rate -> State t m a -> State t m a setStreamRate r st = st { _maxStreamRate = r } @@ -1000,18 +1030,18 @@ incrementYieldLimit sv = -- XXX Only yields should be counted in the buffer limit and not the Stop -- events. +-- +-- XXX we can parameterize the SVar with a buffer type to reduce the runtime +-- overhead of determining the buffer type before queuing the elements. {-# INLINE decrementBufferLimit #-} -decrementBufferLimit :: SVar t m a -> IO () -decrementBufferLimit sv = - case maxBufferLimit sv of - Unlimited -> return () - Limited _ -> do +decrementBufferLimit :: SVar t m a -> BufferOverflowPolicy -> IO () +decrementBufferLimit sv policy = do let ref = pushBufferSpace sv old <- atomicModifyIORefCAS ref $ \x -> (if x >= 1 then x - 1 else x, x) when (old <= 0) $ - case pushBufferPolicy sv of + case policy of PushBufferBlock -> blockAndRetry PushBufferDropNew -> do -- We just drop one item and proceed. It is possible @@ -1031,6 +1061,7 @@ decrementBufferLimit sv = when block blockAndRetry -- XXX need a dequeue or ring buffer for this PushBufferDropOld -> undefined + PushBufferToFile _ -> undefined where @@ -1053,19 +1084,19 @@ decrementBufferLimit sv = incrementBufferLimit :: SVar t m a -> IO () incrementBufferLimit sv = case maxBufferLimit sv of - Unlimited -> return () - Limited _ -> do + BufferLimited _ _ -> do atomicModifyIORefCAS_ (pushBufferSpace sv) (+ 1) writeBarrier void $ liftIO $ tryPutMVar (pushBufferMVar sv) () + _ -> return () {-# INLINE resetBufferLimit #-} resetBufferLimit :: SVar t m a -> IO () resetBufferLimit sv = case maxBufferLimit sv of - Unlimited -> return () - Limited n -> atomicModifyIORefCAS_ (pushBufferSpace sv) - (const (fromIntegral n)) + BufferLimited n _ -> atomicModifyIORefCAS_ (pushBufferSpace sv) + (const (fromIntegral n)) + _ -> return () {-# INLINE sendWithDoorBell #-} sendWithDoorBell :: @@ -1092,6 +1123,27 @@ sendWithDoorBell q bell msg = do send :: SVar t m a -> ChildEvent a -> IO Int send sv msg = sendWithDoorBell (outputQueue sv) (outputDoorBell sv) msg +-- | Just replace the previous value in the buffer. +sendReplace :: SVar t m a -> ChildEvent a -> IO () +sendReplace sv msg = do + -- XXX we can use a nonlist buffer to make it faster, we do not need a + -- tuple here, for Prim/Storable streams we can also avoid using an IORef + -- we can just use an unboxed reference. + let q = outputQueue sv + oldlen <- atomicModifyIORefCAS q $ \(_, n) -> (([msg], 1), n) + when (oldlen <= 0) $ do + -- The wake up must happen only after the store has finished otherwise + -- we can have lost wakeup problems. + writeBarrier + -- Since multiple workers can try this at the same time, it is possible + -- that we may put a spurious MVar after the consumer has already seen + -- the output. But that's harmless, at worst it may cause the consumer + -- to read the queue again and find it empty. + -- The important point is that the consumer is guaranteed to receive a + -- doorbell if something was added to the queue after it empties it. + let bell = outputDoorBell sv + void $ tryPutMVar bell () + -- There is no bound implemented on the buffer, this is assumed to be low -- traffic. sendToProducer :: SVar t m a -> ChildEvent a -> IO Int @@ -1206,10 +1258,10 @@ sendYield sv mwinfo msg = do oldlen <- send sv msg let limit = maxBufferLimit sv bufferSpaceOk <- case limit of - Unlimited -> return True - Limited lim -> do + BufferLimited lim _ -> do active <- readIORef (workerCount sv) return $ (oldlen + 1) < (fromIntegral lim - active) + _ -> return True rateLimitOk <- case mwinfo of Just winfo -> @@ -2256,9 +2308,8 @@ getAheadSVar st f mrun = do { outputQueue = outQ , outputQueueFromConsumer = undefined , remainingWork = yl - , maxBufferLimit = getMaxBuffer st + , maxBufferLimit = getBufferStyle st , pushBufferSpace = undefined - , pushBufferPolicy = undefined , pushBufferMVar = undefined , maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st) , yieldRateInfo = rateInfo @@ -2334,11 +2385,12 @@ getParallelSVar ss st mrun = do Nothing -> return Nothing Just x -> Just <$> newIORef x rateInfo <- getYieldRateInfo st - let bufLim = - case getMaxBuffer st of - Unlimited -> undefined - Limited x -> (fromIntegral x) - remBuf <- newIORef bufLim + let bufSpace = + case getBufferStyle st of + BufferUnlimited -> undefined + BufferLast -> undefined + BufferLimited x _ -> fromIntegral x + remBuf <- newIORef bufSpace pbMVar <- newMVar () stats <- newSVarStats @@ -2353,9 +2405,8 @@ getParallelSVar ss st mrun = do SVar { outputQueue = outQ , outputQueueFromConsumer = outQRev , remainingWork = yl - , maxBufferLimit = getMaxBuffer st + , maxBufferLimit = getBufferStyle st , pushBufferSpace = remBuf - , pushBufferPolicy = PushBufferBlock , pushBufferMVar = pbMVar , maxWorkerLimit = Unlimited -- Used only for diagnostics diff --git a/src/Streamly/Internal/Data/Stream/Ahead.hs b/src/Streamly/Internal/Data/Stream/Ahead.hs index ed7b271e34..7074f2fb28 100644 --- a/src/Streamly/Internal/Data/Stream/Ahead.hs +++ b/src/Streamly/Internal/Data/Stream/Ahead.hs @@ -143,9 +143,9 @@ underMaxHeap sv hp = do -- XXX simplify this let maxHeap = case maxBufferLimit sv of - Limited lim -> Limited $ + BufferLimited lim _ -> Limited $ max 0 (lim - fromIntegral len) - Unlimited -> Unlimited + _ -> Unlimited case maxHeap of Limited lim -> do diff --git a/src/Streamly/Internal/Data/Stream/Async.hs b/src/Streamly/Internal/Data/Stream/Async.hs index 74ad61d9ee..9e0cf9f048 100644 --- a/src/Streamly/Internal/Data/Stream/Async.hs +++ b/src/Streamly/Internal/Data/Stream/Async.hs @@ -295,9 +295,8 @@ getLifoSVar st mrun = do { outputQueue = outQ , outputQueueFromConsumer = undefined , remainingWork = yl - , maxBufferLimit = getMaxBuffer st + , maxBufferLimit = getBufferStyle st , pushBufferSpace = undefined - , pushBufferPolicy = undefined , pushBufferMVar = undefined , maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st) , yieldRateInfo = rateInfo @@ -392,9 +391,8 @@ getFifoSVar st mrun = do { outputQueue = outQ , outputQueueFromConsumer = undefined , remainingWork = yl - , maxBufferLimit = getMaxBuffer st + , maxBufferLimit = getBufferStyle st , pushBufferSpace = undefined - , pushBufferPolicy = undefined , pushBufferMVar = undefined , maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st) , yieldRateInfo = rateInfo diff --git a/src/Streamly/Internal/Data/Stream/Combinators.hs b/src/Streamly/Internal/Data/Stream/Combinators.hs index 9e016fe22f..c874e17b8c 100644 --- a/src/Streamly/Internal/Data/Stream/Combinators.hs +++ b/src/Streamly/Internal/Data/Stream/Combinators.hs @@ -62,6 +62,15 @@ maxThreadsSerial :: Int -> SerialT m a -> SerialT m a maxThreadsSerial _ = id -} +-- XXX The actual buffer size can be double of the specified value because the +-- consumer thread takes the whole buffer in one go and decrements the used +-- buffer space to 0. Since the full buffer space is now available to the +-- producers they can again fill it even though the consumer may not yet have +-- actually consumed any of the previous items. So the actual buffer is in the +-- range n and 2n where n is the buffer size specified by the user. We can make +-- this precise by having the consumer also modify the buffer count, but then +-- there will be more lock contention. +-- -- | Specify the maximum size of the buffer for storing the results from -- concurrent computations. If the buffer becomes full we stop spawning more -- concurrent tasks until there is space in the buffer. diff --git a/src/Streamly/Internal/Data/Stream/Parallel.hs b/src/Streamly/Internal/Data/Stream/Parallel.hs index 2ab93e6d4a..b9dd89dd4e 100644 --- a/src/Streamly/Internal/Data/Stream/Parallel.hs +++ b/src/Streamly/Internal/Data/Stream/Parallel.hs @@ -31,10 +31,12 @@ module Streamly.Internal.Data.Stream.Parallel , mkParallel , applyParallel , foldParallel + , sample + , sampleWithDefault ) where -import Control.Concurrent (myThreadId, takeMVar) +import Control.Concurrent (myThreadId, takeMVar, tryPutMVar) import Control.Monad (when) import Control.Monad.Base (MonadBase(..), liftBaseDefault) import Control.Monad.Catch (MonadThrow, throwM) @@ -54,7 +56,7 @@ import Prelude hiding (map) import qualified Data.Set as Set import Streamly.Internal.Data.Stream.SVar - (fromSVar, fromProducer, fromConsumer, pushToFold) + (fromSVar, fromProducer, fromProducerPeek, fromConsumer, pushToFold) import Streamly.Internal.Data.Stream.StreamK (IsStream(..), Stream, mkStream, foldStream, foldStreamShared, adapt) @@ -85,18 +87,30 @@ runOne st m0 winfo = where go m = do - liftIO $ decrementBufferLimit sv - foldStreamShared st yieldk single stop m + case maxBufferLimit sv of + BufferUnlimited -> + foldStreamShared st yieldk single stopUnlim m + BufferLast -> + foldStreamShared st yieldkRep singleRep stopUnlim m + BufferLimited _ policy -> do + liftIO $ decrementBufferLimit sv policy + foldStreamShared st yieldk single stopLim m sv = fromJust $ streamVar st - stop = liftIO $ do + stopUnlim = liftIO $ sendStop sv winfo + stopLim = liftIO $ do incrementBufferLimit sv sendStop sv winfo + sendit a = liftIO $ void $ send sv (ChildYield a) single a = sendit a >> (liftIO $ sendStop sv winfo) yieldk a r = sendit a >> go r + sendReplaceIt a = liftIO $ void $ sendReplace sv (ChildYield a) + singleRep a = sendReplaceIt a >> (liftIO $ sendStop sv winfo) + yieldkRep a r = sendReplaceIt a >> go r + runOneLimited :: MonadIO m => State Stream m a -> Stream m a -> Maybe WorkerInfo -> m () @@ -108,22 +122,34 @@ runOneLimited st m0 winfo = go m0 yieldLimitOk <- liftIO $ decrementYieldLimit sv if yieldLimitOk then do - liftIO $ decrementBufferLimit sv - foldStreamShared st yieldk single stop m + case maxBufferLimit sv of + BufferUnlimited -> + foldStreamShared st yieldk single stopUnlim m + BufferLast -> + foldStreamShared st yieldkRep singleRep stopUnlim m + BufferLimited _ policy -> do + liftIO $ decrementBufferLimit sv policy + foldStreamShared st yieldk single stopLim m else do liftIO $ cleanupSVarFromWorker sv liftIO $ sendStop sv winfo sv = fromJust $ streamVar st - stop = liftIO $ do + stopUnlim = liftIO $ sendStop sv winfo + stopLim = liftIO $ do incrementBufferLimit sv incrementYieldLimit sv sendStop sv winfo + sendit a = liftIO $ void $ send sv (ChildYield a) single a = sendit a >> (liftIO $ sendStop sv winfo) yieldk a r = sendit a >> go r + sendReplaceIt a = liftIO $ void $ sendReplace sv (ChildYield a) + singleRep a = sendReplaceIt a >> (liftIO $ sendStop sv winfo) + yieldkRep a r = sendReplaceIt a >> go r + ------------------------------------------------------------------------------- -- Consing and appending a stream in parallel style ------------------------------------------------------------------------------- @@ -386,6 +412,67 @@ tapAsync f m = mkStream $ \st yld sng stp -> do sv <- newFoldSVar st f foldStreamShared st yld sng stp (teeToSVar sv m) +------------------------------------------------------------------------------ +-- Sampling +------------------------------------------------------------------------------ + +-- | The input stream of 'sample' is asynchronously evaluated in a loop, the +-- latest value from the evaluation is retained, whenever the output stream +-- produced by 'sample' is evaluated it supplies the latest sample available +-- from the input stream. If the input has not yet generated any output +-- 'sample' waits for the first sample to arrive. Any exceptions from the +-- input stream are propagated to the output stream. +-- +-- @ +-- S.mapM_ (\x -> print x >> threadDelay 100000) +-- $ S.sample +-- $ S.fromList [1..] +-- @ +-- +-- /Internal/ +-- +{-# INLINE sample #-} +sample :: (IsStream t, MonadAsync m) => t m a -> t m a +sample m = mkStream $ \s yld sng stp -> do + let st = setBufferStyle BufferLast s + sv <- newParallelVar StopNone (adaptState st) + -- pushWorkerPar sv (runOne st{streamVar = Just sv} (toStream m)) + D.toSVarParallel (adaptState st) sv $ D.toStreamD m + -- Hack! drain the putMVar from modifyThread in D.toSVarParallel + -- so that we can wait for the first event in fromProducerPeek + liftIO $ takeMVar (outputDoorBell sv) + foldStream st yld sng stp $ fromProducerPeek sv + +-- This can be implemented by merging the sampled stream with a default stream +-- using a priority merge giving higher priority to the sampled stream. +-- +-- | Like 'sample' but uses a default sample when no output is yet generated by +-- the input stream. +-- +-- @ +-- S.mapM_ (\x -> print x >> threadDelay 100000) +-- $ Par.sampleWithDefault 0 +-- $ S.mapM (\x -> +-- if x == 1 +-- then threadDelay 1000000 >> return x +-- else return x) +-- $ S.fromList [1..] +-- @ +-- +-- /Internal/ +-- +{-# INLINE sampleWithDefault #-} +sampleWithDefault :: (IsStream t, MonadAsync m) => a -> t m a -> t m a +sampleWithDefault def m = mkStream $ \s yld sng stp -> do + let st = setBufferStyle BufferLast s + sv <- newParallelVar StopNone (adaptState st) + -- initialize the buffer with the default value + liftIO $ writeIORef (outputQueue sv) ([ChildYield def],1) + liftIO $ void $ tryPutMVar (outputDoorBell sv) () + -- pushWorkerPar sv (runOne st{streamVar = Just sv} (toStream m)) + D.toSVarParallel (adaptState st) sv $ D.toStreamD m + foldStream st yld sng stp $ fromProducerPeek sv + ------------------------------------------------------------------------------ -- ParallelT ------------------------------------------------------------------------------ diff --git a/src/Streamly/Internal/Data/Stream/SVar.hs b/src/Streamly/Internal/Data/Stream/SVar.hs index 5e20092b54..4e7ab59b42 100644 --- a/src/Streamly/Internal/Data/Stream/SVar.hs +++ b/src/Streamly/Internal/Data/Stream/SVar.hs @@ -16,14 +16,16 @@ module Streamly.Internal.Data.Stream.SVar , fromStreamVar , fromProducer , fromConsumer + , fromProducerPeek , toSVar , pushToFold ) where +import Control.Concurrent.MVar (takeMVar) import Control.Exception (fromException) import Control.Monad (when, void) -import Control.Monad.Catch (throwM) +import Control.Monad.Catch (throwM, MonadThrow) import Control.Monad.IO.Class (MonadIO(liftIO)) import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef) import Data.Maybe (isNothing) @@ -118,7 +120,7 @@ toSVar sv m = toStreamVar sv (toStream m) -- | Pull a stream from an SVar. {-# NOINLINE fromProducer #-} -fromProducer :: MonadAsync m => SVar Stream m a -> Stream m a +fromProducer :: MonadIO m => SVar Stream m a -> Stream m a fromProducer sv = mkStream $ \st yld sng stp -> do list <- readOutputQ sv -- Reversing the output is important to guarantee that we process the @@ -150,6 +152,53 @@ fromProducer sv = mkStream $ \st yld sng stp -> do Nothing -> allDone stp Just _ -> error "Bug: fromProducer: received exception" +{-# INLINE peekOutputQPar #-} +peekOutputQPar :: MonadIO m => SVar t m a -> m [ChildEvent a] +peekOutputQPar sv = liftIO $ do + case yieldRateInfo sv of + Nothing -> return () + Just yinfo -> void $ collectLatency sv yinfo False + fst `fmap` readIORef (outputQueue sv) + +-- | Pull a stream from an SVar. +{-# NOINLINE fromProducerPeek #-} +fromProducerPeek :: (IsStream t, MonadIO m) => SVar Stream m a -> t m a +fromProducerPeek sv = go True + where + + go True = mkStream $ \st yld sng stp -> do + liftIO $ withDiagMVar sv "peekOutputQPar: doorbell" + $ takeMVar (outputDoorBell sv) + list <- liftIO $ fst `fmap` readIORef (outputQueue sv) + when (Prelude.null list) $ error "bug empty list" + foldStream st yld sng stp $ processEvents $ reverse list + + go False = mkStream $ \st yld sng stp -> do + list <- peekOutputQPar sv + foldStream st yld sng stp $ processEvents $ reverse list + + allDone stp = do + when (svarInspectMode sv) $ do + t <- liftIO $ getTime Monotonic + liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t) + liftIO $ printSVar sv "SVar Done" + sendStopToProducer sv + stp + + {-# INLINE processEvents #-} + processEvents [] = mkStream $ \st yld sng stp -> do + foldStream st yld sng stp $ go False + + processEvents (ev : es) = mkStream $ \_ yld _ stp -> do + let rest = processEvents es + case ev of + ChildYield a -> yld a rest + ChildStop tid e -> do + accountThread sv tid + case e of + Nothing -> allDone stp + Just _ -> error "Bug: fromProducer: received exception" + ------------------------------------------------------------------------------- -- Process events received by the producer thread from the consumer side ------------------------------------------------------------------------------- @@ -160,7 +209,7 @@ fromProducer sv = mkStream $ \st yld sng stp -> do -- exceptions or handle them and still keep driving the fold. -- {-# NOINLINE fromConsumer #-} -fromConsumer :: MonadAsync m => SVar Stream m a -> m Bool +fromConsumer :: (MonadIO m, MonadThrow m) => SVar Stream m a -> m Bool fromConsumer sv = do (list, _) <- liftIO $ readOutputQBasic (outputQueueFromConsumer sv) -- Reversing the output is important to guarantee that we process the @@ -182,7 +231,7 @@ fromConsumer sv = do -- push values to a fold worker via an SVar. Returns whether the fold is done. {-# INLINE pushToFold #-} -pushToFold :: MonadAsync m => SVar Stream m a -> a -> m Bool +pushToFold :: (MonadIO m, MonadThrow m) => SVar Stream m a -> a -> m Bool pushToFold sv a = do -- Check for exceptions before decrement so that we do not -- block forever if the child already exited with an exception. @@ -200,6 +249,11 @@ pushToFold sv a = do if done then return True else liftIO $ do - decrementBufferLimit sv - void $ send sv (ChildYield a) + let sendit = void $ send sv (ChildYield a) + in case maxBufferLimit sv of + BufferUnlimited -> sendit + BufferLast -> sendReplace sv (ChildYield a) + BufferLimited _ policy -> do + decrementBufferLimit sv policy + sendit return False diff --git a/src/Streamly/Internal/Prelude.hs b/src/Streamly/Internal/Prelude.hs index 6daba0dd4f..8e2fc79540 100644 --- a/src/Streamly/Internal/Prelude.hs +++ b/src/Streamly/Internal/Prelude.hs @@ -342,6 +342,7 @@ module Streamly.Internal.Prelude , trace , tap , tapAsync + , Par.sample -- * Windowed Classification