您的当前位置:首页>全部文章>文章详情
Java 操作 rabbitmq
发表于:2022-02-25浏览:27次TAG: #rabbitmq

概念简介

https://www.cnblogs.com/williamjie/p/9481774.html


  • fanout

类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。topic前面讲到

  • direct

类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。

  • topic

类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”binding key与routing key一样也是句点号“. ”分隔的字符串binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)


环境安装

docker安装rabbitmq

docker search rabbitmq
docker pull rabbitmq
启动 docker run -d  --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
进入容器内部
docker exec -it 镜像ID /bin/bash
rabbitmq-plugins enable rabbitmq_management

java代码集成

  1. 引入依赖

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

配置文件


rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: guest
    password: guest
    #发送者相关配置
    publisher-confirms: true #开启confirms回调  true开启,false关闭
    publisher-returns: true #开启returnedMessage回调
    # 消费者监听相关配置
    listener:
      simple:
        retry:
          # 开启消费者(程序出现异常)重试机制,默认开启并一直重试
          enabled: true
          # 最大重试次数
          max-attempts: 3
          # 重试间隔时间(毫秒)
          initial-interval: 3000
        # 开启手动ack(一般在消费端配置)
        acknowledge-mode: manual

2.mq的常量信息类

/**
 * @ClassName MqConstants
 * @Description:  mq的常量信息
 * @Author chengq
 * @Date 2020-07-20
 **/
public class MqConstants {
    public static final Integer DELIVERING = 0; //消息投递中
    public static final Integer SUCCESS = 1; //消息投递成功
    public static final Integer FAILURE = 2; //消息投递失败
    public static final Integer MAX_TRY_COUNT = 3; //最大重试次数
    /*会有定时任务扫描发送失败的id,可能刚发出去就定时任务被扫描到了导致重新发一次,一分钟时候还没投递成功就认为是失败了
     *创建的时候 sendLog.setTryTime(new Date(System.currentTimeMillis()+1000*60*MqConstants.MSG_TIMEOUT))
     */
    public static final Integer MSG_TIMEOUT = 1; //消息超时时间(分钟)
    public static final String MAIL_QUEUE_NAME = "javaboy.mail.queue"; //队列名称
    public static final String MAIL_EXCHANGE_NAME = "javaboy.mail.exchange"; //交换机名称
    public static final String MAIL_ROUTING_KEY_NAME = "javaboy.mail.routing.key"; //路由键名称
}

3.发送时记录一条日志,可采用定时任务进行扫描此表,日志表,可根据业务进行变通

CREATE TABLE `mq_send_log` (
  `msgId` bigint(32) NOT NULL COMMENT '消息id',
  `module` varchar(255) DEFAULT NULL COMMENT '模块名',
  `businessId` bigint(20) DEFAULT NULL COMMENT '业务id',
  `status` int(1) DEFAULT NULL COMMENT '0消息投递中  1投递成功  2投递失败',
  `routeKey` varchar(255) DEFAULT NULL COMMENT '路由键',
  `exchange` varchar(255) DEFAULT NULL COMMENT '交换机',
  `count` int(11) DEFAULT NULL COMMENT '次数',
  `tryTime` datetime DEFAULT NULL COMMENT '重试时间',
  `create_time` datetime DEFAULT NULL COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  PRIMARY KEY (`msgId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='mq消息发送记录';

4.rabbitmq配置类

/**
 * 消息队列配置(保障了可靠性投递)
 * Created by chengq on 2020/7/20
 */
@Configuration
@Slf4j
public class RabbitMqConfig2 {
    @Autowired
    CachingConnectionFactory cachingConnectionFactory;
    //引入消息存储的service
    @Autowired
    IMqSendLogService mqSendLogService;
    @Bean
    RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        rabbitTemplate.setConfirmCallback((data, ack, cause) -> {      //消息从生产者到消息中间件失败
            String msgId = data.getId();    //消息id
            System.out.println("RabbitMqConfig2==========="+msgId);
            if (ack) {
                log.info("消息投递成功");
//                mqSendLogService.updateMqSendLogStatus(msgId,1);
                //根据消息msgId更新消息状态为投递成功
            } else {
                log.info("消息投递失败");
            }
        });
        rabbitTemplate.setReturnCallback((msg, repCode, repText, exchange, routingkey) -> {      //消息到queue的时候失败
            log.info("消息发送失败");
        });
        return rabbitTemplate;
    }
    @Bean
    Queue mailQueue() {
        return new Queue(MqConstants.MAIL_QUEUE_NAME, true);
    }
    @Bean
    DirectExchange mailDirect() {
        return new DirectExchange(MqConstants.MAIL_EXCHANGE_NAME, true, false);
    }
    @Bean
    Binding mailBinding() {
        return BindingBuilder
                .bind(mailQueue())
                .to(mailDirect())
                .with(MqConstants.MAIL_ROUTING_KEY_NAME);
    }
}

5.生产者代码

/**
 * 消息发出者
 * Created by chengq on 2020/7/20
 */
@Component
@Slf4j
public class Sender {
    @Autowired
    private RabbitTemplate amqpTemplate;
    public void sendMessage(Long orderId) {
        Message message = MessageBuilder.withBody((orderId + "").getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(orderId + "123").build();
        // message.getMessageProperties().setMessageId(orderId + "lalala");
        message.getMessageProperties().setCorrelationId(orderId + "lalala");//根据correlationId判断消息发送是否成功
        this.amqpTemplate.convertAndSend(MqConstants.MAIL_EXCHANGE_NAME, MqConstants.MAIL_ROUTING_KEY_NAME, message, new CorrelationData(orderId + "lalala"));
        log.info("消息已发送=======");
    }
}

6.定时任务扫描日志表,重试时间为发送消息后一分钟,超过一分钟status还是0的并且当前时间超过tryTime的进行重发,重发大于等于三次的直接判定为失败

/**
 * @ClassName SendTask
 * @Description: TODO
 * @Author 95702
 * @Date 2020-07-21
 * @Version V1.0
 **/
public class SendTask {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private IMqSendLogService mqSendLogService;
//    @Scheduled(cron = "0/10 * * * * ?")
    public void resendTask() {
        List<MqSendLog> logs = mqSendLogService.selectMqSendLogListRetry();
        logs.forEach(sendLog -> {
            if (sendLog.getCount() >= 3) {
                mqSendLogService.updateMqSendLogStatus(sendLog.getMsgId() + "", 2);//重试次数超过三次直接设置发送失败
            } else {
                mqSendLogService.updateCount(sendLog.getMsgId() + "", new Date());
                //根据sendLog.getBusinessid()获取消息记录,重新发送消息,或者直接发送orderId
//                rabbitTemplate.convertAndSend(MqConstants.MAIL_EXCHANGE_NAME, MqConstants.MAIL_ROUTING_KEY_NAME, orderId,new CorrelationData(orderId+""));
            }
        });
    }
}

7.消费者

@Slf4j
@Component
public class Reciver {
    @Autowired
    private RedisTemplate redisTemplate;
    @RabbitHandler
    @RabbitListener(queues = MqConstants.MAIL_QUEUE_NAME)
    public void handle(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
        // String messageId = message.getMessageProperties().getMessageId();
        String messageId = message.getMessageProperties().getCorrelationId();
        String s = new String(message.getBody());
        System.out.println("消费端messageId===" + messageId);
        System.out.println("消费端消息===" + s);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        if (redisTemplate.opsForHash().entries("mq_send_log").containsKey(messageId)) {
            log.info("消费者已经被消费");
            channel.basicAck(deliveryTag, false);
        }
        //消息没有被消费,则执行业务代码,下边用逻辑代码代替
        try {
            //1.比如发送邮件业务
            //执行成功
            redisTemplate.opsForHash().put("mq_send_log", messageId, "chen123");
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            channel.basicNack(deliveryTag, false, true);//第三个参数执行失败消息自动回到队列中
            e.printStackTrace();
            log.info("逻辑代码执行失败" + e.getMessage());
        }
    }
}


标签云