diff --git a/docs/IcepeakProtocolMultiSub.md b/docs/IcepeakProtocolMultiSub.md new file mode 100644 index 0000000..c01be43 --- /dev/null +++ b/docs/IcepeakProtocolMultiSub.md @@ -0,0 +1,224 @@ +# The Icepeak Multiple-Subcription-Single-Connection Protocol + +This protocol was developed atop the existing protocol to allow the client to subscribe to multiple "paths"/"topics" using a single websocket connection. + +Whereas the original existing protocol required a new websocket connection for each path the client wanted to subscribe to. + +# Initial Connection + +A client can connect to a multiple-subscription protocol connection with a URL such as: +`ws://yourdomain.com/?method=reusable` + +The URL path: +- points to the root of your icepeak service +- has the query parameter `method` set to `reusable` + +The authorisation mechanism is a "subscription deadline timout": Initial websocket connection does not need authorisation, but if the client does not subscribe to a client before the server-configured timeout, then the connection will be closed un-politely, i.e the server will not send a WS close control message. + +Timeout is set by the `--first-subscription-deadline-timeout` flag, the input is in terms of microseconds, the default value is `1000000` (1 second). + + +# Subscribing, Unsubscribing & Updates + +In summary: +- The client can request to subscribe to an array of paths. +- The client, on the single connection, can cumulatively keep subscribing and unsubscribing to paths by sending corresponding payloads. + - The server also sends back a response about the status, and some data of the corresponding subscription or unsubscription request. +- The server will send the client the new value/update at the subscribed path whenever there is a change at that path. +- Both, subscription and unsubscription requests are idempotent, in terms of the effect on the state of the server, actual response received from client varies depending on the state of the server. + +## Update + +`JSON Schema` declaration of the update the server will send to the client upon subscribed path change: +```javascript +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "wss://updates.channable.com", + "title": "Path changes", + "description": "Indicates changes occurred at the subscribed paths", + "type": "object", + "properties": { + "type": { "const": "update" }, + "path": { "type": "string" }, + "value": {} + } +} +``` + +Example: +```javascript +{ + "type": "update", + "path": "path/to/value", + "value": +} +``` + +## Subscribe + +In summary: +- The client can send a payload that contains an array of paths to subscribe to. + - The client has to send a JWT that authorises all the paths. +- The client can expect a response from the server that contains the status/acknowledgement of the request. + - Upon a successful request, the client can expect the payload to also contain the current value of the paths requested. + +### Client Subscribe Request +Each subscription is checked against a JWT to see if the user is authorised to access the path(s). + +`JSON Schema` declaration of the subscribe client request: +```javascript +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "wss://updates.channable.com/", + "title": "Add subscription", + "description": "Adding an additional subscription", + "type": "object", + "properties": { + "type": { "const": "subscribe" }, + "paths": { "type": "array", "items": { "type": "string" } }, + "token": { "type": "string" } + } +} + +``` +Example: +```javascript +{ + "type": "subscribe", + "paths": [ "path/one", "path/two", "otherpath" ], + "token": "eyJ..." +} +``` + + +### Server Subscribe Response +The server sends back a payload to the client. The payload will always contain a status code: + +| Status code | When | +| ---- | -------------------------------------------- | +| 200 | Subscription was successfully processed | +| 400 | Request payload was malformed | +| 401 | No authorization token provided | +| 403 | Authorization token was rejected / malformed | + + +If the status code is `200` and **whether or not the client is already subscribed**, the client can expect a payload from server that contains: +- The status code. +- Path(s) of the subscription requested. +- The current value(s) of that path(s). + +`JSON Schema` declaration of the server response: +```javascript +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "wss://updates.channable.com", + "title": "Subscription status", + "description": "Indicates whether the subscription was successful", + "type": "object", + "properties": { + "type": { "const": "subscribe" }, + "paths": { + "type": "array", + "path": { + "type": "object", + "properties": { + "path": { "type": "string" }, + "value": {} + } + } + }, + "code": { "type": "number" }, + "message": { "type": "string" }, + "extra": {} + } +} +``` + +Example success: +```javascript +{ + "type": "subscribe", + "paths": [ + { "path": "path/one", "value": val }, + { "path": "path/two", "value": val2 }, + { "path": "otherpath", "value": val3 }, + ], + "code": 200, + "message": "You've been successfully subscribed to the paths", + "extra": {} +} +``` + +## Unsubscribe + +In summary: +- The client can send a payload that contains paths to unsubscribe from. +- The client can expect a response from the server that contains the status/acknowledgement of the request, and paths unsubscribed from. + +### Client Unsubscribe Request + +`JSON Schema` declaration of the unsubscribe client request: +```javascript +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "wss://updates.channable.com", + "title": "Remove Subscription", + "description": "Remove an existing subscription", + "type": "object", + "properties": { + "type": { "const": "unsubscribe" }, + "paths": { "type": "array", "items": { "type": "string" } } + } +} +``` + +Example unsubscribe request: +```javascript +{ + "type": "unsubscribe", + "paths": [ "path/one", "path/two", "path/three" ], +} +``` + +### Server Unsubscribe Response +The server sends back a payload to the client. The payload will always contain a status code: + +| Status code | When | +| ------------- | -------------------------------- | +| 200 | Unsubscription was successfully processed | +| 400 | Request payload was malformed | + +If the status code is `200`, the client can expect a payload from server that contains: +- the list of paths that the client had been meaningfully unsubscribe from, i.e only the paths that the client had subscribed to. + +`JSON Schema` declaration of the unsubscribe client request: +```javascript +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "wss://updates.channable.com", + "title": "Unsubscription status", + "description": "Indicates whether the unsubscription was successful", + "type": "object", + "properties": { + "type": { "const": "unsubscribe" }, + "paths": { "type": "array", "items": { "type": "string" } }, + "code": { "type": "number" }, + "message": { "type": "string" }, + "extra": {} + } +} +``` + +Example succesful unsubscribe response: +```javascript +{ + "type": "unsubscribe", + "paths": [ "path/one", "path/two" ], + "code": 200, + "message": "You've been successfully unsubscribed from the paths", + "extra": {} +} +``` + +# Invalid Client Message +The server will close the websocket connection with an informative message if the client payload is not recognised. diff --git a/server/icepeak.cabal b/server/icepeak.cabal index ecca7b6..d9894b8 100644 --- a/server/icepeak.cabal +++ b/server/icepeak.cabal @@ -34,6 +34,10 @@ library Icepeak.Server.Store Icepeak.Server.Subscription Icepeak.Server.WebsocketServer + Icepeak.Server.WebsocketServer.SingleSubscription + Icepeak.Server.WebsocketServer.MultiSubscription + Icepeak.Server.WebsocketServer.Payload + Icepeak.Server.WebsocketServer.Utils other-modules: Paths_icepeak hs-source-dirs: src @@ -177,6 +181,7 @@ test-suite spec Icepeak.Server.SocketSpec Icepeak.Server.StoreSpec Icepeak.Server.SubscriptionTreeSpec + Icepeak.Server.MultiSubscriptionSpec OrphanInstances Paths_icepeak @@ -223,5 +228,6 @@ test-suite spec , wai-websockets , warp , websockets + , hspec-expectations-json default-language: Haskell2010 diff --git a/server/icepeak.nix b/server/icepeak.nix index 58a555a..50913fa 100644 --- a/server/icepeak.nix +++ b/server/icepeak.nix @@ -5,8 +5,9 @@ # Haskell packages , aeson, async, base, bytestring, clock, containers, directory, hashable, hspec -, hspec-wai, http-types, jwt, monad-logger, mtl, network, optparse-applicative -, prometheus-client, prometheus-metrics-ghc, QuickCheck, quickcheck-instances +, hspec-wai, hspec-expectations-json, http-types, jwt, monad-logger, mtl, network +, optparse-applicative, prometheus-client +, prometheus-metrics-ghc, QuickCheck, quickcheck-instances , random, raven-haskell, scotty, sqlite-simple, securemem, stm, text, time, unix , unordered-containers, uuid, wai, wai-extra, wai-middleware-prometheus , wai-websockets, warp, websockets }: @@ -83,7 +84,7 @@ mkDerivation { websockets ]; - testHaskellDepends = [ hspec hspec-wai QuickCheck quickcheck-instances ]; + testHaskellDepends = [ hspec hspec-wai QuickCheck quickcheck-instances hspec-expectations-json ]; license = lib.licenses.bsd3; } diff --git a/server/src/Icepeak/Server/Config.hs b/server/src/Icepeak/Server/Config.hs index b9d00c3..4aa629d 100644 --- a/server/src/Icepeak/Server/Config.hs +++ b/server/src/Icepeak/Server/Config.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE NumericUnderscores #-} + module Icepeak.Server.Config ( Config (..), MetricsConfig (..), @@ -51,6 +53,11 @@ data Config = Config -- to respond with a pong. If no pong is sent within this timeframe then the -- connection is considered to have timed out and it will be terminated. , configWebSocketPongTimeout:: Int + -- | The amount of time in microseconds to wait for a subscription request before closing the connection. + -- This is used for the 'MultiSubscription.hs' protocol. + -- The initial connection to the server is not behind authorisation, this timeout mechanism + -- is used to prevent unwanted connections. + , configInitialSubscriptionTimeoutMicroSeconds:: Int } data MetricsConfig = MetricsConfig @@ -121,6 +128,11 @@ configParser environment = Config metavar "WS-PONG-TIMEOUT" <> value 30 <> help "The timespan in seconds after sending a ping during which the client has to respond with a pong. If no pong is sent within this timeframe then the connection is considered to have timed out and it will be terminated.") + <*> option auto + (long "first-subscription-deadline-timeout" <> + metavar "MICROSECONDS" <> + value 1_000_000 <> -- 1 second + help "The amount of time in microseconds to wait for a subscription request before closing the connection. This is used for the multiple subscription protocol. The initial connection to the server is not behind authorisation, and hence this timeout mechanism is used to disconnect unwanted connections.") where environ var = foldMap value (lookup var environment) diff --git a/server/src/Icepeak/Server/Core.hs b/server/src/Icepeak/Server/Core.hs index ce066d2..e420a6d 100644 --- a/server/src/Icepeak/Server/Core.hs +++ b/server/src/Icepeak/Server/Core.hs @@ -1,11 +1,10 @@ {-# LANGUAGE OverloadedStrings #-} -module Icepeak.Server.Core -( + +module Icepeak.Server.Core ( Core (..), -- TODO: Expose only put for clients. EnqueueResult (..), Command (..), ServerState, - SubscriberState (..), Updated (..), enqueueCommand, tryEnqueueCommand, @@ -13,17 +12,16 @@ module Icepeak.Server.Core withCoreMetrics, lookup, newCore, - newSubscriberState, postQuit, runCommandLoop, - runSyncTimer + runSyncTimer, ) where import Control.Concurrent (threadDelay) -import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, putMVar) +import Control.Concurrent.MVar (MVar, newMVar, putMVar) import Control.Concurrent.STM (atomically) -import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueueIO, readTBQueue, writeTBQueue, isFullTBQueue) +import Control.Concurrent.STM.TBQueue (TBQueue, isFullTBQueue, newTBQueueIO, readTBQueue, writeTBQueue) import Control.Concurrent.STM.TVar (TVar, newTVarIO) import Control.Monad (forever, unless, when) import Control.Monad.IO.Class @@ -35,13 +33,13 @@ import Prelude hiding (log, writeFile) import Icepeak.Server.Config (Config (..), periodicSyncingEnabled) import Icepeak.Server.Logger (Logger) -import Icepeak.Server.Store (Path, Modification (..)) +import Icepeak.Server.Persistence (PersistenceConfig (..), PersistentValue) +import Icepeak.Server.Store (Modification (..), Path) import Icepeak.Server.Subscription (SubscriptionTree, empty) -import Icepeak.Server.Persistence (PersistentValue, PersistenceConfig (..)) -import qualified Icepeak.Server.Store as Store -import qualified Icepeak.Server.Persistence as Persistence import qualified Icepeak.Server.Metrics as Metrics +import qualified Icepeak.Server.Persistence as Persistence +import qualified Icepeak.Server.Store as Store -- | Defines the kinds of commands that are handled by the event loop of the Core. data Command @@ -63,56 +61,48 @@ data EnqueueResult = Enqueued | Dropped data Core = Core { coreCurrentValue :: PersistentValue - -- the "dirty" flag is set to True whenever the core value has been modified - -- and is reset to False when it is persisted. - , coreValueIsDirty :: TVar Bool - , coreQueue :: TBQueue Command - , coreUpdates :: TBQueue (Maybe Updated) - , coreClients :: MVar ServerState - , coreLogger :: Logger - , coreConfig :: Config - , coreMetrics :: Maybe Metrics.IcepeakMetrics + , -- the "dirty" flag is set to True whenever the core value has been modified + -- and is reset to False when it is persisted. + coreValueIsDirty :: TVar Bool + , coreQueue :: TBQueue Command + , coreUpdates :: TBQueue (Maybe Updated) + , coreClients :: MVar ServerState + , coreLogger :: Logger + , coreConfig :: Config + , coreMetrics :: Maybe Metrics.IcepeakMetrics } --- This structure describes the state associated to each subscriber in order --- to communicate with them, at the moment, a simple `MVar` as a communication --- channel. This is a newtype in order for it to be extensible without --- rewriting all call sites. -newtype SubscriberState = SubscriberState - -- We don't need actual queues for subscribers because the only relevant value - -- for them is the last one. Since we have one producer and one reader, we can - -- rely on MVar as a simpler mechanism. - -- We can expect from the subscribers to receive all the updates in the - -- absence of timeouts. - { subscriberData :: MVar Value } - --- This structure keeps track of all subscribers. We use one SubscriberState per --- subscriber. -type ServerState = SubscriptionTree UUID SubscriberState +-- This structure keeps track of all subscribers. +-- Each subscriber is associated with a function for how to notify them. +-- This is a a flexability in order to support the multiple protcols that +-- exist for subscribing, i.e 'MultiSubscription.hs' and 'SingleSubscription.hs'. +type ServerState = SubscriptionTree UUID (Value -> IO ()) newServerState :: ServerState newServerState = empty -newSubscriberState :: IO SubscriberState -newSubscriberState = SubscriberState <$> newEmptyMVar - -- | Try to initialize the core. This loads the database and sets up the internal data structures. newCore :: Config -> Logger -> Maybe Metrics.IcepeakMetrics -> IO (Either String Core) newCore config logger metrics = do let queueCapacity = fromIntegral . configQueueCapacity $ config -- load the persistent data from disk - let filePath = Persistence.getDataFile (configStorageBackend config) (configDataFile config) - journalFile - | configEnableJournaling config - && periodicSyncingEnabled config = Just $ filePath ++ ".journal" - | otherwise = Nothing - eitherValue <- Persistence.loadFromBackend (configStorageBackend config) PersistenceConfig - { pcDataFile = filePath - , pcJournalFile = journalFile - , pcLogger = logger - , pcMetrics = metrics - , pcLogSync = configSyncLogging config - } + let + filePath = Persistence.getDataFile (configStorageBackend config) (configDataFile config) + journalFile + | configEnableJournaling config + && periodicSyncingEnabled config = + Just $ filePath ++ ".journal" + | otherwise = Nothing + eitherValue <- + Persistence.loadFromBackend + (configStorageBackend config) + PersistenceConfig + { pcDataFile = filePath + , pcJournalFile = journalFile + , pcLogger = logger + , pcMetrics = metrics + , pcLogSync = configSyncLogging config + } for eitherValue $ \value -> do -- create synchronization channels tdirty <- newTVarIO False @@ -156,7 +146,7 @@ getCurrentValue :: Core -> Path -> IO (Maybe Value) getCurrentValue core path = fmap (Store.lookup path) $ atomically $ Persistence.getValue $ coreCurrentValue core -withCoreMetrics :: MonadIO m => Core -> (Metrics.IcepeakMetrics -> IO ()) -> m () +withCoreMetrics :: (MonadIO m) => Core -> (Metrics.IcepeakMetrics -> IO ()) -> m () withCoreMetrics core act = liftIO $ forM_ (coreMetrics core) act -- | Drain the command queue and execute them. Changes are published to all @@ -164,27 +154,27 @@ withCoreMetrics core act = liftIO $ forM_ (coreMetrics core) act -- queue. runCommandLoop :: Core -> IO () runCommandLoop core = go - where - config = coreConfig core - currentValue = coreCurrentValue core - storageBackend = configStorageBackend config - go = do - command <- atomically $ readTBQueue (coreQueue core) - for_ (coreMetrics core) Metrics.incrementQueueRemoved - case command of - Modify op maybeNotifyVar -> do - Persistence.apply op currentValue - postUpdate (Store.modificationPath op) core - -- when periodic syncing is disabled, data is persisted after every modification - unless (periodicSyncingEnabled $ coreConfig core) $ - Persistence.syncToBackend storageBackend currentValue - mapM_ (`putMVar` ()) maybeNotifyVar - go - Sync -> do - maybe id Metrics.measureSyncDuration (coreMetrics core) $ - Persistence.syncToBackend storageBackend currentValue - go - Stop -> Persistence.syncToBackend storageBackend currentValue + where + config = coreConfig core + currentValue = coreCurrentValue core + storageBackend = configStorageBackend config + go = do + command <- atomically $ readTBQueue (coreQueue core) + for_ (coreMetrics core) Metrics.incrementQueueRemoved + case command of + Modify op maybeNotifyVar -> do + Persistence.apply op currentValue + postUpdate (Store.modificationPath op) core + -- when periodic syncing is disabled, data is persisted after every modification + unless (periodicSyncingEnabled $ coreConfig core) $ + Persistence.syncToBackend storageBackend currentValue + mapM_ (`putMVar` ()) maybeNotifyVar + go + Sync -> do + maybe id Metrics.measureSyncDuration (coreMetrics core) $ + Persistence.syncToBackend storageBackend currentValue + go + Stop -> Persistence.syncToBackend storageBackend currentValue -- | Post an update to the core's update queue (read by the websocket subscribers) postUpdate :: Path -> Core -> IO () @@ -198,15 +188,14 @@ postUpdate path core = do return full for_ (coreMetrics core) $ if isWsQueueFull - then Metrics.incrementWsQueueSkippedUpdates - else Metrics.incrementWsQueueAdded + then Metrics.incrementWsQueueSkippedUpdates + else Metrics.incrementWsQueueAdded -- | Periodically send a 'Sync' command to the 'Core' if enabled in the core -- configuration. runSyncTimer :: Core -> IO () runSyncTimer core = mapM_ go (configSyncIntervalMicroSeconds $ coreConfig core) - where - go interval = forever $ do - enqueueCommand Sync core - threadDelay interval - + where + go interval = forever $ do + enqueueCommand Sync core + threadDelay interval diff --git a/server/src/Icepeak/Server/WebsocketServer.hs b/server/src/Icepeak/Server/WebsocketServer.hs index e4eb326..aec5efc 100644 --- a/server/src/Icepeak/Server/WebsocketServer.hs +++ b/server/src/Icepeak/Server/WebsocketServer.hs @@ -4,31 +4,25 @@ module Icepeak.Server.WebsocketServer ( WSServerApp, -- The constructor is intentionally not exposed since the 'TimeSpec' should be -- initialized using the monotonic clock - WSServerOptions(), + WSServerOptions (), wsConnectionOpts, mkWSServerOptions, - ServerState, acceptConnection, - processUpdates + processUpdates, ) where -import Control.Concurrent (modifyMVar_, readMVar, threadDelay) -import Control.Concurrent.Async (race_, withAsync) -import Control.Concurrent.MVar (MVar, putMVar, takeMVar, tryTakeMVar) +import Control.Concurrent (readMVar, threadDelay) +import Control.Concurrent.Async (race_) import Control.Concurrent.STM (atomically) import Control.Concurrent.STM.TBQueue (readTBQueue) -import Control.Exception (AsyncException, SomeAsyncException, SomeException, catch, finally, fromException, handle, throwIO) -import Control.Monad (forever, unless, void, when) -import Data.Aeson (Value) +import Control.Exception (AsyncException, fromException, handle, throwIO) +import Control.Monad (unless, void) import Data.Foldable (for_) -import Data.IORef (IORef, atomicWriteIORef, readIORef, newIORef) +import Data.IORef (IORef, atomicWriteIORef, newIORef, readIORef) import Data.Text (Text) -import Data.UUID import System.Clock (Clock (Monotonic), TimeSpec (..), getTime) -import System.Random (randomIO) -import qualified Data.Aeson as Aeson import qualified Data.ByteString.Lazy as LBS import qualified Data.Text as T import qualified Data.Time.Clock.POSIX as Clock @@ -36,15 +30,16 @@ import qualified Network.HTTP.Types.Header as HttpHeader import qualified Network.HTTP.Types.URI as Uri import qualified Network.WebSockets as WS +import Icepeak.Server.AccessControl (AccessMode (..)) import Icepeak.Server.Config (Config (..)) -import Icepeak.Server.Core (Core (..), ServerState, SubscriberState (..), Updated (..), getCurrentValue, withCoreMetrics, newSubscriberState) -import Icepeak.Server.Store (Path) -import Icepeak.Server.AccessControl (AccessMode(..)) -import Icepeak.Server.JwtMiddleware (AuthResult (..), isRequestAuthorized, errorResponseBody) +import Icepeak.Server.Core (Core (..), ServerState, Updated (..)) +import Icepeak.Server.JwtMiddleware (AuthResult (..), errorResponseBody, isRequestAuthorized) + +import qualified Icepeak.Server.WebsocketServer.MultiSubscription as MultiSubscription +import qualified Icepeak.Server.WebsocketServer.SingleSubscription as SingleSubscription import qualified Icepeak.Server.Metrics as Metrics import qualified Icepeak.Server.Subscription as Subscription -import Data.Maybe (isJust) import System.Timeout (timeout) -- | 'WS.ServerApp' parameterized over the last received pong timestamp. See @@ -61,7 +56,7 @@ type WSServerApp = WSServerOptions -> WS.ServerApp -- the client answered our last ping with a pong. If it hasn't, then the server -- will terminate the websocket connection as timeouts would otherwise leave -- zombie connections. -newtype WSServerOptions = WSServerOptions { wsOptionLastPongTime :: IORef TimeSpec } +newtype WSServerOptions = WSServerOptions {wsOptionLastPongTime :: IORef TimeSpec} -- | Builds the /per connection/-connection settings. This hooks up the -- connection's pong handler to the last received pong time IORef so timeouts @@ -84,51 +79,75 @@ mkWSServerOptions = do lastPongTime <- getTime Monotonic WSServerOptions <$> newIORef lastPongTime -newUUID :: IO UUID -newUUID = randomIO - --- send the updated data to all subscribers to the path -broadcast :: Core -> [Text] -> Value -> ServerState -> IO () -broadcast core = - let - writeToSub :: MVar Value -> Value -> IO () - writeToSub queue val = do - -- We are the only producer, so either the subscriber already - -- read the value or we can discard it to replace it with the - -- new one. We don't need atomicity for this operation. - -- `tryTakeMVar` basically empties the MVar, from this perspective. - mbQueue <- tryTakeMVar queue - -- If the MVar has not yet been read by the subscriber thread, it means - -- that the update has been skipped. - when (isJust mbQueue) $ for_ (coreMetrics core) Metrics.incrementWsSkippedUpdates - putMVar queue val - in - Subscription.broadcast (writeToSub . subscriberData) +-- loop that is called for every update and that broadcasts the values to all +-- subscribers of the updated path +processUpdates :: Core -> IO () +processUpdates core = go + where + go = do + maybeUpdate <- atomically $ readTBQueue (coreUpdates core) + for_ (coreMetrics core) Metrics.incrementWsQueueRemoved + case maybeUpdate of + Just (Updated path value) -> do + clients <- readMVar (coreClients core) + Subscription.broadcast (\subscriberWrite newValue -> subscriberWrite newValue) path value clients + go + -- Stop the loop when we receive a Nothing. + Nothing -> pure () -- Called for each new client that connects. acceptConnection :: Core -> WSServerOptions -> WS.PendingConnection -> IO () acceptConnection core wsOptions pending = do -- printRequest pending -- TODO: Validate the path and headers of the pending request + let + (path, queryParams) = Uri.decodePath $ WS.requestPath $ WS.pendingRequest pending + config = coreConfig core + pingInterval = configWebSocketPingInterval config + onPing = pingHandler config wsOptions + + -- Fork a pinging thread, for each client, to keep idle connections open and to detect + -- closed connections. Sends a ping message every 30 seconds. + -- Note: The thread dies silently if the connection crashes or is closed. + runHandleClientWithPingThread connection handleClient = + withInterruptiblePingThread connection pingInterval onPing handleClient + + case lookup "method" queryParams of + Just (Just "reusable") -> acceptMultiSubscription core runHandleClientWithPingThread pending + Nothing -> acceptSingleSubscription core path runHandleClientWithPingThread pending + Just _ -> + WS.rejectRequestWith pending $ + WS.RejectRequest + { WS.rejectCode = 400 + , WS.rejectMessage = "Unrecognised query parameter value" + , WS.rejectHeaders = [(HttpHeader.hContentType, "text/plain")] + , WS.rejectBody = "Invalid 'method' query parameter value, expecting 'reusable'" + } + +type RunHandleClient = WS.Connection -> IO () -> IO () + +-- Immediately accept the connection, authorisation mechanism is done via a subscription deadline timeout. +acceptMultiSubscription :: Core -> RunHandleClient -> WS.PendingConnection -> IO () +acceptMultiSubscription core runHandleClient pending = do + connection <- WS.acceptRequest pending + runHandleClient connection $ + MultiSubscription.handleClient connection core + +acceptSingleSubscription :: Core -> [Text] -> RunHandleClient -> WS.PendingConnection -> IO () +acceptSingleSubscription core path runHandleClient pending = do authResult <- authorizePendingConnection core pending case authResult of AuthRejected err -> - WS.rejectRequestWith pending $ WS.RejectRequest - { WS.rejectCode = 401 - , WS.rejectMessage = "Unauthorized" - , WS.rejectHeaders = [(HttpHeader.hContentType, "application/json")] - , WS.rejectBody = LBS.toStrict $ errorResponseBody err - } + WS.rejectRequestWith pending $ + WS.RejectRequest + { WS.rejectCode = 401 + , WS.rejectMessage = "Unauthorized" + , WS.rejectHeaders = [(HttpHeader.hContentType, "application/json")] + , WS.rejectBody = LBS.toStrict $ errorResponseBody err + } AuthAccepted -> do - let path = fst $ Uri.decodePath $ WS.requestPath $ WS.pendingRequest pending - config = coreConfig core - pingInterval = configWebSocketPingInterval config - onPing = pingHandler config wsOptions connection <- WS.acceptRequest pending - -- Fork a pinging thread, for each client, to keep idle connections open and to detect - -- closed connections. Sends a ping message every 30 seconds. - -- Note: The thread dies silently if the connection crashes or is closed. - withInterruptiblePingThread connection pingInterval onPing $ handleClient connection path core + runHandleClient connection (SingleSubscription.handleClient connection path core) -- * Authorization @@ -136,95 +155,15 @@ authorizePendingConnection :: Core -> WS.PendingConnection -> IO AuthResult authorizePendingConnection core conn | configEnableJwtAuth (coreConfig core) = do now <- Clock.getPOSIXTime - let req = WS.pendingRequest conn - (path, query) = Uri.decodePath $ WS.requestPath req - headers = WS.requestHeaders req + let + req = WS.pendingRequest conn + (path, query) = Uri.decodePath $ WS.requestPath req + headers = WS.requestHeaders req return $ isRequestAuthorized headers query now (configJwtSecret (coreConfig core)) path ModeRead | otherwise = pure AuthAccepted --- * Client handling - -handleClient :: WS.Connection -> Path -> Core -> IO () -handleClient conn path core = do - uuid <- newUUID - subscriberState <- newSubscriberState - let - state = coreClients core - onConnect = do - modifyMVar_ state (pure . Subscription.subscribe path uuid subscriberState) - withCoreMetrics core Metrics.incrementSubscribers - onDisconnect = do - modifyMVar_ state (pure . Subscription.unsubscribe path uuid) - withCoreMetrics core Metrics.decrementSubscribers - sendInitialValue = do - currentValue <- getCurrentValue core path - WS.sendTextData conn (Aeson.encode currentValue) - -- For each connection, we want to spawn a client thread with an associated - -- queue, in order to manage subscribers. `withAsync` acts as `forkIO` in this - -- context, with the assurance the child thread is killed when the parent is. - manageConnection = withAsync (updateThread conn subscriberState) - (const $ keepTalking conn) - - -- Simply ignore connection errors, otherwise, Warp handles the exception - -- and sends a 500 response in the middle of a WebSocket connection, and - -- that violates the WebSocket protocol. - -- Note that subscribers are still properly removed by the finally below. - handleConnectionError :: WS.ConnectionException -> IO () - handleConnectionError _ = pure () - -- Put the client in the subscription tree and keep the connection open. - -- Remove it when the connection is closed. - finally (onConnect >> sendInitialValue >> manageConnection) onDisconnect - `catch` handleConnectionError - --- This function handles sending the updates to subscribers. -updateThread :: WS.Connection -> SubscriberState -> IO () -updateThread conn state = - let - send :: Value -> IO () - send value = - WS.sendTextData conn (Aeson.encode value) - `catch` - sendFailed - - sendFailed :: SomeException -> IO () - sendFailed exc - -- Rethrow async exceptions, they are meant for inter-thread communication - -- (e.g. ThreadKilled) and we don't expect them at this point. - | Just asyncExc <- fromException exc = throwIO (asyncExc :: SomeAsyncException) - -- We want to catch all other errors in order to prevent them from - -- bubbling up and disrupting the broadcasts to other clients. - | otherwise = pure () - in forever $ do - value <- takeMVar $ subscriberData state - send value - --- We don't send any messages here; sending is done by the update --- loop; it finds the client in the set of subscriptions. But we do --- need to keep the thread running, otherwise the connection will be --- closed. So we go into an infinite loop here. -keepTalking :: WS.Connection -> IO () -keepTalking conn = forever $ do - -- Note: WS.receiveDataMessage will handle control messages automatically and e.g. - -- do the closing handshake of the websocket protocol correctly - WS.receiveDataMessage conn - --- loop that is called for every update and that broadcasts the values to all --- subscribers of the updated path -processUpdates :: Core -> IO () -processUpdates core = go - where - go = do - maybeUpdate <- atomically $ readTBQueue (coreUpdates core) - for_ (coreMetrics core) Metrics.incrementWsQueueRemoved - case maybeUpdate of - Just (Updated path value) -> do - clients <- readMVar (coreClients core) - broadcast core path value clients - go - -- Stop the loop when we receive a Nothing. - Nothing -> pure () - -- * Timeout handling + -- -- The websockets library lets you send pings, but it has no built in way to -- terminate clients that never send a pong back. We implement this ourselves by @@ -245,9 +184,10 @@ pongHandler (WSServerOptions lastPongTime) = getTime Monotonic >>= void . atomic pingHandler :: Config -> WSServerOptions -> IO Bool pingHandler config (WSServerOptions lastPongTime) = do now <- getTime Monotonic - let pingInterval = TimeSpec (fromIntegral $ configWebSocketPingInterval config) 0 - pongTimeout = TimeSpec (fromIntegral $ configWebSocketPongTimeout config) 0 - lastPongDeadline = now - pingInterval - pongTimeout + let + pingInterval = TimeSpec (fromIntegral $ configWebSocketPingInterval config) 0 + pongTimeout = TimeSpec (fromIntegral $ configWebSocketPongTimeout config) 0 + lastPongDeadline = now - pingInterval - pongTimeout lastPong <- readIORef lastPongTime return $! lastPong < lastPongDeadline @@ -275,26 +215,26 @@ withInterruptiblePingThread conn pingInterval pingAction -- not break anything, although it will cause it to spam pings. interruptiblePingThread :: WS.Connection -> Int -> IO Bool -> IO () interruptiblePingThread conn pingInterval pingAction = ignore `handle` go 1 - where - pingIntervalUs :: Int - pingIntervalUs = pingInterval * 1000 * 1000 - - go :: Int -> IO () - go i = do - threadDelay pingIntervalUs - -- If the send buffer is full (e.g. because we pushed a lot of updates to - -- a client that's timed out) then this send will block indefinitely. - -- Adding the timeout here prevents this from happening, and it also - -- interacts nicely with the @pingAction@. - _ <- timeout pingIntervalUs $ WS.sendPing conn (T.pack $ show i) - -- The difference with the original 'pingThread' is that this action now - -- returns a boolean, and we'll terminate this thread when that action - -- returns true - hasTimedOut <- pingAction - unless hasTimedOut $ go (i + 1) - - -- The rest of this function is exactly the same as the 'pingThread' in - -- @websockets-0.12.7.3@ - ignore e = case fromException e of - Just async -> throwIO (async :: AsyncException) - Nothing -> return () + where + pingIntervalUs :: Int + pingIntervalUs = pingInterval * 1000 * 1000 + + go :: Int -> IO () + go i = do + threadDelay pingIntervalUs + -- If the send buffer is full (e.g. because we pushed a lot of updates to + -- a client that's timed out) then this send will block indefinitely. + -- Adding the timeout here prevents this from happening, and it also + -- interacts nicely with the @pingAction@. + _ <- timeout pingIntervalUs $ WS.sendPing conn (T.pack $ show i) + -- The difference with the original 'pingThread' is that this action now + -- returns a boolean, and we'll terminate this thread when that action + -- returns true + hasTimedOut <- pingAction + unless hasTimedOut $ go (i + 1) + + -- The rest of this function is exactly the same as the 'pingThread' in + -- @websockets-0.12.7.3@ + ignore e = case fromException e of + Just async -> throwIO (async :: AsyncException) + Nothing -> return () diff --git a/server/src/Icepeak/Server/WebsocketServer/MultiSubscription.hs b/server/src/Icepeak/Server/WebsocketServer/MultiSubscription.hs new file mode 100644 index 0000000..0af754a --- /dev/null +++ b/server/src/Icepeak/Server/WebsocketServer/MultiSubscription.hs @@ -0,0 +1,445 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Icepeak.Server.WebsocketServer.MultiSubscription (handleClient) where + +import Control.Concurrent.MVar (MVar) +import Control.Exception (Exception) +import Data.Aeson (Value, (.=)) +import Data.Functor ((<&>)) +import Data.HashMap.Strict (HashMap) +import Data.Text (Text) +import Data.UUID (UUID) + +import qualified Control.Concurrent as Concurrent +import qualified Control.Concurrent.Async as Async +import qualified Control.Concurrent.MVar as MVar +import qualified Control.Exception as Exception +import qualified Control.Monad as Monad +import qualified Data.Aeson as Aeson +import qualified Data.HashMap.Strict as HashMap +import qualified Data.Maybe as Maybe +import qualified Data.Text as Text +import qualified Data.Text.Encoding as Text +import qualified Data.Time.Clock.POSIX as Clock +import qualified Network.WebSockets as WS +import qualified Web.JWT as JWT + +import Icepeak.Server.Config (Config) +import Icepeak.Server.Core (Core) +import Icepeak.Server.Store (Path) +import Icepeak.Server.WebsocketServer.Payload + +import qualified Icepeak.Server.AccessControl as Access +import qualified Icepeak.Server.Config as Config +import qualified Icepeak.Server.Core as Core +import qualified Icepeak.Server.JwtAuth as JwtAuth +import qualified Icepeak.Server.Metrics as Metrics +import qualified Icepeak.Server.Subscription as Subscription +import qualified Icepeak.Server.WebsocketServer.Utils as Utils + +-- * Client handling + +-- ** Sending Response Payloads + +data Client = Client + { clientConn :: WS.Connection + , clientUuid :: UUID + , clientCore :: Core + , clientIsDirty :: MVar () + , clientSubscriptions :: MVar (HashMap Path (MVar Value)) + , clientHasSubscribed :: MVar Bool + } + +doSubscribe :: Client -> [Path] -> IO () +doSubscribe client paths = do + let + core = clientCore client + conn = clientConn client + + uuid = clientUuid client + isDirty = clientIsDirty client + subscriptions = clientSubscriptions client + + coreClients = Core.coreClients core + + pathsWithCurrentValue <- Monad.forM paths $ + \path -> do + valueAtPath <- Core.getCurrentValue core path + pure (Text.intercalate "/" path, valueAtPath) + + MVar.modifyMVar_ (clientHasSubscribed client) (pure . const True) + + WS.sendTextData conn $ + Aeson.encode $ + ResponseSubscribeSuccess + { subscribeSuccessPathsValues = pathsWithCurrentValue + } + + -- WARNING: Race condition, potentially miss an update when program is at this point + + Monad.forM_ paths $ \newPath -> do + pathValueMVar <- MVar.newEmptyMVar + + MVar.modifyMVar_ + subscriptions + (pure . HashMap.insert newPath pathValueMVar) + + MVar.modifyMVar_ + coreClients + ( pure + . Subscription.subscribe + newPath + uuid + (\newValue -> do + Utils.writeToSub core pathValueMVar newValue + Monad.void $ MVar.tryPutMVar isDirty ()) + ) + +onPayloadSubscribeWithAuth + :: Client + -> JWT.VerifySigner + -> RequestSubscribe + -> IO () +onPayloadSubscribeWithAuth client secret (RequestSubscribe paths mbToken) = do + let conn = clientConn client + case mbToken of + Nothing -> do + WS.sendTextData conn $ -- 401 | No authorization token provided + Aeson.encode $ + ResponseSubscribeFailure + { subscribeFailureStatusCode = 401 + , subscribeFailureMessage = "No authorisation token provided" + , subscribeFailureExtraData = Nothing + , subscribeFailurePaths = Just paths + } + Just tokenBS -> do + let segmentedPaths = Text.splitOn "/" <$> paths :: [Path] + now <- Clock.getPOSIXTime + case JwtAuth.extractClaim now secret (Text.encodeUtf8 tokenBS) of + Left tokenError -> do + -- 403 | Authorization token was rejected / malformed | + WS.sendTextData conn $ + Aeson.encode $ + ResponseSubscribeFailure + { subscribeFailureStatusCode = 403 + , subscribeFailureMessage = "Error while extracting claim from JWT: " <> Text.pack (show tokenError) + , subscribeFailureExtraData = Nothing + , subscribeFailurePaths = Just paths + } + Right authenticatedIcePeakClaim -> do + let + pathsIsAuth = + segmentedPaths + <&> ( \path -> + ( path + , Access.isAuthorizedByClaim authenticatedIcePeakClaim path Access.ModeRead + ) + ) + allAuth = and $ snd <$> pathsIsAuth + if allAuth + then doSubscribe client segmentedPaths + else + WS.sendTextData conn $ -- 403 | Authorization token was rejected / malformed | + Aeson.encode $ + ResponseSubscribeFailure + { subscribeFailureStatusCode = 403 + , subscribeFailureMessage = "Some paths are not authorised by the provided JWT claim" + , subscribeFailureExtraData = + Just $ + Aeson.object ["unauthorisedPaths" .= (fst <$> filter (not . snd) pathsIsAuth)] + , subscribeFailurePaths = Just paths + } + +onPayloadSubscribeNoAuth + :: Client + -> RequestSubscribe + -> IO () +onPayloadSubscribeNoAuth client (RequestSubscribe paths _) = do + let segmentedPaths = Text.splitOn "/" <$> paths :: [Path] + doSubscribe client segmentedPaths + +onPayloadUnsubscribe + :: Client + -> RequestUnsubscribe + -> IO () +onPayloadUnsubscribe client (RequestUnsubscribe paths) = do + let + conn = clientConn client + core = clientCore client + + segmentedPaths = Text.splitOn "/" <$> paths :: [Path] + + uuid = clientUuid client + subscriptions = clientSubscriptions client + + coreClients = Core.coreClients core + + unsubscribedPaths <- + Maybe.catMaybes + <$> Monad.forM + segmentedPaths + ( \path -> do + pathIsSubscribed <- MVar.readMVar subscriptions <&> HashMap.member path + case pathIsSubscribed of + True -> do + MVar.modifyMVar_ + coreClients + (pure . Subscription.unsubscribe path uuid) + + MVar.modifyMVar_ + subscriptions + (pure . HashMap.delete path) + pure $ Just $ Text.intercalate "/" path + False -> pure Nothing + ) + + WS.sendTextData conn $ + Aeson.encode $ + ResponseUnsubscribeSuccess{unsubscribeSuccessPaths = unsubscribedPaths} + +onPayloadMalformed + :: Client + -> RequestMalformedError + -> IO () +onPayloadMalformed client requestMalformedError = do + let + conn = clientConn client + + closeConnection :: CloseType -> IO () + closeConnection closeType = do + -- NOTE: + -- This will issue a control message to the peer. + -- The connection will stay alive, and we will be expecting + -- the peer to eventually respond with a close control message + -- of its own, which will cause receiveDataMessage to + -- throw a CloseRequest exception. + WS.sendCloseCode + conn + (closeCode closeType) + (closeMessage closeType) + + respondMalformedSubscribe :: Text -> IO () + respondMalformedSubscribe extra = do + WS.sendTextData conn $ + Aeson.encode $ + ResponseSubscribeFailure + { subscribeFailureStatusCode = 400 + , subscribeFailureMessage = "Subscribe request payload is malformed" + , subscribeFailureExtraData = Just $ Aeson.toJSON extra + , subscribeFailurePaths = Nothing + } + + respondMalformedUnsubscribe :: Text -> IO () + respondMalformedUnsubscribe extra = do + WS.sendTextData conn $ + Aeson.encode $ + ResponseUnsubscribeFailure + { unsubscribeFailureStatusCode = 400 + , unsubscribeFailureMessage = "Unsubscribe request payload is malformed" + , unsubscribeFailurePaths = Nothing + , unsubscribeFailureExtraData = Just $ Aeson.toJSON extra + } + + case requestMalformedError of + PayloadSizeOutOfBounds -> + closeConnection TypeSizeOutOfBounds + UnexpectedBinaryPayload -> + closeConnection TypeBinaryPayload + JsonDecodeError _decodeError -> + closeConnection TypeJsonDecodeError + PayloadNotAnObject -> + closeConnection TypePayloadNotObject + MissingOrUnexpectedType _unexpectedType -> + closeConnection TypeMissingOrUnexpectedType + SubscribePathsMissingOrMalformed pathsParseError -> + respondMalformedSubscribe $ + "Subscribe paths are missing or malformed: " <> pathsParseError + SubscribeTokenNotAString tokenParseError -> + respondMalformedSubscribe $ + "Subscribe token is not a string: " <> tokenParseError + UnsubscribePathsMissingOrMalformed pathsParseError -> + respondMalformedUnsubscribe $ + "Unsubscribe paths are missing or malformed: " <> pathsParseError + +-- | Perform the server response based on the request and config. +-- `Core.Config` is needed to determine if auth is enabled. +onPayload + :: Config -> Client -> RequestPayload -> IO () +onPayload coreConfig client (RequestPayloadSubscribe requestSubscribe) = do + let + jwtEnabled = Config.configEnableJwtAuth coreConfig + jwtSecret = Config.configJwtSecret coreConfig + + case (jwtEnabled, jwtSecret) of + (True, Just secret) -> onPayloadSubscribeWithAuth client secret requestSubscribe + (False, Just _) -> onPayloadSubscribeNoAuth client requestSubscribe + (True, Nothing) -> onPayloadSubscribeNoAuth client requestSubscribe + (False, Nothing) -> onPayloadSubscribeNoAuth client requestSubscribe +onPayload _ client (RequestPayloadUnsubscribe requestUnsubscribe) = + onPayloadUnsubscribe client requestUnsubscribe +onPayload _ client (RequestPayloadMalformed malformedPayload) = + onPayloadMalformed client malformedPayload + +onConnect :: Client -> IO () +onConnect client = + Core.withCoreMetrics (clientCore client) Metrics.incrementSubscribers + +onDisconnect :: Client -> IO () +onDisconnect client = do + let + core = clientCore client + subscriptions = clientSubscriptions client + uuid = clientUuid client + + paths <- HashMap.keys <$> MVar.takeMVar subscriptions + Monad.forM_ + paths + ( \path -> + MVar.modifyMVar_ + (Core.coreClients core) + (pure . Subscription.unsubscribe path uuid) + ) + + Core.withCoreMetrics core Metrics.decrementSubscribers + +data SubscriptionTimeout = SubscriptionTimeout + deriving (Show) + +instance Exception SubscriptionTimeout + +handleClient :: WS.Connection -> Core -> IO () +handleClient conn core = do + uuid <- Utils.newUUID + isDirty <- MVar.newMVar () + hasSubscribed <- MVar.newMVar False + subscriptions <- MVar.newMVar (HashMap.empty :: HashMap [Text] (MVar Value)) + + let + client = + Client + { clientConn = conn + , clientUuid = uuid + , clientCore = core + , clientIsDirty = isDirty + , clientSubscriptions = subscriptions + , clientHasSubscribed = hasSubscribed + } + + manageConnection = + Async.withAsync + (startUpdaterThread client) + -- This thread is only responsive to aync exceptions, and it ignores any other exception that may happen inside it and keeps going. + -- This makes sense for 2 reasons: + -- 1. `withAsync` will send a kill signal to the inner action ("updaterThread" in this case) when the outer action exists or throws an exception. + -- 2. Since we do not use the `Async` handle of the inner action, we do not call `wait` on it in the outer action. + -- `wait` is the mechanism that propagates any exception that might be thrown. + -- Since we do not use `wait`, if an exception is thrown inside the inner action, then it will just silently die. + + -- It's important that the below action is the outer action of the `withAsync` as + -- we rely on the Exceptions the outer action throws for 4 reasons: + -- 1. For `withAsync` to kill the inner "updaterThread" thread when the outer action throws. + -- 2. To propagate the SubscriptionTimeout exception from the `withSubscribeTimeout`. + -- 3. To propagate the ConnectionException from the `receiveDataMessage` that is + -- thrown within the "messageHandlerThread". + -- 4. So that `onDisconnect` runs, see that `manageConnection` is in a `finally`. + ( const $ + withSubscribeTimeout + client + (startMessageHandlerThread client) + ) + + -- Simply ignore connection errors, otherwise, Warp handles the exception + -- and sends a 500 response in the middle of a WebSocket connection, and + -- that violates the WebSocket protocol. + -- Note that subscribers are still properly removed by the finally below. + + -- We can also ignore the websocket 'CloseRequest' exception because wai-websockets ensures + -- that the connection is closed when this IO action finishes, as this IO action is executing in a bracket. + -- https://github.com/yesodweb/wai/blob/c92d7b1993338fb0f91e0f5f34fb9678871028a0/wai-websockets/Network/Wai/Handler/WebSockets.hs#L98 + handleConnectionError :: WS.ConnectionException -> IO () + handleConnectionError _ = pure () + + -- Since we are in a bracket that closes the connection, let the action silently exit. + handleSubscriptionTimeout :: SubscriptionTimeout -> IO () + handleSubscriptionTimeout _ = pure () + + -- Run the threads to propagate updates to client, and to receive and handle payloads + -- Upon threads dying, unsubscribe client from all subscriptions, then exit the procedure + -- to let `wai-websockets` close the websocket stream (since we are in a bracket) + Exception.finally + (onConnect client >> manageConnection) + (onDisconnect client) + `Exception.catch` handleConnectionError + `Exception.catch` handleSubscriptionTimeout + +-- After timeout, throw the action an exception if the client hasn't subscribed. +withSubscribeTimeout :: Client -> IO a -> IO a +withSubscribeTimeout client action = do + let + initalSubscriptionTimeout = + Config.configInitialSubscriptionTimeoutMicroSeconds $ + Core.coreConfig $ + clientCore client + + checkClientHasSubscribed = do + MVar.readMVar $ clientHasSubscribed client + + threadId <- Concurrent.myThreadId + Monad.void $ Concurrent.forkIO $ do + Concurrent.threadDelay initalSubscriptionTimeout + hasSubscribed <- checkClientHasSubscribed + Monad.when (not hasSubscribed) (Concurrent.throwTo threadId SubscriptionTimeout) + action + +startMessageHandlerThread :: Client -> IO () +startMessageHandlerThread client = Monad.forever $ do + let + coreConfig = Core.coreConfig $ clientCore client + conn = clientConn client + -- Note: WS.receiveDataMessage will handle control messages automatically. + -- Upon a close control message, a CloseRequest exception will be thrown. + dataMessage <- WS.receiveDataMessage conn + onPayload coreConfig client $ + parseDataMessage dataMessage + +takeMVarUpdatedValues :: Client -> IO [(Text, Value)] +takeMVarUpdatedValues client = do + let + isDirty = clientIsDirty client + subscriptions = clientSubscriptions client + MVar.takeMVar isDirty + valueMVars <- HashMap.toList <$> MVar.readMVar subscriptions + Maybe.catMaybes + <$> ( Monad.forM valueMVars $ + \(path, valueMVar) -> + fmap (\v -> (Text.intercalate "/" path, v)) + <$> MVar.tryTakeMVar valueMVar + ) + +-- This function handles sending the updates to subscribers. +startUpdaterThread :: Client -> IO () +startUpdaterThread client = + let + conn = clientConn client + + send :: Text -> Value -> IO () + send path value = + Exception.catch + ( WS.sendTextData conn $ + Aeson.encode $ + Update path value + ) + sendFailed + + sendFailed :: Exception.SomeException -> IO () + sendFailed exc + -- Rethrow async exceptions, they are meant for inter-thread communication + -- (e.g. ThreadKilled) and we don't expect them at this point. + | Just asyncExc <- Exception.fromException exc = Exception.throwIO (asyncExc :: Exception.SomeAsyncException) + -- We want to catch all other errors in order to prevent them from + -- bubbling up and disrupting the broadcasts to other clients. + | otherwise = pure () + in + Monad.forever $ do + value <- takeMVarUpdatedValues client + mapM (uncurry send) value diff --git a/server/src/Icepeak/Server/WebsocketServer/Payload.hs b/server/src/Icepeak/Server/WebsocketServer/Payload.hs new file mode 100644 index 0000000..14abdfa --- /dev/null +++ b/server/src/Icepeak/Server/WebsocketServer/Payload.hs @@ -0,0 +1,254 @@ +{-# LANGUAGE NumericUnderscores #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeApplications #-} + +module Icepeak.Server.WebsocketServer.Payload where + +import Data.Aeson (Value, (.:), (.:?), (.=)) +import Data.Functor ((<&>)) +import Data.Text (Text) +import Data.Word (Word16) + +import qualified Data.Aeson as Aeson +import qualified Data.Aeson.Types as Aeson +import qualified Data.Bifunctor as Bifunctor +import qualified Data.ByteString as ByteString +import qualified Data.ByteString.Lazy as Lazy (ByteString) +import qualified Data.ByteString.Lazy.Internal as Lazy.ByteString +import qualified Data.Text as Text +import qualified Network.WebSockets as WebSockets + +-- * Request + +data RequestPayload + = RequestPayloadSubscribe RequestSubscribe + | RequestPayloadUnsubscribe RequestUnsubscribe + | RequestPayloadMalformed RequestMalformedError + +data RequestSubscribe = RequestSubscribe + { requestSubscribePaths :: [Text] + , requestSubscribeToken :: Maybe Text + } + +data RequestUnsubscribe = RequestUnsubscribe + { requestUnsubscribePaths :: [Text] + } + +data RequestMalformedError + = PayloadSizeOutOfBounds + | UnexpectedBinaryPayload + | JsonDecodeError Text + | PayloadNotAnObject + | MissingOrUnexpectedType Text + | SubscribePathsMissingOrMalformed Text + | SubscribeTokenNotAString Text + | UnsubscribePathsMissingOrMalformed Text + deriving (Show) + +-- * Update + +data Update = Update + { updatePath :: Text + , updateValue :: Value + } + +instance Aeson.ToJSON Update where + toJSON (Update path value) = + Aeson.object + [ "type" .= ("update" :: Text) + , "path" .= path + , "value" .= value + ] + +-- * Response + +data ResponseSubscribeSuccess = ResponseSubscribeSuccess + {subscribeSuccessPathsValues :: [(Text, Maybe Value)]} + +instance Aeson.ToJSON ResponseSubscribeSuccess where + toJSON (ResponseSubscribeSuccess pathsValues) = + Aeson.object + [ "type" .= ("subscribe" :: Text) + , "code" .= (200 :: Int) + , "message" .= ("You've been successfully subscribed to the paths" :: Text) + , "extra" .= Aeson.object [] + , "paths" + .= ( pathsValues + <&> \(path, value) -> + Aeson.object ["path" .= path, "value" .= value] + ) + ] + +data ResponseSubscribeFailure = ResponseSubscribeFailure + { subscribeFailureStatusCode :: Int + , subscribeFailureMessage :: Text + , subscribeFailurePaths :: Maybe [Text] + , subscribeFailureExtraData :: Maybe Value + } + +instance Aeson.ToJSON ResponseSubscribeFailure where + toJSON (ResponseSubscribeFailure code message mbPaths extra) = + Aeson.object $ baseKeyValues <> addonKeyValues + where + baseKeyValues :: [Aeson.Pair] + baseKeyValues = + [ "type" .= ("subscribe" :: Text) + , "code" .= (code :: Int) + , "message" .= (message :: Text) + , "extra" .= extra + ] + + addonKeyValues :: [Aeson.Pair] + addonKeyValues = + case mbPaths of + Just paths -> ["paths" .= paths] + Nothing -> [] + +newtype ResponseUnsubscribeSuccess = ResponseUnsubscribeSuccess + { unsubscribeSuccessPaths :: [Text] + } + +instance Aeson.ToJSON ResponseUnsubscribeSuccess where + toJSON (ResponseUnsubscribeSuccess paths) = + Aeson.object + [ "type" .= ("unsubscribe" :: Text) + , "paths" .= paths + , "code" .= (200 :: Int) + , "message" .= ("You've been successfully unsubscribed from the paths" :: Text) + , "extra" .= Aeson.object [] + ] + +data ResponseUnsubscribeFailure = ResponseUnsubscribeFailure + { unsubscribeFailureStatusCode :: Int + , unsubscribeFailureMessage :: Text + , unsubscribeFailurePaths :: Maybe [Text] + , unsubscribeFailureExtraData :: Maybe Value + } + +instance Aeson.ToJSON ResponseUnsubscribeFailure where + toJSON (ResponseUnsubscribeFailure code message mbPaths extraData) = + Aeson.object $ baseKeyValues <> addonKeyValues + where + baseKeyValues :: [Aeson.Pair] + baseKeyValues = + [ "type" .= ("unsubscribe" :: Text) + , "code" .= (code :: Int) + , "message" .= (message :: Text) + , "extra" .= extraData + ] + + addonKeyValues :: [Aeson.Pair] + addonKeyValues = + case mbPaths of + Just paths -> ["paths" .= paths] + Nothing -> [] + +-- * Close Conn Reason + +data CloseType + = TypeSizeOutOfBounds + | TypeBinaryPayload + | TypeJsonDecodeError + | TypePayloadNotObject + | TypeMissingOrUnexpectedType + deriving (Show, Eq) + +-- https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#parameters + +-- Status codes in the range 3000-3999 are reserved for use by libraries, frameworks, and applications. + +closeCode :: CloseType -> Word16 +closeCode TypeSizeOutOfBounds = 3001 +closeCode TypeBinaryPayload = 3002 +closeCode TypeJsonDecodeError = 3003 +closeCode TypePayloadNotObject = 3004 +closeCode TypeMissingOrUnexpectedType = 3005 + +-- A string providing a custom WebSocket connection close reason (a concise human-readable prose explanation for the closure). +-- The value must be no longer than 123 bytes (encoded in UTF-8). + +closeMessage :: CloseType -> Text +closeMessage TypeSizeOutOfBounds = + "Received a payload size that is out of bounds" +closeMessage TypeBinaryPayload = + "Received a payload using Binary instead of Text" +closeMessage TypeJsonDecodeError = + "Received a payload that resulted in a JSON decode error" +closeMessage TypePayloadNotObject = + "Received a JSON payload which is not an object" +closeMessage TypeMissingOrUnexpectedType = + "Received a payload with a missing or unexpected value for 'type'" + +-- * Parsing + +maxPayloadBytes :: Int +maxPayloadBytes = 1_000_000 + +checkBounds + :: Lazy.ByteString + -> Either RequestMalformedError Lazy.ByteString +checkBounds lazyBS = isOutOfBoundsAcc lazyBS 0 + where + isOutOfBoundsAcc Lazy.ByteString.Empty _ = Right lazyBS + isOutOfBoundsAcc (Lazy.ByteString.Chunk chunk rest) accSize = + let accSize' = accSize + ByteString.length chunk + in if accSize' > maxPayloadBytes + then Left PayloadSizeOutOfBounds + else isOutOfBoundsAcc rest accSize' + +parseDataMessage + :: WebSockets.DataMessage -> RequestPayload +parseDataMessage (WebSockets.Binary _) = RequestPayloadMalformed UnexpectedBinaryPayload +parseDataMessage (WebSockets.Text utf8EncodedLazyByteString _) = + case parsedPayload of + (Left malformed) -> RequestPayloadMalformed malformed + (Right (Left subscribe)) -> RequestPayloadSubscribe subscribe + (Right (Right unsubscribe)) -> RequestPayloadUnsubscribe unsubscribe + where + parsedPayload + :: Either RequestMalformedError (Either RequestSubscribe RequestUnsubscribe) + parsedPayload = do + let + parseEither :: Aeson.Parser a -> Either String a + parseEither parser = Aeson.parseEither (const parser) () + + mapError = flip Bifunctor.first + + boundedBytestring <- checkBounds utf8EncodedLazyByteString + clientPayloadAeson <- + Aeson.eitherDecode @Value boundedBytestring + `mapError` (JsonDecodeError . Text.pack) + + case clientPayloadAeson of + (Aeson.Object clientPayloadObject) -> do + payloadType <- + parseEither @RequestType (clientPayloadObject .: "type") + `mapError` (MissingOrUnexpectedType . Text.pack) + + case payloadType of + RequestSubscribeType -> do + pathsParsed <- + parseEither @[Text] (clientPayloadObject .: "paths") + `mapError` (SubscribePathsMissingOrMalformed . Text.pack) + + mbToken <- + parseEither @(Maybe Text) (clientPayloadObject .:? "token") + `mapError` (SubscribeTokenNotAString . Text.pack) + + pure $ Left $ RequestSubscribe pathsParsed mbToken + RequestUnsubscribeType -> do + parsedPaths <- + parseEither @[Text] (clientPayloadObject .: "paths") + `mapError` (UnsubscribePathsMissingOrMalformed . Text.pack) + + pure $ Right $ RequestUnsubscribe parsedPaths + _ -> Left PayloadNotAnObject + +data RequestType + = RequestSubscribeType + | RequestUnsubscribeType + +instance Aeson.FromJSON RequestType where + parseJSON (Aeson.String "subscribe") = pure RequestSubscribeType + parseJSON (Aeson.String "unsubscribe") = pure RequestUnsubscribeType + parseJSON _ = fail "Expecting 'type' to be either 'subscribe' or 'unsubscribe'" diff --git a/server/src/Icepeak/Server/WebsocketServer/SingleSubscription.hs b/server/src/Icepeak/Server/WebsocketServer/SingleSubscription.hs new file mode 100644 index 0000000..e34703c --- /dev/null +++ b/server/src/Icepeak/Server/WebsocketServer/SingleSubscription.hs @@ -0,0 +1,88 @@ +module Icepeak.Server.WebsocketServer.SingleSubscription (handleClient) where + +import Control.Concurrent (modifyMVar_, newEmptyMVar) +import Control.Concurrent.Async (withAsync) +import Control.Concurrent.MVar (MVar, takeMVar) +import Control.Exception (SomeAsyncException, SomeException, catch, finally, fromException, throwIO) +import Control.Monad (forever) +import Data.Aeson (Value) + +import qualified Data.Aeson as Aeson +import qualified Network.WebSockets as WS + +import Icepeak.Server.Store (Path) +import Icepeak.Server.Core (Core, coreClients, withCoreMetrics, getCurrentValue) + +import qualified Icepeak.Server.Metrics as Metrics +import qualified Icepeak.Server.Subscription as Subscription +import qualified Icepeak.Server.WebsocketServer.Utils as Utils + +-- * Client handling + +handleClient :: WS.Connection -> Path -> Core -> IO () +handleClient conn path core = do + uuid <- Utils.newUUID + pathCurentValueMVar <- newEmptyMVar + let + state = coreClients core + onConnect = do + modifyMVar_ state + (pure . Subscription.subscribe path uuid + (Utils.writeToSub core pathCurentValueMVar) + ) + withCoreMetrics core Metrics.incrementSubscribers + onDisconnect = do + modifyMVar_ state (pure . Subscription.unsubscribe path uuid) + withCoreMetrics core Metrics.decrementSubscribers + sendInitialValue = do + currentValue <- getCurrentValue core path + WS.sendTextData conn (Aeson.encode currentValue) + -- For each connection, we want to spawn a client thread with an associated + -- queue, in order to manage subscribers. `withAsync` acts as `forkIO` in this + -- context, with the assurance the child thread is killed when the parent is. + manageConnection = withAsync (updateThread conn pathCurentValueMVar) + (const $ keepTalking conn) + + -- Simply ignore connection errors, otherwise, Warp handles the exception + -- and sends a 500 response in the middle of a WebSocket connection, and + -- that violates the WebSocket protocol. + -- Note that subscribers are still properly removed by the finally below. + handleConnectionError :: WS.ConnectionException -> IO () + handleConnectionError _ = pure () + -- Put the client in the subscription tree and keep the connection open. + -- Remove it when the connection is closed. + finally (onConnect >> sendInitialValue >> manageConnection) onDisconnect + `catch` handleConnectionError + +-- This function handles sending the updates to subscribers. +updateThread :: WS.Connection -> MVar Value -> IO () +updateThread conn state = + let + send :: Value -> IO () + send value = + WS.sendTextData conn (Aeson.encode value) + `catch` + sendFailed + + sendFailed :: SomeException -> IO () + sendFailed exc + -- Rethrow async exceptions, they are meant for inter-thread communication + -- (e.g. ThreadKilled) and we don't expect them at this point. + | Just asyncExc <- fromException exc = throwIO (asyncExc :: SomeAsyncException) + -- We want to catch all other errors in order to prevent them from + -- bubbling up and disrupting the broadcasts to other clients. + | otherwise = pure () + in forever $ do + value <- takeMVar state + send value + +-- We don't send any messages here; sending is done by the update +-- loop; it finds the client in the set of subscriptions. But we do +-- need to keep the thread running, otherwise the connection will be +-- closed. So we go into an infinite loop here. +keepTalking :: WS.Connection -> IO () +keepTalking conn = forever $ do + -- Note: WS.receiveDataMessage will handle control messages automatically and e.g. + -- do the closing handshake of the websocket protocol correctly + WS.receiveDataMessage conn + diff --git a/server/src/Icepeak/Server/WebsocketServer/Utils.hs b/server/src/Icepeak/Server/WebsocketServer/Utils.hs new file mode 100644 index 0000000..e508eb4 --- /dev/null +++ b/server/src/Icepeak/Server/WebsocketServer/Utils.hs @@ -0,0 +1,33 @@ +module Icepeak.Server.WebsocketServer.Utils where + +import Control.Concurrent.MVar (MVar) +import Data.UUID (UUID) +import Data.Aeson (Value) + +import qualified Control.Concurrent.MVar as MVar +import qualified System.Random as Random +import qualified Control.Monad as Monad +import qualified Data.Maybe as Maybe + +import Icepeak.Server.Core (Core) + +import qualified Icepeak.Server.Core as Core +import qualified Icepeak.Server.Metrics as Metrics + +newUUID :: IO UUID +newUUID = Random.randomIO + +-- this function is imported by both 'MultiSubscription.hs' and 'SingleSubscription.hs' +-- it is subscription logic that they both have in common +writeToSub :: Core -> MVar Value -> Value -> IO () +writeToSub core queue val = do + -- We are the only producer, so either the subscriber already + -- read the value or we can discard it to replace it with the + -- new one. We don't need atomicity for this operation. + -- `tryTakeMVar` basically empties the MVar, from this perspective. + mbQueue <- MVar.tryTakeMVar queue + -- If the MVar has not yet been read by the subscriber thread, it means + -- that the update has been skipped. + Monad.when (Maybe.isJust mbQueue) $ + Monad.forM_ (Core.coreMetrics core) Metrics.incrementWsSkippedUpdates + MVar.putMVar queue val diff --git a/server/tests/Icepeak/Server/MultiSubscriptionSpec.hs b/server/tests/Icepeak/Server/MultiSubscriptionSpec.hs new file mode 100644 index 0000000..2196fc6 --- /dev/null +++ b/server/tests/Icepeak/Server/MultiSubscriptionSpec.hs @@ -0,0 +1,408 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NumericUnderscores #-} + +module Icepeak.Server.MultiSubscriptionSpec (spec) where + +import Test.Hspec +import Test.Hspec.Expectations.Json +import Data.Aeson ((.=)) +import Data.Word (Word16) +import Data.Text (Text) + +import qualified Data.Aeson as Aeson +import qualified Control.Concurrent as MVar +import qualified Control.Concurrent.Async as Async +import qualified Control.Concurrent as Concurrent +import qualified Control.Exception as Exception +import qualified System.Directory as Directory +import qualified Data.ByteString.Lazy as BS.Lazy +import qualified Network.WebSockets.Client as WS +import qualified Network.WebSockets.Connection as WS +import qualified Network.WebSockets as WS + +import qualified Icepeak.Server.Server as Icepeak +import qualified Icepeak.Server.Logger as Icepeak +import qualified Icepeak.Server.Config as Icepeak +import qualified Icepeak.Server.WebsocketServer as Icepeak +import qualified Icepeak.Server.Core as Icepeak +import qualified Icepeak.Server.Store as Icepeak +import qualified Icepeak.Server.HttpServer as IcepeakHttp +import qualified Icepeak.Server.WebsocketServer.Payload as Icepeak + + +icepeakPort :: Int +icepeakPort = 9898 + +data Icepeak = Icepeak + { icepeakCore :: Icepeak.Core + , icepeakShutdown :: IO () + } + +defaultConfig + :: Icepeak.Config +defaultConfig + = Icepeak.Config + { Icepeak.configDataFile = Nothing + , Icepeak.configPort = icepeakPort + , Icepeak.configEnableJwtAuth = False + , Icepeak.configJwtSecret = Nothing + , Icepeak.configMetricsEndpoint = Nothing + , Icepeak.configQueueCapacity = 256 + , Icepeak.configSyncIntervalMicroSeconds = Nothing + , Icepeak.configEnableJournaling = False + , Icepeak.configDisableSentryLogging = True + , Icepeak.configSentryDSN = Nothing + , Icepeak.configStorageBackend = Icepeak.File + , Icepeak.configSyncLogging = False + , Icepeak.configWebSocketPingInterval = 1 + , Icepeak.configWebSocketPongTimeout = 1 + , Icepeak.configInitialSubscriptionTimeoutMicroSeconds = 100_000 + } + +withIcepeak :: IO Icepeak +withIcepeak = do + let + storageFile = "/tmp/icepeak.json" + config = defaultConfig { Icepeak.configDataFile = Just storageFile } + writeFile storageFile "{}" + logger <- Icepeak.newLogger config + core <- Icepeak.newCore config logger Nothing >>= + (\mbCore -> case mbCore of + Left err -> do expectationFailure ("Failed to create Core: " <> err) + undefined + Right core -> pure core) + + let wsServer = Icepeak.acceptConnection core + application <- IcepeakHttp.new core + + -- Thread that accepts websocket connections + webserverThread <- Async.async $ Icepeak.runServer logger wsServer application icepeakPort + + -- Thread that processes the Icepeak.Command queue + commandLoopThread <- Async.async $ Icepeak.runCommandLoop core + + -- Thread that posts updates to websocket clients + webSocketThread <- Async.async $ Icepeak.processUpdates core + + pure $ Icepeak + { icepeakCore = core + , icepeakShutdown = do + mapM_ Async.cancel [webserverThread, commandLoopThread, webSocketThread ] + Directory.removeFile storageFile + } + +createDataSet :: Icepeak -> IO () +createDataSet icepeak = do + mapM_ (makeModification (icepeakCore icepeak)) + [ Icepeak.Put ["A", "A"] "A" + , Icepeak.Put ["A", "B"] "B" + , Icepeak.Put ["B"] "B" + ] + +makeModification :: Icepeak.Core -> Icepeak.Modification -> IO () +makeModification core modification = do + waitMVar <- MVar.newEmptyMVar + Icepeak.enqueueCommand (Icepeak.Modify modification (Just waitMVar)) core + MVar.takeMVar waitMVar + +data CloseExpectation + = Unexpected String + | CloseCode Word16 + deriving (Show, Eq) + +openReusableIcepeakConn :: (WS.Connection -> IO a) -> IO a +openReusableIcepeakConn = WS.runClient "localhost" icepeakPort "/?method=reusable" + +sendJson :: WS.Connection -> Aeson.Value -> IO () +sendJson conn value = WS.sendDataMessage conn (WS.Text (Aeson.encode value) Nothing) + +expectDataMessage :: WS.Connection -> IO WS.DataMessage +expectDataMessage conn = do + Right msg <- Async.race + (do Concurrent.threadDelay 100_000 -- 0.1 s + expectationFailure "Client expected a message, but did not receive anything for the duration of the 0.1s timeout") + (WS.receiveDataMessage conn) + pure msg + +expectNoMessage :: WS.Connection -> IO () +expectNoMessage conn = do + Left () <- Async.race + (Concurrent.threadDelay 10_000) -- 0.01s + (WS.receiveDataMessage conn >>= + (\p -> expectationFailure + $ "Client received an unexpected payload: " <> show p)) + pure () + +withResponseJson :: WS.Connection -> (Aeson.Value -> Expectation) -> Expectation +withResponseJson conn jsonCheck = do + (WS.Text payload _) <- expectDataMessage conn + (Just json) <- pure $ Aeson.decode payload + jsonCheck json + +invalidPayloadsSpec :: SpecWith a +invalidPayloadsSpec = describe "Payload Parse" $ do + context "when client sends invalid payload " $ do + let openThenSend dataMessage = openReusableIcepeakConn $ \conn -> + do WS.sendDataMessage conn dataMessage + Exception.catch + (do unexpectedDataMessage <- WS.receiveDataMessage conn + pure (Unexpected $ "Received data message when socket close was expected: " <> show unexpectedDataMessage)) + + (\case (WS.CloseRequest code _) -> pure $ CloseCode code + otherException -> pure $ Unexpected ("Unexpected exception: " <> show otherException)) + + it "should close and know when provided with non-json input" $ const $ do + notJSON <- openThenSend (WS.Text "Hello" Nothing) + notJSON `shouldBe` CloseCode (Icepeak.closeCode Icepeak.TypeJsonDecodeError) + + it "should close and know when provided with binary payload" $ const $ do + binaryMessage <- openThenSend (WS.Binary "Hello") + binaryMessage `shouldBe` CloseCode (Icepeak.closeCode Icepeak.TypeBinaryPayload) + + it "should close and know when provided with something other than object" $ const $ do + notAnObject <- openThenSend (WS.Text (Aeson.encode $ Aeson.Number 3) Nothing) + notAnObject `shouldBe` CloseCode (Icepeak.closeCode Icepeak.TypePayloadNotObject) + + it "should close and know when provided with unexpected payload type" $ const $ do + unexpectedType <- openThenSend (WS.Text (Aeson.encode $ Aeson.object [ "type" .= ("subskribe" :: String) ]) Nothing) + unexpectedType `shouldBe` CloseCode (Icepeak.closeCode Icepeak.TypeMissingOrUnexpectedType) + + typeMissing <- openThenSend (WS.Text (Aeson.encode $ Aeson.object [ ]) Nothing) + typeMissing `shouldBe` CloseCode (Icepeak.closeCode Icepeak.TypeMissingOrUnexpectedType) + + it "should close and know when provided with a payload size that is out of bounds" $ const $ do + outOfBounds <- openThenSend (WS.Text (BS.Lazy.replicate (fromInteger (toInteger $ Icepeak.maxPayloadBytes + 1)) 0) Nothing) + outOfBounds `shouldBe` CloseCode (Icepeak.closeCode Icepeak.TypeSizeOutOfBounds) + + insideOfBounds <- openThenSend (WS.Text (BS.Lazy.replicate (fromInteger (toInteger Icepeak.maxPayloadBytes)) 0) Nothing) + insideOfBounds `shouldNotBe` CloseCode (Icepeak.closeCode Icepeak.TypeSizeOutOfBounds) + +singleConnectionCommunicationSpec :: SpecWith Icepeak +singleConnectionCommunicationSpec = aroundAllWith + (\specUsingArgs icepeak -> openReusableIcepeakConn (curry specUsingArgs icepeak)) + $ describe "Communication Over Single Connection" $ do + successfulSubscribe + successfulReceiveUpdates + successfulUnsubscribe + successfulUnsubscribeNoUpdates + +successfulSubscribe :: SpecWith (Icepeak, WS.Connection) +successfulSubscribe = context "when client subscribes" $ do + it "should succesfully send subscribe and receive values at paths" $ + \(_, clientConn) -> do + sendJson clientConn $ Aeson.object + [ "type" .= ("subscribe" :: Text) + , "paths" .= ([ "A/B", "A/A" ] :: [Text]) + ] + withResponseJson clientConn + (\responseJson -> do + responseJson `shouldMatchJson` Aeson.object + [ "type" .= ("subscribe" :: Text) + , "code" .= (200 :: Int) + , "paths" .= + [ Aeson.object + [ "path" .= ("A/B" :: Text) + , "value" .= ("B":: Text) + ] + , Aeson.object + [ "path" .= ("A/A" :: Text) + , "value" .= ("A":: Text) + ] + ]]) + + it "should subscribe to non-existent path and get null" $ + \(_, clientConn) -> do + sendJson clientConn $ Aeson.object + [ "type" .= ("subscribe" :: Text) + , "paths" .= ([ "NULL" ] :: [Text]) ] + withResponseJson clientConn + (\responseJson -> do + responseJson `shouldMatchJson` Aeson.object + [ "type" .= ("subscribe" :: Text) + , "code" .= (200 :: Int) + , "paths" .= + [ Aeson.object + [ "path" .= ("NULL" :: Text) + , "value" .= Aeson.Null + ] + ]]) + +successfulReceiveUpdates :: SpecWith (Icepeak, WS.Connection) +successfulReceiveUpdates = context "when values are updated" $ do + it "should send client upated value at subscribed path" $ + \(icepeak, clientConn) -> do + makeModification (icepeakCore icepeak) (Icepeak.Put ["A", "A"] "C") + withResponseJson clientConn + (\responseJson -> do + responseJson `shouldMatchJson` Aeson.object + [ "type" .= ("update" :: Text) + , "value" .= ("C" :: Text) + ]) + + it "should send client nulled sub-paths when path sub-paths are overriden with value" $ + \(icepeak, clientConn) -> do + makeModification (icepeakCore icepeak) (Icepeak.Put ["A"] "C") + withResponseJson clientConn + (\responseJson -> do + responseJson `shouldMatchJson` Aeson.object + [ "type" .= ("update" :: Text) + , "value" .= Aeson.Null + ]) + withResponseJson clientConn + (\responseJson -> do + responseJson `shouldMatchJson` Aeson.object + [ "type" .= ("update" :: Text) + , "value" .= Aeson.Null + ]) + + it "should send client update on previosly overriden path" $ + \(icepeak, clientConn) -> do + makeModification (icepeakCore icepeak) (Icepeak.Put ["A", "A"] "C") + withResponseJson clientConn + (\responseJson -> do + responseJson `shouldMatchJson` Aeson.object + [ "type" .= ("update" :: Text) + , "value" .= ("C" :: Text) + , "path" .= ("A/A" :: Text) + ]) + + it "should send client null update on deleted path" $ + \(icepeak, clientConn) -> do + makeModification (icepeakCore icepeak) (Icepeak.Delete ["A", "A"]) + withResponseJson clientConn + (\responseJson -> do + responseJson `shouldMatchJson` Aeson.object + [ "type" .= ("update" :: Text) + , "value" .= Aeson.Null + , "path" .= ("A/A" :: Text) + ]) + + it "should send client update on previosly deleted path" $ + \(icepeak, clientConn) -> do + makeModification (icepeakCore icepeak) (Icepeak.Put ["A", "A"] "D") + withResponseJson clientConn + (\responseJson -> do + responseJson `shouldMatchJson` Aeson.object + [ "type" .= ("update" :: Text) + , "value" .= ("D" :: Text) + , "path" .= ("A/A" :: Text) + ]) + +successfulUnsubscribe :: SpecWith (Icepeak, WS.Connection) +successfulUnsubscribe = context "when client unsubscribes" $ do + it "should unsubscribe from multiple existing paths" $ + \(_icepeak, clientConn) -> do + sendJson clientConn $ Aeson.object + [ "type" .= ("unsubscribe" :: Text) + , "paths" .= ([ "A/B", "A/A" ] :: [Text]) ] + withResponseJson clientConn + (\responseJson -> do + responseJson `shouldMatchJson` Aeson.object + [ "type" .= ("unsubscribe" :: Text) + , "code" .= (200 :: Int) + , "paths" .= ([ "A/B", "A/A" ] :: [Text]) + ]) + + it "should get no paths from non-subscribed paths" $ + \(_icepeak, clientConn) -> do + sendJson clientConn $ Aeson.object + [ "type" .= ("unsubscribe" :: Text) + , "paths" .= ([ "C/D", "E" ] :: [Text]) ] + + withResponseJson clientConn + (\responseJson -> do + responseJson `shouldMatchJson` Aeson.object + [ "type" .= ("unsubscribe" :: Text) + , "code" .= (200 :: Int) + , "paths" .= ([ ] :: [Text]) + ]) + +successfulUnsubscribeNoUpdates :: SpecWith (Icepeak, WS.Connection) +successfulUnsubscribeNoUpdates = context "when client unsubscribes" $ do + it "should no longer receive updates for unsusbscribed paths" $ + \(icepeak, clientConn) -> do + makeModification (icepeakCore icepeak) (Icepeak.Put ["A", "B"] "C") + expectNoMessage clientConn >>= shouldBe () + + makeModification (icepeakCore icepeak) (Icepeak.Put ["A", "A"] "C") + expectNoMessage clientConn >>= shouldBe () + + it "should be able to resubscribe after unsubscribing" $ + \(icepeak, clientConn) -> do + sendJson clientConn $ Aeson.object + [ "type" .= ("subscribe" :: Text) + , "paths" .= ([ "A/B" ] :: [Text]) ] + withResponseJson clientConn + (\responseJson -> do + responseJson `shouldMatchJson` Aeson.object + [ "type" .= ("subscribe" :: Text) + , "code" .= (200 :: Int) + , "paths" .= [ Aeson.object [ "path" .= ("A/B" :: Text), "value" .= ("C" :: Text)] ] + ]) + makeModification (icepeakCore icepeak) (Icepeak.Put ["A", "B"] "D") + withResponseJson clientConn + (\responseJson -> do + responseJson `shouldMatchJson` Aeson.object + [ "type" .= ("update" :: Text) + , "value" .= ("D" :: Text) + , "path" .= ("A/B" :: Text) + ]) + expectNoMessage clientConn >>= shouldBe () + +spec :: Spec +spec = + aroundAll + (\testSpec -> do + icepeak <- withIcepeak + createDataSet icepeak + testSpec icepeak + icepeakShutdown icepeak) + $ do describe "MultiSubscription Connection Protocol" $ do + invalidPayloadsSpec + singleConnectionCommunicationSpec + describe "MultiSubscription Subscription Timeout Deadline Authorisation Mechanism" $ do + deadlineTimeoutSpec + +deadlineTimeoutSpec :: SpecWith Icepeak +deadlineTimeoutSpec = do + it "should cause the server to close the connection if send subscribe late" + (\icepeak -> openReusableIcepeakConn + (\conn -> do + let timeoutDeadline = Icepeak.configInitialSubscriptionTimeoutMicroSeconds + $ Icepeak.coreConfig + $ icepeakCore icepeak + Concurrent.threadDelay $ timeoutDeadline + 50_000 + sendJson conn $ Aeson.object + [ "type" .= ("subscribe" :: Text) + , "paths" .= ([ "A/B", "A/A" ] :: [Text]) ] + WS.receive conn `shouldThrow` connectionClosed + )) + it "should cause the server to close the connection if other request on time but still not subscribed" + (\icepeak -> openReusableIcepeakConn + (\conn -> do + let timeoutDeadline = Icepeak.configInitialSubscriptionTimeoutMicroSeconds + $ Icepeak.coreConfig + $ icepeakCore icepeak + dummyMsg = Aeson.object + [ "type" .= ("unsubscribe" :: Text) + , "paths" .= ([ "A/B", "A/A" ] :: [Text]) ] + + Concurrent.threadDelay $ timeoutDeadline - 50_000 + + sendJson conn dummyMsg + _ <- expectDataMessage conn + sendJson conn dummyMsg + _ <- expectDataMessage conn + + Concurrent.threadDelay 100_000 + + sendJson conn (Aeson.object + [ "type" .= ("subscribe" :: Text) + , "paths" .= ([ "A/B", "A/A" ] :: [Text]) ]) + WS.receive conn `shouldThrow` connectionClosed + )) + +connectionClosed :: Selector WS.ConnectionException +connectionClosed WS.ConnectionClosed = True +connectionClosed _ = False diff --git a/server/tests/Spec.hs b/server/tests/Spec.hs index 17b1c19..cdb6280 100644 --- a/server/tests/Spec.hs +++ b/server/tests/Spec.hs @@ -9,6 +9,7 @@ import qualified Icepeak.Server.RequestSpec import qualified Icepeak.Server.SocketSpec import qualified Icepeak.Server.StoreSpec import qualified Icepeak.Server.SubscriptionTreeSpec +import qualified Icepeak.Server.MultiSubscriptionSpec main :: IO () main = hspec $ do @@ -21,3 +22,4 @@ main = hspec $ do Icepeak.Server.SocketSpec.spec Icepeak.Server.StoreSpec.spec Icepeak.Server.SubscriptionTreeSpec.spec + Icepeak.Server.MultiSubscriptionSpec.spec