队列消费入口

  • 队列消费逻辑处理(含重试逻辑)
   // Retry helper injected by Spring. The coupon consumer hands it the failed
   // message, the channel and the caught exception from its catch block.
   // NOTE(review): the actual retry/ack policy lives inside RabbitMQRetryUtil
   // and is not visible here - confirm it before changing the consumer.
   @Autowired
    RabbitMQRetryUtil retryUtil;

    /**
     * Consumes one coupon-send message and acknowledges it manually.
     *
     * <p>Flow:
     * <ul>
     *   <li>Missing or zero-length payload: logged as a warning and acked, since
     *       there is nothing to process and retrying cannot help.</li>
     *   <li>Otherwise the payload is decoded as UTF-8 JSON into a
     *       {@code SendMemberCouponRequest}, passed to the coupon service, and
     *       acked on success.</li>
     *   <li>Any exception raised while decoding, sending or acking is logged and
     *       delegated to {@code retryUtil.retry(...)}.</li>
     * </ul>
     *
     * @param message the delivered AMQP message
     * @param channel the channel the message arrived on, used for the manual ack
     * @throws IOException if acknowledging an empty message fails, or if the
     *                     retry helper propagates one
     */
    @RabbitListener(queues = RabbitMQConstant.QUEUE_PROMOTIONS_COUPON_SEND)
    public void execute(Message message, Channel channel) throws IOException {
        LogUtil.logApplicationInfo("进入发券逻辑");
        byte[] body = message.getBody();
        if (null == body || body.length == 0) {
            // An empty payload would only fail JSON parsing and be retried
            // pointlessly, so it is treated the same as a missing one.
            LogUtil.logApplicationWarn("没有消费数据");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            return;
        }
        try {
            // Decode explicitly as UTF-8 instead of the platform default charset,
            // so non-ASCII JSON content is read the same way on every host.
            // NOTE(review): assumes producers publish UTF-8 JSON - confirm.
            String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
            LogUtil.logApplicationInfo(String.format("发券入参信息:%s", msg));
            // Coupon-sending business logic.
            SendMemberCouponRequest sendMemberCouponRequest = JSONObject.parseObject(msg, SendMemberCouponRequest.class);
            memberCouponService.sendMemberCouponByMembersAndCoupons(sendMemberCouponRequest);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            LogUtil.logApplicationError("发券异常", e);
            // Hand the failure to the shared retry helper.
            retryUtil.retry(message, channel, e);
        }
    }
  • 正常队列逻辑处理(无重试,处理结束后统一ack)
  /**
   * Template consumer for the {@code queue.order.center.create} queue without
   * retry handling.
   *
   * <p>A non-empty payload is decoded as UTF-8 text and logged. Exceptions are
   * logged (business exceptions as warnings, everything else as errors) and the
   * message is always acknowledged in the {@code finally} block, so a failed
   * message is not handed to any retry helper by this method.
   *
   * @param message the delivered AMQP message
   * @param channel the channel the message arrived on, used for the manual ack
   * @throws IOException if the acknowledgement itself fails
   */
  @RabbitListener(queues = "queue.order.center.create")
    public void orderCreate(Message message, Channel channel) throws IOException {
        byte[] body = message.getBody();
        String msg = "";
        try {
            if (null != body && body.length > 0) {
                // Decode explicitly as UTF-8 instead of the platform default
                // charset, so the payload reads the same on every host.
                // NOTE(review): assumes producers publish UTF-8 text - confirm.
                msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                LogUtil.logApplicationInfo("获取队列信息:" + msg);
            }
        } catch (BusinessException ex) {
            LogUtil.logApplicationWarn("异常日志", ex);
        } catch (Exception ex) {
            LogUtil.logApplicationError("异常日志", ex);
        } finally {
            // Acknowledge the message whether or not processing succeeded.
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

results matching ""

    No results matching ""