diff options
Diffstat (limited to 'server/src/Thermoprint')
| -rw-r--r-- | server/src/Thermoprint/Server.hs | 33 | ||||
| -rw-r--r-- | server/src/Thermoprint/Server/Fork.hs | 81 | 
2 files changed, 103 insertions, 11 deletions
| diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index 678d056..4559414 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs | |||
| @@ -23,6 +23,9 @@ import qualified Config.Dyre as Dyre | |||
| 23 | import Data.Map (Map) | 23 | import Data.Map (Map) | 
| 24 | import qualified Data.Map as Map | 24 | import qualified Data.Map as Map | 
| 25 | 25 | ||
| 26 | import Data.Set (Set) | ||
| 27 | import qualified Data.Set as Set | ||
| 28 | |||
| 26 | import Data.Maybe (maybe) | 29 | import Data.Maybe (maybe) | 
| 27 | import Data.Foldable (mapM_, forM_, foldlM) | 30 | import Data.Foldable (mapM_, forM_, foldlM) | 
| 28 | 31 | ||
| @@ -34,7 +37,7 @@ import Control.Monad.Reader | |||
| 34 | import Control.Monad.IO.Class | 37 | import Control.Monad.IO.Class | 
| 35 | import Control.Monad.Morph | 38 | import Control.Monad.Morph | 
| 36 | import Control.Category | 39 | import Control.Category | 
| 37 | import Control.Monad.Catch (MonadMask) | 40 | import Control.Monad.Catch (MonadMask(mask), finally) | 
| 38 | import Prelude hiding (id, (.)) | 41 | import Prelude hiding (id, (.)) | 
| 39 | 42 | ||
| 40 | import qualified Control.Monad as M | 43 | import qualified Control.Monad as M | 
| @@ -56,12 +59,16 @@ import Database.Persist.Sql (runMigrationSilent, ConnectionPool, runSqlPool) | |||
| 56 | 59 | ||
| 57 | import Thermoprint.API (thermoprintAPI, PrinterId) | 60 | import Thermoprint.API (thermoprintAPI, PrinterId) | 
| 58 | 61 | ||
| 62 | import Thermoprint.Server.Fork | ||
| 63 | |||
| 59 | import Thermoprint.Server.Database | 64 | import Thermoprint.Server.Database | 
| 60 | import Thermoprint.Server.Printer | 65 | import Thermoprint.Server.Printer | 
| 61 | import Thermoprint.Server.Queue | 66 | import Thermoprint.Server.Queue | 
| 62 | import qualified Thermoprint.Server.API as API (thermoprintServer) | 67 | import qualified Thermoprint.Server.API as API (thermoprintServer) | 
| 63 | import Thermoprint.Server.API hiding (thermoprintServer) | 68 | import Thermoprint.Server.API hiding (thermoprintServer) | 
| 64 | 69 | ||
| 70 | import Debug.Trace | ||
| 71 | |||
| 65 | -- | Compile-time configuration for 'thermoprintServer' | 72 | -- | Compile-time configuration for 'thermoprintServer' | 
| 66 | data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error | 73 | data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error | 
| 67 | , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour | 74 | , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour | 
| @@ -107,21 +114,25 @@ thermoprintServer :: ( MonadLoggerIO m | |||
| 107 | , MonadReader ConnectionPool m | 114 | , MonadReader ConnectionPool m | 
| 108 | , MonadResourceBase m | 115 | , MonadResourceBase m | 
| 109 | , MonadMask m | 116 | , MonadMask m | 
| 110 | ) => (m :~> IO) -- ^ 'dyre' controls the base of the monad-transformer-stack ('IO') but we let the user specify much of the rest of it (we handle 'ResourceT' ourselves, since we need it to fork properly). Therefore we require a specification of how to collapse the stack. | 117 | ) => Bool -- ^ Invoke 'dyre' to look for and attempt to compile custom configurations (pass 'False' iff testing) | 
| 118 | -> (m :~> IO) -- ^ 'dyre' controls the base of the monad-transformer-stack ('IO') but we let the user specify much of the rest of it (we handle 'ResourceT' ourselves, since we need it to fork properly). Therefore we require a specification of how to collapse the stack. | ||
| 111 | -> ResourceT m (Config (ResourceT m)) -> IO () | 119 | -> ResourceT m (Config (ResourceT m)) -> IO () | 
| 112 | -- ^ Run the server | 120 | -- ^ Run the server | 
| 113 | thermoprintServer io = Dyre.wrapMain $ Dyre.defaultParams | 121 | thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams | 
| 114 | { Dyre.projectName = "thermoprint-server" | 122 | { Dyre.projectName = "thermoprint-server" | 
| 115 | , Dyre.realMain = realMain | 123 | , Dyre.realMain = realMain | 
| 116 | , Dyre.showError = flip (\msg -> fmap (\cfg -> cfg { dyreError = Just msg })) | 124 | , Dyre.showError = flip (\msg -> fmap (\cfg -> cfg { dyreError = Just msg })) | 
| 125 | , Dyre.configCheck = dyre | ||
| 117 | } | 126 | } | 
| 118 | where | 127 | where | 
| 119 | realMain cfg = unNat (io . Nat runResourceT) $ do | 128 | realMain cfg = unNat (io . Nat runResourceT) $ do | 
| 120 | Config{..} <- cfg | 129 | tMgr <- threadManager resourceForkIO | 
| 121 | maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError | 130 | flip finally (cleanup tMgr) $ do | 
| 122 | mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask | 131 | Config{..} <- cfg | 
| 123 | forM_ printers $ resourceForkIO . runPrinter | 132 | maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError | 
| 124 | let | 133 | mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask | 
| 125 | runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer | 134 | forM_ printers $ fork tMgr . runPrinter | 
| 126 | Map.foldrWithKey (\k p a -> resourceForkIO (runQM' k p) >> a) (return ()) printers | 135 | let | 
| 127 | liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers | 136 | runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer | 
| 137 | mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers | ||
| 138 | liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers | ||
| diff --git a/server/src/Thermoprint/Server/Fork.hs b/server/src/Thermoprint/Server/Fork.hs new file mode 100644 index 0000000..402c1f8 --- /dev/null +++ b/server/src/Thermoprint/Server/Fork.hs | |||
| @@ -0,0 +1,81 @@ | |||
| 1 | {-# LANGUAGE FlexibleContexts #-} | ||
| 2 | |||
| 3 | module Thermoprint.Server.Fork | ||
| 4 | ( ThreadManager | ||
| 5 | , fork | ||
| 6 | , cleanup | ||
| 7 | , threadManager | ||
| 8 | ) where | ||
| 9 | |||
| 10 | import Control.Monad.Reader.Class | ||
| 11 | import Control.Monad.Trans.Class | ||
| 12 | import Control.Monad.Trans.Reader (ReaderT, runReaderT) | ||
| 13 | import Control.Monad.Catch | ||
| 14 | |||
| 15 | import Control.Monad.IO.Class | ||
| 16 | |||
| 17 | import Control.Monad | ||
| 18 | import Control.Applicative | ||
| 19 | import Data.Maybe | ||
| 20 | |||
| 21 | import Data.Foldable | ||
| 22 | |||
| 23 | import Data.Map (Map) | ||
| 24 | import qualified Data.Map as Map | ||
| 25 | |||
| 26 | import Control.Concurrent | ||
| 27 | import Control.Concurrent.STM | ||
| 28 | import Control.Concurrent.STM.TVar (TVar) | ||
| 29 | import qualified Control.Concurrent.STM.TVar as T | ||
| 30 | import Control.Concurrent.STM.TSem (TSem) | ||
| 31 | import qualified Control.Concurrent.STM.TSem as S | ||
| 32 | |||
| 33 | data ThreadManager m = ThreadManager | ||
| 34 | { fork :: m () -> m ThreadId | ||
| 35 | , cleanup :: m () | ||
| 36 | } | ||
| 37 | |||
| 38 | threadManager :: (MonadIO m, MonadMask m) => (m () -> m ThreadId) -> m (ThreadManager m) | ||
| 39 | threadManager f = do | ||
| 40 | tVar <- newTVar Map.empty | ||
| 41 | return ThreadManager | ||
| 42 | { fork = \act -> do | ||
| 43 | let | ||
| 44 | unregisterSelf :: MonadIO m => m () | ||
| 45 | unregisterSelf = do | ||
| 46 | tMap <- readTVar tVar | ||
| 47 | tId <- liftIO $ myThreadId | ||
| 48 | modifyTVar' tVar $ Map.delete tId | ||
| 49 | maybeM signalTSem $ Map.lookup tId tMap | ||
| 50 | |||
| 51 | mask $ \unmask -> do | ||
| 52 | tId <- f (unmask act `finally` unregisterSelf) | ||
| 53 | modifyTVar' tVar =<< (Map.insert tId <$> newTSem 0) | ||
| 54 | return tId | ||
| 55 | , cleanup = liftIO $ | ||
| 56 | mapM_ (\(tId, s) -> killThread tId >> waitTSem s) . Map.toList =<< readTVar tVar | ||
| 57 | } | ||
| 58 | where | ||
| 59 | atomically' :: MonadIO m => STM a -> m a | ||
| 60 | atomically' = liftIO . atomically | ||
| 61 | |||
| 62 | newTSem :: MonadIO m => Int -> m TSem | ||
| 63 | newTSem = atomically' . S.newTSem | ||
| 64 | |||
| 65 | waitTSem :: MonadIO m => TSem -> m () | ||
| 66 | waitTSem = atomically' . S.waitTSem | ||
| 67 | |||
| 68 | signalTSem :: MonadIO m => TSem -> m () | ||
| 69 | signalTSem = atomically' . S.signalTSem | ||
| 70 | |||
| 71 | newTVar :: MonadIO m => a -> m (TVar a) | ||
| 72 | newTVar = atomically' . T.newTVar | ||
| 73 | |||
| 74 | readTVar :: MonadIO m => TVar a -> m a | ||
| 75 | readTVar = atomically' . T.readTVar | ||
| 76 | |||
| 77 | modifyTVar' :: MonadIO m => TVar a -> (a -> a) -> m () | ||
| 78 | modifyTVar' t = atomically' . T.modifyTVar t | ||
| 79 | |||
| 80 | maybeM :: Applicative m => (a -> m ()) -> Maybe a -> m () | ||
| 81 | maybeM = maybe $ pure () | ||
