diff options
| -rw-r--r-- | server/src/Thermoprint/Server.hs | 4 | ||||
| -rw-r--r-- | server/src/Thermoprint/Server/Queue.hs | 40 | ||||
| -rw-r--r-- | server/thermoprint-server.cabal | 4 | ||||
| -rw-r--r-- | server/thermoprint-server.nix | 2 |
4 files changed, 39 insertions, 11 deletions
diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index 4559414..df2d8e9 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs | |||
| @@ -132,7 +132,9 @@ thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams | |||
| 132 | maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError | 132 | maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError |
| 133 | mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask | 133 | mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask |
| 134 | forM_ printers $ fork tMgr . runPrinter | 134 | forM_ printers $ fork tMgr . runPrinter |
| 135 | gcChan <- liftIO newTChanIO | ||
| 136 | fork tMgr $ jobGC gcChan | ||
| 135 | let | 137 | let |
| 136 | runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer | 138 | runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM gcChan qm printer |
| 137 | mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers | 139 | mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers |
| 138 | liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers | 140 | liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers |
diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs index 3c8fb9e..aa26fe3 100644 --- a/server/src/Thermoprint/Server/Queue.hs +++ b/server/src/Thermoprint/Server/Queue.hs | |||
| @@ -9,6 +9,7 @@ module Thermoprint.Server.Queue | |||
| 9 | , fromZipper, toZipper, QueueItem(..) | 9 | , fromZipper, toZipper, QueueItem(..) |
| 10 | , HasQueue(..) | 10 | , HasQueue(..) |
| 11 | , QueueManager, QueueManagerM, runQM | 11 | , QueueManager, QueueManagerM, runQM |
| 12 | , jobGC | ||
| 12 | , intersection, idQM | 13 | , intersection, idQM |
| 13 | , union, nullQM | 14 | , union, nullQM |
| 14 | ) where | 15 | ) where |
| @@ -40,16 +41,21 @@ import Data.Default.Class | |||
| 40 | import Control.Monad | 41 | import Control.Monad |
| 41 | import Control.Monad.Morph | 42 | import Control.Monad.Morph |
| 42 | import Control.Monad.Trans.Compose | 43 | import Control.Monad.Trans.Compose |
| 44 | import Control.Monad.Trans.Resource | ||
| 45 | import Control.Monad.Reader | ||
| 43 | import Data.Function | 46 | import Data.Function |
| 44 | 47 | ||
| 45 | import Data.Foldable | 48 | import Data.Foldable |
| 46 | import Data.Monoid | 49 | import Data.Monoid |
| 47 | import Data.Ord | 50 | import Data.Ord |
| 48 | 51 | ||
| 52 | import Database.Persist (delete) | ||
| 53 | import Database.Persist.Sql (ConnectionPool, runSqlPool) | ||
| 54 | |||
| 49 | import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..)) | 55 | import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..)) |
| 50 | import Test.QuickCheck.Gen (Gen, scale) | 56 | import Test.QuickCheck.Gen (Gen, scale) |
| 51 | import Test.QuickCheck.Instances | 57 | import Test.QuickCheck.Instances |
| 52 | import Test.QuickCheck.Modifiers | 58 | import Test.QuickCheck.Modifiers (NonNegative(..)) |
| 53 | 59 | ||
| 54 | -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point | 60 | -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point |
| 55 | data Queue = Queue | 61 | data Queue = Queue |
| @@ -92,7 +98,9 @@ instance Arbitrary QueueEntry where | |||
| 92 | instance CoArbitrary QueueEntry where | 98 | instance CoArbitrary QueueEntry where |
| 93 | coarbitrary QueueEntry{..} = coarbitrary created . coarbitrary (fromIntegral jobId :: Integer) | 99 | coarbitrary QueueEntry{..} = coarbitrary created . coarbitrary (fromIntegral jobId :: Integer) |
| 94 | 100 | ||
| 95 | data QueueItem = Pending Int QueueEntry | Current QueueEntry | History Int QueueEntry (Maybe PrintingError) | 101 | data QueueItem = Pending { pos :: Int, entry :: QueueEntry } |
| 102 | | Current { entry :: QueueEntry } | ||
| 103 | | History { pos :: Int, entry :: QueueEntry, err :: Maybe PrintingError } | ||
| 96 | 104 | ||
| 97 | instance Eq QueueItem where | 105 | instance Eq QueueItem where |
| 98 | (Pending i a ) == (Pending j b ) = i == j && a == b | 106 | (Pending i a ) == (Pending j b ) = i == j && a == b |
| @@ -149,17 +157,35 @@ runQM :: ( HasQueue q | |||
| 149 | , MonadTrans t | 157 | , MonadTrans t |
| 150 | , MonadIO (t IO) | 158 | , MonadIO (t IO) |
| 151 | , Monad (t STM) | 159 | , Monad (t STM) |
| 152 | ) => QueueManager t -> q -> t IO () | 160 | ) => TChan JobId -- ^ Channel for deleted 'JobId's ready to be garbage collected |
| 161 | -> QueueManager t -> q -> t IO () | ||
| 153 | -- ^ Periodically modify a 'Queue' | 162 | -- ^ Periodically modify a 'Queue' |
| 154 | -- | 163 | -- |
| 155 | -- /TODO/: Garbage collect deleted jobs -- maybe switch to 'Set QueueItem' in 'QueueManager' | 164 | -- We use 'fromZipper' to determine which 'QueueEntry's were deleted by the manager and write |
| 156 | runQM qm (extractQueue -> q) = sleep =<< qm' | 165 | -- those to the garbage collectors channel. |
| 166 | -- Since there is no crosstalk between managers this works only if the relatien between | ||
| 167 | -- 'JobId's and the 'QueueManager' responsible for them is a single-valued function | ||
| 168 | runQM gcChan qm (extractQueue -> q) = sleep =<< qm' | ||
| 157 | where | 169 | where |
| 158 | qm' = hoist atomically $ (\(a, s) -> a <$ lift (writeTVar q $!! s)) =<< runStateT (getComposeT qm) =<< lift (readTVar q) | 170 | qm' = hoist atomically $ do |
| 171 | before <- lift $ readTVar q | ||
| 172 | (delay, after) <- runStateT (getComposeT qm) before | ||
| 173 | lift $ writeTVar q $!! after | ||
| 174 | let | ||
| 175 | deleted = Set.map (jobId . entry . unPlain) $ (Set.difference `on` (Set.map Plain . fromZipper)) before after | ||
| 176 | mapM_ (lift . writeTChan gcChan) deleted | ||
| 177 | return delay | ||
| 159 | sleep (abs -> delay) | 178 | sleep (abs -> delay) |
| 160 | | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM qm q | 179 | | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM gcChan qm q |
| 161 | | otherwise = return () | 180 | | otherwise = return () |
| 162 | 181 | ||
| 182 | jobGC :: ( MonadReader ConnectionPool m | ||
| 183 | , MonadBaseControl IO m | ||
| 184 | , MonadIO m | ||
| 185 | ) => TChan JobId -> m () | ||
| 186 | -- ^ Listen for 'JobId's on a 'TChan' and delete them from the database 'forever' | ||
| 187 | jobGC gcChan = forever $ liftIO (atomically $ readTChan gcChan) >>= (\jId -> runSqlPool (delete jId) =<< ask) | ||
| 188 | |||
| 163 | intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t | 189 | intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t |
| 164 | -- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep | 190 | -- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep |
| 165 | -- | 191 | -- |
diff --git a/server/thermoprint-server.cabal b/server/thermoprint-server.cabal index d6613ad..e90eb4f 100644 --- a/server/thermoprint-server.cabal +++ b/server/thermoprint-server.cabal | |||
| @@ -2,7 +2,7 @@ | |||
| 2 | -- documentation, see http://haskell.org/cabal/users-guide/ | 2 | -- documentation, see http://haskell.org/cabal/users-guide/ |
| 3 | 3 | ||
| 4 | name: thermoprint-server | 4 | name: thermoprint-server |
| 5 | version: 0.0.0 | 5 | version: 1.0.0 |
| 6 | synopsis: Server for thermoprint-spec | 6 | synopsis: Server for thermoprint-spec |
| 7 | -- description: | 7 | -- description: |
| 8 | homepage: http://dirty-haskell.org/tags/thermoprint.html | 8 | homepage: http://dirty-haskell.org/tags/thermoprint.html |
| @@ -65,7 +65,7 @@ Test-Suite tests | |||
| 65 | hs-source-dirs: test | 65 | hs-source-dirs: test |
| 66 | main-is: Spec.hs | 66 | main-is: Spec.hs |
| 67 | build-depends: base >=4.8.1 && <5 | 67 | build-depends: base >=4.8.1 && <5 |
| 68 | , thermoprint-server ==0.0.* | 68 | , thermoprint-server ==1.0.* |
| 69 | , thermoprint-client ==0.0.* | 69 | , thermoprint-client ==0.0.* |
| 70 | , thermoprint-spec -any | 70 | , thermoprint-spec -any |
| 71 | , hspec >=2.2.1 && <3 | 71 | , hspec >=2.2.1 && <3 |
diff --git a/server/thermoprint-server.nix b/server/thermoprint-server.nix index d7a7684..f472cbc 100644 --- a/server/thermoprint-server.nix +++ b/server/thermoprint-server.nix | |||
| @@ -8,7 +8,7 @@ | |||
| 8 | }: | 8 | }: |
| 9 | mkDerivation { | 9 | mkDerivation { |
| 10 | pname = "thermoprint-server"; | 10 | pname = "thermoprint-server"; |
| 11 | version = "0.0.0"; | 11 | version = "1.0.0"; |
| 12 | src = ./.; | 12 | src = ./.; |
| 13 | isLibrary = true; | 13 | isLibrary = true; |
| 14 | isExecutable = true; | 14 | isExecutable = true; |
