SpringAMQP

什么是SpringAMQP

        

官方网址

官方文档https://spring.io/projects/spring-amqp

Base Queue 简单队列模型

 对于生产者

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
spring:
  rabbitmq:
    port: 5672
    host: 8.130.89.67
    virtual-host: /
    username: itcast
    password: 123
    @Autowired
    private RabbitTemplate template;
    @Test
    public void testSimpleQueue(){
        String queueName="simple.queue";
        String message="hello spring ampq";
        template.convertAndSend(queueName,message);
    }

对于消费者

依赖已经在父工程中到过了

配置和生产者的一样,粘贴过来就行

新建一个类

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg){
        System.out.println("spring 消费者接收到消息 :【" + msg + "】");
    }
}

启动项目就可以消费消息了

因为是消息队列,所以先生产的消息就先被消费。先进先出。

rabbit没有消息回溯功能,一旦被消费就不可逆。

Work Queue 工作队列模型

可以提高消息处理速度,避免队列消息堆积。

案例

 生产者

    @Test
    public void testWorkQueue() throws InterruptedException {
        String queueName="simple.queue";
        String message="hello , message_";
        for(int i=1;i<=50;i++){
            template.convertAndSend(queueName,message+i);
            Thread.sleep(20);
        }
    }

 消费者

配置文件

spring:
  rabbitmq:
    port: 5672
    host: 8.130.89.67
    virtual-host: /
    username: itcast
    password: 123
    listener:
      simple:
        prefetch: 1    # 每次只能获取一条消息,处理完成才能获取下一个消息
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage1(String msg) throws InterruptedException {
        System.out.println("spring 消费者1接收到消息 :【" + msg + "】"+ LocalTime.now());
        Thread.sleep(20);
    }
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
        System.err.println("spring 消费者2接收到消息 :【" + msg + "】"+LocalTime.now());
        Thread.sleep(200);
    }

启动消费者项目,

 可以看到消息的处理时按照生产顺序来的,先进先出。
 

多个消费者绑定到一个队列,同一条消息只会被一个消费者处理

通过配置prefetch来控制消费者预取的消息数量

发布、订阅模型-Fanout

允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue。

 消费者

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    @Bean
    public FanoutExchange fanoutExchange(){
        return  new FanoutExchange("itcast.fanout");
    }
    @Bean
    public Queue fanoutQueue1(){
        return  new Queue("fanout.queue1");
    }
    @Bean
    public Queue fanoutQueue2(){
        return  new Queue("fanout.queue2");
    }
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) throws InterruptedException {
        System.out.println("spring 消费者1接收到消息 :【" + msg + "】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) throws InterruptedException {
        System.out.println("spring 消费者1接收到消息 :【" + msg + "】");
    }

发布者

    @Test
    public void testFanoutExchange() {
        String exchangeName="itcast.fanout";    //对应消费者的交换机名字
        String message="hello , everyBody";
        template.convertAndSend(exchangeName,"",message);
    }

 

 

交换机的作用

        接受发布者发布的消息

        将消息按照规则路由到与之绑定的队列

        不能缓存消息,路由失败,消息丢失 

发布、订阅模型-Direct

每一个Queue都与Exchange设置一个BindingKey

发布者发送消息时,指定消息的RoutingKey

Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

 消费者

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct queue1"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("spring 消费者1接收到消息 :【" + msg + "】");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct queue2"),
            exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
            key = {"red","yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("spring 消费者2接收到消息 :【" + msg + "】");
    }

生产者

    @Test
    public void testDirectExchange() {
        String exchangeName="itcast.direct";
        String message="hello , red";
        template.convertAndSend(exchangeName,"red",message);
    }

差异

Direct交换机和Fanout交换机的差异

        Fanout将消息路由给每一个与之绑定的队列

        Direct交换机根据RoutingKey判断路由给哪一个队列

        如果多个队列的RoutingKey相等,则和Fanout功能类似

核心

        @Queue        @Exchange

发布、订阅模型-Topic

TopicExchange与DirectExchange类似,区别在于routingKey,Queue与Exchange指定BindingKey时可以使用通配符:

# :代指0个或多个单词

* :代指一个单词

消费者

 

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic queue1"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key = {"china.#"}
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("spring 消费者1接收到消息 :【" + msg + "】");
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic queue2"),
            exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
            key = {"#.news"}
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("spring 消费者2接收到消息 :【" + msg + "】");
    }

生产者

    @Test
    public void testTopicExchange() {
        String exchangeName="itcast.topic";
        String message="你看到了这句话";
        template.convertAndSend(exchangeName,"Chain.news",message);
    }

差异

Direct交换机与Topic交换机的差异

Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割

Topic交换机与队列绑定时的bindingKey可以指定通配符

消息转换器

在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。

生产者

    @Test
    public void testSimpleQueue(){
        String queueName="simple.queue";
        Map<String ,Object> map=new HashMap<>();
        map.put("name","angelababy");
        map.put("sex","woman");
        template.convertAndSend(queueName,map);
    }

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。 如果要修改只需要定义一个MessageConverter 类型的Bean即可。推荐用JSON方式序列化,

父项目导入依赖

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

启动类中声明Bean也可以书写配置类,都一样

    @Bean
    public MessageConverter jsonMessageConverter(){
        return  new Jackson2JsonMessageConverter();
    }

消费者

也是需要声明Bean和生产者一样

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(Map<String ,Object> msg){
        System.out.println("spring 消费者接收到消息 :【" + msg + "】");
    }

需要注意的就是接受消息的参数数据类型修改成Map的

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/9403.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

实验6 TensorFlow基础

1. 实验目的 掌握TensorFlow低阶API&#xff0c;能够运用TensorFlow处理数据以及对数据进行运算。 2.实验内容 ①实现张量维度变换&#xff0c;部分采样等&#xff1b; ②实现张量加减乘除、幂指对数运算&#xff1b; ③利用TensorFlow对数据集进行处理。 3.实验过程 题目…

亚马逊测评只能下单上好评?卖家倾向养号测评还有这些骚操作

亚马逊测评这对于绝大部分亚马逊卖家来说都不陌生&#xff0c;如今的亚马逊市场也很多卖家都在用测评科技来打造爆款。不过很多对于亚马逊测评的认知只停留在简单的刷销量&#xff0c;上好评。殊不知亚马逊养号测评还有其它强大的骚操作。 亚马逊自养号测评哪些功能呢&#xf…

Drone+Gitea CICD环境搭建流程笔记

之前没有用过drone&#xff0c;现在公司有用到&#xff0c;看drone.yml配置文件有很多没搞清楚的地方&#xff0c;所以打算自己走一遍配置流程&#xff0c;理清一些概念&#xff0c;这里记下笔记。 为了方便&#xff0c;drone&#xff0c;gitea以及相关软件都是用docker的版本…

Git(四):远程仓库的搭建、获取与更新

目录 1、搭建远程仓库 1.1 github 创建 Repository 2、获取远程仓库 2.1 克隆现有的仓库 2.2 在现有目录中初始化仓库 3、更新提加到仓库 3.1 记录每次更新到仓库 3.2 检查当前文件状态 3.3 跟踪新文件 3.3.1 查看跟踪的文件 3.4 暂存已修改文件 3.5 状态简览 3.6…

redis基础总结-常用命令

redis常用指令3. 常用指令3.1 key 操作分析3.1.1 key应该设计哪些操作&#xff1f;3.1.2 key 基本操作3.1.3 key 扩展操作&#xff08;时效性控制&#xff09;3.1.4 key 扩展操作&#xff08;查询模式&#xff09;3.2 数据库指令3.2.1 key 的重复问题3.2.2 解决方案3.2.3 数据库…

初识C语言 ——“C Primer Plus”

各位CSDN的uu们你们好呀&#xff0c;今天&#xff0c;小雅兰的内容是读一本好书&#xff0c;这一本书的名字就叫做《C Primer Plus》&#xff0c;那么&#xff0c;又回到了我们的初识C语言阶段啦&#xff0c;保证零基础都能看懂噢&#xff0c;下面&#xff0c;让我们进入C语言的…

TOGAF—架构治理

本章为架构治理提供了框架和指南。 3.1 引言 本节介绍治理的性质和治理级别。 3.1.1 企业内部的治理层次 架构治理是管理企业架构和其他架构的实践和方向 并在企业范围内进行控制。 架构治理通常不是孤立运行的&#xff0c;而是在治理结构的层次结构中运行的&#xff0c;它…

leaflet使用L.geoJSON加载文件,参数onEachFeature的使用方法(129)

第129个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+leaflet中加载geojson文件,这里介绍onEachFeature的使用方法。onEachFeature 选项是在将每个功能添加到GeoJSON图层之前调用的功能。使用此选项通常是为了点击某个功能时可以附加弹出窗口。 直接复制下面的 vue+le…

redis set list

Listlist: 插入命令&#xff1a;lpush / rpush 查看list列表所有数据(-1 表示最后一个)&#xff1a;lrange key 0 -1 查看列表长度(key 不存在则长度返回0 ): llen key list长度 获取下表 为 0 的元素 修改下标为0的元素&#xff0c;改为haha 移除列表的第一个元素 或最后一…

一天吃透计算机网络八股文

网络分层结构 计算机网络体系大致分为三种&#xff0c;OSI七层模型、TCP/IP四层模型和五层模型。一般面试的时候考察比较多的是五层模型。最全面的Java面试网站 五层模型&#xff1a;应用层、传输层、网络层、数据链路层、物理层。 应用层&#xff1a;为应用程序提供交互服务…

IDEA2020.1 Failed to execute goal org.codehaus.mojo:exec-maven-plugin

报错内容&#xff1a;Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:exec (default-cli) on project leetcode: Command execution failed. 解决&#xff1a;不要勾选

Sentry安装使用(最全最细)

Sentry安装使用(最全最细&#xff0c;包括解决邮箱发送问题&#xff0c;https上传问题&#xff0c;https访问问题&#xff0c;安装此教程配置即可) ##服务器操作系统为2核8G,CentOS7.9 ##安装Docker-ce yum install -y yum-utils \device-mapper-persistent-data \lvm2yum-c…

你是真的“C”——宏与函数的英雄本色

你是真的“C”——宏定义的精髓&#x1f60e;前言&#x1f64c;一、何为宏&#xff1f;#define 定义宏二、宏和函数的对比&#xff1a; &#x1f60a;三、 #undef总结撒花&#x1f49e;&#x1f60e;博客昵称&#xff1a;博客小梦 &#x1f60a;最喜欢的座右铭&#xff1a;全神…

4月11日作业修订

A.这主要看你互斥锁锁的资源是那部分的&#xff0c;如果是进程内资源&#xff0c;则可以实现同一进程不同线程之间的互斥&#xff0c;而如果将共享内存作为互斥锁进行操作则可以实现不同进程之间的互斥。 B.这是必然的&#xff0c;加锁是为了防止数据的二义性 C.信号量同时使…

【Linux】基础IO_文件操作

环境&#xff1a;centos7.6&#xff0c;腾讯云服务器Linux文章都放在了专栏&#xff1a;【Linux】欢迎支持订阅 相关文章推荐&#xff1a; 【Linux】冯.诺依曼体系结构与操作系统 【C/进阶】如何对文件进行读写&#xff08;含二进制&#xff09;操作&#xff1f; 预备知识 在C…

Docker 部署Jira8.1.0

Jira与Confluence一样&#xff0c;都需要用到独立的数据库&#xff0c;对于数据库的安装我们不做介绍&#xff0c;主要介绍如何用Docker部署Jira以及对Jira进行破解的操作。 1、数据库准备 关于数据库官方文档说明&#xff1a;https://confluence.atlassian.com/adminjiraserv…

憨批的语义分割重制版11——Keras 搭建自己的HRNetV2语义分割平台

憨批的语义分割重制版11——Keras 搭建自己的HRNetV2语义分割平台学习前言什么是HRNetV2模型代码下载HRNetV2实现思路一、预测部分1、主干网络介绍a、Section-1b、Section-2c、Section-3d、Section-42、特征整合部分3、利用特征获得预测结果二、训练部分1、训练文件详解2、LOSS…

stm32 esp01s Qt 巴法云平台控制小灯

最近一直在想着用esp01s和stm32做个控制的东西&#xff0c;现在先把现在做好的一部分写出来&#xff0c;巴法云平台我觉得是一个不错物联网平台&#xff0c;接口文档资料都十分清晰。 这个demo是esp1s和stm32串口通信&#xff0c;然后qt上位机和esp01s是tcp通信 这样就可以实现…

C++模板基础(六)

类模板与成员函数模板 ● 使用 template 关键字引入模板&#xff1a; template class B {…}; – 类模板的声明与定义 翻译单元的一处定义原则 template<typename T> class B; //类模板的声明template<typename T> class B //类模板的定义 {};template<typenam…

故障定级和定责

故障管理的第一步是对故障的理解&#xff0c;只有正确地面对故障&#xff0c;我们才能够找到更合理的处理方式。 这便需要做两个工作&#xff1a;一是跟踪线上故障处理和组织故障复盘&#xff0c;二是制定故障定级定责标准&#xff0c;同时有权对故障做出定级和定责。 所以&a…
最新文章