SpringBoot3 整合Kafka

官网:https://kafka.apache.org/documentation/

消息队列-场景

1. 异步

img

2. 解耦

img

3. 削峰

img

4. 缓冲

img

消息队列-Kafka

1. 消息模式

img

消息发布订阅模式,MessageQueue中的消息不删除,会记录消费者的偏移量

2. Kafka工作原理

img

同一个消费者组里的消费者是队列竞争模式:Consumer1消费Broker-0的news消息,Consumer2消费Broker-1的news消息,Consumer3消费Broker-2的news消息。如果有Consumer4,那他哪个分区都不能消费,就是消费的饥饿问题。

不同消费组中的消费者是发布/订阅模式:Consumer1和Consumer4都能消费0分区(Broker0)。

3. SpringBoot整合

参照:https://docs.spring.io/spring-kafka/docs/current/reference/html/#preface

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置

spring.kafka.bootstrap-servers=172.20.128.1:9092

修改C:\Windows\System32\drivers\etc\hosts文件,配置8.130.32.70 kafka

4. 消息发送

kafkaTemplate发送消息的内容是对象时,需要用json序列化,在配置文件中加上:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

@SpringBootTest
class Boot07KafkaApplicationTests {

    @Autowired
    KafkaTemplate kafkaTemplate;
    @Test
    void contextLoads() throws ExecutionException, InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        CompletableFuture[] futures = new CompletableFuture[10000];
        for (int i = 0; i < 10000; i++) {
            CompletableFuture send = kafkaTemplate.send("order", "order.create."+i, "订单创建了:"+i);
            futures[i]=send;
        }
        CompletableFuture.allOf(futures).join();
        watch.stop();
        System.out.println("总耗时:"+watch.getTotalTimeMillis());
    }
}
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void someMethod() {
        this.kafkaTemplate.send("someTopic", "Hello");
    }

}

5. 消息监听

@Component
public class OrderMsgListener {

    @KafkaListener(topics = "order",groupId = "order-service")
    public void listen(ConsumerRecord record){
        System.out.println("收到消息:"+record); //可以监听到发给kafka的新消息,以前的拿不到
    }

    @KafkaListener(groupId = "order-service-2",topicPartitions = {
            @TopicPartition(topic = "order",partitionOffsets = {
                    @PartitionOffset(partition = "0",initialOffset = "0")
            })
    })
    public void listenAll(ConsumerRecord record){
        System.out.println("收到partion-0消息:"+record);
    }
}

6. 参数配置

消费者

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another

生产者

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false

7. 自动配置原理

kafka 自动配置在KafkaAutoConfiguration

  1. 容器中放了 KafkaTemplate 可以进行消息收发
  2. 容器中放了KafkaAdmin 可以进行 Kafka 的管理,比如创建 topic 等
  3. kafka 的配置在KafkaProperties
  4. @EnableKafka可以开启基于注解的模式

toConfiguration`

  1. 容器中放了 KafkaTemplate 可以进行消息收发
  2. 容器中放了KafkaAdmin 可以进行 Kafka 的管理,比如创建 topic 等
  3. kafka 的配置在KafkaProperties
  4. @EnableKafka可以开启基于注解的模式

image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/272720.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

开发辅助一(网关gateway+ThreadLocal封装用户信息+远程调用+读取配置文件+统一异常处理)

网关gateway模块 ①、配置文件&#xff0c;添加各个服务模块的路由路径 gateway:routes:-id: server-cart #微服务名称uri: lb://service-cart #负责均衡predicates:- Path/api/order/cart/**ThreadLocal ①、定义一个工具类 public class AuthContextUtil{private static…

Zookeeper应用场景有哪些?

ZooKeeper是⼀个典型的发布/订阅模式的分布式数据管理与协调框架&#xff0c;我们可以使⽤它来进⾏分布式数据的发布与订阅。另⼀⽅⾯&#xff0c;通过对ZooKeeper中丰富的数据节点类型进⾏交叉使⽤&#xff0c;配合Watcher事件通知机制&#xff0c;可以⾮常⽅便地构建⼀系列分…

记一次Mac端mysql重置密码

在执行mysql命令的时候&#xff0c;报如下的错误&#xff0c;表示不支持mysql命令&#xff1a; zsh: command not found: mysql 1. 先查看mysql服务是否存在 在系统偏好设置中查看&#xff1a; 2. 发现mysql服务已经在运行&#xff0c;可能因为/usr/local/bin目录下缺失mysq…

How to Clean Text for Machine Learning with Python

NLP 在本教程中&#xff0c;您将了解如何清理和准备文本&#xff0c;以便使用机器学习进行建模。 完成本教程后&#xff0c;您将了解&#xff1a; 如何通过开发自己的非常简单的文本清理工具开始。 如何更上一层楼并使用 NLTK 库中更复杂的方法。 在使用现代文本表示方法&am…

python3遇到Can‘t connect to HTTPS URL because the SSL module is not available.

远程服务器centos7系统上有minicoda3&#xff0c;觉得太占空间&#xff0c;就把整个文件夹删了&#xff0c;原先的Python3也没了&#xff0c;都要重装。 我自己的步骤&#xff1a;进入管理员模式 1.下载Python3的源码&#xff1a; wget https://www.python.org/ftp/python/3.1…

熟悉DHCP面临的安全威胁与防护机制

一个网络如果要正常地运行&#xff0c;则网络中的主机&#xff08;Host&#xff09;必需要知道某些重要的网络参数&#xff0c;如IP地址、网络掩码、网关地址、DNS服务器地址、网络打印机地址等等。显然&#xff0c;在每台主机上都采用手工方式来配置这些参数是非常困难的、或是…

c语言结构体(初阶)

1. 结构体的声明 1.1 结构体的基础知识 结构是一些值的集合&#xff0c;这些值被称为成员变量。结构的每个成员可以是不同类型的变量。 1.2 结构的声明 struct tag {member - list; }variable-list; 例&#xff1a;描述一个人的信息&#xff1a;名字电话性别身高 //声明的…

使用python netmiko模块批量配置Cisco、华为、H3C路由器交换机(支持 telnet 和 ssh 方式)

0. 当前环境 外网电脑Python版本&#xff1a;3.8.5&#xff08;安装后不要删除安装包&#xff0c;以后卸载的时候用这个&#xff09;外网电脑安装netmiko第三方库&#xff1a;cmd中输入pip install netmiko内网电脑环境&#xff1a;无法搭建python环境&#xff0c;需外网电脑完…

怎么实现Servlet的自动加载

在实际开发时&#xff0c;有时候会希望某些Servlet程序可以在Tomcat启动时随即启动。但在默认情况下&#xff0c;第一次访问servlet的时候&#xff0c;才创建servlet对象。 如果servlet构造函数里面的代码或者init方法里面的代码比较多&#xff0c;就会导致用户第一次访问serv…

Chrome插件精选 — 前端工具

Chrome实现同一功能的插件往往有多款产品&#xff0c;逐一去安装试用耗时又费力&#xff0c;在此为某一类型插件挑选出比较好用的一款或几款&#xff0c;尽量满足界面精致、功能齐全、设置选项丰富的使用要求&#xff0c;便于节省一个个去尝试的时间和精力。 1. FeHelper(前端助…

分布式下如何实现统一日志系统?

在业务系统开发中&#xff0c;日志的收集和分析很重要&#xff0c;特别是在进行故障分析时&#xff0c;日志记录得好&#xff0c;可以帮我们快速定位问题原因。在互联网分布式系统下&#xff0c;日志变得越来越分散&#xff0c;数据规模也越来越大&#xff0c;如何更好地收集和…

现代 NLP:详细概述,第 1 部分:transformer

阿比吉特罗伊 一、说明 近五年来&#xff0c;随着 BERT 和 GPT 等思想的引入&#xff0c;我们在自然语言处理领域取得了巨大的成就。在本文中&#xff0c;我们的目标是逐步深入研究改进的细节&#xff0c;并了解它们带来的演变。 二、关注就是你所需要的 2017 年&#xff0c;来…

环保行业的物联网升级:采用钡铼技术R10

随着环境保护意识的增强和可持续发展的迫切需求&#xff0c;物联网技术在环保行业中扮演着越来越重要的角色。为了满足环保监测和数据采集的需求&#xff0c;钡铼技术R10在物联网应用中具有独特的优势。本文将探讨R10的参数和功能&#xff0c;并阐述其在环保行业中的应用前景。…

主流级显卡的新选择,Sparkle(撼与科技)Intel Arc A750兽人体验分享

▼前言 对于玩家而言&#xff0c;英特尔独显的出现不仅打破了NVIDIA与AMD双雄天下的局面&#xff0c;而且旗下的Arc A系列显卡还拥有不俗的做工性能以及颇具优势的价格&#xff0c;无论是升级或者是装新机都非常合适。如果要在Arc A系列当中选一个性能不俗&#xff0c;能够满足…

anaconda prompt进入虚拟环境 打开spyder

目录 1.查看有多少虚拟环境 2.conda create 指令创建新的虚拟环境 3.进入虚拟环境 4.spyder进入虚拟环境 5.退出虚拟环境 6.删除虚拟环境 1.查看有多少虚拟环境 打开anaconda prompt&#xff0c;输入 conda env list 2.conda create 指令创建新的虚拟环境 conda cre…

LinkedList与链表

[本节目标] 1.ArrayList的缺陷 2.链表 3.链表相关oj题 4.LinkedList的模拟实现 5.LinkedList的使用 6.ArratList和LinkedList的区别 1. ArrayList的缺陷 上篇博客已经熟悉了ArrayList的使用,并且进行了简单模拟实现,ArrayList底层使用数组来储存元素: public class Ar…

K8S 中对 Windows 节点的利用

目录 漏洞概述 漏洞详情 ​编辑 漏洞验证 补丁分析 在集群中探索 参考资料 在许多组织中&#xff0c;所运行的很大一部分服务和应用是 Windows 应用。Windows 容器提供了一种封装进程和包依赖项的方式&#xff0c;从而简化了 DevOps 实践&#xff0c;令 Windows 应用程序…

深入探究MongoDB:从基础到实战,一个全面的指南

MongoDB:海量数据库的介绍 定义与命名由来: MongoDB源自词“humongous”,意味着“巨大无比”。因此,MongoDB可译为“海量数据库”。类型: MongoDB是一种非关系型(NoSQL)数据库。与传统的关系型数据库相比,它的显著特点是不使用SQL语句。数据结构更灵活,没有固定的数据类…

装饰模式(单一责任)

Decorator&#xff08;装饰模式&#xff1a;单一责任模式&#xff09; 链接&#xff1a;装饰模式实例代码 解析 目的 在某些情况下我们可能会“过度地使用继承来扩展对象的功能”&#xff0c;由于继承为类型引入的静态特质&#xff0c;使得这种扩展方式缺乏灵活性&#xff…

【C语言】指针详解(四)

目录 1.assert断言 2.指针的使用和传址调用 2.1strlen的模拟使用 2.2传值调用和传址调用 1.assert断言 assert.h头文件定义了宏 assert()&#xff0c;用于在运行时确保程序符合指定条件&#xff0c;如果不符合&#xff0c;就报错终止运行。这个宏常常被称为“断言”。 例如…
最新文章