分享概要
一、概述
二、架构简介
三、选型要点
四、功能剖析
五、性能
六、运维
七、常见问题 & 使用场景
八、总结
消息队列是重要的分布式系统组件,在高性能、高可用、低耦合等系统架构中扮演着重要作用。可用于异步通信、削峰填谷、解耦系统、数据缓存等多种业务场景。本文是关于消息队列(MQ)选型和常见问题的精心整理。在这篇文章中,我们将详细介绍消息队列的概念、作用以及如何选择适合自己需求的消息队列系统。
一、概述
消息队列是分布式系统中重要的中间件,在高性能、高可用、低耦合等系统架构中扮演着重要作用。分布式系统可以借助消息队列的能力,轻松实现以下功能:
- 解耦:将一个流程的上下游拆解开,上游专注于生产消息,下游专注于处理消息;
- 广播:上游生产的消息可以轻松被多个下游服务处理;
- 缓冲:应对突发流量,消息队列扮演缓冲器的作用,保护下游服务,使其可以根据自身的实际消费能力处理消息;
- 异步:上游发送消息后可以马上返回,下游可以异步处理消息;
- 冗余:保留历史消息,处理失败或当出现异常时可以进行重试或者回溯,防止丢失;
二、架构简介
2.1 Kafka
2.1.1 系统框架
一个 Kafka 集群由多个 Broker 和一个 ZooKeeper 集群组成,Broker 作为 Kafka 节点的服务器。同一个消息主题 Topic 可以由多个分区 Partition 组成,分区物理存储在 Broker 上。负载均衡考虑,同一个 Topic 的多个分区存储在多个不同的 Broker 上,为了提高可靠性,每个分区在不同的 Broker 会存在副本。
ZookKeeper 是一个分布式开源的应用程序协调服务,可以实现统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等工作。Kafka 里的 ZooKeeper 主要有一下几个作用:
- Broker 注册,当有 Broker 故障的时候能及时感知。
- Topic 注册,维护 Topic 各分区的个副本所在的 Broker 节点,以及对应 leader/follower 的角色。
- Consumer 注册,维护消费者组的 offset 以及消费者与分区的对应关系,实现负载均衡。
2.1.2 基本术语
Producer:消息生产者。一般情况下,一条消息会被发送到特定的主题上。通常情况下,写入的消息会通过轮询将消息写入各分区。生产者也可以通过设定消息 key 值将消息写入指定分区。写入分区的数据越均匀 Kafka 的性能才能更好发挥。
Topic:Topic 是个抽象的虚拟概念,一个集群可以有多个 Topic,作为一类消息的标识。一个生产者将消息发送到 topic,消费者通过订阅 Topic 获取分区消息。
Partition:Partition 是个物理概念,一个 Topic 对应一个或多个 Partition。新消息会以追加的方式写入分区里,在同一个 Partition 里消息是有序的。Kafka 通过分区,实现消息的冗余和伸缩性,以及支持物理上的并发读、写,大大提高了吞吐量。
Replicas:一个 Partition 有多个 Replicas 副本。这些副本保存在 broker,每个 broker 存储着成百上千个不同主题和分区的副本,存储的内容分为两种:master 副本,每个 Partition 都有一个 master 副本,所有内容的写入和消费都会经过 master 副本;follower 副本不处理任何客户端的请求,只同步 master 的内容进行复制。如果 master 发生了异常,很快会有一个 follower 成为新的 master。
Consumer:消息读取者。消费者订阅主题,并按照一定顺序读取消息。Kafka 保证每个分区只能被一个消费者使用。
Offset:偏移量是一种元数据,是不断递增的整数。在消息写入时 Kafka 会把它添加到消息里。在分区内偏移量是唯一的。消费过程中,会将最后读取的偏移量存储在 Kafka 中,消费者关闭偏移量不会丢失,重启会继续从上次位置开始消费。
Broker:独立的 Kafka 服务器。一个 Topic 有 N 个 Partition,一个集群有 N 个 Broker,那么每个 Broker 都会存储一个这个 Topic 的 Partition。如果某 topic 有 N 个 partition,集群有(N+M)个 broker,那么其中有 N 个 broker 存储该 topic 的一个 partition,剩下的 M 个 broker 不存储该 topic 的 partition 数据。如果某 topic 有 N 个 partition,集群中 broker 数目少于 N 个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。
2.2 Pulsar
2.2.1 系统框架
Pulsar 有三个重要的组件,Broker、BookKeeper 和ZooKeeper,Broker 是无状态服务,客户端需要连接到 Broker 上进行消息的传递。BookKeeper 与 ZooKeeper 是有状态服务。BookKeeper 的节点叫 Bookie,负责存储消息和游标,ZooKeeper 存储 Broker 和 Bookie 的元数据。Pulsar 以这种架构,实现存储和计算分离,Broker 负责计算,Bookie 负责有状态存储。
Pulsar 的多层架构影响了存储数据的方式。Pulsar 将 Topic 分区划分为分片(Segment),然后将这些分片存储在 Apache BookKeeper 的存储节点上,以提高性能、可伸缩性和可用性。Pulsar 的分布式日志以分片为中心,借助扩展日志存储(通过 Apache BookKeeper)实现,内置分层存储支持,因此分片可以均匀地分布在存储节点上。由于与任一给定 Topic 相关的数据都不会与特定存储节点进行捆绑,因此很容易替换存储节点或缩扩容。另外,集群中最小或最慢的节点也不会成为存储或带宽的短板。
2.2.2 基本术语
Property:代表租户,每个 property 都可以代表一个团队、一个功能、一个产品线。一个 property 可包含多个 namesapce,多租户是一种资源隔离手段,可以提高资源利用率;
Namespace:Pulsar 的基本管理单元,在 namaspace 级别可设置权限、消息 TTL、Retention 策略等。一个 namaspace 里的所有 topic 都继承相同的设置。命名空间分为两种:本地命名空间,只在集群内可见、全局命名空间对多个集群可见集群命名空间;
Producer:数据生产方,负责创建消息并将消息投递到 Pulsar 中;
Consumer:数据消费方,连接到 Pulsar 接收消息并进行相应的处理;
Broker:无状态 Proxy 服务,负责接收消息、传递消息、集群负载均衡等操作,它对 client 屏蔽了服务端读写流程的复杂性,是保证数据一致性与数据负载均衡的重要角色。Broker 不会持久化保存元数据。可以扩容但不能缩容;
BookKeeper:有状态,负责持久化存储消息。当集群扩容时,Pulsar 会在新增 BookKeeper 和 Segment(即 Bookeeper 的 Ledger),不需要像 kafka 一样在扩容时进行 Rebalance。扩容结果是 Fragments 跨多个 Bookies 以带状分布,同一个 Ledger 的 Fragments 分布在多个 Bookie 上,导致读取和写入会在多个 Bookies 之间跳跃;
ZooKeeper:存储 Pulsar 、 BookKeeper 的元数据,集群配置等信息,负责集群间的协调、服务发现等;
Topic:用作从 producer 到 consumer 传输消息。Pulsar 在 Topic 级别拥有一个 leader Broker,称之为拥有 Topic 的所有权,针对该 Topic 所有的 R/W 都经过该 Broker 完成。Topic 的 Ledger 和 Fragment 之间映射关系等元数据存储在 Zookeeper 中,Pulsar Broker 需要实时跟踪这些关系进行读写流程;
Ledger:即 Segment,Pulsar 底层数据以 Ledger 的形式存储在 BookKeeper 上。是 Pulsar 删除的最小单位;
Fragment :每个 Ledger 由若干 Fragment 组成。
2.3 RocketMQ
2.3.1 系统框架
RocketMQ 是开源的消息中间件,它是一个开源的分布式消息传递和流式数据平台。总共有四大部分:NameServer,Broker,Producer,Consumer。
NameServer 主要用来管理 brokers 以及路由信息。broker 服务器启动时会注册到 NameServer 上,并且两者之间保持心跳监测机制,以此来保证 NameServer 知道 broker 的存活状态。而且,每一台 NameServer 都存有全部的 broker 集群信息和生产者/消费者客户端的请求信息。
Broker 负责管理消息存储分发,主从数据同步,为消息建立索引,提供消息查询等能力。
2.3.2 基本术语
Topic:一个 Topic 可以有 0 个、1 个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。一个 Topic 也可以被 0 个、1 个、多个消费者订阅;
Tag:消息二级类型,可以为用户提供额外的灵活度,一条消息可以没有 tag;
Producer:消息生产者;
Broker:存储消息,以 Topic 为纬度轻量级的队列;转发消息,单个 Broker 节点与所有的 NameServer 节点保持长连接及心跳,会定时将 Topic 信息注册到 NameServer;
Consumer:消息消费者,负责接收并消费消息;
MessageQueue:消息的物理管理单位,一个 Topic 可以有多个 Queue,Queue 的引入实现了水平扩展的能力;
NameServer:负责对原数据的管理,包括 Topic 和路由信息,每个 NameServer 之间是没有通信的;
Group:一个组可以订阅多个 Topic,ProducerGroup、ConsumerGroup 分别是一类生产者和一类消费者;
Offset:通过 Offset 访问存储单元,RocketMQ 中所有消息都是持久化的,且存储单元定长。Offset 为 Java Long 类型,理论上 100 年内不会溢出,所以认为 Message Queue 是无限长的数据,Offset 是下标;
Consumer:支持 PUSH 和 PULL 两种消费模式,支持集群消费和广播消费。
2.4 RabbitMQ
2.4.1 系统框架
RabbitMQ 基于 AMQP 协议来实现,主要由 Exchange 和 Queue 两部分组成,然后通过 RoutingKey 关联起来,消息投递到 Exchange 然后通过 Queue 接收。
2.4.2 基本术语
Broker:接收客户端链接实体,实现 AMQP 消息队列和路由功能;
Virtual Host:是一个虚拟概念,权限控制的最小单位。一个 Virtual Host 里包含多个 Exchange 和 Queue;
Exchange:接收消息生产者的消息并将消息转发到队列。发送消息时根据不同 ExchangeType 的决定路由规则,ExchangeType 常用的有:direct、fanout 和 topic 三种;
Message Queue:消息队列,存储为被消费的消息;
Message:由 Header 和 Body 组成,Header 是生产者添加的各种属性,包含 Message 是否持久化、哪个 MessageQueue 接收、优先级。Body 是具体的消息内容;
Binding:Binding 连接起了 Exchange 和 Message Queue。在服务器运行时,会生成一张路由表,这张路由表上记录着 MessageQueue 的条件和 BindingKey 值。当 Exchange 收到消息后,会解析消息中的 Header 得到 BindingKey,并根据路由表和 ExchangeType 将消息发送到对应的 MessageQueue。最终的匹配模式是由 ExchangeType 决定;
Connection:在 Broker 和客户端之间的 TCP 连接;
Channel:信道。Broker 和客户端只有 tcp 连接是不能发送消息的,必须创建信道。AMQP 协议规定只有通过 Channel 才能执行 AMQP 命令。一个 Connection 可以包含多个 Channel。之所以需要建立 Channel,是因为每个 TCP 连接都是很宝贵的。如果每个客户端、每个线程都需要和 Broker 交互,都需要维护一个 TCP 连接的话是机器耗费资源的,一般建议共享 Connection。RabbitMQ 不建议客户端线程之前共享 Channel,至少保证同一 Channel 发小消息是穿行的;
Command:AMQP 命令,客户端通过 Command 来完成和 AMQP 服务器的交互。
2.5 NSQ
2.5.1 系统框架
NSQ 主要有 nsqlookup、nsqd 两部分组成:
- Nsqlookup 为守护进程,负责管理拓扑信息并提供发现服务。客户端通过查询 nsqlookupd 获取指定 Topic 所在的 nsqd 节点。nsqd 往 nsqlookup 上注册和广播自身 topic 和 channel 的信息。
- nsqd 在服务端运行的守护进程,负责接收,排队,投递消息给客户端。
NSQ 由 3 个守护进程组成:
- nsqd 是接收、队列和传送消息到客户端的守护进程。
- nsqlookupd 是管理的拓扑信息,并提供了最终一致发现服务的守护进程。客户端通过查询 nsqlookupd 获取指定 Topic 所在的 nsqd 节点。nsqd 往 nsqlookup 上注册和广播自身 topic 和 channel 的信息。
- nsqadmin 是一个 Web UI 来实时监控集群(和执行各种管理任务)。
三、选型要点
3.1 选型参考
- 消息顺序:发送到队列的消息,消费时是否可以保证消费的顺序;
- 伸缩:当消息队列性能有问题,比如消费太慢,是否可以快速支持扩容;当消费队列过多,浪费系统资源,是否可以支持缩容。
- 消息留存:消息消费成功后,是否还会继续保留在消息队列;
- 容错性:当一条消息消费失败后,是否有一些机制,保证这条消息一定能成功,比如异步第三方退款消息,需要保证这条消息消费掉,才能确定给用户退款成功,所以必须保证这条消息消费成功的准确性;
- 消息可靠性:是否会存在丢消息的情况,比如有 A/B 两个消息,最后只有 B 消息能消费,A 消息丢失;
- 消息时序:主要包括“消息存活时间”和“延迟消息”;
- 吞吐量:支持的最高并发数;
- 消息路由:根据路由规则,只订阅匹配路由规则的消息,比如有 A/B 两者规则的消息,消费者可以只订阅 A 消息,B 消息不会消费。
3.2 消息队列对比
注:作为 LShift 和 CohesiveFT 于 2007 年成立的合资企业,RabbitMQ 于 2010 年 4 月被 VMware 旗下的 SpringSource 收购。
四、功能剖析
4.1 消费推拉模式
客户端消费者获取消息的方式,Kafka 和 RocketMQ 是通过长轮询 Pull 的方式拉取消息,RabbitMQ、Pulsar、NSQ 都是通过 Push 的方式。
pull 类型的消息队列更适合高吞吐量的场景,允许消费者自己进行流量控制,根据消费者实际的消费能力去获取消息。而 push 类型的消息队列,实时性更好,但需要有一套良好的流控策略(backpressure)当消费者消费能力不足时,减少 push 的消费数量,避免压垮消费端。
4.2 延迟队列
消息延迟投递,当消息产生送达消息队列时,有些业务场景并不希望消费者立刻收到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。延迟队列一般分为两种,基于消息的延迟和基于队列的延迟。基于消息的延迟指为每条消息设置不同的延迟时间,当队列有新消息进入的时候根据延迟时间排序,当然这样会对性能造成较大影响。另一种基于队列的延迟指的是设置不同延迟级别的队列,队列中每个消息的延迟时间都是相同的,这样免去了基于延迟时间排序对性能带来的损耗,通过一定的扫描策略即可投递超时的消息。
延迟消息的使用场景比如异常检测重试,订单超时取消等,例如:
- 服务请求异常,需要将异常请求放到单独的队列,隔 5 分钟后进行重试;
- 用户购买商品,但一直处于未支付状态,需要定期提醒用户支付,超时则关闭订单;
- 面试或者会议预约,在面试或者会议开始前半小时,发送通知再次提醒。
Kafka 不支持延迟消息。Pulsar 支持秒级的延迟消息,所有延迟投递的消息会被 Delayed Message Tracker 记录对应的 index,consumer 在消费时,会先去 Delayed Message Tracker 检查,是否有到期需要投递的消息,如果有到期的消息,则从 Tracker 中拿出对应的 index,找到对应的消息进行消费,如果没有到期的消息,则直接消费正常的消息。对于长时间的延迟消息,会被存储在磁盘中,当快到延迟间隔时才被加载到内存里。
RocketMQ 开源版本延迟消息临时存储在一个内部主题中,不支持任意时间精度,支持特定的 level,例如定时 5s,10s,1m 等。
RabbitMQ 需要安装一个
rabbitmq_delayed_message_exchange 插件。
NSQ 通过内存中的优先级队列来保存延迟消息,支持秒级精度,最多 2 个小时延迟。
4.3 死信队列
由于某些原因消息无法被正确的投递,为了确保消息不会被无故的丢弃,一般将其置于一个特殊角色的队列,这个队列一般称之为死信队列。与此对应的还有一个“回退队列”的概念,试想如果消费者在消费时发生了异常,那么就不会对这一次消费进行确认(Ack), 进而发生回滚消息的操作之后消息始终会放在队列的顶部,然后不断被处理和回滚,导致队列陷入死循环。为了解决这个问题,可以为每个队列设置一个回退队列,它和死信队列都是为异常的处理提供的一种机制保障。实际情况下,回退队列的角色可以由死信队列和重试队列来扮演。
- Kafka 没有死信队列,通过 Offset 的方式记录当前消费的偏移量。
- Pulsar 有重试机制,当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试 Topic 中,当重试达到一定次数后,停止重试,投递到死信 Topic 中。
- RocketMQ 通过 DLQ 来记录所有消费失败的消息。
- RabbitMQ 是利用类似于延迟队列的形式实现死信队列。
- NSQ 没有死信队列。
4.4 优先级队列
有一些业务场景下,我们需要优先处理一些消息,比如银行里面的金卡客户、银卡客户优先级高于普通客户,他们的业务需要优先处理。如下图:
优先级队列不同于先进先出队列,优先级高的消息具备优先被消费的特权,这样可以为下游提供不同消息级别的保证。不过这个优先级也是需要有一个前提的:如果消费者的消费速度大于生产者的速度,并且消息中间件服务器(一般简单的称之为 Broker)中没有消息堆积,那么对于发送的消息设置优先级也就没有什么实质性的意义了,因为生产者刚发送完一条消息就被消费者消费了,那么就相当于 Broker 中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。
Kafka、RocketMQ、Pulsar、NSQ 不支持优先级队列,可以通过不同的队列来实现消息优先级。
RabbitMQ 支持优先级消息。
4.5 消息回溯
一般消息在消费完成之后就被处理了,之后再也不能消费到该条消息。消息回溯正好相反,是指消息在消费完成之后,还能消费到之前被消费掉的消息。对于消息而言,经常面临的问题是“消息丢失”,至于是真正由于消息中间件的缺陷丢失还是由于使用方的误用而丢失一般很难追查,如果消息中间件本身具备消息回溯功能的话,可以通过回溯消费复现“丢失的”消息进而查出问题的源头之所在。消息回溯的作用远不止与此,比如还有索引恢复、本地缓存重建,有些业务补偿方案也可以采用回溯的方式来实现。
- Kafka 支持消息回溯,可以根据时间戳或指定 Offset,重置 Consumer 的 Offset 使其可以重复消费。
- Pulsar 支持按时间对消息进行回溯。
- RocketMQ 支持按时间回溯,实现的原理跟 Kafka 一致。
- RabbitMQ 不支持回溯,消息一旦标记确认就会被标记删除。
- NSQ 一般消息是不可回溯的,但可以通过 nsq_to_file 工具,将消息写入到文件,然后从文件里重放消息。
4.6 消息持久化
流量削峰是消息中间件的一个非常重要的功能,而这个功能其实得益于其消息堆积能力。从某种意义上来讲,如果一个消息中间件不具备消息堆积的能力,那么就不能把它看做是一个合格的消息中间件。消息堆积分内存式堆积和磁盘式堆积。一般来说,磁盘的容量会比内存的容量要大得多,对于磁盘式的堆积其堆积能力就是整个磁盘的大小。从另外一个角度讲,消息堆积也为消息中间件提供了冗余存储的功能。
Kafka 和 RocketMQ 直接将消息刷入磁盘文件中进行持久化,所有的消息都存储在磁盘中。只要磁盘容量够,可以做到无限消息堆积。
RabbitMQ 是典型的内存式堆积,但这并非绝对,在某些条件触发后会有换页动作来将内存中的消息换页到磁盘(换页动作会影响吞吐),或者直接使用惰性队列来将消息直接持久化至磁盘中。
Pulsar 消息是存储在 BookKeeper 存储集群上,也是磁盘文件。
NSQ 通过 nsq_to_file 工具,将消息写入到文件。
4.7 消息确认机制
消息队列需要管理消费进度,确认消费者是否成功处理消息,使用 push 的方式的消息队列组件往往是对单条消息进行确认,对于未确认的消息,进行延迟重新投递或者进入死信队列。
Kafka 通过 Offset 的方式确认消息。
1)发送方确认机制 ack=0,不管消息是否成功写入分区 ack=1,消息成功写入首领分区后,返回成功 ack=all,消息成功写入所有分区后,返回成功。
2)接收方确认机制 自动或者手动提交分区偏移量,早期版本的 kafka 偏移量是提交给 Zookeeper 的,这样使得 zookeeper 的压力比较大,更新版本的 kafka 的偏移量是提交给 kafka 服务器的,不再依赖于 zookeeper 群组,集群的性能更加稳定。
RocketMQ 与 Kafka 类似也会提交 Offset,区别在于消费者对于消费失败的消息,可以标记为消息消费失败,Broker 会重试投递,如果累计多次消费失败,会投递到死信队列。
RabbitMQ和NSQ 类似,消费者确认单条消息,否则会重新放回队列中等待下次投递。
1)发送方确认机制,消息被投递到所有匹配的队列后,返回成功。如果消息和队列是可持久化的,那么在写入磁盘后,返回成功。支持批量确认和异步确认。
2)接收方确认机制,设置 autoAck 为 false,需要显式确认,设置 autoAck 为 true,自动确认。当 autoAck 为 false 的时候,RabbitMQ 队列会分成两部分,一部分是等待投递给 consumer 的消息,一部分是已经投递但是没收到确认的消息。如果一直没有收到确认信号,并且 consumer 已经断开连接,RabbitMQ 会安排这个消息重新进入队列,投递给原来的消费者或者下一个消费者。未确认的消息不会有过期时间,如果一直没有确认,并且没有断开连接,RabbitMQ 会一直等待,RabbitMQ 允许一条消息处理的时间可以很久很久。
Pulsar 使用专门的 Cursor 管理。累积确认和 Kafka 效果一样;提供单条或选择性确认。
4.8 消息 TTL
消息 TTL 表示一条消息的生存时间,如果消息发出来后,在 TTL 的时间内没有消费者进行消费,消息队列会将消息删除或者放入死信队列中。
Kafka 根据设置的保留期来删除消息。有可能消息没被消费,过期后被删除。不支持 TTL。
Pulsar 支持 TTL,如果消息未在配置的 TTL 时间段内被任何消费者使用,则消息将自动标记为已确认。消息保留期与消息 TTL 之间的区别在于:消息保留期作用于标记为已确认并设置为已删除的消息,而 TTL 作用于未 ack 的消息。上面的图例中说明了 Pulsar 中的 TTL。例如,如果订阅 B 没有活动消费者,则在配置的 TTL 时间段过后,消息 M10 将自动标记为已确认,即使没有消费者实际读取该消息。
RocketMQ 提及到消息 TTL 的资料比较少,不过看接口似乎是支持的。
RabbitMQ 有两种方式,一个是声明队列的时候在队列属性中设置,整个队列中的消息都有相同的有效期。还可以发送消息的时候给消息设置属性,可以位每条消息都设置不同的 TTL。
NSQ 似乎还没支持,有一个 Feature Request 的 Issue 处于 Open 状态。
4.9 多租户隔离
多租户是指通过一个软件实例为多个租户提供服务的能力。租户是指对系统有着相同“视图”的一组用户。不支持多租户的系统里边,往往要为不同用户或者不同集群创建多个消息队列实例实现物理隔离,这样会带来较高的运维成本。作为一种企业级的消息系统,Pulsar 的多租户能力按照设计可满足下列需求:
- 确保严苛的 SLA 可顺利满足。
- 保证不同租户之间的隔离。
- 针对资源利用率强制实施配额。
- 提供每租户和系统级的安全性。
- 确保低成本运维以及尽可能简单的管理。
Pulsar 通过下列方式满足了上述需求:
- 通过为每个租户进行身份验证、授权和 ACL(访问控制列表)获得所需安全性。
- 为每个租户强制实施存储配额。
以策略的方式定义所有隔离机制,策略可在运行过程中更改,借此降低运维成本并简化管理工作。
4.10 消息顺序性
消息顺序性是指保证消息有序。消息消费顺序跟生产的顺序保持一致。
- Kafka 保证了分区内的消息有序。
- Pulsar 支持两种消费模式,独占订阅的流模式只保证了消息的顺序性,共享订阅队列模型不保证有序性。
- RocketMQ 需要用到锁来保证一个队列同时只有一个消费者线程进行消费,保证消息的有序性。
- RabbitMQ 顺序性的条件比较苛刻,需要单线程发送、单线程消费,并且不采用延迟队列、优先级队列等高级功能。
- NSQ 是利用了 golang 自身的 case/select 实现的消息分发,本身不提供有序性保障,不能够把特性消息和消费者对应起来,无法实现消息的有序性。
4.11 消息查询
在实际开发中,经常要查看 MQ 中消息的内容,比如通过某个 MessageKey/ID,查询到 MQ 的具体消息。或者是对消息进行链路追踪,知道消息从哪里来,发送到哪里去,进而快速对问题进行排查定位。
Kafka 存储层是以分布式提交日志的形式实现,每次写操作都顺序追加到日志的末尾。读也是顺序读。不支持检索功能。
Pulsar 可以通过消息 ID,查询到具体某条消息的消息内容、消息参数和消息轨迹。
RocketMQ 支持按 Message Key、Unique Key、Message Id 对消息进行查询。
RabbitMQ 使用基于索引的存储系统。这些将数据保存在树结构中,以提供确认单个消息所需的快速访问。由于 RabbitMQ 的消息在确认后会被删除,因此只能查询未确认的消息。
NSQ 自身不支持消息持久化和消息检索,不过可以使用 nsq_to_http 等工具将消息写入可支持索引的存储里。
4.12 消费模式
Kafka 有两种消费模式,最终都会保证一个分区只有 1 个消费者在消费:
- subscribe 方式:当主题分区数量变化或者 consumer 数量变化时,会进行 rebalance;注册 rebalance 监听器,可以手动管理 offset 不注册监听器,Kafka 自动管理。
- assign 方式:手动将 consumer 与 partition 进行对应,Kafka 不会进行 rebanlance。
Pulsar 有以下四种消费模式,其中独占模式和灾备模式跟 Kafka 类似,为流模型,每个分区只有 1 个消费者消费,能保证消息有序性。共享模式和 Key 共享模式为队列模型,多个消费者能提高消费速度,但不能保证有序性。
- Exclusive 独占模式(默认模式):一个 Subscription 只能与一个 Consumer 关联,只有这个 Consumer 可以接收到 Topic 的全部消息,如果该 Consumer 出现故障了就会停止消费。
- 灾备模式(Failover):当存在多个 consumer 时,将会按字典顺序排序,第一个 consumer 被初始化为唯一接受消息的消费者。当第一个 consumer 断开时,所有的消息(未被确认和后续进入的)将会被分发给队列中的下一个 consumer。
- 共享模式(Shared):消息通过 round robin 轮询机制(也可以自定义)分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。
- KEY 共享模式(Key_Shared):当存在多个 consumer 时,将根据消息的 key 进行分发,key 相同的消息只会被分发到同一个消费者。
RocketMQ 有两种消费模式,BROADCASTING 广播模式,CLUSTERING 集群模式。
广播消费指的是:一条消息被多个 consumer 消费,即使这些 consumer 属于同一个 ConsumerGroup,消息也会被 ConsumerGroup 中的每个 Consumer 都消费一次,广播消费中 ConsumerGroup 概念可以认为在消息划分方面无意义。
集群消费模式:一个 ConsumerGroup 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个 ConsumerGroup 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中部分,消费完的消息不能被其他实例消费。
RabbitMQ 和 NSQ 的消费比较类似,都是跟 Pulsar 共享模式类似的,队列的形式,增加一个消费者组里的消费者数量能提高消费速度。
4.13 消息可靠性
消息丢失是使用消息中间件时所不得不面对的一个同点,其背后消息可靠性也是衡量消息中间件好坏的一个关键因素。尤其是在金融支付领域,消息可靠性尤为重要。比如当服务出现故障时,一些对于生产者来说已经生产成功的消息,是否会在高可用切换时丢失。同步刷盘是增强一个组件可靠性的有效方式,消息中间件也不例外,Kafka 和 RabbitMQ 都可以支持同步刷盘,但绝大多数情景下,一个组件的可靠性不应该由同步刷盘这种极其损耗性能的操作来保障,而是采用多副本的机制来保证。
Kafka 可以通过配置 request.required.acks 参数设置可靠级别,表示一条消息有多少个副本确认接收成功后,才被任务发送成功。
- request.required.acks=-1 (全量同步确认,强可靠性保证)
- request.required.acks=1(leader 确认收到,默认)
- request.required.acks=0 (不确认,但是吞吐量大)
Pulsar 有跟 Kafka 类似的概念,叫 Ack Quorum Size(Qa),Qa 是每次写请求发送完毕后需要回复确认的 Bookie 的个数,其数值越大则需要确认写成功的时间越长,其值上限是副本数 Qw。为了一致性,Qa 应该是:(Qw+1)/2 或者更,即为了确保数据安全性,Qa 下限是 (Qw+1)/2。
RocketMQ 与 Kafka 类似。
RabbitMQ 是主从架构,通过镜像环形队列实现多副本及强一致性语义的。多副本可以保证在 master 节点宕机异常之后可以提升 slave 作为新的 master 而继续提供服务来保障可用性。
NSQ 会通过 go-diskqueue 组件将消息落盘到本地文件中,通过 mem-queue-size 参数控制内存中队列大小,如果 mem-queue-size=0 每条消息都会存储到磁盘里,不用担心节点重启引起的消息丢失。但由于是存储在本地磁盘中,如果节点离线,堆积在节点磁盘里的消息会丢失。
4.14 负载均衡
Kafka:支持负载均衡。一个 broker 通常就是一台服务器节点。对于同一个 Topic 的不同分区,Kafka 会尽力将这些分区分布到不同的 Broker 服务器上,zookeeper 保存了 broker、主题和分区的元数据信息。分区首领会处理来自客户端的生产请求,Kafka 分区首领会被分配到不同的 broker 服务器上,让不同的 broker 服务器共同分担任务。
每一个 broker 都缓存了元数据信息,客户端可以从任意一个 broker 获取元数据信息并缓存起来,根据元数据信息知道要往哪里发送请求。
Kafka 的消费者组订阅同一个 topic,会尽可能地使得每一个消费者分配到相同数量的分区,分摊负载。
当消费者加入或者退出消费者组的时候,还会触发再均衡,为每一个消费者重新分配分区,分摊负载。
Kafka 的负载均衡大部分是自动完成的,分区的创建也是 Kafka 完成的,隐藏了很多细节,避免了繁琐的配置和人为疏忽造成的负载问题。
发送端由 topic 和 key 来决定消息发往哪个分区,如果 key 为 null,那么会使用轮询算法将消息均衡地发送到同一个 topic 的不同分区中。如果 key 不为 null,那么会根据 key 的 hashcode 取模计算出要发往的分区。
RabbitMQ :对负载均衡的支持不好。消息被投递到哪个队列是由交换器和 key 决定的,交换器、路由键、队列都需要手动创建。
RabbitMQ 客户端发送消息要和 broker 建立连接,需要事先知道 broker 上有哪些交换器,有哪些队列。通常要声明要发送的目标队列,如果没有目标队列,会在 broker 上创建一个队列,如果有,就什么都不处理,接着往这个队列发送消息。假设大部分繁重任务的队列都创建在同一个 broker 上,那么这个 broker 的负载就会过大。(可以在上线前预先创建队列,无需声明要发送的队列,但是发送时不会尝试创建队列,可能出现找不到队列的问题,RabbitMQ 的备份交换器会把找不到队列的消息保存到一个专门的队列中,以便以后查询使用)
使用镜像队列机制建立 RabbitMQ 集群可以解决这个问题,形成 master-slave 的架构,master 节点会均匀分布在不同的服务器上,让每一台服务器分摊负载。slave 节点只是负责转发,在 master 失效时会选择加入时间最长的 slave 成为 master。
当新节点加入镜像队列的时候,队列中的消息不会同步到新的 slave 中,除非调用同步命令,但是调用命令后,队列会阻塞,不能在生产环境中调用同步命令。
当 RabbitMQ 队列拥有多个消费者的时候,队列收到的消息将以轮询的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者,不会重复。
这种方式非常适合扩展,而且是专门为并发程序设计的。
如果某些消费者的任务比较繁重,那么可以设置 basicQos 限制信道上消费者能保持的最大未确认消息的数量,在达到上限时,RabbitMQ 不再向这个消费者发送任何消息。
对于 RabbitMQ 而言,客户端与集群建立的 TCP 连接不是与集群中所有的节点建立连接,而是挑选其中一个节点建立连接。
但是 RabbitMQ 集群可以借助 HAProxy、LVS 技术,或者在客户端使用算法实现负载均衡,引入负载均衡之后,各个客户端的连接可以分摊到集群的各个节点之中。
客户端均衡算法:
- 轮询法。按顺序返回下一个服务器的连接地址。
- 加权轮询法。给配置高、负载低的机器配置更高的权重,让其处理更多的请求;而配置低、负载高的机器,给其分配较低的权重,降低其系统负载。
- 随机法。随机选取一个服务器的连接地址。
- 加权随机法。按照概率随机选取连接地址。
- 源地址哈希法。通过哈希函数计算得到的一个数值,用该数值对服务器列表的大小进行取模运算。
- 最小连接数法。动态选择当前连接数最少的一台服务器的连接地址。
zeromq:去中心化,不支持负载均衡。本身只是一个多线程网络库。
RocketMQ:支持负载均衡。一个 broker 通常是一个服务器节点,broker 分为 master 和 slave,master 和 slave 存储的数据一样,slave 从 master 同步数据。
nameserver 与每个集群成员保持心跳,保存着 Topic-Broker 路由信息,同一个 topic 的队列会分布在不同的服务器上。
发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。发送消息指定 topic、tags、keys,无法指定投递到哪个队列(没有意义,集群消费和广播消费跟消息存放在哪个队列没有关系)。
tags 选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。目前只支 持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念。
keys 选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后, 可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能 保证 key 唯一,例如订单号,商品 Id 等。
RocketMQ 的负载均衡策略规定:Consumer 数量应该小于等于 Queue 数量,如果 Consumer 超过 Queue 数量,那么多余的 Consumer 将不能消费消息。这一点和 kafka 是一致的,RocketMQ 会尽可能地为每一个 Consumer 分配相同数量的队列,分摊负载。
activemq:支持负载均衡。可以基于 zookeeper 实现负载均衡。
4.15 集群方式
Kafka:天然的‘Leader-Slave’无状态集群,每台服务器既是 Master 也是 Slave。分区首领均匀地分布在不同的 Kafka 服务器上,分区副本也均匀地分布在不同的 Kafka 服务器上,所以每一台 Kafka 服务器既含有分区首领,同时又含有分区副本,每一台 Kafka 服务器是某一台 Kafka 服务器的 Slave,同时也是某一台 Kafka 服务器的 leader。
Kafka 的集群依赖于 zookeeper,zookeeper 支持热扩展,所有的 broker、消费者、分区都可以动态加入移除,而无需关闭服务,与不依靠 zookeeper 集群的 mq 相比,这是最大的优势。
RabbitMQ:支持简单集群,'复制'模式,对高级集群模式支持不好。
RabbitMQ 的每一个节点,不管是单一节点系统或者是集群中的一部分,要么是内存节点,要么是磁盘节点,集群中至少要有一个是磁盘节点。
在 RabbitMQ 集群中创建队列,集群只会在单个节点创建队列进程和完整的队列信息(元数据、状态、内容),而不是在所有节点上创建。
引入镜像队列,可以避免单点故障,确保服务的可用性,但是需要人为地为某些重要的队列配置镜像。
zeromq:去中心化,不支持集群。
RocketMQ:常用 多对'Master-Slave' 模式,开源版本需手动切换 Slave 变成 Master
Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server。
Producer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态,可集群部署。
Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
客户端先找到 NameServer, 然后通过 NameServer 再找到 Broker。
一个 topic 有多个队列,这些队列会均匀地分布在不同的 broker 服务器上。RocketMQ 队列的概念和 Kafka 的分区概念是基本一致的,Kafka 同一个 topic 的分区尽可能地分布在不同的 broker 上,分区副本也会分布在不同的 broker 上。
RocketMQ 集群的 slave 会从 master 拉取数据备份,master 分布在不同的 broker 上。
activemq:支持简单集群模式,比如'主-备',对高级集群模式支持不好。