From 8ce5095651520a4ab9533bd18a06c4d31e7e39bc Mon Sep 17 00:00:00 2001
From: tison
Date: Fri, 7 Jun 2024 21:44:08 +0800
Subject: [PATCH] [Improve] Partially update Kafka (#380)

Signed-off-by: tison
---
 docs/connector/1-kafka.md                          | 87 ++++++++--------
 .../current/connector/1-kafka.md                   | 98 +++++++++----------
 2 files changed, 88 insertions(+), 97 deletions(-)

diff --git a/docs/connector/1-kafka.md b/docs/connector/1-kafka.md
index d5f8a6af6..5ca741088 100644
--- a/docs/connector/1-kafka.md
+++ b/docs/connector/1-kafka.md

[Apache Flink](https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html) integrates with the generic Kafka connector, which tries to keep up with the latest version of the Kafka client. The version of the Kafka client used by this connector may change between Flink versions. The current Kafka client is backward compatible with Kafka broker version 0.10.0 or later. For more details on Kafka compatibility, please refer to the official [Apache Kafka](https://kafka.apache.org/protocol.html#protocol_compatibility) documentation.

```xml
<dependency>
  <groupId>org.apache.streampark</groupId>
  <artifactId>streampark-flink-core</artifactId>
  <version>${project.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
```

In the development phase, the following dependencies are also necessary:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>
```

## Kafka Source (Consumer)

First, we introduce the standard Kafka consumer approach. The following code is taken from the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html#kafka-consumer):

```scala
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val stream = env.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
```

As you can see, a series of Kafka connection settings is defined up front. With this approach the parameters are hard-coded and inflexible. Let's see how to use StreamPark to read data from Kafka instead: you only need to define the configuration file in the prescribed format and then write the code.
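As a rough idea of the shape of that file, here is a minimal sketch. Only the `kafka.source` prefix is taken from this document; every key and value below is an illustrative placeholder modelled on standard Kafka consumer property names (plus a `topic` entry, since the connector needs to know which topic to read), not the project's actual sample configuration.

```yaml
# Hypothetical kafka.source block; keys and values are placeholders.
kafka.source:
  bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
  topic: user_events           # topic to consume
  group.id: user_group         # consumer group id
  auto.offset.reset: earliest  # where to start when no committed offset exists
```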
### Example

```yaml
kafka.source:
```

:::info Cautions
The prefix `kafka.source` is fixed, and the parameters related to Kafka properties must comply with the [official Kafka documentation](http://kafka.apache.org) conventions for setting the parameter keys.
:::

```scala
import org.apache.streampark.flink.core.scala.source.KafkaSource
import org.apache.flink.api.scala._

object kafkaSourceApp extends FlinkStreaming {
  override def handle(): Unit = {
    val source = KafkaSource().getDataStream[String]()
    print(source)
  }
}
```

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

public class KafkaSimpleJavaApp {
    public static void main(String[] args) {
        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
        StreamingContext context = new StreamingContext(envConfig);
        DataStream<String> source = new KafkaSource<String>(context)
                .getDataStream()
                .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value);
        source.print();
        context.start();
    }
}
```

### Advanced configuration parameters

`KafkaSource` wraps the Flink Kafka connector into a simpler Kafka reading class. Its constructor takes a `StreamingContext`; you only pass in the configuration file when the program starts, and the framework parses it automatically. When a `KafkaSource` is created, it reads the relevant settings from the configuration file, then initializes and returns a Kafka consumer. In this case only one topic is configured, so it is consumed by default without specifying the topic name. This is the simplest example; more complex rules and read operations are expressed by passing parameters to the `getDataStream()` method.

Let's look at the signature of the `getDataStream` method:

```scala
def getDataStream[T: TypeInformation](topic: java.io.Serializable = null,
                                       ...
                                       strategy: WatermarkStrategy[KafkaRecord[T]] = null
                                      ): DataStream[KafkaRecord[T]]
```

The specific description of the parameters is as follows:

| Parameter Name | Parameter Type        | Description                                      | Default                           |
|:---------------|:----------------------|:-------------------------------------------------|:----------------------------------|
| `deserializer` | DeserializationSchema | deserialization class for the data in the topic  | KafkaStringDeserializationSchema  |
| `strategy`     | WatermarkStrategy     | watermark generation strategy                     |                                   |

Let's take a look at more usage and configuration methods.
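For instance, a topic and a watermark strategy can be passed straight to `getDataStream`. The sketch below relies only on the signature quoted above; the topic name, the five-second out-of-orderness bound, and the `FlinkStreaming`/`KafkaRecord` import paths (assumed to mirror the earlier examples) are illustrative, not taken from the patch, and the `alias` and `deserializer` parameters are left at their defaults.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.scala._
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.source.{KafkaRecord, KafkaSource}

object KafkaAdvancedApp extends FlinkStreaming {

  override def handle(): Unit = {
    // Read one explicitly named topic (placeholder name) and attach an
    // event-time watermark that tolerates 5 seconds of out-of-orderness.
    val source = KafkaSource().getDataStream[String](
      topic = "user_events",
      strategy = WatermarkStrategy
        .forBoundedOutOfOrderness[KafkaRecord[String]](Duration.ofSeconds(5)))

    source.print()
  }
}
```

Compared with the zero-argument call, this selects a named topic instead of the single configured one and stamps each `KafkaRecord` with event-time watermarks before any downstream windowing.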
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/connector/1-kafka.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/connector/1-kafka.md
index a98122989..846da3d46 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/connector/1-kafka.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/connector/1-kafka.md

[Apache Flink](https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html) integrates the generic Kafka connector, which tries to keep up with the latest version of the Kafka client. The Kafka client version used by this connector may change between Flink versions. The current Kafka client is backward compatible with Kafka broker version 0.10.0 or later. For more details on Kafka compatibility, please refer to the official [Apache Kafka](https://kafka.apache.org/protocol.html#protocol_compatibility) documentation.

```xml
<dependency>
  <groupId>org.apache.streampark</groupId>
  <artifactId>streampark-flink-core</artifactId>
  <version>${project.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
```

Meanwhile, in the development phase, the following dependencies are also necessary:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>
```

## Kafka Source (Consumer)

First, let's introduce the standard Kafka consumption approach from the official website. The following code is taken from the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html#kafka-consumer):

```scala
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val stream = env.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
```

As you can see, a pile of Kafka connection settings is defined up front. With this approach, every parameter is hard-coded and inflexible. Next, let's look at how to read data from Kafka with StreamPark: you only need to define the configuration file in the prescribed format and then write the code. The configuration and code are introduced below.

### Basic consumption example

```yaml
kafka.source:
```

:::info Notes
The `kafka.source` prefix is fixed. Parameters related to Kafka properties must follow the [official Apache Kafka documentation](http://kafka.apache.org) conventions for setting the parameter keys.
:::

```scala
import org.apache.streampark.flink.core.scala.source.KafkaSource
import org.apache.flink.api.scala._

object kafkaSourceApp extends FlinkStreaming {
  override def handle(): Unit = {
    val source = KafkaSource().getDataStream[String]()
    print(source)
  }
}
```

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

public class KafkaSimpleJavaApp {
    public static void main(String[] args) {
        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
        StreamingContext context = new StreamingContext(envConfig);
        DataStream<String> source = new KafkaSource<String>(context)
                .getDataStream()
                .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value);
        source.print();
        context.start();
    }
}
```

### Advanced configuration parameters

`KafkaSource` wraps the Flink Kafka connector into a simpler Kafka reading class. Its constructor takes a `StreamingContext`. When the program starts, you only pass in the configuration file, and the framework parses it automatically.

When a `KafkaSource` is created, it automatically reads the relevant settings from the configuration file, then initializes and returns a Kafka consumer instance. The example above configures only one topic, so it is consumed by default without specifying a topic name. This is the simplest example; more complex rules and read operations are expressed by passing parameters to the `.getDataStream()` method.

Let's look at the signature of the `getDataStream` method:

```scala
def getDataStream[T: TypeInformation](topic: java.io.Serializable = null,
                                       ...
                                       strategy: WatermarkStrategy[KafkaRecord[T]] = null
                                      ): DataStream[KafkaRecord[T]]
```

The parameters are described below:

| Parameter Name | Parameter Type | Description | Default |
| :----- | :---- | :---- | :---- |