Skip to content

Commit

Permalink
feat: backfill jobs to connection timeline events (#13933)
Browse files Browse the repository at this point in the history
  • Loading branch information
keyihuang committed Sep 26, 2024
1 parent a5e9b59 commit 927fbea
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 3 deletions.
36 changes: 36 additions & 0 deletions airbyte-api/server-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2358,6 +2358,25 @@ paths:
$ref: "#/components/responses/NotFoundResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
# Admin-only maintenance endpoint: converts historical jobs within the requested
# time window into connection timeline events. Returns no response body on success.
/v1/connections/events/backfill:
  post:
    tags:
      - connection
    summary: Backfill events for a connection
    operationId: backfillConnectionEvents
    requestBody:
      content:
        application/json:
          schema:
            $ref: "#/components/schemas/ConnectionEventsBackfillRequestBody"
      required: true
    responses:
      "200":
        description: Successful operation
      "404":
        $ref: "#/components/responses/NotFoundResponse"
      "422":
        $ref: "#/components/responses/InvalidInputResponse"
/v1/connections/last_job_per_stream:
post:
tags:
Expand Down Expand Up @@ -10266,6 +10285,23 @@ components:
description: The end datetime of a time range to filter by
type: string
format: date-time
# Request body for the events backfill endpoint: a connection id plus the
# created_at window of jobs to convert into timeline events.
ConnectionEventsBackfillRequestBody:
  type: object
  required:
    - connectionId
    - createdAtStart
    - createdAtEnd
  properties:
    connectionId:
      $ref: "#/components/schemas/ConnectionId"
    createdAtStart:
      # Inclusive lower bound (server filters with >= on jobs.created_at).
      description: The start datetime of a time range to backfill events
      type: string
      format: date-time
    createdAtEnd:
      # Inclusive upper bound (server filters with <= on jobs.created_at).
      description: The end datetime of a time range to backfill events
      type: string
      format: date-time

ConnectionLastJobPerStreamRead:
type: array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.airbyte.api.model.generated.ConnectionEventList;
import io.airbyte.api.model.generated.ConnectionEventType;
import io.airbyte.api.model.generated.ConnectionEventWithDetails;
import io.airbyte.api.model.generated.ConnectionEventsBackfillRequestBody;
import io.airbyte.api.model.generated.ConnectionEventsRequestBody;
import io.airbyte.api.model.generated.ConnectionLastJobPerStreamReadItem;
import io.airbyte.api.model.generated.ConnectionLastJobPerStreamRequestBody;
Expand Down Expand Up @@ -69,6 +70,7 @@
import io.airbyte.commons.protocol.CatalogDiffHelpers;
import io.airbyte.commons.server.converters.ApiPojoConverters;
import io.airbyte.commons.server.converters.CatalogDiffConverters;
import io.airbyte.commons.server.converters.JobConverter;
import io.airbyte.commons.server.errors.BadRequestException;
import io.airbyte.commons.server.handlers.helpers.AutoPropagateSchemaChangeHelper;
import io.airbyte.commons.server.handlers.helpers.AutoPropagateSchemaChangeHelper.UpdateSchemaResult;
Expand Down Expand Up @@ -120,6 +122,8 @@
import io.airbyte.data.services.shared.ConnectionAutoDisabledReason;
import io.airbyte.data.services.shared.ConnectionAutoUpdatedReason;
import io.airbyte.data.services.shared.ConnectionEvent;
import io.airbyte.data.services.shared.FailedEvent;
import io.airbyte.data.services.shared.FinalStatusEvent;
import io.airbyte.featureflag.CheckWithCatalog;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.ResetStreamsStateWhenDisabled;
Expand All @@ -141,7 +145,9 @@
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -1154,6 +1160,85 @@ private ConnectionEventWithDetails hydrateConnectionEvent(final ConnectionTimeli
return connectionEventWithDetails;
}

/**
 * Backfill jobs to timeline events. Supported event types:
 * <p>
 * 1. SYNC_CANCELLED 2. SYNC_SUCCEEDED 3. SYNC_FAILED 4. SYNC_INCOMPLETE 5. REFRESH_SUCCEEDED 6.
 * REFRESH_FAILED 7. REFRESH_INCOMPLETE 8. REFRESH_CANCELLED 9. CLEAR_SUCCEEDED 10. CLEAR_FAILED 11.
 * CLEAR_INCOMPLETE 12. CLEAR_CANCELLED
 * </p>
 * Notes:
 * <p>
 * 1. Manually started events (X_STARTED) will NOT be backfilled because we don't have that
 * information from Jobs table.
 * </p>
 * <p>
 * 2. Manually cancelled events (X_CANCELLED) will be backfilled, but the associated user ID will be
 * missing as it is not trackable from Jobs table.
 * </p>
 * <p>
 * 3. RESET_CONNECTION is just the old enum name of CLEAR.
 * </p>
 *
 * @param connectionEventsBackfillRequestBody connection id plus the [createdAtStart, createdAtEnd] window
 * @throws IOException if listing jobs from the persistence layer fails
 */
public void backfillConnectionEvents(final ConnectionEventsBackfillRequestBody connectionEventsBackfillRequestBody) throws IOException {
  final UUID connectionId = connectionEventsBackfillRequestBody.getConnectionId();
  final OffsetDateTime startTime = connectionEventsBackfillRequestBody.getCreatedAtStart();
  final OffsetDateTime endTime = connectionEventsBackfillRequestBody.getCreatedAtEnd();
  // Fix: this was previously logged in past tense ("Backfilled") before any work had happened.
  LOGGER.info("Backfilling events from {} to {} for connection {}", startTime, endTime, connectionId);
  // NOTE(review): the listing below filters only by config type, status and time window — NOT by
  // connectionId. Jobs of ALL connections in the window are backfilled; confirm that is intentional
  // given the request body requires a connectionId (currently used for logging only).
  // 1. List all terminal jobs within the given time window.
  final List<Job> allJobsToMigrate = jobPersistence.listJobsForConvertingToEvents(
      Set.of(ConfigType.SYNC, ConfigType.REFRESH, ConfigType.RESET_CONNECTION),
      Set.of(JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.INCOMPLETE, JobStatus.CANCELLED),
      startTime,
      endTime);
  LOGGER.info("Backfill: {} jobs found to convert to timeline events.", allJobsToMigrate.size());
  // 2. For each job, write one timeline event carrying the job's final status and stats.
  allJobsToMigrate.forEach(job -> {
    final JobWithAttemptsRead jobRead = JobConverter.getJobWithAttemptsRead(job);
    final FinalStatusEvent event;
    if (job.getStatus() == JobStatus.FAILED || job.getStatus() == JobStatus.INCOMPLETE) {
      // Failed/incomplete jobs also carry the first failure reason of the last attempt.
      event = new FailedEvent(
          job.getId(),
          job.getCreatedAtInSecond(),
          job.getUpdatedAtInSecond(),
          sumBytesSynced(jobRead),
          sumRecordsSynced(jobRead),
          job.getAttemptsCount(),
          job.getConfigType().name(),
          job.getStatus().name(),
          JobConverter.getStreamsAssociatedWithJob(job),
          job.getLastAttempt()
              .flatMap(Attempt::getFailureSummary)
              .flatMap(summary -> summary.getFailures().stream().findFirst()));
    } else { // SUCCEEDED and CANCELLED: job stats only.
      event = new FinalStatusEvent(
          job.getId(),
          job.getCreatedAtInSecond(),
          job.getUpdatedAtInSecond(),
          sumBytesSynced(jobRead),
          sumRecordsSynced(jobRead),
          job.getAttemptsCount(),
          job.getConfigType().name(),
          job.getStatus().name(),
          JobConverter.getStreamsAssociatedWithJob(job));
    }
    // Persist with the job's own terminal (updated_at) time in UTC rather than "now", so the
    // timeline reflects when the job actually finished.
    connectionTimelineEventService.writeEventWithTimestamp(
        UUID.fromString(job.getScope()), event, null, Instant.ofEpochSecond(job.getUpdatedAtInSecond()).atOffset(ZoneOffset.UTC));
  });
}

/** Sums bytesSynced across all attempts of a job, treating missing values as 0. */
private static long sumBytesSynced(final JobWithAttemptsRead jobRead) {
  return jobRead.getAttempts().stream()
      .mapToLong(attempt -> attempt.getBytesSynced() != null ? attempt.getBytesSynced() : 0)
      .sum();
}

/** Sums recordsSynced across all attempts of a job, treating missing values as 0. */
private static long sumRecordsSynced(final JobWithAttemptsRead jobRead) {
  return jobRead.getAttempts().stream()
      .mapToLong(attempt -> attempt.getRecordsSynced() != null ? attempt.getRecordsSynced() : 0)
      .sum();
}

/**
* Returns data history for the given connection for requested number of jobs.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.airbyte.data.repositories.entities

import io.micronaut.data.annotation.AutoPopulated
import io.micronaut.data.annotation.DateCreated
import io.micronaut.data.annotation.Id
import io.micronaut.data.annotation.MappedEntity
import io.micronaut.data.annotation.TypeDef
Expand All @@ -19,6 +18,5 @@ data class ConnectionTimelineEvent(
var eventType: String,
@field:TypeDef(type = DataType.JSON)
var summary: String? = null,
@DateCreated
var createdAt: OffsetDateTime? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,21 @@ import java.time.OffsetDateTime
import java.util.UUID

interface ConnectionTimelineEventService {
// This function is used to write an event with the current timestamp. Should be used for all new events.
fun writeEvent(
connectionId: UUID,
event: ConnectionEvent,
userId: UUID? = null,
): ConnectionTimelineEvent

/**
 * Writes an event with a caller-supplied [createdAt] timestamp instead of the current time.
 * This is ONLY useful for backfilling historical events; new events should use [writeEvent].
 */
fun writeEventWithTimestamp(
  connectionId: UUID,
  event: ConnectionEvent,
  userId: UUID? = null,
  createdAt: OffsetDateTime,
): ConnectionTimelineEvent

fun getEvent(eventId: UUID): ConnectionTimelineEvent

fun listEvents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,19 @@ class ConnectionTimelineEventServiceDataImpl(
): ConnectionTimelineEvent {
val serializedEvent = mapper.writeValueAsString(event)
val timelineEvent =
ConnectionTimelineEvent(null, connectionId, userId, event.getEventType().toString(), serializedEvent, null)
ConnectionTimelineEvent(null, connectionId, userId, event.getEventType().toString(), serializedEvent, OffsetDateTime.now())
return repository.save(timelineEvent)
}

/**
 * Persists [event] with the caller-supplied [createdAt] timestamp rather than "now".
 * Intended solely for backfilling historical events.
 */
override fun writeEventWithTimestamp(
  connectionId: UUID,
  event: ConnectionEvent,
  userId: UUID?,
  createdAt: OffsetDateTime,
): ConnectionTimelineEvent =
  repository.save(
    ConnectionTimelineEvent(
      null,
      connectionId,
      userId,
      event.getEventType().toString(),
      mapper.writeValueAsString(event),
      createdAt,
    ),
  )

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import java.time.OffsetDateTime
import java.time.ZoneOffset
import java.util.UUID

@MicronautTest
Expand All @@ -31,6 +33,7 @@ internal class ConnectionTimelineEventRepositoryTest : AbstractConfigRepositoryT
ConnectionTimelineEvent(
connectionId = UUID.randomUUID(),
eventType = "Test",
createdAt = OffsetDateTime.now(),
)

val saved = connectionTimelineEventRepository.save(event)
Expand All @@ -47,21 +50,25 @@ internal class ConnectionTimelineEventRepositoryTest : AbstractConfigRepositoryT
ConnectionTimelineEvent(
connectionId = connectionId,
eventType = ConnectionEvent.Type.SYNC_STARTED.name,
createdAt = OffsetDateTime.of(2024, 9, 1, 0, 0, 0, 0, ZoneOffset.UTC),
)
private val event2 =
ConnectionTimelineEvent(
connectionId = connectionId,
eventType = ConnectionEvent.Type.SYNC_CANCELLED.name,
createdAt = OffsetDateTime.of(2024, 9, 2, 0, 0, 0, 0, ZoneOffset.UTC),
)
private val event3 =
ConnectionTimelineEvent(
connectionId = connectionId,
eventType = ConnectionEvent.Type.REFRESH_STARTED.name,
createdAt = OffsetDateTime.of(2024, 9, 3, 0, 0, 0, 0, ZoneOffset.UTC),
)
private val event4 =
ConnectionTimelineEvent(
connectionId = connectionId,
eventType = ConnectionEvent.Type.REFRESH_SUCCEEDED.name,
createdAt = OffsetDateTime.of(2024, 9, 4, 0, 0, 0, 0, ZoneOffset.UTC),
)

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,30 @@ public List<Job> listJobs(final ConfigType configType, final Instant attemptEnde
timeConvertedIntoLocalDateTime)));
}

/**
 * List jobs matching the given config types, statuses, and created_at window. Used to backfill
 * jobs as connection timeline events. A null filter means no constraint on that dimension.
 *
 * @param configTypes job config types to include (e.g. sync, refresh, reset); null for all
 * @param jobStatuses job statuses to include; null for all
 * @param createdAtStart inclusive lower bound on jobs.created_at; null for unbounded
 * @param createdAtEnd inclusive upper bound on jobs.created_at; null for unbounded
 * @throws IOException on database access failure
 */
@Override
public List<Job> listJobsForConvertingToEvents(final Set<ConfigType> configTypes,
                                               final Set<JobStatus> jobStatuses,
                                               final OffsetDateTime createdAtStart,
                                               final OffsetDateTime createdAtEnd)
    throws IOException {
  return jobDatabase.query(ctx -> {
    // Render the filtered selection as inlined SQL so it can be embedded as a subquery.
    // Fix: configTypes now gets the same null-means-no-constraint guard as the other filters
    // (previously a null configTypes would NPE inside configTypeSqlNames).
    final String jobsSubquery = "(" + ctx.select(DSL.asterisk()).from(JOBS)
        .where(configTypes == null ? DSL.noCondition() : JOBS.CONFIG_TYPE.in(configTypeSqlNames(configTypes)))
        .and(jobStatuses == null ? DSL.noCondition()
            : JOBS.STATUS.in(jobStatuses.stream()
                .map(status -> io.airbyte.db.instance.jobs.jooq.generated.enums.JobStatus.lookupLiteral(toSqlName(status)))
                .collect(Collectors.toList())))
        .and(createdAtStart == null ? DSL.noCondition() : JOBS.CREATED_AT.ge(createdAtStart))
        .and(createdAtEnd == null ? DSL.noCondition() : JOBS.CREATED_AT.le(createdAtEnd))
        .getSQL(ParamType.INLINED) + ") AS jobs";

    final String fullQuery = jobSelectAndJoin(jobsSubquery);
    LOGGER.debug("jobs query: {}", fullQuery);
    return getJobsFromResult(ctx.fetch(fullQuery));
  });
}

@Override
public List<Job> listJobsLight(final Set<Long> jobIds) throws IOException {
return jobDatabase.query(ctx -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ List<Job> listJobs(
*/
List<Job> listJobs(ConfigType configType, Instant attemptEndedAtTimestamp) throws IOException;

/**
 * List jobs matching the given config types, statuses, and created_at time window, for
 * converting historical jobs into connection timeline events (backfill).
 *
 * @param configTypes job config types to include (e.g. sync, refresh, reset)
 * @param jobStatuses job statuses to include; null for no status constraint
 * @param createdAtStart inclusive lower bound on the job created_at timestamp; null for unbounded
 * @param createdAtEnd inclusive upper bound on the job created_at timestamp; null for unbounded
 * @return the matching jobs
 * @throws IOException on database access failure
 */
List<Job> listJobsForConvertingToEvents(Set<ConfigType> configTypes,
                                        Set<JobStatus> jobStatuses,
                                        OffsetDateTime createdAtStart,
                                        OffsetDateTime createdAtEnd)
    throws IOException;

/**
* List jobs based on job IDs, nothing more.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.airbyte.api.model.generated.ConnectionEventIdRequestBody;
import io.airbyte.api.model.generated.ConnectionEventList;
import io.airbyte.api.model.generated.ConnectionEventWithDetails;
import io.airbyte.api.model.generated.ConnectionEventsBackfillRequestBody;
import io.airbyte.api.model.generated.ConnectionEventsRequestBody;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionLastJobPerStreamReadItem;
Expand Down Expand Up @@ -114,6 +115,17 @@ public InternalOperationResult autoDisableConnection(@Body final ConnectionIdReq
return ApiHelper.execute(() -> connectionsHandler.autoDisableConnection(connectionIdRequestBody.getConnectionId()));
}

// Admin-only maintenance endpoint: backfills historical jobs as connection timeline events.
// Fix 1: URI aligned with the OpenAPI spec path /v1/connections/events/backfill — it was
// "/backfill_events", which spec-generated clients would never hit.
// Fix 2: request body annotated @Body and made final, matching the sibling endpoints here.
@Override
@Post(uri = "/events/backfill")
@Secured({ADMIN})
@ExecuteOn(AirbyteTaskExecutors.IO)
public void backfillConnectionEvents(@Body final ConnectionEventsBackfillRequestBody connectionEventsBackfillRequestBody) {
  // Returns no payload; errors surface through ApiHelper's standard exception mapping.
  ApiHelper.execute(() -> {
    connectionsHandler.backfillConnectionEvents(connectionEventsBackfillRequestBody);
    return null;
  });
}

@Override
@Post(uri = "/create")
@Secured({WORKSPACE_EDITOR, ORGANIZATION_EDITOR})
Expand Down

0 comments on commit 927fbea

Please sign in to comment.