Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Avro] Use "extensions/avro" instead of avro from"core" in Java SDK modules #24992

Merged
merged 13 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
* RunInference PTransform will accept model paths as SideInputs in Python SDK. ([#24042](https://github.com/apache/beam/issues/24042))
* RunInference supports ONNX runtime in Python SDK ([#22972](https://github.com/apache/beam/issues/22972))
* Tensorflow Model Handler for RunInference in Python SDK ([#25366](https://github.com/apache/beam/issues/25366))
* Java SDK modules migrated to use `:sdks:java:extensions:avro` ([#24748](https://github.com/apache/beam/issues/24748))

## I/Os

Expand Down
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ tasks.register("javaPreCommit") {
dependsOn(":sdks:java:expansion-service:build")
dependsOn(":sdks:java:expansion-service:app:build")
dependsOn(":sdks:java:extensions:arrow:build")
dependsOn(":sdks:java:extensions:avro:build")
dependsOn(":sdks:java:extensions:euphoria:build")
dependsOn(":sdks:java:extensions:google-cloud-platform-core:build")
dependsOn(":sdks:java:extensions:jackson:build")
Expand Down
1 change: 1 addition & 0 deletions examples/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ dependencies {
implementation library.java.vendored_guava_26_0_jre
implementation library.java.kafka_clients
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:extensions:python")
implementation project(":sdks:java:io:google-cloud-platform")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.avro.reflect.Nullable;
import org.apache.beam.examples.complete.game.utils.WriteToText;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package org.apache.beam.examples.complete.kafkatopubsub.avro;

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

/**
* Example of AVRO serialization class. To configure your AVRO schema, change this class to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.beam.examples.complete.kafkatopubsub.avro.AvroDataClassKafkaAvroDeserializer;
import org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.SslConsumerFactoryFn;
import org.apache.beam.examples.complete.kafkatopubsub.options.KafkaToPubsubOptions;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.kafka.KafkaIO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.ml.AnnotateText;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.beam.examples.subprocess.utils;

import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

/** Contains the configuration for the external library. */
@DefaultCoder(AvroCoder.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.beam.examples.complete.game.LeaderBoard.CalculateTeamScores;
import org.apache.beam.examples.complete.game.LeaderBoard.CalculateUserScores;
import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import org.apache.beam.examples.complete.game.StatefulTeamScore.UpdateTeamScoreFn;
import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
Expand Down
1 change: 1 addition & 0 deletions examples/kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies {
// Add the dependency that sdks:java:core that is marked as provided
implementation library.java.hamcrest
permitUnusedDeclared library.java.hamcrest
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:io:google-cloud-platform")
implementation library.java.avro
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.google.api.services.bigquery.model.*
import com.google.common.collect.ImmutableList
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.AvroCoder
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder
import org.apache.beam.sdk.coders.DefaultCoder
import org.apache.beam.sdk.coders.DoubleCoder
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
Expand Down
2 changes: 2 additions & 0 deletions runners/core-construction-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":sdks:java:extensions:avro")
implementation project(path: ":sdks:java:fn-execution")
implementation library.java.vendored_grpc_1_48_1
implementation library.java.vendored_guava_26_0_jre
Expand All @@ -73,5 +74,6 @@ dependencies {
testImplementation library.java.jackson_dataformat_yaml
testImplementation project(path: ":model:fn-execution", configuration: "shadow")
testImplementation project(path: ":sdks:java:core", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:extensions:avro")
testRuntimeOnly library.java.slf4j_jdk14
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

/** Coder registrar for AvroGenericCoder. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import java.util.List;
import org.apache.avro.Schema;
import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;

/** Coder translator for AvroGenericCoder. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
Expand All @@ -48,6 +47,7 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
Expand Down
4 changes: 3 additions & 1 deletion runners/direct-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ def dependOnProjects = [":runners:core-construction-java",
":runners:core-java",
":runners:local-java",
":runners:java-fn-execution",
":sdks:java:fn-execution"
":sdks:java:fn-execution",
":sdks:java:extensions:avro"
]

applyJavaNature(
Expand Down Expand Up @@ -96,6 +97,7 @@ dependencies {
permitUnusedDeclared library.java.vendored_grpc_1_48_1
permitUnusedDeclared project(":runners:java-fn-execution")
permitUnusedDeclared project(":sdks:java:fn-execution")
permitUnusedDeclared project(":sdks:java:extensions:avro")
examplesJavaIntegrationTest project(project.path)
examplesJavaIntegrationTest project(":examples:java")
examplesJavaIntegrationTest project(path: ":examples:java", configuration: "testRuntimeMigration")
Expand Down
2 changes: 2 additions & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ dependencies {
implementation library.java.vendored_guava_26_0_jre
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:io:kafka")
implementation project(":sdks:java:io:google-cloud-platform")
Expand Down Expand Up @@ -115,6 +116,7 @@ dependencies {
testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:google-cloud-platform", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(path: ":sdks:java:extensions:avro")
testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration")
testImplementation project(path: ":runners:core-construction-java", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:extensions:python", configuration: "testRuntimeMigration")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.avro.Schema;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

/** A {@link CloudObjectTranslator} for {@link AvroCoder}. */
@SuppressWarnings({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.Set;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
Expand All @@ -54,6 +53,7 @@
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
Expand Down
1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def sdk_provided_shaded_project_dependencies = [
]
def sdk_provided_project_dependencies = [
":runners:google-cloud-dataflow-java",
":sdks:java:extensions:avro",
":sdks:java:extensions:google-cloud-platform-core",
":sdks:java:io:google-cloud-platform",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.avro.Schema;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.AvroSource.AvroReader;
import org.apache.beam.sdk.extensions.avro.io.AvroSource;
import org.apache.beam.sdk.extensions.avro.io.AvroSource.AvroReader;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.OffsetBasedSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SideInputValues;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.BigDecimalCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
Expand Down Expand Up @@ -227,9 +225,11 @@ public void registerClasses(Kryo kryo) {
kryo.register(SideInputValues.ByWindow.class);
kryo.register(SideInputValues.Global.class);

// avro coders
tryToRegister(kryo, "org.apache.beam.sdk.extensions.avro.coders.AvroCoder");
tryToRegister(kryo, "org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder");

// standard coders of org.apache.beam.sdk.coders
kryo.register(AvroCoder.class);
kryo.register(AvroGenericCoder.class);
kryo.register(BigDecimalCoder.class);
kryo.register(BigEndianIntegerCoder.class);
kryo.register(BigEndianLongCoder.class);
Expand Down Expand Up @@ -283,5 +283,13 @@ public void registerClasses(Kryo kryo) {
kryo.register(TupleTag.class);
kryo.register(TupleTagList.class);
}

private void tryToRegister(Kryo kryo, String className) {
try {
kryo.register(Class.forName(className));
} catch (ClassNotFoundException e) {
LOG.info("Class {}} was not found on classpath", className);
}
}
}
}
2 changes: 2 additions & 0 deletions runners/spark/spark_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ dependencies {
implementation project(":runners:java-fn-execution")
implementation project(":runners:java-job-service")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:extensions:avro")
implementation library.java.jackson_annotations
implementation library.java.slf4j_api
implementation library.java.joda_time
Expand Down Expand Up @@ -191,6 +192,7 @@ dependencies {
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
// SparkStateInternalsTest extends abstract StateInternalsTest
testImplementation project(path: ":runners:core-java", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration")
testImplementation project(":sdks:java:harness")
testImplementation library.java.avro
testImplementation "org.apache.kafka:kafka_$spark_scala_version:2.4.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public List<CoderProvider> getCoderProviders() {
* A {@link CoderProvider} that uses the {@code @DefaultCoder} annotation to provide {@link
* CoderProvider coder providers} that create {@link Coder}s.
*/
static class DefaultCoderProvider extends CoderProvider {
public static class DefaultCoderProvider extends CoderProvider {
private static final Logger LOG = LoggerFactory.getLogger(DefaultCoderProvider.class);

/**
Expand Down
Loading