diff options
author | Gregor Kleen <gkleen@yggdrasil.li> | 2016-02-19 15:41:58 +0100 |
---|---|---|
committer | Gregor Kleen <gkleen@yggdrasil.li> | 2016-02-19 15:41:58 +0100 |
commit | 9e45c04c45aef1fa71815c61512c354d0d5ee3e3 (patch) | |
tree | 6720e42254e9ada3747e531a2bd4d2c4a3aca8d7 /server/src | |
parent | 9eeb93a2e333b4aa00b5a617af9b995e0f5dd7cc (diff) | |
download | thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.tar thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.tar.gz thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.tar.bz2 thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.tar.xz thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.zip |
Garbage collection of jobs in db
Diffstat (limited to 'server/src')
-rw-r--r-- | server/src/Thermoprint/Server.hs | 4 | ||||
-rw-r--r-- | server/src/Thermoprint/Server/Queue.hs | 40 |
2 files changed, 36 insertions, 8 deletions
diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index 4559414..df2d8e9 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs | |||
@@ -132,7 +132,9 @@ thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams | |||
132 | maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError | 132 | maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError |
133 | mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask | 133 | mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask |
134 | forM_ printers $ fork tMgr . runPrinter | 134 | forM_ printers $ fork tMgr . runPrinter |
135 | gcChan <- liftIO newTChanIO | ||
136 | fork tMgr $ jobGC gcChan | ||
135 | let | 137 | let |
136 | runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer | 138 | runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM gcChan qm printer |
137 | mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers | 139 | mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers |
138 | liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers | 140 | liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers |
diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs index 3c8fb9e..aa26fe3 100644 --- a/server/src/Thermoprint/Server/Queue.hs +++ b/server/src/Thermoprint/Server/Queue.hs | |||
@@ -9,6 +9,7 @@ module Thermoprint.Server.Queue | |||
9 | , fromZipper, toZipper, QueueItem(..) | 9 | , fromZipper, toZipper, QueueItem(..) |
10 | , HasQueue(..) | 10 | , HasQueue(..) |
11 | , QueueManager, QueueManagerM, runQM | 11 | , QueueManager, QueueManagerM, runQM |
12 | , jobGC | ||
12 | , intersection, idQM | 13 | , intersection, idQM |
13 | , union, nullQM | 14 | , union, nullQM |
14 | ) where | 15 | ) where |
@@ -40,16 +41,21 @@ import Data.Default.Class | |||
40 | import Control.Monad | 41 | import Control.Monad |
41 | import Control.Monad.Morph | 42 | import Control.Monad.Morph |
42 | import Control.Monad.Trans.Compose | 43 | import Control.Monad.Trans.Compose |
44 | import Control.Monad.Trans.Resource | ||
45 | import Control.Monad.Reader | ||
43 | import Data.Function | 46 | import Data.Function |
44 | 47 | ||
45 | import Data.Foldable | 48 | import Data.Foldable |
46 | import Data.Monoid | 49 | import Data.Monoid |
47 | import Data.Ord | 50 | import Data.Ord |
48 | 51 | ||
52 | import Database.Persist (delete) | ||
53 | import Database.Persist.Sql (ConnectionPool, runSqlPool) | ||
54 | |||
49 | import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..)) | 55 | import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..)) |
50 | import Test.QuickCheck.Gen (Gen, scale) | 56 | import Test.QuickCheck.Gen (Gen, scale) |
51 | import Test.QuickCheck.Instances | 57 | import Test.QuickCheck.Instances |
52 | import Test.QuickCheck.Modifiers | 58 | import Test.QuickCheck.Modifiers (NonNegative(..)) |
53 | 59 | ||
54 | -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point | 60 | -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point |
55 | data Queue = Queue | 61 | data Queue = Queue |
@@ -92,7 +98,9 @@ instance Arbitrary QueueEntry where | |||
92 | instance CoArbitrary QueueEntry where | 98 | instance CoArbitrary QueueEntry where |
93 | coarbitrary QueueEntry{..} = coarbitrary created . coarbitrary (fromIntegral jobId :: Integer) | 99 | coarbitrary QueueEntry{..} = coarbitrary created . coarbitrary (fromIntegral jobId :: Integer) |
94 | 100 | ||
95 | data QueueItem = Pending Int QueueEntry | Current QueueEntry | History Int QueueEntry (Maybe PrintingError) | 101 | data QueueItem = Pending { pos :: Int, entry :: QueueEntry } |
102 | | Current { entry :: QueueEntry } | ||
103 | | History { pos :: Int, entry :: QueueEntry, err :: Maybe PrintingError } | ||
96 | 104 | ||
97 | instance Eq QueueItem where | 105 | instance Eq QueueItem where |
98 | (Pending i a ) == (Pending j b ) = i == j && a == b | 106 | (Pending i a ) == (Pending j b ) = i == j && a == b |
@@ -149,17 +157,35 @@ runQM :: ( HasQueue q | |||
149 | , MonadTrans t | 157 | , MonadTrans t |
150 | , MonadIO (t IO) | 158 | , MonadIO (t IO) |
151 | , Monad (t STM) | 159 | , Monad (t STM) |
152 | ) => QueueManager t -> q -> t IO () | 160 | ) => TChan JobId -- ^ Channel for deleted 'JobId's ready to be garbage collected |
161 | -> QueueManager t -> q -> t IO () | ||
153 | -- ^ Periodically modify a 'Queue' | 162 | -- ^ Periodically modify a 'Queue' |
154 | -- | 163 | -- |
155 | -- /TODO/: Garbage collect deleted jobs -- maybe switch to 'Set QueueItem' in 'QueueManager' | 164 | -- We use 'fromZipper' to determine which 'QueueEntry's were deleted by the manager and write |
156 | runQM qm (extractQueue -> q) = sleep =<< qm' | 165 | -- those to the garbage collectors channel. |
166 | -- Since there is no crosstalk between managers this works only if the relatien between | ||
167 | -- 'JobId's and the 'QueueManager' responsible for them is a single-valued function | ||
168 | runQM gcChan qm (extractQueue -> q) = sleep =<< qm' | ||
157 | where | 169 | where |
158 | qm' = hoist atomically $ (\(a, s) -> a <$ lift (writeTVar q $!! s)) =<< runStateT (getComposeT qm) =<< lift (readTVar q) | 170 | qm' = hoist atomically $ do |
171 | before <- lift $ readTVar q | ||
172 | (delay, after) <- runStateT (getComposeT qm) before | ||
173 | lift $ writeTVar q $!! after | ||
174 | let | ||
175 | deleted = Set.map (jobId . entry . unPlain) $ (Set.difference `on` (Set.map Plain . fromZipper)) before after | ||
176 | mapM_ (lift . writeTChan gcChan) deleted | ||
177 | return delay | ||
159 | sleep (abs -> delay) | 178 | sleep (abs -> delay) |
160 | | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM qm q | 179 | | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM gcChan qm q |
161 | | otherwise = return () | 180 | | otherwise = return () |
162 | 181 | ||
182 | jobGC :: ( MonadReader ConnectionPool m | ||
183 | , MonadBaseControl IO m | ||
184 | , MonadIO m | ||
185 | ) => TChan JobId -> m () | ||
186 | -- ^ Listen for 'JobId's on a 'TChan' and delete them from the database 'forever' | ||
187 | jobGC gcChan = forever $ liftIO (atomically $ readTChan gcChan) >>= (\jId -> runSqlPool (delete jId) =<< ask) | ||
188 | |||
163 | intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t | 189 | intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t |
164 | -- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep | 190 | -- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep |
165 | -- | 191 | -- |