Skip to content

Commit

Permalink
Change location of DataflowGBK, delete DataflowGBKTranslation
Browse files Browse the repository at this point in the history
  • Loading branch information
celeste-zeng authored Jun 14, 2024
1 parent 3b94fa4 commit 1a02260
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle;
import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext;
import org.apache.beam.runners.dataflow.internal.DataflowGroupByKey;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
Expand All @@ -73,7 +74,6 @@
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DataflowGroupByKey;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.Flatten;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
package org.apache.beam.runners.dataflow;

import java.util.Collections;
import org.apache.beam.runners.dataflow.internal.DataflowGroupByKey;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement;
import org.apache.beam.sdk.transforms.DataflowGroupByKey;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.Element;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.transforms;
package org.apache.beam.runners.dataflow.internal;

import com.google.auto.service.AutoService;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
Expand Down Expand Up @@ -154,4 +163,32 @@ static <K, V> Coder<Iterable<V>> getOutputValueCoder(Coder<KV<K, V>> inputCoder)
public static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) {
return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder));
}

static class DataflowGroupByKeyTranslator
implements PTransformTranslation.TransformPayloadTranslator<DataflowGroupByKey<?, ?>> {
@Override
public String getUrn() {
return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
}

@Override
@SuppressWarnings("nullness")
public RunnerApi.FunctionSpec translate(
AppliedPTransform<?, ?, DataflowGroupByKey<?, ?>> transform, SdkComponents components) {
return RunnerApi.FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build();
}
}

/** Registers {@link DataflowGroupByKeyTranslator}. */
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class Registrar implements TransformPayloadTranslatorRegistrar {
@Override
@SuppressWarnings("rawtypes")
public Map<
? extends Class<? extends PTransform>,
? extends PTransformTranslation.TransformPayloadTranslator>
getTransformPayloadTranslators() {
return Collections.singletonMap(DataflowGroupByKey.class, new DataflowGroupByKeyTranslator());
}
}
}

This file was deleted.

0 comments on commit 1a02260

Please sign in to comment.