Skip to content

Commit

Permalink
Extract dag descriptor dao (#92)
Browse files Browse the repository at this point in the history
* adapt v2.0 when get descriptors

---------

Co-authored-by: zeyu10 <[email protected]>
  • Loading branch information
techloghub and zeyu10 authored Oct 29, 2024
1 parent 0353b75 commit d52996e
Show file tree
Hide file tree
Showing 54 changed files with 3,815 additions and 857 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ dependency-reduced-pom.xml
.DS_Store
/rill-flow-web/src/main/resources/static/
.vscode/
logs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.resource.BaseResource;
import com.weibo.rill.flow.olympicene.core.model.strategy.CallbackConfig;
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
import com.weibo.rill.flow.interfaces.model.task.BaseTask;
import com.weibo.rill.flow.olympicene.core.model.strategy.CallbackConfig;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
Expand All @@ -39,6 +39,7 @@ public class DAG {
private String version;
private DAGType type;
private Timeline timeline;
@Setter
private List<BaseTask> tasks;
@Setter
private List<BaseResource> resources;
Expand All @@ -48,6 +49,9 @@ public class DAG {
@Setter
private Map<String, List<Mapping>> commonMapping;
private String inputSchema;
private Map<String, Object> output;
@Setter
private String endTaskName;

@JsonCreator
public DAG(@JsonProperty("workspace") String workspace,
Expand All @@ -62,7 +66,8 @@ public DAG(@JsonProperty("workspace") String workspace,
@JsonProperty("commonMapping") Map<String, List<Mapping>> commonMapping,
@JsonProperty("namespace") String namespace,
@JsonProperty("service") String service,
@JsonProperty("inputSchema") String inputSchema) {
@JsonProperty("inputSchema") String inputSchema,
@JsonProperty("output") Map<String, Object> output) {
this.workspace = StringUtils.isBlank(workspace) ? namespace : workspace;
this.dagName = StringUtils.isBlank(dagName) ? service : dagName;
this.version = version;
Expand All @@ -74,5 +79,6 @@ public DAG(@JsonProperty("workspace") String workspace,
this.defaultContext = defaultContext;
this.commonMapping = commonMapping;
this.inputSchema = inputSchema;
this.output = output;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2021-2023 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weibo.rill.flow.olympicene.core.model.dag;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class DescriptorPO {
private String descriptor;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2021-2023 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weibo.rill.flow.olympicene.core.model.dag;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class DescriptorVO {
private String descriptor;
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public ChoiceTask(
@JsonProperty("description") String description,
@JsonProperty("category") String category,
@JsonProperty("next") String next,
@JsonProperty("input") Map<String, Object> input,
@JsonProperty("inputMappings") List<Mapping> inputMappings,
@JsonProperty("outputMappings") List<Mapping> outputMappings,
@JsonProperty("choices") List<Choice> choices,
Expand All @@ -55,7 +56,8 @@ public ChoiceTask(
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade,
timeline, isKeyCallback, keyExp, parameters, templateId, input);
this.choices = choices;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ public ForeachTask(
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade,
timeline, isKeyCallback, keyExp, parameters, templateId, input);
this.synchronization = synchronization;
this.iterationMapping = iterationMapping;
this.tasks = tasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
import com.weibo.rill.flow.interfaces.model.task.BaseTask;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.util.ArrayList;
Expand All @@ -35,6 +36,7 @@
@Getter
@Setter
@JsonTypeName("pass")
@NoArgsConstructor
public class PassTask extends BaseTask {

@JsonCreator
Expand All @@ -51,8 +53,10 @@ public PassTask(@JsonProperty("name") String name,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade,
timeline, isKeyCallback, keyExp, parameters, templateId, input);
Optional.ofNullable(timeline).ifPresent(it -> it.setTimeoutInSeconds(null));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ public ReturnTask(@JsonProperty("name") String name,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
super(name, title, description, category, next, false, inputMappings, outputMappings, progress, degrade,
timeline, isKeyCallback, keyExp, parameters, templateId, input);
Optional.ofNullable(timeline).ifPresent(it -> it.setTimeoutInSeconds(null));
this.conditions = conditions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ public SuspenseTask(@JsonProperty("name") String name,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, title, description, category, next, tolerance, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
super(name, title, description, category, next, tolerance, inputMappings, outputMappings, progress, degrade,
timeline, isKeyCallback, keyExp, parameters, templateId, input);
this.conditions = conditions;
this.interruptions = interruptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ public SwitchTask(
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
@JsonProperty("templateId") String templateId,
@JsonProperty("input") Map<String, Object> input) {
super(name, title, description, category, null, false, inputMappings, null, progress,
degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
degrade, timeline, isKeyCallback, keyExp, parameters, templateId, input);
this.switches = switches;
this.setNext(switches.stream().map(Switch::getNext).collect(Collectors.joining(",")));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class DAGWalkHelperTest extends Specification {

def "task status of parent task should be correct"() {
given:
ForeachTask baseTask = new ForeachTask('base_1', '', '',TaskCategory.FOREACH.getValue(), null, null, null, null, null, null, null, null, null, false, null, null, null)
ForeachTask baseTask = new ForeachTask('base_1', '', '',TaskCategory.FOREACH.getValue(), null, null, null, null, null, null, null, null, null, false, null, null, null, null)

TaskInfo parentTask = new TaskInfo(name: 'parent',
task: baseTask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class ObjectMapperFactory {
}

private static final YAMLFactory YAML_FACTORY = new YAMLFactory()
.configure(YAMLGenerator.Feature.USE_NATIVE_TYPE_ID, false)
.configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, false)
.configure(YAMLGenerator.Feature.USE_NATIVE_OBJECT_ID, false);
private static final ObjectMapper YAML_MAPPER = new ObjectMapper(YAML_FACTORY);
Expand All @@ -63,6 +64,7 @@ public class ObjectMapperFactory {
YAML_MAPPER.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE);
YAML_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
YAML_MAPPER.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL, true);
YAML_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
YAML_MAPPER.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
YAML_MAPPER.registerSubtypes(
new NamedType(FunctionTask.class, "function"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class DAGInfoDAOTest extends Specification {
def setup() {
dagInfo.executionId = executionId
dagInfo.dagStatus = DAGStatus.NOT_STARTED
dagInfo.dag = new DAG("workspace", "dagName", "1.0.0", DAGType.FLOW, null, Lists.newArrayList(), null, null, null, null, "ns", "service", null)
dagInfo.dag = new DAG("workspace", "dagName", "1.0.0", DAGType.FLOW, null, Lists.newArrayList(), null, null, null, null, "ns", "service", null, null)
}

def "updateDagInfo redis eval shardingKey check"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
@Slf4j
public class JSONPathInputOutputMapping implements InputOutputMapping, JSONPath {
Configuration conf = Configuration.builder().options(Option.DEFAULT_PATH_LEAF_TO_NULL).build();
private static final Pattern JSONPATH_PATTERN = Pattern.compile("\\[\"(.*?)\"]|\\['(.*?)']");
private static final Pattern JSONPATH_PATTERN = Pattern.compile("\\[(.*?)]");

@Value("${rill.flow.function.trigger.uri}")
private String rillFlowFunctionTriggerUri;
Expand Down Expand Up @@ -201,26 +201,69 @@ public Map<String, Object> setValue(Map<String, Object> map, Object value, Strin
while (matcher.find()) {
if (matcher.group(1) != null) {
jsonPathParts.add(matcher.group(1));
} else if (matcher.group(2) != null) {
jsonPathParts.add(matcher.group(2));
}
}

jsonPathParts.remove(jsonPathParts.size() - 1);

Object current = map;
for (String part: jsonPathParts) {
for (int i = 0; i < jsonPathParts.size() - 1; i++) {
String part = jsonPathParts.get(i);
if (part.startsWith("\"") || part.startsWith("'")) {
part = part.substring(1, part.length() - 1);
}
if (current instanceof Map) {
Map<String, Object> mapCurrent = (Map<String, Object>) current;
current = mapCurrent.computeIfAbsent(part, k -> new HashMap<>());
} else {
break;
current = processMapJsonPathPart(current, part, jsonPathParts, i);
} else if (current instanceof List) {
current = processListJsonPathPart(current, part, jsonPathParts, i);
}
}

return JsonPath.using(conf).parse(map).set(path, value).json();
}

private Object processListJsonPathPart(Object current, String part, List<String> jsonPathParts, int i) {
List<Object> listCurrent = (List<Object>) current;
int index = Integer.parseInt(part);
Object insertPosition = listCurrent.get(index);
if (jsonPathParts.get(i + 1).matches("\\d+")) {
// 1. 下一个元素是数字,也就是数组的索引,所以需要创建数组并且填充到索引位置
List<Object> nextArray = createAndFillNextArrayPart(insertPosition, jsonPathParts, i);
listCurrent.set(index, nextArray);
} else if (i + 1 < jsonPathParts.size() && insertPosition == null) {
// 2. 下一个元素不是数字,则创建 map
listCurrent.set(index, new HashMap<>());
}
return listCurrent.get(index);
}

private Object processMapJsonPathPart(Object current, String part, List<String> jsonPathParts, int i) {
Map<String, Object> mapCurrent = (Map<String, Object>) current;
Object currentValue = mapCurrent.get(part);
if (jsonPathParts.get(i + 1).matches("\\d+")) {
List<Object> nextArray = createAndFillNextArrayPart(currentValue, jsonPathParts, i);
mapCurrent.put(part, nextArray);
} else if (i + 1 < jsonPathParts.size() && currentValue == null) {
mapCurrent.put(part, new HashMap<>());
}
return mapCurrent.get(part);
}

/**
* 为下一个元素创建数组类型对象,并用 null 值填充指定元素个数
*/
private List<Object> createAndFillNextArrayPart(Object nextPart, List<String> jsonPathParts, int i) {
List<Object> nextArray;
if (nextPart instanceof List) {
nextArray = (List<Object>) nextPart;
} else {
nextArray = new ArrayList<>();
}
int nextIndex = Integer.parseInt(jsonPathParts.get(i + 1));
for (int j = nextArray.size(); j <= nextIndex; j++) {
nextArray.add(null);
}
return nextArray;
}

@Override
public Map<String, Map<String, Object>> delete(Map<String, Map<String, Object>> map, String path) {
if (map == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import com.google.common.collect.Sets;
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.core.model.task.TaskCategory;
import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping;
Expand Down Expand Up @@ -60,7 +60,7 @@ protected ExecutionResult doRun(String executionId, TaskInfo taskInfo, Map<Strin
log.info("pass task begin to run executionId:{}, taskInfoName:{}", executionId, taskInfo.getName());
if (CollectionUtils.isNotEmpty(taskInfo.getTask().getOutputMappings())) {
Map<String, Object> context = ContextHelper.getInstance().getContext(dagContextStorage, executionId, taskInfo);
outputMappings(context, new HashMap<>(), new HashMap<>(), taskInfo.getTask().getOutputMappings());
outputMappings(context, new HashMap<>(), input, taskInfo.getTask().getOutputMappings());
saveContext(executionId, context, Sets.newHashSet(taskInfo));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,44 @@ class JSONPathTest extends Specification {
['input': ['type': 'gif']] | _
}

def "set value when there is an array in the path"() {
given:
String path = "\$.input.meta.user[1].id"
long value = 1

when:
mapping.setValue(map, value, path)

then:
mapping.getValue(map, path) == value

where:
map | _
[:] | _
['context': 123] | _
['input': ['type': 'gif']] | _
['input': ['meta': ['user': [['id': 1]]]]] | _
}

def "set value when the last part of the path is an array"() {
given:
String path = "\$.input.meta.user[1]"
long value = 1

when:
mapping.setValue(map, value, path)

then:
mapping.getValue(map, path) == value

where:
map | _
[:] | _
['context': 123] | _
['input': ['type': 'gif']] | _
['input': ['meta': ['user': [['id': 1]]]]] | _
}

def "set value intermediate route test contains dot"() {
given:
String path = "\$.input.meta.user.[\"id.key\"]"
Expand Down
Loading

0 comments on commit d52996e

Please sign in to comment.