Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
remove index
  • Loading branch information
xxhZs committed Mar 19, 2024
1 parent cb3c3d2 commit 5cc2579
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 6 deletions.
8 changes: 3 additions & 5 deletions src/connector/src/sink/big_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,6 @@ impl BigQuerySinkWriter {
.iter()
.map(|f| (f.name.as_str(), &f.data_type)),
config.common.table.clone(),
1,
);

if !is_append_only {
Expand Down Expand Up @@ -579,13 +578,12 @@ fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> prost_reflect::Desc
fn build_protobuf_schema<'a>(
fields: impl Iterator<Item = (&'a str, &'a DataType)>,
name: String,
index: i32,
) -> DescriptorProto {
let mut proto = DescriptorProto {
name: Some(name),
..Default::default()
};
let mut index_mut = index;
let mut index_mut = 1;
let mut field_vec = vec![];
let mut struct_vec = vec![];
for (name, data_type) in fields {
Expand Down Expand Up @@ -628,7 +626,7 @@ fn build_protobuf_field(
DataType::Struct(s) => {
field.r#type = Some(field_descriptor_proto::Type::Message.into());
let name = format!("Struct{}", name);
let sub_proto = build_protobuf_schema(s.iter(), name.clone(), 1);
let sub_proto = build_protobuf_schema(s.iter(), name.clone());
field.type_name = Some(name);
return (field, Some(sub_proto));
}
Expand Down Expand Up @@ -701,7 +699,7 @@ mod test {
.fields()
.iter()
.map(|f| (f.name.as_str(), &f.data_type));
let desc = build_protobuf_schema(fields, "t1".to_string(), 1);
let desc = build_protobuf_schema(fields, "t1".to_string());
let pool = build_protobuf_descriptor_pool(&desc);
let t1_message = pool.get_message_by_name("t1").unwrap();
assert_matches!(
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/sink/encoder/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ mod tests {
let pool_bytes = std::fs::read(pool_path).unwrap();
let pool = prost_reflect::DescriptorPool::decode(pool_bytes.as_ref()).unwrap();
let descriptor = pool.get_message_by_name("recursive.AllTypes").unwrap();
println!("a{:?}", descriptor.descriptor_proto());
let schema = Schema::new(vec![
Field::with_name(DataType::Boolean, "bool_field"),
Field::with_name(DataType::Varchar, "string_field"),
Expand Down

0 comments on commit 5cc2579

Please sign in to comment.