Skip to content

Commit

Permalink
ensured to send summary event in case of partial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Aditya Pratap Singh committed Dec 9, 2024
1 parent 3db4063 commit 90d2af9
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;


@Slf4j
public class CommitActivityImpl implements CommitActivity {
Expand Down Expand Up @@ -97,12 +99,20 @@ public CommitStats commit(WUProcessingSpec workSpec) {
Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList());
TaskState firstTaskState = taskStates.get(0);
log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
commitTaskStates(jobState, datasetStatesByUrns, jobContext);
CommitStats commitStats = CommitStats.createEmpty();
try {
commitTaskStates(jobState, datasetStatesByUrns, jobContext);
} catch (FailedDatasetUrnsException exception) {
log.info("Some datasets failed to be committed, proceeding with publishing commit step");
commitStats.setOptFailure(Optional.of(exception));
}

boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");

Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
return new CommitStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
return commitStats.setDatasetStats(datasetTaskSummaries)
.setNumCommittedWorkUnits(
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
} catch (Exception e) {
//TODO: IMPROVE GRANULARITY OF RETRIES
throw ApplicationFailure.newNonRetryableFailureWithCause(
Expand Down Expand Up @@ -164,8 +174,8 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry

if (!failedDatasetUrns.isEmpty()) {
String allFailedDatasets = String.join(", ", failedDatasetUrns);
log.error("Failed to commit dataset state for dataset(s) {}" + allFailedDatasets);
throw new IOException("Failed to commit dataset state for " + allFailedDatasets);
log.error("Failed to commit dataset state for dataset(s) {}", allFailedDatasets);
throw new FailedDatasetUrnsException(allFailedDatasets);
}
if (!IteratorExecutor.verifyAllSuccessful(result)) {
// TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
Expand Down Expand Up @@ -205,7 +215,7 @@ private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, JobState.
// Only process successful datasets unless configuration to process failed datasets is set
for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
&& commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
&& (commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || commitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS))) {
long totalBytesWritten = 0;
long totalRecordsWritten = 0;
int totalCommittedTasks = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;


/**
Expand All @@ -34,11 +36,13 @@
@Data
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
@Accessors(chain = true)
public class CommitStats {
@NonNull private Map<String, DatasetStats> datasetStats;
@NonNull private int numCommittedWorkUnits;
@NonNull private Optional<Exception> optFailure;

public static CommitStats createEmpty() {
return new CommitStats(new HashMap<>(), 0);
return new CommitStats(new HashMap<>(), 0, Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Workflow;

import java.time.Duration;
Expand All @@ -36,6 +37,7 @@
import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;


Expand All @@ -59,10 +61,19 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
@Override
public CommitStats commit(WUProcessingSpec workSpec) {
CommitStats commitGobblinStats = activityStub.commit(workSpec);
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
.withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
.submit();

if(!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
.withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
.submit();
}
if(commitGobblinStats.getOptFailure().isPresent()){
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Failed to commit dataset state for some dataset(s)"), FailedDatasetUrnsException.class.toString(),
commitGobblinStats.getOptFailure().get());
}
return commitGobblinStats;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.temporal.exception;

import java.io.IOException;

/**
* An exception thrown when a set of dataset URNs fail to be processed.
*/
public class FailedDatasetUrnsException extends IOException {


/**
* Creates a new instance of this exception with the failed dataset URNs.
*
* @param failedDatasetUrns the String of failed dataset URNs joined by comma
*/
public FailedDatasetUrnsException(String failedDatasetUrns) {
super("Failed to process the following dataset URNs: " + failedDatasetUrns);
}

}

0 comments on commit 90d2af9

Please sign in to comment.