From 96d4e2f86bd4a568d0cb68b9b716c8a53113f34c Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Fri, 5 Feb 2016 17:41:12 +0100 Subject: Combination of QueueManagers --- server/src/Thermoprint/Server.hs | 2 + server/src/Thermoprint/Server/Queue.hs | 114 +++++++++++++++++++++++++++------ 2 files changed, 97 insertions(+), 19 deletions(-) (limited to 'server/src') diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index 7767c12..8061b20 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs @@ -12,6 +12,7 @@ module Thermoprint.Server , module Data.Default.Class , module Servant.Server.Internal.Enter , module Thermoprint.Server.Printer + , module Thermoprint.Server.Queue ) where import Data.Default.Class @@ -49,6 +50,7 @@ import Thermoprint.API (thermoprintAPI, PrinterId) import Thermoprint.Server.Database import Thermoprint.Server.Printer +import Thermoprint.Server.Queue import qualified Thermoprint.Server.API as API (thermoprintServer) import Thermoprint.Server.API hiding (thermoprintServer) diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs index 68dc7a9..69295bb 100644 --- a/server/src/Thermoprint/Server/Queue.hs +++ b/server/src/Thermoprint/Server/Queue.hs @@ -1,4 +1,4 @@ -{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE FlexibleInstances, FlexibleContexts #-} {-# LANGUAGE ViewPatterns #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE DeriveGeneric, DeriveAnyClass #-} @@ -7,7 +7,9 @@ module Thermoprint.Server.Queue ( Queue(..), QueueEntry(..) , HasQueue(..) - , QueueManager(..), runQM + , QueueManager, QueueManagerM, runQM + , intersection, idQM + , union, nullQM ) where import Thermoprint.API (PrintingError(..), Printout) @@ -15,11 +17,14 @@ import qualified Thermoprint.API as API (JobStatus(..)) import Thermoprint.Server.Database -import Data.Sequence (Seq, ViewL(..), viewl) +import Data.Sequence (Seq, ViewL(..), viewl, (|>), (<|)) import qualified Data.Sequence as Seq +import Data.Set (Set) +import qualified Data.Set as Set import Data.Time -import Data.Time.Clock +import Data.ExtendedReal +import Data.Fixed import Control.DeepSeq (NFData) import Data.Typeable (Typeable) @@ -31,10 +36,14 @@ import Control.Monad.State import Data.Default.Class +import Control.Monad import Control.Monad.Morph import Control.Monad.Trans.Compose +import Data.Foldable +import Data.Function import Data.Monoid +import Data.Ord -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point data Queue = Queue @@ -44,17 +53,6 @@ data Queue = Queue } deriving (Typeable, Generic, NFData) -toSeq :: Queue -> Seq (Bool, QueueEntry, Maybe PrintingError) -toSeq Queue{..} = fmap (\x -> (False, x, Nothing)) pending <> maybe Seq.empty (\c -> Seq.singleton (True, c, Nothing)) current <> fmap (\(x, p) -> (False, x, p)) history - -fromSeq :: Seq (Bool, QueueEntry, Maybe PrintingError) -> Queue -fromSeq s = Queue pending' current' history' - where - (fmap (\(_, x, _) -> x) -> pending', pending'') = Seq.breakl (\(c, _, _) -> c) s - (current', history') = case viewl pending'' of - EmptyL -> (Nothing, Seq.empty) - (_, a, _) :< as -> (Just a, fmap (\(_, x, p) -> (x, p)) as) - class HasQueue a where extractQueue :: a -> TVar Queue @@ -72,10 +70,59 @@ data QueueEntry = QueueEntry { jobId :: JobId , created :: UTCTime } - deriving (Typeable, Generic, NFData) + deriving (Typeable, Generic, NFData, Eq, Ord) + +data QueueItem = Pending Int QueueEntry | Current QueueEntry | History Int QueueEntry (Maybe PrintingError) + +instance Eq QueueItem where + (Pending i a ) == (Pending j b ) = i == j && a == b + (Current a ) == (Current b ) = a == b + (History i a _) == (History j b _) = i == j && a == b + _ == _ = False + +instance Ord QueueItem where + (Pending i a ) `compare` (Pending j b ) = compare i j <> compare a b + (Current a ) `compare` (Current b ) = compare a b + (History i a _) `compare` (History j b _) = compare i j <> compare a b + (Pending _ _ ) `compare` _ = LT + (Current _ ) `compare` (Pending _ _ ) = GT + (Current _ ) `compare` _ = LT + (History _ _ _) `compare` _ = GT + +newtype PlainQueueItem = Plain { unPlain :: QueueItem } + +instance Eq PlainQueueItem where + (unPlain -> Pending _ a ) == (unPlain -> Pending _ b ) = a == b + (unPlain -> Current a ) == (unPlain -> Current b ) = a == b + (unPlain -> History _ a _) == (unPlain -> History _ b _) = a == b + _ == _ = False + +instance Ord PlainQueueItem where + (unPlain -> Pending _ a ) <= (unPlain -> Pending _ b ) = a <= b + (unPlain -> Current a ) <= (unPlain -> Current b ) = a <= b + (unPlain -> History _ a _) <= (unPlain -> History _ b _) = a <= b + (unPlain -> Current _ ) <= (unPlain -> Pending _ _ ) = False + (unPlain -> History _ _ _) <= (unPlain -> Pending _ _ ) = False + (unPlain -> History _ _ _) <= (unPlain -> Current _ ) = False + (unPlain -> Pending _ _ ) <= _ = True + (unPlain -> Current _ ) <= _ = True + +fromZipper :: Queue -> Set QueueItem +fromZipper Queue{..} = Set.fromList . toList $ mconcat [ Seq.mapWithIndex Pending pending + , maybe Seq.empty (Seq.singleton . Current) current + , Seq.mapWithIndex (\i (a, e) -> History i a e) history + ] + +toZipper :: Set QueueItem -> Queue +toZipper = Set.foldr' insert def + where + insert (Pending _ a) q@(Queue{..}) = q { pending = pending |> a } + insert (Current a) q = q { current = Just a } + insert (History _ a e) q@(Queue{..}) = q { history = history |> (a, e) } -- | A queue manager periodically modifies a 'Queue', e.g. for cleanup of old jobs -type QueueManager t = ComposeT (StateT Queue) t STM DiffTime +type QueueManager t = QueueManagerM t (Extended Micro) +type QueueManagerM t = ComposeT (StateT Queue) t STM runQM :: ( HasQueue q , MFunctor t @@ -84,7 +131,36 @@ runQM :: ( HasQueue q , Monad (t STM) ) => QueueManager t -> q -> t IO () -- ^ Periodically modify a 'Queue' -runQM qm (extractQueue -> q) = forever $ liftIO . threadDelay . toMicro =<< qm' +runQM qm (extractQueue -> q) = sleep =<< qm' where qm' = hoist atomically $ (\(a, s) -> lift (writeTVar q s) >> return a) =<< runStateT (getComposeT qm) =<< lift (readTVar q) - toMicro = (`div` 10^6) . fromEnum + sleep (abs -> delay) + | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM qm q + | otherwise = return () + +intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t +-- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep +intersection = foldr' (qmCombine Set.intersection) idQM + +idQM :: Monad (QueueManagerM t) => QueueManager t +-- ^ Identity of 'intersect' +idQM = return PosInf + +union :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t +-- ^ Combine two 'QueueManager's keeping all 'QueueEntry's either of the managers decides to keep +union = foldr' (qmCombine Set.union) nullQM + +nullQM :: MonadState Queue (QueueManagerM t) => QueueManager t +-- ^ Identity of 'union' +nullQM = put def >> return PosInf + +qmCombine :: MonadState Queue (QueueManagerM t) + => (Set PlainQueueItem -> Set PlainQueueItem -> Set PlainQueueItem) + -> (QueueManager t -> QueueManager t -> QueueManager t) +qmCombine setCombine a b = do + (d1, s1) <- local a + (d2, s2) <- local b + put . toZipper . Set.map unPlain $ on setCombine (Set.map Plain . fromZipper) s1 s2 + return $ min d1 d2 + where + local x = ((,) <$> get <*> ((,) <$> x <*> get)) >>= (\(oldS, r) -> r <$ put oldS) -- cgit v1.2.3 From 8ed465db831a534958c05c2670f618fbcef7af38 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Fri, 5 Feb 2016 17:50:58 +0100 Subject: better docs --- server/src/Thermoprint/Server/Queue.hs | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'server/src') diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs index 69295bb..3c7cce6 100644 --- a/server/src/Thermoprint/Server/Queue.hs +++ b/server/src/Thermoprint/Server/Queue.hs @@ -140,6 +140,8 @@ runQM qm (extractQueue -> q) = sleep =<< qm' intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t -- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep +-- +-- Side effects propagate left to right intersection = foldr' (qmCombine Set.intersection) idQM idQM :: Monad (QueueManagerM t) => QueueManager t @@ -148,6 +150,8 @@ idQM = return PosInf union :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t -- ^ Combine two 'QueueManager's keeping all 'QueueEntry's either of the managers decides to keep +-- +-- Side effects propagate left to right union = foldr' (qmCombine Set.union) nullQM nullQM :: MonadState Queue (QueueManagerM t) => QueueManager t -- cgit v1.2.3 From 04f1276cb0e7f95056ebb1c336b4b1debdd397da Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Fri, 5 Feb 2016 17:51:49 +0100 Subject: strict queue management --- server/src/Thermoprint/Server/Queue.hs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'server/src') diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs index 3c7cce6..a93498e 100644 --- a/server/src/Thermoprint/Server/Queue.hs +++ b/server/src/Thermoprint/Server/Queue.hs @@ -26,7 +26,7 @@ import Data.Time import Data.ExtendedReal import Data.Fixed -import Control.DeepSeq (NFData) +import Control.DeepSeq import Data.Typeable (Typeable) import GHC.Generics (Generic) @@ -133,7 +133,7 @@ runQM :: ( HasQueue q -- ^ Periodically modify a 'Queue' runQM qm (extractQueue -> q) = sleep =<< qm' where - qm' = hoist atomically $ (\(a, s) -> lift (writeTVar q s) >> return a) =<< runStateT (getComposeT qm) =<< lift (readTVar q) + qm' = hoist atomically $ (\(a, s) -> lift (writeTVar q $!! s) >> return a) =<< runStateT (getComposeT qm) =<< lift (readTVar q) sleep (abs -> delay) | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM qm q | otherwise = return () -- cgit v1.2.3 From 7cf5bfd22b4eaa922b58d777776395173dbc05a3 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Thu, 11 Feb 2016 20:14:07 +0100 Subject: Export queue morphisms for testing --- server/src/Thermoprint/Server/Queue.hs | 1 + 1 file changed, 1 insertion(+) (limited to 'server/src') diff --git a/server/src/Thermoprint/Server/Queue.hs b/server/src/Thermoprint/Server/Queue.hs index a93498e..a504292 100644 --- a/server/src/Thermoprint/Server/Queue.hs +++ b/server/src/Thermoprint/Server/Queue.hs @@ -6,6 +6,7 @@ module Thermoprint.Server.Queue ( Queue(..), QueueEntry(..) + , fromZipper, toZipper, QueueItem(..) , HasQueue(..) , QueueManager, QueueManagerM, runQM , intersection, idQM -- cgit v1.2.3