Skip to content

Commit

Permalink
Report source bytes processed for custom sources (#26477)
Browse files Browse the repository at this point in the history
Authored-by: RuiLong Jiang <[email protected]>
  • Loading branch information
AMOOOMA authored May 9, 2023
1 parent 25f9a28 commit f70e9bf
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
Expand Down Expand Up @@ -1252,6 +1253,7 @@ private void process(
mapTask.getStageName(), s -> new StageInfo(s, mapTask.getSystemName(), this));

ExecutionState executionState = null;
String counterName = "dataflow_source_bytes_processed-" + mapTask.getSystemName();

try {
executionState = computationState.getExecutionStateQueue().poll();
Expand Down Expand Up @@ -1330,13 +1332,14 @@ private void process(
readNode.getParallelInstruction().getSystemName(),
readNode.getParallelInstruction().getName());
readOperation.receivers[0].addOutputCounter(
counterName,
new OutputObjectAndByteCounter(
new IntrinsicMapTaskExecutorFactory.ElementByteSizeObservableCoder<>(
readCoder),
mapTaskExecutor.getOutputCounters(),
nameContext)
.setSamplingPeriod(100)
.countBytes("dataflow_input_size-" + mapTask.getSystemName()));
.countBytes(counterName));
}
executionState =
new ExecutionState(mapTaskExecutor, context, keyCoder, executionStateTracker);
Expand Down Expand Up @@ -1400,6 +1403,23 @@ public void close() {
// Blocks while executing work.
executionState.getWorkExecutor().execute();

// Reports source bytes processed on the WorkItemCommitRequest, if available.
try {
long sourceBytesProcessed = 0;
HashMap<String, ElementCounter> counters =
((DataflowMapTaskExecutor) executionState.getWorkExecutor())
.getReadOperation()
.receivers[0]
.getOutputCounters();
if (counters.containsKey(counterName)) {
sourceBytesProcessed =
((OutputObjectAndByteCounter) counters.get(counterName)).getByteCount().getAndReset();
}
outputBuilder.setSourceBytesProcessed(sourceBytesProcessed);
} catch (Exception e) {
LOG.error(e.toString());
}

Iterables.addAll(
this.pendingMonitoringInfos, executionState.getWorkExecutor().extractMetricUpdates());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker.util.common.worker;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

/**
Expand All @@ -26,20 +27,24 @@
*/
public class OutputReceiver implements Receiver {
private final List<Receiver> outputs = new ArrayList<>();
private final List<ElementCounter> outputCounters = new ArrayList<>();
private final HashMap<String, ElementCounter> outputCounters = new HashMap<>();

/**
 * Adds a new receiver that this OutputReceiver forwards to.
 *
 * @param receiver the receiver to append to the {@code outputs} list
 */
public void addOutput(Receiver receiver) {
outputs.add(receiver);
}

public void addOutputCounter(ElementCounter outputCounter) {
outputCounters.add(outputCounter);
outputCounters.put(Integer.toString(outputCounters.size()), outputCounter);
}

/**
 * Registers an output counter under an explicit name so that it can later be looked up by that
 * name in the map returned by {@link #getOutputCounters()}.
 *
 * <p>The counter is stored with {@code put}, so registering a second counter under a name that
 * is already present replaces the earlier one.
 *
 * @param counterName key under which the counter is stored
 * @param outputCounter the counter to register
 */
public void addOutputCounter(String counterName, ElementCounter outputCounter) {
outputCounters.put(counterName, outputCounter);
}

@Override
public void process(Object elem) throws Exception {
for (ElementCounter counter : outputCounters) {
for (ElementCounter counter : outputCounters.values()) {
counter.update(elem);
}

Expand All @@ -50,11 +55,15 @@ public void process(Object elem) throws Exception {
}
}

for (ElementCounter counter : outputCounters) {
for (ElementCounter counter : outputCounters.values()) {
counter.finishLazyUpdate(elem);
}
}

/**
 * Returns the output counters registered on this receiver, keyed by counter name.
 *
 * <p>The returned map is this receiver's own backing map, not a copy, so any modification made
 * through it is seen by this receiver.
 *
 * @return the map of counter name to counter
 */
public HashMap<String, ElementCounter> getOutputCounters() {
return this.outputCounters;
}

/** Invoked by tests only. */
public int getReceiverCount() {
return outputs.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,13 +603,14 @@ private WorkItemCommitRequest.Builder makeExpectedOutput(
private WorkItemCommitRequest.Builder makeExpectedTruncationRequestOutput(
int index, String key, long shardingKey, long estimatedSize) throws Exception {
StringBuilder expectedCommitRequestBuilder =
initializeExpectedCommitRequest(key, shardingKey, index);
initializeExpectedCommitRequest(key, shardingKey, index, false);
appendCommitTruncationFields(expectedCommitRequestBuilder, estimatedSize);

return parseCommitRequest(expectedCommitRequestBuilder.toString());
}

private StringBuilder initializeExpectedCommitRequest(String key, long shardingKey, int index) {
private StringBuilder initializeExpectedCommitRequest(
String key, long shardingKey, int index, Boolean hasSourceBytesProcessed) {
StringBuilder requestBuilder = new StringBuilder();

requestBuilder.append("key: \"");
Expand All @@ -622,10 +623,15 @@ private StringBuilder initializeExpectedCommitRequest(String key, long shardingK
requestBuilder.append(index);
requestBuilder.append(" ");
requestBuilder.append("cache_token: 3 ");
if (hasSourceBytesProcessed) requestBuilder.append("source_bytes_processed: 0 ");

return requestBuilder;
}

/**
 * Convenience overload that delegates to the four-argument variant with
 * {@code hasSourceBytesProcessed} set to {@code true}, so the expected request text includes
 * {@code source_bytes_processed: 0}.
 */
private StringBuilder initializeExpectedCommitRequest(String key, long shardingKey, int index) {
return initializeExpectedCommitRequest(key, shardingKey, index, true);
}

private StringBuilder appendCommitOutputMessages(
StringBuilder requestBuilder, int index, long timestamp, String outKey) {
requestBuilder.append("output_messages {");
Expand Down Expand Up @@ -2370,6 +2376,7 @@ public void testUnboundedSources() throws Exception {
+ "work_token: 1 "
+ "cache_token: 1 "
+ "source_backlog_bytes: 7 "
+ "source_bytes_processed: 18 "
+ "output_messages {"
+ " destination_stream_id: \"out\""
+ " bundles {"
Expand All @@ -2389,9 +2396,6 @@ public void testUnboundedSources() throws Exception {
+ "source_watermark: 1000"))
.build()));

assertEquals(
18L, splitIntToLong(getCounter(counters, "dataflow_input_size-computation").getInteger()));

// Test same key continuing. The counter is done.
server
.whenGetWorkCalled()
Expand Down Expand Up @@ -2429,6 +2433,7 @@ public void testUnboundedSources() throws Exception {
+ "work_token: 2 "
+ "cache_token: 1 "
+ "source_backlog_bytes: 7 "
+ "source_bytes_processed: 0 "
+ "source_state_updates {"
+ " state: \"\000\""
+ " finalize_ids: "
Expand Down Expand Up @@ -2476,6 +2481,7 @@ public void testUnboundedSources() throws Exception {
+ "work_token: 3 "
+ "cache_token: 2 "
+ "source_backlog_bytes: 7 "
+ "source_bytes_processed: 0 "
+ "source_state_updates {"
+ " state: \"\000\""
+ " finalize_ids: "
Expand Down Expand Up @@ -2537,6 +2543,7 @@ public void testUnboundedSourcesDrain() throws Exception {
+ "work_token: 2 "
+ "cache_token: 3 "
+ "source_backlog_bytes: 7 "
+ "source_bytes_processed: 18 "
+ "output_messages {"
+ " destination_stream_id: \"out\""
+ " bundles {"
Expand Down Expand Up @@ -2646,6 +2653,7 @@ public void testUnboundedSourceWorkRetry() throws Exception {
+ "work_token: 1 "
+ "cache_token: 1 "
+ "source_backlog_bytes: 7 "
+ "source_bytes_processed: 18 "
+ "output_messages {"
+ " destination_stream_id: \"out\""
+ " bundles {"
Expand All @@ -2666,8 +2674,6 @@ public void testUnboundedSourceWorkRetry() throws Exception {
.build();

assertThat(commit, equalTo(expectedCommit));
assertEquals(
18L, splitIntToLong(getCounter(counters, "dataflow_input_size-computation").getInteger()));

// Test retry of work item, it should return the same result and not start the reader from the
// position it was left at.
Expand Down Expand Up @@ -2719,6 +2725,7 @@ public void testUnboundedSourceWorkRetry() throws Exception {
+ "work_token: 2 "
+ "cache_token: 1 "
+ "source_backlog_bytes: 7 "
+ "source_bytes_processed: 0 "
+ "source_state_updates {"
+ " state: \"\000\""
+ " finalize_ids: "
Expand Down Expand Up @@ -3060,8 +3067,9 @@ public void testExceptionInvalidatesCache() throws Exception {

assertThat(
// The commit will include a timer to clean up state - this timer is irrelevant
// for the current test.
setValuesTimestamps(commit.toBuilder().clearOutputTimers()).build(),
// for the current test. Also remove source_bytes_processed because it's dynamic.
setValuesTimestamps(commit.toBuilder().clearOutputTimers().clearSourceBytesProcessed())
.build(),
equalTo(
setMessagesMetadata(
PaneInfo.NO_FIRING,
Expand Down

0 comments on commit f70e9bf

Please sign in to comment.