diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index 7eb6b225016..4b4b9c8374c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -113,8 +113,8 @@ protected interface CheckedFunction { private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " flow_action varchar(100) NOT NULL, " - + "event_timestamp TIMESTAMP NOT NULL, " - + "lease_acquisition_timestamp TIMESTAMP NULL, " + + "event_timestamp TIMESTAMP(3) NOT NULL, " + + "lease_acquisition_timestamp TIMESTAMP(3) NULL, " + "PRIMARY KEY (flow_group,flow_name,flow_action))"; // Deletes rows older than retention time period regardless of lease status as they should all be invalid or completed // since retention >> linger @@ -144,7 +144,8 @@ protected interface CheckedFunction { + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 " + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 " + "ELSE 3 END as lease_validity_status, linger, " - + "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY; + + "CONVERT_TZ(CURRENT_TIMESTAMP(3), @@session.time_zone, '+00:00') as utc_current_timestamp FROM %s, %s " + + WHERE_CLAUSE_TO_MATCH_KEY; // Same as query above, except that isWithinEpsilon is True if the reminder event timestamp (provided by caller) is // OLDER than or equal to the db event_timestamp and within epsilon away from it. protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER = "SELECT " @@ -154,7 +155,8 @@ protected interface CheckedFunction { + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 1 " + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, INTERVAL linger*1000 MICROSECOND) then 2 " + "ELSE 3 END as lease_validity_status, linger, " - + "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY; + + "CONVERT_TZ(CURRENT_TIMESTAMP(3), @@session.time_zone, '+00:00') as utc_current_timestamp FROM %s, %s " + + WHERE_CLAUSE_TO_MATCH_KEY; // Insert or update row to acquire lease if values have not changed since the previous read // Need to define three separate statements to handle cases where row does not exist or has null values to check protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, flow_name, " @@ -169,7 +171,7 @@ protected interface CheckedFunction { + WHERE_CLAUSE_TO_MATCH_ROW; // Complete lease acquisition if values have not changed since lease was acquired protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT = "UPDATE %s SET " - + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW; + + "event_timestamp=event_timestamp, lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW; private static final ThreadLocal UTC_CAL = ThreadLocal.withInitial(() -> Calendar.getInstance(TimeZone.getTimeZone("UTC")));