Skip to content

Commit

Permalink
Task template (#34)
Browse files Browse the repository at this point in the history
* add task template

* add unit tests

---------

Co-authored-by: zeyu10 <[email protected]>
  • Loading branch information
techloghub and zeyu10 authored Feb 26, 2024
1 parent 0007151 commit e257c7e
Show file tree
Hide file tree
Showing 57 changed files with 2,512 additions and 47 deletions.
39 changes: 39 additions & 0 deletions docker/init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2021-2023 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE DATABASE IF NOT EXISTS rill_flow;
USE rill_flow;
CREATE TABLE IF NOT EXISTS `task_template` (
`id` bigint NOT NULL AUTO_INCREMENT,
`name` varchar(64) NOT NULL DEFAULT '' COMMENT 'template name',
`type` tinyint(1) NOT NULL DEFAULT 0 COMMENT 'template type: 0. function,1. plugin,2. logic',
`category` varchar(64) NOT NULL DEFAULT '' COMMENT 'template category: function, foreach, etc.',
`icon` TEXT NOT NULL COMMENT 'icon base64 string',
`task_yaml` TEXT NOT NULL COMMENT 'default task yaml configurations in dag',
`schema` TEXT NOT NULL COMMENT 'json schema for input',
`output` TEXT NOT NULL COMMENT 'json schema to describe the output of the task',
`enable` tinyint(1) NOT NULL DEFAULT 1 COMMENT 'whether it is enabled: 0. disabled, 1. enabled',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time of this record',
`update_time` datetime NOT NULL DEFAULT '1970-01-01 08:00:00' COMMENT 'newly update time of this record',
PRIMARY KEY (`id`),
UNIQUE KEY `name` (`name`),
KEY `idx_type_category` (`type`, `category`),
KEY `idx_update_time` (`update_time`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='Rill Flow task template table';

grant all on *.* to 'root'@'%' identified by 'secret';
flush privileges;
1 change: 1 addition & 0 deletions lombok.config
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
lombok.addLombokGeneratedAnnotation = true
34 changes: 34 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
<module>rill-flow-interfaces</module>
<module>rill-flow-plugins</module>
<module>rill-flow-trigger</module>
<module>rill-flow-task-template</module>
</modules>

<properties>
Expand Down Expand Up @@ -106,6 +107,11 @@
<artifactId>rill-flow-service</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.weibo</groupId>
<artifactId>rill-flow-task-template</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.weibo</groupId>
<artifactId>rill-flow-impl</artifactId>
Expand Down Expand Up @@ -234,6 +240,24 @@
</dependency>
<!--spring end -->

<!--mybatis start-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.7</version>
</dependency>
<!--mybatis end-->

<!--swagger start-->
<dependency>
<groupId>io.springfox</groupId>
Expand Down Expand Up @@ -567,6 +591,16 @@
<artifactId>pf4j</artifactId>
<version>3.10.0</version>
</dependency>
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.5.5</version>
</dependency>
<dependency>
<groupId>c3p0</groupId>
<artifactId>c3p0</artifactId>
<version>${c3p0.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class DAG {
private Map<String, String> defaultContext;
@Setter
private Map<String, List<Mapping>> commonMapping;
private String inputSchema;

@JsonCreator
public DAG(@JsonProperty("workspace") String workspace,
Expand All @@ -60,7 +61,8 @@ public DAG(@JsonProperty("workspace") String workspace,
@JsonProperty("defaultContext") Map<String, String> defaultContext,
@JsonProperty("commonMapping") Map<String, List<Mapping>> commonMapping,
@JsonProperty("namespace") String namespace,
@JsonProperty("service") String service) {
@JsonProperty("service") String service,
@JsonProperty("inputSchema") String inputSchema) {
this.workspace = StringUtils.isBlank(workspace) ? namespace : workspace;
this.dagName = StringUtils.isBlank(dagName) ? service : dagName;
this.version = version;
Expand All @@ -71,5 +73,6 @@ public DAG(@JsonProperty("workspace") String workspace,
this.callbackConfig = callbackConfig;
this.defaultContext = defaultContext;
this.commonMapping = commonMapping;
this.inputSchema = inputSchema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ public ChoiceTask(
@JsonProperty("timeline") Timeline timeline,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters) {
super(name, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters);
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
this.choices = choices;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ public ForeachTask(
@JsonProperty("timeline") Timeline timeline,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters) {
super(name, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters);
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
this.synchronization = synchronization;
this.iterationMapping = iterationMapping;
this.tasks = tasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ public PassTask(@JsonProperty("name") String name,
@JsonProperty("timeline") Timeline timeline,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters) {
super(name, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters);
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
Optional.ofNullable(timeline).ifPresent(it -> it.setTimeoutInSeconds(null));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ public ReturnTask(@JsonProperty("name") String name,
@JsonProperty("timeline") Timeline timeline,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters) {
super(name, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters);
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, category, next, false, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
Optional.ofNullable(timeline).ifPresent(it -> it.setTimeoutInSeconds(null));
this.conditions = conditions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ public SuspenseTask(@JsonProperty("name") String name,
@JsonProperty("timeline") Timeline timeline,
@JsonProperty("isKeyCallback") boolean isKeyCallback,
@JsonProperty("keyExp") String keyExp,
@JsonProperty("parameters") Map<String, Object> parameters) {
super(name, category, next, tolerance, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters);
@JsonProperty("parameters") Map<String, Object> parameters,
@JsonProperty("templateId") String templateId) {
super(name, category, next, tolerance, inputMappings, outputMappings, progress, degrade, timeline, isKeyCallback, keyExp, parameters, templateId);
this.conditions = conditions;
this.interruptions = interruptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package com.weibo.rill.flow.olympicene.core.model.task;


import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import lombok.AllArgsConstructor;
import lombok.Getter;

Expand Down Expand Up @@ -54,19 +52,18 @@ public enum TaskCategory {
*/
private final int type;

@JsonValue
public String getValue() {
return value;
}

@JsonCreator
public static TaskCategory forValues(String value) {
for (TaskCategory item : TaskCategory.values()) {
if (item.value.equals(value)) {
return item;
}
public static TaskCategory getEnumByValue(String category) {
if (category == null) {
return null;
}

return null;
return switch (category) {
case "function" -> FUNCTION;
case "choice" -> CHOICE;
case "foreach" -> FOREACH;
case "suspense" -> SUSPENSE;
case "pass" -> PASS;
case "return" -> RETURN;
default -> null;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class DAGWalkHelperTest extends Specification {

def "task status of parent task should be correct"() {
given:
ForeachTask baseTask = new ForeachTask('base_1', TaskCategory.FOREACH.getValue(), null, null, null, null, null, null, null, null, null, false, null, null)
ForeachTask baseTask = new ForeachTask('base_1', TaskCategory.FOREACH.getValue(), null, null, null, null, null, null, null, null, null, false, null, null, null)

TaskInfo parentTask = new TaskInfo(name: 'parent',
task: baseTask,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2021-2023 Weibo, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.weibo.rill.flow.olympicene.core.model.task


import spock.lang.Specification

/***
* test for this class:
package com.weibo.rill.flow.olympicene.core.model.task;
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum TaskCategory {
// 调用函数服务的Task
FUNCTION("function", 0),
// 流程控制Task,执行分支语句
CHOICE("choice", 1),
// 流程控制Task,执行循环语句
FOREACH("foreach", 1),
// 本身无处理逻辑,等待外部通知,然后执行 output 更新数据, 兼容olympiadane1.0
SUSPENSE("suspense", 2),
// 空 task
PASS("pass", 2),
// return task
RETURN("return", 2),
;
private final String value;
private final int type;
public static com.weibo.rill.flow.olympicene.core.model.task.TaskCategory getEnumByValue(String category) {
if (category == null) {
return null;
}
return switch (category) {
case "function" -> FUNCTION;
case "choice" -> CHOICE;
case "foreach" -> FOREACH;
case "suspense" -> SUSPENSE;
case "pass" -> PASS;
case "return" -> RETURN;
default -> null;
};
}
}
*/
class TaskCategoryTest extends Specification {
/**
* test enum
* @return
*/
def "test getEnumByValue"() {
when:
TaskCategory taskCategory = TaskCategory.getEnumByValue(category)
then:
taskCategory == expected
where:
category | expected
null | null
"choice" | TaskCategory.CHOICE
"foreach" | TaskCategory.FOREACH
"function" | TaskCategory.FUNCTION
"pass" | TaskCategory.PASS
"suspense" | TaskCategory.SUSPENSE
"return" | TaskCategory.RETURN
"break" | null
}

def "test getType"() {
when:
var category = given
then:
category.getType() == expected
where:
given | expected
TaskCategory.FUNCTION | 0
TaskCategory.FOREACH | 1
TaskCategory.CHOICE | 1
TaskCategory.SUSPENSE | 2
TaskCategory.RETURN | 2
TaskCategory.PASS | 2
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import java.nio.charset.StandardCharsets;


public class RedisScriptLoader {
public class ResourceLoader {
private ResourceLoader() {}
public static String loadResourceAsText(String name) throws IOException {
ClassLoader loader = Thread.currentThread().getContextClassLoader();
InputStream inputStream = loader.getResourceAsStream(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient;
import com.weibo.rill.flow.olympicene.storage.redis.lock.Locker;
import com.weibo.rill.flow.olympicene.storage.redis.lock.RedisScriptLoader;
import com.weibo.rill.flow.olympicene.storage.redis.lock.ResourceLoader;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -32,8 +32,8 @@ public class RedisDistributedLocker implements Locker {
private static final String REDIS_UNLOCK;
static {
try {
REDIS_LOCK = RedisScriptLoader.loadResourceAsText("lua/redis_lock.lua");
REDIS_UNLOCK = RedisScriptLoader.loadResourceAsText("lua/redis_unlock.lua");
REDIS_LOCK = ResourceLoader.loadResourceAsText("lua/redis_lock.lua");
REDIS_UNLOCK = ResourceLoader.loadResourceAsText("lua/redis_unlock.lua");
} catch (IOException e) {
throw new RuntimeException("load script fails", e.getCause());
}
Expand Down
Loading

0 comments on commit e257c7e

Please sign in to comment.