aboutsummaryrefslogtreecommitdiff
path: root/server/src
diff options
context:
space:
mode:
authorGregor Kleen <gkleen@yggdrasil.li>2016-02-22 20:22:42 +0000
committerGregor Kleen <gkleen@yggdrasil.li>2016-02-22 20:22:42 +0000
commit760027dbcd7185be038299efb18e0cc37c8088c4 (patch)
tree818a7b5700c904530a633da5139d1a0ee237eba4 /server/src
parent6dfb26d6f2966b98c278afd3e269826c96c0ab26 (diff)
downloadthermoprint-760027dbcd7185be038299efb18e0cc37c8088c4.tar
thermoprint-760027dbcd7185be038299efb18e0cc37c8088c4.tar.gz
thermoprint-760027dbcd7185be038299efb18e0cc37c8088c4.tar.bz2
thermoprint-760027dbcd7185be038299efb18e0cc37c8088c4.tar.xz
thermoprint-760027dbcd7185be038299efb18e0cc37c8088c4.zip
Websocket based push notifications
Diffstat (limited to 'server/src')
-rw-r--r--server/src/Thermoprint/Server.hs28
-rw-r--r--server/src/Thermoprint/Server/API.hs19
-rw-r--r--server/src/Thermoprint/Server/Push.hs59
3 files changed, 99 insertions, 7 deletions
diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs
index df2d8e9..446c63e 100644
--- a/server/src/Thermoprint/Server.hs
+++ b/server/src/Thermoprint/Server.hs
@@ -6,6 +6,7 @@
6{-# LANGUAGE ImpredicativeTypes #-} 6{-# LANGUAGE ImpredicativeTypes #-}
7{-# LANGUAGE ExistentialQuantification #-} 7{-# LANGUAGE ExistentialQuantification #-}
8{-# LANGUAGE ViewPatterns #-} 8{-# LANGUAGE ViewPatterns #-}
9{-# LANGUAGE DataKinds #-}
9 10
10module Thermoprint.Server 11module Thermoprint.Server
11 ( thermoprintServer 12 ( thermoprintServer
@@ -25,9 +26,17 @@ import qualified Data.Map as Map
25 26
26import Data.Set (Set) 27import Data.Set (Set)
27import qualified Data.Set as Set 28import qualified Data.Set as Set
29
30import Data.Sequence (Seq)
31import qualified Data.Sequence as Seq
32
33import Data.Time (UTCTime)
28 34
29import Data.Maybe (maybe) 35import Data.Maybe (maybe)
30import Data.Foldable (mapM_, forM_, foldlM) 36import Data.Foldable (mapM_, forM_, foldlM)
37import Data.Function hiding (id, (.))
38import Data.Bifunctor
39import Data.Proxy
31 40
32import Control.Monad.Trans.Resource 41import Control.Monad.Trans.Resource
33import Control.Monad.Trans.Control 42import Control.Monad.Trans.Control
@@ -53,14 +62,20 @@ import Network.Wai (Application)
53 62
54import Servant.Server (serve) 63import Servant.Server (serve)
55import Servant.Server.Internal.Enter (enter, (:~>)(..)) 64import Servant.Server.Internal.Enter (enter, (:~>)(..))
65import Servant.API
66import Servant.Utils.Links
67import Network.URI
56 68
57import Database.Persist.Sql (runMigrationSilent, ConnectionPool, runSqlPool) 69import Database.Persist.Sql (runMigrationSilent, ConnectionPool, runSqlPool)
58 70
59 71
60import Thermoprint.API (thermoprintAPI, PrinterId) 72import Thermoprint.API (thermoprintAPI, PrinterStatus, JobStatus)
73import qualified Thermoprint.API as API (PrinterId, JobId)
61 74
62import Thermoprint.Server.Fork 75import Thermoprint.Server.Fork
63 76
77import Thermoprint.Server.Push
78
64import Thermoprint.Server.Database 79import Thermoprint.Server.Database
65import Thermoprint.Server.Printer 80import Thermoprint.Server.Printer
66import Thermoprint.Server.Queue 81import Thermoprint.Server.Queue
@@ -72,8 +87,8 @@ import Debug.Trace
72-- | Compile-time configuration for 'thermoprintServer' 87-- | Compile-time configuration for 'thermoprintServer'
73data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error 88data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error
74 , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour 89 , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour
75 , printers :: Map PrinterId Printer 90 , printers :: Map API.PrinterId Printer
76 , queueManagers :: PrinterId -> QMConfig m 91 , queueManagers :: API.PrinterId -> QMConfig m
77 } 92 }
78 93
79data QMConfig m = forall t. ( MonadTrans t 94data QMConfig m = forall t. ( MonadTrans t
@@ -137,4 +152,9 @@ thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams
137 let 152 let
138 runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM gcChan qm printer 153 runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM gcChan qm printer
139 mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers 154 mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers
140 liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers 155 nChan <- liftIO $ newBroadcastTChanIO
156 let
157 printerUrl :: API.PrinterId -> URI
158 printerUrl = safeLink thermoprintAPI (Proxy :: Proxy ("jobs" :> QueryParam "printer" API.PrinterId :> Get '[JSON] (Seq (API.JobId, UTCTime, JobStatus))))
159 mapM_ (fork tMgr . uncurry (notifyOnChange nChan ((==) `on` fromZipper)) . bimap printerUrl queue) $ Map.toList printers
160 liftIO . Warp.runSettings warpSettings . withPush nChan . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers nChan
diff --git a/server/src/Thermoprint/Server/API.hs b/server/src/Thermoprint/Server/API.hs
index 770737a..cbf727c 100644
--- a/server/src/Thermoprint/Server/API.hs
+++ b/server/src/Thermoprint/Server/API.hs
@@ -2,6 +2,7 @@
2{-# LANGUAGE FlexibleContexts #-} 2{-# LANGUAGE FlexibleContexts #-}
3{-# LANGUAGE TemplateHaskell #-} 3{-# LANGUAGE TemplateHaskell #-}
4{-# LANGUAGE OverloadedStrings #-} 4{-# LANGUAGE OverloadedStrings #-}
5{-# LANGUAGE DataKinds #-}
5 6
6module Thermoprint.Server.API 7module Thermoprint.Server.API
7 ( ProtoHandler, Handler 8 ( ProtoHandler, Handler
@@ -15,6 +16,7 @@ import qualified Thermoprint.API as API (JobId(..), DraftId(..))
15import Thermoprint.Server.Printer 16import Thermoprint.Server.Printer
16import Thermoprint.Server.Queue 17import Thermoprint.Server.Queue
17import Thermoprint.Server.Database 18import Thermoprint.Server.Database
19import Thermoprint.Server.Push
18 20
19import Data.Set (Set) 21import Data.Set (Set)
20import qualified Data.Set as Set 22import qualified Data.Set as Set
@@ -28,6 +30,7 @@ import qualified Data.Text as T
28import Servant 30import Servant
29import Servant.Server 31import Servant.Server
30import Servant.Server.Internal.Enter 32import Servant.Server.Internal.Enter
33import Servant.Utils.Links
31 34
32import Control.Monad.Logger 35import Control.Monad.Logger
33import Control.Monad.Reader 36import Control.Monad.Reader
@@ -67,6 +70,7 @@ type Handler = EitherT ServantErr ProtoHandler
67-- ^ Runtime configuration of our handlers 70-- ^ Runtime configuration of our handlers
68data HandlerInput = HandlerInput { sqlPool :: ConnectionPool -- ^ How to interact with 'persistent' storage 71data HandlerInput = HandlerInput { sqlPool :: ConnectionPool -- ^ How to interact with 'persistent' storage
69 , printers :: Map PrinterId Printer 72 , printers :: Map PrinterId Printer
73 , nChan :: TChan Notification
70 } 74 }
71 75
72instance MonadLogger m => MonadLogger (EitherT a m) where 76instance MonadLogger m => MonadLogger (EitherT a m) where
@@ -74,17 +78,18 @@ instance MonadLogger m => MonadLogger (EitherT a m) where
74 78
75handlerNat :: ( MonadReader ConnectionPool m 79handlerNat :: ( MonadReader ConnectionPool m
76 , MonadLoggerIO m 80 , MonadLoggerIO m
77 ) => Map PrinterId Printer -> m (Handler :~> EitherT ServantErr IO) 81 ) => Map PrinterId Printer -> TChan Notification -> m (Handler :~> EitherT ServantErr IO)
78-- ^ Servant requires its handlers to be 'EitherT ServantErr IO' 82-- ^ Servant requires its handlers to be 'EitherT ServantErr IO'
79-- 83--
80-- This generates a 'Nat'ural transformation for squashing the monad-transformer-stack we use in our handlers to the monad 'servant-server' wants 84-- This generates a 'Nat'ural transformation for squashing the monad-transformer-stack we use in our handlers to the monad 'servant-server' wants
81handlerNat printerMap = do 85handlerNat printerMap nChan = do
82 sqlPool <- ask 86 sqlPool <- ask
83 logFunc <- askLoggerIO 87 logFunc <- askLoggerIO
84 let 88 let
85 handlerInput = HandlerInput 89 handlerInput = HandlerInput
86 { sqlPool = sqlPool 90 { sqlPool = sqlPool
87 , printers = printerMap 91 , printers = printerMap
92 , nChan = nChan
88 } 93 }
89 protoNat :: ProtoHandler :~> IO 94 protoNat :: ProtoHandler :~> IO
90 protoNat = Nat runResourceT . Nat (($ logFunc) . runLoggingT) . runReaderTNat handlerInput 95 protoNat = Nat runResourceT . Nat (($ logFunc) . runLoggingT) . runReaderTNat handlerInput
@@ -103,6 +108,9 @@ thermoprintServer = listPrinters
103 (<||>) = liftM2 (:<|>) 108 (<||>) = liftM2 (:<|>)
104 infixr 9 <||> 109 infixr 9 <||>
105 110
111notify :: Notification -> Handler ()
112notify n = liftIO . atomically =<< flip writeTChan n <$> asks nChan
113
106lookupPrinter :: Maybe PrinterId -> Handler (PrinterId, Printer) 114lookupPrinter :: Maybe PrinterId -> Handler (PrinterId, Printer)
107-- ^ Make sure a printer exists 115-- ^ Make sure a printer exists
108lookupPrinter pId = asks printers >>= maybePrinter' pId 116lookupPrinter pId = asks printers >>= maybePrinter' pId
@@ -167,7 +175,9 @@ abortJob needle = do
167 let filtered = Seq.filter ((/= castId needle) . jobId) pending 175 let filtered = Seq.filter ((/= castId needle) . jobId) pending
168 writeTVar (queue p) $ current { pending = filtered } 176 writeTVar (queue p) $ current { pending = filtered }
169 return . not $ ((==) `on` length) pending filtered 177 return . not $ ((==) `on` length) pending filtered
170 when found . $(logInfo) $ "Removed job #" <> (T.pack $ show (castId needle :: Integer)) <> " from " <> (T.pack . show $ pId') 178 when found $ do
179 $(logInfo) $ "Removed job #" <> (T.pack $ show (castId needle :: Integer)) <> " from " <> (T.pack . show $ pId')
180 notify $ safeLink thermoprintAPI (Proxy :: Proxy ("jobs" :> Get '[JSON] (Seq (API.JobId, UTCTime, JobStatus))))
171 return found 181 return found
172 when (not found) $ left err404 182 when (not found) $ left err404
173 183
@@ -180,12 +190,14 @@ addDraft :: Maybe DraftTitle -> Printout -> Handler API.DraftId
180addDraft title content = do 190addDraft title content = do
181 id <- fmap castId . runSqlPool (insert $ Draft title content) =<< asks sqlPool 191 id <- fmap castId . runSqlPool (insert $ Draft title content) =<< asks sqlPool
182 $(logInfo) $ "Added draft #" <> (T.pack $ show (castId id :: Integer)) <> " (" <> (T.pack $ show title) <> ")" 192 $(logInfo) $ "Added draft #" <> (T.pack $ show (castId id :: Integer)) <> " (" <> (T.pack $ show title) <> ")"
193 notify $ safeLink thermoprintAPI (Proxy :: Proxy ("drafts" :> Get '[JSON] (Map API.DraftId (Maybe DraftTitle))))
183 return id 194 return id
184 195
185updateDraft :: API.DraftId -> Maybe DraftTitle -> Printout -> Handler () 196updateDraft :: API.DraftId -> Maybe DraftTitle -> Printout -> Handler ()
186updateDraft draftId title content = handle (\(KeyNotFound _) -> left $ err404) $ do 197updateDraft draftId title content = handle (\(KeyNotFound _) -> left $ err404) $ do
187 runSqlPool (update (castId draftId) [ DraftTitle =. title, DraftContent =. content ]) =<< asks sqlPool 198 runSqlPool (update (castId draftId) [ DraftTitle =. title, DraftContent =. content ]) =<< asks sqlPool
188 $(logInfo) $ "Updated draft #" <> (T.pack $ show (castId draftId :: Integer)) 199 $(logInfo) $ "Updated draft #" <> (T.pack $ show (castId draftId :: Integer))
200 notify $ safeLink thermoprintAPI (Proxy :: Proxy ("draft" :> Capture "draftId" API.DraftId :> Get '[JSON] (Maybe DraftTitle, Printout))) $ draftId
189 201
190getDraft :: API.DraftId -> Handler (Maybe DraftTitle, Printout) 202getDraft :: API.DraftId -> Handler (Maybe DraftTitle, Printout)
191getDraft draftId = fmap (\(Draft title content) -> (title, content)) . maybe (left err404) return =<< runSqlPool (get $ castId draftId) =<< asks sqlPool 203getDraft draftId = fmap (\(Draft title content) -> (title, content)) . maybe (left err404) return =<< runSqlPool (get $ castId draftId) =<< asks sqlPool
@@ -194,6 +206,7 @@ deleteDraft :: API.DraftId -> Handler ()
194deleteDraft draftId = do 206deleteDraft draftId = do
195 runSqlPool (delete $ (castId draftId :: Key Draft)) =<< asks sqlPool 207 runSqlPool (delete $ (castId draftId :: Key Draft)) =<< asks sqlPool
196 $(logInfo) $ "Made sure draft #" <> (T.pack $ show (castId draftId :: Integer)) <> " is Deleted" 208 $(logInfo) $ "Made sure draft #" <> (T.pack $ show (castId draftId :: Integer)) <> " is Deleted"
209 notify $ safeLink thermoprintAPI (Proxy :: Proxy ("drafts" :> Get '[JSON] (Map API.DraftId (Maybe DraftTitle))))
197 210
198printDraft :: API.DraftId -> Maybe PrinterId -> Handler API.JobId 211printDraft :: API.DraftId -> Maybe PrinterId -> Handler API.JobId
199printDraft draftId printerId = (\(Draft _ content) -> queueJob printerId content) =<< maybe (left err404) return =<< runSqlPool (get $ castId draftId) =<< asks sqlPool 212printDraft draftId printerId = (\(Draft _ content) -> queueJob printerId content) =<< maybe (left err404) return =<< runSqlPool (get $ castId draftId) =<< asks sqlPool
diff --git a/server/src/Thermoprint/Server/Push.hs b/server/src/Thermoprint/Server/Push.hs
new file mode 100644
index 0000000..b2eca6b
--- /dev/null
+++ b/server/src/Thermoprint/Server/Push.hs
@@ -0,0 +1,59 @@
1{-# LANGUAGE ViewPatterns #-}
2
3module Thermoprint.Server.Push
4 ( Notification
5 , withPush
6 , protocolSpec
7 , notifyOnChange
8 ) where
9
10import Network.WebSockets
11import Network.Wai.Handler.WebSockets
12import Network.Wai (Application)
13
14import Network.URI
15
16import Control.Concurrent.STM
17
18import Thermoprint.Server.Queue
19
20import Control.Monad.IO.Class
21import Control.Monad
22
23import Paths_thermoprint_server (version)
24import Data.Version (showVersion)
25
26import Data.ByteString.Char8 (ByteString)
27import qualified Data.ByteString.Char8 as CBS
28
29import Data.Text (Text)
30import qualified Data.Text as Text
31
32type Notification = URI
33
34withPush :: TChan Notification -> Application -> Application
35withPush chan = websocketsOr defaultConnectionOptions $ flip acceptRequestWith (AcceptRequest $ Just protocolSpec) >=> handleClient chan
36
37protocolSpec :: ByteString
38protocolSpec = CBS.pack $ "thermoprint-server.notification." ++ showVersion version
39
40handleClient :: TChan Notification -> Connection -> IO ()
41handleClient chan conn = do
42 cChan <- atomically $ dupTChan chan
43 forever . void $ atomically (readTChan cChan) >>= sendTextData conn . packNotification
44
45packNotification :: Notification -> Text
46packNotification = Text.pack . show
47
48notifyOnChange :: MonadIO m => TChan Notification -> (a -> a -> Bool) -> Notification -> TVar a -> m ()
49notifyOnChange chan cmp n q = void . liftIO $ readTVarIO q >>= notifyOnChange'
50 where
51 notifyOnChange' last = do
52 current <- atomically $ (\current -> current <$ check (not $ cmp last current)) =<< readTVar q
53 atomically $ writeTChan chan n
54 notifyOnChange' current
55
56
57
58
59