rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决

admin2024-05-30  16

01、RabbitMQ消息队列里积压很多消息

场景:上千万条消息在mq里积压了几个小时了还没解决

解决:

1)先修复consumer的问题,确保其恢复消费速度,然后将现有cnosumer都停掉
2)新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量
3)然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,
消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue
4)接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据
5)这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据
6)等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息

场景:rabbitmq设置过期时间的,就是TTL

如果消息在queue中积压超过一定的时间就会被rabbitmq给清理掉,这个数据就没了。
那这就是第二个坑了。这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢

解决:

丢了大量的消息。我们可以采取一个方案,就是批量重导,这个时候我们就开始写程序,
将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来

02、如何处理RabbitMQ 消息堆积和消息丢失问题

解决方案:

  • 增加消费者或后台相关组件的吞吐能力
  • 增加消费的多线程处理
  • 根据不同的业务实现不同的丢弃任务,选择不同的策略淘汰任务
  • 默认情况下,RabbitMQ消费者为单线程串行消费,设置并行消费两个关键属性,他们设置的是对每个消费者在初始化的时候设置的并发消费者个数,prefetchCount 是每次一次性从broker中获取的待消费的消息个数。
  • concurrentConsumer
  • prefetchConcurrentConsumer

消息丢失

解决方案:

  • 持久化
  • 消息确认机制

消息在生产者,消息队列,消费者中都有可能丢失。

1. 在生产者中丢失

原因:生产者发送消息成功后,消息队列没有收到消息,消息在从生产者传输到队列的过程中丢失,一般可能是网络不稳定。

 解决方案: 发送方采用消息确认机制,当消息成功被MQ接收到后, 会给生产者发一个确认消息,表示成功接收。如果没有接受成功,重新用定时器去投递

2. 在消息队列中丢失

原因:消息到MQ后, 还没有被消费就被MQ给丢失了。比如MQ服务器宕机或者未进行持久化重启。

解决方案:持久化交换机,队列和消息。确保MQ服务器重启时仍然能从磁盘恢复对应的队列,交换机和消息,然后我们把MQ 做多台分布式集群,防止出现所有的MQ服务器挂掉。

注意: 交换机,队列和消息都要持久化。

3. 在消费者中丢失

原因:默认消费者消费的时,设置的是自动回复MQ, 收到了消息,MQ会立刻删除自身保存的这条消息,如果消息已经在MQ中被删除,但消费者的业务处理出现异常或者宕机,那么就导致改消息没有被成功处理从而导致消息丢失。

解决方案: 设置手动ACK。

03、如何通过持久化保证消息99.99%不丢失?

要解决该问题,就要用到RabbitMQ中持久化的概念,所谓持久化,就是RabbitMQ会将内存中的数据(Exchange 交换器,Queue 队列,Message 消息)固化到磁盘,以防异常情况发生时,数据丢失。

其中,RabbitMQ的持久化分为三个部分:

  1. 交换器(Exchange)的持久化
  2. 队列(Queue)的持久化
  3. 消息(Message)的持久化

1. 交换器(Exchange)的持久化

在上篇博客中,我们声明Exchange的代码是这样的:

private final static String EXCHANGE_NAME = "normal-confirm-exchange";// 创建一个Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

这种情况下声明的Exchange是非持久化的,在RabbitMQ出现异常情况(重启,宕机)时,该Exchange会丢失,会影响后续的消息写入该Exchange,那么如何设置Exchange为持久化的呢?答案是设置durable参数。

durable:设置是否持久化。durable设置为true表示持久化,反之是非持久化。

持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。

设置Exchange持久化:

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

此时调用的重载方法为:

public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException {
    return this.exchangeDeclare(exchange, (String)type, durable, false, (Map)null);
}

为了能更好的理解,我们新建个生产类如下:

package com.zwwhnly.springbootaction.rabbitmq.durable;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 创建一个Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");        // 发送消息
        String message = "durable exchange test";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

示例代码中,我们新建了1个非持久化的Exchange,1个非持久化的Queue,并将它们做了绑定,此时运行代码,Exchange和Queue新建成功,消息‘durable exchange test’也被正确地投递到了队列中:

rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决,rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决_重启,第1张

rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决,rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决_数据_02,第2张

此时重启下RabbitMQ服务,会发现Exchange丢失了:

rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决,rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决_rabbitmq_03,第3张

修改下代码,将durable参数设置为ture:

// 创建一个Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

此时运行完代码,然后重启下RabbitMQ服务,会发现Exchange不再丢失:

rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决,rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决_数据_04,第4张

4. 队列(Queue)的持久化

细心的可能会发现,虽然现在重启RabbitMQ服务后,Exchange不丢失了,但是队列和消息丢失了,那么如何解决队列不丢失呢?答案也是设置durable参数。

durable:设置是否持久化。为true则设置队列为持久化。

持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。

简单修改下上面声明Queue的代码,将durable参数设置为true:

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

此时调用的重载方法如下:

public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
    validateQueueNameLength(queue);
    return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();
}

运行代码,然后重启RabbitMQ服务,会发现队列现在不丢失了:

rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决,rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决_持久化_05,第5张

5. 消息(Message)的持久化

虽然现在RabbitMQ重启后,Exchange和Queue都不丢失了,但是存储在Queue里的消息却仍然会丢失,那么如何保证消息不丢失呢?答案是设置消息的投递模式为2,即代表持久化。

修改发送消息的代码为:

// 发送消息
String message = "durable exchange test";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

调用的重载方法为:

public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
    this.basicPublish(exchange, routingKey, false, props, body);
}

运行代码,然后重启RabbitMQ服务,发现此时Exchange,Queue,消息都不丢失了:

rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决,rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决_数据_06,第6张

rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决,rabbitmq management 取消息堆积日志 rabbitmq消息堆积怎么解决_持久化_07,第7张

至此,我们完美的解决了RabbitMQ重启后,消息丢失的问题。

最终的代码如下,你也可以通过文末的源码链接下载本文用到的所有源码:

package com.zwwhnly.springbootaction.rabbitmq.durable;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 的主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个通道
        Channel channel = connection.createChannel();
        // 创建一个Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");        // 发送消息
        String message = "durable exchange test";
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

6. 注意事项

1)理论上可以将所有的消息都设置为持久化,但是这样会严重影响RabbitMQ的性能。因为写入磁盘的速度比写入内存的速度慢得不止一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吞吐量之间做一个权衡。

2)将交换器、队列、消息都设置了持久化之后仍然不能百分之百保证数据不丢失,因为当持久化的消息正确存入RabbitMQ之后,还需要一段时间(虽然很短,但是不可忽视)才能存入磁盘之中。如果在这段时间内RabbitMQ服务节点发生了宕机、重启等异常情况,消息还没来得及落盘,那么这些消息将会丢失。

3)单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列消失,继而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明原文出处。如若内容造成侵权/违法违规/事实不符,请联系SD编程学习网:675289112@qq.com进行投诉反馈,一经查实,立即删除!