
[FLINK-36611][pipeline-connector][kafka] Add schema info to output of Kafka sink #3791

Draft · wants to merge 7 commits into base: master
@@ -39,6 +39,10 @@
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.util.Collector;

+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
@@ -51,14 +55,18 @@
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -82,10 +90,18 @@ public abstract class DebeziumEventDeserializationSchema extends SourceRecordEve
/** Changelog Mode to use for encoding changes in Flink internal data structure. */
protected final DebeziumChangelogMode changelogMode;

+private transient JsonConverter jsonConverter;
+
+private final boolean includeSchemaInfo;
+
public DebeziumEventDeserializationSchema(
-SchemaDataTypeInference schemaDataTypeInference, DebeziumChangelogMode changelogMode) {
+SchemaDataTypeInference schemaDataTypeInference,
+DebeziumChangelogMode changelogMode,
+boolean includeSchemaInfo) {
this.schemaDataTypeInference = schemaDataTypeInference;
this.changelogMode = changelogMode;
+this.includeSchemaInfo = includeSchemaInfo;
}

@Override
@@ -102,21 +118,70 @@ public List<DataChangeEvent> deserializeDataChangeRecord(SourceRecord record) th
Schema valueSchema = record.valueSchema();
Map<String, String> meta = getMetadata(record);

+if (includeSchemaInfo) {
+if (jsonConverter == null) {
+initializeJsonConverter();
+}
+}
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
RecordData after = extractAfterDataRecord(value, valueSchema);
-return Collections.singletonList(DataChangeEvent.insertEvent(tableId, after, meta));
+List<DataChangeEvent> dataChangeEvent =
+includeSchemaInfo
+? Collections.singletonList(
+DataChangeEvent.insertEvent(
+tableId,
+after,
+meta,
+extractBeforeAndAfterSchema(
+jsonConverter.asJsonSchema(valueSchema))))
Comment from the PR author:

Through the jsonConverter.asJsonSchema(valueSchema) method we can easily obtain the complete Debezium-JSON schema of the record.
At first I wanted to convert the schema JSON into a GenericRowData, but the Debezium schema structure is too complex for that to be practical.
In the end I chose to pass the schema JSON downstream as a string, which also avoids some serialization and deserialization overhead.
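For illustration, a minimal self-contained sketch (not part of this PR; the demo class and the row/column names are hypothetical) of what JsonConverter.asJsonSchema returns for an envelope-like schema:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;

import java.util.HashMap;
import java.util.Map;

public class AsJsonSchemaDemo {
    public static void main(String[] args) {
        // Hypothetical row schema standing in for a captured table.
        Schema row =
                SchemaBuilder.struct()
                        .optional()
                        .field("id", Schema.INT64_SCHEMA)
                        .field("name", Schema.OPTIONAL_STRING_SCHEMA)
                        .build();
        // Simplified Debezium-style envelope: before/after plus op.
        Schema envelope =
                SchemaBuilder.struct()
                        .name("demo.inventory.products.Envelope")
                        .field("before", row)
                        .field("after", row)
                        .field("op", Schema.STRING_SCHEMA)
                        .build();

        // Same configuration as initializeJsonConverter() below.
        JsonConverter converter = new JsonConverter();
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
        converter.configure(configs);

        // Prints the complete Connect JSON schema of the envelope,
        // including the "before" and "after" field definitions.
        System.out.println(converter.asJsonSchema(envelope));
    }
}
```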

+: Collections.singletonList(
+DataChangeEvent.insertEvent(tableId, after, meta));
+return dataChangeEvent;
} else if (op == Envelope.Operation.DELETE) {
RecordData before = extractBeforeDataRecord(value, valueSchema);
-return Collections.singletonList(DataChangeEvent.deleteEvent(tableId, before, meta));
+List<DataChangeEvent> dataChangeEvent =
+includeSchemaInfo
+? Collections.singletonList(
+DataChangeEvent.deleteEvent(
+tableId,
+before,
+meta,
+extractBeforeAndAfterSchema(
+jsonConverter.asJsonSchema(valueSchema))))
+: Collections.singletonList(
+DataChangeEvent.deleteEvent(tableId, before, meta));
+return dataChangeEvent;
} else if (op == Envelope.Operation.UPDATE) {
RecordData after = extractAfterDataRecord(value, valueSchema);
if (changelogMode == DebeziumChangelogMode.ALL) {
RecordData before = extractBeforeDataRecord(value, valueSchema);
-return Collections.singletonList(
-DataChangeEvent.updateEvent(tableId, before, after, meta));
+List<DataChangeEvent> dataChangeEvent =
+includeSchemaInfo
+? Collections.singletonList(
+DataChangeEvent.updateEvent(
+tableId,
+before,
+after,
+meta,
+extractBeforeAndAfterSchema(
+jsonConverter.asJsonSchema(valueSchema))))
+: Collections.singletonList(
+DataChangeEvent.updateEvent(tableId, before, after, meta));
+return dataChangeEvent;
}
-return Collections.singletonList(
-DataChangeEvent.updateEvent(tableId, null, after, meta));
+List<DataChangeEvent> dataChangeEvent =
+includeSchemaInfo
+? Collections.singletonList(
+DataChangeEvent.updateEvent(
+tableId,
+null,
+after,
+meta,
+extractBeforeAndAfterSchema(
+jsonConverter.asJsonSchema(valueSchema))))
+: Collections.singletonList(
+DataChangeEvent.updateEvent(tableId, null, after, meta));
+return dataChangeEvent;
} else {
LOG.trace("Received {} operation, skip", op);
return Collections.emptyList();
@@ -140,6 +205,24 @@ private RecordData extractAfterDataRecord(Struct value, Schema valueSchema) thro
return extractDataRecord(afterValue, afterSchema);
}

+/** Extracts the schema of the "before" and "after" fields from the envelope schema. */
+private String extractBeforeAndAfterSchema(ObjectNode valueSchema) {
+ObjectMapper mapper = new ObjectMapper();
+ObjectNode copyNode = valueSchema.deepCopy();
+
+ArrayNode fieldsArray = (ArrayNode) copyNode.get("fields");
+ArrayNode newFields = mapper.createArrayNode();
+for (JsonNode field : fieldsArray) {
+String fieldName = field.get("field").asText();
+if (fieldName.equals(Envelope.FieldName.BEFORE)
+|| fieldName.equals(Envelope.FieldName.AFTER)) {
+newFields.add(field);
+}
+}
+copyNode.set("fields", newFields);
+return copyNode.toString();
+}
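For illustration, a hedged sketch of the filtering above with a hypothetical envelope schema JSON (assumes it runs where the private method is accessible, inside a method declaring throws Exception):

```java
// Hypothetical input: a Connect/Debezium envelope schema with four fields.
ObjectMapper mapper = new ObjectMapper();
ObjectNode envelope =
        (ObjectNode)
                mapper.readTree(
                        "{\"type\":\"struct\",\"fields\":["
                                + "{\"field\":\"before\",\"type\":\"struct\",\"optional\":true},"
                                + "{\"field\":\"after\",\"type\":\"struct\",\"optional\":true},"
                                + "{\"field\":\"source\",\"type\":\"struct\"},"
                                + "{\"field\":\"op\",\"type\":\"string\"}]}");
String filtered = extractBeforeAndAfterSchema(envelope);
// filtered keeps only the "before" and "after" entries:
// {"type":"struct","fields":[
//     {"field":"before","type":"struct","optional":true},
//     {"field":"after","type":"struct","optional":true}]}
```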

private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception {
DataType dataType = schemaDataTypeInference.infer(value, valueSchema);
return (RecordData) getOrCreateConverter(dataType).convert(value, valueSchema);
@@ -149,6 +232,13 @@ private DeserializationRuntimeConverter getOrCreateConverter(DataType type) {
return CONVERTERS.computeIfAbsent(type, this::createConverter);
}

+/** Lazily initializes the JSON converter; the field is transient, so it must be re-created after this schema is deserialized on the task manager. */
+private void initializeJsonConverter() {
+jsonConverter = new JsonConverter();
+final HashMap<String, Object> configs = new HashMap<>(2);
+configs.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
+jsonConverter.configure(configs);
+}

// -------------------------------------------------------------------------------------
// Runtime Converters
// -------------------------------------------------------------------------------------