From ba761f2362886ab3b73602a3423b73c97a9f5b26 Mon Sep 17 00:00:00 2001 From: August Date: Mon, 7 Oct 2024 08:49:14 +0800 Subject: [PATCH] feat: switch madsim integration and recovery tests to sql backend (#18678) Co-authored-by: Noel Kwan <47273164+kwannoel@users.noreply.github.com> Co-authored-by: Noel Kwan --- Cargo.lock | 1 - ci/scripts/common.sh | 1 + ci/scripts/deterministic-e2e-test.sh | 17 ++-- ci/scripts/deterministic-it-test.sh | 2 +- ci/scripts/deterministic-recovery-test.sh | 29 +++--- ci/scripts/run-deterministic-fuzz-test.sh | 2 +- ci/workflows/main-cron.yml | 36 ++----- ci/workflows/pull-request.yml | 8 +- src/config/ci-sim.toml | 4 + src/meta/node/src/server.rs | 8 +- src/meta/src/barrier/command.rs | 76 +++++++-------- src/meta/src/barrier/recovery.rs | 21 ++-- src/meta/src/stream/stream_graph/fragment.rs | 6 +- src/meta/src/stream/stream_manager.rs | 24 ++++- src/tests/e2e_extended_mode/src/test.rs | 20 +++- src/tests/simulation/Cargo.toml | 1 - src/tests/simulation/src/cluster.rs | 97 ++++++++++--------- src/tests/simulation/src/main.rs | 30 +----- src/tests/simulation/src/slt.rs | 3 + .../tests/integration_tests/batch/mod.rs | 2 +- .../recovery/background_ddl.rs | 2 +- .../integration_tests/scale/shared_source.rs | 36 +++---- .../tests/integration_tests/throttle.rs | 17 +++- .../tests/integration_tests/utils.rs | 12 +-- src/tests/sqlsmith/scripts/gen_queries.sh | 4 +- 25 files changed, 212 insertions(+), 247 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b93e941d152f1..4540940be006a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11721,7 +11721,6 @@ dependencies = [ "lru 0.7.6", "madsim", "madsim-aws-sdk-s3", - "madsim-etcd-client", "madsim-rdkafka", "madsim-tokio", "maplit", diff --git a/ci/scripts/common.sh b/ci/scripts/common.sh index 9593a54aeaf54..5a448a627926e 100755 --- a/ci/scripts/common.sh +++ b/ci/scripts/common.sh @@ -16,6 +16,7 @@ export GCLOUD_DOWNLOAD_TGZ=https://rw-ci-deps-dist.s3.amazonaws.com/google-cloud export NEXTEST_HIDE_PROGRESS_BAR=true export RW_TELEMETRY_TYPE=test export RW_SECRET_STORE_PRIVATE_KEY_HEX="0123456789abcdef" +export RUST_MIN_STACK=4194304 unset LANG if [ -n "${BUILDKITE_COMMIT:-}" ]; then diff --git a/ci/scripts/deterministic-e2e-test.sh b/ci/scripts/deterministic-e2e-test.sh index c561978e428aa..f93ded7bad4a3 100755 --- a/ci/scripts/deterministic-e2e-test.sh +++ b/ci/scripts/deterministic-e2e-test.sh @@ -25,31 +25,30 @@ git clone https://"$GITHUB_TOKEN"@github.com/risingwavelabs/sqlsmith-query-snaps # popd popd -export RUST_LOG=info export LOGDIR=.risingwave/log mkdir -p $LOGDIR echo "--- deterministic simulation e2e, ci-3cn-2fe, ddl" -seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/ddl-{}.log && rm $LOGDIR/ddl-{}.log' +seq "$TEST_NUM" | parallel './risingwave_simulation ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/ddl-{}.log && rm $LOGDIR/ddl-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe, streaming" -seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/streaming-{}.log && rm $LOGDIR/streaming-{}.log' +seq "$TEST_NUM" | parallel './risingwave_simulation ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/streaming-{}.log && rm $LOGDIR/streaming-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe, batch" -seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/batch-{}.log && rm $LOGDIR/batch-{}.log' +seq "$TEST_NUM" | parallel 
'./risingwave_simulation ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/batch-{}.log && rm $LOGDIR/batch-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe, kafka source" -seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log' +seq "$TEST_NUM" | parallel './risingwave_simulation --kafka-datadir=./scripts/source/test_data ./e2e_test/source/basic/kafka\*.slt 2> $LOGDIR/source-{}.log && rm $LOGDIR/source-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, streaming" -seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation -j 16 ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/parallel-streaming-{}.log && rm $LOGDIR/parallel-streaming-{}.log' +seq "$TEST_NUM" | parallel './risingwave_simulation -j 16 ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/parallel-streaming-{}.log && rm $LOGDIR/parallel-streaming-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe, parallel, batch" -seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation -j 16 ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/parallel-batch-{}.log && rm $LOGDIR/parallel-batch-{}.log' +seq "$TEST_NUM" | parallel './risingwave_simulation -j 16 ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/parallel-batch-{}.log && rm $LOGDIR/parallel-batch-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe, fuzzing (pre-generated-queries)" -timeout 10m seq 64 | parallel MADSIM_TEST_SEED={} RUST_MIN_STACK=4194304 './risingwave_simulation --run-sqlsmith-queries ./src/tests/sqlsmith/tests/sqlsmith-query-snapshots/{} 2> $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log' +timeout 10m seq 64 | parallel RUST_MIN_STACK=4194304 './risingwave_simulation --run-sqlsmith-queries ./src/tests/sqlsmith/tests/sqlsmith-query-snapshots/{} 2> $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log' echo "--- deterministic simulation e2e, ci-3cn-2fe, e2e extended mode test" -seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation -e 2> $LOGDIR/extended-{}.log && rm $LOGDIR/extended-{}.log' +seq "$TEST_NUM" | parallel 'RUST_LOG=info ./risingwave_simulation -e 2> $LOGDIR/extended-{}.log && rm $LOGDIR/extended-{}.log' diff --git a/ci/scripts/deterministic-it-test.sh b/ci/scripts/deterministic-it-test.sh index 40288f5848b16..1190f8b0178f5 100755 --- a/ci/scripts/deterministic-it-test.sh +++ b/ci/scripts/deterministic-it-test.sh @@ -19,7 +19,7 @@ mv target/ci-sim target/sim TEST_PATTERN="$@" echo "--- Run integration tests in deterministic simulation mode" -seq "$TEST_NUM" | parallel "MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \ +seq "$TEST_NUM" | parallel "NEXTEST_PROFILE=ci-sim \ cargo nextest run \ --no-fail-fast \ --cargo-metadata target/nextest/cargo-metadata.json \ diff --git a/ci/scripts/deterministic-recovery-test.sh b/ci/scripts/deterministic-recovery-test.sh index ced644f4e4879..0cf3590ba1cbc 100755 --- a/ci/scripts/deterministic-recovery-test.sh +++ b/ci/scripts/deterministic-recovery-test.sh @@ -15,7 +15,8 @@ risingwave_meta::rpc::ddl_controller=debug,\ risingwave_meta::barrier::mod=debug,\ risingwave_simulation=debug,\ risingwave_meta::stream::stream_manager=debug,\ -risingwave_meta::barrier::progress=debug" +risingwave_meta::barrier::progress=debug,\ +sqlx=error" # Extra logs you can enable if the existing trace does not give enough info. 
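# For example, to turn one of the directives below on for a local run (assuming the
# module path is still current), append it to the filter built above:
#   export RUST_LOG="${RUST_LOG},risingwave_stream::executor::backfill=trace"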
#risingwave_stream::executor::backfill=trace, @@ -48,11 +49,7 @@ trap filter_stack_trace_for_all_logs ERR # NOTE(kwannoel): We must use `export` here, because the variables are not substituted # directly via bash subtitution. Instead, the `parallel` command substitutes the variables # from the environment. If they are declared without `export`, `parallel` can't read them from the env. -export EXTRA_ARGS="" - -if [[ -n "${USE_SQL_BACKEND:-}" ]]; then - export EXTRA_ARGS="--sqlite-data-dir=." -fi +export EXTRA_ARGS="--sqlite-data-dir=." if [[ -n "${USE_ARRANGEMENT_BACKFILL:-}" ]]; then export EXTRA_ARGS="$EXTRA_ARGS --use-arrangement-backfill" @@ -60,40 +57,40 @@ fi echo "--- EXTRA_ARGS: ${EXTRA_ARGS}" -echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, background_ddl" -seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation \ +echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, background_ddl" +seq "$TEST_NUM" | parallel './risingwave_simulation \ --kill \ --kill-rate=${KILL_RATE} \ ${EXTRA_ARGS:-} \ ./e2e_test/background_ddl/sim/basic.slt \ 2> $LOGDIR/recovery-background-ddl-{}.log && rm $LOGDIR/recovery-background-ddl-{}.log' -echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, ddl" -seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation \ +echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, ddl" +seq "$TEST_NUM" | parallel './risingwave_simulation \ --kill \ --kill-rate=${KILL_RATE} \ --background-ddl-rate=${BACKGROUND_DDL_RATE} \ ${EXTRA_ARGS:-} \ ./e2e_test/ddl/\*\*/\*.slt 2> $LOGDIR/recovery-ddl-{}.log && rm $LOGDIR/recovery-ddl-{}.log' -echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, streaming" -seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation \ +echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, streaming" +seq "$TEST_NUM" | parallel './risingwave_simulation \ --kill \ --kill-rate=${KILL_RATE} \ --background-ddl-rate=${BACKGROUND_DDL_RATE} \ ${EXTRA_ARGS:-} \ ./e2e_test/streaming/\*\*/\*.slt 2> $LOGDIR/recovery-streaming-{}.log && rm $LOGDIR/recovery-streaming-{}.log' -echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, batch" -seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation \ +echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, batch" +seq "$TEST_NUM" | parallel './risingwave_simulation \ --kill \ --kill-rate=${KILL_RATE} \ --background-ddl-rate=${BACKGROUND_DDL_RATE} \ ${EXTRA_ARGS:-} \ ./e2e_test/batch/\*\*/\*.slt 2> $LOGDIR/recovery-batch-{}.log && rm $LOGDIR/recovery-batch-{}.log' -echo "--- deterministic simulation e2e, ci-3cn-2fe-3meta, recovery, kafka source,sink" -seq "$TEST_NUM" | parallel MADSIM_TEST_SEED={} './risingwave_simulation \ +echo "--- deterministic simulation e2e, ci-3cn-2fe-1meta, recovery, kafka source,sink" +seq "$TEST_NUM" | parallel './risingwave_simulation \ --kill \ --kill-rate=${KILL_RATE} \ --kafka-datadir=./scripts/source/test_data \ diff --git a/ci/scripts/run-deterministic-fuzz-test.sh b/ci/scripts/run-deterministic-fuzz-test.sh index c3b8cb2821fd4..85965446b5fc6 100755 --- a/ci/scripts/run-deterministic-fuzz-test.sh +++ b/ci/scripts/run-deterministic-fuzz-test.sh @@ -30,4 +30,4 @@ download-and-decompress-artifact risingwave_simulation . 
chmod +x ./risingwave_simulation echo "--- deterministic simulation e2e, ci-3cn-2fe, fuzzing (seed)" -seq 32 | parallel MADSIM_TEST_SEED={} './risingwave_simulation --sqlsmith 100 ./src/tests/sqlsmith/tests/testdata 2> $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log' \ No newline at end of file +seq 32 | parallel './risingwave_simulation --sqlsmith 100 ./src/tests/sqlsmith/tests/testdata 2> $LOGDIR/fuzzing-{}.log && rm $LOGDIR/fuzzing-{}.log' diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml index e3084f73681c8..73fd3f3303758 100644 --- a/ci/workflows/main-cron.yml +++ b/ci/workflows/main-cron.yml @@ -247,7 +247,7 @@ steps: - label: "unit test (madsim)" key: "unit-test-deterministic" - command: "MADSIM_TEST_NUM=100 timeout 30m ci/scripts/deterministic-unit-test.sh" + command: "MADSIM_TEST_NUM=100 timeout 50m ci/scripts/deterministic-unit-test.sh" if: | !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-unit-test-deterministic-simulation" @@ -257,7 +257,7 @@ steps: run: rw-build-env config: ci/docker-compose.yml mount-buildkite-agent: true - timeout_in_minutes: 30 + timeout_in_minutes: 50 retry: *auto-retry - label: "integration test (madsim) - scale" @@ -347,7 +347,7 @@ steps: - label: "end-to-end test (madsim)" key: "e2e-test-deterministic" - command: "TEST_NUM=64 timeout 75m ci/scripts/deterministic-e2e-test.sh" + command: "TEST_NUM=32 timeout 120m ci/scripts/deterministic-e2e-test.sh" if: | !(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-e2e-test-deterministic-simulation" @@ -364,30 +364,10 @@ steps: environment: - GITHUB_TOKEN - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 80 + timeout_in_minutes: 120 retry: *auto-retry - # sql backend recovery tests - - label: "recovery test (sql,madsim)" - key: "recovery-test-deterministic-sql" - command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 USE_SQL_BACKEND=true timeout 65m ci/scripts/deterministic-recovery-test.sh" - # NOTE(kwannoel): It will only run when the recovery tests label is added currently. - # This is because there are currently some bugs which cause the test to fail. - if: | - build.pull_request.labels includes "ci/run-sql-recovery-test-deterministic-simulation" - || build.env("CI_STEPS") =~ /(^|,)sql-recovery-tests?-deterministic-simulation(,|$$)/ - depends_on: "build-simulation" - plugins: - - docker-compose#v5.1.0: - run: rw-build-env - config: ci/docker-compose.yml - mount-buildkite-agent: true - # Only upload zipped files, otherwise the logs is too much. - - ./ci/plugins/upload-failure-logs-zipped - timeout_in_minutes: 70 - retry: *auto-retry - - - label: "recovery test (etcd,madsim)" + - label: "recovery test (madsim)" key: "recovery-test-deterministic" command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 timeout 65m ci/scripts/deterministic-recovery-test.sh" if: | @@ -406,7 +386,7 @@ steps: retry: *auto-retry # Ddl statements will randomly run with background_ddl. 
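  # (Presumably BACKGROUND_DDL_RATE=0.8 below means that roughly 80% of eligible DDL
  # statements are issued as background DDL by the simulation harness.)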
- - label: "background_ddl, arrangement_backfill recovery test (etcd,madsim)" + - label: "background_ddl, arrangement_backfill recovery test (madsim)" key: "background-ddl-arrangement-backfill-recovery-test-deterministic" command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=true timeout 65m ci/scripts/deterministic-recovery-test.sh" if: | @@ -460,7 +440,7 @@ steps: config: ci/docker-compose.yml mount-buildkite-agent: true - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 11 + timeout_in_minutes: 15 retry: *auto-retry - label: "e2e java-binding test (release)" @@ -481,7 +461,7 @@ steps: - ./ci/plugins/upload-failure-logs # Extra 2 minutes to account for docker-compose latency. # See: https://github.com/risingwavelabs/risingwave/issues/9423#issuecomment-1521222169 - timeout_in_minutes: 10 + timeout_in_minutes: 15 retry: *auto-retry - label: "S3 source check on AWS (json parser)" diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml index 151d5ce6ec057..ddba0f82f95fd 100644 --- a/ci/workflows/pull-request.yml +++ b/ci/workflows/pull-request.yml @@ -569,7 +569,7 @@ steps: retry: *auto-retry - label: "end-to-end test (deterministic simulation)" - command: "TEST_NUM=16 ci/scripts/deterministic-e2e-test.sh" + command: "TEST_NUM=4 ci/scripts/deterministic-e2e-test.sh" if: | !(build.pull_request.labels includes "ci/pr/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-e2e-test-deterministic-simulation" @@ -586,12 +586,12 @@ steps: environment: - GITHUB_TOKEN - ./ci/plugins/upload-failure-logs - timeout_in_minutes: 25 + timeout_in_minutes: 30 cancel_on_build_failing: true retry: *auto-retry - label: "recovery test (deterministic simulation)" - command: "TEST_NUM=8 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh" + command: "TEST_NUM=4 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 ci/scripts/deterministic-recovery-test.sh" if: | !(build.pull_request.labels includes "ci/pr/run-selected") && build.env("CI_STEPS") == null || build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation" @@ -610,7 +610,7 @@ steps: # - test-collector#v1.0.0: # files: "*-junit.xml" # format: "junit" - timeout_in_minutes: 25 + timeout_in_minutes: 30 cancel_on_build_failing: true retry: *auto-retry diff --git a/src/config/ci-sim.toml b/src/config/ci-sim.toml index 9535ff83696cb..30a2b00f7368b 100644 --- a/src/config/ci-sim.toml +++ b/src/config/ci-sim.toml @@ -7,3 +7,7 @@ max_concurrent_creating_streaming_jobs = 0 [meta] meta_leader_lease_secs = 10 + +[meta.developer] +meta_actor_cnt_per_worker_parallelism_soft_limit = 65536 +meta_actor_cnt_per_worker_parallelism_hard_limit = 65536 \ No newline at end of file diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 11b22014f9f98..6a5914a942021 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -94,9 +94,7 @@ use crate::manager::{ }; use crate::rpc::cloud_provider::AwsEc2Client; use crate::rpc::election::etcd::EtcdElectionClient; -use crate::rpc::election::sql::{ - MySqlDriver, PostgresDriver, SqlBackendElectionClient, SqliteDriver, -}; +use crate::rpc::election::sql::{MySqlDriver, PostgresDriver, SqlBackendElectionClient}; use crate::rpc::metrics::{ start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS, }; @@ -223,9 +221,7 @@ pub async fn rpc_serve( let id = address_info.advertise_addr.clone(); let conn = meta_store_sql.conn.clone(); let election_client: 
ElectionClientRef = match conn.get_database_backend() { - DbBackend::Sqlite => { - Arc::new(SqlBackendElectionClient::new(id, SqliteDriver::new(conn))) - } + DbBackend::Sqlite => Arc::new(DummyElectionClient::new(id)), DbBackend::Postgres => { Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn))) } diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 1eff171d019b2..9d0fa100dc506 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -1031,50 +1031,40 @@ impl CommandContext { .unregister_table_ids(table_fragments.all_table_ids().map(TableId::new)) .await?; - match &self.barrier_manager_context.metadata_manager { - MetadataManager::V1(mgr) => { - // NOTE(kwannoel): At this point, catalog manager has persisted the tables already. - // We need to cleanup the table state. So we can do it here. - // The logic is the same as above, for hummock_manager.unregister_table_ids. - if let Err(e) = mgr - .catalog_manager - .cancel_create_materialized_view_procedure( - table_fragments.table_id().table_id, - table_fragments.internal_table_ids(), - ) - .await - { - let table_id = table_fragments.table_id().table_id; - tracing::warn!( - table_id, - error = %e.as_report(), - "cancel_create_table_procedure failed for CancelStreamingJob", - ); - // If failed, check that table is not in meta store. - // If any table is, just panic, let meta do bootstrap recovery. - // Otherwise our persisted state is dirty. - let mut table_ids = table_fragments.internal_table_ids(); - table_ids.push(table_id); - mgr.catalog_manager.assert_tables_deleted(table_ids).await; - } - - // We need to drop table fragments here, - // since this is not done in stream manager (foreground ddl) - // OR barrier manager (background ddl) - mgr.fragment_manager - .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once( - table_fragments.table_id(), - ))) - .await?; - } - MetadataManager::V2(mgr) => { - mgr.catalog_controller - .try_abort_creating_streaming_job( - table_fragments.table_id().table_id as _, - true, - ) - .await?; + if let MetadataManager::V1(mgr) = &self.barrier_manager_context.metadata_manager { + // NOTE(kwannoel): At this point, catalog manager has persisted the tables already. + // We need to cleanup the table state. So we can do it here. + // The logic is the same as above, for hummock_manager.unregister_table_ids. + if let Err(e) = mgr + .catalog_manager + .cancel_create_materialized_view_procedure( + table_fragments.table_id().table_id, + table_fragments.internal_table_ids(), + ) + .await + { + let table_id = table_fragments.table_id().table_id; + tracing::warn!( + table_id, + error = %e.as_report(), + "cancel_create_table_procedure failed for CancelStreamingJob", + ); + // If failed, check that table is not in meta store. + // If any table is, just panic, let meta do bootstrap recovery. + // Otherwise our persisted state is dirty. 
+ let mut table_ids = table_fragments.internal_table_ids(); + table_ids.push(table_id); + mgr.catalog_manager.assert_tables_deleted(table_ids).await; } + + // We need to drop table fragments here, + // since this is not done in stream manager (foreground ddl) + // OR barrier manager (background ddl) + mgr.fragment_manager + .drop_table_fragments_vec(&HashSet::from_iter(std::iter::once( + table_fragments.table_id(), + ))) + .await?; } } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 266e280ca48f4..606eb9d408786 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -193,21 +193,12 @@ impl GlobalBarrierManagerContext { ) -> MetaResult { let (dropped_actors, cancelled) = scheduled_barriers.pre_apply_drop_cancel_scheduled(); let applied = !dropped_actors.is_empty() || !cancelled.is_empty(); - if !cancelled.is_empty() { - match &self.metadata_manager { - MetadataManager::V1(mgr) => { - mgr.fragment_manager - .drop_table_fragments_vec(&cancelled) - .await?; - } - MetadataManager::V2(mgr) => { - for job_id in cancelled { - mgr.catalog_controller - .try_abort_creating_streaming_job(job_id.table_id as _, true) - .await?; - } - } - }; + if !cancelled.is_empty() + && let MetadataManager::V1(mgr) = &self.metadata_manager + { + mgr.fragment_manager + .drop_table_fragments_vec(&cancelled) + .await?; // no need to unregister state table id from hummock manager here, because it's expected that // we call `purge_state_table_from_hummock` anyway after the current method returns. } diff --git a/src/meta/src/stream/stream_graph/fragment.rs b/src/meta/src/stream/stream_graph/fragment.rs index a28567560b4c1..b6cd2995ca6e9 100644 --- a/src/meta/src/stream/stream_graph/fragment.rs +++ b/src/meta/src/stream/stream_graph/fragment.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::num::NonZeroUsize; use std::ops::{Deref, DerefMut}; use std::sync::LazyLock; @@ -424,8 +424,8 @@ impl StreamFragmentGraph { } /// Retrieve the internal tables map of the whole graph. - pub fn internal_tables(&self) -> HashMap { - let mut tables = HashMap::new(); + pub fn internal_tables(&self) -> BTreeMap { + let mut tables = BTreeMap::new(); for fragment in self.fragments.values() { for table in fragment.extract_internal_tables() { let table_id = table.id; diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index 63a6b3e228b1a..34bfbd4f255b1 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::sync::Arc; use futures::future::join_all; @@ -59,7 +59,7 @@ pub struct CreateStreamingJobContext { pub upstream_root_actors: HashMap>, /// Internal tables in the streaming job. - pub internal_tables: HashMap, + pub internal_tables: BTreeMap, /// The locations of the actors to build in the streaming job. pub building_locations: Locations, @@ -301,6 +301,12 @@ impl GlobalStreamManager { "cancelling streaming job {table_id} by issue cancel command." 
); + if let MetadataManager::V2(mgr) = &self.metadata_manager { + mgr.catalog_controller + .try_abort_creating_streaming_job(table_id.table_id as _, true) + .await?; + } + self.barrier_scheduler .run_command(Command::CancelStreamingJob(table_fragments)) .await?; @@ -631,8 +637,18 @@ impl GlobalStreamManager { id )))?; } - if let MetadataManager::V1(mgr) = &self.metadata_manager { - mgr.catalog_manager.cancel_create_materialized_view_procedure(id.into(), fragment.internal_table_ids()).await?; + match &self.metadata_manager { + MetadataManager::V1(mgr) => { + mgr.catalog_manager.cancel_create_materialized_view_procedure(id.into(), fragment.internal_table_ids()).await?; + } + MetadataManager::V2(mgr) => { + mgr.catalog_controller + .try_abort_creating_streaming_job( + id.table_id as _, + true, + ) + .await?; + } } self.barrier_scheduler diff --git a/src/tests/e2e_extended_mode/src/test.rs b/src/tests/e2e_extended_mode/src/test.rs index 52b0a004096ba..cb8b7cc6d94d1 100644 --- a/src/tests/e2e_extended_mode/src/test.rs +++ b/src/tests/e2e_extended_mode/src/test.rs @@ -65,7 +65,15 @@ impl TestSuite { Self { config } } + fn init_logger() { + let _ = tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_ansi(false) + .try_init(); + } + pub async fn test(&self) -> anyhow::Result<()> { + Self::init_logger(); self.binary_param_and_result().await?; self.dql_dml_with_param().await?; self.max_row().await?; @@ -520,7 +528,15 @@ impl TestSuite { "; let query_handle = tokio::spawn(async move { - client.query(query_sql, &[]).await.unwrap(); + let result = client.query(query_sql, &[]).await; + match result { + Ok(_) => { + tracing::error!("Query should be canceled"); + } + Err(e) => { + tracing::error!("Query failed with error: {:?}", e); + } + }; }); select! { @@ -528,7 +544,7 @@ impl TestSuite { tracing::error!("Failed to cancel query") }, _ = cancel_token.cancel_query(NoTls) => { - tracing::trace!("Cancel query successfully") + tracing::info!("Cancel query successfully") }, } diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index c82f2b7d5911e..a0d3b4b26493b 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -17,7 +17,6 @@ aws-sdk-s3 = { version = "0.5", package = "madsim-aws-sdk-s3" } cfg-or-panic = "0.2" clap = { workspace = true } console = "0.15" -etcd-client = { workspace = true } expect-test = "1" fail = { version = "0.5" } futures = { version = "0.3", default-features = false, features = ["alloc"] } diff --git a/src/tests/simulation/src/cluster.rs b/src/tests/simulation/src/cluster.rs index a9ffba0063562..22592a730a286 100644 --- a/src/tests/simulation/src/cluster.rs +++ b/src/tests/simulation/src/cluster.rs @@ -86,12 +86,6 @@ pub struct Configuration { /// This determines `worker_node_parallelism`. pub compute_node_cores: usize, - /// The probability of etcd request timeout. - pub etcd_timeout_rate: f32, - - /// Path to etcd data file. - pub etcd_data_path: Option, - /// Queries to run per session. 
pub per_session_queries: Arc>, @@ -123,8 +117,6 @@ metrics_level = "Disabled" meta_nodes: 1, compactor_nodes: 1, compute_node_cores: 1, - etcd_timeout_rate: 0.0, - etcd_data_path: None, per_session_queries: vec![].into(), sqlite_data_dir: None, } @@ -148,7 +140,7 @@ impl Configuration { config_path: ConfigPath::Temp(config_path.into()), frontend_nodes: 2, compute_nodes: 3, - meta_nodes: 3, + meta_nodes: 1, compactor_nodes: 2, compute_node_cores: 2, ..Default::default() @@ -285,7 +277,7 @@ metrics_level = "Disabled" // in a different process. frontend_nodes: 1, compute_nodes: 3, - meta_nodes: 3, + meta_nodes: 1, compactor_nodes: 2, compute_node_cores: 2, ..Default::default() @@ -323,7 +315,6 @@ metrics_level = "Disabled" /// | frontend-x | 192.168.2.x | /// | compute-x | 192.168.3.x | /// | compactor-x | 192.168.4.x | -/// | etcd | 192.168.10.1 | /// | kafka-broker | 192.168.11.1 | /// | kafka-producer | 192.168.11.2 | /// | object_store_sim | 192.168.12.1 | @@ -350,16 +341,12 @@ impl Cluster { println!("seed = {}", handle.seed()); println!("{:#?}", conf); - if conf.sqlite_data_dir.is_some() && conf.etcd_data_path.is_some() { - bail!("sqlite_data_dir and etcd_data_path cannot be set at the same time"); - } + // TODO: support mutil meta nodes + assert_eq!(conf.meta_nodes, 1); // setup DNS and load balance let net = madsim::net::NetSim::current(); for i in 1..=conf.meta_nodes { - if conf.sqlite_data_dir.is_none() { - net.add_dns_record("etcd", "192.168.10.1".parse().unwrap()); - } net.add_dns_record( &format!("meta-{i}"), format!("192.168.1.{i}").parse().unwrap(), @@ -379,28 +366,6 @@ impl Cluster { ) } - // etcd node - if conf.sqlite_data_dir.is_none() { - let etcd_data = conf - .etcd_data_path - .as_ref() - .map(|path| std::fs::read_to_string(path).unwrap()); - handle - .create_node() - .name("etcd") - .ip("192.168.10.1".parse().unwrap()) - .init(move || { - let addr = "0.0.0.0:2388".parse().unwrap(); - let mut builder = - etcd_client::SimServer::builder().timeout_rate(conf.etcd_timeout_rate); - if let Some(data) = &etcd_data { - builder = builder.load(data.clone()); - } - builder.serve(addr) - }) - .build(); - } - // kafka broker handle .create_node() @@ -434,17 +399,26 @@ impl Cluster { } std::env::set_var("RW_META_ADDR", meta_addrs.join(",")); - let mut sql_endpoint = String::new(); - let mut backend_args = if let Some(sqlite_data_dir) = conf.sqlite_data_dir.as_ref() { - sql_endpoint = format!( - "sqlite://{}stest-{}.sqlite?mode=rwc", + // FIXME: some tests like integration tests will run concurrently, + // resulting in connecting to the same sqlite file if they're using the same seed. + let file_path = if let Some(sqlite_data_dir) = conf.sqlite_data_dir.as_ref() { + format!( + "{}/stest-{}-{}.sqlite", sqlite_data_dir.display(), + handle.seed(), Uuid::new_v4() - ); - vec!["--backend", "sql", "--sql-endpoint", &sql_endpoint] + ) } else { - vec!["--backend", "etcd", "--etcd-endpoints", "etcd:2388"] + format!("./stest-{}-{}.sqlite", handle.seed(), Uuid::new_v4()) }; + if std::fs::exists(&file_path).unwrap() { + panic!( + "sqlite file already exists and used by other cluster: {}", + file_path + ) + } + let sql_endpoint = format!("sqlite://{}?mode=rwc", file_path); + let backend_args = vec!["--backend", "sql", "--sql-endpoint", &sql_endpoint]; // FIXME(kwannoel): // Currently we just use the on-disk version, @@ -914,6 +888,33 @@ impl Cluster { } } +#[cfg_or_panic(madsim)] +impl Drop for Cluster { + fn drop(&mut self) { + // FIXME: remove it when deprecate the on-disk version. 
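+        // Looks up the sqlite file created for this cluster when it was started
+        // (named `stest-<seed>-<uuid>.sqlite`, placed in `sqlite_data_dir` or `.` by
+        // default) and deletes it so repeated runs don't leave backend files behind.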
+ let default_path = PathBuf::from("."); + let sqlite_data_dir = self + .config + .sqlite_data_dir + .as_ref() + .unwrap_or_else(|| &default_path); + for entry in std::fs::read_dir(sqlite_data_dir).unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + if path + .file_name() + .unwrap() + .to_str() + .unwrap() + .starts_with(&format!("stest-{}-", self.handle.seed())) + { + std::fs::remove_file(path).unwrap(); + break; + } + } + } +} + type SessionRequest = ( String, // query sql oneshot::Sender>, // channel to send result back @@ -960,7 +961,7 @@ impl KillOpts { /// Killing all kind of nodes. pub const ALL: Self = KillOpts { kill_rate: 1.0, - kill_meta: true, + kill_meta: false, // FIXME: make it true when multiple meta nodes are supported kill_frontend: true, kill_compute: true, kill_compactor: true, @@ -968,7 +969,7 @@ impl KillOpts { }; pub const ALL_FAST: Self = KillOpts { kill_rate: 1.0, - kill_meta: true, + kill_meta: false, // FIXME: make it true when multiple meta nodes are supported kill_frontend: true, kill_compute: true, kill_compactor: true, diff --git a/src/tests/simulation/src/main.rs b/src/tests/simulation/src/main.rs index 102db8ccc1c41..6ef89afc5eb93 100644 --- a/src/tests/simulation/src/main.rs +++ b/src/tests/simulation/src/main.rs @@ -47,7 +47,7 @@ pub struct Args { compactor_nodes: usize, /// The number of meta nodes. - #[clap(long, default_value = "3")] + #[clap(long, default_value = "1")] meta_nodes: usize, /// The number of CPU cores for each compute node. @@ -63,10 +63,6 @@ pub struct Args { #[clap(short, long)] jobs: Option, - /// The probability of etcd request timeout. - #[clap(long, default_value = "0.0")] - etcd_timeout_rate: f32, - /// Allow to kill all risingwave node. #[clap(long)] kill: bool, @@ -128,14 +124,6 @@ pub struct Args { #[clap(long)] run_differential_tests: bool, - /// Load etcd data from toml file. - #[clap(long)] - etcd_data: Option, - - /// Dump etcd data into toml file before exit. - #[clap(long)] - etcd_dump: Option, - /// dir to store sqlite backend data of meta node #[clap(long)] sqlite_data_dir: Option, @@ -178,8 +166,6 @@ async fn main() { compactor_nodes: args.compactor_nodes, compute_node_cores: args.compute_node_cores, meta_nodes: args.meta_nodes, - etcd_timeout_rate: args.etcd_timeout_rate, - etcd_data_path: args.etcd_data, sqlite_data_dir: args.sqlite_data_dir, per_session_queries: if args.use_arrangement_backfill { vec!["SET STREAMING_USE_ARRANGEMENT_BACKFILL = true;".to_string()].into() @@ -189,7 +175,7 @@ async fn main() { ..Default::default() }; let kill_opts = KillOpts { - kill_meta: args.kill_meta || args.kill, + kill_meta: false, kill_frontend: args.kill_frontend || args.kill, kill_compute: args.kill_compute || args.kill, kill_compactor: args.kill_compactor || args.kill, @@ -274,18 +260,6 @@ async fn main() { }) .await; - if let Some(path) = args.etcd_dump { - cluster - .run_on_client(async move { - let mut client = etcd_client::Client::connect(["192.168.10.1:2388"], None) - .await - .unwrap(); - let dump = client.dump().await.unwrap(); - std::fs::write(path, dump).unwrap(); - }) - .await; - } - if args.e2e_extended_test { cluster .run_on_client(async move { diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs index 7ac5a7b27d70b..7bf9d62d19649 100644 --- a/src/tests/simulation/src/slt.rs +++ b/src/tests/simulation/src/slt.rs @@ -408,6 +408,9 @@ pub async fn run_slt_task( } | SqlCmd::CreateMaterializedView { .. 
} if i != 0 + // It should not be a gRPC request to meta error, + // otherwise it means that the catalog is not yet populated to fe. + && !e.to_string().contains("gRPC request to meta service failed") && e.to_string().contains("exists") && e.to_string().contains("Catalog error") => { diff --git a/src/tests/simulation/tests/integration_tests/batch/mod.rs b/src/tests/simulation/tests/integration_tests/batch/mod.rs index 1ee5132884a2a..747a88ef3dc48 100644 --- a/src/tests/simulation/tests/integration_tests/batch/mod.rs +++ b/src/tests/simulation/tests/integration_tests/batch/mod.rs @@ -78,7 +78,7 @@ mask_worker_temporary_secs = 30 config_path: ConfigPath::Temp(config_path.into()), frontend_nodes: 2, compute_nodes: 0, - meta_nodes: 3, + meta_nodes: 1, compactor_nodes: 2, compute_node_cores: 2, ..Default::default() diff --git a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs index f47f05c4f0c8e..9cc9a6a9ab716 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/background_ddl.rs @@ -320,7 +320,7 @@ async fn test_high_barrier_latency_cancel(config: Configuration) -> Result<()> { tracing::info!(progress, "get progress before cancel stream job"); let progress = progress.replace('%', ""); let progress = progress.parse::().unwrap(); - if progress > 0.01 { + if progress >= 0.01 { break; } else { sleep(Duration::from_micros(1)).await; diff --git a/src/tests/simulation/tests/integration_tests/scale/shared_source.rs b/src/tests/simulation/tests/integration_tests/scale/shared_source.rs index 832909b404f0a..76aaae6a716a8 100644 --- a/src/tests/simulation/tests/integration_tests/scale/shared_source.rs +++ b/src/tests/simulation/tests/integration_tests/scale/shared_source.rs @@ -112,13 +112,13 @@ async fn test_shared_source() -> Result<()> { .collect_vec(); validate_splits_aligned(&mut cluster).await?; expect_test::expect![[r#" - 1 1 HASH {2} {} {SOURCE} 6 256 - 2 3 HASH {4,3} {3} {MVIEW} 6 256 - 3 3 HASH {5} {1} {SOURCE_SCAN} 6 256"#]] + 1 6 HASH {7} {} {SOURCE} 6 256 + 2 8 HASH {9,8} {3} {MVIEW} 6 256 + 3 8 HASH {10} {1} {SOURCE_SCAN} 6 256"#]] .assert_eq(&cluster.run("select * from rw_fragments;").await?); expect_test::expect![[r#" - 1 CREATED ADAPTIVE 256 - 3 CREATED ADAPTIVE 256"#]] + 6 CREATED ADAPTIVE 256 + 8 CREATED ADAPTIVE 256"#]] .assert_eq(&cluster.run("select * from rw_table_fragments;").await?); // SourceBackfill cannot be scaled because of NoShuffle. @@ -137,9 +137,9 @@ async fn test_shared_source() -> Result<()> { .await .unwrap(); expect_test::expect![[r#" - 1 1 HASH {2} {} {SOURCE} 6 256 - 2 3 HASH {4,3} {3} {MVIEW} 5 256 - 3 3 HASH {5} {1} {SOURCE_SCAN} 6 256"#]] + 1 6 HASH {7} {} {SOURCE} 6 256 + 2 8 HASH {9,8} {3} {MVIEW} 5 256 + 3 8 HASH {10} {1} {SOURCE_SCAN} 6 256"#]] .assert_eq(&cluster.run("select * from rw_fragments;").await?); // source is the NoShuffle upstream. It can be scaled, and the downstream SourceBackfill will be scaled together. 
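    // Note: the ids in these snapshots appear to depend on how the meta backend
    // allocates catalog/fragment ids; if they drift again, the `expect_test`
    // snapshots can be refreshed by rerunning the test with `UPDATE_EXPECT=1` set.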
@@ -156,13 +156,13 @@ async fn test_shared_source() -> Result<()> { .unwrap(); validate_splits_aligned(&mut cluster).await?; expect_test::expect![[r#" - 1 1 HASH {2} {} {SOURCE} 3 256 - 2 3 HASH {4,3} {3} {MVIEW} 5 256 - 3 3 HASH {5} {1} {SOURCE_SCAN} 3 256"#]] + 1 6 HASH {7} {} {SOURCE} 3 256 + 2 8 HASH {9,8} {3} {MVIEW} 5 256 + 3 8 HASH {10} {1} {SOURCE_SCAN} 3 256"#]] .assert_eq(&cluster.run("select * from rw_fragments;").await?); expect_test::expect![[r#" - 1 CREATED CUSTOM 256 - 3 CREATED CUSTOM 256"#]] + 6 CREATED CUSTOM 256 + 8 CREATED CUSTOM 256"#]] .assert_eq(&cluster.run("select * from rw_table_fragments;").await?); // resolve_no_shuffle for backfill fragment is OK, which will scale the upstream together. @@ -180,13 +180,13 @@ async fn test_shared_source() -> Result<()> { .unwrap(); validate_splits_aligned(&mut cluster).await?; expect_test::expect![[r#" - 1 1 HASH {2} {} {SOURCE} 7 256 - 2 3 HASH {4,3} {3} {MVIEW} 5 256 - 3 3 HASH {5} {1} {SOURCE_SCAN} 7 256"#]] + 1 6 HASH {7} {} {SOURCE} 7 256 + 2 8 HASH {9,8} {3} {MVIEW} 5 256 + 3 8 HASH {10} {1} {SOURCE_SCAN} 7 256"#]] .assert_eq(&cluster.run("select * from rw_fragments;").await?); expect_test::expect![[r#" -1 CREATED CUSTOM 256 -3 CREATED CUSTOM 256"#]] + 6 CREATED CUSTOM 256 + 8 CREATED CUSTOM 256"#]] .assert_eq(&cluster.run("select * from rw_table_fragments;").await?); Ok(()) } diff --git a/src/tests/simulation/tests/integration_tests/throttle.rs b/src/tests/simulation/tests/integration_tests/throttle.rs index c7bbb06167ee0..b80c3cb04ce24 100644 --- a/src/tests/simulation/tests/integration_tests/throttle.rs +++ b/src/tests/simulation/tests/integration_tests/throttle.rs @@ -24,18 +24,27 @@ async fn test_throttle_mv() { session.run(SET_PARALLELISM).await.unwrap(); session - .run("create table t1 (id int, val varchar, primary key(id))") // table_id: 1 + .run("create table t1 (id int, val varchar, primary key(id))") .await .unwrap(); session - .run("create materialized view mv1 as select * from t1") // table_id: 2 + .run("create materialized view mv1 as select * from t1") .await .unwrap(); + let res: String = session + .run("select id from rw_materialized_views where name = 'mv1'") + .await + .unwrap(); + let mv_id: u32 = res.parse().unwrap(); + cluster + .throttle_mv(TableId::from(mv_id), Some(200)) + .await + .unwrap(); cluster - .throttle_mv(TableId::from(2), Some(200)) + .throttle_mv(TableId::from(mv_id), None) .await .unwrap(); - cluster.throttle_mv(TableId::from(2), None).await.unwrap(); + session.run("drop table t1 cascade").await.unwrap(); } diff --git a/src/tests/simulation/tests/integration_tests/utils.rs b/src/tests/simulation/tests/integration_tests/utils.rs index 8f06d0acbea2f..0162a40d4e3be 100644 --- a/src/tests/simulation/tests/integration_tests/utils.rs +++ b/src/tests/simulation/tests/integration_tests/utils.rs @@ -26,17 +26,7 @@ pub(crate) async fn kill_cn_and_wait_recover(cluster: &Cluster) { pub(crate) async fn kill_cn_and_meta_and_wait_recover(cluster: &Cluster) { cluster - .kill_nodes( - [ - "compute-1", - "compute-2", - "compute-3", - "meta-1", - "meta-2", - "meta-3", - ], - 0, - ) + .kill_nodes(["compute-1", "compute-2", "compute-3", "meta-1"], 0) .await; sleep(Duration::from_secs(10)).await; } diff --git a/src/tests/sqlsmith/scripts/gen_queries.sh b/src/tests/sqlsmith/scripts/gen_queries.sh index 7a40881021c9c..e7d006f227308 100755 --- a/src/tests/sqlsmith/scripts/gen_queries.sh +++ b/src/tests/sqlsmith/scripts/gen_queries.sh @@ -257,7 +257,7 @@ check_failed_to_generate_queries() { # Otherwise don't 
update this batch of queries yet. run_queries_timed() { echo "" > $LOGDIR/run_deterministic.stdout.log - timeout "$TIME_BOUND" seq $E2E_TEST_NUM | parallel "MADSIM_TEST_SEED={} \ + timeout "$TIME_BOUND" seq $E2E_TEST_NUM | parallel "\ timeout 6m $MADSIM_BIN --run-sqlsmith-queries $OUTDIR/{} \ 1>>$LOGDIR/run_deterministic.stdout.log \ 2>$LOGDIR/fuzzing-{}.log \ @@ -268,7 +268,7 @@ run_queries_timed() { run_queries() { set +e echo "" > $LOGDIR/run_deterministic.stdout.log - seq $TEST_NUM | parallel "MADSIM_TEST_SEED={} \ + seq $TEST_NUM | parallel "\ timeout 15m $MADSIM_BIN --run-sqlsmith-queries $OUTDIR/{} \ 1>>$LOGDIR/run_deterministic.stdout.log \ 2>$LOGDIR/fuzzing-{}.log \