@@ -31,12 +31,15 @@ module Streamly.Internal.Data.SVar
3131
3232 -- State threaded around the stream
3333 , Limit (.. )
34+ , BufferStyle (.. )
3435 , State (streamVar )
3536 , defState
3637 , adaptState
3738 , getMaxThreads
3839 , setMaxThreads
3940 , getMaxBuffer
41+ , getBufferStyle
42+ , setBufferStyle
4043 , setMaxBuffer
4144 , getStreamRate
4245 , setStreamRate
@@ -62,6 +65,7 @@ module Streamly.Internal.Data.SVar
6265 , ChildEvent (.. )
6366 , AheadHeapEntry (.. )
6467 , send
68+ , sendReplace
6569 , sendToProducer
6670 , sendYield
6771 , sendStop
@@ -387,11 +391,27 @@ data SVarStopStyle =
387391-- XXX Maybe we can separate the implementation in two different types instead
388392-- of using a common SVar type.
389393--
390- data PushBufferPolicy =
394+ data BufferOverflowPolicy =
391395 PushBufferDropNew -- drop the latest element and continue
392396 | PushBufferDropOld -- drop the oldest element and continue
393397 | PushBufferBlock -- block the thread until space
394398 -- becomes available
399+ | PushBufferToFile String -- Append the buffer to a file on disk
400+ -- The String is the filename prefix, two files
401+ -- are used, <filename>1 and <filename>2. While
402+ -- the consumer is consuming from one file the
403+ -- producers are writing to the other file. The
404+ -- current Index ownership is maintained in the
405+ -- SVar.
406+ deriving (Show )
407+
408+ -- XXX in general, instead of just the last event we can store last N events in
409+ -- the SVar, we can have a BufferLastN case.
410+ data BufferStyle
411+ = BufferUnlimited
412+ | BufferLast -- Buffer only the latest element
413+ | BufferLimited Word BufferOverflowPolicy
414+ deriving (Show )
395415
396416-- IMPORTANT NOTE: we cannot update the SVar after generating it as we have
397417-- references to the original SVar stored in several functions which will keep
@@ -441,10 +461,9 @@ data SVar t m a = SVar
441461 -- potentially each worker may yield one value to the buffer in the worst
442462 -- case exceeding the requested buffer size.
443463 , maxWorkerLimit :: Limit
444- , maxBufferLimit :: Limit
445- -- These two are valid and used only when maxBufferLimit is Limited .
464+ , maxBufferLimit :: BufferStyle
465+ -- This is valid and used only when maxBufferLimit is BufferLimited .
446466 , pushBufferSpace :: IORef Count
447- , pushBufferPolicy :: PushBufferPolicy
448467 -- [LOCKING] The consumer puts this MVar after emptying the buffer, workers
449468 -- block on it when the buffer becomes full. No overhead unless the buffer
450469 -- becomes full.
@@ -505,7 +524,7 @@ data State t m a = State
505524 -- persistent configuration, state that remains valid until changed by
506525 -- an explicit setting via a combinator.
507526 , _threadsHigh :: Limit
508- , _bufferHigh :: Limit
527+ , _bufferHigh :: BufferStyle
509528 -- XXX these two can be collapsed into a single type
510529 , _streamLatency :: Maybe NanoSecond64 -- bootstrap latency
511530 , _maxStreamRate :: Maybe Rate
@@ -523,9 +542,11 @@ data State t m a = State
523542magicMaxBuffer :: Word
524543magicMaxBuffer = 1500
525544
526- defaultMaxThreads , defaultMaxBuffer :: Limit
545+ defaultMaxThreads :: Limit
527546defaultMaxThreads = Limited magicMaxBuffer
528- defaultMaxBuffer = Limited magicMaxBuffer
547+
548+ defaultMaxBuffer :: BufferStyle
549+ defaultMaxBuffer = BufferLimited magicMaxBuffer PushBufferBlock
529550
530551-- The fields prefixed by an _ are not to be accessed or updated directly but
531552-- via smart accessor APIs.
@@ -592,18 +613,27 @@ setMaxThreads n st =
592613getMaxThreads :: State t m a -> Limit
593614getMaxThreads = _threadsHigh
594615
616+ setBufferStyle :: BufferStyle -> State t m a -> State t m a
617+ setBufferStyle style st = st { _bufferHigh = style }
618+
595619setMaxBuffer :: Int -> State t m a -> State t m a
596- setMaxBuffer n st =
597- st { _bufferHigh =
598- if n < 0
599- then Unlimited
600- else if n == 0
601- then defaultMaxBuffer
602- else Limited (fromIntegral n)
603- }
620+ setMaxBuffer n = setBufferStyle style
621+ where
622+ style =
623+ if n < 0
624+ then BufferUnlimited
625+ else if n == 0
626+ then defaultMaxBuffer
627+ else BufferLimited (fromIntegral n) PushBufferBlock
628+
629+ getBufferStyle :: State t m a -> BufferStyle
630+ getBufferStyle = _bufferHigh
604631
605632getMaxBuffer :: State t m a -> Limit
606- getMaxBuffer = _bufferHigh
633+ getMaxBuffer st =
634+ case getBufferStyle st of
635+ BufferLimited n _ -> Limited n
636+ _ -> Unlimited
607637
608638setStreamRate :: Maybe Rate -> State t m a -> State t m a
609639setStreamRate r st = st { _maxStreamRate = r }
@@ -1000,18 +1030,18 @@ incrementYieldLimit sv =
10001030
10011031-- XXX Only yields should be counted in the buffer limit and not the Stop
10021032-- events.
1033+ --
1034+ -- XXX we can parameterize the SVar with a buffer type to reduce the runtime
1035+ -- overhead of determining the buffer type before queuing the elements.
10031036
10041037{-# INLINE decrementBufferLimit #-}
1005- decrementBufferLimit :: SVar t m a -> IO ()
1006- decrementBufferLimit sv =
1007- case maxBufferLimit sv of
1008- Unlimited -> return ()
1009- Limited _ -> do
1038+ decrementBufferLimit :: SVar t m a -> BufferOverflowPolicy -> IO ()
1039+ decrementBufferLimit sv policy = do
10101040 let ref = pushBufferSpace sv
10111041 old <- atomicModifyIORefCAS ref $ \ x ->
10121042 (if x >= 1 then x - 1 else x, x)
10131043 when (old <= 0 ) $
1014- case pushBufferPolicy sv of
1044+ case policy of
10151045 PushBufferBlock -> blockAndRetry
10161046 PushBufferDropNew -> do
10171047 -- We just drop one item and proceed. It is possible
@@ -1031,6 +1061,7 @@ decrementBufferLimit sv =
10311061 when block blockAndRetry
10321062 -- XXX need a dequeue or ring buffer for this
10331063 PushBufferDropOld -> undefined
1064+ PushBufferToFile _ -> undefined
10341065
10351066 where
10361067
@@ -1053,19 +1084,19 @@ decrementBufferLimit sv =
10531084incrementBufferLimit :: SVar t m a -> IO ()
10541085incrementBufferLimit sv =
10551086 case maxBufferLimit sv of
1056- Unlimited -> return ()
1057- Limited _ -> do
1087+ BufferLimited _ _ -> do
10581088 atomicModifyIORefCAS_ (pushBufferSpace sv) (+ 1 )
10591089 writeBarrier
10601090 void $ liftIO $ tryPutMVar (pushBufferMVar sv) ()
1091+ _ -> return ()
10611092
10621093{-# INLINE resetBufferLimit #-}
10631094resetBufferLimit :: SVar t m a -> IO ()
10641095resetBufferLimit sv =
10651096 case maxBufferLimit sv of
1066- Unlimited -> return ( )
1067- Limited n -> atomicModifyIORefCAS_ (pushBufferSpace sv )
1068- ( const ( fromIntegral n) )
1097+ BufferLimited n _ -> atomicModifyIORefCAS_ (pushBufferSpace sv )
1098+ ( const ( fromIntegral n) )
1099+ _ -> return ( )
10691100
10701101{-# INLINE sendWithDoorBell #-}
10711102sendWithDoorBell ::
@@ -1092,6 +1123,27 @@ sendWithDoorBell q bell msg = do
10921123send :: SVar t m a -> ChildEvent a -> IO Int
10931124send sv msg = sendWithDoorBell (outputQueue sv) (outputDoorBell sv) msg
10941125
1126+ -- | Just replace the previous value in the buffer.
1127+ sendReplace :: SVar t m a -> ChildEvent a -> IO ()
1128+ sendReplace sv msg = do
1129+ -- XXX we can use a nonlist buffer to make it faster, we do not need a
1130+ -- tuple here, for Prim/Storable streams we can also avoid using an IORef
1131+ -- we can just use an unboxed reference.
1132+ let q = outputQueue sv
1133+ oldlen <- atomicModifyIORefCAS q $ \ (_, n) -> (([msg], 1 ), n)
1134+ when (oldlen <= 0 ) $ do
1135+ -- The wake up must happen only after the store has finished otherwise
1136+ -- we can have lost wakeup problems.
1137+ writeBarrier
1138+ -- Since multiple workers can try this at the same time, it is possible
1139+ -- that we may put a spurious MVar after the consumer has already seen
1140+ -- the output. But that's harmless, at worst it may cause the consumer
1141+ -- to read the queue again and find it empty.
1142+ -- The important point is that the consumer is guaranteed to receive a
1143+ -- doorbell if something was added to the queue after it empties it.
1144+ let bell = outputDoorBell sv
1145+ void $ tryPutMVar bell ()
1146+
10951147-- There is no bound implemented on the buffer, this is assumed to be low
10961148-- traffic.
10971149sendToProducer :: SVar t m a -> ChildEvent a -> IO Int
@@ -1206,10 +1258,10 @@ sendYield sv mwinfo msg = do
12061258 oldlen <- send sv msg
12071259 let limit = maxBufferLimit sv
12081260 bufferSpaceOk <- case limit of
1209- Unlimited -> return True
1210- Limited lim -> do
1261+ BufferLimited lim _ -> do
12111262 active <- readIORef (workerCount sv)
12121263 return $ (oldlen + 1 ) < (fromIntegral lim - active)
1264+ _ -> return True
12131265 rateLimitOk <-
12141266 case mwinfo of
12151267 Just winfo ->
@@ -2256,9 +2308,8 @@ getAheadSVar st f mrun = do
22562308 { outputQueue = outQ
22572309 , outputQueueFromConsumer = undefined
22582310 , remainingWork = yl
2259- , maxBufferLimit = getMaxBuffer st
2311+ , maxBufferLimit = getBufferStyle st
22602312 , pushBufferSpace = undefined
2261- , pushBufferPolicy = undefined
22622313 , pushBufferMVar = undefined
22632314 , maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st)
22642315 , yieldRateInfo = rateInfo
@@ -2334,11 +2385,12 @@ getParallelSVar ss st mrun = do
23342385 Nothing -> return Nothing
23352386 Just x -> Just <$> newIORef x
23362387 rateInfo <- getYieldRateInfo st
2337- let bufLim =
2338- case getMaxBuffer st of
2339- Unlimited -> undefined
2340- Limited x -> (fromIntegral x)
2341- remBuf <- newIORef bufLim
2388+ let bufSpace =
2389+ case getBufferStyle st of
2390+ BufferUnlimited -> undefined
2391+ BufferLast -> undefined
2392+ BufferLimited x _ -> fromIntegral x
2393+ remBuf <- newIORef bufSpace
23422394 pbMVar <- newMVar ()
23432395
23442396 stats <- newSVarStats
@@ -2353,9 +2405,8 @@ getParallelSVar ss st mrun = do
23532405 SVar { outputQueue = outQ
23542406 , outputQueueFromConsumer = outQRev
23552407 , remainingWork = yl
2356- , maxBufferLimit = getMaxBuffer st
2408+ , maxBufferLimit = getBufferStyle st
23572409 , pushBufferSpace = remBuf
2358- , pushBufferPolicy = PushBufferBlock
23592410 , pushBufferMVar = pbMVar
23602411 , maxWorkerLimit = Unlimited
23612412 -- Used only for diagnostics
0 commit comments