Skip to content

Commit

Permalink
Add Gradle sub-projects to enable parallel running of Kafka tests (#2…
Browse files Browse the repository at this point in the history
…6153)

* add parallel number to gradle.properties

* remove unused config, attempt to configure higher parallelism for kafka tests

* Create generic task to represent Integration tests. Then, create sub-projects to enable running tests in parallel

* extend test timeout to reduce flakyness

* add comment for running locally

* run spotless

* factor kafka integration tests up a level
  • Loading branch information
johnjcasey authored Apr 13, 2023
1 parent 7919c3f commit 837733e
Show file tree
Hide file tree
Showing 14 changed files with 343 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.gradle.kafka

import org.gradle.api.Project
import org.gradle.api.artifacts.ConfigurationContainer
import org.gradle.api.tasks.testing.Test

import javax.inject.Inject

class KafkaTestUtilities {
abstract static class KafkaBatchIT extends Test {

@Inject
KafkaBatchIT(String delimited, String undelimited, Boolean sdfCompatible, ConfigurationContainer configurations, Project runningProject){
group = "Verification"
description = "Runs KafkaIO IT tests with Kafka clients API $delimited"
outputs.upToDateWhen { false }
testClassesDirs = runningProject.findProject(":sdks:java:io:kafka").sourceSets.test.output.classesDirs
classpath = configurations."kafkaVersion$undelimited" + runningProject.sourceSets.test.runtimeClasspath + runningProject.findProject(":sdks:java:io:kafka").sourceSets.test.runtimeClasspath

def pipelineOptions = [
'--sourceOptions={' +
'"numRecords": "1000",' +
'"keySizeBytes": "10",' +
'"valueSizeBytes": "90"' +
'}',
"--readTimeout=120",
"--kafkaTopic=beam",
"--withTestcontainers=true",
"--kafkaContainerVersion=5.5.2",
]

systemProperty "beamTestPipelineOptions", groovy.json.JsonOutput.toJson(pipelineOptions)
include '**/KafkaIOIT.class'

filter {
excludeTestsMatching "*InStreaming"
if (!sdfCompatible) {
excludeTestsMatching "*DynamicPartitions" //admin client create partitions does not exist in kafka 0.11.0.3 and kafka sdf does not appear to work for kafka versions <2.0.1
excludeTestsMatching "*SDFResumesCorrectly" //Kafka SDF does not work for kafka versions <2.0.1
excludeTestsMatching "*StopReadingFunction" //Kafka SDF does not work for kafka versions <2.0.1
excludeTestsMatching "*WatermarkUpdateWithSparseMessages" //Kafka SDF does not work for kafka versions <2.0.1
}
}
}
}
}
59 changes: 13 additions & 46 deletions sdks/java/io/kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,6 @@ def kafkaVersions = [
'251': "2.5.1",
]

def sdfKafkaVersions = [
'201',
'211',
'222',
'231',
'241',
'251'
]

kafkaVersions.each{k,v -> configurations.create("kafkaVersion$k")}

dependencies {
Expand Down Expand Up @@ -125,48 +116,24 @@ kafkaVersions.each {kv ->
}
}

kafkaVersions.each {kv ->
task "kafkaVersion${kv.key}BatchIT"(type: Test) {
group = "Verification"
description = "Runs KafkaIO IT tests with Kafka clients API $kv.value"
outputs.upToDateWhen { false }
testClassesDirs = sourceSets.test.output.classesDirs
classpath = configurations."kafkaVersion${kv.key}" + sourceSets.test.runtimeClasspath

def pipelineOptions = [
'--sourceOptions={' +
'"numRecords": "1000",' +
'"keySizeBytes": "10",' +
'"valueSizeBytes": "90"' +
'}',
"--readTimeout=120",
"--kafkaTopic=beam",
"--withTestcontainers=true",
"--kafkaContainerVersion=5.5.2",
]

systemProperty "beamTestPipelineOptions", groovy.json.JsonOutput.toJson(pipelineOptions)
include '**/KafkaIOIT.class'

filter {
excludeTestsMatching "*InStreaming"
if (!(kv.key in sdfKafkaVersions)) {
excludeTestsMatching "*DynamicPartitions" //admin client create partitions does not exist in kafka 0.11.0.3 and kafka sdf does not appear to work for kafka versions <2.0.1
excludeTestsMatching "*SDFResumesCorrectly" //Kafka SDF does not work for kafka versions <2.0.1
excludeTestsMatching "*StopReadingFunction" //Kafka SDF does not work for kafka versions <2.0.1
excludeTestsMatching "*WatermarkUpdateWithSparseMessages" //Kafka SDF does not work for kafka versions <2.0.1
}
}
}
}

//Because this runs many integration jobs in parallel, each of which use a
//container, it can fail locally due to performance limitations on a desktop.
//To avoid this, use --max-workers=N, where N is less than half your CPUs.
//4 is a good start for parallelism without overloading your computer.
task kafkaVersionsCompatibilityTest {
group = "Verification"
description = 'Runs KafkaIO with different Kafka client APIs'
def testNames = createTestList(kafkaVersions, "Test")
def batchItTestNames = createTestList(kafkaVersions, "BatchIT")
dependsOn testNames
dependsOn batchItTestNames
dependsOn (":sdks:java:io:kafka:kafka-01103:kafkaVersion01103BatchIT")
dependsOn (":sdks:java:io:kafka:kafka-100:kafkaVersion100BatchIT")
dependsOn (":sdks:java:io:kafka:kafka-111:kafkaVersion111BatchIT")
dependsOn (":sdks:java:io:kafka:kafka-201:kafkaVersion201BatchIT")
dependsOn (":sdks:java:io:kafka:kafka-211:kafkaVersion211BatchIT")
dependsOn (":sdks:java:io:kafka:kafka-222:kafkaVersion222BatchIT")
dependsOn (":sdks:java:io:kafka:kafka-231:kafkaVersion231BatchIT")
dependsOn (":sdks:java:io:kafka:kafka-241:kafkaVersion241BatchIT")
dependsOn (":sdks:java:io:kafka:kafka-251:kafkaVersion251BatchIT")
}

static def createTestList(Map<String, String> prefixMap, String suffix) {
Expand Down
23 changes: 23 additions & 0 deletions sdks/java/io/kafka/kafka-01103/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
project.ext {
delimited="0.11.0.3"
undelimited="01103"
}

apply from: "../kafka-integration-test.gradle"
23 changes: 23 additions & 0 deletions sdks/java/io/kafka/kafka-100/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
project.ext {
delimited="1.0.0"
undelimited="100"
}

apply from: "../kafka-integration-test.gradle"
23 changes: 23 additions & 0 deletions sdks/java/io/kafka/kafka-111/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
project.ext {
delimited="1.1.1"
undelimited="111"
}

apply from: "../kafka-integration-test.gradle"
23 changes: 23 additions & 0 deletions sdks/java/io/kafka/kafka-201/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
project.ext {
delimited="2.0.1"
undelimited="201"
}

apply from: "../kafka-integration-test.gradle"
23 changes: 23 additions & 0 deletions sdks/java/io/kafka/kafka-211/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
project.ext {
delimited="2.1.1"
undelimited="211"
}

apply from: "../kafka-integration-test.gradle"
23 changes: 23 additions & 0 deletions sdks/java/io/kafka/kafka-222/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
project.ext {
delimited="2.2.2"
undelimited="222"
}

apply from: "../kafka-integration-test.gradle"
23 changes: 23 additions & 0 deletions sdks/java/io/kafka/kafka-231/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
project.ext {
delimited="2.3.1"
undelimited="231"
}

apply from: "../kafka-integration-test.gradle"
23 changes: 23 additions & 0 deletions sdks/java/io/kafka/kafka-241/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
project.ext {
delimited="2.4.1"
undelimited="241"
}

apply from: "../kafka-integration-test.gradle"
23 changes: 23 additions & 0 deletions sdks/java/io/kafka/kafka-251/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
project.ext {
delimited="2.5.1"
undelimited="251"
}

apply from: "../kafka-integration-test.gradle"
Loading

0 comments on commit 837733e

Please sign in to comment.