消息队列与 RabbitMQ 入门
这篇笔记用于入门消息队列和 RabbitMQ。
如果你已经接触过微服务、电商系统、订单系统、库存系统、异步通知、削峰填谷这些场景,就会发现消息队列不是为了“炫技”而存在的。它主要解决的是服务之间直接调用带来的耦合、耗时和流量峰值问题。
这篇内容整理自旧笔记,并参考了 RabbitMQ 官方文档。原参考文章链接:消息队列—RabbitMQ(万字详解)
为什么需要消息队列
Section titled “为什么需要消息队列”假设在一个微服务电商项目中,商品修改和商品查询由不同服务负责。
当商户修改了商品价格后,可能需要同步更新多个地方:
- 商户商品库。
- 商品查询库。
- 自媒体商品查询库。
- 搜索索引。
- 缓存。
如果所有服务都由商品修改服务同步调用,就会出现几个问题:
- 商品修改接口响应时间变长。
- 被调用服务不可用时,主流程容易失败。
- 商品修改服务和多个下游服务强耦合。
- 后续新增下游系统时,还要修改主服务代码。
消息队列的思路是:
商品修改服务 | | 修改主库成功 v发送商品变更消息 | v消息队列 | ├─ 商品查询服务消费 ├─ 自媒体查询服务消费 ├─ 搜索服务消费 └─ 缓存服务消费主服务只负责完成自己的核心业务,然后发送消息。后续同步、缓存、搜索、通知等操作由下游服务异步完成。
同步调用和异步调用
Section titled “同步调用和异步调用”服务之间通信一般可以分成两类:同步调用和异步调用。
同步调用指 A 服务调用 B 服务后,需要等待 B 服务返回结果,A 服务才能继续往下执行。
常见方式:
- REST 调用。
- Feign。
- Ribbon。
- Dubbo。
- gRPC。
流程大致是:
A 服务 -> 调用 B 服务 -> 等待 B 服务返回 -> A 服务继续执行优点是调用结果明确,适合强依赖场景。
缺点是调用链变长后,响应时间会累加;下游服务异常也容易影响上游服务。
异步调用指 A 服务发出消息后,不需要等待 B 服务立刻执行完成。
常见方式就是消息队列。
流程大致是:
A 服务 -> 发送消息到 MQ -> A 服务继续执行 | v B 服务异步消费优点是主流程响应更快,服务之间耦合更低。
缺点是系统复杂度提高,需要考虑消息丢失、重复消费、顺序、延迟和最终一致性。
消息队列是什么
Section titled “消息队列是什么”MQ 是 Message Queue 的缩写,也就是消息队列。
可以简单理解为:
消息队列是应用之间传递消息的中间层。生产者把消息放进去,消费者从里面取消息处理。
它的核心角色通常有三个:
| 角色 | 说明 |
|---|---|
| 生产者 | 发送消息的一方 |
| 消息队列 | 存储和转发消息的中间件 |
| 消费者 | 接收并处理消息的一方 |
普通方法调用是“我直接找你办事”。
消息队列是“我把事情写到队列里,你有空来处理”。
常见 MQ 中间件
Section titled “常见 MQ 中间件”常见消息队列中间件包括:
- RabbitMQ。
- ActiveMQ。
- RocketMQ。
- Kafka。
简单对比:
| 中间件 | 特点 | 常见场景 |
|---|---|---|
| RabbitMQ | 稳定可靠,路由模型灵活,支持确认机制,基于 Erlang | 业务消息、异步任务、延迟任务 |
| ActiveMQ | 老牌 MQ,生态较早,但相对不够轻量 | 传统 Java 项目 |
| RocketMQ | 高吞吐、高可用,适合大规模分布式业务消息 | 电商、订单、交易、金融场景 |
| Kafka | 高吞吐、适合日志和流式数据,常用于大数据链路 | 日志采集、埋点、实时计算 |
选型时不要只看性能。
可以按下面的思路判断:
- 业务异步消息、可靠投递、路由灵活:RabbitMQ。
- 大规模业务消息、事务消息、顺序消息:RocketMQ。
- 日志、埋点、实时流处理、高吞吐:Kafka。
消息队列的核心作用
Section titled “消息队列的核心作用”消息队列常见作用有三个:
- 解耦。
- 异步。
- 削峰。
场景:用户下单后,订单系统需要通知库存系统扣减库存。
订单系统直接调用库存系统接口。
用户下单 |订单系统 |调用库存系统 |库存扣减成功 |订单成功问题是:如果库存系统暂时不可用,订单流程就可能失败。
订单系统和库存系统强绑定,一个系统异常会影响另一个系统。
使用消息队列
Section titled “使用消息队列”引入 MQ 后,订单系统只需要把下单消息写入 MQ。
用户下单 |订单系统写订单 |发送下单消息到 MQ |返回用户下单成功 |库存系统异步消费消息并扣库存这样库存系统短暂不可用时,消息可以先堆在队列里,等库存系统恢复后继续消费。
这就是解耦。
场景:用户注册成功后,需要发送注册邮件和注册短信。
写入用户表 -> 发送邮件 -> 发送短信 -> 返回注册成功如果写入数据库 50ms,发邮件 300ms,发短信 300ms,总耗时大约 650ms。
用户要一直等到邮件和短信都发完。
可以用多线程并行发送邮件和短信:
写入用户表 | ├─ 发送邮件 └─ 发送短信 |返回注册成功响应时间会降低,但注册服务仍然要关心邮件和短信的执行结果。
MQ 异步处理
Section titled “MQ 异步处理”使用消息队列后:
写入用户表 -> 发送注册成功消息到 MQ -> 返回注册成功 | ├─ 邮件服务消费 └─ 短信服务消费注册主流程不再等待邮件和短信发送完成。
这就是异步。
场景:秒杀活动开始后,大量请求同时涌入。
如果所有请求直接打到订单系统和数据库,数据库可能会被打垮。
使用消息队列后:
大量秒杀请求 |写入 MQ |订单服务按能力消费 |写数据库MQ 可以承接流量峰值,消费者按自身处理能力慢慢消费。
这就是削峰填谷。
需要注意:削峰不是让请求永远不失败,而是避免数据库被瞬间压垮。队列长度、消费速度、超时策略、库存校验都要一起设计。
RabbitMQ 核心模型
Section titled “RabbitMQ 核心模型”RabbitMQ 中最核心的几个概念:
| 概念 | 说明 |
|---|---|
| Producer | 生产者,负责发送消息 |
| Consumer | 消费者,负责消费消息 |
| Broker | RabbitMQ 服务本身 |
| Exchange | 交换机,接收生产者消息并按规则路由 |
| Queue | 队列,真正存储消息 |
| Binding | 绑定关系,把 Exchange 和 Queue 连接起来 |
| Routing Key | 路由键,Exchange 根据它决定消息去哪里 |
| Channel | 信道,复用 TCP 连接的轻量通信通道 |
RabbitMQ 和很多人想象中的“直接发到队列”不太一样。
在 AMQP 模型里,生产者通常不是直接把消息发给队列,而是先发给 Exchange,再由 Exchange 根据路由规则投递到队列。
Producer -> Exchange -> Queue -> ConsumerExchange 类型
Section titled “Exchange 类型”RabbitMQ 常见交换机类型有四种:
| 类型 | 路由规则 | 适用场景 |
|---|---|---|
direct | routing key 精确匹配 | 按业务类型路由 |
fanout | 广播给所有绑定队列 | 发布订阅、广播通知 |
topic | routing key 通配符匹配 | 多维度灵活路由 |
headers | 根据消息 header 匹配 | 少用,适合复杂 header 匹配 |
direct
Section titled “direct”direct 是精确匹配。
Exchange: order.exchangeRouting Key: order.createdQueue: order.created.queue消息的 routing key 和绑定 key 一致时,消息会被投递到对应队列。
适合订单创建、订单取消、库存扣减这类明确业务事件。
fanout
Section titled “fanout”fanout 是广播。
它不关心 routing key,会把消息投递给所有绑定到该交换机的队列。
商品变更消息 |fanout exchange | ├─ 商品查询服务 ├─ 搜索服务 └─ 缓存服务适合一条消息要通知多个系统的场景。
topic 支持通配符匹配。
常见通配符:
*:匹配一个单词。#:匹配零个或多个单词。
示例:
order.createdorder.cancelledproduct.price.changedproduct.stock.changed绑定关系:
order.* 匹配 order.created、order.cancelledproduct.# 匹配所有 product 开头的消息适合复杂业务事件路由。
队列和消息状态
Section titled “队列和消息状态”消息最终会存储在队列里,消费者从队列中获取消息。
常见状态:
| 状态 | 说明 |
|---|---|
| Ready | 等待消费的消息 |
| Unacked | 已投递给消费者,但还没有确认 |
| Total | 队列中消息总量 |
如果消费者拿到消息后处理成功,需要发送确认。
如果消费者异常退出,没有确认消息,RabbitMQ 可以把消息重新投递给其他消费者。
消息确认机制
Section titled “消息确认机制”RabbitMQ 中常见确认包括两类:
- 生产者确认。
- 消费者确认。
生产者确认用于保证消息成功到达 RabbitMQ。
如果发送失败,生产者可以记录日志、重试或走补偿机制。
消费者确认用于告诉 RabbitMQ:这条消息已经处理完成,可以从队列中删除。
常见方式:
- 自动确认:消息投递给消费者后立即认为成功。
- 手动确认:消费者处理成功后主动 ack。
业务系统中更推荐手动确认。
消费者收到消息 |执行业务逻辑 |处理成功 -> ack处理失败 -> nack / reject / 重试 / 死信队列如果自动确认,消费者刚拿到消息就宕机,消息可能丢失。
想让消息更可靠,需要关注三个阶段:
| 阶段 | 风险 | 常见方案 |
|---|---|---|
| 发送阶段 | 生产者发送失败 | 生产者确认、失败重试 |
| 存储阶段 | MQ 宕机导致消息丢失 | 队列持久化、消息持久化 |
| 消费阶段 | 消费者处理失败 | 手动 ack、重试、死信队列 |
RabbitMQ 中队列和消息都需要考虑持久化。
只声明持久化队列还不够,消息本身也要设置持久化。
重复消费和幂等
Section titled “重复消费和幂等”消息队列通常不能假设消息只会被消费一次。
消费者处理失败、网络异常、ack 丢失,都可能导致消息重新投递。
所以消费者必须考虑幂等。
常见做法:
- 使用业务唯一 ID。
- 处理前查询是否已经处理过。
- 数据库唯一索引。
- Redis 去重。
- 状态机控制。
例如订单支付成功消息:
收到 payment.success 消息 |查询订单状态 |如果已经支付 -> 直接返回成功如果未支付 -> 更新状态并记录流水不要让重复消息导致重复扣库存、重复发券、重复退款。
死信队列用于接收无法正常消费的消息。
常见进入死信队列的情况:
- 消息被拒绝并且不重新入队。
- 消息过期。
- 队列达到最大长度。
常见用途:
- 异常消息隔离。
- 延迟队列。
- 失败任务人工排查。
- 后续补偿处理。
例如:
正常队列 -> 消费失败多次 -> 死信交换机 -> 死信队列死信队列不是为了掩盖错误,而是把异常消息从正常消费链路中隔离出来,避免阻塞主队列。
TTL 和延迟消息
Section titled “TTL 和延迟消息”TTL 是 Time To Live,也就是消息或队列的过期时间。
RabbitMQ 可以设置:
- 消息 TTL。
- 队列 TTL。
结合死信交换机,可以实现延迟消息。
典型场景:
- 订单 30 分钟未支付自动关闭。
- 优惠券过期提醒。
- 延迟重试。
流程:
发送消息到延迟队列 |消息等待 TTL 到期 |进入死信交换机 |投递到业务队列 |消费者处理如果业务大量依赖延迟消息,也可以考虑 RabbitMQ 延迟消息插件、RocketMQ 延时消息或其他调度系统。
如果消费者处理能力有限,不能让 RabbitMQ 一次推太多消息。
可以使用 prefetch 限制未确认消息数量。
prefetch = 1表示消费者一次只处理一条未确认消息,处理完 ack 后再接收下一条。
这样可以避免一个消费者堆积太多未处理消息,也能让多个消费者之间更公平地分配任务。
RabbitMQ 常见工作模式
Section titled “RabbitMQ 常见工作模式”一个生产者,一个队列,一个消费者。
Producer -> Queue -> Consumer适合最简单的异步任务。
一个生产者,一个队列,多个消费者竞争消费。
Producer -> Queue -> Consumer A -> Consumer B适合任务分摊,比如图片处理、邮件发送。
一个消息广播给多个队列。
Producer -> fanout exchange ├-> Queue A -> Consumer A └-> Queue B -> Consumer B适合一条事件要通知多个系统。
通过 routing key 把不同消息路由到不同队列。
error -> error.queueinfo -> info.queuewarning -> warning.queue适合按消息级别或业务类型分发。
Topic 模式
Section titled “Topic 模式”通过通配符实现更灵活的路由。
order.createdorder.cancelledproduct.price.changedproduct.stock.changed适合复杂业务事件体系。
使用 MQ 的注意点
Section titled “使用 MQ 的注意点”使用 MQ 之后,系统并不是一定更简单,而是把直接调用的问题转化成了异步系统的问题。
需要重点关注:
- 消息是否可能丢失。
- 消息是否可能重复。
- 消息是否需要顺序。
- 消费失败如何重试。
- 消息堆积如何处理。
- 下游是否幂等。
- 是否允许最终一致性。
- 监控和告警是否完善。
如果一个业务必须强一致,且调用方必须立即知道结果,就不一定适合用 MQ。
面试时怎么回答
Section titled “面试时怎么回答”如果面试问“为什么使用消息队列”,可以这样回答:
消息队列主要用于解耦、异步和削峰。比如用户下单后,订单系统不直接同步调用库存、短信、搜索等服务,而是把订单事件发送到 MQ,由下游服务异步消费。这样主流程响应更快,下游服务短暂不可用也不会直接影响主流程。在高并发场景下,MQ 还可以承接流量峰值,让消费者按自身处理能力慢慢消费。
如果继续问 RabbitMQ 的核心模型,可以这样回答:
RabbitMQ 中生产者通常把消息发送到 Exchange,Exchange 根据 routing key 和 binding 规则把消息路由到 Queue,消费者从 Queue 中消费消息。常见 Exchange 有 direct、fanout、topic 和 headers。为了保证可靠性,需要考虑生产者确认、队列和消息持久化、消费者手动 ack、失败重试、死信队列以及消费者幂等。
消息队列不是单纯的“中间件”,而是一种系统设计思路。
它解决的核心问题是:
解耦:服务之间不直接强依赖异步:主流程不用等待非核心操作削峰:高峰流量先进入队列,消费者按能力处理RabbitMQ 的核心链路可以记成:
Producer -> Exchange -> Queue -> Consumer真正落到项目里,还要考虑可靠投递、重复消费、幂等、死信队列、延迟消息、消费限流和监控告警。
这些点理解后,RabbitMQ 就不只是“会用”,而是能讲清楚为什么用、怎么用、用了以后要注意什么。