Skip to content

Commit

Permalink
Merge pull request #122 from caraml-dev/remove-job-api
Browse files Browse the repository at this point in the history
feat: add delete/unschedule job API
  • Loading branch information
khorshuheng authored May 20, 2024
2 parents 5ece7db + 68a26af commit 4938b55
Show file tree
Hide file tree
Showing 11 changed files with 455 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ service JobService {
// Cancel a single job
rpc CancelJob (CancelJobRequest) returns (CancelJobResponse);

// Unschedule a job
rpc UnscheduleJob (UnscheduleJobRequest) returns (UnscheduleJobResponse);

// Get details of a single job
rpc GetJob (GetJobRequest) returns (GetJobResponse);

Expand Down Expand Up @@ -251,6 +254,12 @@ message CancelJobRequest{

message CancelJobResponse {}

message UnscheduleJobRequest {
string job_id = 1;
}

message UnscheduleJobResponse {}

message GetHealthMetricsRequest {
string project = 1;
repeated string table_names = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import dev.caraml.store.protobuf.jobservice.JobServiceGrpc;
import dev.caraml.store.protobuf.jobservice.JobServiceProto;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.CancelJobRequest;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.CancelJobResponse;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.GetHistoricalFeaturesRequest;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.GetHistoricalFeaturesResponse;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.GetJobRequest;
Expand All @@ -16,6 +18,8 @@
import dev.caraml.store.protobuf.jobservice.JobServiceProto.StartOfflineToOnlineIngestionJobResponse;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.StartStreamIngestionJobRequest;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.StartStreamIngestionJobResponse;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.UnscheduleJobRequest;
import dev.caraml.store.protobuf.jobservice.JobServiceProto.UnscheduleJobResponse;
import dev.caraml.store.sparkjob.JobNotFoundException;
import dev.caraml.store.sparkjob.JobService;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -139,4 +143,19 @@ public void getJob(GetJobRequest request, StreamObserver<GetJobResponse> respons
responseObserver.onNext(response);
responseObserver.onCompleted();
}

@Override
public void cancelJob(
CancelJobRequest request, StreamObserver<CancelJobResponse> responseObserver) {
jobService.cancelJob(request.getJobId());
responseObserver.onNext(CancelJobResponse.getDefaultInstance());
responseObserver.onCompleted();
}

public void unscheduleJob(
UnscheduleJobRequest request, StreamObserver<UnscheduleJobResponse> responseObserver) {
jobService.unscheduleJob(request.getJobId());
responseObserver.onNext(UnscheduleJobResponse.getDefaultInstance());
responseObserver.onCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,14 @@ public Optional<Job> getJob(String id) {
return sparkOperatorApi.getSparkApplication(namespace, id).map(this::sparkApplicationToJob);
}

public void cancelJob(String id) {
sparkOperatorApi.deleteSparkApplication(namespace, id);
}

public void unscheduleJob(String id) {
sparkOperatorApi.deleteScheduledSparkApplication(namespace, id);
}

private String generateProjectTableHash(String project, String tableName) {
return DigestUtils.md5Hex(String.format("%s:%s", project, tableName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ List<ScheduledSparkApplication> listScheduled(String namespace, String labelSele

Optional<ScheduledSparkApplication> getScheduledSparkApplication(String namespace, String name)
throws SparkOperatorApiException;

void deleteSparkApplication(String namespace, String name) throws SparkOperatorApiException;

void deleteScheduledSparkApplication(String namespace, String name)
throws SparkOperatorApiException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,24 @@ public Optional<ScheduledSparkApplication> getScheduledSparkApplication(
default -> throw new SparkOperatorApiException(resp.getStatus().toString());
};
}

@Override
public void deleteSparkApplication(String namespace, String name)
throws SparkOperatorApiException {
try {
sparkApplicationApi.delete(namespace, name).throwsApiException();
} catch (ApiException e) {
throw new SparkOperatorApiException(e.getMessage());
}
}

@Override
public void deleteScheduledSparkApplication(String namespace, String name)
throws SparkOperatorApiException {
try {
scheduledSparkApplicationApi.delete(namespace, name).throwsApiException();
} catch (ApiException e) {
throw new SparkOperatorApiException(e.getMessage());
}
}
}
428 changes: 276 additions & 152 deletions caraml-store-sdk/go/protos/feast_spark/api/JobService.pb.go

Large diffs are not rendered by default.

39 changes: 39 additions & 0 deletions caraml-store-sdk/go/protos/feast_spark/api/JobService_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions caraml-store-sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
GetHistoricalFeaturesResponse,
GetJobRequest,
ListJobsRequest,
CancelJobRequest,
UnscheduleJobRequest,
Job,
ScheduleOfflineToOnlineIngestionJobRequest, ScheduledJob, ListScheduledJobsRequest, StartStreamIngestionJobResponse,
StartStreamIngestionJobRequest, JobType, INVALID_JOB,
Expand Down Expand Up @@ -427,3 +429,21 @@ def delete_feature_table(self, feature_table: str, project: str):
"""
request = DeleteFeatureTableRequest(name=feature_table, project=project)
self._core_service.DeleteFeatureTable(request)

def cancel_job(self, job_id: str):
"""
Cancel job
Args:
job_id: spark job id
"""
request = CancelJobRequest(job_id=job_id)
self._job_service.CancelJob(request)

def unschedule_job(self, job_id: str):
"""
Unschedule job
Args:
job_id: spark job id
"""
request = UnscheduleJobRequest(job_id=job_id)
self._job_service.UnscheduleJob(request)
Loading

0 comments on commit 4938b55

Please sign in to comment.