aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGregor Kleen <gkleen@yggdrasil.li>2016-02-19 15:41:58 +0100
committerGregor Kleen <gkleen@yggdrasil.li>2016-02-19 15:41:58 +0100
commit9e45c04c45aef1fa71815c61512c354d0d5ee3e3 (patch)
tree6720e42254e9ada3747e531a2bd4d2c4a3aca8d7
parent9eeb93a2e333b4aa00b5a617af9b995e0f5dd7cc (diff)
downloadthermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.tar
thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.tar.gz
thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.tar.bz2
thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.tar.xz
thermoprint-9e45c04c45aef1fa71815c61512c354d0d5ee3e3.zip
Garbage collection of jobs in db
-rw-r--r--server/src/Thermoprint/Server.hs4
-rw-r--r--server/src/Thermoprint/Server/Queue.hs40
-rw-r--r--server/thermoprint-server.cabal4
-rw-r--r--server/thermoprint-server.nix2
4 files changed, 39 insertions, 11 deletions
diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs
index 4559414..df2d8e9 100644
--- a/server/src/Thermoprint/Server.hs
+++ b/server/src/Thermoprint/Server.hs
@@ -132,7 +132,9 @@ thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams
132 maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError 132 maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError
133 mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask 133 mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask
134 forM_ printers $ fork tMgr . runPrinter 134 forM_ printers $ fork tMgr . runPrinter
135 gcChan <- liftIO newTChanIO
136 fork tMgr $ jobGC gcChan
135 let 137 let
136 runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer 138 runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM gcChan qm printer
137 mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers 139 mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers
138 liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers 140 liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers
diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs
index 3c8fb9e..aa26fe3 100644
--- a/server/src/Thermoprint/Server/Queue.hs
+++ b/server/src/Thermoprint/Server/Queue.hs
@@ -9,6 +9,7 @@ module Thermoprint.Server.Queue
9 , fromZipper, toZipper, QueueItem(..) 9 , fromZipper, toZipper, QueueItem(..)
10 , HasQueue(..) 10 , HasQueue(..)
11 , QueueManager, QueueManagerM, runQM 11 , QueueManager, QueueManagerM, runQM
12 , jobGC
12 , intersection, idQM 13 , intersection, idQM
13 , union, nullQM 14 , union, nullQM
14 ) where 15 ) where
@@ -40,16 +41,21 @@ import Data.Default.Class
40import Control.Monad 41import Control.Monad
41import Control.Monad.Morph 42import Control.Monad.Morph
42import Control.Monad.Trans.Compose 43import Control.Monad.Trans.Compose
44import Control.Monad.Trans.Resource
45import Control.Monad.Reader
43import Data.Function 46import Data.Function
44 47
45import Data.Foldable 48import Data.Foldable
46import Data.Monoid 49import Data.Monoid
47import Data.Ord 50import Data.Ord
48 51
52import Database.Persist (delete)
53import Database.Persist.Sql (ConnectionPool, runSqlPool)
54
49import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..)) 55import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..))
50import Test.QuickCheck.Gen (Gen, scale) 56import Test.QuickCheck.Gen (Gen, scale)
51import Test.QuickCheck.Instances 57import Test.QuickCheck.Instances
52import Test.QuickCheck.Modifiers 58import Test.QuickCheck.Modifiers (NonNegative(..))
53 59
54-- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point 60-- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point
55data Queue = Queue 61data Queue = Queue
@@ -92,7 +98,9 @@ instance Arbitrary QueueEntry where
92instance CoArbitrary QueueEntry where 98instance CoArbitrary QueueEntry where
93 coarbitrary QueueEntry{..} = coarbitrary created . coarbitrary (fromIntegral jobId :: Integer) 99 coarbitrary QueueEntry{..} = coarbitrary created . coarbitrary (fromIntegral jobId :: Integer)
94 100
95data QueueItem = Pending Int QueueEntry | Current QueueEntry | History Int QueueEntry (Maybe PrintingError) 101data QueueItem = Pending { pos :: Int, entry :: QueueEntry }
102 | Current { entry :: QueueEntry }
103 | History { pos :: Int, entry :: QueueEntry, err :: Maybe PrintingError }
96 104
97instance Eq QueueItem where 105instance Eq QueueItem where
98 (Pending i a ) == (Pending j b ) = i == j && a == b 106 (Pending i a ) == (Pending j b ) = i == j && a == b
@@ -149,17 +157,35 @@ runQM :: ( HasQueue q
149 , MonadTrans t 157 , MonadTrans t
150 , MonadIO (t IO) 158 , MonadIO (t IO)
151 , Monad (t STM) 159 , Monad (t STM)
152 ) => QueueManager t -> q -> t IO () 160 ) => TChan JobId -- ^ Channel for deleted 'JobId's ready to be garbage collected
161 -> QueueManager t -> q -> t IO ()
153-- ^ Periodically modify a 'Queue' 162-- ^ Periodically modify a 'Queue'
154-- 163--
155-- /TODO/: Garbage collect deleted jobs -- maybe switch to 'Set QueueItem' in 'QueueManager' 164-- We use 'fromZipper' to determine which 'QueueEntry's were deleted by the manager and write
156runQM qm (extractQueue -> q) = sleep =<< qm' 165-- those to the garbage collectors channel.
166-- Since there is no crosstalk between managers this works only if the relatien between
167-- 'JobId's and the 'QueueManager' responsible for them is a single-valued function
168runQM gcChan qm (extractQueue -> q) = sleep =<< qm'
157 where 169 where
158 qm' = hoist atomically $ (\(a, s) -> a <$ lift (writeTVar q $!! s)) =<< runStateT (getComposeT qm) =<< lift (readTVar q) 170 qm' = hoist atomically $ do
171 before <- lift $ readTVar q
172 (delay, after) <- runStateT (getComposeT qm) before
173 lift $ writeTVar q $!! after
174 let
175 deleted = Set.map (jobId . entry . unPlain) $ (Set.difference `on` (Set.map Plain . fromZipper)) before after
176 mapM_ (lift . writeTChan gcChan) deleted
177 return delay
159 sleep (abs -> delay) 178 sleep (abs -> delay)
160 | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM qm q 179 | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM gcChan qm q
161 | otherwise = return () 180 | otherwise = return ()
162 181
182jobGC :: ( MonadReader ConnectionPool m
183 , MonadBaseControl IO m
184 , MonadIO m
185 ) => TChan JobId -> m ()
186-- ^ Listen for 'JobId's on a 'TChan' and delete them from the database 'forever'
187jobGC gcChan = forever $ liftIO (atomically $ readTChan gcChan) >>= (\jId -> runSqlPool (delete jId) =<< ask)
188
163intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t 189intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t
164-- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep 190-- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep
165-- 191--
diff --git a/server/thermoprint-server.cabal b/server/thermoprint-server.cabal
index d6613ad..e90eb4f 100644
--- a/server/thermoprint-server.cabal
+++ b/server/thermoprint-server.cabal
@@ -2,7 +2,7 @@
2-- documentation, see http://haskell.org/cabal/users-guide/ 2-- documentation, see http://haskell.org/cabal/users-guide/
3 3
4name: thermoprint-server 4name: thermoprint-server
5version: 0.0.0 5version: 1.0.0
6synopsis: Server for thermoprint-spec 6synopsis: Server for thermoprint-spec
7-- description: 7-- description:
8homepage: http://dirty-haskell.org/tags/thermoprint.html 8homepage: http://dirty-haskell.org/tags/thermoprint.html
@@ -65,7 +65,7 @@ Test-Suite tests
65 hs-source-dirs: test 65 hs-source-dirs: test
66 main-is: Spec.hs 66 main-is: Spec.hs
67 build-depends: base >=4.8.1 && <5 67 build-depends: base >=4.8.1 && <5
68 , thermoprint-server ==0.0.* 68 , thermoprint-server ==1.0.*
69 , thermoprint-client ==0.0.* 69 , thermoprint-client ==0.0.*
70 , thermoprint-spec -any 70 , thermoprint-spec -any
71 , hspec >=2.2.1 && <3 71 , hspec >=2.2.1 && <3
diff --git a/server/thermoprint-server.nix b/server/thermoprint-server.nix
index d7a7684..f472cbc 100644
--- a/server/thermoprint-server.nix
+++ b/server/thermoprint-server.nix
@@ -8,7 +8,7 @@
8}: 8}:
9mkDerivation { 9mkDerivation {
10 pname = "thermoprint-server"; 10 pname = "thermoprint-server";
11 version = "0.0.0"; 11 version = "1.0.0";
12 src = ./.; 12 src = ./.;
13 isLibrary = true; 13 isLibrary = true;
14 isExecutable = true; 14 isExecutable = true;