From 553603085e8c62c3b01a79fbbff291fa17841271 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Thu, 18 Apr 2024 14:01:13 -0700 Subject: [PATCH] Update tracker to handle refreshes (#12058) --- .../persistence/job/tracker/JobTracker.java | 24 +++++++++++---- .../job/tracker/JobTrackerTest.java | 30 +++++++++++++++++-- 2 files changed, 46 insertions(+), 8 deletions(-) diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/JobTracker.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/JobTracker.java index e38acd5db1d..126394ad101 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/JobTracker.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/JobTracker.java @@ -54,6 +54,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.Set; import java.util.UUID; /** @@ -229,7 +230,12 @@ public void trackSync(final Job job, final JobState jobState) { final ActorDefinitionVersion destinationVersion = actorDefinitionVersionHelper.getDestinationVersion(destinationDefinition, workspaceId, standardSync.getDestinationId()); - final Map jobMetadata = generateJobMetadata(String.valueOf(jobId), configType, job.getAttemptsCount()); + final List jobsHistory = jobPersistence.listJobsIncludingId( + Set.of(ConfigType.SYNC, ConfigType.RESET_CONNECTION, ConfigType.REFRESH), connectionId.toString(), jobId, 2); + + final Optional previousJob = jobsHistory.stream().filter(jobHistory -> jobHistory.getId() != jobId).findFirst(); + + final Map jobMetadata = generateJobMetadata(String.valueOf(jobId), configType, job.getAttemptsCount(), previousJob); final Map jobAttemptMetadata = generateJobAttemptMetadata(jobId, jobState); final Map sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinition, sourceVersion); final Map destinationDefMetadata = generateDestinationDefinitionMetadata(destinationDefinition, destinationVersion); @@ -278,7 +284,7 @@ public void trackSyncForInternalFailure(final Long jobId, final ActorDefinitionVersion destinationVersion = actorDefinitionVersionHelper.getDestinationVersion(destinationDefinition, workspaceId, standardSync.getDestinationId()); - final Map jobMetadata = generateJobMetadata(String.valueOf(jobId), null, attempts); + final Map jobMetadata = generateJobMetadata(String.valueOf(jobId), null, attempts, Optional.empty()); final Map jobAttemptMetadata = generateJobAttemptMetadata(jobId, jobState); final Map sourceDefMetadata = generateSourceDefinitionMetadata(sourceDefinition, sourceVersion); final Map destinationDefMetadata = generateDestinationDefinitionMetadata(destinationDefinition, destinationVersion); @@ -508,17 +514,25 @@ private Map generateSourceDefinitionMetadata(final StandardSourc } private Map generateJobMetadata(final String jobId, final ConfigType configType) { - return generateJobMetadata(jobId, configType, 0); + return generateJobMetadata(jobId, configType, 0, Optional.empty()); } - private Map generateJobMetadata(final String jobId, final @Nullable ConfigType configType, final int attempt) { + @VisibleForTesting + Map generateJobMetadata(final String jobId, + final @Nullable ConfigType configType, + final int attempt, + final Optional previousJob) { final Map metadata = new HashMap<>(); if (configType != null) { metadata.put("job_type", configType); } metadata.put("job_id", jobId); metadata.put("attempt_id", attempt); - + previousJob.ifPresent(job -> { + if (job.getConfigType() != null) { + metadata.put("previous_job_type", job.getConfigType()); + } + }); return Collections.unmodifiableMap(metadata); } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java index 32715ca7988..a5f5cb44b54 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java @@ -32,6 +32,7 @@ import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.Metadata; import io.airbyte.config.NormalizationSummary; +import io.airbyte.config.RefreshConfig; import io.airbyte.config.Schedule; import io.airbyte.config.Schedule.TimeUnit; import io.airbyte.config.StandardCheckConnectionOutput; @@ -329,6 +330,11 @@ void testTrackSync() throws ConfigNotFoundException, IOException, JsonValidation testAsynchronous(ConfigType.SYNC, SYNC_CONFIG_METADATA); } + @Test + void testTrackRefresh() throws ConfigNotFoundException, IOException, JsonValidationException { + testAsynchronous(ConfigType.REFRESH, SYNC_CONFIG_METADATA); + } + @Test void testTrackSyncForInternalFailure() throws JsonValidationException, ConfigNotFoundException, IOException { final Long jobId = 12345L; @@ -500,6 +506,20 @@ void testConfigToMetadata() throws IOException { assertEquals(expected, actual); } + @Test + void testGenerateMetadata() { + final String jobId = "shouldBeLong"; + final int attemptId = 2; + final ConfigType configType = ConfigType.REFRESH; + final Job previousJob = new Job(0, ConfigType.RESET_CONNECTION, null, null, null, null, null, 0L, 0L); + + final Map metadata = jobTracker.generateJobMetadata(jobId, configType, attemptId, Optional.of(previousJob)); + assertEquals(jobId, metadata.get("job_id")); + assertEquals(attemptId, metadata.get("attempt_id")); + assertEquals(configType, metadata.get("job_type")); + assertEquals(ConfigType.RESET_CONNECTION, metadata.get("previous_job_type")); + } + void testAsynchronousAttempt(final ConfigType configType) throws ConfigNotFoundException, IOException, JsonValidationException { testAsynchronousAttempt(configType, getJobWithAttemptsMock(configType, LONG_JOB_ID), Collections.emptyMap()); } @@ -617,9 +637,6 @@ private Job getJobMock(final ConfigType configType, final long jobId) throws Con .withSyncMode(SyncMode.FULL_REFRESH) .withDestinationSyncMode(DestinationSyncMode.APPEND))); - final JobSyncConfig jobSyncConfig = new JobSyncConfig() - .withConfiguredAirbyteCatalog(catalog); - final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig() .withSourceConfiguration(Jsons.jsonNode(ImmutableMap.of("key", "some_value"))) .withDestinationConfiguration(Jsons.jsonNode(ImmutableMap.of("key", false))); @@ -628,8 +645,15 @@ private Job getJobMock(final ConfigType configType, final long jobId) throws Con when(jobConfig.getConfigType()).thenReturn(configType); if (configType == ConfigType.SYNC) { + final JobSyncConfig jobSyncConfig = new JobSyncConfig() + .withConfiguredAirbyteCatalog(catalog); when(jobConfig.getSync()).thenReturn(jobSyncConfig); } + if (configType == ConfigType.REFRESH) { + final RefreshConfig refreshConfig = new RefreshConfig() + .withConfiguredAirbyteCatalog(catalog); + when(jobConfig.getRefresh()).thenReturn(refreshConfig); + } final Attempt attempt = mock(Attempt.class); when(attempt.getSyncConfig()).thenReturn(Optional.of(attemptSyncConfig));