{-# LANGUAGE FlexibleInstances, FlexibleContexts, MultiParamTypeClasses #-} {-# LANGUAGE ViewPatterns #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE DeriveGeneric, DeriveAnyClass #-} {-# LANGUAGE ExistentialQuantification #-} module Thermoprint.Server.Queue ( Queue(..), QueueEntry(..) , fromZipper, toZipper, QueueItem(..) , HasQueue(..) , QueueManager, QueueManagerM, runQM , jobGC , intersection, idQM , union, nullQM ) where import Thermoprint.API (PrintingError(..), Printout) import qualified Thermoprint.API as API (JobStatus(..)) import Thermoprint.Server.Database import Data.Sequence (Seq, ViewL(..), viewl, (|>), (<|)) import qualified Data.Sequence as Seq import Data.Set (Set) import qualified Data.Set as Set import Data.Time import Data.ExtendedReal import Data.Fixed import Control.DeepSeq import Data.Typeable (Typeable) import GHC.Generics (Generic) import Control.Concurrent import Control.Concurrent.STM import Control.Monad.State import Data.Default.Class import Control.Monad import Control.Monad.Morph import Control.Monad.Trans.Compose import Control.Monad.Trans.Resource import Control.Monad.Reader import Data.Function import Data.Foldable import Data.Monoid import Data.Ord import Database.Persist (delete) import Database.Persist.Sql (ConnectionPool, runSqlPool) import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..)) import Test.QuickCheck.Gen (Gen, scale) import Test.QuickCheck.Instances import Test.QuickCheck.Modifiers (NonNegative(..)) -- | Zipper for 'Seq QueueEntry' with additional support for 'PrintingError' in the section after point data Queue = Queue { pending :: Seq QueueEntry -- ^ Pending jobs, closest last , current :: Maybe QueueEntry , history :: Seq (QueueEntry, Maybe PrintingError) -- ^ Completed jobs, closest first } deriving (Typeable, Generic, NFData, Show) instance Arbitrary Queue where arbitrary = Queue <$> scale (`div` 2) arbitrary <*> arbitrary <*> scale (`div` 2) arbitrary instance CoArbitrary Queue class HasQueue a where extractQueue :: a -> TVar Queue instance HasQueue (TVar Queue) where extractQueue = id instance Default Queue where def = Queue { pending = Seq.empty , current = Nothing , history = Seq.empty } data QueueEntry = QueueEntry { jobId :: JobId , created :: UTCTime } deriving (Typeable, Generic, NFData, Eq, Ord, Show) instance Arbitrary QueueEntry where arbitrary = QueueEntry <$> (fromIntegral . getNonNegative <$> (arbitrary :: Gen (NonNegative Integer))) <*> arbitrary instance CoArbitrary QueueEntry where coarbitrary QueueEntry{..} = coarbitrary created . coarbitrary (fromIntegral jobId :: Integer) data QueueItem = Pending { pos :: Int, entry :: QueueEntry } | Current { entry :: QueueEntry } | History { pos :: Int, entry :: QueueEntry, err :: Maybe PrintingError } instance Eq QueueItem where (Pending i a ) == (Pending j b ) = i == j && a == b (Current a ) == (Current b ) = a == b (History i a _) == (History j b _) = i == j && a == b _ == _ = False instance Ord QueueItem where (Pending i a ) `compare` (Pending j b ) = compare i j <> compare a b (Current a ) `compare` (Current b ) = compare a b (History i a _) `compare` (History j b _) = compare i j <> compare a b (Pending _ _ ) `compare` _ = LT (Current _ ) `compare` (Pending _ _ ) = GT (Current _ ) `compare` _ = LT (History _ _ _) `compare` _ = GT newtype PlainQueueItem = Plain { unPlain :: QueueItem } instance Eq PlainQueueItem where (unPlain -> Pending _ a ) == (unPlain -> Pending _ b ) = a == b (unPlain -> Current a ) == (unPlain -> Current b ) = a == b (unPlain -> History _ a _) == (unPlain -> History _ b _) = a == b _ == _ = False instance Ord PlainQueueItem where (unPlain -> Pending _ a ) <= (unPlain -> Pending _ b ) = a <= b (unPlain -> Current a ) <= (unPlain -> Current b ) = a <= b (unPlain -> History _ a _) <= (unPlain -> History _ b _) = a <= b (unPlain -> Current _ ) <= (unPlain -> Pending _ _ ) = False (unPlain -> History _ _ _) <= (unPlain -> Pending _ _ ) = False (unPlain -> History _ _ _) <= (unPlain -> Current _ ) = False (unPlain -> Pending _ _ ) <= _ = True (unPlain -> Current _ ) <= _ = True fromZipper :: Queue -> Set QueueItem fromZipper Queue{..} = foldr Set.insert Set.empty $ mconcat [ Seq.mapWithIndex Pending pending , maybe Seq.empty (Seq.singleton . Current) current , Seq.mapWithIndex (\i (a, e) -> History i a e) history ] toZipper :: Set QueueItem -> Queue toZipper = Set.foldl' (flip insert) def where insert (Pending _ a) q@(Queue{..}) = q { pending = pending |> a } insert (Current a) q = q { current = Just a } insert (History _ a e) q@(Queue{..}) = q { history = history |> (a, e) } -- | A queue manager periodically modifies a 'Queue', e.g. for cleanup of old jobs type QueueManager t = QueueManagerM t (Extended Micro) type QueueManagerM t = ComposeT (StateT Queue) t STM runQM :: ( HasQueue q , MFunctor t , MonadTrans t , MonadIO (t IO) , Monad (t STM) ) => TChan JobId -- ^ Channel for deleted 'JobId's ready to be garbage collected -> QueueManager t -> q -> t IO () -- ^ Periodically modify a 'Queue' -- -- We use 'fromZipper' to determine which 'QueueEntry's were deleted by the manager and write -- those to the garbage collectors channel. -- Since there is no crosstalk between managers this works only if the relatien between -- 'JobId's and the 'QueueManager' responsible for them is a single-valued function runQM gcChan qm (extractQueue -> q) = sleep =<< qm' where qm' = hoist atomically $ do before <- lift $ readTVar q (delay, after) <- runStateT (getComposeT qm) before lift $ writeTVar q $!! after let deleted = Set.map (jobId . entry . unPlain) $ (Set.difference `on` (Set.map Plain . fromZipper)) before after mapM_ (lift . writeTChan gcChan) deleted return delay sleep (abs -> delay) | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM gcChan qm q | otherwise = return () jobGC :: ( MonadReader ConnectionPool m , MonadBaseControl IO m , MonadIO m ) => TChan JobId -> m () -- ^ Listen for 'JobId's on a 'TChan' and delete them from the database 'forever' jobGC gcChan = forever $ liftIO (atomically $ readTChan gcChan) >>= (\jId -> runSqlPool (delete jId) =<< ask) intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t -- ^ Combine two 'QueueManager's keeping only 'QueueEntry's both managers decide to keep -- -- Side effects propagate left to right intersection = foldr' (qmCombine Set.intersection) idQM idQM :: Monad (QueueManagerM t) => QueueManager t -- ^ Identity of 'intersect' idQM = return PosInf union :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t -- ^ Combine two 'QueueManager's keeping all 'QueueEntry's either of the managers decides to keep -- -- Side effects propagate left to right union = foldr' (qmCombine Set.union) nullQM nullQM :: MonadState Queue (QueueManagerM t) => QueueManager t -- ^ Identity of 'union' nullQM = put def >> return PosInf qmCombine :: MonadState Queue (QueueManagerM t) => (Set PlainQueueItem -> Set PlainQueueItem -> Set PlainQueueItem) -> (QueueManager t -> QueueManager t -> QueueManager t) qmCombine setCombine a b = do (d1, s1) <- local a (d2, s2) <- local b put . toZipper . Set.map unPlain $ on setCombine (Set.map Plain . fromZipper) s1 s2 return $ min d1 d2 where local x = ((,) <$> get <*> ((,) <$> x <*> get)) >>= (\(oldS, r) -> r <$ put oldS)