docs improvement
wolfboys committed Oct 4, 2023
1 parent c0212af commit 394d5c6
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 181 deletions.
131 changes: 23 additions & 108 deletions blog/StreamPark 在 Joyme 的生产实践.md
@@ -1,18 +1,22 @@
# StreamPark's Production Practice at Joyme!
---
slug: streampark-usercase-joyme
title: StreamPark's Production Practice at Joyme
tags: [StreamPark, Production Practice, FlinkSQL]
---

**Abstract:** This article presents StreamPark's production practice at Joyme. The author is Qin Jiyong, a big data engineer at Joyme. The main contents are:
<br/>

1. Encountering StreamPark
2. Flink SQL job development
3. Custom Code job development
4. Monitoring and alerting
5. Common problems
6. Community impressions
7. Summary
**Abstract:** This article presents StreamPark's production practice at Joyme. The author is Qin Jiyong, a big data engineer at Joyme. The main contents are:

- Encountering StreamPark
- Flink SQL job development
- Custom Code job development
- Monitoring and alerting
- Common problems
- Community impressions
- Summary


## 01 Encountering StreamPark
## 1 Encountering StreamPark

Encountering StreamPark was inevitable: given our existing real-time job development model, we had to find an open-source platform to support our company's real-time business. Our situation at the time was as follows:

@@ -25,15 +29,15 @@

Our first encounter with StreamPark basically settled the decision. Following the documentation on the official website, we quickly deployed and installed it, and after setting it up we tried it out: the friendly interface, multi-version Flink support, permission management, job monitoring, and a series of other features already satisfied our needs well. We also learned that the community is very active; we have witnessed StreamPark's features being rounded out since version 1.1.0, and the development team is truly ambitious, so we believe it will keep improving.

## 02 Flink SQL Job Development
## 2 Flink SQL Job Development

The Flink SQL development model brings great convenience: for some simple metrics, a little SQL is all it takes, without writing a single line of code. Flink SQL eases development for many colleagues; after all, writing code is still somewhat difficult for some of the data warehouse folks.

Open StreamPark's job creation page to add a new job; the default Development Mode is Flink SQL mode. Write the SQL logic directly in the Flink SQL section.

For the Flink SQL part, just write the logical SQL step by step following the official Flink documentation. For our company, a job generally has three parts: the Source, the intermediate logic, and the Sink at the end. Essentially every Source consumes data from Kafka; the logic layer joins against MySQL for dimension-table lookups; and most Sinks are Elasticsearch, Redis, or MySQL.

#### **1. Write the SQL**
### **1. Write the SQL**

```sql
-- connect to Kafka
```
@@ -96,7 +100,7 @@ SELECT Data.uid FROM source_table;
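
The job's full SQL is collapsed in this diff view. Purely as a hedged sketch of the three-part pattern just described, with hypothetical table names, fields, and connection options (none of them from the original post), a job of this shape might look like:

```sql
-- Hypothetical sketch only: names, fields, and options are illustrative.
-- 1) Source: consume events from Kafka
CREATE TABLE source_table (
    uid STRING,
    proc_time AS PROCTIME()   -- processing-time attribute for the lookup join
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- 2) Dimension table in MySQL, queried via lookup join
CREATE TABLE dim_user (
    uid STRING,
    region STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysqlhost:3306/dim',
    'table-name' = 'dim_user'
);

-- 3) Sink: write the enriched result to Elasticsearch
CREATE TABLE sink_table (
    uid STRING,
    region STRING
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://eshost:9200',
    'index' = 'user_events'
);

INSERT INTO sink_table
SELECT s.uid, d.region
FROM source_table AS s
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF s.proc_time AS d
    ON s.uid = d.uid;
```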

![](/blog/Joyme/application_job.png)

## 03 Custom Code Job Development
## 3 Custom Code Job Development

We develop Streaming jobs with Flink's Java API. We refactored the earlier Spark Scala, Flink Scala, and Flink Java jobs and merged them into a single project, purely so they are easier to maintain. A Custom Code job requires committing the code to Git and then configuring the project:

@@ -114,7 +118,7 @@

![](/blog/Joyme/application_interface.png)

## 04 Monitoring and Alerting
## 4 Monitoring and Alerting

StreamPark's monitoring requires configuring the basic outgoing-email settings in the Setting module.

@@ -130,96 +134,7 @@

For alerting, we have built a scheduled task on top of StreamPark's t_flink_app table. Why? Because most people do not check email notifications in time. So we monitor the state of every job and push the corresponding alert to our Feishu alert group, which lets us spot and fix problems promptly. It is a simple Python script executed periodically via crontab (a sample crontab entry follows the script).

```python
import json

import MySQLdb
import requests

# Map StreamPark job states (t_flink_app.STATE) to readable names
STATE_MAP = {8: 'FAILING', 9: 'FAILED', 15: 'LOST', -9: 'KILLED'}


def connect_mysql():
    # Placeholder connection settings; query abnormal jobs from StreamPark's metadata table
    db = MySQLdb.connect(host="mysqlhost", user="user", passwd="password", db="dstream", charset='utf8')
    cursor = db.cursor()
    cursor.execute("select STATE, JOB_NAME, ALERT_EMAIL from t_flink_app where state in (-9, 15, 8, 9)")
    data = cursor.fetchall()
    db.close()
    return data


def alert(data):
    for row in data:
        send(row)


def send(row):
    webhook = 'the Feishu bot webhook address'

    state = STATE_MAP.get(int(row[0]), 'KILLED')
    email = row[2] if row[2] is not None else '-'

    # Feishu "post" (rich text) message payload
    payload_mes = {
        "msg_type": "post",
        "content": {
            "post": {
                "zh_cn": {
                    "title": "Flink real-time job alert",
                    "content": [
                        [{"tag": "text", "text": "job name: " + row[1]}],
                        [{"tag": "text", "text": "job state: " + state}],
                        [{"tag": "text", "text": "alert email: " + email}],
                    ]
                }
            }
        }
    }
    headers = {'Content-Type': 'application/json'}

    res = requests.post(webhook, headers=headers, data=json.dumps(payload_mes))
    print(res.text)


if __name__ == '__main__':
    data = connect_mysql()
    alert(data)
    print(data)
```
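
The post mentions scheduling the script with crontab but does not show the entry; a hypothetical example (the interval, script path, and log path are assumptions for illustration) might be:

```bash
# Hypothetical crontab entry: check job states every 5 minutes.
# Paths and interval are illustrative, not from the original post.
*/5 * * * * /usr/bin/python3 /opt/scripts/streampark_alert.py >> /var/log/streampark_alert.log 2>&1
```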

## 05 Common Problems
## 5 Common Problems

We have analyzed the job failures we see, and they basically fall into these cases:

@@ -235,12 +150,12 @@

![](/blog/Joyme/yarn_log.png)

## 06 Community Impressions
## 6 Community Impressions

Whenever we discuss problems in the StreamPark user group, we get immediate responses from community members. Issues that cannot be resolved right away are generally fixed in the next release or on the latest code branch. In the group we also see many people who are not community members actively helping each other solve problems. There are also experts from other communities, and many members have joined the community's development work. The whole community feels very active!

## 07 Summary
## 7 Summary

Our company currently runs 60 real-time jobs in production, roughly half Flink SQL and half Custom Code, and more real-time jobs will go live later. Many people worry about whether StreamPark is stable. From our several months of production experience, StreamPark is only a platform that helps you develop, deploy, monitor, and manage jobs. Whether things run stably really depends on whether your own Hadoop YARN cluster is stable (we use the on-yarn mode); that has little to do with StreamPark itself. The other factor is how robust your Flink SQL or code is. These two aspects are what you should really be considering; once they are solid, leveraging StreamPark's flexibility makes jobs run even better. Judging StreamPark's stability from one side alone is simply one-sided.

That is all of our sharing about StreamPark at Joyme (乐我无限); thank you for reading this far. Many thanks to StreamPark for providing us with such an excellent product; it truly benefits others. From 1.0 to 1.2.1, the bugs we encountered were fixed promptly, and every issue was taken seriously. We still use the on-yarn deployment mode, and restarting YARN still leaves jobs in the LOST state; restarting YARN is not a daily affair, and the community will fix this issue as soon as possible. We believe StreamPark will get better and better, and its future is promising.
6 changes: 6 additions & 0 deletions blog/StreamPark 在顺网科技的大规模生产实践.md
@@ -1,3 +1,9 @@
---
slug: streampark-usercase-shunwang
title: StreamPark's Large-Scale Production Practice at Shunwang Technology
tags: [StreamPark, Production Practice, FlinkSQL]
---

# StreamPark's Large-Scale Production Practice at Shunwang Technology

![](/blog/SF/autor.png)
6 changes: 6 additions & 0 deletions blog/联通 Flink 实时计算平台化运维实践.md
@@ -1,3 +1,9 @@
---
slug: streampark-usercase-chinaunion
title: China Unicom's Flink Real-Time Computing Platform Operations Practice
tags: [StreamPark, Production Practice, FlinkSQL]
---

# China Unicom's Flink Real-Time Computing Platform Operations Practice

**Abstract:** This article is compiled from the talk given by Mu Chunjin, head of the real-time computing team at China Unicom Digital Technology and Apache StreamPark Committer, at the platform construction session of Flink Forward Asia 2022. The content is divided into four parts:
49 changes: 46 additions & 3 deletions docs/components/TableData.jsx
@@ -326,7 +326,49 @@ const ClientTables = () => {
};


const ClientEnvs = () => {

const DeploymentEnvs = () => {
return (

<div>
<table className="table-data" style={{width: '100%', display: 'inline-table'}}>
<thead>
<tr>
<td>Item</td>
<td>Version</td>
<td>Required</td>
<td>Other</td>
</tr>
</thead>
<tbody>
{
dataSource.deploymentEnvs.map((item, i) => (
<tr key={i}>
<td>
<span className="label-info">{item.name}</span>
</td>
<td>{item.version}</td>
<td>
{
item.required
?
<span className="icon-toggle-on" title="Required"></span>
:
<span className="icon-toggle-off" title="Optional"></span>
}
</td>
<td>{item.other}</td>
</tr>
))
}
</tbody>
</table>
</div>

);
};

const DevelopmentEnvs = () => {
return (

<div>
@@ -341,7 +383,7 @@ const ClientEnvs = () => {
</thead>
<tbody>
{
dataSource.envs.map((item, i) => (
dataSource.developmentEnvs.map((item, i) => (
<tr key={i}>
<td>
<span className="label-info">{item.name}</span>
@@ -378,5 +420,6 @@ export {
ClientFixedDelay,
ClientFailureRate,
ClientTables,
ClientEnvs
DevelopmentEnvs,
DeploymentEnvs
};
27 changes: 19 additions & 8 deletions docs/components/data.js
@@ -92,13 +92,24 @@
{name: 'catalog', desc: 'Catalog, specifies the catalog to be used during initialization', value: ''},
{name: 'database', desc: 'Database, specifies the database to be used during initialization', value: ''},
],
envs: [
{name: 'Operating System', version: 'Linux', required: true, other: 'Windows is not supported'},

deploymentEnvs: [
{name: 'OS', version: 'Linux', required: true, other: 'Windows is not supported'},
{name: 'JAVA', version: '1.8+', required: true, other: null},
{name: 'Maven', version: '3+', required: false, other: 'Optionally install Maven'},
{name: 'Node.js', version: '', required: true, other: 'Node environment'},
{name: 'Flink', version: '1.12.0+', required: true, other: 'The version must be 1.12+'},
{name: 'Hadoop', version: '2+', required: false, other: 'Optional; if running on YARN, a Hadoop environment is required.'},
{name: 'MySQL', version: '5.6+', required: false, other: 'Optionally install MySQL'}
]
{name: 'MySQL', version: '5.6+', required: true, other: null},
{name: 'Flink', version: '1.12.0+', required: true, other: 'Flink version >= 1.12'},
{name: 'Hadoop', version: '2+', required: false, other: 'Optional; if running on YARN, a Hadoop environment is required.'},
],

developmentEnvs: [
{name: 'OS', version: 'Linux', required: false, other: 'Supports Windows, recommended to use Mac/Linux.'},
{name: 'IDE', version: 'Intellij IDEA', required: false, other: 'Recommended to use Intellij IDEA'},
{name: 'JAVA', version: '1.8 +', required: true, other: null},
{name: 'Scala', version: '2.12.x', required: true, other: null},
{name: 'Nodejs', version: '16.15.1 +', required: true, other: 'Node >=16.15.1 <= 18, https://nodejs.org'},
{name: 'pnpm', version: '7.11.2', required: true, other: 'npm install -g pnpm'},
{name: 'Flink', version: '1.12.0 +', required: true, other: 'Flink >= 1.12, just download and unpack it.'},
{name: 'MySQL', version: '5.6 +', required: false, other: null},
{name: 'Hadoop', version: '2 +', required: false, other: 'Optional; if running on YARN, a Hadoop environment is required.'},
],
}
10 changes: 2 additions & 8 deletions docs/user-guide/1-deployment.md
@@ -4,7 +4,7 @@ title: 'Platform Deployment'
sidebar_position: 1
---

import { ClientEnvs } from '../components/TableData.jsx';
import { DeploymentEnvs } from '../components/TableData.jsx';

The overall component stack structure of StreamPark is as follows. It consists of two major parts: streampark-core and streampark-console. streampark-console is a very important module, positioned as an **integrated real-time data platform**, **streaming data warehouse platform**, **low-code platform**, and **Flink & Spark task hosting platform**. It can better manage Flink tasks, integrating project compilation, publishing, parameter configuration, startup, savepoints, flame graphs, Flink SQL, monitoring, and many other features in one place, which greatly simplifies the daily operation and maintenance of Flink tasks and incorporates many best practices. Its ultimate goal is to create a one-stop big data solution integrating real-time data warehouses and batch processing.

@@ -14,13 +14,7 @@ streampark-console provides an out-of-the-box installation package. Before insta

## Environmental requirements

<ClientEnvs></ClientEnvs>

:::tip Notice
Versions up to and including StreamPark 1.2.2 only support `scala 2.11`, so pay attention to the corresponding `scala` version when using `flink`.
Versions from 1.2.3 onwards support both `scala 2.11` and `scala 2.12`.
:::

<DeploymentEnvs></DeploymentEnvs>

At present, StreamPark supports task releases for Flink, in both `Flink on YARN` and `Flink on Kubernetes` modes.

@@ -1,17 +1,14 @@
---
id: 'local development and debugging'
title: 'Local Development and Debugging'
id: 'development'
title: 'Development Guide'
sidebar_position: 3
---

### Environment Requirements

- Maven 3.6+
- nodejs (version >= 16.14)
- npm 7.11.2 ( https://nodejs.org/en/ )
- pnpm (npm install -g pnpm)
- JDK 1.8+
- Scala 2.12.x
import { DevelopmentEnvs } from '../components/TableData.jsx';

<DevelopmentEnvs></DevelopmentEnvs>

### Clone the Source Code

@@ -26,8 +23,6 @@ cd incubator-streampark/
./build.sh
```

![Build Success](/doc/image/streampark_build_success.png)

### Open the Project

Here, we are using IntelliJ IDEA to open the project.
@@ -40,12 +35,12 @@ open -a /Applications/IntelliJ\ IDEA\ CE.app/ ./

```bash
cd ./dist
tar -zxvf apache-streampark-2.2.0-SNAPSHOT-incubating-bin.tar.gz
tar -zxvf apache-streampark-2.2.0-incubating-bin.tar.gz
```

### Copy the Path

Copy the path of the extracted directory, for example: `/Users/user/IdeaProjects/incubator-streampark/dist/apache-streampark_2.12-2.2.0-SNAPSHOT-incubating-bin`
Copy the path of the extracted directory, for example: `${workspace}/incubator-streampark/dist/apache-streampark-2.2.0-incubating-bin`

### Start the Backend Service

@@ -58,7 +53,7 @@ Modify the launch configuration
Check `Add VM options`, and input the parameter `-Dapp.home=$path`, where `$path` is the path we just copied.

```bash
-Dapp.home=/Users/user/IdeaProjects/incubator-streampark/dist/apache-streampark_2.12-2.2.0-SNAPSHOT-incubating-bin
-Dapp.home=${workspace}/incubator-streampark/dist/apache-streampark-2.2.0-incubating-bin
```

![Streampark Run Config](/doc/image/streampark_run_config.jpeg)