From 9e45c04c45aef1fa71815c61512c354d0d5ee3e3 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Fri, 19 Feb 2016 15:41:58 +0100 Subject: Garbage collection of jobs in db --- server/src/Thermoprint/Server.hs | 4 +++- server/src/Thermoprint/Server/Queue.hs | 40 ++++++++++++++++++++++++++++------ server/thermoprint-server.cabal | 4 ++-- server/thermoprint-server.nix | 2 +- 4 files changed, 39 insertions(+), 11 deletions(-) (limited to 'server') diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index 4559414..df2d8e9 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs @@ -132,7 +132,9 @@ thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams maybe (return ()) ($(logErrorS) "Dyre" . T.pack) dyreError mapM_ ($(logWarnS) "DB") =<< runSqlPool (runMigrationSilent migrateAll) =<< ask forM_ printers $ fork tMgr . runPrinter + gcChan <- liftIO newTChanIO + fork tMgr $ jobGC gcChan let - runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM qm printer + runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM gcChan qm printer mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs index 3c8fb9e..aa26fe3 100644 --- a/server/src/Thermoprint/Server/Queue.hs +++ b/server/src/Thermoprint/Server/Queue.hs @@ -9,6 +9,7 @@ module Thermoprint.Server.Queue , fromZipper, toZipper, QueueItem(..) , HasQueue(..) , QueueManager, QueueManagerM, runQM + , jobGC , intersection, idQM , union, nullQM ) where @@ -40,16 +41,21 @@ import Data.Default.Class import Control.Monad import Control.Monad.Morph import Control.Monad.Trans.Compose +import Control.Monad.Trans.Resource +import Control.Monad.Reader import Data.Function import Data.Foldable import Data.Monoid import Data.Ord +import Database.Persist (delete) +import Database.Persist.Sql (ConnectionPool, runSqlPool) + import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..)) import Test.QuickCheck.Gen (Gen, scale) import Test.QuickCheck.Instances -import Test.QuickCheck.Modifiers +import Test.QuickCheck.Modifiers (NonNegative(..)) -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point data Queue = Queue @@ -92,7 +98,9 @@ instance Arbitrary QueueEntry where instance CoArbitrary QueueEntry where coarbitrary QueueEntry{..} = coarbitrary created . coarbitrary (fromIntegral jobId :: Integer) -data QueueItem = Pending Int QueueEntry | Current QueueEntry | History Int QueueEntry (Maybe PrintingError) +data QueueItem = Pending { pos :: Int, entry :: QueueEntry } + | Current { entry :: QueueEntry } + | History { pos :: Int, entry :: QueueEntry, err :: Maybe PrintingError } instance Eq QueueItem where (Pending i a ) == (Pending j b ) = i == j && a == b @@ -149,17 +157,35 @@ runQM :: ( HasQueue q , MonadTrans t , MonadIO (t IO) , Monad (t STM) - ) => QueueManager t -> q -> t IO () + ) => TChan JobId -- ^ Channel for deleted 'JobId's ready to be garbage collected + -> QueueManager t -> q -> t IO () -- ^ Periodically modify a 'Queue' -- --- /TODO/: Garbage collect deleted jobs -- maybe switch to 'Set QueueItem' in 'QueueManager' -runQM qm (extractQueue -> q) = sleep =<< qm' +-- We use 'fromZipper' to determine which 'QueueEntry's were deleted by the manager and write +-- those to the garbage collectors channel. +-- Since there is no crosstalk between managers this works only if the relatien between +-- 'JobId's and the 'QueueManager' responsible for them is a single-valued function +runQM gcChan qm (extractQueue -> q) = sleep =<< qm' where - qm' = hoist atomically $ (\(a, s) -> a <$ lift (writeTVar q $!! s)) =<< runStateT (getComposeT qm) =<< lift (readTVar q) + qm' = hoist atomically $ do + before <- lift $ readTVar q + (delay, after) <- runStateT (getComposeT qm) before + lift $ writeTVar q $!! after + let + deleted = Set.map (jobId . entry . unPlain) $ (Set.difference `on` (Set.map Plain . fromZipper)) before after + mapM_ (lift . writeTChan gcChan) deleted + return delay sleep (abs -> delay) - | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM qm q + | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM gcChan qm q | otherwise = return () +jobGC :: ( MonadReader ConnectionPool m + , MonadBaseControl IO m + , MonadIO m + ) => TChan JobId -> m () +-- ^ Listen for 'JobId's on a 'TChan' and delete them from the database 'forever' +jobGC gcChan = forever $ liftIO (atomically $ readTChan gcChan) >>= (\jId -> runSqlPool (delete jId) =<< ask) + intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t -- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep -- diff --git a/server/thermoprint-server.cabal b/server/thermoprint-server.cabal index d6613ad..e90eb4f 100644 --- a/server/thermoprint-server.cabal +++ b/server/thermoprint-server.cabal @@ -2,7 +2,7 @@ -- documentation, see http://haskell.org/cabal/users-guide/ name: thermoprint-server -version: 0.0.0 +version: 1.0.0 synopsis: Server for thermoprint-spec -- description: homepage: http://dirty-haskell.org/tags/thermoprint.html @@ -65,7 +65,7 @@ Test-Suite tests hs-source-dirs: test main-is: Spec.hs build-depends: base >=4.8.1 && <5 - , thermoprint-server ==0.0.* + , thermoprint-server ==1.0.* , thermoprint-client ==0.0.* , thermoprint-spec -any , hspec >=2.2.1 && <3 diff --git a/server/thermoprint-server.nix b/server/thermoprint-server.nix index d7a7684..f472cbc 100644 --- a/server/thermoprint-server.nix +++ b/server/thermoprint-server.nix @@ -8,7 +8,7 @@ }: mkDerivation { pname = "thermoprint-server"; - version = "0.0.0"; + version = "1.0.0"; src = ./.; isLibrary = true; isExecutable = true; -- cgit v1.2.3