From 4c8e55a85c32786d0eba2f7a81d28647cfab0201 Mon Sep 17 00:00:00 2001 From: agnusmor <100322135+agnusmor@users.noreply.github.com> Date: Fri, 7 Jun 2024 10:18:27 +0200 Subject: [PATCH] Cherry-pick #3669: Add WriteTimeout config parameter to StreamServer (#3690) (#3701) * Add WriteTimeout config parameter to StreamServer (#3669) * Add WriteTimeout config parameter to StreamServer. Update DS library * update doc * update default value for StreamServer.WriteTimeout config parameter. Increase buffer for datastream channel * fix doc * fix config test * fix doc --- config/config_test.go | 4 +++ config/default.go | 1 + .../environments/local/local.node.config.toml | 1 + docs/config-file/node-config-doc.html | 4 ++- docs/config-file/node-config-doc.md | 27 +++++++++++++++++++ docs/config-file/node-config-schema.json | 10 +++++++ go.mod | 2 +- go.sum | 4 +-- sequencer/config.go | 2 ++ sequencer/l2block.go | 4 +-- sequencer/sequencer.go | 4 +-- test/config/debug.node.config.toml | 1 + test/config/test.node.config.toml | 1 + tools/datastreamer/config/config.go | 3 +++ tools/datastreamer/config/tool.config.toml | 1 + tools/datastreamer/main.go | 2 +- 16 files changed, 62 insertions(+), 9 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index f5e0f5b843..a032fa371a 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -169,6 +169,10 @@ func Test_Defaults(t *testing.T) { path: "Sequencer.StreamServer.Version", expectedValue: uint8(0), }, + { + path: "Sequencer.StreamServer.WriteTimeout", + expectedValue: types.NewDuration(5 * time.Second), + }, { path: "Sequencer.StreamServer.Enabled", expectedValue: false, diff --git a/config/default.go b/config/default.go index a9405c28d9..8e347c238a 100644 --- a/config/default.go +++ b/config/default.go @@ -163,6 +163,7 @@ StateConsistencyCheckInterval = "5s" Port = 0 Filename = "" Version = 0 + WriteTimeout = "5s" Enabled = false [SequenceSender] diff --git a/config/environments/local/local.node.config.toml b/config/environments/local/local.node.config.toml index 11077845ca..c403a0718a 100644 --- a/config/environments/local/local.node.config.toml +++ b/config/environments/local/local.node.config.toml @@ -112,6 +112,7 @@ StateConsistencyCheckInterval = "5s" [Sequencer.StreamServer] Port = 0 Filename = "" + WriteTimeout = "5s" Enabled = false [SequenceSender] diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index 6c0d86709e..fddbc1fce1 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -56,7 +56,9 @@
"300ms"
 

Default: 0Type: integer

HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches.
The Sequencer will halt after it closes the batch equal to this number


Default: falseType: boolean

SequentialBatchSanityCheck indicates if the reprocess of a closed batch (sanity check) must be done in a
sequential way (instead than in parallel)


Default: falseType: boolean

SequentialProcessL2Block indicates if the processing of a L2 Block must be done in the same finalizer go func instead
in the processPendingL2Blocks go func


Metrics is the config for the sequencer metrics
Default: "1h0m0s"Type: string

Interval is the interval of time to calculate sequencer metrics


Examples:

"1m"
 
"300ms"
-

Default: trueType: boolean

EnableLog is a flag to enable/disable metrics logs


StreamServerCfg is the config for the stream server
Default: 0Type: integer

Port to listen on


Default: ""Type: string

Filename of the binary data file


Default: 0Type: integer

Version of the binary data file


Default: 0Type: integer

ChainID is the chain ID


Default: falseType: boolean

Enabled is a flag to enable/disable the data streamer


Log is the log configuration
Default: ""Type: enum (of string)

Must be one of:

  • "production"
  • "development"

Default: ""Type: enum (of string)

Must be one of:

  • "debug"
  • "info"
  • "warn"
  • "error"
  • "dpanic"
  • "panic"
  • "fatal"

Type: array of string

Each item of this array must be:


Default: 0Type: integer

UpgradeEtrogBatchNumber is the batch number of the upgrade etrog


Configuration of the sequence sender service
Default: "5s"Type: string

WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1


Examples:

"1m"
+

Default: trueType: boolean

EnableLog is a flag to enable/disable metrics logs


StreamServerCfg is the config for the stream server
Default: 0Type: integer

Port to listen on


Default: ""Type: string

Filename of the binary data file


Default: 0Type: integer

Version of the binary data file


Default: 0Type: integer

ChainID is the chain ID


Default: falseType: boolean

Enabled is a flag to enable/disable the data streamer


Log is the log configuration
Default: ""Type: enum (of string)

Must be one of:

  • "production"
  • "development"

Default: ""Type: enum (of string)

Must be one of:

  • "debug"
  • "info"
  • "warn"
  • "error"
  • "dpanic"
  • "panic"
  • "fatal"

Type: array of string

Each item of this array must be:


Default: 0Type: integer

UpgradeEtrogBatchNumber is the batch number of the upgrade etrog


Default: "5s"Type: string

WriteTimeout is the TCP write timeout when sending data to a datastream client


Examples:

"1m"
+
"300ms"
+

Configuration of the sequence sender service
Default: "5s"Type: string

WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1


Examples:

"1m"
 
"300ms"
 

Default: "5s"Type: string

LastBatchVirtualizationTimeMaxWaitPeriod is time since sequences should be sent


Examples:

"1m"
 
"300ms"
diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md
index f935840de2..5a6ffce7a2 100644
--- a/docs/config-file/node-config-doc.md
+++ b/docs/config-file/node-config-doc.md
@@ -2502,6 +2502,7 @@ EnableLog=true
 | - [Enabled](#Sequencer_StreamServer_Enabled )                                 | No      | boolean | No         | -          | Enabled is a flag to enable/disable the data streamer            |
 | - [Log](#Sequencer_StreamServer_Log )                                         | No      | object  | No         | -          | Log is the log configuration                                     |
 | - [UpgradeEtrogBatchNumber](#Sequencer_StreamServer_UpgradeEtrogBatchNumber ) | No      | integer | No         | -          | UpgradeEtrogBatchNumber is the batch number of the upgrade etrog |
+| - [WriteTimeout](#Sequencer_StreamServer_WriteTimeout )                       | No      | string  | No         | -          | Duration                                                         |
 
 #### 10.9.1. `Sequencer.StreamServer.Port`
 
@@ -2639,6 +2640,32 @@ Must be one of:
 UpgradeEtrogBatchNumber=0
 ```
 
+#### 10.9.8. `Sequencer.StreamServer.WriteTimeout`
+
+**Title:** Duration
+
+**Type:** : `string`
+
+**Default:** `"5s"`
+
+**Description:** WriteTimeout is the TCP write timeout when sending data to a datastream client
+
+**Examples:** 
+
+```json
+"1m"
+```
+
+```json
+"300ms"
+```
+
+**Example setting the default value** ("5s"):
+```
+[Sequencer.StreamServer]
+WriteTimeout="5s"
+```
+
 ## 11. `[SequenceSender]`
 
 **Type:** : `object`
diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json
index 05fcba8522..e001e2fd27 100644
--- a/docs/config-file/node-config-schema.json
+++ b/docs/config-file/node-config-schema.json
@@ -1003,6 +1003,16 @@
 							"type": "integer",
 							"description": "UpgradeEtrogBatchNumber is the batch number of the upgrade etrog",
 							"default": 0
+						},
+						"WriteTimeout": {
+							"type": "string",
+							"title": "Duration",
+							"description": "WriteTimeout is the TCP write timeout when sending data to a datastream client",
+							"default": "5s",
+							"examples": [
+								"1m",
+								"300ms"
+							]
 						}
 					},
 					"additionalProperties": false,
diff --git a/go.mod b/go.mod
index 69ae3792c6..74aea43eb2 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node
 go 1.21
 
 require (
-	github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b
+	github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4
 	github.com/didip/tollbooth/v6 v6.1.2
 	github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127
 	github.com/ethereum/go-ethereum v1.13.14
diff --git a/go.sum b/go.sum
index 6b7b20b679..7731fafe67 100644
--- a/go.sum
+++ b/go.sum
@@ -39,8 +39,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
 dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
 dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
-github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b h1:BzQRXbSnW7BsFvJrnZbCgnxD5+nCGyrYUgqH+3vsnrM=
-github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
+github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4 h1:+4K+xSzv0ImbK30B/T9FauNTrTFUmWcNKYhIgwsE4C4=
+github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
 github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
 github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
 github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
diff --git a/sequencer/config.go b/sequencer/config.go
index 03aeeb740b..5f34cad1d9 100644
--- a/sequencer/config.go
+++ b/sequencer/config.go
@@ -52,6 +52,8 @@ type StreamServerCfg struct {
 	Log log.Config `mapstructure:"Log"`
 	// UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
 	UpgradeEtrogBatchNumber uint64 `mapstructure:"UpgradeEtrogBatchNumber"`
+	// WriteTimeout is the TCP write timeout when sending data to a datastream client
+	WriteTimeout types.Duration `mapstructure:"WriteTimeout"`
 }
 
 // FinalizerCfg contains the finalizer's configuration properties
diff --git a/sequencer/l2block.go b/sequencer/l2block.go
index 7cc21fc928..511d233988 100644
--- a/sequencer/l2block.go
+++ b/sequencer/l2block.go
@@ -244,7 +244,7 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) error
 		if subOverflow { // Sanity check, this cannot happen as reservedZKCounters should be >= that usedZKCounters
 			return fmt.Errorf("error subtracting L2 block %d [%d] needed resources from the batch %d, overflow resource: %s, batch bytes: %d, L2 block bytes: %d, counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
 				blockResponse.BlockNumber, l2Block.trackingNum, l2Block.batch.batchNumber, overflowResource, l2Block.batch.finalRemainingResources.Bytes, batchL2DataSize,
-				f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.imHighReservedZKCounters))
+				f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.finalHighReservedZKCounters))
 		}
 
 		l2Block.batch.finalHighReservedZKCounters = newHighZKCounters
@@ -252,7 +252,7 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) error
 	} else {
 		overflowLog := fmt.Sprintf("L2 block %d [%d] needed resources exceeds the remaining batch %d resources, overflow resource: %s, batch bytes: %d, L2 block bytes: %d, counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
 			blockResponse.BlockNumber, l2Block.trackingNum, l2Block.batch.batchNumber, overflowResource, l2Block.batch.finalRemainingResources.Bytes, batchL2DataSize,
-			f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.imHighReservedZKCounters))
+			f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.finalHighReservedZKCounters))
 
 		f.LogEvent(ctx, event.Level_Warning, event.EventID_ReservedZKCountersOverflow, overflowLog, nil)
 
diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go
index c4867005ce..3f57d9cc85 100644
--- a/sequencer/sequencer.go
+++ b/sequencer/sequencer.go
@@ -17,7 +17,7 @@ import (
 )
 
 const (
-	datastreamChannelBufferSize = 20
+	datastreamChannelBufferSize = 50
 )
 
 // Sequencer represents a sequencer
@@ -72,7 +72,7 @@ func (s *Sequencer) Start(ctx context.Context) {
 
 	// Start stream server if enabled
 	if s.cfg.StreamServer.Enabled {
-		s.streamServer, err = datastreamer.NewServer(s.cfg.StreamServer.Port, s.cfg.StreamServer.Version, s.cfg.StreamServer.ChainID, state.StreamTypeSequencer, s.cfg.StreamServer.Filename, &s.cfg.StreamServer.Log)
+		s.streamServer, err = datastreamer.NewServer(s.cfg.StreamServer.Port, s.cfg.StreamServer.Version, s.cfg.StreamServer.ChainID, state.StreamTypeSequencer, s.cfg.StreamServer.Filename, s.cfg.StreamServer.WriteTimeout.Duration, &s.cfg.StreamServer.Log)
 		if err != nil {
 			log.Fatalf("failed to create stream server, error: %v", err)
 		}
diff --git a/test/config/debug.node.config.toml b/test/config/debug.node.config.toml
index a6f43563aa..94f4d05ed4 100644
--- a/test/config/debug.node.config.toml
+++ b/test/config/debug.node.config.toml
@@ -127,6 +127,7 @@ StateConsistencyCheckInterval = "5s"
 		Filename = "/datastreamer/datastream.bin"
 		Version = 3
 		ChainID = 1337
+		WriteTimeout = "5s"
 		Enabled = false
 
 [SequenceSender]
diff --git a/test/config/test.node.config.toml b/test/config/test.node.config.toml
index fd6fa9b65d..11b4f6c2e3 100644
--- a/test/config/test.node.config.toml
+++ b/test/config/test.node.config.toml
@@ -128,6 +128,7 @@ StateConsistencyCheckInterval = "5s"
 		Filename = "/datastreamer/datastream.bin"
 		Version = 3
 		ChainID = 1337
+		WriteTimeout = "5s"
 		Enabled = true
 
 [SequenceSender]
diff --git a/tools/datastreamer/config/config.go b/tools/datastreamer/config/config.go
index 0acb225cf9..b6c841e591 100644
--- a/tools/datastreamer/config/config.go
+++ b/tools/datastreamer/config/config.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
 	"github.com/0xPolygonHermez/zkevm-data-streamer/log"
+	"github.com/0xPolygonHermez/zkevm-node/config/types"
 	"github.com/0xPolygonHermez/zkevm-node/db"
 	"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
 	"github.com/mitchellh/mapstructure"
@@ -48,6 +49,8 @@ type StreamServerCfg struct {
 	Log log.Config `mapstructure:"Log"`
 	// UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
 	UpgradeEtrogBatchNumber uint64 `mapstructure:"UpgradeEtrogBatchNumber"`
+	// WriteTimeout is the TCP write timeout when sending data to a datastream client
+	WriteTimeout types.Duration `mapstructure:"WriteTimeout"`
 }
 
 // Config is the configuration for the tool
diff --git a/tools/datastreamer/config/tool.config.toml b/tools/datastreamer/config/tool.config.toml
index 0e8fc09fc9..f5530b8271 100644
--- a/tools/datastreamer/config/tool.config.toml
+++ b/tools/datastreamer/config/tool.config.toml
@@ -7,6 +7,7 @@ Port = 6901
 Filename = "datastream.bin"
 Version = 3
 ChainID = 1440
+WriteTimeout = "5s"
 UpgradeEtrogBatchNumber = 0
 
 [StateDB]
diff --git a/tools/datastreamer/main.go b/tools/datastreamer/main.go
index 10d816254d..4ac7dab2dd 100644
--- a/tools/datastreamer/main.go
+++ b/tools/datastreamer/main.go
@@ -184,7 +184,7 @@ func main() {
 
 func initializeStreamServer(c *config.Config) (*datastreamer.StreamServer, error) {
 	// Create a stream server
-	streamServer, err := datastreamer.NewServer(c.Offline.Port, c.Offline.Version, c.Offline.ChainID, state.StreamTypeSequencer, c.Offline.Filename, &c.Log)
+	streamServer, err := datastreamer.NewServer(c.Offline.Port, c.Offline.Version, c.Offline.ChainID, state.StreamTypeSequencer, c.Offline.Filename, c.Offline.WriteTimeout.Duration, &c.Log)
 	if err != nil {
 		return nil, err
 	}