Skip to content

Commit

Permalink
Merge pull request #168 from ElrondNetwork/feat/EN-2123-resolvers-bul…
Browse files Browse the repository at this point in the history
…k-data

implemented sending transactions in configurable chunks
  • Loading branch information
iulianpascalau authored May 25, 2019
2 parents 3547435 + 4ebc707 commit dc8ee21
Show file tree
Hide file tree
Showing 25 changed files with 931 additions and 137 deletions.
44 changes: 43 additions & 1 deletion cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ElrondNetwork/elrond-go-sandbox/core"
"github.com/ElrondNetwork/elrond-go-sandbox/core/genesis"
"github.com/ElrondNetwork/elrond-go-sandbox/core/logger"
"github.com/ElrondNetwork/elrond-go-sandbox/core/partitioning"
"github.com/ElrondNetwork/elrond-go-sandbox/core/statistics"
"github.com/ElrondNetwork/elrond-go-sandbox/crypto"
"github.com/ElrondNetwork/elrond-go-sandbox/crypto/signing"
Expand All @@ -44,6 +45,7 @@ import (
"github.com/ElrondNetwork/elrond-go-sandbox/dataRetriever/factory/containers"
metafactoryDataRetriever "github.com/ElrondNetwork/elrond-go-sandbox/dataRetriever/factory/metachain"
shardfactoryDataRetriever "github.com/ElrondNetwork/elrond-go-sandbox/dataRetriever/factory/shard"
"github.com/ElrondNetwork/elrond-go-sandbox/dataRetriever/resolvers"
"github.com/ElrondNetwork/elrond-go-sandbox/dataRetriever/shardedData"
"github.com/ElrondNetwork/elrond-go-sandbox/facade"
"github.com/ElrondNetwork/elrond-go-sandbox/hashing"
Expand Down Expand Up @@ -80,6 +82,7 @@ const (
blsHashSize = 16
blsConsensusType = "bls"
bnConsensusType = "bn"
maxTxsToRequest = 100
)

var (
Expand Down Expand Up @@ -685,6 +688,11 @@ func createShardNode(
return nil, nil, nil, err
}

dataPacker, err := partitioning.NewSizeDataPacker(marshalizer)
if err != nil {
return nil, nil, nil, err
}

txSignKeyGen, txSignPrivKey, txSignPubKey, err := getSigningParams(
ctx,
log,
Expand Down Expand Up @@ -731,6 +739,7 @@ func createShardNode(
marshalizer,
datapool,
uint64ByteSliceConverter,
dataPacker,
)
if err != nil {
return nil, nil, nil, err
Expand Down Expand Up @@ -769,7 +778,7 @@ func createShardNode(
accountsAdapter,
shardCoordinator,
forkDetector,
createRequestHandler(resolversFinder, factory.TransactionTopic, log),
createTxRequestHandler(resolversFinder, factory.TransactionTopic, log),
createRequestHandler(resolversFinder, factory.MiniBlocksTopic, log),
)

Expand Down Expand Up @@ -1125,6 +1134,39 @@ func createMetaNode(
return nd, externalResolver, tpsBenchmark, nil
}

func createTxRequestHandler(resolversFinder dataRetriever.ResolversFinder, baseTopic string, log *logger.Logger) func(destShardID uint32, txHashes [][]byte) {
return func(destShardID uint32, txHashes [][]byte) {
log.Debug(fmt.Sprintf("Requesting %d transactions from shard %d from network...\n", len(txHashes), destShardID))
resolver, err := resolversFinder.CrossShardResolver(baseTopic, destShardID)
if err != nil {
log.Error(fmt.Sprintf("missing resolver to %s topic to shard %d", baseTopic, destShardID))
return
}

txResolver, ok := resolver.(*resolvers.TxResolver)
if !ok {
log.Error("wrong assertion type when creating transaction resolver")
return
}

go func() {
dataSplit := &partitioning.DataSplit{}
sliceBatches, err := dataSplit.SplitDataInChunks(txHashes, maxTxsToRequest)
if err != nil {
log.Error("error requesting transactions: " + err.Error())
return
}

for _, batch := range sliceBatches {
err = txResolver.RequestDataFromHashArray(batch)
if err != nil {
log.Debug("error requesting tx batch: " + err.Error())
}
}
}()
}
}

func createRequestHandler(resolversFinder dataRetriever.ResolversFinder, baseTopic string, log *logger.Logger) func(destShardID uint32, txHash []byte) {
return func(destShardID uint32, txHash []byte) {
log.Debug(fmt.Sprintf("Requesting %s from shard %d with hash %s from network\n", baseTopic, destShardID, toB64(txHash)))
Expand Down
9 changes: 9 additions & 0 deletions core/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,12 @@ var ErrInvalidShardId = errors.New("invalid shard id")

// ErrInvalidRoundDuration signals that an invalid round duration was provided
var ErrInvalidRoundDuration = errors.New("invalid round duration")

// ErrNilMarshalizer signals that a new marshalizer has been provided
var ErrNilMarshalizer = errors.New("nil marshalizer provided")

// ErrInvalidValue signals that a nil value has been provided
var ErrInvalidValue = errors.New("invalid value provided")

// ErrNilInputData signals that a nil data has been provided
var ErrNilInputData = errors.New("nil input data")
47 changes: 47 additions & 0 deletions core/mock/marshalizerMock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package mock

import (
"encoding/json"
"errors"
)

var errMockMarshalizer = errors.New("MarshalizerMock generic error")

// MarshalizerMock that will be used for testing
type MarshalizerMock struct {
Fail bool
}

// Marshal converts the input object in a slice of bytes
func (mm *MarshalizerMock) Marshal(obj interface{}) ([]byte, error) {
if mm.Fail {
return nil, errMockMarshalizer
}

if obj == nil {
return nil, errors.New("nil object to serilize from")
}

return json.Marshal(obj)
}

// Unmarshal applies the serialized values over an instantiated object
func (mm *MarshalizerMock) Unmarshal(obj interface{}, buff []byte) error {
if mm.Fail {
return errMockMarshalizer
}

if obj == nil {
return errors.New("nil object to serilize to")
}

if buff == nil {
return errors.New("nil byte buffer to deserialize from")
}

if len(buff) == 0 {
return errors.New("empty byte buffer to deserialize from")
}

return json.Unmarshal(buff, obj)
}
41 changes: 41 additions & 0 deletions core/partitioning/dataSplit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package partitioning

import (
"github.com/ElrondNetwork/elrond-go-sandbox/core"
)

const minimumMaxPacketNum = 1

// DataSplit can split a large slice of byte slices in chunks with len(chunks) <= limit
// It does not marshal the data
type DataSplit struct {
}

// SplitDataInChunks splits the provided data into smaller chunks
// limit is expressed in number of elements
func (ds *DataSplit) SplitDataInChunks(data [][]byte, limit int) ([][][]byte, error) {
if limit < minimumMaxPacketNum {
return nil, core.ErrInvalidValue
}
if data == nil {
return nil, core.ErrNilInputData
}

returningBuff := make([][][]byte, 0)

elements := make([][]byte, 0)
for idx, element := range data {
elements = append(elements, element)

if (idx+1)%limit == 0 {
returningBuff = append(returningBuff, elements)
elements = make([][]byte, 0)
}
}

if len(elements) > 0 {
returningBuff = append(returningBuff, elements)
}

return returningBuff, nil
}
130 changes: 130 additions & 0 deletions core/partitioning/dataSplit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package partitioning_test

import (
"bytes"
"errors"
"fmt"
"testing"

"github.com/ElrondNetwork/elrond-go-sandbox/core"
"github.com/ElrondNetwork/elrond-go-sandbox/core/partitioning"
"github.com/stretchr/testify/assert"
)

func checkExpectedElementsNoUnmarshal(buffer [][]byte, expectedElements [][]byte) error {
if len(buffer) != len(expectedElements) {
return errors.New(fmt.Sprintf("expected %d elements, got %d", len(expectedElements), len(buffer)))
}

for idx, expElem := range expectedElements {
elem := buffer[idx]
if !bytes.Equal(elem, expElem) {
return errors.New(fmt.Sprintf("error at index %d expected %v, got %v", idx, expElem, elem))
}
}

return nil
}

//------- PackDataInChunks

func TestNewNumDataPacker_PackDataInChunksInvalidLimitShouldErr(t *testing.T) {
t.Parallel()

ds := &partitioning.DataSplit{}
buff, err := ds.SplitDataInChunks(make([][]byte, 0), 0)

assert.Equal(t, core.ErrInvalidValue, err)
assert.Nil(t, buff)
}

func TestNewNumDataPacker_PackDataInChunksNilInputDataShouldErr(t *testing.T) {
t.Parallel()

ds := &partitioning.DataSplit{}
buff, err := ds.SplitDataInChunks(nil, 1)

assert.Equal(t, core.ErrNilInputData, err)
assert.Nil(t, buff)
}

func TestNumDataPacker_PackDataInChunksEmptyDataShouldReturnEmpty(t *testing.T) {
t.Parallel()

ds := &partitioning.DataSplit{}
buff, err := ds.SplitDataInChunks(make([][]byte, 0), 1)

assert.Nil(t, err)
assert.Empty(t, buff)
}

func TestNumDataPacker_PackDataInChunksSmallElementsCountShouldPackTogheter(t *testing.T) {
t.Parallel()

numPackets := 1000
ds := &partitioning.DataSplit{}

elem1 := []byte("element1")
elem2 := []byte("element2")
elem3 := []byte("element3")

buffSent, err := ds.SplitDataInChunks([][]byte{elem1, elem2, elem3}, numPackets)

assert.Nil(t, err)
assert.Equal(t, 1, len(buffSent))
assert.Nil(t, checkExpectedElementsNoUnmarshal(buffSent[0], [][]byte{elem1, elem2, elem3}))
}

func TestNumDataPacker_PackDataInChunksEqualElementsShouldPackTogheter(t *testing.T) {
t.Parallel()

numPackets := 3
ds := &partitioning.DataSplit{}

elem1 := []byte("element1")
elem2 := []byte("element2")
elem3 := []byte("element3")

buffSent, err := ds.SplitDataInChunks([][]byte{elem1, elem2, elem3}, numPackets)

assert.Nil(t, err)
assert.Equal(t, 1, len(buffSent))
assert.Nil(t, checkExpectedElementsNoUnmarshal(buffSent[0], [][]byte{elem1, elem2, elem3}))
}

func TestNumDataPacker_PackDataInChunksLargeElementCountShouldPackTogheter(t *testing.T) {
t.Parallel()

numPackets := 2
ds := &partitioning.DataSplit{}

elem1 := []byte("element1")
elem2 := []byte("element2")
elem3 := []byte("element3")

buffSent, err := ds.SplitDataInChunks([][]byte{elem1, elem2, elem3}, numPackets)

assert.Nil(t, err)
assert.Equal(t, 2, len(buffSent))
assert.Nil(t, checkExpectedElementsNoUnmarshal(buffSent[0], [][]byte{elem1, elem2}))
assert.Nil(t, checkExpectedElementsNoUnmarshal(buffSent[1], [][]byte{elem3}))
}

func TestNumDataPacker_PackDataInChunksSingleElementsShouldPackTogheter(t *testing.T) {
t.Parallel()

numPackets := 1
ds := &partitioning.DataSplit{}

elem1 := []byte("element1")
elem2 := []byte("element2")
elem3 := []byte("element3")

buffSent, err := ds.SplitDataInChunks([][]byte{elem1, elem2, elem3}, numPackets)

assert.Nil(t, err)
assert.Equal(t, 3, len(buffSent))
assert.Nil(t, checkExpectedElementsNoUnmarshal(buffSent[0], [][]byte{elem1}))
assert.Nil(t, checkExpectedElementsNoUnmarshal(buffSent[1], [][]byte{elem2}))
assert.Nil(t, checkExpectedElementsNoUnmarshal(buffSent[2], [][]byte{elem3}))
}
Loading

0 comments on commit dc8ee21

Please sign in to comment.