feat: switch madsim integration and recovery tests to sql backend (#18678)

Co-authored-by: Noel Kwan <[email protected]>
Co-authored-by: Noel Kwan <[email protected]>
3 people authored Oct 7, 2024
1 parent 51c5e3e commit ba761f2
Showing 25 changed files with 212 additions and 247 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions ci/scripts/common.sh
@@ -16,6 +16,7 @@ export GCLOUD_DOWNLOAD_TGZ=https://rw-ci-deps-dist.s3.amazonaws.com/google-cloud
export NEXTEST_HIDE_PROGRESS_BAR=true
export RW_TELEMETRY_TYPE=test
export RW_SECRET_STORE_PRIVATE_KEY_HEX="0123456789abcdef"
+export RUST_MIN_STACK=4194304

unset LANG
if [ -n "${BUILDKITE_COMMIT:-}" ]; then
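The new export applies RUST_MIN_STACK to every CI step that sources common.sh; previously only the sqlsmith fuzzing step in deterministic-e2e-test.sh set it inline. A minimal sketch of the effect, assuming the standard std::thread behavior where RUST_MIN_STACK sets the default stack size, in bytes, for spawned threads:

export RUST_MIN_STACK=4194304   # 4 MiB per spawned thread instead of Rust's 2 MiB default
./risingwave_simulation --sqlsmith 100 ./src/tests/sqlsmith/tests/testdata
# deeply nested sqlsmith-generated query plans presumably need the extra stack headroom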
17 changes: 8 additions & 9 deletions ci/scripts/deterministic-e2e-test.sh
@@ -25,31 +25,30 @@ git clone https://"$GITHUB_TOKEN"@github.com/risingwavelabs/sqlsmith-query-snaps
# popd
popd

-export RUST_LOG=info
export LOGDIR=.risingwave/log

mkdir -p $LOGDIR

echo "--- deterministic simulation e2e, ci-3cn-2fe, ddl"
-seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/ddl-{}.log && rm $LOGDIR/ddl-{}.log'
+seq "$TEST_NUM" | parallel './risingwave_simulation ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/ddl-{}.log && rm $LOGDIR/ddl-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, streaming"
-seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/streaming-{}.log && rm $LOGDIR/streaming-{}.log'
+seq "$TEST_NUM" | parallel './risingwave_simulation ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/streaming-{}.log && rm $LOGDIR/streaming-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, batch"
-seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/batch-{}.log && rm $LOGDIR/batch-{}.log'
+seq "$TEST_NUM" | parallel './risingwave_simulation ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/batch-{}.log && rm $LOGDIR/batch-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, kafka source"
-seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log'
+seq "$TEST_NUM" | parallel './risingwave_simulation --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, streaming"
-seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation -j 16 ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/parallel-streaming-{}.log && rm $LOGDIR/parallel-streaming-{}.log'
+seq "$TEST_NUM" | parallel './risingwave_simulation -j 16 ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/parallel-streaming-{}.log && rm $LOGDIR/parallel-streaming-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, batch"
-seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation -j 16 ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/parallel-batch-{}.log && rm $LOGDIR/parallel-batch-{}.log'
+seq "$TEST_NUM" | parallel './risingwave_simulation -j 16 ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/parallel-batch-{}.log && rm $LOGDIR/parallel-batch-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, fuzzing (pre-generated-queries)"
-timeout 10m seq 64 | parallel MADSIM_TEST_SEED={} RUST_MIN_STACK=4194304 './risingwave_simulation --run-sqlsmith-queries ./src/tests/sqlsmith/tests/sqlsmith-query-snapshots/{} 2> $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log'
+timeout 10m seq 64 | parallel RUST_MIN_STACK=4194304 './risingwave_simulation --run-sqlsmith-queries ./src/tests/sqlsmith/tests/sqlsmith-query-snapshots/{} 2> $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log'

echo "--- deterministic simulation e2e, ci-3cn-2fe, e2e extended mode test"
-seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation -e 2> $LOGDIR/extended-{}.log && rm $LOGDIR/extended-{}.log'
+seq "$TEST_NUM" | parallel 'RUST_LOG=info ./risingwave_simulation -e 2> $LOGDIR/extended-{}.log && rm $LOGDIR/extended-{}.log'
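The recurring change above is the removal of MADSIM_TEST_SEED={}, which pinned madsim's deterministic seed to each job's index. A minimal sketch of what the dropped pattern did (GNU parallel substitutes {} with each input value, and the leading assignment lands in that job's environment):

seq 3 | parallel MADSIM_TEST_SEED={} printenv MADSIM_TEST_SEED
# prints 1, 2 and 3, one line per job (output order may vary)

Without the explicit pinning, each simulation run presumably picks its own seed.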
2 changes: 1 addition & 1 deletion ci/scripts/deterministic-it-test.sh
@@ -19,7 +19,7 @@ mv target/ci-sim target/sim
TEST_PATTERN="$@"

echo "--- Run integration tests in deterministic simulation mode"
-seq "$TEST_NUM" | parallel "MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \
+seq "$TEST_NUM" | parallel "NEXTEST_PROFILE=ci-sim \
cargo nextest run \
--no-fail-fast \
--cargo-metadata target/nextest/cargo-metadata.json \
29 changes: 13 additions & 16 deletions ci/scripts/deterministic-recovery-test.sh
@@ -15,7 +15,8 @@ risingwave_meta::rpc::ddl_controller=debug,\
risingwave_meta::barrier::mod=debug,\
risingwave_simulation=debug,\
risingwave_meta::stream::stream_manager=debug,\
-risingwave_meta::barrier::progress=debug"
+risingwave_meta::barrier::progress=debug,\
+sqlx=error"

# Extra logs you can enable if the existing trace does not give enough info.
#risingwave_stream::executor::backfill=trace,
@@ -48,52 +48,49 @@ trap filter_stack_trace_for_all_logs ERR
# NOTE(kwannoel): We must use `export` here, because the variables are not substituted
# directly via bash substitution. Instead, the `parallel` command substitutes the variables
# from the environment. If they are declared without `export`, `parallel` can't read them from the env.
-export EXTRA_ARGS=""
-
-if [[ -n "${USE_SQL_BACKEND:-}" ]]; then
-export EXTRA_ARGS="--sqlite-data-dir=."
-fi
+export EXTRA_ARGS="--sqlite-data-dir=."

if [[ -n "${USE_ARRANGEMENT_BACKFILL:-}" ]]; then
export EXTRA_ARGS="$EXTRA_ARGS --use-arrangement-backfill"
fi

echo "--- EXTRA_ARGS: ${EXTRA_ARGS}"

-echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, background_ddl"
-seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
+echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, background_ddl"
+seq "$TEST_NUM" | parallel './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
${EXTRA_ARGS:-} \
./e2e_test/background_ddl/sim/basic.slt \
2> $LOGDIR/recovery-background-ddl-{}.log && rm $LOGDIR/recovery-background-ddl-{}.log'

-echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, ddl"
-seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
+echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, ddl"
+seq "$TEST_NUM" | parallel './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--background-ddl-rate=${BACKGROUND_DDL_RATE} \
${EXTRA_ARGS:-} \
./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log'

-echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, streaming"
-seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
+echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, streaming"
+seq "$TEST_NUM" | parallel './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--background-ddl-rate=${BACKGROUND_DDL_RATE} \
${EXTRA_ARGS:-} \
./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log'

-echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, batch"
-seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
+echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, batch"
+seq "$TEST_NUM" | parallel './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--background-ddl-rate=${BACKGROUND_DDL_RATE} \
${EXTRA_ARGS:-} \
./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log'

-echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, kafka source,sink"
-seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation \
+echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, kafka source,sink"
+seq "$TEST_NUM" | parallel './risingwave_simulation \
--kill \
--kill-rate=${KILL_RATE} \
--kafka-datadir=./scripts/source/test_data \
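As the NOTE(kwannoel) comment above explains, these variables must be exported because parallel's single-quoted command string is expanded by a child shell, which sees only exported variables. A minimal sketch, using a hypothetical LOCAL_ONLY variable for contrast:

LOCAL_ONLY=1                             # not exported: invisible to parallel's jobs
export EXTRA_ARGS="--sqlite-data-dir=."  # exported: visible to parallel's jobs
seq 2 | parallel 'echo "local=${LOCAL_ONLY:-unset} extra=${EXTRA_ARGS:-unset}"'
# prints "local=unset extra=--sqlite-data-dir=." twice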
2 changes: 1 addition & 1 deletion ci/scripts/run-deterministic-fuzz-test.sh
@@ -30,4 +30,4 @@ download-and-decompress-artifact risingwave_simulation .
chmod +x ./risingwave_simulation

echo "--- deterministic simulation e2e, ci-3cn-2fe, fuzzing (seed)"
-seq 32 | parallel MADSIM_TEST_SEED={} './risingwave_simulation --sqlsmith 100 ./src/tests/sqlsmith/tests/testdata 2> $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log'
+seq 32 | parallel './risingwave_simulation --sqlsmith 100 ./src/tests/sqlsmith/tests/testdata 2> $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log'
36 changes: 8 additions & 28 deletions ci/workflows/main-cron.yml
@@ -247,7 +247,7 @@ steps:

- label: "unit test (madsim)"
key: "unit-test-deterministic"
-command: "MADSIM_TEST_NUM=100 timeout 30m ci/scripts/deterministic-unit-test.sh"
+command: "MADSIM_TEST_NUM=100 timeout 50m ci/scripts/deterministic-unit-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-unit-test-deterministic-simulation"
@@ -257,7 +257,7 @@
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
-timeout_in_minutes: 30
+timeout_in_minutes: 50
retry: *auto-retry

- label: "integration test (madsim) - scale"
@@ -347,7 +347,7 @@ steps:

- label: "end-to-end test (madsim)"
key: "e2e-test-deterministic"
-command: "TEST_NUM=64 timeout 75m ci/scripts/deterministic-e2e-test.sh"
+command: "TEST_NUM=32 timeout 120m ci/scripts/deterministic-e2e-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-test-deterministic-simulation"
@@ -364,30 +364,10 @@
environment:
- GITHUB_TOKEN
- ./ci/plugins/upload-failure-logs
-timeout_in_minutes: 80
+timeout_in_minutes: 120
retry: *auto-retry

-# sql backend recovery tests
-- label: "recovery test (sql,madsim)"
-key: "recovery-test-deterministic-sql"
-command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 USE_SQL_BACKEND=true timeout 65m ci/scripts/deterministic-recovery-test.sh"
-# NOTE(kwannoel): It will only run when the recovery tests label is added currently.
-# This is because there are currently some bugs which cause the test to fail.
-if: |
-build.pull_request.labels includes "ci/run-sql-recovery-test-deterministic-simulation"
-|| build.env("CI_STEPS") =~ /(^|,)sql-recovery-tests?-deterministic-simulation(,|$$)/
-depends_on: "build-simulation"
-plugins:
-- docker-compose#v5.1.0:
-run: rw-build-env
-config: ci/docker-compose.yml
-mount-buildkite-agent: true
-# Only upload zipped files, otherwise the logs is too much.
-- ./ci/plugins/upload-failure-logs-zipped
-timeout_in_minutes: 70
-retry: *auto-retry
-
-- label: "recovery test (etcd,madsim)"
+- label: "recovery test (madsim)"
key: "recovery-test-deterministic"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 timeout 65m ci/scripts/deterministic-recovery-test.sh"
if: |
@@ -406,7 +386,7 @@
retry: *auto-retry

# Ddl statements will randomly run with background_ddl.
-- label: "background_ddl, arrangement_backfill recovery test (etcd,madsim)"
+- label: "background_ddl, arrangement_backfill recovery test (madsim)"
key: "background-ddl-arrangement-backfill-recovery-test-deterministic"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=true timeout 65m ci/scripts/deterministic-recovery-test.sh"
if: |
@@ -460,7 +440,7 @@
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
-timeout_in_minutes: 11
+timeout_in_minutes: 15
retry: *auto-retry

- label: "e2e java-binding test (release)"
@@ -481,7 +461,7 @@
- ./ci/plugins/upload-failure-logs
# Extra 2 minutes to account for docker-compose latency.
# See: https://github.com/risingwavelabs/risingwave/issues/9423#issuecomment-1521222169
-timeout_in_minutes: 10
+timeout_in_minutes: 15
retry: *auto-retry

- label: "S3 source check on AWS (json parser)"
8 changes: 4 additions & 4 deletions ci/workflows/pull-request.yml
@@ -569,7 +569,7 @@ steps:
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
-command: "TEST_NUM=16 ci/scripts/deterministic-e2e-test.sh"
+command: "TEST_NUM=4 ci/scripts/deterministic-e2e-test.sh"
if: |
!(build.pull_request.labels includes "ci/pr/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-test-deterministic-simulation"
@@ -586,12 +586,12 @@
environment:
- GITHUB_TOKEN
- ./ci/plugins/upload-failure-logs
-timeout_in_minutes: 25
+timeout_in_minutes: 30
cancel_on_build_failing: true
retry: *auto-retry

- label: "recovery test (deterministic simulation)"
-command: "TEST_NUM=8 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh"
+command: "TEST_NUM=4 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh"
if: |
!(build.pull_request.labels includes "ci/pr/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
@@ -610,7 +610,7 @@
# - test-collector#v1.0.0:
# files: "*-junit.xml"
# format: "junit"
-timeout_in_minutes: 25
+timeout_in_minutes: 30
cancel_on_build_failing: true
retry: *auto-retry

4 changes: 4 additions & 0 deletions src/config/ci-sim.toml
@@ -7,3 +7,7 @@ max_concurrent_creating_streaming_jobs = 0

[meta]
meta_leader_lease_secs = 10

+[meta.developer]
+meta_actor_cnt_per_worker_parallelism_soft_limit = 65536
+meta_actor_cnt_per_worker_parallelism_hard_limit = 65536
8 changes: 2 additions & 6 deletions src/meta/node/src/server.rs
@@ -94,9 +94,7 @@ use crate::manager::{
};
use crate::rpc::cloud_provider::AwsEc2Client;
use crate::rpc::election::etcd::EtcdElectionClient;
-use crate::rpc::election::sql::{
-    MySqlDriver, PostgresDriver, SqlBackendElectionClient, SqliteDriver,
-};
+use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient};
use crate::rpc::metrics::{
    start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS,
};
@@ -223,9 +221,7 @@ pub async fn rpc_serve(
    let id = address_info.advertise_addr.clone();
    let conn = meta_store_sql.conn.clone();
    let election_client: ElectionClientRef = match conn.get_database_backend() {
-        DbBackend::Sqlite => {
-            Arc::new(SqlBackendElectionClient::new(id, SqliteDriver::new(conn)))
-        }
+        DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)),
        DbBackend::Postgres => {
            Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
        }
76 changes: 33 additions & 43 deletions src/meta/src/barrier/command.rs
@@ -1031,50 +1031,40 @@ impl CommandContext {
            .unregister_table_ids(table_fragments.all_table_ids().map(TableId::new))
            .await?;

-        match &self.barrier_manager_context.metadata_manager {
-            MetadataManager::V1(mgr) => {
-                // NOTE(kwannoel): At this point, catalog manager has persisted the tables already.
-                // We need to cleanup the table state. So we can do it here.
-                // The logic is the same as above, for hummock_manager.unregister_table_ids.
-                if let Err(e) = mgr
-                    .catalog_manager
-                    .cancel_create_materialized_view_procedure(
-                        table_fragments.table_id().table_id,
-                        table_fragments.internal_table_ids(),
-                    )
-                    .await
-                {
-                    let table_id = table_fragments.table_id().table_id;
-                    tracing::warn!(
-                        table_id,
-                        error = %e.as_report(),
-                        "cancel_create_table_procedure failed for CancelStreamingJob",
-                    );
-                    // If failed, check that table is not in meta store.
-                    // If any table is, just panic, let meta do bootstrap recovery.
-                    // Otherwise our persisted state is dirty.
-                    let mut table_ids = table_fragments.internal_table_ids();
-                    table_ids.push(table_id);
-                    mgr.catalog_manager.assert_tables_deleted(table_ids).await;
-                }
-
-                // We need to drop table fragments here,
-                // since this is not done in stream manager (foreground ddl)
-                // OR barrier manager (background ddl)
-                mgr.fragment_manager
-                    .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(
-                        table_fragments.table_id(),
-                    )))
-                    .await?;
-            }
-            MetadataManager::V2(mgr) => {
-                mgr.catalog_controller
-                    .try_abort_creating_streaming_job(
-                        table_fragments.table_id().table_id as _,
-                        true,
-                    )
-                    .await?;
+        if let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager {
+            // NOTE(kwannoel): At this point, catalog manager has persisted the tables already.
+            // We need to cleanup the table state. So we can do it here.
+            // The logic is the same as above, for hummock_manager.unregister_table_ids.
+            if let Err(e) = mgr
+                .catalog_manager
+                .cancel_create_materialized_view_procedure(
+                    table_fragments.table_id().table_id,
+                    table_fragments.internal_table_ids(),
+                )
+                .await
+            {
+                let table_id = table_fragments.table_id().table_id;
+                tracing::warn!(
+                    table_id,
+                    error = %e.as_report(),
+                    "cancel_create_table_procedure failed for CancelStreamingJob",
+                );
+                // If failed, check that table is not in meta store.
+                // If any table is, just panic, let meta do bootstrap recovery.
+                // Otherwise our persisted state is dirty.
+                let mut table_ids = table_fragments.internal_table_ids();
+                table_ids.push(table_id);
+                mgr.catalog_manager.assert_tables_deleted(table_ids).await;
+            }
+
+            // We need to drop table fragments here,
+            // since this is not done in stream manager (foreground ddl)
+            // OR barrier manager (background ddl)
+            mgr.fragment_manager
+                .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once(
+                    table_fragments.table_id(),
+                )))
+                .await?;
+        }
-            }
