diff --git a/haskell/lvish/Control/LVish/Sched.hs b/haskell/lvish/Control/LVish/Sched.hs index 250890c9..571e8b09 100644 --- a/haskell/lvish/Control/LVish/Sched.hs +++ b/haskell/lvish/Control/LVish/Sched.hs @@ -197,21 +197,6 @@ isFrozen (LVar {status}) = do Active _ -> return False Frozen -> return True --- Optionall wraps an IO action so that it will only execute once even if --- called multiple times (even concurrently). -dedupWhen :: Bool -> (a -> IO ()) -> IO (a -> IO ()) -{-# INLINE dedupWhen #-} -dedupWhen dedup c = - if dedup - then do - hasInvoked <- newIORef False - return $ \x -> do - ticket <- readForCAS hasInvoked - unless (peekTicket ticket) $ do - (winner, _) <- casIORef hasInvoked ticket True - when winner $ c x - else return c - -- | Logging within the (internal) Par monad. logStrLn :: Int -> String -> Par () @@ -234,12 +219,12 @@ logHelper lgr num msg = when (dbgLvl >= 1) $ do Just lgr -> L.logOn lgr msg' Nothing -> hPutStrLn stderr ("WARNING/nologger:"++show msg') -logWith :: Sched.State a s -> Int -> String -> IO () -logOffRecord :: Sched.State a s -> Int -> String -> IO () +logWith :: Queue.State a s -> Int -> String -> IO () +logOffRecord :: Queue.State a s -> Int -> String -> IO () #ifdef DEBUG_LVAR -- Only when the debug level is 1 or higher is the logger even initialized: -logWith q lvl str = logHelper (Sched.logger q) (Sched.no q) (L.StrMsg lvl str) -logOffRecord q lvl str = logHelper (Sched.logger q) (Sched.no q) (L.OffTheRecord lvl str) +logWith q lvl str = logHelper (Queue.logger q) (Queue.no q) (L.StrMsg lvl str) +logOffRecord q lvl str = logHelper (Queue.logger q) (Queue.no q) (L.OffTheRecord lvl str) #else logWith _ _ _ = return () logOffRecord _ _ _ = return () @@ -286,7 +271,9 @@ getLV lv@(LVar {state, status}) globalThresh deltaThresh = mkPar $ \k q -> do -- continuation immediately Nothing -> do -- /transiently/ not past the threshhold; block - enableCont <- dedupWhen (not $ Queue.idemp q) $ Queue.pushWork q . k + execFlag <- newDedupCheck + let enableCont b = unless (Queue.idemp q) $ + winnerCheck execFlag q (Queue.pushWork q (k b)) (return ()) let onUpdate d = unblockWhen $ deltaThresh d onFreeze = unblockWhen $ globalThresh state True @@ -299,8 +286,7 @@ getLV lv@(LVar {state, status}) globalThresh deltaThresh = mkPar $ \k q -> do B.remove tok enableCont b - logWith q 8$ " [dbg-lvish] getLV "++show(unsafeName execFlag)++ - ": blocking on LVar, registering listeners..." + logWith q 8$ " [dbg-lvish] getLV: blocking on LVar, registering listeners..." -- add listener, i.e., move the continuation to the waiting bag tok <- B.put listeners $ Listener onUpdate onFreeze @@ -337,7 +323,7 @@ getLV lv@(LVar {state, status}) globalThresh deltaThresh = mkPar $ \k q -> do {-# INLINE newDedupCheck #-} {-# INLINE winnerCheck #-} -winnerCheck :: DedupCell -> Sched.State a s -> IO () -> IO () -> IO () +winnerCheck :: DedupCell -> Queue.State a s -> IO () -> IO () -> IO () newDedupCheck :: IO DedupCell #if GET_ONCE @@ -353,7 +339,7 @@ winnerCheck execFlag q tru fal = do else do (winner, _) <- casIORef execFlag ticket True logWith q 8 $ " [dbg-lvish] getLV "++show(unsafeName execFlag) - ++" on worker "++ (show$ Sched.no q) ++": winner check? " ++show winner + ++" on worker "++ (show$ Queue.no q) ++": winner check? " ++show winner ++ ", ticks " ++ show (ticket, peekTicket ticket) if winner then tru else fal # else @@ -363,7 +349,7 @@ newDedupCheck = C2.newCounter 0 winnerCheck execFlag q tru fal = do cnt <- C2.incrCounter 1 execFlag logWith q 8 $ " [dbg-lvish] getLV "++show(unsafeName execFlag) - ++" on worker "++ (show$ Sched.no q) ++": winner check? " ++show (cnt==1) + ++" on worker "++ (show$ Queue.no q) ++": winner check? " ++show (cnt==1) ++ ", counter val " ++ show cnt if cnt==1 then tru else fal @@ -378,8 +364,6 @@ winnerCheck _ _ tr _ = tr - - -- | Update an LVar. putLV_ :: LVar a d -- ^ the LVar -> (a -> Par (Maybe d, b)) -- ^ how to do the put, and whether the LVar's @@ -508,10 +492,10 @@ closeInPool (Just hp) dedup c = do Queue.pushWork q t B.foreach (blockedOnQuiesce hp) invoke - onTerminate <- dedupWhen dedup onTerminate_ + dedupFlag <- newDedupCheck let onFinishHandler _ = ClosedPar $ \q -> do - onTerminate q + when dedup $ winnerCheck dedupFlag q (onTerminate_ q) (return ()) sched q C.inc $ numHandlers hp -- record handler invocation in pool @@ -572,9 +556,15 @@ addHandler hp LVar {state, status, handlerStatus, name} globalCB updateThresh = logWith q 4 " [dbg-lvish] addHandler: calling globalCB.." -- At registration time, traverse (globally) over the previously inserted items -- to launch any required callbacks. - let k2 x = do relLock q; k x + let k2 :: () -> ClosedPar + k2 () = case k () of + ClosedPar go -> ClosedPar $ \ q2 -> do + -- Warning! What happens if the globalCB blocks and then wakes on a different thread? + relLock q -- Release lock on original worker. + go q2 -- Continue after the addHandler. -- Ported over bugfix here from master branch. - -- There's a quirk here where we need to stick in the lock release: + -- There's a quirk here where we need to stick in the lock release + -- to happen afetr the globalCB is done (in the continuation). exec (close (globalCB state) k2) q -- | Block until a handler pool is quiescent. @@ -647,7 +637,7 @@ liftIO io = mkPar $ \k q -> do -- current Par session, otherwise it will simply throw an exception. getLogger :: Par L.Logger getLogger = mkPar $ \k q -> - let Just lgr = Sched.logger q in + let Just lgr = Queue.logger q in exec (k lgr) q -- | Return the worker that we happen to be running on. (NONDETERMINISTIC.) @@ -697,7 +687,7 @@ runParDetailed :: DbgCfg -- ^ Debugging config -> Par a -- ^ The computation to run. -> IO ([String], Either E.SomeException a) runParDetailed cfg@DbgCfg{dbgRange, dbgDests, dbgScheduling } numWrkrs comp = do - (lgr,queues) <- Sched.new cfg numWrkrs noName + (lgr,queues) <- Queue.new cfg numWrkrs noName -- We create a thread on each CPU with forkOn. The CPU on which -- the current thread is running will host the main thread; the @@ -717,7 +707,7 @@ runParDetailed cfg@DbgCfg{dbgRange, dbgDests, dbgScheduling } numWrkrs comp = do -- Use Control.Concurrent.Async to deal with exceptions: ---------------------------------------------------------------------------------- - let runWorker :: (Int,Sched.State ClosedPar LVarID) -> IO () + let runWorker :: (Int,Queue.State ClosedPar LVarID) -> IO () runWorker (cpu, q) = do if (cpu /= main_cpu) then do logOffRecord q 3 $ " [dbg-lvish] Auxillary worker #"++show cpu++" starting."