Skip to content

Commit

Permalink
[Avro] Use "extensions/avro" instead of avro from"core" in Java SDK m…
Browse files Browse the repository at this point in the history
…odules (#24992)

* Use "extensions/avro" instead of avro from"core" in Java SDK modules

* Use "extensions/avro" instead of avro from"core" in sdks/java/io/kafka

* Use "extensions/avro" instead of avro from"core" in sdks/java/io/google-cloud-platform

* Resolve rebase conflicts

* Use "extensions/avro" for Dataflow runner

* Use "extensions/avro" for core-construction runner

* Add avro extension as a dependency for direct runner

* Copy Avro-related tests to Avro extension

* Address review comments

* Depend javaPreCommit on extensions:avro

* Address review comments

* Revert CountingSource.java

* Up-to-date Avro usages
  • Loading branch information
aromanenko-dev authored Feb 22, 2023
1 parent d8e5047 commit 95a6566
Show file tree
Hide file tree
Showing 127 changed files with 614 additions and 126 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
* RunInference PTransform will accept model paths as SideInputs in Python SDK. ([#24042](https://github.com/apache/beam/issues/24042))
* RunInference supports ONNX runtime in Python SDK ([#22972](https://github.com/apache/beam/issues/22972))
* Tensorflow Model Handler for RunInference in Python SDK ([#25366](https://github.com/apache/beam/issues/25366))
* Java SDK modules migrated to use `:sdks:java:extensions:avro` ([#24748](https://github.com/apache/beam/issues/24748))

## I/Os

Expand Down
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ tasks.register("javaPreCommit") {
dependsOn(":sdks:java:expansion-service:build")
dependsOn(":sdks:java:expansion-service:app:build")
dependsOn(":sdks:java:extensions:arrow:build")
dependsOn(":sdks:java:extensions:avro:build")
dependsOn(":sdks:java:extensions:euphoria:build")
dependsOn(":sdks:java:extensions:google-cloud-platform-core:build")
dependsOn(":sdks:java:extensions:jackson:build")
Expand Down
1 change: 1 addition & 0 deletions examples/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ dependencies {
implementation library.java.vendored_guava_26_0_jre
implementation library.java.kafka_clients
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:extensions:python")
implementation project(":sdks:java:io:google-cloud-platform")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.avro.reflect.Nullable;
import org.apache.beam.examples.complete.game.utils.WriteToText;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package org.apache.beam.examples.complete.kafkatopubsub.avro;

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

/**
* Example of AVRO serialization class. To configure your AVRO schema, change this class to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.beam.examples.complete.kafkatopubsub.avro.AvroDataClassKafkaAvroDeserializer;
import org.apache.beam.examples.complete.kafkatopubsub.kafka.consumer.SslConsumerFactoryFn;
import org.apache.beam.examples.complete.kafkatopubsub.options.KafkaToPubsubOptions;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.kafka.KafkaIO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.extensions.ml.AnnotateText;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.beam.examples.subprocess.utils;

import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

/** Contains the configuration for the external library. */
@DefaultCoder(AvroCoder.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.beam.examples.complete.game.LeaderBoard.CalculateTeamScores;
import org.apache.beam.examples.complete.game.LeaderBoard.CalculateUserScores;
import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import org.apache.beam.examples.complete.game.StatefulTeamScore.UpdateTeamScoreFn;
import org.apache.beam.examples.complete.game.UserScore.GameActionInfo;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
Expand Down
1 change: 1 addition & 0 deletions examples/kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies {
// Add the dependency that sdks:java:core that is marked as provided
implementation library.java.hamcrest
permitUnusedDeclared library.java.hamcrest
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:io:google-cloud-platform")
implementation library.java.avro
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.google.api.services.bigquery.model.*
import com.google.common.collect.ImmutableList
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.coders.AvroCoder
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder
import org.apache.beam.sdk.coders.DefaultCoder
import org.apache.beam.sdk.coders.DoubleCoder
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
Expand Down
2 changes: 2 additions & 0 deletions runners/core-construction-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":sdks:java:extensions:avro")
implementation project(path: ":sdks:java:fn-execution")
implementation library.java.vendored_grpc_1_48_1
implementation library.java.vendored_guava_26_0_jre
Expand All @@ -73,5 +74,6 @@ dependencies {
testImplementation library.java.jackson_dataformat_yaml
testImplementation project(path: ":model:fn-execution", configuration: "shadow")
testImplementation project(path: ":sdks:java:core", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:extensions:avro")
testRuntimeOnly library.java.slf4j_jdk14
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import com.google.auto.service.AutoService;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

/** Coder registrar for AvroGenericCoder. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import java.util.List;
import org.apache.avro.Schema;
import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;

/** Coder translator for AvroGenericCoder. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.runners.core.construction.CoderTranslation.TranslationContext;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
Expand All @@ -48,6 +47,7 @@
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
Expand Down
4 changes: 3 additions & 1 deletion runners/direct-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ def dependOnProjects = [":runners:core-construction-java",
":runners:core-java",
":runners:local-java",
":runners:java-fn-execution",
":sdks:java:fn-execution"
":sdks:java:fn-execution",
":sdks:java:extensions:avro"
]

applyJavaNature(
Expand Down Expand Up @@ -96,6 +97,7 @@ dependencies {
permitUnusedDeclared library.java.vendored_grpc_1_48_1
permitUnusedDeclared project(":runners:java-fn-execution")
permitUnusedDeclared project(":sdks:java:fn-execution")
permitUnusedDeclared project(":sdks:java:extensions:avro")
examplesJavaIntegrationTest project(project.path)
examplesJavaIntegrationTest project(":examples:java")
examplesJavaIntegrationTest project(path: ":examples:java", configuration: "testRuntimeMigration")
Expand Down
2 changes: 2 additions & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ dependencies {
implementation library.java.vendored_guava_26_0_jre
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:io:kafka")
implementation project(":sdks:java:io:google-cloud-platform")
Expand Down Expand Up @@ -115,6 +116,7 @@ dependencies {
testImplementation library.java.junit
testImplementation project(path: ":sdks:java:io:google-cloud-platform", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(path: ":sdks:java:extensions:avro")
testImplementation project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntimeMigration")
testImplementation project(path: ":runners:core-construction-java", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:extensions:python", configuration: "testRuntimeMigration")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.avro.Schema;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

/** A {@link CloudObjectTranslator} for {@link AvroCoder}. */
@SuppressWarnings({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.Set;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
Expand All @@ -54,6 +53,7 @@
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.TimestampPrefixingWindowCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
Expand Down
1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def sdk_provided_shaded_project_dependencies = [
]
def sdk_provided_project_dependencies = [
":runners:google-cloud-dataflow-java",
":sdks:java:extensions:avro",
":sdks:java:extensions:google-cloud-platform-core",
":sdks:java:io:google-cloud-platform",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.avro.Schema;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.AvroSource.AvroReader;
import org.apache.beam.sdk.extensions.avro.io.AvroSource;
import org.apache.beam.sdk.extensions.avro.io.AvroSource.AvroReader;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.OffsetBasedSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.SideInputValues;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.BigDecimalCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
Expand Down Expand Up @@ -227,9 +225,11 @@ public void registerClasses(Kryo kryo) {
kryo.register(SideInputValues.ByWindow.class);
kryo.register(SideInputValues.Global.class);

// avro coders
tryToRegister(kryo, "org.apache.beam.sdk.extensions.avro.coders.AvroCoder");
tryToRegister(kryo, "org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder");

// standard coders of org.apache.beam.sdk.coders
kryo.register(AvroCoder.class);
kryo.register(AvroGenericCoder.class);
kryo.register(BigDecimalCoder.class);
kryo.register(BigEndianIntegerCoder.class);
kryo.register(BigEndianLongCoder.class);
Expand Down Expand Up @@ -283,5 +283,13 @@ public void registerClasses(Kryo kryo) {
kryo.register(TupleTag.class);
kryo.register(TupleTagList.class);
}

private void tryToRegister(Kryo kryo, String className) {
try {
kryo.register(Class.forName(className));
} catch (ClassNotFoundException e) {
LOG.info("Class {}} was not found on classpath", className);
}
}
}
}
2 changes: 2 additions & 0 deletions runners/spark/spark_runner.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ dependencies {
implementation project(":runners:java-fn-execution")
implementation project(":runners:java-job-service")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:extensions:avro")
implementation library.java.jackson_annotations
implementation library.java.slf4j_api
implementation library.java.joda_time
Expand Down Expand Up @@ -191,6 +192,7 @@ dependencies {
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
// SparkStateInternalsTest extends abstract StateInternalsTest
testImplementation project(path: ":runners:core-java", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:extensions:avro", configuration: "testRuntimeMigration")
testImplementation project(":sdks:java:harness")
testImplementation library.java.avro
testImplementation "org.apache.kafka:kafka_$spark_scala_version:2.4.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public List<CoderProvider> getCoderProviders() {
* A {@link CoderProvider} that uses the {@code @DefaultCoder} annotation to provide {@link
* CoderProvider coder providers} that create {@link Coder}s.
*/
static class DefaultCoderProvider extends CoderProvider {
public static class DefaultCoderProvider extends CoderProvider {
private static final Logger LOG = LoggerFactory.getLogger(DefaultCoderProvider.class);

/**
Expand Down
Loading

0 comments on commit 95a6566

Please sign in to comment.