加载中...
avatar
文章
42
标签
25
分类
21
首页
Java
Spring全家桶
  • Spring
  • SpringBoot
  • SpringCloud
JVM
源码
  • Mybatis
  • HashMap
归档
其他
  • 互联网电子书汇总
  • JAVA八股文指南
  • 历史
  • 相册
关于
Logo码农StormlingMQ系列(一)| RabbitMQ 快速入门
搜索
首页
Java
Spring全家桶
  • Spring
  • SpringBoot
  • SpringCloud
JVM
源码
  • Mybatis
  • HashMap
归档
其他
  • 互联网电子书汇总
  • JAVA八股文指南
  • 历史
  • 相册
关于

MQ系列(一)| RabbitMQ 快速入门

发表于2024-11-10|更新于2024-12-22|消息队列架构
|总字数:2k|阅读时长:6分钟|浏览量:

RabbitMQ 快速入门

官网:https://www.rabbitmq.com/

入门教程:https://www.rabbitmq.com/tutorials

最新版本:4.0.2

版本参考:JDK17、Maven Or Gradle

1、简介

RabbitMQ是一个可靠且成熟的消息传递和流代理,易于部署在云环境、本地和本地机器上。它目前被全球数百万人使用。
rabbitmq-logo-with-name

2、为什么使用

公司业务场景核心:解耦、异步、削峰

2.1、解耦

A系统发数据给到BCD系统,如果E系统需要接入?C系统不需要了?A系统的负责人就需要来回修改接口对接其他系统。

未解耦

如果使用MQ,A系统产生一条数据,发送到MQ中,那个系统需要数据自己去MQ消费。如果新的系统需要数据,直接从MQ中消费;某个系统不需要数据的话,取消消费这个MQ即可。这样A系统不需要考虑谁发送数据给谁,不需要考虑是否调用成功、失败超时等问题。

MQ解耦

总结:通过一个MQ,Pub/Sub发布订阅消息模型,A系统就和其他系统彻底耦合了。

2.2.1、项目应用

车站系统通过控制命令下发给各个设备,其中车站的设备通常包含:闸机、半自动售票机、自动售票机、手持机等设备。

如果按照常规的同步方式来对接不同的设备,这将使得系统冗余的代码很多。当车站增减一个设备就可能需要重新对接接口,造成系统耦合性很高,这样的效率不高且不优雅。

所以当系统需要发送命令(生产一个数据),将数据放到MQ中,不需要知道哪个设备接收消息成功或者失败,其中需要消费的设备自己去订阅并且获取相应的消息即可。这样就可以达到系统下发设备控制命令,不同设备响应。

2.2、异步

A系统接收请求,需要本地入库,还需要BCD三个系统入库,本地入库(3ms),BCD(300ms+400ms+500ms),用户体验很差等待时间太长。业内请求需要做到 200ms 以内,对用户几乎无感。

异步-1

使用MQ,A系统连续发送3条消息到消息队列,假如消耗5ms,请求花了 5 + 3 = 8ms ,对于用户来说就是点了一个按钮返回很快。

异步-2

2.3、削峰

每天一段时间,A系统风平浪静,每秒请求数量就50个。结果每次一到 12:00~13:00,每秒并发请求数量突然暴增到5k+条。但是系统是直接基于MySQL,大量请求涌入MySQL,每秒执行约5k条SQL, 一般情况下MySQL 每秒可抗 2k请求,5k的请求可能打死MySQL,导致无法使用。 一旦过了高峰,到了下午就到了低峰期,每秒请求数量 50 左右,对整个系统没有多少压力了。

未削峰

如果使用MQ,每秒 5k 请求写入 MQ , A系统每秒最多处理 2k 个请求,因为 MySQL每秒最多请求 2k 个请求。A系统从MQ中慢慢拉取请求,每秒2k个请求,不超过自己每秒最大的请求数量即可。所以再高峰期,A系统不会挂掉。而MQ每秒进 5k ,出 2k,请求就会在高峰期积压可能多大十几万甚至百万的消息再 MQ中。

MQ削峰

总结:短暂的挤压后是可允许的,等到高峰期过后,每秒进入MQ的消息降低很多,但是系统依然按照 2k 的请求取消费,A系统很快的就会把挤压解决掉了。

2.4、并行

img

2.5、排队

img

3、消息队列工具 RabbitMQ

3.1、常见MQ产品

  • ActiveMQ:基于JMS(Java Message Service)协议,java语言,jdk

  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

  • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会

  • Kafka:分布式消息系统,高吞吐量

3.2、RabbitMQ基础概念

RabbitMQ组件 Broker:简单来说就是消息队列服务器实体

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列

Queue:消息队列载体,每个消息都会被投入到一个或多个队列

Binding:绑定,它的作用就是把 exchange和 queue按照路由规则绑定起来

Routing Key:路由关键字, exchange根据这个关键字进行消息投递

vhost:虚拟主机,一个 broker里可以开设多个 vhost,用作不同用户的权限分离

producer:消息生产者,就是投递消息的程序

consumer:消息消费者,就是接受消息的程序

channel:消息通道,在客户端的每个连接里,可建立多个 channel,每个 channel代表一个会话任务

3.3、五种消息模型

RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。

但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。

img

  • 基本消息模型:生产者–>队列–>消费者
  • work消息模型:生产者–>队列–>多个消费者竞争消费
  • 订阅模型-**Fanout**:广播模式,将消息交给所有绑定到交换机的队列,每个消费者都会收到同一条消息
  • **订阅模型-Direct**:定向,把消息交给符合指定 rotingKey 的队列
  • 订阅模型-Topic 主题模式:通配符,把消息交给符合**routing pattern(**路由模式) 的队列

4、消息不丢失

4.1、MQ角度

  1. 生产者不丢数据

  2. MQ服务器不丢数据

  3. 消费者不丢数据

保证消息不丢失有两种实现方式:

  • 开启事务模式 (不推荐)

  • 消息息确认模式(生产者,消费者)

说明:开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此我们生产环境一般都采取消息确认模式

4.2、消息确认

1、消息持久化

如果希望RabbitMQ重启之后消息不丢失,那么需要对以下3种实体均配置持久化

  • Exchange
  • Queue
  • message

声明exchange时设置持久化(durable = true)并且不自动删除 (autoDelete = false)

boolean durable = true;
boolean autoDelete = false;
channel.exchangeDeclare("dlx", TOPIC, durable, autoDelete, null)

声明queue时设置持久化(durable = true)并且不自动删除 (autoDelete = false)

boolean durable = true;
boolean autoDelete = false;
channel.queueDeclare("order-summary-queue", durable, false, autoDelete, queueArguments);

发送消息时通过设置(deliveryMode=2)持久化消息:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .contentType("application/json")
                    .deliveryMode(2)
                    .priority(0)
                    .build();
channel.basicPublish("order", "order.created", false, properties, "sample-data".getBytes())
2、发送确认

有时,业务处理成功,消息也发了,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,即要求rabbitmq显式告知我们消息是否已成功发送。

3、手动消费确认

有时,消息被正确投递到消费方,但是消费方处理失败,那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分,但是积分消费方处理却都失败了,用户就会问:我购买了东西为什么积分并没有增加呢?

要解决这个问题,需要引入消费方确认,即只有消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack

文章作者: stormling
文章链接: http://www.stormling.top/posts/46172.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 码农Stormling!
消息队列架构
cover of previous post
上一篇
MQ系列(二)| RabbitMQ 整合 SpringBoot
RabbitMQ 整合 SpringBoot概述 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力、流量削峰 消息服务中两个重要概念:消息代理(`message broker`)和目的地(`destination`) 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。 消息队列主要有两种形式的目的地1. 队列(`queue`):点对点消息通信(`point-to-point`) 2. 主题(`topic`):发布(`publish`)/订阅(`subscribe`)消息通信 RabbitMQ 架构图 概念生产者 Producer生产者是消息的发送方,它将消息发送到 RabbitMQ 的交换器中。 ✨消息 Message 消息=消息头+消息体,根据routekey发送到指定的交换机 Exchange 消息头:含有各种属性 routing-key(路由键)、priority(优先级)、delivery-mode(指出该消息可能需要持久性存储)等。 ✨消息代理...
cover of next post
下一篇
Redis系列(一)| 单机集群搭建3主3从
Redis集群部署-单主机 6节点 3M+3S下载docker redis 镜像docker pull redis:4.0 # 或者直接下载最新版redis镜像 docker pull redis !!! 注意 redis 5.0以上创建集群不在使用ruby,因此如果使用最新版的redis 一下ruby部分内容直接跳过 在 /data下新建 redis-cluster 文件夹,然后创建 redis-cluster.tmpl 文件# bind 127.0.0.1 protected-mode no port ${PORT} daemonize no dir /data/redis appendonly yes cluster-enabled yes cluster-config-file nodes.conf cluster-node-timeout 15000 cluster-announce-port ${PORT} cluster-announce-bus-port...
相关推荐
cover
2024-11-30
MQ系列(七)| RocketMQ 为什么性能不如Kafka?
RocketMQ 为什么性能不如 Kafka? RocketMQ 使用的是 mmap 零拷贝技术,而 kafka 使用的是 sendfile (硬件设备技术 SG-DMA,不影响(不占用)CPU工作) mmap 内核缓冲区->映射用户缓冲区->内核缓冲区->网卡sendfile 内核缓冲区-> SG-DMA -> 网卡 在上篇文章《rocketmq 是什么》中,我们了解到 RocketMQ 的架构其实参考了 kafka 的设计思想,同时又在 kafka 的基础上做了一些调整。看起来,RocketMQ 好像各方面都比 kafka 更能打。 但 kafka 却一直没被淘汰,说明 RocketMQ 必然是有着不如 kafka 的地方。是啥呢?性能,严格来说是吞吐量。阿里中间件团队对它们做过压测,同样条件下,kafka 比 RocketMQ 快 **50%**左右。但即使这样,RocketMQ 依然能每秒处理 10w 量级的数据,依旧非常能打。你不能说 RocketMQ 弱,只能说 Kafka 性能太强了。 不过这就很奇怪了,为什么...
cover
2024-11-28
MQ系列(六)| RocketMQ 快速入门
RocketMQ 快速入门 本参考链接:RocketMQ 是什么?原作者:小白debug 前言 作为一个程序员,假设你有 A、B 两个服务,A 服务发出消息后,不想让 B 服务立马处理到。而是要过半小时才让 B 服务处理到,该怎么实现? 这类延迟处理消息的场景非常常见,举个例子,比如我每天早上到公司后都会点个外卖,我希望外卖能在中午送过来,而不是立马送过来,这就需要将外卖消息经过延时后,再投递到商家侧。 延时消息场景那么问题就来了,有没有优雅的解决方案?当然有,没有什么是加一层中间层不能解决的,如果有,那就再加一层。这次我们要加的中间层是消息队列 **RocketMQ**。 RocketMQ 是什么?RocketMQ 是阿里自研的国产消息队列,目前已经是 Apache 的顶级项目。和其他消息队列一样,它接受来自生产者的消息,将消息分类,每一类是一个 topic,消费者根据需要订阅 topic,获取里面的消息。 是不是很像我们上篇文章里提到的消息队 Kafka,那么问题很自然就来了,既然都是消息队列,那它们之间有什么区别呢? RocketMQ 和 Kafka...
cover
2024-11-27
MQ系列(五)| Kafka 快速入门
Kafka 快速入门介绍 参考:Kafka 是什么? 架构一个高性能,高扩展性,高可用,支持持久化的超强消息队列,它就是我们常说的消息队列 KafkaZookeeper 协调管理多个 broker 组成,内部有多个 topic 分类,每个 topic 又分成多个 partition ,每个 partition 有多个副本 replia,不同的partition 会分布在不同 broker 上,提升性能同时,还增加了系统可用性和可扩展性 高性能 对消息进行分类,每个类是一个 topic 单个 topic 的消息可能过多,可将单个队列拆分成多个段,每段就是一个分区 partition ,每个消费者负责一个 partition 高扩展性可将 partition 分部在多台设备,每台设备代表一个 broker 高可用存在一个问题,如果其中一个partition所在的 broker 挂了,那么这部分的消息不久丢失了吗? 可以给partition 多加几个副本 replica,从中分为 Leader 和 Follower,Leader 负责生产者和消费者的读写,Follower...
cover
2024-11-24
MQ系列(四)| RabbitMQ 死信队列和延迟队列
死信队列死信是什么死信:无法被消费的消息。由于特定的原因导致队列中的某些消息无法被消费,这些消息没有后续的处理,就会变成死信。当消息在队列中无法被正常消费时,会被发送到死信队列中。 死信来源消息 TTL队列达到最大长度消息拒签(basicNack 或 basicReject)且重入队列为false(requeue=false) 死信架构 消息TTL 名称 交换机 路由键 类型 特征 参数 普通交换机 normal_exchange zhangsan direct / / 普通队列 normal_queue zhangsan / TTL DLX DLK x-dead-letter-exchange:dead_exchangex-dead-letter-routing-key:lisix-message-ttl: 10 * 1000 死信交换机 dead_exchange lisi direct / / 死信队列 dead_queue lisi / / / 生产者发送消息10条消息到队列...
cover
2024-11-20
MQ系列(三)| RabbitMQ 消息确认机制
RabbitMQ 消息确认机制 :heavy_exclamation_mark::heavy_exclamation_mark::heavy_exclamation_mark:温馨提示:基于JDK17、SpringBoot 2.1.8.RELEASE 版本,由于RabbitMQ 在 SpringBoot3+ 的配置项有所不同, 所以请严格按照该本版来使用,挖一坑:【后续会出一个SpringBoot3+版本的配置相关教程】 架构 概念保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍 为此引入确认机制 生产者确认回调:publisher confirmCallback 生产者退回回调:publisher returnCallback未投递到queue退回模式 消费者确认:consumer ack确认机制 ComfirmCallback【生产者确认回调】 概念:ComfirmCallback是生产者消息确认机制的一部分。当生产者发送消息到 RabbitMQ 的交换器(Exchange)后,RabbitMQ 会返回一个确认消息给生产者,这个确认过程可以通过...
cover
2024-11-14
MQ系列(二)| RabbitMQ 整合 SpringBoot
RabbitMQ 整合 SpringBoot概述 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力、流量削峰 消息服务中两个重要概念:消息代理(`message broker`)和目的地(`destination`) 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。 消息队列主要有两种形式的目的地1. 队列(`queue`):点对点消息通信(`point-to-point`) 2. 主题(`topic`):发布(`publish`)/订阅(`subscribe`)消息通信 RabbitMQ 架构图 概念生产者 Producer生产者是消息的发送方,它将消息发送到 RabbitMQ 的交换器中。 ✨消息 Message 消息=消息头+消息体,根据routekey发送到指定的交换机 Exchange 消息头:含有各种属性 routing-key(路由键)、priority(优先级)、delivery-mode(指出该消息可能需要持久性存储)等。 ✨消息代理...

评论
ValineGitalk
avatar
stormling
文章
42
标签
25
分类
21
Follow Me
公告
欢迎大家来到Stormling博客
目录
  1. 1. RabbitMQ 快速入门
    1. 1.1. 1、简介
    2. 1.2. 2、为什么使用
      1. 1.2.1. 2.1、解耦
        1. 1.2.1.1. 2.2.1、项目应用
      2. 1.2.2. 2.2、异步
      3. 1.2.3. 2.3、削峰
      4. 1.2.4. 2.4、并行
      5. 1.2.5. 2.5、排队
    3. 1.3. 3、消息队列工具 RabbitMQ
      1. 1.3.1. 3.1、常见MQ产品
      2. 1.3.2. 3.2、RabbitMQ基础概念
      3. 1.3.3. 3.3、五种消息模型
    4. 1.4. 4、消息不丢失
      1. 1.4.1. 4.1、MQ角度
      2. 1.4.2. 4.2、消息确认
        1. 1.4.2.0.1. 1、消息持久化
        2. 1.4.2.0.2. 2、发送确认
        3. 1.4.2.0.3. 3、手动消费确认
最新文章
面向八股文面试专场
面向八股文面试专场2025-01-22
【每日早报】-2025-01-21 - 星期二
【每日早报】-2025-01-21 - 星期二2025-01-21
规则引擎 Drools 8+ 快速入门
规则引擎 Drools 8+ 快速入门2024-12-11
数据库系列(二) | Mybatis Plus 3.0+快速入门
数据库系列(二) | Mybatis Plus 3.0+快速入门2024-12-09
分布式系列(二) | Redisson分布式锁
分布式系列(二) | Redisson分布式锁2024-12-05
©2019 - 2025 By stormling
码农Stormling程序员,关注公众号【码农Stormling】回复【面试】获取最全面试pdf
搜索
数据加载中