diff options
author | Gregor Kleen <gkleen@yggdrasil.li> | 2016-02-19 15:41:58 +0100 |
---|---|---|
committer | Gregor Kleen <gkleen@yggdrasil.li> | 2016-02-19 15:41:58 +0100 |
commit | 9e45c04c45aef1fa71815c61512c354d0d5ee3e3 (patch) | |
tree | 6720e42254e9ada3747e531a2bd4d2c4a3aca8d7 /server | |
parent | 9eeb93a2e333b4aa00b5a617af9b995e0f5dd7cc (diff) | |
download | thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.tar thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.tar.gz thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.tar.bz2 thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.tar.xz thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.zip |
Garbage collection of jobs in db
Diffstat (limited to 'server')
-rw-r--r-- | server/src/Thermoprint/Server.hs | 4 | ||||
-rw-r--r-- | server/src/Thermoprint/Server/Queue.hs | 40 | ||||
-rw-r--r-- | server/thermoprint-server.cabal | 4 | ||||
-rw-r--r-- | server/thermoprint-server.nix | 2 |
4 files changed, 39 insertions, 11 deletions
diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index 4559414..df2d8e9 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs | |||
@@ -132,7 +132,9 @@ thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams | |||
132 | maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError | 132 | maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError |
133 | mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask | 133 | mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask |
134 | forM_ printers $ fork tMgr . runPrinter | 134 | forM_ printers $ fork tMgr . runPrinter |
135 | gcChan <- liftIO newTChanIO | ||
136 | fork tMgr $ jobGC gcChan | ||
135 | let | 137 | let |
136 | runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer | 138 | runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM gcChan qm printer |
137 | mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers | 139 | mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers |
138 | liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers | 140 | liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers |
diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs index 3c8fb9e..aa26fe3 100644 --- a/server/src/Thermoprint/Server/Queue.hs +++ b/server/src/Thermoprint/Server/Queue.hs | |||
@@ -9,6 +9,7 @@ module Thermoprint.Server.Queue | |||
9 | , fromZipper, toZipper, QueueItem(..) | 9 | , fromZipper, toZipper, QueueItem(..) |
10 | , HasQueue(..) | 10 | , HasQueue(..) |
11 | , QueueManager, QueueManagerM, runQM | 11 | , QueueManager, QueueManagerM, runQM |
12 | , jobGC | ||
12 | , intersection, idQM | 13 | , intersection, idQM |
13 | , union, nullQM | 14 | , union, nullQM |
14 | ) where | 15 | ) where |
@@ -40,16 +41,21 @@ import Data.Default.Class | |||
40 | import Control.Monad | 41 | import Control.Monad |
41 | import Control.Monad.Morph | 42 | import Control.Monad.Morph |
42 | import Control.Monad.Trans.Compose | 43 | import Control.Monad.Trans.Compose |
44 | import Control.Monad.Trans.Resource | ||
45 | import Control.Monad.Reader | ||
43 | import Data.Function | 46 | import Data.Function |
44 | 47 | ||
45 | import Data.Foldable | 48 | import Data.Foldable |
46 | import Data.Monoid | 49 | import Data.Monoid |
47 | import Data.Ord | 50 | import Data.Ord |
48 | 51 | ||
52 | import Database.Persist (delete) | ||
53 | import Database.Persist.Sql (ConnectionPool, runSqlPool) | ||
54 | |||
49 | import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..)) | 55 | import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..)) |
50 | import Test.QuickCheck.Gen (Gen, scale) | 56 | import Test.QuickCheck.Gen (Gen, scale) |
51 | import Test.QuickCheck.Instances | 57 | import Test.QuickCheck.Instances |
52 | import Test.QuickCheck.Modifiers | 58 | import Test.QuickCheck.Modifiers (NonNegative(..)) |
53 | 59 | ||
54 | -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point | 60 | -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point |
55 | data Queue = Queue | 61 | data Queue = Queue |
@@ -92,7 +98,9 @@ instance Arbitrary QueueEntry where | |||
92 | instance CoArbitrary QueueEntry where | 98 | instance CoArbitrary QueueEntry where |
93 | coarbitrary QueueEntry{..} = coarbitrary created . coarbitrary (fromIntegral jobId :: Integer) | 99 | coarbitrary QueueEntry{..} = coarbitrary created . coarbitrary (fromIntegral jobId :: Integer) |
94 | 100 | ||
95 | data QueueItem = Pending Int QueueEntry | Current QueueEntry | History Int QueueEntry (Maybe PrintingError) | 101 | data QueueItem = Pending { pos :: Int, entry :: QueueEntry } |
102 | | Current { entry :: QueueEntry } | ||
103 | | History { pos :: Int, entry :: QueueEntry, err :: Maybe PrintingError } | ||
96 | 104 | ||
97 | instance Eq QueueItem where | 105 | instance Eq QueueItem where |
98 | (Pending i a ) == (Pending j b ) = i == j && a == b | 106 | (Pending i a ) == (Pending j b ) = i == j && a == b |
@@ -149,17 +157,35 @@ runQM :: ( HasQueue q | |||
149 | , MonadTrans t | 157 | , MonadTrans t |
150 | , MonadIO (t IO) | 158 | , MonadIO (t IO) |
151 | , Monad (t STM) | 159 | , Monad (t STM) |
152 | ) => QueueManager t -> q -> t IO () | 160 | ) => TChan JobId -- ^ Channel for deleted 'JobId's ready to be garbage collected |
161 | -> QueueManager t -> q -> t IO () | ||
153 | -- ^ Periodically modify a 'Queue' | 162 | -- ^ Periodically modify a 'Queue' |
154 | -- | 163 | -- |
155 | -- /TODO/: Garbage collect deleted jobs -- maybe switch to 'Set QueueItem' in 'QueueManager' | 164 | -- We use 'fromZipper' to determine which 'QueueEntry's were deleted by the manager and write |
156 | runQM qm (extractQueue -> q) = sleep =<< qm' | 165 | -- those to the garbage collectors channel. |
166 | -- Since there is no crosstalk between managers this works only if the relatien between | ||
167 | -- 'JobId's and the 'QueueManager' responsible for them is a single-valued function | ||
168 | runQM gcChan qm (extractQueue -> q) = sleep =<< qm' | ||
157 | where | 169 | where |
158 | qm' = hoist atomically $ (\(a, s) -> a <$ lift (writeTVar q $!! s)) =<< runStateT (getComposeT qm) =<< lift (readTVar q) | 170 | qm' = hoist atomically $ do |
171 | before <- lift $ readTVar q | ||
172 | (delay, after) <- runStateT (getComposeT qm) before | ||
173 | lift $ writeTVar q $!! after | ||
174 | let | ||
175 | deleted = Set.map (jobId . entry . unPlain) $ (Set.difference `on` (Set.map Plain . fromZipper)) before after | ||
176 | mapM_ (lift . writeTChan gcChan) deleted | ||
177 | return delay | ||
159 | sleep (abs -> delay) | 178 | sleep (abs -> delay) |
160 | | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM qm q | 179 | | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM gcChan qm q |
161 | | otherwise = return () | 180 | | otherwise = return () |
162 | 181 | ||
182 | jobGC :: ( MonadReader ConnectionPool m | ||
183 | , MonadBaseControl IO m | ||
184 | , MonadIO m | ||
185 | ) => TChan JobId -> m () | ||
186 | -- ^ Listen for 'JobId's on a 'TChan' and delete them from the database 'forever' | ||
187 | jobGC gcChan = forever $ liftIO (atomically $ readTChan gcChan) >>= (\jId -> runSqlPool (delete jId) =<< ask) | ||
188 | |||
163 | intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t | 189 | intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t |
164 | -- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep | 190 | -- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep |
165 | -- | 191 | -- |
diff --git a/server/thermoprint-server.cabal b/server/thermoprint-server.cabal index d6613ad..e90eb4f 100644 --- a/server/thermoprint-server.cabal +++ b/server/thermoprint-server.cabal | |||
@@ -2,7 +2,7 @@ | |||
2 | -- documentation, see http://haskell.org/cabal/users-guide/ | 2 | -- documentation, see http://haskell.org/cabal/users-guide/ |
3 | 3 | ||
4 | name: thermoprint-server | 4 | name: thermoprint-server |
5 | version: 0.0.0 | 5 | version: 1.0.0 |
6 | synopsis: Server for thermoprint-spec | 6 | synopsis: Server for thermoprint-spec |
7 | -- description: | 7 | -- description: |
8 | homepage: http://dirty-haskell.org/tags/thermoprint.html | 8 | homepage: http://dirty-haskell.org/tags/thermoprint.html |
@@ -65,7 +65,7 @@ Test-Suite tests | |||
65 | hs-source-dirs: test | 65 | hs-source-dirs: test |
66 | main-is: Spec.hs | 66 | main-is: Spec.hs |
67 | build-depends: base >=4.8.1 && <5 | 67 | build-depends: base >=4.8.1 && <5 |
68 | , thermoprint-server ==0.0.* | 68 | , thermoprint-server ==1.0.* |
69 | , thermoprint-client ==0.0.* | 69 | , thermoprint-client ==0.0.* |
70 | , thermoprint-spec -any | 70 | , thermoprint-spec -any |
71 | , hspec >=2.2.1 && <3 | 71 | , hspec >=2.2.1 && <3 |
diff --git a/server/thermoprint-server.nix b/server/thermoprint-server.nix index d7a7684..f472cbc 100644 --- a/server/thermoprint-server.nix +++ b/server/thermoprint-server.nix | |||
@@ -8,7 +8,7 @@ | |||
8 | }: | 8 | }: |
9 | mkDerivation { | 9 | mkDerivation { |
10 | pname = "thermoprint-server"; | 10 | pname = "thermoprint-server"; |
11 | version = "0.0.0"; | 11 | version = "1.0.0"; |
12 | src = ./.; | 12 | src = ./.; |
13 | isLibrary = true; | 13 | isLibrary = true; |
14 | isExecutable = true; | 14 | isExecutable = true; |