From 44a6279b86deecc865f05d2ee519f64f39ac1ccb Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Sun, 31 Jan 2016 15:03:57 +0000 Subject: Recording job creation time in printer queues --- server/src/Thermoprint/Server/API.hs | 41 +++++++++++-------- server/src/Thermoprint/Server/Printer.hs | 67 +++++++++++++++++++------------- server/thermoprint-server.cabal | 31 ++++++++------- server/thermoprint-server.nix | 4 +- spec/src/Thermoprint/API.hs | 45 ++++++++++++++------- spec/thermoprint-spec.cabal | 1 + spec/thermoprint-spec.nix | 4 +- 7 files changed, 117 insertions(+), 76 deletions(-) diff --git a/server/src/Thermoprint/Server/API.hs b/server/src/Thermoprint/Server/API.hs index 4d036ce..add771a 100644 --- a/server/src/Thermoprint/Server/API.hs +++ b/server/src/Thermoprint/Server/API.hs @@ -2,7 +2,6 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE TupleSections #-} module Thermoprint.Server.API ( ProtoHandler, Handler @@ -59,6 +58,8 @@ import Data.Acquire (with) import Control.Monad.Catch (handle, catch) +import Data.Time + type ProtoHandler = ReaderT HandlerInput (LoggingT (ResourceT IO)) type Handler = EitherT ServantErr ProtoHandler @@ -116,18 +117,18 @@ queue' :: MonadIO m => Printer -> m Queue -- ^ Call 'queue' and handle concurrency queue' = fmap force . liftIO . readTVarIO . queue -extractJobs :: (PrinterId, Queue) -> Seq (API.JobId, JobStatus) +extractJobs :: (PrinterId, Queue) -> Seq (API.JobId, UTCTime, JobStatus) -- ^ Get an API-compatible list of all jobs from a 'Printer' 'Queue' -extractJobs (pId, Queue pending current history) = mconcat [ fmap ((, Queued pId) . castId) pending - , maybe Seq.empty Seq.singleton $ fmap ((, Printing pId) . castId) current - , fmap (bimap castId $ maybe Done Failed) history +extractJobs (pId, Queue pending current history) = mconcat [ fmap (\e -> (castId $ jobId e, created e, Queued pId)) pending + , maybe Seq.empty Seq.singleton $ fmap (\e -> (castId $ jobId e, created e, Printing pId)) current + , fmap (\(e, s) -> (castId $ jobId e, created e, maybe Done Failed $ s)) history ] listPrinters :: Handler (Map PrinterId PrinterStatus) listPrinters = fmap toStatus <$> (mapM (liftIO . readTVarIO . queue) . printers =<< ask) where toStatus (Queue _ Nothing _) = Available - toStatus (Queue _ (Just id) _) = Busy . castId $ fromSqlKey id + toStatus (Queue _ (Just id) _) = Busy . castId $ jobId id queueJob :: Maybe PrinterId -> Printout -> Handler API.JobId queueJob pId printout = lift . fmap castId . withReaderT sqlPool . addToQueue printout . snd =<< lookupPrinter pId @@ -136,31 +137,39 @@ printerStatus :: PrinterId -> Handler PrinterStatus printerStatus = fmap queueToStatus . queue' . snd <=< lookupPrinter . Just where queueToStatus (Queue _ Nothing _) = Available - queueToStatus (Queue _ (Just id) _) = Busy $ castId id - -listJobs :: Maybe PrinterId -> Maybe API.JobId -> Maybe API.JobId -> Handler (Seq (API.JobId, JobStatus)) -listJobs Nothing minId maxId = fmap mconcat . mapM (\pId -> listJobs (Just pId) minId maxId) =<< asks (Map.keys . printers) -listJobs pId minId maxId = fmap (filterJobs . extractJobs) . (\(a, b) -> (,) a <$> queue' b) =<< lookupPrinter pId + queueToStatus (Queue _ (Just c) _) = Busy . castId $ jobId c + +listJobs :: Maybe PrinterId + -> Maybe API.JobId -> Maybe API.JobId + -> Maybe UTCTime -> Maybe UTCTime + -> Handler (Seq (API.JobId, UTCTime, JobStatus)) +listJobs Nothing minId maxId minTime maxTime = fmap mconcat . mapM (\pId -> listJobs (Just pId) minId maxId minTime maxTime) =<< asks (Map.keys . printers) +listJobs pId minId maxId minTime maxTime = fmap (filterJobs . extractJobs) . (\(a, b) -> (,) a <$> queue' b) =<< lookupPrinter pId where - filterJobs = Seq.filter (\(id, _) -> maybe True (< id) minId && maybe True (> id) maxId) + filterJobs = Seq.filter (\(id, time, _) -> and [ maybe True (<= id) minId + , maybe True (>= id) maxId + , maybe True (<= time) minTime + , maybe True (>= time) maxTime + ] + ) getJob :: API.JobId -> Handler Printout getJob jobId = fmap jobContent . maybe (left err404) return =<< runSqlPool (get $ castId jobId) =<< asks sqlPool jobStatus :: API.JobId -> Handler JobStatus -jobStatus jobId = maybe (left err404) return . lookup jobId . toList =<< listJobs Nothing Nothing Nothing +jobStatus jobId = maybe (left err404) return . lookup jobId . map (\(id, _, st) -> (id, st)) . toList =<< listJobs Nothing Nothing Nothing Nothing Nothing abortJob :: API.JobId -> Handler () -abortJob jobId = do +abortJob needle = do printerIds <- asks (Map.keys . printers) found <- fmap or . forM printerIds $ \pId -> do (pId', p) <- lookupPrinter $ Just pId found <- liftIO . atomically $ do current@(Queue pending _ _) <- readTVar $ queue p - let filtered = Seq.filter (/= castId jobId) pending + let filtered = Seq.filter ((/= castId needle) . jobId) pending writeTVar (queue p) $ current { pending = filtered } return . not $ ((==) `on` length) pending filtered - when found . $(logInfo) $ "Removed job #" <> (T.pack $ show (castId jobId :: Integer)) <> " from " <> (T.pack . show $ pId') + when found . $(logInfo) $ "Removed job #" <> (T.pack $ show (castId needle :: Integer)) <> " from " <> (T.pack . show $ pId') return found when (not found) $ left err404 diff --git a/server/src/Thermoprint/Server/Printer.hs b/server/src/Thermoprint/Server/Printer.hs index 67180c4..7f41430 100644 --- a/server/src/Thermoprint/Server/Printer.hs +++ b/server/src/Thermoprint/Server/Printer.hs @@ -12,38 +12,41 @@ module Thermoprint.Server.Printer ( PrinterMethod(..), Printer(..) , printer , Queue(..) + , QueueEntry(..) , runPrinter , addToQueue ) where -import Thermoprint.API (PrintingError(..), Printout) +import Thermoprint.API (PrintingError(..), Printout) import qualified Thermoprint.API as API (JobStatus(..)) -import Thermoprint.Server.Database +import Thermoprint.Server.Database -import Database.Persist -import Database.Persist.Sql +import Database.Persist +import Database.Persist.Sql -import Data.Sequence (Seq, ViewL(..), viewl, (<|)) +import Data.Sequence (Seq, ViewL(..), viewl, (<|), (|>)) import qualified Data.Sequence as Seq -import Data.Map (Map) +import Data.Map (Map) import qualified Data.Map as Map import qualified Data.Text as T (pack) -import Data.Typeable (Typeable) -import GHC.Generics (Generic) -import Control.DeepSeq -import Data.Default.Class +import Data.Typeable (Typeable) +import GHC.Generics (Generic) +import Control.DeepSeq +import Data.Default.Class -import Control.Monad.Trans.Resource -import Control.Monad.IO.Class -import Control.Monad.Logger -import Control.Monad.Reader +import Control.Monad.Trans.Resource +import Control.Monad.IO.Class +import Control.Monad.Logger +import Control.Monad.Reader -import Control.Monad (forever) +import Control.Monad (forever) -import Control.Concurrent.STM +import Control.Concurrent.STM + +import Data.Time.Clock newtype PrinterMethod = PM { unPM :: forall m. (MonadResource m, MonadLogger m) => Printout -> m (Maybe PrintingError) } @@ -52,11 +55,11 @@ data Printer = Printer , queue :: TVar Queue } --- | Zipper for 'Seq JobId' +-- | Zipper for 'Seq QueueEntry' data Queue = Queue - { pending :: Seq JobId -- ^ Pending jobs, closest first - , current :: Maybe JobId - , history :: Seq (JobId, Maybe PrintingError) -- ^ Completed jobs, closest first + { pending :: Seq QueueEntry -- ^ Pending jobs, closest last + , current :: Maybe QueueEntry + , history :: Seq (QueueEntry, Maybe PrintingError) -- ^ Completed jobs, closest first } deriving (Typeable, Generic, NFData) @@ -67,6 +70,12 @@ instance Default Queue where , history = Seq.empty } +data QueueEntry = QueueEntry + { jobId :: JobId + , created :: UTCTime + } + deriving (Typeable, Generic, NFData) + printer :: MonadResource m => m PrinterMethod -> m Printer printer p = Printer <$> p <*> liftIO (newTVarIO def) @@ -80,13 +89,13 @@ runPrinter :: ( MonadReader ConnectionPool m ) => Printer -> m () -- ^ Loop 'forever' pushing entries from the 'Queue' associated to a 'Printer' into its 'print'-method runPrinter Printer{..} = forever $ do - jobId <- atomically' $ do + entry@(QueueEntry{..}) <- atomically' $ do (Queue queuePending Nothing history) <- readTVar queue case viewl queuePending of EmptyL -> retry - (jobId :< remaining) -> do - writeTVar queue $!! Queue remaining (Just jobId) history - return jobId + (current :< remaining) -> do + writeTVar queue $!! Queue remaining (Just current) history + return current job <- runSqlPool (get jobId) =<< ask case job of Nothing -> do @@ -96,7 +105,7 @@ runPrinter Printer{..} = forever $ do $(logInfo) . T.pack $ "Printing " ++ show (unSqlBackendKey . unJobKey $ jobId) printReturn <- (unPM print) (jobContent job) -- We could, at this point, do some exception handling. It was decided that this would be undesirable, because we really don't have any idea what exceptions to catch maybe (return ()) ($(logWarn) . T.pack . (("Error while printing " ++ show (unSqlBackendKey . unJobKey $ jobId) ++ ": ") ++) . show) $ printReturn - atomically' $ modifyTVar' queue (\Queue{..} -> force . Queue pending Nothing $ (jobId, printReturn) <| history) + atomically' $ modifyTVar' queue (\Queue{..} -> force . Queue pending Nothing $ (entry, printReturn) <| history) addToQueue :: ( MonadReader ConnectionPool m , MonadLogger m @@ -105,6 +114,12 @@ addToQueue :: ( MonadReader ConnectionPool m ) => Printout -> Printer -> m JobId addToQueue printout Printer{..} = do jobId <- runSqlPool (insert $ Job printout) =<< ask + time <- liftIO getCurrentTime + let + entry = QueueEntry + { jobId = jobId + , created = time + } $(logInfo) . T.pack $ "Queueing " ++ show (unSqlBackendKey . unJobKey $ jobId) - atomically' $ modifyTVar' queue (\Queue{..} -> force $ Queue (jobId <| pending) current history) + atomically' $ modifyTVar' queue (\Queue{..} -> force $ Queue (pending |> entry) current history) return jobId diff --git a/server/thermoprint-server.cabal b/server/thermoprint-server.cabal index 181bd9a..ebe1055 100644 --- a/server/thermoprint-server.cabal +++ b/server/thermoprint-server.cabal @@ -25,26 +25,27 @@ library other-modules: Thermoprint.Server.Database.Instances -- other-extensions: build-depends: base >=4.8 && <5 - , thermoprint-spec ==3.0.* - , dyre >=0.8.12 && <1 + , conduit >=1.2.6 && <2 + , containers >=0.5.6 && <1 , data-default-class >=0.0.1 && <1 - , wai >=3.0.4 && <4 - , servant-server >=0.4.4 && <1 - , warp >=3.1.9 && <4 + , deepseq >=1.4.1 && <2 + , dyre >=0.8.12 && <1 + , either >=4.4.1 && <5 + , exceptions >=0.8.0 && <1 + , monad-control >=1.0.0 && <2 + , monad-logger >=0.3.13 && <1 + , mtl >=2.2.1 && <3 , persistent >=2.2 && <3 , persistent-template >=2.1.4 && <3 - , transformers >=0.3.0 && <1 - , mtl >=2.2.1 && <3 , resourcet >=1.1.7 && <2 - , monad-logger >=0.3.13 && <1 - , containers >=0.5.6 && <1 - , either >=4.4.1 && <5 - , text >=1.2.1 && <2 + , servant-server >=0.4.4 && <1 , stm >=2.4.4 && <3 - , deepseq >=1.4.1 && <2 - , monad-control >=1.0.0 && <2 - , conduit >=1.2.6 && <2 - , exceptions >=0.8.0 && <1 + , text >=1.2.1 && <2 + , thermoprint-spec ==3.0.* + , time >=1.5.0 && <2 + , transformers >=0.3.0 && <1 + , wai >=3.0.4 && <4 + , warp >=3.1.9 && <4 hs-source-dirs: src default-language: Haskell2010 diff --git a/server/thermoprint-server.nix b/server/thermoprint-server.nix index 8ac5456..afcf2ba 100644 --- a/server/thermoprint-server.nix +++ b/server/thermoprint-server.nix @@ -2,7 +2,7 @@ , deepseq, dyre, either, exceptions, monad-control, monad-logger , mtl, persistent, persistent-sqlite, persistent-template , resourcet, servant-server, stdenv, stm, text, thermoprint-spec -, transformers, wai, warp +, time, transformers, wai, warp }: mkDerivation { pname = "thermoprint-server"; @@ -14,7 +14,7 @@ mkDerivation { base conduit containers data-default-class deepseq dyre either exceptions monad-control monad-logger mtl persistent persistent-template resourcet servant-server stm text - thermoprint-spec transformers wai warp + thermoprint-spec time transformers wai warp ]; executableHaskellDepends = [ base monad-logger mtl persistent-sqlite resourcet diff --git a/spec/src/Thermoprint/API.hs b/spec/src/Thermoprint/API.hs index e3d4b61..4f9d074 100644 --- a/spec/src/Thermoprint/API.hs +++ b/spec/src/Thermoprint/API.hs @@ -14,29 +14,33 @@ module Thermoprint.API , module Thermoprint.Printout ) where -import Thermoprint.Printout -import Thermoprint.Identifiers +import Thermoprint.Printout +import Thermoprint.Identifiers -import Servant.API -import Data.Aeson +import Servant.API +import Data.Aeson -import Data.Set (Set) -import Data.Map (Map) +import Data.Set (Set) +import Data.Map (Map) import qualified Data.Map as Map (foldMapWithKey, singleton) -import Data.Sequence (Seq) +import Data.Sequence (Seq) -import Data.IntMap.Strict (IntMap) +import Data.IntMap.Strict (IntMap) import qualified Data.IntMap.Strict as IntMap (foldMapWithKey, singleton) -import Data.Text (Text) +import Data.Text (Text) +import qualified Data.Text as T -import Data.Typeable (Typeable) -import GHC.Generics (Generic) -import Control.DeepSeq (NFData) +import Data.Typeable (Typeable) +import GHC.Generics (Generic) +import Control.DeepSeq (NFData) -import Data.Proxy (Proxy(..)) +import Data.Proxy (Proxy(..)) -import Control.Exception (Exception) +import Control.Exception (Exception) + +import Data.Time (UTCTime) +import Data.Time.Format instance (Integral k, Ord k, ToJSON v) => ToJSON (Map k v) where toJSON = toJSON . Map.foldMapWithKey (IntMap.singleton . castId) @@ -60,9 +64,20 @@ data PrintingError = UnknownError type DraftTitle = Text +instance FromText UTCTime where + fromText = parseTimeM True defaultTimeLocale "%s%Q" . T.unpack + +instance ToText UTCTime where + toText = T.pack . formatTime defaultTimeLocale "%s%Q" + type ThermoprintAPI = "printers" :> Get '[JSON] (Map PrinterId PrinterStatus) :<|> "jobs" :> ( - QueryParam "printer" PrinterId :> QueryParam "min" JobId :> QueryParam "max" JobId :> Get '[JSON] (Seq (JobId, JobStatus)) + QueryParam "printer" PrinterId + :> QueryParam "min" JobId + :> QueryParam "max" JobId + :> QueryParam "minTime" UTCTime + :> QueryParam "maxTime" UTCTime + :> Get '[JSON] (Seq (JobId, UTCTime, JobStatus)) :<|> QueryParam "printer" PrinterId :> ReqBody '[JSON] Printout :> Post '[JSON] JobId ) :<|> "job" :> Capture "jobId" JobId :> ( diff --git a/spec/thermoprint-spec.cabal b/spec/thermoprint-spec.cabal index da79ee8..7446da3 100644 --- a/spec/thermoprint-spec.cabal +++ b/spec/thermoprint-spec.cabal @@ -44,6 +44,7 @@ library , aeson >=0.9.0 && <1 , base64-bytestring >=1.0.0 && <2 , encoding >=0.8 && <1 + , time >=1.5.0 && <2 -- hs-source-dirs: default-language: Haskell2010 diff --git a/spec/thermoprint-spec.nix b/spec/thermoprint-spec.nix index 1825ddd..b67273a 100644 --- a/spec/thermoprint-spec.nix +++ b/spec/thermoprint-spec.nix @@ -1,6 +1,6 @@ { mkDerivation, aeson, base, base64-bytestring, bytestring, Cabal , cabal-test-quickcheck, containers, deepseq, encoding, hspec -, QuickCheck, quickcheck-instances, servant, stdenv, text +, QuickCheck, quickcheck-instances, servant, stdenv, text, time }: mkDerivation { pname = "thermoprint-spec"; @@ -9,7 +9,7 @@ mkDerivation { libraryHaskellDepends = [ aeson base base64-bytestring bytestring Cabal cabal-test-quickcheck containers deepseq encoding QuickCheck quickcheck-instances servant - text + text time ]; testHaskellDepends = [ aeson base hspec QuickCheck quickcheck-instances -- cgit v1.2.3