[FLINK-36931][cdc] FlinkCDC YAML supports batch mode
aiwenmo committed Dec 24, 2024
1 parent 8424467 commit 9b3b534
Showing 5 changed files with 300 additions and 1 deletion.
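The new option is exposed to YAML pipeline definitions through the `runtime-mode` key under the `pipeline` block. Below is a minimal sketch of how a batch pipeline might be declared; the source/sink entries and the pipeline name are illustrative placeholders, and only the `runtime-mode` key, its STREAMING/BATCH values, and the STREAMING default come from this commit:

```yaml
# Hypothetical pipeline definition; only pipeline.runtime-mode is introduced by this commit.
source:
  type: mysql                 # illustrative source connector
sink:
  type: doris                 # illustrative sink connector
pipeline:
  name: batch-sync-example    # illustrative name
  parallelism: 1
  runtime-mode: BATCH         # defaults to STREAMING when omitted
```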
org/apache/flink/cdc/common/pipeline/PipelineOptions.java
@@ -45,6 +45,12 @@ public class PipelineOptions {
.defaultValue(1)
.withDescription("Parallelism of the pipeline");

public static final ConfigOption<RunTimeMode> PIPELINE_RUNTIME_MODE =
ConfigOptions.key("runtime-mode")
.enumType(RunTimeMode.class)
.defaultValue(RunTimeMode.STREAMING)
.withDescription("Run time mode of the pipeline");

public static final ConfigOption<SchemaChangeBehavior> PIPELINE_SCHEMA_CHANGE_BEHAVIOR =
ConfigOptions.key("schema.change.behavior")
.enumType(SchemaChangeBehavior.class)
org/apache/flink/cdc/common/pipeline/RunTimeMode.java
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.pipeline;

import org.apache.flink.cdc.common.annotation.PublicEvolving;

@PublicEvolving
public enum RunTimeMode {
STREAMING,
BATCH
}
org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -17,10 +17,12 @@

package org.apache.flink.cdc.composer.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.RunTimeMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.source.DataSource;
@@ -93,6 +95,13 @@ private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking
public PipelineExecution compose(PipelineDef pipelineDef) {
Configuration pipelineDefConfig = pipelineDef.getConfig();

boolean isBatchMode = false;
if (RunTimeMode.BATCH.equals(
pipelineDefConfig.get(PipelineOptions.PIPELINE_RUNTIME_MODE))) {
isBatchMode = true;
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
}

int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);

org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.composer.flink;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.RunTimeMode;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.definition.SourceDef;
import org.apache.flink.cdc.connectors.values.ValuesDatabase;
import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
import org.apache.flink.cdc.connectors.values.sink.ValuesDataSink;
import org.apache.flink.cdc.connectors.values.sink.ValuesDataSinkOptions;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1;
import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
import static org.assertj.core.api.Assertions.assertThat;

public class FlinkPipelineBatchComposerITCase {

private static final int MAX_PARALLELISM = 4;

// Always use the parent-first classloader for CDC classes.
// ValuesDatabase holds its data in a static field, so the class must be loaded by the
// application classloader for the test case to be able to verify the data.
private static final org.apache.flink.configuration.Configuration MINI_CLUSTER_CONFIG =
new org.apache.flink.configuration.Configuration();

static {
MINI_CLUSTER_CONFIG.set(
ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
Collections.singletonList("org.apache.flink.cdc"));
}

/**
* Use {@link MiniClusterExtension} to reduce the overhead of restarting the MiniCluster for
* every test case.
*/
@RegisterExtension
static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(MAX_PARALLELISM)
.setConfiguration(MINI_CLUSTER_CONFIG)
.build());

private final PrintStream standardOut = System.out;
private final ByteArrayOutputStream outCaptor = new ByteArrayOutputStream();

@BeforeEach
void init() {
// Take over STDOUT as we need to check the output of the values sink
System.setOut(new PrintStream(outCaptor));
// Initialize in-memory database
ValuesDatabase.clear();
}

@AfterEach
void cleanup() {
System.setOut(standardOut);
}

@Test
void testSnapshotStreamTable() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.BATCH_TABLE);
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, ValuesDataSink.SinkApi.SINK_V2);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

// Check result in ValuesDatabase
List<String> results = ValuesDatabase.getResults(TABLE_1);
assertThat(results)
.contains(
"default_namespace.default_schema.table1:col1=1;col2=1",
"default_namespace.default_schema.table1:col1=2;col2=2",
"default_namespace.default_schema.table1:col1=3;col2=3");

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().replace("\r\n", "\n").trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}");
}

@Test
void testSnapshotBatchTable() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.BATCH_TABLE);
SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
sinkConfig.set(ValuesDataSinkOptions.SINK_API, ValuesDataSink.SinkApi.SINK_V2);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(PipelineOptions.PIPELINE_RUNTIME_MODE, RunTimeMode.BATCH);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);
execution.execute();

// Check result in ValuesDatabase
List<String> results = ValuesDatabase.getResults(TABLE_1);
assertThat(results)
.contains(
"default_namespace.default_schema.table1:col1=1;col2=1",
"default_namespace.default_schema.table1:col1=2;col2=2",
"default_namespace.default_schema.table1:col1=3;col2=3");

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().replace("\r\n", "\n").trim().split("\n");
assertThat(outputEvents)
.containsExactly(
"CreateTableEvent{tableId=default_namespace.default_schema.table1, schema=columns={`col1` STRING,`col2` STRING}, primaryKeys=col1, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}");
}
}
org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
@@ -54,7 +54,8 @@ public enum EventSetId {
SINGLE_SPLIT_MULTI_TABLES,
MULTI_SPLITS_SINGLE_TABLE,
CUSTOM_SOURCE_EVENTS,
TRANSFORM_TABLE
TRANSFORM_TABLE,
BATCH_TABLE
}

public static final TableId TABLE_1 =
@@ -120,6 +121,11 @@ public static void setSourceEvents(EventSetId eventType) {
sourceEvents = transformTable();
break;
}
case BATCH_TABLE:
{
sourceEvents = batchTable();
break;
}
default:
throw new IllegalArgumentException(eventType + " is not supported");
}
@@ -644,4 +650,52 @@ public static List<List<Event>> transformTable() {
eventOfSplits.add(split1);
return eventOfSplits;
}

public static List<List<Event>> batchTable() {
List<List<Event>> eventOfSplits = new ArrayList<>();
List<Event> split1 = new ArrayList<>();

// create table
Schema schema =
Schema.newBuilder()
.physicalColumn("col1", DataTypes.STRING())
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.build();
CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1, schema);
split1.add(createTableEvent);

BinaryRecordDataGenerator generator =
new BinaryRecordDataGenerator(RowType.of(DataTypes.STRING(), DataTypes.STRING()));
// insert
DataChangeEvent insertEvent1 =
DataChangeEvent.insertEvent(
TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
BinaryStringData.fromString("1")
}));
split1.add(insertEvent1);
DataChangeEvent insertEvent2 =
DataChangeEvent.insertEvent(
TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
BinaryStringData.fromString("2")
}));
split1.add(insertEvent2);
DataChangeEvent insertEvent3 =
DataChangeEvent.insertEvent(
TABLE_1,
generator.generate(
new Object[] {
BinaryStringData.fromString("3"),
BinaryStringData.fromString("3")
}));
split1.add(insertEvent3);
eventOfSplits.add(split1);
return eventOfSplits;
}
}
