aboutsummaryrefslogtreecommitdiff
path: root/server/src/Thermoprint/Server
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/Thermoprint/Server')
-rw-r--r--server/src/Thermoprint/Server/Queue.hs40
1 files changed, 33 insertions, 7 deletions
diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs
index 3c8fb9e..aa26fe3 100644
--- a/server/src/Thermoprint/Server/Queue.hs
+++ b/server/src/Thermoprint/Server/Queue.hs
@@ -9,6 +9,7 @@ module Thermoprint.Server.Queue
9 , fromZipper, toZipper, QueueItem(..) 9 , fromZipper, toZipper, QueueItem(..)
10 , HasQueue(..) 10 , HasQueue(..)
11 , QueueManager, QueueManagerM, runQM 11 , QueueManager, QueueManagerM, runQM
12 , jobGC
12 , intersection, idQM 13 , intersection, idQM
13 , union, nullQM 14 , union, nullQM
14 ) where 15 ) where
@@ -40,16 +41,21 @@ import Data.Default.Class
40import Control.Monad 41import Control.Monad
41import Control.Monad.Morph 42import Control.Monad.Morph
42import Control.Monad.Trans.Compose 43import Control.Monad.Trans.Compose
44import Control.Monad.Trans.Resource
45import Control.Monad.Reader
43import Data.Function 46import Data.Function
44 47
45import Data.Foldable 48import Data.Foldable
46import Data.Monoid 49import Data.Monoid
47import Data.Ord 50import Data.Ord
48 51
52import Database.Persist (delete)
53import Database.Persist.Sql (ConnectionPool, runSqlPool)
54
49import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..)) 55import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..))
50import Test.QuickCheck.Gen (Gen, scale) 56import Test.QuickCheck.Gen (Gen, scale)
51import Test.QuickCheck.Instances 57import Test.QuickCheck.Instances
52import Test.QuickCheck.Modifiers 58import Test.QuickCheck.Modifiers (NonNegative(..))
53 59
54-- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point 60-- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point
55data Queue = Queue 61data Queue = Queue
@@ -92,7 +98,9 @@ instance Arbitrary QueueEntry where
92instance CoArbitrary QueueEntry where 98instance CoArbitrary QueueEntry where
93 coarbitrary QueueEntry{..} = coarbitrary created . coarbitrary (fromIntegral jobId :: Integer) 99 coarbitrary QueueEntry{..} = coarbitrary created . coarbitrary (fromIntegral jobId :: Integer)
94 100
95data QueueItem = Pending Int QueueEntry | Current QueueEntry | History Int QueueEntry (Maybe PrintingError) 101data QueueItem = Pending { pos :: Int, entry :: QueueEntry }
102 | Current { entry :: QueueEntry }
103 | History { pos :: Int, entry :: QueueEntry, err :: Maybe PrintingError }
96 104
97instance Eq QueueItem where 105instance Eq QueueItem where
98 (Pending i a ) == (Pending j b ) = i == j && a == b 106 (Pending i a ) == (Pending j b ) = i == j && a == b
@@ -149,17 +157,35 @@ runQM :: ( HasQueue q
149 , MonadTrans t 157 , MonadTrans t
150 , MonadIO (t IO) 158 , MonadIO (t IO)
151 , Monad (t STM) 159 , Monad (t STM)
152 ) => QueueManager t -> q -> t IO () 160 ) => TChan JobId -- ^ Channel for deleted 'JobId's ready to be garbage collected
161 -> QueueManager t -> q -> t IO ()
153-- ^ Periodically modify a 'Queue' 162-- ^ Periodically modify a 'Queue'
154-- 163--
155-- /TODO/: Garbage collect deleted jobs -- maybe switch to 'Set QueueItem' in 'QueueManager' 164-- We use 'fromZipper' to determine which 'QueueEntry's were deleted by the manager and write
156runQM qm (extractQueue -> q) = sleep =<< qm' 165-- those to the garbage collectors channel.
166-- Since there is no crosstalk between managers this works only if the relatien between
167-- 'JobId's and the 'QueueManager' responsible for them is a single-valued function
168runQM gcChan qm (extractQueue -> q) = sleep =<< qm'
157 where 169 where
158 qm' = hoist atomically $ (\(a, s) -> a <$ lift (writeTVar q $!! s)) =<< runStateT (getComposeT qm) =<< lift (readTVar q) 170 qm' = hoist atomically $ do
171 before <- lift $ readTVar q
172 (delay, after) <- runStateT (getComposeT qm) before
173 lift $ writeTVar q $!! after
174 let
175 deleted = Set.map (jobId . entry . unPlain) $ (Set.difference `on` (Set.map Plain . fromZipper)) before after
176 mapM_ (lift . writeTChan gcChan) deleted
177 return delay
159 sleep (abs -> delay) 178 sleep (abs -> delay)
160 | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM qm q 179 | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM gcChan qm q
161 | otherwise = return () 180 | otherwise = return ()
162 181
182jobGC :: ( MonadReader ConnectionPool m
183 , MonadBaseControl IO m
184 , MonadIO m
185 ) => TChan JobId -> m ()
186-- ^ Listen for 'JobId's on a 'TChan' and delete them from the database 'forever'
187jobGC gcChan = forever $ liftIO (atomically $ readTChan gcChan) >>= (\jId -> runSqlPool (delete jId) =<< ask)
188
163intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t 189intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t
164-- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep 190-- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep
165-- 191--