diff options
author | Gregor Kleen <gkleen@yggdrasil.li> | 2016-02-05 17:41:12 +0100 |
---|---|---|
committer | Gregor Kleen <gkleen@yggdrasil.li> | 2016-02-05 17:41:12 +0100 |
commit | 96d4e2f86bd4a568d0cb68b9b716c8a53113f34c (patch) | |
tree | a2e5eebbe6d31a7bf251c4da96704aebaa6af596 | |
parent | 04e26a5b118155c5dffb817b523bc8eada952b55 (diff) | |
download | thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.tar thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.tar.gz thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.tar.bz2 thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.tar.xz thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.zip |
Combination of QueueManagers
-rw-r--r-- | server/src/Thermoprint/Server.hs | 2 | ||||
-rw-r--r-- | server/src/Thermoprint/Server/Queue.hs | 114 | ||||
-rw-r--r-- | server/thermoprint-server.cabal | 1 | ||||
-rw-r--r-- | server/thermoprint-server.nix | 8 | ||||
-rw-r--r-- | shell.nix | 3 |
5 files changed, 103 insertions, 25 deletions
diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index 7767c12..8061b20 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs | |||
@@ -12,6 +12,7 @@ module Thermoprint.Server | |||
12 | , module Data.Default.Class | 12 | , module Data.Default.Class |
13 | , module Servant.Server.Internal.Enter | 13 | , module Servant.Server.Internal.Enter |
14 | , module Thermoprint.Server.Printer | 14 | , module Thermoprint.Server.Printer |
15 | , module Thermoprint.Server.Queue | ||
15 | ) where | 16 | ) where |
16 | 17 | ||
17 | import Data.Default.Class | 18 | import Data.Default.Class |
@@ -49,6 +50,7 @@ import Thermoprint.API (thermoprintAPI, PrinterId) | |||
49 | 50 | ||
50 | import Thermoprint.Server.Database | 51 | import Thermoprint.Server.Database |
51 | import Thermoprint.Server.Printer | 52 | import Thermoprint.Server.Printer |
53 | import Thermoprint.Server.Queue | ||
52 | import qualified Thermoprint.Server.API as API (thermoprintServer) | 54 | import qualified Thermoprint.Server.API as API (thermoprintServer) |
53 | import Thermoprint.Server.API hiding (thermoprintServer) | 55 | import Thermoprint.Server.API hiding (thermoprintServer) |
54 | 56 | ||
diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs index 68dc7a9..69295bb 100644 --- a/server/src/Thermoprint/Server/Queue.hs +++ b/server/src/Thermoprint/Server/Queue.hs | |||
@@ -1,4 +1,4 @@ | |||
1 | {-# LANGUAGE FlexibleInstances #-} | 1 | {-# LANGUAGE FlexibleInstances, FlexibleContexts #-} |
2 | {-# LANGUAGE ViewPatterns #-} | 2 | {-# LANGUAGE ViewPatterns #-} |
3 | {-# LANGUAGE RecordWildCards #-} | 3 | {-# LANGUAGE RecordWildCards #-} |
4 | {-# LANGUAGE DeriveGeneric, DeriveAnyClass #-} | 4 | {-# LANGUAGE DeriveGeneric, DeriveAnyClass #-} |
@@ -7,7 +7,9 @@ | |||
7 | module Thermoprint.Server.Queue | 7 | module Thermoprint.Server.Queue |
8 | ( Queue(..), QueueEntry(..) | 8 | ( Queue(..), QueueEntry(..) |
9 | , HasQueue(..) | 9 | , HasQueue(..) |
10 | , QueueManager(..), runQM | 10 | , QueueManager, QueueManagerM, runQM |
11 | , intersection, idQM | ||
12 | , union, nullQM | ||
11 | ) where | 13 | ) where |
12 | 14 | ||
13 | import Thermoprint.API (PrintingError(..), Printout) | 15 | import Thermoprint.API (PrintingError(..), Printout) |
@@ -15,11 +17,14 @@ import qualified Thermoprint.API as API (JobStatus(..)) | |||
15 | 17 | ||
16 | import Thermoprint.Server.Database | 18 | import Thermoprint.Server.Database |
17 | 19 | ||
18 | import Data.Sequence (Seq, ViewL(..), viewl) | 20 | import Data.Sequence (Seq, ViewL(..), viewl, (|>), (<|)) |
19 | import qualified Data.Sequence as Seq | 21 | import qualified Data.Sequence as Seq |
22 | import Data.Set (Set) | ||
23 | import qualified Data.Set as Set | ||
20 | 24 | ||
21 | import Data.Time | 25 | import Data.Time |
22 | import Data.Time.Clock | 26 | import Data.ExtendedReal |
27 | import Data.Fixed | ||
23 | 28 | ||
24 | import Control.DeepSeq (NFData) | 29 | import Control.DeepSeq (NFData) |
25 | import Data.Typeable (Typeable) | 30 | import Data.Typeable (Typeable) |
@@ -31,10 +36,14 @@ import Control.Monad.State | |||
31 | 36 | ||
32 | import Data.Default.Class | 37 | import Data.Default.Class |
33 | 38 | ||
39 | import Control.Monad | ||
34 | import Control.Monad.Morph | 40 | import Control.Monad.Morph |
35 | import Control.Monad.Trans.Compose | 41 | import Control.Monad.Trans.Compose |
42 | import Data.Foldable | ||
43 | import Data.Function | ||
36 | 44 | ||
37 | import Data.Monoid | 45 | import Data.Monoid |
46 | import Data.Ord | ||
38 | 47 | ||
39 | -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point | 48 | -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point |
40 | data Queue = Queue | 49 | data Queue = Queue |
@@ -44,17 +53,6 @@ data Queue = Queue | |||
44 | } | 53 | } |
45 | deriving (Typeable, Generic, NFData) | 54 | deriving (Typeable, Generic, NFData) |
46 | 55 | ||
47 | toSeq :: Queue -> Seq (Bool, QueueEntry, Maybe PrintingError) | ||
48 | toSeq Queue{..} = fmap (\x -> (False, x, Nothing)) pending <> maybe Seq.empty (\c -> Seq.singleton (True, c, Nothing)) current <> fmap (\(x, p) -> (False, x, p)) history | ||
49 | |||
50 | fromSeq :: Seq (Bool, QueueEntry, Maybe PrintingError) -> Queue | ||
51 | fromSeq s = Queue pending' current' history' | ||
52 | where | ||
53 | (fmap (\(_, x, _) -> x) -> pending', pending'') = Seq.breakl (\(c, _, _) -> c) s | ||
54 | (current', history') = case viewl pending'' of | ||
55 | EmptyL -> (Nothing, Seq.empty) | ||
56 | (_, a, _) :< as -> (Just a, fmap (\(_, x, p) -> (x, p)) as) | ||
57 | |||
58 | class HasQueue a where | 56 | class HasQueue a where |
59 | extractQueue :: a -> TVar Queue | 57 | extractQueue :: a -> TVar Queue |
60 | 58 | ||
@@ -72,10 +70,59 @@ data QueueEntry = QueueEntry | |||
72 | { jobId :: JobId | 70 | { jobId :: JobId |
73 | , created :: UTCTime | 71 | , created :: UTCTime |
74 | } | 72 | } |
75 | deriving (Typeable, Generic, NFData) | 73 | deriving (Typeable, Generic, NFData, Eq, Ord) |
74 | |||
75 | data QueueItem = Pending Int QueueEntry | Current QueueEntry | History Int QueueEntry (Maybe PrintingError) | ||
76 | |||
77 | instance Eq QueueItem where | ||
78 | (Pending i a ) == (Pending j b ) = i == j && a == b | ||
79 | (Current a ) == (Current b ) = a == b | ||
80 | (History i a _) == (History j b _) = i == j && a == b | ||
81 | _ == _ = False | ||
82 | |||
83 | instance Ord QueueItem where | ||
84 | (Pending i a ) `compare` (Pending j b ) = compare i j <> compare a b | ||
85 | (Current a ) `compare` (Current b ) = compare a b | ||
86 | (History i a _) `compare` (History j b _) = compare i j <> compare a b | ||
87 | (Pending _ _ ) `compare` _ = LT | ||
88 | (Current _ ) `compare` (Pending _ _ ) = GT | ||
89 | (Current _ ) `compare` _ = LT | ||
90 | (History _ _ _) `compare` _ = GT | ||
91 | |||
92 | newtype PlainQueueItem = Plain { unPlain :: QueueItem } | ||
93 | |||
94 | instance Eq PlainQueueItem where | ||
95 | (unPlain -> Pending _ a ) == (unPlain -> Pending _ b ) = a == b | ||
96 | (unPlain -> Current a ) == (unPlain -> Current b ) = a == b | ||
97 | (unPlain -> History _ a _) == (unPlain -> History _ b _) = a == b | ||
98 | _ == _ = False | ||
99 | |||
100 | instance Ord PlainQueueItem where | ||
101 | (unPlain -> Pending _ a ) <= (unPlain -> Pending _ b ) = a <= b | ||
102 | (unPlain -> Current a ) <= (unPlain -> Current b ) = a <= b | ||
103 | (unPlain -> History _ a _) <= (unPlain -> History _ b _) = a <= b | ||
104 | (unPlain -> Current _ ) <= (unPlain -> Pending _ _ ) = False | ||
105 | (unPlain -> History _ _ _) <= (unPlain -> Pending _ _ ) = False | ||
106 | (unPlain -> History _ _ _) <= (unPlain -> Current _ ) = False | ||
107 | (unPlain -> Pending _ _ ) <= _ = True | ||
108 | (unPlain -> Current _ ) <= _ = True | ||
109 | |||
110 | fromZipper :: Queue -> Set QueueItem | ||
111 | fromZipper Queue{..} = Set.fromList . toList $ mconcat [ Seq.mapWithIndex Pending pending | ||
112 | , maybe Seq.empty (Seq.singleton . Current) current | ||
113 | , Seq.mapWithIndex (\i (a, e) -> History i a e) history | ||
114 | ] | ||
115 | |||
116 | toZipper :: Set QueueItem -> Queue | ||
117 | toZipper = Set.foldr' insert def | ||
118 | where | ||
119 | insert (Pending _ a) q@(Queue{..}) = q { pending = pending |> a } | ||
120 | insert (Current a) q = q { current = Just a } | ||
121 | insert (History _ a e) q@(Queue{..}) = q { history = history |> (a, e) } | ||
76 | 122 | ||
77 | -- | A queue manager periodically modifies a 'Queue', e.g. for cleanup of old jobs | 123 | -- | A queue manager periodically modifies a 'Queue', e.g. for cleanup of old jobs |
78 | type QueueManager t = ComposeT (StateT Queue) t STM DiffTime | 124 | type QueueManager t = QueueManagerM t (Extended Micro) |
125 | type QueueManagerM t = ComposeT (StateT Queue) t STM | ||
79 | 126 | ||
80 | runQM :: ( HasQueue q | 127 | runQM :: ( HasQueue q |
81 | , MFunctor t | 128 | , MFunctor t |
@@ -84,7 +131,36 @@ runQM :: ( HasQueue q | |||
84 | , Monad (t STM) | 131 | , Monad (t STM) |
85 | ) => QueueManager t -> q -> t IO () | 132 | ) => QueueManager t -> q -> t IO () |
86 | -- ^ Periodically modify a 'Queue' | 133 | -- ^ Periodically modify a 'Queue' |
87 | runQM qm (extractQueue -> q) = forever $ liftIO . threadDelay . toMicro =<< qm' | 134 | runQM qm (extractQueue -> q) = sleep =<< qm' |
88 | where | 135 | where |
89 | qm' = hoist atomically $ (\(a, s) -> lift (writeTVar q s) >> return a) =<< runStateT (getComposeT qm) =<< lift (readTVar q) | 136 | qm' = hoist atomically $ (\(a, s) -> lift (writeTVar q s) >> return a) =<< runStateT (getComposeT qm) =<< lift (readTVar q) |
90 | toMicro = (`div` 10^6) . fromEnum | 137 | sleep (abs -> delay) |
138 | | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM qm q | ||
139 | | otherwise = return () | ||
140 | |||
141 | intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t | ||
142 | -- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep | ||
143 | intersection = foldr' (qmCombine Set.intersection) idQM | ||
144 | |||
145 | idQM :: Monad (QueueManagerM t) => QueueManager t | ||
146 | -- ^ Identity of 'intersect' | ||
147 | idQM = return PosInf | ||
148 | |||
149 | union :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t | ||
150 | -- ^ Combine two 'QueueManager's keeping all 'QueueEntry's either of the managers decides to keep | ||
151 | union = foldr' (qmCombine Set.union) nullQM | ||
152 | |||
153 | nullQM :: MonadState Queue (QueueManagerM t) => QueueManager t | ||
154 | -- ^ Identity of 'union' | ||
155 | nullQM = put def >> return PosInf | ||
156 | |||
157 | qmCombine :: MonadState Queue (QueueManagerM t) | ||
158 | => (Set PlainQueueItem -> Set PlainQueueItem -> Set PlainQueueItem) | ||
159 | -> (QueueManager t -> QueueManager t -> QueueManager t) | ||
160 | qmCombine setCombine a b = do | ||
161 | (d1, s1) <- local a | ||
162 | (d2, s2) <- local b | ||
163 | put . toZipper . Set.map unPlain $ on setCombine (Set.map Plain . fromZipper) s1 s2 | ||
164 | return $ min d1 d2 | ||
165 | where | ||
166 | local x = ((,) <$> get <*> ((,) <$> x <*> get)) >>= (\(oldS, r) -> r <$ put oldS) | ||
diff --git a/server/thermoprint-server.cabal b/server/thermoprint-server.cabal index 2eda3b5..124fd11 100644 --- a/server/thermoprint-server.cabal +++ b/server/thermoprint-server.cabal | |||
@@ -48,6 +48,7 @@ library | |||
48 | , wai >=3.0.4 && <4 | 48 | , wai >=3.0.4 && <4 |
49 | , warp >=3.1.9 && <4 | 49 | , warp >=3.1.9 && <4 |
50 | , mmorph >=1.0.5 && <2 | 50 | , mmorph >=1.0.5 && <2 |
51 | , extended-reals >=0.2.1 && <1 | ||
51 | hs-source-dirs: src | 52 | hs-source-dirs: src |
52 | default-language: Haskell2010 | 53 | default-language: Haskell2010 |
53 | 54 | ||
diff --git a/server/thermoprint-server.nix b/server/thermoprint-server.nix index 1cbccd2..41f7198 100644 --- a/server/thermoprint-server.nix +++ b/server/thermoprint-server.nix | |||
@@ -1,6 +1,6 @@ | |||
1 | { mkDerivation, base, conduit, containers, data-default-class | 1 | { mkDerivation, base, conduit, containers, data-default-class |
2 | , deepseq, dyre, either, exceptions, mmorph, monad-control | 2 | , deepseq, dyre, either, exceptions, extended-reals, mmorph |
3 | , monad-logger, mtl, persistent, persistent-sqlite | 3 | , monad-control, monad-logger, mtl, persistent, persistent-sqlite |
4 | , persistent-template, resourcet, servant-server, stdenv, stm, text | 4 | , persistent-template, resourcet, servant-server, stdenv, stm, text |
5 | , thermoprint-spec, time, transformers, wai, warp | 5 | , thermoprint-spec, time, transformers, wai, warp |
6 | }: | 6 | }: |
@@ -12,8 +12,8 @@ mkDerivation { | |||
12 | isExecutable = true; | 12 | isExecutable = true; |
13 | libraryHaskellDepends = [ | 13 | libraryHaskellDepends = [ |
14 | base conduit containers data-default-class deepseq dyre either | 14 | base conduit containers data-default-class deepseq dyre either |
15 | exceptions mmorph monad-control monad-logger mtl persistent | 15 | exceptions extended-reals mmorph monad-control monad-logger mtl |
16 | persistent-template resourcet servant-server stm text | 16 | persistent persistent-template resourcet servant-server stm text |
17 | thermoprint-spec time transformers wai warp | 17 | thermoprint-spec time transformers wai warp |
18 | ]; | 18 | ]; |
19 | executableHaskellDepends = [ | 19 | executableHaskellDepends = [ |
@@ -6,8 +6,7 @@ let | |||
6 | packages = ps: with ps; [ | 6 | packages = ps: with ps; [ |
7 | cabal-install hlint cabal2nix | 7 | cabal-install hlint cabal2nix |
8 | hspec QuickCheck quickcheck-instances | 8 | hspec QuickCheck quickcheck-instances |
9 | aeson-pretty | 9 | ] ++ (builtins.attrValues (import ./default.nix { inherit pkgs; })); |
10 | ] ++ (builtins.attrValues (import ./default.nix {})); | ||
11 | ghc = pkgs.haskell.packages.${compilerName}.ghcWithPackages packages; | 10 | ghc = pkgs.haskell.packages.${compilerName}.ghcWithPackages packages; |
12 | in | 11 | in |
13 | pkgs.stdenv.mkDerivation rec { | 12 | pkgs.stdenv.mkDerivation rec { |