From db090a9704f4d82c6b1c44e257eba0c8477e8d98 Mon Sep 17 00:00:00 2001 From: Michael Xu Date: Mon, 11 Mar 2024 18:31:14 -0400 Subject: [PATCH] add log sinker --- src/connector/src/sink/snowflake.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/connector/src/sink/snowflake.rs b/src/connector/src/sink/snowflake.rs index a62c6b083912..2820a44341ba 100644 --- a/src/connector/src/sink/snowflake.rs +++ b/src/connector/src/sink/snowflake.rs @@ -30,6 +30,7 @@ use super::encoder::JsonEncoder; use super::snowflake_connector::{SnowflakeHttpClient, SnowflakeS3Client}; use super::writer::LogSinkerOf; use super::{SinkError, SinkParam}; +use crate::sink::writer::SinkWriterExt; use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam}; pub const SNOWFLAKE_SINK: &str = "snowflake"; @@ -102,7 +103,12 @@ impl Sink for SnowflakeSink { const SINK_NAME: &'static str = SNOWFLAKE_SINK; async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result { - todo!() + Ok(SnowflakeSinkWriter::new( + self.config.clone(), + self.schema.clone(), + self.pk_indices.clone(), + self.is_append_only, + ).into_log_sink(writer_param.sink_metrics)) } async fn validate(&self) -> Result<()> {