RocketMQ发送和接收方式详解

RocketMQ有几种发送方式

RocketMQ 提供了几种不同的消息发送方式,以满足不同场景下的需求。这些发送方式主要包括:

同步发送(Synchronous):这是最常见的一种发送方式,客户端发送消息后,会等待服务器端的响应。只有当消息完全被服务器接收,客户端才会继续执行。这种方式适用于对可靠性要求较高的场景。

异步发送(Asynchronous):客户端在发送消息后不会立即等待服务器的响应,而是通过一个回调接口来处理服务器的响应,这样可以提高消息发送的吞吐量。这种方式适用于对响应时间要求较敏感的场景。

单向发送(One-way):这种方式是最轻量级的一种发送方式,客户端只负责发送消息,不等待服务器的响应,也不关心消息是否被服务器接收。这种方式的可靠性最低,但吞吐量最高,适用于对可靠性要求不高的日志收集等场景。

顺序发送(Ordered):RocketMQ 还支持顺序发送消息,确保特定主题的消息能按照发送的顺序被消费。这对于需要保证消息顺序的业务场景(如订单处理)非常重要。**顺序必须是同步的**

这些发送方式可以根据具体的业务需求和场景来选择使用,以达到最佳的性能和可靠性平衡。

不同发送方式的调用方式或者不同配置

在使用 RocketMQ 进行消息发送时,可以根据不同的发送需求选择合适的发送方式。以下是不同发送方式的基本调用方式或配置示例:

1. 同步发送(Synchronous)
同步发送是指消息发送方将消息发送给服务器后,会等待服务器的响应,确认消息已经被接收后才继续执行后续操作。

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();

// 创建消息实例,指定topic,tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));

// 调用send方法发送消息,并获取发送结果
SendResult sendResult = producer.send(msg);

// 打印发送结果
System.out.printf("%s%n", sendResult);

// 当不再发送消息时,关闭Producer实例
producer.shutdown();

2. 异步发送(Asynchronous)
异步发送是指消息发送方发送消息后,不会立即等待服务器的响应,而是提供一个回调接口,服务器响应时通过回调接口来通知发送方。

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();

// 创建消息实例,指定topic,tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));

// 异步发送消息
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // 消息发送成功的回调
        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    }

    @Override
    public void onException(Throwable e) {
        // 消息发送失败的回调
        System.out.printf("%-10d Exception %s %n", index, e);
        e.printStackTrace();
    }
});

// 关闭Producer实例
producer.shutdown();

3. 单向发送(One-way)
单向发送是指消息发送方只负责发送消息,不等待服务器的响应也不关心消息是否被服务器接收,适用于对可靠性要求不高的场景。

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();

// 创建消息实例,指定topic,tag和消息体
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));

// 单向发送消息,没有返回结果
producer.sendOneway(msg);

// 关闭Producer实例
producer.shutdown();

4. 顺序发送(Ordered)
顺序发送保证了特定主题的消息能按照发送的顺序来消费。

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 设置NameServer地址
producer.setNamesrvAddr("nameserver1:9876");
// 启动Producer实例
producer.start();

String[] tags = new String[]{"TagA", "TagB", "TagC"};
// 订单列表
List<OrderStep> orderList = new ArrayList<OrderStep>();
orderList.add(new OrderStep(15103111039L, "创建"));
orderList.add(new OrderStep(15103111065L, "支付"));
orderList.add(new OrderStep(15103111039L, "完成"));

for (int i = 0; i < 10; i++) {
    // 加入订单的时间戳来模拟简单的订单ID
    String body = orderList.get(i % orderList.size()) + " 时间:" + System.currentTimeMillis();
    Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            Long id = (Long) arg;  // 根据订单id选择发送queue
            long index = id % mqs.size();
            return mqs.get((int) index);
        }
    }, orderList.get(i % orderList.size()).getOrderId());  // 订单id

    System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
            sendResult.getSendStatus(),
            sendResult.getMessageQueue().getQueueId(),
            body));
}

// 关闭Producer实例
producer.shutdown();

每种发送方式都有其适用的场景,可以根据实际的业务需求选择使用。在实际开发中,需要根据具体的业务逻辑对示例代码进行适当的调整和优化。

不同接收方式的调用方式或者不同配置

1. 集群消费(Clustering)
集群消费模式下,同一个消费者组中的多个消费者实例共同消费主题下的消息,每条消息只会被消费一次。

// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
// 设置NameServer地址
consumer.setNamesrvAddr("nameserver1:9876");
// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 消费消息
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
        }
        // 标记该消息已经被成功消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 启动消费者实例
consumer.start();

2. 广播消费(Broadcasting)
广播消费模式下,消息会被消费者组中的每个消费者都消费一次。

// 实例化消费者,并设置消费模式为广播模式
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setNamesrvAddr("nameserver1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

3. 顺序消费(Orderly)
顺序消费保证了特定主题的消息能按照发送的顺序来消费。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("nameserver1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.printf("%s Receive New Messages: %s, QueueId: %d %n", Thread.currentThread().getName(), new String(msg.getBody()), msg.getQueueId());
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
consumer.start();

4. 延迟消费
延迟消费不是通过消费者的特定设置来实现的,而是在发送消息时设置消息的延迟等级。

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("nameserver1:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes());
// 设置延迟等级3,这会使消息延迟10s再被消费
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
producer.shutdown();

接收延迟消息的代码与普通消息的接收方式相同,不需要特殊配置。

5. 重试和死信队列
对于处理失败的消息,RocketMQ 会自动重试,无需特别配置。如果重试次数达到上限仍然失败,消息会被转移到死信队列。消费死信队列中的消息需要订阅特定的Topic(%DLQ%+消费者组名)。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("nameserver1:9876");
// 订阅死信队列
consumer.subscribe("%DLQ%ConsumerGroupName", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.printf("DLQ Message: %s %n", new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

不同的接收方式适用于不同的业务场景,可以根据实际需求选择最合适的方式。在实际应用中,可能需要结合业务逻辑对示例代码进行适当的调整和优化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/274089.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

<JavaEE> 了解网络层协议 -- IP协议

目录 初识IP协议 什么是IP协议&#xff1f; IP协议中的基础概念 IP协议格式 图示 4bit版本号&#xff08;version&#xff09; 4bit头部长度&#xff08;headerlength&#xff09; 8bit服务类型&#xff08;TypeOfService&#xff09; 16bit总长度&#xff08;total l…

【蓝桥杯每日一题】填充颜色超详细解释!!!

为了让蓝桥杯不变成蓝桥悲&#xff0c;我决定在舒适的周日再来一道题。 例&#xff1a; 输入&#xff1a; 6 0 0 0 0 0 0 0 0 1 1 1 1 0 1 1 0 0 1 1 1 0 0 0 1 1 0 0 0 0 1 1 1 1 1 1 1 输出&#xff1a; 0 0 0 0 0 0 0 0 1 1 1 1 0 1 1 2 2 1 1 1 2 2 2 1 1 2 2 2 2 1 1…

AWS监控,AWS 性能监控工具

监控云部署的性能是 IT 环境正常运行的内在条件。AWS 云是一个架构良好的框架&#xff0c;管理员可以使用专用的AWS 性能监控工具增强服务的功能。执行AWS监视是为了跟踪在AWS环境中积极运行的应用程序工作负载和资源。AWS监视器跟踪各种AWS云指标&#xff0c;以帮助提高在其上…

【日常记录】【插件】使用ColorThief,跟随图片变化改变网页背景

文章目录 1、效果图2、ColorThief3、实现4、参考链接 1、效果图 想要实现,界面的背景颜色,跟随图片的 颜色来进行展示, 2、ColorThief 要想实现跟随图片变化实现网页背景渐变效果&#xff0c;则需要获取图片的主要颜色&#xff0c;可以使用ColorThief库来获取图片的颜色 需要注…

JDK1.8超详细安装教程

1、下载jdk1.8 大家可以直接去百度云盘下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/187N6CU9Gu4bjtOz5_cjd-A?pwd3535 提取码&#xff1a;35352、开始安装 双击下载好的.exe文件&#xff0c;点击下一步 修改安装路径&#xff0c;点击下一步 会顺带安装jre…

Json Web Token(JWT) 快速入门

推荐视频&#xff1a;【从零开始掌握JWT】 目录 第一章 会话跟踪 01 使用Cookie和Session&#xff0c;jsessionid 02 使用token 例子一&#xff1a;自定义token 例子二&#xff1a;使用redis存储token 第一章 会话跟踪 应用背景 &#xff1a;浏览器访问web应用&#xff…

Android 13 源码编译及报错修复

下载AOSP指定分支 repo init -u git://aosp../platform/manifest -b android-13.0.0_r83 同步代码到本地 repo sync -c 初始化编译环境, 选择构建目标 source build/envsetup.sh lunch 选择需要构建的目标&#xff0c;此处以aosp_arm64-eng为例 进行固件编译 make -j12 期间编译…

【C++庖丁解牛】继承的概念及定义 | 继承中的作用域 | 继承与友元继承与静态成员 | 复杂的菱形继承及菱形虚拟继承

&#x1f341;你好&#xff0c;我是 RO-BERRY &#x1f4d7; 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f384;感谢你的陪伴与支持 &#xff0c;故事既有了开头&#xff0c;就要画上一个完美的句号&#xff0c;让我们一起加油 目录 1.继承的概念及定义1.1继…

Ubuntu双系统/home分区扩容

一、Windows系统中利用磁盘管理分出空闲区域&#xff0c;如果多就多分一些 二、插入安装Ubuntu的U盘启动盘&#xff0c;lenovo电脑F12&#xff08;其他电脑可选择其他类似方式&#xff09;选择U盘启动项&#xff0c;然后选择ubuntu&#xff0c;出现安装界面&#xff0c;再选择t…

clipboard好用的复制剪切库

clipboard是现代复制到剪贴板的工具&#xff0c;其 gzip 压缩后只有 3kb&#xff0c;能够减少选择文本的重复操作&#xff0c;点击按钮就可以复制指定内容&#xff0c;支持原生HTMLjs&#xff0c;vue3和vue2。使用方法参照官方文档&#xff0c;so easy&#xff01;&#xff01;…

springcloud gateway

一、 predicate : 就是你定义一些规则&#xff0c;如果满足了这些规则&#xff0c;就去找到对应的路由。 对于strip 二、自定义过略器和全局过滤器 约定大于配置&#xff0c;后缀不变&#xff0c;只改前缀 sentinel持久化 三、sentinel quick-start | Sentinel 信号量虽然简…

RPC 和 序列化

RPC 1 RPC调用流程 1.1 clerk客户端调用远程服务 Clerk::PutAppend() raftServerRpcUtil::PutAppend() raftServerRpcUtil是client与kvserver通信的入口&#xff0c; 包含kvserver功能的一对一映射&#xff1a;Get/PutAppend&#xff0c;通过stub对象——raftKVRpcProctoc:…

【系统架构师】-第19章-大数据架构设计理论与实践

四个特点&#xff1a; 大规模&#xff08;Volume&#xff09;、高速度&#xff08;Velocity&#xff09;和多样化&#xff08;Variety&#xff09;&#xff0c;价值&#xff08;Value&#xff09;。 五个问题&#xff1a; 异构性&#xff08;Heterogeneity&#xff09;、规模…

STP环路避免实验(思科)

华为设备参考&#xff1a;STP环路避免实验&#xff08;华为&#xff09; 一&#xff0c;技术简介 Spanning Tree Protocol&#xff08;STP&#xff09;&#xff0c;即生成树协议&#xff0c;是一种数据链路层协议。主要作用是防止二层环路&#xff0c;并自适应网络变化和故障…

代码随想录day20(2)二叉树:完全二叉树节点个数(leetcode222)

题目要求&#xff1a;求一个完全二叉树的节点个数 思路&#xff1a;首先完全二叉树可以用普通二叉树的方法来求&#xff0c;但是需要遍历所有的节点。 但是对于完全二叉树来说&#xff0c;只有最底层右侧的节点可能没满&#xff0c;其余每层节点都达到了最大值。所以我们可以…

Spring启动“--”设置参数没生效

现象 在idea中启动SpringBoot项目时&#xff0c;使用“--”设置的启动参数没有生效&#xff0c;如修改端口号“--server.port8082” 原因 排查发现是因为在使用SpringApplication.run启动项目时&#xff0c;没有将args参数传入run方法。 修复方案 SpringApplication.run参数中…

想要通过湖北建筑安全员ABC考试?这5个技巧助你一臂之力!

想要通过湖北建筑安全员ABC考试&#xff1f;这5个技巧助你一臂之力&#xff01; 2024年湖北建筑安全员ABC报名考试通过率 关于湖北省建筑安管人员考核管理系统考核通过率不是很固定&#xff0c;或高或低。安全员ABC测试有合格分数线&#xff0c;交卷后30分钟即可查询你的成绩…

RSA加密解密签名加签验签RsaUtils工具类

RSA加密解密RsaUtils工具类题 引言一、RsaUtils工具类代码二、优点三、缺点四、声明 引言 RSA算法基于大数因子分解难题&#xff0c;提供了公钥加密和私钥解密的能力。公钥用于加密&#xff0c;私钥则负责解密。这种特性使得RSA成为保证数据传输安全的理想选择。 公钥加密私钥…

106 基于消息队列来做 mysql 大数据表数据的遍历处理

前言 最近有这样的一个需求, 我们存在一张 很大的 mysql 数据表, 数据量大概是在 六百万左右 然后 需要获取所有的记录, 将数据传输到 es 中 然后 当时 我就写了一个脚本来读取 这张大表, 然后 分页获取数据, 然后 按页进行数据处理 转换到 es 但是存在的问题是, 前面 还…

尚硅谷SpringBoot3笔记 (二) Web开发

Spring Boot Web开发&#xff1a;24.Web开发-自动配置原理_哔哩哔哩_bilibili 1. Web场景 1.1 自动配置 整合web场景&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId&g…
最新文章