RabbitMQ 发送给延迟交换机的消息调用returnedMessage 方法及returnedMessage() 方法的作用

1:returnedMessage() 方法的作用

交换机返回消息的方法-消息未送达队列触发回调
(1)常用于交换机无法路由回退消息。
(2)如果交换机绑定了备用交换机则是路由到备用交换机,此方法不回调。
(3)如果是发送到延迟交换机则回调此方法,所以如果使用延迟交换机则要对延迟交换机回调的消息过滤。

2:演示错误的Routingkey导致无法路由的现象,触发回调

1、声明交换机和队列并绑定

/**
 * 不可路由的交换机和队列配置
 *
 * @Author darren
 * @Date 2023/3/23 20:02
 */
@Configuration
@Slf4j
public class NotRoutableConfig {

    @Bean("notRoutableExchange")
    public DirectExchange getNotRoutableExchange(){
        return ExchangeBuilder.directExchange(ExchangeUtil.NOT_ROUTABLE_EXCHANGE_NAME).build();
    }

    @Bean("notRoutableQueue")
    public Queue getNotRoutableQueue() {
        return QueueBuilder.durable(QueueUtil.NOT_ROUTABLE_QUEUE_NAME).build();
    }

    @Bean
    public Binding getBinding(
            @Qualifier("notRoutableQueue") Queue notRoutableQueue,
            @Qualifier("notRoutableExchange") DirectExchange notRoutableExchange){
        return BindingBuilder.bind(notRoutableQueue).to(notRoutableExchange).with(RoutingKeyUtil.NOT_ROUTABLE_RIGHT_ROUTING_KEY);
    }
}

2、CallBack实现类


/**
 * 发布确认-消息回调类
 *
 * @Author darren
 * @Date 2023/3/21 22:38
 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //依赖注入 rabbitTemplate 之后再设置它的回调对象
    @PostConstruct
    public void init(){
        // 确认回调
        rabbitTemplate.setConfirmCallback(this);
        // 返回回调
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 交换机不管是否收到消息的一个回调方法
     *
     * @param correlationData: 消息相关数据
     * @param ack: 交换机是否收到消息
     * @param cause
     */
    @Override
    public void confirm(final CorrelationData correlationData, final boolean ack, final String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(ack){
            log.info("交换机确认回调方法已经收到id为:{} 的消息",id);
        }else{
            log.info("交换机确认回调方法还未收到id为:{} 消息, 由于原因:{}", id, cause);
        }
    }

    /**
     * 交换机返回消息的方法-消息未送达队列触发回调
     * 常用于交换机无法路由回退消息
     * 如果交换机绑定了备用交换机则是路由到备用交换机,此方法不回调。
     *
     * @param message the returned message.
     * @param replyCode the 回复 code.
     * @param replyText the 回复 text.
     * @param exchange the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(final Message message, final int replyCode, final String replyText,
            final String exchange,
            final String routingKey) {
        // 排除调延迟交换机,因为消息在延迟交换机中延迟,并未送达到队列则出发了此函数回调
        if (!ExchangeUtil.DELAYED_EXCHANGE_NAME.equals(exchange)) {
            log.info("交换机返回消息的方法收到的消息:{} 交换机回复的内容:{}, 交换机是:{}, 路由 key:{}",
                    new String(message.getBody()),replyText, exchange, routingKey);
        }
    }
}

3、生产者


/**
 * 不可路由交换机-生产者
 * @Author darren
 * @Date 2023/3/23 20:16
 */
@RestController
@Slf4j
@RequestMapping("/notRoutableExchange")
public class notRoutableExchangeController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMessage/{message}")
    public String sendNotRoutableExchangeMsg(@PathVariable("message") String message) {
        rabbitTemplate.convertAndSend(
                ExchangeUtil.NOT_ROUTABLE_EXCHANGE_NAME,
                RoutingKeyUtil.NOT_ROUTABLE_ERROR_ROUTING_KEY,
                message,
                CorrelationDataUtil.getCorrelationData());
        return "发送到不可路由交换机的消息成功";
    }
}

4、结果

http://localhost:8888/notRoutableExchange/sendMessage/heheh
UUID为:500ba6ed-845c-4ac3-80b2-343f6906a69b
交换机返回消息的方法收到的消息:heheh 交换机回复的内容:NO_ROUTE, 交换机是:not.routable.exchange, 路由 key:not.routable.error.routing.key
交换机确认回调方法已经收到id为:500ba6ed-845c-4ac3-80b2-343f6906a69b 的消息

3:如果交换机绑定了备用交换机则是路由到备用交换机,此方法不回调。

1、声明发布确认交换机和队列并绑定备份交换机

/**
 * 发布确认-配置交换机和队列
 * @Author darren
 * @Date 2023/3/21 22:02
 */
@Configuration
public class ConfirmConfig {

    /**
     * 发布确认交换机并绑定备用交换机
     * @return
     */
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(ExchangeUtil.CONFIRM_EXCHANGE_NAME)
                .durable(true).withArgument("alternate-exchange", ExchangeUtil.BACKUP_EXCHANGE_NAME).build();
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(QueueUtil.CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding confirmQueueBindingConfirmExchange(
            @Qualifier("confirmExchange") DirectExchange confirmExchange,
            @Qualifier("confirmQueue") Queue confirmQueue) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(RoutingKeyUtil.CONFIRM_ROUTING_KEY);
    }
}

2、声明备份交换机和队列并绑定

/**
 * 备份交换机及队列
 * @Author darren
 * @Date 2023/3/22 20:10
 */
@Configuration
@Slf4j
public class backupConfig {

    @Bean("backupExchange")
    public FanoutExchange getBackupExchange(){
        return ExchangeBuilder.fanoutExchange(ExchangeUtil.BACKUP_EXCHANGE_NAME).build();
    }

    @Bean("backupQueue")
    public Queue getBackupQueue(){
        return QueueBuilder.durable(QueueUtil.BACKUP_QUEUE_NAME).build();
    }

    @Bean("warningQueue")
    public Queue getWarningQueue(){
        return QueueBuilder.durable(QueueUtil.WARNING_QUEUE_NAME).build();
    }

    @Bean
    public Binding backupQueueBindingBackupExchange(
            @Qualifier("backupQueue") Queue backupQueue,
            @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    @Bean
    public Binding warningQueueBindingBackupExchange(
            @Qualifier("warningQueue") Queue warningQueue,
            @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }
}

3、消费者

/**
 * 发布确认模式队列消费者
 *
 * @Author darren
 * @Date 2023/3/21 22:49
 */
@Component
@Slf4j
public class ConfirmQueueConsumer {

    @RabbitListener(queues = QueueUtil.CONFIRM_QUEUE_NAME)
    public void receiveConfirmQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("消费者收到队列:{} 的消息:{}", QueueUtil.CONFIRM_QUEUE_NAME, msg);
    }

    @RabbitListener(queues = QueueUtil.BACKUP_QUEUE_NAME)
    public void receiveWarningQueueMessage(Message message) {
        String msg = new String(message.getBody());
        log.info("消费者收到队列:{} 的消息:{}", QueueUtil.BACKUP_QUEUE_NAME, msg);
    }
}

4、生产者


/**
 * 发布确认模式-生产者
 * @Author darren
 * @Date 2023/3/21 22:12
 */
@RestController
@Slf4j
@RequestMapping("/confirmExchange")
public class ConfirmExchangeController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发布消息
     *
     * 发到交换机正确的路由可以消费成功
     * 发到交换机错误的路由无法消费,但交换机可以收到消息,因无法路由所以到达不了队列
     * @param message
     * @return
     */
    @GetMapping("/sendMessage/{message}")
    public String sendMessage(@PathVariable String message) {

        log.info("发送消息:{} 到交换机:{} ",message, ExchangeUtil.CONFIRM_EXCHANGE_NAME);
        rabbitTemplate.convertAndSend(
                ExchangeUtil.CONFIRM_EXCHANGE_NAME,
                RoutingKeyUtil.CONFIRM_ROUTING_KEY,
                message,
                CorrelationDataUtil.getCorrelationData());

        rabbitTemplate.convertAndSend(
                ExchangeUtil.CONFIRM_EXCHANGE_NAME,
                RoutingKeyUtil.CONFIRM_ERROR_ROUTING_KEY,
                message,
                CorrelationDataUtil.getCorrelationData());
        return "发布确认模式发送消息成功";
    }

}

5、结果发现 returnedMessage()没有回调

发送消息:heheh 到交换机:confirm.exchange 
UUID为:dec7a8b8-eadb-43ba-b6e3-fe1d94f28bce
UUID为:83611012-9de2-4d52-8e6d-03baf0bc1e44
交换机确认回调方法已经收到id为:dec7a8b8-eadb-43ba-b6e3-fe1d94f28bce 的消息
交换机确认回调方法已经收到id为:83611012-9de2-4d52-8e6d-03baf0bc1e44 的消息
消费者收到队列:warning.queue 的消息:heheh
消费者收到队列:confirm.queue 的消息:heheh
消费者收到队列:backup.queue 的消息:heheh

4: 如果是发送到延迟交换机则回调此方法

 

1、声明延迟交换机和队列并绑定


/**
 * 3-延时消息插件
 *
 * 原理:
 *  Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制。
 *  接收到消息后并不会立即将消息投递至目标队列,而是存储在mnesia table(一个分布式数据库)中,
 *  然后检测消息延迟时间,如果达到可投递时间( 过期时间 )后,将其通过 x-delayed-type
 *  类型标记的交换机投递到目标队列中。
 * @Author darren
 * @Date 2023/3/21 20:56
 */
@Configuration
public class DelayedExchangeConfig {

    /**
     * 自定义交换机-延迟交换机
     * @return
     */
    @Bean("delayedExchange")
    public CustomExchange delayedExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(
                ExchangeUtil.DELAYED_EXCHANGE_NAME,
                "x-delayed-message", true, false, args);
    }

    /**
     * 普通队列
     *
     * @return
     */
    @Bean("delayedQueue")
    public Queue delayedQueue(){
        return QueueBuilder.durable(QueueUtil.DELAYED_QUEUE_NAME).build();
    }

    /**
     * 队列绑定延时交换机
     * @param delayedQueue
     * @param delayedExchange
     * @return
     */
    @Bean
    public Binding delayedQueueBindingDelayedExchange(
            @Qualifier("delayedQueue") Queue delayedQueue,
            @Qualifier("delayedExchange") CustomExchange delayedExchange ){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange)
                .with(RoutingKeyUtil.DELAYED_ROUTING_KEY).noargs();
    }
}

2、消费者


/**
 * 延迟交换机队列消费者
 *
 * @Author darren
 * @Date 2023/3/23 16:25
 */
@Component
@Slf4j
public class DelayedQueueConsumer {
    @RabbitListener(queues = QueueUtil.DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("消费者收到队列:{} 的消息:{}", QueueUtil.DELAYED_QUEUE_NAME, msg);
    }
}

3、生产者

/**
 * 延时交换机-生产者
 *
 * @Author darren
 * @Date 2023/3/21 17:20
 */
@RestController
@Slf4j
@RequestMapping("/delayedExchange")
public class DelayedExchangeController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送到延时交换机
     * 消息在交换机中具有按时间排序的功能
     * @return
     */
    @GetMapping("/sendMessage/{message}/{delayTime}")
    public String sendMessage(@PathVariable String message,
            @PathVariable Integer delayTime) {
        rabbitTemplate.convertAndSend(
                ExchangeUtil.DELAYED_EXCHANGE_NAME,
                RoutingKeyUtil.DELAYED_ROUTING_KEY,
                message,
                messagePostProcessor -> {
                    messagePostProcessor.getMessageProperties().setDelay(delayTime);
                    return messagePostProcessor;
                },
                CorrelationDataUtil.getCorrelationData());
        log.info("发送一条延迟:{} 毫秒的消息:{} 给交换机:{}",
                delayTime, message, ExchangeUtil.DELAYED_EXCHANGE_NAME);
        return "发送延时交换机的消息成功";
    }
}

4、returnedMessage()方法

    /**
     * 交换机返回消息的方法-消息未送达队列触发回调
     * 常用于交换机无法路由回退消息
     * 如果交换机绑定了备用交换机则是路由到备用交换机,此方法不回调。
     *
     * @param message the returned message.
     * @param replyCode the 回复 code.
     * @param replyText the 回复 text.
     * @param exchange the exchange.
     * @param routingKey the routing key.
     */
    @Override
    public void returnedMessage(final Message message, final int replyCode, final String replyText,
            final String exchange,
            final String routingKey) {
        // 排除调延迟交换机,因为消息在延迟交换机中延迟,并未送达到队列则出发了此函数回调
        //if (!ExchangeUtil.DELAYED_EXCHANGE_NAME.equals(exchange)) {
            log.info("交换机返回消息的方法收到的消息:{} 交换机回复的内容:{}, 交换机是:{}, 路由 key:{}",
                    new String(message.getBody()),replyText, exchange, routingKey);
        //}
    }

5、结果,发现延迟交换机会回调returnedMessage()方法。所以如果有延迟队列则要排除掉。

UUID为:e8b70810-852b-400c-9fe0-bffdf1a5247d
发送一条延迟:10000 毫秒的消息:haha 给交换机:delayed.exchange
交换机返回消息的方法收到的消息:haha 交换机回复的内容:NO_ROUTE, 交换机是:delayed.exchange, 路由 key:delayed.routing.key
交换机确认回调方法已经收到id为:e8b70810-852b-400c-9fe0-bffdf1a5247d 的消息
消费者收到队列:delayed.queue 的消息:haha

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/4535.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

fwdiary(8) 区间dp,树形dp 记忆化搜索

1.划分大理石&#xff08;背包问题&#xff09; 多重背包问题&#xff0c;本题数据用二进制优化即可。 从6种大理石&#xff0c;每种有ai个&#xff0c;恰好选出重量为j&#xff0c;最终答案是 重量为总重/2 #include <cstring> #include <iostream> #include &l…

SpringBoot整合Flink(施耐德PLC物联网信息采集)

SpringBoot整合Flink&#xff08;施耐德PLC物联网信息采集&#xff09;Linux环境安装kafka前情&#xff1a;施耐德PLC设备&#xff08;TM200C16R&#xff09;设置好信息采集程序&#xff0c;连接局域网&#xff0c;SpringBoot订阅MQTT主题&#xff0c;消息转至kafka&#xff0c…

Wing IDE 解决鼠标悬浮

Wing IDE 解决鼠标悬浮 通过修改文件配置&#xff0c;解决鼠标悬浮没有出现变量值和函数没有自动提示的问题。 配置文件路径查看&#xff1a; 打开该文件夹下的下图配置文件&#xff1a; 添加下图两行配置&#xff0c;然后重启wingide即可。 Wing IDE 常用快捷键 调节字…

人工智能、深度学习和机器学习有哪些区别?

很多人可能不明白『机器学习』、『AI&#xff08;人工智能&#xff09;』和『深度学习』之间的区别。这些都是现代数据技术应用中的重要关键字&#xff0c;但由于它们很相似&#xff0c;因此极易混淆。但是为了将 AI 引入日常工作中&#xff0c;正确理解这三个关键字的范围很重…

English Learning - L2 第 9 次小组纠音 辅音 [s] [z] [ʃ] [ʒ] [h] [ʧ] [ʤ] 2023.3.25 周六

English Learning - L2 第 9 次小组纠音 辅音 [s] [z] [ʃ] [ʒ] [h] [ʧ] [ʤ] 2023.3.25 周六共性问题babies /ˈbeɪbɪz/Zoo /zuː/jazz /ʤz/reads /riːdz/slowly /ˈsləʊli/shop /ʃɒp/delicious /dɪˈlɪʃəs/usually /ˈjuːʒʊəlɪ/whole /həʊl/help /help/…

NDK FFmpeg音视频播放器五

NDK前期基础知识终于学完了&#xff0c;现在开始进入项目实战学习&#xff0c;通过FFmpeg实现一个简单的音视频播放器。 音视频一二三四节已经实现了音视频播放和解决内存泄漏问题&#xff0c;本节主要是完成音视频同步问题。 本节内容如下&#xff1a; 1.音视频同步画图分析…

深入学习JavaScript系列(三)——this

本篇为此系列第三篇&#xff0c;本系列文章会在后续学习后持续更新。 第一篇&#xff1a;#深入学习JavaScript系列&#xff08;一&#xff09;—— ES6中的JS执行上下文 第二篇&#xff1a;# 深入学习JavaScript系列&#xff08;二&#xff09;——作用域和作用域链 第三篇&…

JWT基础教程

JWT 目标 JWT 实现无状态 Web 服务【掌握】 nimbus-jose-jwt 库【重点】 token续期【重点】 一、 JWT 实现无状态 Web 服务 1、什么是有状态 有状态服务&#xff0c;即服务端需要记录每次会话的客户端信息&#xff0c;从而识别客户端身份&#xff0c;根据用户身份进行请…

注意力汇聚 笔记

10.2. 注意力汇聚&#xff1a;Nadaraya-Watson 核回归 — 动手学深度学习 2.0.0 documentation 要是有错请指正 想了半天这个玩意才搞懂一点点, 1.设计出一个函数 给予真实的连续x 得出真实连续的y 2.随机生成用于测试的一批不连续的 离散的x 并将测试x放进函数中并加入一…

IO进程线程-标准IO(结)

目录 1.思维导图 2.笔记 3.作业 3.1题 3.2题 1.思维导图 2.笔记 有道云笔记 3.作业 3.1题 计算文件行数 // 使用fputs fgets // 要求拷贝一个文件&#xff0c;例如将1.c中的内容拷贝到2.c中 // 要求计算一个文件的大小。#include <stdio.h> #include <string.…

python条件语句与循环语句

目录 一、条件语句 1.1if 二、循环语句 2.1while 2.2for循环 2.3break和continue 三、test和总结 一、条件语句 1.1if Python条件语句是通过一条或多条语句的执行结果&#xff08;True或者False&#xff09;来决定执行的代码块。 Python程序语言指定&#xff1a; 任…

TCC真没这么简单,一文讲透|分布式事务系列(三)

本文从两个场景说起&#xff0c;详细描述了TCC的详细过程&#xff0c;以及对比2PC有什么区别&#xff0c;适用什么样的场景。点击上方“后端开发技术”&#xff0c;选择“设为星标” &#xff0c;优质资源及时送达在面试前复习 TCC 的时候你是不是这样做的&#xff1a;百度TCC关…

Java基础 -- 关键字Static和Final

Java基础 -- 关键字Static和Final1. Static1.1 修饰成员变量1.2 修饰方法1.3 代码块1.3.1 代码块011.3.2 代码块022. Final2.1 初始化2.2 思考3. 类属性值的Null和非空判断4. Awakening1. Static java static关键字可以用在变量、方法、代码块和嵌套类上 1.静态变量 2.静态方法…

docker-compose部署rabbitmq集群

1、集群分类 RabbitMQ的是基于Erlang语言编写&#xff0c;而Erlang又是一个面向并发的语言&#xff0c;天然支持集群模式。 RabbitMQ的集群以下分类&#xff1a; 标准集群&#xff1a;是一种分布式集群&#xff0c;将队列分散到集群的各个节点&#xff0c;从而提高整个集群的并…

解决 Git 错误 error: failed to push some refs to ‘https://*****.git‘

1. 错误描述 当在 git 上创建好仓库后在上传时出现 ! [rejected] main -> main (fetch first)&#xff0c;error: failed to push some refs to *****。 2. 产生错误的原因 我们在创建仓库的时候&#xff0c;都会勾选 添加 README 文件&#xff0c;这个操作自动创建了一个 …

春分-面试

青岛 zc&#xff1a; 1.String的类型 string、stringbuilder 、stringbuffer&#xff1f; String不可变、另外两个可变、StringBuilder线程不安全、但是效率高、并且String不能被继承。 JVM是C写的编译后的机器码&#xff0c; 2.集合类的用法&#xff0c;还问了键值对。key如果…

LeetCode:242. 有效的字母异位词

&#x1f34e;道阻且长&#xff0c;行则将至。&#x1f353; &#x1f33b;算法&#xff0c;不如说它是一种思考方式&#x1f340;算法专栏&#xff1a; &#x1f449;&#x1f3fb;123 文章目录一、&#x1f331;[242. 有效的字母异位词](https://leetcode.cn/problems/valid-…

【Autoware规控】Lattice规划节点

文章目录1. Lattice规划介绍2. 相关代码1. Lattice规划介绍 Lattice Planner 是一种基于栅格地图的规划算法&#xff0c;通过搜索和优化实现路径规划的目的。Lattice Planner 的核心思想是将路径规划问题转化为一系列离散化的决策问题&#xff0c;通过搜索和优化得到最优路径&…

CentOS挂载U盘拷贝文件

1.登录linux操作系统&#xff0c;将U盘插入主机 2.新建一个目录将U盘挂载到该目录 使用命令: mkdir /mnt/usb 3.查看可用的挂载点 使用命令&#xff1a; fdisk -l 4. 将U盘挂载到刚才建立的目录下 使用命令: mount /dev/sdb4 /mnt/usb 5.查看U盘识别情况 使用命令 &#x…

【基础算法】1-2:归并排序

归并排序 OVERVIEW归并排序1.归并排序&#xff08;1&#xff09;基本思想&#xff08;2&#xff09;归并排序特性2.归并排序模板3.归并排序练习&#xff08;1&#xff09;AcWing787.归并排序&#xff08;2&#xff09;AcWing788.逆序对的数量1.归并排序 &#xff08;1&#xf…
最新文章