From 083c34a935b52aec52575fbdebe78c91d493df17 Mon Sep 17 00:00:00 2001
From: Liuqitao <982328619@qq.com>
Date: Sun, 26 May 2024 21:34:29 +0800
Subject: [PATCH] Update 1-kafka.md (#376)

Remove duplicated content.
---
 .../current/connector/1-kafka.md | 90 -------------------
 1 file changed, 90 deletions(-)

diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/connector/1-kafka.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/connector/1-kafka.md
index b881c0c84..a98122989 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/connector/1-kafka.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/connector/1-kafka.md
@@ -897,96 +897,6 @@ class JavaUser implements Serializable {

### Specify SerializationSchema

The `Flink Kafka Producer` needs to know how to turn Java/Scala objects into binary data. `KafkaSerializationSchema` allows the user to specify such a schema; for usage details, please refer to the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html#the-serializationschema).

If no serializer is specified on `KafkaSink`, it falls back to `SimpleStringSchema` by default. A developer can explicitly supply a custom serializer through the `serializationSchema` parameter, for example to write a `User` object to Kafka in a custom format:

```scala
import org.apache.streampark.common.util.JsonUtils
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.streampark.flink.core.scala.sink.KafkaSink
import org.apache.streampark.flink.core.scala.source.KafkaSource
import org.apache.flink.api.scala._

object KafkaSinkApp extends FlinkStreaming {

  override def handle(): Unit = {
    // Read JSON records from Kafka and deserialize them into User objects.
    val source = KafkaSource()
      .getDataStream[String]()
      .map(x => JsonUtils.read[User](x.value))

    // Serialize each User as "name,age,gender,address" before writing to Kafka.
    KafkaSink().sink[User](source, serialization = new SerializationSchema[User]() {
      override def serialize(user: User): Array[Byte] = {
        s"${user.name},${user.age},${user.gender},${user.address}".getBytes
      }
    })
  }
}

case class User(name: String, age: Int, gender: Int, address: String)
```

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.streampark.flink.core.java.sink.KafkaSink;
import org.apache.streampark.flink.core.java.source.KafkaSource;
import org.apache.streampark.flink.core.scala.StreamingContext;
import org.apache.streampark.flink.core.scala.source.KafkaRecord;
import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.io.Serializable;

public class KafkaSinkJavaApp {

    public static void main(String[] args) {

        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
        StreamingContext context = new StreamingContext(envConfig);
        ObjectMapper mapper = new ObjectMapper();

        // Read JSON records from Kafka and deserialize them into JavaUser objects.
        DataStream<JavaUser> source = new KafkaSource<String>(context)
            .getDataStream()
            .map((MapFunction<KafkaRecord<String>, JavaUser>) value ->
                mapper.readValue(value.value(), JavaUser.class));

        // Serialize each JavaUser as "name,age,gender,address" before writing to Kafka.
        new KafkaSink<JavaUser>(context)
            .serializer(
                (SerializationSchema<JavaUser>) element ->
                    String.format("%s,%d,%d,%s", element.name, element.age, element.gender, element.address).getBytes()
            ).sink(source);

        context.start();
    }
}

class JavaUser implements Serializable {
    String name;
    Integer age;
    Integer gender;
    String address;
}
```

### Specify partitioner
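
A custom partitioner decides which Kafka partition each outgoing record lands on. As a minimal sketch of the idea, assuming Flink 1.12's `FlinkKafkaPartitioner` base class (the `GenderPartitioner` class and its gender-based routing rule are hypothetical, introduced here only for illustration):

```scala
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner

// Hypothetical partitioner: route each User record to a partition derived from
// its gender field, so records with the same gender always land on the same partition.
class GenderPartitioner extends FlinkKafkaPartitioner[User] {

  override def partition(
      record: User,
      key: Array[Byte],
      value: Array[Byte],
      targetTopic: String,
      partitions: Array[Int]): Int = {
    // partitions holds the partition ids of the target topic,
    // so a modulus over its length always yields a valid partition.
    partitions(math.abs(record.gender) % partitions.length)
  }
}
```

Because `partitions` carries the partition ids of the target topic, taking the modulus over `partitions.length` keeps the result in range while routing identical keys consistently.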