title | date | order | categories | tags | permalink | |||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
消息队列基本原理 |
2019-07-05 08:11:00 -0700 |
2 |
|
|
/pages/214c1fa6/ |
消息队列(Message Queue,简称 MQ)技术是应用间交换信息的一种技术。
消息队列主要解决异步处理、应用间耦合,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。
目前主流的 MQ 有:Kafka、RabbitMQ、RocketMQ、ActiveMQ,而部分数据库如 Redis、MySQL 以及 phxsql 也可实现消息队列的功能。
注意:为了简便,下文中除了文章标题,一律使用 MQ 简称。
消息队列(Message Queue,简称 MQ)技术是应用间交换信息的一种技术。
消息队列主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。
MQ 是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取队列中的消息。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
MQ 的数据可驻留在内存或磁盘上,直到它们被应用程序读取。通过 MQ,应用程序可独立地执行,它们不需要知道彼此的位置,不需要等待接收程序接收此消息。在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段。为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的。
目前主流的 MQ 有:Kafka、RabbitMQ、RocketMQ、ActiveMQ。
MQ 通信模型大致有以下类型:
- 点对点 - 点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。
- 多点广播 - MQ 适用于不同类型的应用。其中重要的,也是正在发展中的是"多点广播"应用,即能够将消息发送到多个目标站点 (Destination List)。可以使用一条 MQ 指令将单一消息发送到多个目标站点,并确保为每一站点可靠地提供信息。MQ 不仅提供了多点广播的功能,而且还拥有智能消息分发功能,在将一条消息发送到同一系统上的多个用户时,MQ 将消息的一个复制版本和该系统上接收者的名单发送到目标 MQ 系统。目标 MQ 系统在本地复制这些消息,并将它们发送到名单上的队列,从而尽可能减少网络的传输量。
- 发布/订阅 (Publish/Subscribe) - 发布/订阅模式使消息的分发可以突破目的队列地理位置的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅模式使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。
- 集群 (Cluster) - 为了简化点对点通讯模式中的系统配置,MQ 提供 Cluster(集群) 的解决方案。集群类似于一个域 (Domain),集群内部的队列管理器之间通讯时,不需要两两之间建立消息通道,而是采用集群 (Cluster) 通道与其它成员通讯,从而大大简化了系统配置。此外,集群中的队列管理器之间能够自动进行负载均衡,当某一队列管理器出现故障时,其它队列管理器可以接管它的工作,从而大大提高系统的高可靠性。
MQ 可以将系统间的处理流程异步化,减少等待响应的时间,从而提高整体并发吞吐量。
一般,MQ 异步处理应用于非核心流程,例如:短信/邮件通知、数据推送、上报数据到监控中心/日志中心等。
假设这样一个场景,用户向系统 A 发起请求,系统 A 处理计算只需要 10 ms,然后通知系统 BCD 写库,系统 BCD 写库耗时分别为:100ms、200ms、300ms。最终总耗时为: 10+100ms+200ms+300ms=610ms。此外,加上请求和响应的网络传输时间,从用户角度看,可能要等待将近 1s 才能得到结果。
如果使用 MQ,系统 A 接到请求后,耗时 10ms 处理计算,然后向系统 BCD 连续发送消息,假设耗时 5ms。那么 这一过程的总耗时为 3ms + 5ms = 8ms,这相比于 610 ms,大大缩短了响应时间。至于系统 BCD 的写库操作,只要自行消费 MQ 后处理即可,用户无需关注。
通过 MQ,可以消除系统间的强耦合。它的好处在于:
- 消息的消费者系统可以随意增加,无需修改生产者系统的代码。
- 生产者系统、消费者系统彼此不会影响对方的流程。
- 如果生产者系统宕机,消费者系统收不到消息,就不会有下一步的动作。
- 如果消费者系统宕机,生产者系统让然可以正常发送消息,不影响流程。
不同系统如果要建立通信,传统的做法是:调用接口。
如果需要和新的系统建立通信或删除已建立的通信,都需要修改代码,这种方案显然耦合度很高。
如果使用 MQ,系统间的通信只需要通过发布/订阅(Pub/Sub)模型即可,彼此没有直接联系,也就不需要相互感知,从而达到 解耦。
当 上下游系统 处理能力存在差距的时候,利用 MQ 做一个 “漏斗” 模型,进行 流控。把 MQ 当成可靠的 消息暂存地,进行一定程度的 消息堆积;在下游有能力处理的时候,再发送消息。
MQ 的流量削峰常用于高并发场景(例如:秒杀、团抢等业务场景),它是缓解瞬时暴增流量的核心手段之一。
如果没有 MQ,两个系统之间通过 协商、滑动窗口、限流/降级/熔断 等复杂的方案也能实现 流控。但 系统复杂性 指数级增长,势必在上游或者下游做存储,并且要处理 定时、拥塞 等一系列问题。而且每当有 处理能力有差距 的时候,都需要 单独 开发一套逻辑来维护这套逻辑。
假设某个系统读写数据库的稳定性能为每秒处理 1000 条数据。平常情况下,远远达不到这么大的处理量。假设,因为因为做活动,系统的瞬时请求量剧增,达到每秒 10000 个并发请求,数据库根本承受不了,可能直接就把数据库给整崩溃了,这样系统服务就不可用了。
如果使用 MQ,每秒写入 10000 条请求,但是系统 A 每秒只从 MQ 中消费 1000 条请求,然后写入数据库。这样,就不会超过数据库的承受能力,而是把请求积压在 MQ 中。只要高峰期一过,系统 A 就会很快把积压的消息给处理掉。
(1)MQ 常被用于做海量数据的传输缓冲。
例如,Kafka 常被用于做为各种日志数据、采集数据的数据中转。然后,Kafka 将数据转发给 Logstash、Elasticsearch 中,然后基于 Elasticsearch 来做日志中心,提供检索、聚合、分析日志的能力。开发者可以通过 Kibana 集成 Elasticsearch 数据进行可视化展示,或自行进行定制化开发。
(2)MQ 也可以被用于流式处理。
例如,Kafka 几乎已经是流计算的数据采集端的标准组件。而流计算通过实时数据处理能力,提供了更为快捷的聚合计算能力,被大量应用于链路监控、实时监控、实时数仓、实时大屏、风控、推荐等应用领域。
最终一致性 不是 消息队列 的必备特性,但确实可以依靠 消息队列 来做 最终一致性 的事情。
- 先写消息再操作,确保操作完成后再修改消息状态。定时任务补偿机制 实现消息 可靠发送接收、业务操作的可靠执行,要注意 消息重复 与 幂等设计。
- 所有不保证
100%
不丢消息 的消息队列,理论上无法实现 最终一致性。
像
Kafka
一类的设计,在设计层面上就有 丢消息 的可能(比如 定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。
消息队列一般都内置了 高效的通信机制,因此也可以用于单纯的 消息通讯,比如实现 点对点消息队列 或者 聊天室 等。
生产者/消费者 模式,只需要关心消息是否 送达队列,至于谁希望订阅和需要消费,是 下游 的事情,无疑极大地减少了开发和联调的工作量。
任何技术都会有利有弊,MQ 给整体系统架构带来很多好处,但也会付出一定的代价。
MQ 主要引入了以下问题:
- 系统可用性降低:引入了 MQ 后,通信需要基于 MQ 完成,如果 MQ 宕机,则服务不可用。因此,MQ 要保证是高可用的,详情参考:MQ 的高可用
- 系统复杂度提高:使用 MQ,需要关注一些新的问题:
- 如何保证消息没有 重复消费?
- 如何处理 消息丢失 的问题?
- 如何保证传递 消息的顺序性?
- 如何处理大量 消息积压 的问题?
- 一致性问题:假设系统 A 处理完直接返回成功的结果给用户,用户认为请求成功。但如果此时,系统 BCD 中只要有任意一个写库失败,那么数据就不一致了。这种情况如何处理?
下面,我们针对以上问题来一一分析。
如何保证消息不被重复消费 和 如何保证消息消费的幂等性 是同一个问题。
必须先明确产生重复消费的原因,才能对症下药。
重复消费问题通常不是 MQ 来处理,而是由开发来处理的。
以 Kafka 举例,Kafka 每个 Partition 都是一个有序的、不可变的记录序列,不断追加到结构化的提交日志中。Partition 中为每条记录分配一个连续的 id 号,称为偏移量(Offset),用于唯一标识 Partition 内的记录。
Kafka 的客户端和 Broker 都会保存 Offset。客户端消费消息后,每隔一段时间,就把已消费的 Offset 提交给 Kafka Broker,表示已消费。
在这个过程中,如果客户端应用消费消息后,因为宕机、重启等情况而没有提交已消费的 Offset 。当系统恢复后,会继续消费消息,由于 Offset 未提交,就会出现重复消费的问题。
应对重复消费问题,需要在业务层面,通过 幂等性设计 来解决。
MQ 重复消费不可怕,可怕的是没有应对机制,可以借鉴的思路有:
- 如果是写关系型数据库,可以先根据主键查询,判断数据是否已存在,存在则更新,不存在则插入;
- 如果是写 Redis,由于 set 操作天然具有幂等性,所以什么都不用做;
- 如果是根据消息做较复杂的逻辑处理,可以在消息中加入全局唯一 ID,例如:订单 ID 等。在客户端存储中(Mysql、Redis 等)保存已消费消息的 ID。一旦接受到新消息,先判断消息中的 ID 是否在已消费消息 ID 表中存在,存在则不再处理,不存在则处理。
在实际开发中,可以参考上面的例子,结合现实场景,设计合理的幂等性方案。
如何处理消息丢失的问题 和 如何保证消息不被重复消费 是同一个问题。关注点有:
- MQ Server 丢失数据
- 消费方丢失数据
- 生产方丢失数据
唯一可能导致消费方丢失数据的情况是:消费方设置了自动提交 Offset。一旦设置了自动提交 Offset,接受到消息后就会自动提交 Offset 给 Kafka ,Kafka 就认为消息已被消费。如果此时,消费方尚未来得及处理消息就挂了,那么消息就丢了。
解决方法就是:消费方关闭自动提交 Offset,处理完消息后手动提交 Offset。但这种情况下可能会出现重复消费的情形,需要自行保证幂等性。
当 Kafka 某个 Broker 宕机,需要重新选举 Partition 的 Leader。若此时其他的 Follower 尚未同步 Leader 的数据,那么新选某个 Follower 为 Leader 后,就丢失了部分数据。
为此,一般要求至少设置 4 个参数:
- 给 Topic 设置
replication.factor
参数 - 这个值必须大于 1,要求每个 Partition 必须有至少 2 个副本。 - 在 Kafka 服务端设置
min.insync.replicas
参数 - 这个值必须大于 1,这是要求一个 Leader 需要和至少一个 Follower 保持通信,这样才能确保 Leader 挂了还有替补。 - 在 Producer 端设置
acks=all
- 这意味着:要求每条数据,必须是写入所有 replica 之后,才能认为写入成功了。 - 在 Producer 端设置
retries=MAX
(很大很大很大的一个值,无限次重试的意思) - 这意味着要求一旦写入失败,就无限重试,卡在这里了。
如果按照上述的思路设置了 acks=all
,生产方一定不会丢数据。
要求是,你的 Leader 接收到消息,所有的 Follower 都同步到了消息之后,才认为本生产消息成功了。如果未满足这个条件,生产者会自动不断的重试,重试无限次。
要保证 MQ 的顺序性,势必要付出一定的代价,所以实施方案前,要先明确业务场景是不是有必要保证消息的顺序性。只有那些明确对消息处理顺序有要求的业务场景才值得去保证消息顺序性。
方案一
一个 Topic,一个 Partition,一个 Consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
方案二
- 写入数据到 Partition 时指定一个全局唯一的 ID,例如订单 ID。发送方保证相同 ID 的消息有序的发送到同一个 Partition。
- 基于上一点,消费方从 Kafka Partition 中消费消息时,此刻一定是顺序的。但如果消费方式以并发方式消费消息,顺序就可能会被打乱。为此,还有做到以下几点:
- 消费方维护 N 个缓存队列,具有相同 ID 的数据都写入同一个队列中;
- 创建 N 个线程,每个线程只负责从指定的一个队列中取数据。
假设一个 MQ 消费者可以一秒处理 1000 条消息,三个 MQ 消费者可以一秒处理 3000 条消息,那么一分钟的处理量是 18 万条。如果 MQ 中积压了几百万到上千万的数据,即使消费者恢复了,也需要大概很长的时间才能恢复过来。
对于产线环境来说,漫长的等待是不可接受的,所以面临这种窘境时,只能临时紧急扩容以应对了,具体操作步骤和思路如下:
- 先修复 Consumer 的问题,确保其恢复消费速度,然后将现有 Consumer 都停掉。
- 新建一个 Topic,Partition 是原来的 10 倍,临时建立好原先 10 倍的 Queue 数量。
- 然后写一个临时的分发数据的 Consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 Queue。
- 接着临时征用 10 倍的机器来部署 Consumer ,每一批 Consumer 消费一个临时 Queue 的数据。这种做法相当于是临时将 Queue 资源和 Consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
- 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
不同 MQ 实现高可用的原理各不相同。因为 Kafka 比较具有代表性,所以这里以 Kafka 为例。
了解 Kafka,必须先了解 Kafka 的核心概念:
-
Broker - Kafka 集群包含一个或多个节点,这种节点被称为 Broker。
-
Topic - 每条发布到 Kafka 集群的消息都有一个类别,这个类别被称为 Topic。(不同 Topic 的消息是物理隔离的;同一个 Topic 的消息保存在一个或多个 Broker 上,但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)。对于每一个 Topic, Kafka 集群都会维持一个分区日志。
-
Partition - 了提高 Kafka 的吞吐率,每个 Topic 包含一个或多个 Partition,每个 Partition 在物理上对应一个文件夹,该文件夹下存储这个 Partition 的所有消息和索引文件。
- Kafka 日志的分区(Partition)分布在 Kafka 集群的节点上。每个节点在处理数据和请求时,共享这些分区。每一个分区都会在已配置的节点上进行备份,确保容错性。
Kafka 是如何实现高可用的呢?
Kafka 在 0.8 以前的版本中,如果一个 Broker 宕机了,其上面的 Partition 都不能用了,这自然不是高可用的。
为了实现高可用,Kafka 引入了复制功能。
简单来说,就是副本机制( Replicate )。
每个 Partition 都有一个 Leader,零个或多个 Follower。Leader 和 Follower 都是 Broker,每个 Broker 都会成为某些分区的 Leader 和某些分区的 Follower,因此集群的负载是平衡的。
- Leader 处理一切对 Partition (分区)的读写请求;
- 而 Follower 只需被动的同步 Leader 上的数据。
同一个 Topic 的不同 Partition 会分布在多个 Broker 上,而且一个 Partition 还会在其他的 Broker 上面进行备份,Producer 在发布消息到某个 Partition 时,先找到该 Partition 的 Leader,然后向这个 Leader 推送消息;每个 Follower 都从 Leader 拉取消息,拉取消息成功之后,向 Leader 发送一个 ACK 确认。
FAQ
问:为什么让 Leader 处理一切对对 Partition (分区)的读写请求?
答:因为如果允许所有 Broker 都可以处理读写请求,就可能产生数据一致性问题。
由上文可知,Partition 在多个 Broker 上存在副本。
如果某个 Follower 宕机,啥事儿没有,正常工作。
如果 Leader 宕机了,会从 Follower 中重新选举一个新的 Leader。
ActiveMQ
是由 Apache
出品,ActiveMQ
是一个完全支持JMS1.1
和 J2EE 1.4
规范的 JMS Provider
实现。它非常快速,支持 多种语言的客户端 和 协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。
- 服从 JMS 规范:
JMS
规范提供了良好的标准和保证,包括:同步 或 异步 的消息分发,一次和仅一次的消息分发,消息接收 和 订阅 等等。遵从JMS
规范的好处在于,不论使用什么JMS
实现提供者,这些基础特性都是可用的; - 连接灵活性:
ActiveMQ
提供了广泛的 连接协议,支持的协议有:HTTP/S
,IP
多播,SSL
,TCP
,UDP
等等。对众多协议的支持让ActiveMQ
拥有了很好的灵活性; - 支持的协议种类多:
OpenWire
、STOMP
、REST
、XMPP
、AMQP
; - 持久化插件和安全插件:
ActiveMQ
提供了 多种持久化 选择。而且,ActiveMQ
的安全性也可以完全依据用户需求进行 自定义鉴权 和 授权; - 支持的客户端语言种类多:除了
Java
之外,还有:C/C++
,.NET
,Perl
,PHP
,Python
,Ruby
; - 代理集群:多个
ActiveMQ
代理 可以组成一个 集群 来提供服务; - 异常简单的管理:
ActiveMQ
是以开发者思维被设计的。所以,它并不需要专门的管理员,因为它提供了简单又使用的管理特性。有很多中方法可以 监控ActiveMQ
不同层面的数据,包括使用在JConsole
或者在ActiveMQ
的Web Console
中使用JMX
。通过处理JMX
的告警消息,通过使用 命令行脚本,甚至可以通过监控各种类型的 日志。
ActiveMQ
可以运行在 Java
语言所支持的平台之上。使用 ActiveMQ
需要:
Java JDK
ActiveMQ
安装包
- 跨平台 (
JAVA
编写与平台无关,ActiveMQ
几乎可以运行在任何的JVM
上); - 可以用
JDBC
:可以将 数据持久化 到数据库。虽然使用JDBC
会降低ActiveMQ
的性能,但是数据库一直都是开发人员最熟悉的存储介质; - 支持
JMS
规范:支持JMS
规范提供的 统一接口; - 支持 自动重连 和 错误重试机制;
- 有安全机制:支持基于
shiro
,jaas
等多种 安全配置机制,可以对Queue/Topic
进行 认证和授权; - 监控完善:拥有完善的 监控,包括
Web Console
,JMX
,Shell
命令行,Jolokia
的RESTful API
; - 界面友善:提供的
Web Console
可以满足大部分情况,还有很多 第三方的组件 可以使用,比如hawtio
;
- 社区活跃度不及
RabbitMQ
高; - 根据其他用户反馈,会出莫名其妙的问题,会 丢失消息;
- 目前重心放到
activemq 6.0
产品Apollo
,对5.x
的维护较少; - 不适合用于 上千个队列 的应用场景;
RabbitMQ
于 2007
年发布,是一个在 AMQP
(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
- 可靠性:提供了多种技术可以让你在 性能 和 可靠性 之间进行 权衡。这些技术包括 持久性机制、投递确认、发布者证实 和 高可用性机制;
- 灵活的路由:消息在到达队列前是通过 交换机 进行 路由 的。
RabbitMQ
为典型的路由逻辑提供了 多种内置交换机 类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ
的 插件 来使用; - 消息集群:在相同局域网中的多个
RabbitMQ
服务器可以 聚合 在一起,作为一个独立的逻辑代理来使用; - 队列高可用:队列可以在集群中的机器上 进行镜像,以确保在硬件问题下还保证 消息安全;
- 支持多种协议:支持 多种消息队列协议;
- 支持多种语言:用
Erlang
语言编写,支持只要是你能想到的 所有编程语言; - 管理界面:
RabbitMQ
有一个易用的 用户界面,使得用户可以 监控 和 管理 消息Broker
的许多方面; - 跟踪机制:如果 消息异常,
RabbitMQ
提供消息跟踪机制,使用者可以找出发生了什么; - 插件机制:提供了许多 插件,来从多方面进行扩展,也可以编写自己的插件。
RabbitMQ
可以运行在 Erlang
语言所支持的平台之上,包括 Solaris
,BSD
,Linux
,MacOSX
,TRU64
,Windows
等。使用 RabbitMQ
需要:
ErLang
语言包RabbitMQ
安装包
- 由于
Erlang
语言的特性,消息队列性能较好,支持 高并发; - 健壮、稳定、易用、跨平台、支持 多种语言、文档齐全;
- 有消息 确认机制 和 持久化机制,可靠性高;
- 高度可定制的 路由;
- 管理界面 较丰富,在互联网公司也有较大规模的应用,社区活跃度高。
- 尽管结合
Erlang
语言本身的并发优势,性能较好,但是不利于做 二次开发和维护; - 实现了 代理架构,意味着消息在发送到客户端之前可以在 中央节点 上排队。此特性使得
RabbitMQ
易于使用和部署,但是使得其 运行速度较慢,因为中央节点 增加了延迟,消息封装后 也比较大; - 需要学习 比较复杂 的 接口和协议,学习和维护成本较高。
RocketMQ
出自 阿里 的开源产品,用 Java
语言实现,在设计时参考了 Kafka
,并做出了自己的一些改进,消息可靠性上 比 Kafka
更好。RocketMQ
在阿里内部 � 被广泛应用在 订单,交易,充值,流计算,消息推送,日志流式处理,binglog
分发 等场景。
- 基于 队列模型:具有 高性能、高可靠、高实时、分布式 等特点;
Producer
、Consumer
、队列 都支持 分布式;Producer
向一些队列轮流发送消息,队列集合 称为Topic
。Consumer
如果做 广播消费,则一个Consumer
实例消费这个Topic
对应的 所有队列;如果做 集群消费,则 多个Consumer
实例 平均消费 这个Topic
对应的队列集合;- 能够保证 严格的消息顺序;
- 提供丰富的 消息拉取模式;
- 高效的订阅者 水平扩展能力;
- 实时 的 消息订阅机制;
- 亿级 消息堆积 能力;
- 较少的外部依赖。
RocketMQ
可以运行在 Java
语言所支持的平台之上。使用 RocketMQ
需要:
Java JDK
- 安装
git
、Maven
RocketMQ
安装包
- 单机 支持
1
万以上 持久化队列; RocketMQ
的所有消息都是 持久化的,先写入系统PAGECACHE
,然后 刷盘,可以保证 内存 与 磁盘 都有一份数据,而 访问 时,直接 从内存读取。- 模型简单,接口易用(
JMS
的接口很多场合并不太实用); - 性能非常好,可以允许 大量堆积消息 在
Broker
中; - 支持 多种消费模式,包括 集群消费、广播消费等;
- 各个环节 分布式扩展设计,支持 主从 和 高可用;
- 开发度较活跃,版本更新很快。
- 支持的 客户端语言 不多,目前是
Java
及C++
,其中C++
还不成熟; RocketMQ
社区关注度及成熟度也不及前两者;- 没有
Web
管理界面,提供了一个CLI
(命令行界面) 管理工具带来 查询、管理 和 诊断各种问题; - 没有在
MQ
核心里实现JMS
等接口;
Apache Kafka
是一个 分布式消息发布订阅 系统。它最初由 LinkedIn
公司基于独特的设计实现为一个 分布式的日志提交系统 (a distributed commit log
),之后成为 Apache
项目的一部分。Kafka
性能高效、可扩展良好 并且 可持久化。它的 分区特性,可复制 和 可容错 都是其不错的特性。
- 快速持久化:可以在
O(1)
的系统开销下进行 消息持久化; - 高吞吐:在一台普通的服务器上既可以达到
10W/s
的 吞吐速率; - 完全的分布式系统:
Broker
、Producer
和Consumer
都原生自动支持 分布式,自动实现 负载均衡; - 支持 同步 和 异步 复制两种 高可用机制;
- 支持 数据批量发送 和 拉取;
- 零拷贝技术(zero-copy):减少
IO
操作步骤,提高 系统吞吐量; - 数据迁移、扩容 对用户透明;
- 无需停机 即可扩展机器;
- 其他特性:丰富的 消息拉取模型、高效 订阅者水平扩展、实时的 消息订阅、亿级的 消息堆积能力、定期删除机制;
使用 Kafka
需要:
Java JDK
Kafka
安装包
- 客户端语言丰富:支持
Java
、.Net
、PHP
、Ruby
、Python
、Go
等多种语言; - 高性能:单机写入
TPS
约在100
万条/秒,消息大小10
个字节; - 提供 完全分布式架构,并有
replica
机制,拥有较高的 可用性 和 可靠性,理论上支持 消息无限堆积; - 支持批量操作;
- 消费者 采用
Pull
方式获取消息。消息有序,通过控制 能够保证所有消息被消费且仅被消费 一次; - 有优秀的第三方
Kafka Web
管理界面Kafka-Manager
; - 在 日志领域 比较成熟,被多家公司和多个开源项目使用。
Kafka
单机超过64
个 队列/分区 时,Load
时会发生明显的飙高现象。队列 越多,负载 越高,发送消息 响应时间变长;- 使用 短轮询方式,实时性 取决于 轮询间隔时间;
- 消费失败 不支持重试;
- 支持 消息顺序,但是 一台代理宕机 后,就会产生 消息乱序;
- 社区更新较慢。
MQ 的技术选型一般要考虑以下几点:
- 是否开源:这决定了能否商用,所以最为重要。
- 社区活跃度越高越好:高社区活跃度,一般保证了低 Bug 率,因为大部分 Bug,已经有人遇到并解决了。
- 技术生态适配性:客户端对各种编程语言的支持。比如:如果使用 MQ 的都是 Java 应用,那么 ActiveMQ、RabbitMQ、RocketMQ、Kafka 都可以。如果需要支持其他语言,那么 RMQ 比较合适,因为它支持的编程语言比较丰富。如果 MQ 是应用于大数据或流式计算,那么 Kafka 几乎是标配。如果是应用于在线业务系统,那么 Kafka 就不合适了,可以考虑 RabbitMQ、 RocketMQ 很合适。
- 高可用性:应用于线上的准入标准。
- 性能:具备足够好的性能,能满足绝大多数场景的性能要求。
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 同 ActiveMQ | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行流式计算、日志采集等场景 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
可用性 | 高,基于主从架构实现高可用 | 同 ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ |
功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
综上,各种对比之后,有如下建议:
- 业务系统场景,建议使用 RocketMQ、RabbitMQ。如果所有应用都是 Java,优选 RocketMQ,因为 RocketMQ 本身就是 Java 开发的,所以最适配。如果业务中有多种编程语言的应用,建议选择 RabbitMQ。
- 大数据和流式计算领域,或是作为日志缓冲,强烈建议选择 Kafka,业界标准,久经考验。
提到 MQ,就顺便提一下 JMS 。
JMS(JAVA Message Service,java 消息服务)API 是一个消息服务的标准/规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
在 EJB 架构中,有消息 bean 可以无缝的与 JMS 消息服务集成。在 J2EE 架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。
在 JMS 标准中,有两种消息模型:
- P2P(Point to Point)
- Pub/Sub(Publish/Subscribe)
P2P 模式包含三个角色:MQ(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
P2P 的特点
- 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在 MQ 中)
- 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
- 接收者在成功接收消息之后需向队列应答成功
如果希望发送的每个消息都会被成功处理的话,那么需要 P2P 模式。
包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到 Topic,系统将这些消息传递给多个订阅者。
Pub/Sub 的特点
- 每个消息可以有多个消费者
- 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
- 为了消费消息,订阅者必须保持运行的状态。
为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。
在 JMS 中,消息的产生和消费都是异步的。对于消费来说,JMS 的消息者可以通过两种方式来消费消息。
- 同步 - 订阅者或接收者通过
receive
方法来接收消息,receive
方法在接收到消息之前(或超时之前)将一直阻塞; - 异步 - 订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的
onMessage
方法。
JNDI
- Java 命名和目录接口,是一种标准的 Java 命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。
JNDI 在 JMS 中起到查找和访问发送目标或消息来源的作用。
创建 Connection 对象的工厂,针对两种不同的 jms 消息模型,分别有 QueueConnectionFactory 和 TopicConnectionFactory 两种。可以通过 JNDI 来查找 ConnectionFactory 对象。
Destination 的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的 Destination 是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的 Destination 也是某个队列或主题(即消息来源)。
所以,Destination 实际上就是两种类型的对象:Queue、Topic。可以通过 JNDI 来查找 Destination。
Connection 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装)。Connection 可以产生一个或多个 Session。跟 ConnectionFactory 一样,Connection 也有两种类型:QueueConnection 和 TopicConnection。
Session 是操作消息的接口。可以通过 session 创建生产者、消费者、消息等。Session 提供了事务的功能。当需要使用 session 发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分 QueueSession 和 TopicSession。
消息生产者由 Session 创建,并用于将消息发送到 Destination。同样,消息生产者分两种类型:QueueSender 和 TopicPublisher。可以调用消息生产者的方法(send 或 publish 方法)发送消息。
消息消费者由 Session 创建,用于接收被发送到 Destination 的消息。两种类型:QueueReceiver 和 TopicSubscriber。可分别通过 session 的 createReceiver(Queue)或 createSubscriber(Topic)来创建。当然,也可以 session 的 creatDurableSubscriber 方法来创建持久化的订阅者。
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的 onMessage 方法。EJB 中的 MDB(Message-Driven Bean)就是一种 MessageListener。