RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message

admin2024-05-15  0

场景

        想创建一个新的consumer去消费一个已经再使用的topic时,默认情况下会从topic中的第一条消息开始消费,大多数情况是需要从最新的消息开始。然后再使用CONSUME_FROM_LAST_OFFSET设置时并不会对新的consumer生效,它只是在停用consumer重新启用时,如果之前订阅OFFSET消息已经不存在了(默认rocketmq中存放的消息是72小时)就会从最后一条开始。所以代码层面无法实现新的consumer订阅topic最新消息开始消费的操作。

如何实现

        RocketMQ为我们提供了一个强大的消息控制台rocketmq-dashboard,其中就有消息相关的控制功能。操作步骤如下:

  1.创建新的consumer,已存在可以跳过这一步

Consumer>add

RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message,第1张

RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message,第2张

  • clusterName:选择需要订阅rocketmq集群名称
  • brokerName:选择需要订阅的broker名称,可能是多组集群
  • groupName:输入新建的Consumer名称
  • consumeEnable:是否开启Consumer消费,这里先将消费关闭(灰色),启动后再开启消费。
  2.启动消费组程序,已启动的跳过。

RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message,第3张

  • 启动消费程序后就可以看到Consumer消费节点数量,延时的消息数量。
  • 因为我们consumeEnable为false关闭状态,所有Consumer并没有消费消息。
  3.设置Consumer消费offset
  • Topic>SKIP_MESSAGE_ACCUMULATE

RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message,第4张

RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message,第5张

  • 在Topic栏目中选择需要跳过的topic,点击SKIP_MESSAGE_ACCUMULATE操作
  • 在SKIP_MESSAGE_ACCUMULATE弹出框SubscriptionGroup中选择需要操作的消费组
  • commit后生效,弹出设置的消费信息框如下

RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message,第6张

  • 如果新的Consumer订阅了多个topic,也需要将其他进行操作。
4.开启consumeEnable开关

Consumer>Config

RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message,第7张

  • 在Consumer的模块中,找到新增的消费组SubscriptionGroup ,打开后面Config窗口

RocketMQ:新增consumer消费组group从最新消息开始消费skip last offset message,第8张

  •  打开consumeEnable开关(红色),如果有过个broker集群需要都打开。
  • 设置完成后消费程序会开始消费新的消息
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明原文出处。如若内容造成侵权/违法违规/事实不符,请联系SD编程学习网:675289112@qq.com进行投诉反馈,一经查实,立即删除!