面试系列(八)| 消息队列
消息队列
消息队列解决的核心场景:解构、异步、削峰
1.为什么使用MQ
公司业务场景,核心:解耦、异步、削峰
解耦:
A系统发数据给到BCD系统,如果E系统需要接入?C系统不需要了?A系统的负责人就需要来回修改接口对接其他系统。
如果使用MQ,A系统产生一条数据,发送到MQ中,那个系统需要数据自己去MQ消费。如果新的系统需要数据,直接从MQ中消费;某个系统不需要数据的话,取消消费这个MQ即可。这样A系统不需要考虑谁发送数据给谁,不需要考虑是否调用成功、失败超时等问题。
总结:通过一个MQ,Pub/Sub发布订阅消息模型,A系统就和其他系统彻底耦合了。
项目应用:
车站系统通过控制命令下发给各个设备,其中车站的设备通常包含:闸机、半自动售票机、自动售票机、手持机等设备。如果按照常规的同步方式来对接不同的设备,这将使得系统冗余的代码很多,当车站增减一个设备就可能需要重新对接接口,造成系统耦合性很高,这样的效率不高且不优雅。所以当系统需要发送命令(生产一个数据),将数据放到MQ中,不需要知道那个设备收到成功或者失败,其中需要消费的设备自己去订阅并且获取相应的消息即可。这样就可以达到,系统下发设备控制命令,不同设备响应。
异步:
A系统接收请求,需要本地入库,还需要BCD三个系统入库,本地入库(3ms),BCD(300ms+400ms+500ms),用户体验很差等待时间太长。业内请求需要做到 200ms 以内,对用户几乎无感。
使用MQ,A系统连续发送3条消息到消息队列,假如消耗5ms,请求花了 5 + 3 = 8ms ,对于用户来说就是点了一个按钮返回很快。
削峰:
每天一段时间,A系统风平浪静,每秒请求数量就50个。结果每次一到 12:00~13:00,每秒并发请求数量突然暴增到5k+条。但是系统是直接基于MySQL,大量请求涌入MySQL,每秒执行约5k条SQL,
一般情况下MySQL 每秒可抗 2k请求,5k的请求可能打死MySQL,导致无法使用。
一旦过了高峰,到了下午就到了低峰期,每秒请求数量 50 左右,对整个系统没有多少压力了。
如果使用MQ,每秒 5k 请求写入 MQ , A系统每秒最多处理 2k 个请求,因为 MySQL每秒最多请求 2k 个请求。A系统从MQ中慢慢拉取请求,每秒2k个请求,不超过自己每秒最大的请求数量即可。所以再高峰期,A系统不会挂掉。而MQ每秒进 5k ,出 2k,请求就会在高峰期积压可能多大十几万甚至百万的消息再 MQ中。
短暂的挤压后是可允许的,等到高峰期过后,每秒进入MQ的消息降低很多,但是系统依然按照 2k 的请求取消费,A系统很快的就会把挤压解决掉了。
2.MQ的优缺点
- 降低系统可用性,引出如何保证消息高可用
- 系统复杂度提高,引出如何保证消息没有重复消费、处理消息丢失、保证消息传递顺序性
- 一致性问题:A系统处理完返回成功,BC处理完写库成功,D写库失败了,数据不能一致了。
3.不同MQ的区别(kafka、RabbitMQ、RocketMQ)
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 万级别 | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
可用性 | 高,基于主从架构实现高可用 | 基于主从架构实现高可用 | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ |
功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
- 单机吞吐量:RocketMQ 10万级
- 时效性:RabbitMQ 微秒级
- 可用性:RocketMQ 非常高,分布式架构
- 消息可靠性:RocketMQ 参数优化,可以做到0丢失
4.保证MQ的高可用
RabbitMQ高可性
RabbitMQ是典型的基于主从(非分布式)做高可用。
三种模式:单机模式、普通集群模式、镜像集群模式
单机模式:
Demo级别,没有生产使用这个
普通集群模式:<无高可用>
多台机器启动多个RabbitMQ实例,每台机器启动一个。创建的queue,只会放在一个RabbitMQ实例上, 但每个实例同步 queue 元数据(queue的配置,可通过这个找到queue实例)。实际消费的时候连接到另一个实例,这个实例从queue所在的实例拉取数据。
没有做到分布式,只是普通的集群。导致消费者每次随机获取一个实例然后拉取数据,要么是固定连接 queue 的那个实例,前者有数据拉取的开销,后者有单实例性能开销。
这样只是提高了吞吐量,集群多个节点服务某个 queue 的读写操作。
镜像集群模式:<高可用>
RabbitMQ 的高可用模式。和普通集群模型不一样,该模式下,创建的 queue,无论是元数据还是 queue 里的消息都会存在多个实例中,每个节点都有 queue 的完整镜像。每次写消息到 queue 时,都会自动消息同步到多个实例上 queue 。
如何开启镜像集群模式?
在后台新增一个策略,这个策略就是镜像集群模式的策略。可以指定数据同步到所有节点,也可同步到指定数量的节点,再次创建queue 的时候,应用这个策略,就会自动同步到其他节点上去了。好处在于,你任何一个机器宕机了,没事,其他节点有完整的数据,别的消费者都可以到其他的节点消费。坏处是,第一,同步性能开销大,网络带宽压力和消耗很重。第二,不是分布式,没有扩展性,某个queue 负载很重,加机器,新增的机器包含这个 queue 的所有数据,并没有办法线性扩展。如果 queue 的数量很大,大到这个机器上的容量无法容纳该怎么办?
Kafka 的高可用性
Kafka 基本架构认识:由多个broker组成,每个broker是一个节点;你创建一个topic ,这个topic可以划分多个partition,每个 partition 可以存在不同的 broker 上,每个partition 就放一部分的数据。
天然的分布式消息队列,一个topic 的数据,分散到多个机器上,每个机器放一部分。
RabbitMQ 之类的并不是什么分布式消息队列,传统的消息队列,只不过提供了集群、HA的机制,无论如何操作,RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群模式下,也是每个节点都放 queue 的完整数据。
Kafka 0.8 之前没有 HA 机制,任何的 broker 挂了,那个 broker 上的 partition 就废了,就没有办法读写,无法保证高可用。
我们假如创建一个 topic ,指定了其 partition 数量是 3 个 ,分别在三台机器上。但是,如果第二台宕机了,会导致1/3的数据丢失,做不到高可用了。
Kafka 0.8 后提供了 HA 机制,就是 replica 副本机制。每个 partition 的数据都会同步到其他机器上,形成多个 replica 副本。所有 replica 中选举一个 leader 出来,那么生产者和消费者都跟这个 leader 打交道,其他的 replica 就是 follower (从属)。
写的时候,leader 负责将数据同步到其他的 follower 上,读的时候,直接读取 leader 上的数据。
为什么只能读写leader?
如果可以随意读取的每个 follower ,那么就要关系数据的一致性问题了,系统复杂度势必增加。Kafka 会将一个 partition 的所有 replica 分布在不同的机器中,可以提高容错性。
这就是所谓的高可用,因为如果某个 broker 宕机了,没事,那么 broker 上面的 partition 在其他机器上都有副本的。如果这个宕机的 broker 上面由某个 partition 的 leader ,那么此时会从 follower 中重新选举一个新的 leader 出来,继续读写这个 leader 即可。
写数据的时候,生产者就写 leader ,然后 leader 将数据落地到写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功消息给生产者(一种模式之一)。
消费的时候,只会从 leader 去读,但是只有当一个消费已经被所有 follower 都同步成功返回 ack 后,这个消息才会被消费者读到。
5.保证消息不被重复消费(消费队列保证幂等性)
大概说一说可能哪些重复消息的问题
RabbitMQ、RocketMQ、Kafka 都可能出现消息重复消费的问题。
Kafka 实际上有个 offset 的概念,每个消息写进去,都有一个 offset ,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示 “我消费过来,下次我要重启,就让我继续从上次消费的 offset 来继续消费吧”。
如果生产重启系统,紧急情况直接 kill 进程,可能导致 consumer 由消息处理,但是没有来的及提交 offset 。重启之后少数消费会再次消费一次。
举个例子。
场景带入,数据 1/2/3 依次进入 Kafka,Kafka 会给这三条数据每条分配一个 offset ,代表这条数据的序号,我们就假设分配的 offset 依次是 152/ 153 /154 。消费从 Kafka 去消费的时候也是按照这个顺序。假如消费者消费了 offset= 153 这个数据,刚准备提交 offset 到 Zookeeper ,此时消费者进程被重启。此时消费过的数据 1/2 的offset 没有提交,Kafka 也就不知道你消费 offset = 153 这条数据。重启之后,消费者找 Kafka 说,哥们,你给我把上次我消费的的地方后面的数据继续传给我。由于没有提交 1/2 数据再次传过来,如果消费者没有去重的话,那么就会导致消费重复。
注意:新版本的 Kafka 已经将 offset
存储到 Zookeeper 转移到 Kafka brokers
,并使用内部位移注意 __consumer_offsets
进行存储。
结果就会导致数据 1/2 往数据插入2 次,那么数据就错了。重复消费不可怕,可怕没有考虑重复消费之后,怎么保证幂等性。
举例说明。假如有个系统,消费一条消息就是往数据库插入一条数据,要是你消息重复两次,就插入了两条,数据就是错了。但是你消费到第二次的时候,判断是否消费过了,若是就直接丢了,这样就保证了一条数据了。
一条数据重复出现两次,数据库只有一条数据,这就保证了系统的幂等性。
幂等性:就是一个数据,或者一个请求,给你重复来多次,你的确保对应的数据是不会改变的,不能出错。
如何保证消息的幂等性?
结合业务思考,思路:
如果是写库,根据主键查一下,如果这个数据有了,就update
写 Redis,每次都是 set ,天然幂等性 。当数据1 消费之后,存入Redis ,再次消费后将修改数据1的值
复杂一点,生产者发送每条消息数据的时候,里面假如一个全局的唯一id,类似订单id,然后消费之后,先根据这个 id 去比如 Redis 里去查,之前的消费过吗?如果没有消费过,你就处理,然后这个id写 Redis。如果消费,那就别处理,保重别重复处理相同的消息即可。
基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束,重复数据插入只会报错,不会导致数据库中出现脏数据。
6.保证消息的可靠性传输?保证消费不丢失
MQ的基本原则,数据不能多一条,也不能少一条,不能多,就是上面说的 重复消费和幂等性的问题。不能少。就是数据别弄丢。
如果用MQ来传递非常核心的肖,如计费、扣费,就需要保证 MQ 绝对不会把计费消息给弄丢
RabbitMQ
生产者弄丢了数据
生产者将数据发送到 RabbitMQ ,可能在半路弄丢了,因为网络获取其他问题。
事务机制–消耗性能:
选择 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ事务 channel.txSelect()
,然后发送消息,如果消息没有成功被 RabbitMQ 接收,那么生产者会收到异常报错的消息,此时可以回滚事务 channel.txRollback()
,然后重试发送消息;如果收到消息,那么就可以提交事务了 channel.txCommit()
try {
// 通过工厂创建连接
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启事务
channel.txSelect();
// 这里发送消息
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
// 模拟出现异常
int result = 1 / 0;
// 提交事务
channel.txCommit();
} catch (IOException | TimeoutException e) {
// 捕捉异常,回滚事务
channel.txRollback();
}
问题是,RabbitMQ 事务机制,导致吞吐量下来,因为消耗性能。
confirm 模式:
如果要保证 RabbitMQ 消息不丢,可以开启 confirm
模式,你每次些的消息都会分配一个唯一的id。
如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack
消息,告诉你这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你一个 nack
接口,告诉你这个消息接收失败了,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 状态,如果超过一定时间还没有接收这个消息的回调,那个可以重发。
事务机制和 confirm
机制最大不同在于,事务机制是同步的,你提交一个事务之后会阻塞,但是 confirm
机制是异步的,发送消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调一个接口通知你这个消息接收到了。
所以生产者 避免丢失数据,都是用 confirm
机制的
在 transaction 事务模式的channel 不能设置成 confirm 模式的,这两种不共存
客户端实现生产者 confirm
三种方式:
普通 confirm 模式:每发送一条消息后,调用
waitForConfirms()
方法,等待服务器端 confirm,如果服务端返回 false 或者在一段时间内都没返回,客户端可以进行消息重发。channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); if (!channel.waitForConfirms()) { // 消息发送失败 // ... }
批量 confirm 模式:每发送一批消息后,调用
waitForConfirms()
方法,等待服务端 confirm。channel.confirmSelect(); for (int i = 0; i < batchCount; ++i) { channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); } if (!channel.waitForConfirms()) { // 消息发送失败 // ... }
异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后客户端会回调这个方法。
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } }); while (true) { long nextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); confirmSet.add(nextSeqNo); }
RabbitMQ 弄丢了数据
开启 RabbitMQ的持久化,消息写入之后会持久化到磁盘,挂后恢复会自动读取之前存储的数据,一般数据不会丢。小概率情况,RabbitMQ还没有持久化,就挂了,可能导致少量数据丢失。
设置持久化:
- 创建 queue 时候设置为持久化。可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的
- 第二个是发送消息的时候将消息的
deliveryMode
设置为2。就是将消息设置为持久化的,此时RMQ 就会将消息持久化到磁盘上了。
必须同时设置这个两个持久化才行,RMQ 哪怕挂了,再次重启,也会磁盘上重启恢复 queue,恢复这个 queue 里的数据。
注意,哪怕是开启持久化机制了,也有可能,在消息写到 RMQ中,还没有来的机持久化到磁盘上,结果不巧,此时的 RMQ 挂了,就会导致内存里的的一点点数据丢失。
所以,持久化可以跟生产者那边的 confirm
机制配合,只有消息被持久化到磁盘后,才会通知生产者 ack
了,哪怕 持久化到磁盘之前,RMQ 挂了,数据丢了,生产者收不到 ack
,自己也可以重发。
消费者弄丢了数据:<关闭自动 ack,声明队列,指定noAck = false>
RabbitMQ 如果丢失消息,主要是因为你消费的时候,刚消费,还没去处理,结果进程挂了。重启完后,RMQ认为你都消费了,这数据就丢了。
这时候的用RabbitMQ提供的 ack
机制。就是必须关闭RMQ自动 ack
,可以通过 api 来调用就行,然后每次再代码处理完后,再**主动的 ack
**。这样没有处理的话,RMQ 就收不到 ack
,RMQ 就会认为你没有处理完,这时候RMQ 会把消息分给别的消费者处理,消息就不会丢。
RMQ 为了保证消息可靠不丢失提供消息确认机制。消费者在声明队列时,可以指定noAck 参数,当noAck = false, RMQ 会等待消费者显示的发送 ack 信号后,才从内存(和磁盘,如果持久化)移除消息。否则消息一旦被消费者消费,RMQ 会立即删除它。
Kafka
消费端弄丢了数据
唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。
这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。
生产环境碰到的一个问题,就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存 queue,然后消费者会自动提交 offset。然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了。
Kafka 弄丢了数据
这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。
生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。
所以此时一般是要求起码设置如下 4 个参数:
- 给 topic 设置
replication.factor
参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。 - 在 Kafka 服务端设置
min.insync.replicas
参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。 - 在 producer 端设置
acks=all
:这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。 - 在 producer 端设置
retries=MAX
(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。
生产者会不会弄丢数据?
如果按照上述的思路设置了 acks=all
,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
7.保证消费顺序
我举个例子,我们以前做过一个 mysql binlog
同步的系统,压力还是非常大的,日同步数据要达到上亿,就是说数据从一个 mysql 库原封不动地同步到另一个 mysql 库里面去(mysql -> mysql)。常见的一点在于说比如大数据 team,就需要同步一个 mysql 库过来,对公司的业务系统的数据做各种复杂的操作。
你在 mysql 里增删改一条数据,对应出来了增删改 3 条 binlog
日志,接着这三条 binlog
发送到 MQ 里面,再消费出来依次执行,起码得保证人家是按照顺序来的吧?不然本来是:增加、修改、删除;你愣是换了顺序给执行成删除、修改、增加,不全错了么。
本来这个数据同步过来,应该最后这个数据被删除了;结果你搞错了这个顺序,最后这个数据保留下来了,数据同步就出错了。
先看看顺序会错乱的俩场景:
- RabbitMQ:一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者 2 先执行完操作,把 data2 存入数据库,然后是 data1/data3。这不明显乱了。
- Kafka:比如说我们建了一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。
消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。
解决方案
RabbitMQ
拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
Kafka
- 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
- 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。