diff --git a/airbyte-api/server-api/src/main/openapi/config.yaml b/airbyte-api/server-api/src/main/openapi/config.yaml
index 84b47ebcb9b..f51df36d205 100644
--- a/airbyte-api/server-api/src/main/openapi/config.yaml
+++ b/airbyte-api/server-api/src/main/openapi/config.yaml
@@ -2358,6 +2358,25 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
+ /v1/connections/events/backfill:
+ post:
+ tags:
+ - connection
+ summary: Backfill events for a connection
+ operationId: backfillConnectionEvents
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/ConnectionEventsBackfillRequestBody"
+ required: true
+ responses:
+ "200":
+ description: Successful operation
+ "404":
+ $ref: "#/components/responses/NotFoundResponse"
+ "422":
+ $ref: "#/components/responses/InvalidInputResponse"
/v1/connections/last_job_per_stream:
post:
tags:
@@ -10266,6 +10285,23 @@ components:
description: The end datetime of a time range to filter by
type: string
format: date-time
+ ConnectionEventsBackfillRequestBody:
+ type: object
+ required:
+ - connectionId
+ - createdAtStart
+ - createdAtEnd
+ properties:
+ connectionId:
+ $ref: "#/components/schemas/ConnectionId"
+ createdAtStart:
+ description: The start datetime of a time range to backfill events
+ type: string
+ format: date-time
+ createdAtEnd:
+ description: The end datetime of a time range to backfill events
+ type: string
+ format: date-time
ConnectionLastJobPerStreamRead:
type: array
diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java
index 2a1ae2ef8ee..a4c0d3d660a 100644
--- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java
+++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/ConnectionsHandler.java
@@ -29,6 +29,7 @@
import io.airbyte.api.model.generated.ConnectionEventList;
import io.airbyte.api.model.generated.ConnectionEventType;
import io.airbyte.api.model.generated.ConnectionEventWithDetails;
+import io.airbyte.api.model.generated.ConnectionEventsBackfillRequestBody;
import io.airbyte.api.model.generated.ConnectionEventsRequestBody;
import io.airbyte.api.model.generated.ConnectionLastJobPerStreamReadItem;
import io.airbyte.api.model.generated.ConnectionLastJobPerStreamRequestBody;
@@ -69,6 +70,7 @@
import io.airbyte.commons.protocol.CatalogDiffHelpers;
import io.airbyte.commons.server.converters.ApiPojoConverters;
import io.airbyte.commons.server.converters.CatalogDiffConverters;
+import io.airbyte.commons.server.converters.JobConverter;
import io.airbyte.commons.server.errors.BadRequestException;
import io.airbyte.commons.server.handlers.helpers.AutoPropagateSchemaChangeHelper;
import io.airbyte.commons.server.handlers.helpers.AutoPropagateSchemaChangeHelper.UpdateSchemaResult;
@@ -120,6 +122,8 @@
import io.airbyte.data.services.shared.ConnectionAutoDisabledReason;
import io.airbyte.data.services.shared.ConnectionAutoUpdatedReason;
import io.airbyte.data.services.shared.ConnectionEvent;
+import io.airbyte.data.services.shared.FailedEvent;
+import io.airbyte.data.services.shared.FinalStatusEvent;
import io.airbyte.featureflag.CheckWithCatalog;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.ResetStreamsStateWhenDisabled;
@@ -141,7 +145,9 @@
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
+import java.time.OffsetDateTime;
import java.time.ZoneId;
+import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
@@ -1154,6 +1160,85 @@ private ConnectionEventWithDetails hydrateConnectionEvent(final ConnectionTimeli
return connectionEventWithDetails;
}
+ /**
+ * Backfill jobs to timeline events. Supported event types:
+ *
+ * 1. SYNC_CANCELLED 2. SYNC_SUCCEEDED 3. SYNC_FAILED 4. SYNC_INCOMPLETE 5. REFRESH_SUCCEEDED 6.
+ * REFRESH_FAILED 7. REFRESH_INCOMPLETE 8. REFRESH_CANCELLED 9. CLEAR_SUCCEEDED 10. CLEAR_FAILED 11.
+ * CLEAR_INCOMPLETE 12. CLEAR_CANCELLED
+ *
+ * Notes:
+ *
+ * 1. Manually started events (X_STARTED) will NOT be backfilled because we don't have that
+ * information from Jobs table.
+ *
+ *
+ * 2. Manually cancelled events (X_CANCELLED) will be backfilled, but the associated user ID will be
+ * missing as it is not trackable from Jobs table.
+ *
+ *
+ * 3. RESET_CONNECTION is just the old enum name of CLEAR.
+ *
+ */
+ public void backfillConnectionEvents(final ConnectionEventsBackfillRequestBody connectionEventsBackfillRequestBody) throws IOException {
+ final UUID connectionId = connectionEventsBackfillRequestBody.getConnectionId();
+ final OffsetDateTime startTime = connectionEventsBackfillRequestBody.getCreatedAtStart();
+ final OffsetDateTime endTime = connectionEventsBackfillRequestBody.getCreatedAtEnd();
+ LOGGER.info("Backfilled events from {} to {} for connection {}", startTime, endTime, connectionId);
+ // 1. list all jobs within a given time window
+ final List allJobsToMigrate = jobPersistence.listJobsForConvertingToEvents(
+ Set.of(ConfigType.SYNC, ConfigType.REFRESH, ConfigType.RESET_CONNECTION),
+ Set.of(JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.INCOMPLETE, JobStatus.CANCELLED),
+ startTime,
+ endTime);
+ LOGGER.info("Verify listing jobs. {} jobs found.", allJobsToMigrate.size());
+ // 2. For each job, log a timeline event
+ allJobsToMigrate.forEach(job -> {
+ final JobWithAttemptsRead jobRead = JobConverter.getJobWithAttemptsRead(job);
+ // Construct a timeline event
+ final FinalStatusEvent event;
+ if (job.getStatus() == JobStatus.FAILED || job.getStatus() == JobStatus.INCOMPLETE) {
+ // We need to log a failed event with job stats and the first failure reason.
+ event = new FailedEvent(
+ job.getId(),
+ job.getCreatedAtInSecond(),
+ job.getUpdatedAtInSecond(),
+ jobRead.getAttempts().stream()
+ .mapToLong(attempt -> attempt.getBytesSynced() != null ? attempt.getBytesSynced() : 0)
+ .sum(),
+ jobRead.getAttempts().stream()
+ .mapToLong(attempt -> attempt.getRecordsSynced() != null ? attempt.getRecordsSynced() : 0)
+ .sum(),
+ job.getAttemptsCount(),
+ job.getConfigType().name(),
+ job.getStatus().name(),
+ JobConverter.getStreamsAssociatedWithJob(job),
+ job.getLastAttempt()
+ .flatMap(Attempt::getFailureSummary)
+ .flatMap(summary -> summary.getFailures().stream().findFirst()));
+ } else { // SUCCEEDED and CANCELLED
+ // We need to log a timeline event with job stats.
+ event = new FinalStatusEvent(
+ job.getId(),
+ job.getCreatedAtInSecond(),
+ job.getUpdatedAtInSecond(),
+ jobRead.getAttempts().stream()
+ .mapToLong(attempt -> attempt.getBytesSynced() != null ? attempt.getBytesSynced() : 0)
+ .sum(),
+ jobRead.getAttempts().stream()
+ .mapToLong(attempt -> attempt.getRecordsSynced() != null ? attempt.getRecordsSynced() : 0)
+ .sum(),
+ job.getAttemptsCount(),
+ job.getConfigType().name(),
+ job.getStatus().name(),
+ JobConverter.getStreamsAssociatedWithJob(job));
+ }
+ // Save an event
+ connectionTimelineEventService.writeEventWithTimestamp(
+ UUID.fromString(job.getScope()), event, null, Instant.ofEpochSecond(job.getUpdatedAtInSecond()).atOffset(ZoneOffset.UTC));
+ });
+ }
+
/**
* Returns data history for the given connection for requested number of jobs.
*
diff --git a/airbyte-data/src/main/kotlin/io/airbyte/data/repositories/entities/ConnectionTimelineEvent.kt b/airbyte-data/src/main/kotlin/io/airbyte/data/repositories/entities/ConnectionTimelineEvent.kt
index 618d97570cf..f949dfac966 100644
--- a/airbyte-data/src/main/kotlin/io/airbyte/data/repositories/entities/ConnectionTimelineEvent.kt
+++ b/airbyte-data/src/main/kotlin/io/airbyte/data/repositories/entities/ConnectionTimelineEvent.kt
@@ -1,7 +1,6 @@
package io.airbyte.data.repositories.entities
import io.micronaut.data.annotation.AutoPopulated
-import io.micronaut.data.annotation.DateCreated
import io.micronaut.data.annotation.Id
import io.micronaut.data.annotation.MappedEntity
import io.micronaut.data.annotation.TypeDef
@@ -19,6 +18,5 @@ data class ConnectionTimelineEvent(
var eventType: String,
@field:TypeDef(type = DataType.JSON)
var summary: String? = null,
- @DateCreated
var createdAt: OffsetDateTime? = null,
)
diff --git a/airbyte-data/src/main/kotlin/io/airbyte/data/services/ConnectionTimelineEventService.kt b/airbyte-data/src/main/kotlin/io/airbyte/data/services/ConnectionTimelineEventService.kt
index 9b7617c5f4b..dd58d6e5a66 100644
--- a/airbyte-data/src/main/kotlin/io/airbyte/data/services/ConnectionTimelineEventService.kt
+++ b/airbyte-data/src/main/kotlin/io/airbyte/data/services/ConnectionTimelineEventService.kt
@@ -6,12 +6,21 @@ import java.time.OffsetDateTime
import java.util.UUID
interface ConnectionTimelineEventService {
+ // This function is used to write an event with the current timestamp. Should be used for all new events.
fun writeEvent(
connectionId: UUID,
event: ConnectionEvent,
userId: UUID? = null,
): ConnectionTimelineEvent
+ // This function is used to write an event with a specific timestamp. This is ONLY useful for backfilling.
+ fun writeEventWithTimestamp(
+ connectionId: UUID,
+ event: ConnectionEvent,
+ userId: UUID? = null,
+ createdAt: OffsetDateTime,
+ ): ConnectionTimelineEvent
+
fun getEvent(eventId: UUID): ConnectionTimelineEvent
fun listEvents(
diff --git a/airbyte-data/src/main/kotlin/io/airbyte/data/services/impls/data/ConnectionTimelineEventServiceDataImpl.kt b/airbyte-data/src/main/kotlin/io/airbyte/data/services/impls/data/ConnectionTimelineEventServiceDataImpl.kt
index 22d0d8e24bf..5ad42342ac6 100644
--- a/airbyte-data/src/main/kotlin/io/airbyte/data/services/impls/data/ConnectionTimelineEventServiceDataImpl.kt
+++ b/airbyte-data/src/main/kotlin/io/airbyte/data/services/impls/data/ConnectionTimelineEventServiceDataImpl.kt
@@ -25,7 +25,19 @@ class ConnectionTimelineEventServiceDataImpl(
): ConnectionTimelineEvent {
val serializedEvent = mapper.writeValueAsString(event)
val timelineEvent =
- ConnectionTimelineEvent(null, connectionId, userId, event.getEventType().toString(), serializedEvent, null)
+ ConnectionTimelineEvent(null, connectionId, userId, event.getEventType().toString(), serializedEvent, OffsetDateTime.now())
+ return repository.save(timelineEvent)
+ }
+
+ override fun writeEventWithTimestamp(
+ connectionId: UUID,
+ event: ConnectionEvent,
+ userId: UUID?,
+ createdAt: OffsetDateTime,
+ ): ConnectionTimelineEvent {
+ val serializedEvent = mapper.writeValueAsString(event)
+ val timelineEvent =
+ ConnectionTimelineEvent(null, connectionId, userId, event.getEventType().toString(), serializedEvent, createdAt)
return repository.save(timelineEvent)
}
diff --git a/airbyte-data/src/test/kotlin/io/airbyte/data/repositories/ConnectionTimelineEventRepositoryTest.kt b/airbyte-data/src/test/kotlin/io/airbyte/data/repositories/ConnectionTimelineEventRepositoryTest.kt
index d4b0b49f318..8943ac41278 100644
--- a/airbyte-data/src/test/kotlin/io/airbyte/data/repositories/ConnectionTimelineEventRepositoryTest.kt
+++ b/airbyte-data/src/test/kotlin/io/airbyte/data/repositories/ConnectionTimelineEventRepositoryTest.kt
@@ -10,6 +10,8 @@ import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
+import java.time.OffsetDateTime
+import java.time.ZoneOffset
import java.util.UUID
@MicronautTest
@@ -31,6 +33,7 @@ internal class ConnectionTimelineEventRepositoryTest : AbstractConfigRepositoryT
ConnectionTimelineEvent(
connectionId = UUID.randomUUID(),
eventType = "Test",
+ createdAt = OffsetDateTime.now(),
)
val saved = connectionTimelineEventRepository.save(event)
@@ -47,21 +50,25 @@ internal class ConnectionTimelineEventRepositoryTest : AbstractConfigRepositoryT
ConnectionTimelineEvent(
connectionId = connectionId,
eventType = ConnectionEvent.Type.SYNC_STARTED.name,
+ createdAt = OffsetDateTime.of(2024, 9, 1, 0, 0, 0, 0, ZoneOffset.UTC),
)
private val event2 =
ConnectionTimelineEvent(
connectionId = connectionId,
eventType = ConnectionEvent.Type.SYNC_CANCELLED.name,
+ createdAt = OffsetDateTime.of(2024, 9, 2, 0, 0, 0, 0, ZoneOffset.UTC),
)
private val event3 =
ConnectionTimelineEvent(
connectionId = connectionId,
eventType = ConnectionEvent.Type.REFRESH_STARTED.name,
+ createdAt = OffsetDateTime.of(2024, 9, 3, 0, 0, 0, 0, ZoneOffset.UTC),
)
private val event4 =
ConnectionTimelineEvent(
connectionId = connectionId,
eventType = ConnectionEvent.Type.REFRESH_SUCCEEDED.name,
+ createdAt = OffsetDateTime.of(2024, 9, 4, 0, 0, 0, 0, ZoneOffset.UTC),
)
@BeforeEach
diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java
index 7a8367b3791..7fb0f569d12 100644
--- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java
+++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java
@@ -1106,6 +1106,30 @@ public List listJobs(final ConfigType configType, final Instant attemptEnde
timeConvertedIntoLocalDateTime)));
}
+ @Override
+ public List listJobsForConvertingToEvents(Set configTypes,
+ Set jobStatuses,
+ OffsetDateTime createdAtStart,
+ OffsetDateTime createdAtEnd)
+ throws IOException {
+ return jobDatabase.query(ctx -> {
+ final String jobsSubquery = "(" + ctx.select(DSL.asterisk()).from(JOBS)
+ .where(JOBS.CONFIG_TYPE.in(configTypeSqlNames(configTypes)))
+ .and(jobStatuses == null ? DSL.noCondition()
+ : JOBS.STATUS.in(jobStatuses.stream()
+ .map(status -> io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus.lookupLiteral(toSqlName(status)))
+ .collect(Collectors.toList())))
+ .and(createdAtStart == null ? DSL.noCondition() : JOBS.CREATED_AT.ge(createdAtStart))
+ .and(createdAtEnd == null ? DSL.noCondition() : JOBS.CREATED_AT.le(createdAtEnd))
+ .getSQL(ParamType.INLINED) + ") AS jobs";
+
+ final String fullQuery = jobSelectAndJoin(jobsSubquery);
+ LOGGER.debug("jobs query: {}", fullQuery);
+ return getJobsFromResult(ctx.fetch(fullQuery));
+ });
+
+ }
+
@Override
public List listJobsLight(final Set jobIds) throws IOException {
return jobDatabase.query(ctx -> {
diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java
index 294db168ad5..2b13fd5d5f8 100644
--- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java
+++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java
@@ -286,6 +286,12 @@ List listJobs(
*/
List listJobs(ConfigType configType, Instant attemptEndedAtTimestamp) throws IOException;
+ List listJobsForConvertingToEvents(Set configTypes,
+ Set jobStatuses,
+ OffsetDateTime createdAtStart,
+ OffsetDateTime createdAtEnd)
+ throws IOException;
+
/**
* List jobs based on job IDs, nothing more.
*
diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConnectionApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConnectionApiController.java
index bb2e05d5ffc..286d822c099 100644
--- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConnectionApiController.java
+++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConnectionApiController.java
@@ -21,6 +21,7 @@
import io.airbyte.api.model.generated.ConnectionEventIdRequestBody;
import io.airbyte.api.model.generated.ConnectionEventList;
import io.airbyte.api.model.generated.ConnectionEventWithDetails;
+import io.airbyte.api.model.generated.ConnectionEventsBackfillRequestBody;
import io.airbyte.api.model.generated.ConnectionEventsRequestBody;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionLastJobPerStreamReadItem;
@@ -114,6 +115,17 @@ public InternalOperationResult autoDisableConnection(@Body final ConnectionIdReq
return ApiHelper.execute(() -> connectionsHandler.autoDisableConnection(connectionIdRequestBody.getConnectionId()));
}
+ @Override
+ @Post(uri = "/backfill_events")
+ @Secured({ADMIN})
+ @ExecuteOn(AirbyteTaskExecutors.IO)
+ public void backfillConnectionEvents(ConnectionEventsBackfillRequestBody connectionEventsBackfillRequestBody) {
+ ApiHelper.execute(() -> {
+ connectionsHandler.backfillConnectionEvents(connectionEventsBackfillRequestBody);
+ return null;
+ });
+ }
+
@Override
@Post(uri = "/create")
@Secured({WORKSPACE_EDITOR, ORGANIZATION_EDITOR})