开发人员通常用json来定义消息内容,通过类型来区分不同的消息。其中有几个字段建议添加到消息体中。
处理mq消息都是在服务器内部执行,发生异常不像调用接口那样直接提示给调用方。这时就要主动提前预知错误。在编写代码时,捕获到异常,除了记录日志,还需要持久化存储。建议将消息体和异常信息存储到es,方便查询。发生异常时,还可以发送邮件报警通知开发者,提前介入。
建议将每条mq消息的处理都存储到es,每条消息的处理过程可以用ThreadLocal采集起来,随着mq消息处理完成,一并存储到es。这样方便问题查询和消息重推。
根据我使用的经验来看,消息是一定会重推的。其一是mq自己的重推机制,比如网络抖动或者消息确认不及时,导致mq没有确认成功引发冲退。其二是业务处理出现问题,通常是代码写的有问题或者服务不可用,这时需要开发者自己重新推送了数据。这些场景要考虑幂等处理,有的消息可以多次处理,有的消息不能多次处理,会造成重复数据。
比如一个根据顾客评分计算销售提成的mq消息处理,通一个订单,顾客打了3分,消息处理完成。但是后来销售说顾客忘记打分了,应该是5分。这时就可以通过es查询到之前的mq消息体,将里面的评分修改为5分,再重新推送mq。这时我会先将之前的评分记录删除,再重新添加5分记录。