Skip to content

Commit

Permalink
feat(object_store): add retry for Minio SlowDown and `TooManyRequ…
Browse files Browse the repository at this point in the history
…ests` errors (#14739)
  • Loading branch information
kwannoel authored Jan 23, 2024
1 parent 4c953eb commit f804ed0
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 35 deletions.
5 changes: 5 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1305,6 +1305,11 @@ command = "target/${BUILD_MODE_DIR}/risedev-dev"
args = ["${@}"]
description = "Clean data and start a full RisingWave dev cluster using risedev-dev"

[tasks.ci-kill-no-dump-logs]
category = "RiseDev - CI"
dependencies = ["k", "check-logs", "wait-processes-exit"]
description = "Kill cluster and check logs, do not dump logs"

[tasks.ci-kill]
category = "RiseDev - CI"
dependencies = ["k", "l", "check-logs", "wait-processes-exit"]
Expand Down
2 changes: 1 addition & 1 deletion ci/scripts/backfill-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ download_and_prepare_rw "$profile" source

################ TESTS

profile=$profile ./ci/scripts/run-backfill-tests.sh
BUILDKITE=${BUILDKITE:-} profile=$profile ./ci/scripts/run-backfill-tests.sh
55 changes: 34 additions & 21 deletions ci/scripts/run-backfill-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@ BACKGROUND_DDL_DIR=$TEST_DIR/background_ddl
COMMON_DIR=$BACKGROUND_DDL_DIR/common

CLUSTER_PROFILE='ci-1cn-1fe-kafka-with-recovery'
echo "--- Configuring cluster profiles"
if [[ -n "${BUILDKITE:-}" ]]; then
RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe-with-monitoring'
else
echo "Running in buildkite"
RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe'
MINIO_RATE_LIMIT_CLUSTER_PROFILE='ci-3cn-1fe-with-minio-rate-limit'
else
echo "Running locally"
RUNTIME_CLUSTER_PROFILE='ci-3cn-1fe-with-monitoring'
MINIO_RATE_LIMIT_CLUSTER_PROFILE='ci-3cn-1fe-with-monitoring-and-minio-rate-limit'
fi
export RUST_LOG="info,risingwave_meta::barrier::progress=debug,risingwave_meta::rpc::ddl_controller=debug"
export RUST_LOG="info,risingwave_stream=info,risingwave_batch=info,risingwave_storage=info" \

run_sql_file() {
psql -h localhost -p 4566 -d dev -U root -f "$@"
Expand Down Expand Up @@ -60,8 +65,8 @@ rename_logs_with_prefix() {
}

kill_cluster() {
cargo make kill
cargo make wait-processes-exit
cargo make ci-kill-no-dump-logs
wait
}

restart_cluster() {
Expand Down Expand Up @@ -150,7 +155,6 @@ test_backfill_tombstone() {
./risedev psql -c "CREATE MATERIALIZED VIEW m1 as select * from tomb;"
echo "--- Kill cluster"
kill_cluster
cargo make wait-processes-exit
wait
}

Expand All @@ -171,9 +175,7 @@ test_replication_with_column_pruning() {
run_sql_file "$PARENT_PATH"/sql/backfill/replication_with_column_pruning/select.sql </dev/null
run_sql_file "$PARENT_PATH"/sql/backfill/replication_with_column_pruning/drop.sql
echo "--- Kill cluster"
cargo make kill
cargo make wait-processes-exit
wait
kill_cluster
}

# Test sink backfill recovery
Expand All @@ -200,13 +202,11 @@ test_sink_backfill_recovery() {

# Verify data matches upstream table.
sqllogictest -p 4566 -d dev 'e2e_test/backfill/sink/validate_sink.slt'
cargo make kill
cargo make wait-processes-exit
wait
kill_cluster
}

test_arrangement_backfill_snapshot_and_upstream_runtime() {
echo "--- e2e, test_backfill_snapshot_and_upstream_runtime"
echo "--- e2e, test_arrangement_backfill_snapshot_and_upstream_runtime, $RUNTIME_CLUSTER_PROFILE"
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
Expand All @@ -218,12 +218,11 @@ test_arrangement_backfill_snapshot_and_upstream_runtime() {

sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_arrangement.slt'

cargo make kill
cargo make wait-processes-exit
cargo make ci-kill
}

test_no_shuffle_backfill_snapshot_and_upstream_runtime() {
echo "--- e2e, test_backfill_snapshot_and_upstream_runtime"
echo "--- e2e, test_no_shuffle_backfill_snapshot_and_upstream_runtime, $RUNTIME_CLUSTER_PROFILE"
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
Expand All @@ -235,12 +234,11 @@ test_no_shuffle_backfill_snapshot_and_upstream_runtime() {

sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_no_shuffle.slt'

cargo make kill
cargo make wait-processes-exit
kill_cluster
}

test_backfill_snapshot_runtime() {
echo "--- e2e, test_backfill_snapshot_runtime"
echo "--- e2e, test_backfill_snapshot_runtime, $RUNTIME_CLUSTER_PROFILE"
cargo make ci-start $RUNTIME_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
Expand All @@ -249,8 +247,22 @@ test_backfill_snapshot_runtime() {
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_no_shuffle.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_arrangement.slt'

cargo make kill
cargo make wait-processes-exit
kill_cluster
}

# Throttle the storage throughput.
# Arrangement Backfill should not fail because of this.
test_backfill_snapshot_with_limited_storage_throughput() {
echo "--- e2e, test_backfill_snapshot_with_limited_storage_throughput, $MINIO_RATE_LIMIT_CLUSTER_PROFILE"
cargo make ci-start $MINIO_RATE_LIMIT_CLUSTER_PROFILE
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_table.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/insert_snapshot.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_arrangement_backfill_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/create_no_shuffle_mv.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_no_shuffle.slt'
sqllogictest -p 4566 -d dev 'e2e_test/backfill/runtime/validate_rows_arrangement.slt'

kill_cluster
}

main() {
Expand All @@ -270,6 +282,7 @@ main() {

# Backfill will happen in sequence here.
test_backfill_snapshot_runtime
test_backfill_snapshot_with_limited_storage_throughput

# No upstream only tests, because if there's no snapshot,
# Backfill will complete almost immediately.
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ steps:

- label: "Backfill tests"
key: "backfill-tests"
command: "ci/scripts/backfill-test.sh -p ci-release"
command: "BUILDKITE=${BUILDKITE:-} ci/scripts/backfill-test.sh -p ci-release"
if: |
!(build.pull_request.labels includes "ci/main-cron/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-backfill-tests"
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ steps:
timeout_in_minutes: 40

- label: "Backfill tests"
command: "ci/scripts/backfill-test.sh -p ci-dev"
command: "BUILDKITE=${BUILDKITE:-} ci/scripts/backfill-test.sh -p ci-dev"
if: build.pull_request.labels includes "ci/run-backfill-tests" || build.env("CI_STEPS") =~ /(^|,)backfill-tests?(,|$$)/
depends_on:
- "build"
Expand Down
50 changes: 50 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,56 @@ profile:
- use: prometheus
- use: grafana

ci-3cn-1fe-with-minio-rate-limit:
config-path: src/config/ci.toml
steps:
- use: minio
api-requests-max: 18
api-requests-deadline: 1s
- use: etcd
unsafe-no-fsync: true
- use: meta-node
- use: compute-node
port: 5687
exporter-port: 1222
enable-tiered-cache: true
- use: compute-node
port: 5688
exporter-port: 1223
enable-tiered-cache: true
- use: compute-node
port: 5689
exporter-port: 1224
enable-tiered-cache: true
- use: frontend
- use: compactor

ci-3cn-1fe-with-monitoring-and-minio-rate-limit:
config-path: src/config/ci.toml
steps:
- use: minio
api-requests-max: 18
api-requests-deadline: 1s
- use: etcd
unsafe-no-fsync: true
- use: meta-node
- use: compute-node
port: 5687
exporter-port: 1222
enable-tiered-cache: true
- use: compute-node
port: 5688
exporter-port: 1223
enable-tiered-cache: true
- use: compute-node
port: 5689
exporter-port: 1224
enable-tiered-cache: true
- use: frontend
- use: compactor
- use: prometheus
- use: grafana

ci-3cn-3fe:
config-path: src/config/ci.toml
steps:
Expand Down
34 changes: 30 additions & 4 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -921,9 +921,29 @@ pub struct S3ObjectStoreConfig {
pub object_store_req_retry_max_delay_ms: u64,
#[serde(default = "default::object_store_config::s3::object_store_req_retry_max_attempts")]
pub object_store_req_retry_max_attempts: usize,
/// Whether to retry s3 sdk error from which no error metadata is provided.
#[serde(default = "default::object_store_config::s3::retry_unknown_service_error")]
/// For backwards compatibility, users should use `S3ObjectStoreDeveloperConfig` instead.
#[serde(
default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error"
)]
pub retry_unknown_service_error: bool,
#[serde(default)]
pub developer: S3ObjectStoreDeveloperConfig,
}

/// The subsections `[storage.object_store.s3.developer]`.
#[derive(Clone, Debug, Serialize, Deserialize, DefaultFromSerde)]
pub struct S3ObjectStoreDeveloperConfig {
/// Whether to retry s3 sdk error from which no error metadata is provided.
#[serde(
default = "default::object_store_config::s3::developer::object_store_retry_unknown_service_error"
)]
pub object_store_retry_unknown_service_error: bool,
/// An array of error codes that should be retried.
/// e.g. `["SlowDown", "TooManyRequests"]`
#[serde(
default = "default::object_store_config::s3::developer::object_store_retryable_service_error_codes"
)]
pub object_store_retryable_service_error_codes: Vec<String>,
}

impl SystemConfig {
Expand Down Expand Up @@ -1526,8 +1546,14 @@ pub mod default {
DEFAULT_RETRY_MAX_ATTEMPTS
}

pub fn retry_unknown_service_error() -> bool {
false
pub mod developer {
pub fn object_store_retry_unknown_service_error() -> bool {
false
}

pub fn object_store_retryable_service_error_codes() -> Vec<String> {
vec!["SlowDown".into(), "TooManyRequests".into()]
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/config/ci.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ imm_merge_threshold = 2
[system]
barrier_interval_ms = 250
checkpoint_frequency = 5
max_concurrent_creating_streaming_jobs = 0
max_concurrent_creating_streaming_jobs = 0
4 changes: 4 additions & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ object_store_req_retry_max_delay_ms = 10000
object_store_req_retry_max_attempts = 8
retry_unknown_service_error = false

[storage.object_store.s3.developer]
object_store_retry_unknown_service_error = false
object_store_retryable_service_error_codes = ["SlowDown", "TooManyRequests"]

[system]
barrier_interval_ms = 1000
checkpoint_frequency = 1
Expand Down
30 changes: 24 additions & 6 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,12 +938,18 @@ impl From<RetryError> for ObjectError {

struct RetryCondition {
retry_unknown_service_error: bool,
retryable_service_error_codes: Vec<String>,
}

impl RetryCondition {
fn new(config: &S3ObjectStoreConfig) -> Self {
Self {
retry_unknown_service_error: config.retry_unknown_service_error,
retry_unknown_service_error: config.developer.object_store_retry_unknown_service_error
|| config.retry_unknown_service_error,
retryable_service_error_codes: config
.developer
.object_store_retryable_service_error_codes
.clone(),
}
}
}
Expand All @@ -958,12 +964,24 @@ impl tokio_retry::Condition<RetryError> for RetryCondition {
return true;
}
}
SdkError::ServiceError(e) => {
if self.retry_unknown_service_error && e.err().code().is_none() {
tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request.");
return true;
SdkError::ServiceError(e) => match e.err().code() {
None => {
if self.retry_unknown_service_error {
tracing::warn!(target: "unknown_service_error", "{e:?} occurs, retry S3 get_object request.");
return true;
}
}
}
Some(code) => {
if self
.retryable_service_error_codes
.iter()
.any(|s| s.as_str().eq_ignore_ascii_case(code))
{
tracing::warn!(target: "retryable_service_error", "{e:?} occurs, retry S3 get_object request.");
return true;
}
}
},
_ => {}
},
Either::Right(_) => {
Expand Down

0 comments on commit f804ed0

Please sign in to comment.