加载中...
avatar
文章
42
标签
25
分类
21
首页
Java
Spring全家桶
  • Spring
  • SpringBoot
  • SpringCloud
JVM
源码
  • Mybatis
  • HashMap
归档
其他
  • 互联网电子书汇总
  • JAVA八股文指南
  • 历史
  • 相册
关于
Logo码农StormlingMQ系列(三)| RabbitMQ 消息确认机制
搜索
首页
Java
Spring全家桶
  • Spring
  • SpringBoot
  • SpringCloud
JVM
源码
  • Mybatis
  • HashMap
归档
其他
  • 互联网电子书汇总
  • JAVA八股文指南
  • 历史
  • 相册
关于

MQ系列(三)| RabbitMQ 消息确认机制

发表于2024-11-20|更新于2024-12-22|消息队列架构
|总字数:2.3k|阅读时长:8分钟|浏览量:

RabbitMQ 消息确认机制

:heavy_exclamation_mark::heavy_exclamation_mark::heavy_exclamation_mark:温馨提示:基于JDK17、SpringBoot 2.1.8.RELEASE 版本,由于RabbitMQ 在 SpringBoot3+ 的配置项有所不同, 所以请严格按照该本版来使用,挖一坑:【后续会出一个SpringBoot3+版本的配置相关教程】

架构

RabbitMQ 消息确认机制-可靠抵达.drawio

概念

保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍 为此引入确认机制

  • 生产者确认回调:publisher confirmCallback
  • 生产者退回回调:publisher returnCallback未投递到queue退回模式
  • 消费者确认:consumer ack确认机制

ComfirmCallback【生产者确认回调】

  • 概念:ComfirmCallback是生产者消息确认机制的一部分。当生产者发送消息到 RabbitMQ 的交换器(Exchange)后,RabbitMQ 会返回一个确认消息给生产者,这个确认过程可以通过 ConfirmCallback 来处理。
  • 原理:生产者发送消息时,会为每条消息关联一个 CorrelationData 对象,这个对象可以包含一些自定义的信息,用于跟踪消息。当消息成功发送到交换器后,RabbitMQ 会触发 ConfirmCallback 接口中的【 confirm】 方法。

ReturnCallback【生产者退回回调】

  • 概念:ReturnCallback 用于处理消息无法被正确路由到队列的情况。当生产者发送消息到交换器后,如果交换器无法将消息路由到任何队列(例如,没有匹配的绑定规则或者队列不存在),消息会被退回给生产者,这个退回过程可以通过 ReturnCallback 来处理。
  • 原理:生产者需要配置消息退回机制,并且实现 ReturnCallback 接口。当消息被退回时,ReturnCallback 接口中的 【returnedMessage】 方法会被触发。

BasicAck【消费者确认】

  • 概念: BasicAck是消费者确认消息的一种方式。在 RabbitMQ 中,消费者接收到消息后,需要向 RabbitMQ 服务器确认消息已经被正确处理,这样 RabbitMQ 才会从队列中删除该消息。BasicAck 是手动确认模式下用于确认消息的方法之一。
  • 原理:消费者在手动确认模式下,从队列中接收消息并进行处理。当处理完成且没有出现问题时,消费者可以使用 Channel 对象的basicAck方法来确认消息。basicAck方法需要传入两个参数:deliveryTag和multiple。deliveryTag是消息的唯一标识,由 RabbitMQ 服务器分配;multiple是一个布尔值,用于表示是否确认多条消息。

生产者确认回调 ConfirmCallback

添加配置

# 开启生产者消息确认机制
spring.rabbitmq.publisher-confirms=true

添加 RabbitMQConfig

自定义 confirmCallback#confirm

  • CorrelationData:当前消息唯一关联数据【消息的唯一Id】
  • ack:是否成功收到状态
  • cause:失败原因
@Configuration
@Slf4j
public class RabbitMQConfig {
     @Autowired
     RabbitTemplate rabbitTemplate;
    
     @PostConstruct //创建RabbitMQConfig对象后,执行这个方法
     public void initRabbitTemplate() {
        //设置确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        /**
         * @param correlationData 当前消息的唯一关联数据(这个消息的唯一id)
         * @param ack 消息是否成功收到
         * @param cause  失败的原因
        */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        	log.info("confirm=>correlationData【{}】=>ack【{}】=>cause【{}】 ", correlationData, ack, cause);
        }
    });
}

测试:生产者确认

@Slf4j
@RestController
public class ProducerController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     *
     * @param num
     */
    @GetMapping("/send")
    public void sendMessage(@RequestParam("num") int num) {
        for (int i = 0; i < num; i++) {
            if (i % 2 == 0) {
                OrderReturnReasonEntity data = new OrderReturnReasonEntity();
                data.setId(1L).setCreateTime(new Date()).setName("测试-" + i);
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", data);
            } else {
                OrderEntity data = new OrderEntity();
                data.setOrderSn(UUID.randomUUID().toString());
                rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", data);
            }
        }
        log.info("发送消息: {}条完成", num);
    }
}

消息发送成功,生产者确认回调生效,注意下这里的correlationData的数据为null

image-20241209143121904

修改下发送信息

ProducerController#sendMessage中添加当前消息的唯一id

rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", data,new CorralationData(UUID.randomUUID().toString()));
  • 这里correlationData.getId()(也就是UUID)可以帮助开发者在多个消息发送场景中,唯一地标识每条消息,从而准确地跟踪某一条特定消息的发送状态,是发送成功还是失败。

测试2:消息唯一Id

image-20241209143421885

生产者回退回调 ReturnCallback

confirm 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到
return 退回模式。

添加配置

spring.rabbitmq.publisher-returns=true # 开启生产者消息抵达队列的确认
spring.rabbitmq.template.mandatory=true # 只要抵达队列,以异步发送优先回调 return confirm,【发送端确认,默认false】,当交换机无法找到队列时,false【直接丢弃数据】,true【会将消息返回给生产者】

RabbitMQConfig 配置类添加

只有当前消息不能抵达队列才会触发这个回调
//设置消息抵达队列的确认回调
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        /**
          * 只要消息没有投递给指定的队列,就触发这个失败回调
          * @param message  投递失败的消息详细信息
          * @param replyCode  回复的状态码
          * @param replyText  回复的文本内容
          * @param exchange  当时这个消息发给哪个交换机
          * @param routingKey  当时这个消息用哪个路由键
        */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.error("消息发送失败,消息:{},失败码:{},失败原因:{},发送的交换机:{},路由键:{}", message, replyCode, replyText, exchange, replyCode);
	}
});

修改发送消息的路由键

`ProducerController`#`sendMessage`发送消息核心代码修改,将其中一个路由键修改成  `hello2-java`(或者修改成没有可绑定的队列即可)
rabbitTemplate.convertAndSend("hello-java-exchange", "hello2-java", data);// 修改这个路由键为 hello2-java

测试:生产者退回回调

执行发送消息,结果如下

image-20241209143647892

  • 消息成功到达 Broker 服务器,消息确认机制生效,打印 confirm 相关信息
  • 消息接收失败,生产者回退模式生效,其中 失败原因:【NO_ROUTE】没有路由到队列,其中路由键:【hello2-java】,交换机和失败码等信息都打印出来

消费者确认:Ack

消费者收到消息,成功处理发送 `Ack` 给 `Broker`    

消费者收到消息自动确认,但是无法确认消息是否被处理完成或者成功处理,需要手动开启`ack`

测试:默认自动 ack

ProducerController 添加一个发送消息方法

@GetMapping("sendMQ/{num}")
public void sendMQ(@PathVariable int num) {
    for (int i = 0; i < num; i++) {
        OrderReturnReasonEntity data = new OrderReturnReasonEntity();
        data.setId(1L).setCreateTime(new Date()).setName("测试-");
        rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", 
                                      data,new CorrelationData(UUID.randomUUID().toString()));
    }
    log.info("发送消息: {}条完成", num);
}

发送10条消息

image-20241209162941438

客户端接收到消息,开始处理,处理一条消息完成后,接收下一条消息宕机

image-20241209160547226

收到消息处理一条完成,队列剩下9条消息

image-20241209161006108

image-20241209163428598

此时直接结束服务,代表宕机,队列中的未确认的消息自动被确认

image-20241209163452903

手动ack :添加配置

spring.rabbitmq.listener.simple.acknowleage-mode=maunal # 手动ack消息

发送10条消息,收到后模拟宕机,发现消息不会自动确认

image-20241209164445649

image-20241209164430742

宕机后,消息回到准备状态,没有确认

image-20241209164704294

修改接收消息代码

添加消费者消息确认ack

@RabbitHandler
public void receiveOrderReturnReason(Message message, OrderReturnReasonEntity content, Channel channel) {
    log.info("接收到消息:{}", content);
    //消息体
    byte[] body = message.getBody();
    //消息头配置
    MessageProperties messageProperties = message.getMessageProperties();
    log.info("消息处理完成:消息体内容:{}", content.getName());
    //channel内按顺序自增
    long deliveryTag = messageProperties.getDeliveryTag();
    log.info("deliveryTag:{}", deliveryTag);
    //签收获取,非批量模式
    try {
       if (deliveryTag % 2 == 0) {
           channel.basicAck(deliveryTag, false);
           log.info("签收货物:{}", deliveryTag);
       } else {
           // 拒签 requeue=false丢弃 requeue=true 发回服务器,服务器重新入队
           // long deliveryTag, boolean multiple, boolean requeue
           channel.basicNack(deliveryTag, false, true);
           // long deliveryTag, boolean requeue
           // channel.basicReject(deliveryTag, false);
           log.info("拒绝签收货物:{}", deliveryTag);
       }
    } catch (IOException e) {
        //网络中断
    }
}
  • 消息确认ack,从消息头中获取 deliveryTag
  • deliveryTag:是消息传递标签,它是一个正整数,用于唯一标识一条消息的投递。这个标签主要用于消息确认机制。
    • 消息投递顺序:在通道内【channel】内,消息按照顺序被投递,并且【deliveryTag 】值是单调递增的
    • 重试机制:可以根据未确认deliveryTag重新将消息发送给其他消费者或者在一定时间后重新发送给同一消费者。
  • channel.basicAck(deliveryTag,false) 手动确认,false 非批量
  • channel.basicNack(deliveryTag,false,false) 拒绝确认
    • deliveryTag标识消息的标签, multiple=false 非批量,requeue=false丢弃( requeue=true 发回服务器,服务器重新入队)
  • channel.basicReject(deliverTag,false) 拒绝确认,不能批量

测试:重新入队requeue=true

发送10条消息,`channel.basicNack(deliveryTag,false,true)` 中 `requeue=true` ,消息重新入队,再次消费

image-20241209203537538

所有消息消费完毕

image-20241209203737712

测试:丢弃消息 requeue=false

发送10条消息,channel.basicNack(deliveryTag,false,false) 中 requeue=false ,消息直接丢弃image-20241209203750543

拒绝的消息直接丢弃

image-20241209203831722

文章作者: stormling
文章链接: http://www.stormling.top/posts/38624.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 码农Stormling!
消息队列架构
cover of previous post
上一篇
MQ系列(四)| RabbitMQ 死信队列和延迟队列
死信队列死信是什么死信:无法被消费的消息。由于特定的原因导致队列中的某些消息无法被消费,这些消息没有后续的处理,就会变成死信。当消息在队列中无法被正常消费时,会被发送到死信队列中。 死信来源消息 TTL队列达到最大长度消息拒签(basicNack 或 basicReject)且重入队列为false(requeue=false) 死信架构 消息TTL 名称 交换机 路由键 类型 特征 参数 普通交换机 normal_exchange zhangsan direct / / 普通队列 normal_queue zhangsan / TTL DLX DLK x-dead-letter-exchange:dead_exchangex-dead-letter-routing-key:lisix-message-ttl: 10 * 1000 死信交换机 dead_exchange lisi direct / / 死信队列 dead_queue lisi / / / 生产者发送消息10条消息到队列...
cover of next post
下一篇
MQ系列(二)| RabbitMQ 整合 SpringBoot
RabbitMQ 整合 SpringBoot概述 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力、流量削峰 消息服务中两个重要概念:消息代理(`message broker`)和目的地(`destination`) 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。 消息队列主要有两种形式的目的地1. 队列(`queue`):点对点消息通信(`point-to-point`) 2. 主题(`topic`):发布(`publish`)/订阅(`subscribe`)消息通信 RabbitMQ 架构图 概念生产者 Producer生产者是消息的发送方,它将消息发送到 RabbitMQ 的交换器中。 ✨消息 Message 消息=消息头+消息体,根据routekey发送到指定的交换机 Exchange 消息头:含有各种属性 routing-key(路由键)、priority(优先级)、delivery-mode(指出该消息可能需要持久性存储)等。 ✨消息代理...
相关推荐
cover
2024-11-30
MQ系列(七)| RocketMQ 为什么性能不如Kafka?
RocketMQ 为什么性能不如 Kafka? RocketMQ 使用的是 mmap 零拷贝技术,而 kafka 使用的是 sendfile (硬件设备技术 SG-DMA,不影响(不占用)CPU工作) mmap 内核缓冲区->映射用户缓冲区->内核缓冲区->网卡sendfile 内核缓冲区-> SG-DMA -> 网卡 在上篇文章《rocketmq 是什么》中,我们了解到 RocketMQ 的架构其实参考了 kafka 的设计思想,同时又在 kafka 的基础上做了一些调整。看起来,RocketMQ 好像各方面都比 kafka 更能打。 但 kafka 却一直没被淘汰,说明 RocketMQ 必然是有着不如 kafka 的地方。是啥呢?性能,严格来说是吞吐量。阿里中间件团队对它们做过压测,同样条件下,kafka 比 RocketMQ 快 **50%**左右。但即使这样,RocketMQ 依然能每秒处理 10w 量级的数据,依旧非常能打。你不能说 RocketMQ 弱,只能说 Kafka 性能太强了。 不过这就很奇怪了,为什么...
cover
2024-11-28
MQ系列(六)| RocketMQ 快速入门
RocketMQ 快速入门 本参考链接:RocketMQ 是什么?原作者:小白debug 前言 作为一个程序员,假设你有 A、B 两个服务,A 服务发出消息后,不想让 B 服务立马处理到。而是要过半小时才让 B 服务处理到,该怎么实现? 这类延迟处理消息的场景非常常见,举个例子,比如我每天早上到公司后都会点个外卖,我希望外卖能在中午送过来,而不是立马送过来,这就需要将外卖消息经过延时后,再投递到商家侧。 延时消息场景那么问题就来了,有没有优雅的解决方案?当然有,没有什么是加一层中间层不能解决的,如果有,那就再加一层。这次我们要加的中间层是消息队列 **RocketMQ**。 RocketMQ 是什么?RocketMQ 是阿里自研的国产消息队列,目前已经是 Apache 的顶级项目。和其他消息队列一样,它接受来自生产者的消息,将消息分类,每一类是一个 topic,消费者根据需要订阅 topic,获取里面的消息。 是不是很像我们上篇文章里提到的消息队 Kafka,那么问题很自然就来了,既然都是消息队列,那它们之间有什么区别呢? RocketMQ 和 Kafka...
cover
2024-11-27
MQ系列(五)| Kafka 快速入门
Kafka 快速入门介绍 参考:Kafka 是什么? 架构一个高性能,高扩展性,高可用,支持持久化的超强消息队列,它就是我们常说的消息队列 KafkaZookeeper 协调管理多个 broker 组成,内部有多个 topic 分类,每个 topic 又分成多个 partition ,每个 partition 有多个副本 replia,不同的partition 会分布在不同 broker 上,提升性能同时,还增加了系统可用性和可扩展性 高性能 对消息进行分类,每个类是一个 topic 单个 topic 的消息可能过多,可将单个队列拆分成多个段,每段就是一个分区 partition ,每个消费者负责一个 partition 高扩展性可将 partition 分部在多台设备,每台设备代表一个 broker 高可用存在一个问题,如果其中一个partition所在的 broker 挂了,那么这部分的消息不久丢失了吗? 可以给partition 多加几个副本 replica,从中分为 Leader 和 Follower,Leader 负责生产者和消费者的读写,Follower...
cover
2024-11-24
MQ系列(四)| RabbitMQ 死信队列和延迟队列
死信队列死信是什么死信:无法被消费的消息。由于特定的原因导致队列中的某些消息无法被消费,这些消息没有后续的处理,就会变成死信。当消息在队列中无法被正常消费时,会被发送到死信队列中。 死信来源消息 TTL队列达到最大长度消息拒签(basicNack 或 basicReject)且重入队列为false(requeue=false) 死信架构 消息TTL 名称 交换机 路由键 类型 特征 参数 普通交换机 normal_exchange zhangsan direct / / 普通队列 normal_queue zhangsan / TTL DLX DLK x-dead-letter-exchange:dead_exchangex-dead-letter-routing-key:lisix-message-ttl: 10 * 1000 死信交换机 dead_exchange lisi direct / / 死信队列 dead_queue lisi / / / 生产者发送消息10条消息到队列...
cover
2024-11-14
MQ系列(二)| RabbitMQ 整合 SpringBoot
RabbitMQ 整合 SpringBoot概述 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力、流量削峰 消息服务中两个重要概念:消息代理(`message broker`)和目的地(`destination`) 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。 消息队列主要有两种形式的目的地1. 队列(`queue`):点对点消息通信(`point-to-point`) 2. 主题(`topic`):发布(`publish`)/订阅(`subscribe`)消息通信 RabbitMQ 架构图 概念生产者 Producer生产者是消息的发送方,它将消息发送到 RabbitMQ 的交换器中。 ✨消息 Message 消息=消息头+消息体,根据routekey发送到指定的交换机 Exchange 消息头:含有各种属性 routing-key(路由键)、priority(优先级)、delivery-mode(指出该消息可能需要持久性存储)等。 ✨消息代理...
cover
2024-11-10
MQ系列(一)| RabbitMQ 快速入门
RabbitMQ 快速入门 官网:https://www.rabbitmq.com/ 入门教程:https://www.rabbitmq.com/tutorials 最新版本:4.0.2 版本参考:JDK17、Maven Or Gradle 1、简介RabbitMQ是一个可靠且成熟的消息传递和流代理,易于部署在云环境、本地和本地机器上。它目前被全球数百万人使用。 2、为什么使用公司业务场景核心:解耦、异步、削峰 2.1、解耦A系统发数据给到BCD系统,如果E系统需要接入?C系统不需要了?A系统的负责人就需要来回修改接口对接其他系统。 如果使用MQ,A系统产生一条数据,发送到MQ中,那个系统需要数据自己去MQ消费。如果新的系统需要数据,直接从MQ中消费;某个系统不需要数据的话,取消消费这个MQ即可。这样A系统不需要考虑谁发送数据给谁,不需要考虑是否调用成功、失败超时等问题。 总结:通过一个MQ,Pub/Sub发布订阅消息模型,A系统就和其他系统彻底耦合了。 2.2.1、项目应用...

评论
ValineGitalk
avatar
stormling
文章
42
标签
25
分类
21
Follow Me
公告
欢迎大家来到Stormling博客
目录
  1. 1. RabbitMQ 消息确认机制
  2. 2. 架构
  3. 3. 概念
    1. 3.1. ComfirmCallback【生产者确认回调】
    2. 3.2. ReturnCallback【生产者退回回调】
    3. 3.3. BasicAck【消费者确认】
  4. 4. 生产者确认回调 ConfirmCallback
    1. 4.1. 添加配置
    2. 4.2. 添加 RabbitMQConfig
    3. 4.3. 测试:生产者确认
    4. 4.4. 修改下发送信息
    5. 4.5. 测试2:消息唯一Id
  5. 5. 生产者回退回调 ReturnCallback
    1. 5.1. 添加配置
    2. 5.2. RabbitMQConfig 配置类添加
    3. 5.3. 修改发送消息的路由键
    4. 5.4. 测试:生产者退回回调
  6. 6. 消费者确认:Ack
    1. 6.1. 测试:默认自动 ack
    2. 6.2. 手动ack :添加配置
    3. 6.3. 修改接收消息代码
    4. 6.4. 测试:重新入队requeue=true
    5. 6.5. 测试:丢弃消息 requeue=false
最新文章
面向八股文面试专场
面向八股文面试专场2025-01-22
【每日早报】-2025-01-21 - 星期二
【每日早报】-2025-01-21 - 星期二2025-01-21
规则引擎 Drools 8+ 快速入门
规则引擎 Drools 8+ 快速入门2024-12-11
数据库系列(二) | Mybatis Plus 3.0+快速入门
数据库系列(二) | Mybatis Plus 3.0+快速入门2024-12-09
分布式系列(二) | Redisson分布式锁
分布式系列(二) | Redisson分布式锁2024-12-05
©2019 - 2025 By stormling
码农Stormling程序员,关注公众号【码农Stormling】回复【面试】获取最全面试pdf
搜索
数据加载中