消息队列

简介

消息队列是一种异步的服务间通信方式,适用于无服务器和微服务架构。消息在被处理和删除之前一直存储在队列上。每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批处理工作以及缓解高峰期工作负载。

在现在的服务架构中,应用程序通常被分解为多个规模较小且更易于开发、部署和维护的独立构建块。消息队列可为这些分布式应用程序提供通信和协调。消息队列可以显著简化分离应用程序的编码,同时提高性能、可靠性和可扩展性。

借助消息队列,系统的不同部分可相互通信并异步执行处理操作。消息队列提供一个临时存储消息的轻量级缓冲区,以及允许软件组件连接到队列以发送和接收消息的终端节点。这些消息通常较小,可以是请求、恢复、错误消息或明文信息等。要发送消息时,一个名为“创建器”的组件会将消息添加到队列。消息将存储在队列中,直至名为“处理器”的另一组件检索该消息并执行相关操作。

原理

设计需求

解耦

解耦是消息队列要解决的最本质问题。

简单点讲就是一个事务,只关心核心的流程。而需要依赖其他系统但不那么重要的事情,有通知即可,无需等待结果,即非实时性。

如果所有的流程都等待结果,即通过接口方式去通知或者更新数据,那么整个系统会显得非常”笨重”,一个请求的响应时间也会大大增加。

最终一致性

最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。

理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。

业界有一些为“最终一致性”而生的消息队列,如 Notify(阿里)、QMQ(去哪儿)等,其设计初衷,就是为了交易系统中的高可靠通知。

所有跨 VM 的一致性问题,从技术的角度讲通用的解决方案是:

  1. 强一致性
    分布式事务,但落地太难且成本太高。
  2. 最终一致性
    主要是用“记录”和“补偿”的方式。在做所有的不确定的事情之前,先把事情记录下来,然后去做不确定的事情,结果可能是:成功、失败或是不确定,“不确定”(例如超时等)可以等价为失败。成功就可以把记录的东西清理掉了,对于失败和不确定,可以依靠定时任务等方式把所有失败的事情重新搞一遍,直到成功为止。

最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。另外,所有不保证理论上 100%不丢消息的消息队列 (排除系统严重故障和 bug),理论上无法实现最终一致性。像 Kafka 一类的设计,在设计层面上就有丢消息的可能(比如定时刷盘,如果掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其他的手段来保证结果正确。

广播

消息队列的基本功能之一是进行广播。

如果没有消息队列,每当一个新的业务方接入,都要联调一次新接口。有了消息队列,只需要关心消息是否送达了队列,至于谁想订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。

错峰与流控

试想上下游对于事情的处理能力是不同的。比如,Web 前端每秒承受上千万的请求,并不是什么神奇的事情,只需要加多一点机器,再搭建一些 LVS 负载均衡设备和 Nginx 等即可。但数据库的处理能力却十分有限,即使使用 SSD 加分库分表,单机的处理能力仍然在万级。由于成本的考虑,我们不能奢求数据库的机器数量追上前端。 这种问题同样存在于系统和系统之间,如短信系统可能由于短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信,一般是不会有太大问题的。

如果没有消息队列,两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长,势必在上游或者下游做存储,并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候,都需要单独开发一套逻辑来维护这套逻辑。所以,利用中间系统转储两个系统的通信内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。

总而言之,消息队列不是万能的。对于需要强事务保证而且延迟敏感的,RPC 是优于消息队列的。 对于一些无关痛痒,或者对于别人非常重要但是对于自己不是那么关心的事情,可以利用消息队列去做。 支持最终一致性的消息队列,能够用来处理延迟不那么敏感的“分布式事务”场景,而且相对于笨重的分布式事务,可能是更优的处理方式。 当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。 如果下游有很多系统关心你的系统发出的通知的时候,果断地使用消息队列吧。

综述

基于消息的系统模型,不一定需要 broker(消息队列服务端)。市面上的的 Akka(actor 模型)、ZeroMQ 等,其实都是基于消息的系统设计范式,但是没有 broker。 我们之所以要设计一个消息队列,并且配备 broker,无外乎要做两件事情:

  1. 消息的转储,在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机。
  2. 规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。 掰开了揉碎了看,最简单的消息队列可以做成一个消息转发器,把一次 RPC 做成两次 RPC。发送者把消息投递到服务端(以下简称 broker),服务端再将消息转发一手到接收端,就是这么简单。

一般来讲,设计消息队列的整体思路是先 build 一个整体的数据流,例如 producer 发送给 broker, broker 发送给 consumer, consumer 回复消费确认,broker 删除/备份消息等。 利用 RPC 将数据流串起来。然后考虑 RPC 的高可用性,尽量做到无状态,方便水平扩展。之后考虑如何承载消息堆积,然后在合适的时机投递消息,而处理堆积的最佳方式,就是存储,存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。 为了实现广播功能,我们必须要维护消费关系,可以利用 zk/config server 等保存消费关系。 在完成了上述几个功能后,消息队列基本就实现了。然后我们可以考虑一些高级特性,如可靠投递,事务特性,性能优化等。

mq-队列设计

实现一个消息队列

消息队列设计精要

现有模型

JMS(Java Message Service)

JMS 是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

Java 消息服务是一个与具体平台无关的 API,它包括两种消息模式,点对点和发布者/订阅者:

  • JMS 点对点或队列模型:一个生产者向一个特定的队列发布消息,只有一个消费者从该队列中读取消息;生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态;每一个成功处理的消息都由接收者签收;多个消费者对于队列内的消息是竞争消费关系,每个消费者只能收到队列中的一部分消息。
    mq-jms-队列模型
  • JMS 发布者/订阅者模型:一个生产者向一个特定的队列发布消息,0 个或多个订阅者可以接受来自特定消息主题的消息;发布者需要创建一个订阅主题(Topic)以便客户能够购订阅并保持持续的活动以接受消息,一个订阅主题是由至少一个队列(Queue)组成的,除非订阅者创建了持久的订阅,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布,每个消费者都能收到全量的消息。
    mq-jms-订阅模型

AMQP(Advanced Message Queuing Protocol)

AMQP 是一种协议,更准确的说是一种 binary wire-level protocol(链接协议)。

AMQP 不从 API 层进行限定,而是直接定义网络交换的数据格式。这使得实现了 AMQP 的 provider 天然性就是跨平台的。意味着我们可以使用 Java 的 AMQP provider,同时使用一个 python 的 producer 加一个 rubby 的 consumer。从这一点看,AQMP 可以用 http 来进行类比,不关心实现的语言,只要大家都按照相应的数据格式去发送报文请求,不同语言的 client 均可以和不同语言的 server 链接。

协议

从整体来看,AMQP 协议可划分为两层:

  1. Functional Layer (功能层)
    位于协议上层,主要定义了一组命令(基于功能的逻辑分类),用于应用程序调用实现自身所需的业务逻辑。例如:应用程序可以通过功能层定义队列名称,生产消息到指定队列,消费指定队列消息等基于(Message queues 模型)
  2. Transport Layer (传输层)
    基于二进制数据流传输,用于将应用程序调用的指令传回服务器,并返回结果,同时可以进行信道复用,帧处理,内容编码,心跳传输,数据传输和异常处理。

这样分层之后,可以把传输层替换为其它传输协议,而不需要修改功能层。同样,也可以使用同样的传输层,基于此实现不同的上层协议。

设计思路

AMQ Model 的设计是由以下需求驱动的:

  • 确保符合标准的实现之间的互操作性。
  • 提供清晰且直接的方式控制 QoS
  • 保持一致和明确的命名
  • 通过协议能够修改服务端的各种配置
  • 使用可以轻松映射到应用程序级 API 的命令符号
  • 清晰,每个操作只能做一件事。

AMQP 传输层是由以下需求驱动的

  • 紧凑。能够快速封包和解包
  • 可以携带任意大小的消息,没有明显的限制
  • 同一个连接可以承载多个通道(Channel)
  • 长时间存活,没有显著的限制
  • 允许异步命令流水线
  • 容易扩展。易于处理新需求、或者变更需求
  • 向前兼容
  • 使用强大的断言模型,可修复
  • 对编程语言保持中立
  • 适合代码生成过程

在设计过程中,希望能够支持不同的消息架构:

  • 先存后发模型。有多个 Writer,只有一个 Reader
  • 分散工作负载。有多个 Writer 和多个 Reader
  • 发布订阅模型,多个 Writer 和多个 reader
  • 基于消息内容的路由,多个 Writer,多个 Reader
  • 队列文件传输,多个 Writer,多个 Reader
  • 两个节点之间点对点连接
  • 市场数据(Market data)分发。多个数据源,多个 Reader

架构模型

AMQP Model 主要包含了三个主要的组件:

  1. Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), fanout (multicast), topic (publish-subscribe)。
  2. Queue: 消息最终被送到这里等待 consumer 取走。一个 message 可以被同时拷贝到多个 queue 中。
  3. Binding: exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
Publisher(发布者)

发布者 (或称为生产者) 负责生产消息并将其投递到指定的交换器上

Consumer(消费者)

消费者订阅感兴趣的队列,并负责消费存储在队列中的消息。

Message(消息)

消息由消息头和消息体组成。消息头用于存储与消息相关的元数据:如目标交换器的名字 (exchange_name) 、路由键 (RountingKey) 和其他可选配置 (properties) 信息。消息体为实际需要传递的数据。

Exchange(交换器)

交换器是一个虚拟主机内的消息路由 Agent,交换器可能是持久化的、临时的、自动删除的。交换器把消息路由到消息队列时可以是并行的。这会创建一个消息的多个实例。

交换器负责接收来自生产者的消息,并将将消息路由到一个或者多个队列中,如果路由不到,则返回给生产者或者直接丢弃,这取决于交换器的 mandatory 属性:

  • 当 mandatory 为 true 时:如果交换器无法根据自身类型和路由键找到一个符合条件的队列,则会将该消息返回给生产者;
  • 当 mandatory 为 false 时:如果交换器无法根据自身类型和路由键找到一个符合条件的队列,则会直接丢弃该消息。

四种交换机:Direct Exchange(直连交换机),Fanout Exchange(扇型交换机),Topic Exchange(主题交换机),Headers Exchange(头交换机)

Direct Exchange

Binding 的 Routing Key 要和消息的 Routing Key 完全匹配

Fanout Exchange

不管消息的 Routing Key,广播给这个交换机下的所有绑定队列

Topic Exchange

绑定的 Routing Key 和 消息的 Routing Key 进行字符串的模糊匹配,匹配规则如下:

  • RountingKey 和 BindingKey 由多个单词使用 . 进行连接;
  • BindingKey 支持两个特殊符号:#* 。其中 * 用于匹配一个单词, # 用于匹配零个或者多个单词。
Headers Exchange

在交换器与队列进行绑定时可以指定一组键值对作为 BindingKey;在发送消息的 headers 中的可以指定一组键值对属性,当这些属性与 BindingKey 相匹配时,则将消息路由到该队列。同时还可以使用 x-match 参数指定匹配模式:

  1. x-match = all:所有的键值对都相同才算匹配成功;
  2. x-match = any:只要有一个键值对相同就算匹配成功。

headers 类型的交换器性能比较差,因此其在实际开发中使用得比较少。

BindingKey (绑定键)

交换器与队列通过 BindingKey 建立绑定关系。

Routingkey(路由键)

生产者将消息发给交换器的时候,一般会指定一个 RountingKey,用来指定这个消息的路由规则。当 RountingKey 与 BindingKey 基于交换器类型的规则相匹配时,消息被路由到对应的队列中。

Queue(消息队列)

用于存储路由过来的消息。多个消费者可以订阅同一个消息队列,此时队列会将收到的消息将以轮询 (round-robin) 的方式分发给所有消费者。即每条消息只会发送给一个消费者,不会出现一条消息被多个消费者重复消费的情况。

Connection(连接)

用于传递消息的 TCP 连接。

Channel(信道)

类似 NIO (非阻塞式 IO ) 的设计,通过 Channel 来复用 TCP 连接,并确保每个 Channel 的隔离性,就像是拥有独立的 Connection 连接。

Virtual Host(虚拟主机)

通过虚拟主机来实现逻辑分组和资源隔离,一个虚拟主机就是一个小型的 MQ 服务器,拥有独立的队列、交换器和绑定关系,虚拟主机之间是完全独立的。

AMQP 和 JMS 差异

项目 AMQP JMS
定义 线级协议 Java API
跨平台
跨语言
消息收发模型 4 种消息收发模型:
Direct Exchange
Fanout Exchange
Topic Exchange
Header Exchange
2 种消息收发模型:
P2P
Pub/Sub
消息类型 二进制数据类型 5 种消息类型:
Text message
Object message
Bytes message
Stream message
Map message
消息流 Producer 将消息发送到 Exchange,Exchange 将消息路由到 Queue,Consumer 从 Queue 中消费消息。 Producer 将消息发送到 Queue 或者 Topic,Consumer 从 Queue 或 Topic 中消费消息。

问题

MQ怎么解决重复消费的问题

关键是解决消费的幂等性。每条生产消息都应该配置一个全局唯一D和消费状态,存放在mysqli或edis中。在消费端的业务代码中可以通过中间件或拦截器检查消息表中的状态是否已经被消费过。

MQ消息积压怎么解决

  1. 优先解决线上问题,临时扩容消费端
  2. 通过日志排查,为何会积压消息
  3. 优化业务逻辑,或根据实际情况选择扩容

MQ如何选择

kafka,只做消息持久化,不删消息,消费端需要维护offset,特点是高并发高吞吐,适合日志队列
rabbitmg,数据安全可靠,支持事务等,队列模型多样,适合各种业务场景
rocketmq,数据可靠,阿里维护,比较成熟

几种mq对比

特性 ActiveMQ RabbitMQ RocketMQ Kafka Pulsar
单机吞吐量 万级,比RocketMQ、Kafka低一个数量级 同ActiveMQ 10w+/s,支撑高吞吐,支持强一致,强一致性下吞吐量稍低 10w+/s,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 14w+/s,高吞吐,支持强一致
topic数量对吞吐量的影响 topic可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic topic从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka尽量保证topic数量不要过多,如果要支撑大规模的topic,需要增加更多的机器资源 Pulsar采用存算分离的架构,数据采用bookeeper存储。上层broker是无状态代理, 两层可以独立扩容,因此topic个数对吞吐量不会产生显著的影响,
时效性 ms级 ms级 ms级 延迟在ms级以内 ms级
可用性 高,基于主从架构实现高可用 同ActiveMQ 非常高,分布式架构(分为4.5版本后的Dledger架构和普通master-slave架构两种,但普通主从架构不支持故障自动切换,运维不太友好) 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 非常高,broker层是无状态代理,动态扩容,数据存储层bookeeper采用segment-oriented存储机制,无写入不可用风险
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到0丢失 经过参数优化配置,可以做到 最小概率不丢失 经过参数配置后,可以做到0丢失
功能支持 MQ领域的功能极其完备 基于erlang开发,并发能力很强,性能极好,延时很低 MQ功能较为完善,还是分布式的,扩展性好,社区和kafka相比,并不是太活跃 功能较为简单易用、生态完善、绝大部分场景都可使用,在大数据领域的实时计算以及日志采集被大规模使用 云原生时代的新一代消息中间件,社区活跃、支持多租户、强一致、跨域部署等诸多特性

延迟队列怎么实现

场景

  1. 订单成功后,在30分钟内没有支付,自动取消订单
  2. 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
  3. 支付成功后,2秒后查询支付结果

方案

  1. rabbitmg TTL+死消息队列:基本原理是对消息设置过期时间,过期的消息会放置到死消息队列,专门处理死消息队列视作延迟队列
  2. redis有序集合数据安全不太可靠,集合中的数据量大时,影响速度。基本原理是插入时间戳为score的消息,让zSet按时间自动排序。消费时判断时间戳,到时间则处理,没到时间则轮询等待。
  3. kfka时间轮:原理比较复杂,基本原理使用一个数组模拟时间钟,键作为秒针,把消息任务存放在某个键值内,程序定时取该数组键值执行然后。

参考资料

  1. 消息队列设计精要
  2. 消息队列漫谈:什么是消息模型?
  3. AMQP 协议详解
  4. AMQP 0-9-1 Model Explained
  5. AMQP 协议学习
  6. RabbitMQ 核心概念详解