队列消费入口
// Injected helper that the coupon-send listener calls as retry(message, channel, exception)
// when processing a message fails. Its retry/ack semantics live in RabbitMQRetryUtil and
// are not visible from this file.
@Autowired
RabbitMQRetryUtil retryUtil;
/**
 * Consumes messages from the queue named by
 * {@code RabbitMQConstant.QUEUE_PROMOTIONS_COUPON_SEND}.
 *
 * <p>A message with a payload is decoded as UTF-8 text, parsed as JSON into a
 * {@code SendMemberCouponRequest}, handed to
 * {@code memberCouponService.sendMemberCouponByMembersAndCoupons}, and then acknowledged
 * individually. Any exception raised along that path is logged and the message is passed
 * to {@code retryUtil.retry}. A message with a missing or empty payload is logged as a
 * warning and acknowledged without further processing.
 *
 * @param message the delivered message; its body is expected to be a JSON document
 *                (assumed to be UTF-8 encoded by the producer - confirm)
 * @param channel the channel the message was delivered on, used for acknowledgement
 * @throws IOException if acknowledging a payload-less message fails (the retry helper
 *                     may presumably propagate one as well - verify)
 */
@RabbitListener(queues = RabbitMQConstant.QUEUE_PROMOTIONS_COUPON_SEND)
public void execute(Message message, Channel channel) throws IOException {
    LogUtil.logApplicationInfo("进入发券逻辑");
    byte[] body = message.getBody();

    // Nothing to process: a null or zero-length payload is acknowledged so it is not
    // delivered again. The empty check mirrors the guard used by the order-create listener.
    if (null == body || body.length == 0) {
        LogUtil.logApplicationWarn("没有消费数据");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        return;
    }

    try {
        // Decode with an explicit charset; new String(byte[]) would use the JVM's
        // platform default and can corrupt non-ASCII text on some hosts.
        String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        LogUtil.logApplicationInfo(String.format("发券入参信息:%s", msg));
        SendMemberCouponRequest sendMemberCouponRequest =
                JSONObject.parseObject(msg, SendMemberCouponRequest.class);
        memberCouponService.sendMemberCouponByMembersAndCoupons(sendMemberCouponRequest);
        // multiple=false: acknowledge only this delivery tag.
        // NOTE(review): a failure of this ack is also routed to the retry helper below,
        // after the service call has already run - confirm that is intended.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        LogUtil.logApplicationError("发券异常", e);
        retryUtil.retry(message, channel, e);
    }
}
- 正常队列逻辑处理
/**
 * Consumes messages from the {@code queue.order.center.create} queue.
 *
 * <p>A non-empty payload is decoded as UTF-8 text and logged. A
 * {@code BusinessException} is logged as a warning and any other exception as an error;
 * neither is rethrown. The message is acknowledged in the {@code finally} block, so it is
 * acked whether or not the processing inside the {@code try} succeeded.
 *
 * @param message the delivered message (payload assumed to be UTF-8 text - confirm)
 * @param channel the channel the message was delivered on, used for acknowledgement
 * @throws IOException if acknowledging the message fails
 */
@RabbitListener(queues = "queue.order.center.create")
public void orderCreate(Message message, Channel channel) throws IOException {
    byte[] body = message.getBody();
    try {
        if (null != body && body.length > 0) {
            // Decode with an explicit charset; new String(byte[]) would use the JVM's
            // platform default and can corrupt non-ASCII text on some hosts.
            String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
            LogUtil.logApplicationInfo("获取队列信息:" + msg);
        }
    } catch (BusinessException ex) {
        LogUtil.logApplicationWarn("异常日志", ex);
    } catch (Exception ex) {
        LogUtil.logApplicationError("异常日志", ex);
    } finally {
        // Always acknowledge this single delivery (multiple=false), even after a failure.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}