加载中...
avatar
文章
42
标签
25
分类
21
首页
Java
Spring全家桶
  • Spring
  • SpringBoot
  • SpringCloud
JVM
源码
  • Mybatis
  • HashMap
归档
其他
  • 互联网电子书汇总
  • JAVA八股文指南
  • 历史
  • 相册
关于
Logo码农StormlingMQ系列(二)| RabbitMQ 整合 SpringBoot
搜索
首页
Java
Spring全家桶
  • Spring
  • SpringBoot
  • SpringCloud
JVM
源码
  • Mybatis
  • HashMap
归档
其他
  • 互联网电子书汇总
  • JAVA八股文指南
  • 历史
  • 相册
关于

MQ系列(二)| RabbitMQ 整合 SpringBoot

发表于2024-11-14|更新于2024-12-22|消息队列架构
|总字数:2.1k|阅读时长:9分钟|浏览量:

RabbitMQ 整合 SpringBoot

概述

  1. 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力、流量削峰
    
  2. 消息服务中两个重要概念:
    消息代理(`message broker`)和目的地(`destination`)
    当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
    
  3. 消息队列主要有两种形式的目的地
    1.     队列(`queue`):点对点消息通信(`point-to-point`)
    2.     主题(`topic`):发布(`publish`)/订阅(`subscribe`)消息通信
    

RabbitMQ 架构图

rabbitmq架构

概念

生产者 Producer

生产者是消息的发送方,它将消息发送到 RabbitMQ 的交换器中。

✨消息 Message

  • 消息=消息头+消息体,根据routekey发送到指定的交换机 Exchange
  • 消息头:含有各种属性 routing-key(路由键)、priority(优先级)、delivery-mode(指出该消息可能需要持久性存储)等。

✨消息代理 Broker

  • 消息传递的中间件服务器,负责接收、存储和转发消息,作用类似邮局🏣
  • 消息存储+消息路由
  • Broker = VHost1+Vhost2+Vhost3+…..
  • Virtual Host = Exchange + Queue +Binding

虚拟主机 Virtual Host

  • 逻辑分组机制,将不同的用户、队列、交换器等资源隔离开来

  • Virtual 即 VHost

  • 默认目录 /

✨交换机 Exchange

  • 绑定 routekey接收消息并发送到符合routekey 的 队列
  • 常用三种类型
    • ✨dirct:Direct Exchange(直连交换器) 【单播】完全匹配路由键的队列
      • image-20241207003055398
    • ✨fanout:Fanout Exchange(扇出交换器)【广播】消息分发所有绑定队列上,不处理路由键
      • image-20241207003049286
    • ✨topic:Topic Exchange(主题交换器)【模式匹配】
      • #:配置0个或者多个单词
      • *:匹配一个单词
      • image-20241207003038106 
    • headers:很少使用
    • system:很少使用

✨队列 Queue

  • 存储消息的容器,FIFO
  • 缓冲消息+持久化

绑定 Binding

  • 用于消息队列和交换器之间的关联。
  • 一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  • Exchange 和Queue的绑定可以是多对多的关系。

连接 Connection

  • 网络连接,比如一个TCP连接

信道 Channel

  • 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

消费者 Consumer

消费者是消息的接收方,它从 RabbitMQ 的队列中获取消息并进行处理。

Docker 安装 RMQ

docker run -d --restart=always --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:43699-p25672:25672-p 15671:15671 -p 15672:15672 rabbitmq:management

后台页面收发消息

打开 localhost:15672

登录页面

登录的用户密码:guest/guest

登录页面

登录后台首页

首页

交换机 Exchange页面

交换机页面

五种交换机类型:direct、fanout、headers、topic、x-local-random

新增交换机

队列页面

队列页面

绑定:交换机根据路由键绑定到对应的队列

Virtual Host【Exchange –> binding(route-key) 】–> Queue(route-key)

默认的虚拟主机的路径是 “/“,即根目录

交换机和队列绑定

交换机绑定队列

队列和交换机绑定关系

队列和交换机绑定关系

SpringBoot 整合

配置pom 文件

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置application.yaml

server:
  port: 8081
spring:
  application:
    name: rabbitmq-demo
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtualHost: /
#    publisher-confirm-type: CORRELATED
#    publisher-returns: true
#    listener:
#      simple:
#        acknowledge-mode: manual #默认情况下消息消费者是自动确认消息的,如果要手动确认消息则需要修改确认模式为manual
#        prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发

测试类中-创建交换机

@Slf4j
@SpringBootTest
class RabbitmqDemoApplicationTests {

    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    public void createExchange() {
        DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
        amqpAdmin.declareExchange(directExchange);
        log.info("Exchange[hello-java-exchange] 创建完成");
    }
}

成功创建image-20241206170654862

创建队列

@Test
public void createQueue() {
    Queue queue = new Queue("hello-java-queue", true, false, false);
    amqpAdmin.declareQueue(queue);
    log.info("Queue[hello-java-queue] 创建完成");
}

执行后成功创建

image-20241206170811552

创建绑定

@Test
public void Binding() {
    Binding binding = new Binding("hello-java-queue",
                                  Binding.DestinationType.QUEUE,
                                  "hello-java-exchange",
                                  "hello-java", null);
    amqpAdmin.declareBinding(binding);
    log.info("Binding[hello-java-binding] 创建完成");
}

直连交换机【hello-java-exchange】和队列【hello-java-queue】用 routingkey 【hello-java】绑定

队列绑定

image-20241206170917982

交换机绑定

image-20241206171001858

发送消息【JSON消息转换器】

配置 RabbitConfig 序列化 json

根据源码 RabbitAutoConfiguration 创建@Bean RabbitTemplate 中的消息转换器属性 MessageConverter messageConverter = new SimpleMessageConverter();

说明了RabbitMQ 自动配置过程中,创建工具类【RabbitTemplate】,其中默认的消息转换器是 【SimpleMessageConverter】,我们来看下【SimpleMessageConverter】源码是如何收发消息的

SimpleMessageConverter创建消息

// 创建消息,默认使用序列化 Serializable类型发送,发送的消息实体需要实现序列化
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    if (object instanceof byte[] bytes) {
        messageProperties.setContentType("application/octet-stream");
    } else if (object instanceof String) {
        try {
            bytes = ((String)object).getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException("failed to convert to Message content", e);
        }
        messageProperties.setContentType("text/plain");
        messageProperties.setContentEncoding("UTF-8");
    } else if (object instanceof Serializable) {
        try {
            bytes = SerializationUtils.serialize(object);
        } catch (IllegalArgumentException e) {
            throw new MessageConversionException("failed to convert to serialized Message content", e);
        }
        messageProperties.setContentType("application/x-java-serialized-object");
    }
    if (bytes != null) {
        messageProperties.setContentLength((long)bytes.length);
        return new Message(bytes, messageProperties);
    } else {
        String var10002 = this.getClass().getSimpleName();
        throw new IllegalArgumentException(var10002 + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
    }
}

SimpleMessageConverter消费消息

public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.startsWith("text")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = this.defaultCharset;
                }

                try {
                    content = new String(message.getBody(), encoding);
                } catch (UnsupportedEncodingException e) {
                    throw new MessageConversionException("failed to convert text-based Message content", e);
                }
            } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
                try {
                    content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody())));
                } catch (IllegalArgumentException | IllegalStateException | IOException e) {
                    throw new MessageConversionException("failed to convert serialized Message content", e);
                }
            }
        }

        if (content == null) {
            content = message.getBody();
        }

        return content;
    }

自定义消息类型转器 MessageConverter

MessageConverter 的层次结构

image-20241207013346381

自定义消息类型转换器

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

}

创建数据:订单退出原因实体对象 注意需要序列化 Serializable

@ToString
@Data
@Accessors(chain = true)
//@TableName("oms_order_return_reason")
public class OrderReturnReasonEntity implements Serializable {
	private static final long serialVersionUID = 1L;

	private Long id;
	/**
	 * 退货原因名
	 */
	private String name;
	/**
	 * 排序
	 */
	private Integer sort;
	/**
	 * 启用状态
	 */
	private Integer status;
	/**
	 * create_time
	 */
	private Date createTime;

}

测试类中发送消息

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sendMessage() {
    OrderReturnReasonEntity data = new OrderReturnReasonEntity();
    data.setId(1L)
            .setCreateTime(new Date())
            .setName("测试");
    rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", data);
    log.info("发送消息: {}", data);
}

队列收到消息

image-20241206192848893

收到消息对象

image-20241206193039689

{"id":1,"name":"测试","sort":null,"status":null,"createTime":1733484472414}

接收信息

在启动类上添加 @EnableRabbit 开启 RabbitMQ

@EnableRabbit
@SpringBootApplication
public class RabbitmqDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqDemoApplication.class, args);
    }
}

在需要接收消息的地方添加方法 @RabbitListerner 、@RabbitHandler

@RabbitListerner :用在类和方法上并绑定对应的队列

@RabbitHandler:用在方法上,可以接收不同的类型的数据

@RabbitListener(queues = {"hello-java-queue"})
@Component
@Slf4j
public class OrderMQHandler {
    @RabbitHandler
    public void receiveOrderReturnReason(Message message, OrderReturnReasonEntity content, Channel channel) {
        //消息体
        byte[] body = message.getBody();
        //消息头配置
        MessageProperties messageProperties = message.getMessageProperties();
        log.info("消息体内容:{}", content);
    }

    @RabbitHandler
    public void receiverOrder(OrderEntity content) {
        log.info("接收消息=>Order:{}", content);
    }
}

成功收到OrderReturnReasonEntity对象数据

2024-12-06T22:46:37.495+08:00  INFO 15808 --- [ntContainer#0-3] c.s.rabbitmqdemo.handler.OrderMQHandler  : 消息体内容:OrderReturnReasonEntity(id=1, name=测试-0, sort=null, status=null, createTime=Fri Dec 06 22:46:37 CST 2024)

成功收到OrderEntity对象数据

2024-12-06T22:46:37.522+08:00  INFO 15808 --- [ntContainer#0-3] c.s.rabbitmqdemo.handler.OrderMQHandler  : 接收消息=>Order:OrderEntity(id=1, memberId=null, orderSn=null, couponId=null, createTime=Fri Dec 06 22:46:37 CST 2024, memberUsername=测试-1, totalAmount=null, payAmount=null, freightAmount=null, promotionAmount=null, integrationAmount=null, couponAmount=null, discountAmount=null, payType=null, sourceType=null, status=null, deliveryCompany=null, deliverySn=null, autoConfirmDay=null, integration=null, growth=null, billType=null, billHeader=null, billContent=null, billReceiverPhone=null, billReceiverEmail=null, receiverName=null, receiverPhone=null, receiverPostCode=null, receiverProvince=null, receiverCity=null, receiverRegion=null, receiverDetailAddress=null, note=null, confirmStatus=null, deleteStatus=null, useIntegration=null, paymentTime=null, deliveryTime=null, receiveTime=null, commentTime=null, modifyTime=null)
文章作者: stormling
文章链接: http://www.stormling.top/posts/41545.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 码农Stormling!
消息队列架构
cover of previous post
上一篇
MQ系列(三)| RabbitMQ 消息确认机制
RabbitMQ 消息确认机制 :heavy_exclamation_mark::heavy_exclamation_mark::heavy_exclamation_mark:温馨提示:基于JDK17、SpringBoot 2.1.8.RELEASE 版本,由于RabbitMQ 在 SpringBoot3+ 的配置项有所不同, 所以请严格按照该本版来使用,挖一坑:【后续会出一个SpringBoot3+版本的配置相关教程】 架构 概念保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍 为此引入确认机制 生产者确认回调:publisher confirmCallback 生产者退回回调:publisher returnCallback未投递到queue退回模式 消费者确认:consumer ack确认机制 ComfirmCallback【生产者确认回调】 概念:ComfirmCallback是生产者消息确认机制的一部分。当生产者发送消息到 RabbitMQ 的交换器(Exchange)后,RabbitMQ 会返回一个确认消息给生产者,这个确认过程可以通过...
cover of next post
下一篇
MQ系列(一)| RabbitMQ 快速入门
RabbitMQ 快速入门 官网:https://www.rabbitmq.com/ 入门教程:https://www.rabbitmq.com/tutorials 最新版本:4.0.2 版本参考:JDK17、Maven Or Gradle 1、简介RabbitMQ是一个可靠且成熟的消息传递和流代理,易于部署在云环境、本地和本地机器上。它目前被全球数百万人使用。 2、为什么使用公司业务场景核心:解耦、异步、削峰 2.1、解耦A系统发数据给到BCD系统,如果E系统需要接入?C系统不需要了?A系统的负责人就需要来回修改接口对接其他系统。 如果使用MQ,A系统产生一条数据,发送到MQ中,那个系统需要数据自己去MQ消费。如果新的系统需要数据,直接从MQ中消费;某个系统不需要数据的话,取消消费这个MQ即可。这样A系统不需要考虑谁发送数据给谁,不需要考虑是否调用成功、失败超时等问题。 总结:通过一个MQ,Pub/Sub发布订阅消息模型,A系统就和其他系统彻底耦合了。 2.2.1、项目应用...
相关推荐
cover
2024-11-30
MQ系列(七)| RocketMQ 为什么性能不如Kafka?
RocketMQ 为什么性能不如 Kafka? RocketMQ 使用的是 mmap 零拷贝技术,而 kafka 使用的是 sendfile (硬件设备技术 SG-DMA,不影响(不占用)CPU工作) mmap 内核缓冲区->映射用户缓冲区->内核缓冲区->网卡sendfile 内核缓冲区-> SG-DMA -> 网卡 在上篇文章《rocketmq 是什么》中,我们了解到 RocketMQ 的架构其实参考了 kafka 的设计思想,同时又在 kafka 的基础上做了一些调整。看起来,RocketMQ 好像各方面都比 kafka 更能打。 但 kafka 却一直没被淘汰,说明 RocketMQ 必然是有着不如 kafka 的地方。是啥呢?性能,严格来说是吞吐量。阿里中间件团队对它们做过压测,同样条件下,kafka 比 RocketMQ 快 **50%**左右。但即使这样,RocketMQ 依然能每秒处理 10w 量级的数据,依旧非常能打。你不能说 RocketMQ 弱,只能说 Kafka 性能太强了。 不过这就很奇怪了,为什么...
cover
2024-11-28
MQ系列(六)| RocketMQ 快速入门
RocketMQ 快速入门 本参考链接:RocketMQ 是什么?原作者:小白debug 前言 作为一个程序员,假设你有 A、B 两个服务,A 服务发出消息后,不想让 B 服务立马处理到。而是要过半小时才让 B 服务处理到,该怎么实现? 这类延迟处理消息的场景非常常见,举个例子,比如我每天早上到公司后都会点个外卖,我希望外卖能在中午送过来,而不是立马送过来,这就需要将外卖消息经过延时后,再投递到商家侧。 延时消息场景那么问题就来了,有没有优雅的解决方案?当然有,没有什么是加一层中间层不能解决的,如果有,那就再加一层。这次我们要加的中间层是消息队列 **RocketMQ**。 RocketMQ 是什么?RocketMQ 是阿里自研的国产消息队列,目前已经是 Apache 的顶级项目。和其他消息队列一样,它接受来自生产者的消息,将消息分类,每一类是一个 topic,消费者根据需要订阅 topic,获取里面的消息。 是不是很像我们上篇文章里提到的消息队 Kafka,那么问题很自然就来了,既然都是消息队列,那它们之间有什么区别呢? RocketMQ 和 Kafka...
cover
2024-11-27
MQ系列(五)| Kafka 快速入门
Kafka 快速入门介绍 参考:Kafka 是什么? 架构一个高性能,高扩展性,高可用,支持持久化的超强消息队列,它就是我们常说的消息队列 KafkaZookeeper 协调管理多个 broker 组成,内部有多个 topic 分类,每个 topic 又分成多个 partition ,每个 partition 有多个副本 replia,不同的partition 会分布在不同 broker 上,提升性能同时,还增加了系统可用性和可扩展性 高性能 对消息进行分类,每个类是一个 topic 单个 topic 的消息可能过多,可将单个队列拆分成多个段,每段就是一个分区 partition ,每个消费者负责一个 partition 高扩展性可将 partition 分部在多台设备,每台设备代表一个 broker 高可用存在一个问题,如果其中一个partition所在的 broker 挂了,那么这部分的消息不久丢失了吗? 可以给partition 多加几个副本 replica,从中分为 Leader 和 Follower,Leader 负责生产者和消费者的读写,Follower...
cover
2024-11-24
MQ系列(四)| RabbitMQ 死信队列和延迟队列
死信队列死信是什么死信:无法被消费的消息。由于特定的原因导致队列中的某些消息无法被消费,这些消息没有后续的处理,就会变成死信。当消息在队列中无法被正常消费时,会被发送到死信队列中。 死信来源消息 TTL队列达到最大长度消息拒签(basicNack 或 basicReject)且重入队列为false(requeue=false) 死信架构 消息TTL 名称 交换机 路由键 类型 特征 参数 普通交换机 normal_exchange zhangsan direct / / 普通队列 normal_queue zhangsan / TTL DLX DLK x-dead-letter-exchange:dead_exchangex-dead-letter-routing-key:lisix-message-ttl: 10 * 1000 死信交换机 dead_exchange lisi direct / / 死信队列 dead_queue lisi / / / 生产者发送消息10条消息到队列...
cover
2024-11-20
MQ系列(三)| RabbitMQ 消息确认机制
RabbitMQ 消息确认机制 :heavy_exclamation_mark::heavy_exclamation_mark::heavy_exclamation_mark:温馨提示:基于JDK17、SpringBoot 2.1.8.RELEASE 版本,由于RabbitMQ 在 SpringBoot3+ 的配置项有所不同, 所以请严格按照该本版来使用,挖一坑:【后续会出一个SpringBoot3+版本的配置相关教程】 架构 概念保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍 为此引入确认机制 生产者确认回调:publisher confirmCallback 生产者退回回调:publisher returnCallback未投递到queue退回模式 消费者确认:consumer ack确认机制 ComfirmCallback【生产者确认回调】 概念:ComfirmCallback是生产者消息确认机制的一部分。当生产者发送消息到 RabbitMQ 的交换器(Exchange)后,RabbitMQ 会返回一个确认消息给生产者,这个确认过程可以通过...
cover
2024-11-10
MQ系列(一)| RabbitMQ 快速入门
RabbitMQ 快速入门 官网:https://www.rabbitmq.com/ 入门教程:https://www.rabbitmq.com/tutorials 最新版本:4.0.2 版本参考:JDK17、Maven Or Gradle 1、简介RabbitMQ是一个可靠且成熟的消息传递和流代理,易于部署在云环境、本地和本地机器上。它目前被全球数百万人使用。 2、为什么使用公司业务场景核心:解耦、异步、削峰 2.1、解耦A系统发数据给到BCD系统,如果E系统需要接入?C系统不需要了?A系统的负责人就需要来回修改接口对接其他系统。 如果使用MQ,A系统产生一条数据,发送到MQ中,那个系统需要数据自己去MQ消费。如果新的系统需要数据,直接从MQ中消费;某个系统不需要数据的话,取消消费这个MQ即可。这样A系统不需要考虑谁发送数据给谁,不需要考虑是否调用成功、失败超时等问题。 总结:通过一个MQ,Pub/Sub发布订阅消息模型,A系统就和其他系统彻底耦合了。 2.2.1、项目应用...

评论
ValineGitalk
avatar
stormling
文章
42
标签
25
分类
21
Follow Me
公告
欢迎大家来到Stormling博客
目录
  1. 1. RabbitMQ 整合 SpringBoot
    1. 1.1. 概述
    2. 1.2. RabbitMQ 架构图
    3. 1.3. 概念
      1. 1.3.0.1. 生产者 Producer
      2. 1.3.0.2. ✨消息 Message
      3. 1.3.0.3. ✨消息代理 Broker
      4. 1.3.0.4. 虚拟主机 Virtual Host
      5. 1.3.0.5. ✨交换机 Exchange
      6. 1.3.0.6. ✨队列 Queue
      7. 1.3.0.7. 绑定 Binding
      8. 1.3.0.8. 连接 Connection
      9. 1.3.0.9. 信道 Channel
      10. 1.3.0.10. 消费者 Consumer
  2. 1.4. Docker 安装 RMQ
  3. 1.5. 后台页面收发消息
    1. 1.5.1. 登录页面
    2. 1.5.2. 登录后台首页
    3. 1.5.3. 交换机 Exchange页面
    4. 1.5.4. 队列页面
    5. 1.5.5. 绑定:交换机根据路由键绑定到对应的队列
  4. 1.6. SpringBoot 整合
    1. 1.6.1. 配置pom 文件
    2. 1.6.2. 配置application.yaml
    3. 1.6.3. 测试类中-创建交换机
    4. 1.6.4. 创建队列
    5. 1.6.5. 创建绑定
    6. 1.6.6. 发送消息【JSON消息转换器】
    7. 1.6.7. 接收信息
最新文章
面向八股文面试专场
面向八股文面试专场2025-01-22
【每日早报】-2025-01-21 - 星期二
【每日早报】-2025-01-21 - 星期二2025-01-21
规则引擎 Drools 8+ 快速入门
规则引擎 Drools 8+ 快速入门2024-12-11
数据库系列(二) | Mybatis Plus 3.0+快速入门
数据库系列(二) | Mybatis Plus 3.0+快速入门2024-12-09
分布式系列(二) | Redisson分布式锁
分布式系列(二) | Redisson分布式锁2024-12-05
©2019 - 2025 By stormling
码农Stormling程序员,关注公众号【码农Stormling】回复【面试】获取最全面试pdf
搜索
数据加载中