Skip to content

Commit

Permalink
Support configurable worker count for Fluentd (#288)
Browse files Browse the repository at this point in the history
* Make fluentd worker count configurable
  • Loading branch information
terryyylim authored Nov 18, 2022
1 parent 0e00ced commit 9a04bf1
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 0 deletions.
1 change: 1 addition & 0 deletions api/turing/cluster/servicebuilder/fluentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (sb *clusterSvcBuilder) NewFluentdService(

tableSplit := strings.Split(routerVersion.LogConfig.BigQueryConfig.Table, ".")
envs := []corev1.EnvVar{
{Name: "FLUENTD_WORKER_COUNT", Value: strconv.Itoa(fluentdConfig.WorkerCount)},
{Name: "FLUENTD_LOG_LEVEL", Value: "info"},
{Name: "FLUENTD_LOG_PATH", Value: "/cache/log/bq_load_logs.*.buffer"},
{Name: "FLUENTD_GCP_JSON_KEY_PATH", Value: secretMountPath + secretKeyNameRouter},
Expand Down
2 changes: 2 additions & 0 deletions api/turing/cluster/servicebuilder/fluentd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestNewFluentdService(t *testing.T) {
Image: "fluentdimage:1.0.0",
Tag: "fluentd-tag",
FlushIntervalSeconds: 30,
WorkerCount: 1,
}

project := &mlp.Project{
Expand Down Expand Up @@ -64,6 +65,7 @@ func TestNewFluentdService(t *testing.T) {
"team": "test-team",
},
Envs: []corev1.EnvVar{
{Name: "FLUENTD_WORKER_COUNT", Value: "1"},
{Name: "FLUENTD_LOG_LEVEL", Value: "info"},
{Name: "FLUENTD_LOG_PATH", Value: "/cache/log/bq_load_logs.*.buffer"},
{Name: "FLUENTD_GCP_JSON_KEY_PATH", Value: "/var/secret/router-service-account.json"},
Expand Down
3 changes: 3 additions & 0 deletions api/turing/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ type FluentdConfig struct {
Tag string
// Flush interval seconds - value determined by load job frequency to BQ
FlushIntervalSeconds int
// No. of fluentd workers to launch for utilizing multiple CPU, useful to tune for high traffic
WorkerCount int
}

// KafkaConfig captures the defaults used by Turing Router when result logger is set to kafka
Expand Down Expand Up @@ -491,6 +493,7 @@ func setDefaultValues(v *viper.Viper) {
v.SetDefault("RouterDefaults::FluentdConfig::Image", "")
v.SetDefault("RouterDefaults::FluentdConfig::Tag", "turing-result.log")
v.SetDefault("RouterDefaults::FluentdConfig::FlushIntervalSeconds", "90")
v.SetDefault("RouterDefaults::FluentdConfig::WorkerCount", "1")
v.SetDefault("RouterDefaults::Experiment", map[string]interface{}{})
v.SetDefault("RouterDefaults::KafkaConfig::MaxMessageBytes", "1048588")
v.SetDefault("RouterDefaults::KafkaConfig::CompressionType", "none")
Expand Down
4 changes: 4 additions & 0 deletions api/turing/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func TestLoad(t *testing.T) {
FluentdConfig: &config.FluentdConfig{
Tag: "turing-result.log",
FlushIntervalSeconds: 90,
WorkerCount: 1,
},
KafkaConfig: &config.KafkaConfig{
MaxMessageBytes: 1048588,
Expand Down Expand Up @@ -212,6 +213,7 @@ func TestLoad(t *testing.T) {
FluentdConfig: &config.FluentdConfig{
Tag: "turing-result.log",
FlushIntervalSeconds: 60,
WorkerCount: 2,
},
KafkaConfig: &config.KafkaConfig{
MaxMessageBytes: 1048588,
Expand Down Expand Up @@ -297,6 +299,7 @@ func TestLoad(t *testing.T) {
FluentdConfig: &config.FluentdConfig{
Tag: "turing-result.log",
FlushIntervalSeconds: 90,
WorkerCount: 2,
},
ExperimentEnginePlugins: map[string]*config.ExperimentEngineConfig{
"red": {
Expand Down Expand Up @@ -414,6 +417,7 @@ func TestLoad(t *testing.T) {
FluentdConfig: &config.FluentdConfig{
Tag: "turing-result.log",
FlushIntervalSeconds: 90,
WorkerCount: 2,
},
ExperimentEnginePlugins: map[string]*config.ExperimentEngineConfig{
"red": {
Expand Down
1 change: 1 addition & 0 deletions api/turing/config/testdata/config-1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ KnativeServiceDefaults:
RouterDefaults:
FluentdConfig:
FlushIntervalSeconds: 60
WorkerCount: 2
Sentry:
Enabled: true
Labels:
Expand Down
1 change: 1 addition & 0 deletions engines/router/compose/fluentd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ services:
fluentd:
image: asia.gcr.io/gcp-project-id/fluentd-bigquery:latest
environment:
- FLUENTD_WORKER_COUNT=1
- FLUENTD_LOG_LEVEL=info
- FLUENTD_LOG_PATH=/fluentd/log/bq_load_logs.*.buffer
- FLUENTD_BUFFER_LIMIT=3g
Expand Down
3 changes: 3 additions & 0 deletions scripts/fluentd-bigquery/.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
# https://docs.fluentd.org/deployment/logging
FLUENTD_LOG_LEVEL=

# The number of multi-process workers to configure
FLUENTD_WORKER_COUNT=

# The file buffer limit
FLUENTD_BUFFER_LIMIT=

Expand Down
1 change: 1 addition & 0 deletions scripts/fluentd-bigquery/fluent.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Set fluentd log level to error
<system>
log_level "#{ENV['FLUENTD_LOG_LEVEL']}"
workers "#{ENV['FLUENTD_WORKER_COUNT']}"
</system>

# Accept HTTP input
Expand Down

0 comments on commit 9a04bf1

Please sign in to comment.