开头
我一开始学习 RabbitMQ 高级特性时,最大的困惑不是“怎么发消息”,而是:
消息确实发出去了,但它一定成功了吗?
多个消费者一起消费时,为什么有的很忙、有的很闲?
秒杀场景下,MQ 会不会把后端服务直接压垮?
这篇文章我就围绕 RabbitMQ 高级特性中的几个实际问题来整理:事务、消息分发、限流、负载均衡。
一、先介绍背景:不是会发消息就够了
普通的 RabbitMQ 入门代码一般是:
rabbitTemplate.convertAndSend("exchange", "routingKey", "hello rabbitmq");
但项目里真正麻烦的是这些问题:
| 问题 | 具体表现 | 对应特性 |
|---|---|---|
| 消息发送一半失败 | 第一条消息发出去了,第二条代码异常 | 事务 |
| 多消费者消费不均衡 | 慢消费者堆积,快消费者空闲 | 消息分发 |
| 瞬时流量太大 | 消费端一次拿太多消息 | 限流 |
| 快慢消费者能力不同 | 希望谁空闲谁多干活 | prefetch 负载均衡 |
二、一个具体案例:我以为多个消费者会自动均衡
我一开始以为,只要启动两个消费者,RabbitMQ 就会自动判断谁处理得快,然后把更多消息发给快的消费者。
但默认情况下,RabbitMQ 更像是“轮询分发”:
消息1 -> 消费者A
消息2 -> 消费者B
消息3 -> 消费者A
消息4 -> 消费者B

问题是,如果消费者 B 很慢,消息还是可能提前分给它,导致 B 手里堆着未确认消息,而 A 可能已经空了。
三、问题原因分析
1. 事务解决什么问题?
事务解决的是:一组消息发送操作要么都成功,要么都失败。
例如下面这个场景:
rabbitTemplate.convertAndSend("", "trans_queue", "trans test 1...");
int a = 5 / 0;
rabbitTemplate.convertAndSend("", "trans_queue", "trans test 2...");
如果没有事务,第一条消息可能已经进队列了,后面代码异常也不会影响它。
如果加上事务,异常发生后,前面发送的消息也会回滚。
2. 消息分发为什么会不公平?
RabbitMQ 默认会把队列中的消息分发给不同消费者,但它不会主动关心消费者当前处理速度。
也就是说,它不是天然的“谁快谁多拿”,而是需要我们通过 prefetch 来限制消费者手里最多能拿多少未确认消息。
四、一步步给出解决方案
1. 开启 RabbitMQ 事务
先配置事务管理器,并让 RabbitTemplate 使用事务信道:
@Configuration
public class TransactionConfig {
@Bean
public RabbitTransactionManager transactionManager(
CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(
CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
}
再声明队列:
@Bean("transQueue")
public Queue transQueue() {
return QueueBuilder.durable("trans_queue").build();
}
生产者示例:
@Transactional
@RequestMapping("/send")
public String send() {
rabbitTemplate.convertAndSend("", "trans_queue", "trans test 1...");
int a = 5 / 0;
rabbitTemplate.convertAndSend("", "trans_queue", "trans test 2...");
return "发送成功";
}
我这里容易忽略的一点是:
只设置 setChannelTransacted(true) 不够,业务方法还要加 @Transactional。
图 1:事务提交与回滚流程
用途:帮助理解为什么加了事务后,异常会导致已发送消息回滚。
关键点:
事务把多次发送动作包在一起,只要中间异常,就不会出现“只成功一半”的情况。
2. 配置消费端限流
配置文件中设置手动确认和 prefetch:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
prefetch: 5
这里的意思是:
每个消费者最多同时持有 5 条未确认消息
消费者代码:
@RabbitListener(queues = "qos.queue")
public void listenerQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息: %s, deliveryTag: %d%n",
new String(message.getBody(), "UTF-8"),
deliveryTag);
// 手动确认
channel.basicAck(deliveryTag, false);
}
如果我把 basicAck 注释掉,那么消费者最多只会收到 5 条消息,剩下的消息会停留在队列中。
图 2:prefetch 限流过程
用途:说明为什么设置 prefetch=5 后,消费者不会一次性拿走所有消息。
关键点:
prefetch 控制的是“未确认消息数量”,所以它必须配合手动 ACK 才有意义。
五、补充代码示例
1. 交换机、队列、绑定配置
@Configuration
public class QosConfig {
@Bean("qosExchange")
public Exchange qosExchange() {
return ExchangeBuilder
.directExchange("qos.exchange")
.durable(true)
.build();
}
@Bean("qosQueue")
public Queue qosQueue() {
return QueueBuilder
.durable("qos.queue")
.build();
}
@Bean("qosBinding")
public Binding qosBinding(
@Qualifier("qosExchange") Exchange exchange,
@Qualifier("qosQueue") Queue queue) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("qos")
.noargs();
}
}
2. 一次发送 20 条消息
@RequestMapping("/qos")
public String qos() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(
"qos.exchange",
"qos",
"qos test..." + i
);
}
return "发送成功";
}
六、负载均衡:让快消费者多处理
如果有两个消费者,一个很快,一个很慢,我更推荐把 prefetch 设置为 1:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
prefetch: 1
意思是:
一个消费者处理并确认完上一条消息之前,不再给它新消息
慢消费者示例:
@RabbitListener(queues = "qos.queue")
public void listenerQueue2(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("消费者2接收到消息: %s, deliveryTag: %d%n",
new String(message.getBody(), "UTF-8"),
deliveryTag);
Thread.sleep(100);
channel.basicAck(deliveryTag, false);
}
图 3:prefetch=1 实现相对公平分发
用途:展示快消费者为什么能拿到更多消息。
关键点:
不是 RabbitMQ 主动识别“谁快”,而是 prefetch=1 + 手动 ACK 让快消费者更快释放窗口。
七、验证方式
验证事务
- 不加
@Transactional - 调用
/producer/trans - 观察队列,第一条消息可能发送成功
- 加上
@Transactional - 再次调用接口,异常后消息整体回滚

不添加事务时, 即使消息的发送过程中出现了异常,对于异常前的任务逻辑仍然会进行
接下来先清空队列中的消息, 添加事务机制 @Transactional

验证限流
- 设置
prefetch: 5 - 注释掉
channel.basicAck - 发送 20 条消息
- 控制台只会打印 5 条
- 管理台中能看到 15 条 ready,5 条 unacked

验证负载均衡
- 设置
prefetch: 1 - 启动两个消费者
- 其中一个消费者加
Thread.sleep(100) - 发送 20 条消息
- 观察日志,快消费者会处理更多消息

八、容易踩坑总结
| 坑点 | 现象 | 解决方式 |
|---|---|---|
只配事务模板,不加 @Transactional | 异常后消息仍可能发送 | 方法上加事务注解 |
只设置 prefetch,但自动 ACK | 限流效果不明显 | 改成手动 ACK |
忘记 basicAck | 消息一直 unacked | 业务成功后确认 |
prefetch=0 | 没有限流上限 | 设置具体数值 |
| 多个消费者 deliveryTag 重复 | 以为消息重复 | deliveryTag 是按 Channel 独立计数 |
| 慢消费者堆积 | 快消费者没活干 | 使用 prefetch=1 |
九、可复用模板
1. 事务发送模板
@Transactional
public void sendInTransaction() {
rabbitTemplate.convertAndSend("", "queue.name", "message 1");
rabbitTemplate.convertAndSend("", "queue.name", "message 2");
}
2. 手动 ACK 消费模板
@RabbitListener(queues = "queue.name")
public void consume(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 业务处理
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//multiple : false , requeue : true
channel.basicNack(deliveryTag, false, true);
}
}
3. 限流配置模板
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
prefetch: 5
4. 负载均衡配置模板
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
prefetch: 1
结尾总结
这次学习下来,我对 RabbitMQ 高级特性的理解更清楚了:
- 事务解决的是“消息发送要么都成功,要么都失败”的问题。
- RabbitMQ 默认分发不等于真正的负载均衡。
prefetch控制的是消费者手里的未确认消息数量。- 限流必须配合手动 ACK,否则效果会打折。
prefetch=5适合限制单个消费者的处理压力。prefetch=1更适合快慢消费者并存的负载均衡场景。deliveryTag是 Channel 级别的,多个消费者看到重复编号是正常的。- 真正写代码时,不要只看消息有没有发出去,还要看异常、确认、堆积和消费能力。
转载自 CSDN-专业IT技术社区
原文链接:https://blog.csdn.net/2503_91154453/article/details/162193981




