From 760027dbcd7185be038299efb18e0cc37c8088c4 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Mon, 22 Feb 2016 20:22:42 +0000 Subject: Websocket based push notifications --- server/src/Thermoprint/Server.hs | 28 ++++++++++++++--- server/src/Thermoprint/Server/API.hs | 19 +++++++++-- server/src/Thermoprint/Server/Push.hs | 59 +++++++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 7 deletions(-) create mode 100644 server/src/Thermoprint/Server/Push.hs (limited to 'server/src') diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index df2d8e9..446c63e 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs @@ -6,6 +6,7 @@ {-# LANGUAGE ImpredicativeTypes #-} {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE DataKinds #-} module Thermoprint.Server ( thermoprintServer @@ -25,9 +26,17 @@ import qualified Data.Map as Map import Data.Set (Set) import qualified Data.Set as Set + +import Data.Sequence (Seq) +import qualified Data.Sequence as Seq + +import Data.Time (UTCTime) import Data.Maybe (maybe) import Data.Foldable (mapM_, forM_, foldlM) +import Data.Function hiding (id, (.)) +import Data.Bifunctor +import Data.Proxy import Control.Monad.Trans.Resource import Control.Monad.Trans.Control @@ -53,14 +62,20 @@ import Network.Wai (Application) import Servant.Server (serve) import Servant.Server.Internal.Enter (enter, (:~>)(..)) +import Servant.API +import Servant.Utils.Links +import Network.URI import Database.Persist.Sql (runMigrationSilent, ConnectionPool, runSqlPool) -import Thermoprint.API (thermoprintAPI, PrinterId) +import Thermoprint.API (thermoprintAPI, PrinterStatus, JobStatus) +import qualified Thermoprint.API as API (PrinterId, JobId) import Thermoprint.Server.Fork +import Thermoprint.Server.Push + import Thermoprint.Server.Database import Thermoprint.Server.Printer import Thermoprint.Server.Queue @@ -72,8 +87,8 @@ import Debug.Trace -- | Compile-time configuration for 'thermoprintServer' data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour - , printers :: Map PrinterId Printer - , queueManagers :: PrinterId -> QMConfig m + , printers :: Map API.PrinterId Printer + , queueManagers :: API.PrinterId -> QMConfig m } data QMConfig m = forall t. ( MonadTrans t @@ -137,4 +152,9 @@ thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams let runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM gcChan qm printer mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers - liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers + nChan <- liftIO $ newBroadcastTChanIO + let + printerUrl :: API.PrinterId -> URI + printerUrl = safeLink thermoprintAPI (Proxy :: Proxy ("jobs" :> QueryParam "printer" API.PrinterId :> Get '[JSON] (Seq (API.JobId, UTCTime, JobStatus)))) + mapM_ (fork tMgr . uncurry (notifyOnChange nChan ((==) `on` fromZipper)) . bimap printerUrl queue) $ Map.toList printers + liftIO . Warp.runSettings warpSettings . withPush nChan . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers nChan diff --git a/server/src/Thermoprint/Server/API.hs b/server/src/Thermoprint/Server/API.hs index 770737a..cbf727c 100644 --- a/server/src/Thermoprint/Server/API.hs +++ b/server/src/Thermoprint/Server/API.hs @@ -2,6 +2,7 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE DataKinds #-} module Thermoprint.Server.API ( ProtoHandler, Handler @@ -15,6 +16,7 @@ import qualified Thermoprint.API as API (JobId(..), DraftId(..)) import Thermoprint.Server.Printer import Thermoprint.Server.Queue import Thermoprint.Server.Database +import Thermoprint.Server.Push import Data.Set (Set) import qualified Data.Set as Set @@ -28,6 +30,7 @@ import qualified Data.Text as T import Servant import Servant.Server import Servant.Server.Internal.Enter +import Servant.Utils.Links import Control.Monad.Logger import Control.Monad.Reader @@ -67,6 +70,7 @@ type Handler = EitherT ServantErr ProtoHandler -- ^ Runtime configuration of our handlers data HandlerInput = HandlerInput { sqlPool :: ConnectionPool -- ^ How to interact with 'persistent' storage , printers :: Map PrinterId Printer + , nChan :: TChan Notification } instance MonadLogger m => MonadLogger (EitherT a m) where @@ -74,17 +78,18 @@ instance MonadLogger m => MonadLogger (EitherT a m) where handlerNat :: ( MonadReader ConnectionPool m , MonadLoggerIO m - ) => Map PrinterId Printer -> m (Handler :~> EitherT ServantErr IO) + ) => Map PrinterId Printer -> TChan Notification -> m (Handler :~> EitherT ServantErr IO) -- ^ Servant requires its handlers to be 'EitherT ServantErr IO' -- -- This generates a 'Nat'ural transformation for squashing the monad-transformer-stack we use in our handlers to the monad 'servant-server' wants -handlerNat printerMap = do +handlerNat printerMap nChan = do sqlPool <- ask logFunc <- askLoggerIO let handlerInput = HandlerInput { sqlPool = sqlPool , printers = printerMap + , nChan = nChan } protoNat :: ProtoHandler :~> IO protoNat = Nat runResourceT . Nat (($ logFunc) . runLoggingT) . runReaderTNat handlerInput @@ -103,6 +108,9 @@ thermoprintServer = listPrinters (<||>) = liftM2 (:<|>) infixr 9 <||> +notify :: Notification -> Handler () +notify n = liftIO . atomically =<< flip writeTChan n <$> asks nChan + lookupPrinter :: Maybe PrinterId -> Handler (PrinterId, Printer) -- ^ Make sure a printer exists lookupPrinter pId = asks printers >>= maybePrinter' pId @@ -167,7 +175,9 @@ abortJob needle = do let filtered = Seq.filter ((/= castId needle) . jobId) pending writeTVar (queue p) $ current { pending = filtered } return . not $ ((==) `on` length) pending filtered - when found . $(logInfo) $ "Removed job #" <> (T.pack $ show (castId needle :: Integer)) <> " from " <> (T.pack . show $ pId') + when found $ do + $(logInfo) $ "Removed job #" <> (T.pack $ show (castId needle :: Integer)) <> " from " <> (T.pack . show $ pId') + notify $ safeLink thermoprintAPI (Proxy :: Proxy ("jobs" :> Get '[JSON] (Seq (API.JobId, UTCTime, JobStatus)))) return found when (not found) $ left err404 @@ -180,12 +190,14 @@ addDraft :: Maybe DraftTitle -> Printout -> Handler API.DraftId addDraft title content = do id <- fmap castId . runSqlPool (insert $ Draft title content) =<< asks sqlPool $(logInfo) $ "Added draft #" <> (T.pack $ show (castId id :: Integer)) <> " (" <> (T.pack $ show title) <> ")" + notify $ safeLink thermoprintAPI (Proxy :: Proxy ("drafts" :> Get '[JSON] (Map API.DraftId (Maybe DraftTitle)))) return id updateDraft :: API.DraftId -> Maybe DraftTitle -> Printout -> Handler () updateDraft draftId title content = handle (\(KeyNotFound _) -> left $ err404) $ do runSqlPool (update (castId draftId) [ DraftTitle =. title, DraftContent =. content ]) =<< asks sqlPool $(logInfo) $ "Updated draft #" <> (T.pack $ show (castId draftId :: Integer)) + notify $ safeLink thermoprintAPI (Proxy :: Proxy ("draft" :> Capture "draftId" API.DraftId :> Get '[JSON] (Maybe DraftTitle, Printout))) $ draftId getDraft :: API.DraftId -> Handler (Maybe DraftTitle, Printout) getDraft draftId = fmap (\(Draft title content) -> (title, content)) . maybe (left err404) return =<< runSqlPool (get $ castId draftId) =<< asks sqlPool @@ -194,6 +206,7 @@ deleteDraft :: API.DraftId -> Handler () deleteDraft draftId = do runSqlPool (delete $ (castId draftId :: Key Draft)) =<< asks sqlPool $(logInfo) $ "Made sure draft #" <> (T.pack $ show (castId draftId :: Integer)) <> " is Deleted" + notify $ safeLink thermoprintAPI (Proxy :: Proxy ("drafts" :> Get '[JSON] (Map API.DraftId (Maybe DraftTitle)))) printDraft :: API.DraftId -> Maybe PrinterId -> Handler API.JobId printDraft draftId printerId = (\(Draft _ content) -> queueJob printerId content) =<< maybe (left err404) return =<< runSqlPool (get $ castId draftId) =<< asks sqlPool diff --git a/server/src/Thermoprint/Server/Push.hs b/server/src/Thermoprint/Server/Push.hs new file mode 100644 index 0000000..b2eca6b --- /dev/null +++ b/server/src/Thermoprint/Server/Push.hs @@ -0,0 +1,59 @@ +{-# LANGUAGE ViewPatterns #-} + +module Thermoprint.Server.Push + ( Notification + , withPush + , protocolSpec + , notifyOnChange + ) where + +import Network.WebSockets +import Network.Wai.Handler.WebSockets +import Network.Wai (Application) + +import Network.URI + +import Control.Concurrent.STM + +import Thermoprint.Server.Queue + +import Control.Monad.IO.Class +import Control.Monad + +import Paths_thermoprint_server (version) +import Data.Version (showVersion) + +import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as CBS + +import Data.Text (Text) +import qualified Data.Text as Text + +type Notification = URI + +withPush :: TChan Notification -> Application -> Application +withPush chan = websocketsOr defaultConnectionOptions $ flip acceptRequestWith (AcceptRequest $ Just protocolSpec) >=> handleClient chan + +protocolSpec :: ByteString +protocolSpec = CBS.pack $ "thermoprint-server.notification." ++ showVersion version + +handleClient :: TChan Notification -> Connection -> IO () +handleClient chan conn = do + cChan <- atomically $ dupTChan chan + forever . void $ atomically (readTChan cChan) >>= sendTextData conn . packNotification + +packNotification :: Notification -> Text +packNotification = Text.pack . show + +notifyOnChange :: MonadIO m => TChan Notification -> (a -> a -> Bool) -> Notification -> TVar a -> m () +notifyOnChange chan cmp n q = void . liftIO $ readTVarIO q >>= notifyOnChange' + where + notifyOnChange' last = do + current <- atomically $ (\current -> current <$ check (not $ cmp last current)) =<< readTVar q + atomically $ writeTChan chan n + notifyOnChange' current + + + + + -- cgit v1.2.3