现在很多主流的消息队列都实现了完善的消息可靠性保证机制,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息。
所以,绝大部分丢消息的原因都是开发者不熟悉消息队列,没有正确地使用和配置造成的。下面我们一起来了解下消息队列是如何保证消息可靠传递的,只要熟知原理,就能很快知道如何配置消息队列,写出可靠的代码,避免消息丢失。
检查消息队列的方法用消息队列最尴尬的情况不是丢消息,而是丢了消息还不知道。对于一个刚刚上线的系统,各方面肯定都不是很稳定,这个时候就特别许需要监控系统中是否有消息丢失的情况。
如果是对于一些基础设施比较完善的公司,可以使用分布式链路追踪系统,很方便地追踪每一条消息。如果没有的话,下面提供一种简答的方法,来检查是否有消息丢失的情况。
我们可以使用消息队列的有序性来验证是否有消息丢失。原理很简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序列号,在 Consumer 端来检查这个序列号的连续性。
如果没有消息队列,Consumer 端收到消息的序列号必然是递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 + 1.如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号确定丢失的是哪条消息,方便进一步排查原因和补救。
大多数消息队列的客户端都支持拦截器,可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性。这样实现的好处是:消息检测的代码不会入侵到业务代码中,待系统稳定后,也方便将这部分代码检测的逻辑关闭或者删除。
如果在一个分布式系统中实现了这个检测机制,有以下几个问题需要注意:
像 Kafka 和 RocketMQ 这样的消息队列,它是不保证在 Topic 上的严格顺序的 ,只能保证队列或分区上的消息是有序的,所以我们在发送消息的时候要指定分区,并且每个分区单独检测消息序号的连续性。
如果系统中 Producer 是多实例的,由于多个 Producer 并不好协调彼此之间的发送顺序,所以每个 Producer 分别生成各自的消息序号,并且需要附加 Producer 标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。
Consumer 实例的数量最好是和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便在 Consumer 内检测消息序号的连续性。
确保信息可靠传递上面讲述了如何检测消息丢失,下面再来看看什么时候会发生消息丢失,该如何避免。
消息从生产到消费完成整个过程,可以分为下面三个阶段:
生产阶段:在这个阶段,消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
存储阶段:在这个阶段,消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。
消费阶段:在这个阶段,Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。
1、生产阶段在生产阶段,消息队列通过最常用的请求确认机制来保证消息的可靠传递:代码中调用发送消息的方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到。客户端收到响应后,一次正常的消息发送就完成了。
只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到确认响应后,会自动重试,如果还是失败,就会以返回值或者异常的方式告知调用方。
在编写发送消息代码时,通过正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失了。以 Kafka 为例,我们看一下如何可靠地发送消息:
同步发送时,只要注意捕获异常即可。
partition, offset, err := producer.SendMessage(message) if err != nil { fmt.Println("消息发送失败:", err) } else { fmt.Printf("消息发送成功,分区:%d, 偏移:%d\n", partition, offset) }异步发送时,需要去异步检查返回值,并进行处理:
producer.Input()<- message select { case <-producer.Successes(): fmt.Println("消息发送成功") case err := <-producer.Errors(): fmt.Println("消息发送失败:", err.Err) } 2、存储阶段在存储阶段,只要 Borker 不出现故障,比如进程死掉了或者服务器宕机了,就不会出现丢失消息的问题。但如果出现了的话, 还是可能会丢失消息的。
如果对于消息的可靠性要求非常高,可以通过配置 Broker 参数来避免因为宕机丢失消息。
对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后将消息写入磁盘,再给 Producer 返回确认响应。这样即使发生宕机,由于消息已经写入磁盘,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。
如果 Borker 是由多个节点组成的集群,需要将 Borker 集群配置成:至少消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会造成消息的丢失。
3、消费阶段消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递。客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消息的消费确认响应,下次拉消息的时候还是会返回同一条消息,以此来确保消息不会在网络传输过程中丢失,也不会因为客户端执行消费逻辑中出错导致丢失。
在编写代码的过程中,需要注意,不要在收到消息后立马返回消息确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。
同样,我们使用 golang 语言消费 RabbitMQ 消息为例,看看如何实现一段可靠的消费代码:
forever := make(chan bool) go func() { for d := range msgs { body := d.Body fmt.Printf("[x] 收到消息 %s\n", body) // 在这里处理收到的消息 // 你可以在这里调用 database.save(body) 来保存消息 fmt.Println("[x] 消费完成") // 完成消费业务逻辑后发送消费确认响应 d.Ack(false) } }() log.Printf("等待消息。要退出,请按 CTRL+C") <-forever正确的顺序时,先把消息保存到数据库中,然后再发送消费确认。这样如果保存消息失败了,就不会执行消费代码,下次拉取的还是这条消息,直到消费成功。
小结这一篇文章,先讲述了在系统中,如果检查消息队列消息丢失的情况,然后分析了一条消息从发送到消费成功的整个过程,以及消息队列是如何确保消息的可靠性,不会丢失的。这个过程可以分为分三个阶段,每个阶段都需要正确的编写代码并且设置正确的配置项,才能配合消息队列的可靠性机制,确保消息不会丢失。
在生产阶段,你需要捕获消息发送的错误,并重发消息。
在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个 Broker 宕机或者磁盘损坏而丢失。
在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。
知道这几个阶段的原理后,如果再出现丢消息的情况,可以通过在代码中加一些日志的方式,很快定位到是哪个阶段出了问题,然后再进一步分析,快速找到问题的原因。