Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[source-mysql] 7084 source mysql regression in 39x date parsing errors #49932

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.9.2
dockerImageTag: 3.9.3
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import io.airbyte.cdk.util.Jsons
import io.micronaut.context.annotation.Primary
import jakarta.inject.Singleton
import java.time.LocalDateTime
import java.time.OffsetDateTime
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.time.format.DateTimeFormatterBuilder
import java.time.format.DateTimeParseException
import java.time.temporal.ChronoField
import java.util.*
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -350,15 +353,29 @@ class MysqlJdbcPartitionFactory(
}
LeafAirbyteSchemaType.TIMESTAMP_WITH_TIMEZONE -> {
val timestampInStatePattern = "yyyy-MM-dd'T'HH:mm:ss"
val formatter =
DateTimeFormatterBuilder()
.appendPattern(timestampInStatePattern)
.optionalStart()
.appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true)
.optionalEnd()
.optionalStart()
.optionalStart()
.appendLiteral(' ')
.optionalEnd()
.appendOffset("+HH:mm", "Z")
.optionalEnd()
.toFormatter()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is painful 😓

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found chatgpt useful for generating this kind of code. Even so, it's still annoying.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It actually gave me wrong advice here, much to my delight
maybe humans still got a chance 🤭

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yay humans!


try {
val formatter: DateTimeFormatter =
DateTimeFormatter.ofPattern(timestampInStatePattern)
Jsons.valueToTree(
LocalDateTime.parse(stateValue, formatter)
.minusDays(1)
.atOffset(java.time.ZoneOffset.UTC)
.format(OffsetDateTimeCodec.formatter)
)
val offsetDateTime =
try {
OffsetDateTime.parse(stateValue, formatter)
} catch (_: DateTimeParseException) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If value include no +xx:xx or Z then we create an offset datetime with an assumed UTC offset.
This is done via a LocalDate.

// if no offset exists, we assume it's UTC
LocalDateTime.parse(stateValue, formatter).atOffset(UTC)
}
Jsons.valueToTree(offsetDateTime.format(OffsetDateTimeCodec.formatter))
} catch (_: RuntimeException) {
// Resolve to use the new format.
Jsons.valueToTree<JsonNode>(stateValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import kotlin.test.assertNull
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

class MysqlJdbcPartitionFactoryTest {
companion object {
Expand Down Expand Up @@ -219,13 +221,28 @@ class MysqlJdbcPartitionFactoryTest {
assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition)
}

@Test
fun testResumeFromCompletedCursorBasedReadTimestamp() {
@ParameterizedTest
@CsvSource(
"'2025-09-03T05:23:35', '2025-09-03T05:23:35.000000Z'",
"'2025-09-03T05:23:35.0', '2025-09-03T05:23:35.000000Z'",
"'2025-09-03T05:23:35.1', '2025-09-03T05:23:35.100000Z'",
"'2025-09-03T05:23:35.123', '2025-09-03T05:23:35.123000Z'",
"'2025-09-03T05:23:35.123456789', '2025-09-03T05:23:35.123456Z'",
"'2025-09-03T05:23:35.123+00:00', '2025-09-03T05:23:35.123000Z'",
"'2025-09-03T05:23:35.123+00:00', '2025-09-03T05:23:35.123000Z'",
"'2025-09-03T05:23:35Z', '2025-09-03T05:23:35.000000Z'",
"'2025-09-03T05:23:35 Z', '2025-09-03T05:23:35.000000Z'",
"'2025-09-03T05:23:35.12345 +12:34', '2025-09-03T05:23:35.123450+12:34'",
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

excellent

fun testResumeFromCompletedCursorBasedReadTimestamp(
cursorVal: String,
expectedLowerBound: String
) {
val incomingStateValue: OpaqueStateValue =
Jsons.readTree(
"""
{
"cursor": "2025-09-03T05:23:35",
"cursor": "$cursorVal",
"version": 2,
"state_type": "cursor_based",
"stream_name": "stream2",
Expand All @@ -245,7 +262,7 @@ class MysqlJdbcPartitionFactoryTest {
assertTrue(jdbcPartition is MysqlJdbcCursorIncrementalPartition)

assertEquals(
Jsons.valueToTree("2025-09-02T05:23:35.000000Z"),
Jsons.valueToTree("$expectedLowerBound"),
(jdbcPartition as MysqlJdbcCursorIncrementalPartition).cursorLowerBound
)
}
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.9.3 | 2024-12-18 | [49932](https://github.com/airbytehq/airbyte/pull/49932) | Backward compatibility for saved states with timestamp that include timezone offset. |
| 3.9.2 | 2024-12-16 | [49830](https://github.com/airbytehq/airbyte/pull/49830) | Fixes an issue with auto generated tinyint columns |
| 3.9.1 | 2024-12-12 | [49456](https://github.com/airbytehq/airbyte/pull/49456) | Bump version to re-relase |
| 3.9.0 | 2024-12-12 | [49423](https://github.com/airbytehq/airbyte/pull/49423) | Promoting release candidate 3.9.0-rc.27 to a main version. |
Expand Down
Loading