From b5b4b86427286002081f102d1e97baef9162851e Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Wed, 17 Feb 2016 22:08:36 +0000 Subject: concurrency & dyre fixes for server spec --- server/src/Thermoprint/Server.hs | 33 +++++++++----- server/src/Thermoprint/Server/Fork.hs | 81 +++++++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+), 11 deletions(-) create mode 100644 server/src/Thermoprint/Server/Fork.hs (limited to 'server/src') diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index 678d056..4559414 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs @@ -23,6 +23,9 @@ import qualified Config.Dyre as Dyre import Data.Map (Map) import qualified Data.Map as Map +import Data.Set (Set) +import qualified Data.Set as Set + import Data.Maybe (maybe) import Data.Foldable (mapM_, forM_, foldlM) @@ -34,7 +37,7 @@ import Control.Monad.Reader import Control.Monad.IO.Class import Control.Monad.Morph import Control.Category -import Control.Monad.Catch (MonadMask) +import Control.Monad.Catch (MonadMask(mask), finally) import Prelude hiding (id, (.)) import qualified Control.Monad as M @@ -56,12 +59,16 @@ import Database.Persist.Sql (runMigrationSilent, ConnectionPool, runSqlPool) import Thermoprint.API (thermoprintAPI, PrinterId) +import Thermoprint.Server.Fork + import Thermoprint.Server.Database import Thermoprint.Server.Printer import Thermoprint.Server.Queue import qualified Thermoprint.Server.API as API (thermoprintServer) import Thermoprint.Server.API hiding (thermoprintServer) +import Debug.Trace + -- | Compile-time configuration for 'thermoprintServer' data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour @@ -107,21 +114,25 @@ thermoprintServer :: ( MonadLoggerIO m , MonadReader ConnectionPool m , MonadResourceBase m , MonadMask m - ) => (m :~> IO) -- ^ 'dyre' controls the base of the monad-transformer-stack ('IO') but we let the user specify much of the rest of it (we handle 'ResourceT' ourselves, since we need it to fork properly). Therefore we require a specification of how to collapse the stack. + ) => Bool -- ^ Invoke 'dyre' to look for and attempt to compile custom configurations (pass 'False' iff testing) + -> (m :~> IO) -- ^ 'dyre' controls the base of the monad-transformer-stack ('IO') but we let the user specify much of the rest of it (we handle 'ResourceT' ourselves, since we need it to fork properly). Therefore we require a specification of how to collapse the stack. -> ResourceT m (Config (ResourceT m)) -> IO () -- ^ Run the server -thermoprintServer io = Dyre.wrapMain $ Dyre.defaultParams +thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams { Dyre.projectName = "thermoprint-server" , Dyre.realMain = realMain , Dyre.showError = flip (\msg -> fmap (\cfg -> cfg { dyreError = Just msg })) + , Dyre.configCheck = dyre } where realMain cfg = unNat (io . Nat runResourceT) $ do - Config{..} <- cfg - maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError - mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask - forM_ printers $ resourceForkIO . runPrinter - let - runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer - Map.foldrWithKey (\k p a -> resourceForkIO (runQM' k p) >> a) (return ()) printers - liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers + tMgr <- threadManager resourceForkIO + flip finally (cleanup tMgr) $ do + Config{..} <- cfg + maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError + mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask + forM_ printers $ fork tMgr . runPrinter + let + runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer + mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers + liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers diff --git a/server/src/Thermoprint/Server/Fork.hs b/server/src/Thermoprint/Server/Fork.hs new file mode 100644 index 0000000..402c1f8 --- /dev/null +++ b/server/src/Thermoprint/Server/Fork.hs @@ -0,0 +1,81 @@ +{-# LANGUAGE FlexibleContexts #-} + +module Thermoprint.Server.Fork + ( ThreadManager + , fork + , cleanup + , threadManager + ) where + +import Control.Monad.Reader.Class +import Control.Monad.Trans.Class +import Control.Monad.Trans.Reader (ReaderT, runReaderT) +import Control.Monad.Catch + +import Control.Monad.IO.Class + +import Control.Monad +import Control.Applicative +import Data.Maybe + +import Data.Foldable + +import Data.Map (Map) +import qualified Data.Map as Map + +import Control.Concurrent +import Control.Concurrent.STM +import Control.Concurrent.STM.TVar (TVar) +import qualified Control.Concurrent.STM.TVar as T +import Control.Concurrent.STM.TSem (TSem) +import qualified Control.Concurrent.STM.TSem as S + +data ThreadManager m = ThreadManager + { fork :: m () -> m ThreadId + , cleanup :: m () + } + +threadManager :: (MonadIO m, MonadMask m) => (m () -> m ThreadId) -> m (ThreadManager m) +threadManager f = do + tVar <- newTVar Map.empty + return ThreadManager + { fork = \act -> do + let + unregisterSelf :: MonadIO m => m () + unregisterSelf = do + tMap <- readTVar tVar + tId <- liftIO $ myThreadId + modifyTVar' tVar $ Map.delete tId + maybeM signalTSem $ Map.lookup tId tMap + + mask $ \unmask -> do + tId <- f (unmask act `finally` unregisterSelf) + modifyTVar' tVar =<< (Map.insert tId <$> newTSem 0) + return tId + , cleanup = liftIO $ + mapM_ (\(tId, s) -> killThread tId >> waitTSem s) . Map.toList =<< readTVar tVar + } + where + atomically' :: MonadIO m => STM a -> m a + atomically' = liftIO . atomically + + newTSem :: MonadIO m => Int -> m TSem + newTSem = atomically' . S.newTSem + + waitTSem :: MonadIO m => TSem -> m () + waitTSem = atomically' . S.waitTSem + + signalTSem :: MonadIO m => TSem -> m () + signalTSem = atomically' . S.signalTSem + + newTVar :: MonadIO m => a -> m (TVar a) + newTVar = atomically' . T.newTVar + + readTVar :: MonadIO m => TVar a -> m a + readTVar = atomically' . T.readTVar + + modifyTVar' :: MonadIO m => TVar a -> (a -> a) -> m () + modifyTVar' t = atomically' . T.modifyTVar t + +maybeM :: Applicative m => (a -> m ()) -> Maybe a -> m () +maybeM = maybe $ pure () -- cgit v1.2.3