请选择 进入手机版 | 继续访问电脑版

[Netty] 网络编程模型 Reactor设计模式(四)

框架与中间件 框架与中间件 1626 人阅读 | 0 人回复

关于Java中的网络编程模型,强烈建议阅读下图文章,作者的名气就不在此赘述了,本文中也将穿插着引用相关内容,出处不在特殊说明,因为java的网络编程模型的具体实现就来源于此,他也是java NIO的作者。

image.png

网络服务结构

image.png

关于网络服务的基础结构如下

image.png

典型的服务设计

image.png

有的人看到handler可能会不知所云,其实就是指业务处理部分的代码单元

比如一个代码块({}),或者一个方法的封装。

最初的阻塞式的网络编程模型,就是这种形式。

下图红框中的处理部分,就可以认为是handler.

如果我们把处理逻辑部分封装到一个runable或者线程中,不断地创建新的线程处理,也就是所谓的

Each handler may be started in its own thread

也叫作 :connection per thread 最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。

ps:

读取-解码-处理-编码-返回,只是一个通用的抽象模型,概括的概念。

image.png

文中介绍的示例,也是这个意思

Classic ServerSocket Loop

class Server implements Runnable {
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }
    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] cmd) { /* ... */ }
    }
}

image.png

构建高性能可伸缩的IO服务

在构建高性能可伸缩IO服务的过程中,我们希望达到以下的目标:

① 能够在海量负载连接情况下优雅降级;

② 能够随着硬件资源的增加,性能持续改进;

③ 具备低延迟、高吞吐量、可调节的服务质量等特点;

而分发处理就是实现上述目标的一个最佳方式。

image.png

分发模式

分发模式具有以下几个机制:

① 将一个完整处理过程分解为一个个细小的任务;每个任务执行相关的动作且不产生阻塞;

② 在任务执行状态被触发时才会去执行,例如只在有数据时才会触发读操作;

③java.nio包就很好的实现了上述的机制:

① 非阻塞的读和写

② 通过感知IO事件分发任务的执行

所以结合一系列基于事件驱动模式的设计,给高性能IO服务的架构与设计带来丰富的可扩展性;

在一般的服务开发当中,IO事件通常被当做任务执行状态的触发器使用,在hander处理过程中主要针对的也就是IO事件;

image.png

基于事件驱动模式的设计

①基于事件驱动的架构设计通常比其他架构模型更加有效

因为可以节省一定的性能资源,事件驱动模式下通常不需要为每一个客户端建立一个线程

更少的线程开销,更少的上下文切换和更少的锁互斥

但任务的调度可能会慢一些

②通常实现的复杂度也会增加

相关功能必须分解成简单的非阻塞操作

类似与GUI的事件驱动机制,当然也不可能把所有阻塞都消除掉,特别是GC, page faults(内存缺页中断)等。

由于是基于事件驱动的,所以需要跟踪服务的相关状态(因为你需要知道什么时候事件会发生);

image.png

事件主要包含两部分:

事件源

事件

上图中,事件源是Button,事件是点击, 也就是在Button上发生了点击事件。

为了对事件进行响应,通常还有事件监听器

事件源,通过,addXXXListener 或者removeXXXListener 对事件监听器进行管理

不同事件源,事先定义分类好了不同的事件,给不同的事件添加监听器,就可以产生不同的响应。

比如Button 可以有单击、双击等。

这就是一种事件驱动模型,当事件发生时,自然就会调用相应的事件处理器。

Reactor模式

image.png

Reactor也可以称作反应器模式,它有以下几个特点:

① Reactor模式中会通过分配适当的handler(处理程序)来响应IO事件,类似与AWT 事件处理线程;

② 每个handler执行非阻塞的操作,类似于AWT ActionListeners 事件监听

③ 通过将handler绑定到事件进行管理,类似于AWT addActionListener 添加事件监听;

image.png

当事件产生时,就可以调用相关的handler进行处理。

单线程模式

java.nio中相关的几个概念:

Channels 支持非阻塞读写的socket连接;

Buffers 用于被Channels可以直接读写的类似数组的对象

Selectors 用于判断channle发生IO事件的选择器

SelectionKeys 负责IO事件的状态与绑定

image.png

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(
                new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk =
                serverSocket.register(selector,
                        SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }

    /* Alternatively, use explicit SPI provider:
    SelectorProvider p = SelectorProvider.provider();
    selector = p.openSelector();
    serverSocket = p.openServerSocketChannel();
*/
    // class Reactor continued
    public void run() { // normally in a new Thread

        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey) (it.next());
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null)
            r.run();
    }

    // class Reactor continued
    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            } catch (IOException ex) { /* ... */ }
        }
    }
}

image.png

精简点说的话,上面的过程其实只做了一件事情,那就是注册关注OP_ACCPET事件,然后当事件就绪后,分发调用执行 accept方法,获得SocketChannel 然后丢给Handler进行处理

通过Handler的代码中,再次对相关Selector注册自己关心的事件,然后继续后续逻辑。

package com.crazybytex.fragment.reactor;


import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/**
 * @Author 本文作者 程序员潇然 疯狂的字节X https://www.crazybytex.com/
 * @Date 2022/10/10 16:56
 * @Description TODO
 **/
final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector sel, SocketChannel c)
            throws IOException {
        socket = c;
        c.configureBlocking(false);
// Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete() { /* ... */ }

    boolean outputIsComplete() { /* ... */ }

    void process() { /* ... */ }

    // class Handler continued
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }

    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
// Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel();
    }
}

下面是基于GoF状态对象模式对Handler类的一个优化实现,不需要再进行状态的判断。

image.png

之所以叫做单线程模式,是因为在任务处理阶段,也就是Handler中,只有一个当前线程进行处理任务。

如果遇到阻塞,那么整个Reactor的反应速度就会被拖慢了

多线程设计模式

在多处理器场景下,为实现服务的高性能我们可以有目的的采用多线程模式:

1、增加Worker线程,专门用于处理非IO操作,因为通过上面的程序我们可以看到,反应器线程需要迅速触发处理流程,而如果处理过程也就是process()方法产生阻塞会拖慢反应器线程的性能,所以我们需要把一些非IO操作交给Woker线程来做;

2、拆分并增加反应器Reactor线程

一方面在压力较大时可以饱和处理IO操作,提高处理能力;

另一方面维持多个Reactor线程也可以做负载均衡使用;线程的数量可以根据程序本身是CPU密集型还是IO密集型操作来进行合理的分配;

2.1 多工作线程模式

Reactor多线程设计模式具备以下几个特点:

① 通过卸载非IO操作来提升Reactor 线程的处理性能,这类似与POSA2 中Proactor的设计;

② 比将非IO操作重新设计为事件驱动的方式更简单;

③ 但是很难与IO重叠处理,最好能在第一时间将所有输入读入缓冲区;(这里我理解的是最好一次性读取缓冲区数据,方便异步非IO操作处理数据)

④ 可以通过线程池的方式对线程进行调优与控制,一般情况下需要的线程数量比客户端数量少很多;

下面是Reactor多线程设计模式的一个示意图与示例代码(我们可以看到在这种模式中在Reactor线程的基础上把非IO操作放在了Worker线程中执行):

image.png

class Handler implements Runnable {
    // uses util.concurrent thread pool
    static PooledExecutor pool = new PooledExecutor(...);
    static final int PROCESSING = 3;
    // ...
    synchronized void read() { // ...
        socket.read(input);
        if (inputIsComplete()) {
            state = PROCESSING;
            pool.execute(new Processer());
        }
    }
    synchronized void processAndHandOff() {
        process();
        state = SENDING; // or rebind attachment
        sk.interest(SelectionKey.OP_WRITE);
    }
    class Processer implements Runnable {
        public void run() { processAndHandOff(); }
    }
}

只是将执行过程,进行了再次封装,封装到Processer中,然后丢到线程池中进行使用。

当你把非IO操作放到线程池中运行时,你需要注意以下几点问题:

① 任务之间的协调与控制,每个任务的启动、执行、传递的速度是很快的,不容易协调与控制;

② 每个hander中dispatch的回调与状态控制;

③ 不同线程之间缓冲区的线程安全问题;

④ 需要任务返回结果时,任务线程等待和唤醒状态间的切换;

为解决上述问题可以使用PooledExecutor线程池框架,这是一个可控的任务线程池,主函数采用execute(Runnable r),它具备以下功能,可以很好的对池中的线程与任务进行控制与管理:

① 可设置线程池中最大与最小线程数;

② 按需要判断线程的活动状态,及时处理空闲线程;

③ 当执行任务数量超过线程池中线程数量时,有一系列的阻塞、限流的策略;

基于多个反应器的多线程模式

是对上面模式的进一步完善,使用反应器线程池

一方面根据实际情况用于匹配调节CPU处理与IO读写的效率,提高系统资源的利用率

另一方面在静态或动态构造中每个反应器线程都包含对应的Selector,Thread,dispatch loop

下面是一个简单的代码示例与示意图(Netty就是基于这个模式设计的,一个处理Accpet连接的mainReactor线程,多个处理IO事件的subReactor线程)

注意蓝色字体

image.png

image.png

在服务的设计当中,我们还需要注意与java.nio包特性的结合:

一是注意线程安全,每个selectors 对应一个Reactor 线程,并将不同的处理程序绑定到不同的IO事件,在这里特别需要注意线程之间的同步;

二是java nio中文件传输的方式:

① Memory-mapped files 内存映射文件的方式,通过缓存区访问文件;

② Direct buffers直接缓冲区的方式,在合适的情况下可以使用零拷贝传输,但同时这会带来初始化与内存释放的问题(需要池化与主动释放);

核心模型

image.png

其实仔细观察多线程模式,其实就是单线程模式的简单扩展,没有其他更多的悬念

实际执行任务逻辑的Handler多线程(工作线程)

Selector多线程

common_log.png 转载务必注明出处:程序员潇然,疯狂的字节X,https://crazybytex.com/thread-203-1-1.html

关注下面的标签,发现更多相似文章

文章被以下专栏收录:

    黄小斜学Java

    疯狂的字节X

  • 目前专注于分享Java领域干货,公众号同步更新。原创以及收集整理,把最好的留下。
    包括但不限于JVM、计算机科学、算法、数据库、分布式、Spring全家桶、微服务、高并发、Docker容器、ELK、大数据等相关知识,一起进步,一起成长。
热门推荐
[若依]微服务springcloud版新建增添加一个
[md]若依框架是一个比较出名的后台管理系统,有多个不同版本。
[CXX1300] CMake '3.18.1' was not
[md][CXX1300] CMake '3.18.1' was not found in SDK, PATH, or
java 解析modbus 协议 Modbus4j应用 使用mo
[md]本文记录使用java开发modbus协议程序,与串口进行连接,使用