Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bench): Add sink bench tool #14064

Merged
merged 16 commits into from
Jan 16, 2024
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

public class CassandraSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
private static final Integer MAX_BATCH_SIZE = 1024 * 16;

private final CqlSession session;
private final List<SinkRow> updateRowCache = new ArrayList<>(1);
private final HashMap<String, PreparedStatement> stmtMap;
Expand Down Expand Up @@ -122,6 +124,7 @@ private void write_append_only(Iterator<SinkRow> rows) {
.withDescription("Unknown operation: " + op)
.asRuntimeException();
}
tryCommit();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This optimization of cassandra sink can be put in a separate PR and we can paste improvement gained from the optimization in the PR description.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bug, fixed in this pr by the way, the exact description has been added to the pr description

}
}

Expand Down Expand Up @@ -155,6 +158,13 @@ private void write_upsert(Iterator<SinkRow> rows) {
.withDescription("Unknown operation: " + op)
.asRuntimeException();
}
tryCommit();
}
}

private void tryCommit() {
if (batchBuilder.getStatementsCount() >= MAX_BATCH_SIZE) {
sync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public BulkListener(RequestTracker requestTracker) {
/** This method is called just before bulk is executed. */
@Override
public void beforeBulk(long executionId, BulkRequest request) {
LOG.info("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
}

/** This method is called after bulk execution. */
Expand All @@ -282,7 +282,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
this.requestTracker.addErrResult(errMessage);
} else {
this.requestTracker.addOkResult(request.numberOfActions());
LOG.info("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
}
}

Expand Down
9 changes: 9 additions & 0 deletions src/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bytes = "1"
bytesize = { version = "1", features = ["serde"] }
clap = { version = "4", features = ["derive"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
hdrhistogram = "7"
itertools = "0.12"
libc = "0.2"
Expand All @@ -26,9 +27,13 @@ parking_lot = "0.12"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8"
risingwave_common = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rt = { workspace = true, optional = true }
risingwave_storage = { workspace = true }
risingwave_stream = { workspace = true }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
"rt",
Expand All @@ -53,6 +58,10 @@ nix = { version = "0.27", features = ["fs", "mman"] }
name = "s3-bench"
path = "s3_bench/main.rs"

[[bin]]
name = "sink-bench"
path = "sink_bench/main.rs"

[features]
bpf = ["bcc", "risingwave_storage/bpf"]
trace = ["opentelemetry", "risingwave_rt", "tracing/release_max_level_trace"]
Expand Down
Loading
Loading