diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle index 6097e5f5a5a5..d7fef3d82332 100644 --- a/sdks/java/io/expansion-service/build.gradle +++ b/sdks/java/io/expansion-service/build.gradle @@ -52,7 +52,8 @@ dependencies { runtimeOnly ("org.apache.iceberg:iceberg-hive-metastore:1.4.2") runtimeOnly project(path: ":sdks:java:io:iceberg:hive:exec", configuration: "shadow") - runtimeOnly library.java.kafka_clients + // TODO(https://github.com/apache/beam/pull/32486/) Use library.java.kafka_clients once 3.1.2 is set as default + runtimeOnly ("org.apache.kafka:kafka-clients:3.1.2") runtimeOnly library.java.slf4j_jdk14 } diff --git a/sdks/java/io/kafka/build.gradle b/sdks/java/io/kafka/build.gradle index 6cba8f8d0fb3..e30099906391 100644 --- a/sdks/java/io/kafka/build.gradle +++ b/sdks/java/io/kafka/build.gradle @@ -44,6 +44,7 @@ def kafkaVersions = [ '231': "2.3.1", '241': "2.4.1", '251': "2.5.1", + '312': "3.1.2", ] kafkaVersions.each{k,v -> configurations.create("kafkaVersion$k")} diff --git a/sdks/java/io/kafka/kafka-312/build.gradle b/sdks/java/io/kafka/kafka-312/build.gradle new file mode 100644 index 000000000000..af2ad3717b65 --- /dev/null +++ b/sdks/java/io/kafka/kafka-312/build.gradle @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +project.ext { + delimited="3.1.2" + undelimited="312" + sdfCompatible=true +} + +apply from: "../kafka-integration-test.gradle" \ No newline at end of file diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 25ff6dad1244..fb8b29fe7280 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -27,6 +27,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.isA; +import static org.hamcrest.Matchers.matchesPattern; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -970,8 +971,9 @@ public void testUnboundedSourceWithWrongTopic() { thrown.expect(PipelineExecutionException.class); thrown.expectCause(instanceOf(IllegalStateException.class)); thrown.expectMessage( - "Could not find any partitions info. Please check Kafka configuration and make sure that " - + "provided topics exist."); + matchesPattern( + ".*Could not find any partitions(?: info)?\\. Please check Kafka " + + "configuration and (?:make sure that provided topics exist|topic names).*")); int numElements = 1000; KafkaIO.Read reader =