Skip to content

Commit

Permalink
make the sampler consistent across work items so that it actually tra…
Browse files Browse the repository at this point in the history
…cks multiple trackers
  • Loading branch information
clmccart committed Dec 1, 2023
1 parent be9da8a commit 8690565
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import java.util.function.Supplier;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.MetricsLogger;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.internal.CustomSources;
Expand Down Expand Up @@ -287,6 +286,8 @@ public class StreamingDataflowWorker {
// Possibly overridden by streaming engine config.
private int maxWorkItemCommitBytes = Integer.MAX_VALUE;

private DataflowExecutionStateSampler sampler = DataflowExecutionStateSampler.instance();

@VisibleForTesting
StreamingDataflowWorker(
List<MapTask> mapTasks,
Expand Down Expand Up @@ -575,7 +576,7 @@ public void start() {
memoryMonitorThread.start();
dispatchThread.start();
commitThread.start();
ExecutionStateSampler.instance().start();
sampler.start();

// Periodically report workers counters and other updates.
globalWorkerUpdatesTimer = executorSupplier.apply("GlobalWorkerUpdatesTimer");
Expand Down Expand Up @@ -993,7 +994,7 @@ private void process(
(InstructionOutputNode) Iterables.getOnlyElement(mapTaskNetwork.successors(readNode));
DataflowExecutionContext.DataflowExecutionStateTracker executionStateTracker =
new DataflowExecutionContext.DataflowExecutionStateTracker(
ExecutionStateSampler.instance(),
sampler,
stageInfo
.executionStateRegistry()
.getState(
Expand Down Expand Up @@ -1292,6 +1293,7 @@ public void close() {
stageInfo.timerProcessingMsecs().addValue(processingTimeMsecs);
}

sampler.resetForWorkId(constructWorkId(work.getWorkItem()));
DataflowWorkerLoggingMDC.setWorkId(null);
DataflowWorkerLoggingMDC.setStageName(null);
}
Expand Down

0 comments on commit 8690565

Please sign in to comment.