From 0f0f23b8649ebc63e389d65a5007ad4e667955bd Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Tue, 1 Oct 2024 22:04:14 -0400 Subject: [PATCH 1/2] [yaml] package kafka_clients 3.1.2 in Kafka Provider jar Signed-off-by: Jeffrey Kinard --- sdks/java/io/expansion-service/build.gradle | 3 ++- sdks/java/io/kafka/build.gradle | 1 + sdks/java/io/kafka/kafka-312/build.gradle | 24 +++++++++++++++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 sdks/java/io/kafka/kafka-312/build.gradle diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 6097e5f5a5a5..d7fef3d82332 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -52,7 +52,8 @@ dependencies { runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2") runtimeOnly project(path: ":sdks:java:io:iceberg:hive:exec", configuration: "shadow") - runtimeOnly library.java.kafka_clients + // TODO(https://github.com/apache/beam/pull/32486/) Use library.java.kafka_clients once 3.1.2 is set as default + runtimeOnly ("org.apache.kafka:kafka-clients:3.1.2") runtimeOnly library.java.slf4j_jdk14 } diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index 6cba8f8d0fb3..e30099906391 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -44,6 +44,7 @@ def kafkaVersions = [ '231': "2.3.1", '241': "2.4.1", '251': "2.5.1", + '312': "3.1.2", ] kafkaVersions.each{k,v -> configurations.create("kafkaVersion$k")} diff --git a/sdks/java/io/kafka/kafka-312/build.gradle b/sdks/java/io/kafka/kafka-312/build.gradle new file mode 100644 index 000000000000..af2ad3717b65 --- /dev/null +++ b/sdks/java/io/kafka/kafka-312/build.gradle @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +project.ext { + delimited="3.1.2" + undelimited="312" + sdfCompatible=true +} + +apply from: "../kafka-integration-test.gradle" \ No newline at end of file From 9778c15f71362028f4e7218921a91f708ca9e274 Mon Sep 17 00:00:00 2001 From: Jeffrey Kinard Date: Wed, 2 Oct 2024 14:18:25 -0400 Subject: [PATCH 2/2] fix test failure Signed-off-by: Jeffrey Kinard --- .../test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 25ff6dad1244..fb8b29fe7280 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -27,6 +27,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.isA; +import static org.hamcrest.Matchers.matchesPattern; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -970,8 +971,9 @@ public void testUnboundedSourceWithWrongTopic() { thrown.expect(PipelineExecutionException.class); thrown.expectCause(instanceOf(IllegalStateException.class)); thrown.expectMessage( - "Could not find any partitions info. Please check Kafka configuration and make sure that " - + "provided topics exist."); + matchesPattern( + ".*Could not find any partitions(?: info)?\\. Please check Kafka " + + "configuration and (?:make sure that provided topics exist|topic names).*")); int numElements = 1000; KafkaIO.Read reader =