@@ -12,9 +12,7 @@ module Development.IDE.Graph.Internal.Database (compute, newDatabase, incDatabas
12
12
13
13
import Prelude hiding (unzip )
14
14
15
- import Control.Concurrent.Async
16
- import Control.Concurrent.Extra
17
- import Control.Concurrent.STM.Stats (STM , TVar , atomically ,
15
+ import Control.Concurrent.STM.Stats (STM , atomically ,
18
16
atomicallyNamed ,
19
17
modifyTVar' , newTVarIO ,
20
18
readTVar , readTVarIO ,
@@ -31,28 +29,29 @@ import Data.IORef.Extra
31
29
import Data.Maybe
32
30
import Data.Traversable (for )
33
31
import Data.Tuple.Extra
34
- import Debug.Trace (traceM )
32
+ import Debug.Trace (traceEvent )
35
33
import Development.IDE.Graph.Classes
36
34
import Development.IDE.Graph.Internal.Key
37
35
import Development.IDE.Graph.Internal.Rules
38
36
import Development.IDE.Graph.Internal.Types
39
37
import qualified Focus
40
38
import qualified ListT
41
39
import qualified StmContainers.Map as SMap
42
- import System.Time.Extra (duration , sleep )
43
- import UnliftIO (MonadUnliftIO (withRunInIO ))
44
- import qualified UnliftIO.Exception as UE
40
+ import System.Time.Extra (duration )
45
41
46
42
#if MIN_VERSION_base(4,19,0)
47
43
import Data.Functor (unzip )
48
44
#else
49
45
import Data.List.NonEmpty (unzip )
50
46
#endif
47
+ import Development.IDE.WorkerThread (TaskQueue ,
48
+ awaitRunInThreadStmInNewThread )
51
49
52
50
53
- newDatabase :: Dynamic -> TheRules -> IO Database
54
- newDatabase databaseExtra databaseRules = do
51
+ newDatabase :: TaskQueue ( IO () ) -> Dynamic -> TheRules -> IO Database
52
+ newDatabase databaseQueue databaseExtra databaseRules = do
55
53
databaseStep <- newTVarIO $ Step 0
54
+ databaseThreads <- newTVarIO []
56
55
databaseValues <- atomically SMap. new
57
56
pure Database {.. }
58
57
@@ -100,8 +99,9 @@ build db stack keys = do
100
99
else throw $ AsyncParentKill i $ Step (- 1 )
101
100
where
102
101
go = do
103
- step <- readTVarIO $ databaseStep db
104
- ! built <- runAIO step $ builder db stack (fmap newKey keys)
102
+ -- step <- readTVarIO $ databaseStep db
103
+ -- built <- mapConcurrently (builderOne db stack) (fmap newKey keys)
104
+ built <- builder db stack (fmap newKey keys)
105
105
let (ids, vs) = unzip built
106
106
pure (ids, fmap (asV . resultValue) vs)
107
107
where
@@ -112,38 +112,39 @@ build db stack keys = do
112
112
-- | Build a list of keys and return their results.
113
113
-- If none of the keys are dirty, we can return the results immediately.
114
114
-- Otherwise, a blocking computation is returned *which must be evaluated asynchronously* to avoid deadlock.
115
- builder :: (Traversable f ) => Database -> Stack -> f Key -> AIO (f (Key , Result ))
115
+ builder :: (Traversable f ) => Database -> Stack -> f Key -> IO (f (Key , Result ))
116
116
-- builder _ st kk | traceShow ("builder", st,kk) False = undefined
117
- builder db stack keys = do
118
- keyWaits <- for keys $ \ k -> builderOne db stack k
119
- ! res <- for keyWaits $ \ (k, waitR) -> do
120
- ! v<- liftIO waitR
121
- return (k, v)
122
- return res
117
+ builder db stack keys = for keys $ \ k -> builderOne db stack k
123
118
124
- builderOne :: Database -> Stack -> Key -> AIO (Key , IO Result )
125
- builderOne db@ Database {.. } stack id = UE. uninterruptibleMask $ \ restore -> do
126
- current <- liftIO $ readTVarIO databaseStep
127
- (k, registerWaitResult) <- restore $ liftIO $ atomicallyNamed " builder" $ do
119
+ builderOne :: Database -> Stack -> Key -> IO (Key , Result )
120
+ builderOne db@ Database {.. } stack id = do
121
+ traceEvent ( " builderOne: " ++ show id ) return ()
122
+ res <- liftIO $ atomicallyNamed " builder" $ do
128
123
-- Spawn the id if needed
129
124
status <- SMap. lookup id databaseValues
125
+ current@ (Step cs) <- readTVar databaseStep
126
+ let getStep = do
127
+ Step current <- readTVar databaseStep
128
+ return current
129
+
130
130
val <- case viewDirty current $ maybe (Dirty Nothing ) keyStatus status of
131
131
Dirty s -> do
132
- let act =
133
- asyncWithCleanUp
134
- ((restore $ refresh db stack id s)
135
- `UE.onException` UE. uninterruptibleMask_ (liftIO (atomicallyNamed " builder - onException" (SMap. focus updateDirty id databaseValues)))
136
- )
137
132
SMap. focus (updateStatus $ Running current s) id databaseValues
138
- return act
139
- Clean r -> pure . pure . pure $ r
133
+ traceEvent (" Starting build of key: " ++ show id ++ " , step " ++ show current)
134
+ $ awaitRunInThreadStmInNewThread getStep cs databaseQueue databaseThreads (refresh db stack id s)
135
+ $ \ e -> atomically $ SMap. focus (updateStatus $ Exception current e s) id databaseValues
136
+ return Nothing
137
+ Clean r -> return $ Just r
140
138
-- force here might contains async exceptions from previous runs
141
139
Running _step _s
142
140
| memberStack id stack -> throw $ StackException stack
143
141
| otherwise -> retry
144
- pure (id , val)
145
- waitR <- registerWaitResult
146
- return (k, waitR)
142
+ Exception _ e _s -> throw e
143
+ pure val
144
+ case res of
145
+ Just r -> return (id , r)
146
+ Nothing -> builderOne db stack id
147
+
147
148
-- | isDirty
148
149
-- only dirty when it's build time is older than the changed time of one of its dependencies
149
150
isDirty :: Foldable t => Result -> t (a , Result ) -> Bool
@@ -156,30 +157,27 @@ isDirty me = any (\(_,dep) -> resultBuilt me < resultChanged dep)
156
157
-- * If no dirty dependencies and we have evaluated the key previously, then we refresh it in the current thread.
157
158
-- This assumes that the implementation will be a lookup
158
159
-- * Otherwise, we spawn a new thread to refresh the dirty deps (if any) and the key itself
159
- refreshDeps :: KeySet -> Database -> Stack -> Key -> Result -> [KeySet ] -> AIO Result
160
+ refreshDeps :: KeySet -> Database -> Stack -> Key -> Result -> [KeySet ] -> IO Result
160
161
refreshDeps visited db stack key result = \ case
161
162
-- no more deps to refresh
162
- [] -> compute' db stack key RunDependenciesSame (Just result)
163
+ [] -> compute db stack key RunDependenciesSame (Just result)
163
164
(dep: deps) -> do
164
165
let newVisited = dep <> visited
165
166
res <- builder db stack (toListKeySet (dep `differenceKeySet` visited))
166
167
if isDirty result res
167
168
-- restart the computation if any of the deps are dirty
168
- then compute' db stack key RunDependenciesChanged (Just result)
169
+ then compute db stack key RunDependenciesChanged (Just result)
169
170
-- else kick the rest of the deps
170
171
else refreshDeps newVisited db stack key result deps
171
172
172
173
173
174
-- refresh :: Database -> Stack -> Key -> Maybe Result -> IO Result
174
175
-- refresh _ st k _ | traceShow ("refresh", st, k) False = undefined
175
- refresh :: Database -> Stack -> Key -> Maybe Result -> AIO Result
176
+ refresh :: Database -> Stack -> Key -> Maybe Result -> IO Result
176
177
refresh db stack key result = case (addStack key stack, result) of
177
178
(Left e, _) -> throw e
178
179
(Right stack, Just me@ Result {resultDeps = ResultDeps deps}) -> refreshDeps mempty db stack key me (reverse deps)
179
- (Right stack, _) -> compute' db stack key RunDependenciesChanged result
180
-
181
- compute' :: Database -> Stack -> Key -> RunMode -> Maybe Result -> AIO Result
182
- compute' db stack key mode result = liftIO $ compute db stack key mode result
180
+ (Right stack, _) -> compute db stack key RunDependenciesChanged result
183
181
-- | Compute a key.
184
182
compute :: Database -> Stack -> Key -> RunMode -> Maybe Result -> IO Result
185
183
-- compute _ st k _ _ | traceShow ("compute", st, k) False = undefined
@@ -284,68 +282,5 @@ transitiveDirtySet database = flip State.execStateT mempty . traverse_ loop
284
282
next <- lift $ atomically $ getReverseDependencies database x
285
283
traverse_ loop (maybe mempty toListKeySet next)
286
284
287
- --------------------------------------------------------------------------------
288
- -- Asynchronous computations with cancellation
289
-
290
- -- | A simple monad to implement cancellation on top of 'Async',
291
- -- generalizing 'withAsync' to monadic scopes.
292
- newtype AIO a = AIO { unAIO :: ReaderT (TVar [Async () ]) IO a }
293
- deriving newtype (Applicative , Functor , Monad , MonadIO )
294
-
295
- data AsyncParentKill = AsyncParentKill ThreadId Step
296
- deriving (Show , Eq )
297
-
298
- instance Exception AsyncParentKill where
299
- toException = asyncExceptionToException
300
- fromException = asyncExceptionFromException
301
-
302
- -- | Run the monadic computation, cancelling all the spawned asyncs if an exception arises
303
- runAIO :: Step -> AIO a -> IO a
304
- runAIO s (AIO act) = do
305
- asyncsRef <- newTVarIO []
306
- -- Log the exact exception (including async exceptions) before cleanup,
307
- -- then rethrow to preserve previous semantics.
308
- runReaderT act asyncsRef `onException` do
309
- asyncs <- atomically $ do
310
- r <- readTVar asyncsRef
311
- modifyTVar' asyncsRef $ const []
312
- return r
313
- tid <- myThreadId
314
- cleanupAsync asyncs tid s
315
-
316
- -- | Like 'async' but with built-in cancellation.
317
- -- Returns an IO action to wait on the result.
318
- asyncWithCleanUp :: AIO a -> AIO (IO a )
319
- asyncWithCleanUp act = do
320
- st <- AIO ask
321
- io <- unliftAIO act
322
- -- mask to make sure we keep track of the spawned async
323
- liftIO $ uninterruptibleMask $ \ restore -> do
324
- a <- async $ restore io
325
- atomically $ modifyTVar' st (void a : )
326
- return $ wait a
327
-
328
- unliftAIO :: AIO a -> AIO (IO a )
329
- unliftAIO act = do
330
- st <- AIO ask
331
- return $ runReaderT (unAIO act) st
332
285
333
- instance MonadUnliftIO AIO where
334
- withRunInIO k = do
335
- st <- AIO ask
336
- liftIO $ k (\ aio -> runReaderT (unAIO aio) st)
337
286
338
- cleanupAsync :: [Async a ] -> ThreadId -> Step -> IO ()
339
- -- mask to make sure we interrupt all the asyncs
340
- cleanupAsync asyncs tid step = uninterruptibleMask $ \ unmask -> do
341
- -- interrupt all the asyncs without waiting
342
- -- mapM_ (\a -> throwTo (asyncThreadId a) AsyncCancelled) asyncs
343
- mapM_ (\ a -> throwTo (asyncThreadId a) $ AsyncParentKill tid step) asyncs
344
- -- Wait until all the asyncs are done
345
- -- But if it takes more than 10 seconds, log to stderr
346
- unless (null asyncs) $ do
347
- let warnIfTakingTooLong = unmask $ forever $ do
348
- sleep 10
349
- traceM " cleanupAsync: waiting for asyncs to finish"
350
- withAsync warnIfTakingTooLong $ \ _ ->
351
- mapM_ waitCatch asyncs
0 commit comments