diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/kafka/KafkaTestUtilities.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/kafka/KafkaTestUtilities.groovy
index bb08e79edd3c..a3ae6833d579 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/kafka/KafkaTestUtilities.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/kafka/KafkaTestUtilities.groovy
@@ -28,11 +28,13 @@ class KafkaTestUtilities {
     @Inject
     KafkaBatchIT(String delimited, String undelimited, Boolean sdfCompatible, ConfigurationContainer configurations, Project runningProject){
+      def kafkaioProject = runningProject.findProject(":sdks:java:io:kafka")
       group = "Verification"
       description = "Runs KafkaIO IT tests with Kafka clients API $delimited"
       outputs.upToDateWhen { false }
       testClassesDirs = runningProject.findProject(":sdks:java:io:kafka").sourceSets.test.output.classesDirs
-      classpath = configurations."kafkaVersion$undelimited" + runningProject.sourceSets.test.runtimeClasspath + runningProject.findProject(":sdks:java:io:kafka").sourceSets.test.runtimeClasspath
+      classpath = runningProject.sourceSets.test.runtimeClasspath + kafkaioProject.configurations."kafkaVersion$undelimited" + kafkaioProject.sourceSets.test.runtimeClasspath
+      systemProperty "beam.target.kafka.version", delimited
 
       def pipelineOptions = [
         '--sourceOptions={' +
diff --git a/sdks/java/extensions/avro/build.gradle b/sdks/java/extensions/avro/build.gradle
index 8ff0612a0eab..6631779e609c 100644
--- a/sdks/java/extensions/avro/build.gradle
+++ b/sdks/java/extensions/avro/build.gradle
@@ -128,6 +128,7 @@ avroVersions.each { k, v ->
     description = "Runs Avro extension tests with Avro version $v"
     outputs.upToDateWhen { false }
     classpath = sourceSets."avro$k".runtimeClasspath
+    systemProperty "beam.target.avro.version", v
 
     include '**/*.class'
     exclude '**/AvroIOTest$NeedsRunnerTests$*.class'
diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/AvroVersionVerificationTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/AvroVersionVerificationTest.java
new file mode 100644
index 000000000000..f9e9a54b0531
--- /dev/null
+++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/AvroVersionVerificationTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.avro;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.avro.Schema;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class AvroVersionVerificationTest {
+  @Test
+  public void testAvroVersion() {
+    @Nullable String targetVer = System.getProperty("beam.target.avro.version");
+    Assume.assumeTrue(!Strings.isNullOrEmpty(targetVer));
+    String actualVer = Schema.class.getPackage().getImplementationVersion();
+    assertEquals(targetVer, actualVer);
+  }
+}
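The new test above infers the Avro version actually on the classpath from the jar manifest. As a minimal standalone sketch of that probing pattern (the class name `AvroVersionProbe` is illustrative, not part of the PR):

```java
// Sketch of the manifest-based version probe used by AvroVersionVerificationTest.
// Package#getImplementationVersion() reads the Implementation-Version attribute
// from the MANIFEST.MF of the jar that loaded the class, and returns null when
// the class comes from a plain class directory (e.g. an IDE build), which is
// why the test only asserts when beam.target.avro.version is explicitly set.
import org.apache.avro.Schema;

public class AvroVersionProbe {
  public static void main(String[] args) {
    String version = Schema.class.getPackage().getImplementationVersion();
    System.out.println("avro version from manifest: " + version);
  }
}
```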
diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle
index e30099906391..4b00dcb4ffa8 100644
--- a/sdks/java/io/kafka/build.gradle
+++ b/sdks/java/io/kafka/build.gradle
@@ -122,6 +122,8 @@ kafkaVersions.each {kv ->
     outputs.upToDateWhen { false }
     testClassesDirs = sourceSets.test.output.classesDirs
     classpath = configurations."kafkaVersion${kv.key}" + sourceSets.test.runtimeClasspath
+    systemProperty "beam.target.kafka.version", kv.value
+
     include '**/KafkaIOTest.class'
   }
 }
diff --git a/sdks/java/io/kafka/kafka-integration-test.gradle b/sdks/java/io/kafka/kafka-integration-test.gradle
index 1aeb0c97f93b..3bbab72ff77c 100644
--- a/sdks/java/io/kafka/kafka-integration-test.gradle
+++ b/sdks/java/io/kafka/kafka-integration-test.gradle
@@ -29,10 +29,8 @@ provideIntegrationTestingDependencies()
 enableJavaPerformanceTesting()
 
 dependencies {
-  implementation "org.apache.kafka:kafka-clients:$delimited"
-  permitUnusedDeclared "org.apache.kafka:kafka-clients:$delimited"
-  implementation project(":sdks:java:io:kafka")
-  permitUnusedDeclared project(":sdks:java:io:kafka")
+  // Do not declare the kafka-clients dependency here; otherwise its version is overwritten by BeamModulePlugin.
+  // Instead, rely on io/kafka/build.gradle's custom configurations, which force the kafka-clients version via resolutionStrategy.
   testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
   testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
 }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 952e29f75104..4bda8cf28d4e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -632,13 +632,15 @@ public AverageRecordSize load(TopicPartition topicPartition) throws Exception {
 
   @Teardown
   public void teardown() throws Exception {
-    final Deserializer<K> keyDeserializerInstance =
-        Preconditions.checkStateNotNull(this.keyDeserializerInstance);
-    final Deserializer<V> valueDeserializerInstance =
-        Preconditions.checkStateNotNull(this.valueDeserializerInstance);
     try {
-      Closeables.close(keyDeserializerInstance, true);
-      Closeables.close(valueDeserializerInstance, true);
+      if (valueDeserializerInstance != null) {
+        Closeables.close(valueDeserializerInstance, true);
+        valueDeserializerInstance = null;
+      }
+      if (keyDeserializerInstance != null) {
+        Closeables.close(keyDeserializerInstance, true);
+        keyDeserializerInstance = null;
+      }
     } catch (Exception anyException) {
       LOG.warn("Fail to close resource during finishing bundle.", anyException);
     }
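For context on the teardown() change above: the old code called Preconditions.checkStateNotNull on both deserializer fields, so teardown itself would throw if a bundle failed before the deserializers were ever created. The new code null-guards each close and re-nulls the field. A minimal sketch of that pattern, using plain Guava here (Beam uses its vendored copy) and a hypothetical holder class:

```java
// Null-tolerant, idempotent teardown: Closeables.close(c, true) swallows
// IOException, the null check tolerates setup never having run, and
// re-nulling the field makes a second teardown call a no-op.
import java.io.Closeable;
import java.io.IOException;
import com.google.common.io.Closeables;

class ResourceHolder {
  private Closeable resource; // may still be null if setup never completed

  void teardown() throws IOException {
    if (resource != null) {
      Closeables.close(resource, true); // true = swallow IOException on close
      resource = null;                  // safe if teardown runs twice
    }
  }
}
```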
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
index fba81c51130d..9f46e140ca52 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java
@@ -21,6 +21,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assume.assumeFalse;
 
 import java.io.IOException;
 import java.time.Instant;
@@ -86,6 +87,7 @@
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
@@ -99,6 +101,7 @@
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.junit.AfterClass;
@@ -168,6 +171,13 @@ public class KafkaIOIT {
 
   @BeforeClass
   public static void setup() throws IOException {
+    // Check that the kafka-clients version on the classpath matches the target version, if one is set.
+    @Nullable String targetVer = System.getProperty("beam.target.kafka.version");
+    if (!Strings.isNullOrEmpty(targetVer)) {
+      String actualVer = AppInfoParser.getVersion();
+      assertEquals(targetVer, actualVer);
+    }
+
     options = IOITHelper.readIOTestPipelineOptions(Options.class);
     sourceOptions = fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
     if (options.isWithTestcontainers()) {
@@ -359,6 +369,10 @@ public void processElement(@Element String element, OutputReceiver<String> outpu
   // This test verifies that bad data from Kafka is properly sent to the error handler
   @Test
   public void testKafkaIOSDFReadWithErrorHandler() throws IOException {
+    // TODO(https://github.com/apache/beam/issues/32704) re-enable when fixed, or remove the support
+    // for these old kafka-client versions
+    String actualVer = AppInfoParser.getVersion();
+    assumeFalse(actualVer.compareTo("2.0.0") >= 0 && actualVer.compareTo("2.3.0") < 0);
     writePipeline
         .apply(Create.of(KV.of("key", "val")))
         .apply(
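Both KafkaIOIT changes above key off AppInfoParser.getVersion(), which reports the kafka-clients version actually loaded at runtime. A small sketch of the gating logic, with illustrative class and method names:

```java
// The lexicographic compareTo bounds are safe here because every version in
// the tested range is a plain x.y.z with single-digit components, so string
// order coincides with numeric order ("2.1.1" sorts between "2.0.0" and "2.3.0").
import org.apache.kafka.common.utils.AppInfoParser;

public class KafkaVersionGate {
  static boolean inBrokenErrorHandlerRange(String version) {
    return version.compareTo("2.0.0") >= 0 && version.compareTo("2.3.0") < 0;
  }

  public static void main(String[] args) {
    String actual = AppInfoParser.getVersion(); // kafka-clients version on the classpath
    System.out.println(actual + " skipped: " + inBrokenErrorHandlerRange(actual));
  }
}
```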
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index fb8b29fe7280..764e406f71cb 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -115,6 +115,7 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
@@ -146,12 +147,14 @@
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Utils;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.hamcrest.collection.IsIterableWithSize;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Assume;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -515,6 +518,15 @@ public void testReadAvroGenericRecordsWithConfluentSchemaRegistry() {
     p.run();
   }
 
+  @Test
+  public void testKafkaVersion() {
+    // The KafkaIO compatibility matrix runs the unit tests in KafkaIOTest against each kafka-clients version.
+    @Nullable String targetVer = System.getProperty("beam.target.kafka.version");
+    Assume.assumeTrue(!Strings.isNullOrEmpty(targetVer));
+    String actualVer = AppInfoParser.getVersion();
+    assertEquals(targetVer, actualVer);
+  }
+
   @Test
   public void testReadAvroSpecificRecordsWithConfluentSchemaRegistry() {
     int numElements = 100;
@@ -1582,6 +1594,11 @@ public byte[] serialize(String topic, Long data) {
     public void configure(Map<String, ?> configs, boolean isKey) {
       // intentionally left blank for compatibility with older kafka versions
     }
+
+    @Override
+    public void close() {
+      // intentionally left blank for compatibility with kafka-client v2.2 or older
+    }
   }
 
   @Test
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index a9e4a4eddb61..3189bbb140f0 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -150,6 +150,11 @@ public static class FailingDeserializer implements Deserializer<String> {
 
     public FailingDeserializer() {}
 
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+      // intentionally left blank for compatibility with older kafka versions
+    }
+
     @Override
     public String deserialize(String topic, byte[] data) {
       throw new SerializationException("Intentional serialization exception");
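The blank configure()/close() overrides added in the two test files above exist because older kafka-clients releases declare those methods as abstract on the Serializer/Deserializer interfaces, while newer releases provide default implementations. A test class compiled against a new client jar but executed against an old one would otherwise fail with AbstractMethodError. A self-contained sketch of the pattern, under a hypothetical class name:

```java
// Explicit no-op overrides keep this implementation loadable and callable
// against old kafka-clients jars where configure()/close() have no default bodies.
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class CompatStringDeserializer implements Deserializer<String> {
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // no-op: abstract in older kafka-clients, default in newer ones
  }

  @Override
  public String deserialize(String topic, byte[] data) {
    return data == null ? null : new String(data, StandardCharsets.UTF_8);
  }

  @Override
  public void close() {
    // no-op: abstract in kafka-clients v2.2 and older, per the PR's comment
  }
}
```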