aboutsummaryrefslogtreecommitdiff
path: root/server/src
diff options
context:
space:
mode:
authorGregor Kleen <gkleen@yggdrasil.li>2016-02-05 17:41:12 +0100
committerGregor Kleen <gkleen@yggdrasil.li>2016-02-05 17:41:12 +0100
commit96d4e2f86bd4a568d0cb68b9b716c8a53113f34c (patch)
treea2e5eebbe6d31a7bf251c4da96704aebaa6af596 /server/src
parent04e26a5b118155c5dffb817b523bc8eada952b55 (diff)
downloadthermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.tar
thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.tar.gz
thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.tar.bz2
thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.tar.xz
thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.zip
Combination of QueueManagers
Diffstat (limited to 'server/src')
-rw-r--r--server/src/Thermoprint/Server.hs2
-rw-r--r--server/src/Thermoprint/Server/Queue.hs114
2 files changed, 97 insertions, 19 deletions
diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs
index 7767c12..8061b20 100644
--- a/server/src/Thermoprint/Server.hs
+++ b/server/src/Thermoprint/Server.hs
@@ -12,6 +12,7 @@ module Thermoprint.Server
12 , module Data.Default.Class 12 , module Data.Default.Class
13 , module Servant.Server.Internal.Enter 13 , module Servant.Server.Internal.Enter
14 , module Thermoprint.Server.Printer 14 , module Thermoprint.Server.Printer
15 , module Thermoprint.Server.Queue
15 ) where 16 ) where
16 17
17import Data.Default.Class 18import Data.Default.Class
@@ -49,6 +50,7 @@ import Thermoprint.API (thermoprintAPI, PrinterId)
49 50
50import Thermoprint.Server.Database 51import Thermoprint.Server.Database
51import Thermoprint.Server.Printer 52import Thermoprint.Server.Printer
53import Thermoprint.Server.Queue
52import qualified Thermoprint.Server.API as API (thermoprintServer) 54import qualified Thermoprint.Server.API as API (thermoprintServer)
53import Thermoprint.Server.API hiding (thermoprintServer) 55import Thermoprint.Server.API hiding (thermoprintServer)
54 56
diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs
index 68dc7a9..69295bb 100644
--- a/server/src/Thermoprint/Server/Queue.hs
+++ b/server/src/Thermoprint/Server/Queue.hs
@@ -1,4 +1,4 @@
1{-# LANGUAGE FlexibleInstances #-} 1{-# LANGUAGE FlexibleInstances, FlexibleContexts #-}
2{-# LANGUAGE ViewPatterns #-} 2{-# LANGUAGE ViewPatterns #-}
3{-# LANGUAGE RecordWildCards #-} 3{-# LANGUAGE RecordWildCards #-}
4{-# LANGUAGE DeriveGeneric, DeriveAnyClass #-} 4{-# LANGUAGE DeriveGeneric, DeriveAnyClass #-}
@@ -7,7 +7,9 @@
7module Thermoprint.Server.Queue 7module Thermoprint.Server.Queue
8 ( Queue(..), QueueEntry(..) 8 ( Queue(..), QueueEntry(..)
9 , HasQueue(..) 9 , HasQueue(..)
10 , QueueManager(..), runQM 10 , QueueManager, QueueManagerM, runQM
11 , intersection, idQM
12 , union, nullQM
11 ) where 13 ) where
12 14
13import Thermoprint.API (PrintingError(..), Printout) 15import Thermoprint.API (PrintingError(..), Printout)
@@ -15,11 +17,14 @@ import qualified Thermoprint.API as API (JobStatus(..))
15 17
16import Thermoprint.Server.Database 18import Thermoprint.Server.Database
17 19
18import Data.Sequence (Seq, ViewL(..), viewl) 20import Data.Sequence (Seq, ViewL(..), viewl, (|>), (<|))
19import qualified Data.Sequence as Seq 21import qualified Data.Sequence as Seq
22import Data.Set (Set)
23import qualified Data.Set as Set
20 24
21import Data.Time 25import Data.Time
22import Data.Time.Clock 26import Data.ExtendedReal
27import Data.Fixed
23 28
24import Control.DeepSeq (NFData) 29import Control.DeepSeq (NFData)
25import Data.Typeable (Typeable) 30import Data.Typeable (Typeable)
@@ -31,10 +36,14 @@ import Control.Monad.State
31 36
32import Data.Default.Class 37import Data.Default.Class
33 38
39import Control.Monad
34import Control.Monad.Morph 40import Control.Monad.Morph
35import Control.Monad.Trans.Compose 41import Control.Monad.Trans.Compose
42import Data.Foldable
43import Data.Function
36 44
37import Data.Monoid 45import Data.Monoid
46import Data.Ord
38 47
39-- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point 48-- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point
40data Queue = Queue 49data Queue = Queue
@@ -44,17 +53,6 @@ data Queue = Queue
44 } 53 }
45 deriving (Typeable, Generic, NFData) 54 deriving (Typeable, Generic, NFData)
46 55
47toSeq :: Queue -> Seq (Bool, QueueEntry, Maybe PrintingError)
48toSeq Queue{..} = fmap (\x -> (False, x, Nothing)) pending <> maybe Seq.empty (\c -> Seq.singleton (True, c, Nothing)) current <> fmap (\(x, p) -> (False, x, p)) history
49
50fromSeq :: Seq (Bool, QueueEntry, Maybe PrintingError) -> Queue
51fromSeq s = Queue pending' current' history'
52 where
53 (fmap (\(_, x, _) -> x) -> pending', pending'') = Seq.breakl (\(c, _, _) -> c) s
54 (current', history') = case viewl pending'' of
55 EmptyL -> (Nothing, Seq.empty)
56 (_, a, _) :< as -> (Just a, fmap (\(_, x, p) -> (x, p)) as)
57
58class HasQueue a where 56class HasQueue a where
59 extractQueue :: a -> TVar Queue 57 extractQueue :: a -> TVar Queue
60 58
@@ -72,10 +70,59 @@ data QueueEntry = QueueEntry
72 { jobId :: JobId 70 { jobId :: JobId
73 , created :: UTCTime 71 , created :: UTCTime
74 } 72 }
75 deriving (Typeable, Generic, NFData) 73 deriving (Typeable, Generic, NFData, Eq, Ord)
74
75data QueueItem = Pending Int QueueEntry | Current QueueEntry | History Int QueueEntry (Maybe PrintingError)
76
77instance Eq QueueItem where
78 (Pending i a ) == (Pending j b ) = i == j && a == b
79 (Current a ) == (Current b ) = a == b
80 (History i a _) == (History j b _) = i == j && a == b
81 _ == _ = False
82
83instance Ord QueueItem where
84 (Pending i a ) `compare` (Pending j b ) = compare i j <> compare a b
85 (Current a ) `compare` (Current b ) = compare a b
86 (History i a _) `compare` (History j b _) = compare i j <> compare a b
87 (Pending _ _ ) `compare` _ = LT
88 (Current _ ) `compare` (Pending _ _ ) = GT
89 (Current _ ) `compare` _ = LT
90 (History _ _ _) `compare` _ = GT
91
92newtype PlainQueueItem = Plain { unPlain :: QueueItem }
93
94instance Eq PlainQueueItem where
95 (unPlain -> Pending _ a ) == (unPlain -> Pending _ b ) = a == b
96 (unPlain -> Current a ) == (unPlain -> Current b ) = a == b
97 (unPlain -> History _ a _) == (unPlain -> History _ b _) = a == b
98 _ == _ = False
99
100instance Ord PlainQueueItem where
101 (unPlain -> Pending _ a ) <= (unPlain -> Pending _ b ) = a <= b
102 (unPlain -> Current a ) <= (unPlain -> Current b ) = a <= b
103 (unPlain -> History _ a _) <= (unPlain -> History _ b _) = a <= b
104 (unPlain -> Current _ ) <= (unPlain -> Pending _ _ ) = False
105 (unPlain -> History _ _ _) <= (unPlain -> Pending _ _ ) = False
106 (unPlain -> History _ _ _) <= (unPlain -> Current _ ) = False
107 (unPlain -> Pending _ _ ) <= _ = True
108 (unPlain -> Current _ ) <= _ = True
109
110fromZipper :: Queue -> Set QueueItem
111fromZipper Queue{..} = Set.fromList . toList $ mconcat [ Seq.mapWithIndex Pending pending
112 , maybe Seq.empty (Seq.singleton . Current) current
113 , Seq.mapWithIndex (\i (a, e) -> History i a e) history
114 ]
115
116toZipper :: Set QueueItem -> Queue
117toZipper = Set.foldr' insert def
118 where
119 insert (Pending _ a) q@(Queue{..}) = q { pending = pending |> a }
120 insert (Current a) q = q { current = Just a }
121 insert (History _ a e) q@(Queue{..}) = q { history = history |> (a, e) }
76 122
77-- | A queue manager periodically modifies a 'Queue', e.g. for cleanup of old jobs 123-- | A queue manager periodically modifies a 'Queue', e.g. for cleanup of old jobs
78type QueueManager t = ComposeT (StateT Queue) t STM DiffTime 124type QueueManager t = QueueManagerM t (Extended Micro)
125type QueueManagerM t = ComposeT (StateT Queue) t STM
79 126
80runQM :: ( HasQueue q 127runQM :: ( HasQueue q
81 , MFunctor t 128 , MFunctor t
@@ -84,7 +131,36 @@ runQM :: ( HasQueue q
84 , Monad (t STM) 131 , Monad (t STM)
85 ) => QueueManager t -> q -> t IO () 132 ) => QueueManager t -> q -> t IO ()
86-- ^ Periodically modify a 'Queue' 133-- ^ Periodically modify a 'Queue'
87runQM qm (extractQueue -> q) = forever $ liftIO . threadDelay . toMicro =<< qm' 134runQM qm (extractQueue -> q) = sleep =<< qm'
88 where 135 where
89 qm' = hoist atomically $ (\(a, s) -> lift (writeTVar q s) >> return a) =<< runStateT (getComposeT qm) =<< lift (readTVar q) 136 qm' = hoist atomically $ (\(a, s) -> lift (writeTVar q s) >> return a) =<< runStateT (getComposeT qm) =<< lift (readTVar q)
90 toMicro = (`div` 10^6) . fromEnum 137 sleep (abs -> delay)
138 | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM qm q
139 | otherwise = return ()
140
141intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t
142-- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep
143intersection = foldr' (qmCombine Set.intersection) idQM
144
145idQM :: Monad (QueueManagerM t) => QueueManager t
146-- ^ Identity of 'intersect'
147idQM = return PosInf
148
149union :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t
150-- ^ Combine two 'QueueManager's keeping all 'QueueEntry's either of the managers decides to keep
151union = foldr' (qmCombine Set.union) nullQM
152
153nullQM :: MonadState Queue (QueueManagerM t) => QueueManager t
154-- ^ Identity of 'union'
155nullQM = put def >> return PosInf
156
157qmCombine :: MonadState Queue (QueueManagerM t)
158 => (Set PlainQueueItem -> Set PlainQueueItem -> Set PlainQueueItem)
159 -> (QueueManager t -> QueueManager t -> QueueManager t)
160qmCombine setCombine a b = do
161 (d1, s1) <- local a
162 (d2, s2) <- local b
163 put . toZipper . Set.map unPlain $ on setCombine (Set.map Plain . fromZipper) s1 s2
164 return $ min d1 d2
165 where
166 local x = ((,) <$> get <*> ((,) <$> x <*> get)) >>= (\(oldS, r) -> r <$ put oldS)