Skip to content

[Bug]: Python SDFs (e.g. PeriodicImpulse) running in Flink and polling using tracker.defer_remainder have checkpoint size growing indefinitely #63

[Bug]: Python SDFs (e.g. PeriodicImpulse) running in Flink and polling using tracker.defer_remainder have checkpoint size growing indefinitely

[Bug]: Python SDFs (e.g. PeriodicImpulse) running in Flink and polling using tracker.defer_remainder have checkpoint size growing indefinitely #63

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: LoadTests Java GBK Smoke
# Triggers: a manual dispatch, or a newly created issue comment. The job-level
# `if` decides whether a given comment actually starts the load test.
on:
  workflow_dispatch:
  issue_comment:
    types:
      - created
# Explicit token permissions, so the workflow does not fall back to the default
# token scopes. `actions` is the only scope granted write access, `id-token`
# is disabled, and every other listed scope is read-only.
permissions:
  actions: write
  checks: read
  contents: read
  deployments: read
  discussions: read
  id-token: none
  issues: read
  packages: read
  pages: read
  pull-requests: read
  repository-projects: read
  security-events: read
  statuses: read
# A newly queued run interrupts an earlier run that resolves to the same group.
# The group key combines the workflow name, the first non-empty value among
# issue number / sha / head_ref / ref, and the comment body (or, when there is
# no comment, the login of the sender).
concurrency:
  cancel-in-progress: true
  group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.sha || github.head_ref || github.ref }}-${{ github.event.comment.body || github.event.sender.login }}'
# Workflow-level environment: Gradle Enterprise access key and build-cache
# credentials, each read from a repository secret.
env:
  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
  GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
jobs:
  # Single job that runs the GroupByKey smoke load test once per runner, in
  # this order: Direct, Dataflow, Flink, Spark.
  beam_LoadTests_Java_GBK_Smoke:
    # Run on a manual dispatch, or when the triggering comment body is exactly
    # the trigger phrase.
    if: |
      github.event_name == 'workflow_dispatch' ||
      github.event.comment.body == 'Run Java Load Tests GBK Smoke'
    runs-on: [self-hosted, ubuntu-20.04, main]
    timeout-minutes: 720
    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
    # One-element matrix: it only supplies the job name and the trigger phrase
    # that the job name and the setup step read through the `matrix` context.
    strategy:
      matrix:
        job_name: ["beam_LoadTests_Java_GBK_Smoke"]
        job_phrase: ["Run Java Load Tests GBK Smoke"]
    steps:
      # The local actions referenced below live in the repository, so the
      # checkout has to come first.
      - uses: actions/checkout@v3
      - name: Setup repository
        uses: ./.github/actions/setup-action
        with:
          comment_phrase: ${{ matrix.job_phrase }}
          github_token: ${{ secrets.GITHUB_TOKEN }}
          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
      # One argument file per runner. The order of the paths matters: the
      # steps below pick their arguments by 1-based position in this list.
      - name: Prepare test arguments
        uses: ./.github/actions/test-arguments-action
        with:
          test-type: load
          test-language: java
          argument-file-paths: |
            ${{ github.workspace }}/.github/workflows/load-tests-job-configs/java_Smoke_GroupByKey_Direct.txt
            ${{ github.workspace }}/.github/workflows/load-tests-job-configs/java_Smoke_GroupByKey_Dataflow.txt
            ${{ github.workspace }}/.github/workflows/load-tests-job-configs/java_Smoke_GroupByKey_Flink.txt
            ${{ github.workspace }}/.github/workflows/load-tests-job-configs/java_Smoke_GroupByKey_Spark.txt
      # Publishes a UTC timestamp (month, day, hour, minute, second) as the
      # step output `datetime`. The redirect target is quoted so the path in
      # $GITHUB_OUTPUT is never word-split or glob-expanded by the shell.
      # NOTE(review): no step in this job reads steps.datetime.outputs.datetime;
      # confirm whether this step is still needed.
      - name: Set current datetime
        id: datetime
        run: |
          echo "datetime=$(date '+%m%d%H%M%S' --utc)" >> "$GITHUB_OUTPUT"
      # The env variables are created and populated in the test-arguments-action as "<github.job>_test_arguments_<argument_file_paths_index>"
      # NOTE(review): the Direct, Dataflow and Flink steps end their last
      # argument line with a trailing backslash while the Spark step does not;
      # confirm which form gradle-command-self-hosted-action expects.
      - name: run GroupByKey load test Direct
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          gradle-command: :sdks:java:testing:load-tests:run
          arguments: |
            -PloadTest.mainClass=org.apache.beam.sdk.loadtests.GroupByKeyLoadTest \
            -Prunner=:runners:direct-java \
            '-PloadTest.args=${{ env.beam_LoadTests_Java_GBK_Smoke_test_arguments_1 }}' \
      - name: run GroupByKey load test Dataflow
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          gradle-command: :sdks:java:testing:load-tests:run
          arguments: |
            -PloadTest.mainClass=org.apache.beam.sdk.loadtests.GroupByKeyLoadTest \
            -Prunner=:runners:google-cloud-dataflow-java \
            '-PloadTest.args=${{ env.beam_LoadTests_Java_GBK_Smoke_test_arguments_2 }}' \
      # The Flink step is the only one that passes --info to Gradle.
      - name: run GroupByKey load test Flink
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          gradle-command: :sdks:java:testing:load-tests:run
          arguments: |
            --info \
            -PloadTest.mainClass=org.apache.beam.sdk.loadtests.GroupByKeyLoadTest \
            -Prunner=:runners:flink:1.15 \
            '-PloadTest.args=${{ env.beam_LoadTests_Java_GBK_Smoke_test_arguments_3 }}' \
      - name: run GroupByKey load test Spark
        uses: ./.github/actions/gradle-command-self-hosted-action
        with:
          gradle-command: :sdks:java:testing:load-tests:run
          arguments: |
            -PloadTest.mainClass=org.apache.beam.sdk.loadtests.GroupByKeyLoadTest \
            -Prunner=:runners:spark:3 \
            '-PloadTest.args=${{ env.beam_LoadTests_Java_GBK_Smoke_test_arguments_4 }}'