From 9932d12bf91c822164a06551fe2b56363d77e99c Mon Sep 17 00:00:00 2001 From: congyi wang <58715567+wcy-fdu@users.noreply.github.com> Date: Mon, 26 Aug 2024 22:19:50 +0800 Subject: [PATCH] feat(connector): introduce azblob sink (#18244) --- src/connector/Cargo.toml | 1 + src/connector/src/sink/file_sink/azblob.rs | 131 ++++++++++++++++++ src/connector/src/sink/file_sink/mod.rs | 1 + .../src/sink/file_sink/opendal_sink.rs | 1 + src/connector/src/sink/mod.rs | 3 + src/connector/with_options_sink.yaml | 23 +++ src/frontend/src/handler/create_sink.rs | 4 + 7 files changed, 164 insertions(+) create mode 100644 src/connector/src/sink/file_sink/azblob.rs diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 30dbc2a7c721..8b6e8fa9775f 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -88,6 +88,7 @@ nexmark = { version = "0.2", features = ["serde"] } num-bigint = "0.4" opendal = { workspace = true, features = [ "executors-tokio", + "services-azblob", "services-fs", "services-gcs", "services-memory", diff --git a/src/connector/src/sink/file_sink/azblob.rs b/src/connector/src/sink/file_sink/azblob.rs new file mode 100644 index 000000000000..3a600994639a --- /dev/null +++ b/src/connector/src/sink/file_sink/azblob.rs @@ -0,0 +1,131 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use std::collections::{BTreeMap, HashMap}; + +use anyhow::anyhow; +use opendal::layers::{LoggingLayer, RetryLayer}; +use opendal::services::Azblob; +use opendal::Operator; +use serde::Deserialize; +use serde_with::serde_as; +use with_options::WithOptions; + +use super::opendal_sink::FileSink; +use crate::sink::file_sink::opendal_sink::OpendalSinkBackend; +use crate::sink::{Result, SinkError, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT}; +use crate::source::UnknownFields; +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct AzblobCommon { + #[serde(rename = "azblob.container_name")] + pub container_name: String, + /// The directory where the sink file is located. + #[serde(rename = "azblob.path")] + pub path: String, + #[serde(rename = "azblob.credentials.account_name", default)] + pub account_name: Option, + #[serde(rename = "azblob.credentials.account_key", default)] + pub account_key: Option, + #[serde(rename = "azblob.endpoint_url")] + pub endpoint_url: String, +} + +#[serde_as] +#[derive(Clone, Debug, Deserialize, WithOptions)] +pub struct AzblobConfig { + #[serde(flatten)] + pub common: AzblobCommon, + + pub r#type: String, // accept "append-only" + + #[serde(flatten)] + pub unknown_fields: HashMap, +} + +pub const AZBLOB_SINK: &str = "azblob"; + +impl FileSink { + pub fn new_azblob_sink(config: AzblobConfig) -> Result { + // Create azblob builder. + let mut builder = Azblob::default(); + builder.container(&config.common.container_name); + + builder.endpoint(&config.common.endpoint_url); + + if let Some(account_name) = config.common.account_name { + builder.account_name(&account_name); + } else { + tracing::warn!( + "account_name azblob is not set, container {}", + config.common.container_name + ); + } + + if let Some(account_key) = config.common.account_key { + builder.account_key(&account_key); + } else { + tracing::warn!( + "account_key azblob is not set, container {}", + config.common.container_name + ); + } + let operator: Operator = Operator::new(builder)? + .layer(LoggingLayer::default()) + .layer(RetryLayer::default()) + .finish(); + + Ok(operator) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct AzblobSink; + +impl UnknownFields for AzblobConfig { + fn unknown_fields(&self) -> HashMap { + self.unknown_fields.clone() + } +} + +impl OpendalSinkBackend for AzblobSink { + type Properties = AzblobConfig; + + const SINK_NAME: &'static str = AZBLOB_SINK; + + fn from_btreemap(btree_map: BTreeMap) -> Result { + let config = + serde_json::from_value::(serde_json::to_value(btree_map).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT { + return Err(SinkError::Config(anyhow!( + "`{}` must be {}, or {}", + SINK_TYPE_OPTION, + SINK_TYPE_APPEND_ONLY, + SINK_TYPE_UPSERT + ))); + } + Ok(config) + } + + fn new_operator(properties: AzblobConfig) -> Result { + FileSink::::new_azblob_sink(properties) + } + + fn get_path(properties: Self::Properties) -> String { + properties.common.path + } + + fn get_engine_type() -> super::opendal_sink::EngineType { + super::opendal_sink::EngineType::Azblob + } +} diff --git a/src/connector/src/sink/file_sink/mod.rs b/src/connector/src/sink/file_sink/mod.rs index 39e0f0208f88..fe25df4a5d1e 100644 --- a/src/connector/src/sink/file_sink/mod.rs +++ b/src/connector/src/sink/file_sink/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod azblob; pub mod fs; pub mod gcs; pub mod opendal_sink; diff --git a/src/connector/src/sink/file_sink/opendal_sink.rs b/src/connector/src/sink/file_sink/opendal_sink.rs index 4b201bba131e..1f6ec2b635fe 100644 --- a/src/connector/src/sink/file_sink/opendal_sink.rs +++ b/src/connector/src/sink/file_sink/opendal_sink.rs @@ -86,6 +86,7 @@ pub enum EngineType { Gcs, S3, Fs, + Azblob, } impl Sink for FileSink { diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 3cfd72feabaa..b73455108029 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -102,7 +102,10 @@ macro_rules! for_all_sinks { { Doris, $crate::sink::doris::DorisSink }, { Starrocks, $crate::sink::starrocks::StarrocksSink }, { S3, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::s3::S3Sink>}, + { Gcs, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::gcs::GcsSink> }, + { Azblob, $crate::sink::file_sink::opendal_sink::FileSink<$crate::sink::file_sink::azblob::AzblobSink>}, + { Fs, $crate::sink::file_sink::opendal_sink::FileSink }, { Snowflake, $crate::sink::snowflake::SnowflakeSink }, { DeltaLake, $crate::sink::deltalake::DeltaLakeSink }, diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 4672da3025d5..6b661139e13e 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -1,6 +1,29 @@ # THIS FILE IS AUTO_GENERATED. DO NOT EDIT # UPDATE WITH: ./risedev generate-with-options +AzblobConfig: + fields: + - name: azblob.container_name + field_type: String + required: true + - name: azblob.path + field_type: String + comments: The directory where the sink file is located. + required: true + - name: azblob.credentials.account_name + field_type: String + required: false + default: Default::default + - name: azblob.credentials.account_key + field_type: String + required: false + default: Default::default + - name: azblob.endpoint_url + field_type: String + required: true + - name: r#type + field_type: String + required: true BigQueryConfig: fields: - name: bigquery.local.path diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 84d98f43e2a1..f0c1ed074ed1 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -869,6 +869,7 @@ fn bind_sink_format_desc(session: &SessionImpl, value: ConnectorSchema) -> Resul static CONNECTORS_COMPATIBLE_FORMATS: LazyLock>>> = LazyLock::new(|| { + use risingwave_connector::sink::file_sink::azblob::AzblobSink; use risingwave_connector::sink::file_sink::fs::FsSink; use risingwave_connector::sink::file_sink::gcs::GcsSink; use risingwave_connector::sink::file_sink::opendal_sink::FileSink; @@ -896,6 +897,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock::SINK_NAME => hashmap!( Format::Plain => vec![Encode::Parquet], ), + FileSink::::SINK_NAME => hashmap!( + Format::Plain => vec![Encode::Parquet], + ), FileSink::::SINK_NAME => hashmap!( Format::Plain => vec![Encode::Parquet], ),