diff options
author | Gregor Kleen <gkleen@yggdrasil.li> | 2016-02-17 22:08:36 +0000 |
---|---|---|
committer | Gregor Kleen <gkleen@yggdrasil.li> | 2016-02-17 22:08:36 +0000 |
commit | b5b4b86427286002081f102d1e97baef9162851e (patch) | |
tree | 5d3eeef8abc884931756dd3f9711c355f4a88a4e /server | |
parent | eebc709302833932d7fca95dfc5c8536f8911e69 (diff) | |
download | thermoprint-b5b4b86427286002081f102d1e97baef9162851e.tar thermoprint-b5b4b86427286002081f102d1e97baef9162851e.tar.gz thermoprint-b5b4b86427286002081f102d1e97baef9162851e.tar.bz2 thermoprint-b5b4b86427286002081f102d1e97baef9162851e.tar.xz thermoprint-b5b4b86427286002081f102d1e97baef9162851e.zip |
concurrency & dyre fixes for server spec
Diffstat (limited to 'server')
-rw-r--r-- | server/default-conf/Main.hs | 2 | ||||
-rw-r--r-- | server/src/Thermoprint/Server.hs | 33 | ||||
-rw-r--r-- | server/src/Thermoprint/Server/Fork.hs | 81 | ||||
-rw-r--r-- | server/test/Thermoprint/ServerSpec.hs | 43 | ||||
-rw-r--r-- | server/thermoprint-server.cabal | 1 |
5 files changed, 130 insertions, 30 deletions
diff --git a/server/default-conf/Main.hs b/server/default-conf/Main.hs index 36f6c12..cbfc476 100644 --- a/server/default-conf/Main.hs +++ b/server/default-conf/Main.hs | |||
@@ -14,7 +14,7 @@ import Control.Monad.Reader | |||
14 | import Database.Persist.Sqlite | 14 | import Database.Persist.Sqlite |
15 | 15 | ||
16 | main :: IO () | 16 | main :: IO () |
17 | main = thermoprintServer (Nat runSqlite) $ def `withPrinters` printers | 17 | main = thermoprintServer True (Nat runSqlite) $ def `withPrinters` printers |
18 | where | 18 | where |
19 | runSqlite :: ReaderT ConnectionPool (LoggingT IO) a -> IO a | 19 | runSqlite :: ReaderT ConnectionPool (LoggingT IO) a -> IO a |
20 | runSqlite = runStderrLoggingT . withSqlitePool "thermoprint.sqlite" 1 . runReaderT | 20 | runSqlite = runStderrLoggingT . withSqlitePool "thermoprint.sqlite" 1 . runReaderT |
diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index 678d056..4559414 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs | |||
@@ -23,6 +23,9 @@ import qualified Config.Dyre as Dyre | |||
23 | import Data.Map (Map) | 23 | import Data.Map (Map) |
24 | import qualified Data.Map as Map | 24 | import qualified Data.Map as Map |
25 | 25 | ||
26 | import Data.Set (Set) | ||
27 | import qualified Data.Set as Set | ||
28 | |||
26 | import Data.Maybe (maybe) | 29 | import Data.Maybe (maybe) |
27 | import Data.Foldable (mapM_, forM_, foldlM) | 30 | import Data.Foldable (mapM_, forM_, foldlM) |
28 | 31 | ||
@@ -34,7 +37,7 @@ import Control.Monad.Reader | |||
34 | import Control.Monad.IO.Class | 37 | import Control.Monad.IO.Class |
35 | import Control.Monad.Morph | 38 | import Control.Monad.Morph |
36 | import Control.Category | 39 | import Control.Category |
37 | import Control.Monad.Catch (MonadMask) | 40 | import Control.Monad.Catch (MonadMask(mask), finally) |
38 | import Prelude hiding (id, (.)) | 41 | import Prelude hiding (id, (.)) |
39 | 42 | ||
40 | import qualified Control.Monad as M | 43 | import qualified Control.Monad as M |
@@ -56,12 +59,16 @@ import Database.Persist.Sql (runMigrationSilent, ConnectionPool, runSqlPool) | |||
56 | 59 | ||
57 | import Thermoprint.API (thermoprintAPI, PrinterId) | 60 | import Thermoprint.API (thermoprintAPI, PrinterId) |
58 | 61 | ||
62 | import Thermoprint.Server.Fork | ||
63 | |||
59 | import Thermoprint.Server.Database | 64 | import Thermoprint.Server.Database |
60 | import Thermoprint.Server.Printer | 65 | import Thermoprint.Server.Printer |
61 | import Thermoprint.Server.Queue | 66 | import Thermoprint.Server.Queue |
62 | import qualified Thermoprint.Server.API as API (thermoprintServer) | 67 | import qualified Thermoprint.Server.API as API (thermoprintServer) |
63 | import Thermoprint.Server.API hiding (thermoprintServer) | 68 | import Thermoprint.Server.API hiding (thermoprintServer) |
64 | 69 | ||
70 | import Debug.Trace | ||
71 | |||
65 | -- | Compile-time configuration for 'thermoprintServer' | 72 | -- | Compile-time configuration for 'thermoprintServer' |
66 | data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error | 73 | data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error |
67 | , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour | 74 | , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour |
@@ -107,21 +114,25 @@ thermoprintServer :: ( MonadLoggerIO m | |||
107 | , MonadReader ConnectionPool m | 114 | , MonadReader ConnectionPool m |
108 | , MonadResourceBase m | 115 | , MonadResourceBase m |
109 | , MonadMask m | 116 | , MonadMask m |
110 | ) => (m :~> IO) -- ^ 'dyre' controls the base of the monad-transformer-stack ('IO') but we let the user specify much of the rest of it (we handle 'ResourceT' ourselves, since we need it to fork properly). Therefore we require a specification of how to collapse the stack. | 117 | ) => Bool -- ^ Invoke 'dyre' to look for and attempt to compile custom configurations (pass 'False' iff testing) |
118 | -> (m :~> IO) -- ^ 'dyre' controls the base of the monad-transformer-stack ('IO') but we let the user specify much of the rest of it (we handle 'ResourceT' ourselves, since we need it to fork properly). Therefore we require a specification of how to collapse the stack. | ||
111 | -> ResourceT m (Config (ResourceT m)) -> IO () | 119 | -> ResourceT m (Config (ResourceT m)) -> IO () |
112 | -- ^ Run the server | 120 | -- ^ Run the server |
113 | thermoprintServer io = Dyre.wrapMain $ Dyre.defaultParams | 121 | thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams |
114 | { Dyre.projectName = "thermoprint-server" | 122 | { Dyre.projectName = "thermoprint-server" |
115 | , Dyre.realMain = realMain | 123 | , Dyre.realMain = realMain |
116 | , Dyre.showError = flip (\msg -> fmap (\cfg -> cfg { dyreError = Just msg })) | 124 | , Dyre.showError = flip (\msg -> fmap (\cfg -> cfg { dyreError = Just msg })) |
125 | , Dyre.configCheck = dyre | ||
117 | } | 126 | } |
118 | where | 127 | where |
119 | realMain cfg = unNat (io . Nat runResourceT) $ do | 128 | realMain cfg = unNat (io . Nat runResourceT) $ do |
120 | Config{..} <- cfg | 129 | tMgr <- threadManager resourceForkIO |
121 | maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError | 130 | flip finally (cleanup tMgr) $ do |
122 | mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask | 131 | Config{..} <- cfg |
123 | forM_ printers $ resourceForkIO . runPrinter | 132 | maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError |
124 | let | 133 | mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask |
125 | runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer | 134 | forM_ printers $ fork tMgr . runPrinter |
126 | Map.foldrWithKey (\k p a -> resourceForkIO (runQM' k p) >> a) (return ()) printers | 135 | let |
127 | liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers | 136 | runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer |
137 | mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers | ||
138 | liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers | ||
diff --git a/server/src/Thermoprint/Server/Fork.hs b/server/src/Thermoprint/Server/Fork.hs new file mode 100644 index 0000000..402c1f8 --- /dev/null +++ b/server/src/Thermoprint/Server/Fork.hs | |||
@@ -0,0 +1,81 @@ | |||
1 | {-# LANGUAGE FlexibleContexts #-} | ||
2 | |||
3 | module Thermoprint.Server.Fork | ||
4 | ( ThreadManager | ||
5 | , fork | ||
6 | , cleanup | ||
7 | , threadManager | ||
8 | ) where | ||
9 | |||
10 | import Control.Monad.Reader.Class | ||
11 | import Control.Monad.Trans.Class | ||
12 | import Control.Monad.Trans.Reader (ReaderT, runReaderT) | ||
13 | import Control.Monad.Catch | ||
14 | |||
15 | import Control.Monad.IO.Class | ||
16 | |||
17 | import Control.Monad | ||
18 | import Control.Applicative | ||
19 | import Data.Maybe | ||
20 | |||
21 | import Data.Foldable | ||
22 | |||
23 | import Data.Map (Map) | ||
24 | import qualified Data.Map as Map | ||
25 | |||
26 | import Control.Concurrent | ||
27 | import Control.Concurrent.STM | ||
28 | import Control.Concurrent.STM.TVar (TVar) | ||
29 | import qualified Control.Concurrent.STM.TVar as T | ||
30 | import Control.Concurrent.STM.TSem (TSem) | ||
31 | import qualified Control.Concurrent.STM.TSem as S | ||
32 | |||
33 | data ThreadManager m = ThreadManager | ||
34 | { fork :: m () -> m ThreadId | ||
35 | , cleanup :: m () | ||
36 | } | ||
37 | |||
38 | threadManager :: (MonadIO m, MonadMask m) => (m () -> m ThreadId) -> m (ThreadManager m) | ||
39 | threadManager f = do | ||
40 | tVar <- newTVar Map.empty | ||
41 | return ThreadManager | ||
42 | { fork = \act -> do | ||
43 | let | ||
44 | unregisterSelf :: MonadIO m => m () | ||
45 | unregisterSelf = do | ||
46 | tMap <- readTVar tVar | ||
47 | tId <- liftIO $ myThreadId | ||
48 | modifyTVar' tVar $ Map.delete tId | ||
49 | maybeM signalTSem $ Map.lookup tId tMap | ||
50 | |||
51 | mask $ \unmask -> do | ||
52 | tId <- f (unmask act `finally` unregisterSelf) | ||
53 | modifyTVar' tVar =<< (Map.insert tId <$> newTSem 0) | ||
54 | return tId | ||
55 | , cleanup = liftIO $ | ||
56 | mapM_ (\(tId, s) -> killThread tId >> waitTSem s) . Map.toList =<< readTVar tVar | ||
57 | } | ||
58 | where | ||
59 | atomically' :: MonadIO m => STM a -> m a | ||
60 | atomically' = liftIO . atomically | ||
61 | |||
62 | newTSem :: MonadIO m => Int -> m TSem | ||
63 | newTSem = atomically' . S.newTSem | ||
64 | |||
65 | waitTSem :: MonadIO m => TSem -> m () | ||
66 | waitTSem = atomically' . S.waitTSem | ||
67 | |||
68 | signalTSem :: MonadIO m => TSem -> m () | ||
69 | signalTSem = atomically' . S.signalTSem | ||
70 | |||
71 | newTVar :: MonadIO m => a -> m (TVar a) | ||
72 | newTVar = atomically' . T.newTVar | ||
73 | |||
74 | readTVar :: MonadIO m => TVar a -> m a | ||
75 | readTVar = atomically' . T.readTVar | ||
76 | |||
77 | modifyTVar' :: MonadIO m => TVar a -> (a -> a) -> m () | ||
78 | modifyTVar' t = atomically' . T.modifyTVar t | ||
79 | |||
80 | maybeM :: Applicative m => (a -> m ()) -> Maybe a -> m () | ||
81 | maybeM = maybe $ pure () | ||
diff --git a/server/test/Thermoprint/ServerSpec.hs b/server/test/Thermoprint/ServerSpec.hs index 0d698f0..fe06a05 100644 --- a/server/test/Thermoprint/ServerSpec.hs +++ b/server/test/Thermoprint/ServerSpec.hs | |||
@@ -3,27 +3,29 @@ | |||
3 | 3 | ||
4 | module Thermoprint.ServerSpec (spec) where | 4 | module Thermoprint.ServerSpec (spec) where |
5 | 5 | ||
6 | import Test.Hspec | 6 | import Test.Hspec |
7 | 7 | ||
8 | import Thermoprint.API | 8 | import Thermoprint.API |
9 | import Thermoprint.Server | 9 | import Thermoprint.Server |
10 | 10 | ||
11 | import Control.Monad | 11 | import Control.Monad |
12 | import Control.Monad.Logger | 12 | import Control.Monad.Logger |
13 | import Control.Monad.Reader | 13 | import Control.Monad.Reader |
14 | import Control.Monad.Trans.Identity | 14 | import Control.Monad.Trans.Identity |
15 | import Control.Monad.Trans.Resource | 15 | import Control.Monad.Trans.Resource |
16 | 16 | ||
17 | import Database.Persist.Sqlite | 17 | import Database.Persist.Sqlite |
18 | 18 | ||
19 | import Control.Concurrent | 19 | import Control.Concurrent |
20 | import Control.Concurrent.STM | 20 | import Control.Concurrent.STM |
21 | 21 | ||
22 | import System.IO | 22 | import System.IO |
23 | import System.IO.Temp | 23 | import System.IO.Temp |
24 | 24 | ||
25 | import qualified Data.Text as T | 25 | import qualified Data.Text as T |
26 | 26 | ||
27 | import Debug.Trace | ||
28 | |||
27 | data TestPrinter = TestPrinter | 29 | data TestPrinter = TestPrinter |
28 | { outputChan :: TChan Printout | 30 | { outputChan :: TChan Printout |
29 | , failSwitch :: TMVar PrintingError | 31 | , failSwitch :: TMVar PrintingError |
@@ -33,13 +35,14 @@ data TestManager = TestManager | |||
33 | { manage :: TMVar (QueueManager IdentityT) | 35 | { manage :: TMVar (QueueManager IdentityT) |
34 | } | 36 | } |
35 | 37 | ||
36 | setup :: IO (ThreadId, TestPrinter, TestManager) | 38 | setup :: IO (ThreadId, QSem, TestPrinter, TestManager) |
37 | setup = withSystemTempFile "thermoprint.sqlite" $ \fp h -> hClose h >> do | 39 | setup = withSystemTempFile "thermoprint.sqlite" $ \fp h -> hClose h >> do |
38 | tPrinter <- TestPrinter <$> newTChanIO <*> newEmptyTMVarIO | 40 | tPrinter <- TestPrinter <$> newTChanIO <*> newEmptyTMVarIO |
39 | tManager <- TestManager <$> newEmptyTMVarIO | 41 | tManager <- TestManager <$> newEmptyTMVarIO |
42 | termSem <- newQSem 0 | ||
40 | let | 43 | let |
41 | runSqlite :: ReaderT ConnectionPool (LoggingT IO) a -> IO a | 44 | runSqlite :: ReaderT ConnectionPool (LoggingT IO) a -> IO a |
42 | runSqlite = runStderrLoggingT . withSqlitePool (T.pack fp) 1 . runReaderT | 45 | runSqlite = runNoLoggingT . withSqlitePool (T.pack fp) 1 . runReaderT |
43 | 46 | ||
44 | printers = [ ( pure $ PM tPM | 47 | printers = [ ( pure $ PM tPM |
45 | , QMConfig (join . lift $ takeTMVar (manage tManager)) (Nat $ liftIO . runIdentityT) | 48 | , QMConfig (join . lift $ takeTMVar (manage tManager)) (Nat $ liftIO . runIdentityT) |
@@ -48,12 +51,16 @@ setup = withSystemTempFile "thermoprint.sqlite" $ \fp h -> hClose h >> do | |||
48 | 51 | ||
49 | tPM :: MonadIO m => Printout -> m (Maybe PrintingError) | 52 | tPM :: MonadIO m => Printout -> m (Maybe PrintingError) |
50 | tPM printout = liftIO . atomically $ writeTChan (outputChan tPrinter) printout >> tryTakeTMVar (failSwitch tPrinter) | 53 | tPM printout = liftIO . atomically $ writeTChan (outputChan tPrinter) printout >> tryTakeTMVar (failSwitch tPrinter) |
51 | (,,) <$> forkIO (thermoprintServer (Nat runSqlite) $ def `withPrinters` printers) <*> pure tPrinter <*> pure tManager | 54 | (,,,) <$> forkFinally (thermoprintServer False (Nat runSqlite) $ def `withPrinters` printers) (const $ signalQSem termSem) <*> pure termSem <*> pure tPrinter <*> pure tManager |
55 | |||
56 | withSetup :: SpecWith (ThreadId, QSem, TestPrinter, TestManager) -> Spec | ||
57 | withSetup = beforeAll setup . afterAll (\(tId, termSem, _, _) -> killThread tId >> waitQSem termSem) | ||
52 | 58 | ||
53 | spec :: Spec | 59 | spec :: Spec |
54 | spec = beforeAll setup $ do | 60 | spec = withSetup $ do |
55 | describe "blubTests" $ do | 61 | describe "blubTests" $ do |
56 | it "prints Blub." $ \(tId, _, _) -> do | 62 | it "prints Blub." $ \(tId, _, _, _) -> do |
63 | threadDelay 5000 | ||
57 | putStrLn "Blub." | 64 | putStrLn "Blub." |
58 | System.IO.print tId | 65 | System.IO.print tId |
59 | True `shouldSatisfy` id | 66 | True `shouldSatisfy` id |
diff --git a/server/thermoprint-server.cabal b/server/thermoprint-server.cabal index bfd5b9b..cfef947 100644 --- a/server/thermoprint-server.cabal +++ b/server/thermoprint-server.cabal | |||
@@ -18,6 +18,7 @@ cabal-version: >=1.10 | |||
18 | 18 | ||
19 | library | 19 | library |
20 | exposed-modules: Thermoprint.Server | 20 | exposed-modules: Thermoprint.Server |
21 | , Thermoprint.Server.Fork | ||
21 | , Thermoprint.Server.Database | 22 | , Thermoprint.Server.Database |
22 | , Thermoprint.Server.API | 23 | , Thermoprint.Server.API |
23 | , Thermoprint.Server.Queue | 24 | , Thermoprint.Server.Queue |