diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 8804125..9005b0d 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -5,9 +5,47 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + @@ -111,7 +149,16 @@ - + + + + + + + + + + diff --git a/docs/example/dataflow/_category_.json b/docs/example/dataflow/_category_.json new file mode 100644 index 0000000..8a25aa4 --- /dev/null +++ b/docs/example/dataflow/_category_.json @@ -0,0 +1,4 @@ +{ + "label": "利用TIS实现T+1离线分析", + "position": 3 +} diff --git a/docs/guide/dataflow/add-joiner.png b/docs/example/dataflow/add-joiner.png similarity index 100% rename from docs/guide/dataflow/add-joiner.png rename to docs/example/dataflow/add-joiner.png diff --git a/docs/guide/dataflow/add-table.png b/docs/example/dataflow/add-table.png similarity index 100% rename from docs/guide/dataflow/add-table.png rename to docs/example/dataflow/add-table.png diff --git a/docs/guide/dataflow/define-entity-detail.png b/docs/example/dataflow/define-entity-detail.png similarity index 100% rename from docs/guide/dataflow/define-entity-detail.png rename to docs/example/dataflow/define-entity-detail.png diff --git a/docs/guide/dataflow/define-er-confirm.png b/docs/example/dataflow/define-er-confirm.png similarity index 100% rename from docs/guide/dataflow/define-er-confirm.png rename to docs/example/dataflow/define-er-confirm.png diff --git a/docs/guide/dataflow/dry-run.png b/docs/example/dataflow/dry-run.png similarity index 100% rename from docs/guide/dataflow/dry-run.png rename to docs/example/dataflow/dry-run.png diff --git a/docs/guide/dataflow/employees_db_er.png b/docs/example/dataflow/employees_db_er.png similarity index 100% rename from docs/guide/dataflow/employees_db_er.png rename to docs/example/dataflow/employees_db_er.png diff --git a/docs/guide/dataflow/entry.png b/docs/example/dataflow/entry.png similarity index 100% rename from docs/guide/dataflow/entry.png rename to docs/example/dataflow/entry.png diff --git a/docs/guide/dataflow/exec-dataflow.png b/docs/example/dataflow/exec-dataflow.png similarity index 100% rename from docs/guide/dataflow/exec-dataflow.png rename to docs/example/dataflow/exec-dataflow.png diff --git a/docs/guide/dataflow/index.mdx b/docs/example/dataflow/index.mdx similarity index 75% rename from docs/guide/dataflow/index.mdx rename to docs/example/dataflow/index.mdx index 6276cfb..7f25650 100644 --- a/docs/guide/dataflow/index.mdx +++ b/docs/example/dataflow/index.mdx @@ -1,22 +1,47 @@ --- -title: 数据流管理 +title: 利用TIS实现T+1离线分析 date: 2020-07-26 --- import Link from '/src/components/Link'; import Figure from '/src/components/Figure'; +import BZhan from '/src/components/BZhan'; ## 功能说明 通过`数据流管理`可以方便实现离线批量数据分析,借助传统Map/Reduce工具(Aliyun ODPS,Hive,Spark等)将抽取得到的数据快速进行离线数据分析,得到所需要的结果。 - 离线分析结果可以作为下游实时数仓(Doris,StarRocks,ElasticSearch)的数据源,例如StarRocks作为一款非常优秀的OLAP引擎,内部有强大的多表Join查询能力,如想进一步提高OLAP数据分析能力,可以利用`TIS数据流管理`模块在引擎外部就构建多表关联物化视图 + 虽然,现在业界实时数仓构建非常火热,但是基于传统MapReduce工具为代表的离线分析工具依然不可或缺,在很多业务场景中发挥着重要作用。他的特点是架构简单、可靠,可以执行复杂的T+1报表分析业务。 + + 另外,在实时数仓建设领域,可以有效弥补实时数仓的技术短板。离线分析结果可以作为下游实时数仓(Doris,StarRocks,ElasticSearch)的数据源,例如StarRocks作为一款非常优秀的OLAP引擎,内部有强大的多表Join查询能力,如想进一步提高OLAP数据分析能力,可以利用`TIS数据流管理`模块在引擎外部就构建多表关联物化视图 ,将物化视图作为数据源导入到StarRocks中,这样免去了StarRocks引擎内部执行多表Join操作。 - 目前,`TIS数据流管理`支持三种离线分析引擎,1.Spark 2. Hive 3. AliyunODPS + 一套完整的离线分析引擎需要有三部分, + 1. `离线分析引擎` + 目前,`TIS数据流管理`支持三种`离线分析引擎`, + 1. Spark + 2. Hive + 3. [AliyunODPS](https://maxcompute.console.aliyun.com/) + + 2. 任务调度引擎 + + 因为一个大离线分析任务往往有N个子任务工程,且子任务与子任务之间是有上下游依赖关系(每个子任务的触发需要依赖它的上游任务完成),这些子任务之间构成了DAG图,业界有专门工具 , + 比较成熟的调度工具有 [Apache dolphinscheduler](https://dolphinscheduler.apache.org/) , [Apache Airflow](https://airflow.apache.org/),[PowerJob](www.powerjob.tech/) + + 3. 数据采集工具 + + 数据采集流程是,离线数据分析的前置步骤,现在业界流行的工具有,[Apache SeaTunnel](https://seatunnel.apache.org/),阿里巴巴开源工具[DataX](https://github.com/alibaba/DataX),[Airbyte](https://airbyte.com/) + +
+ +>TIS 系统内部集成了以上三部分,选取业界最优秀的工具组件无缝集成,为用户提供一站式、开箱即用的离线分析平台工具。 ## 使用说明 + + + + ### 案例介绍 假设需要基于关系型数据库来执行离线分析,本案例提供了一个[Demo数据库](https://gitee.com/qlangtech/test_db),有关简单员工(Employees)CRM管理系统的数据库,库中各表ER关系如下: @@ -81,6 +106,7 @@ AS ### TIS中操作流程 + |说明|图示| |--|--| | 初始化[Employees数据库](https://gitee.com/qlangtech/test_db),并且在TIS中注册employees数据源,以备后续流程中使用|
| @@ -88,7 +114,7 @@ AS | 设置离线分析实例名称|
| | 设置离线引擎类型,本例中选择Aliyun ODPS 作为分析引擎(需要先到Aliyun上申请[ODPS服务](https://maxcompute.console.aliyun.com/)) |
| | 设置ODPS引擎相关配置参数 |
| -|1. 点击左侧控件栏中的`数据库表`到托盘中
2.自动打开数据表选择输入框
3.`数据库表`下拉选择框中选择需要导入的表(`employees`,`salaries`,`dept_emp`,`departments`),需要在 前一步 中已经定义完成 |
| +|1. 点击左侧控件栏中的`数据库表`到托盘中
2.自动打开数据表选择输入框
3.`数据库表`下拉选择框中选择需要导入的表(`employees`,`salaries`,`dept_emp`,`departments`),需要在 前一步 中已经定义完成 |
| |1.点击左侧控件栏中的`JOIN`到托盘中
2.设置`名称`JOIN节点名称,选择`依赖节点`,在`SQL`中填写数据处理脚本
3.点击保存 |
| | 点击`保存`按钮,对新添加的数据流规则进行保存 |
| | 保存成功,会跳出确认对话框,询问**是否需要定义表ER关系**
ER关系规则在增量执行流程中使用,如果后期不需要开启增量同步管道,则可以跳过这一步 |
| @@ -103,3 +129,7 @@ AS |在正式开始执行数据分析之前,为保证编写的脚本正确,可以先用**DryRun**模式运行已经完成的dataflow |
| | 以**DryRun**模式运行成功之后,回到列表页面`/offline/wf`,点击右图所示`构建`按钮,开始执行数据流构建 |
| | 触发构建流程成功,就可以跳转到 `/offline/wf/build_history/1/6` 数据流构建执行状态查看页面,在该页面中可以查看构建流程中各种状态信息
如全流程可顺利执行完成说明数据流定义没有错误 |
| + +### 视频教程 + + diff --git a/docs/guide/dataflow/register_employee_db.png b/docs/example/dataflow/register_employee_db.png similarity index 100% rename from docs/guide/dataflow/register_employee_db.png rename to docs/example/dataflow/register_employee_db.png diff --git a/docs/guide/dataflow/save-dataflow.png b/docs/example/dataflow/save-dataflow.png similarity index 100% rename from docs/guide/dataflow/save-dataflow.png rename to docs/example/dataflow/save-dataflow.png diff --git a/docs/guide/dataflow/save-er.png b/docs/example/dataflow/save-er.png similarity index 100% rename from docs/guide/dataflow/save-er.png rename to docs/example/dataflow/save-er.png diff --git a/docs/guide/dataflow/set_parser_engine_props.png b/docs/example/dataflow/set_parser_engine_props.png similarity index 100% rename from docs/guide/dataflow/set_parser_engine_props.png rename to docs/example/dataflow/set_parser_engine_props.png diff --git a/docs/guide/dataflow/set_parser_engine_type.png b/docs/example/dataflow/set_parser_engine_type.png similarity index 100% rename from docs/guide/dataflow/set_parser_engine_type.png rename to docs/example/dataflow/set_parser_engine_type.png diff --git a/docs/guide/dataflow/set_parser_instance_name.png b/docs/example/dataflow/set_parser_instance_name.png similarity index 100% rename from docs/guide/dataflow/set_parser_instance_name.png rename to docs/example/dataflow/set_parser_instance_name.png diff --git a/docs/example/dataflow/t-plus1-offline-dag.png b/docs/example/dataflow/t-plus1-offline-dag.png new file mode 100644 index 0000000..c91f3f4 Binary files /dev/null and b/docs/example/dataflow/t-plus1-offline-dag.png differ diff --git a/docs/guide/dataflow/view-dataflow-build-status.png b/docs/example/dataflow/view-dataflow-build-status.png similarity index 100% rename from docs/guide/dataflow/view-dataflow-build-status.png rename to docs/example/dataflow/view-dataflow-build-status.png diff --git a/docs/example/mysql-sync-doris/datasource-impl.png b/docs/example/mysql-sync-doris/datasource-impl.png new file mode 100644 index 0000000..2164b28 Binary files /dev/null and b/docs/example/mysql-sync-doris/datasource-impl.png differ diff --git a/docs/example/mysql-sync-doris/datasource-plugin-install.png b/docs/example/mysql-sync-doris/datasource-plugin-install.png new file mode 100644 index 0000000..2d3ec7e Binary files /dev/null and b/docs/example/mysql-sync-doris/datasource-plugin-install.png differ diff --git a/docs/example/mysql-sync-doris/index.mdx b/docs/example/mysql-sync-doris/index.mdx new file mode 100644 index 0000000..2f2f108 --- /dev/null +++ b/docs/example/mysql-sync-doris/index.mdx @@ -0,0 +1,152 @@ +--- +title: 多源同步Doris方案介绍 +date: 2023-05-25 +--- +import Link from '/src/components/Link'; +import Figure from '/src/components/Figure'; +import BZhan from '/src/components/BZhan'; + +## 本文导读 + +Apache Doris 是新一代MPP架构的高性能、实时的分析型数据库。作为一款可靠的OLAP引擎产品,企业内部会将OLTP数据库与其进行整合,通过从OLTP数据库(例如:MySQL,SqlServer)同步数据到Doris的方式,为线上业务团队提供业务支持,例如:用户行为分析、AB 实验平台、日志检索分析、用户画像分析、订单分析等应用。 + +随着这种模式越来越流行,企业内部需要构建大量端到端的数据通道,有的是批量同步构建T+1报表用,有的需要实时同步构建,有的还需要在同步过程对数据作一些处理,比如:数据脱敏,格式变化等等。这些工作,耗费了数据开发工程师大量时间。TIS为了将数据开发工程师从琐碎、重复、繁重的数据通道构建工作中解放出来,在大数据端到端的数据整合方面进行了有益的探索及尝试。 + +本文就据源中最常用到的,MySQL同步Doris作为案例,向大家讲解TIS原理,功能特性、优势。希望通过此文大家能够对TIS有一个初步的了解。 + + +## TIS 介绍 + +TIS 实现了多数据源端到端的数据同步,使用批量和实时增量的方式。TIS经过多年精心打造,专注用户侧使用体验,在操作界面化、流程化上下了不少功夫。 + +TIS有别于传统大数据ETL工具,它借鉴了DataOps、DataPipeline理念,对大数据ETL各个执行流程建模,将传统的ETL工具执行以黑屏化工具包的方式(json+命令行执行)升级到了白屏化2.0的产品化方式(系统借助底层的MetaData自动生成脚本,用户只需轻点鼠标,借助系统给出的提示快速录入配置),从而大大提高了工作效率。 + +所谓白屏化1.0,是系统虽然也是基于UI界面的,但是交互方式基本上是给一个大大的TextArea,里面填写的Json、XML、Yaml需要用户自己发挥了,这对用户来说执行效率还是太低了,我们暂且称这交互方式的系统为DevOps系统,TIS已经跨越到了白屏化2.0的DataOps系统了。 + +
+ +正因以上这些特性,TIS的使用者不仅只限于大数据技术开发工程师,而且,对数据分析师也非常友好,只需专注业务流程而不需要过多关注底层技术细节,就能轻松玩转大数据。 + + +TIS 在构建过程中参考了Jenkins 的架构设计,在以下几个方面做尝试: + +### 插件化封装 + +TIS 参考了Jenkins 实现方式,通过对业务模型建模,按照 OCP 开发准则,在TIS 概念抽象层定义了一套标准的面向大数据, 数据整合的扩展点 ,并且基于这些扩展点进行扩展,例如,针对目标端 Doris,TIS 现已基于数据源扩展点DataSourceFactory 进行扩展,实现了多个业务扩展,如下图: + +
+ +在构建数据通道过程中,用户可以选择任何一种数据类型作为Doris的数据源。使用方式也很简单,在TIS界面有一个类似安卓、苹果AppStore的的插件池列表,根据列表中的插件功能介绍,选择安装即可生效并使用。 + + +
+ + +### 界面化处理 + +TIS 在内部构件了一套强大的 UI-DSL系统,利用 Java 领域模型在运行时动态渲染生成所有前端 html javascript 脚本,好处是:TIS 使用体验统一 ,另外对于 TIS 的开源贡献者来说友好,开发者一般都是后端开发者不擅长前端开发,而 TIS 只需要用户变现基于 Java 的领域模型即可。 + +### 使用更简单 + +TIS 屏蔽底层组件的配置复杂度,使用大量开源社区的组件,例如:Flink-CDC,Chunjun,DataX,Doris-connecto,Elastic-Connector,kafka-connector 等,这些开源组件每个单独都是可以使用的,基本都是通过代码,Json、XML配置的方式来驱动执行的,因此,需要重复配置大量参数。 + +TIS通过元数据反射,采用大量自动化生成配置的方式,加上,预设参数免去了用户手动配置参数,例如:Doris Stream load 数据导入流程需要选择,传输文本以 CSV 或者 JSON 方式传输,TIS 预设 JSON 作为文本传输格式,因为在用户视角来看,使用何种传输格式他并不关心,而关心的是,是否可以顺畅地将数据源中的数据传输到 Doris 数据库中,TIS 在实施过程中发现,JSON 作为传输文本最可靠,所以就使用 JSON 格式作为传输文本。 + +TIS 充当了厨师的角色,有各种新鲜的食材,利用厨师厨艺焕发出食材的美味,各位食客只需坐等厨师烹饪完毕,享用美味即可。 + + +## 案例演示 + + +以上说了这么特性还是不够形象,接下来我们通过MySQL同步Doris的具体案例向大家进一步说明 + + +### STEP.1 安装TIS + +安装部署相关具体步骤请查看官网安装教程,下面给出项目安装相关演示视频: + + + +### STEP.2 准备数据 + + +还需要准备好源端数据库 MySQL(版本为5.7.1)和目标端 Doris 库 + +* Doris库准备 + + 本案例使用 Doris 最新稳定版本 1.2.4.1 [Doris部署说明](https://doris.apache.org/docs/dev/install/standard-deployment) 作测试。 + +* MySQL库准备 + + MySQL数据库中需要初始化数据表,本案例可使用的是开源[MySQL基准测试库](https://gitee.com/qlangtech/test_db) 按照介绍自行初始化数据库。 + +### STEP.3 构建批量数据管道 + +所谓批量数据管道就是原中的历史记录通过TIS底层依赖的DataX组件一次性全部倒入,批量同步的优点是,架构实现简单,吞吐量大,具体操作步骤可观看以下演示视频 +以上视频中已经实现了针对Doris的添加、更新、删除同步操作,尤其是更新和删除操作Doris有自己独特的实现方式: + + + +* 删除操作 + + 借助Doris Stream Load方式[1] 的merge_type为Merge的操作来对需要删除的记录进行标记删除 + +* 更新操作 + + 借助 Doris 的 Sequnce 机制[2],在 Source 端乱序到达的情况下,保证 Doris库中的记录不会有脏写的情况发生。在设置表信息,需要设置表 Sequence 列属性,在表属性页面提供下拉列表,其中的值为该表中所有整型数字类型和日期类型(date、datetime)类型的字段,供用户选择,当设定某列为 Sequence 列,TIS 自动生成的 DDL 语句,会自动在 PROPERTIES 中添加以下两个属性: + ```sql + CREATE TABLE `test` ( + id BIGINT + ) + UNIQUE KEY(`totalpay_id`) + PROPERTIES("... + , "function_column.sequence_col" = 'last_ver' + , "function_column.sequence_type"='BIGINT' ) + ``` + Doris 内部会在新建表中添加一个隐藏列:__DORIS_SEQUENCE_COL__, 每次更新时,会在stream load的提交请求的http head中添加一个字段: + ```sql + http.setHeader( + "function_column.sequence_col", "last_ver"); + ``` + Doris 服务端会将记录的某一列值与之比较,如果大于等于,则会更新该记录并且将__DORIS_SEQUENCE_COL__的最新值置为,否则直接丢弃掉。 + + 以上的设置都是 TIS 内部自动实现的,毋需用户关心,用户只需保证选定的 sequence 列每次记录更新时,该列的值严格自增,不然就会发生 Doris 记录脏写的问题。 + +### STEP.4 构建增量数据管道 + + 生产环境中如有实时 OLAP 查询的需求,可以在前一批量同步的基础之上继续创建增量同步通道实例,来实现 MySQL 到 Doris 的毫秒级近实时同步效果,详细请查看以下视频。 + + + +### STEP.5 执行结果确认 + + TIS 3.7 版本开始,为了方便用户做数据验证操作,已经将 Apache Zeppelin 整合进了 TIS,以下视频演示了如何通过 TIS Notebook 来验证 MySQL 同步 Doris 的增量执行逻辑是否正确 + + + +## 后期规划 + + +通过使用 TIS 之后,使原有的构建数据通道的效率有了较大的提升。希望本文阅读者能够举一反三,利用 TIS 功能丰富的插件实现更多的数据同步案例。 +接下来,TIS 并不会止步于此,还会在以下几方面作出努力: + +### 拥抱 CloudNative + +随着云原生时代到来,云上 PAAS 产品的低成本、快捷、稳定可靠是几大优势,相信未来会有越来越多的企业拥抱云原生。 +图片 +TIS 在这方面也有自己的计划,接下来会将 TIS 全面支持 Docker K8S 部署,可以很方便地部署在各大云厂商的 K8S 环境中,与云厂商提供的 IAAS 层产品打通。支持用户跨云厂商部署,TIS 作为各云厂商 PAAS 产品之上的一个 Facade(门面)适配云厂商底层的 PAAS 层产品,让用户在跨云使用 ETL 工具有一致的用户体验。 + +### 丰富插件生态 + +目前 TIS 支持 MySQL、SqlServer、Oracle 等业界常用的关系数据库作为 Doris 的源端数据类型,后期还要扩展更多的数据类型,例如:TDEngine,AWS S3 及 S3 Glue 等等,让更多的数据类型的数据源能够汇聚到 Doris,发挥 Doris 的威力。 + +另外,TIS 还要开发一系列的支持生态开发者的工具,让开发者更顺畅地利用 TIS 来扩展 TIS 的 Extned Point。 + +### 扩展功能边界 + +读者已经发现,以上介绍的案例还只是实现了端到端的数据同步,只是实现了 ETL的 Extra(数据抽取)和Load(数据加载)两个环节,TIS 新测试版本中已经引入了一个 Transfer(数据处理)模块,用户在使用过程中,可以很方便地切换数据处理引擎(目前支持 Hive,Spark和 AliyunODPS )[3],利用 TIS 内置的调度引擎完成离线 DAG 任务调度,这样 Doris 的用户可以很方便地在 TIS 中构建一个离线分析计算宽表,待计算完成,再将计算结果导入到 Doris 集群中,可以进一步提升 OLAP 查询分析性能。 + +## 结语 + +TIS 最终的目标是打造一款人人都会用的数据集成产品,让用户可以在 TIS 之上,一站式、开箱即用地打造属于自己的ETL数据仓库。 + diff --git a/docs/example/mysql-sync-doris/tis-doris-architect.png b/docs/example/mysql-sync-doris/tis-doris-architect.png new file mode 100644 index 0000000..9917c79 Binary files /dev/null and b/docs/example/mysql-sync-doris/tis-doris-architect.png differ diff --git a/docs/example/mysql-sync-kafka/add-flink-cluster-4-confirm-streamcode.png b/docs/example/mysql-sync-kafka/add-flink-cluster-4-confirm-streamcode.png new file mode 100644 index 0000000..890caed Binary files /dev/null and b/docs/example/mysql-sync-kafka/add-flink-cluster-4-confirm-streamcode.png differ diff --git a/docs/example/mysql-sync-kafka/add-flink-cluster-success.png b/docs/example/mysql-sync-kafka/add-flink-cluster-success.png new file mode 100644 index 0000000..4a24315 Binary files /dev/null and b/docs/example/mysql-sync-kafka/add-flink-cluster-success.png differ diff --git a/docs/example/mysql-sync-kafka/add-plugins.png b/docs/example/mysql-sync-kafka/add-plugins.png new file mode 100644 index 0000000..ea864ca Binary files /dev/null and b/docs/example/mysql-sync-kafka/add-plugins.png differ diff --git a/docs/example/mysql-sync-kafka/add-step-1.png b/docs/example/mysql-sync-kafka/add-step-1.png new file mode 100644 index 0000000..b8fc1c0 Binary files /dev/null and b/docs/example/mysql-sync-kafka/add-step-1.png differ diff --git a/docs/example/mysql-sync-kafka/close-plugins-panel.png b/docs/example/mysql-sync-kafka/close-plugins-panel.png new file mode 100644 index 0000000..dc94e9b Binary files /dev/null and b/docs/example/mysql-sync-kafka/close-plugins-panel.png differ diff --git a/docs/example/mysql-sync-kafka/confirm-create.png b/docs/example/mysql-sync-kafka/confirm-create.png new file mode 100644 index 0000000..7213fef Binary files /dev/null and b/docs/example/mysql-sync-kafka/confirm-create.png differ diff --git a/docs/example/mysql-sync-kafka/install-sink-kafka-plugin.png b/docs/example/mysql-sync-kafka/install-sink-kafka-plugin.png new file mode 100644 index 0000000..493374c Binary files /dev/null and b/docs/example/mysql-sync-kafka/install-sink-kafka-plugin.png differ diff --git a/docs/example/mysql-sync-kafka/mysql-sync-kafka.png b/docs/example/mysql-sync-kafka/mysql-sync-kafka.png new file mode 100644 index 0000000..abdb20c Binary files /dev/null and b/docs/example/mysql-sync-kafka/mysql-sync-kafka.png differ diff --git a/docs/example/mysql-sync-kafka/plugin-type-chose.png b/docs/example/mysql-sync-kafka/plugin-type-chose.png new file mode 100644 index 0000000..fb74a31 Binary files /dev/null and b/docs/example/mysql-sync-kafka/plugin-type-chose.png differ diff --git a/docs/example/mysql-sync-kafka/set-chosen-tab.png b/docs/example/mysql-sync-kafka/set-chosen-tab.png new file mode 100644 index 0000000..be5041b Binary files /dev/null and b/docs/example/mysql-sync-kafka/set-chosen-tab.png differ diff --git a/docs/example/mysql-sync-kafka/set-kafka-writer.png b/docs/example/mysql-sync-kafka/set-kafka-writer.png new file mode 100644 index 0000000..73d7bfa Binary files /dev/null and b/docs/example/mysql-sync-kafka/set-kafka-writer.png differ diff --git a/docs/example/mysql-sync-kafka/set-source-sink.png b/docs/example/mysql-sync-kafka/set-source-sink.png new file mode 100644 index 0000000..980219d Binary files /dev/null and b/docs/example/mysql-sync-kafka/set-source-sink.png differ diff --git a/docs/example/sink-2-kafka.mdx b/docs/example/sink-2-kafka.mdx new file mode 100644 index 0000000..3ad9d54 --- /dev/null +++ b/docs/example/sink-2-kafka.mdx @@ -0,0 +1,146 @@ +--- +title: 将数据变更同步到Kafka +date: 2023-05-06 +type: book +weight: 2 +toc: true +--- + +import Figure from '/src/components/Figure'; +import BZhan from '/src/components/BZhan'; +import Link from '/src/components/Link'; + +## 场景介绍 + +众所周知,[Apache Kafka](https://kafka.apache.org/)的优势有吞吐量大、响应高效等特点。 +正因如此,生产环境中,运维团队经常使用[Apache Kafka](https://kafka.apache.org/)作为消息转发工具来中转数据库变更消息,下游业务团队统一监听Kafka的Topic以做进一步消息处理。 + +这样做的好处是,数据运维团队可以统一控制数据库增量消息订阅出口,对下游业务统一做数据治理授权,另外,由于数据库的增量消息使用统一集中管理,避免了多个客户端同时监听数据库增量消息,从而降低了数据库负载 。 + +
+ +## 化繁为简 + +使用TIS来实现以上需求,整个流程简化成几个步骤,只需要轻点鼠标,不需要做额外过多设置就能将一个数据库中的全部表的变更消息同步到一个Kafka Topic中。 + +## 实现原理 + +TIS实时数据同步管道是基于Flink来实现的,Source端监听数据库增量事件的算子是基于[Flink-CDC](https://ververica.github.io/flink-cdc-connectors/)的,Kafka Sink是基于 [Chunjun](https://dtstack.github.io/chunjun/) Kafka Connector来实现的, + +TIS的优势在于整合,杜绝重复造轮子,致力于将开源社区优秀的框架工具无缝整合在一起提供给用户畅快地使用。 + +TIS为实现该场景需求,为了最大限度方便用户,特地做了如下定制: + +1. Sink端支持 Stream API 和Flink SQL 两种脚本实现方式 + + SQL: 优点逻辑清晰,便于用户自行修改执行逻辑 + Stream API:优点基于系统更底层执行逻辑执行、轻量、高性能 + + 两种方式各有特点,用户可以根据自己需要进行选择 + +2. 支持多种消息格式 + + 在Flink官网中列出了消息中间件传输的[多种消息格式](https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/formats/overview/),种类比较多 + ,TIS选取了最常使用的两种消息格式:`canal-json` 和 `debezium-json` + +3. 在消息体中增加表名称 + + 原生的Flink [flink-formats](https://github.com/qlangtech/flink/commit/9844b3750f01e16d7ab4917b5f507fcd54773700) 模块实现中 + ,没有在消息体上添加被监听的表名属性,因此,用户往往会在Kafka中为数据库中每张表都单独构建一个与之对应的Topic以区分不同的表,这样做极大地浪费了Kafka的资源,并且难以维护。 + + TIS做了优化,只要同一个库的表,就向同一个Kafka Topic中写入,另外在消息体中添加表名,下游消费端就能通过表名属性区别不同的表了。 + + +## 流程实操 + +本示例构建一个MySQL整库表变更消息同步到Kafka某一Topic的例子 + +### 准备工作 + +需要在本地环境中安装好以下几个组件: + +1. Kafka集群 + + 按照 文档 [https://blog.csdn.net/lijiewen2017/article/details/127609875](https://blog.csdn.net/lijiewen2017/article/details/127609875) 中介绍,使用docker容器来简化 + Kafka安装 + +2. TIS控制台 + + TIS安装说明 + +3. TIS Flink组件 + + TIS Flink安装说明 + +4. 准备一个MySQL 数据库实例 + + 准备好一个MySQL5.7 版本的数据库,内有数据库表若干 + +### 操作步骤 + +#### 基本信息配置 +| 说明 | 图示 | +|---------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------| +| 当完成 安装 步骤之后,进入TIS操作界面,点击菜单栏中`实例`链接 |
| +| 进入实例列表,点击右侧`添加`下拉按钮中的`数据管道`,进行MySQL端到`Kafka`端的数据同步通道构建 |
| +| 添加流程一共分为5步,第1步添加数据通道的基本信息 |
| +| 进入数据端选择步骤,选择Reader Writer类型选择,由于系统刚安装,数据端类型对应的插件(`tis-datax-kafka-plugin`,`tis-ds-mysql-plugin`)还没有选取,需要点击插件安装`添加`按钮,安装插件 |
| +| 插件安装完毕,将`插件管理`页面关闭 |
| +| Reader端选择`MySQL`,Writer端选择`Kafka`,点击`下一步`按钮,进行MySQL Reader的设置 |
| +| 在Reader设置页面,点击`数据库名`项右侧`配置`下拉框中**MySqlV5** 数据源,完成表单填写,点击`保存`按钮,其他输入项目使用默认值即可,然后再点击`下一步`选取Reader端中需要处理的表 |
| +| 选择需要的表:

点击`设置`按钮,对Kafka目标表设置,设置目标表的**目标列**等属性设置.

点击`保存`按钮,然后点击下一步,进入Kafka Writer表单设置 |
| +| Kafka Writer表单

1. 设置Kafka服务端`Bootstrap Servers`地址及`Topic`配置

2.其他配置项按无特殊需求,按系统默认即可

3.点击进入下一步 |
| +| 确认页面,对上几步流程中录入的信息进行确认。

点击`创建`按钮完成数据流通道定义 |
| + +#### 实时同步启用 + +| 说明 | 图示 | +|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------| +| 接下来,开通实时增量通道

首先需要安装**Flink单机版** 安装说明 |
| +| Flink集群启动之后,在TIS中添加Flink集群对应配置

表单填写完成之后,点击`保存&下一步`按钮进入下一步Sink,Source相关属性设置 |
| +| 在该步骤,首先需要添加Sink端插件 `tis-flink-chunjun-kafka-plugin` |
| +| 添加Flink SourceFunction对应的[flink-connector-mysql-cdc](https://github.com/ververica/flink-cdc-connectors/tree/master/flink-connector-mysql-cdc)插件 和 Fink Sink对应的Kafka插件:

1.选择Kafka消息传输格式 2.选择flink 增量执行脚本 `Flink SQL`/`Stream API`
设置完成之后进入下一步 |
| +| TIS会解析Reader选取的表元数据信息,自动生成Flink Stream Code

在该版本中,自动生成的Flink Stream Code还不支持用户自定义编写业务逻辑

点击`部署`按钮,进入向Flink Cluster中部署流处理逻辑 |
| +| 至此,MySQL与Kafka数据增量通道已经添加完成,MySQL到Kafka实时数据同步可以保证在毫秒级完成 |
+ + +## 测试确认 + +最后我们来验证MySQL库变更同步到Kafka的数据通道执行是否正常 + +首先,在Source端的数据库表上更新几条数据, 执行inert或者update SQL语句,例如: + +```sql +update instancedetail set last_ver=last_ver+1 where instance_id = '1'; +``` + +然后,打开Kafka客户端监听Topic接收到的消息是否正确: + +启动Kafka客户端监听`Test` Topic的消息内容: +```shell +/opt/kafka_2.13-2.8.1/bin/kafka-console-consumer.sh --topic=test --bootstrap-server=192.168.28.201:9092 +``` +会在控制台中接收到如下,消息内容: +```json +{"before":null,"after":{"instance_id":"1", + "order_id":"1", + "batch_msg":"","type":0,"ext":"","waitinginstance_id":"","kind":1, + "parent_id":"","pricemode":1,"name":"美味香酥鸡","makename":"","taste":"", + "spec_detail_name":"","num":1,"account_num":1,"unit":"份","account_unit":"", + "price":32,"member_price":99,"fee":32,"ratio":1E+2,"ratio_fee":32,"ratio_cause":"", + "status":2, + "addition_price":0,"has_addition":0,"seat_id":""}, + "op":"c", + "source":{"table":"instancedetail"}, + "ts_ms":1683779944246} +``` +符合预期 + +## 总结 + +本文档就MySQL同步Kafka作为例子进行说明,整个方案实现基本上达到了开箱即用,只需用户做简单设置就能把同步实例构建起来。 + +其实作为Source和Sink端的类型,在TIS中是可以随意切换的,例如,可以将MySQL换成SQLServer、Oracle等其他连接器,Sink端也可以使用RabbitMQ +,RocketMQ等其他MQ实现,实现流程与MySQL同步Kafka的实现方式基本一致。 + +希望通过本例的说明,起到抛砖引玉,举一反三的效果。还等什么呢? 赶紧下载TIS,在自己本地环境试试吧。 diff --git a/docs/guide/dataflow/_category_.json b/docs/guide/dataflow/_category_.json deleted file mode 100644 index 15e9508..0000000 --- a/docs/guide/dataflow/_category_.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "label": "数据流管理", - "position": 3 -} diff --git a/docs/guide/datasource/index.mdx b/docs/guide/datasource/index.mdx index 453f77f..51dbaab 100644 --- a/docs/guide/datasource/index.mdx +++ b/docs/guide/datasource/index.mdx @@ -29,7 +29,7 @@ TIS系统可从关系数据源中抽取数据,经过加工之后导入到下 ## 总结 -在下一步 数据流管理 中需要依赖到的 表配置资源,需要在此定义完成 +在下一步 数据流管理 中需要依赖到的 表配置资源,需要在此定义完成 diff --git a/docs/install/flink-cluster/standalone.mdx b/docs/install/flink-cluster/standalone.mdx index e06266a..f9fdd31 100644 --- a/docs/install/flink-cluster/standalone.mdx +++ b/docs/install/flink-cluster/standalone.mdx @@ -123,4 +123,29 @@ Caused by: java.io.IOException: Unable to create directory /opt/data/tis env.java.opts: "-Ddata.dir=/opt/data/tis" ``` +### 3. 执行增量实例Cancel操作出现超时异常 + +#### 象限描述: +本Case在创建增量实例时,使用了Chunjun Sink实现了整库表的同步功能,在手动取消(Cancel)实例过程中抛出以下异常,且一旦出现超时异常,Flink会将TM中剩下的全部slot耗尽,导致不能在重新部署新的Flink Job + +```shell +2023-05-13 16:39:39,532 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 180 + seconds. +org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds. + at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1735) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1] + at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191] +2023-05-13 16:39:39,536 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down... +org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 180 + seconds. + at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1735) [flink-dist_2.11-tis-1.13.1.jar:tis-1.13.1] + at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191] +``` + +#### 原因分析: +由于Chujun Sink 在执行关闭时,需要为每张表对应的`StreamLoadManager`执行`close`操作,而该`close`执行又是比较耗时的,而且从日志执行过程来看每个表对应的close是线性排队执行的,所以当通道本身选择的表 +数据量比较多的情况下,会出现超时异常也就不足为奇了。 + +#### 解决办法: + +在`$FLINK_HOME/conf/flink-conf.yaml`配置文件中设置 配置参数 `task.cancellation.timeout`,详细配置请查阅:https://study.sf.163.com/documents/read/service_support/dsc-t-d-0501 + + diff --git a/docusaurus.config.js b/docusaurus.config.js index ddcbd9d..866de38 100644 --- a/docusaurus.config.js +++ b/docusaurus.config.js @@ -33,7 +33,7 @@ const config = { /** @type {import('@docusaurus/preset-classic').Options} */ ({ docs: { - includeCurrentVersion: false, + includeCurrentVersion: true, sidebarPath: require.resolve('./sidebars.js'), // Please change this to your repo. // Remove this to remove the "edit this page" links. diff --git a/src/components/Contact/index.js b/src/components/Contact/index.js index 6edcc92..2fa0bdf 100644 --- a/src/components/Contact/index.js +++ b/src/components/Contact/index.js @@ -13,8 +13,8 @@ export default function Contact() {

联系我们

-

钉钉讨论群

-
+

微信讨论群

+

微信公众号

diff --git a/static/img/weixin.jpeg b/static/img/weixin.jpeg new file mode 100644 index 0000000..c44548c Binary files /dev/null and b/static/img/weixin.jpeg differ