diff options
author | Gregor Kleen <gkleen@yggdrasil.li> | 2016-02-22 20:22:42 +0000 |
---|---|---|
committer | Gregor Kleen <gkleen@yggdrasil.li> | 2016-02-22 20:22:42 +0000 |
commit | 760027dbcd7185be038299efb18e0cc37c8088c4 (patch) | |
tree | 818a7b5700c904530a633da5139d1a0ee237eba4 /server/src/Thermoprint | |
parent | 6dfb26d6f2966b98c278afd3e269826c96c0ab26 (diff) | |
download | thermoprint-760027dbcd7185be038299efb18e0cc37c8088c4.tar thermoprint-760027dbcd7185be038299efb18e0cc37c8088c4.tar.gz thermoprint-760027dbcd7185be038299efb18e0cc37c8088c4.tar.bz2 thermoprint-760027dbcd7185be038299efb18e0cc37c8088c4.tar.xz thermoprint-760027dbcd7185be038299efb18e0cc37c8088c4.zip |
Websocket based push notifications
Diffstat (limited to 'server/src/Thermoprint')
-rw-r--r-- | server/src/Thermoprint/Server.hs | 28 | ||||
-rw-r--r-- | server/src/Thermoprint/Server/API.hs | 19 | ||||
-rw-r--r-- | server/src/Thermoprint/Server/Push.hs | 59 |
3 files changed, 99 insertions, 7 deletions
diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index df2d8e9..446c63e 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs | |||
@@ -6,6 +6,7 @@ | |||
6 | {-# LANGUAGE ImpredicativeTypes #-} | 6 | {-# LANGUAGE ImpredicativeTypes #-} |
7 | {-# LANGUAGE ExistentialQuantification #-} | 7 | {-# LANGUAGE ExistentialQuantification #-} |
8 | {-# LANGUAGE ViewPatterns #-} | 8 | {-# LANGUAGE ViewPatterns #-} |
9 | {-# LANGUAGE DataKinds #-} | ||
9 | 10 | ||
10 | module Thermoprint.Server | 11 | module Thermoprint.Server |
11 | ( thermoprintServer | 12 | ( thermoprintServer |
@@ -25,9 +26,17 @@ import qualified Data.Map as Map | |||
25 | 26 | ||
26 | import Data.Set (Set) | 27 | import Data.Set (Set) |
27 | import qualified Data.Set as Set | 28 | import qualified Data.Set as Set |
29 | |||
30 | import Data.Sequence (Seq) | ||
31 | import qualified Data.Sequence as Seq | ||
32 | |||
33 | import Data.Time (UTCTime) | ||
28 | 34 | ||
29 | import Data.Maybe (maybe) | 35 | import Data.Maybe (maybe) |
30 | import Data.Foldable (mapM_, forM_, foldlM) | 36 | import Data.Foldable (mapM_, forM_, foldlM) |
37 | import Data.Function hiding (id, (.)) | ||
38 | import Data.Bifunctor | ||
39 | import Data.Proxy | ||
31 | 40 | ||
32 | import Control.Monad.Trans.Resource | 41 | import Control.Monad.Trans.Resource |
33 | import Control.Monad.Trans.Control | 42 | import Control.Monad.Trans.Control |
@@ -53,14 +62,20 @@ import Network.Wai (Application) | |||
53 | 62 | ||
54 | import Servant.Server (serve) | 63 | import Servant.Server (serve) |
55 | import Servant.Server.Internal.Enter (enter, (:~>)(..)) | 64 | import Servant.Server.Internal.Enter (enter, (:~>)(..)) |
65 | import Servant.API | ||
66 | import Servant.Utils.Links | ||
67 | import Network.URI | ||
56 | 68 | ||
57 | import Database.Persist.Sql (runMigrationSilent, ConnectionPool, runSqlPool) | 69 | import Database.Persist.Sql (runMigrationSilent, ConnectionPool, runSqlPool) |
58 | 70 | ||
59 | 71 | ||
60 | import Thermoprint.API (thermoprintAPI, PrinterId) | 72 | import Thermoprint.API (thermoprintAPI, PrinterStatus, JobStatus) |
73 | import qualified Thermoprint.API as API (PrinterId, JobId) | ||
61 | 74 | ||
62 | import Thermoprint.Server.Fork | 75 | import Thermoprint.Server.Fork |
63 | 76 | ||
77 | import Thermoprint.Server.Push | ||
78 | |||
64 | import Thermoprint.Server.Database | 79 | import Thermoprint.Server.Database |
65 | import Thermoprint.Server.Printer | 80 | import Thermoprint.Server.Printer |
66 | import Thermoprint.Server.Queue | 81 | import Thermoprint.Server.Queue |
@@ -72,8 +87,8 @@ import Debug.Trace | |||
72 | -- | Compile-time configuration for 'thermoprintServer' | 87 | -- | Compile-time configuration for 'thermoprintServer' |
73 | data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error | 88 | data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error |
74 | , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour | 89 | , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour |
75 | , printers :: Map PrinterId Printer | 90 | , printers :: Map API.PrinterId Printer |
76 | , queueManagers :: PrinterId -> QMConfig m | 91 | , queueManagers :: API.PrinterId -> QMConfig m |
77 | } | 92 | } |
78 | 93 | ||
79 | data QMConfig m = forall t. ( MonadTrans t | 94 | data QMConfig m = forall t. ( MonadTrans t |
@@ -137,4 +152,9 @@ thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams | |||
137 | let | 152 | let |
138 | runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM gcChan qm printer | 153 | runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM gcChan qm printer |
139 | mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers | 154 | mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers |
140 | liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers | 155 | nChan <- liftIO $ newBroadcastTChanIO |
156 | let | ||
157 | printerUrl :: API.PrinterId -> URI | ||
158 | printerUrl = safeLink thermoprintAPI (Proxy :: Proxy ("jobs" :> QueryParam "printer" API.PrinterId :> Get '[JSON] (Seq (API.JobId, UTCTime, JobStatus)))) | ||
159 | mapM_ (fork tMgr . uncurry (notifyOnChange nChan ((==) `on` fromZipper)) . bimap printerUrl queue) $ Map.toList printers | ||
160 | liftIO . Warp.runSettings warpSettings . withPush nChan . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers nChan | ||
diff --git a/server/src/Thermoprint/Server/API.hs b/server/src/Thermoprint/Server/API.hs index 770737a..cbf727c 100644 --- a/server/src/Thermoprint/Server/API.hs +++ b/server/src/Thermoprint/Server/API.hs | |||
@@ -2,6 +2,7 @@ | |||
2 | {-# LANGUAGE FlexibleContexts #-} | 2 | {-# LANGUAGE FlexibleContexts #-} |
3 | {-# LANGUAGE TemplateHaskell #-} | 3 | {-# LANGUAGE TemplateHaskell #-} |
4 | {-# LANGUAGE OverloadedStrings #-} | 4 | {-# LANGUAGE OverloadedStrings #-} |
5 | {-# LANGUAGE DataKinds #-} | ||
5 | 6 | ||
6 | module Thermoprint.Server.API | 7 | module Thermoprint.Server.API |
7 | ( ProtoHandler, Handler | 8 | ( ProtoHandler, Handler |
@@ -15,6 +16,7 @@ import qualified Thermoprint.API as API (JobId(..), DraftId(..)) | |||
15 | import Thermoprint.Server.Printer | 16 | import Thermoprint.Server.Printer |
16 | import Thermoprint.Server.Queue | 17 | import Thermoprint.Server.Queue |
17 | import Thermoprint.Server.Database | 18 | import Thermoprint.Server.Database |
19 | import Thermoprint.Server.Push | ||
18 | 20 | ||
19 | import Data.Set (Set) | 21 | import Data.Set (Set) |
20 | import qualified Data.Set as Set | 22 | import qualified Data.Set as Set |
@@ -28,6 +30,7 @@ import qualified Data.Text as T | |||
28 | import Servant | 30 | import Servant |
29 | import Servant.Server | 31 | import Servant.Server |
30 | import Servant.Server.Internal.Enter | 32 | import Servant.Server.Internal.Enter |
33 | import Servant.Utils.Links | ||
31 | 34 | ||
32 | import Control.Monad.Logger | 35 | import Control.Monad.Logger |
33 | import Control.Monad.Reader | 36 | import Control.Monad.Reader |
@@ -67,6 +70,7 @@ type Handler = EitherT ServantErr ProtoHandler | |||
67 | -- ^ Runtime configuration of our handlers | 70 | -- ^ Runtime configuration of our handlers |
68 | data HandlerInput = HandlerInput { sqlPool :: ConnectionPool -- ^ How to interact with 'persistent' storage | 71 | data HandlerInput = HandlerInput { sqlPool :: ConnectionPool -- ^ How to interact with 'persistent' storage |
69 | , printers :: Map PrinterId Printer | 72 | , printers :: Map PrinterId Printer |
73 | , nChan :: TChan Notification | ||
70 | } | 74 | } |
71 | 75 | ||
72 | instance MonadLogger m => MonadLogger (EitherT a m) where | 76 | instance MonadLogger m => MonadLogger (EitherT a m) where |
@@ -74,17 +78,18 @@ instance MonadLogger m => MonadLogger (EitherT a m) where | |||
74 | 78 | ||
75 | handlerNat :: ( MonadReader ConnectionPool m | 79 | handlerNat :: ( MonadReader ConnectionPool m |
76 | , MonadLoggerIO m | 80 | , MonadLoggerIO m |
77 | ) => Map PrinterId Printer -> m (Handler :~> EitherT ServantErr IO) | 81 | ) => Map PrinterId Printer -> TChan Notification -> m (Handler :~> EitherT ServantErr IO) |
78 | -- ^ Servant requires its handlers to be 'EitherT ServantErr IO' | 82 | -- ^ Servant requires its handlers to be 'EitherT ServantErr IO' |
79 | -- | 83 | -- |
80 | -- This generates a 'Nat'ural transformation for squashing the monad-transformer-stack we use in our handlers to the monad 'servant-server' wants | 84 | -- This generates a 'Nat'ural transformation for squashing the monad-transformer-stack we use in our handlers to the monad 'servant-server' wants |
81 | handlerNat printerMap = do | 85 | handlerNat printerMap nChan = do |
82 | sqlPool <- ask | 86 | sqlPool <- ask |
83 | logFunc <- askLoggerIO | 87 | logFunc <- askLoggerIO |
84 | let | 88 | let |
85 | handlerInput = HandlerInput | 89 | handlerInput = HandlerInput |
86 | { sqlPool = sqlPool | 90 | { sqlPool = sqlPool |
87 | , printers = printerMap | 91 | , printers = printerMap |
92 | , nChan = nChan | ||
88 | } | 93 | } |
89 | protoNat :: ProtoHandler :~> IO | 94 | protoNat :: ProtoHandler :~> IO |
90 | protoNat = Nat runResourceT . Nat (($ logFunc) . runLoggingT) . runReaderTNat handlerInput | 95 | protoNat = Nat runResourceT . Nat (($ logFunc) . runLoggingT) . runReaderTNat handlerInput |
@@ -103,6 +108,9 @@ thermoprintServer = listPrinters | |||
103 | (<||>) = liftM2 (:<|>) | 108 | (<||>) = liftM2 (:<|>) |
104 | infixr 9 <||> | 109 | infixr 9 <||> |
105 | 110 | ||
111 | notify :: Notification -> Handler () | ||
112 | notify n = liftIO . atomically =<< flip writeTChan n <$> asks nChan | ||
113 | |||
106 | lookupPrinter :: Maybe PrinterId -> Handler (PrinterId, Printer) | 114 | lookupPrinter :: Maybe PrinterId -> Handler (PrinterId, Printer) |
107 | -- ^ Make sure a printer exists | 115 | -- ^ Make sure a printer exists |
108 | lookupPrinter pId = asks printers >>= maybePrinter' pId | 116 | lookupPrinter pId = asks printers >>= maybePrinter' pId |
@@ -167,7 +175,9 @@ abortJob needle = do | |||
167 | let filtered = Seq.filter ((/= castId needle) . jobId) pending | 175 | let filtered = Seq.filter ((/= castId needle) . jobId) pending |
168 | writeTVar (queue p) $ current { pending = filtered } | 176 | writeTVar (queue p) $ current { pending = filtered } |
169 | return . not $ ((==) `on` length) pending filtered | 177 | return . not $ ((==) `on` length) pending filtered |
170 | when found . $(logInfo) $ "Removed job #" <> (T.pack $ show (castId needle :: Integer)) <> " from " <> (T.pack . show $ pId') | 178 | when found $ do |
179 | $(logInfo) $ "Removed job #" <> (T.pack $ show (castId needle :: Integer)) <> " from " <> (T.pack . show $ pId') | ||
180 | notify $ safeLink thermoprintAPI (Proxy :: Proxy ("jobs" :> Get '[JSON] (Seq (API.JobId, UTCTime, JobStatus)))) | ||
171 | return found | 181 | return found |
172 | when (not found) $ left err404 | 182 | when (not found) $ left err404 |
173 | 183 | ||
@@ -180,12 +190,14 @@ addDraft :: Maybe DraftTitle -> Printout -> Handler API.DraftId | |||
180 | addDraft title content = do | 190 | addDraft title content = do |
181 | id <- fmap castId . runSqlPool (insert $ Draft title content) =<< asks sqlPool | 191 | id <- fmap castId . runSqlPool (insert $ Draft title content) =<< asks sqlPool |
182 | $(logInfo) $ "Added draft #" <> (T.pack $ show (castId id :: Integer)) <> " (" <> (T.pack $ show title) <> ")" | 192 | $(logInfo) $ "Added draft #" <> (T.pack $ show (castId id :: Integer)) <> " (" <> (T.pack $ show title) <> ")" |
193 | notify $ safeLink thermoprintAPI (Proxy :: Proxy ("drafts" :> Get '[JSON] (Map API.DraftId (Maybe DraftTitle)))) | ||
183 | return id | 194 | return id |
184 | 195 | ||
185 | updateDraft :: API.DraftId -> Maybe DraftTitle -> Printout -> Handler () | 196 | updateDraft :: API.DraftId -> Maybe DraftTitle -> Printout -> Handler () |
186 | updateDraft draftId title content = handle (\(KeyNotFound _) -> left $ err404) $ do | 197 | updateDraft draftId title content = handle (\(KeyNotFound _) -> left $ err404) $ do |
187 | runSqlPool (update (castId draftId) [ DraftTitle =. title, DraftContent =. content ]) =<< asks sqlPool | 198 | runSqlPool (update (castId draftId) [ DraftTitle =. title, DraftContent =. content ]) =<< asks sqlPool |
188 | $(logInfo) $ "Updated draft #" <> (T.pack $ show (castId draftId :: Integer)) | 199 | $(logInfo) $ "Updated draft #" <> (T.pack $ show (castId draftId :: Integer)) |
200 | notify $ safeLink thermoprintAPI (Proxy :: Proxy ("draft" :> Capture "draftId" API.DraftId :> Get '[JSON] (Maybe DraftTitle, Printout))) $ draftId | ||
189 | 201 | ||
190 | getDraft :: API.DraftId -> Handler (Maybe DraftTitle, Printout) | 202 | getDraft :: API.DraftId -> Handler (Maybe DraftTitle, Printout) |
191 | getDraft draftId = fmap (\(Draft title content) -> (title, content)) . maybe (left err404) return =<< runSqlPool (get $ castId draftId) =<< asks sqlPool | 203 | getDraft draftId = fmap (\(Draft title content) -> (title, content)) . maybe (left err404) return =<< runSqlPool (get $ castId draftId) =<< asks sqlPool |
@@ -194,6 +206,7 @@ deleteDraft :: API.DraftId -> Handler () | |||
194 | deleteDraft draftId = do | 206 | deleteDraft draftId = do |
195 | runSqlPool (delete $ (castId draftId :: Key Draft)) =<< asks sqlPool | 207 | runSqlPool (delete $ (castId draftId :: Key Draft)) =<< asks sqlPool |
196 | $(logInfo) $ "Made sure draft #" <> (T.pack $ show (castId draftId :: Integer)) <> " is Deleted" | 208 | $(logInfo) $ "Made sure draft #" <> (T.pack $ show (castId draftId :: Integer)) <> " is Deleted" |
209 | notify $ safeLink thermoprintAPI (Proxy :: Proxy ("drafts" :> Get '[JSON] (Map API.DraftId (Maybe DraftTitle)))) | ||
197 | 210 | ||
198 | printDraft :: API.DraftId -> Maybe PrinterId -> Handler API.JobId | 211 | printDraft :: API.DraftId -> Maybe PrinterId -> Handler API.JobId |
199 | printDraft draftId printerId = (\(Draft _ content) -> queueJob printerId content) =<< maybe (left err404) return =<< runSqlPool (get $ castId draftId) =<< asks sqlPool | 212 | printDraft draftId printerId = (\(Draft _ content) -> queueJob printerId content) =<< maybe (left err404) return =<< runSqlPool (get $ castId draftId) =<< asks sqlPool |
diff --git a/server/src/Thermoprint/Server/Push.hs b/server/src/Thermoprint/Server/Push.hs new file mode 100644 index 0000000..b2eca6b --- /dev/null +++ b/server/src/Thermoprint/Server/Push.hs | |||
@@ -0,0 +1,59 @@ | |||
1 | {-# LANGUAGE ViewPatterns #-} | ||
2 | |||
3 | module Thermoprint.Server.Push | ||
4 | ( Notification | ||
5 | , withPush | ||
6 | , protocolSpec | ||
7 | , notifyOnChange | ||
8 | ) where | ||
9 | |||
10 | import Network.WebSockets | ||
11 | import Network.Wai.Handler.WebSockets | ||
12 | import Network.Wai (Application) | ||
13 | |||
14 | import Network.URI | ||
15 | |||
16 | import Control.Concurrent.STM | ||
17 | |||
18 | import Thermoprint.Server.Queue | ||
19 | |||
20 | import Control.Monad.IO.Class | ||
21 | import Control.Monad | ||
22 | |||
23 | import Paths_thermoprint_server (version) | ||
24 | import Data.Version (showVersion) | ||
25 | |||
26 | import Data.ByteString.Char8 (ByteString) | ||
27 | import qualified Data.ByteString.Char8 as CBS | ||
28 | |||
29 | import Data.Text (Text) | ||
30 | import qualified Data.Text as Text | ||
31 | |||
32 | type Notification = URI | ||
33 | |||
34 | withPush :: TChan Notification -> Application -> Application | ||
35 | withPush chan = websocketsOr defaultConnectionOptions $ flip acceptRequestWith (AcceptRequest $ Just protocolSpec) >=> handleClient chan | ||
36 | |||
37 | protocolSpec :: ByteString | ||
38 | protocolSpec = CBS.pack $ "thermoprint-server.notification." ++ showVersion version | ||
39 | |||
40 | handleClient :: TChan Notification -> Connection -> IO () | ||
41 | handleClient chan conn = do | ||
42 | cChan <- atomically $ dupTChan chan | ||
43 | forever . void $ atomically (readTChan cChan) >>= sendTextData conn . packNotification | ||
44 | |||
45 | packNotification :: Notification -> Text | ||
46 | packNotification = Text.pack . show | ||
47 | |||
48 | notifyOnChange :: MonadIO m => TChan Notification -> (a -> a -> Bool) -> Notification -> TVar a -> m () | ||
49 | notifyOnChange chan cmp n q = void . liftIO $ readTVarIO q >>= notifyOnChange' | ||
50 | where | ||
51 | notifyOnChange' last = do | ||
52 | current <- atomically $ (\current -> current <$ check (not $ cmp last current)) =<< readTVar q | ||
53 | atomically $ writeTChan chan n | ||
54 | notifyOnChange' current | ||
55 | |||
56 | |||
57 | |||
58 | |||
59 | |||