From 9a04bf1ab8cb28bcf3dac8c8735efad2f3d042d7 Mon Sep 17 00:00:00 2001 From: Terence Lim Date: Fri, 18 Nov 2022 13:02:33 +0800 Subject: [PATCH] Support configurable worker count for Fluentd (#288) * Make fluentd worker count configurable --- api/turing/cluster/servicebuilder/fluentd.go | 1 + api/turing/cluster/servicebuilder/fluentd_test.go | 2 ++ api/turing/config/config.go | 3 +++ api/turing/config/config_test.go | 4 ++++ api/turing/config/testdata/config-1.yaml | 1 + engines/router/compose/fluentd.yaml | 1 + scripts/fluentd-bigquery/.env.template | 3 +++ scripts/fluentd-bigquery/fluent.conf | 1 + 8 files changed, 16 insertions(+) diff --git a/api/turing/cluster/servicebuilder/fluentd.go b/api/turing/cluster/servicebuilder/fluentd.go index ee52d61d0..367cdc7b8 100644 --- a/api/turing/cluster/servicebuilder/fluentd.go +++ b/api/turing/cluster/servicebuilder/fluentd.go @@ -35,6 +35,7 @@ func (sb *clusterSvcBuilder) NewFluentdService( tableSplit := strings.Split(routerVersion.LogConfig.BigQueryConfig.Table, ".") envs := []corev1.EnvVar{ + {Name: "FLUENTD_WORKER_COUNT", Value: strconv.Itoa(fluentdConfig.WorkerCount)}, {Name: "FLUENTD_LOG_LEVEL", Value: "info"}, {Name: "FLUENTD_LOG_PATH", Value: "/cache/log/bq_load_logs.*.buffer"}, {Name: "FLUENTD_GCP_JSON_KEY_PATH", Value: secretMountPath + secretKeyNameRouter}, diff --git a/api/turing/cluster/servicebuilder/fluentd_test.go b/api/turing/cluster/servicebuilder/fluentd_test.go index e399a9283..bc0b42c2a 100644 --- a/api/turing/cluster/servicebuilder/fluentd_test.go +++ b/api/turing/cluster/servicebuilder/fluentd_test.go @@ -33,6 +33,7 @@ func TestNewFluentdService(t *testing.T) { Image: "fluentdimage:1.0.0", Tag: "fluentd-tag", FlushIntervalSeconds: 30, + WorkerCount: 1, } project := &mlp.Project{ @@ -64,6 +65,7 @@ func TestNewFluentdService(t *testing.T) { "team": "test-team", }, Envs: []corev1.EnvVar{ + {Name: "FLUENTD_WORKER_COUNT", Value: "1"}, {Name: "FLUENTD_LOG_LEVEL", Value: "info"}, {Name: "FLUENTD_LOG_PATH", Value: "/cache/log/bq_load_logs.*.buffer"}, {Name: "FLUENTD_GCP_JSON_KEY_PATH", Value: "/var/secret/router-service-account.json"}, diff --git a/api/turing/config/config.go b/api/turing/config/config.go index 2065674b1..501e0591f 100644 --- a/api/turing/config/config.go +++ b/api/turing/config/config.go @@ -301,6 +301,8 @@ type FluentdConfig struct { Tag string // Flush interval seconds - value determined by load job frequency to BQ FlushIntervalSeconds int + // No. of fluentd workers to launch for utilizing multiple CPU, useful to tune for high traffic + WorkerCount int } // KafkaConfig captures the defaults used by Turing Router when result logger is set to kafka @@ -491,6 +493,7 @@ func setDefaultValues(v *viper.Viper) { v.SetDefault("RouterDefaults::FluentdConfig::Image", "") v.SetDefault("RouterDefaults::FluentdConfig::Tag", "turing-result.log") v.SetDefault("RouterDefaults::FluentdConfig::FlushIntervalSeconds", "90") + v.SetDefault("RouterDefaults::FluentdConfig::WorkerCount", "1") v.SetDefault("RouterDefaults::Experiment", map[string]interface{}{}) v.SetDefault("RouterDefaults::KafkaConfig::MaxMessageBytes", "1048588") v.SetDefault("RouterDefaults::KafkaConfig::CompressionType", "none") diff --git a/api/turing/config/config_test.go b/api/turing/config/config_test.go index b6c2dbaf8..55ee3050b 100644 --- a/api/turing/config/config_test.go +++ b/api/turing/config/config_test.go @@ -147,6 +147,7 @@ func TestLoad(t *testing.T) { FluentdConfig: &config.FluentdConfig{ Tag: "turing-result.log", FlushIntervalSeconds: 90, + WorkerCount: 1, }, KafkaConfig: &config.KafkaConfig{ MaxMessageBytes: 1048588, @@ -212,6 +213,7 @@ func TestLoad(t *testing.T) { FluentdConfig: &config.FluentdConfig{ Tag: "turing-result.log", FlushIntervalSeconds: 60, + WorkerCount: 2, }, KafkaConfig: &config.KafkaConfig{ MaxMessageBytes: 1048588, @@ -297,6 +299,7 @@ func TestLoad(t *testing.T) { FluentdConfig: &config.FluentdConfig{ Tag: "turing-result.log", FlushIntervalSeconds: 90, + WorkerCount: 2, }, ExperimentEnginePlugins: map[string]*config.ExperimentEngineConfig{ "red": { @@ -414,6 +417,7 @@ func TestLoad(t *testing.T) { FluentdConfig: &config.FluentdConfig{ Tag: "turing-result.log", FlushIntervalSeconds: 90, + WorkerCount: 2, }, ExperimentEnginePlugins: map[string]*config.ExperimentEngineConfig{ "red": { diff --git a/api/turing/config/testdata/config-1.yaml b/api/turing/config/testdata/config-1.yaml index 9b35298a8..29f6534ab 100644 --- a/api/turing/config/testdata/config-1.yaml +++ b/api/turing/config/testdata/config-1.yaml @@ -20,6 +20,7 @@ KnativeServiceDefaults: RouterDefaults: FluentdConfig: FlushIntervalSeconds: 60 + WorkerCount: 2 Sentry: Enabled: true Labels: diff --git a/engines/router/compose/fluentd.yaml b/engines/router/compose/fluentd.yaml index ff019fc50..bedaa9bb4 100644 --- a/engines/router/compose/fluentd.yaml +++ b/engines/router/compose/fluentd.yaml @@ -4,6 +4,7 @@ services: fluentd: image: asia.gcr.io/gcp-project-id/fluentd-bigquery:latest environment: + - FLUENTD_WORKER_COUNT=1 - FLUENTD_LOG_LEVEL=info - FLUENTD_LOG_PATH=/fluentd/log/bq_load_logs.*.buffer - FLUENTD_BUFFER_LIMIT=3g diff --git a/scripts/fluentd-bigquery/.env.template b/scripts/fluentd-bigquery/.env.template index 3feaf50c4..165196abe 100644 --- a/scripts/fluentd-bigquery/.env.template +++ b/scripts/fluentd-bigquery/.env.template @@ -2,6 +2,9 @@ # https://docs.fluentd.org/deployment/logging FLUENTD_LOG_LEVEL= +# The number of multi-process workers to configure +FLUENTD_WORKER_COUNT= + # The file buffer limit FLUENTD_BUFFER_LIMIT= diff --git a/scripts/fluentd-bigquery/fluent.conf b/scripts/fluentd-bigquery/fluent.conf index d0a0f9d88..44f834c54 100644 --- a/scripts/fluentd-bigquery/fluent.conf +++ b/scripts/fluentd-bigquery/fluent.conf @@ -1,6 +1,7 @@ # Set fluentd log level to error log_level "#{ENV['FLUENTD_LOG_LEVEL']}" + workers "#{ENV['FLUENTD_WORKER_COUNT']}" # Accept HTTP input