-
Notifications
You must be signed in to change notification settings - Fork 172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
mysql数据接收到kafka topic的时候,ddl语句都被过滤掉了,不会存储到topic中. #269
Comments
目前确实是这样实现的。主要原因是消费端并不是很容易处理 DDL,除非是单 partition 的 topic。 在多 partition 的前提下,如果下发 ddl,由于不同 partition 的消费不能保证顺序,同一个组里的某个客户端消费到 ddl 时,可能有在消费 ddl 执行前数据的客户端,可能有在消费 ddl 执行后数据的客户端,无法保证 ddl 双向 barrier 的语义。 不知道你这边场景是什么样的,针对这个问题想怎么解决呢?如果有好的想法我们也很有兴趣实现。 |
我们的需求是希望将binlog实时的写入到hdfs上面去. 目前做的是准实时级别的数仓。 但是后面为了说做实时数仓,想试着说切换到kafka上面去啊。毕竟大家都是用的kafka,所以好交流些。 我们之前是为了同步binlog到hdfs上面,然后load到hive里面去.之前的做法相对来说比较粗糙,之前的做法是用Canal Server读取binlog,Canal Client解析binlog后存储到RocketMQ中,设计的是一个库一个topic,queue size是10,然后一个表存储到一个queue里面去,当有新表产生的时候根据queue size的大小进行判定均衡负载。这样做不能保证size像kafka那种每个queue能做到相对均衡,只能做到一定程度的均衡。 存储到rocketmq后用spark streaming进行消费,对一个批次的数据先进行type=ddl类型的判定,然后先对hive表进行相应的ddl操作后(当然有些操作像删除字段,修改字段名称,特定位置添加字段等,有特殊的做法.),再把type为:update,insert,delete的数据写入到hdfs上面去.。 所以为了实时接入binlog到hive,ddl语句是必须的啊 |
所以是用每个表只用一个 queue 的方式实现的对吧。跟前面说的单 partition 一个意思。 rmq 不太清楚,kafka 上这个方式吞吐量有瓶颈。我们的环境下单 partition 大概最多 1w rps,流量大一点的表就不太够用了。 不过在流量不是很大的情况下确实也是一个办法,我们增加一个按库名+表名路由 partition 的功能是不是能满足需求? |
嗯,能满足需求。。。只是这种方式目前好像设计上不是那么的友好吧。 topic partition必然会出现数据分布不均衡的现象。 但是相对来说有这种需求的场景不是说有那么高的性能上的要求吧。 我们用这种方式主要是将以前用sqoop或者别的方式抽mysql数据到hive中给替换掉,直接读取日志,在写到hive表文件中。 这样对mysql的压力要小很多了。 而且数据也是相对实时的进入到hive里面吧。 |
这个不是我们的限制,是 |
[output]
type = "async-kafka"
目标端编码规则:输出类型和版本号
- 可选
[output.config]
默认为 json
output-format = "json"
#enable-ddl = true
默认为 0.1 版本
schema-version = "0.1"
[output.config.kafka-global-config]
broker-addrs = ["cdh01:9092","cdh02:9092","cdh03:9092"]
mode = "async"
目标端 kafka SASL 配置
# - 可选
#[output.config.kafka-global-config.net.sasl]
#enable = false
#user = ""
#password = ""
kafka 路由的定义
[[output.config.routes]]
match-schema = "test_gravity"
match-table = "*"
dml-topic = "test_gravity"
The text was updated successfully, but these errors were encountered: