Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#28187] Add a Java gradle task to run validates runner tests on Prism. #32919

Merged
merged 16 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions .github/workflows/beam_PreCommit_Java_PVR_Prism_Loopback.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: PreCommit Java PVR Prism Loopback

on:
push:
tags: ['v*']
branches: ['master', 'release-*']
paths:
- 'model/**'
- 'sdks/go/pkg/beam/runners/prism/**'
- 'sdks/go/cmd/prism/**'
- 'runners/prism/**'
- 'runners/java-fn-execution/**'
- 'sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/**'
- '.github/workflows/beam_PreCommit_Java_PVR_Prism_Loopback.yml'
pull_request_target:
branches: ['master', 'release-*']
paths:
- 'model/**'
- 'sdks/go/pkg/beam/runners/prism/**'
- 'sdks/go/cmd/prism/**'
- 'runners/prism/**'
- 'runners/java-fn-execution/**'
- 'sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/**'
- 'release/trigger_all_tests.json'
- '.github/trigger_files/beam_PreCommit_Java_PVR_Prism_Loopback.json'
issue_comment:
types: [created]
schedule:
- cron: '22 2/6 * * *'
workflow_dispatch:

# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
cancel-in-progress: true

#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
permissions:
actions: write
pull-requests: write
checks: write
contents: read
deployments: read
id-token: none
issues: write
discussions: read
packages: read
pages: read
repository-projects: read
security-events: read
statuses: read

env:
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

jobs:
beam_PreCommit_Java_PVR_Prism_Loopback:
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
strategy:
matrix:
job_name: ["beam_PreCommit_Java_PVR_Prism_Loopback"]
job_phrase: ["Run Java_PVR_Prism_Loopback PreCommit"]
timeout-minutes: 240
runs-on: [self-hosted, ubuntu-20.04]
if: |
github.event_name == 'push' ||
github.event_name == 'pull_request_target' ||
(github.event_name == 'schedule' && github.repository == 'apache/beam') ||
github.event_name == 'workflow_dispatch' ||
github.event.comment.body == 'Run Java_PVR_Prism_Loopback PreCommit'
steps:
- uses: actions/checkout@v4
- name: Setup repository
uses: ./.github/actions/setup-action
with:
comment_phrase: ${{ matrix.job_phrase }}
github_token: ${{ secrets.GITHUB_TOKEN }}
github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
- name: Setup environment
uses: ./.github/actions/setup-environment-action
- name: run prismLoopbackValidatesRunnerTests script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:prism:java:prismLoopbackValidatesRunnerTests
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
with:
name: JUnit Test Results
path: "**/build/reports/tests/"
- name: Upload test report
uses: actions/upload-artifact@v4
with:
name: java-code-coverage-report
path: "**/build/test-results/**/*.xml"
3 changes: 3 additions & 0 deletions runners/prism/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ ext.set('buildTarget', buildTarget)
def buildTask = tasks.named("build") {
// goPrepare is a task registered in applyGoNature.
dependsOn("goPrepare")
// Allow Go to manage the caching, not gradle.
outputs.cacheIf { false }
outputs.upToDateWhen { false }
doLast {
exec {
workingDir = modDir
Expand Down
241 changes: 241 additions & 0 deletions runners/prism/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* limitations under the License.
*/

import groovy.json.JsonOutput

plugins { id 'org.apache.beam.module' }

applyJavaNature(
Expand Down Expand Up @@ -43,3 +45,242 @@ tasks.test {
var prismBuildTask = dependsOn(':runners:prism:build')
systemProperty 'prism.buildTarget', prismBuildTask.project.property('buildTarget').toString()
}

// Below is configuration to support running the Java Validates Runner tests.

configurations {
validatesRunner
}

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.hamcrest
permitUnusedDeclared library.java.hamcrest
implementation library.java.joda_time
implementation library.java.slf4j_api
implementation library.java.vendored_guava_32_1_2_jre

testImplementation library.java.hamcrest
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.slf4j_jdk14

validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
validatesRunner project(path: ":runners:core-java", configuration: "testRuntimeMigration")
validatesRunner project(path: project.path, configuration: "testRuntimeMigration")
}

project.evaluationDependsOn(":sdks:java:core")
project.evaluationDependsOn(":runners:core-java")

def sickbayTests = [
// PortableMetrics doesn't implement "getCommitedOrNull" from Metrics
// Preventing Prism from passing these tests.
// In particular, it doesn't subclass MetricResult with an override, and
// it explicilty passes "false" to commited supported in create.
//
// There is not currently a category for excluding these _only_ in committed mode
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testAllCommittedMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedCounterMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedDistributionMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedStringSetMetrics',
'org.apache.beam.sdk.metrics.MetricsTest$CommittedMetricTests.testCommittedGaugeMetrics',

// Triggers / Accumulation modes not yet implemented in prism.
// https://github.com/apache/beam/issues/31438
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testGlobalCombineWithDefaultsAndTriggers',
'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode',
'org.apache.beam.sdk.transforms.windowing.WindowTest.testNoWindowFnDoesNotReassignWindows',
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testCombiningAccumulatingProcessingTime',
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerEarly',
'org.apache.beam.sdk.transforms.ParDoTest$BundleInvariantsTests.testWatermarkUpdateMidBundle',
'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton',
// Requires Allowed Lateness, among others.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerSetWithinAllowedLateness',
'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate',
'org.apache.beam.sdk.testing.TestStreamTest.testDiscardingMode',
'org.apache.beam.sdk.testing.TestStreamTest.testEarlyPanesOfWindow',
'org.apache.beam.sdk.testing.TestStreamTest.testElementsAtAlmostPositiveInfinity',
'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating',
'org.apache.beam.sdk.testing.TestStreamTest.testMultipleStreams',
'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger',

// Coding error somehow: short write: reached end of stream after reading 5 bytes; 98 bytes expected
'org.apache.beam.sdk.testing.TestStreamTest.testMultiStage',

// Prism not firing sessions correctly (seems to be merging inapppropriately)
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombine',
'org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testSessionsCombineWithContext',

// Java side dying during execution.
// https://github.com/apache/beam/issues/32930
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenMultipleCoders',
// Stream corruption error java side: failed:java.io.StreamCorruptedException: invalid stream header: 206E6F74
// Likely due to prism't coder changes.
'org.apache.beam.sdk.transforms.FlattenTest.testFlattenWithDifferentInputAndOutputCoders2',

// java.lang.IllegalStateException: Output with tag Tag<output> must have a schema in order to call getRowReceiver
// Ultimately because getRoeReceiver code path SDK side isn't friendly to LengthPrefix wrapping of row coders.
// https://github.com/apache/beam/issues/32931
'org.apache.beam.sdk.transforms.ParDoSchemaTest.testReadAndWrite',
'org.apache.beam.sdk.transforms.ParDoSchemaTest.testReadAndWriteMultiOutput',
'org.apache.beam.sdk.transforms.ParDoSchemaTest.testReadAndWriteWithSchemaRegistry',

// Technically these tests "succeed"
// the test is just complaining that an AssertionException isn't a RuntimeException
//
// java.lang.RuntimeException: test error in finalize
'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests.testParDoWithErrorInFinishBatch',
// java.lang.RuntimeException: test error in process
'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests.testParDoWithErrorInProcessElement',
// java.lang.RuntimeException: test error in initialize
'org.apache.beam.sdk.transforms.ParDoTest$LifecycleTests.testParDoWithErrorInStartBatch',

// Only known window fns supported, not general window merging
// Custom window fns not yet implemented in prism.
// https://github.com/apache/beam/issues/31921
'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindows',
'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsKeyedCollection',
'org.apache.beam.sdk.transforms.windowing.WindowTest.testMergingCustomWindowsWithoutCustomWindowTypes',
'org.apache.beam.sdk.transforms.windowing.WindowingTest.testMergingWindowing',
'org.apache.beam.sdk.transforms.windowing.WindowingTest.testNonPartitioningWindowing',
'org.apache.beam.sdk.transforms.GroupByKeyTest$WindowTests.testGroupByKeyMergingWindows',

// Possibly a different error being hidden behind the main error.
// org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow cannot be cast to class java.lang.String
// TODO(https://github.com/apache/beam/issues/29973)
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshufflePreservesMetadata',
// TODO(https://github.com/apache/beam/issues/31231)
'org.apache.beam.sdk.transforms.RedistributeTest.testRedistributePreservesMetadata',

// Prism isn't handling Java's side input views properly.
// https://github.com/apache/beam/issues/32932
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
// Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
'org.apache.beam.sdk.transforms.ViewTest.testDiscardingNonSingletonSideInput',
// java.util.NoSuchElementException: Empty PCollection accessed as a singleton view.
'org.apache.beam.sdk.transforms.ViewTest.testDiscardingNonSingletonSideInput',
// ava.lang.IllegalArgumentException: Duplicate values for a
'org.apache.beam.sdk.transforms.ViewTest.testMapSideInputWithNullValuesCatchesDuplicates',
// java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view....
'org.apache.beam.sdk.transforms.ViewTest.testNonSingletonSideInput',
// java.util.NoSuchElementException: Empty PCollection accessed as a singleton view.
'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput',
// Prism side encoding error.
// java.lang.IllegalStateException: java.io.EOFException
'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables',

// Requires Time Sorted Input
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInput',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithTestStream',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTwoRequiresTimeSortedInputWithLateData',
'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateData',

// Timer race condition/ordering issue in Prism.
'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testTwoTimersSettingEachOtherWithCreateAsInputUnbounded',

// Missing output due to timer skew.
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',

// TestStream + BundleFinalization.
// Tests seem to assume individual element bundles from test stream, but prism will aggregate them, preventing
// a subsequent firing. Tests ultimately hang until timeout.
// Either a test problem, or a misunderstanding of how test stream must work problem in prism.
// Biased to test problem, due to how they are constructed.
'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalization',
'org.apache.beam.sdk.transforms.ParDoTest$BundleFinalizationTests.testBundleFinalizationWithSideInputs',

// Filtered by PortableRunner tests.
// Teardown not called in exceptions
// https://github.com/apache/beam/issues/20372
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetup',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
]

/**
* Runs Java ValidatesRunner tests against the Prism Runner
* with the specified environment type.
*/
def createPrismValidatesRunnerTask = { name, environmentType ->
Task vrTask = tasks.create(name: name, type: Test, group: "Verification") {
description "PrismRunner Java $environmentType ValidatesRunner suite"
classpath = configurations.validatesRunner

var prismBuildTask = dependsOn(':runners:prism:build')
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestPrismRunner",
"--experiments=beam_fn_api",
"--defaultEnvironmentType=${environmentType}",
"--prismLogLevel=warn",
"--prismLocation=${prismBuildTask.project.property('buildTarget').toString()}",
"--enableWebUI=false",
])
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesExternalService'
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'

// Not yet implemented in Prism
// https://github.com/apache/beam/issues/32211
excludeCategories 'org.apache.beam.sdk.testing.UsesOnWindowExpiration'
// https://github.com/apache/beam/issues/32929
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'

// Not supported in Portable Java SDK yet.
// https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+MultimapState
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
}
filter {
// Hangs forever with prism. Put here instead of sickbay to allow sickbay runs to terminate.
// https://github.com/apache/beam/issues/32222
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testEventTimeTimerOrderingWithCreate'

for (String test : sickbayTests) {
excludeTestsMatching test
}
}
}
return vrTask
}

tasks.register("validatesRunnerSickbay", Test) {
group = "Verification"
description "Validates Prism local runner (Sickbay Tests)"

var prismBuildTask = dependsOn(':runners:prism:build')
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--runner=TestPrismRunner",
"--experiments=beam_fn_api",
"--enableWebUI=false",
"--prismLogLevel=warn",
"--prismLocation=${prismBuildTask.project.property('buildTarget').toString()}"
])

classpath = configurations.validatesRunner
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)

filter {
for (String test : sickbayTests) {
includeTestsMatching test
}
}
}

task prismDockerValidatesRunner {
Task vrTask = createPrismValidatesRunnerTask("prismDockerValidatesRunnerTests", "DOCKER")
vrTask.dependsOn ":sdks:java:container:java8:docker"
}

task prismLoopbackValidatesRunner {
dependsOn createPrismValidatesRunnerTask("prismLoopbackValidatesRunnerTests", "LOOPBACK")
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ abstract class PrismExecutor {
static final String IDLE_SHUTDOWN_TIMEOUT = "-idle_shutdown_timeout=%s";
static final String JOB_PORT_FLAG_TEMPLATE = "-job_port=%s";
static final String SERVE_HTTP_FLAG_TEMPLATE = "-serve_http=%s";
static final String LOG_LEVEL_FLAG_TEMPLATE = "-log_level=%s";

protected @MonotonicNonNull Process process;
protected ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ String resolveSource() {
String resolve() throws IOException {
String from = resolveSource();

// If the location is set, and it's not an http request or a zip,
// use the binary directly.
if (!from.startsWith("http") && !from.endsWith("zip") && Files.exists(Paths.get(from))) {
return from;
}

String fromFileName = getNameWithoutExtension(from);
Path to = Paths.get(userHome(), PRISM_BIN_PATH, fromFileName);

Expand Down
Loading
Loading