diff --git a/src/connector/src/sink/formatter/append_only.rs b/src/connector/src/sink/formatter/append_only.rs index 8a4b7feda6a4..c0c47fa37e17 100644 --- a/src/connector/src/sink/formatter/append_only.rs +++ b/src/connector/src/sink/formatter/append_only.rs @@ -40,19 +40,22 @@ impl SinkFormatter for AppendOnlyFormatter impl Iterator, Option)>> { - std::iter::from_coroutine(|| { - for (op, row) in chunk.rows() { - if op != Op::Insert { - continue; - } - let event_key_object = match &self.key_encoder { - Some(key_encoder) => Some(tri!(key_encoder.encode(row))), - None => None, - }; - let event_object = Some(tri!(self.val_encoder.encode(row))); + std::iter::from_coroutine( + #[coroutine] + || { + for (op, row) in chunk.rows() { + if op != Op::Insert { + continue; + } + let event_key_object = match &self.key_encoder { + Some(key_encoder) => Some(tri!(key_encoder.encode(row))), + None => None, + }; + let event_object = Some(tri!(self.val_encoder.encode(row))); - yield Ok((event_key_object, event_object)) - } - }) + yield Ok((event_key_object, event_object)) + } + }, + ) } } diff --git a/src/connector/src/sink/formatter/debezium_json.rs b/src/connector/src/sink/formatter/debezium_json.rs index 6fff15058bd6..68d21fd0a3f4 100644 --- a/src/connector/src/sink/formatter/debezium_json.rs +++ b/src/connector/src/sink/formatter/debezium_json.rs @@ -98,100 +98,103 @@ impl SinkFormatter for DebeziumJsonFormatter { &self, chunk: &StreamChunk, ) -> impl Iterator, Option)>> { - std::iter::from_coroutine(|| { - let DebeziumJsonFormatter { - schema, - pk_indices, - db_name, - sink_from_name, - opts, - key_encoder, - val_encoder, - } = self; - let ts_ms = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - let source_field = json!({ - // todo: still some missing fields in source field - // ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events - "db": db_name, - "table": sink_from_name, - "ts_ms": ts_ms, - }); - - let mut update_cache: Option> = None; - - for (op, row) in chunk.rows() { - let event_key_object: Option = Some(json!({ - "schema": json!({ - "type": "struct", - "fields": fields_pk_to_json(&schema.fields, pk_indices), - "optional": false, - "name": concat_debezium_name_field(db_name, sink_from_name, "Key"), - }), - "payload": tri!(key_encoder.encode(row)), - })); - let event_object: Option = match op { - Op::Insert => Some(json!({ - "schema": schema_to_json(schema, db_name, sink_from_name), - "payload": { - "before": null, - "after": tri!(val_encoder.encode(row)), - "op": "c", - "ts_ms": ts_ms, - "source": source_field, - } - })), - Op::Delete => { - let value_obj = Some(json!({ + std::iter::from_coroutine( + #[coroutine] + || { + let DebeziumJsonFormatter { + schema, + pk_indices, + db_name, + sink_from_name, + opts, + key_encoder, + val_encoder, + } = self; + let ts_ms = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let source_field = json!({ + // todo: still some missing fields in source field + // ref https://debezium.io/documentation/reference/2.4/connectors/postgresql.html#postgresql-create-events + "db": db_name, + "table": sink_from_name, + "ts_ms": ts_ms, + }); + + let mut update_cache: Option> = None; + + for (op, row) in chunk.rows() { + let event_key_object: Option = Some(json!({ + "schema": json!({ + "type": "struct", + "fields": fields_pk_to_json(&schema.fields, pk_indices), + "optional": false, + "name": concat_debezium_name_field(db_name, sink_from_name, "Key"), + }), + "payload": tri!(key_encoder.encode(row)), + })); + let event_object: Option = match op { + Op::Insert => Some(json!({ "schema": schema_to_json(schema, db_name, sink_from_name), "payload": { - "before": tri!(val_encoder.encode(row)), - "after": null, - "op": "d", + "before": null, + "after": tri!(val_encoder.encode(row)), + "op": "c", "ts_ms": ts_ms, "source": source_field, } - })); - yield Ok((event_key_object.clone(), value_obj)); - - if opts.gen_tombstone { - // Tomestone event - // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events - yield Ok((event_key_object, None)); - } - - continue; - } - Op::UpdateDelete => { - update_cache = Some(tri!(val_encoder.encode(row))); - continue; - } - Op::UpdateInsert => { - if let Some(before) = update_cache.take() { - Some(json!({ + })), + Op::Delete => { + let value_obj = Some(json!({ "schema": schema_to_json(schema, db_name, sink_from_name), "payload": { - "before": before, - "after": tri!(val_encoder.encode(row)), - "op": "u", + "before": tri!(val_encoder.encode(row)), + "after": null, + "op": "d", "ts_ms": ts_ms, "source": source_field, } - })) - } else { - warn!( - "not found UpdateDelete in prev row, skipping, row index {:?}", - row.index() - ); + })); + yield Ok((event_key_object.clone(), value_obj)); + + if opts.gen_tombstone { + // Tomestone event + // https://debezium.io/documentation/reference/2.1/connectors/postgresql.html#postgresql-delete-events + yield Ok((event_key_object, None)); + } + continue; } - } - }; - yield Ok((event_key_object, event_object)); - } - }) + Op::UpdateDelete => { + update_cache = Some(tri!(val_encoder.encode(row))); + continue; + } + Op::UpdateInsert => { + if let Some(before) = update_cache.take() { + Some(json!({ + "schema": schema_to_json(schema, db_name, sink_from_name), + "payload": { + "before": before, + "after": tri!(val_encoder.encode(row)), + "op": "u", + "ts_ms": ts_ms, + "source": source_field, + } + })) + } else { + warn!( + "not found UpdateDelete in prev row, skipping, row index {:?}", + row.index() + ); + continue; + } + } + }; + yield Ok((event_key_object, event_object)); + } + }, + ) } } diff --git a/src/connector/src/sink/formatter/upsert.rs b/src/connector/src/sink/formatter/upsert.rs index 612c22aaaf86..7e586a7917ea 100644 --- a/src/connector/src/sink/formatter/upsert.rs +++ b/src/connector/src/sink/formatter/upsert.rs @@ -40,22 +40,25 @@ impl SinkFormatter for UpsertFormatter { &self, chunk: &StreamChunk, ) -> impl Iterator, Option)>> { - std::iter::from_coroutine(|| { - for (op, row) in chunk.rows() { - let event_key_object = Some(tri!(self.key_encoder.encode(row))); - - let event_object = match op { - Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))), - // Empty value with a key - Op::Delete => None, - Op::UpdateDelete => { - // upsert semantic does not require update delete event - continue; - } - }; - - yield Ok((event_key_object, event_object)) - } - }) + std::iter::from_coroutine( + #[coroutine] + || { + for (op, row) in chunk.rows() { + let event_key_object = Some(tri!(self.key_encoder.encode(row))); + + let event_object = match op { + Op::Insert | Op::UpdateInsert => Some(tri!(self.val_encoder.encode(row))), + // Empty value with a key + Op::Delete => None, + Op::UpdateDelete => { + // upsert semantic does not require update delete event + continue; + } + }; + + yield Ok((event_key_object, event_object)) + } + }, + ) } }