page contents

java reactor 教程——JAVA Reactor模型

本文讲述了Java reactor 教程——JAVA Reactor模型!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

attachments-2023-07-zyhjoMuP64b0a562b758a.png本文讲述了Java reactor 教程——JAVA Reactor模型!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

一.Reactor是一种设计模式。基于事件驱动,然后通过事件分发器,将事件分发给对应的处理器进行处理。 

Reactor:监听网络端口,分发网络连接事件给Acceptor,具体的感兴趣读写事件handler Acceptor:接受新的连接,连接的读写事件操作交给相应的Handler Handler:注册为callback对象,并且注册自己感兴趣的读事件或者写事件等等,然后再相应的方法内进行业务操作内容

1.单线程版

attachments-2023-07-mrfxZ7gm64b0a4b5d4fe0.png

参考代码:

package com.ddcx.utils;

/**

 * @author: xc

 * @ClassName: Test

 * @Date: 2021-03-03 12:47

 * @Description:

 */


import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SocketChannel;

import java.util.Iterator;

import java.util.Set;


class Reactor implements Runnable {

    final Selector selector;

    final ServerSocketChannel serverSocket;


    Reactor(int port) throws IOException { //Reactor初始化

        selector = Selector.open();

        serverSocket = ServerSocketChannel.open();

        //要监听的网络端口号

        serverSocket.socket().bind(new InetSocketAddress(port));

        //非阻塞

        serverSocket.configureBlocking(false);

        //分步处理,第一步,接收accept事件

        SelectionKey sk =

                serverSocket.register(selector, SelectionKey.OP_ACCEPT);

        //attach callback object, Acceptor

        sk.attach(new Acceptor());

    }


    @Override

    public void run() {

        try {

            while (!Thread.interrupted()) {

                //阻塞到至少有一个通道在你注册的事件上就绪了。

                selector.select();

                Set selected = selector.selectedKeys();

                Iterator it = selected.iterator();

                while (it.hasNext()) {

                    //Reactor负责dispatch收到的事件

                    dispatch((SelectionKey) (it.next()));

                }

                selected.clear();

            }

        } catch (IOException ex) { /* ... */ }

    }


    void dispatch(SelectionKey k) {

        Runnable r = (Runnable) (k.attachment());

        //调用之前注册的callback对象

        if (r != null) {

            //这里是Acceptor的run方法

            r.run();

        }

    }


    // inner class

    class Acceptor implements Runnable {


        @Override

        public void run() {

            try {

                //阻塞到获取网络连接通道

                SocketChannel channel = serverSocket.accept();

                if (channel != null) {

                    //连接已经就绪,将相应的感兴趣的读写事件注册到回调中

                    new ReadHander(selector, channel);

                }

            } catch (IOException ex) { /* ... */ }

        }

    }



    public static void main(String[] args) throws IOException {

        Reactor reactor = new Reactor(9000);

        reactor.run();

    }

}

package com.ddcx.utils;


/**

 * @author: xc

 * @ClassName: ReadHander

 * @Date: 2021-03-03 12:48

 * @Description:

 */


import java.io.IOException;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;


class ReadHander implements Runnable {

    final SocketChannel channel;

    final SelectionKey sk;

    ByteBuffer input = ByteBuffer.allocate(90);

    ByteBuffer output = ByteBuffer.allocate(400);

    static final int READING = 0, SENDING = 1;

    int state = READING;


    ReadHander(Selector selector, SocketChannel c) throws IOException {

        channel = c;

        c.configureBlocking(false);

        // Optionally try first read now

        sk = channel.register(selector, 0);


        //将Handler作为callback对象

        sk.attach(this);


        //第二步,注册Read就绪事件

        sk.interestOps(SelectionKey.OP_READ);

        selector.wakeup();

    }


    boolean inputIsComplete() {

        /* ... */

        return false;

    }


    boolean outputIsComplete() {


        /* ... */

        return false;

    }


    void process() {

        /* ... */

        return;

    }


    @Override

    public void run() {

        try {

            if (state == READING) {

                read();

            } else if (state == SENDING) {

                send();

            }

        } catch (IOException ex) { /* ... */ }

    }


    void read() throws IOException {

        channel.read(input);

        if (inputIsComplete()) {

            process();

            state = SENDING;

            // Normally also do first write now

            //第三步,接收write就绪事件

            sk.interestOps(SelectionKey.OP_WRITE);

        }

    }


    void send() throws IOException {

        channel.write(output);


        //write完就结束了, 关闭select key

        if (outputIsComplete()) {

            sk.cancel();

        }

    }

}

①通过Selector的select()方法可以选择已经准备就绪的通道

②通过ServerSocketChannel.accept()方法监听新进来的连接.当accept()方法返回的时候,它返回一个包含新进来的连接的SocketChannel.因此,accept()方法会一直阻塞到有新连接到达.通常不会仅仅只监听一个连接单线程版Reactor模型,其实就是做了一件事情,就是把要监听的socket端口注册到selector中去,并且轮询线程内可以获取到多个已经准备就绪的socket连接通道,同时进行处理这些事件

2. 多线程Reactor模型

attachments-2023-07-h5Y4ypi064b0a4eb47c34.png

多线程主要体现在handler处理的时候,因为处理的事件可能耗时相对于久一些,这样做可以更快的处理感兴趣的事件

selectionKey.attach(new HandlerThreadPool(socketChannel));

3.主从模式多线程

attachments-2023-07-GZyHH8N964b0a50bb121e.png

1.mainReactor负责监听socket连接,用来处理新连接的建立和就绪,将建立的socketChannel指定注册给subReactor。网络连接的建立一般很快,所以这里一个主线程就够了

2.subReactor一般是cpu的核心数,将连接加入到连接队列进行监听,并创建handler进行各种事件处理;当有新事件发生时,subreactor就会调用对应的handler处理,而对具体的读写事件业务处理的功能交给handler线程池来完成。

更多相关技术内容咨询欢迎前往并持续关注六星社区了解详情。

想高效系统的学习Java编程语言,推荐大家关注一个微信公众号:Java圈子。每天分享行业资讯、技术干货供大家阅读,关注即可免费领取整套Java入门到进阶的学习资料以及教程,感兴趣的小伙伴赶紧行动起来吧。

attachments-2023-03-2AoKIjPQ64014b4ad30a3.jpg

  • 发表于 2023-07-14 09:31
  • 阅读 ( 244 )
  • 分类:Java开发

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
王昭君
王昭君

209 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1658 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章