Skip to content

Commit

Permalink
Merge pull request #120 from caraml-dev/add-job-type-filter-list-job
Browse files Browse the repository at this point in the history
feat: add job type filter to list job
  • Loading branch information
khorshuheng authored May 17, 2024
2 parents 512e7a6 + 7348579 commit b7163a3
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@

message ListJobsRequest {
// When true, jobs that have already terminated are also included in the response.
bool include_terminated = 1;
// Optional: restrict results to jobs for this feature table (used together with project).
string table_name = 2;
// Optional: restrict results to jobs belonging to this project.
string project = 3;
// Optional: restrict results to jobs of this type; the default enum value
// (INVALID_JOB) means no type filter is applied.
JobType type = 4;
}

message ListScheduledJobsRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ public void getHistoricalFeatures(
public void listJobs(ListJobsRequest request, StreamObserver<ListJobsResponse> responseObserver) {
List<Job> jobs =
jobService.listJobs(
request.getIncludeTerminated(), request.getProject(), request.getTableName());
request.getIncludeTerminated(),
request.getProject(),
request.getTableName(),
request.getType());
ListJobsResponse response = ListJobsResponse.newBuilder().addAllJobs(jobs).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,18 +488,22 @@ public Job createRetrievalJob(
return sparkApplicationToJob(sparkOperatorApi.create(app));
}

public List<Job> listJobs(Boolean includeTerminated, String project, String tableName) {
Stream<String> equalitySelectors =
Map.of(
PROJECT_LABEL,
project,
FEATURE_TABLE_HASH_LABEL,
generateProjectTableHash(project, tableName))
.entrySet()
.stream()
.filter(es -> !es.getValue().isEmpty())
.map(es -> String.format("%s=%s", es.getKey(), es.getValue()));
String labelSelectors = equalitySelectors.collect(Collectors.joining(","));
public List<Job> listJobs(
Boolean includeTerminated, String project, String tableName, JobType jobType) {
Map<String, String> selectorMap = new HashMap<>();
if (!project.isEmpty()) {
selectorMap.put(PROJECT_LABEL, project);
}
if (!tableName.isEmpty() && !project.isEmpty()) {
selectorMap.put(FEATURE_TABLE_HASH_LABEL, generateProjectTableHash(project, tableName));
}
if (jobType != JobType.INVALID_JOB) {
selectorMap.put(JOB_TYPE_LABEL, jobType.toString());
}
String labelSelectors =
selectorMap.entrySet().stream()
.map(es -> String.format("%s=%s", es.getKey(), es.getValue()))
.collect(Collectors.joining(","));
Stream<Job> jobStream =
sparkOperatorApi.list(namespace, labelSelectors).stream().map(this::sparkApplicationToJob);
if (!includeTerminated) {
Expand All @@ -509,17 +513,17 @@ public List<Job> listJobs(Boolean includeTerminated, String project, String tabl
}

public List<ScheduledJob> listScheduledJobs(String project, String tableName) {
String labelSelectors = "";
Map<String, String> selectorMap = new HashMap<>();
if (!project.isEmpty()) {
selectorMap.put(PROJECT_LABEL, project);
}
if (!tableName.isEmpty()) {
selectorMap.put(FEATURE_TABLE_LABEL, tableName);
if (!tableName.isEmpty() && !project.isEmpty()) {
selectorMap.put(FEATURE_TABLE_HASH_LABEL, generateProjectTableHash(project, tableName));
}
selectorMap.entrySet().stream()
.map(es -> String.format("%s=%s", es.getKey(), es.getValue()))
.collect(Collectors.joining(","));
String labelSelectors =
selectorMap.entrySet().stream()
.map(es -> String.format("%s=%s", es.getKey(), es.getValue()))
.collect(Collectors.joining(","));
return sparkOperatorApi.listScheduled(namespace, labelSelectors).stream()
.map(this::scheduledSparkApplicationToScheduledJob)
.toList();
Expand Down
Loading

0 comments on commit b7163a3

Please sign in to comment.