diff options
Diffstat (limited to 'server/src/Thermoprint/Server')
-rw-r--r-- | server/src/Thermoprint/Server/Queue.hs | 135 |
1 files changed, 98 insertions, 37 deletions
diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs index 17865b6..832b876 100644 --- a/server/src/Thermoprint/Server/Queue.hs +++ b/server/src/Thermoprint/Server/Queue.hs | |||
@@ -1,4 +1,4 @@ | |||
1 | {-# LANGUAGE FlexibleInstances #-} | 1 | {-# LANGUAGE FlexibleInstances, FlexibleContexts #-} |
2 | {-# LANGUAGE ViewPatterns #-} | 2 | {-# LANGUAGE ViewPatterns #-} |
3 | {-# LANGUAGE RecordWildCards #-} | 3 | {-# LANGUAGE RecordWildCards #-} |
4 | {-# LANGUAGE DeriveGeneric, DeriveAnyClass #-} | 4 | {-# LANGUAGE DeriveGeneric, DeriveAnyClass #-} |
@@ -6,8 +6,11 @@ | |||
6 | 6 | ||
7 | module Thermoprint.Server.Queue | 7 | module Thermoprint.Server.Queue |
8 | ( Queue(..), QueueEntry(..) | 8 | ( Queue(..), QueueEntry(..) |
9 | , fromZipper, toZipper, QueueItem(..) | ||
9 | , HasQueue(..) | 10 | , HasQueue(..) |
10 | , QueueManager(..), runQM | 11 | , QueueManager, QueueManagerM, runQM |
12 | , intersection, idQM | ||
13 | , union, nullQM | ||
11 | ) where | 14 | ) where |
12 | 15 | ||
13 | import Thermoprint.API (PrintingError(..), Printout) | 16 | import Thermoprint.API (PrintingError(..), Printout) |
@@ -17,14 +20,14 @@ import Thermoprint.Server.Database | |||
17 | 20 | ||
18 | import Data.Sequence (Seq, ViewL(..), viewl, (|>), (<|)) | 21 | import Data.Sequence (Seq, ViewL(..), viewl, (|>), (<|)) |
19 | import qualified Data.Sequence as Seq | 22 | import qualified Data.Sequence as Seq |
20 | |||
21 | import Data.Set (Set) | 23 | import Data.Set (Set) |
22 | import qualified Data.Set as Set | 24 | import qualified Data.Set as Set |
23 | 25 | ||
24 | import Data.Time | 26 | import Data.Time |
25 | import Data.Time.Clock | 27 | import Data.ExtendedReal |
28 | import Data.Fixed | ||
26 | 29 | ||
27 | import Control.DeepSeq (NFData) | 30 | import Control.DeepSeq |
28 | import Data.Typeable (Typeable) | 31 | import Data.Typeable (Typeable) |
29 | import GHC.Generics (Generic) | 32 | import GHC.Generics (Generic) |
30 | 33 | ||
@@ -34,11 +37,14 @@ import Control.Monad.State | |||
34 | 37 | ||
35 | import Data.Default.Class | 38 | import Data.Default.Class |
36 | 39 | ||
40 | import Control.Monad | ||
37 | import Control.Monad.Morph | 41 | import Control.Monad.Morph |
38 | import Control.Monad.Trans.Compose | 42 | import Control.Monad.Trans.Compose |
43 | import Data.Function | ||
39 | 44 | ||
40 | import Data.Foldable | 45 | import Data.Foldable |
41 | import Data.Monoid | 46 | import Data.Monoid |
47 | import Data.Ord | ||
42 | 48 | ||
43 | -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point | 49 | -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point |
44 | data Queue = Queue | 50 | data Queue = Queue |
@@ -48,33 +54,6 @@ data Queue = Queue | |||
48 | } | 54 | } |
49 | deriving (Typeable, Generic, NFData) | 55 | deriving (Typeable, Generic, NFData) |
50 | 56 | ||
51 | data QueueElem = Pending QueueEntry | Current QueueEntry | History QueueEntry (Maybe PrintingError) | ||
52 | |||
53 | instance Eq QueueElem where | ||
54 | (Pending a) == (Pending b) = a == b | ||
55 | (Current a) == (Current b) = a == b | ||
56 | (History a _) == (History b _) = a == b | ||
57 | _ == _ = False | ||
58 | |||
59 | instance Ord QueueElem where | ||
60 | (Pending _) <= _ = True | ||
61 | (Current _) <= (Pending _) = False | ||
62 | (Current _) <= _ = True | ||
63 | (History _ _) <= (History _ _) = True | ||
64 | (History _ _) <= _ = False | ||
65 | |||
66 | toSeq :: Queue -> Set (Int, QueueElem) | ||
67 | toSeq Queue{..} = fromSeq $ fmap Pending pending <> maybe Seq.empty (Seq.singleton . Current) current <> fmap (uncurry History) history | ||
68 | where | ||
69 | fromSeq = Set.fromAscList . toList . Seq.mapWithIndex ((,)) | ||
70 | |||
71 | fromSeq :: Set (Int, QueueElem) -> Queue | ||
72 | fromSeq = foldr' (insert . snd) def | ||
73 | where | ||
74 | insert (Pending e) q@(Queue{..}) = q { pending = pending |> e } | ||
75 | insert (Current e) q = q { current = Just e } | ||
76 | insert (History e p) q@(Queue{..}) = q { history = history |> (e, p) } | ||
77 | |||
78 | class HasQueue a where | 57 | class HasQueue a where |
79 | extractQueue :: a -> TVar Queue | 58 | extractQueue :: a -> TVar Queue |
80 | 59 | ||
@@ -92,10 +71,59 @@ data QueueEntry = QueueEntry | |||
92 | { jobId :: JobId | 71 | { jobId :: JobId |
93 | , created :: UTCTime | 72 | , created :: UTCTime |
94 | } | 73 | } |
95 | deriving (Typeable, Generic, NFData, Eq) | 74 | deriving (Typeable, Generic, NFData, Eq, Ord) |
75 | |||
76 | data QueueItem = Pending Int QueueEntry | Current QueueEntry | History Int QueueEntry (Maybe PrintingError) | ||
77 | |||
78 | instance Eq QueueItem where | ||
79 | (Pending i a ) == (Pending j b ) = i == j && a == b | ||
80 | (Current a ) == (Current b ) = a == b | ||
81 | (History i a _) == (History j b _) = i == j && a == b | ||
82 | _ == _ = False | ||
83 | |||
84 | instance Ord QueueItem where | ||
85 | (Pending i a ) `compare` (Pending j b ) = compare i j <> compare a b | ||
86 | (Current a ) `compare` (Current b ) = compare a b | ||
87 | (History i a _) `compare` (History j b _) = compare i j <> compare a b | ||
88 | (Pending _ _ ) `compare` _ = LT | ||
89 | (Current _ ) `compare` (Pending _ _ ) = GT | ||
90 | (Current _ ) `compare` _ = LT | ||
91 | (History _ _ _) `compare` _ = GT | ||
92 | |||
93 | newtype PlainQueueItem = Plain { unPlain :: QueueItem } | ||
94 | |||
95 | instance Eq PlainQueueItem where | ||
96 | (unPlain -> Pending _ a ) == (unPlain -> Pending _ b ) = a == b | ||
97 | (unPlain -> Current a ) == (unPlain -> Current b ) = a == b | ||
98 | (unPlain -> History _ a _) == (unPlain -> History _ b _) = a == b | ||
99 | _ == _ = False | ||
100 | |||
101 | instance Ord PlainQueueItem where | ||
102 | (unPlain -> Pending _ a ) <= (unPlain -> Pending _ b ) = a <= b | ||
103 | (unPlain -> Current a ) <= (unPlain -> Current b ) = a <= b | ||
104 | (unPlain -> History _ a _) <= (unPlain -> History _ b _) = a <= b | ||
105 | (unPlain -> Current _ ) <= (unPlain -> Pending _ _ ) = False | ||
106 | (unPlain -> History _ _ _) <= (unPlain -> Pending _ _ ) = False | ||
107 | (unPlain -> History _ _ _) <= (unPlain -> Current _ ) = False | ||
108 | (unPlain -> Pending _ _ ) <= _ = True | ||
109 | (unPlain -> Current _ ) <= _ = True | ||
110 | |||
111 | fromZipper :: Queue -> Set QueueItem | ||
112 | fromZipper Queue{..} = Set.fromList . toList $ mconcat [ Seq.mapWithIndex Pending pending | ||
113 | , maybe Seq.empty (Seq.singleton . Current) current | ||
114 | , Seq.mapWithIndex (\i (a, e) -> History i a e) history | ||
115 | ] | ||
116 | |||
117 | toZipper :: Set QueueItem -> Queue | ||
118 | toZipper = Set.foldr' insert def | ||
119 | where | ||
120 | insert (Pending _ a) q@(Queue{..}) = q { pending = pending |> a } | ||
121 | insert (Current a) q = q { current = Just a } | ||
122 | insert (History _ a e) q@(Queue{..}) = q { history = history |> (a, e) } | ||
96 | 123 | ||
97 | -- | A queue manager periodically modifies a 'Queue', e.g. for cleanup of old jobs | 124 | -- | A queue manager periodically modifies a 'Queue', e.g. for cleanup of old jobs |
98 | type QueueManager t = ComposeT (StateT Queue) t STM DiffTime | 125 | type QueueManager t = QueueManagerM t (Extended Micro) |
126 | type QueueManagerM t = ComposeT (StateT Queue) t STM | ||
99 | 127 | ||
100 | runQM :: ( HasQueue q | 128 | runQM :: ( HasQueue q |
101 | , MFunctor t | 129 | , MFunctor t |
@@ -104,7 +132,40 @@ runQM :: ( HasQueue q | |||
104 | , Monad (t STM) | 132 | , Monad (t STM) |
105 | ) => QueueManager t -> q -> t IO () | 133 | ) => QueueManager t -> q -> t IO () |
106 | -- ^ Periodically modify a 'Queue' | 134 | -- ^ Periodically modify a 'Queue' |
107 | runQM qm (extractQueue -> q) = forever $ liftIO . threadDelay . toMicro =<< qm' | 135 | runQM qm (extractQueue -> q) = sleep =<< qm' |
136 | where | ||
137 | qm' = hoist atomically $ (\(a, s) -> lift (writeTVar q $!! s) >> return a) =<< runStateT (getComposeT qm) =<< lift (readTVar q) | ||
138 | sleep (abs -> delay) | ||
139 | | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM qm q | ||
140 | | otherwise = return () | ||
141 | |||
142 | intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t | ||
143 | -- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep | ||
144 | -- | ||
145 | -- Side effects propagate left to right | ||
146 | intersection = foldr' (qmCombine Set.intersection) idQM | ||
147 | |||
148 | idQM :: Monad (QueueManagerM t) => QueueManager t | ||
149 | -- ^ Identity of 'intersect' | ||
150 | idQM = return PosInf | ||
151 | |||
152 | union :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t | ||
153 | -- ^ Combine two 'QueueManager's keeping all 'QueueEntry's either of the managers decides to keep | ||
154 | -- | ||
155 | -- Side effects propagate left to right | ||
156 | union = foldr' (qmCombine Set.union) nullQM | ||
157 | |||
158 | nullQM :: MonadState Queue (QueueManagerM t) => QueueManager t | ||
159 | -- ^ Identity of 'union' | ||
160 | nullQM = put def >> return PosInf | ||
161 | |||
162 | qmCombine :: MonadState Queue (QueueManagerM t) | ||
163 | => (Set PlainQueueItem -> Set PlainQueueItem -> Set PlainQueueItem) | ||
164 | -> (QueueManager t -> QueueManager t -> QueueManager t) | ||
165 | qmCombine setCombine a b = do | ||
166 | (d1, s1) <- local a | ||
167 | (d2, s2) <- local b | ||
168 | put . toZipper . Set.map unPlain $ on setCombine (Set.map Plain . fromZipper) s1 s2 | ||
169 | return $ min d1 d2 | ||
108 | where | 170 | where |
109 | qm' = hoist atomically $ (\(a, s) -> lift (writeTVar q s) >> return a) =<< runStateT (getComposeT qm) =<< lift (readTVar q) | 171 | local x = ((,) <$> get <*> ((,) <$> x <*> get)) >>= (\(oldS, r) -> r <$ put oldS) |
110 | toMicro = (`div` 10^6) . fromEnum | ||