From 69583e364bbb7aebf08b9e45601d34964dcfdc0c Mon Sep 17 00:00:00 2001 From: Martin Allen <31280145+blinky3713@users.noreply.github.com> Date: Wed, 11 Mar 2020 14:30:33 -0700 Subject: [PATCH] Ws event conduit (#228) * wip * appears to be working * tests pass * remove bad toJSON * update nameservice tests * use bracketP * hlint --- Makefile | 2 +- hs-abci-docs/nameservice/package.yaml | 6 +- .../test/Nameservice/Test/E2ESpec.hs | 129 +++++++-------- .../kv-test/KVStore/Test/KVSpec.hs | 151 +++++++++--------- hs-tendermint-client/package.yaml | 13 +- .../src/Network/Tendermint/Client.hs | 61 ++++++- .../Tendermint/Client/Internal/RPCClient.hs | 26 ++- 7 files changed, 216 insertions(+), 172 deletions(-) diff --git a/Makefile b/Makefile index 876ac36e..defa9c1d 100644 --- a/Makefile +++ b/Makefile @@ -93,7 +93,7 @@ deploy-nameservice-test: install ## run the nameservice docker network for testi # Tests ##################### -test-kv-store: install ## Run the test suite for the client interface +test-kv-store: ## Run the test suite for the client interface stack test hs-tendermint-client test-simple-storage: install ## Run the test suite for the simple-storage example application diff --git a/hs-abci-docs/nameservice/package.yaml b/hs-abci-docs/nameservice/package.yaml index 6652d8de..2f4b224d 100644 --- a/hs-abci-docs/nameservice/package.yaml +++ b/hs-abci-docs/nameservice/package.yaml @@ -183,17 +183,17 @@ tests: - -with-rtsopts=-N dependencies: - base >= 4.7 && < 5 + - conduit - data-default-class - hs-abci-sdk - hs-abci-test-utils + - hs-abci-types - hs-tendermint-client - hspec - aeson - mtl - nameservice + - resourcet - servant - text - unordered-containers - - lens-aeson - - lens - - string-conversions diff --git a/hs-abci-docs/nameservice/test/Nameservice/Test/E2ESpec.hs b/hs-abci-docs/nameservice/test/Nameservice/Test/E2ESpec.hs index d1dc1f36..95251666 100644 --- a/hs-abci-docs/nameservice/test/Nameservice/Test/E2ESpec.hs +++ b/hs-abci-docs/nameservice/test/Nameservice/Test/E2ESpec.hs @@ -1,50 +1,51 @@ module Nameservice.Test.E2ESpec (spec) where -import Control.Concurrent (forkIO) -import Control.Concurrent.MVar (MVar, modifyMVar_, newMVar, - readMVar) -import Control.Lens ((^?)) -import Control.Monad (forM_, void) -import Control.Monad.Reader (ReaderT, runReaderT) -import qualified Data.Aeson as A -import Data.Aeson.Lens (key) -import Data.Default.Class (def) -import Data.HashSet (fromList) +import Control.Concurrent (forkIO) +import Control.Concurrent.MVar (MVar, modifyMVar_, + newMVar, readMVar) +import Control.Monad (forM_, void) +import Control.Monad.IO.Class (liftIO) +import Control.Monad.Reader (ReaderT, runReaderT) +import Control.Monad.Trans.Resource (runResourceT) +import qualified Data.Aeson as A +import Data.Conduit (awaitForever, + runConduit, (.|)) +import Data.Default.Class (def) +import Data.HashSet (fromList) import Data.Proxy -import Data.String.Conversions (cs) -import Data.Text (Text) +import Data.Text (Text) import Nameservice.Application -import qualified Nameservice.Modules.Nameservice as N -import Nameservice.Test.EventOrphans () -import qualified Network.Tendermint.Client as RPC -import Servant.API ((:<|>) (..)) -import qualified Tendermint.SDK.Application.Module as M -import Tendermint.SDK.BaseApp.Errors (AppError (..)) -import Tendermint.SDK.BaseApp.Events (Event (..), ToEvent (..)) -import Tendermint.SDK.BaseApp.Query (QueryArgs (..), - QueryResult (..), - defaultQueryArgs) -import qualified Tendermint.SDK.Modules.Auth as Auth -import qualified Tendermint.SDK.Modules.Bank as B -import Tendermint.SDK.Types.Address (Address) -import Tendermint.Utils.Client (ClientConfig (..), - EmptyTxClient (..), - HasQueryClient (..), - HasTxClient (..), - QueryClientResponse (..), - Signer (..), - TxClientResponse (..), - TxOpts (..), - defaultClientTxOpts) -import Tendermint.Utils.ClientUtils (assertQuery, assertTx, - deliverTxEvents, - ensureQueryResponseCode, - ensureResponseCodes, - rpcConfig) -import Tendermint.Utils.Events (FromEvent (..)) -import Tendermint.Utils.User (makeSignerFromUser, - makeUser) - +import qualified Nameservice.Modules.Nameservice as N +import Nameservice.Test.EventOrphans () +import qualified Network.ABCI.Types.Messages.FieldTypes as FT +import qualified Network.Tendermint.Client as RPC +import Servant.API ((:<|>) (..)) +import qualified Tendermint.SDK.Application.Module as M +import Tendermint.SDK.BaseApp.Errors (AppError (..)) +import Tendermint.SDK.BaseApp.Events (ToEvent (..)) +import Tendermint.SDK.BaseApp.Query (QueryArgs (..), + QueryResult (..), + defaultQueryArgs) +import qualified Tendermint.SDK.Modules.Auth as Auth +import qualified Tendermint.SDK.Modules.Bank as B +import Tendermint.SDK.Types.Address (Address) +import Tendermint.Utils.Client (ClientConfig (..), + EmptyTxClient (..), + HasQueryClient (..), + HasTxClient (..), + QueryClientResponse (..), + Signer (..), + TxClientResponse (..), + TxOpts (..), + defaultClientTxOpts) +import Tendermint.Utils.ClientUtils (assertQuery, assertTx, + deliverTxEvents, + ensureQueryResponseCode, + ensureResponseCodes, + rpcConfig) +import Tendermint.Utils.Events (FromEvent (..)) +import Tendermint.Utils.User (makeSignerFromUser, + makeUser) import Test.Hspec @@ -358,7 +359,7 @@ spec = do it "Can monitor all events" $ \(TestEnv mvex mvres _) -> do expected <- readMVar mvex res <- readMVar mvres - fromList expected `shouldBe` fromList res + fromList (map A.toJSON expected) `shouldBe` fromList (map A.toJSON res) faucetUser @@ -486,38 +487,30 @@ faucet txApiDP = Proxy -- Test Init -data TestEnv = TestEnv (MVar [A.Value]) (MVar [A.Value]) [Text] +data TestEnv = TestEnv (MVar [FT.Event]) (MVar [FT.Event]) (MVar [Text]) testInit :: Auth.Amount -> IO TestEnv testInit faucetAmount = do forM_ [user1, user2] $ faucetUser faucetAmount - expectedEventsMVar <- newMVar [] - resultEventsMVar <- newMVar [] - pure $ TestEnv expectedEventsMVar resultEventsMVar [] + TestEnv <$> newMVar [] <*> newMVar [] <*> newMVar [] + addEventToCheck :: ToEvent a => TestEnv -> a -> IO () -addEventToCheck (TestEnv mvexpected mvres ses) ev = do - modifyMVar_ mvexpected $ \es -> pure $ es <> [A.toJSON . toEvent $ ev] - let evType = eventType (toEvent ev) +addEventToCheck (TestEnv mvexpected mvseen mveventTypes) ev = do + let appEv = toEvent ev + modifyMVar_ mvexpected $ pure . (appEv :) + ses <- readMVar mveventTypes + let evType = FT.eventType appEv if evType`elem` ses then pure () - else startNewListener evType + else do + _ <- startNewListener evType + modifyMVar_ mveventTypes $ pure . (evType :) where startNewListener evType = let subReq = RPC.RequestSubscribe ("tm.event = 'Tx' AND " <> evType <> " EXISTS") - forkTendermintM = void . forkIO . void . RPC.runTendermintM rpcConfig - in forkTendermintM $ RPC.subscribe subReq (handler evType) - handler evType res = case res ^? txEvents of - Nothing -> pure () - Just v -> case A.fromJSON v of - A.Error _ -> error ("Failed to parse\n" <> cs (A.encode v) ) - A.Success evs -> - let filterFn v' = evType == eventType v' - filteredEvs = filter filterFn evs - in modifyMVar_ mvres $ \es -> pure $ es <> map A.toJSON filteredEvs - txEvents = key "result" - . key "data" - . key "value" - . key "TxResult" - . key "result" - . key "events" + eventStorer = awaitForever $ \as -> + liftIO $ modifyMVar_ mvseen $ \es -> pure $ + RPC.txEventEvents as <> es + forkTendermintM = forkIO . RPC.runTendermintM rpcConfig . runResourceT . runConduit + in forkTendermintM $ RPC.subscribe subReq .| eventStorer diff --git a/hs-tendermint-client/kv-test/KVStore/Test/KVSpec.hs b/hs-tendermint-client/kv-test/KVStore/Test/KVSpec.hs index 69f09091..0009cdf6 100644 --- a/hs-tendermint-client/kv-test/KVStore/Test/KVSpec.hs +++ b/hs-tendermint-client/kv-test/KVStore/Test/KVSpec.hs @@ -1,28 +1,30 @@ module KVStore.Test.KVSpec (spec) where -import Control.Concurrent (forkIO) -import Control.Concurrent.MVar (MVar, modifyMVar_, - newMVar) -import Control.Lens (to, (^.)) -import Control.Lens.Fold ((^?)) -import Control.Monad (void) -import Control.Monad.Catch (try) -import qualified Data.Aeson as A -import Data.Aeson.Encode.Pretty (encodePretty) -import qualified Data.Aeson.Lens as A -import Data.ByteArray.Base64String (Base64String) -import qualified Data.ByteArray.Base64String as Base64 -import qualified Data.ByteArray.HexString as Hex -import Data.ByteString (ByteString) -import Data.Default.Class (def) -import Data.Either (isRight) ---import Data.HashSet (difference, fromList) -import Data.String.Conversions (cs) -import Data.Text (Text) -import GHC.Generics (Generic) -import qualified Network.ABCI.Types.Messages.Response as Response -import qualified Network.Tendermint.Client as RPC -import Tendermint.SDK.BaseApp.Events (Event (..), ToEvent (..)) +import Control.Concurrent (forkIO) +import Control.Concurrent.MVar (MVar, modifyMVar_, + newMVar, readMVar) +import Control.Lens ((^.)) +import Control.Monad (replicateM) +import Control.Monad.Catch (try) +import Control.Monad.IO.Class (liftIO) +import Control.Monad.Trans.Resource (runResourceT) +import qualified Data.Aeson as A +import Data.Aeson.Encode.Pretty (encodePretty) +import Data.ByteArray.Base64String (Base64String) +import qualified Data.ByteArray.Base64String as Base64 +import qualified Data.ByteArray.HexString as Hex +import Data.ByteString (ByteString) +import Data.Conduit (awaitForever, + runConduit, (.|)) +import Data.Default.Class (def) +import Data.Either (isRight) +import Data.HashSet (fromList) +import Data.String.Conversions (cs) +import Data.Text (Text) +import qualified Network.ABCI.Types.Messages.FieldTypes as FieldTypes +import qualified Network.ABCI.Types.Messages.Response as Response +import qualified Network.Tendermint.Client as RPC +import System.Random (randomIO) import Test.Hspec @@ -45,15 +47,17 @@ spec = do result `shouldSatisfy` isRight it "Can submit a async tx and the response code is 0 (success)" $ \tenv -> do - let asyncTxReq = RPC.RequestBroadcastTxAsync { RPC.requestBroadcastTxAsyncTx = encodeTx "abcd" } - addEventToCheck tenv $ mkAppEvent "abcd" + a <- replicateM 10 $ randomIO @Char + addEventToCheck tenv "name" + let asyncTxReq = RPC.RequestBroadcastTxAsync { RPC.requestBroadcastTxAsyncTx = encodeTx $ "name=" <> a } -- async returns nothing resp <- runRPC $ RPC.broadcastTxAsync asyncTxReq RPC.resultBroadcastTxCode resp `shouldBe` 0 it "Can submit a sync tx and the response code is 0 (success)" $ \tenv -> do - let txReq = RPC.RequestBroadcastTxSync { RPC.requestBroadcastTxSyncTx = encodeTx "efgh" } - addEventToCheck tenv $ mkAppEvent "efgh" + a <- replicateM 10 $ randomIO @Char + addEventToCheck tenv "name" + let txReq = RPC.RequestBroadcastTxSync { RPC.requestBroadcastTxSyncTx = encodeTx $ "name=" <> a } -- sync only returns a CheckTx resp <- runRPC $ RPC.broadcastTxSync txReq RPC.resultBroadcastTxCode resp `shouldBe` 0 @@ -61,8 +65,9 @@ spec = do it "Can submit a commit tx, make sure the response code is 0 (success), and get the result(s)" $ \tenv -> do -- /broadcast_tx_commit -- set name key - let broadcastTxReq = RPC.RequestBroadcastTxCommit { RPC.requestBroadcastTxCommitTx = encodeTx "name=satoshi" } - addEventToCheck tenv $ mkAppEvent "name" + addEventToCheck tenv "name" + a <- replicateM 10 $ randomIO @Char + let broadcastTxReq = RPC.RequestBroadcastTxCommit { RPC.requestBroadcastTxCommitTx = encodeTx $ "name=" <> a } broadcastResp <- runRPC $ RPC.broadcastTxCommit broadcastTxReq let deliverResp = RPC.resultBroadcastTxCommitDeliverTx broadcastResp deliverRespCode = deliverResp ^. Response._deliverTxCode @@ -78,10 +83,10 @@ spec = do RPC.abciQuery queryReq queryRespWProof <- fmap RPC.resultABCIQueryResponse . runRPC $ RPC.abciQuery queryReqWProof - let foundName = queryResp ^. Response._queryValue . to decodeName - foundNameWProof = queryRespWProof ^. Response._queryValue . to decodeName - foundName `shouldBe` "satoshi" - foundNameWProof `shouldBe` "satoshi" + let foundName = queryResp ^. Response._queryValue + foundNameWProof = queryRespWProof ^. Response._queryValue + decodeQuery foundName `shouldBe` a + decodeQuery foundNameWProof `shouldBe` a -- check with /tx endpoint (w+w/o proof) let hash = RPC.resultBroadcastTxCommitHash $ broadcastResp -- convert hex to base64 @@ -97,17 +102,11 @@ spec = do txResultWPHash `shouldBe` hash - it "Can monitor all events" $ const pending - --it "Can monitor all events" $ \(TestEnv mvex mvres _) -> do - -- expected <- readMVar mvex - -- res <- readMVar mvres - -- (fromList expected `difference` fromList res) `shouldBe` fromList [] + it "Can monitor all events" $ \(TestEnv mvex mvres _) -> do + expected <- readMVar mvex + res <- readMVar mvres + fromList (map A.toJSON expected) `shouldBe` fromList (map A.toJSON res) -encodeTx :: String -> Base64String -encodeTx = Base64.fromBytes . cs @String @ByteString - -decodeName :: Base64String -> String -decodeName = cs @ByteString @String . Base64.toBytes runRPC :: forall a. RPC.TendermintM a -> IO a runRPC = RPC.runTendermintM rpcConfig @@ -120,49 +119,45 @@ runRPC = RPC.runTendermintM rpcConfig in RPC.Config baseReq (prettyPrint "RPC Request") (prettyPrint "RPC Response") host port tls -- See https://github.com/tendermint/tendermint/blob/master/abci/example/kvstore/kvstore.go#L101 -mkAppEvent :: Text -> App -mkAppEvent k = App "Cosmoshi Netowoko" k - -data App = App - { creator :: Text - , key :: Text - } deriving (Show, Eq, Generic) +mkAppEvent :: String -> FieldTypes.Event +mkAppEvent k = FieldTypes.Event + { eventType = "app" + , eventAttributes = + [ FieldTypes.KVPair (encode "creator") (encode "Cosmoshi Netowoko") + , FieldTypes.KVPair (encode "key") (encode k) + ] + } + where + encode = Base64.fromBytes . cs @String @ByteString -instance ToEvent App +encodeTx :: String -> Base64String +encodeTx = Base64.fromBytes . cs @_ @ByteString +decodeQuery :: Base64String -> String +decodeQuery = cs @ByteString . Base64.toBytes -- Test Init -data TestEnv = TestEnv (MVar [A.Value]) (MVar [A.Value]) [Text] +data TestEnv = TestEnv (MVar [FieldTypes.Event]) (MVar [FieldTypes.Event]) (MVar [Text]) testInit :: IO TestEnv -testInit = do - expectedEventsMVar <- newMVar [] - resultEventsMVar <- newMVar [] - pure $ TestEnv expectedEventsMVar resultEventsMVar [] - -addEventToCheck :: ToEvent a => TestEnv -> a -> IO () -addEventToCheck (TestEnv mvexpected mvres ses) ev = do - modifyMVar_ mvexpected $ \es -> pure $ es <> [A.toJSON . toEvent $ ev] - let evType = eventType (toEvent ev) +testInit = TestEnv <$> newMVar [] <*> newMVar [] <*> newMVar [] + +addEventToCheck :: TestEnv -> String -> IO () +addEventToCheck (TestEnv mvexpected mvseen mveventTypes) ev = do + let appEv = mkAppEvent ev + modifyMVar_ mvexpected $ pure . (appEv :) + ses <- readMVar mveventTypes + let evType = FieldTypes.eventType appEv if evType`elem` ses then pure () - else startNewListener evType + else do + _ <- startNewListener evType + modifyMVar_ mveventTypes $ pure . (evType :) where startNewListener evType = let subReq = RPC.RequestSubscribe ("tm.event = 'Tx' AND " <> evType <> " EXISTS") - forkTendermintM = void . forkIO . void . runRPC - in forkTendermintM $ RPC.subscribe subReq (handler evType) - handler evType res = case res ^? txEvents of - Nothing -> pure () - Just v -> case A.fromJSON v of - A.Error _ -> error ("Failed to parse\n" <> cs (A.encode v) ) - A.Success evs -> - let filterFn v' = evType == eventType v' - filteredEvs = filter filterFn evs - in modifyMVar_ mvres $ \es -> pure $ es <> map A.toJSON filteredEvs - txEvents = A.key "result" - . A.key "data" - . A.key "value" - . A.key "TxResult" - . A.key "result" - . A.key "events" + eventStorer = awaitForever $ \as -> + liftIO $ modifyMVar_ mvseen $ \es -> pure $ + RPC.txEventEvents as <> es + forkTendermintM = forkIO . runRPC . runResourceT . runConduit + in forkTendermintM $ RPC.subscribe subReq .| eventStorer diff --git a/hs-tendermint-client/package.yaml b/hs-tendermint-client/package.yaml index b1548aab..c6a25eb5 100644 --- a/hs-tendermint-client/package.yaml +++ b/hs-tendermint-client/package.yaml @@ -32,15 +32,22 @@ dependencies: - exceptions - data-default-class + library: source-dirs: src dependencies: - aeson-casing + - conduit - hs-abci-types - http-client - http-conduit + - lens + - lens-aeson - mtl - random + - resourcet + - stm + - stm-conduit - text - websockets - wuss @@ -69,11 +76,13 @@ tests: - -with-rtsopts=-N dependencies: - aeson-pretty + - conduit - hs-abci-types - hs-tendermint-client - hspec - - lens-aeson - lens - text + - random + - resourcet - string-conversions - - hs-abci-sdk + - unordered-containers diff --git a/hs-tendermint-client/src/Network/Tendermint/Client.hs b/hs-tendermint-client/src/Network/Tendermint/Client.hs index 8df188bd..431210b0 100644 --- a/hs-tendermint-client/src/Network/Tendermint/Client.hs +++ b/hs-tendermint-client/src/Network/Tendermint/Client.hs @@ -8,8 +8,17 @@ module Network.Tendermint.Client ) where -import Control.Monad.Reader (ReaderT, +import Control.Concurrent (forkIO, + killThread) +import Control.Concurrent.STM.TQueue (newTQueueIO, + writeTQueue) +import Control.Lens ((^?)) +import Control.Monad.Catch (throwM) +import Control.Monad.IO.Class (liftIO) +import Control.Monad.Reader (ReaderT, ask, runReaderT) +import Control.Monad.STM (atomically) +import Control.Monad.Trans.Resource (ResourceT) import Data.Aeson (FromJSON (..), ToJSON (..), genericParseJSON, @@ -17,9 +26,13 @@ import Data.Aeson (FromJSON (..), import qualified Data.Aeson as Aeson import Data.Aeson.Casing (aesonDrop, snakeCase) +import qualified Data.Aeson.Lens as AL import qualified Data.ByteArray.Base64String as Base64 import Data.ByteArray.HexString (HexString) import Data.ByteString (ByteString) +import Data.Conduit (ConduitT, + bracketP) +import Data.Conduit.TQueue (sourceTQueue) import Data.Default.Class (Default (..)) import Data.Int (Int64) import Data.Text (Text) @@ -242,13 +255,49 @@ instance FromJSON ResultABCIInfo where -- Subscribe -------------------------------------------------------------------------------- +data TxResultEvent a = TxEvent + { txEventBlockHeight :: FieldTypes.WrappedVal Int64 + , txEventTxIndex :: Int64 + , txEventEvents :: a + } deriving (Generic) + +instance FromJSON (TxResultEvent [FieldTypes.Event]) where + parseJSON val = do + let mtxRes = val ^? AL.key "result" + . AL.key "data" + . AL.key "value" + . AL.key "TxResult" + . AL._Object + txRes <- maybe (fail "key not found: result.data.value.TxResult") pure mtxRes + height <- txRes Aeson..: "height" + idx <- txRes Aeson..: "index" + res' <- txRes Aeson..: "result" + es <- res' Aeson..: "events" + pure TxEvent + { txEventBlockHeight = height + , txEventTxIndex = idx + , txEventEvents = es + } + -- | invokes [/subscribe](https://tendermint.com/rpc/#subscribe) rpc call -- https://github.com/tendermint/tendermint/blob/master/rpc/core/events.go#L17 -subscribe :: RequestSubscribe -> (Aeson.Value -> IO ()) -> TendermintM ResultSubscribe -subscribe req handler = do - RPC.remoteWS (RPC.MethodName "subscribe") req handler - pure ResultSubscribe - +subscribe + :: RequestSubscribe + -> ConduitT () (TxResultEvent [FieldTypes.Event]) (ResourceT TendermintM) () +subscribe req = do + queue <- liftIO newTQueueIO + let handler (val :: Aeson.Value) = + let isEmptyResult = val ^? AL.key "result" == Just (Aeson.Object mempty) + in if isEmptyResult + then pure () + else case Aeson.eitherDecode . Aeson.encode $ val of + Left err -> throwM (RPC.ParsingException err) + Right a -> atomically $ writeTQueue queue a + cfg <- ask + bracketP + (forkIO $ RPC.remoteWS cfg (RPC.MethodName "subscribe") req handler) + killThread + (const $ sourceTQueue queue) newtype RequestSubscribe = RequestSubscribe { requestSubscribeQuery :: Text diff --git a/hs-tendermint-client/src/Network/Tendermint/Client/Internal/RPCClient.hs b/hs-tendermint-client/src/Network/Tendermint/Client/Internal/RPCClient.hs index 19a91b77..d2d2fec9 100644 --- a/hs-tendermint-client/src/Network/Tendermint/Client/Internal/RPCClient.hs +++ b/hs-tendermint-client/src/Network/Tendermint/Client/Internal/RPCClient.hs @@ -2,7 +2,7 @@ module Network.Tendermint.Client.Internal.RPCClient where import Control.Applicative ((<|>)) import Control.Exception (Exception) -import Control.Monad (forever, void) +import Control.Monad (forever) import Control.Monad.Catch (throwM) import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.Reader (MonadReader, ask) @@ -109,34 +109,32 @@ data Config = Config } remoteWS :: - ( MonadIO m - , MonadReader Config m - , FromJSON output + ( FromJSON output , ToJSON input ) - => MethodName + => Config + -> MethodName -> input -> (output -> IO ()) - -> m () -{-# INLINE remoteWS #-} -remoteWS method input handler = do - Config {..} <- ask + -> IO () +remoteWS Config{..} method input handler = do let host = BS.unpack cHost port = fromInteger $ toInteger cPort tlsPort = fromInteger $ toInteger port path = "/websocket" if tlsEnabled - then void . liftIO $ runSecureClient host tlsPort path ws - else void . liftIO $ WS.runClient host port path ws + then runSecureClient host tlsPort path ws + else WS.runClient host port path ws where ws c = do - rid <- abs <$> liftIO randomIO + rid <- abs <$> randomIO let rpcParams = Aeson.toJSON input rpcRequest = Request method rid rpcParams msg = WS.Binary $ Aeson.encode rpcRequest WS.sendDataMessage c msg - void . forever $ do - message <- WS.receiveData c >>= decodeRPCResponse + forever $ do + bs <- WS.receiveData c + message <- decodeRPCResponse bs handler message decodeRPCResponse bs = case Aeson.eitherDecodeStrict bs of Left err -> throwM $ ParsingException err