From 6ee4106526e60d15d2fa49fc79f33c2138bbcead Mon Sep 17 00:00:00 2001
From: William Wen
Date: Mon, 9 Sep 2024 18:33:23 +0800
Subject: [PATCH] feat(sink): reimplement sink coordinator worker and support scale

---
 proto/connector_service.proto                 |   6 +
 src/connector/src/sink/coordinate.rs          |  19 +
 .../src/sink/decouple_checkpoint_log_sink.rs  |  43 +-
 .../sink_coordination/coordinator_worker.rs   | 458 ++++++++---------
 .../src/manager/sink_coordination/handle.rs   | 139 ++++++
 .../src/manager/sink_coordination/manager.rs  | 459 +++++++++++++-----
 src/meta/src/manager/sink_coordination/mod.rs |  14 +-
 src/rpc_client/src/sink_coordinate_client.rs  |  22 +-
 8 files changed, 763 insertions(+), 397 deletions(-)
 create mode 100644 src/meta/src/manager/sink_coordination/handle.rs

diff --git a/proto/connector_service.proto b/proto/connector_service.proto
index 964d227452548..99d9c58d4f1ed 100644
--- a/proto/connector_service.proto
+++ b/proto/connector_service.proto
@@ -229,9 +229,15 @@ message CoordinateRequest {
     SinkMetadata metadata = 2;
   }
 
+  message UpdateVnodeBitmapRequest {
+    common.Buffer vnode_bitmap = 1;
+  }
+
   oneof msg {
     StartCoordinationRequest start_request = 1;
     CommitRequest commit_request = 2;
+    UpdateVnodeBitmapRequest update_vnode_request = 3;
+    bool stop = 4;
   }
 }
diff --git a/src/connector/src/sink/coordinate.rs b/src/connector/src/sink/coordinate.rs
index c069167870101..fcfb8c0877d6b 100644
--- a/src/connector/src/sink/coordinate.rs
+++ b/src/connector/src/sink/coordinate.rs
@@ -15,10 +15,12 @@
 use std::sync::Arc;
 
 use anyhow::anyhow;
+use futures::FutureExt;
 use risingwave_common::array::StreamChunk;
 use risingwave_common::bitmap::Bitmap;
 use risingwave_pb::connector_service::SinkMetadata;
 use risingwave_rpc_client::CoordinatorStreamHandle;
+use thiserror_ext::AsReport;
 use tracing::warn;
 
 use super::SinkCoordinationRpcClientEnum;
@@ -81,6 +83,23 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> SinkWriter for Coordi
     }
 
     async fn update_vnode_bitmap(&mut self, vnode_bitmap: Arc<Bitmap>) -> Result<()> {
+        self.coordinator_stream_handle
+            .update_vnode_bitmap(&vnode_bitmap)
+            .await?;
         self.inner.update_vnode_bitmap(vnode_bitmap).await
     }
 }
+
+impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> Drop for CoordinatedSinkWriter<W> {
+    fn drop(&mut self) {
+        match self.coordinator_stream_handle.stop().now_or_never() {
+            None => {
+                warn!("unable to send stop due to channel full")
+            }
+            Some(Err(e)) => {
+                warn!(e = ?e.as_report(), "failed to stop the coordinator");
+            }
+            Some(Ok(_)) => {}
+        }
+    }
+}
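Note on the `Drop` impl above: `drop` is synchronous, so the writer cannot `.await` the stop request; `now_or_never` polls the future exactly once and otherwise gives up, which is why the failure path is only a `warn!`. A minimal standalone sketch of the pattern — the channel and message here are illustrative stand-ins, not the actual `CoordinatorStreamHandle` internals:

    use futures::FutureExt;

    struct StopOnDrop {
        tx: tokio::sync::mpsc::UnboundedSender<&'static str>,
    }

    impl Drop for StopOnDrop {
        fn drop(&mut self) {
            // `drop` cannot be async; poll the send future exactly once.
            // A send on an unbounded channel resolves immediately, so `None`
            // means the future could not complete synchronously.
            match async { self.tx.send("stop") }.now_or_never() {
                None => eprintln!("stop signal could not be sent synchronously"),
                Some(Err(_)) => eprintln!("coordinator side already gone"),
                Some(Ok(())) => {}
            }
        }
    }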
diff --git a/src/connector/src/sink/decouple_checkpoint_log_sink.rs b/src/connector/src/sink/decouple_checkpoint_log_sink.rs
index 4ba57e3adda7a..61a2f0f70fd05 100644
--- a/src/connector/src/sink/decouple_checkpoint_log_sink.rs
+++ b/src/connector/src/sink/decouple_checkpoint_log_sink.rs
@@ -65,7 +65,7 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
         EpochBegun { curr_epoch: u64 },
 
         /// Mark that the consumer has just received a barrier
-        BarrierReceived { prev_epoch: u64 },
+        BarrierReceived { prev_epoch: u64, committed: bool },
     }
 
     let mut state = LogConsumerState::Uninitialized;
@@ -75,15 +75,34 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
 
     loop {
         let (epoch, item): (u64, LogStoreReadItem) = log_reader.next_item().await?;
-        if let LogStoreReadItem::UpdateVnodeBitmap(_) = &item {
-            match &state {
-                LogConsumerState::BarrierReceived { .. } => {}
+        if let LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap) = &item {
+            match &mut state {
+                LogConsumerState::BarrierReceived {
+                    committed,
+                    prev_epoch,
+                } => {
+                    if !*committed {
+                        // force commit on update vnode bitmap
+                        let start_time = Instant::now();
+                        sink_writer.barrier(true).await?;
+                        sink_metrics
+                            .sink_commit_duration_metrics
+                            .observe(start_time.elapsed().as_millis() as f64);
+                        log_reader.truncate(TruncateOffset::Barrier { epoch: *prev_epoch })?;
+                        current_checkpoint = 0;
+                        *committed = true;
+                    }
+                    sink_writer
+                        .update_vnode_bitmap(vnode_bitmap.clone())
+                        .await?;
+                }
                 _ => unreachable!(
                     "update vnode bitmap can be accepted only right after \
                     barrier, but current state is {:?}",
                     state
                 ),
             }
+            continue;
         }
         // begin_epoch when not previously began
         state = match state {
@@ -100,7 +119,7 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
                 );
                 LogConsumerState::EpochBegun { curr_epoch: epoch }
             }
-            LogConsumerState::BarrierReceived { prev_epoch } => {
+            LogConsumerState::BarrierReceived { prev_epoch, .. } => {
                 assert!(
                     epoch > prev_epoch,
                     "new epoch {} should be greater than prev epoch {}",
@@ -123,7 +142,7 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
                     LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
                     _ => unreachable!("epoch must have begun before handling barrier"),
                 };
-                if is_checkpoint {
+                let committed = if is_checkpoint {
                     current_checkpoint += 1;
                     if current_checkpoint >= commit_checkpoint_interval.get() {
                         let start_time = Instant::now();
@@ -133,16 +152,22 @@ impl<W: SinkWriter<CommitMetadata = ()>> LogSinker for DecoupleCheckpointLogSink
                             .observe(start_time.elapsed().as_millis() as f64);
                         log_reader.truncate(TruncateOffset::Barrier { epoch })?;
                         current_checkpoint = 0;
+                        true
                     } else {
                         sink_writer.barrier(false).await?;
+                        false
                     }
                 } else {
                     sink_writer.barrier(false).await?;
+                    false
+                };
+                state = LogConsumerState::BarrierReceived {
+                    prev_epoch,
+                    committed,
                 }
-                state = LogConsumerState::BarrierReceived { prev_epoch }
             }
-            LogStoreReadItem::UpdateVnodeBitmap(vnode_bitmap) => {
-                sink_writer.update_vnode_bitmap(vnode_bitmap).await?;
+            LogStoreReadItem::UpdateVnodeBitmap(_) => {
+                unreachable!("should have been handled earlier")
             }
         }
     }
 }
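The `committed` flag added to `BarrierReceived` above enforces one invariant: a vnode bitmap update may only be applied on top of state the sink has durably committed, so an uncommitted epoch is force-committed (and the log truncated) before the writer is rescaled. A condensed, hypothetical simplification of just that decision, with the sink and log-reader calls stubbed out as comments:

    #[derive(Debug)]
    enum State {
        Uninitialized,
        EpochBegun { curr_epoch: u64 },
        BarrierReceived { prev_epoch: u64, committed: bool },
    }

    // What happens when the log store hands the consumer a new vnode bitmap.
    fn on_update_vnode_bitmap(state: &mut State) -> Result<(), String> {
        match state {
            State::BarrierReceived { committed, .. } => {
                if !*committed {
                    // In the real code: sink_writer.barrier(true), observe the
                    // commit duration, truncate the log at prev_epoch, and
                    // reset the checkpoint counter.
                    *committed = true;
                }
                Ok(()) // safe to apply the new bitmap now
            }
            other => Err(format!("bitmap update outside a barrier: {:?}", other)),
        }
    }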
diff --git a/src/meta/src/manager/sink_coordination/coordinator_worker.rs b/src/meta/src/manager/sink_coordination/coordinator_worker.rs
index 8409e714852c2..63ca41b43f649 100644
--- a/src/meta/src/manager/sink_coordination/coordinator_worker.rs
+++ b/src/meta/src/manager/sink_coordination/coordinator_worker.rs
@@ -12,64 +12,191 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::HashSet;
+use std::collections::{BTreeMap, HashMap, HashSet};
+use std::future::{poll_fn, Future};
 use std::pin::pin;
+use std::task::Poll;
+use std::time::{Duration, Instant};
 
 use anyhow::anyhow;
 use futures::future::{select, Either};
-use futures::stream::FuturesUnordered;
-use futures::{StreamExt, TryStreamExt};
+use futures::pin_mut;
+use itertools::Itertools;
 use risingwave_common::bitmap::Bitmap;
-use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
+use risingwave_common::hash::VirtualNode;
 use risingwave_connector::dispatch_sink;
 use risingwave_connector::sink::{build_sink, Sink, SinkCommitCoordinator, SinkParam};
-use risingwave_pb::connector_service::coordinate_request::CommitRequest;
-use risingwave_pb::connector_service::coordinate_response::{
-    CommitResponse, StartCoordinationResponse,
-};
-use risingwave_pb::connector_service::{
-    coordinate_request, coordinate_response, CoordinateRequest, CoordinateResponse, SinkMetadata,
-};
+use risingwave_pb::connector_service::SinkMetadata;
 use thiserror_ext::AsReport;
+use tokio::select;
 use tokio::sync::mpsc::UnboundedReceiver;
+use tokio::time::sleep;
 use tonic::Status;
 use tracing::{error, warn};
 
-use crate::manager::sink_coordination::{
-    NewSinkWriterRequest, SinkCoordinatorResponseSender, SinkWriterRequestStream,
-};
+use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
 
-macro_rules! send_await_with_err_check {
-    ($tx:expr, $msg:expr) => {
-        if $tx.send($msg).await.is_err() {
-            error!("unable to send msg");
+async fn run_future_with_periodic_fn<F: Future>(
+    future: F,
+    interval: Duration,
+    mut f: impl FnMut(),
+) -> F::Output {
+    pin_mut!(future);
+    loop {
+        match select(&mut future, pin!(sleep(interval))).await {
+            Either::Left((output, _)) => {
+                break output;
+            }
+            Either::Right(_) => f(),
         }
-    };
+    }
 }
 
-pub struct CoordinatorWorker {
+struct EpochCommitRequests {
+    epoch: u64,
+    metadatas: Vec<SinkMetadata>,
+    handle_ids: HashSet<usize>,
+    bitmap: Bitmap,
+}
+
+impl EpochCommitRequests {
+    fn new(epoch: u64) -> Self {
+        Self {
+            epoch,
+            metadatas: vec![],
+            handle_ids: Default::default(),
+            bitmap: Bitmap::zeros(VirtualNode::COUNT),
+        }
+    }
+
+    fn add_new_request(
+        &mut self,
+        handle_id: usize,
+        metadata: SinkMetadata,
+        vnode_bitmap: Bitmap,
+    ) -> anyhow::Result<()> {
+        self.metadatas.push(metadata);
+        assert!(self.handle_ids.insert(handle_id));
+        let check_bitmap = (&self.bitmap) & &vnode_bitmap;
+        if check_bitmap.count_ones() > 0 {
+            return Err(anyhow!(
+                "duplicate vnode {:?} on epoch {}. request vnode: {:?}, prev vnode: {:?}",
+                check_bitmap.iter_ones().collect_vec(),
+                self.epoch,
+                vnode_bitmap,
+                self.bitmap
+            ));
+        }
+        self.bitmap |= &vnode_bitmap;
+        Ok(())
+    }
+
+    fn can_commit(&self) -> bool {
+        self.bitmap.count_ones() == VirtualNode::COUNT
+    }
+}
+
+struct CoordinationHandleManager {
     param: SinkParam,
-    request_streams: Vec<SinkWriterRequestStream>,
-    response_senders: Vec<SinkCoordinatorResponseSender>,
-    request_rx: UnboundedReceiver<NewSinkWriterRequest>,
+    writer_handles: HashMap<usize, SinkWriterCoordinationHandle>,
+    next_handle_id: usize,
+    request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
+}
+
+impl CoordinationHandleManager {
+    fn ack_commit(
+        &mut self,
+        epoch: u64,
+        handle_ids: impl IntoIterator<Item = usize>,
+    ) -> anyhow::Result<()> {
+        for handle_id in handle_ids {
+            let handle = self.writer_handles.get_mut(&handle_id).ok_or_else(|| {
+                anyhow!(
+                    "fail to find handle for {} when ack commit on epoch {}",
+                    handle_id,
+                    epoch
+                )
+            })?;
+            handle.ack_commit(epoch).map_err(|_| {
+                anyhow!(
+                    "fail to ack commit on epoch {} for handle {}",
+                    epoch,
+                    handle_id
+                )
+            })?;
+        }
+        Ok(())
+    }
+
+    async fn next_commit_request_inner(
+        writer_handles: &mut HashMap<usize, SinkWriterCoordinationHandle>,
+    ) -> anyhow::Result<(usize, Bitmap, u64, SinkMetadata)> {
+        poll_fn(|cx| 'outer: loop {
+            for (handle_id, handle) in writer_handles.iter_mut() {
+                if let Poll::Ready(result) = handle.poll_next_commit_request(cx) {
+                    match result {
+                        Ok(Some((epoch, metadata))) => {
+                            return Poll::Ready(Ok((
+                                *handle_id,
+                                handle.vnode_bitmap().clone(),
+                                epoch,
+                                metadata,
+                            )));
+                        }
+                        Ok(None) => {
+                            let handle_id = *handle_id;
+                            writer_handles.remove(&handle_id);
+                            continue 'outer;
+                        }
+                        Err(e) => {
+                            return Poll::Ready(Err(e));
+                        }
+                    }
+                }
+            }
+            return Poll::Pending;
+        })
+        .await
+    }
+
+    async fn next_commit_request(&mut self) -> anyhow::Result<(usize, Bitmap, u64, SinkMetadata)> {
+        loop {
+            select! {
+                handle = self.request_rx.recv() => {
+                    let mut handle = handle.ok_or_else(|| anyhow!("end of writer request stream"))?;
+                    if handle.param() != &self.param {
+                        warn!(prev_param = ?self.param, new_param = ?handle.param(), "sink param mismatch");
+                    }
+                    handle.start()?;
+                    let handle_id = self.next_handle_id;
+                    self.next_handle_id += 1;
+                    self.writer_handles.insert(handle_id, handle);
+                }
+                result = Self::next_commit_request_inner(&mut self.writer_handles) => {
+                    break result;
+                }
+            }
+        }
+    }
+}
+
+pub struct CoordinatorWorker {
+    handle_manager: CoordinationHandleManager,
+    pending_epochs: BTreeMap<u64, EpochCommitRequests>,
 }
 
 impl CoordinatorWorker {
     pub async fn run(
-        first_writer_request: NewSinkWriterRequest,
-        request_rx: UnboundedReceiver<NewSinkWriterRequest>,
+        param: SinkParam,
+        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
     ) {
-        let sink = match build_sink(first_writer_request.param.clone()) {
+        let sink = match build_sink(param.clone()) {
             Ok(sink) => sink,
             Err(e) => {
                 error!(
                     error = %e.as_report(),
                     "unable to build sink with param {:?}",
-                    first_writer_request.param
-                );
-                send_await_with_err_check!(
-                    first_writer_request.response_tx,
-                    Err(Status::invalid_argument("failed to build sink"))
+                    param
                 );
                 return;
             }
         };
@@ -81,247 +208,76 @@ impl CoordinatorWorker {
                 error!(
                     error = %e.as_report(),
                     "unable to build coordinator with param {:?}",
-                    first_writer_request.param
-                );
-                send_await_with_err_check!(
-                    first_writer_request.response_tx,
-                    Err(Status::invalid_argument("failed to build coordinator"))
+                    param
                 );
                 return;
             }
         };
-        Self::execute_coordinator(first_writer_request, request_rx, coordinator).await
+        Self::execute_coordinator(param, request_rx, coordinator).await
         });
     }
 
     pub async fn execute_coordinator(
-        first_writer_request: NewSinkWriterRequest,
-        request_rx: UnboundedReceiver<NewSinkWriterRequest>,
+        param: SinkParam,
+        request_rx: UnboundedReceiver<SinkWriterCoordinationHandle>,
         coordinator: impl SinkCommitCoordinator,
     ) {
         let mut worker = CoordinatorWorker {
-            param: first_writer_request.param,
-            request_streams: vec![first_writer_request.request_stream],
-            response_senders: vec![first_writer_request.response_tx],
-            request_rx,
+            handle_manager: CoordinationHandleManager {
+                param,
+                writer_handles: HashMap::new(),
+                next_handle_id: 0,
+                request_rx,
+            },
+            pending_epochs: Default::default(),
         };
-        if let Err(e) = worker
-            .wait_for_writers(first_writer_request.vnode_bitmap)
-            .await
-        {
-            error!(error = %e.as_report(), "failed to wait for all writers");
-            worker
-                .send_to_all_sink_writers(|| {
-                    Err(Status::cancelled("failed to wait for all writers"))
-                })
-                .await;
-        }
-
-        worker.start_coordination(coordinator).await;
-    }
-
-    async fn send_to_all_sink_writers(
-        &mut self,
-        new_msg: impl Fn() -> Result<CoordinateResponse, Status>,
-    ) {
-        for sender in &self.response_senders {
-            send_await_with_err_check!(sender, new_msg());
-        }
-    }
-
-    async fn next_new_writer(&mut self) -> anyhow::Result<NewSinkWriterRequest> {
-        // TODO: add timeout log
-        match select(
-            pin!(self.request_rx.recv()),
-            pin!(FuturesUnordered::from_iter(
-                self.request_streams
-                    .iter_mut()
-                    .map(|stream| stream.try_next()),
-            )
-            .next()),
-        )
-        .await
-        {
-            Either::Left((Some(req), _)) => Ok(req),
-            Either::Left((None, _)) => Err(anyhow!("manager request stream reaches the end")),
-            Either::Right((Some(Ok(Some(request))), _)) => Err(anyhow!(
-                "get new request from sink writer before initialize: {:?}",
-                request
-            )),
-            Either::Right((Some(Ok(None)), _)) => Err(anyhow!(
-                "one sink writer stream reaches the end before initialize"
-            )),
-            Either::Right((Some(Err(e)), _)) => {
-                Err(anyhow!(e).context("unable to poll one sink writer stream"))
+        if let Err(e) = worker.run_coordination(coordinator).await {
+            for handle in worker.handle_manager.writer_handles.into_values() {
+                handle.abort(Status::internal(format!(
+                    "failed to run coordination: {:?}",
+                    e.as_report()
+                )))
             }
-            Either::Right((None, _)) => unreachable!("request_streams must not be empty"),
         }
     }
 
-    async fn wait_for_writers(&mut self, first_vnode_bitmap: Bitmap) -> anyhow::Result<()> {
-        let mut remaining_count = VirtualNode::COUNT;
-        let mut registered_vnode = HashSet::with_capacity(VirtualNode::COUNT);
-
-        for vnode in first_vnode_bitmap.iter_vnodes() {
-            remaining_count -= 1;
-            registered_vnode.insert(vnode);
-        }
-
-        while remaining_count > 0 {
-            let new_writer_request = self.next_new_writer().await?;
-            if self.param != new_writer_request.param {
-                // TODO: may return error.
-                warn!(
-                    "get different param {:?} while current param {:?}",
-                    new_writer_request.param, self.param
-                );
-            }
-            self.request_streams.push(new_writer_request.request_stream);
-            self.response_senders.push(new_writer_request.response_tx);
-
-            for vnode in new_writer_request.vnode_bitmap.iter_vnodes() {
-                if registered_vnode.contains(&vnode) {
-                    return Err(anyhow!(
-                        "get overlapped vnode: {}, current vnode {:?}",
-                        vnode,
-                        registered_vnode
-                    ));
-                }
-                registered_vnode.insert(vnode);
-                remaining_count -= 1;
-            }
-        }
-
-        self.send_to_all_sink_writers(|| {
-            Ok(CoordinateResponse {
-                msg: Some(coordinate_response::Msg::StartResponse(
-                    StartCoordinationResponse {},
-                )),
-            })
-        })
-        .await;
-        Ok(())
-    }
-
-    async fn collect_all_metadata(&mut self) -> anyhow::Result<(u64, Vec<SinkMetadata>)> {
-        let mut epoch = None;
-        let mut metadata_list = Vec::with_capacity(self.request_streams.len());
-        let mut uncollected_futures = FuturesUnordered::from_iter(
-            self.request_streams
-                .iter_mut()
-                .map(|stream| stream.try_next()),
-        );
-
+    async fn run_coordination(
+        &mut self,
+        mut coordinator: impl SinkCommitCoordinator,
+    ) -> anyhow::Result<()> {
         loop {
-            match select(
-                pin!(self.request_rx.recv()),
-                pin!(uncollected_futures.next()),
-            )
-            .await
-            {
-                Either::Left((Some(new_request), _)) => {
-                    warn!("get new writer request while collecting metadata");
-                    send_await_with_err_check!(
-                        new_request.response_tx,
-                        Err(Status::already_exists(
-                            "coordinator already running, should not get new request"
-                        ))
-                    );
-                    continue;
-                }
-                Either::Left((None, _)) => {
-                    return Err(anyhow!(
-                        "coordinator get notified to stop while collecting metadata"
-                    ));
-                }
-                Either::Right((Some(next_result), _)) => match next_result {
-                    Ok(Some(CoordinateRequest {
-                        msg:
-                            Some(coordinate_request::Msg::CommitRequest(CommitRequest {
-                                epoch: request_epoch,
-                                metadata: Some(metadata),
-                            })),
-                    })) => {
-                        match &epoch {
-                            Some(epoch) => {
-                                if *epoch != request_epoch {
-                                    warn!(
-                                        "current epoch is {} but get request from {}",
-                                        epoch, request_epoch
-                                    );
-                                }
-                            }
-                            None => {
-                                epoch = Some(request_epoch);
-                            }
-                        }
-                        metadata_list.push(metadata);
-                    }
-                    Ok(Some(req)) => {
-                        return Err(anyhow!("expect commit request but get {:?}", req));
-                    }
-                    Ok(None) => {
-                        return Err(anyhow!(
-                            "sink writer input reaches the end while collecting metadata"
-                        ));
-                    }
-                    Err(e) => {
-                        return Err(
-                            anyhow!(e).context("failed to poll one of the writer request streams")
-                        );
-                    }
-                },
-                Either::Right((None, _)) => {
-                    break;
-                }
-            }
-        }
-        Ok((
-            epoch.expect("should not be empty when have at least one writer"),
-            metadata_list,
-        ))
-    }
-
-    async fn start_coordination(&mut self, mut coordinator: impl SinkCommitCoordinator) {
-        let result: Result<(), String> = try {
-            coordinator.init().await.map_err(|e| {
-                error!(error = %e.as_report(), "failed to initialize coordinator");
-                format!("failed to initialize coordinator: {:?}", e.as_report())
-            })?;
-            loop {
-                let (epoch, metadata_list) = self.collect_all_metadata().await.map_err(|e| {
-                    error!(error = %e.as_report(), "failed to collect all metadata");
-                    format!("failed to collect all metadata: {:?}", e.as_report())
-                })?;
+            let (handle_id, vnode_bitmap, epoch, metadata) =
+                self.handle_manager.next_commit_request().await?;
+            self.pending_epochs
+                .entry(epoch)
+                .or_insert_with(|| EpochCommitRequests::new(epoch))
+                .add_new_request(handle_id, metadata, vnode_bitmap)?;
+            if self
+                .pending_epochs
+                .first_key_value()
+                .expect("non-empty")
+                .1
+                .can_commit()
+            {
+                let (epoch, requests) = self.pending_epochs.pop_first().expect("non-empty");
                 // TODO: measure commit time
-                coordinator
-                    .commit(epoch, metadata_list)
-                    .await
-                    .map_err(|e| {
-                        error!(epoch, error = %e.as_report(), "failed to commit metadata of epoch");
-                        format!("failed to commit: {:?}", e.as_report())
-                    })?;
-
-                self.send_to_all_sink_writers(|| {
-                    Ok(CoordinateResponse {
-                        msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse {
-                            epoch,
-                        })),
-                    })
-                })
-                .await;
+                let start_time = Instant::now();
+                run_future_with_periodic_fn(
+                    coordinator.commit(epoch, requests.metadatas),
+                    Duration::from_secs(5),
+                    || {
+                        warn!(
+                            elapsed = ?start_time.elapsed(),
+                            sink_id = self.handle_manager.param.sink_id.sink_id,
+                            "committing"
+                        );
+                    },
+                )
+                .await
+                .map_err(|e| anyhow!(e))?;
+                self.handle_manager.ack_commit(epoch, requests.handle_ids)?;
             }
-        };
-
-        if let Err(err_str) = result {
-            self.send_to_all_sink_writers(|| {
-                Err(Status::aborted(format!(
-                    "failed to run coordination: {}",
-                    err_str
-                )))
-            })
-            .await;
         }
     }
 }
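`EpochCommitRequests` above decides commit readiness purely from bitmap arithmetic: per-writer bitmaps must be pairwise disjoint (checked with a bitwise AND), and the epoch may commit once their union covers every vnode. The same invariant sketched on a plain `Vec<bool>`, independent of the `Bitmap` type used in the patch:

    struct VnodeCoverage {
        covered: Vec<bool>, // one slot per vnode
    }

    impl VnodeCoverage {
        fn new(vnode_count: usize) -> Self {
            Self { covered: vec![false; vnode_count] }
        }

        /// Merge one writer's bitmap; reject overlap (a vnode claimed twice).
        fn add(&mut self, writer_bitmap: &[bool]) -> Result<(), usize> {
            if let Some((dup, _)) = writer_bitmap
                .iter()
                .enumerate()
                .find(|(i, owned)| **owned && self.covered[*i])
            {
                return Err(dup); // same vnode reported by two writers
            }
            for (slot, owned) in self.covered.iter_mut().zip(writer_bitmap) {
                *slot |= *owned;
            }
            Ok(())
        }

        /// Commit only when every vnode has reported for this epoch.
        fn can_commit(&self) -> bool {
            self.covered.iter().all(|c| *c)
        }
    }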
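`run_coordination` also guarantees commit ordering: requests are staged in a `BTreeMap` keyed by epoch, and only `first_key_value()` is ever tested with `can_commit`, so epoch N+1 cannot commit while epoch N is still missing a writer. A self-contained illustration of that gating rule (the `Pending` bookkeeping here is hypothetical):

    use std::collections::BTreeMap;

    struct Pending { got: usize, expected: usize }

    fn try_pop_committable(pending: &mut BTreeMap<u64, Pending>) -> Option<u64> {
        // Only the smallest pending epoch is eligible.
        let (&epoch, p) = pending.first_key_value()?;
        if p.got != p.expected {
            return None; // smallest epoch incomplete: nothing may commit
        }
        pending.pop_first();
        Some(epoch)
    }

    fn main() {
        let mut pending = BTreeMap::new();
        pending.insert(234, Pending { got: 1, expected: 2 });
        pending.insert(235, Pending { got: 2, expected: 2 });
        assert_eq!(try_pop_committable(&mut pending), None); // 234 blocks 235

        pending.get_mut(&234).unwrap().got = 2;
        assert_eq!(try_pop_committable(&mut pending), Some(234));
        assert_eq!(try_pop_committable(&mut pending), Some(235));
    }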
diff --git a/src/meta/src/manager/sink_coordination/handle.rs b/src/meta/src/manager/sink_coordination/handle.rs
new file mode 100644
index 0000000000000..60b49cfd623ab
--- /dev/null
+++ b/src/meta/src/manager/sink_coordination/handle.rs
@@ -0,0 +1,139 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::pin::pin;
+use std::task::{Context, Poll};
+
+use anyhow::anyhow;
+use futures::{Future, TryStreamExt};
+use risingwave_common::bitmap::Bitmap;
+use risingwave_connector::sink::SinkParam;
+use risingwave_pb::connector_service::coordinate_response::{
+    CommitResponse, StartCoordinationResponse,
+};
+use risingwave_pb::connector_service::{
+    coordinate_request, coordinate_response, CoordinateResponse, SinkMetadata,
+};
+use tonic::Status;
+
+use crate::manager::sink_coordination::{SinkCoordinatorResponseSender, SinkWriterRequestStream};
+
+pub(super) struct SinkWriterCoordinationHandle {
+    request_stream: SinkWriterRequestStream,
+    response_tx: SinkCoordinatorResponseSender,
+    param: SinkParam,
+    vnode_bitmap: Bitmap,
+    prev_epoch: Option<u64>,
+}
+
+impl SinkWriterCoordinationHandle {
+    pub(super) fn new(
+        request_stream: SinkWriterRequestStream,
+        response_tx: SinkCoordinatorResponseSender,
+        param: SinkParam,
+        vnode_bitmap: Bitmap,
+    ) -> Self {
+        Self {
+            request_stream,
+            response_tx,
+            param,
+            vnode_bitmap,
+            prev_epoch: None,
+        }
+    }
+
+    pub(super) fn param(&self) -> &SinkParam {
+        &self.param
+    }
+
+    pub(super) fn vnode_bitmap(&self) -> &Bitmap {
+        &self.vnode_bitmap
+    }
+
+    pub(super) fn start(&mut self) -> anyhow::Result<()> {
+        self.response_tx
+            .send(Ok(CoordinateResponse {
+                msg: Some(coordinate_response::Msg::StartResponse(
+                    StartCoordinationResponse {},
+                )),
+            }))
+            .map_err(|_| anyhow!("fail to send start response"))
+    }
+
+    pub(super) fn abort(self, status: Status) {
+        let _ = self.response_tx.send(Err(status));
+    }
+
+    pub(super) fn ack_commit(&mut self, epoch: u64) -> anyhow::Result<()> {
+        self.response_tx
+            .send(Ok(CoordinateResponse {
+                msg: Some(coordinate_response::Msg::CommitResponse(CommitResponse {
+                    epoch,
+                })),
+            }))
+            .map_err(|_| anyhow!("fail to send commit response of epoch {}", epoch))
+    }
+
+    pub(super) fn poll_next_commit_request(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<anyhow::Result<Option<(u64, SinkMetadata)>>> {
+        let future = self.next_commit_request();
+        let future = pin!(future);
+        future.poll(cx)
+    }
+
+    async fn next_commit_request(&mut self) -> anyhow::Result<Option<(u64, SinkMetadata)>> {
+        loop {
+            let request = self
+                .request_stream
+                .try_next()
+                .await?
+                .ok_or_else(|| anyhow!("end of request stream"))?;
+            match request.msg.ok_or_else(|| anyhow!("None msg in request"))? {
+                coordinate_request::Msg::StartRequest(_) => {
+                    return Err(anyhow!("should have started"));
+                }
+                coordinate_request::Msg::CommitRequest(request) => {
+                    if let Some(prev_epoch) = self.prev_epoch {
+                        if request.epoch < prev_epoch {
+                            return Err(anyhow!(
+                                "invalid commit epoch {}, prev_epoch {}",
+                                request.epoch,
+                                prev_epoch
+                            ));
+                        }
+                    }
+                    let Some(metadata) = request.metadata else {
+                        return Err(anyhow!("empty commit metadata"));
+                    };
+                    self.prev_epoch = Some(request.epoch);
+                    return Ok(Some((request.epoch, metadata)));
+                }
+                coordinate_request::Msg::UpdateVnodeRequest(request) => {
+                    let bitmap = Bitmap::from(
+                        &request
+                            .vnode_bitmap
+                            .ok_or_else(|| anyhow!("empty vnode bitmap"))?,
+                    );
+                    self.vnode_bitmap = bitmap;
+                    continue;
+                }
+                coordinate_request::Msg::Stop(_) => {
+                    return Ok(None);
+                }
+            }
+        }
+    }
+}
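`poll_next_commit_request` above is what lets the worker treat many handles as one event source: `next_commit_request_inner` in coordinator_worker.rs sweeps every handle with this method inside a single `poll_fn`, relying on each still-pending handle having registered the shared waker. A toy version of the same multiplexing pattern over plain tokio channels (names are illustrative):

    use std::collections::HashMap;
    use std::future::poll_fn;
    use std::task::Poll;

    use tokio::sync::mpsc::UnboundedReceiver;

    /// Wait for the next message from any receiver, pruning finished ones.
    /// Pends forever once the map is empty — the real worker pairs this with
    /// a `select!` arm that concurrently accepts brand-new handles.
    async fn next_from_any(rxs: &mut HashMap<usize, UnboundedReceiver<u64>>) -> (usize, u64) {
        poll_fn(|cx| 'outer: loop {
            for (id, rx) in rxs.iter_mut() {
                match rx.poll_recv(cx) {
                    Poll::Ready(Some(msg)) => return Poll::Ready((*id, msg)),
                    Poll::Ready(None) => {
                        // This stream ended; drop it and rescan, since the
                        // map cannot be mutated mid-iteration.
                        let id = *id;
                        rxs.remove(&id);
                        continue 'outer;
                    }
                    Poll::Pending => {}
                }
            }
            return Poll::Pending;
        })
        .await
    }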
diff --git a/src/meta/src/manager/sink_coordination/manager.rs b/src/meta/src/manager/sink_coordination/manager.rs
index fd2b986be28e7..2fe2e8bfb3b8c 100644
--- a/src/meta/src/manager/sink_coordination/manager.rs
+++ b/src/meta/src/manager/sink_coordination/manager.rs
@@ -12,7 +12,6 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::hash_map::Entry;
 use std::collections::HashMap;
 use std::pin::pin;
 
@@ -30,12 +29,13 @@
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 use tokio::sync::oneshot::{channel, Receiver, Sender};
 use tokio::task::{JoinError, JoinHandle};
-use tokio_stream::wrappers::ReceiverStream;
+use tokio_stream::wrappers::UnboundedReceiverStream;
 use tonic::Status;
 use tracing::{debug, error, info, warn};
 
 use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker;
-use crate::manager::sink_coordination::{NewSinkWriterRequest, SinkWriterRequestStream};
+use crate::manager::sink_coordination::handle::SinkWriterCoordinationHandle;
+use crate::manager::sink_coordination::SinkWriterRequestStream;
 
 macro_rules! send_with_err_check {
     ($tx:expr, $msg:expr) => {
@@ -56,7 +56,7 @@
 const BOUNDED_CHANNEL_SIZE: usize = 16;
 
 enum ManagerRequest {
-    NewSinkWriter(NewSinkWriterRequest),
+    NewSinkWriter(SinkWriterCoordinationHandle),
     StopCoordinator {
         finish_notifier: Sender<()>,
         /// sink id to stop. When `None`, stop all sink coordinator
@@ -71,11 +71,8 @@ pub struct SinkCoordinatorManager {
 
 impl SinkCoordinatorManager {
     pub fn start_worker() -> (Self, (JoinHandle<()>, Sender<()>)) {
-        Self::start_worker_with_spawn_worker(|writer_request, manager_request_stream| {
-            tokio::spawn(CoordinatorWorker::run(
-                writer_request,
-                manager_request_stream,
-            ))
+        Self::start_worker_with_spawn_worker(|param, manager_request_stream| {
+            tokio::spawn(CoordinatorWorker::run(param, manager_request_stream))
         })
     }
 
@@ -111,14 +108,11 @@ impl SinkCoordinatorManager {
             )));
             }
         };
-        let (response_tx, response_rx) = mpsc::channel(BOUNDED_CHANNEL_SIZE);
+        let (response_tx, response_rx) = mpsc::unbounded_channel();
         self.request_tx
-            .send(ManagerRequest::NewSinkWriter(NewSinkWriterRequest {
-                request_stream,
-                response_tx,
-                param,
-                vnode_bitmap,
-            }))
+            .send(ManagerRequest::NewSinkWriter(
+                SinkWriterCoordinationHandle::new(request_stream, response_tx, param, vnode_bitmap),
+            ))
             .await
             .map_err(|_| {
                 Status::unavailable(
@@ -126,7 +120,7 @@ impl SinkCoordinatorManager {
                 )
             })?;
 
-        Ok(ReceiverStream::new(response_rx))
+        Ok(UnboundedReceiverStream::new(response_rx))
     }
 
     async fn stop_coordinator(&self, sink_id: Option<SinkId>) {
@@ -155,7 +149,7 @@
 struct CoordinatorWorkerHandle {
     /// Sender to coordinator worker. Drop the sender as a stop signal
-    request_sender: Option<UnboundedSender<NewSinkWriterRequest>>,
+    request_sender: Option<UnboundedSender<SinkWriterCoordinationHandle>>,
     /// Notify when the coordinator worker stops
     finish_notifiers: Vec<Sender<()>>,
 }
 
 struct ManagerWorker {
     request_rx: mpsc::Receiver<ManagerRequest>,
-    // Make it option so that it can be polled with &mut SinkManagerWorker
-    shutdown_rx: Option<Receiver<()>>,
+    shutdown_rx: Receiver<()>,
 
     running_coordinator_worker_join_handles:
         FuturesUnordered<BoxFuture<'static, (SinkId, Result<(), JoinError>)>>,
     running_coordinator_worker: HashMap<SinkId, CoordinatorWorkerHandle>,
 }
@@ -178,7 +172,7 @@ enum ManagerEvent {
     NewRequest(ManagerRequest),
     CoordinatorWorkerFinished {
         sink_id: SinkId,
         join_result: Result<(), JoinError>,
     },
 }
 
-trait SpawnCoordinatorFn = FnMut(NewSinkWriterRequest, UnboundedReceiver<NewSinkWriterRequest>) -> JoinHandle<()>
+trait SpawnCoordinatorFn = FnMut(SinkParam, UnboundedReceiver<SinkWriterCoordinationHandle>) -> JoinHandle<()>
     + Send
     + 'static;
 
@@ -186,7 +180,7 @@ impl ManagerWorker {
     fn new(request_rx: mpsc::Receiver<ManagerRequest>, shutdown_rx: Receiver<()>) -> Self {
         ManagerWorker {
             request_rx,
-            shutdown_rx: Some(shutdown_rx),
+            shutdown_rx,
             running_coordinator_worker_join_handles: Default::default(),
             running_coordinator_worker: Default::default(),
         }
     }
@@ -237,7 +231,6 @@ impl ManagerWorker {
     }
 
     async fn next_event(&mut self) -> Option<ManagerEvent> {
-        let shutdown_rx = self.shutdown_rx.take().expect("should not be empty");
         match select(
             select(
                 pin!(self.request_rx.recv()),
                 pin!(pending_on_none(
                     self.running_coordinator_worker_join_handles.next()
                 )),
             ),
-            shutdown_rx,
+            &mut self.shutdown_rx,
         )
         .await
         {
-            Either::Left((either, shutdown_rx)) => {
-                self.shutdown_rx = Some(shutdown_rx);
-                match either {
-                    Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
-                    Either::Left((None, _)) => None,
-                    Either::Right(((sink_id, join_result), _)) => {
-                        Some(ManagerEvent::CoordinatorWorkerFinished {
-                            sink_id,
-                            join_result,
-                        })
-                    }
+            Either::Left((either, _)) => match either {
+                Either::Left((Some(request), _)) => Some(ManagerEvent::NewRequest(request)),
+                Either::Left((None, _)) => None,
+                Either::Right(((sink_id, join_result), _)) => {
+                    Some(ManagerEvent::CoordinatorWorkerFinished {
+                        sink_id,
+                        join_result,
+                    })
                 }
-            }
+            },
             Either::Right(_) => None,
         }
     }
@@ -309,39 +299,39 @@ impl ManagerWorker {
 
     fn handle_new_sink_writer(
         &mut self,
-        request: NewSinkWriterRequest,
+        new_writer: SinkWriterCoordinationHandle,
         spawn_coordinator_worker: &mut impl SpawnCoordinatorFn,
     ) {
-        let param = &request.param;
+        let param = new_writer.param();
         let sink_id = param.sink_id;
 
-        // Launch the coordinator worker task if it is the first
-        match self.running_coordinator_worker.entry(param.sink_id) {
-            Entry::Occupied(mut entry) => {
-                if let Some(sender) = entry.get_mut().request_sender.as_mut() {
-                    send_with_err_check!(sender, request);
-                } else {
-                    warn!(
-                        "handle a new request while the sink coordinator is being stopped: {:?}",
-                        param
-                    );
-                    drop(request.response_tx);
-                }
-            }
-            Entry::Vacant(entry) => {
+        let handle = self
+            .running_coordinator_worker
+            .entry(param.sink_id)
+            .or_insert_with(|| {
+                // Launch the coordinator worker task if it is the first
                 let (request_tx, request_rx) = unbounded_channel();
-                let join_handle = spawn_coordinator_worker(request, request_rx);
+                let join_handle = spawn_coordinator_worker(param.clone(), request_rx);
                 self.running_coordinator_worker_join_handles.push(
                     join_handle
                         .map(move |join_result| (sink_id, join_result))
                        .boxed(),
                 );
-                entry.insert(CoordinatorWorkerHandle {
+                CoordinatorWorkerHandle {
                     request_sender: Some(request_tx),
                     finish_notifiers: Vec::new(),
-                });
-            }
-        };
+                }
+            });
+
+        if let Some(sender) = handle.request_sender.as_mut() {
+            send_with_err_check!(sender, new_writer);
+        } else {
+            warn!(
+                "handle a new
request while the sink coordinator is being stopped: {:?}", + param + ); + new_writer.abort(Status::internal("the sink is being stopped")); + } } } @@ -357,7 +347,7 @@ mod tests { use futures::{FutureExt, StreamExt}; use itertools::Itertools; use rand::seq::SliceRandom; - use risingwave_common::bitmap::{Bitmap, BitmapBuilder}; + use risingwave_common::bitmap::BitmapBuilder; use risingwave_common::hash::VirtualNode; use risingwave_connector::sink::catalog::{SinkId, SinkType}; use risingwave_connector::sink::{SinkCommitCoordinator, SinkError, SinkParam}; @@ -367,7 +357,7 @@ mod tests { use tokio_stream::wrappers::ReceiverStream; use crate::manager::sink_coordination::coordinator_worker::CoordinatorWorker; - use crate::manager::sink_coordination::{NewSinkWriterRequest, SinkCoordinatorManager}; + use crate::manager::sink_coordination::SinkCoordinatorManager; struct MockCoordinator, &mut C) -> Result<(), SinkError>> { context: C, @@ -434,16 +424,16 @@ mod tests { let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker_with_spawn_worker({ - let param = param.clone(); + let expected_param = param.clone(); let metadata = metadata.clone(); - move |first_request: NewSinkWriterRequest, new_writer_rx| { - let param = param.clone(); + move |param, new_writer_rx| { let metadata = metadata.clone(); + let expected_param = expected_param.clone(); tokio::spawn(async move { // validate the start request - assert_eq!(first_request.param, param); + assert_eq!(param, expected_param); CoordinatorWorker::execute_coordinator( - first_request, + param.clone(), new_writer_rx, MockCoordinator::new(0, |epoch, metadata_list, count: &mut usize| { *count += 1; @@ -497,14 +487,8 @@ mod tests { .unwrap() }; - let mut build_client_future1 = pin!(build_client(vnode1)); - assert!( - poll_fn(|cx| Poll::Ready(build_client_future1.as_mut().poll(cx))) - .await - .is_pending() - ); let (mut client1, mut client2) = - join(build_client_future1, pin!(build_client(vnode2))).await; + join(build_client(vnode1), pin!(build_client(vnode2))).await; { // commit epoch1 @@ -598,16 +582,16 @@ mod tests { let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker_with_spawn_worker({ - let param = param.clone(); + let expected_param = param.clone(); let metadata = metadata.clone(); - move |first_request: NewSinkWriterRequest, new_writer_rx| { - let param = param.clone(); + move |param, new_writer_rx| { let metadata = metadata.clone(); + let expected_param = expected_param.clone(); tokio::spawn(async move { // validate the start request - assert_eq!(first_request.param, param); + assert_eq!(param, expected_param); CoordinatorWorker::execute_coordinator( - first_request, + param.clone(), new_writer_rx, MockCoordinator::new(0, |epoch, metadata_list, count: &mut usize| { *count += 1; @@ -686,46 +670,6 @@ mod tests { .unwrap(); } - #[tokio::test] - async fn test_drop_sink_while_init() { - let sink_id = SinkId::from(1); - let param = SinkParam { - sink_id, - sink_name: "test".into(), - properties: Default::default(), - columns: vec![], - downstream_pk: vec![], - sink_type: SinkType::AppendOnly, - format_desc: None, - db_name: "test".into(), - sink_from_name: "test".into(), - }; - - let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker(); - - let mut build_client_future1 = pin!(CoordinatorStreamHandle::new_with_init_stream( - param.to_proto(), - Bitmap::zeros(VirtualNode::COUNT), - |rx| async { - Ok(tonic::Response::new( - manager - 
.handle_new_request(ReceiverStream::new(rx).map(Ok).boxed()) - .await - .unwrap() - .boxed(), - )) - }, - )); - assert!( - poll_fn(|cx| Poll::Ready(build_client_future1.as_mut().poll(cx))) - .await - .is_pending() - ); - manager.stop_sink_coordinator(sink_id).await; - - assert!(build_client_future1.await.is_err()); - } - #[tokio::test] async fn test_partial_commit() { let param = SinkParam { @@ -757,14 +701,14 @@ mod tests { let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker_with_spawn_worker({ - let param = param.clone(); - move |first_request: NewSinkWriterRequest, new_writer_rx| { - let param = param.clone(); + let expected_param = param.clone(); + move |param, new_writer_rx| { + let expected_param = expected_param.clone(); tokio::spawn(async move { // validate the start request - assert_eq!(first_request.param, param); + assert_eq!(param, expected_param); CoordinatorWorker::execute_coordinator( - first_request, + param, new_writer_rx, MockCoordinator::new((), |_, _, _| unreachable!()), ) @@ -836,14 +780,14 @@ mod tests { let (manager, (_join_handle, _stop_tx)) = SinkCoordinatorManager::start_worker_with_spawn_worker({ - let param = param.clone(); - move |first_request: NewSinkWriterRequest, new_writer_rx| { - let param = param.clone(); + let expected_param = param.clone(); + move |param, new_writer_rx| { + let expected_param = expected_param.clone(); tokio::spawn(async move { // validate the start request - assert_eq!(first_request.param, param); + assert_eq!(param, expected_param); CoordinatorWorker::execute_coordinator( - first_request, + param, new_writer_rx, MockCoordinator::new((), |_, _, _| { Err(SinkError::Coordinator(anyhow!("failed to commit"))) @@ -897,4 +841,269 @@ mod tests { assert!(result1.is_err()); assert!(result2.is_err()); } + + #[tokio::test] + async fn test_update_vnode_bitmap() { + let param = SinkParam { + sink_id: SinkId::from(1), + sink_name: "test".into(), + properties: Default::default(), + columns: vec![], + downstream_pk: vec![], + sink_type: SinkType::AppendOnly, + format_desc: None, + db_name: "test".into(), + sink_from_name: "test".into(), + }; + + let epoch1 = 233; + let epoch2 = 234; + let epoch3 = 235; + let epoch4 = 236; + + let mut all_vnode = (0..VirtualNode::COUNT).collect_vec(); + all_vnode.shuffle(&mut rand::thread_rng()); + let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 2); + let build_bitmap = |indexes: &[usize]| { + let mut builder = BitmapBuilder::zeroed(VirtualNode::COUNT); + for i in indexes { + builder.set(*i, true); + } + builder.finish() + }; + let vnode1 = build_bitmap(first); + let vnode2 = build_bitmap(second); + + let metadata = [ + [vec![1u8, 2u8], vec![3u8, 4u8]], + [vec![5u8, 6u8], vec![7u8, 8u8]], + ]; + + let metadata_scale_out = [vec![9u8, 10u8], vec![11u8, 12u8], vec![13u8, 14u8]]; + let metadata_scale_in = [vec![13u8, 14u8], vec![15u8, 16u8]]; + + let (manager, (_join_handle, _stop_tx)) = + SinkCoordinatorManager::start_worker_with_spawn_worker({ + let expected_param = param.clone(); + let metadata = metadata.clone(); + let metadata_scale_out = metadata_scale_out.clone(); + let metadata_scale_in = metadata_scale_in.clone(); + move |param, new_writer_rx| { + let metadata = metadata.clone(); + let metadata_scale_out = metadata_scale_out.clone(); + let metadata_scale_in = metadata_scale_in.clone(); + let expected_param = expected_param.clone(); + tokio::spawn(async move { + // validate the start request + assert_eq!(param, expected_param); + CoordinatorWorker::execute_coordinator( + 
param.clone(), + new_writer_rx, + MockCoordinator::new(0, |epoch, metadata_list, count: &mut usize| { + *count += 1; + let mut metadata_list = metadata_list + .into_iter() + .map(|metadata| match metadata { + SinkMetadata { + metadata: + Some(Metadata::Serialized(SerializedMetadata { + metadata, + })), + } => metadata, + _ => unreachable!(), + }) + .collect_vec(); + metadata_list.sort(); + let (expected_epoch, expected_metadata_list) = match *count { + 1 => (epoch1, metadata[0].as_slice()), + 2 => (epoch2, metadata[1].as_slice()), + 3 => (epoch3, metadata_scale_out.as_slice()), + 4 => (epoch4, metadata_scale_in.as_slice()), + _ => unreachable!(), + }; + assert_eq!(expected_epoch, epoch); + assert_eq!(expected_metadata_list, &metadata_list); + Ok(()) + }), + ) + .await; + }) + } + }); + + let build_client = |vnode| async { + CoordinatorStreamHandle::new_with_init_stream(param.to_proto(), vnode, |rx| async { + Ok(tonic::Response::new( + manager + .handle_new_request(ReceiverStream::new(rx).map(Ok).boxed()) + .await + .unwrap() + .boxed(), + )) + }) + .await + .unwrap() + }; + + let (mut client1, mut client2) = + join(build_client(vnode1), pin!(build_client(vnode2))).await; + + { + // commit epoch1 + let mut commit_future = pin!(client2 + .commit( + epoch1, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata[0][1].clone(), + })), + }, + ) + .map(|result| result.unwrap())); + assert!(poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx))) + .await + .is_pending()); + join( + commit_future, + client1 + .commit( + epoch1, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata[0][0].clone(), + })), + }, + ) + .map(|result| result.unwrap()), + ) + .await; + } + + let (vnode1, vnode2, vnode3) = { + let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 3); + let (second, third) = second.split_at(VirtualNode::COUNT / 3); + ( + build_bitmap(first), + build_bitmap(second), + build_bitmap(third), + ) + }; + + let mut client3 = build_client(vnode3).await; + { + let mut commit_future3 = pin!(client3 + .commit( + epoch3, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata_scale_out[2].clone(), + })), + }, + ) + .map(|result| result.unwrap())); + assert!(poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx))) + .await + .is_pending()); + + { + // commit epoch2 + let mut commit_future = pin!(client1 + .commit( + epoch2, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata[1][0].clone(), + })), + }, + ) + .map(|result| result.unwrap())); + assert!(poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx))) + .await + .is_pending()); + join( + commit_future, + client2 + .commit( + epoch2, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata[1][1].clone(), + })), + }, + ) + .map(|result| result.unwrap()), + ) + .await; + } + + client1.update_vnode_bitmap(&vnode1).await.unwrap(); + client2.update_vnode_bitmap(&vnode2).await.unwrap(); + let mut commit_future1 = pin!(client1 + .commit( + epoch3, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata_scale_out[0].clone(), + })), + }, + ) + .map(|result| result.unwrap())); + assert!(poll_fn(|cx| Poll::Ready(commit_future1.as_mut().poll(cx))) + .await + .is_pending()); + assert!(poll_fn(|cx| Poll::Ready(commit_future3.as_mut().poll(cx))) + .await + .is_pending()); + client2 + .commit( + epoch3, + 
SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata_scale_out[1].clone(), + })), + }, + ) + .map(|result| result.unwrap()) + .await; + } + + let (vnode2, vnode3) = { + let (first, second) = all_vnode.split_at(VirtualNode::COUNT / 3); + (build_bitmap(first), build_bitmap(second)) + }; + + // client1.stop().await.unwrap(); + client2.update_vnode_bitmap(&vnode2).await.unwrap(); + client3.update_vnode_bitmap(&vnode3).await.unwrap(); + + { + let mut commit_future = pin!(client2 + .commit( + epoch4, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata_scale_in[0].clone(), + })), + }, + ) + .map(|result| result.unwrap())); + assert!(poll_fn(|cx| Poll::Ready(commit_future.as_mut().poll(cx))) + .await + .is_pending()); + join( + commit_future, + client3 + .commit( + epoch4, + SinkMetadata { + metadata: Some(Metadata::Serialized(SerializedMetadata { + metadata: metadata_scale_in[1].clone(), + })), + }, + ) + .map(|result| result.unwrap()), + ) + .await; + } + } } diff --git a/src/meta/src/manager/sink_coordination/mod.rs b/src/meta/src/manager/sink_coordination/mod.rs index ab44965891d5f..2f5f4d6ba62b1 100644 --- a/src/meta/src/manager/sink_coordination/mod.rs +++ b/src/meta/src/manager/sink_coordination/mod.rs @@ -13,22 +13,14 @@ // limitations under the License. mod coordinator_worker; +mod handle; mod manager; use futures::stream::BoxStream; pub use manager::SinkCoordinatorManager; -use risingwave_common::bitmap::Bitmap; -use risingwave_connector::sink::SinkParam; use risingwave_pb::connector_service::{CoordinateRequest, CoordinateResponse}; -use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::UnboundedSender; use tonic::Status; pub type SinkWriterRequestStream = BoxStream<'static, Result>; -pub type SinkCoordinatorResponseSender = Sender>; - -pub struct NewSinkWriterRequest { - pub request_stream: SinkWriterRequestStream, - pub response_tx: SinkCoordinatorResponseSender, - pub param: SinkParam, - pub vnode_bitmap: Bitmap, -} +pub type SinkCoordinatorResponseSender = UnboundedSender>; diff --git a/src/rpc_client/src/sink_coordinate_client.rs b/src/rpc_client/src/sink_coordinate_client.rs index 06602ef4db3b7..8823dd440bc77 100644 --- a/src/rpc_client/src/sink_coordinate_client.rs +++ b/src/rpc_client/src/sink_coordinate_client.rs @@ -18,7 +18,7 @@ use anyhow::anyhow; use futures::{Stream, TryStreamExt}; use risingwave_common::bitmap::Bitmap; use risingwave_pb::connector_service::coordinate_request::{ - CommitRequest, StartCoordinationRequest, + CommitRequest, StartCoordinationRequest, UpdateVnodeBitmapRequest, }; use risingwave_pb::connector_service::{ coordinate_request, coordinate_response, CoordinateRequest, CoordinateResponse, PbSinkParam, @@ -99,4 +99,24 @@ impl CoordinatorStreamHandle { msg => Err(anyhow!("should get commit response but get {:?}", msg)), } } + + pub async fn update_vnode_bitmap(&mut self, vnode_bitmap: &Bitmap) -> anyhow::Result<()> { + self.send_request(CoordinateRequest { + msg: Some(coordinate_request::Msg::UpdateVnodeRequest( + UpdateVnodeBitmapRequest { + vnode_bitmap: Some(vnode_bitmap.to_protobuf()), + }, + )), + }) + .await?; + Ok(()) + } + + pub async fn stop(&mut self) -> anyhow::Result<()> { + self.send_request(CoordinateRequest { + msg: Some(coordinate_request::Msg::Stop(true)), + }) + .await?; + Ok(()) + } }
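Taken together, the new client surface in sink_coordinate_client.rs gives a coordinated writer this lifecycle: start with an initial vnode bitmap, commit per epoch, announce rescaling with `update_vnode_bitmap` (which the writer side only issues after force-committing any straggling epoch), and `stop` on shutdown — normally fired from `Drop` via `now_or_never`. A hypothetical driver for one writer's session, assuming an already-connected handle:

    use anyhow::Result;
    use risingwave_common::bitmap::Bitmap;
    use risingwave_pb::connector_service::SinkMetadata;
    use risingwave_rpc_client::CoordinatorStreamHandle;

    // Illustrative only: epoch and metadata would come from the log consumer.
    async fn writer_session(
        mut handle: CoordinatorStreamHandle,
        epoch: u64,
        metadata: SinkMetadata,
        new_bitmap: &Bitmap,
    ) -> Result<()> {
        // Resolves once EVERY writer of the sink has reported this epoch and
        // the coordinator's commit() succeeded.
        handle.commit(epoch, metadata).await?;

        // On scale-out/scale-in: declare the new vnode ownership before
        // writing data under the new distribution.
        handle.update_vnode_bitmap(new_bitmap).await?;

        // Graceful teardown; lets the worker drop this handle cleanly
        // instead of treating the stream end as an error.
        handle.stop().await?;
        Ok(())
    }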