diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index 7d45295b2d8c..080fa7c9dac4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -29,6 +29,8 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; +import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.core.NullSideInputReader; import org.apache.beam.runners.core.SideInputReader; import org.apache.beam.runners.core.StepContext; @@ -177,6 +179,7 @@ protected abstract SideInputReader getSideInputReaderForViews( /** Dataflow specific {@link StepContext}. */ public abstract static class DataflowStepContext implements StepContext { + private final NameContext nameContext; public DataflowStepContext(NameContext nameContext) { @@ -253,10 +256,13 @@ public static class DataflowExecutionStateTracker extends ExecutionStateTracker * Metadata on the message whose processing is currently being managed by this tracker. If no * message is actively being processed, activeMessageMetadata will be null. */ - @Nullable private ActiveMessageMetadata activeMessageMetadata = null; + @GuardedBy("this") + @Nullable + private ActiveMessageMetadata activeMessageMetadata = null; private final MillisProvider clock = System::currentTimeMillis; + @GuardedBy("this") private final Map processingTimesByStep = new HashMap<>(); public DataflowExecutionStateTracker( @@ -313,20 +319,19 @@ public Closeable enterState(ExecutionState newState) { if (isDataflowProcessElementState) { DataflowExecutionState newDFState = (DataflowExecutionState) newState; if (newDFState.getStepName() != null && newDFState.getStepName().userName() != null) { - if (this.activeMessageMetadata != null) { - recordActiveMessageInProcessingTimesMap(); + recordActiveMessageInProcessingTimesMap(); + synchronized (this) { + this.activeMessageMetadata = + ActiveMessageMetadata.create( + newDFState.getStepName().userName(), clock.getMillis()); } - this.activeMessageMetadata = - ActiveMessageMetadata.create(newDFState.getStepName().userName(), clock.getMillis()); } elementExecutionTracker.enter(newDFState.getStepName()); } return () -> { if (isDataflowProcessElementState) { - if (this.activeMessageMetadata != null) { - recordActiveMessageInProcessingTimesMap(); - } + recordActiveMessageInProcessingTimesMap(); elementExecutionTracker.exit(); } baseCloseable.close(); @@ -337,12 +342,21 @@ public String getWorkItemId() { return this.workItemId; } - public Optional getActiveMessageMetadata() { + public synchronized Optional getActiveMessageMetadata() { return Optional.ofNullable(activeMessageMetadata); } - public Map getProcessingTimesByStepCopy() { - Map processingTimesCopy = processingTimesByStep; + public synchronized Map getProcessingTimesByStepCopy() { + Map processingTimesCopy = + processingTimesByStep.entrySet().stream() + .collect( + Collectors.toMap( + e -> e.getKey(), + e -> { + IntSummaryStatistics clone = new IntSummaryStatistics(); + clone.combine(e.getValue()); + return clone; + })); return processingTimesCopy; } @@ -351,17 +365,19 @@ public Map getProcessingTimesByStepCopy() { * processing times map. Sets the activeMessageMetadata to null after the entry has been * recorded. */ - private void recordActiveMessageInProcessingTimesMap() { + private synchronized void recordActiveMessageInProcessingTimesMap() { if (this.activeMessageMetadata == null) { return; } + int processingTime = + (int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime()); this.processingTimesByStep.compute( this.activeMessageMetadata.userStepName(), (k, v) -> { if (v == null) { v = new IntSummaryStatistics(); } - v.accept((int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime())); + v.accept(processingTime); return v; }); this.activeMessageMetadata = null;