Skip to content

Commit

Permalink
feat(bench): Add sink bench tool (#14064)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Jan 16, 2024
1 parent b1cdb98 commit 84c9a08
Show file tree
Hide file tree
Showing 12 changed files with 832 additions and 14 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

public class CassandraSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
private static final Integer MAX_BATCH_SIZE = 1024 * 16;

private final CqlSession session;
private final List<SinkRow> updateRowCache = new ArrayList<>(1);
private final HashMap<String, PreparedStatement> stmtMap;
Expand Down Expand Up @@ -122,6 +124,7 @@ private void write_append_only(Iterator<SinkRow> rows) {
.withDescription("Unknown operation: " + op)
.asRuntimeException();
}
tryCommit();
}
}

Expand Down Expand Up @@ -155,6 +158,13 @@ private void write_upsert(Iterator<SinkRow> rows) {
.withDescription("Unknown operation: " + op)
.asRuntimeException();
}
tryCommit();
}
}

private void tryCommit() {
if (batchBuilder.getStatementsCount() >= MAX_BATCH_SIZE) {
sync();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public BulkListener(RequestTracker requestTracker) {
/** This method is called just before bulk is executed. */
@Override
public void beforeBulk(long executionId, BulkRequest request) {
LOG.info("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
LOG.debug("Sending bulk of {} actions to Elasticsearch.", request.numberOfActions());
}

/** This method is called after bulk execution. */
Expand All @@ -262,7 +262,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon
this.requestTracker.addErrResult(errMessage);
} else {
this.requestTracker.addOkResult(request.numberOfActions());
LOG.info("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
LOG.debug("Sent bulk of {} actions to Elasticsearch.", request.numberOfActions());
}
}

Expand Down
9 changes: 9 additions & 0 deletions src/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bytes = "1"
bytesize = { version = "1", features = ["serde"] }
clap = { version = "4", features = ["derive"] }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
futures-async-stream = { workspace = true }
hdrhistogram = "7"
itertools = "0.12"
libc = "0.2"
Expand All @@ -26,9 +27,13 @@ parking_lot = "0.12"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8"
risingwave_common = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rt = { workspace = true, optional = true }
risingwave_storage = { workspace = true }
risingwave_stream = { workspace = true }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"fs",
"rt",
Expand All @@ -53,6 +58,10 @@ nix = { version = "0.27", features = ["fs", "mman"] }
name = "s3-bench"
path = "s3_bench/main.rs"

[[bin]]
name = "sink-bench"
path = "sink_bench/main.rs"

[features]
bpf = ["bcc", "risingwave_storage/bpf"]
trace = ["opentelemetry", "risingwave_rt", "tracing/release_max_level_trace"]
Expand Down
Loading

0 comments on commit 84c9a08

Please sign in to comment.