概念简介
https://www.cnblogs.com/williamjie/p/9481774.html
fanout
类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。topic前面讲到
direct
类型的Exchange路由规则是完全匹配binding key与routing key,但这种严格的匹配方式在很多情况下不能满足实际业务需求。
topic
类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage相似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不同,它约定:routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”binding key与routing key一样也是句点号“. ”分隔的字符串binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
环境安装
docker安装rabbitmq
docker search rabbitmq docker pull rabbitmq 启动 docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq 进入容器内部 docker exec -it 镜像ID /bin/bash rabbitmq-plugins enable rabbitmq_management
java代码集成
引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置文件
rabbitmq: host: localhost port: 5672 virtual-host: / username: guest password: guest #发送者相关配置 publisher-confirms: true #开启confirms回调 true开启,false关闭 publisher-returns: true #开启returnedMessage回调 # 消费者监听相关配置 listener: simple: retry: # 开启消费者(程序出现异常)重试机制,默认开启并一直重试 enabled: true # 最大重试次数 max-attempts: 3 # 重试间隔时间(毫秒) initial-interval: 3000 # 开启手动ack(一般在消费端配置) acknowledge-mode: manual
2.mq的常量信息类
/**
* @ClassName MqConstants
* @Description: mq的常量信息
* @Author chengq
* @Date 2020-07-20
**/
public class MqConstants {
public static final Integer DELIVERING = 0; //消息投递中
public static final Integer SUCCESS = 1; //消息投递成功
public static final Integer FAILURE = 2; //消息投递失败
public static final Integer MAX_TRY_COUNT = 3; //最大重试次数
/*会有定时任务扫描发送失败的id,可能刚发出去就定时任务被扫描到了导致重新发一次,一分钟时候还没投递成功就认为是失败了
*创建的时候 sendLog.setTryTime(new Date(System.currentTimeMillis()+1000*60*MqConstants.MSG_TIMEOUT))
*/
public static final Integer MSG_TIMEOUT = 1; //消息超时时间(分钟)
public static final String MAIL_QUEUE_NAME = "javaboy.mail.queue"; //队列名称
public static final String MAIL_EXCHANGE_NAME = "javaboy.mail.exchange"; //交换机名称
public static final String MAIL_ROUTING_KEY_NAME = "javaboy.mail.routing.key"; //路由键名称
}3.发送时记录一条日志,可采用定时任务进行扫描此表,日志表,可根据业务进行变通
CREATE TABLE `mq_send_log` ( `msgId` bigint(32) NOT NULL COMMENT '消息id', `module` varchar(255) DEFAULT NULL COMMENT '模块名', `businessId` bigint(20) DEFAULT NULL COMMENT '业务id', `status` int(1) DEFAULT NULL COMMENT '0消息投递中 1投递成功 2投递失败', `routeKey` varchar(255) DEFAULT NULL COMMENT '路由键', `exchange` varchar(255) DEFAULT NULL COMMENT '交换机', `count` int(11) DEFAULT NULL COMMENT '次数', `tryTime` datetime DEFAULT NULL COMMENT '重试时间', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `update_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`msgId`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='mq消息发送记录';
4.rabbitmq配置类
/**
* 消息队列配置(保障了可靠性投递)
* Created by chengq on 2020/7/20
*/
@Configuration
@Slf4j
public class RabbitMqConfig2 {
@Autowired
CachingConnectionFactory cachingConnectionFactory;
//引入消息存储的service
@Autowired
IMqSendLogService mqSendLogService;
@Bean
RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> { //消息从生产者到消息中间件失败
String msgId = data.getId(); //消息id
System.out.println("RabbitMqConfig2==========="+msgId);
if (ack) {
log.info("消息投递成功");
// mqSendLogService.updateMqSendLogStatus(msgId,1);
//根据消息msgId更新消息状态为投递成功
} else {
log.info("消息投递失败");
}
});
rabbitTemplate.setReturnCallback((msg, repCode, repText, exchange, routingkey) -> { //消息到queue的时候失败
log.info("消息发送失败");
});
return rabbitTemplate;
}
@Bean
Queue mailQueue() {
return new Queue(MqConstants.MAIL_QUEUE_NAME, true);
}
@Bean
DirectExchange mailDirect() {
return new DirectExchange(MqConstants.MAIL_EXCHANGE_NAME, true, false);
}
@Bean
Binding mailBinding() {
return BindingBuilder
.bind(mailQueue())
.to(mailDirect())
.with(MqConstants.MAIL_ROUTING_KEY_NAME);
}
}5.生产者代码
/**
* 消息发出者
* Created by chengq on 2020/7/20
*/
@Component
@Slf4j
public class Sender {
@Autowired
private RabbitTemplate amqpTemplate;
public void sendMessage(Long orderId) {
Message message = MessageBuilder.withBody((orderId + "").getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(orderId + "123").build();
// message.getMessageProperties().setMessageId(orderId + "lalala");
message.getMessageProperties().setCorrelationId(orderId + "lalala");//根据correlationId判断消息发送是否成功
this.amqpTemplate.convertAndSend(MqConstants.MAIL_EXCHANGE_NAME, MqConstants.MAIL_ROUTING_KEY_NAME, message, new CorrelationData(orderId + "lalala"));
log.info("消息已发送=======");
}
}6.定时任务扫描日志表,重试时间为发送消息后一分钟,超过一分钟status还是0的并且当前时间超过tryTime的进行重发,重发大于等于三次的直接判定为失败
/**
* @ClassName SendTask
* @Description: TODO
* @Author 95702
* @Date 2020-07-21
* @Version V1.0
**/
public class SendTask {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private IMqSendLogService mqSendLogService;
// @Scheduled(cron = "0/10 * * * * ?")
public void resendTask() {
List<MqSendLog> logs = mqSendLogService.selectMqSendLogListRetry();
logs.forEach(sendLog -> {
if (sendLog.getCount() >= 3) {
mqSendLogService.updateMqSendLogStatus(sendLog.getMsgId() + "", 2);//重试次数超过三次直接设置发送失败
} else {
mqSendLogService.updateCount(sendLog.getMsgId() + "", new Date());
//根据sendLog.getBusinessid()获取消息记录,重新发送消息,或者直接发送orderId
// rabbitTemplate.convertAndSend(MqConstants.MAIL_EXCHANGE_NAME, MqConstants.MAIL_ROUTING_KEY_NAME, orderId,new CorrelationData(orderId+""));
}
});
}
}7.消费者
@Slf4j
@Component
public class Reciver {
@Autowired
private RedisTemplate redisTemplate;
@RabbitHandler
@RabbitListener(queues = MqConstants.MAIL_QUEUE_NAME)
public void handle(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
// String messageId = message.getMessageProperties().getMessageId();
String messageId = message.getMessageProperties().getCorrelationId();
String s = new String(message.getBody());
System.out.println("消费端messageId===" + messageId);
System.out.println("消费端消息===" + s);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
if (redisTemplate.opsForHash().entries("mq_send_log").containsKey(messageId)) {
log.info("消费者已经被消费");
channel.basicAck(deliveryTag, false);
}
//消息没有被消费,则执行业务代码,下边用逻辑代码代替
try {
//1.比如发送邮件业务
//执行成功
redisTemplate.opsForHash().put("mq_send_log", messageId, "chen123");
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, true);//第三个参数执行失败消息自动回到队列中
e.printStackTrace();
log.info("逻辑代码执行失败" + e.getMessage());
}
}
}