Skip to content

Commit

Permalink
Revert "Allow dropLataData in GBK for SamzaRunner (apache#28461)"
Browse files Browse the repository at this point in the history
This reverts commit 6765bf9.
  • Loading branch information
ajothomas committed Feb 28, 2024
1 parent 0b94f58 commit d334b91
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,4 @@ public ExecutorService create(PipelineOptions options) {
new ThreadFactoryBuilder().setNameFormat("Process Element Thread-%d").build());
}
}

@Description("Enable/disable late data dropping in GroupByKey/Combine transforms")
@Default.Boolean(false)
boolean getDropLateData();

void setDropLateData(boolean dropLateData);
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,19 +180,11 @@ public TimerInternals timerInternals() {
DoFnSchemaInformation.create(),
Collections.emptyMap());

final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> dropLateDataRunner =
pipelineOptions.getDropLateData()
? DoFnRunners.lateDataDroppingRunner(
doFnRunner, keyedInternals.timerInternals(), windowingStrategy)
: doFnRunner;

final SamzaExecutionContext executionContext =
(SamzaExecutionContext) context.getApplicationContainerContext();
final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunnerWithMetrics =
this.fnRunner =
DoFnRunnerWithMetrics.wrap(
dropLateDataRunner, executionContext.getMetricsContainer(), transformFullName);

this.fnRunner = new DoFnRunnerWithKeyedInternals<>(doFnRunnerWithMetrics, keyedInternals);
doFnRunner, executionContext.getMetricsContainer(), transformFullName);
}

@Override
Expand Down

This file was deleted.

0 comments on commit d334b91

Please sign in to comment.