Skip to content

Commit

Permalink
Adds endpoint for delete_stream_reset_records. Calls that from Stream…
Browse files Browse the repository at this point in the history
…… (#9052)
  • Loading branch information
tryangul committed Sep 27, 2023
1 parent d3fcb40 commit 40e58a4
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 13 deletions.
24 changes: 24 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3101,6 +3101,30 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/InternalOperationResult"
/v1/jobs/delete_stream_reset_records:
post:
tags:
- jobs
- internal
summary: Deletes all stream reset records for the specified job
operationId: deleteStreamResetRecordsForJob
requestBody:
content:
application/json:
schema:
type: object
required:
- connectionId
- jobId
properties:
connectionId:
$ref: "#/components/schemas/ConnectionId"
jobId:
$ref: "#/components/schemas/JobId"
required: true
responses:
"204":
description: Successfully delete stream reset records for job

# USERS
/v1/users/create:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;
package io.airbyte.commons.temporal;

import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;

import io.airbyte.commons.temporal.StreamResetRecordsHelper;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.persistence.job.JobPersistence;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.api.model.generated.CheckInput;
import io.airbyte.api.model.generated.ConnectionIdRequestBody;
import io.airbyte.api.model.generated.ConnectionJobRequestBody;
import io.airbyte.api.model.generated.DeleteStreamResetRecordsForJobRequest;
import io.airbyte.api.model.generated.InternalOperationResult;
import io.airbyte.api.model.generated.JobCreate;
import io.airbyte.api.model.generated.JobDebugInfoRead;
Expand All @@ -35,6 +36,7 @@
import io.airbyte.commons.server.handlers.JobsHandler;
import io.airbyte.commons.server.handlers.SchedulerHandler;
import io.airbyte.commons.server.scheduling.AirbyteTaskExecutors;
import io.airbyte.commons.temporal.StreamResetRecordsHelper;
import io.micronaut.context.annotation.Context;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
Expand All @@ -54,15 +56,18 @@ public class JobsApiController implements JobsApi {
private final SchedulerHandler schedulerHandler;
private final JobsHandler jobsHandler;
private final JobInputHandler jobInputHandler;
private final StreamResetRecordsHelper streamResetRecordsHelper;

public JobsApiController(final JobHistoryHandler jobHistoryHandler,
final SchedulerHandler schedulerHandler,
final JobInputHandler jobInputHandler,
final JobsHandler jobsHandler) {
final JobsHandler jobsHandler,
final StreamResetRecordsHelper streamResetRecordsHelper) {
this.jobHistoryHandler = jobHistoryHandler;
this.schedulerHandler = schedulerHandler;
this.jobInputHandler = jobInputHandler;
this.jobsHandler = jobsHandler;
this.streamResetRecordsHelper = streamResetRecordsHelper;
}

@Post("/cancel")
Expand Down Expand Up @@ -230,4 +235,15 @@ public void persistJobCancellation(final PersistCancelJobRequestBody requestBody
});
}

@Override
@Post("/delete_stream_reset_records")
@Secured({ADMIN})
@ExecuteOn(AirbyteTaskExecutors.IO)
public void deleteStreamResetRecordsForJob(final DeleteStreamResetRecordsForJobRequest requestBody) {
ApiHelper.execute(() -> {
streamResetRecordsHelper.deleteStreamResetRecordsForJob(requestBody.getJobId(), requestBody.getConnectionId());
return null;
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;

import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.StreamResetRecordsHelper;
import io.airbyte.api.client.generated.JobsApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.DeleteStreamResetRecordsForJobRequest;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
Expand All @@ -25,17 +28,24 @@
@Requires(env = WorkerMode.CONTROL_PLANE)
public class StreamResetActivityImpl implements StreamResetActivity {

private final StreamResetRecordsHelper streamResetRecordsHelper;
private final JobsApi jobsApi;

public StreamResetActivityImpl(final StreamResetRecordsHelper streamResetRecordsHelper) {
this.streamResetRecordsHelper = streamResetRecordsHelper;
public StreamResetActivityImpl(final JobsApi jobsApi) {
this.jobsApi = jobsApi;
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void deleteStreamResetRecordsForJob(final DeleteStreamResetRecordsForJobInput input) {
ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));
streamResetRecordsHelper.deleteStreamResetRecordsForJob(input.getJobId(), input.getConnectionId());

try {
jobsApi.deleteStreamResetRecordsForJob(new DeleteStreamResetRecordsForJobRequest()
.connectionId(input.getConnectionId())
.jobId(input.getJobId()));
} catch (final ApiException e) {
throw new RetryableException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,53 @@

package io.airbyte.workers.temporal.scheduling.activities;

import static org.mockito.ArgumentMatchers.eq;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

import io.airbyte.commons.temporal.StreamResetRecordsHelper;
import io.airbyte.api.client.generated.JobsApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.DeleteStreamResetRecordsForJobRequest;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.workers.temporal.scheduling.activities.StreamResetActivity.DeleteStreamResetRecordsForJobInput;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class StreamResetActivityTest {

@Mock
private StreamResetRecordsHelper streamResetRecordsHelper;
private JobsApi jobsApi;
@InjectMocks
private StreamResetActivityImpl streamResetActivity;

@Test
void testDeleteStreamResetRecordsForJob() {
void deleteStreamResetRecordsForJobSuccess() throws ApiException {
final DeleteStreamResetRecordsForJobInput input = new DeleteStreamResetRecordsForJobInput(UUID.randomUUID(), Long.valueOf("123"));

final ArgumentCaptor<DeleteStreamResetRecordsForJobRequest> req = ArgumentCaptor.forClass(DeleteStreamResetRecordsForJobRequest.class);

streamResetActivity.deleteStreamResetRecordsForJob(input);
verify(streamResetRecordsHelper).deleteStreamResetRecordsForJob(eq(input.getJobId()), eq(input.getConnectionId()));

verify(jobsApi).deleteStreamResetRecordsForJob(req.capture());
assertEquals(input.getJobId(), req.getValue().getJobId());
assertEquals(input.getConnectionId(), req.getValue().getConnectionId());
}

@Test
void deleteStreamResetRecordsForJobThrowsRetryableException() throws ApiException {
final DeleteStreamResetRecordsForJobInput input = new DeleteStreamResetRecordsForJobInput(UUID.randomUUID(), Long.valueOf("123"));

Mockito.doThrow(new ApiException("bang.")).when(jobsApi).deleteStreamResetRecordsForJob(any());

assertThrows(RetryableException.class, () -> streamResetActivity.deleteStreamResetRecordsForJob(input));
}

}

0 comments on commit 40e58a4

Please sign in to comment.