diff --git a/blog/3-streampark-usercase-bondex-paimon.md b/blog/3-streampark-usercase-bondex-paimon.md index eb61a5980..d3b645350 100644 --- a/blog/3-streampark-usercase-bondex-paimon.md +++ b/blog/3-streampark-usercase-bondex-paimon.md @@ -1,139 +1,139 @@ --- slug: streampark-usercase-bondex-with-paimon -title: 海程邦达基于 Apache Paimon + StreamPark 的流式数仓实践 -tags: [StreamPark, 生产实践, paimon, streaming-warehouse] +title: Based on Apache Paimon + StreamPark's Streaming Data Warehouse Practice by Bondex +tags: [StreamPark, Production Practice, paimon, streaming-warehouse] --- ![](/blog/bondex/Bondex.png) -**导读:**本文主要介绍作为供应链物流服务商海程邦达在数字化转型过程中采用 Paimon + StreamPark 平台实现流式数仓的落地方案。我们以 Apache StreamPark 流批一体平台提供了一个易于上手的生产操作手册,以帮助用户提交 Flink 任务并迅速掌握 Paimon 的使用方法。 +**Foreword: **This article mainly introduces the implementation of a streaming data warehouse by Bondex, a supply chain logistics service provider, in the process of digital transformation using the Paimon + StreamPark platform. We provide an easy-to-follow operational manual with the Apache StreamPark integrated stream-batch platform to help users submit Flink tasks and quickly master the use of Paimon. -- 公司业务情况介绍 -- 大数据技术痛点以及选型 -- 生产实践 -- 问题排查分析 -- 未来规划 +- Introduction to Company Business +- Pain Points and Selection in Big Data Technology +- Production Practice +- Troubleshooting Analysis +- Future Planning -## 01 公司业务情况介绍 +## 01 Introduction to Company Business -海程邦达集团一直专注于供应链物流领域,通过打造优秀的国际化物流平台,为客户提供端到端一站式智慧型供应链物流服务。集团现有员工 2000 余人,年营业额逾 120 亿人民币,网络遍及全球 200 余个港口,在海内外有超 80 家分、子公司,助力中国企业与世界互联互通。 +Bondex Group has always focused on the field of supply chain logistics. By creating an excellent international logistics platform, it provides customers with end-to-end one-stop intelligent supply chain logistics services. The group currently has over 2,000 employees, an annual turnover of more than 12 billion RMB, a network covering over 200 ports globally, and more than 80 branches and subsidiaries at home and abroad, aiding Chinese enterprises to connect with the world. -### **业务背景** +### **Business Background** -随着公司规模的不断扩大和业务复杂性的增加,为了更好地实现资源优化和流程改进,公司运营与流程管理部需要实时监控公司的业务运转情况,以确保业务流程的稳定性和高效性。 +As the company continues to expand and the complexity of its business increases, in order to better achieve resource optimization and process improvement, the Operations and Process Management Department needs to monitor the company's business operations in real time to ensure the stability and efficiency of business processes. -公司运营与流程管理部负责监督公司各类业务流程的执行,包括海运、空运、铁运各个大区和事业部的订单量,大客户的订单量,航线订单量,关务、仓储、陆运各个操作站点的委托量,公司当天各个大区和事业部实际收入和支出情况等。通过对这些流程的监控和分析,公司能够识别出潜在的问题和瓶颈,提出改进措施和建议,以优化公司运营效率。 +The Operations and Process Management Department is responsible for overseeing the execution of various business processes within the company, including the volume of orders for sea, air, and rail transport across different regions and business divisions, large customer order volumes, route order volumes, the amount of business entrusted to each operation site for customs, warehousing, and land transportation, as well as the actual revenue and expenses of each region and business division on the day. Through monitoring and analysis of these processes, the company can identify potential issues and bottlenecks, propose measures for improvement and suggestions, in order to optimize operational efficiency. 
-**实时数仓架构:** +**Real-Time Data Warehouse Architecture:** ![](/blog/bondex/realtime_warehouse.png) -当前系统要求直接从生产系统收集实时数据,但存在多个数据源需要进行关联查询,而帆软报表在处理多个数据源时展示不够友好,且无法再次聚合多个数据源。定时查询生产系统会给生产系统数据库带来压力,影响生产系统的稳定运行。因此,我们需要引入一个可以通过 [Flink CDC](https://github.com/ververica/flink-cdc-connectors) 技术实现流式处理的数仓,以解决实时数据处理的问题。这个数仓需要能够从多个数据源收集实时数据并在此基础上实现复杂的关联 SQL 查询、机器学习等操作,并且可以避免不定时查询生产系统,从而减轻生产系统的压力,保障生产系统的稳定运行。 +The current system requires direct collection of real-time data from the production system, but there are multiple data sources that need to be associated for queries. The Fanruan report is not user-friendly when dealing with multiple data sources and cannot re-aggregate multiple data sources. Scheduled queries to the production system database can put pressure on it, affecting the stable operation of the production system. Therefore, we need to introduce a warehouse that can handle real-time data through [Flink CDC](https://github.com/ververica/flink-cdc-connectors) technology for stream processing. This data warehouse needs to be able to collect real-time data from multiple data sources and on this basis, perform complex associated SQL queries, machine learning, etc., and avoid unscheduled queries to the production system, thereby reducing the load on the production system and ensuring its stable operation. -## 02 大数据技术痛点以及选型 +## 02 Big Data Technology Pain Points and Selection -海程邦达大数据团队建立以来一直以高效运维工具或平台来实现对人员的高效配置,优化重复劳动,手工作业。 +Since its establishment, the Bondex big data team has always focused on using efficient operational tools or platforms to achieve effective staffing arrangements, optimize repetitive labor, and reduce manual operations. -在离线批数处理已能够支持集团基础驾驶舱和管理报表的情况下,集团运管部门提出了业务要实时统计订单数量,操作单量的需求,财务部门有现金流实时展示的需求,在这样的背景下,基于大数据的流批一体方案势在必行。 +While offline batch data processing has been able to support the group's basic cockpit and management reporting, the Transportation Management Department of the group has proposed the need for real-time statistics on order quantities and operational volumes. The finance department has expressed the need for a real-time display of cash flow. In this context, a big data-based integrated stream-batch solution became imperative. -虽然大数据部门已经使用了 [Apache Doris](https://github.com/apache/doris) 来实现湖仓一体的存储和计算,此前已在 Doris 社区发表湖仓一体建设的文章,但是有些问题有待解决,流式数据存储无法复用、中间层数据不可查、做不到实时聚合计算问题。 +Although the big data department has already utilized [Apache Doris](https://github.com/apache/doris) for integrated storage and computing of lakehouse architecture and has published articles on lakehouse construction in the Doris community, there are some issues that need to be addressed, such as the inability to reuse streaming data storage, the inaccessibility of intermediate layer data, and the inability to perform real-time aggregation calculations. -按照架构演进时间排序,近几年通用的架构解决方案如下: +Sorted by the evolution of architecture over recent years, the common architectural solutions are as follows: -### **hadoop架构** +### **Hadoop Architecture** -传统数仓和互联网数仓的分界点,在互联网早期的时候,大家对于数据分析的要求也不高,主要是做实时性不高的报表、支撑决策,对应的离线数据分析方案就产生了。 +The demarcation point between traditional data warehouses and internet data warehouses dates back to the early days of the internet when the requirements for data analysis were not high, mainly focusing on reports with low real-time needs to support decision-making. This gave rise to offline data analysis solutions. 
-**优点:**数据类型支持丰富,支持海量运算,机器配置要求低,时效性低,容错 +**Advantages: **Supports a rich variety of data types, capable of massive computations, low requirements for machine configurations, low timeliness, fault-tolerant. -**缺点:**不支持实时;运维复杂;查询优化器不如 MPP,响应慢 +**Disadvantages: **Does not support real-time; complex to maintain; the query optimizer is not as good as MPP, slow response. -选型依据:不支持实时;运维复杂,不符合人员精简配置原则;性能差 +Selection Basis: Does not support real-time; maintenance is complex, which does not conform to the principle of streamlined staffing; poor performance. -### **lambda架构** +### **Lambda Architecture** -Lambda 架构是由 Storm 的作者 Nathan Marz 提出的一个实时大数据处理框架。Marz 在 Twitter 工作期间开发了著名的实时大数据处理框架 [Apache Storm](https://github.com/apache/storm) ,Lambda 架构是其根据多年进行分布式大数据系统的经验总结提炼而成。 +Lambda Architecture is a real-time big data processing framework proposed by Nathan Marz, the author of Storm. Marz developed the famous real-time big data processing framework [Apache Storm](https://github.com/apache/storm) while working at Twitter, and the Lambda Architecture is the culmination of his years of experience in distributed big data systems. ![](/blog/bondex/lambda.png) -数据流处理分为 ServingLayer、SpeedLayer、BatchLayer 三层: +Data stream processing is divided into three layers: Serving Layer, Speed Layer, and Batch Layer: -- Batch层: 对离线数据进行处理,最后提供 view 服务给到业务; -- Speed层: 对实时增量数据进行处理,最后提供 view 服务给到业务; -- Serving层: 响应用户的请求,实现离线和增量数据的聚合计算,并最终提供服务; +- Batch Layer: Processes offline data and eventually provides a view service to the business; +- Speed Layer: Processes real-time incremental data and eventually provides a view service to the business; +- Serving Layer: Responds to user requests, performs aggregation calculations on offline and incremental data, and ultimately provides the service; -优点是:离线和实时分开计算,使用两套框架,架构稳定 +The advantage is that offline and real-time computations are separated, using two sets of frameworks, which makes the architecture stable. -缺点是:离线和实时数据很难保持一致性,运维人员需要维护两套框架三层架构,开发人员需要写三套代码 +The disadvantage is that it is difficult to maintain consistency between offline and real-time data, and operational staff need to maintain two sets of frameworks and three layers of architecture. Developers need to write three sets of code. -选型依据:数据一致性不可控;运维、开发工作量大,不符合人员精简配置的原则; +Selection Basis: Data consistency is uncontrollable; operations and development require significant workload, which does not conform to the principle of streamlined staffing. -### **kappa架构** +### **Kappa Architecture** -kappa 架构只用一套数据流处理架构来解决离线和实时数据,用实时流来解决所有问题,旨在提供快速可靠的查询访问结果。它非常适合各种数据处理工作负载,包括连续数据管道、实时数据处理、机器学习模型和实时数据分析、物联网系统以及许多其他具有单一技术堆栈的用例。 +The Kappa Architecture uses a single stream-processing framework to address both offline and real-time data, solving all problems with a real-time stream, with the goal of providing fast and reliable query access to results. It is highly suitable for various data processing workloads, including continuous data pipelines, real-time data processing, machine learning models and real-time analytics, IoT systems, and many other use cases with a single technology stack. -它通常使用流处理引擎实现,例如Apache Flink、Apache Storm、Apache Kinesis、 Apache Kafka,旨在处理大量数据流并提供快速可靠的查询访问结果。 +It is typically implemented using a streaming processing engine such as Apache Flink, Apache Storm, Apache Kinesis, Apache Kafka, designed to handle large data streams and provide fast and reliable query results. ![](/blog/bondex/kappa.png) -**优点是:**单数据流处理框架 +**Advantages: **Single data stream processing framework. 
-**缺点是:**虽然它的架构相对 lamabda 架构简单,但是流式处理框架的设置和维护相对复杂,不具备真正意义上的离线数据处理能力;流平台中存储大数据成本高昂 +**Disadvantages: **Although its architecture is simpler compared to Lambda Architecture, the setup and maintenance of the streaming processing framework are relatively complex, and it lacks true offline data processing capabilities; storing large amounts of data in streaming platforms can be costly. -选型依据:离线数据处理能力需要保留,控制成本 +Selection Basis: The capability for offline data processing needs to be retained, and costs controlled. ### **Iceberg** -为此我们也调研了 [Apache Iceberg](https://github.com/apache/Iceberg) ,它的快照功能一定程度上能够实现流批一体,但是它的问题是基于 kafka 做的实时表中间层不可查或者无法复用已经存在的表,对 kafka 有强依赖,需要利用 kafka 将中间结果写到 iceberg 表,增加了系统的复杂度和可维护性。 +Therefore, we also researched [Apache Iceberg](https://github.com/apache/iceberg), whose snapshot feature can to some extent achieve streaming-batch integration. However, the issue with it is that the real-time table layer based on Kafka is either not queryable or cannot reuse existing tables, with a strong dependency on Kafka. It requires the use of Kafka to write intermediate results to Iceberg tables, increasing system complexity and maintainability. -选型依据:无 kafka 实时架构已落地,中间数据无法实现可查可复用 +Selection Basis: A Kafka-free real-time architecture has been implemented, intermediate data cannot be made queryable or reusable. -### **流式数仓(kappa架构的延续)** +### **Streaming Data Warehouse (Continuation of the Kappa Architecture)** -海程邦达大数据团队自 FTS0.3.0 版本开始参与流式数仓建设,旨在进一步降低数据处理框架的复杂度和人员的精简配置,前期的宗旨是既然是趋势就要参与进来,过程中不断学习精进,向最前沿的技术靠拢,团队一致认为有坑就踩坑,摸着石头也要过河,好在经过几个版本的迭代,在社区的高效配合下,最开始出现的问题也慢慢得以解决 +Since the FTS0.3.0 version, the BonDex big data team has participated in the construction of a streaming data warehouse, aiming to further reduce the complexity of the data processing framework and streamline personnel configuration. The initial principle was to get involved with the trend, to continuously learn and improve, and to move closer to cutting-edge technology. The team unanimously believes that it is essential to embrace challenges and "cross the river by feeling the stones." Fortunately, after several iterations, the problems that initially arose have been gradually resolved with the efficient cooperation of the community. -**流式数仓架构如下:** +**The architecture of the streaming data warehouse is as follows:** ![](/blog/bondex/streaming_warehouse.png) -延续了 kappa 架构的特点,一套流处理架构,好处在与,底层 Paimon 的技术支撑使得数据在全链路可查,数仓分层架构得以复用,同时兼顾了离线和实时的处理能力,减少存储和计算的浪费 +Continuing the characteristics of the Kappa architecture with a single stream processing framework, the advantage lies in the fact that the underlying Paimon technology support makes the data traceable throughout the entire chain. The data warehouse layer architecture can be reused, while also considering the processing capabilities of both offline and real-time data, reducing the waste of storage and computing resources. -## 03 生 产 实 践 +## 03 Production Practices -本方案采用 Flink Application On K8s 集群,Flink CDC 实时摄取业务系统关系型数据库数据,通过 StreamPark 任务平台提交 Flink + Paimon Streaming Data Warehouse 任务, 最后采用 Trino 引擎接入 Finereport 提供服务和开发人员的查询。Paimon 底层存储支持 S3 协议,因为公司大数据服务依赖于阿里云所以使用对象存储OSS作为数据文件系统。 +This solution adopts Flink Application on K8s clusters, with Flink CDC for real-time ingestion of relational database data from business systems. Tasks for Flink + Paimon Streaming Data Warehouse are submitted through the StreamPark task platform, with the Trino engine ultimately used to access Finereport for service provision and developer queries. 
Paimon's underlying storage supports the S3 protocol, and as the company's big data services rely on Alibaba Cloud, Object Storage Service (OSS) is used as the data filesystem. -[StreamPark](https://github.com/apache/incubator-streampark) 是一个实时计算平台,与 [Paimon](https://github.com/apache/incubator-paimon) 结合使用其强大功能来处理实时数据流。此平台提供以下主要功能: +[StreamPark](https://github.com/apache/incubator-streampark) is a real-time computing platform that leverages the powerful capabilities of [Paimon](https://github.com/apache/incubator-paimon) to process real-time data streams. This platform offers the following key features: -**实时数据处理:**StreamPark 支持提交实时数据流任务,能够实时获取、转换、过滤和分析数据。这对于需要快速响应实时数据的应用非常重要,例如实时监控、实时推荐和实时风控等领域。 +**Real-time Data Processing: **StreamPark supports the submission of real-time data stream tasks, capable of real-time acquisition, transformation, filtering, and analysis of data. This is extremely important for applications that require rapid response to real-time data, such as real-time monitoring, real-time recommendations, and real-time risk control. -**可扩展性:**可以高效处理大规模实时数据,具备良好的可扩展性。可以在分布式计算环境中运行,并能够自动处理并行化、故障恢复和负载均衡等问题,以确保高效且可靠地处理数据。 +**Scalability: **Capable of efficiently processing large-scale real-time data with good scalability. It can operate in a distributed computing environment, automatically handling parallelization, fault recovery, and load balancing to ensure efficient and reliable data processing. -**Flink 集成:**基于 [Apache Flink](https://github.com/apache/flink) 构建,利用 Flink 的强大流处理引擎来实现高性能和鲁棒性。用户可以充分利用 Flink 的特性和生态系统,如广泛的连接器、状态管理和事件时间处理等。 +**Flink Integration: **Built on [Apache Flink](https://github.com/apache/flink), it leverages Flink’s powerful stream processing engine for high performance and robustness. Users can fully utilize the features and ecosystem of Flink, such as its extensive connectors, state management, and event-time processing. -**易用性:**提供了直观的图形界面和简化的 API,可以轻松地构建和部署数据处理任务,而无需深入了解底层技术细节。 +**Ease of Use: **Provides a straightforward graphical interface and simplified API, enabling easy construction and deployment of data processing tasks without needing to delve into underlying technical details. -通过在 StreamPark 平台上提交 Paimon 任务,我们可以建立一个全链路实时流动、可查询和分层可复用的 Pipline。 +By submitting Paimon tasks on the StreamPark platform, we can establish a full-link real-time flowing, queryable, and layered reusable Pipline. ![](/blog/bondex/pipline.png) -主要采用组件版本如下: +The main components versions used are as follows: - flink-1.16.0-scala-2.12 - paimon-flink-1.16-0.4-20230424.001927-40.jar - apache-streampark_2.12-2.0.0 - kubernetes v1.18.3 -### **环境构建** +### **Environment Setup** -下载 flink-1.16.0-scala-2.12.tar.gz 可以在 flink官网 下载对应版本的安装包到StreamPark 部署服务器 +Download flink-1.16.0-scala-2.12.tar.gz which can be obtained from the official Flink website. Download the corresponding version of the package to the StreamPark deployment server. 
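+
+As a concrete example, the official binary can be fetched straight onto that server from the Apache archive (standard release file name shown below; the article's `flink-1.16.0-scala-2.12.tar.gz` appears to be a locally renamed copy, so adjust the file name in the commands that follow if needed):
+
+```shell
+# Download the Flink 1.16.0 (Scala 2.12) binary release from the Apache archive
+wget https://archive.apache.org/dist/flink/flink-1.16.0/flink-1.16.0-bin-scala_2.12.tgz
+```
+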
```shell -#解压 +# Unzip tar zxvf flink-1.16.0-scala-2.12.tar.gz -#修改 flink-conf 配置文件并启动集群 +# Modify the flink-conf configuration file and start the cluster jobmanager.rpc.address: localhost jobmanager.rpc.port: 6123 @@ -152,11 +152,11 @@ state.checkpoints.dir: file:///opt/flink/checkpoints state.savepoints.dir: file:///opt/flink/savepoints execution.checkpointing.interval: 2min -#当作业手动取消/暂停时,将会保留作业的 Checkpoint 状态信息 +# When the job is manually canceled/paused, the Checkpoint state information of the job will be retained execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION state.backend: rocksdb -#已完成的CP保存个数 +# Number of completed Checkpoints to retain state.checkpoints.num-retained: 2000 state.backend.incremental: true execution.checkpointing.checkpoints-after-tasks-finish.enabled: true @@ -171,7 +171,7 @@ rest.port: 8081 rest.address: localhost ``` -建议可以在本地添加 FLINK_HOME 方便在上 k8s 之前本地排查问题使用 +It is suggested to add FLINK_HOME locally for convenient troubleshooting before deploying on k8s. vim /etc/profile @@ -182,24 +182,24 @@ export PATH=$PATH:$FLINK_HOME/bin source /etc/profile ``` -在 StreamPark 添加 Flink conf: +In StreamPark, add Flink conf: ![](/blog/bondex/flink_conf.jpg) -构建 Flink 1.16.0 基础镜像从 dockerhub拉取对应版本的镜像 +To build the Flink 1.16.0 base image, pull the corresponding version of the image from Docker Hub. ```dockerfile -#拉取镜像 +# Pull the image docker pull flink:1.16.0-scala_2.12-java8 -#打上 tag +# Tag the image docker tagflink:1.16.0-scala_2.12-java8 registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8 -#push 到公司仓库 +# Push to the company repository docker pushregistry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink:1.16.0-scala_2.12-java8 ``` -创建 Dockerfile 文件 & target 目录将 flink-oss-fs-hadoop JAR包放置在该目录 Shaded Hadoop OSS file system jar 包下载地址: +Create a Dockerfile & target directory to place the flink-oss-fs-hadoop JAR package in that directory. Download the shaded Hadoop OSS file system jar package from: https://repository.apache.org/snapshots/org/apache/paimon/paimon-oss/ @@ -224,7 +224,7 @@ RUN mkdir /opt/flink/plugins/oss-fs-hadoop COPY target/flink-oss-fs-hadoop-1.16.0.jar /opt/flink/plugins/oss-fs-hadoop ``` -### **build 基础镜像** +### **Build base image** ```shell docker build -t flink-table-store:v1.16.0 . @@ -234,71 +234,71 @@ docker tag flink-table-store:v1.16.0 registry-vpc.cn-zhangjiakou.aliyuncs.com/xx docker push registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0 ``` -接下来准备 Paimon jar 包,可以在 Apache [Repository](https://repository.apache.org/content/groups/snapshots/org/apache/paimon) 下载对应版本,需要注意的是要和 flink 大版本保持一致 +Next, prepare the Paimon jar package. You can download the corresponding version from the Apache [Repository](https://repository.apache.org/content/groups/snapshots/org/apache/paimon). It's important to note that it should be consistent with the major version of Flink. 
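+
+One detail worth spelling out: since the Paimon warehouse (and the shaded OSS filesystem jar baked into the image above) uses the `oss://` scheme, Flink also needs OSS endpoint and credential settings. A minimal sketch of the extra `flink-conf.yaml` entries, using the standard Flink OSS FileSystem options — the endpoint and keys below are placeholders, not values from this setup:
+
+```shell
+# OSS access for the oss:// scheme (flink-oss-fs-hadoop / paimon-oss)
+fs.oss.endpoint: oss-cn-zhangjiakou-internal.aliyuncs.com
+fs.oss.accessKeyId: <your-access-key-id>
+fs.oss.accessKeySecret: <your-access-key-secret>
+```
+
+In a Kubernetes deployment these values can just as well be injected through the pod template or a mounted Secret instead of being written into the image.
+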
-### **使用 StreamPark 管理作业** +### **Managing Jobs with StreamPark** -**前提条件:** +**Prerequisites:** -- Kubernetes 客户端连接配置 -- Kubernetes RBAC 配置 -- 容器镜像仓库配置 (案例中使用的是阿里云镜像免费版) -- 创建挂载 checkpoint/savepoint 的 pvc 资源 +- Kubernetes client connection configuration +- Kubernetes RBAC configuration +- Container image repository configuration (the free version of Alibaba Cloud image is used in this case) +- Create a PVC resource to mount checkpoints/savepoints -**Kubernetes 客户端连接配置:** +**Kubernetes Client Connection Configuration:** -将 k8s master节点~/.kube/config 配置直接拷贝到 StreamPark 服务器的目录,之后在 StreamPark 服务器执行以下命令显示 k8s 集群 running 代表权限和网络验证成功。 +Copy the k8s master node's `~/.kube/config` configuration directly to the directory on the StreamPark server, then execute the following command on the StreamPark server to display the k8s cluster as running, which indicates successful permission and network verification. ```shell kubectl cluster-info ``` -Kubernetes RBAC 配置,创建 streamx 命名空间: +Kubernetes RBAC Configuration, create the streampark namespace: ```shell -kubectl create ns streamx +kubectl create ns streampark ``` -使用 default 账户创建 clusterrolebinding 资源: +Use the default account to create the clusterrolebinding resource: ```shell -kubectl create secret docker-registry streamparksecret ---docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com ---docker-username=xxxxxx +kubectl create secret docker-registry streamparksecret +--docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com +--docker-username=xxxxxx --docker-password=xxxxxx -n streamx``` ``` -**容器镜像仓库配置:** +**Container Image Registry Configuration:** -案例中使用阿里云容器镜像服务ACR,也可以使用自建镜像服务harbor代替。 +In this case, Alibaba Cloud's Container Registry Service (ACR) is used, but you can also substitute it with a self-hosted image service such as Harbor. -创建命名空间 StreamPark (安全设置需要设置为私有) +Create a namespace named StreamPark (set the security setting to private). ![](/blog/bondex/aliyun.png) -在 StreamPark 配置镜像仓库,任务构建镜像会推送到镜像仓库 +Configure the image repository in StreamPark; task build images will be pushed to the repository. ![](/blog/bondex/dockersystem_setting.png) -创建 k8s secret 密钥用来拉取 ACR 中的镜像 streamparksecret 为密钥名称 自定义 +Create a k8s secret key to pull images from ACR; streamparksecret is the name of the secret, customizable. ```shell -kubectl create secret docker-registry streamparksecret ---docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com ---docker-username=xxxxxx +kubectl create secret docker-registry streamparksecret +--docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com +--docker-username=xxxxxx --docker-password=xxxxxx -n streamx ``` -创建挂载 checkpoint/savepoint 的 pvc 资源,基于阿里云的对象存储OSS做K8S的持久化 +Creation of PVC Resources for Checkpoints/Savepoints, Utilizing Alibaba Cloud's OSS for K8S Persistence -**OSS CSI 插件:** +**OSS CSI Plugin:** -可以使用 OSS CSI 插件来帮助简化存储管理。您可以使用 csi 配置创建 pv,并且 pvc、pod 像往常一样定义,yaml 文件参考: +The OSS CSI plugin can be used to help simplify storage management. You can use the CSI configuration to create a PV, and define PVCs and pods as usual. 
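+
+As a rough illustration of what such a PV/PVC pair can look like for the checkpoint volume (this sketch loosely follows the upstream alibaba-cloud-csi-driver documentation — check the oss.md linked below for the exact, current field names; credentials in particular may belong in a Secret referenced by the PV rather than inline, and every value here is a placeholder):
+
+```shell
+# Sketch: statically provisioned OSS-backed PV and PVC for Flink checkpoints
+cat <<'EOF' | kubectl apply -f -
+apiVersion: v1
+kind: PersistentVolume
+metadata:
+  name: flink-checkpoints-pv
+spec:
+  capacity:
+    storage: 50Gi
+  accessModes: ["ReadWriteMany"]
+  persistentVolumeReclaimPolicy: Retain
+  csi:
+    driver: ossplugin.csi.alibabacloud.com
+    volumeHandle: flink-checkpoints-pv   # conventionally the same as the PV name
+    volumeAttributes:
+      bucket: "<your-oss-bucket>"
+      url: "oss-cn-zhangjiakou-internal.aliyuncs.com"
+      akId: "<access-key-id>"
+      akSecret: "<access-key-secret>"
+---
+apiVersion: v1
+kind: PersistentVolumeClaim
+metadata:
+  name: flink-checkpoints-pvc
+  namespace: streamx                     # the namespace the Flink jobs run in
+spec:
+  accessModes: ["ReadWriteMany"]
+  storageClassName: ""
+  volumeName: flink-checkpoints-pv
+  resources:
+    requests:
+      storage: 50Gi
+EOF
+```
+
+A matching pair would be created for savepoints; the YAML actually used by the article is in the archive referenced below.
+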
For the YAML file reference, visit: https://bondextest.oss-cn-zhangjiakou.aliyuncs.com/ossyaml.zip -**配置要求:** +**Configuration Requirements:** -\- 创建具有所需 RBAC 权限的服务帐户,参考: +\- Create a service account with the necessary RBAC permissions, please refer as below: https://github.com/kubernetes-sigs/alibaba-cloud-csi-driver/blob/master/docs/oss.md @@ -306,31 +306,31 @@ https://github.com/kubernetes-sigs/alibaba-cloud-csi-driver/blob/master/docs/oss kubectl -f rbac.yaml ``` -\- 部署OSS CSI 插件 +\- Deploy the OSS CSI Plugin: ```shell kubectl -f oss-plugin.yaml ``` -\- 创建 CP&SP 的 PV +\- Create PV for CP (Checkpoints) & SP (Savepoints): ```shell kubectl -f checkpoints_pv.yaml kubectl -f savepoints_pv.yaml ``` -\- 创建 CP&SP 的 PVC +\- Create PVC for CP & SP: ```shell kubectl -f checkpoints_pvc.yaml kubectl -f savepoints_pvc.yaml ``` -配置好依赖环境,接下来我们就开始使用 Paimon 进行流式数仓的分层开发。 +Once the dependent environment is configured, we can start using Paimon for layered development of the streaming data warehouse. -### **案例** +### **Case Study** -统计海运空运实时委托单量 +Real-time calculation of sea and air freight consignment volumes. -任务提交:初始化 Paimon catalog 配置 +Job Submission: Initialize Paimon catalog configuration. ![](/blog/bondex/paimon_catalog_setting.png) @@ -338,15 +338,15 @@ kubectl -f checkpoints_pvc.yaml kubectl -f savepoints_pvc.yaml SET 'execution.runtime-mode' = 'streaming'; set 'table.exec.sink.upsert-materialize' = 'none'; SET 'sql-client.execution.result-mode' = 'tableau'; --- 创建并使用 FTS Catalog 底层存储方案采用阿里云oss +-- Create and use FTS Catalog with underlying storage solution using Alibaba Cloud OSS CREATE CATALOG `table_store` WITH ( 'type' = 'paimon', -'warehouse' = 'oss://xxxxx/xxxxx' #自定义oss存储路径 +'warehouse' = 'oss://xxxxx/xxxxx' # customize oss storage path ); USE CATALOG `table_store`; ``` -一个任务同时抽取 postgres、mysql、sqlserver 三种数据库的表数据写入到 Paimon +A single job extracts table data from postgres, mysql, and sqlserver databases and writes it into Paimon. ![](/blog/bondex/application_setting.png) @@ -354,7 +354,7 @@ USE CATALOG `table_store`; ![](/blog/bondex/pod_template.png) -**信息如下:** +**Details are as follows:** ``` Development Mode:Flink SQL @@ -365,20 +365,20 @@ Flink Version :flink-1.16.0-scala-2.12 Kubernetes Namespace :streamx -Kubernetes ClusterId :(任务名自定义即可) +Kubernetes ClusterId :(Task name can be customized) -#上传到阿里云镜像仓库的基础镜像 -Flink Base Docker Image :registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0 +# Base image uploaded to Alibaba Cloud Image Repository +Flink Base Docker Image :registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1.16.0 Rest-Service Exposed Type:NodePort -#paimon基础依赖包: +# Paimon base dependency package: paimon-flink-1.16-0.4-20230424.001927-40.jar flink-shaded-hadoop-2-uber-2.8.3-10.0.jar -#flinkcdc依赖包下载地址: +# FlinkCDC dependency package download address: https://github.com/ververica/flink-cdc-connectors/releases/tag/release-2.2.0 ``` @@ -414,25 +414,25 @@ imagePullSecrets: **Flink sql:** -1. 构建源表和paimon中ods表的关系,这里就是源表和目标表一对一映射 +1. Establish the relationship between the source table and the ODS table in Paimon, here it is a one-to-one mapping between the source table and the target table. 
```sql --- postgre数据库 示例 +-- PostgreSQL database example CREATE TEMPORARY TABLE `shy_doc_hdworkdochd` ( -`doccode` varchar(50) not null COMMENT '主键', -`businessmodel` varchar(450) COMMENT '业务模式', -`businesstype` varchar(450) COMMENT '业务性质', -`transporttype` varchar(50) COMMENT '运输类型', +`doccode` varchar(50) not null COMMENT 'Primary Key', +`businessmodel` varchar(450) COMMENT 'Business Model', +`businesstype` varchar(450) COMMENT 'Business Type', +`transporttype` varchar(50) COMMENT 'Transportation Type', ...... -`bookingguid` varchar(50) COMMENT '操作编号', +`bookingguid` varchar(50) COMMENT 'Operation Number', PRIMARY KEY (`doccode`) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', -'hostname' = '数据库服务器IP地址', -'port' = '端口号', -'username' = '用户名', -'password' = '密码', -'database-name' = '数据库名', +'hostname' = 'Database server IP address', +'port' = 'Port number', +'username' = 'Username', +'password' = 'Password', +'database-name' = 'Database name', 'schema-name' = 'dev', 'decoding.plugin.name' = 'wal2json',, 'table-name' = 'doc_hdworkdochd', @@ -440,43 +440,43 @@ PRIMARY KEY (`doccode`) NOT ENFORCED ); CREATE TEMPORARY TABLE `shy_base_enterprise` ( -`entguid` varchar(50) not null COMMENT '主键', -`entorgcode` varchar(450) COMMENT '客户编号', -`entnature` varchar(450) COMMENT '客户类型', -`entfullname` varchar(50) COMMENT '客户名称', +`entguid` varchar(50) not null COMMENT 'Primary Key', +`entorgcode` varchar(450) COMMENT 'Customer Code', +`entnature` varchar(450) COMMENT 'Customer Type', +`entfullname` varchar(50) COMMENT 'Customer Full Name', PRIMARY KEY (`entguid`,`entorgcode`) NOT ENFORCED ) WITH ( 'connector' = 'postgres-cdc', -'hostname' = '数据库服务器IP地址', -'port' = '端口号', -'username' = '用户名', -'password' = '密码', -'database-name' = '数据库名', +'hostname' = 'Database server IP address', +'port' = 'Port number', +'username' = 'Username', +'password' = 'Password', +'database-name' = 'Database name', 'schema-name' = 'dev', 'decoding.plugin.name' = 'wal2json', 'table-name' = 'base_enterprise', -'debezium.snapshot.mode'='never', -- 增量同步(全量+增量忽略该属性) +'debezium.snapshot.mode'='never', -- Incremental synchronization (ignore this property for full + incremental) 'debezium.slot.name' = 'base_enterprise_slotname03' ); --- 根据源表结构在paimon上ods层创建对应的目标表 +-- Create the corresponding target table in the ods layer on Paimon according to the source table structure CREATE TABLE IF NOT EXISTS ods.`ods_shy_jh_doc_hdworkdochd` ( -`o_year` BIGINT NOT NULL COMMENT '分区字段', -`create_date` timestamp NOT NULL COMMENT '创建时间', +`o_year` BIGINT NOT NULL COMMENT 'Partition Field', +`create_date` timestamp NOT NULL COMMENT 'Creation Time', PRIMARY KEY (`o_year`, `doccode`) NOT ENFORCED ) PARTITIONED BY (`o_year`) WITH ( 'changelog-producer.compaction-interval' = '2m' ) LIKE `shy_doc_hdworkdochd` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS); CREATE TABLE IF NOT EXISTS ods.`ods_shy_base_enterprise` ( -`create_date` timestamp NOT NULL COMMENT '创建时间', +`create_date` timestamp NOT NULL COMMENT 'Creation Time', PRIMARY KEY (`entguid`,`entorgcode`) NOT ENFORCED ) WITH ( 'changelog-producer.compaction-interval' = '2m' ) LIKE `shy_base_enterprise` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS); --- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中 +-- Set the job name, execute the job task to write the source table data into the corresponding Paimon table in real time SET 'pipeline.name' = 'ods_doc_hdworkdochd'; INSERT INTO ods.`ods_shy_jh_doc_hdworkdochd` @@ -495,35 +495,35 @@ SELECT FROM `shy_base_enterprise` where entorgcode is not null and entorgcode <> ''; --- mysql数据库 
示例 +-- MySQL database example CREATE TEMPORARY TABLE `doc_order` ( -`id` BIGINT NOT NULL COMMENT '主键', -`order_no` varchar(50) NOT NULL COMMENT '订单号', -`business_no` varchar(50) COMMENT 'OMS服务号', +`id` BIGINT NOT NULL COMMENT 'Primary Key', +`order_no` varchar(50) NOT NULL COMMENT 'Order Number', +`business_no` varchar(50) COMMENT 'OMS Service Number', ...... -`is_deleted` int COMMENT '是否作废', +`is_deleted` int COMMENT 'Is Deleted', PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', -'hostname' = '数据库服务器地址', -'port' = '端口号', -'username' = '用户名', -'password' = '密码', -'database-name' = '库名', +'hostname' = 'Database server address', +'port' = 'Port number', +'username' = 'Username', +'password' = 'Password', +'database-name' = 'Database name', 'table-name' = 'doc_order' ); --- 根据源表结构在paimon上ods层创建对应的目标表 +-- Create the corresponding target table in the ods layer on Paimon according to the source table structure CREATE TABLE IF NOT EXISTS ods.`ods_bondexsea_doc_order` ( -`o_year` BIGINT NOT NULL COMMENT '分区字段', -`create_date` timestamp NOT NULL COMMENT '创建时间', +`o_year` BIGINT NOT NULL COMMENT 'Partition Field', +`create_date` timestamp NOT NULL COMMENT 'Creation Time', PRIMARY KEY (`o_year`, `id`) NOT ENFORCED ) PARTITIONED BY (`o_year`) WITH ( 'changelog-producer.compaction-interval' = '2m' ) LIKE `doc_order` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS); --- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中 +-- Set the job name, execute the job task to write the source table data into the corresponding Paimon table in real time SET 'pipeline.name' = 'ods_bondexsea_doc_order'; INSERT INTO ods.`ods_bondexsea_doc_order` @@ -533,37 +533,37 @@ SELECT ,TO_TIMESTAMP(CONVERT_TZ(cast(CURRENT_TIMESTAMP as varchar), 'UTC', 'Asia/Shanghai')) AS `create_date` FROM `doc_order` where gmt_create > '2023-01-01'; --- sqlserver数据库 示例 +-- SQL Server database example CREATE TEMPORARY TABLE `OrderHAWB` ( -`HBLIndex` varchar(50) NOT NULL COMMENT '主键', -`CustomerNo` varchar(50) COMMENT '客户编号', +`HBLIndex` varchar(50) NOT NULL COMMENT 'Primary Key', +`CustomerNo` varchar(50) COMMENT 'Customer Number', ...... 
-`CreateOPDate` timestamp COMMENT '制单日期', +`CreateOPDate` timestamp COMMENT 'Billing Date', PRIMARY KEY (`HBLIndex`) NOT ENFORCED ) WITH ( 'connector' = 'sqlserver-cdc', -'hostname' = '数据库服务器地址', -'port' = '端口号', -'username' = '用户名', -'password' = '密码', -'database-name' = '数据库名', +'hostname' = 'Database server address', +'port' = 'Port number', +'username' = 'Username', +'password' = 'Password', +'database-name' = 'Database name', 'schema-name' = 'dbo', --- 'debezium.snapshot.mode' = 'initial' -- 全量增量都抽取 -'scan.startup.mode' = 'latest-offset',-- 只抽取增量数据 +-- 'debezium.snapshot.mode' = 'initial' -- Extract both full and incremental +'scan.startup.mode' = 'latest-offset',-- Extract only incremental data 'table-name' = 'OrderHAWB' ); --- 根据源表结构在paimon上ods层创建对应的目标表 +-- Create the corresponding target table in the ods layer on Paimon according to the source table structure CREATE TABLE IF NOT EXISTS ods.`ods_airsea_airfreight_orderhawb` ( -`o_year` BIGINT NOT NULL COMMENT '分区字段', -`create_date` timestamp NOT NULL COMMENT '创建时间', +`o_year` BIGINT NOT NULL COMMENT 'Partition Field', +`create_date` timestamp NOT NULL COMMENT 'Creation Time', PRIMARY KEY (`o_year`, `HBLIndex`) NOT ENFORCED ) PARTITIONED BY (`o_year`) WITH ( 'changelog-producer.compaction-interval' = '2m' ) LIKE `OrderHAWB` (EXCLUDING CONSTRAINTS EXCLUDING OPTIONS); --- 设置作业名,执行作业任务将源表数据实时写入到paimon对应表中 +-- Set the job name, execute the job task to write the source table data into the corresponding Paimon table in real time SET 'pipeline.name' = 'ods_airsea_airfreight_orderhawb'; INSERT INTO ods.`ods_airsea_airfreight_orderhawb` @@ -576,47 +576,47 @@ RTRIM(`HBLIndex`) as `HBLIndex` FROM `OrderHAWB` where CreateOPDate > '2023-01-01'; ``` -业务表数据实时写入 Paimon ods 表效果如下: +The real-time data writing effect of the business table into the Paimon ods table is as follows: ![](/blog/bondex/ods.png) -2. 将ods层表的数据打宽写入 dwd 层中,这里其实就是将 ods 层相关业务表合并写入dwd 中,这里主要是处理 count_order 字段的值,因为源表中的数据存在逻辑删除和物理删除所以通过 count 函数统计会有问题,所以我们这里采用 sum 聚合来计算单量,每个reference_no对应的count_order是1,如果逻辑作废通过sql将它处理成0,物理删除 Paimon 会自动处理。 +2. Flatten the data from the ods layer tables and write it into the dwd layer. This process is essentially about merging the related business tables from the ods layer into the dwd layer. The main focus here is on processing the value of the count_order field. Since the data in the source table may be logically or physically deleted, using the count function for statistics may lead to issues. Therefore, we use the sum aggregate to calculate the order quantity. Each reference_no corresponds to a count_order of 1. If it is logically voided, it will be processed as 0 through SQL, and Paimon will automatically handle physical deletions. -dim 维表我们这边直接拿的 doris 里面处理好的维表来使用,维表更新频率低,所以没有在 Paimon 里面进行二次开发。 +For the dim dimension table, we directly use the dimension table processed in Doris, as the update frequency of the dimension table is low, so there was no need for secondary development within Paimon. 
```sql --- 在paimon-dwd层创建宽表 +-- Create a wide table at the paimon-dwd layer CREATE TABLE IF NOT EXISTS dwd.`dwd_business_order` ( -`reference_no` varchar(50) NOT NULL COMMENT '委托单号主键', -`bondex_shy_flag` varchar(8) NOT NULL COMMENT '区分', -`is_server_item` int NOT NULL COMMENT '是否已经关联订单', -`order_type_name` varchar(50) NOT NULL COMMENT '业务分类', -`consignor_date` DATE COMMENT '统计日期', -`consignor_code` varchar(50) COMMENT '客户编号', -`consignor_name` varchar(160) COMMENT '客户名称', -`sales_code` varchar(32) NOT NULL COMMENT '销售编号', -`sales_name` varchar(200) NOT NULL COMMENT '销售名称', -`delivery_center_op_id` varchar(32) NOT NULL COMMENT '交付编号', -`delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称', -`pol_code` varchar(100) NOT NULL COMMENT '起运港代码', -`pot_code` varchar(100) NOT NULL COMMENT '中转港代码', -`port_of_dest_code` varchar(100) NOT NULL COMMENT '目的港代码', -`is_delete` int not NULL COMMENT '是否作废', -`order_status` varchar(8) NOT NULL COMMENT '订单状态', -`count_order` BIGINT not NULL COMMENT '订单数', -`o_year` BIGINT NOT NULL COMMENT '分区字段', -`create_date` timestamp NOT NULL COMMENT '创建时间', +`reference_no` varchar(50) NOT NULL COMMENT 'Primary key for the consignment order number', +`bondex_shy_flag` varchar(8) NOT NULL COMMENT 'Differentiator', +`is_server_item` int NOT NULL COMMENT 'Whether it has been linked to an order', +`order_type_name` varchar(50) NOT NULL COMMENT 'Business category', +`consignor_date` DATE COMMENT 'Statistic date', +`consignor_code` varchar(50) COMMENT 'Customer code', +`consignor_name` varchar(160) COMMENT 'Customer name', +`sales_code` varchar(32) NOT NULL COMMENT 'Sales code', +`sales_name` varchar(200) NOT NULL COMMENT 'Sales name', +`delivery_center_op_id` varchar(32) NOT NULL COMMENT 'Delivery number', +`delivery_center_op_name` varchar(200) NOT NULL COMMENT 'Delivery name', +`pol_code` varchar(100) NOT NULL COMMENT 'POL code', +`pot_code` varchar(100) NOT NULL COMMENT 'POT code', +`port_of_dest_code` varchar(100) NOT NULL COMMENT 'POD code', +`is_delete` int not NULL COMMENT 'Whether it is void', +`order_status` varchar(8) NOT NULL COMMENT 'Order status', +`count_order` BIGINT not NULL COMMENT 'Order count', +`o_year` BIGINT NOT NULL COMMENT 'Partition field', +`create_date` timestamp NOT NULL COMMENT 'Creation time', PRIMARY KEY (`o_year`,`reference_no`,`bondex_shy_flag`) NOT ENFORCED ) PARTITIONED BY (`o_year`) WITH ( --- 每个 partition 下设置 2 个 bucket +-- Set 2 buckets under each partition 'bucket' = '2', 'changelog-producer' = 'full-compaction', 'snapshot.time-retained' = '2h', 'changelog-producer.compaction-interval' = '2m' ); --- 设置作业名,将ods层的相关业务表合并写入到dwd层 +-- Set the job name to merge the related business tables from the ods layer into the dwd layer SET 'pipeline.name' = 'dwd_business_order'; INSERT INTO dwd.`dwd_business_order` @@ -649,33 +649,33 @@ FROM ; ``` -flink ui 可以看到 ods 数据经过 paimon 实时 join 清洗到表 dwd_business_order +In the Flink UI, you can see the ods data is joined and cleansed in real-time through Paimon to the table dwd_business_order. ![](/blog/bondex/dwd_business_order.png) -2.将dwd层数据轻度聚合到dwm层中,将相关数据写入dwm.`dwm_business_order_count` 表中,该表数据会根据主键对聚合字段做 sum,sum_orderCount 字段就是聚合结果,物理删除的数据 sum 时 paimon 会自动处理。 +2. The data from the dwd layer is lightly aggregated to the dwm layer, and the related data is written into the dwm.dwm_business_order_count table. The data in this table will perform a sum on the aggregated fields based on the primary key, with the sum_orderCount field being the result of the aggregation. 
Paimon will automatically handle the data that is physically deleted during the sum. ```sql --- 创建dwm层轻度汇总表,根据日期、销售、操作、业务类别、客户、起运港、目的港汇总单量 +-- Create a lightweight summary table on the dwm layer, summarizing the order volume by date, sales, operation, business category, customer, port of loading, and destination port CREATE TABLE IF NOT EXISTS dwm.`dwm_business_order_count` ( -`l_year` BIGINT NOT NULL COMMENT '统计年', -`l_month` BIGINT NOT NULL COMMENT '统计月', -`l_date` DATE NOT NULL COMMENT '统计日期', -`bondex_shy_flag` varchar(8) NOT NULL COMMENT '区分', -`order_type_name` varchar(50) NOT NULL COMMENT '业务分类', -`is_server_item` int NOT NULL COMMENT '是否已经关联订单', -`customer_code` varchar(50) NOT NULL COMMENT '客户编号', -`sales_code` varchar(50) NOT NULL COMMENT '销售编号', -`delivery_center_op_id` varchar(50) NOT NULL COMMENT '交付编号', -`pol_code` varchar(100) NOT NULL COMMENT '起运港代码', -`pot_code` varchar(100) NOT NULL COMMENT '中转港代码', -`port_of_dest_code` varchar(100) NOT NULL COMMENT '目的港代码', -`customer_name` varchar(200) NOT NULL COMMENT '客户名称', -`sales_name` varchar(200) NOT NULL COMMENT '销售名称', -`delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称', -`sum_orderCount` BIGINT NOT NULL COMMENT '订单数', -`create_date` timestamp NOT NULL COMMENT '创建时间', -PRIMARY KEY (`l_year`, +`l_year` BIGINT NOT NULL COMMENT 'Statistic year', +`l_month` BIGINT NOT NULL COMMENT 'Statistic month', +`l_date` DATE NOT NULL COMMENT 'Statistic date', +`bondex_shy_flag` varchar(8) NOT NULL COMMENT 'Identifier', +`order_type_name` varchar(50) NOT NULL COMMENT 'Business category', +`is_server_item` int NOT NULL COMMENT 'Whether an order has been associated', +`customer_code` varchar(50) NOT NULL COMMENT 'Customer code', +`sales_code` varchar(50) NOT NULL COMMENT 'Sales code', +`delivery_center_op_id` varchar(50) NOT NULL COMMENT 'Delivery ID', +`pol_code` varchar(100) NOT NULL COMMENT 'Port of loading code', +`pot_code` varchar(100) NOT NULL COMMENT 'Transshipment port code', +`port_of_dest_code` varchar(100) NOT NULL COMMENT 'Destination port code', +`customer_name` varchar(200) NOT NULL COMMENT 'Customer name', +`sales_name` varchar(200) NOT NULL COMMENT 'Sales name', +`delivery_center_op_name` varchar(200) NOT NULL COMMENT 'Delivery name', +`sum_orderCount` BIGINT NOT NULL COMMENT 'Order count', +`create_date` timestamp NOT NULL COMMENT 'Creation time', +PRIMARY KEY (`l_year`, `l_month`, `l_date`, `order_type_name`, @@ -690,7 +690,7 @@ PRIMARY KEY (`l_year`, ) WITH ( 'changelog-producer' = 'full-compaction', 'changelog-producer.compaction-interval' = '2m', - 'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum + 'merge-engine' = 'aggregation', -- Use aggregation to calculate sum 'fields.sum_orderCount.aggregate-function' = 'sum', 'fields.create_date.ignore-retract'='true', 'fields.sales_name.ignore-retract'='true', @@ -698,7 +698,7 @@ PRIMARY KEY (`l_year`, 'snapshot.time-retained' = '2h', 'fields.delivery_center_op_name.ignore-retract'='true' ); --- 设置作业名 +-- Set the job name SET 'pipeline.name' = 'dwm_business_order_count'; INSERT INTO dwm.`dwm_business_order_count` @@ -712,32 +712,32 @@ dwd.`dwd_business_order` o ; ``` -Flink UI 效果如下 dwd_business_orders 数据聚合写到 dwm_business_order_count: +The Flink UI effect is as follows: the data from dwd_business_orders is aggregated into dwm_business_order_count: ![](/blog/bondex/dwm_business_order_count.png) -4.将 dwm 层数据聚合到 dws 层中,dws 层是做了更小维度的汇总 +4. Aggregate the data from the dwm layer to the dws layer, where the dws layer performs summaries at even finer dimensions. 
```sql --- 创建根据操作人、业务类型聚合当天的单量 +-- Create a table to aggregate daily order counts by operator and business type CREATE TABLE IF NOT EXISTS dws.`dws_business_order_count_op` ( - `l_year` BIGINT NOT NULL COMMENT '统计年', - `l_month` BIGINT NOT NULL COMMENT '统计月', - `l_date` DATE NOT NULL COMMENT '统计日期', - `order_type_name` varchar(50) NOT NULL COMMENT '业务分类', - `delivery_center_op_id` varchar(50) NOT NULL COMMENT '交付编号', - `delivery_center_op_name` varchar(200) NOT NULL COMMENT '交付名称', - `sum_orderCount` BIGINT NOT NULL COMMENT '订单数', - `create_date` timestamp NOT NULL COMMENT '创建时间', + `l_year` BIGINT NOT NULL COMMENT 'Statistic Year', + `l_month` BIGINT NOT NULL COMMENT 'Statistic Month', + `l_date` DATE NOT NULL COMMENT 'Statistic Date', + `order_type_name` varchar(50) NOT NULL COMMENT 'Business Category', + `delivery_center_op_id` varchar(50) NOT NULL COMMENT 'Delivery ID', + `delivery_center_op_name` varchar(200) NOT NULL COMMENT 'Delivery Name', + `sum_orderCount` BIGINT NOT NULL COMMENT 'Order Count', + `create_date` timestamp NOT NULL COMMENT 'Creation Time', PRIMARY KEY (`l_year`, `l_month`,`l_date`,`order_type_name`,`delivery_center_op_id`) NOT ENFORCED ) WITH ( - 'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum + 'merge-engine' = 'aggregation', -- Use aggregation to compute sum 'fields.sum_orderCount.aggregate-function' = 'sum', 'fields.create_date.ignore-retract'='true', 'snapshot.time-retained' = '2h', 'fields.delivery_center_op_name.ignore-retract'='true' ); --- 设置作业名 +-- Set the job name SET 'pipeline.name' = 'dws_business_order_count_op'; INSERT INTO dws.`dws_business_order_count_op` @@ -755,15 +755,15 @@ FROM ; ``` -Flink UI 效果如下 dws_business_order_count_op 数据写到 dws_business_order_count_op: +The Flink UI reflects the following: Data from `dws_business_order_count_op` is written to `dws_business_order_count_op`: ![](/blog/bondex/dws_business_order_count_op.png) -总体数据流转示例 +Overall Data Flow Example ![](/blog/bondex/all_datastream.jpg) -源表: +Source Table: ![](/blog/bondex/source.png) @@ -783,27 +783,27 @@ paimon-dws: ![](/blog/bondex/paimon-dws.png) -特别提醒 sqlserver 数据库抽取时如果源表数据量过大全量抽取会锁表,建议在业务允许的情况下采用增量抽取。对于全量抽取 sqlserver 可以采用中转的方式 sqlserver 全量数据导入到 mysql,从 mysql 再到 paimon-ods ,后面再通过 sqlserever 做增量抽取。 +A special reminder: When extracting from SQL Server databases, if the source table is too large, a full extraction will lock the table. It is recommended to use incremental extraction if the business allows. For a full extraction, SQL Server can use an interim approach where the full data is imported into MySQL, then from MySQL to Paimon-ODS, and later incremental extraction is done through SQL Server. -## 04 问题排查分析 +## 04 Troubleshooting and Analysis -**1. 聚合数据计算不准** +**1. Inaccurate Aggregated Data Calculation** -sqlserver cdc 采集数据到 paimon 表,说明: +SQL Server CDC collects data into the Paimon table, explanation: -**dwd 表:** +**DWD table:** 'changelog-producer' = 'input' -**ads 表:** +**ADS table:** -'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum +'merge-engine' = 'aggregation', -- Using aggregation to compute sum 'fields.sum_amount.aggregate-function' = 'sum' -ADS 层聚合表采用 agg sum 会出现 dwd 数据流不产生 update_before,产生错误数据流 update_after 比如上游源表 update 10 到 30 dwd 层数据会变更为 30,ads 聚合层数据也会变更为 30,但是现在变为了 append 数据变成了 10+30=40 的错误数据。 +The ADS layer's aggregated table uses agg sum, which can result in the DWD data stream not generating update_before, creating an incorrect data stream with update_after. 
For example, if the upstream source table updates from 10 to 30, the DWD layer's data will change to 30, and the ADS aggregation layer's data will also change to 30. But now it turns into an append data, resulting in incorrect data of 10+30=40. -解决办法: +Solution: By specifying 'changelog-producer' = 'full-compaction', Table Store will compare the results between full compactions and produce the differences as changelog. @@ -813,72 +813,72 @@ By specifying changelog-producer.compaction-interval table property (default val users can define the maximum interval between two full compactions to ensure latency. This table property does not affect normal compactions and they may still be performed once in a while by writers to reduce reader costs. -这样能解决上述问题。但是随之而来出现了新的问题。默认 changelog-producer.compaction-interval 是 30min,意味着 上游的改动到 ads 查询要间隔 30min,生产过程中发现将压缩间隔时间改成 1min 或者 2 分钟的情况下,又会出现上述 ADS 层聚合数据不准的情况。 +This approach can solve the above mentioned issue. However, it has led to a new problem. The default changelog-producer.compaction-interval is 30 minutes, meaning that it takes 30 minutes for changes upstream to be reflected in the ads query. During production, it has been found that changing the compaction interval to 1 minute or 2 minutes can cause inaccuracies in the ADS layer aggregation data again. ```sql 'changelog-producer.compaction-interval' = '2m' ``` -需要在写入 Flink Table Store 时需要配置 table.exec.sink.upsert-materialize= none,避免产生 Upsert 流,以保证 Flink Table Store 中能够保存完整的 changelog,为后续的流读操作做准备。 +When writing into the Flink Table Store, it is necessary to configure table.exec.sink.upsert-materialize= none to avoid generating an upsert stream, ensuring that the Flink Table Store can retain a complete changelog for subsequent stream read operations. ```sql set 'table.exec.sink.upsert-materialize' = 'none' ``` -**2. 相同 sequence.field 导致 dwd 明细宽表无法收到 update 数据更新** +**2. The same sequence.field causes the dwd detailed wide table to not receive data updates** -**mysql cdc 采集数据到 paimon 表** +**mysql cdc collects data into the paimon table** -说明: +Explanation: -在 MySQL 源端执行 update 数据修改成功后,dwd_orders 表数据能同步成功 +After executing an update on the MySQL source, the data modifications are successfully synchronized to the dwd_orders table. ![](/blog/bondex/dwd_orders.png) -但是查看 dwd_enriched_orders 表数据无法同步,启动流模式查看数据,发现没有数据流向 +However, upon examining the dwd_enriched_orders table data, it is found to be unsynchronized. When starting the stream mode to observe the data, it is discovered that there is no data flow. ![](/blog/bondex/log.png) -**解决:** +**Solution:** -排查发现是由于配置了参数 'sequence.field' = 'o_orderdate'(使用 o_orderdate 生成 sequence id,相同主键合并时选择 sequence id 更大的记录)导致的,因为在修改价格的时候 o_orderdate 字段时间不变, 继而'sequence.field' 是相同的,导致顺序不确定,所以 ROW1 和 ROW2,它们的 o_orderdate 是一样的,所以在更新时会随机选择,所有将该参数去掉即可,去掉后正常按照输入的先后顺序,自动生成一个 sequence number,不会影响同步结果。 +Upon investigation, it was discovered that the issue was caused by the configuration of the parameter 'sequence.field' = 'o_orderdate' (using o_orderdate to generate a sequence ID, and when merging records with the same primary key, the record with the larger sequence ID is chosen). Since the o_orderdate field does not change when modifying the price, the 'sequence.field' remains the same, leading to an uncertain order. Therefore, for ROW1 and ROW2, since their o_orderdate are the same, the update will randomly select between them. 
By removing this parameter, the system will normally generate a sequence number based on the input order, which will not affect the synchronization result. **3. Aggregate function 'last_non_null_value' does not support retraction** -报错: +Error: Caused by: java.lang.UnsupportedOperationException: Aggregate function 'last_non_null_value' does not support retraction, If you allow this function to ignore retraction messages, you can configure 'fields.${field_name}.ignore-retract'='true'. -可以在官方文档找到解释: +An explanation can be found in the official documentation: -Only sum supports retraction (UPDATE_BEFORE and DELETE), others aggregate functions do not support retraction. +Only sum supports retraction (UPDATE_BEFORE and DELETE), other aggregate functions do not support retraction. -可以理解为:除了 SUM 函数,其他的 Agg 函数都不支持 Retraction,为了避免接收到 DELETE 和 UPDATEBEFORE 消息报错,需要通过给指定字段配 'fields.${field_name}.ignore-retract'='true' 忽略,解决这个报错 +This can be understood as: except for the SUM function, other Agg functions do not support Retraction. To avoid errors when receiving DELETE and UPDATEBEFORE messages, it is necessary to configure 'fields.${field_name}.ignore-retract'='true' for the specified field to ignore retraction and solve this error. ```sql WITH ( -'merge-engine' = 'aggregation', -- 使用 aggregation 聚合计算 sum +'merge-engine' = 'aggregation', -- Use aggregation to compute sum 'fields.sum_orderCount.aggregate-function' = 'sum', -'fields.create_date.ignore-retract'='true' #create_date 字段 +'fields.create_date.ignore-retract'='true' #field create_date ); ``` -**4. paimon任务中断失败** +**4. Paimon Task Interruption Failure** -任务异常中断 导致pod挂掉,查看loki日志显示akka.pattern.AskTimeoutException: Ask timed out on +Task abnormal interruption leads to pod crash, viewing loki logs shows akka.pattern.AskTimeoutException: Ask timed out on ![](/blog/bondex/loki.png) -java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterGateway.updateTaskExecutionState(TaskExecutionState))] at recipient [akka.tcp://flink@fts-business-order-count.streamx:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.\n" +java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterGateway.updateTaskExecutionState(TaskExecutionState))] at recipient [akka.tcp://flink@fts-business-order-count.streampark:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.\n" -初步判断应该是由于以上2个原因导致触发了 akka 的超时机制,那就调整集群的akka超时间配置和进行单个任务拆分或者调大资源配置。 +The preliminary judgment is that the triggering of akka's timeout mechanism is likely due to the above two reasons. Therefore, adjust the cluster's akka timeout settings and carry out individual task segmentation or increase resource configuration. 
-我们这边先看如何进行参数修改: +To proceed, let's first see how to modify the parameters: @@ -887,31 +887,31 @@ java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMas | akka.ask.timeout | 10s | Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d). | | web.timeout | 600000 | Timeout for asynchronous operations by the web monitor in milliseconds. | -在 conf/flink-conf.yaml 最后增加下面参数: +In `conf/flink-conf.yaml`, add the following parameters at the end: **akka.ask.timeout: 100s** **web.timeout:1000000** -然后在 streampark 手动刷新下 flink-conf.yaml 验证参数是否同步成功。 +Then manually refresh the `flink-conf.yaml` in Streampark to verify if the parameters have synchronized successfully. -**5. snapshot no such file or director** +**5. snapshot no such file or directory** -发现 cp 出现失败情况 +It was discovered that `cp` (checkpoint) is failing. -![](/blog/bondex/cp_fail.jpg) +[Image showing failure of cp] -对应时间点日志显示 Snapshot 丢失,任务显示为 running 状态,但是源表 mysql 数据无法写入 paimon ods 表 +Corresponding logs at the time show Snapshot missing, with the task status shown as running, but data from the MySQL source table cannot be written into the paimon ods table. -![](/blog/bondex/status_log.png) +[Image showing status log] -定位cp失败原因为:计算量大,CPU密集性,导致TM内线程一直在processElement,而没有时间做CP +The reason for cp failure is identified as: due to high computation and CPU intensity, threads within the TaskManager are constantly processing elements, without having time to perform the checkpoint. -无法读取 Snapshot 原因为:flink 集群资源不够,Writer 和 Committer 产生竞争,Full-Compaction 时读到了已过期部分的不完整的 Snapshot,目前官方针对这个问题已经修复: +The reason for being unable to read the Snapshot is: insufficient resources in the Flink cluster, leading to competition between Writer and Committer. During Full-Compaction, an incomplete Snapshot of an expired part was read. The official response to this issue has been fixed: https://github.com/apache/incubator-paimon/pull/1308 -而解决 cp 失败的解决办法增加并行度,增加 deploymenttaskmanager slot 和 jobmanager cpu +And the solution to the checkpoint failure is to increase parallelism, by adding more deployment taskmanager slots and enhancing the jobmanager CPU resources. ``` -D kubernetes.jobmanager.cpu=0.8 @@ -927,12 +927,12 @@ https://github.com/apache/incubator-paimon/pull/1308 ![](/blog/bondex/flink_dashboard.png) -在复杂的实时任务中,可以通过修改动态参数的方式,增加资源。 +In complex real-time tasks, resources can be increased by modifying dynamic parameters. -## 05 未 来 规 划 +## 05 Future Planning -- 自建的数据平台 bondata 正在集成 paimon 的元数据信息、数据指标体系、血缘、一键 pipline 等功能,形成海程邦达的数据资产,并将在此基础上展开一站式数据治理 -- 后面将基于 trino Catalog接入Doris,实现真正的离线数据和实时数据的one service -- 采用 doris + paimon 的架构方案继续推进集团内部流批一体数仓建设的步伐 +- Our self-built data platform, Bondata, is integrating Paimon's metadata information, data metric system, lineage, and one-click pipeline features. This integration aims to form HCBondata's data assets and will serve as the foundation for a one-stop data governance initiative. +- Subsequently, we will integrate with Trino Catalog to access Doris, realizing a one-service solution for both offline and real-time data. +- We will continue to advance the pace of building an integrated streaming and batch data warehouse within the group, adopting the architecture of Doris + Paimon. 
-在这里要感谢之信老师和 StreamPark 社区在使用 StreamPark + Paimon 过程中的大力支持,在学习使用过程中遇到的问题,都能在第一时间给到解惑并得到解决,我们后面也会积极参与社区的交流和建设,让 paimon 能为更多开发者和企业提供流批一体的数据湖解决方案。 +Here, I would like to thank Teacher Zhixin and the StreamPark community for their strong support during the use of StreamPark + Paimon. The problems encountered in the learning process are promptly clarified and resolved. We will also actively participate in community exchanges and contributions in the future, enabling Paimon to provide more developers and enterprises with integrated stream and batch data lake solutions. diff --git a/i18n/zh-CN/docusaurus-plugin-content-blog/3-streampark-usercase-bondex-paimon.md b/i18n/zh-CN/docusaurus-plugin-content-blog/3-streampark-usercase-bondex-paimon.md index eb61a5980..d66e83b72 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-blog/3-streampark-usercase-bondex-paimon.md +++ b/i18n/zh-CN/docusaurus-plugin-content-blog/3-streampark-usercase-bondex-paimon.md @@ -253,19 +253,19 @@ docker push registry-vpc.cn-zhangjiakou.aliyuncs.com/xxxxx/flink-table-store:v1. kubectl cluster-info ``` -Kubernetes RBAC 配置,创建 streamx 命名空间: +Kubernetes RBAC 配置,创建 streampark 命名空间: ```shell -kubectl create ns streamx +kubectl create ns streampark ``` 使用 default 账户创建 clusterrolebinding 资源: ```shell -kubectl create secret docker-registry streamparksecret ---docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com ---docker-username=xxxxxx ---docker-password=xxxxxx -n streamx``` +kubectl create secret docker-registry streamparksecret +--docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com +--docker-username=xxxxxx +--docker-password=xxxxxx -n streampark``` ``` **容器镜像仓库配置:** @@ -283,10 +283,10 @@ kubectl create secret docker-registry streamparksecret 创建 k8s secret 密钥用来拉取 ACR 中的镜像 streamparksecret 为密钥名称 自定义 ```shell -kubectl create secret docker-registry streamparksecret ---docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com ---docker-username=xxxxxx ---docker-password=xxxxxx -n streamx +kubectl create secret docker-registry streamparksecret +--docker-server=registry-vpc.cn-zhangjiakou.aliyuncs.com +--docker-username=xxxxxx +--docker-password=xxxxxx -n streampark ``` 创建挂载 checkpoint/savepoint 的 pvc 资源,基于阿里云的对象存储OSS做K8S的持久化 @@ -363,7 +363,7 @@ Execution Mode :kubernetes application Flink Version :flink-1.16.0-scala-2.12 -Kubernetes Namespace :streamx +Kubernetes Namespace :streampark Kubernetes ClusterId :(任务名自定义即可) @@ -872,7 +872,7 @@ WITH ( ![](/blog/bondex/loki.png) -java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterGateway.updateTaskExecutionState(TaskExecutionState))] at recipient [akka.tcp://flink@fts-business-order-count.streamx:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.\n" +java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(JobMasterGateway.updateTaskExecutionState(TaskExecutionState))] at recipient [akka.tcp://flink@fts-business-order-count.streampark:6123/user/rpc/jobmanager_2] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 
2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.\n" 初步判断应该是由于以上2个原因导致触发了 akka 的超时机制,那就调整集群的akka超时间配置和进行单个任务拆分或者调大资源配置。