Skip to content

Commit

Permalink
feat(sink): implement sink coordinate service (#11359)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Aug 9, 2023
1 parent 55e3dde commit bcd79a8
Show file tree
Hide file tree
Showing 23 changed files with 1,500 additions and 30 deletions.
37 changes: 37 additions & 0 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ syntax = "proto3";
package connector_service;

import "catalog.proto";
import "common.proto";
import "data.proto";
import "plan_common.proto";

Expand Down Expand Up @@ -183,3 +184,39 @@ service ConnectorService {
rpc GetEventStream(GetEventStreamRequest) returns (stream GetEventStreamResponse);
rpc ValidateSource(ValidateSourceRequest) returns (ValidateSourceResponse);
}

// Message carried on the request stream of `SinkCoordinationService.Coordinate`.
// At most one variant of the `msg` oneof is set per message.
message CoordinateRequest {
// The first request that starts a coordination between sink writer and coordinator.
// The service will respond after sink writers of all vnodes have sent the request.
message StartCoordinationRequest {
// Vnode bitmap of the sending sink writer, encoded as a `common.Buffer`.
// NOTE(review): presumably the set of vnodes this writer handles; confirm against the sender.
common.Buffer vnode_bitmap = 1;
// Parameters of the sink being coordinated.
SinkParam param = 2;
}

// Request carrying the metadata a sink writer submits for one epoch.
message CommitRequest {
// Epoch the metadata belongs to.
uint64 epoch = 1;
// Sink-specific metadata for that epoch.
SinkMetadata metadata = 2;
}

// The payload of this request: either the initial start request or a commit request.
oneof msg {
StartCoordinationRequest start_request = 1;
CommitRequest commit_request = 2;
}
}

// Message carried on the response stream of `SinkCoordinationService.Coordinate`.
// At most one variant of the `msg` oneof is set per message.
message CoordinateResponse {
// Empty response; presumably the reply to a `StartCoordinationRequest` (confirm against the service).
message StartCoordinationResponse {}

// Response to a commit.
message CommitResponse {
// NOTE(review): presumably the epoch of the `CommitRequest` being acknowledged; confirm against the service.
uint64 epoch = 1;
}

// The payload of this response: either a start response or a commit response.
oneof msg {
StartCoordinationResponse start_response = 1;
CommitResponse commit_response = 2;
}
}

// Service through which sink writers coordinate with a coordinator.
service SinkCoordinationService {
// Bidirectional-streaming RPC: the client sends a stream of `CoordinateRequest`
// and receives a stream of `CoordinateResponse`.
rpc Coordinate(stream CoordinateRequest) returns (stream CoordinateResponse);
}
1 change: 1 addition & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ pub async fn compute_node_serve(
dml_mgr,
system_params_manager.clone(),
source_metrics,
meta_client.clone(),
);

// Generally, one may use `risedev ctl trace` to manually get the trace reports. However, if
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#![feature(type_alias_impl_trait)]
#![feature(return_position_impl_trait_in_trait)]
#![feature(async_fn_in_trait)]
#![feature(associated_type_defaults)]

use std::time::Duration;

Expand Down
87 changes: 87 additions & 0 deletions src/connector/src/sink/coordinate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use risingwave_common::array::StreamChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_pb::connector_service::SinkMetadata;
use risingwave_rpc_client::{CoordinatorStreamHandle, SinkCoordinationRpcClient};
use tracing::warn;

use crate::sink::{Result, SinkError, SinkParam, SinkWriter};

/// A sink writer wrapper that delegates to an inner [`SinkWriter`] and, on checkpoint
/// barriers, submits the metadata returned by the inner writer through a
/// [`CoordinatorStreamHandle`].
///
/// The inner writer must produce `Option<SinkMetadata>` as its commit metadata.
pub struct CoordinatedSinkWriter<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> {
/// Epoch passed to the most recent `begin_epoch` call; initialized to 0 by `new`.
epoch: u64,
/// Handle used to send the commit for each checkpoint epoch.
coordinator_stream_handle: CoordinatorStreamHandle,
/// The wrapped writer that performs the actual writes.
inner: W,
}

impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> CoordinatedSinkWriter<W> {
    /// Creates a coordinated writer around `inner`.
    ///
    /// Converts `param` to its protobuf form and awaits [`CoordinatorStreamHandle::new`]
    /// with `client`, that protobuf value and `vnode_bitmap`. The recorded epoch starts
    /// at 0 until `begin_epoch` is called.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while creating the coordinator stream handle.
    pub async fn new(
        client: SinkCoordinationRpcClient,
        param: SinkParam,
        vnode_bitmap: Bitmap,
        inner: W,
    ) -> Result<Self> {
        let sink_param_proto = param.to_proto();
        let coordinator_stream_handle =
            CoordinatorStreamHandle::new(client, sink_param_proto, vnode_bitmap).await?;

        Ok(Self {
            epoch: 0,
            coordinator_stream_handle,
            inner,
        })
    }
}

#[async_trait::async_trait]
impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> SinkWriter for CoordinatedSinkWriter<W> {
    /// Records `epoch` as the current epoch, then forwards the call to the inner writer.
    async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
        self.epoch = epoch;
        self.inner.begin_epoch(epoch).await
    }

    /// Forwards the chunk to the inner writer unchanged.
    async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
        self.inner.write_batch(chunk).await
    }

    /// Forwards the barrier to the inner writer and, on a checkpoint barrier, commits the
    /// returned metadata for the current epoch through the coordinator stream handle.
    ///
    /// # Errors
    ///
    /// - Any error from the inner writer's `barrier`.
    /// - `SinkError::Coordinator` if `is_checkpoint` is true but the inner writer returned
    ///   no metadata.
    /// - Any error from the coordinator commit.
    ///
    /// On a non-checkpoint barrier, unexpected metadata is only logged with a warning and
    /// then discarded.
    async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata> {
        let metadata = self.inner.barrier(is_checkpoint).await?;
        if is_checkpoint {
            // Build the error lazily: `ok_or` would construct the `anyhow` error on every
            // checkpoint, even on the normal path where metadata is present.
            let metadata = metadata.ok_or_else(|| {
                SinkError::Coordinator(anyhow!("should get metadata on checkpoint barrier"))
            })?;
            // TODO: add metrics to measure time to commit
            self.coordinator_stream_handle
                .commit(self.epoch, metadata)
                .await?;
        } else if metadata.is_some() {
            warn!("get metadata on non-checkpoint barrier");
        }
        Ok(())
    }

    /// Forwards the abort to the inner writer.
    async fn abort(&mut self) -> Result<()> {
        self.inner.abort().await
    }

    /// Forwards the new vnode bitmap to the inner writer.
    // NOTE(review): the coordinator stream handle was created with the original bitmap and
    // is not updated here; confirm whether the coordinator needs to be informed.
    async fn update_vnode_bitmap(&mut self, vnode_bitmap: Bitmap) -> Result<()> {
        self.inner.update_vnode_bitmap(vnode_bitmap).await
    }
}
17 changes: 10 additions & 7 deletions src/connector/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

pub mod catalog;
pub mod clickhouse;
pub mod coordinate;
pub mod iceberg;
pub mod kafka;
pub mod kinesis;
Expand All @@ -34,7 +35,7 @@ use risingwave_common::error::{anyhow_error, ErrorCode, RwError};
use risingwave_pb::catalog::PbSinkType;
use risingwave_pb::connector_service::{PbSinkParam, SinkMetadata, TableSchema};
use risingwave_rpc_client::error::RpcError;
use risingwave_rpc_client::ConnectorClient;
use risingwave_rpc_client::{ConnectorClient, MetaClient};
use thiserror::Error;
pub use tracing;

Expand All @@ -57,7 +58,7 @@ pub const SINK_TYPE_DEBEZIUM: &str = "debezium";
pub const SINK_TYPE_UPSERT: &str = "upsert";
pub const SINK_USER_FORCE_APPEND_ONLY_OPTION: &str = "force_append_only";

#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkParam {
pub sink_id: SinkId,
pub properties: HashMap<String, String>,
Expand Down Expand Up @@ -119,16 +120,17 @@ impl From<SinkCatalog> for SinkParam {
}
}

#[derive(Clone)]
#[derive(Clone, Default)]
pub struct SinkWriterParam {
pub connector_params: ConnectorParams,
pub executor_id: u64,
pub vnode_bitmap: Option<Bitmap>,
pub meta_client: Option<MetaClient>,
}

#[async_trait]
pub trait Sink {
type Writer: SinkWriter;
type Writer: SinkWriter<CommitMetadata = ()>;
type Coordinator: SinkCommitCoordinator;

async fn validate(&self, client: Option<ConnectorClient>) -> Result<()>;
Expand All @@ -142,7 +144,8 @@ pub trait Sink {
}

#[async_trait]
pub trait SinkWriter: Send {
pub trait SinkWriter: Send + 'static {
type CommitMetadata: Send = ();
/// Begin a new epoch
async fn begin_epoch(&mut self, epoch: u64) -> Result<()>;

Expand All @@ -151,7 +154,7 @@ pub trait SinkWriter: Send {

/// Receive a barrier and mark the end of current epoch. When `is_checkpoint` is true, the sink
/// writer should commit the current epoch.
async fn barrier(&mut self, is_checkpoint: bool) -> Result<()>;
async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata>;

/// Clean up
async fn abort(&mut self) -> Result<()>;
Expand All @@ -162,7 +165,7 @@ pub trait SinkWriter: Send {

#[async_trait]
// An old version of SinkWriter for backward compatibility
pub trait SinkWriterV1: Send {
pub trait SinkWriterV1: Send + 'static {
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()>;

// The following interface is for transactions; if not supported, return Ok(()).
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl RemoteSinkWriter {
use futures::StreamExt;
use tokio_stream::wrappers::UnboundedReceiverStream;

let stream_handle = SinkWriterStreamHandle::new(
let stream_handle = SinkWriterStreamHandle::for_test(
request_sender,
UnboundedReceiverStream::new(response_receiver).boxed(),
);
Expand Down
5 changes: 5 additions & 0 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use self::progress::TrackingCommand;
use crate::barrier::progress::CreateMviewProgressTracker;
use crate::barrier::BarrierEpochState::{Completed, InFlight};
use crate::hummock::HummockManagerRef;
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
CatalogManagerRef, ClusterManagerRef, FragmentManagerRef, LocalNotification, MetaSrvEnv,
WorkerId,
Expand Down Expand Up @@ -144,6 +145,8 @@ pub struct GlobalBarrierManager<S: MetaStore> {

source_manager: SourceManagerRef<S>,

sink_manager: SinkCoordinatorManager,

metrics: Arc<MetaMetrics>,

pub(crate) env: MetaSrvEnv<S>,
Expand Down Expand Up @@ -493,6 +496,7 @@ where
fragment_manager: FragmentManagerRef<S>,
hummock_manager: HummockManagerRef<S>,
source_manager: SourceManagerRef<S>,
sink_manager: SinkCoordinatorManager,
metrics: Arc<MetaMetrics>,
) -> Self {
let enable_recovery = env.opts.enable_recovery;
Expand All @@ -509,6 +513,7 @@ where
fragment_manager,
hummock_manager,
source_manager,
sink_manager,
metrics,
env,
tracker: Mutex::new(tracker),
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ where
self.clean_dirty_fragments()
.await
.expect("clean dirty fragments");
self.sink_manager.reset().await;
let retry_strategy = Self::get_retry_strategy();

// We take retry into consideration because this is the latency user sees for a cluster to
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod env;
mod id;
mod idle;
mod notification;
pub(crate) mod sink_coordination;
mod streaming_job;
mod system_param;

Expand Down
Loading

0 comments on commit bcd79a8

Please sign in to comment.