Netty学习——源码篇5 EventLoop 备份

 1 Reactor线程模型

        Reactor线程模型 中对Reactor的三种线程模型——单线程模型、多线程模型、主从多线程模型做了介绍,这里具体分析Reactor在Netty中的应用。

1.1单线程模型

单线程模型处理流程如下图:

        单线程模型,即Accept的处理和Handler的处理都在同一个线程中。这个模型的弊端是:当其中某个Handler阻塞时,会导致其他所有的Client的Handler都无法执行,并且更严重是,Handler的阻塞也会导致整个服务不能接收新的Client请求(因为Accept也被阻塞了)。 因为这个缺陷,所以单线程Reactor模型在Netty中的应用场景比较少。

1.2 多线程模型

        Netty中Reactor多线程模型的应用如如下图:

        1、设计一个专门的线程Accept,用于监听客户端的TCP连接请求。

        2、客户端的I/O操作都是由一个特定的NIO线程池负责。每个客户端连接都与一个特定的NIO线程绑定,因此在这个客户端连接中的所有I/O操作都是在同一个线程中完成的。

        3、客户端连接有很多,但是NIO线程数是比较少的,因此一个NIO线程可以同时绑定到多个客户端连接中。

1.3 主从多线程模型

        主从Reactor多线程模型在Netty中的应用,如下图

             一般情况下,Reactor的多线程模型已经适用于大部分业务场景。但如果服务端需要同时处理大量的客户端连接请求,或者需要再客户端连接时增加一些诸如权限的校验等操作,那么单个Accept就很有可能处理不过来,将会造成大量的客户端连接超时。主从Reactor多线程模型将服务端接收客户端的连接请求专门设计为一个独立的连接池。主从Reactor多线程模型和Reactor多线程模型很类似,只是在主动Reactor多线程模型的Accept线程池中获取数据,通过认证鉴权后进行派遣,再分配给Reactor线程池来处理客户端请求。

2 EventLoopGroup与Reactor关联

        不同的设置NioEventLoopGroup的方式对应了不同的Reactor线程模型。

        1、单线程模型在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup);

            首先实例化一个NioEventLoopGroup,接着调用bootstrap.group(bossGroup)设置服务端的EventLoopGroup。这里有个疑惑:在启动服务端的Netty程序时,需要设置bossGroup和workerGroup,为什么这里只设置了1个bossGroup?原因很简单,ServerBootstrap重写了group方法,代码如下:

@Override
    public ServerBootstrap group(EventLoopGroup group) {
        return group(group, group);
    }

        因此,当传入一个group时,bossGroup和workerGroup就是同一个NioEventLoopGrouop,并且这个NioEventLoopGroup线程池数量只设置了1个线程,也就是说Netty中的Acceptor和后续的所有客户端连接的I/O操作都是在一个线程中处理的。那么对应到Reactor的线程模型中,这样设置NioEventLoopGroup,就相当于Reactor的单线程模式。

        2、多线程在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(16);
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup);

        从代码中可以看出,只需要将NioEventLoopGroup的参数设置大于1,就是Reactor多线程模型。

        3、主从Reactor模型在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();
        server.group(bossGroup,workGroup);

        bossGroup为主线程,而workerGroup中的线程数是CPU核数*2,因此对应到Reactor线程模型中,这样设置的NioevGroup就是主从Reactor多线程模型。

3 EventLoopGroup的实例化

        首先,来看一下EventLoopGroup的类结构图,如下图:

        然后,在通过时序图来了解一下EventLoopGroup初始化的基本过程,如下图:

 

          基本步骤如下:

        1、EventLoopGroup内部维护一个属性为EventExecutor的children的数组,其大小是nThreads,这样就初始化一个线程池。

        2、在实例化NioEventLoopGroup时,如果指定县城池大小,则nThreads就是指定的值,否则是CPU核数 * 2。

        3、在MultithreadEventExecutorGroup中调用newChild()抽象方法来初始化children数组。

        4、newChild()抽象方法实际上是在NioEventLoopGroup中实现的,由它返回一个NioEventLoop实例。

        5、初始化NioEventLoop对象并给属性赋值,具体赋值属性如下:

                (1)provider:就是在NioEventLoopGroup构造器中,调用SelectorProvider的provider()方法获取的SelectorProvider对象。

                (2)selector:就是在NioEventLoop构造器中,调用selector=provider.openSelector()方法获取的Selector对象。

4 执行任务者EventLoop

        NioEventLoop继承自SingleThreadEventLoop,而SingleThreadEventLoop又继承自SingleThreadEventExecutor。SingleThreadEventExecutor是Netty对本地线程的抽象,它内部有一个Thread属性,实际上就是存储了一个本地Java 线程。因此可以简单的认为,一个NioEventLoop对象其实就是一个和特定的线程进行绑定,并且在NioEventLoop声明周期内,其绑定的线程都不会再改变,NioEventLoop的类层次结构图如下:

        NioEventLoop的类层次结构比较复杂,只需要关注重点即可。首先看NioEventLoop的继承关系:NioEventLoop继承SingleThreadEventLoop, SingleThreadEventLoop继承SingleThreadEventExecutor,SingleThreadEventExecutor继承AbstractScheduledEventExecutor。

        在AbstractScheduledEventExecutor,Netty实现了NioEventLoop的Schedule功能,即通过调用一个NioEventLoop实例的schedule方法来运行一些定时任务。而在SingleThreadEventLoop中,又实现了任务队列的功能。通过它,可以调用一个NioEventLoop实例的execute()方法向任务队列中添加一个Task,并由NioEventLoop进行调度执行。

        通常来说,NioEventLoop负责执行两个任务:第一个任务是作为I/O线程,执行与Channel相关的I/O操作,包括调用Selector等待就绪的I/O事件、读写数据与数据处理等;第二个任务是作为任务队列,执行taskQueue中的人物,例如用户调用eventLoop.schedule提交的定时任务也是由这个线程执行的。

4.1 NioEventLoop的实例化过程

        先了解一下EventLoop实例化的运行时序图,如下:

        从上图可以看出,SingleThreadEventExecutor有一个名为thread的Thread类型属性,这个属性就是与SingleThreadEventExecutor关联的本地线程。来看thread是在哪里被赋值的,代码如下:

private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

         前面分析过,SingleThreadEventExecutor启动时会调用doStartThread方法,然后调用executor.execute方法,将当前线程赋值给thread。在这个线程中所做的事情主要就是调用SingleThreadEventExecutor.this.run()方法,因为NioEventLoop实现了这个方法,所以根据多态性,其实调用的是NioEventLoop.run方法。

4.2 EventLoop与Channel关联

        在Netty中,每个Channel都有且仅有一个EventLoop与之关联,它们的关联过程如下图:

               

        从上图可以看到,当调用AbstractChannel.register()方法后,就完成了Channel和EventLoop的关联,register方法的具体实现如下:

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        在register方法中,会将一个EventLoop赋值给AbstractChannel内部的eventLoop属性,这句代码就完成了EventLoop与Channel的关联过程。

4.3 EventLoop 启动

        前面已经介绍NioEventLoop本身就是一个SingleThreadEventExecutor,因此NioEventLoop的启动,其实就是NioEventLoop所绑定的本地Java线程的启动。

        按照这个思路,只需要找到在哪里调用了SingleThreadExecutor中thread属性的start方法就可以知道在哪里启动这个线程了。前面分析过,其实thread.start()被封装在SingleThreadExecutor.startThread()方法中,代码如下:

    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }

        STATE_UPDATER是SingleThreadExecutor内部维护的一个属性,它的作用是标识当前的Thread的状态。在初始化的时候,STATE_UPDATER == ST_NOT_STARTED,因此第一次调用startThread方法时,就会进入if语句内,进而调用thread.start方法。而这个关键的startThread方法又是在哪调用的呢?用方法调用关系反向查找功能,就恢复阿贤,startTahread方法是在SingleThreadEventExecutor的execute方法中调用的,代码如下:

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

        既然如此,现在只需要找到第一次调用SingleThreadEventExecutor的execute方法的位置即可。前面在提到注册Channel的过程中,会在AbstractChannel的register方法中调用eventLoop.execute方法,在EventLoop中进行Channel注册代码的执行,AbstractChannel的register方法的关键代码如下:

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
           //删除判断代码

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                   //删除异常处理代码
                }
            }
        }

        很明显,从Boostrap的bind方法一路跟踪到AbstractChannel的register方法,整个代码都是在主线程中运行的,因此上面的eventLoop.inEventLoop()返回值为false,于是进入else分支,在这个分支中调用eventLoop.execute方法,而NioEventLoop没有实现execute方法,因此调用的是SingleThreadEventExecutor的execute方法,关键代码如下:

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

       由于 inEventLoop ==false,因此直行道else分支就调用startThread方法来启动SingleThreadEventExecutor内部关联的Java本地线程。用一句话总结:当EventLoop的execute方法第一次被调用时,会触发startThread方法的调用,进而启动EventLoop所对应的Java本地线程。

        完整的EventLoop启动时序图如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/499255.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(科研篇)如何做科研

1.科研周期: 2.CCF列表 1.搜索论文(顶会) 2.谷歌学术检索 3.如何阅读文献 最重要的部分是abstract introduction 和related work,要明白某个东西的历史,从而进一步发现的缺陷,然后通过实现实验去证明。 通…

HubSpot出海CRM的团队协作与流程优化

在数字化营销日益盛行的今天,团队协作与流程优化已成为企业获取竞争优势的关键因素。HubSpot出海CRM不仅提供了强大的客户管理工具,更在团队协作与流程优化方面展现出卓越的能力。 一、团队协作在营销中的重要性 团队协作在营销中的重要性不言而喻。一…

光伏智慧管理平台:全周期全流程光伏业务管理

随着光伏技术的快速发展和光伏电站规模的不断扩大,光伏业务的管理变得越来越复杂。为了提高管理效率、降低运营成本并提升光伏电站的运行效益,光伏智慧管理平台应运而生。本文将重点介绍光伏智慧管理平台的功能及其在全周期全流程光伏业务管理中的应用。…

最长有效括号(C语言)

题目链接:. - 力扣(LeetCode) 这道题,我看了一种解法,觉得很好,来分享一下 这道题主要是 思考 当前 ) 与之匹配 ( 在哪里 ,记录下来,最后比较最大值 例子: 第…

浅谈 kafka

引言 同事在公司内部分享了关于 kafka 技术一些相关的内容,所以有了这篇文章;部分图片选自网络摘抄; 1 Kafka概述 1.1 定义 Kafka传统定义:kafka是一个分布式的基于发布/订阅模式的消息队列。 Kafka最新定义:kafka…

【Frida】【Android】05_Objection实战

🛫 系列文章导航 【Frida】【Android】01_手把手教你环境搭建 https://blog.csdn.net/kinghzking/article/details/136986950【Frida】【Android】02_JAVA层HOOK https://blog.csdn.net/kinghzking/article/details/137008446【Frida】【Android】03_RPC https://bl…

Kibana操作Elasticsearch教程

文章目录 简介ES文档操作创建索引查看索引创建映射字段查看映射关系字段属性详解typeindexstore 字段映射设置流程 新增数据新增会随机生成id新增自定义id智能判断 修改数据删除数据查询基本查询查询所有(match_all)匹配查询多字段查询词条匹配多词条精确…

HarmonyOS 应用开发之创建PageAbility

开发者需要重写app.js/app.ets中的生命周期回调函数,开发者通过DevEco Studio开发平台创建PageAbility时,DevEco Studio会在app.js/app.ets中默认生成onCreate()和onDestroy()方法,其他方法需要开发者自行实现。接口说明参见前述章节&#xf…

maven 依赖机制

安全工程师为啥关注maven依赖 log 4j事件之后,大家开始更加关注开源组件安全漏洞这个事。纷纷引入SCA 软件成分分析工具来识别项目中存在的开源组件和漏洞。 在sca工具扫描之后,会报出一大堆组件,review这个事就是安全团队投入时间来研判了…

【Linux多线程】线程的同步与互斥

【Linux多线程】线程的同步与互斥 目录 【Linux多线程】线程的同步与互斥分离线程Linux线程互斥进程线程间的互斥相关背景概念问题产生的原因: 互斥量mutex互斥量的接口互斥量实现原理探究对锁进行封装(C11lockguard锁) 可重入VS线程安全概念常见的线程不安全的情况…

是谁?阻止CXL在AI场景大展身手~

CXL虽然被视为业内新宠,但好像在AI场景的应用反而没有得到广泛的响应。 AI场景对内存带宽、容量以及数据一致性有着极高需求,特别是在深度学习训练和推理过程中,大量数据需要在CPU、GPU、加速器以及内存之间快速、高效地流动。CXL作为一种新…

Java基础入门day24

day24 abstract 抽象:似是而非,像又不是,具备某种对象的特征,但不完整 生活中的抽象:动物,并不真实存在的事物 程序中的抽象:不应该被创建的对象,动物近视一种会吃会睡的对象&#…

Netty核心原理剖析与RPC实践16-20

Netty核心原理剖析与RPC实践16-20 16 IO 加速:与众不同的 Netty 零拷贝技术 今天的课程我们继续讨论 Netty 实现高性能的另一个高阶特性——零拷贝。零拷贝是一个耳熟能详的词语,在 Linux、Kafka、RocketMQ 等知名的产品中都有使用,通常用于…

【单调栈】力扣84.柱状图中最大的矩形

上篇文章我们介绍了使用 无重复值 单调栈代码解决 含有重复值 的问题,在文章的最后,留下了一道考察相同思想的题目,今天我们来看看如何套路解决该题。 (还没看过前几篇介绍的小伙伴赶快关注,在 「单调栈」 集合里查看…

通过node 后端实现颜色窃贼 (取出某个图片的主体rgb颜色 )

1.需求 我前端轮播图的背景色 想通过每一张轮播图片的颜色作为背景色 这样的话 需要通过一张图片 取出图片的颜色 这个工作通过前端去处理 也可以通过后端去处理 前端我试了试 color-thief 的插件 但是 这个插件是基于canvas 的模式来的 我需要在小程序中使用这个插件 而且是…

HarmonyOS-如何使用ArkTS声明式语法和基础组件,实现待办列表。

介绍 本篇Codelab将介绍如何使用ArkTS声明式语法和基础组件,实现简易待办列表。效果为点击某一事项,替换标签图片、虚化文字。效果如图所示: 相关概念 ArkTS语法:ArkTS是HarmonyOS的主要应用开发语言。ArkTS基于TypeScript&…

2024/3/29(MybatisPlus插件代码生成,静态工具,逻辑删除,枚举处理器.JSON处理器,分页插件,通用分页实体)

jdbc:mysql://localhost:3306/mp?useUnicodetrue&characterEncodingutf8&serverTimezoneUTC 需要这样 日志查看级别

【C++杂货铺】内管管理

目录 🌈前言🌈 📁 C/C中内存分布 📁 new 和 delete的使用 📁 new 和 delete的优点 📁 new 和 delete的原理 📂 operator new 和 operator delete函数 📂 内置类型 &#x1f4c2…

代码随想录-DAY4|leetcode-24,19,142,面试题 02.07

文章目录 22. 两两交换链表中的节点19. 删除链表的倒数第N个节点size-n方式删除双指针方式(推荐) 面试题 02.07. 链表相交142. 环形链表II暴力解法快慢指针(推荐) 22. 两两交换链表中的节点 leetcode链接:两两交换链表…

怎样一次性给多篇word文档标注拼音?一键批量注音

随着办公自动化的普及,我们经常会遇到需要处理大量Word文档的情况。在这些文档中,有时需要将文字标注上拼音,特别是在处理一些包含生僻字或需要拼音辅助阅读的文档时。然而,手动一篇篇地给Word文档标注拼音不仅效率低下&#xff0…
最新文章