Skip to content

Commit

Permalink
SchemaTransformProviderTranslation
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Apr 12, 2024
1 parent e33dec6 commit c4a8b58
Show file tree
Hide file tree
Showing 51 changed files with 484 additions and 357 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,9 @@ public String description() {
}

@Override
public Class<GenerateSequenceConfiguration> configurationClass() {
return GenerateSequenceConfiguration.class;
}

@Override
public SchemaTransform from(GenerateSequenceConfiguration configuration) {
return new GenerateSequenceSchemaTransform(configuration);
public SchemaTransform<GenerateSequenceConfiguration> from(
GenerateSequenceConfiguration configuration) {
return new GenerateSequenceSchemaTransform(configuration, identifier());
}

@DefaultSchema(AutoValueSchema.class)
Expand Down Expand Up @@ -163,10 +159,13 @@ public void validate() {
}
}

protected static class GenerateSequenceSchemaTransform extends SchemaTransform {
protected static class GenerateSequenceSchemaTransform
extends SchemaTransform<GenerateSequenceConfiguration> {
private final GenerateSequenceConfiguration configuration;

GenerateSequenceSchemaTransform(GenerateSequenceConfiguration configuration) {
GenerateSequenceSchemaTransform(
GenerateSequenceConfiguration configuration, String identifier) {
super(configuration, identifier);
configuration.validate();
this.configuration = configuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,17 @@
*/
package org.apache.beam.sdk.schemas.transforms;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.lang.reflect.ParameterizedType;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;

/**
* An abstraction representing schema capable and aware transforms. The interface is intended to be
Expand All @@ -33,5 +41,38 @@
* compatibility guarantees and it should not be implemented outside of the Beam repository.
*/
@Internal
public abstract class SchemaTransform
extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {}
public abstract class SchemaTransform<ConfigT>
    extends PTransform<PCollectionRowTuple, PCollectionRowTuple> {
  /** The construction-time configuration, converted to a {@link Row}. */
  private final Row configurationRow;

  /** The identifier supplied by whoever constructed this transform. */
  private final String identifier;

  /**
   * Stores {@code identifier} and converts {@code configuration} to a {@link Row} using the
   * to-row function that a default {@link SchemaRegistry} returns for the configuration class.
   *
   * <p>The configuration class is read reflectively from the type argument that the concrete
   * class supplies to its generic superclass, so the concrete class must extend a parameterized
   * type whose single type argument is a plain class.
   *
   * @param configuration the configuration object to convert to a {@link Row}
   * @param identifier the identifier later returned by {@link #getIdentifier()}
   * @throws IllegalStateException if the generic superclass cannot be obtained
   * @throws IllegalArgumentException if the superclass is not parameterized, does not have exactly
   *     one type argument, or that argument is not a plain class
   * @throws RuntimeException if no schema can be found for the configuration class
   */
  protected SchemaTransform(ConfigT configuration, String identifier) {
    this.identifier = identifier;
    // Resolved through a static helper so that no instance method runs on a partially
    // constructed object.
    Class<ConfigT> configurationClass = resolveConfigurationClass(getClass());

    try {
      this.configurationRow =
          SchemaRegistry.createDefault().getToRowFunction(configurationClass).apply(configuration);
    } catch (NoSuchSchemaException e) {
      throw new RuntimeException("Unable to find schema for this SchemaTransform's config.", e);
    }
  }

  /**
   * Reads the single type argument of {@code transformClass}'s generic superclass and returns it
   * as a class.
   *
   * <p>Each step is validated explicitly so that a raw, non-generic, or otherwise unexpected
   * superclass produces a descriptive error instead of a bare {@link ClassCastException}.
   */
  @SuppressWarnings("unchecked") // Safe: the argument is verified to be a Class before the cast.
  private static <T> Class<T> resolveConfigurationClass(Class<?> transformClass) {
    @Nullable java.lang.reflect.Type genericSuperclass = transformClass.getGenericSuperclass();
    checkStateNotNull(genericSuperclass, "Could not get the SchemaTransform's parameterized type.");
    checkArgument(
        genericSuperclass instanceof ParameterizedType,
        "Expected %s to extend a parameterized type, but its superclass is %s.",
        transformClass.getName(),
        genericSuperclass);

    java.lang.reflect.Type[] typeArguments =
        ((ParameterizedType) genericSuperclass).getActualTypeArguments();
    checkArgument(
        typeArguments.length == 1,
        "Expected one parameterized type, but got %s.",
        typeArguments.length);

    java.lang.reflect.Type configurationType = typeArguments[0];
    checkArgument(
        configurationType instanceof Class,
        "Expected the configuration type argument to be a class, but got %s.",
        configurationType);
    return (Class<T>) configurationType;
  }

  /** Returns the {@link Row} form of the configuration this transform was constructed with. */
  public Row getConfigurationRow() {
    return configurationRow;
  }

  /** Returns the identifier this transform was constructed with. */
  public String getIdentifier() {
    return identifier;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ default String description() {
* Produce a {@link SchemaTransform} from some transform-specific configuration object. Can throw
* an {@link InvalidConfigurationException} or an {@link InvalidSchemaException}.
*/
SchemaTransform from(Row configuration);
SchemaTransform<?> from(Row configuration);

/** Returns the input collection names of this transform. */
default List<String> inputCollectionNames() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.transforms;

import static org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * Translation support for {@link SchemaTransform}s: a payload translator bound to a single {@link
 * SchemaTransformProvider}, plus the registrar that exposes such translators.
 */
public class SchemaTransformProviderTranslation {
  /**
   * A {@link TransformPayloadTranslator} for {@link SchemaTransform}s that is bound to the {@link
   * SchemaTransformProvider} whose identifier matches the one given at construction.
   */
  public static class SchemaTransformTranslator
      implements TransformPayloadTranslator<SchemaTransform<?>> {
    private final String identifier;
    private final SchemaTransformProvider provider;

    /**
     * Creates a translator for the provider registered under {@code identifier}.
     *
     * @param identifier the provider identifier, also used as this translator's URN
     * @throws RuntimeException if zero or more than one provider discovered through {@link
     *     ServiceLoader} matches {@code identifier}; the original exception is kept as the cause
     */
    public SchemaTransformTranslator(String identifier) {
      this.identifier = identifier;
      this.provider = findProvider(identifier);
    }

    /**
     * Scans all {@link SchemaTransformProvider}s visible to {@link ServiceLoader} and returns the
     * single one whose identifier equals {@code identifier}, ignoring case.
     */
    private static SchemaTransformProvider findProvider(String identifier) {
      try {
        SchemaTransformProvider match = null;
        for (SchemaTransformProvider candidate :
            ServiceLoader.load(SchemaTransformProvider.class)) {
          if (!candidate.identifier().equalsIgnoreCase(identifier)) {
            continue;
          }
          if (match != null) {
            throw new IllegalArgumentException(
                "Found multiple SchemaTransformProvider implementations with the same identifier "
                    + identifier);
          }
          match = candidate;
        }
        if (match == null) {
          throw new IllegalArgumentException(
              "Could not find SchemaTransformProvider implementation for identifier " + identifier);
        }
        return match;
      } catch (Exception e) {
        // Same exception type and message as before, but the cause is now preserved so the
        // original stack trace is not lost.
        throw new RuntimeException(e.getMessage(), e);
      }
    }

    /** Returns the provider identifier this translator was created with. */
    @Override
    public String getUrn() {
      return identifier;
    }

    /**
     * Builds a {@link FunctionSpec} that carries this translator's URN and an empty payload.
     *
     * <p>Neither {@code application} nor {@code components} is consulted.
     */
    @Override
    @SuppressWarnings("argument")
    public @Nullable FunctionSpec translate(
        AppliedPTransform<?, ?, SchemaTransform<?>> application, SdkComponents components)
        throws IOException {
      return FunctionSpec.newBuilder().setUrn(getUrn()).setPayload(ByteString.empty()).build();
    }

    /** Returns the configuration {@link Row} held by {@code transform}. */
    @Override
    public Row toConfigRow(SchemaTransform<?> transform) {
      return transform.getConfigurationRow();
    }

    /**
     * Rebuilds a transform by handing {@code configRow} to the bound provider.
     *
     * <p>{@code options} is not used.
     */
    @Override
    public SchemaTransform<?> fromConfigRow(Row configRow, PipelineOptions options) {
      return provider.from(configRow);
    }
  }

  /**
   * Registers {@link SchemaTransformTranslator}s for the {@link SchemaTransformProvider}s
   * discovered through {@link ServiceLoader}.
   */
  @AutoService(TransformPayloadTranslatorRegistrar.class)
  public static class SchemaTransformRegistrar implements TransformPayloadTranslatorRegistrar {
    /**
     * Creates one translator per discovered provider and stores it under {@code
     * SchemaTransform.class}.
     *
     * @throws RuntimeException if a translator cannot be created; the original exception is kept
     *     as the cause
     */
    @Override
    @SuppressWarnings({
      "rawtypes",
    })
    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
        getTransformPayloadTranslators() {
      Map<Class<SchemaTransform>, SchemaTransformTranslator> translators = new HashMap<>();

      try {
        for (SchemaTransformProvider schemaTransformProvider :
            ServiceLoader.load(SchemaTransformProvider.class)) {
          // NOTE(review): every translator is stored under the same key, so each put replaces
          // the previous one and the returned map holds at most one entry (the last provider
          // iterated). Confirm whether a per-provider key or a single dispatching translator
          // was intended.
          translators.put(
              SchemaTransform.class,
              new SchemaTransformTranslator(schemaTransformProvider.identifier()));
        }
      } catch (Exception e) {
        // Preserve the cause so the underlying failure remains diagnosable.
        throw new RuntimeException(e.getMessage(), e);
      }

      return translators;
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ protected Class<ConfigT> configurationClass() {
* Produce a SchemaTransform from ConfigT. Can throw an {@link InvalidConfigurationException} or
* an {@link InvalidSchemaException}.
*/
protected abstract SchemaTransform from(ConfigT configuration);
protected abstract SchemaTransform<ConfigT> from(ConfigT configuration);

/**
* List the dependencies needed for this transform. Jars from classpath are used by default when
Expand All @@ -87,7 +87,7 @@ public final Schema configurationSchema() {
}

@Override
public final SchemaTransform from(Row configuration) {
public final SchemaTransform<ConfigT> from(Row configuration) {
return from(configFromRow(configuration));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,8 @@ public class FlattenTransformProvider
protected static final String OUTPUT_ROWS_TAG = "output";

@Override
protected Class<Configuration> configurationClass() {
return Configuration.class;
}

@Override
protected SchemaTransform from(Configuration configuration) {
return new SchemaTransform() {
protected SchemaTransform<Configuration> from(Configuration configuration) {
return new SchemaTransform<Configuration>(configuration, identifier()) {
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
return PCollectionRowTuple.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,8 @@ public class JavaExplodeTransformProvider
protected static final String OUTPUT_ROWS_TAG = "output";

@Override
protected Class<Configuration> configurationClass() {
return Configuration.class;
}

@Override
protected SchemaTransform from(Configuration configuration) {
return new ExplodeTransform(configuration);
protected SchemaTransform<Configuration> from(Configuration configuration) {
return new ExplodeTransform(configuration, identifier());
}

@Override
Expand Down Expand Up @@ -105,11 +100,11 @@ public abstract static class Builder {
}

/** A {@link SchemaTransform} for Explode. */
protected static class ExplodeTransform extends SchemaTransform {

protected static class ExplodeTransform extends SchemaTransform<Configuration> {
private final Configuration configuration;

ExplodeTransform(Configuration configuration) {
ExplodeTransform(Configuration configuration, String identifier) {
super(configuration, identifier);
this.configuration = configuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,8 @@ public class JavaFilterTransformProvider
protected static final String OUTPUT_ROWS_TAG = "output";

@Override
protected Class<Configuration> configurationClass() {
return Configuration.class;
}

@Override
protected SchemaTransform from(Configuration configuration) {
return new JavaFilterTransform(configuration);
protected SchemaTransform<Configuration> from(Configuration configuration) {
return new JavaFilterTransform(configuration, identifier());
}

@Override
Expand Down Expand Up @@ -107,11 +102,12 @@ public abstract static class Builder {
}

/** A {@link SchemaTransform} for Filter-java. */
protected static class JavaFilterTransform extends SchemaTransform {
protected static class JavaFilterTransform extends SchemaTransform<Configuration> {

private final Configuration configuration;

JavaFilterTransform(Configuration configuration) {
JavaFilterTransform(Configuration configuration, String identifier) {
super(configuration, identifier);
this.configuration = configuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,8 @@ public class JavaMapToFieldsTransformProvider
protected static final String OUTPUT_ROWS_TAG = "output";

@Override
protected Class<Configuration> configurationClass() {
return Configuration.class;
}

@Override
protected SchemaTransform from(Configuration configuration) {
return new JavaMapToFieldsTransform(configuration);
protected SchemaTransform<Configuration> from(Configuration configuration) {
return new JavaMapToFieldsTransform(configuration, identifier());
}

@Override
Expand Down Expand Up @@ -119,11 +114,12 @@ public abstract static class Builder {
}

/** A {@link SchemaTransform} for MapToFields-java. */
protected static class JavaMapToFieldsTransform extends SchemaTransform {
protected static class JavaMapToFieldsTransform extends SchemaTransform<Configuration> {

private final Configuration configuration;

JavaMapToFieldsTransform(Configuration configuration) {
JavaMapToFieldsTransform(Configuration configuration, String identifier) {
super(configuration, identifier);
this.configuration = configuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,8 @@ public class LoggingTransformProvider
protected static final String OUTPUT_ROWS_TAG = "output";

@Override
protected Class<Configuration> configurationClass() {
return Configuration.class;
}

@Override
protected SchemaTransform from(Configuration configuration) {
return new LoggingTransform(configuration);
protected SchemaTransform<Configuration> from(Configuration configuration) {
return new LoggingTransform(configuration, identifier());
}

@Override
Expand Down Expand Up @@ -134,13 +129,14 @@ public abstract static class Builder {
}

/** A {@link SchemaTransform} for logging. */
protected static class LoggingTransform extends SchemaTransform {
protected static class LoggingTransform extends SchemaTransform<Configuration> {

private static final Logger LOG = LoggerFactory.getLogger(LoggingTransform.class);

private final Configuration configuration;

LoggingTransform(Configuration configuration) {
LoggingTransform(Configuration configuration, String identifier) {
super(configuration, identifier);
this.configuration = configuration;
}

Expand Down
Loading

0 comments on commit c4a8b58

Please sign in to comment.