Update 1-kafka.md (#376)

Content duplication

WickLiu authored May 26, 2024
1 parent efe0a63 commit 083c34a
Showing 1 changed file with 0 additions and 90 deletions.

@@ -897,96 +897,6 @@ class JavaUser implements Serializable {
</TabItem>
</Tabs>

### Specifying a SerializationSchema

`Flink Kafka Producer` needs to know how to convert Java/Scala objects into binary data. `KafkaSerializationSchema` lets the user supply such a schema; for the available options and usage, see the [official documentation](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html#the-serializationschema).

If no serializer is specified, `KafkaSink` uses `SimpleStringSchema` by default. Developers can explicitly provide a custom serializer through the `serializationSchema` parameter, for example to write the `user` object to `kafka` in a custom format:

<Tabs>
<TabItem value="scala" label="Scala" default>

```scala
import org.apache.streampark.common.util.JsonUtils
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.streampark.flink.core.scala.sink.KafkaSink
import org.apache.streampark.flink.core.scala.source.KafkaSource
import org.apache.flink.api.scala._

object KafkaSinkApp extends FlinkStreaming {

  override def handle(): Unit = {
    // Read JSON strings from Kafka and deserialize each record into a User
    val source = KafkaSource()
      .getDataStream[String]()
      .map(x => JsonUtils.read[User](x.value))

    // Write back to Kafka with a custom SerializationSchema that renders
    // each User as a comma-separated string
    KafkaSink().sink[User](source, serialization = new SerializationSchema[User]() {
      override def serialize(user: User): Array[Byte] = {
        s"${user.name},${user.age},${user.gender},${user.address}".getBytes
      }
    })
  }

}

case class User(name: String, age: Int, gender: Int, address: String)
```

</TabItem>
<TabItem value="Java" label="Java">

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.streampark.flink.core.java.sink.KafkaSink;
import org.apache.streampark.flink.core.java.source.KafkaSource;
import org.apache.streampark.flink.core.scala.StreamingContext;
import org.apache.streampark.flink.core.scala.source.KafkaRecord;
import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.io.Serializable;

public class KafkaSinkJavaApp {

    public static void main(String[] args) {
        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
        StreamingContext context = new StreamingContext(envConfig);
        ObjectMapper mapper = new ObjectMapper();

        // Read JSON strings from Kafka and deserialize each record into a JavaUser
        DataStream<JavaUser> source = new KafkaSource<String>(context)
            .getDataStream()
            .map((MapFunction<KafkaRecord<String>, JavaUser>) value ->
                mapper.readValue(value.value(), JavaUser.class));

        // Write back to Kafka with a custom SerializationSchema that renders
        // each JavaUser as a comma-separated string
        new KafkaSink<JavaUser>(context)
            .serializer(
                (SerializationSchema<JavaUser>) element ->
                    String.format("%s,%d,%d,%s", element.name, element.age, element.gender, element.address).getBytes()
            ).sink(source);

        context.start();
    }
}

class JavaUser implements Serializable {
    String name;
    Integer age;
    Integer gender;
    String address;
}
```

</TabItem>
</Tabs>
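
If the comma-separated format above is not a good fit, the same `serialization` parameter accepts any other `SerializationSchema` implementation. The following is a minimal sketch, not taken from the original examples, that factors the serializer into a named, reusable class and emits a hand-built JSON string instead; it assumes the `User` case class from the Scala tab above, and the class name `UserJsonSerializationSchema` is illustrative only.

```scala
import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.serialization.SerializationSchema

// Minimal sketch: a reusable, named serializer for the User case class defined
// in the Scala example above. It emits a hand-built JSON string per record; a
// real job would typically delegate to a JSON library instead.
class UserJsonSerializationSchema extends SerializationSchema[User] {

  override def serialize(user: User): Array[Byte] = {
    val json =
      s"""{"name":"${user.name}","age":${user.age},"gender":${user.gender},"address":"${user.address}"}"""
    json.getBytes(StandardCharsets.UTF_8)
  }
}

// It can then be passed to the sink in place of the anonymous class:
//   KafkaSink().sink[User](source, serialization = new UserJsonSerializationSchema)
```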


### Specifying the partitioner

