diff options
| author | Gregor Kleen <gkleen@yggdrasil.li> | 2016-02-05 17:41:12 +0100 |
|---|---|---|
| committer | Gregor Kleen <gkleen@yggdrasil.li> | 2016-02-05 17:41:12 +0100 |
| commit | 96d4e2f86bd4a568d0cb68b9b716c8a53113f34c (patch) | |
| tree | a2e5eebbe6d31a7bf251c4da96704aebaa6af596 | |
| parent | 04e26a5b118155c5dffb817b523bc8eada952b55 (diff) | |
| download | thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.tar thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.tar.gz thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.tar.bz2 thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.tar.xz thermoprint-96d4e2f86bd4a568d0cb68b9b716c8a53113f34c.zip | |
Combination of QueueManagers
| -rw-r--r-- | server/src/Thermoprint/Server.hs | 2 | ||||
| -rw-r--r-- | server/src/Thermoprint/Server/Queue.hs | 114 | ||||
| -rw-r--r-- | server/thermoprint-server.cabal | 1 | ||||
| -rw-r--r-- | server/thermoprint-server.nix | 8 | ||||
| -rw-r--r-- | shell.nix | 3 |
5 files changed, 103 insertions, 25 deletions
diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index 7767c12..8061b20 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs | |||
| @@ -12,6 +12,7 @@ module Thermoprint.Server | |||
| 12 | , module Data.Default.Class | 12 | , module Data.Default.Class |
| 13 | , module Servant.Server.Internal.Enter | 13 | , module Servant.Server.Internal.Enter |
| 14 | , module Thermoprint.Server.Printer | 14 | , module Thermoprint.Server.Printer |
| 15 | , module Thermoprint.Server.Queue | ||
| 15 | ) where | 16 | ) where |
| 16 | 17 | ||
| 17 | import Data.Default.Class | 18 | import Data.Default.Class |
| @@ -49,6 +50,7 @@ import Thermoprint.API (thermoprintAPI, PrinterId) | |||
| 49 | 50 | ||
| 50 | import Thermoprint.Server.Database | 51 | import Thermoprint.Server.Database |
| 51 | import Thermoprint.Server.Printer | 52 | import Thermoprint.Server.Printer |
| 53 | import Thermoprint.Server.Queue | ||
| 52 | import qualified Thermoprint.Server.API as API (thermoprintServer) | 54 | import qualified Thermoprint.Server.API as API (thermoprintServer) |
| 53 | import Thermoprint.Server.API hiding (thermoprintServer) | 55 | import Thermoprint.Server.API hiding (thermoprintServer) |
| 54 | 56 | ||
diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs index 68dc7a9..69295bb 100644 --- a/server/src/Thermoprint/Server/Queue.hs +++ b/server/src/Thermoprint/Server/Queue.hs | |||
| @@ -1,4 +1,4 @@ | |||
| 1 | {-# LANGUAGE FlexibleInstances #-} | 1 | {-# LANGUAGE FlexibleInstances, FlexibleContexts #-} |
| 2 | {-# LANGUAGE ViewPatterns #-} | 2 | {-# LANGUAGE ViewPatterns #-} |
| 3 | {-# LANGUAGE RecordWildCards #-} | 3 | {-# LANGUAGE RecordWildCards #-} |
| 4 | {-# LANGUAGE DeriveGeneric, DeriveAnyClass #-} | 4 | {-# LANGUAGE DeriveGeneric, DeriveAnyClass #-} |
| @@ -7,7 +7,9 @@ | |||
| 7 | module Thermoprint.Server.Queue | 7 | module Thermoprint.Server.Queue |
| 8 | ( Queue(..), QueueEntry(..) | 8 | ( Queue(..), QueueEntry(..) |
| 9 | , HasQueue(..) | 9 | , HasQueue(..) |
| 10 | , QueueManager(..), runQM | 10 | , QueueManager, QueueManagerM, runQM |
| 11 | , intersection, idQM | ||
| 12 | , union, nullQM | ||
| 11 | ) where | 13 | ) where |
| 12 | 14 | ||
| 13 | import Thermoprint.API (PrintingError(..), Printout) | 15 | import Thermoprint.API (PrintingError(..), Printout) |
| @@ -15,11 +17,14 @@ import qualified Thermoprint.API as API (JobStatus(..)) | |||
| 15 | 17 | ||
| 16 | import Thermoprint.Server.Database | 18 | import Thermoprint.Server.Database |
| 17 | 19 | ||
| 18 | import Data.Sequence (Seq, ViewL(..), viewl) | 20 | import Data.Sequence (Seq, ViewL(..), viewl, (|>), (<|)) |
| 19 | import qualified Data.Sequence as Seq | 21 | import qualified Data.Sequence as Seq |
| 22 | import Data.Set (Set) | ||
| 23 | import qualified Data.Set as Set | ||
| 20 | 24 | ||
| 21 | import Data.Time | 25 | import Data.Time |
| 22 | import Data.Time.Clock | 26 | import Data.ExtendedReal |
| 27 | import Data.Fixed | ||
| 23 | 28 | ||
| 24 | import Control.DeepSeq (NFData) | 29 | import Control.DeepSeq (NFData) |
| 25 | import Data.Typeable (Typeable) | 30 | import Data.Typeable (Typeable) |
| @@ -31,10 +36,14 @@ import Control.Monad.State | |||
| 31 | 36 | ||
| 32 | import Data.Default.Class | 37 | import Data.Default.Class |
| 33 | 38 | ||
| 39 | import Control.Monad | ||
| 34 | import Control.Monad.Morph | 40 | import Control.Monad.Morph |
| 35 | import Control.Monad.Trans.Compose | 41 | import Control.Monad.Trans.Compose |
| 42 | import Data.Foldable | ||
| 43 | import Data.Function | ||
| 36 | 44 | ||
| 37 | import Data.Monoid | 45 | import Data.Monoid |
| 46 | import Data.Ord | ||
| 38 | 47 | ||
| 39 | -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point | 48 | -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point |
| 40 | data Queue = Queue | 49 | data Queue = Queue |
| @@ -44,17 +53,6 @@ data Queue = Queue | |||
| 44 | } | 53 | } |
| 45 | deriving (Typeable, Generic, NFData) | 54 | deriving (Typeable, Generic, NFData) |
| 46 | 55 | ||
| 47 | toSeq :: Queue -> Seq (Bool, QueueEntry, Maybe PrintingError) | ||
| 48 | toSeq Queue{..} = fmap (\x -> (False, x, Nothing)) pending <> maybe Seq.empty (\c -> Seq.singleton (True, c, Nothing)) current <> fmap (\(x, p) -> (False, x, p)) history | ||
| 49 | |||
| 50 | fromSeq :: Seq (Bool, QueueEntry, Maybe PrintingError) -> Queue | ||
| 51 | fromSeq s = Queue pending' current' history' | ||
| 52 | where | ||
| 53 | (fmap (\(_, x, _) -> x) -> pending', pending'') = Seq.breakl (\(c, _, _) -> c) s | ||
| 54 | (current', history') = case viewl pending'' of | ||
| 55 | EmptyL -> (Nothing, Seq.empty) | ||
| 56 | (_, a, _) :< as -> (Just a, fmap (\(_, x, p) -> (x, p)) as) | ||
| 57 | |||
| 58 | class HasQueue a where | 56 | class HasQueue a where |
| 59 | extractQueue :: a -> TVar Queue | 57 | extractQueue :: a -> TVar Queue |
| 60 | 58 | ||
| @@ -72,10 +70,59 @@ data QueueEntry = QueueEntry | |||
| 72 | { jobId :: JobId | 70 | { jobId :: JobId |
| 73 | , created :: UTCTime | 71 | , created :: UTCTime |
| 74 | } | 72 | } |
| 75 | deriving (Typeable, Generic, NFData) | 73 | deriving (Typeable, Generic, NFData, Eq, Ord) |
| 74 | |||
| 75 | data QueueItem = Pending Int QueueEntry | Current QueueEntry | History Int QueueEntry (Maybe PrintingError) | ||
| 76 | |||
| 77 | instance Eq QueueItem where | ||
| 78 | (Pending i a ) == (Pending j b ) = i == j && a == b | ||
| 79 | (Current a ) == (Current b ) = a == b | ||
| 80 | (History i a _) == (History j b _) = i == j && a == b | ||
| 81 | _ == _ = False | ||
| 82 | |||
| 83 | instance Ord QueueItem where | ||
| 84 | (Pending i a ) `compare` (Pending j b ) = compare i j <> compare a b | ||
| 85 | (Current a ) `compare` (Current b ) = compare a b | ||
| 86 | (History i a _) `compare` (History j b _) = compare i j <> compare a b | ||
| 87 | (Pending _ _ ) `compare` _ = LT | ||
| 88 | (Current _ ) `compare` (Pending _ _ ) = GT | ||
| 89 | (Current _ ) `compare` _ = LT | ||
| 90 | (History _ _ _) `compare` _ = GT | ||
| 91 | |||
| 92 | newtype PlainQueueItem = Plain { unPlain :: QueueItem } | ||
| 93 | |||
| 94 | instance Eq PlainQueueItem where | ||
| 95 | (unPlain -> Pending _ a ) == (unPlain -> Pending _ b ) = a == b | ||
| 96 | (unPlain -> Current a ) == (unPlain -> Current b ) = a == b | ||
| 97 | (unPlain -> History _ a _) == (unPlain -> History _ b _) = a == b | ||
| 98 | _ == _ = False | ||
| 99 | |||
| 100 | instance Ord PlainQueueItem where | ||
| 101 | (unPlain -> Pending _ a ) <= (unPlain -> Pending _ b ) = a <= b | ||
| 102 | (unPlain -> Current a ) <= (unPlain -> Current b ) = a <= b | ||
| 103 | (unPlain -> History _ a _) <= (unPlain -> History _ b _) = a <= b | ||
| 104 | (unPlain -> Current _ ) <= (unPlain -> Pending _ _ ) = False | ||
| 105 | (unPlain -> History _ _ _) <= (unPlain -> Pending _ _ ) = False | ||
| 106 | (unPlain -> History _ _ _) <= (unPlain -> Current _ ) = False | ||
| 107 | (unPlain -> Pending _ _ ) <= _ = True | ||
| 108 | (unPlain -> Current _ ) <= _ = True | ||
| 109 | |||
| 110 | fromZipper :: Queue -> Set QueueItem | ||
| 111 | fromZipper Queue{..} = Set.fromList . toList $ mconcat [ Seq.mapWithIndex Pending pending | ||
| 112 | , maybe Seq.empty (Seq.singleton . Current) current | ||
| 113 | , Seq.mapWithIndex (\i (a, e) -> History i a e) history | ||
| 114 | ] | ||
| 115 | |||
| 116 | toZipper :: Set QueueItem -> Queue | ||
| 117 | toZipper = Set.foldr' insert def | ||
| 118 | where | ||
| 119 | insert (Pending _ a) q@(Queue{..}) = q { pending = pending |> a } | ||
| 120 | insert (Current a) q = q { current = Just a } | ||
| 121 | insert (History _ a e) q@(Queue{..}) = q { history = history |> (a, e) } | ||
| 76 | 122 | ||
| 77 | -- | A queue manager periodically modifies a 'Queue', e.g. for cleanup of old jobs | 123 | -- | A queue manager periodically modifies a 'Queue', e.g. for cleanup of old jobs |
| 78 | type QueueManager t = ComposeT (StateT Queue) t STM DiffTime | 124 | type QueueManager t = QueueManagerM t (Extended Micro) |
| 125 | type QueueManagerM t = ComposeT (StateT Queue) t STM | ||
| 79 | 126 | ||
| 80 | runQM :: ( HasQueue q | 127 | runQM :: ( HasQueue q |
| 81 | , MFunctor t | 128 | , MFunctor t |
| @@ -84,7 +131,36 @@ runQM :: ( HasQueue q | |||
| 84 | , Monad (t STM) | 131 | , Monad (t STM) |
| 85 | ) => QueueManager t -> q -> t IO () | 132 | ) => QueueManager t -> q -> t IO () |
| 86 | -- ^ Periodically modify a 'Queue' | 133 | -- ^ Periodically modify a 'Queue' |
| 87 | runQM qm (extractQueue -> q) = forever $ liftIO . threadDelay . toMicro =<< qm' | 134 | runQM qm (extractQueue -> q) = sleep =<< qm' |
| 88 | where | 135 | where |
| 89 | qm' = hoist atomically $ (\(a, s) -> lift (writeTVar q s) >> return a) =<< runStateT (getComposeT qm) =<< lift (readTVar q) | 136 | qm' = hoist atomically $ (\(a, s) -> lift (writeTVar q s) >> return a) =<< runStateT (getComposeT qm) =<< lift (readTVar q) |
| 90 | toMicro = (`div` 10^6) . fromEnum | 137 | sleep (abs -> delay) |
| 138 | | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM qm q | ||
| 139 | | otherwise = return () | ||
| 140 | |||
| 141 | intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t | ||
| 142 | -- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep | ||
| 143 | intersection = foldr' (qmCombine Set.intersection) idQM | ||
| 144 | |||
| 145 | idQM :: Monad (QueueManagerM t) => QueueManager t | ||
| 146 | -- ^ Identity of 'intersect' | ||
| 147 | idQM = return PosInf | ||
| 148 | |||
| 149 | union :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t | ||
| 150 | -- ^ Combine two 'QueueManager's keeping all 'QueueEntry's either of the managers decides to keep | ||
| 151 | union = foldr' (qmCombine Set.union) nullQM | ||
| 152 | |||
| 153 | nullQM :: MonadState Queue (QueueManagerM t) => QueueManager t | ||
| 154 | -- ^ Identity of 'union' | ||
| 155 | nullQM = put def >> return PosInf | ||
| 156 | |||
| 157 | qmCombine :: MonadState Queue (QueueManagerM t) | ||
| 158 | => (Set PlainQueueItem -> Set PlainQueueItem -> Set PlainQueueItem) | ||
| 159 | -> (QueueManager t -> QueueManager t -> QueueManager t) | ||
| 160 | qmCombine setCombine a b = do | ||
| 161 | (d1, s1) <- local a | ||
| 162 | (d2, s2) <- local b | ||
| 163 | put . toZipper . Set.map unPlain $ on setCombine (Set.map Plain . fromZipper) s1 s2 | ||
| 164 | return $ min d1 d2 | ||
| 165 | where | ||
| 166 | local x = ((,) <$> get <*> ((,) <$> x <*> get)) >>= (\(oldS, r) -> r <$ put oldS) | ||
diff --git a/server/thermoprint-server.cabal b/server/thermoprint-server.cabal index 2eda3b5..124fd11 100644 --- a/server/thermoprint-server.cabal +++ b/server/thermoprint-server.cabal | |||
| @@ -48,6 +48,7 @@ library | |||
| 48 | , wai >=3.0.4 && <4 | 48 | , wai >=3.0.4 && <4 |
| 49 | , warp >=3.1.9 && <4 | 49 | , warp >=3.1.9 && <4 |
| 50 | , mmorph >=1.0.5 && <2 | 50 | , mmorph >=1.0.5 && <2 |
| 51 | , extended-reals >=0.2.1 && <1 | ||
| 51 | hs-source-dirs: src | 52 | hs-source-dirs: src |
| 52 | default-language: Haskell2010 | 53 | default-language: Haskell2010 |
| 53 | 54 | ||
diff --git a/server/thermoprint-server.nix b/server/thermoprint-server.nix index 1cbccd2..41f7198 100644 --- a/server/thermoprint-server.nix +++ b/server/thermoprint-server.nix | |||
| @@ -1,6 +1,6 @@ | |||
| 1 | { mkDerivation, base, conduit, containers, data-default-class | 1 | { mkDerivation, base, conduit, containers, data-default-class |
| 2 | , deepseq, dyre, either, exceptions, mmorph, monad-control | 2 | , deepseq, dyre, either, exceptions, extended-reals, mmorph |
| 3 | , monad-logger, mtl, persistent, persistent-sqlite | 3 | , monad-control, monad-logger, mtl, persistent, persistent-sqlite |
| 4 | , persistent-template, resourcet, servant-server, stdenv, stm, text | 4 | , persistent-template, resourcet, servant-server, stdenv, stm, text |
| 5 | , thermoprint-spec, time, transformers, wai, warp | 5 | , thermoprint-spec, time, transformers, wai, warp |
| 6 | }: | 6 | }: |
| @@ -12,8 +12,8 @@ mkDerivation { | |||
| 12 | isExecutable = true; | 12 | isExecutable = true; |
| 13 | libraryHaskellDepends = [ | 13 | libraryHaskellDepends = [ |
| 14 | base conduit containers data-default-class deepseq dyre either | 14 | base conduit containers data-default-class deepseq dyre either |
| 15 | exceptions mmorph monad-control monad-logger mtl persistent | 15 | exceptions extended-reals mmorph monad-control monad-logger mtl |
| 16 | persistent-template resourcet servant-server stm text | 16 | persistent persistent-template resourcet servant-server stm text |
| 17 | thermoprint-spec time transformers wai warp | 17 | thermoprint-spec time transformers wai warp |
| 18 | ]; | 18 | ]; |
| 19 | executableHaskellDepends = [ | 19 | executableHaskellDepends = [ |
| @@ -6,8 +6,7 @@ let | |||
| 6 | packages = ps: with ps; [ | 6 | packages = ps: with ps; [ |
| 7 | cabal-install hlint cabal2nix | 7 | cabal-install hlint cabal2nix |
| 8 | hspec QuickCheck quickcheck-instances | 8 | hspec QuickCheck quickcheck-instances |
| 9 | aeson-pretty | 9 | ] ++ (builtins.attrValues (import ./default.nix { inherit pkgs; })); |
| 10 | ] ++ (builtins.attrValues (import ./default.nix {})); | ||
| 11 | ghc = pkgs.haskell.packages.${compilerName}.ghcWithPackages packages; | 10 | ghc = pkgs.haskell.packages.${compilerName}.ghcWithPackages packages; |
| 12 | in | 11 | in |
| 13 | pkgs.stdenv.mkDerivation rec { | 12 | pkgs.stdenv.mkDerivation rec { |
