加载中...
avatar
文章
42
标签
25
分类
21
首页
Java
Spring全家桶
  • Spring
  • SpringBoot
  • SpringCloud
JVM
源码
  • Mybatis
  • HashMap
归档
其他
  • 互联网电子书汇总
  • JAVA八股文指南
  • 历史
  • 相册
关于
Logo码农Stormling面试系列(八)| 消息队列
搜索
首页
Java
Spring全家桶
  • Spring
  • SpringBoot
  • SpringCloud
JVM
源码
  • Mybatis
  • HashMap
归档
其他
  • 互联网电子书汇总
  • JAVA八股文指南
  • 历史
  • 相册
关于

面试系列(八)| 消息队列

发表于2021-10-21|更新于2025-01-07|面试
|总字数:6.6k|阅读时长:21分钟|浏览量:

消息队列

消息队列解决的核心场景:解构、异步、削峰

1.为什么使用MQ

​ 公司业务场景,核心:解耦、异步、削峰

解耦:

​ A系统发数据给到BCD系统,如果E系统需要接入?C系统不需要了?A系统的负责人就需要来回修改接口对接其他系统。

解耦-1

​ 如果使用MQ,A系统产生一条数据,发送到MQ中,那个系统需要数据自己去MQ消费。如果新的系统需要数据,直接从MQ中消费;某个系统不需要数据的话,取消消费这个MQ即可。这样A系统不需要考虑谁发送数据给谁,不需要考虑是否调用成功、失败超时等问题。

解耦-2

总结:通过一个MQ,Pub/Sub发布订阅消息模型,A系统就和其他系统彻底耦合了。

项目应用:

​ 车站系统通过控制命令下发给各个设备,其中车站的设备通常包含:闸机、半自动售票机、自动售票机、手持机等设备。如果按照常规的同步方式来对接不同的设备,这将使得系统冗余的代码很多,当车站增减一个设备就可能需要重新对接接口,造成系统耦合性很高,这样的效率不高且不优雅。所以当系统需要发送命令(生产一个数据),将数据放到MQ中,不需要知道那个设备收到成功或者失败,其中需要消费的设备自己去订阅并且获取相应的消息即可。这样就可以达到,系统下发设备控制命令,不同设备响应。

异步:

​ A系统接收请求,需要本地入库,还需要BCD三个系统入库,本地入库(3ms),BCD(300ms+400ms+500ms),用户体验很差等待时间太长。业内请求需要做到 200ms 以内,对用户几乎无感。

异步-1

​ 使用MQ,A系统连续发送3条消息到消息队列,假如消耗5ms,请求花了 5 + 3 = 8ms ,对于用户来说就是点了一个按钮返回很快。

异步-2

削峰:

​ 每天一段时间,A系统风平浪静,每秒请求数量就50个。结果每次一到 12:00~13:00,每秒并发请求数量突然暴增到5k+条。但是系统是直接基于MySQL,大量请求涌入MySQL,每秒执行约5k条SQL,

​ 一般情况下MySQL 每秒可抗 2k请求,5k的请求可能打死MySQL,导致无法使用。

​ 一旦过了高峰,到了下午就到了低峰期,每秒请求数量 50 左右,对整个系统没有多少压力了。

削峰-1

如果使用MQ,每秒 5k 请求写入 MQ , A系统每秒最多处理 2k 个请求,因为 MySQL每秒最多请求 2k 个请求。A系统从MQ中慢慢拉取请求,每秒2k个请求,不超过自己每秒最大的请求数量即可。所以再高峰期,A系统不会挂掉。而MQ每秒进 5k ,出 2k,请求就会在高峰期积压可能多大十几万甚至百万的消息再 MQ中。

削峰-2

​ 短暂的挤压后是可允许的,等到高峰期过后,每秒进入MQ的消息降低很多,但是系统依然按照 2k 的请求取消费,A系统很快的就会把挤压解决掉了。

2.MQ的优缺点

  • 降低系统可用性,引出如何保证消息高可用
  • 系统复杂度提高,引出如何保证消息没有重复消费、处理消息丢失、保证消息传递顺序性
  • 一致性问题:A系统处理完返回成功,BC处理完写库成功,D写库失败了,数据不能一致了。

3.不同MQ的区别(kafka、RabbitMQ、RocketMQ)

特性 ActiveMQ RabbitMQ RocketMQ Kafka
单机吞吐量 万级,比 RocketMQ、Kafka 低一个数量级 万级别 10 万级,支撑高吞吐 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响 topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性 ms 级 微秒级,这是 RabbitMQ 的一大特点,延迟最低 ms 级 延迟在 ms 级以内
可用性 高,基于主从架构实现高可用 基于主从架构实现高可用 非常高,分布式架构 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性 有较低的概率丢失数据 基本不丢 经过参数优化配置,可以做到 0 丢失 同 RocketMQ
功能支持 MQ 领域的功能极其完备 基于 erlang 开发,并发能力很强,性能极好,延时很低 MQ 功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
  • 单机吞吐量:RocketMQ 10万级
  • 时效性:RabbitMQ 微秒级
  • 可用性:RocketMQ 非常高,分布式架构
  • 消息可靠性:RocketMQ 参数优化,可以做到0丢失

4.保证MQ的高可用

RabbitMQ高可性

​ RabbitMQ是典型的基于主从(非分布式)做高可用。

​ 三种模式:单机模式、普通集群模式、镜像集群模式

单机模式:

​ Demo级别,没有生产使用这个

普通集群模式:<无高可用>

​ 多台机器启动多个RabbitMQ实例,每台机器启动一个。创建的queue,只会放在一个RabbitMQ实例上, 但每个实例同步 queue 元数据(queue的配置,可通过这个找到queue实例)。实际消费的时候连接到另一个实例,这个实例从queue所在的实例拉取数据。

普通集群

​ 没有做到分布式,只是普通的集群。导致消费者每次随机获取一个实例然后拉取数据,要么是固定连接 queue 的那个实例,前者有数据拉取的开销,后者有单实例性能开销。

​ 这样只是提高了吞吐量,集群多个节点服务某个 queue 的读写操作。

镜像集群模式:<高可用>

​ RabbitMQ 的高可用模式。和普通集群模型不一样,该模式下,创建的 queue,无论是元数据还是 queue 里的消息都会存在多个实例中,每个节点都有 queue 的完整镜像。每次写消息到 queue 时,都会自动消息同步到多个实例上 queue 。

RabbitMQ-高可用-镜像集群

如何开启镜像集群模式?

​ 在后台新增一个策略,这个策略就是镜像集群模式的策略。可以指定数据同步到所有节点,也可同步到指定数量的节点,再次创建queue 的时候,应用这个策略,就会自动同步到其他节点上去了。好处在于,你任何一个机器宕机了,没事,其他节点有完整的数据,别的消费者都可以到其他的节点消费。坏处是,第一,同步性能开销大,网络带宽压力和消耗很重。第二,不是分布式,没有扩展性,某个queue 负载很重,加机器,新增的机器包含这个 queue 的所有数据,并没有办法线性扩展。如果 queue 的数量很大,大到这个机器上的容量无法容纳该怎么办?

Kafka 的高可用性

​ Kafka 基本架构认识:由多个broker组成,每个broker是一个节点;你创建一个topic ,这个topic可以划分多个partition,每个 partition 可以存在不同的 broker 上,每个partition 就放一部分的数据。

​ 天然的分布式消息队列,一个topic 的数据,分散到多个机器上,每个机器放一部分。

​ RabbitMQ 之类的并不是什么分布式消息队列,传统的消息队列,只不过提供了集群、HA的机制,无论如何操作,RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群模式下,也是每个节点都放 queue 的完整数据。

​ Kafka 0.8 之前没有 HA 机制,任何的 broker 挂了,那个 broker 上的 partition 就废了,就没有办法读写,无法保证高可用。

​ 我们假如创建一个 topic ,指定了其 partition 数量是 3 个 ,分别在三台机器上。但是,如果第二台宕机了,会导致1/3的数据丢失,做不到高可用了。

Kafka-高可用

Kafka 0.8 后提供了 HA 机制,就是 replica 副本机制。每个 partition 的数据都会同步到其他机器上,形成多个 replica 副本。所有 replica 中选举一个 leader 出来,那么生产者和消费者都跟这个 leader 打交道,其他的 replica 就是 follower (从属)。
写的时候,leader 负责将数据同步到其他的 follower 上,读的时候,直接读取 leader 上的数据。

为什么只能读写leader?

​ 如果可以随意读取的每个 follower ,那么就要关系数据的一致性问题了,系统复杂度势必增加。Kafka 会将一个 partition 的所有 replica 分布在不同的机器中,可以提高容错性。

broker

​ 这就是所谓的高可用,因为如果某个 broker 宕机了,没事,那么 broker 上面的 partition 在其他机器上都有副本的。如果这个宕机的 broker 上面由某个 partition 的 leader ,那么此时会从 follower 中重新选举一个新的 leader 出来,继续读写这个 leader 即可。

​ 写数据的时候,生产者就写 leader ,然后 leader 将数据落地到写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功消息给生产者(一种模式之一)。

​ 消费的时候,只会从 leader 去读,但是只有当一个消费已经被所有 follower 都同步成功返回 ack 后,这个消息才会被消费者读到。

5.保证消息不被重复消费(消费队列保证幂等性)

大概说一说可能哪些重复消息的问题

RabbitMQ、RocketMQ、Kafka 都可能出现消息重复消费的问题。

​ Kafka 实际上有个 offset 的概念,每个消息写进去,都有一个 offset ,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示 “我消费过来,下次我要重启,就让我继续从上次消费的 offset 来继续消费吧”。

​ 如果生产重启系统,紧急情况直接 kill 进程,可能导致 consumer 由消息处理,但是没有来的及提交 offset 。重启之后少数消费会再次消费一次。

​ 举个例子。

​ 场景带入,数据 1/2/3 依次进入 Kafka,Kafka 会给这三条数据每条分配一个 offset ,代表这条数据的序号,我们就假设分配的 offset 依次是 152/ 153 /154 。消费从 Kafka 去消费的时候也是按照这个顺序。假如消费者消费了 offset= 153 这个数据,刚准备提交 offset 到 Zookeeper ,此时消费者进程被重启。此时消费过的数据 1/2 的offset 没有提交,Kafka 也就不知道你消费 offset = 153 这条数据。重启之后,消费者找 Kafka 说,哥们,你给我把上次我消费的的地方后面的数据继续传给我。由于没有提交 1/2 数据再次传过来,如果消费者没有去重的话,那么就会导致消费重复。

注意:新版本的 Kafka 已经将 offset 存储到 Zookeeper 转移到 Kafka brokers,并使用内部位移注意 __consumer_offsets 进行存储。

结果就会导致数据 1/2 往数据插入2 次,那么数据就错了。重复消费不可怕,可怕没有考虑重复消费之后,怎么保证幂等性。

​ 举例说明。假如有个系统,消费一条消息就是往数据库插入一条数据,要是你消息重复两次,就插入了两条,数据就是错了。但是你消费到第二次的时候,判断是否消费过了,若是就直接丢了,这样就保证了一条数据了。

​ 一条数据重复出现两次,数据库只有一条数据,这就保证了系统的幂等性。

​ 幂等性:就是一个数据,或者一个请求,给你重复来多次,你的确保对应的数据是不会改变的,不能出错。

如何保证消息的幂等性?

​ 结合业务思考,思路:

  • 如果是写库,根据主键查一下,如果这个数据有了,就update

  • 写 Redis,每次都是 set ,天然幂等性 。当数据1 消费之后,存入Redis ,再次消费后将修改数据1的值

  • 复杂一点,生产者发送每条消息数据的时候,里面假如一个全局的唯一id,类似订单id,然后消费之后,先根据这个 id 去比如 Redis 里去查,之前的消费过吗?如果没有消费过,你就处理,然后这个id写 Redis。如果消费,那就别处理,保重别重复处理相同的消息即可。

  • 基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束,重复数据插入只会报错,不会导致数据库中出现脏数据。

    幂等性

6.保证消息的可靠性传输?保证消费不丢失

​ MQ的基本原则,数据不能多一条,也不能少一条,不能多,就是上面说的 重复消费和幂等性的问题。不能少。就是数据别弄丢。

​ 如果用MQ来传递非常核心的肖,如计费、扣费,就需要保证 MQ 绝对不会把计费消息给弄丢

RabbitMQ

消息可靠性

生产者弄丢了数据

​ 生产者将数据发送到 RabbitMQ ,可能在半路弄丢了,因为网络获取其他问题。

事务机制–消耗性能:

​ 选择 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ事务 channel.txSelect() ,然后发送消息,如果消息没有成功被 RabbitMQ 接收,那么生产者会收到异常报错的消息,此时可以回滚事务 channel.txRollback(),然后重试发送消息;如果收到消息,那么就可以提交事务了 channel.txCommit()

try {
    // 通过工厂创建连接
    connection = factory.newConnection();
    // 获取通道
    channel = connection.createChannel();
    // 开启事务
    channel.txSelect();
    // 这里发送消息
    channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    // 模拟出现异常
    int result = 1 / 0;
    // 提交事务
    channel.txCommit();
} catch (IOException | TimeoutException e) {
    // 捕捉异常,回滚事务
    channel.txRollback();
}

问题是,RabbitMQ 事务机制,导致吞吐量下来,因为消耗性能。

confirm 模式:

​ 如果要保证 RabbitMQ 消息不丢,可以开启 confirm 模式,你每次些的消息都会分配一个唯一的id。
​ 如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你一个 nack 接口,告诉你这个消息接收失败了,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 状态,如果超过一定时间还没有接收这个消息的回调,那个可以重发。

​ 事务机制和 confirm 机制最大不同在于,事务机制是同步的,你提交一个事务之后会阻塞,但是 confirm 机制是异步的,发送消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调一个接口通知你这个消息接收到了。

​ 所以生产者 避免丢失数据,都是用 confirm 机制的

在 transaction 事务模式的channel 不能设置成 confirm 模式的,这两种不共存

客户端实现生产者 confirm 三种方式:

  1. 普通 confirm 模式:每发送一条消息后,调用 waitForConfirms() 方法,等待服务器端 confirm,如果服务端返回 false 或者在一段时间内都没返回,客户端可以进行消息重发。

    channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
    if (!channel.waitForConfirms()) {
        // 消息发送失败
        // ...
    }
  2. 批量 confirm 模式:每发送一批消息后,调用 waitForConfirms() 方法,等待服务端 confirm。

    channel.confirmSelect();
    for (int i = 0; i < batchCount; ++i) {
        channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
    }
    if (!channel.waitForConfirms()) {
        // 消息发送失败
        // ...
    }
  3. 异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后客户端会回调这个方法。

    SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
    channel.confirmSelect();
    channel.addConfirmListener(new ConfirmListener() {
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            if (multiple) {
                confirmSet.headSet(deliveryTag + 1).clear();
            } else {
                confirmSet.remove(deliveryTag);
            }
        }
    
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
            if (multiple) {
                confirmSet.headSet(deliveryTag + 1).clear();
            } else {
                confirmSet.remove(deliveryTag);
            }
        }
    });
    
    while (true) {
        long nextSeqNo = channel.getNextPublishSeqNo();
        channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
        confirmSet.add(nextSeqNo);
    }

RabbitMQ 弄丢了数据

开启 RabbitMQ的持久化,消息写入之后会持久化到磁盘,挂后恢复会自动读取之前存储的数据,一般数据不会丢。小概率情况,RabbitMQ还没有持久化,就挂了,可能导致少量数据丢失。

设置持久化:
  • 创建 queue 时候设置为持久化。可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的
  • 第二个是发送消息的时候将消息的 deliveryMode 设置为2。就是将消息设置为持久化的,此时RMQ 就会将消息持久化到磁盘上了。

必须同时设置这个两个持久化才行,RMQ 哪怕挂了,再次重启,也会磁盘上重启恢复 queue,恢复这个 queue 里的数据。

注意,哪怕是开启持久化机制了,也有可能,在消息写到 RMQ中,还没有来的机持久化到磁盘上,结果不巧,此时的 RMQ 挂了,就会导致内存里的的一点点数据丢失。

所以,持久化可以跟生产者那边的 confirm 机制配合,只有消息被持久化到磁盘后,才会通知生产者 ack 了,哪怕 持久化到磁盘之前,RMQ 挂了,数据丢了,生产者收不到 ack ,自己也可以重发。

消费者弄丢了数据:<关闭自动 ack,声明队列,指定noAck = false>

RabbitMQ 如果丢失消息,主要是因为你消费的时候,刚消费,还没去处理,结果进程挂了。重启完后,RMQ认为你都消费了,这数据就丢了。

这时候的用RabbitMQ提供的 ack 机制。就是必须关闭RMQ自动 ack,可以通过 api 来调用就行,然后每次再代码处理完后,再**主动的 ack**。这样没有处理的话,RMQ 就收不到 ack,RMQ 就会认为你没有处理完,这时候RMQ 会把消息分给别的消费者处理,消息就不会丢。

RMQ 为了保证消息可靠不丢失提供消息确认机制。消费者在声明队列时,可以指定noAck 参数,当noAck = false, RMQ 会等待消费者显示的发送 ack 信号后,才从内存(和磁盘,如果持久化)移除消息。否则消息一旦被消费者消费,RMQ 会立即删除它。

RMQ-持久化

Kafka

消费端弄丢了数据

唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。

这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

生产环境碰到的一个问题,就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存 queue,然后消费者会自动提交 offset。然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了。

Kafka 弄丢了数据

这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。

生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。

所以此时一般是要求起码设置如下 4 个参数:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。
  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。
  • 在 producer 端设置 retries=MAX (很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。

生产者会不会弄丢数据?

如果按照上述的思路设置了 acks=all ,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

7.保证消费顺序

我举个例子,我们以前做过一个 mysql binlog 同步的系统,压力还是非常大的,日同步数据要达到上亿,就是说数据从一个 mysql 库原封不动地同步到另一个 mysql 库里面去(mysql -> mysql)。常见的一点在于说比如大数据 team,就需要同步一个 mysql 库过来,对公司的业务系统的数据做各种复杂的操作。

你在 mysql 里增删改一条数据,对应出来了增删改 3 条 binlog 日志,接着这三条 binlog 发送到 MQ 里面,再消费出来依次执行,起码得保证人家是按照顺序来的吧?不然本来是:增加、修改、删除;你愣是换了顺序给执行成删除、修改、增加,不全错了么。

本来这个数据同步过来,应该最后这个数据被删除了;结果你搞错了这个顺序,最后这个数据保留下来了,数据同步就出错了。

先看看顺序会错乱的俩场景:

  • RabbitMQ:一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者 2 先执行完操作,把 data2 存入数据库,然后是 data1/data3。这不明显乱了。

RMQ-消息顺序

  • Kafka:比如说我们建了一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。
    消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。

Kafka-消息顺序

解决方案

RabbitMQ

拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

消息顺序

Kafka

  • 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
  • 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。

消息顺序

文章作者: stormling
文章链接: http://www.stormling.top/posts/43028.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 码农Stormling!
消息队列
cover of previous post
上一篇
城市地铁AFC系统
城市地铁 AFC 系统简析一、概述1.1 定义AFC 系统的全称是 Automatic Fare Collection System(城市轨道交通自动售检票系统),是基于计算机、通信、网络、自动控制等技术,实现轨道交通售票、检票、计费、收费、统计、清分、管理等全过程的自动化系统。 1.2 主要工作内容1、实现中央系统、车站系统和终端设备之间的数据传输和处理; 2、完成车 票制作、售票、检票、票务统计分析等工作; 3、及时、准确地进行客流、票务数据的收集、整理、汇总和分析; 4、实现轨道交通收益方得清分结算以及与关联系统等外部接口之间的清分结算,同时可通过银行或金融机构实现账务划拨。 1.3...
cover of next post
下一篇
面试系列(七)| JVM常见命令
JVM 常用命令介绍下面介绍一下 JVM 中常用的调优、故障处理等工具。 jps :虚拟机进程工具,全称是 **JVM Process Status Tool**,它的功能和 Linux 中的 ps 类似,可以列出正在运行的虚拟机进程,并显示虚拟机执行主类 Main Class 所在的本地虚拟机唯一ID,虽然功能比较单一,但是这个命令绝对是使用最高频的一个命令。 _jstat_:虚拟机统计信息工具,用于监视虚拟机各种运行状态的信息的命令行工具,它可以显示本地或者远程虚拟机进程中的类加载、内存、垃圾收集、即时编译等运行时数据。 jinfo:Java 配置信息工具,全称是 _Configuration Info for Java_,它的作用是可以实时调整虚拟机各项参数。 _jmap_:Java 内存映像工具,全称是 _Memory Map For Java_,它用于生成转储快照,用来排查内存占用情况 jhat:虚拟机堆转储快照分析工具,全称是 _JVM Heap Analysis Tool_,这个指令通常和 jmap 一起搭配使用,jhat 内置了一个 HTTP/Web...
相关推荐
cover
2024-11-30
MQ系列(七)| RocketMQ 为什么性能不如Kafka?
RocketMQ 为什么性能不如 Kafka? RocketMQ 使用的是 mmap 零拷贝技术,而 kafka 使用的是 sendfile (硬件设备技术 SG-DMA,不影响(不占用)CPU工作) mmap 内核缓冲区->映射用户缓冲区->内核缓冲区->网卡sendfile 内核缓冲区-> SG-DMA -> 网卡 在上篇文章《rocketmq 是什么》中,我们了解到 RocketMQ 的架构其实参考了 kafka 的设计思想,同时又在 kafka 的基础上做了一些调整。看起来,RocketMQ 好像各方面都比 kafka 更能打。 但 kafka 却一直没被淘汰,说明 RocketMQ 必然是有着不如 kafka 的地方。是啥呢?性能,严格来说是吞吐量。阿里中间件团队对它们做过压测,同样条件下,kafka 比 RocketMQ 快 **50%**左右。但即使这样,RocketMQ 依然能每秒处理 10w 量级的数据,依旧非常能打。你不能说 RocketMQ 弱,只能说 Kafka 性能太强了。 不过这就很奇怪了,为什么...
cover
2024-11-28
MQ系列(六)| RocketMQ 快速入门
RocketMQ 快速入门 本参考链接:RocketMQ 是什么?原作者:小白debug 前言 作为一个程序员,假设你有 A、B 两个服务,A 服务发出消息后,不想让 B 服务立马处理到。而是要过半小时才让 B 服务处理到,该怎么实现? 这类延迟处理消息的场景非常常见,举个例子,比如我每天早上到公司后都会点个外卖,我希望外卖能在中午送过来,而不是立马送过来,这就需要将外卖消息经过延时后,再投递到商家侧。 延时消息场景那么问题就来了,有没有优雅的解决方案?当然有,没有什么是加一层中间层不能解决的,如果有,那就再加一层。这次我们要加的中间层是消息队列 **RocketMQ**。 RocketMQ 是什么?RocketMQ 是阿里自研的国产消息队列,目前已经是 Apache 的顶级项目。和其他消息队列一样,它接受来自生产者的消息,将消息分类,每一类是一个 topic,消费者根据需要订阅 topic,获取里面的消息。 是不是很像我们上篇文章里提到的消息队 Kafka,那么问题很自然就来了,既然都是消息队列,那它们之间有什么区别呢? RocketMQ 和 Kafka...
cover
2024-11-27
MQ系列(五)| Kafka 快速入门
Kafka 快速入门介绍 参考:Kafka 是什么? 架构一个高性能,高扩展性,高可用,支持持久化的超强消息队列,它就是我们常说的消息队列 KafkaZookeeper 协调管理多个 broker 组成,内部有多个 topic 分类,每个 topic 又分成多个 partition ,每个 partition 有多个副本 replia,不同的partition 会分布在不同 broker 上,提升性能同时,还增加了系统可用性和可扩展性 高性能 对消息进行分类,每个类是一个 topic 单个 topic 的消息可能过多,可将单个队列拆分成多个段,每段就是一个分区 partition ,每个消费者负责一个 partition 高扩展性可将 partition 分部在多台设备,每台设备代表一个 broker 高可用存在一个问题,如果其中一个partition所在的 broker 挂了,那么这部分的消息不久丢失了吗? 可以给partition 多加几个副本 replica,从中分为 Leader 和 Follower,Leader 负责生产者和消费者的读写,Follower...
cover
2024-11-24
MQ系列(四)| RabbitMQ 死信队列和延迟队列
死信队列死信是什么死信:无法被消费的消息。由于特定的原因导致队列中的某些消息无法被消费,这些消息没有后续的处理,就会变成死信。当消息在队列中无法被正常消费时,会被发送到死信队列中。 死信来源消息 TTL队列达到最大长度消息拒签(basicNack 或 basicReject)且重入队列为false(requeue=false) 死信架构 消息TTL 名称 交换机 路由键 类型 特征 参数 普通交换机 normal_exchange zhangsan direct / / 普通队列 normal_queue zhangsan / TTL DLX DLK x-dead-letter-exchange:dead_exchangex-dead-letter-routing-key:lisix-message-ttl: 10 * 1000 死信交换机 dead_exchange lisi direct / / 死信队列 dead_queue lisi / / / 生产者发送消息10条消息到队列...
cover
2024-11-20
MQ系列(三)| RabbitMQ 消息确认机制
RabbitMQ 消息确认机制 :heavy_exclamation_mark::heavy_exclamation_mark::heavy_exclamation_mark:温馨提示:基于JDK17、SpringBoot 2.1.8.RELEASE 版本,由于RabbitMQ 在 SpringBoot3+ 的配置项有所不同, 所以请严格按照该本版来使用,挖一坑:【后续会出一个SpringBoot3+版本的配置相关教程】 架构 概念保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍 为此引入确认机制 生产者确认回调:publisher confirmCallback 生产者退回回调:publisher returnCallback未投递到queue退回模式 消费者确认:consumer ack确认机制 ComfirmCallback【生产者确认回调】 概念:ComfirmCallback是生产者消息确认机制的一部分。当生产者发送消息到 RabbitMQ 的交换器(Exchange)后,RabbitMQ 会返回一个确认消息给生产者,这个确认过程可以通过...
cover
2024-11-14
MQ系列(二)| RabbitMQ 整合 SpringBoot
RabbitMQ 整合 SpringBoot概述 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力、流量削峰 消息服务中两个重要概念:消息代理(`message broker`)和目的地(`destination`) 当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。 消息队列主要有两种形式的目的地1. 队列(`queue`):点对点消息通信(`point-to-point`) 2. 主题(`topic`):发布(`publish`)/订阅(`subscribe`)消息通信 RabbitMQ 架构图 概念生产者 Producer生产者是消息的发送方,它将消息发送到 RabbitMQ 的交换器中。 ✨消息 Message 消息=消息头+消息体,根据routekey发送到指定的交换机 Exchange 消息头:含有各种属性 routing-key(路由键)、priority(优先级)、delivery-mode(指出该消息可能需要持久性存储)等。 ✨消息代理...

评论
ValineGitalk
avatar
stormling
文章
42
标签
25
分类
21
Follow Me
公告
欢迎大家来到Stormling博客
目录
  1. 1. 消息队列
    1. 1.1. 1.为什么使用MQ
      1. 1.1.1. 解耦:
        1. 1.1.1.1. 项目应用:
      2. 1.1.2. 异步:
      3. 1.1.3. 削峰:
    2. 1.2. 2.MQ的优缺点
    3. 1.3. 3.不同MQ的区别(kafka、RabbitMQ、RocketMQ)
    4. 1.4. 4.保证MQ的高可用
      1. 1.4.1. RabbitMQ高可性
        1. 1.4.1.1. 单机模式:
        2. 1.4.1.2. 普通集群模式:<无高可用>
        3. 1.4.1.3. 镜像集群模式:<高可用>
          1. 1.4.1.3.1. 如何开启镜像集群模式?
      2. 1.4.2. Kafka 的高可用性
        1. 1.4.2.1. 为什么只能读写leader?
    5. 1.5. 5.保证消息不被重复消费(消费队列保证幂等性)
      1. 1.5.0.1. 如何保证消息的幂等性?
  2. 1.6. 6.保证消息的可靠性传输?保证消费不丢失
    1. 1.6.1. RabbitMQ
      1. 1.6.1.1. 生产者弄丢了数据
        1. 1.6.1.1.1. 事务机制–消耗性能:
        2. 1.6.1.1.2. confirm 模式:
      2. 1.6.1.2. RabbitMQ 弄丢了数据
        1. 1.6.1.2.1. 设置持久化:
      3. 1.6.1.3. 消费者弄丢了数据:<关闭自动 ack,声明队列,指定noAck = false>
    2. 1.6.2. Kafka
      1. 1.6.2.1. 消费端弄丢了数据
      2. 1.6.2.2. Kafka 弄丢了数据
      3. 1.6.2.3. 生产者会不会弄丢数据?
  3. 1.7. 7.保证消费顺序
    1. 1.7.1. 解决方案
      1. 1.7.1.1. RabbitMQ
      2. 1.7.1.2. Kafka
最新文章
面向八股文面试专场
面向八股文面试专场2025-01-22
【每日早报】-2025-01-21 - 星期二
【每日早报】-2025-01-21 - 星期二2025-01-21
规则引擎 Drools 8+ 快速入门
规则引擎 Drools 8+ 快速入门2024-12-11
数据库系列(二) | Mybatis Plus 3.0+快速入门
数据库系列(二) | Mybatis Plus 3.0+快速入门2024-12-09
分布式系列(二) | Redisson分布式锁
分布式系列(二) | Redisson分布式锁2024-12-05
©2019 - 2025 By stormling
码农Stormling程序员,关注公众号【码农Stormling】回复【面试】获取最全面试pdf
搜索
数据加载中