Skip to content

Commit

Permalink
[FLINK-35592][cdc] Fix MysqlDebeziumTimeConverter miss timezone conve…
Browse files Browse the repository at this point in the history
…rt to timestamp (#3380)
  • Loading branch information
czy006 authored Jun 13, 2024
1 parent 7287eac commit f476a5b
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,54 +113,73 @@ private void registerDateConverter(
registration.register(
SchemaBuilder.string().name(schemaName).optional(),
value -> {
log.debug(
"find schema need to change dateType, field name:{} ,field type:{} ,field value:{} ,field "
+ "default:{}",
field.name(),
columnType,
value == null ? "null" : value,
field.hasDefaultValue() ? field.defaultValue() : "null");
if (value == null) {
return convertDateDefaultValue(field);
}
switch (columnType.toUpperCase(Locale.ROOT)) {
case "DATE":
if (value instanceof Integer) {
return this.convertToDate(
columnType, LocalDate.ofEpochDay((Integer) value));
}
return this.convertToDate(columnType, value);
case "TIME":
if (value instanceof Long) {
long l =
Math.multiplyExact(
(Long) value, TimeUnit.MICROSECONDS.toNanos(1));
return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l));
}
return this.convertToTime(columnType, value);
case "DATETIME":
if (value instanceof Long) {
if (getTimePrecision(field) <= 3) {
return this.convertToTimestamp(
columnType,
Conversions.toInstantFromMillis((Long) value));
}
if (getTimePrecision(field) <= 6) {
return this.convertToTimestamp(
columnType,
Conversions.toInstantFromMicros((Long) value));
}
}
return this.convertToTimestamp(columnType, value);
case "TIMESTAMP":
return this.convertToTimestampWithTimezone(columnType, value);
default:
throw new IllegalArgumentException(
"Unknown field type " + columnType.toUpperCase(Locale.ROOT));
try {
return convertDateObject(field, value, columnType);
} catch (Exception e) {
printConvertDateErrorClassLogs(field, registration, value);
throw new RuntimeException("MysqlDebeziumConverter error", e);
}
});
}

private void printConvertDateErrorClassLogs(
RelationalColumn field,
ConverterRegistration<SchemaBuilder> registration,
Object value) {
boolean useDefaultValueConvert = (value == null);
String fieldName = field.name();
String fieldType = field.typeName().toUpperCase();
String defaultValue = "null";
if (field.hasDefaultValue()) {
if (field.defaultValue() != null) {
defaultValue = field.defaultValue().toString();
}
}
log.warn(
"find schema need to change dateType, field name:||{}|| field type:||{}|| is use default "
+ "convert:||{}|| field default value:||{}|| field charge value fail",
fieldName,
fieldType,
useDefaultValueConvert,
defaultValue);
}

private Object convertDateObject(RelationalColumn field, Object value, String columnType) {
if (value == null) {
return convertDateDefaultValue(field);
}
switch (columnType.toUpperCase(Locale.ROOT)) {
case "DATE":
if (value instanceof Integer) {
return this.convertToDate(columnType, LocalDate.ofEpochDay((Integer) value));
}
return this.convertToDate(columnType, value);
case "TIME":
if (value instanceof Long) {
long l = Math.multiplyExact((Long) value, TimeUnit.MICROSECONDS.toNanos(1));
return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l));
}
return this.convertToTime(columnType, value);
case "DATETIME":
if (value instanceof Long) {
if (getTimePrecision(field) <= 3) {
return this.convertToTimestamp(
columnType, Conversions.toInstantFromMillis((Long) value));
}
if (getTimePrecision(field) <= 6) {
return this.convertToTimestamp(
columnType, Conversions.toInstantFromMicros((Long) value));
}
}
return this.convertToTimestamp(columnType, value);
case "TIMESTAMP":
return this.convertToTimestampWithTimezone(columnType, value);
default:
throw new IllegalArgumentException(
"Unknown field type " + columnType.toUpperCase(Locale.ROOT));
}
}

private Object convertToTimestampWithTimezone(String columnType, Object timestamp) {
// In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type.
// Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually
Expand All @@ -174,11 +193,12 @@ private Object convertToTimestampWithTimezone(String columnType, Object timestam
ZonedDateTime zonedDateTime = value.toInstant().atZone(zoneId);
return ConvertTimeBceUtil.resolveEra(value, zonedDateTime.format(timestampFormatter));
} else if (timestamp instanceof OffsetDateTime) {
OffsetDateTime value = (OffsetDateTime) timestamp;
OffsetDateTime value =
((OffsetDateTime) timestamp).toInstant().atZone(zoneId).toOffsetDateTime();
return ConvertTimeBceUtil.resolveEra(
value.toLocalDate(), value.format(timestampFormatter));
} else if (timestamp instanceof ZonedDateTime) {
ZonedDateTime zonedDateTime = (ZonedDateTime) timestamp;
ZonedDateTime zonedDateTime = ((ZonedDateTime) timestamp).toInstant().atZone(zoneId);
return ConvertTimeBceUtil.resolveEra(
zonedDateTime.toLocalDate(), zonedDateTime.format(timestampFormatter));
} else if (timestamp instanceof Instant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private void testReadDateConvertDataStreamSource(String timezone) throws Excepti

private void validTimestampValue(List<String> result) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00"};
String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00", "15:04:00"};
for (String after : result) {
JsonNode jsonNode = mapper.readTree(after);
Assert.assertEquals(
Expand Down Expand Up @@ -232,7 +232,8 @@ private void checkData(TableResult tableResult) {
new String[] {
"+I[1, 14:23:00, 2023-04-01 14:24:00, 2023-04-01, 14:25:00]",
"+I[3, 00:00:00, null, null, 00:01:20]",
"+I[2, 00:00:00, null, null, 00:00:00]"
"+I[2, 00:00:00, null, null, 00:00:00]",
"+I[4, 15:04:00, null, null, 00:01:10]"
};

List<String> expectedSnapshotData = new ArrayList<>(Arrays.asList(snapshotForSingleTable));
Expand Down Expand Up @@ -283,7 +284,8 @@ private String buildMySqlConfigWithTimezone(String timezone) {
+ "binlog_format = row\n"
+ "log_bin = mysql-bin\n"
+ "server-id = 223344\n"
+ "binlog_row_image = FULL\n";
+ "binlog_row_image = FULL\n"
+ "sql_mode = ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION\n";
String timezoneConf = "default-time_zone = '" + timezone + "'\n";
Files.write(
cnf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ INSERT INTO date_convert_test (id,test_timestamp, test_datetime, test_date, test
VALUES
(1,'2023-04-01 14:23:00', '2023-04-01 14:24:00', '2023-04-01', '14:25:00'),
(2,'2024-04-23 00:00:00', DEFAULT, NULL ,'00:00:00'),
(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120);
(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120),
(4,20240612150400, DEFAULT, NULL ,110);

0 comments on commit f476a5b

Please sign in to comment.