Skip to content

Commit

Permalink
[feature] sync block tested
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewwww committed Aug 22, 2023
1 parent 5b72766 commit 3b9cb1d
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 31 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ config/config.yml
config/private_key.go
config/test_private_key.go
*.exe
*.log
6 changes: 5 additions & 1 deletion build/config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ blockchain:
web_socket_endpoint: "wss://block-node.crynux.ai/ws"
start_block_num: 1431
gas_limit: 4294967
account:
address: ""
contracts:
task: "0x81968268d3aCdCba99a677C960C2B5dFb8B38768"
node: "0xcc0576ceEc40A9309f231d59B36A5c6e5625d6e5"
crynux_token: "0x2045334b59E72B91ee072b6971F1eAbFa496A5D7"
crynux_token: "0x2045334b59E72B91ee072b6971F1eAbFa496A5D7"
test:
root_account: ""
4 changes: 4 additions & 0 deletions config/config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ blockchain:
web_socket_endpoint: "wss://block-node.crynux.ai/ws"
start_block_num: 1431
gas_limit: 4294967
account:
address: ""
contracts:
task: "0x81968268d3aCdCba99a677C960C2B5dFb8B38768"
node: "0xcc0576ceEc40A9309f231d59B36A5c6e5625d6e5"
crynux_token: "0x2045334b59E72B91ee072b6971F1eAbFa496A5D7"
test:
root_account: ""
5 changes: 5 additions & 0 deletions models/inference_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
log "github.com/sirupsen/logrus"
"gorm.io/gorm"
"strconv"
)
Expand Down Expand Up @@ -63,6 +64,8 @@ func (t *InferenceTask) GetTaskHash() (*common.Hash, error) {
return nil, err
}

log.Debugln("task hash string: " + string(taskHashBytes))

hash := crypto.Keccak256Hash(taskHashBytes)
return &hash, nil
}
Expand All @@ -81,6 +84,8 @@ func (t *InferenceTask) GetDataHash() (*common.Hash, error) {
return nil, err
}

log.Debugln("data hash string: " + string(dataHashBytes))

hash := crypto.Keccak256Hash(dataHashBytes)
return &hash, nil
}
109 changes: 80 additions & 29 deletions tasks/sync_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tasks
import (
"context"
"errors"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -14,60 +15,103 @@ import (
"time"
)

func StartSyncBlock() {
interval := 1
func StartSyncBlockWithTerminateChannel(ch <-chan int) {
appConfig := config.GetConfig()
syncedBlock := &models.SyncedBlock{}

if err := config.GetDB().First(&syncedBlock).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
syncedBlock.BlockNumber = appConfig.Blockchain.StartBlockNum
} else {
log.Errorln("error getting the block sync checkpoint from db")
log.Fatal(err)
}
}

client, err := blockchain.GetWebSocketClient()
if err != nil {
log.Errorln("error start websocket client from blockchain")
log.Fatal(err)
}

headers := make(chan *types.Header)

sub, err := client.SubscribeNewHead(context.Background(), headers)
if err != nil {
log.Errorln("error subscribing for new blocks")
log.Fatal(err)
}

for {
select {
case err := <-sub.Err():
log.Errorln(err)
time.Sleep(time.Duration(interval) * time.Second)
case header := <-headers:
case stop := <-ch:
if stop == 1 {
return
} else {
processChannel(sub, headers, syncedBlock)
}
default:
processChannel(sub, headers, syncedBlock)
}
}
}

func StartSyncBlock() {
appConfig := config.GetConfig()
syncedBlock := &models.SyncedBlock{}

log.Debugln("new block received: " + header.Number.String())
if err := config.GetDB().First(&syncedBlock).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
syncedBlock.BlockNumber = appConfig.Blockchain.StartBlockNum
} else {
log.Fatal(err)
}
}

currentBlockNum := header.Number
client, err := blockchain.GetWebSocketClient()
if err != nil {
log.Fatal(err)
}

if err := processTaskCreated(syncedBlock.BlockNumber+1, currentBlockNum.Uint64()); err != nil {
log.Errorln(err)
time.Sleep(time.Duration(interval) * time.Second)
continue
}
headers := make(chan *types.Header)

oldNum := syncedBlock.BlockNumber
syncedBlock.BlockNumber = header.Number.Uint64()
if err := config.GetDB().Save(&syncedBlock).Error; err != nil {
syncedBlock.BlockNumber = oldNum
log.Errorln(err)
time.Sleep(time.Duration(interval) * time.Second)
}
sub, err := client.SubscribeNewHead(context.Background(), headers)
if err != nil {
log.Fatal(err)
}

for {
processChannel(sub, headers, syncedBlock)
}
}

func processChannel(sub ethereum.Subscription, headers chan *types.Header, syncedBlock *models.SyncedBlock) {

interval := 1

select {
case err := <-sub.Err():
log.Errorln(err)
time.Sleep(time.Duration(interval) * time.Second)
case header := <-headers:

currentBlockNum := header.Number

log.Debugln("new block received: " + header.Number.String())

if err := processTaskCreated(syncedBlock.BlockNumber+1, currentBlockNum.Uint64()); err != nil {
log.Errorln(err)
time.Sleep(time.Duration(interval) * time.Second)
return
}

oldNum := syncedBlock.BlockNumber
syncedBlock.BlockNumber = currentBlockNum.Uint64()
if err := config.GetDB().Save(syncedBlock).Error; err != nil {
syncedBlock.BlockNumber = oldNum
log.Errorln(err)
time.Sleep(time.Duration(interval) * time.Second)
}
}

time.Sleep(time.Duration(interval) * time.Second)
}

func processTaskCreated(startBlockNum, endBlockNum uint64) error {
Expand All @@ -77,14 +121,11 @@ func processTaskCreated(startBlockNum, endBlockNum uint64) error {
return err
}

ctx, cancelFn := context.WithTimeout(context.Background(), time.Duration(3)*time.Second)
defer cancelFn()

taskCreatedEventIterator, err := taskContractInstance.FilterTaskCreated(
&bind.FilterOpts{
Start: startBlockNum,
End: &endBlockNum,
Context: ctx,
Context: context.Background(),
},
nil,
nil,
Expand Down Expand Up @@ -115,13 +156,23 @@ func processTaskCreated(startBlockNum, endBlockNum uint64) error {
Status: models.InferenceTaskCreatedOnChain,
}

if err := config.GetDB().Create(&task).Error; err != nil {
if err := config.GetDB().Create(task).Error; err != nil {
if !errors.Is(err, gorm.ErrDuplicatedKey) {
return err
} else {
log.Debugln("duplicated task id, the task created events of the same task")

condition := &models.InferenceTask{
TaskId: task.TaskId,
}

if err := config.GetDB().Where(condition).First(task).Error; err != nil {
return err
}
}
}

association := config.GetDB().Model(&task).Association("SelectedNodes")
association := config.GetDB().Model(task).Association("SelectedNodes")

if err := association.Append(&models.SelectedNode{NodeAddress: taskCreated.SelectedNode.Hex()}); err != nil {
return err
Expand Down
34 changes: 33 additions & 1 deletion tasks/sync_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@ import (
"h_relay/blockchain"
"h_relay/config"
"h_relay/models"
"h_relay/tasks"
"h_relay/tests"
v1 "h_relay/tests/api/v1"
"testing"
"time"
)

func TestTaskCreatedOnChain(t *testing.T) {

err := tests.SyncToLatestBlock()
assert.Nil(t, err, "error syncing to the latest block")

syncBlockChan := make(chan int)
go tasks.StartSyncBlockWithTerminateChannel(syncBlockChan)

addresses, privateKeys, err := tests.PrepareAccountsWithTokens()
assert.Nil(t, err, "error preparing accounts")

Expand All @@ -38,6 +46,30 @@ func TestTaskCreatedOnChain(t *testing.T) {
_, err = blockchain.CreateTaskOnChain(task)
assert.Nil(t, err, "error creating task on chain")

time.Sleep(20 * time.Second)
time.Sleep(30 * time.Second)

taskInDb := &models.InferenceTask{}

err = config.GetDB().Model(taskInDb).First(taskInDb).Error
assert.Nil(t, err, "task not created")

// Task in DB has no params for now
// The params will be uploaded by the task creator later

taskHash, err := task.GetTaskHash()
assert.Nil(t, err, "error getting task hash")

assert.Equal(t, taskHash.Hex(), taskInDb.TaskHash, "task hash mismatch")

taskDataHash, err := task.GetDataHash()
assert.Nil(t, err, "error getting task data hash")

assert.Equal(t, taskDataHash.Hex(), taskInDb.DataHash, "task hash mismatch")

t.Cleanup(func() {
tests.ClearDB()
err := tests.ClearNetwork(addresses, privateKeys)
assert.Nil(t, err, "clear network error")
syncBlockChan <- 1
})
}
2 changes: 2 additions & 0 deletions tests/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ blockchain:
web_socket_endpoint: "wss://block-node.crynux.ai/ws"
start_block_num: 1431
gas_limit: 4294967
account:
address: "0xd075aB490857256e6fc85d75d8315e7c9914e008"
contracts:
task: "0x81968268d3aCdCba99a677C960C2B5dFb8B38768"
node: "0xcc0576ceEc40A9309f231d59B36A5c6e5625d6e5"
Expand Down

0 comments on commit 3b9cb1d

Please sign in to comment.