Skip to content

Commit

Permalink
Ws event conduit (#228)
Browse files Browse the repository at this point in the history
* wip

* appears to be working

* tests pass

* remove bad toJSON

* update nameservice tests

* use bracketP

* hlint
  • Loading branch information
martyall authored Mar 11, 2020
1 parent a515e64 commit 69583e3
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 172 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ deploy-nameservice-test: install ## run the nameservice docker network for testi
# Tests
#####################

test-kv-store: install ## Run the test suite for the client interface
test-kv-store: ## Run the test suite for the client interface
stack test hs-tendermint-client

test-simple-storage: install ## Run the test suite for the simple-storage example application
Expand Down
6 changes: 3 additions & 3 deletions hs-abci-docs/nameservice/package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -183,17 +183,17 @@ tests:
- -with-rtsopts=-N
dependencies:
- base >= 4.7 && < 5
- conduit
- data-default-class
- hs-abci-sdk
- hs-abci-test-utils
- hs-abci-types
- hs-tendermint-client
- hspec
- aeson
- mtl
- nameservice
- resourcet
- servant
- text
- unordered-containers
- lens-aeson
- lens
- string-conversions
129 changes: 61 additions & 68 deletions hs-abci-docs/nameservice/test/Nameservice/Test/E2ESpec.hs
Original file line number Diff line number Diff line change
@@ -1,50 +1,51 @@
module Nameservice.Test.E2ESpec (spec) where

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, modifyMVar_, newMVar,
readMVar)
import Control.Lens ((^?))
import Control.Monad (forM_, void)
import Control.Monad.Reader (ReaderT, runReaderT)
import qualified Data.Aeson as A
import Data.Aeson.Lens (key)
import Data.Default.Class (def)
import Data.HashSet (fromList)
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, modifyMVar_,
newMVar, readMVar)
import Control.Monad (forM_, void)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Reader (ReaderT, runReaderT)
import Control.Monad.Trans.Resource (runResourceT)
import qualified Data.Aeson as A
import Data.Conduit (awaitForever,
runConduit, (.|))
import Data.Default.Class (def)
import Data.HashSet (fromList)
import Data.Proxy
import Data.String.Conversions (cs)
import Data.Text (Text)
import Data.Text (Text)
import Nameservice.Application
import qualified Nameservice.Modules.Nameservice as N
import Nameservice.Test.EventOrphans ()
import qualified Network.Tendermint.Client as RPC
import Servant.API ((:<|>) (..))
import qualified Tendermint.SDK.Application.Module as M
import Tendermint.SDK.BaseApp.Errors (AppError (..))
import Tendermint.SDK.BaseApp.Events (Event (..), ToEvent (..))
import Tendermint.SDK.BaseApp.Query (QueryArgs (..),
QueryResult (..),
defaultQueryArgs)
import qualified Tendermint.SDK.Modules.Auth as Auth
import qualified Tendermint.SDK.Modules.Bank as B
import Tendermint.SDK.Types.Address (Address)
import Tendermint.Utils.Client (ClientConfig (..),
EmptyTxClient (..),
HasQueryClient (..),
HasTxClient (..),
QueryClientResponse (..),
Signer (..),
TxClientResponse (..),
TxOpts (..),
defaultClientTxOpts)
import Tendermint.Utils.ClientUtils (assertQuery, assertTx,
deliverTxEvents,
ensureQueryResponseCode,
ensureResponseCodes,
rpcConfig)
import Tendermint.Utils.Events (FromEvent (..))
import Tendermint.Utils.User (makeSignerFromUser,
makeUser)

import qualified Nameservice.Modules.Nameservice as N
import Nameservice.Test.EventOrphans ()
import qualified Network.ABCI.Types.Messages.FieldTypes as FT
import qualified Network.Tendermint.Client as RPC
import Servant.API ((:<|>) (..))
import qualified Tendermint.SDK.Application.Module as M
import Tendermint.SDK.BaseApp.Errors (AppError (..))
import Tendermint.SDK.BaseApp.Events (ToEvent (..))
import Tendermint.SDK.BaseApp.Query (QueryArgs (..),
QueryResult (..),
defaultQueryArgs)
import qualified Tendermint.SDK.Modules.Auth as Auth
import qualified Tendermint.SDK.Modules.Bank as B
import Tendermint.SDK.Types.Address (Address)
import Tendermint.Utils.Client (ClientConfig (..),
EmptyTxClient (..),
HasQueryClient (..),
HasTxClient (..),
QueryClientResponse (..),
Signer (..),
TxClientResponse (..),
TxOpts (..),
defaultClientTxOpts)
import Tendermint.Utils.ClientUtils (assertQuery, assertTx,
deliverTxEvents,
ensureQueryResponseCode,
ensureResponseCodes,
rpcConfig)
import Tendermint.Utils.Events (FromEvent (..))
import Tendermint.Utils.User (makeSignerFromUser,
makeUser)
import Test.Hspec


Expand Down Expand Up @@ -358,7 +359,7 @@ spec = do
it "Can monitor all events" $ \(TestEnv mvex mvres _) -> do
expected <- readMVar mvex
res <- readMVar mvres
fromList expected `shouldBe` fromList res
fromList (map A.toJSON expected) `shouldBe` fromList (map A.toJSON res)


faucetUser
Expand Down Expand Up @@ -486,38 +487,30 @@ faucet
txApiDP = Proxy

-- Test Init
data TestEnv = TestEnv (MVar [A.Value]) (MVar [A.Value]) [Text]
data TestEnv = TestEnv (MVar [FT.Event]) (MVar [FT.Event]) (MVar [Text])

testInit :: Auth.Amount -> IO TestEnv
testInit faucetAmount = do
forM_ [user1, user2] $ faucetUser faucetAmount
expectedEventsMVar <- newMVar []
resultEventsMVar <- newMVar []
pure $ TestEnv expectedEventsMVar resultEventsMVar []
TestEnv <$> newMVar [] <*> newMVar [] <*> newMVar []


addEventToCheck :: ToEvent a => TestEnv -> a -> IO ()
addEventToCheck (TestEnv mvexpected mvres ses) ev = do
modifyMVar_ mvexpected $ \es -> pure $ es <> [A.toJSON . toEvent $ ev]
let evType = eventType (toEvent ev)
addEventToCheck (TestEnv mvexpected mvseen mveventTypes) ev = do
let appEv = toEvent ev
modifyMVar_ mvexpected $ pure . (appEv :)
ses <- readMVar mveventTypes
let evType = FT.eventType appEv
if evType`elem` ses
then pure ()
else startNewListener evType
else do
_ <- startNewListener evType
modifyMVar_ mveventTypes $ pure . (evType :)
where
startNewListener evType =
let subReq = RPC.RequestSubscribe ("tm.event = 'Tx' AND " <> evType <> " EXISTS")
forkTendermintM = void . forkIO . void . RPC.runTendermintM rpcConfig
in forkTendermintM $ RPC.subscribe subReq (handler evType)
handler evType res = case res ^? txEvents of
Nothing -> pure ()
Just v -> case A.fromJSON v of
A.Error _ -> error ("Failed to parse\n" <> cs (A.encode v) )
A.Success evs ->
let filterFn v' = evType == eventType v'
filteredEvs = filter filterFn evs
in modifyMVar_ mvres $ \es -> pure $ es <> map A.toJSON filteredEvs
txEvents = key "result"
. key "data"
. key "value"
. key "TxResult"
. key "result"
. key "events"
eventStorer = awaitForever $ \as ->
liftIO $ modifyMVar_ mvseen $ \es -> pure $
RPC.txEventEvents as <> es
forkTendermintM = forkIO . RPC.runTendermintM rpcConfig . runResourceT . runConduit
in forkTendermintM $ RPC.subscribe subReq .| eventStorer
151 changes: 73 additions & 78 deletions hs-tendermint-client/kv-test/KVStore/Test/KVSpec.hs
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
module KVStore.Test.KVSpec (spec) where

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, modifyMVar_,
newMVar)
import Control.Lens (to, (^.))
import Control.Lens.Fold ((^?))
import Control.Monad (void)
import Control.Monad.Catch (try)
import qualified Data.Aeson as A
import Data.Aeson.Encode.Pretty (encodePretty)
import qualified Data.Aeson.Lens as A
import Data.ByteArray.Base64String (Base64String)
import qualified Data.ByteArray.Base64String as Base64
import qualified Data.ByteArray.HexString as Hex
import Data.ByteString (ByteString)
import Data.Default.Class (def)
import Data.Either (isRight)
--import Data.HashSet (difference, fromList)
import Data.String.Conversions (cs)
import Data.Text (Text)
import GHC.Generics (Generic)
import qualified Network.ABCI.Types.Messages.Response as Response
import qualified Network.Tendermint.Client as RPC
import Tendermint.SDK.BaseApp.Events (Event (..), ToEvent (..))
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, modifyMVar_,
newMVar, readMVar)
import Control.Lens ((^.))
import Control.Monad (replicateM)
import Control.Monad.Catch (try)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (runResourceT)
import qualified Data.Aeson as A
import Data.Aeson.Encode.Pretty (encodePretty)
import Data.ByteArray.Base64String (Base64String)
import qualified Data.ByteArray.Base64String as Base64
import qualified Data.ByteArray.HexString as Hex
import Data.ByteString (ByteString)
import Data.Conduit (awaitForever,
runConduit, (.|))
import Data.Default.Class (def)
import Data.Either (isRight)
import Data.HashSet (fromList)
import Data.String.Conversions (cs)
import Data.Text (Text)
import qualified Network.ABCI.Types.Messages.FieldTypes as FieldTypes
import qualified Network.ABCI.Types.Messages.Response as Response
import qualified Network.Tendermint.Client as RPC
import System.Random (randomIO)
import Test.Hspec


Expand All @@ -45,24 +47,27 @@ spec = do
result `shouldSatisfy` isRight

it "Can submit a async tx and the response code is 0 (success)" $ \tenv -> do
let asyncTxReq = RPC.RequestBroadcastTxAsync { RPC.requestBroadcastTxAsyncTx = encodeTx "abcd" }
addEventToCheck tenv $ mkAppEvent "abcd"
a <- replicateM 10 $ randomIO @Char
addEventToCheck tenv "name"
let asyncTxReq = RPC.RequestBroadcastTxAsync { RPC.requestBroadcastTxAsyncTx = encodeTx $ "name=" <> a }
-- async returns nothing
resp <- runRPC $ RPC.broadcastTxAsync asyncTxReq
RPC.resultBroadcastTxCode resp `shouldBe` 0

it "Can submit a sync tx and the response code is 0 (success)" $ \tenv -> do
let txReq = RPC.RequestBroadcastTxSync { RPC.requestBroadcastTxSyncTx = encodeTx "efgh" }
addEventToCheck tenv $ mkAppEvent "efgh"
a <- replicateM 10 $ randomIO @Char
addEventToCheck tenv "name"
let txReq = RPC.RequestBroadcastTxSync { RPC.requestBroadcastTxSyncTx = encodeTx $ "name=" <> a }
-- sync only returns a CheckTx
resp <- runRPC $ RPC.broadcastTxSync txReq
RPC.resultBroadcastTxCode resp `shouldBe` 0

it "Can submit a commit tx, make sure the response code is 0 (success), and get the result(s)" $ \tenv -> do
-- /broadcast_tx_commit
-- set name key
let broadcastTxReq = RPC.RequestBroadcastTxCommit { RPC.requestBroadcastTxCommitTx = encodeTx "name=satoshi" }
addEventToCheck tenv $ mkAppEvent "name"
addEventToCheck tenv "name"
a <- replicateM 10 $ randomIO @Char
let broadcastTxReq = RPC.RequestBroadcastTxCommit { RPC.requestBroadcastTxCommitTx = encodeTx $ "name=" <> a }
broadcastResp <- runRPC $ RPC.broadcastTxCommit broadcastTxReq
let deliverResp = RPC.resultBroadcastTxCommitDeliverTx broadcastResp
deliverRespCode = deliverResp ^. Response._deliverTxCode
Expand All @@ -78,10 +83,10 @@ spec = do
RPC.abciQuery queryReq
queryRespWProof <- fmap RPC.resultABCIQueryResponse . runRPC $
RPC.abciQuery queryReqWProof
let foundName = queryResp ^. Response._queryValue . to decodeName
foundNameWProof = queryRespWProof ^. Response._queryValue . to decodeName
foundName `shouldBe` "satoshi"
foundNameWProof `shouldBe` "satoshi"
let foundName = queryResp ^. Response._queryValue
foundNameWProof = queryRespWProof ^. Response._queryValue
decodeQuery foundName `shouldBe` a
decodeQuery foundNameWProof `shouldBe` a
-- check with /tx endpoint (w+w/o proof)
let hash = RPC.resultBroadcastTxCommitHash $ broadcastResp
-- convert hex to base64
Expand All @@ -97,17 +102,11 @@ spec = do
txResultWPHash `shouldBe` hash


it "Can monitor all events" $ const pending
--it "Can monitor all events" $ \(TestEnv mvex mvres _) -> do
-- expected <- readMVar mvex
-- res <- readMVar mvres
-- (fromList expected `difference` fromList res) `shouldBe` fromList []
it "Can monitor all events" $ \(TestEnv mvex mvres _) -> do
expected <- readMVar mvex
res <- readMVar mvres
fromList (map A.toJSON expected) `shouldBe` fromList (map A.toJSON res)

encodeTx :: String -> Base64String
encodeTx = Base64.fromBytes . cs @String @ByteString

decodeName :: Base64String -> String
decodeName = cs @ByteString @String . Base64.toBytes

runRPC :: forall a. RPC.TendermintM a -> IO a
runRPC = RPC.runTendermintM rpcConfig
Expand All @@ -120,49 +119,45 @@ runRPC = RPC.runTendermintM rpcConfig
in RPC.Config baseReq (prettyPrint "RPC Request") (prettyPrint "RPC Response") host port tls

-- See https://github.com/tendermint/tendermint/blob/master/abci/example/kvstore/kvstore.go#L101
mkAppEvent :: Text -> App
mkAppEvent k = App "Cosmoshi Netowoko" k

data App = App
{ creator :: Text
, key :: Text
} deriving (Show, Eq, Generic)
mkAppEvent :: String -> FieldTypes.Event
mkAppEvent k = FieldTypes.Event
{ eventType = "app"
, eventAttributes =
[ FieldTypes.KVPair (encode "creator") (encode "Cosmoshi Netowoko")
, FieldTypes.KVPair (encode "key") (encode k)
]
}
where
encode = Base64.fromBytes . cs @String @ByteString

instance ToEvent App
encodeTx :: String -> Base64String
encodeTx = Base64.fromBytes . cs @_ @ByteString

decodeQuery :: Base64String -> String
decodeQuery = cs @ByteString . Base64.toBytes

-- Test Init
data TestEnv = TestEnv (MVar [A.Value]) (MVar [A.Value]) [Text]
data TestEnv = TestEnv (MVar [FieldTypes.Event]) (MVar [FieldTypes.Event]) (MVar [Text])

testInit :: IO TestEnv
testInit = do
expectedEventsMVar <- newMVar []
resultEventsMVar <- newMVar []
pure $ TestEnv expectedEventsMVar resultEventsMVar []

addEventToCheck :: ToEvent a => TestEnv -> a -> IO ()
addEventToCheck (TestEnv mvexpected mvres ses) ev = do
modifyMVar_ mvexpected $ \es -> pure $ es <> [A.toJSON . toEvent $ ev]
let evType = eventType (toEvent ev)
testInit = TestEnv <$> newMVar [] <*> newMVar [] <*> newMVar []

addEventToCheck :: TestEnv -> String -> IO ()
addEventToCheck (TestEnv mvexpected mvseen mveventTypes) ev = do
let appEv = mkAppEvent ev
modifyMVar_ mvexpected $ pure . (appEv :)
ses <- readMVar mveventTypes
let evType = FieldTypes.eventType appEv
if evType`elem` ses
then pure ()
else startNewListener evType
else do
_ <- startNewListener evType
modifyMVar_ mveventTypes $ pure . (evType :)
where
startNewListener evType =
let subReq = RPC.RequestSubscribe ("tm.event = 'Tx' AND " <> evType <> " EXISTS")
forkTendermintM = void . forkIO . void . runRPC
in forkTendermintM $ RPC.subscribe subReq (handler evType)
handler evType res = case res ^? txEvents of
Nothing -> pure ()
Just v -> case A.fromJSON v of
A.Error _ -> error ("Failed to parse\n" <> cs (A.encode v) )
A.Success evs ->
let filterFn v' = evType == eventType v'
filteredEvs = filter filterFn evs
in modifyMVar_ mvres $ \es -> pure $ es <> map A.toJSON filteredEvs
txEvents = A.key "result"
. A.key "data"
. A.key "value"
. A.key "TxResult"
. A.key "result"
. A.key "events"
eventStorer = awaitForever $ \as ->
liftIO $ modifyMVar_ mvseen $ \es -> pure $
RPC.txEventEvents as <> es
forkTendermintM = forkIO . runRPC . runResourceT . runConduit
in forkTendermintM $ RPC.subscribe subReq .| eventStorer
Loading

0 comments on commit 69583e3

Please sign in to comment.