aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGregor Kleen <gkleen@yggdrasil.li>2016-02-17 22:08:36 +0000
committerGregor Kleen <gkleen@yggdrasil.li>2016-02-17 22:08:36 +0000
commitb5b4b86427286002081f102d1e97baef9162851e (patch)
tree5d3eeef8abc884931756dd3f9711c355f4a88a4e
parenteebc709302833932d7fca95dfc5c8536f8911e69 (diff)
downloadthermoprint-b5b4b86427286002081f102d1e97baef9162851e.tar
thermoprint-b5b4b86427286002081f102d1e97baef9162851e.tar.gz
thermoprint-b5b4b86427286002081f102d1e97baef9162851e.tar.bz2
thermoprint-b5b4b86427286002081f102d1e97baef9162851e.tar.xz
thermoprint-b5b4b86427286002081f102d1e97baef9162851e.zip
concurrency & dyre fixes for server spec
-rw-r--r--server/default-conf/Main.hs2
-rw-r--r--server/src/Thermoprint/Server.hs33
-rw-r--r--server/src/Thermoprint/Server/Fork.hs81
-rw-r--r--server/test/Thermoprint/ServerSpec.hs43
-rw-r--r--server/thermoprint-server.cabal1
5 files changed, 130 insertions, 30 deletions
diff --git a/server/default-conf/Main.hs b/server/default-conf/Main.hs
index 36f6c12..cbfc476 100644
--- a/server/default-conf/Main.hs
+++ b/server/default-conf/Main.hs
@@ -14,7 +14,7 @@ import Control.Monad.Reader
14import Database.Persist.Sqlite 14import Database.Persist.Sqlite
15 15
16main :: IO () 16main :: IO ()
17main = thermoprintServer (Nat runSqlite) $ def `withPrinters` printers 17main = thermoprintServer True (Nat runSqlite) $ def `withPrinters` printers
18 where 18 where
19 runSqlite :: ReaderT ConnectionPool (LoggingT IO) a -> IO a 19 runSqlite :: ReaderT ConnectionPool (LoggingT IO) a -> IO a
20 runSqlite = runStderrLoggingT . withSqlitePool "thermoprint.sqlite" 1 . runReaderT 20 runSqlite = runStderrLoggingT . withSqlitePool "thermoprint.sqlite" 1 . runReaderT
diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs
index 678d056..4559414 100644
--- a/server/src/Thermoprint/Server.hs
+++ b/server/src/Thermoprint/Server.hs
@@ -23,6 +23,9 @@ import qualified Config.Dyre as Dyre
23import Data.Map (Map) 23import Data.Map (Map)
24import qualified Data.Map as Map 24import qualified Data.Map as Map
25 25
26import Data.Set (Set)
27import qualified Data.Set as Set
28
26import Data.Maybe (maybe) 29import Data.Maybe (maybe)
27import Data.Foldable (mapM_, forM_, foldlM) 30import Data.Foldable (mapM_, forM_, foldlM)
28 31
@@ -34,7 +37,7 @@ import Control.Monad.Reader
34import Control.Monad.IO.Class 37import Control.Monad.IO.Class
35import Control.Monad.Morph 38import Control.Monad.Morph
36import Control.Category 39import Control.Category
37import Control.Monad.Catch (MonadMask) 40import Control.Monad.Catch (MonadMask(mask), finally)
38import Prelude hiding (id, (.)) 41import Prelude hiding (id, (.))
39 42
40import qualified Control.Monad as M 43import qualified Control.Monad as M
@@ -56,12 +59,16 @@ import Database.Persist.Sql (runMigrationSilent, ConnectionPool, runSqlPool)
56 59
57import Thermoprint.API (thermoprintAPI, PrinterId) 60import Thermoprint.API (thermoprintAPI, PrinterId)
58 61
62import Thermoprint.Server.Fork
63
59import Thermoprint.Server.Database 64import Thermoprint.Server.Database
60import Thermoprint.Server.Printer 65import Thermoprint.Server.Printer
61import Thermoprint.Server.Queue 66import Thermoprint.Server.Queue
62import qualified Thermoprint.Server.API as API (thermoprintServer) 67import qualified Thermoprint.Server.API as API (thermoprintServer)
63import Thermoprint.Server.API hiding (thermoprintServer) 68import Thermoprint.Server.API hiding (thermoprintServer)
64 69
70import Debug.Trace
71
65-- | Compile-time configuration for 'thermoprintServer' 72-- | Compile-time configuration for 'thermoprintServer'
66data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error 73data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error
67 , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour 74 , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour
@@ -107,21 +114,25 @@ thermoprintServer :: ( MonadLoggerIO m
107 , MonadReader ConnectionPool m 114 , MonadReader ConnectionPool m
108 , MonadResourceBase m 115 , MonadResourceBase m
109 , MonadMask m 116 , MonadMask m
110 ) => (m :~> IO) -- ^ 'dyre' controls the base of the monad-transformer-stack ('IO') but we let the user specify much of the rest of it (we handle 'ResourceT' ourselves, since we need it to fork properly). Therefore we require a specification of how to collapse the stack. 117 ) => Bool -- ^ Invoke 'dyre' to look for and attempt to compile custom configurations (pass 'False' iff testing)
118 -> (m :~> IO) -- ^ 'dyre' controls the base of the monad-transformer-stack ('IO') but we let the user specify much of the rest of it (we handle 'ResourceT' ourselves, since we need it to fork properly). Therefore we require a specification of how to collapse the stack.
111 -> ResourceT m (Config (ResourceT m)) -> IO () 119 -> ResourceT m (Config (ResourceT m)) -> IO ()
112-- ^ Run the server 120-- ^ Run the server
113thermoprintServer io = Dyre.wrapMain $ Dyre.defaultParams 121thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams
114 { Dyre.projectName = "thermoprint-server" 122 { Dyre.projectName = "thermoprint-server"
115 , Dyre.realMain = realMain 123 , Dyre.realMain = realMain
116 , Dyre.showError = flip (\msg -> fmap (\cfg -> cfg { dyreError = Just msg })) 124 , Dyre.showError = flip (\msg -> fmap (\cfg -> cfg { dyreError = Just msg }))
125 , Dyre.configCheck = dyre
117 } 126 }
118 where 127 where
119 realMain cfg = unNat (io . Nat runResourceT) $ do 128 realMain cfg = unNat (io . Nat runResourceT) $ do
120 Config{..} <- cfg 129 tMgr <- threadManager resourceForkIO
121 maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError 130 flip finally (cleanup tMgr) $ do
122 mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask 131 Config{..} <- cfg
123 forM_ printers $ resourceForkIO . runPrinter 132 maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError
124 let 133 mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask
125 runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer 134 forM_ printers $ fork tMgr . runPrinter
126 Map.foldrWithKey (\k p a -> resourceForkIO (runQM' k p) >> a) (return ()) printers 135 let
127 liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers 136 runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer
137 mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers
138 liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers
diff --git a/server/src/Thermoprint/Server/Fork.hs b/server/src/Thermoprint/Server/Fork.hs
new file mode 100644
index 0000000..402c1f8
--- /dev/null
+++ b/server/src/Thermoprint/Server/Fork.hs
@@ -0,0 +1,81 @@
1{-# LANGUAGE FlexibleContexts #-}
2
3module Thermoprint.Server.Fork
4 ( ThreadManager
5 , fork
6 , cleanup
7 , threadManager
8 ) where
9
10import Control.Monad.Reader.Class
11import Control.Monad.Trans.Class
12import Control.Monad.Trans.Reader (ReaderT, runReaderT)
13import Control.Monad.Catch
14
15import Control.Monad.IO.Class
16
17import Control.Monad
18import Control.Applicative
19import Data.Maybe
20
21import Data.Foldable
22
23import Data.Map (Map)
24import qualified Data.Map as Map
25
26import Control.Concurrent
27import Control.Concurrent.STM
28import Control.Concurrent.STM.TVar (TVar)
29import qualified Control.Concurrent.STM.TVar as T
30import Control.Concurrent.STM.TSem (TSem)
31import qualified Control.Concurrent.STM.TSem as S
32
33data ThreadManager m = ThreadManager
34 { fork :: m () -> m ThreadId
35 , cleanup :: m ()
36 }
37
38threadManager :: (MonadIO m, MonadMask m) => (m () -> m ThreadId) -> m (ThreadManager m)
39threadManager f = do
40 tVar <- newTVar Map.empty
41 return ThreadManager
42 { fork = \act -> do
43 let
44 unregisterSelf :: MonadIO m => m ()
45 unregisterSelf = do
46 tMap <- readTVar tVar
47 tId <- liftIO $ myThreadId
48 modifyTVar' tVar $ Map.delete tId
49 maybeM signalTSem $ Map.lookup tId tMap
50
51 mask $ \unmask -> do
52 tId <- f (unmask act `finally` unregisterSelf)
53 modifyTVar' tVar =<< (Map.insert tId <$> newTSem 0)
54 return tId
55 , cleanup = liftIO $
56 mapM_ (\(tId, s) -> killThread tId >> waitTSem s) . Map.toList =<< readTVar tVar
57 }
58 where
59 atomically' :: MonadIO m => STM a -> m a
60 atomically' = liftIO . atomically
61
62 newTSem :: MonadIO m => Int -> m TSem
63 newTSem = atomically' . S.newTSem
64
65 waitTSem :: MonadIO m => TSem -> m ()
66 waitTSem = atomically' . S.waitTSem
67
68 signalTSem :: MonadIO m => TSem -> m ()
69 signalTSem = atomically' . S.signalTSem
70
71 newTVar :: MonadIO m => a -> m (TVar a)
72 newTVar = atomically' . T.newTVar
73
74 readTVar :: MonadIO m => TVar a -> m a
75 readTVar = atomically' . T.readTVar
76
77 modifyTVar' :: MonadIO m => TVar a -> (a -> a) -> m ()
78 modifyTVar' t = atomically' . T.modifyTVar t
79
80maybeM :: Applicative m => (a -> m ()) -> Maybe a -> m ()
81maybeM = maybe $ pure ()
diff --git a/server/test/Thermoprint/ServerSpec.hs b/server/test/Thermoprint/ServerSpec.hs
index 0d698f0..fe06a05 100644
--- a/server/test/Thermoprint/ServerSpec.hs
+++ b/server/test/Thermoprint/ServerSpec.hs
@@ -3,27 +3,29 @@
3 3
4module Thermoprint.ServerSpec (spec) where 4module Thermoprint.ServerSpec (spec) where
5 5
6import Test.Hspec 6import Test.Hspec
7 7
8import Thermoprint.API 8import Thermoprint.API
9import Thermoprint.Server 9import Thermoprint.Server
10 10
11import Control.Monad 11import Control.Monad
12import Control.Monad.Logger 12import Control.Monad.Logger
13import Control.Monad.Reader 13import Control.Monad.Reader
14import Control.Monad.Trans.Identity 14import Control.Monad.Trans.Identity
15import Control.Monad.Trans.Resource 15import Control.Monad.Trans.Resource
16 16
17import Database.Persist.Sqlite 17import Database.Persist.Sqlite
18 18
19import Control.Concurrent 19import Control.Concurrent
20import Control.Concurrent.STM 20import Control.Concurrent.STM
21 21
22import System.IO 22import System.IO
23import System.IO.Temp 23import System.IO.Temp
24 24
25import qualified Data.Text as T 25import qualified Data.Text as T
26 26
27import Debug.Trace
28
27data TestPrinter = TestPrinter 29data TestPrinter = TestPrinter
28 { outputChan :: TChan Printout 30 { outputChan :: TChan Printout
29 , failSwitch :: TMVar PrintingError 31 , failSwitch :: TMVar PrintingError
@@ -33,13 +35,14 @@ data TestManager = TestManager
33 { manage :: TMVar (QueueManager IdentityT) 35 { manage :: TMVar (QueueManager IdentityT)
34 } 36 }
35 37
36setup :: IO (ThreadId, TestPrinter, TestManager) 38setup :: IO (ThreadId, QSem, TestPrinter, TestManager)
37setup = withSystemTempFile "thermoprint.sqlite" $ \fp h -> hClose h >> do 39setup = withSystemTempFile "thermoprint.sqlite" $ \fp h -> hClose h >> do
38 tPrinter <- TestPrinter <$> newTChanIO <*> newEmptyTMVarIO 40 tPrinter <- TestPrinter <$> newTChanIO <*> newEmptyTMVarIO
39 tManager <- TestManager <$> newEmptyTMVarIO 41 tManager <- TestManager <$> newEmptyTMVarIO
42 termSem <- newQSem 0
40 let 43 let
41 runSqlite :: ReaderT ConnectionPool (LoggingT IO) a -> IO a 44 runSqlite :: ReaderT ConnectionPool (LoggingT IO) a -> IO a
42 runSqlite = runStderrLoggingT . withSqlitePool (T.pack fp) 1 . runReaderT 45 runSqlite = runNoLoggingT . withSqlitePool (T.pack fp) 1 . runReaderT
43 46
44 printers = [ ( pure $ PM tPM 47 printers = [ ( pure $ PM tPM
45 , QMConfig (join . lift $ takeTMVar (manage tManager)) (Nat $ liftIO . runIdentityT) 48 , QMConfig (join . lift $ takeTMVar (manage tManager)) (Nat $ liftIO . runIdentityT)
@@ -48,12 +51,16 @@ setup = withSystemTempFile "thermoprint.sqlite" $ \fp h -> hClose h >> do
48 51
49 tPM :: MonadIO m => Printout -> m (Maybe PrintingError) 52 tPM :: MonadIO m => Printout -> m (Maybe PrintingError)
50 tPM printout = liftIO . atomically $ writeTChan (outputChan tPrinter) printout >> tryTakeTMVar (failSwitch tPrinter) 53 tPM printout = liftIO . atomically $ writeTChan (outputChan tPrinter) printout >> tryTakeTMVar (failSwitch tPrinter)
51 (,,) <$> forkIO (thermoprintServer (Nat runSqlite) $ def `withPrinters` printers) <*> pure tPrinter <*> pure tManager 54 (,,,) <$> forkFinally (thermoprintServer False (Nat runSqlite) $ def `withPrinters` printers) (const $ signalQSem termSem) <*> pure termSem <*> pure tPrinter <*> pure tManager
55
56withSetup :: SpecWith (ThreadId, QSem, TestPrinter, TestManager) -> Spec
57withSetup = beforeAll setup . afterAll (\(tId, termSem, _, _) -> killThread tId >> waitQSem termSem)
52 58
53spec :: Spec 59spec :: Spec
54spec = beforeAll setup $ do 60spec = withSetup $ do
55 describe "blubTests" $ do 61 describe "blubTests" $ do
56 it "prints Blub." $ \(tId, _, _) -> do 62 it "prints Blub." $ \(tId, _, _, _) -> do
63 threadDelay 5000
57 putStrLn "Blub." 64 putStrLn "Blub."
58 System.IO.print tId 65 System.IO.print tId
59 True `shouldSatisfy` id 66 True `shouldSatisfy` id
diff --git a/server/thermoprint-server.cabal b/server/thermoprint-server.cabal
index bfd5b9b..cfef947 100644
--- a/server/thermoprint-server.cabal
+++ b/server/thermoprint-server.cabal
@@ -18,6 +18,7 @@ cabal-version: >=1.10
18 18
19library 19library
20 exposed-modules: Thermoprint.Server 20 exposed-modules: Thermoprint.Server
21 , Thermoprint.Server.Fork
21 , Thermoprint.Server.Database 22 , Thermoprint.Server.Database
22 , Thermoprint.Server.API 23 , Thermoprint.Server.API
23 , Thermoprint.Server.Queue 24 , Thermoprint.Server.Queue