aboutsummaryrefslogtreecommitdiff
path: root/server/src/Thermoprint
diff options
context:
space:
mode:
authorGregor Kleen <gkleen@yggdrasil.li>2016-02-17 22:08:36 +0000
committerGregor Kleen <gkleen@yggdrasil.li>2016-02-17 22:08:36 +0000
commitb5b4b86427286002081f102d1e97baef9162851e (patch)
tree5d3eeef8abc884931756dd3f9711c355f4a88a4e /server/src/Thermoprint
parenteebc709302833932d7fca95dfc5c8536f8911e69 (diff)
downloadthermoprint-b5b4b86427286002081f102d1e97baef9162851e.tar
thermoprint-b5b4b86427286002081f102d1e97baef9162851e.tar.gz
thermoprint-b5b4b86427286002081f102d1e97baef9162851e.tar.bz2
thermoprint-b5b4b86427286002081f102d1e97baef9162851e.tar.xz
thermoprint-b5b4b86427286002081f102d1e97baef9162851e.zip
concurrency & dyre fixes for server spec
Diffstat (limited to 'server/src/Thermoprint')
-rw-r--r--server/src/Thermoprint/Server.hs33
-rw-r--r--server/src/Thermoprint/Server/Fork.hs81
2 files changed, 103 insertions, 11 deletions
diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs
index 678d056..4559414 100644
--- a/server/src/Thermoprint/Server.hs
+++ b/server/src/Thermoprint/Server.hs
@@ -23,6 +23,9 @@ import qualified Config.Dyre as Dyre
23import Data.Map (Map) 23import Data.Map (Map)
24import qualified Data.Map as Map 24import qualified Data.Map as Map
25 25
26import Data.Set (Set)
27import qualified Data.Set as Set
28
26import Data.Maybe (maybe) 29import Data.Maybe (maybe)
27import Data.Foldable (mapM_, forM_, foldlM) 30import Data.Foldable (mapM_, forM_, foldlM)
28 31
@@ -34,7 +37,7 @@ import Control.Monad.Reader
34import Control.Monad.IO.Class 37import Control.Monad.IO.Class
35import Control.Monad.Morph 38import Control.Monad.Morph
36import Control.Category 39import Control.Category
37import Control.Monad.Catch (MonadMask) 40import Control.Monad.Catch (MonadMask(mask), finally)
38import Prelude hiding (id, (.)) 41import Prelude hiding (id, (.))
39 42
40import qualified Control.Monad as M 43import qualified Control.Monad as M
@@ -56,12 +59,16 @@ import Database.Persist.Sql (runMigrationSilent, ConnectionPool, runSqlPool)
56 59
57import Thermoprint.API (thermoprintAPI, PrinterId) 60import Thermoprint.API (thermoprintAPI, PrinterId)
58 61
62import Thermoprint.Server.Fork
63
59import Thermoprint.Server.Database 64import Thermoprint.Server.Database
60import Thermoprint.Server.Printer 65import Thermoprint.Server.Printer
61import Thermoprint.Server.Queue 66import Thermoprint.Server.Queue
62import qualified Thermoprint.Server.API as API (thermoprintServer) 67import qualified Thermoprint.Server.API as API (thermoprintServer)
63import Thermoprint.Server.API hiding (thermoprintServer) 68import Thermoprint.Server.API hiding (thermoprintServer)
64 69
70import Debug.Trace
71
65-- | Compile-time configuration for 'thermoprintServer' 72-- | Compile-time configuration for 'thermoprintServer'
66data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error 73data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error
67 , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour 74 , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour
@@ -107,21 +114,25 @@ thermoprintServer :: ( MonadLoggerIO m
107 , MonadReader ConnectionPool m 114 , MonadReader ConnectionPool m
108 , MonadResourceBase m 115 , MonadResourceBase m
109 , MonadMask m 116 , MonadMask m
110 ) => (m :~> IO) -- ^ 'dyre' controls the base of the monad-transformer-stack ('IO') but we let the user specify much of the rest of it (we handle 'ResourceT' ourselves, since we need it to fork properly). Therefore we require a specification of how to collapse the stack. 117 ) => Bool -- ^ Invoke 'dyre' to look for and attempt to compile custom configurations (pass 'False' iff testing)
118 -> (m :~> IO) -- ^ 'dyre' controls the base of the monad-transformer-stack ('IO') but we let the user specify much of the rest of it (we handle 'ResourceT' ourselves, since we need it to fork properly). Therefore we require a specification of how to collapse the stack.
111 -> ResourceT m (Config (ResourceT m)) -> IO () 119 -> ResourceT m (Config (ResourceT m)) -> IO ()
112-- ^ Run the server 120-- ^ Run the server
113thermoprintServer io = Dyre.wrapMain $ Dyre.defaultParams 121thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams
114 { Dyre.projectName = "thermoprint-server" 122 { Dyre.projectName = "thermoprint-server"
115 , Dyre.realMain = realMain 123 , Dyre.realMain = realMain
116 , Dyre.showError = flip (\msg -> fmap (\cfg -> cfg { dyreError = Just msg })) 124 , Dyre.showError = flip (\msg -> fmap (\cfg -> cfg { dyreError = Just msg }))
125 , Dyre.configCheck = dyre
117 } 126 }
118 where 127 where
119 realMain cfg = unNat (io . Nat runResourceT) $ do 128 realMain cfg = unNat (io . Nat runResourceT) $ do
120 Config{..} <- cfg 129 tMgr <- threadManager resourceForkIO
121 maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError 130 flip finally (cleanup tMgr) $ do
122 mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask 131 Config{..} <- cfg
123 forM_ printers $ resourceForkIO . runPrinter 132 maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError
124 let 133 mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask
125 runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer 134 forM_ printers $ fork tMgr . runPrinter
126 Map.foldrWithKey (\k p a -> resourceForkIO (runQM' k p) >> a) (return ()) printers 135 let
127 liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers 136 runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer
137 mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers
138 liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers
diff --git a/server/src/Thermoprint/Server/Fork.hs b/server/src/Thermoprint/Server/Fork.hs
new file mode 100644
index 0000000..402c1f8
--- /dev/null
+++ b/server/src/Thermoprint/Server/Fork.hs
@@ -0,0 +1,81 @@
1{-# LANGUAGE FlexibleContexts #-}
2
3module Thermoprint.Server.Fork
4 ( ThreadManager
5 , fork
6 , cleanup
7 , threadManager
8 ) where
9
10import Control.Monad.Reader.Class
11import Control.Monad.Trans.Class
12import Control.Monad.Trans.Reader (ReaderT, runReaderT)
13import Control.Monad.Catch
14
15import Control.Monad.IO.Class
16
17import Control.Monad
18import Control.Applicative
19import Data.Maybe
20
21import Data.Foldable
22
23import Data.Map (Map)
24import qualified Data.Map as Map
25
26import Control.Concurrent
27import Control.Concurrent.STM
28import Control.Concurrent.STM.TVar (TVar)
29import qualified Control.Concurrent.STM.TVar as T
30import Control.Concurrent.STM.TSem (TSem)
31import qualified Control.Concurrent.STM.TSem as S
32
33data ThreadManager m = ThreadManager
34 { fork :: m () -> m ThreadId
35 , cleanup :: m ()
36 }
37
38threadManager :: (MonadIO m, MonadMask m) => (m () -> m ThreadId) -> m (ThreadManager m)
39threadManager f = do
40 tVar <- newTVar Map.empty
41 return ThreadManager
42 { fork = \act -> do
43 let
44 unregisterSelf :: MonadIO m => m ()
45 unregisterSelf = do
46 tMap <- readTVar tVar
47 tId <- liftIO $ myThreadId
48 modifyTVar' tVar $ Map.delete tId
49 maybeM signalTSem $ Map.lookup tId tMap
50
51 mask $ \unmask -> do
52 tId <- f (unmask act `finally` unregisterSelf)
53 modifyTVar' tVar =<< (Map.insert tId <$> newTSem 0)
54 return tId
55 , cleanup = liftIO $
56 mapM_ (\(tId, s) -> killThread tId >> waitTSem s) . Map.toList =<< readTVar tVar
57 }
58 where
59 atomically' :: MonadIO m => STM a -> m a
60 atomically' = liftIO . atomically
61
62 newTSem :: MonadIO m => Int -> m TSem
63 newTSem = atomically' . S.newTSem
64
65 waitTSem :: MonadIO m => TSem -> m ()
66 waitTSem = atomically' . S.waitTSem
67
68 signalTSem :: MonadIO m => TSem -> m ()
69 signalTSem = atomically' . S.signalTSem
70
71 newTVar :: MonadIO m => a -> m (TVar a)
72 newTVar = atomically' . T.newTVar
73
74 readTVar :: MonadIO m => TVar a -> m a
75 readTVar = atomically' . T.readTVar
76
77 modifyTVar' :: MonadIO m => TVar a -> (a -> a) -> m ()
78 modifyTVar' t = atomically' . T.modifyTVar t
79
80maybeM :: Applicative m => (a -> m ()) -> Maybe a -> m ()
81maybeM = maybe $ pure ()