Skip to content

Commit

Permalink
Merge pull request #7 from TongchengOpenSource/remove_plugin_lifecycle
Browse files Browse the repository at this point in the history
[Improve][Transform] Remove Fallback during parsing Transform process.
  • Loading branch information
xiaochen-zhou authored Apr 3, 2024
2 parents d8f8c27 + f0b9a62 commit 6d83fb6
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,14 @@
package org.apache.seatunnel.api.transform;

import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.source.SeaTunnelJobAware;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

import java.io.Serializable;

public interface SeaTunnelTransform<T>
extends Serializable,
PluginIdentifierInterface,
SeaTunnelPluginLifeCycle,
SeaTunnelJobAware {
extends Serializable, PluginIdentifierInterface, SeaTunnelJobAware {

/** Called when the transform is initialized. */
default void open() {}
Expand All @@ -45,22 +41,14 @@ default void setTypeInfo(SeaTunnelDataType<T> inputDataType) {
throw new UnsupportedOperationException("setTypeInfo method is not supported");
}

/**
* Get the data type of the records produced by this transform.
*
* @deprecated Please use {@link #getProducedCatalogTable}
* @return Produced data type.
*/
@Deprecated
SeaTunnelDataType<T> getProducedType();

/** Get the catalog table output by this transform */
CatalogTable getProducedCatalogTable();

/**
* Transform input data to {@link this#getProducedType()} types data.
* Transform input data into data of the row type reported by {@link #getProducedCatalogTable()}
* (that is, {@code getProducedCatalogTable().getSeaTunnelRowType()}).
*
* @param row the data need be transform.
* @param row the data to be transformed.
* @return transformed data.
*/
T map(T row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.net.URL;
import java.util.Collections;
Expand Down Expand Up @@ -119,24 +118,25 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS

protected DataStream<Row> flinkTransform(
SeaTunnelRowType sourceType, SeaTunnelTransform transform, DataStream<Row> stream) {
TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType());
TypeInformation rowTypeInfo =
TypeConverterUtils.convert(
transform.getProducedCatalogTable().getSeaTunnelRowType());
FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(sourceType);
FlinkRowConverter transformOutputRowConverter =
new FlinkRowConverter(transform.getProducedCatalogTable().getSeaTunnelRowType());
DataStream<Row> output =
stream.flatMap(
new FlatMapFunction<Row, Row>() {
@Override
public void flatMap(Row value, Collector<Row> out) throws Exception {
SeaTunnelRow seaTunnelRow =
transformInputRowConverter.reconvert(value);
SeaTunnelRow dataRow = (SeaTunnelRow) transform.map(seaTunnelRow);
if (dataRow != null) {
Row copy = transformOutputRowConverter.convert(dataRow);
out.collect(copy);
}
}
},
(FlatMapFunction<Row, Row>)
(value, out) -> {
SeaTunnelRow seaTunnelRow =
transformInputRowConverter.reconvert(value);
SeaTunnelRow dataRow =
(SeaTunnelRow) transform.map(seaTunnelRow);
if (dataRow != null) {
Row copy = transformOutputRowConverter.convert(dataRow);
out.collect(copy);
}
},
rowTypeInfo);
return output;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,15 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformAction;

import org.apache.commons.lang3.tuple.ImmutablePair;

Expand Down Expand Up @@ -99,38 +96,6 @@ public Tuple2<CatalogTable, Action> parseSource(
return new Tuple2<>(catalogTable, action);
}

/**
* Builds a {@link TransformAction} for a transform plugin through the legacy loading path.
*
* <p>The transform instance and an accompanying set of URLs are obtained from {@code
* ConnectorInstanceLoader.loadTransformInstance}. The instance is then prepared with the plugin
* config, given the job context, and told its input row type before being wrapped in the action.
*
* @param config plugin configuration passed to the loader and to {@code prepare}
* @param jobConfig job configuration; only its job context is read here
* @param tableId identifier used when building the returned catalog table
* @param parallelism parallelism set on the created action
* @param rowType input row type handed to the transform via {@code setTypeInfo}
* @param inputActions actions copied into a new list that is passed to the created action
* @return the catalog table built from the transform's produced type, paired with the action
*/
public Tuple2<CatalogTable, Action> parseTransform(
Config config,
JobConfig jobConfig,
String tableId,
int parallelism,
SeaTunnelRowType rowType,
Set<Action> inputActions) {
final ImmutablePair<SeaTunnelTransform<?>, Set<URL>> tuple =
ConnectorInstanceLoader.loadTransformInstance(
config, jobConfig.getJobContext(), commonPluginJars);
final SeaTunnelTransform<?> transform = tuple.getLeft();
// old logic: prepare(initialization) -> set job context -> set row type (There is a logical
// judgment that depends on before and after, not a simple set)
transform.prepare(config);
transform.setJobContext(jobConfig.getJobContext());
// Raw cast: the transform is typed SeaTunnelTransform<?>, so the row type cannot be passed
// with a checked generic argument.
transform.setTypeInfo((SeaTunnelDataType) rowType);
final String actionName = createTransformActionName(0, tuple.getLeft().getPluginName());
final TransformAction action =
new TransformAction(
idGenerator.getNextId(),
actionName,
new ArrayList<>(inputActions),
transform,
tuple.getRight(),
new HashSet<>());
action.setParallelism(parallelism);
// The output table is derived from getProducedType(), not from getProducedCatalogTable().
CatalogTable catalogTable =
CatalogTableUtil.getCatalogTable(
tableId, (SeaTunnelRowType) transform.getProducedType());
return new Tuple2<>(catalogTable, action);
}

public List<SinkAction<?, ?, ?, ?>> parseSinks(
int configIndex,
List<List<Tuple2<CatalogTable, Action>>> inputVertices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
Expand Down Expand Up @@ -405,35 +403,14 @@ private void parseTransform(
final String tableId =
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);

boolean fallback =
isFallback(
classLoader,
TableTransformFactory.class,
factoryId,
(factory) -> factory.createTransform(null));

Set<Action> inputActions =
inputs.stream()
.map(Tuple2::_2)
.collect(Collectors.toCollection(LinkedHashSet::new));
SeaTunnelDataType<?> expectedType = getProducedType(inputs.get(0)._2());
checkProducedTypeEquals(inputActions);
int spareParallelism = inputs.get(0)._2().getParallelism();
int parallelism =
readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism);
if (fallback) {
Tuple2<CatalogTable, Action> tuple =
fallbackParser.parseTransform(
config,
jobConfig,
tableId,
parallelism,
(SeaTunnelRowType) expectedType,
inputActions);
tableWithActionMap.put(tableId, Collections.singletonList(tuple));
return;
}

CatalogTable catalogTable = inputs.get(0)._1();
SeaTunnelTransform<?> transform =
FactoryUtil.createAndPrepareTransform(
Expand Down Expand Up @@ -476,8 +453,10 @@ public static SeaTunnelDataType<?> getProducedType(Action action) {
.getProducedCatalogTable()
.getSeaTunnelRowType();
} catch (UnsupportedOperationException e) {
// TODO remove it when all connector use `getProducedCatalogTables`
return ((TransformAction) action).getTransform().getProducedType();
return ((TransformAction) action)
.getTransform()
.getProducedCatalogTable()
.getSeaTunnelRowType();
}
}
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void testMetricsOnJobRestart() throws InterruptedException {
server.getCoordinatorService().getJobStatus(jobId3)));

// check metrics
await().atMost(60000, TimeUnit.MILLISECONDS)
await().atMost(600000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3);
Expand All @@ -161,12 +161,12 @@ public void testMetricsOnJobRestart() throws InterruptedException {
server.getCoordinatorService().cancelJob(jobId3);
}

private void startJob(Long jobid, String path, boolean isStartWithSavePoint) {
LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid);
private void startJob(Long jobId, String path, boolean isStartWithSavePoint) {
LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobId.toString(), jobId);

JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
jobid,
jobId,
"Test",
isStartWithSavePoint,
nodeEngine.getSerializationService().toData(testLogicalDag),
Expand All @@ -177,7 +177,7 @@ private void startJob(Long jobid, String path, boolean isStartWithSavePoint) {
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
server.getCoordinatorService().submitJob(jobid, data);
server.getCoordinatorService().submitJob(jobId, data);
voidPassiveCompletableFuture.join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import lombok.NonNull;

Expand Down Expand Up @@ -61,12 +59,4 @@ private CatalogTable transformCatalogTable() {
protected abstract TableSchema transformTableSchema();

protected abstract TableIdentifier transformTableIdentifier();

/**
* Returns the row type produced by this transform.
*
* @return {@code outputRowType} when it is non-null; otherwise the physical row data type of the
*     table schema of {@link #getProducedCatalogTable()}
*/
@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
if (outputRowType != null) {
return outputRowType;
}
return getProducedCatalogTable().getTableSchema().toPhysicalRowDataType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.seatunnel.transform.common;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
Expand All @@ -30,11 +29,6 @@ public abstract class AbstractSeaTunnelTransform implements SeaTunnelTransform<S

protected SeaTunnelRowType outputRowType;

/**
* Returns the row type produced by this transform.
*
* @return the current value of {@code outputRowType}; the field has no initializer, so this is
*     {@code null} until something assigns it
*/
@Override
public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
return outputRowType;
}

@Override
public SeaTunnelRow map(SeaTunnelRow row) {
return transformRow(row);
Expand Down

0 comments on commit 6d83fb6

Please sign in to comment.