Skip to content

Commit

Permalink
basic sink functionality with json encoder
Browse files Browse the repository at this point in the history
  • Loading branch information
xzhseh committed Mar 12, 2024
1 parent db090a9 commit 95310bf
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 23 deletions.
78 changes: 67 additions & 11 deletions src/connector/src/sink/snowflake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,25 @@ use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use risingwave_common::array::StreamChunk;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::Bitmap;
use risingwave_common::catalog::Schema;
use serde::Deserialize;
use serde_derive::Serialize;
use serde_json::Value;
use serde_with::serde_as;
use with_options::WithOptions;

use super::encoder::JsonEncoder;
use super::encoder::{
JsonEncoder, RowEncoder, TimeHandlingMode, TimestampHandlingMode, TimestamptzHandlingMode,
};
use super::snowflake_connector::{SnowflakeHttpClient, SnowflakeS3Client};
use super::writer::LogSinkerOf;
use super::{SinkError, SinkParam};
use crate::sink::writer::SinkWriterExt;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkWriter, SinkWriterParam};

pub const SNOWFLAKE_SINK: &str = "snowflake";
const MAX_BATCH_NUM: u32 = 1000000;

// TODO: add comments
#[derive(Deserialize, Debug, Clone, WithOptions)]
Expand Down Expand Up @@ -108,7 +110,9 @@ impl Sink for SnowflakeSink {
self.schema.clone(),
self.pk_indices.clone(),
self.is_append_only,
).into_log_sink(writer_param.sink_metrics))
)
.await
.into_log_sinker(writer_param.sink_metrics))
}

async fn validate(&self) -> Result<()> {
Expand Down Expand Up @@ -140,22 +144,74 @@ pub struct SnowflakeSinkWriter {
http_client: SnowflakeHttpClient,
/// the client to insert file to external storage (i.e., s3)
s3_client: SnowflakeS3Client,
row_encoder: JsonEncoder,
counter: u32,
payload: String,
}

impl SnowflakeSinkWriter {
pub fn new(
/// Creates a new `SnowflakeSinkWriter`.
///
/// Builds the Snowpipe REST client and the S3 client used for staging
/// intermediate files, plus a JSON row encoder that renders dates, times
/// and timestamps as strings (timestamptz in UTC).
pub async fn new(
    config: SnowflakeConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
) -> Self {
    let http_client = SnowflakeHttpClient::new(
        config.common.account_identifier.clone(),
        config.common.user.clone(),
        config.common.database.clone(),
        config.common.schema.clone(),
        config.common.pipe.clone(),
        config.common.rsa_public_key_fp.clone(),
        config.common.private_key.clone(),
        // no extra request headers are needed for now
        HashMap::new(),
    );

    let s3_client = SnowflakeS3Client::new(config.common.s3_bucket.clone()).await;

    Self {
        config,
        // `schema` is moved into the encoder below; keep our own clone here.
        schema: schema.clone(),
        pk_indices,
        is_append_only,
        http_client,
        s3_client,
        row_encoder: JsonEncoder::new(
            schema,
            None,
            super::encoder::DateHandlingMode::String,
            TimestampHandlingMode::String,
            TimestamptzHandlingMode::UtcString,
            TimeHandlingMode::String,
        ),
        counter: 0,
        payload: String::new(),
    }
}

/// Clears the buffered payload and zeroes the row counter,
/// leaving the writer ready for the next batch.
fn reset(&mut self) {
    self.counter = 0;
    self.payload.clear();
}

/// Encodes all `Insert` rows of `chunk` as newline-delimited JSON,
/// stages the result in S3, then asks Snowpipe to ingest the staged file.
///
/// Non-insert ops are skipped since this is the append-only path.
async fn append_only(&mut self, chunk: StreamChunk) -> Result<()> {
    for (op, row) in chunk.rows() {
        if op != Op::Insert {
            continue;
        }
        let row_json_string = Value::Object(self.row_encoder.encode(row)?).to_string();
        self.payload.push_str(&row_json_string);
        // Snowflake expects newline-delimited JSON in the staged file;
        // without a separator consecutive rows would be unparsable.
        self.payload.push('\n');
    }
    // Nothing to ingest: avoid staging an empty file and issuing a
    // useless `insertFiles` round trip.
    if self.payload.is_empty() {
        return Ok(());
    }
    // `mem::take` hands the buffer over without cloning it; the field is
    // left empty, so the subsequent `reset` stays correct.
    let payload = std::mem::take(&mut self.payload);
    self.s3_client.sink_to_s3(payload.into()).await?;
    self.http_client.send_request().await?;
    self.reset();
    Ok(())
}
}

#[async_trait]
impl SinkWriter for SnowflakeSinkWriter {
async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
/// No-op: this sink buffers and flushes inside `write_batch`, so no
/// per-epoch setup is required.
async fn begin_epoch(&mut self, _epoch: u64) -> Result<()> {
    Ok(())
}

Expand All @@ -167,11 +223,11 @@ impl SinkWriter for SnowflakeSinkWriter {
Ok(())
}

async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
todo!()
/// Barrier handler. Data is already uploaded to S3 and handed to Snowpipe
/// inside `write_batch`, so there is nothing left to commit here.
/// NOTE(review): presumably acceptable for at-least-once delivery — confirm
/// against the sink's intended checkpoint semantics.
async fn barrier(&mut self, _is_checkpoint: bool) -> Result<Self::CommitMetadata> {
    Ok(())
}

async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata> {
todo!()
/// Writes a chunk by forwarding to the append-only path.
/// NOTE(review): `self.is_append_only` is not consulted here, so non-insert
/// ops in upsert streams are silently dropped by `append_only` — confirm
/// this is intended until upsert support lands.
async fn write_batch(&mut self, chunk: StreamChunk) -> Result<()> {
    self.append_only(chunk).await
}
}
51 changes: 39 additions & 12 deletions src/connector/src/sink/snowflake_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

use aws_config;
use aws_sdk_s3::{Client as S3Client, Error as S3Error};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::Client as S3Client;
use bytes::Bytes;
use http::request::Builder;
use http::header;
use hyper::body::Body;
use hyper::client::HttpConnector;
use hyper::{Client, Request, StatusCode};
Expand All @@ -30,6 +33,7 @@ use super::{Result, SinkError};

const SNOWFLAKE_HOST_ADDR: &str = "snowflakecomputing.com";
const SNOWFLAKE_REQUEST_ID: &str = "RW_SNOWFLAKE_SINK";
const S3_INTERMEDIATE_FILE_NAME: &str = "RW_SNOWFLAKE_S3_SINK_FILE";

/// Claims is used when constructing `jwt_token`
/// with payload specified.
Expand Down Expand Up @@ -65,7 +69,7 @@ impl SnowflakeHttpClient {
) -> Self {
// TODO: ensure if we need user to *explicitly* provide the `request_id`
let url = format!(
"https://{}.{}/v1/data/pipes/{}.{}.{}/insertFiles?request_id={}",
"https://{}.{}/v1/data/pipes/{}.{}.{}/insertFiles?requestId={}",
account.clone(),
SNOWFLAKE_HOST_ADDR,
db,
Expand All @@ -74,6 +78,8 @@ impl SnowflakeHttpClient {
SNOWFLAKE_REQUEST_ID
);

println!("url: {}", url);

Self {
url,
rsa_public_key_fp,
Expand Down Expand Up @@ -132,9 +138,6 @@ impl SnowflakeHttpClient {

fn build_request_and_client(&self) -> (Builder, Client<HttpsConnector<HttpConnector>>) {
let mut builder = Request::post(self.url.clone());
for (k, v) in &self.header {
builder = builder.header(k, v);
}

let connector = HttpsConnector::new();
let client = Client::builder()
Expand All @@ -152,12 +155,16 @@ impl SnowflakeHttpClient {
// Generate the jwt_token
let jwt_token = self.generate_jwt_token()?;
let builder = builder
.header(header::CONTENT_TYPE, "text/plain")
.header("Authorization", format!("Bearer {}", jwt_token))
.header("X-Snowflake-Authorization-Token-Type".to_string(), "KEYPAIR_JWT");
.header(
"X-Snowflake-Authorization-Token-Type".to_string(),
"KEYPAIR_JWT",
);

let request = builder
// TODO: ensure this
.body(Body::from(self.s3_file.clone()))
.body(Body::from(S3_INTERMEDIATE_FILE_NAME))
.map_err(|err| SinkError::Snowflake(err.to_string()))?;

let response = client
Expand All @@ -171,25 +178,45 @@ impl SnowflakeHttpClient {
response.status()
)));
}

println!("resp: {:#?}", response);

Ok(())
}
}

/// TODO(Zihao): refactor this part after s3 sink is available
/// Client for staging sink data in S3, from where Snowpipe ingests it.
/// Note: the stray `s3_file` field was removed — it was never initialized
/// by `new` and never read (the intermediate object key is the constant
/// `S3_INTERMEDIATE_FILE_NAME`).
pub struct SnowflakeS3Client {
    /// target S3 bucket acting as the external stage
    s3_bucket: String,
    /// AWS SDK S3 client used for the uploads
    s3_client: S3Client,
}

impl SnowflakeS3Client {
pub fn new(s3_bucket: String, s3_file: String) -> Self {
/// Builds an S3 client for the given bucket from the ambient AWS
/// environment (credentials, region, etc.).
pub async fn new(s3_bucket: String) -> Self {
    let sdk_config = aws_config::load_from_env().await;
    Self {
        s3_bucket,
        s3_client: S3Client::new(&sdk_config),
    }
}

pub fn sink_to_s3() -> Result<()> {
todo!()
pub async fn sink_to_s3(&self, data: Bytes) -> Result<()> {
self.s3_client
.put_object()
.bucket(self.s3_bucket.clone())
.key(S3_INTERMEDIATE_FILE_NAME)
.body(ByteStream::from(data))
.send()
.await
.map_err(|err| {
SinkError::Snowflake(format!(
"failed to sink data to S3, error: {}",
err.to_string()
))
})?;

Ok(())
}
}

0 comments on commit 95310bf

Please sign in to comment.