Skip to content

Commit

Permalink
fix coroutine
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan committed Jun 8, 2024
1 parent 19dfc27 commit 60b2c3c
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 112 deletions.
29 changes: 16 additions & 13 deletions src/connector/src/sink/formatter/append_only.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,22 @@ impl<KE: RowEncoder, VE: RowEncoder> SinkFormatter for AppendOnlyFormatter<KE, V
&self,
chunk: &StreamChunk,
) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>> {
std::iter::from_coroutine(|| {
for (op, row) in chunk.rows() {
if op != Op::Insert {
continue;
}
let event_key_object = match &self.key_encoder {
Some(key_encoder) => Some(tri!(key_encoder.encode(row))),
None => None,
};
let event_object = Some(tri!(self.val_encoder.encode(row)));
std::iter::from_coroutine(
#[coroutine]
|| {
for (op, row) in chunk.rows() {
if op != Op::Insert {
continue;
}
let event_key_object = match &self.key_encoder {
Some(key_encoder) => Some(tri!(key_encoder.encode(row))),
None => None,
};
let event_object = Some(tri!(self.val_encoder.encode(row)));

yield Ok((event_key_object, event_object))
}
})
yield Ok((event_key_object, event_object))
}
},
)
}
}
167 changes: 85 additions & 82 deletions src/connector/src/sink/formatter/debezium_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,100 +98,103 @@ impl SinkFormatter for DebeziumJsonFormatter {
&self,
chunk: &StreamChunk,
) -> impl Iterator<Item = Result<(Option<Value>, Option<Value>)>> {
std::iter::from_coroutine(|| {
let DebeziumJsonFormatter {
schema,
pk_indices,
db_name,
sink_from_name,
opts,
key_encoder,
val_encoder,
} = self;
let ts_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let source_field = json!({
// todo: still some missing fields in source field
// ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events
"db": db_name,
"table": sink_from_name,
"ts_ms": ts_ms,
});

let mut update_cache: Option<Map<String, Value>> = None;

for (op, row) in chunk.rows() {
let event_key_object: Option<Value> = Some(json!({
"schema": json!({
"type": "struct",
"fields": fields_pk_to_json(&schema.fields, pk_indices),
"optional": false,
"name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
}),
"payload": tri!(key_encoder.encode(row)),
}));
let event_object: Option<Value> = match op {
Op::Insert => Some(json!({
"schema": schema_to_json(schema, db_name, sink_from_name),
"payload": {
"before": null,
"after": tri!(val_encoder.encode(row)),
"op": "c",
"ts_ms": ts_ms,
"source": source_field,
}
})),
Op::Delete => {
let value_obj = Some(json!({
std::iter::from_coroutine(
#[coroutine]
|| {
let DebeziumJsonFormatter {
schema,
pk_indices,
db_name,
sink_from_name,
opts,
key_encoder,
val_encoder,
} = self;
let ts_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let source_field = json!({
// todo: still some missing fields in source field
// ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events
"db": db_name,
"table": sink_from_name,
"ts_ms": ts_ms,
});

let mut update_cache: Option<Map<String, Value>> = None;

for (op, row) in chunk.rows() {
let event_key_object: Option<Value> = Some(json!({
"schema": json!({
"type": "struct",
"fields": fields_pk_to_json(&schema.fields, pk_indices),
"optional": false,
"name": concat_debezium_name_field(db_name, sink_from_name, "Key"),
}),
"payload": tri!(key_encoder.encode(row)),
}));
let event_object: Option<Value> = match op {
Op::Insert => Some(json!({
"schema": schema_to_json(schema, db_name, sink_from_name),
"payload": {
"before": tri!(val_encoder.encode(row)),
"after": null,
"op": "d",
"before": null,
"after": tri!(val_encoder.encode(row)),
"op": "c",
"ts_ms": ts_ms,
"source": source_field,
}
}));
yield Ok((event_key_object.clone(), value_obj));

if opts.gen_tombstone {
// Tomestone event
// https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events
yield Ok((event_key_object, None));
}

continue;
}
Op::UpdateDelete => {
update_cache = Some(tri!(val_encoder.encode(row)));
continue;
}
Op::UpdateInsert => {
if let Some(before) = update_cache.take() {
Some(json!({
})),
Op::Delete => {
let value_obj = Some(json!({
"schema": schema_to_json(schema, db_name, sink_from_name),
"payload": {
"before": before,
"after": tri!(val_encoder.encode(row)),
"op": "u",
"before": tri!(val_encoder.encode(row)),
"after": null,
"op": "d",
"ts_ms": ts_ms,
"source": source_field,
}
}))
} else {
warn!(
"not found UpdateDelete in prev row, skipping, row index {:?}",
row.index()
);
}));
yield Ok((event_key_object.clone(), value_obj));

if opts.gen_tombstone {
// Tomestone event
// https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events
yield Ok((event_key_object, None));
}

continue;
}
}
};
yield Ok((event_key_object, event_object));
}
})
Op::UpdateDelete => {
update_cache = Some(tri!(val_encoder.encode(row)));
continue;
}
Op::UpdateInsert => {
if let Some(before) = update_cache.take() {
Some(json!({
"schema": schema_to_json(schema, db_name, sink_from_name),
"payload": {
"before": before,
"after": tri!(val_encoder.encode(row)),
"op": "u",
"ts_ms": ts_ms,
"source": source_field,
}
}))
} else {
warn!(
"not found UpdateDelete in prev row, skipping, row index {:?}",
row.index()
);
continue;
}
}
};
yield Ok((event_key_object, event_object));
}
},
)
}
}

Expand Down
37 changes: 20 additions & 17 deletions src/connector/src/sink/formatter/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,25 @@ impl<KE: RowEncoder, VE: RowEncoder> SinkFormatter for UpsertFormatter<KE, VE> {
&self,
chunk: &StreamChunk,
) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>> {
std::iter::from_coroutine(|| {
for (op, row) in chunk.rows() {
let event_key_object = Some(tri!(self.key_encoder.encode(row)));

let event_object = match op {
Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))),
// Empty value with a key
Op::Delete => None,
Op::UpdateDelete => {
// upsert semantic does not require update delete event
continue;
}
};

yield Ok((event_key_object, event_object))
}
})
std::iter::from_coroutine(
#[coroutine]
|| {
for (op, row) in chunk.rows() {
let event_key_object = Some(tri!(self.key_encoder.encode(row)));

let event_object = match op {
Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))),
// Empty value with a key
Op::Delete => None,
Op::UpdateDelete => {
// upsert semantic does not require update delete event
continue;
}
};

yield Ok((event_key_object, event_object))
}
},
)
}
}

0 comments on commit 60b2c3c

Please sign in to comment.