Skip to content

Commit

Permalink
re work goroutine to properly close
Browse files Browse the repository at this point in the history
  • Loading branch information
edjroz committed Nov 21, 2024
1 parent 54a4121 commit 0356b51
Showing 1 changed file with 23 additions and 22 deletions.
45 changes: 23 additions & 22 deletions integration_tests/abstract_account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"path"
"reflect"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -817,17 +816,13 @@ func TestXionClientEvent(t *testing.T) {
require.NoError(t, err)

//eventStream, err := subscribeToEvent(t, ctx, cometTypes.EventTx, cometWsClient)
eventStream, err := subscribeToEvent(t, ctx, aaContractAddr, cometWsClient)
eventStream, err := subscribeToEvent(t, ctx, cometWsClient)

require.NoError(t, err)

// note: MASSIVELY unsafe, need to be able to cancel, consider wrapping around a separate goroutine and adding a work timer
var wg sync.WaitGroup
wg.Add(1)
go receiveEvents(t, &wg, eventStream)
doneChan := make(chan struct{})
go receiveEvents(t, doneChan, eventStream)

time.Sleep(10 * time.Second) // sleeping for 5 seconds to make sure we intercept transactions
fmt.Println("slept for 10s")
jsonExecMsgStr, err = GenerateTx(t, ctx, xion.GetNode(),
xionUser.KeyName(),
"xion", "emit", "arbitrary_data", aaContractAddr,
Expand Down Expand Up @@ -862,36 +857,42 @@ func TestXionClientEvent(t *testing.T) {
require.NoError(t, err) // it's returning an error and it's not throwing
fmt.Println("we have thrown a transaction")

wg.Wait()
stopClient(ctx, cometWsClient) // could be a defered function
//wg.Wait()
<-doneChan
stopClient(ctx, cometWsClient)
}

//TODO: change smart contract for the event emission one
//TODO: create a small client

func getCometClient(hostAddr string) (cometClient.Client, error) {
return rpchttp.New(hostAddr, "/websocket")
}

func subscribeToEvent(t *testing.T, ctx context.Context, eventType string, cli cometClient.Client) (<-chan cometRpcCoreTypes.ResultEvent, error) {
ev := fmt.Sprintf("message.module='wasm' AND message.action='/cosmwasm.wasm.v1.MsgExecuteContract' AND wasm-account_emit._contract_address='%s'", eventType)
fmt.Printf("%s\n\n\n", ev)
return cli.Subscribe(ctx, "helpers", "message.module='wasm' AND message.action='/cosmwasm.wasm.v1.MsgExecuteContract' AND eventType.eventAttribute='wasm-account_emit._contract_address'")
func subscribeToEvent(t *testing.T, ctx context.Context, cli cometClient.Client) (<-chan cometRpcCoreTypes.ResultEvent, error) {
return cli.Subscribe(ctx, "helpers", "message.module='wasm' AND message.action='/cosmwasm.wasm.v1.MsgExecuteContract'")
}

func receiveEvents(t *testing.T, wg *sync.WaitGroup, eventStream <-chan cometRpcCoreTypes.ResultEvent) {
defer wg.Done() // make sure the function has a way out
func receiveEvents(t *testing.T, done chan<- struct{}, eventStream <-chan cometRpcCoreTypes.ResultEvent) {
for {
select {
case event := <-eventStream:
fmt.Println("event intercepted")
fmt.Println(event)
break
contractAddress, ok := event.Events["wasm-account_emit._contract_address"]
if !ok {
fmt.Println("not desired event")
continue
}
arbData, ok := event.Events["wasm-account_emit.data"]
if !ok {
fmt.Println("not desired event")
continue
}
fmt.Println(contractAddress)
fmt.Println(arbData)
done <- struct{}{}
return
default:
continue
}
}
return
}

func stopClient(ctx context.Context, cli cometClient.Client) error {
Expand Down

0 comments on commit 0356b51

Please sign in to comment.