Skip to content

Commit

Permalink
Refactor StateFetcher (apache#28755)
Browse files Browse the repository at this point in the history
Refactor and cleanup of StateFetcher in preparation for future changes
  • Loading branch information
m-trieu authored and Kanishk Karanawat committed Oct 21, 2023
1 parent 09035c9 commit 247b26a
Show file tree
Hide file tree
Showing 13 changed files with 626 additions and 420 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import org.apache.beam.runners.dataflow.worker.streaming.Work.State;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
Expand Down Expand Up @@ -228,7 +229,7 @@ public class StreamingDataflowWorker {
private final Thread commitThread;
private final AtomicLong activeCommitBytes = new AtomicLong();
private final AtomicBoolean running = new AtomicBoolean();
private final StateFetcher stateFetcher;
private final SideInputStateFetcher sideInputStateFetcher;
private final StreamingDataflowWorkerOptions options;
private final boolean windmillServiceEnabled;
private final long clientId;
Expand Down Expand Up @@ -406,7 +407,7 @@ public void run() {
this.metricTrackingWindmillServer =
new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, windmillServiceEnabled);
this.metricTrackingWindmillServer.start();
this.stateFetcher = new StateFetcher(metricTrackingWindmillServer);
this.sideInputStateFetcher = new SideInputStateFetcher(metricTrackingWindmillServer);
this.clientId = clientIdGenerator.nextLong();

for (MapTask mapTask : mapTasks) {
Expand Down Expand Up @@ -1078,7 +1079,7 @@ public void close() {
}
};
});
StateFetcher localStateFetcher = stateFetcher.byteTrackingView();
SideInputStateFetcher localSideInputStateFetcher = sideInputStateFetcher.byteTrackingView();

// If the read output KVs, then we can decode Windmill's byte key into a userland
// key object and provide it to the execution context for use with per-key state.
Expand Down Expand Up @@ -1114,7 +1115,7 @@ public void close() {
outputDataWatermark,
synchronizedProcessingTime,
stateReader,
localStateFetcher,
localSideInputStateFetcher,
outputBuilder);

// Blocks while executing work.
Expand Down Expand Up @@ -1184,7 +1185,7 @@ public void close() {
shuffleBytesRead += message.getSerializedSize();
}
}
long stateBytesRead = stateReader.getBytesRead() + localStateFetcher.getBytesRead();
long stateBytesRead = stateReader.getBytesRead() + localSideInputStateFetcher.getBytesRead();
windmillShuffleBytesRead.addValue(shuffleBytesRead);
windmillStateBytesRead.addValue(stateBytesRead);
windmillStateBytesWritten.addValue(stateBytesWritten);
Expand Down
Loading

0 comments on commit 247b26a

Please sign in to comment.