From 760027dbcd7185be038299efb18e0cc37c8088c4 Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Mon, 22 Feb 2016 20:22:42 +0000 Subject: Websocket based push notifications --- server/src/Thermoprint/Server.hs | 28 ++++++++++++++--- server/src/Thermoprint/Server/API.hs | 19 +++++++++-- server/src/Thermoprint/Server/Push.hs | 59 +++++++++++++++++++++++++++++++++++ server/thermoprint-server.cabal | 11 +++++-- server/thermoprint-server.nix | 20 ++++++------ 5 files changed, 119 insertions(+), 18 deletions(-) create mode 100644 server/src/Thermoprint/Server/Push.hs diff --git a/server/src/Thermoprint/Server.hs b/server/src/Thermoprint/Server.hs index df2d8e9..446c63e 100644 --- a/server/src/Thermoprint/Server.hs +++ b/server/src/Thermoprint/Server.hs @@ -6,6 +6,7 @@ {-# LANGUAGE ImpredicativeTypes #-} {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE DataKinds #-} module Thermoprint.Server ( thermoprintServer @@ -25,9 +26,17 @@ import qualified Data.Map as Map import Data.Set (Set) import qualified Data.Set as Set + +import Data.Sequence (Seq) +import qualified Data.Sequence as Seq + +import Data.Time (UTCTime) import Data.Maybe (maybe) import Data.Foldable (mapM_, forM_, foldlM) +import Data.Function hiding (id, (.)) +import Data.Bifunctor +import Data.Proxy import Control.Monad.Trans.Resource import Control.Monad.Trans.Control @@ -53,14 +62,20 @@ import Network.Wai (Application) import Servant.Server (serve) import Servant.Server.Internal.Enter (enter, (:~>)(..)) +import Servant.API +import Servant.Utils.Links +import Network.URI import Database.Persist.Sql (runMigrationSilent, ConnectionPool, runSqlPool) -import Thermoprint.API (thermoprintAPI, PrinterId) +import Thermoprint.API (thermoprintAPI, PrinterStatus, JobStatus) +import qualified Thermoprint.API as API (PrinterId, JobId) import Thermoprint.Server.Fork +import Thermoprint.Server.Push + import Thermoprint.Server.Database import Thermoprint.Server.Printer import Thermoprint.Server.Queue @@ -72,8 +87,8 @@ import Debug.Trace -- | Compile-time configuration for 'thermoprintServer' data Config m = Config { dyreError :: Maybe String -- ^ Set by 'Dyre' -- sent to log as an error , warpSettings :: Warp.Settings -- ^ Configure 'Warp's behaviour - , printers :: Map PrinterId Printer - , queueManagers :: PrinterId -> QMConfig m + , printers :: Map API.PrinterId Printer + , queueManagers :: API.PrinterId -> QMConfig m } data QMConfig m = forall t. ( MonadTrans t @@ -137,4 +152,9 @@ thermoprintServer dyre io = Dyre.wrapMain $ Dyre.defaultParams let runQM' (queueManagers -> QMConfig qm nat) printer = unNat nat $ runQM gcChan qm printer mapM_ (fork tMgr . uncurry runQM') $ Map.toList printers - liftIO . Warp.runSettings warpSettings . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers + nChan <- liftIO $ newBroadcastTChanIO + let + printerUrl :: API.PrinterId -> URI + printerUrl = safeLink thermoprintAPI (Proxy :: Proxy ("jobs" :> QueryParam "printer" API.PrinterId :> Get '[JSON] (Seq (API.JobId, UTCTime, JobStatus)))) + mapM_ (fork tMgr . uncurry (notifyOnChange nChan ((==) `on` fromZipper)) . bimap printerUrl queue) $ Map.toList printers + liftIO . Warp.runSettings warpSettings . withPush nChan . serve thermoprintAPI . flip enter API.thermoprintServer =<< handlerNat printers nChan diff --git a/server/src/Thermoprint/Server/API.hs b/server/src/Thermoprint/Server/API.hs index 770737a..cbf727c 100644 --- a/server/src/Thermoprint/Server/API.hs +++ b/server/src/Thermoprint/Server/API.hs @@ -2,6 +2,7 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE DataKinds #-} module Thermoprint.Server.API ( ProtoHandler, Handler @@ -15,6 +16,7 @@ import qualified Thermoprint.API as API (JobId(..), DraftId(..)) import Thermoprint.Server.Printer import Thermoprint.Server.Queue import Thermoprint.Server.Database +import Thermoprint.Server.Push import Data.Set (Set) import qualified Data.Set as Set @@ -28,6 +30,7 @@ import qualified Data.Text as T import Servant import Servant.Server import Servant.Server.Internal.Enter +import Servant.Utils.Links import Control.Monad.Logger import Control.Monad.Reader @@ -67,6 +70,7 @@ type Handler = EitherT ServantErr ProtoHandler -- ^ Runtime configuration of our handlers data HandlerInput = HandlerInput { sqlPool :: ConnectionPool -- ^ How to interact with 'persistent' storage , printers :: Map PrinterId Printer + , nChan :: TChan Notification } instance MonadLogger m => MonadLogger (EitherT a m) where @@ -74,17 +78,18 @@ instance MonadLogger m => MonadLogger (EitherT a m) where handlerNat :: ( MonadReader ConnectionPool m , MonadLoggerIO m - ) => Map PrinterId Printer -> m (Handler :~> EitherT ServantErr IO) + ) => Map PrinterId Printer -> TChan Notification -> m (Handler :~> EitherT ServantErr IO) -- ^ Servant requires its handlers to be 'EitherT ServantErr IO' -- -- This generates a 'Nat'ural transformation for squashing the monad-transformer-stack we use in our handlers to the monad 'servant-server' wants -handlerNat printerMap = do +handlerNat printerMap nChan = do sqlPool <- ask logFunc <- askLoggerIO let handlerInput = HandlerInput { sqlPool = sqlPool , printers = printerMap + , nChan = nChan } protoNat :: ProtoHandler :~> IO protoNat = Nat runResourceT . Nat (($ logFunc) . runLoggingT) . runReaderTNat handlerInput @@ -103,6 +108,9 @@ thermoprintServer = listPrinters (<||>) = liftM2 (:<|>) infixr 9 <||> +notify :: Notification -> Handler () +notify n = liftIO . atomically =<< flip writeTChan n <$> asks nChan + lookupPrinter :: Maybe PrinterId -> Handler (PrinterId, Printer) -- ^ Make sure a printer exists lookupPrinter pId = asks printers >>= maybePrinter' pId @@ -167,7 +175,9 @@ abortJob needle = do let filtered = Seq.filter ((/= castId needle) . jobId) pending writeTVar (queue p) $ current { pending = filtered } return . not $ ((==) `on` length) pending filtered - when found . $(logInfo) $ "Removed job #" <> (T.pack $ show (castId needle :: Integer)) <> " from " <> (T.pack . show $ pId') + when found $ do + $(logInfo) $ "Removed job #" <> (T.pack $ show (castId needle :: Integer)) <> " from " <> (T.pack . show $ pId') + notify $ safeLink thermoprintAPI (Proxy :: Proxy ("jobs" :> Get '[JSON] (Seq (API.JobId, UTCTime, JobStatus)))) return found when (not found) $ left err404 @@ -180,12 +190,14 @@ addDraft :: Maybe DraftTitle -> Printout -> Handler API.DraftId addDraft title content = do id <- fmap castId . runSqlPool (insert $ Draft title content) =<< asks sqlPool $(logInfo) $ "Added draft #" <> (T.pack $ show (castId id :: Integer)) <> " (" <> (T.pack $ show title) <> ")" + notify $ safeLink thermoprintAPI (Proxy :: Proxy ("drafts" :> Get '[JSON] (Map API.DraftId (Maybe DraftTitle)))) return id updateDraft :: API.DraftId -> Maybe DraftTitle -> Printout -> Handler () updateDraft draftId title content = handle (\(KeyNotFound _) -> left $ err404) $ do runSqlPool (update (castId draftId) [ DraftTitle =. title, DraftContent =. content ]) =<< asks sqlPool $(logInfo) $ "Updated draft #" <> (T.pack $ show (castId draftId :: Integer)) + notify $ safeLink thermoprintAPI (Proxy :: Proxy ("draft" :> Capture "draftId" API.DraftId :> Get '[JSON] (Maybe DraftTitle, Printout))) $ draftId getDraft :: API.DraftId -> Handler (Maybe DraftTitle, Printout) getDraft draftId = fmap (\(Draft title content) -> (title, content)) . maybe (left err404) return =<< runSqlPool (get $ castId draftId) =<< asks sqlPool @@ -194,6 +206,7 @@ deleteDraft :: API.DraftId -> Handler () deleteDraft draftId = do runSqlPool (delete $ (castId draftId :: Key Draft)) =<< asks sqlPool $(logInfo) $ "Made sure draft #" <> (T.pack $ show (castId draftId :: Integer)) <> " is Deleted" + notify $ safeLink thermoprintAPI (Proxy :: Proxy ("drafts" :> Get '[JSON] (Map API.DraftId (Maybe DraftTitle)))) printDraft :: API.DraftId -> Maybe PrinterId -> Handler API.JobId printDraft draftId printerId = (\(Draft _ content) -> queueJob printerId content) =<< maybe (left err404) return =<< runSqlPool (get $ castId draftId) =<< asks sqlPool diff --git a/server/src/Thermoprint/Server/Push.hs b/server/src/Thermoprint/Server/Push.hs new file mode 100644 index 0000000..b2eca6b --- /dev/null +++ b/server/src/Thermoprint/Server/Push.hs @@ -0,0 +1,59 @@ +{-# LANGUAGE ViewPatterns #-} + +module Thermoprint.Server.Push + ( Notification + , withPush + , protocolSpec + , notifyOnChange + ) where + +import Network.WebSockets +import Network.Wai.Handler.WebSockets +import Network.Wai (Application) + +import Network.URI + +import Control.Concurrent.STM + +import Thermoprint.Server.Queue + +import Control.Monad.IO.Class +import Control.Monad + +import Paths_thermoprint_server (version) +import Data.Version (showVersion) + +import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as CBS + +import Data.Text (Text) +import qualified Data.Text as Text + +type Notification = URI + +withPush :: TChan Notification -> Application -> Application +withPush chan = websocketsOr defaultConnectionOptions $ flip acceptRequestWith (AcceptRequest $ Just protocolSpec) >=> handleClient chan + +protocolSpec :: ByteString +protocolSpec = CBS.pack $ "thermoprint-server.notification." ++ showVersion version + +handleClient :: TChan Notification -> Connection -> IO () +handleClient chan conn = do + cChan <- atomically $ dupTChan chan + forever . void $ atomically (readTChan cChan) >>= sendTextData conn . packNotification + +packNotification :: Notification -> Text +packNotification = Text.pack . show + +notifyOnChange :: MonadIO m => TChan Notification -> (a -> a -> Bool) -> Notification -> TVar a -> m () +notifyOnChange chan cmp n q = void . liftIO $ readTVarIO q >>= notifyOnChange' + where + notifyOnChange' last = do + current <- atomically $ (\current -> current <$ check (not $ cmp last current)) =<< readTVar q + atomically $ writeTChan chan n + notifyOnChange' current + + + + + diff --git a/server/thermoprint-server.cabal b/server/thermoprint-server.cabal index e90eb4f..bc3650b 100644 --- a/server/thermoprint-server.cabal +++ b/server/thermoprint-server.cabal @@ -2,7 +2,7 @@ -- documentation, see http://haskell.org/cabal/users-guide/ name: thermoprint-server -version: 1.0.0 +version: 1.1.0 synopsis: Server for thermoprint-spec -- description: homepage: http://dirty-haskell.org/tags/thermoprint.html @@ -21,11 +21,13 @@ library , Thermoprint.Server.Fork , Thermoprint.Server.Database , Thermoprint.Server.API + , Thermoprint.Server.Push , Thermoprint.Server.Queue , Thermoprint.Server.Printer , Thermoprint.Server.Printer.Debug , Thermoprint.Server.Printer.Generic other-modules: Thermoprint.Server.Database.Instances + , Paths_thermoprint_server -- other-extensions: build-depends: base >=4.8 && <5 , conduit >=1.2.6 && <2 @@ -42,6 +44,7 @@ library , transformers >=0.3.0 && <1 , persistent >=2.2 && <3 , persistent-template >=2.1.4 && <3 + , servant >=0.4.4 && <1 , servant-server >=0.4.4 && <1 , stm >=2.4.4 && <3 , text >=1.2.1 && <2 @@ -57,6 +60,10 @@ library , binary >=0.7.5 && <1 , QuickCheck >=2.8.1 && <3 , quickcheck-instances >=0.3.11 && <4 + , websockets >=0.9.5 && <1 + , wai-websockets >=3.0.0 && <4 + , wai >=3.0.5 && <4 + , network-uri >=2.6.0 && <3 hs-source-dirs: src default-language: Haskell2010 @@ -65,7 +72,7 @@ Test-Suite tests hs-source-dirs: test main-is: Spec.hs build-depends: base >=4.8.1 && <5 - , thermoprint-server ==1.0.* + , thermoprint-server ==1.1.* , thermoprint-client ==0.0.* , thermoprint-spec -any , hspec >=2.2.1 && <3 diff --git a/server/thermoprint-server.nix b/server/thermoprint-server.nix index f472cbc..69ecd2f 100644 --- a/server/thermoprint-server.nix +++ b/server/thermoprint-server.nix @@ -1,30 +1,32 @@ { mkDerivation, base, binary, bytestring, conduit, containers , data-default-class, deepseq, dyre, either, encoding, exceptions , extended-reals, filelock, hspec, mmorph, monad-control -, monad-logger, mtl, persistent, persistent-sqlite +, monad-logger, mtl, network-uri, persistent, persistent-sqlite , persistent-template, QuickCheck, quickcheck-instances, resourcet -, servant-server, stdenv, stm, temporary, text, thermoprint-client -, thermoprint-spec, time, transformers, wai, warp +, servant, servant-server, stdenv, stm, temporary, text +, thermoprint-client, thermoprint-spec, time, transformers, wai +, wai-websockets, warp, websockets }: mkDerivation { pname = "thermoprint-server"; - version = "1.0.0"; + version = "1.1.0"; src = ./.; isLibrary = true; isExecutable = true; libraryHaskellDepends = [ base binary bytestring conduit containers data-default-class deepseq dyre either encoding exceptions extended-reals filelock - mmorph monad-control monad-logger mtl persistent - persistent-template resourcet servant-server stm text - thermoprint-spec time transformers wai warp + mmorph monad-control monad-logger mtl network-uri persistent + persistent-template QuickCheck quickcheck-instances resourcet + servant servant-server stm text thermoprint-spec time transformers + wai wai-websockets warp websockets ]; executableHaskellDepends = [ base monad-logger mtl persistent-sqlite resourcet ]; testHaskellDepends = [ - base exceptions hspec monad-logger mtl persistent-sqlite QuickCheck - quickcheck-instances resourcet stm temporary text + base containers exceptions hspec monad-logger mtl persistent-sqlite + QuickCheck quickcheck-instances resourcet stm temporary text thermoprint-client thermoprint-spec transformers warp ]; homepage = "http://dirty-haskell.org/tags/thermoprint.html"; -- cgit v1.2.3