
[Improve] Partially update Kafka (#380)
Signed-off-by: tison <[email protected]>
tisonkun authored Jun 7, 2024
1 parent cdadddf commit 8ce5095
Showing 2 changed files with 88 additions and 97 deletions.
87 changes: 40 additions & 47 deletions docs/connector/1-kafka.md
import TabItem from '@theme/TabItem';

[Apache Flink](https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html) integrates with the generic Kafka connector, which tries to keep up with the latest version of the Kafka client. The version of the Kafka client used by this connector may change between Flink versions. The current Kafka client is backward compatible with Kafka broker version 0.10.0 or later. For more details on Kafka compatibility, please refer to the [Apache Kafka](https://kafka.apache.org/protocol.html#protocol_compatibility) official documentation.


```xml
<dependency>
    <groupId>org.apache.streampark</groupId>
    <artifactId>streampark-flink-core</artifactId>
    <version>${project.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
```

In the development phase, the following dependencies are also necessary:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
```

## Kafka Source (Consumer)

First, we introduce the standard Kafka consumer approach based on the official website. The following code is taken from the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html#kafka-consumer):

```scala
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val stream = env.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
```

You can see that a series of Kafka connection properties is defined up front. With this approach, the parameters are hard-coded, which is very inflexible. Let's see how to use StreamPark to access Kafka data: you only need to define the configuration file in the prescribed format and then write the code.

### Example

```yaml
kafka.source:
```
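
Beyond the top-level `kafka.source` key, a typical configuration adds the broker list and the usual consumer properties. The sketch below is illustrative only; the broker addresses, topic name, and group id are placeholder values, not taken from this commit:

```yaml
kafka.source:
  bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092  # placeholder broker list
  topic: test_topic                                 # placeholder topic name
  group.id: test_group                              # placeholder consumer group
  auto.offset.reset: earliest                       # standard Kafka consumer property
```

Keys such as `bootstrap.servers` and `group.id` are ordinary Kafka consumer properties, which is why the note below requires them to follow the Kafka specification for parameter keys.
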
:::info Cautions
The prefix `kafka.source` is fixed, and the parameters related to Kafka properties must comply with the [Apache Kafka](http://kafka.apache.org) specification for setting the parameter keys.
:::

<Tabs>

<TabItem value="Scala" label="Scala">

```scala
import org.apache.streampark.flink.core.scala.source.KafkaSource
import org.apache.flink.api.scala._

object kafkaSourceApp extends FlinkStreaming {

  override def handle(): Unit = {
    val source = KafkaSource().getDataStream[String]()
    print(source)
  }

}
```

</TabItem>

<TabItem value="Java" label="Java">

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

public class KafkaSimpleJavaApp {

    public static void main(String[] args) {
        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
        StreamingContext context = new StreamingContext(envConfig);
        DataStream<String> source = new KafkaSource<String>(context)
                .getDataStream()
                .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value);
        source.print();
        context.start();
    }
}
```

</TabItem>
</Tabs>

### Advanced configuration parameters

`KafkaSource` is a simpler Kafka reading class built on top of the Flink Kafka Connector. Its constructor takes a `StreamingContext`; you only need to pass the configuration file when the program starts, and the framework parses it automatically. When you call `new KafkaSource`, it reads the relevant information from the configuration file, initializes a Kafka consumer, and returns it. In this case only one topic is configured, so there is no need to specify the topic when consuming; that topic is used by default. This is the simplest example; more complex rules and read operations are achieved by passing parameters to the `.getDataStream()` method.

Let's look at the signature of the `getDataStream` method:

```scala
def getDataStream[T: TypeInformation](topic: java.io.Serializable = null,
                                      strategy: WatermarkStrategy[KafkaRecord[T]] = null
                                     ): DataStream[KafkaRecord[T]]
```

The specific description of each parameter is as follows:

| Parameter Name | Parameter Type | Description | Default |
|:---------------|:----------------------|:--------------------------------------|:-------------------------------------|
| `deserializer` | DeserializationSchema | deserialization class for the data in the topic | KafkaStringDeserializationSchema |
| `strategy` | WatermarkStrategy | watermark generation strategy | |

Let's take a look at more usage and configuration methods.
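
For example, the topic and the watermark strategy can be passed explicitly instead of relying on the configuration file alone. The sketch below is illustrative only and is not taken from this commit: the topic name is a placeholder, `WatermarkStrategy.noWatermarks()` is just one possible choice, and the import paths for `FlinkStreaming` and `KafkaRecord` are assumed to follow the same package layout as the example above.

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.scala._
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.source.{KafkaRecord, KafkaSource}

// Hypothetical application name; the topic and watermark strategy below are placeholders.
object KafkaExplicitTopicApp extends FlinkStreaming {

  override def handle(): Unit = {
    val source = KafkaSource()
      .getDataStream[String](
        topic = "user_events",
        strategy = WatermarkStrategy.noWatermarks[KafkaRecord[String]]()
      )
      .map(_.value) // unwrap the payload of each KafkaRecord

    source.print()
  }
}
```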

In Apache StreamPark, `KafkaSource` and `KafkaSink` are built on the official Kafka Connector.

[Apache Flink](https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html) integrates with the generic Kafka connector, which tries to keep up with the latest version of the Kafka client. The version of the Kafka client used by this connector may change between Flink versions. The current Kafka client is backward compatible with Kafka broker version 0.10.0 or later. For more details on Kafka compatibility, please refer to the official [Apache Kafka](https://kafka.apache.org/protocol.html#protocol_compatibility) documentation.

```xml
<!-- Required dependencies -->
<dependency>
    <groupId>org.apache.streampark</groupId>
    <artifactId>streampark-flink-core</artifactId>
    <version>${project.version}</version>
</dependency>

<!-- flink-connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
</dependency>
```

Also, in the development phase, the following dependencies are necessary:

```xml
<!-- The following dependencies with provided scope must also be imported -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
```

## Kafka Source (Consumer)

First, we introduce the standard Kafka consumer approach from the official website. The following code is taken from the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html#kafka-consumer):

```scala
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val stream = env.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
```

As you can see, a pile of Kafka connection information is defined right away. With this approach, every parameter is hard-coded, which is very inflexible. Next, let's see how to use StreamPark to read data from Kafka: you only need to define the configuration file in the prescribed format and then write the code. The configuration and code are introduced below.

### Basic consumption example

```yaml
kafka.source:
```
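
A rough placeholder sketch of a fuller `kafka.source` section (all values below are illustrative, not taken from this commit):

```yaml
kafka.source:
  bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092  # placeholder broker list
  topic: test_topic                                 # placeholder topic name
  group.id: test_group                              # placeholder consumer group
  auto.offset.reset: earliest                       # standard Kafka consumer property
```
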
:::info Note
The prefix `kafka.source` is fixed. The parameters related to Kafka properties must comply with the [Apache Kafka documentation](http://kafka.apache.org) specification for setting the parameter keys.
:::

<Tabs>

<TabItem value="Scala" label="Scala">

```scala
import org.apache.streampark.flink.core.scala.source.KafkaSource
import org.apache.flink.api.scala._

object kafkaSourceApp extends FlinkStreaming {

  override def handle(): Unit = {
    val source = KafkaSource().getDataStream[String]()
    print(source)
  }

}
```

</TabItem>

<TabItem value="Java" label="Java">

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

public class KafkaSimpleJavaApp {

    public static void main(String[] args) {
        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
        StreamingContext context = new StreamingContext(envConfig);
        DataStream<String> source = new KafkaSource<String>(context)
                .getDataStream()
                .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value);
        source.print();
        context.start();
    }
}
```

</TabItem>
</Tabs>

### Advanced configuration parameters

`KafkaSource` is a simpler Kafka reading class that wraps the Flink Kafka Connector. Its constructor takes a `StreamingContext`. You only need to pass in the configuration file when the program starts, and the framework parses it automatically.

When you call `new KafkaSource`, it automatically reads the relevant information from the configuration file, initializes a Kafka Consumer instance, and returns it. The example above configures only one topic, so there is no need to specify the topic when consuming; that topic is consumed by default. This is only the simplest example; more complex rules and read operations are achieved by passing parameters to the `.getDataStream()` method.

Let's look at the signature of the `getDataStream` method:

```scala
def getDataStream[T: TypeInformation](topic: java.io.Serializable = null,
                                      strategy: WatermarkStrategy[KafkaRecord[T]] = null
                                     ): DataStream[KafkaRecord[T]]
```

The specific function of each parameter is as follows:

| Parameter Name | Parameter Type | Description | Default |
| :-----| :---- | :---- | :---- |
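
As a placeholder illustration (not taken from this commit; the topic name is invented), the topic can also be passed directly to `getDataStream` inside the `handle()` method of the Scala example above:

```scala
// Placeholder sketch: "order_events" is an illustrative topic name.
val source = KafkaSource()
  .getDataStream[String](topic = "order_events")
  .map(_.value) // unwrap the payload of each KafkaRecord

source.print()
```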
