From b5b4b86427286002081f102d1e97baef9162851e Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Wed, 17 Feb 2016 22:08:36 +0000 Subject: concurrency & dyre fixes for server spec --- server/default-conf/Main.hs | 2 +- server/src/Thermoprint/Server.hs | 33 +++++++++----- server/src/Thermoprint/Server/Fork.hs | 81 +++++++++++++++++++++++++++++++++++ server/test/Thermoprint/ServerSpec.hs | 43 +++++++++++-------- server/thermoprint-server.cabal | 1 + 5 files changed, 130 insertions(+), 30 deletions(-) create mode 100644 server/src/Thermoprint/Server/Fork.hs diff --git a/server/default-conf/Main.hs b/server/default-conf/Main.hs index 36f6c12..cbfc476 100644 --- a/server/default-conf/Main.hs +++ b/server/default-conf/Main.hs @@ -14,7 +14,7 @@ import Control.Monad.Reader import Database.Persist.Sqlite main :: IO () -main = thermoprintServer (Nat runSqlite) $ def `withPrinters` printers +main = thermoprintServer True (Nat runSqlite) $ def `withPrinters` printers where runSqlite :: ReaderT ConnectionPool (LoggingT IO) a -> IO a runSqlite = runStderrLoggingT . withSqlitePool "thermoprint.sqlite" 1 . runReaderT diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index 678d056..4559414 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs @@ -23,6 +23,9 @@ import qualified Config.Dyre as Dyre import Data.Map (Map) import qualified Data.Map as Map +import Data.Set (Set) +import qualified Data.Set as Set + import Data.Maybe (maybe) import Data.Foldable (mapM_, forM_, foldlM) @@ -34,7 +37,7 @@ import Control.Monad.Reader import Control.Monad.IO.Class import Control.Monad.Morph import Control.Category -import Control.Monad.Catch (MonadMask) +import Control.Monad.Catch (MonadMask(mask), finally) import Prelude hiding (id, (.)) import qualified Control.Monad as M @@ -56,12 +59,16 @@ import Database.Persist.Sql (runMigrationSilent, ConnectionPool, runSqlPool) import Thermoprint.API (thermoprintAPI, PrinterId) +import Thermoprint.Server.Fork + import Thermoprint.Server.Database import Thermoprint.Server.Printer import Thermoprint.Server.Queue import qualified Thermoprint.Server.API as API (thermoprintServer) import Thermoprint.Server.API hiding (thermoprintServer) +import Debug.Trace + -- | Compile-time configuration for 'thermoprintServer' data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour @@ -107,21 +114,25 @@ thermoprintServer :: ( MonadLoggerIO m , MonadReader ConnectionPool m , MonadResourceBase m , MonadMask m - ) => (m :~> IO) -- ^ 'dyre' controls the base of the monad-transformer-stack ('IO') but we let the user specify much of the rest of it (we handle 'ResourceT' ourselves, since we need it to fork properly). Therefore we require a specification of how to collapse the stack. + ) => Bool -- ^ Invoke 'dyre' to look for and attempt to compile custom configurations (pass 'False' iff testing) + -> (m :~> IO) -- ^ 'dyre' controls the base of the monad-transformer-stack ('IO') but we let the user specify much of the rest of it (we handle 'ResourceT' ourselves, since we need it to fork properly). Therefore we require a specification of how to collapse the stack. -> ResourceT m (Config (ResourceT m)) -> IO () -- ^ Run the server -thermoprintServer io = Dyre.wrapMain $ Dyre.defaultParams +thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams { Dyre.projectName = "thermoprint-server" , Dyre.realMain = realMain , Dyre.showError = flip (\msg -> fmap (\cfg -> cfg { dyreError = Just msg })) + , Dyre.configCheck = dyre } where realMain cfg = unNat (io . Nat runResourceT) $ do - Config{..} <- cfg - maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError - mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask - forM_ printers $ resourceForkIO . runPrinter - let - runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer - Map.foldrWithKey (\k p a -> resourceForkIO (runQM' k p) >> a) (return ()) printers - liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers + tMgr <- threadManager resourceForkIO + flip finally (cleanup tMgr) $ do + Config{..} <- cfg + maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError + mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask + forM_ printers $ fork tMgr . runPrinter + let + runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer + mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers + liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers diff --git a/server/src/Thermoprint/Server/Fork.hs b/server/src/Thermoprint/Server/Fork.hs new file mode 100644 index 0000000..402c1f8 --- /dev/null +++ b/server/src/Thermoprint/Server/Fork.hs @@ -0,0 +1,81 @@ +{-# LANGUAGE FlexibleContexts #-} + +module Thermoprint.Server.Fork + ( ThreadManager + , fork + , cleanup + , threadManager + ) where + +import Control.Monad.Reader.Class +import Control.Monad.Trans.Class +import Control.Monad.Trans.Reader (ReaderT, runReaderT) +import Control.Monad.Catch + +import Control.Monad.IO.Class + +import Control.Monad +import Control.Applicative +import Data.Maybe + +import Data.Foldable + +import Data.Map (Map) +import qualified Data.Map as Map + +import Control.Concurrent +import Control.Concurrent.STM +import Control.Concurrent.STM.TVar (TVar) +import qualified Control.Concurrent.STM.TVar as T +import Control.Concurrent.STM.TSem (TSem) +import qualified Control.Concurrent.STM.TSem as S + +data ThreadManager m = ThreadManager + { fork :: m () -> m ThreadId + , cleanup :: m () + } + +threadManager :: (MonadIO m, MonadMask m) => (m () -> m ThreadId) -> m (ThreadManager m) +threadManager f = do + tVar <- newTVar Map.empty + return ThreadManager + { fork = \act -> do + let + unregisterSelf :: MonadIO m => m () + unregisterSelf = do + tMap <- readTVar tVar + tId <- liftIO $ myThreadId + modifyTVar' tVar $ Map.delete tId + maybeM signalTSem $ Map.lookup tId tMap + + mask $ \unmask -> do + tId <- f (unmask act `finally` unregisterSelf) + modifyTVar' tVar =<< (Map.insert tId <$> newTSem 0) + return tId + , cleanup = liftIO $ + mapM_ (\(tId, s) -> killThread tId >> waitTSem s) . Map.toList =<< readTVar tVar + } + where + atomically' :: MonadIO m => STM a -> m a + atomically' = liftIO . atomically + + newTSem :: MonadIO m => Int -> m TSem + newTSem = atomically' . S.newTSem + + waitTSem :: MonadIO m => TSem -> m () + waitTSem = atomically' . S.waitTSem + + signalTSem :: MonadIO m => TSem -> m () + signalTSem = atomically' . S.signalTSem + + newTVar :: MonadIO m => a -> m (TVar a) + newTVar = atomically' . T.newTVar + + readTVar :: MonadIO m => TVar a -> m a + readTVar = atomically' . T.readTVar + + modifyTVar' :: MonadIO m => TVar a -> (a -> a) -> m () + modifyTVar' t = atomically' . T.modifyTVar t + +maybeM :: Applicative m => (a -> m ()) -> Maybe a -> m () +maybeM = maybe $ pure () diff --git a/server/test/Thermoprint/ServerSpec.hs b/server/test/Thermoprint/ServerSpec.hs index 0d698f0..fe06a05 100644 --- a/server/test/Thermoprint/ServerSpec.hs +++ b/server/test/Thermoprint/ServerSpec.hs @@ -3,27 +3,29 @@ module Thermoprint.ServerSpec (spec) where -import Test.Hspec +import Test.Hspec -import Thermoprint.API -import Thermoprint.Server +import Thermoprint.API +import Thermoprint.Server -import Control.Monad -import Control.Monad.Logger -import Control.Monad.Reader -import Control.Monad.Trans.Identity -import Control.Monad.Trans.Resource +import Control.Monad +import Control.Monad.Logger +import Control.Monad.Reader +import Control.Monad.Trans.Identity +import Control.Monad.Trans.Resource -import Database.Persist.Sqlite +import Database.Persist.Sqlite -import Control.Concurrent -import Control.Concurrent.STM +import Control.Concurrent +import Control.Concurrent.STM -import System.IO -import System.IO.Temp +import System.IO +import System.IO.Temp import qualified Data.Text as T +import Debug.Trace + data TestPrinter = TestPrinter { outputChan :: TChan Printout , failSwitch :: TMVar PrintingError @@ -33,13 +35,14 @@ data TestManager = TestManager { manage :: TMVar (QueueManager IdentityT) } -setup :: IO (ThreadId, TestPrinter, TestManager) +setup :: IO (ThreadId, QSem, TestPrinter, TestManager) setup = withSystemTempFile "thermoprint.sqlite" $ \fp h -> hClose h >> do tPrinter <- TestPrinter <$> newTChanIO <*> newEmptyTMVarIO tManager <- TestManager <$> newEmptyTMVarIO + termSem <- newQSem 0 let runSqlite :: ReaderT ConnectionPool (LoggingT IO) a -> IO a - runSqlite = runStderrLoggingT . withSqlitePool (T.pack fp) 1 . runReaderT + runSqlite = runNoLoggingT . withSqlitePool (T.pack fp) 1 . runReaderT printers = [ ( pure $ PM tPM , QMConfig (join . lift $ takeTMVar (manage tManager)) (Nat $ liftIO . runIdentityT) @@ -48,12 +51,16 @@ setup = withSystemTempFile "thermoprint.sqlite" $ \fp h -> hClose h >> do tPM :: MonadIO m => Printout -> m (Maybe PrintingError) tPM printout = liftIO . atomically $ writeTChan (outputChan tPrinter) printout >> tryTakeTMVar (failSwitch tPrinter) - (,,) <$> forkIO (thermoprintServer (Nat runSqlite) $ def `withPrinters` printers) <*> pure tPrinter <*> pure tManager + (,,,) <$> forkFinally (thermoprintServer False (Nat runSqlite) $ def `withPrinters` printers) (const $ signalQSem termSem) <*> pure termSem <*> pure tPrinter <*> pure tManager + +withSetup :: SpecWith (ThreadId, QSem, TestPrinter, TestManager) -> Spec +withSetup = beforeAll setup . afterAll (\(tId, termSem, _, _) -> killThread tId >> waitQSem termSem) spec :: Spec -spec = beforeAll setup $ do +spec = withSetup $ do describe "blubTests" $ do - it "prints Blub." $ \(tId, _, _) -> do + it "prints Blub." $ \(tId, _, _, _) -> do + threadDelay 5000 putStrLn "Blub." System.IO.print tId True `shouldSatisfy` id diff --git a/server/thermoprint-server.cabal b/server/thermoprint-server.cabal index bfd5b9b..cfef947 100644 --- a/server/thermoprint-server.cabal +++ b/server/thermoprint-server.cabal @@ -18,6 +18,7 @@ cabal-version: >=1.10 library exposed-modules: Thermoprint.Server + , Thermoprint.Server.Fork , Thermoprint.Server.Database , Thermoprint.Server.API , Thermoprint.Server.Queue -- cgit v1.2.3