Skip to content

Commit

Permalink
Merge pull request #27991: Refactor ConvertHelpers in "core" to not d…
Browse files Browse the repository at this point in the history
…epend on Avro
  • Loading branch information
aromanenko-dev authored Sep 7, 2023
2 parents 61f0184 + 96f3dfb commit 8037b06
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Type;
import java.util.ServiceLoader;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.asm.AsmVisitorWrapper;
import net.bytebuddy.description.type.TypeDescription;
Expand All @@ -35,7 +36,6 @@
import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
import net.bytebuddy.jar.asm.ClassWriter;
import net.bytebuddy.matcher.ElementMatchers;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.schemas.JavaFieldSchema.JavaFieldTypeSupplier;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
Expand All @@ -45,7 +45,6 @@
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Primitives;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -58,6 +57,11 @@
"rawtypes"
})
public class ConvertHelpers {
private static class SchemaInformationProviders {
private static final ServiceLoader<SchemaInformationProvider> INSTANCE =
ServiceLoader.load(SchemaInformationProvider.class);
}

private static final Logger LOG = LoggerFactory.getLogger(ConvertHelpers.class);

/** Return value after converting a schema. */
Expand All @@ -80,17 +84,17 @@ public ConvertedSchemaInformation(
/** Get the coder used for converting from an inputSchema to a given type. */
public static <T> ConvertedSchemaInformation<T> getConvertedSchemaInformation(
Schema inputSchema, TypeDescriptor<T> outputType, SchemaRegistry schemaRegistry) {
ConvertedSchemaInformation<T> convertedSchema = null;
if (outputType.equals(TypeDescriptor.of(Row.class))) {
// If the output is of type Row, then just forward the schema of the input type to the
// output.
convertedSchema =
new ConvertedSchemaInformation<>((SchemaCoder<T>) SchemaCoder.of(inputSchema), null);
} else if (outputType.equals(TypeDescriptor.of(GenericRecord.class))) {
convertedSchema =
new ConvertedSchemaInformation<T>(
(SchemaCoder<T>) AvroUtils.schemaCoder(AvroUtils.toAvroSchema(inputSchema)), null);
} else {

ConvertedSchemaInformation<T> schemaInformation = null;
// Try to load schema information from loaded providers
for (SchemaInformationProvider provider : SchemaInformationProviders.INSTANCE) {
schemaInformation = provider.getConvertedSchemaInformation(inputSchema, outputType);
if (schemaInformation != null) {
return schemaInformation;
}
}

if (schemaInformation == null) {
// Otherwise, try to find a schema for the output type in the schema registry.
Schema outputSchema = null;
SchemaCoder<T> outputSchemaCoder = null;
Expand Down Expand Up @@ -129,9 +133,9 @@ public static <T> ConvertedSchemaInformation<T> getConvertedSchemaInformation(
+ outputSchema);
}
}
convertedSchema = new ConvertedSchemaInformation<T>(outputSchemaCoder, unboxedType);
schemaInformation = new ConvertedSchemaInformation<T>(outputSchemaCoder, unboxedType);
}
return convertedSchema;
return schemaInformation;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.utils;

import com.google.auto.service.AutoService;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;

@AutoService(SchemaInformationProvider.class)
public class RowSchemaInformationProvider implements SchemaInformationProvider {
@Override
@Nullable
public <T> ConvertHelpers.ConvertedSchemaInformation<T> getConvertedSchemaInformation(
Schema inputSchema, TypeDescriptor<T> outputType) {
if (outputType.equals(TypeDescriptor.of(Row.class))) {
// If the output is of type Row, then just forward the schema of the input type to the
// output.
return new ConvertHelpers.ConvertedSchemaInformation<>(
(SchemaCoder<T>) SchemaCoder.of(inputSchema), null);
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas.utils;

import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.ConvertHelpers.ConvertedSchemaInformation;
import org.apache.beam.sdk.values.TypeDescriptor;

/** Provides an instance of {@link ConvertedSchemaInformation}. Use for internal purposes. */
@Internal
public interface SchemaInformationProvider {
@Nullable
<T> ConvertedSchemaInformation<T> getConvertedSchemaInformation(
Schema inputSchema, TypeDescriptor<T> outputType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.avro.schemas.utils;

import com.google.auto.service.AutoService;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.utils.ConvertHelpers;
import org.apache.beam.sdk.schemas.utils.SchemaInformationProvider;
import org.apache.beam.sdk.values.TypeDescriptor;

@AutoService(SchemaInformationProvider.class)
public class AvroSchemaInformationProvider implements SchemaInformationProvider {

@Override
@Nullable
public <T> ConvertHelpers.ConvertedSchemaInformation<T> getConvertedSchemaInformation(
Schema inputSchema, TypeDescriptor<T> outputType) {
if (outputType.equals(TypeDescriptor.of(GenericRecord.class))) {
return new ConvertHelpers.ConvertedSchemaInformation<T>(
(SchemaCoder<T>) AvroUtils.schemaCoder(AvroUtils.toAvroSchema(inputSchema)), null);
}
return null;
}
}

0 comments on commit 8037b06

Please sign in to comment.