aboutsummaryrefslogtreecommitdiff
path: root/server/src/Thermoprint/Server/Queue.hs
blob: aa26fe3c4295fec5a56a9494e66201a47353c943 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
{-# LANGUAGE FlexibleInstances, FlexibleContexts, MultiParamTypeClasses #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE DeriveGeneric, DeriveAnyClass #-}
{-# LANGUAGE ExistentialQuantification #-}

module Thermoprint.Server.Queue
       ( Queue(..), QueueEntry(..)
       , fromZipper, toZipper, QueueItem(..)
       , HasQueue(..)
       , QueueManager, QueueManagerM, runQM
       , jobGC
       , intersection, idQM
       , union, nullQM
       ) where

import           Thermoprint.API (PrintingError(..), Printout)
import qualified Thermoprint.API as API (JobStatus(..))

import           Thermoprint.Server.Database

import           Data.Sequence (Seq, ViewL(..), viewl, (|>), (<|))
import qualified Data.Sequence as Seq
import           Data.Set (Set)
import qualified Data.Set as Set

import           Data.Time
import           Data.ExtendedReal
import           Data.Fixed

import           Control.DeepSeq
import           Data.Typeable (Typeable)
import           GHC.Generics (Generic)

import           Control.Concurrent
import           Control.Concurrent.STM
import           Control.Monad.State

import           Data.Default.Class

import           Control.Monad
import           Control.Monad.Morph
import           Control.Monad.Trans.Compose
import           Control.Monad.Trans.Resource
import           Control.Monad.Reader
import           Data.Function

import           Data.Foldable
import           Data.Monoid
import           Data.Ord

import Database.Persist (delete)
import Database.Persist.Sql (ConnectionPool, runSqlPool)
  
import Test.QuickCheck.Arbitrary (Arbitrary(..), CoArbitrary(..))
import Test.QuickCheck.Gen (Gen, scale)
import Test.QuickCheck.Instances
import Test.QuickCheck.Modifiers (NonNegative(..))

-- | Zipper over a sequence of 'QueueEntry's; the section after the point additionally
-- records a 'PrintingError' (if any) for each entry
data Queue = Queue
  { pending :: Seq QueueEntry
    -- ^ Pending jobs, closest last
  , current :: Maybe QueueEntry
    -- ^ The entry at the point, if there is one
  , history :: Seq (QueueEntry, Maybe PrintingError)
    -- ^ Completed jobs, closest first
  }
  deriving (Show, Typeable, Generic, NFData)

-- | Generates the three sections independently; both sequences are generated at half
-- the current size parameter
instance Arbitrary Queue where
  arbitrary = Queue <$> halved arbitrary <*> arbitrary <*> halved arbitrary
    where
      -- Run a generator with the size parameter divided by two
      halved gen = scale (`div` 2) gen

-- Empty instance body: 'coarbitrary' falls back to the class's default definition
-- (presumably the 'Generic'-based one, since 'Queue' derives 'Generic' -- confirm
-- against the QuickCheck version in use)
instance CoArbitrary Queue

-- | Types from which the shared 'Queue' variable can be extracted
class HasQueue a where
  -- | Project out the 'TVar' holding the 'Queue'
  extractQueue :: a -> TVar Queue

-- | A bare 'TVar' 'Queue' is its own queue
instance HasQueue (TVar Queue) where
  extractQueue queueVar = queueVar

-- | The default 'Queue' is empty: nothing pending, nothing current, no history
instance Default Queue where
  def = Queue { pending = mempty, current = Nothing, history = mempty }

-- | A single job as it is tracked inside a 'Queue'
data QueueEntry = QueueEntry
  { jobId   :: JobId
    -- ^ Identifier of the job
  , created :: UTCTime
    -- ^ Timestamp stored alongside the job (presumably its creation time)
  }
  deriving (Eq, Ord, Show, Typeable, Generic, NFData)

-- | The 'jobId' is generated from a non-negative 'Integer'; the timestamp is arbitrary
instance Arbitrary QueueEntry where
  arbitrary = QueueEntry <$> genJobId <*> arbitrary
    where
      genJobId = fromIntegral . getNonNegative <$> (arbitrary :: Gen (NonNegative Integer))

-- | Perturbs the generator first with the 'jobId' (converted to 'Integer') and then
-- with the timestamp
instance CoArbitrary QueueEntry where
  coarbitrary (QueueEntry jId time) gen =
    coarbitrary time (coarbitrary (fromIntegral jId :: Integer) gen)

-- | One element of a 'Queue' tagged with the section it belongs to
--
-- 'fromZipper' fills 'pos' with the element's index inside its section
data QueueItem
  = Pending { pos :: Int, entry :: QueueEntry }
  | Current { entry :: QueueEntry }
  | History { pos :: Int, entry :: QueueEntry, err :: Maybe PrintingError }

-- | Items are equal when they lie in the same section at the same position and carry
-- the same entry; the error attached to a 'History' item is ignored
instance Eq QueueItem where
  x == y = case (x, y) of
    (Pending i a  , Pending j b  ) -> i == j && a == b
    (Current   a  , Current   b  ) -> a == b
    (History i a _, History j b _) -> i == j && a == b
    _                              -> False

-- | Orders items by section ('Pending' before 'Current' before 'History'), then by
-- position within the section, then by entry; the error of a 'History' item is ignored
instance Ord QueueItem where
  compare x y = comparing section x y <> comparing position x y <> comparing entry x y
    where
      -- Rank of the section an item belongs to
      section :: QueueItem -> Int
      section Pending{} = 0
      section Current{} = 1
      section History{} = 2

      -- Position inside the section; 'Current' items all share one position
      position :: QueueItem -> Int
      position (Pending i _)   = i
      position (Current _)     = 0
      position (History i _ _) = i

-- | Wrapper around 'QueueItem' whose 'Eq' and 'Ord' instances disregard positions
newtype PlainQueueItem = Plain
  { unPlain :: QueueItem
  }

-- | Two wrapped items are equal when they lie in the same section and carry the same
-- entry; positions and errors are ignored
instance Eq PlainQueueItem where
  Plain x == Plain y = case (x, y) of
    (Pending _ a  , Pending _ b  ) -> a == b
    (Current   a  , Current   b  ) -> a == b
    (History _ a _, History _ b _) -> a == b
    _                              -> False

-- | Orders wrapped items by section ('Pending', then 'Current', then 'History') and,
-- within one section, by entry alone
instance Ord PlainQueueItem where
  Plain x <= Plain y = case (x, y) of
    -- Same section: fall back to the entries
    (Pending _ a  , Pending _ b  ) -> a <= b
    (Current   a  , Current   b  ) -> a <= b
    (History _ a _, History _ b _) -> a <= b
    -- Different sections: only the section order matters
    (Pending{}    , _            ) -> True
    (Current{}    , History{}    ) -> True
    _                              -> False

-- | Flatten a 'Queue' into a set of 'QueueItem's, tagging every entry with its section
-- and (for 'Pending' and 'History') its index within that section
fromZipper :: Queue -> Set QueueItem
fromZipper Queue{..} = foldr Set.insert Set.empty (pendingItems <> currentItem <> historyItems)
  where
    pendingItems = Seq.mapWithIndex Pending pending
    currentItem  = maybe Seq.empty (Seq.singleton . Current) current
    historyItems = Seq.mapWithIndex (uncurry . History) history

-- | Rebuild a 'Queue' from a set of 'QueueItem's
--
-- Items are visited in ascending order and appended to the section named by their
-- constructor; the stored positions themselves are discarded. A later 'Current' item
-- replaces an earlier one.
toZipper :: Set QueueItem -> Queue
toZipper = Set.foldl' place def
  where
    place queue item = case item of
      Pending _ e   -> queue { pending = pending queue |> e }
      Current e     -> queue { current = Just e }
      History _ e r -> queue { history = history queue |> (e, r) }

-- | A queue manager periodically modifies a 'Queue', e.g. for cleanup of old jobs
--
-- Its result is the delay 'runQM' waits before running the manager again; 'runQM' stops
-- rerunning the manager once that delay is not 'Finite'.
type QueueManager t = QueueManagerM t (Extended Micro)
-- | Monad of queue managers: 'StateT' over the 'Queue', composed via 'ComposeT' with
-- the transformer @t@, on top of 'STM'
type QueueManagerM t = ComposeT (StateT Queue) t STM

runQM :: ( HasQueue q
         , MFunctor t
         , MonadTrans t
         , MonadIO (t IO)
         , Monad (t STM)
         ) => TChan JobId -- ^ Channel for deleted 'JobId's ready to be garbage collected
         -> QueueManager t -> q -> t IO ()
-- ^ Periodically modify a 'Queue'
--
-- Each round reads the queue, runs the manager on it and writes the fully evaluated
-- result back, all inside the 'STM' computation that is hoisted with 'atomically'.
-- Afterwards we sleep for the absolute value of the delay returned by the manager and
-- start over; a delay that is not 'Finite' ends the loop.
--
-- We use 'fromZipper' to determine which 'JobId's were deleted by the manager and write
-- those to the garbage collectors channel. Only 'JobId's are compared, so an entry the
-- manager merely moved to another section of the queue (or otherwise altered) is still
-- considered queued and is not handed to the garbage collector.
-- Since there is no crosstalk between managers this works only if the relation between
-- 'JobId's and the 'QueueManager' responsible for them is a single-valued function
runQM gcChan qm (extractQueue -> q) = sleep =<< qm'
  where
    qm' = hoist atomically $ do
      before <- lift $ readTVar q
      (delay, after) <- runStateT (getComposeT qm) before
      lift $ writeTVar q $!! after
      let
        -- All job ids present anywhere in a queue, regardless of section
        queuedIds queue = Set.map (jobId . entry) $ fromZipper queue
        -- Only ids that vanished from the queue altogether may be garbage collected
        deleted = queuedIds before `Set.difference` queuedIds after
      mapM_ (lift . writeTChan gcChan) deleted
      return delay
    sleep (abs -> delay)
      -- 'fromEnum' turns the 'Micro' delay into the 'Int' expected by 'threadDelay'
      | (Finite d) <- delay = liftIO (threadDelay $ fromEnum d) >> runQM gcChan qm q
      | otherwise = return ()

jobGC :: ( MonadReader ConnectionPool m
         , MonadBaseControl IO m
         , MonadIO m
         ) => TChan JobId -> m ()
-- ^ Wait for the next 'JobId' on the 'TChan' and delete it from the database, 'forever'
--
-- The connection pool is taken from the reader environment for every deletion.
jobGC gcChan = forever $ do
  jId <- liftIO . atomically $ readTChan gcChan
  pool <- ask
  runSqlPool (delete jId) pool

intersection :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t
-- ^ Combine a collection of 'QueueManager's keeping only the 'QueueEntry's that every
-- manager decides to keep
--
-- The managers are run left to right; an empty collection yields 'idQM'
intersection managers = foldr' (qmCombine Set.intersection) idQM managers

idQM :: Monad (QueueManagerM t) => QueueManager t
-- ^ Identity of 'intersection': leaves the 'Queue' untouched and returns 'PosInf'
idQM = pure PosInf

union :: (Foldable f, MonadState Queue (QueueManagerM t)) => f (QueueManager t) -> QueueManager t
-- ^ Combine a collection of 'QueueManager's keeping every 'QueueEntry' that at least one
-- of the managers decides to keep
--
-- The managers are run left to right; an empty collection yields 'nullQM'
union managers = foldr' (qmCombine Set.union) nullQM managers

nullQM :: MonadState Queue (QueueManagerM t) => QueueManager t
-- ^ Identity of 'union': replaces the 'Queue' with the empty default and returns 'PosInf'
nullQM = do
  put def
  return PosInf

-- | Lift a binary operation on sets of queue items to a binary operation on
-- 'QueueManager's
--
-- Both managers are run against the same starting 'Queue' (first @a@, then @b@). The
-- queues they produce are flattened, merged with the given set operation (comparing
-- items without regard to position) and the merged result becomes the new state. The
-- combined manager returns the smaller of the two delays.
qmCombine :: MonadState Queue (QueueManagerM t)
             => (Set PlainQueueItem -> Set PlainQueueItem -> Set PlainQueueItem)
             -> (QueueManager t -> QueueManager t -> QueueManager t)
qmCombine setCombine a b = do
  (delayA, queueA) <- sandboxed a
  (delayB, queueB) <- sandboxed b
  let merged = setCombine (plainItems queueA) (plainItems queueB)
  put . toZipper $ Set.map unPlain merged
  return (min delayA delayB)
  where
    -- Flatten a queue into position-agnostic items
    plainItems = Set.map Plain . fromZipper
    -- Run a manager, report its result together with the state it produced, and
    -- restore the state that was in place beforehand
    sandboxed manager = do
      saved <- get
      delay <- manager
      outcome <- get
      put saved
      return (delay, outcome)