im-system学习

文章目录

    • LimServer
    • LimServer
    • snakeyaml
      • 依赖
      • 使用
      • 配置类
      • 配置文件
    • 私有协议解码
      • MessageDecoder
      • ByteBufToMessageUtils

这个很全: IM即时通讯系统[SpringBoot+Netty]——梳理(总)

IO线程模型

Redis 分布式客户端 Redisson 分布式锁快速入门

LimServer

public class LimServer {

    private final static Logger logger = LoggerFactory.getLogger(LimServer.class);
    BootstrapConfig.TcpConfig config;
    EventLoopGroup mainGroup;
    EventLoopGroup subGroup;
    ServerBootstrap server;

    public LimServer(BootstrapConfig.TcpConfig config) {
        this.config = config;
        mainGroup = new NioEventLoopGroup(config.getBossThreadSize());
        subGroup = new NioEventLoopGroup(config.getWorkThreadSize());
        server = new ServerBootstrap();
        server.group(mainGroup,subGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小
                .option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口
                .childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性
                .childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new MessageDecoder());
                        ch.pipeline().addLast(new MessageEncoder());
//                        ch.pipeline().addLast(new IdleStateHandler(
//                                0, 0,
//                                10));
                        ch.pipeline().addLast(new HeartBeatHandler(config.getHeartBeatTime()));
                        ch.pipeline().addLast(new NettyServerHandler(config.getBrokerId(),config.getLogicUrl()));
                    }
                });
    }

    public void start(){
        this.server.bind(this.config.getTcpPort());
    }


}

LimServer

public class LimWebSocketServer {

    private final static Logger logger = LoggerFactory.getLogger(LimWebSocketServer.class);

    BootstrapConfig.TcpConfig config;
    EventLoopGroup mainGroup;
    EventLoopGroup subGroup;
    ServerBootstrap server;

    public LimWebSocketServer(BootstrapConfig.TcpConfig config) {
        this.config = config;
        mainGroup = new NioEventLoopGroup();
        subGroup = new NioEventLoopGroup();
        server = new ServerBootstrap();
        server.group(mainGroup, subGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小
                .option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口
                .childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性
                .childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // websocket 基于http协议,所以要有http编解码器
                        pipeline.addLast("http-codec", new HttpServerCodec());
                        // 对写大数据流的支持
                        pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                        // 几乎在netty中的编程,都会使用到此hanler
                        pipeline.   addLast("aggregator", new HttpObjectAggregator(65535));
                        /**
                         * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
                         * 本handler会帮你处理一些繁重的复杂的事
                         * 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳
                         * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
                         */
                        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
                        pipeline.addLast(new WebSocketMessageDecoder());
                        pipeline.addLast(new WebSocketMessageEncoder());
                        pipeline.addLast(new NettyServerHandler(config.getBrokerId(),config.getLogicUrl()));
                    }
                });
    }

    public void start(){
        this.server.bind(this.config.getWebSocketPort());
    }
}

snakeyaml

依赖

<!-- yaml -->
<dependency>
    <groupId>org.yaml</groupId>
    <artifactId>snakeyaml</artifactId>
    <version>${snakeyaml.version}</version>
</dependency>

使用

    private static void start(String path){
        try {
            Yaml yaml = new Yaml();
            InputStream inputStream = new FileInputStream(path);
            BootstrapConfig bootstrapConfig = yaml.loadAs(inputStream, BootstrapConfig.class);

            new LimServer(bootstrapConfig.getLim()).start();
            new LimWebSocketServer(bootstrapConfig.getLim()).start();

            RedisManager.init(bootstrapConfig);
            MqFactory.init(bootstrapConfig.getLim().getRabbitmq());
            MessageReciver.init(bootstrapConfig.getLim().getBrokerId()+"");
            registerZK(bootstrapConfig);

        }catch (Exception e){
            e.printStackTrace();
            System.exit(500);
        }
    }

配置类

@Data
public class BootstrapConfig {

    private TcpConfig lim;


    @Data
    public static class TcpConfig {
        private Integer tcpPort;// tcp 绑定的端口号

        private Integer webSocketPort; // webSocket 绑定的端口号

        private boolean enableWebSocket; //是否启用webSocket

        private Integer bossThreadSize; // boss线程 默认=1

        private Integer workThreadSize; //work线程

        private Long heartBeatTime; //心跳超时时间 单位毫秒

        private Integer loginModel;

        /**
         * redis配置
         */
        private RedisConfig redis;

        /**
         * rabbitmq配置
         */
        private Rabbitmq rabbitmq;

        /**
         * zk配置
         */
        private ZkConfig zkConfig;

        /**
         * brokerId
         */
        private Integer brokerId;

        private String logicUrl;

    }

    @Data
    public static class ZkConfig {
        /**
         * zk连接地址
         */
        private String zkAddr;

        /**
         * zk连接超时时间
         */
        private Integer zkConnectTimeOut;
    }

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class RedisConfig {

        /**
         * 单机模式:single 哨兵模式:sentinel 集群模式:cluster
         */
        private String mode;
        /**
         * 数据库
         */
        private Integer database;
        /**
         * 密码
         */
        private String password;
        /**
         * 超时时间
         */
        private Integer timeout;
        /**
         * 最小空闲数
         */
        private Integer poolMinIdle;
        /**
         * 连接超时时间(毫秒)
         */
        private Integer poolConnTimeout;
        /**
         * 连接池大小
         */
        private Integer poolSize;

        /**
         * redis单机配置
         */
        private RedisSingle single;

    }

    /**
     * redis单机配置
     */
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class RedisSingle {
        /**
         * 地址
         */
        private String address;
    }

    /**
     * rabbitmq哨兵模式配置
     */
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Rabbitmq {
        private String host;

        private Integer port;

        private String virtualHost;

        private String userName;

        private String password;
    }

}

配置文件

lim:
  tcpPort: 9000
  webSocketPort: 19000
  bossThreadSize: 1
  workThreadSize: 8
  heartBeatTime: 20000 #心跳超时时间 单位毫秒
  brokerId: 1000
  loginModel: 3
  logicUrl: http://127.0.0.1:8000/v1
  #  *                多端同步模式:1 只允许一端在线,手机/电脑/web 踢掉除了本client+imel的设备
  #  *                            2 允许手机/电脑的一台设备 + web在线 踢掉除了本client+imel的非web端设备
  #  *                            3 允许手机和电脑单设备 + web 同时在线 踢掉非本client+imel的同端设备
  #  *                            4 允许所有端多设备登录 不踢任何设备

  redis:
    mode: single # 单机模式:single 哨兵模式:sentinel 集群模式:cluster
    database: 0
    password:
    timeout: 3000 # 超时时间
    poolMinIdle: 8 #最小空闲数
    poolConnTimeout: 3000 # 连接超时时间(毫秒)
    poolSize: 10 # 连接池大小
    single: #redis单机配置
      address: 127.0.0.1:6379
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtualHost: /
    userName: guest
    password: guest

  zkConfig:
    zkAddr: 127.0.0.1:2181
    zkConnectTimeOut: 5000

私有协议解码

MessageDecoder

public class MessageDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx,
                          ByteBuf in, List<Object> out) throws Exception {
    //请求头(指令
        // 版本
        // clientType
        // 消息解析类型
        // appId
        // imei长度
        // bodylen)+ imei号 + 请求体

        if(in.readableBytes() < 28){
            return;
        }

        Message message = ByteBufToMessageUtils.transition(in);
        if(message == null){
            return;
        }

        out.add(message);
    }
}

ByteBufToMessageUtils

/**
 * @author: Chackylee
 * @description: 将ByteBuf转化为Message实体,根据私有协议转换
 *               私有协议规则,
 *               4位表示Command表示消息的开始,
 *               4位表示version
 *               4位表示clientType
 *               4位表示messageType
 *               4位表示appId
 *               4位表示imei长度
 *               imei
 *               4位表示数据长度
 *               data
 *               后续将解码方式加到数据头根据不同的解码方式解码,如pb,json,现在用json字符串
 * @version: 1.0
 */
public class ByteBufToMessageUtils {

    public static Message transition(ByteBuf in){

        /** 获取command*/
        int command = in.readInt();

        /** 获取version*/
        int version = in.readInt();

        /** 获取clientType*/
        int clientType = in.readInt();

        /** 获取clientType*/
        int messageType = in.readInt();

        /** 获取appId*/
        int appId = in.readInt();

        /** 获取imeiLength*/
        int imeiLength = in.readInt();

        /** 获取bodyLen*/
        int bodyLen = in.readInt();

        if(in.readableBytes() < bodyLen + imeiLength){
            in.resetReaderIndex();
            return null;
        }

        byte [] imeiData = new byte[imeiLength];
        in.readBytes(imeiData);
        String imei = new String(imeiData);

        byte [] bodyData = new byte[bodyLen];
        in.readBytes(bodyData);


        MessageHeader messageHeader = new MessageHeader();
        messageHeader.setAppId(appId);
        messageHeader.setClientType(clientType);
        messageHeader.setCommand(command);
        messageHeader.setLength(bodyLen);
        messageHeader.setVersion(version);
        messageHeader.setMessageType(messageType);
        messageHeader.setImei(imei);

        Message message = new Message();
        message.setMessageHeader(messageHeader);

        if(messageType == 0x0){
            String body = new String(bodyData);
            JSONObject parse = (JSONObject) JSONObject.parse(body);
            message.setMessagePack(parse);
        }

        in.markReaderIndex();
        return message;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/274377.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【FPGA】摄像头模块OV5640

本篇文章包含的内容 一、OV5640简介1.1 基本概述1.2 工作时序1.2.1 DVP Timing&#xff08;数据传输时序&#xff09;1.2.2 帧曝光工作模式 1.3 OV5640 闪光灯工作模式1.3.1 Xenon Flash&#xff08;氙灯闪烁&#xff09;模式1.3.2 LED 1&2 模式1.3.3 LED 3模式1.3.4 手动开…

【ESP32接入国产大模型之MiniMax】

1. MiniMax 讲解视频&#xff1a; ESP32接入语言大模型之MiniMax MM智能助理是一款由MiniMax自研的&#xff0c;没有调用其他产品的接口的大型语言模型。MiniMax是一家中国科技公司&#xff0c;一直致力于进行大模型相关的研究。 随着人工智能技术的不断发展&#xff0c;自然语…

Python入门(小白友好)

知识图谱 搭建环境 安装Python:Download Python | Python.org 安装PyCharm:Download PyCharm: The Python IDE for data science and web development by JetBrains 注意:专业版本是收费的,新手小白使用社区版(community)即可 创建第一个项目: 一些PyCharm的设置(也适用…

Springboot和Spring Cloud版本对应

Spring在不断地升级&#xff0c;各个版本存在一些不兼容的地方&#xff0c;为了避免出现问题&#xff0c;最好注意使用正确的版本。 官网的对应关系&#xff1a;https://start.spring.io/actuator/info 如下图&#xff1a; 下面附一下创建项目的工具&#xff1a; Spring官方…

ClickHouse--13--springboot+mybatis配置clickhouse

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 ClickHouse1.添加maven依赖2.配属数据源3.参数配置4.Druid连接池配置5.entity6.Mapper接口7.Mapper.xml8.controller接口9.创建一个clickhouse表10.测试 ClickHouse…

【复现】【免费】基于多时间尺度滚动优化的多能源微网双层调度模型

目录 主要内容 部分代码 结果一览 1.原文结果 2.程序运行结果 下载链接 主要内容 该模型参考《Collaborative Autonomous Optimization of Interconnected Multi-Energy Systems with Two-Stage Transactive Control Framework》&#xff0c;主要解决的是一个多…

springboot校服订购系统

摘 要 本文首先实现了校服订购系统设计与实现管理技术的发展随后依照传统的软件开发流程&#xff0c;最先为系统挑选适用的言语和软件开发平台&#xff0c;依据需求分析开展控制模块制做和数据库查询构造设计&#xff0c;随后依据系统整体功能模块的设计&#xff0c;制作系统的…

阿里云发布 AI 编程助手 “通义灵码”——VSCode更强了 !!

文章目录 什么是 通义灵码&#xff08;TONGYI Lingma&#xff09; 快速体验“通义灵码” 什么是“通义灵码”&#xff08;TONGYI Lingma&#xff09; 通义灵码&#xff08;TONGYI Lingma&#xff09;&#xff0c;是阿里云出品的一款基于通义大模型的智能编码辅助工具&#xff…

考研失败, 学点Java打小工_Day3_卫语句_循环

1 编码规范——卫语句 表达异常分支时&#xff0c;少用if-else方式。   比如成绩判断中对于非法输入的处理&#xff1a; /*>90 <100 优秀>80 <90 良好>70 <80 一般>60 <70 及格<60 不及格*/Testpu…

阿里云2核4G4M轻量应用服务器价格165元一年

阿里云优惠活动&#xff0c;2核4G4M轻量应用服务器价格165元一年&#xff0c;4Mbps带宽下载速度峰值可达512KB/秒&#xff0c;系统盘是60GB高效云盘&#xff0c;不限制月流量&#xff0c;2核2G3M带宽轻量服务器一年87元12个月&#xff0c;在阿里云CLUB中心查看 aliyun.club 当前…

[QJS xmake] 非常简单地在Windows下编译QuickJS!

文章目录 前言准备C编译器xmake编译包 工程准备修改版本号第一遍编译第二遍编译效果 前言 quickjs是个很厉害的东西啊&#xff0c;我一直想编译一下的&#xff0c;奈何一直没成功。现在找了点时间成功编译了&#xff0c;写篇文章记录一下。当前版本&#xff1a;2024-1-13 应该…

MySQL数据自动同步到Es

Logstash 测试数据准备 DROP DATABASE IF EXISTS es;CREATE DATABASE es DEFAULT CHARACTER SET utf8;USE es;CREATE TABLE book (id INT NOT NULL,title VARCHAR(20),author VARCHAR(20),price DECIMAL(6,2),PRIMARY KEY(id) );DROP PROCEDURE IF EXISTS batchInsertBook;DELI…

关系数据库:关系数据结构基础与概念解析

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

代码随想录算法训练营第二十八天|93. 复原 IP 地址,78. 子集,90. 子集 II

93. 复原 IP 地址 题目 有效 IP 地址 正好由四个整数&#xff08;每个整数位于 0 到 255 之间组成&#xff0c;且不能含有前导 0&#xff09;&#xff0c;整数之间用 ‘.’ 分隔。 例如&#xff1a;“0.1.2.201” 和 “192.168.1.1” 是 有效 IP 地址&#xff0c;但是 “0.0…

前端 - 基础 表单标签 -- 表单元素( input - type属性) 文本框和密码框

表单元素 &#xff1a; 在表单域中可以定义各种表单元素&#xff0c;这些表单元素就是允许用户在表单中输入或选择 的内容控件。 表单元素的外观也各不一样&#xff0c;有小圆圈&#xff0c;有正方形&#xff0c;也有方框&#xff0c;乱七八糟的&#xff0c;各种各样&#xf…

【深度学习】深度学习md笔记总结第1篇:深度学习课程,要求【附代码文档】

深度学习笔记完整教程&#xff08;附代码资料&#xff09;主要内容讲述&#xff1a;深度学习课程&#xff0c;深度学习介绍要求,目标,学习目标,1.1.1 区别。TensorFlow介绍&#xff0c;2.2 图与TensorBoard学习目标,2.2.1 什么是图结构,2.2.2 图相关操作,2.2.3 TensorBoard:可视…

数据资产管理解决方案:构建高效、安全的数据生态体系

在数字化时代&#xff0c;数据已成为企业最重要的资产之一。然而&#xff0c;如何有效管理和利用这些数据资产&#xff0c;却是许多企业面临的难题。本文将详细介绍数据资产管理解决方案&#xff0c;帮助企业构建高效、安全的数据生态体系。 一、引言 在信息化浪潮的推动下&a…

Linux之线程同步

目录 一、问题引入 二、实现线程同步的方案——条件变量 1、常用接口&#xff1a; 2、使用示例 一、问题引入 我们再次看看上次讲到的多线程抢票的代码&#xff1a;这次我们让一个线程抢完票之后不去做任何事。 #include <iostream> #include <unistd.h> #inc…

PB-03F模组蓝牙基础+主从机指令的使用

文章目录 前言一、蓝牙基础指令1. ATBLEMAC 设置和查询蓝牙 MAC 地址2. ATBLEMODE 查询和设置蓝牙模式3. ATBLERFPWR 蓝牙设置或查询发射功率4. ATBLESTATE 查询连接状态5. ATBLEDISCON 断开蓝牙连接6. ATBLEMTU 查询或者设置 MTU7. ATBLESEND 向蓝牙透传通道发送数据8. ATTRAN…

mapstruct学习笔记-pojo之间的转换

1、前言 mapstruct中常用注解如Mapping,AfterMapping,BeanMapping等的使用,通过案例说明各式各样的业务pojo对象之间如何借助mapstruct完成相互之间的转换,减少代码量的同时也能突出业务逻辑流程,让你的代码里写起来更有规范可言。 2、简介 Reference Guide – MapStruct 3…
最新文章