Skip to content

Commit

Permalink
Add support for Flink 1.19
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Oct 16, 2024
1 parent aa74cc0 commit 3978922
Show file tree
Hide file tree
Showing 17 changed files with 313 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/beam_PostCommit_Java_Tpcds_Flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,5 @@ jobs:
with:
gradle-command: :sdks:java:testing:tpcds:run
arguments: |
-Ptpcds.runner=:runners:flink:1.18 \
-Ptpcds.runner=:runners:flink:1.19 \
"-Ptpcds.args=${{env.tpcdsBigQueryArgs}} ${{env.tpcdsInfluxDBArgs}} ${{ env.GRADLE_COMMAND_ARGUMENTS }} --queries=${{env.tpcdsQueriesArg}}" \
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jobs:
- name: run validatesRunner script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:flink:1.18:validatesRunner
gradle-command: :runners:flink:1.19:validatesRunner
- name: Archive JUnit Test Results
uses: actions/upload-artifact@v4
if: ${{ !success() }}
Expand Down
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
* Added support for Flink 1.19
## I/Os
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
2 changes: 1 addition & 1 deletion contributor-docs/release-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ write to BigQuery, and create a cluster of machines for running containers (for
```
**Flink Local Runner**
```
./gradlew :runners:flink:1.18:runQuickstartJavaFlinkLocal \
./gradlew :runners:flink:1.19:runQuickstartJavaFlinkLocal \
-Prepourl=https://repository.apache.org/content/repositories/orgapachebeam-${KEY} \
-Pver=${RELEASE_VERSION}
```
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ docker_image_default_repo_root=apache
docker_image_default_repo_prefix=beam_

# supported flink versions
flink_versions=1.15,1.16,1.17,1.18
flink_versions=1.15,1.16,1.17,1.18,1.19
# supported python versions
python_versions=3.8,3.9,3.10,3.11,3.12
2 changes: 1 addition & 1 deletion release/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ task("runJavaExamplesValidationTask") {
dependsOn(":runners:direct-java:runQuickstartJavaDirect")
dependsOn(":runners:google-cloud-dataflow-java:runQuickstartJavaDataflow")
dependsOn(":runners:spark:3:runQuickstartJavaSpark")
dependsOn(":runners:flink:1.18:runQuickstartJavaFlinkLocal")
dependsOn(":runners:flink:1.19:runQuickstartJavaFlinkLocal")
dependsOn(":runners:direct-java:runMobileGamingJavaDirect")
dependsOn(":runners:google-cloud-dataflow-java:runMobileGamingJavaDataflow")
dependsOn(":runners:twister2:runQuickstartJavaTwister2")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.streaming;

import java.util.Collection;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

class MemoryStateBackendWrapper {
static <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider,
MetricGroup metricGroup,
Collection<KeyedStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry)
throws BackendBuildingException {

MemoryStateBackend backend = new MemoryStateBackend();
return backend.createKeyedStateBackend(
env,
jobID,
operatorIdentifier,
keySerializer,
numberOfKeyGroups,
keyGroupRange,
kvStateRegistry,
ttlTimeProvider,
metricGroup,
stateHandles,
cancelStreamRegistry);
}

static OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier,
Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry)
throws Exception {
MemoryStateBackend backend = new MemoryStateBackend();
return backend.createOperatorStateBackend(
env, operatorIdentifier, stateHandles, cancelStreamRegistry);
}
}
25 changes: 25 additions & 0 deletions runners/flink/1.19/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

project.ext {
flink_major = '1.19'
flink_version = '1.19.0'
}

// Load the main build script which contains all build logic.
apply from: "../flink_runner.gradle"
26 changes: 26 additions & 0 deletions runners/flink/1.19/job-server-container/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server-container'

project.ext {
resource_path = basePath
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server_container.gradle"
31 changes: 31 additions & 0 deletions runners/flink/1.19/job-server/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

def basePath = '../../job-server'

project.ext {
// Look for the source code in the parent module
main_source_dirs = ["$basePath/src/main/java"]
test_source_dirs = ["$basePath/src/test/java"]
main_resources_dirs = ["$basePath/src/main/resources"]
test_resources_dirs = ["$basePath/src/test/resources"]
archives_base_name = 'beam-runners-flink-1.19-job-server'
}

// Load the main build script which contains all build logic.
apply from: "$basePath/flink_job_server.gradle"
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.streaming;

import java.util.Collection;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

class MemoryStateBackendWrapper {
static <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider,
MetricGroup metricGroup,
Collection<KeyedStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry)
throws BackendBuildingException {

MemoryStateBackend backend = new MemoryStateBackend();
return backend.createKeyedStateBackend(
new KeyedStateBackendParametersImpl<>(
env,
jobID,
operatorIdentifier,
keySerializer,
numberOfKeyGroups,
keyGroupRange,
kvStateRegistry,
ttlTimeProvider,
metricGroup,
stateHandles,
cancelStreamRegistry));
}

static OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier,
Collection<OperatorStateHandle> stateHandles,
CloseableRegistry cancelStreamRegistry)
throws Exception {
MemoryStateBackend backend = new MemoryStateBackend();
return backend.createOperatorStateBackend(
new OperatorStateBackendParametersImpl(
env, operatorIdentifier, stateHandles, cancelStreamRegistry));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.flink.streaming;

import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

/** {@link StreamSource} utilities, that bridge incompatibilities between Flink releases. */
public class StreamSources {

public static <OutT, SrcT extends SourceFunction<OutT>> void run(
StreamSource<OutT, SrcT> streamSource,
Object lockingObject,
Output<StreamRecord<OutT>> collector)
throws Exception {
streamSource.run(lockingObject, collector, createOperatorChain(streamSource));
}

private static OperatorChain<?, ?> createOperatorChain(AbstractStreamOperator<?> operator) {
return new RegularOperatorChain<>(
operator.getContainingTask(),
StreamTask.createRecordWriterDelegate(
operator.getOperatorConfig(), new MockEnvironmentBuilder().build()));
}

/** The emitWatermarkStatus method was added in Flink 1.14, so we need to wrap Output. */
public interface OutputWrapper<T> extends Output<T> {
@Override
default void emitWatermarkStatus(WatermarkStatus watermarkStatus) {}

/** In Flink 1.19 the {@code emitRecordAttributes} method was added. */
@Override
default void emitRecordAttributes(RecordAttributes recordAttributes) {
throw new UnsupportedOperationException("emitRecordAttributes not implemented");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -41,10 +40,9 @@ public class FlinkBroadcastStateInternalsTest extends StateInternalsTest {

@Override
protected StateInternals createStateInternals() {
MemoryStateBackend backend = new MemoryStateBackend();
try {
OperatorStateBackend operatorStateBackend =
backend.createOperatorStateBackend(
MemoryStateBackendWrapper.createOperatorStateBackend(
new DummyEnvironment("test", 1, 0), "", Collections.emptyList(), null);
return new FlinkBroadcastStateInternals<>(
1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
Expand Down Expand Up @@ -185,9 +184,8 @@ public void testGlobalWindowWatermarkHoldClear() throws Exception {
}

public static KeyedStateBackend<ByteBuffer> createStateBackend() throws Exception {
MemoryStateBackend backend = new MemoryStateBackend();
AbstractKeyedStateBackend<ByteBuffer> keyedStateBackend =
backend.createKeyedStateBackend(
MemoryStateBackendWrapper.createKeyedStateBackend(
new DummyEnvironment("test", 1, 0),
new JobID(),
"test_op",
Expand Down
Loading

0 comments on commit 3978922

Please sign in to comment.