Skip to content

Commit

Permalink
Cherry-pick #3669: Add WriteTimeout config parameter to StreamServer (#…
Browse files Browse the repository at this point in the history
…3690) (#3701)

* Add WriteTimeout config parameter to StreamServer (#3669)

* Add WriteTimeout config parameter to StreamServer. Update DS library

* update doc

* update default value for StreamServer.WriteTimeout config parameter. Increase buffer for datastream channel

* fix doc

* fix config test

* fix doc
  • Loading branch information
agnusmor authored Jun 7, 2024
1 parent 8f81a8d commit 4c8e55a
Show file tree
Hide file tree
Showing 16 changed files with 62 additions and 9 deletions.
4 changes: 4 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func Test_Defaults(t *testing.T) {
path: "Sequencer.StreamServer.Version",
expectedValue: uint8(0),
},
{
path: "Sequencer.StreamServer.WriteTimeout",
expectedValue: types.NewDuration(5 * time.Second),
},
{
path: "Sequencer.StreamServer.Enabled",
expectedValue: false,
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ StateConsistencyCheckInterval = "5s"
Port = 0
Filename = ""
Version = 0
WriteTimeout = "5s"
Enabled = false
[SequenceSender]
Expand Down
1 change: 1 addition & 0 deletions config/environments/local/local.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ StateConsistencyCheckInterval = "5s"
[Sequencer.StreamServer]
Port = 0
Filename = ""
WriteTimeout = "5s"
Enabled = false

[SequenceSender]
Expand Down
4 changes: 3 additions & 1 deletion docs/config-file/node-config-doc.html

Large diffs are not rendered by default.

27 changes: 27 additions & 0 deletions docs/config-file/node-config-doc.md
Original file line number Diff line number Diff line change
Expand Up @@ -2502,6 +2502,7 @@ EnableLog=true
| - [Enabled](#Sequencer_StreamServer_Enabled ) | No | boolean | No | - | Enabled is a flag to enable/disable the data streamer |
| - [Log](#Sequencer_StreamServer_Log ) | No | object | No | - | Log is the log configuration |
| - [UpgradeEtrogBatchNumber](#Sequencer_StreamServer_UpgradeEtrogBatchNumber ) | No | integer | No | - | UpgradeEtrogBatchNumber is the batch number of the upgrade etrog |
| - [WriteTimeout](#Sequencer_StreamServer_WriteTimeout ) | No | string | No | - | Duration |

#### <a name="Sequencer_StreamServer_Port"></a>10.9.1. `Sequencer.StreamServer.Port`

Expand Down Expand Up @@ -2639,6 +2640,32 @@ Must be one of:
UpgradeEtrogBatchNumber=0
```

#### <a name="Sequencer_StreamServer_WriteTimeout"></a>10.9.8. `Sequencer.StreamServer.WriteTimeout`

**Title:** Duration

**Type:** : `string`

**Default:** `"5s"`

**Description:** WriteTimeout is the TCP write timeout when sending data to a datastream client

**Examples:**

```json
"1m"
```

```json
"300ms"
```

**Example setting the default value** ("5s"):
```
[Sequencer.StreamServer]
WriteTimeout="5s"
```

## <a name="SequenceSender"></a>11. `[SequenceSender]`

**Type:** : `object`
Expand Down
10 changes: 10 additions & 0 deletions docs/config-file/node-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,16 @@
"type": "integer",
"description": "UpgradeEtrogBatchNumber is the batch number of the upgrade etrog",
"default": 0
},
"WriteTimeout": {
"type": "string",
"title": "Duration",
"description": "WriteTimeout is the TCP write timeout when sending data to a datastream client",
"default": "5s",
"examples": [
"1m",
"300ms"
]
}
},
"additionalProperties": false,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node
go 1.21

require (
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4
github.com/didip/tollbooth/v6 v6.1.2
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127
github.com/ethereum/go-ethereum v1.13.14
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b h1:BzQRXbSnW7BsFvJrnZbCgnxD5+nCGyrYUgqH+3vsnrM=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4 h1:+4K+xSzv0ImbK30B/T9FauNTrTFUmWcNKYhIgwsE4C4=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
Expand Down
2 changes: 2 additions & 0 deletions sequencer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type StreamServerCfg struct {
Log log.Config `mapstructure:"Log"`
// UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
UpgradeEtrogBatchNumber uint64 `mapstructure:"UpgradeEtrogBatchNumber"`
// WriteTimeout is the TCP write timeout when sending data to a datastream client
WriteTimeout types.Duration `mapstructure:"WriteTimeout"`
}

// FinalizerCfg contains the finalizer's configuration properties
Expand Down
4 changes: 2 additions & 2 deletions sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,15 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) error
if subOverflow { // Sanity check, this cannot happen as reservedZKCounters should be >= that usedZKCounters
return fmt.Errorf("error subtracting L2 block %d [%d] needed resources from the batch %d, overflow resource: %s, batch bytes: %d, L2 block bytes: %d, counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
blockResponse.BlockNumber, l2Block.trackingNum, l2Block.batch.batchNumber, overflowResource, l2Block.batch.finalRemainingResources.Bytes, batchL2DataSize,
f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.imHighReservedZKCounters))
f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.finalHighReservedZKCounters))
}

l2Block.batch.finalHighReservedZKCounters = newHighZKCounters
l2Block.highReservedZKCounters = l2Block.batch.finalHighReservedZKCounters
} else {
overflowLog := fmt.Sprintf("L2 block %d [%d] needed resources exceeds the remaining batch %d resources, overflow resource: %s, batch bytes: %d, L2 block bytes: %d, counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
blockResponse.BlockNumber, l2Block.trackingNum, l2Block.batch.batchNumber, overflowResource, l2Block.batch.finalRemainingResources.Bytes, batchL2DataSize,
f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.imHighReservedZKCounters))
f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.finalHighReservedZKCounters))

f.LogEvent(ctx, event.Level_Warning, event.EventID_ReservedZKCountersOverflow, overflowLog, nil)

Expand Down
4 changes: 2 additions & 2 deletions sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

const (
datastreamChannelBufferSize = 20
datastreamChannelBufferSize = 50
)

// Sequencer represents a sequencer
Expand Down Expand Up @@ -72,7 +72,7 @@ func (s *Sequencer) Start(ctx context.Context) {

// Start stream server if enabled
if s.cfg.StreamServer.Enabled {
s.streamServer, err = datastreamer.NewServer(s.cfg.StreamServer.Port, s.cfg.StreamServer.Version, s.cfg.StreamServer.ChainID, state.StreamTypeSequencer, s.cfg.StreamServer.Filename, &s.cfg.StreamServer.Log)
s.streamServer, err = datastreamer.NewServer(s.cfg.StreamServer.Port, s.cfg.StreamServer.Version, s.cfg.StreamServer.ChainID, state.StreamTypeSequencer, s.cfg.StreamServer.Filename, s.cfg.StreamServer.WriteTimeout.Duration, &s.cfg.StreamServer.Log)
if err != nil {
log.Fatalf("failed to create stream server, error: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions test/config/debug.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ StateConsistencyCheckInterval = "5s"
Filename = "/datastreamer/datastream.bin"
Version = 3
ChainID = 1337
WriteTimeout = "5s"
Enabled = false

[SequenceSender]
Expand Down
1 change: 1 addition & 0 deletions test/config/test.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ StateConsistencyCheckInterval = "5s"
Filename = "/datastreamer/datastream.bin"
Version = 3
ChainID = 1337
WriteTimeout = "5s"
Enabled = true

[SequenceSender]
Expand Down
3 changes: 3 additions & 0 deletions tools/datastreamer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-data-streamer/log"
"github.com/0xPolygonHermez/zkevm-node/config/types"
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -48,6 +49,8 @@ type StreamServerCfg struct {
Log log.Config `mapstructure:"Log"`
// UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
UpgradeEtrogBatchNumber uint64 `mapstructure:"UpgradeEtrogBatchNumber"`
// WriteTimeout is the TCP write timeout when sending data to a datastream client
WriteTimeout types.Duration `mapstructure:"WriteTimeout"`
}

// Config is the configuration for the tool
Expand Down
1 change: 1 addition & 0 deletions tools/datastreamer/config/tool.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Port = 6901
Filename = "datastream.bin"
Version = 3
ChainID = 1440
WriteTimeout = "5s"
UpgradeEtrogBatchNumber = 0

[StateDB]
Expand Down
2 changes: 1 addition & 1 deletion tools/datastreamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func main() {

func initializeStreamServer(c *config.Config) (*datastreamer.StreamServer, error) {
// Create a stream server
streamServer, err := datastreamer.NewServer(c.Offline.Port, c.Offline.Version, c.Offline.ChainID, state.StreamTypeSequencer, c.Offline.Filename, &c.Log)
streamServer, err := datastreamer.NewServer(c.Offline.Port, c.Offline.Version, c.Offline.ChainID, state.StreamTypeSequencer, c.Offline.Filename, c.Offline.WriteTimeout.Duration, &c.Log)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 4c8e55a

Please sign in to comment.