Skip to content

Commit

Permalink
feat(sink): reimplement sink coordinator worker and support scale
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 9, 2024
1 parent e2c89f4 commit 6ee4106
Show file tree
Hide file tree
Showing 8 changed files with 763 additions and 397 deletions.
6 changes: 6 additions & 0 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,15 @@ message CoordinateRequest {
SinkMetadata metadata = 2;
}

message UpdateVnodeBitmapRequest {
common.Buffer vnode_bitmap = 1;
}

oneof msg {
StartCoordinationRequest start_request = 1;
CommitRequest commit_request = 2;
UpdateVnodeBitmapRequest update_vnode_request = 3;
bool stop = 4;
}
}

Expand Down
19 changes: 19 additions & 0 deletions src/connector/src/sink/coordinate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
use std::sync::Arc;

use anyhow::anyhow;
use futures::FutureExt;
use risingwave_common::array::StreamChunk;
use risingwave_common::bitmap::Bitmap;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_rpc_client::CoordinatorStreamHandle;
use thiserror_ext::AsReport;
use tracing::warn;

use super::SinkCoordinationRpcClientEnum;
Expand Down Expand Up @@ -81,6 +83,23 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> SinkWriter for Coordi
}

async fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) -> Result<()> {
self.coordinator_stream_handle
.update_vnode_bitmap(&vnode_bitmap)
.await?;
self.inner.update_vnode_bitmap(vnode_bitmap).await
}
}

impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> Drop for CoordinatedSinkWriter<W> {
fn drop(&mut self) {
match self.coordinator_stream_handle.stop().now_or_never() {
None => {
warn!("unable to send stop due to channel full")
}
Some(Err(e)) => {
warn!(e = ?e.as_report(), "failed to stop the coordinator");
}
Some(Ok(_)) => {}
}
}
}
43 changes: 34 additions & 9 deletions src/connector/src/sink/decouple_checkpoint_log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
EpochBegun { curr_epoch: u64 },

/// Mark that the consumer has just received a barrier
BarrierReceived { prev_epoch: u64 },
BarrierReceived { prev_epoch: u64, committed: bool },
}

let mut state = LogConsumerState::Uninitialized;
Expand All @@ -75,15 +75,34 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink

loop {
let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
if let LogStoreReadItem::UpdateVnodeBitmap(_) = &item {
match &state {
LogConsumerState::BarrierReceived { .. } => {}
if let LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap) = &item {
match &mut state {
LogConsumerState::BarrierReceived {
committed,
prev_epoch,
} => {
if !*committed {
// force commit on update vnode bitmap
let start_time = Instant::now();
sink_writer.barrier(true).await?;
sink_metrics
.sink_commit_duration_metrics
.observe(start_time.elapsed().as_millis() as f64);
log_reader.truncate(TruncateOffset::Barrier { epoch: *prev_epoch })?;
current_checkpoint = 0;
*committed = true;
}
sink_writer
.update_vnode_bitmap(vnode_bitmap.clone())
.await?;
}
_ => unreachable!(
"update vnode bitmap can be accepted only right after \
barrier, but current state is {:?}",
state
),
}
continue;
}
// begin_epoch when not previously began
state = match state {
Expand All @@ -100,7 +119,7 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
);
LogConsumerState::EpochBegun { curr_epoch: epoch }
}
LogConsumerState::BarrierReceived { prev_epoch } => {
LogConsumerState::BarrierReceived { prev_epoch, .. } => {
assert!(
epoch > prev_epoch,
"new epoch {} should be greater than prev epoch {}",
Expand All @@ -123,7 +142,7 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
_ => unreachable!("epoch must have begun before handling barrier"),
};
if is_checkpoint {
let committed = if is_checkpoint {
current_checkpoint += 1;
if current_checkpoint >= commit_checkpoint_interval.get() {
let start_time = Instant::now();
Expand All @@ -133,16 +152,22 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
.observe(start_time.elapsed().as_millis() as f64);
log_reader.truncate(TruncateOffset::Barrier { epoch })?;
current_checkpoint = 0;
true
} else {
sink_writer.barrier(false).await?;
false
}
} else {
sink_writer.barrier(false).await?;
false
};
state = LogConsumerState::BarrierReceived {
prev_epoch,
committed,
}
state = LogConsumerState::BarrierReceived { prev_epoch }
}
LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap) => {
sink_writer.update_vnode_bitmap(vnode_bitmap).await?;
LogStoreReadItem::UpdateVnodeBitmap(_) => {
unreachable!("should have been handle earlier")
}
}
}
Expand Down
Loading

0 comments on commit 6ee4106

Please sign in to comment.