博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty Reactor 线程模型(随记)
阅读量:4038 次
发布时间:2019-05-24

本文共 10011 字,大约阅读时间需要 33 分钟。

       Netty被称为一个高性能、高可扩展性能的异步事件驱动的网络应用程序框架,它极大地简化了TCP和UDP客户端和服务器开发等网络编程。

       Netty的Reactor模型中有四个核心概念:

  1.  Resources资源(请求和任务) 
  2.  Synchronous Event Demultiplexer同步事件复用器
  3. Dispatcher 分配器
  4.  Request Handler请求处理器

Reactor 模式设计和实现,我们先了解下它两端的通信方式:

服务端通信序列图如下:

客户端通信方式:

        Netty 的 IO 线程 NioEventLoop 由于聚合了多路复用器 Selector,可以同时并发处理成百上千个客户端 Channel,由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 IO 阻塞导致的线程挂起。

   Reactor线程模型的两次转变:

Reactor 线程接收请求->分发给线程池处理请求

 

 

mainReactor接收->分发给subReactor读写->具体业务逻辑分发给单独的线程池处理

上图出自Doug Lea的著名文档《Scalable IO in java》,建议大家可以去看看。

下载地址:

刚开始自己也没太明白这种设计的好处,于是就深入了下源码,看了看设计思想,深有体会。看百遍不如动手写一遍,这样才会有深入体会。

UML图

代码如下

package com.wywhdgg.netty.example.demo.reactor;import java.io.IOException;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Iterator;import java.util.Random;import java.util.Set;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;/*** *@author dzhbo *@date 2019/12/5 23:07 *@Description:  Reactorc线程模型 *@version 1.0.0 */public class ReactorServer {    /**     * 处理业务的线程池     **/    private static ExecutorService workThreadPool = Executors.newCachedThreadPool();    /**     * 封装了selector.select()等事件轮询的代码     */    abstract class ReactorThread extends Thread {        /**         * 通道选择         **/        Selector selector;        /**         * 任务队列         **/        LinkedBlockingQueue
taskQueue = new LinkedBlockingQueue<>(); /** * Selector监听到有事件后,调用这个方法 */ public abstract void handler(SelectableChannel channel) throws Exception; private ReactorThread() throws IOException { selector = Selector.open(); } volatile boolean running = false; @Override public void run() { // 轮询Selector事件 while (running) { try { /**执行队列中的任务**/ Runnable task; /**非阻塞,如果获取到,就开始处理**/ while ((task = taskQueue.poll()) != null) task.run(); selector.select(1000); /**获取查询结果 遍历查询结果**/ Set
selected = selector.selectedKeys(); Iterator
iter = selected.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); int readyOps = key.readyOps(); // 关注 Read 和 Accept两个事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { try { SelectableChannel channel = (SelectableChannel) key.attachment(); channel.configureBlocking(false); handler(channel); if (!channel.isOpen()) { key.cancel(); // 如果关闭了,就取消这个KEY的订阅 } } catch (Exception ex) { key.cancel(); // 如果有异常,就取消这个KEY的订阅 } } } selector.selectNow(); } catch (IOException e) { e.printStackTrace(); } } } private SelectionKey register(SelectableChannel channel) throws Exception { /*** register要以任务提交的形式,让reactor线程去处理的原因: 1.因为线程在执行channel注册到selector的过程中,会和调用selector.select()方法的线程争用同一把锁 2.select()方法实在eventLoop中通过while循环调用的,争抢的可能性很高,为了让register能更快的执行,就放到同一个线程来处理 */ FutureTask
futureTask = new FutureTask<>(() -> channel.register(selector, 0, channel)); taskQueue.add(futureTask); return futureTask.get(); } private void doStart() { if (!running) { running = true; start(); } } } private ServerSocketChannel serverSocketChannel; // 1、创建多个线程 - accept处理reactor线程 (accept线程) private ReactorThread[] mainReactorThreads = new ReactorThread[1]; // 2、创建多个线程 - io处理reactor线程 (I/O线程) private ReactorThread[] subReactorThreads = new ReactorThread[8]; /** * 初始化线程组 */ private void newGroup() throws IOException { // 创建IO线程,负责处理客户端连接以后socketChannel的IO读写 for (int i = 0; i < subReactorThreads.length; i++) { subReactorThreads[i] = new ReactorThread() { @Override public void handler(SelectableChannel channel) throws IOException { /**work线程只负责处理IO处理,不处理accept事件*/ SocketChannel socketChannel = (SocketChannel) channel; ByteBuffer requestBuffer = ByteBuffer.allocate(1024); while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) { if (requestBuffer.position() > 0) break; } if (requestBuffer.position() == 0) { return; } /**写模式切换读 初始位置等于限制 limit = position; position = 0; //初始位置设置为0 mark = -1; //清除标记 * **/ requestBuffer.flip(); byte[] content = new byte[requestBuffer.limit()]; requestBuffer.get(content); System.out.println(Thread.currentThread().getName() + "收到数据,IP ADDRESS:" + socketChannel.getRemoteAddress() + " CONTENT:" + new String(content)); workThreadPool.submit(() -> { System.out.println("---------------执行业务数据---------------"); }); // 响应结果 200 String response = "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World"; ByteBuffer buffer = ByteBuffer.wrap(response.getBytes()); while (buffer.hasRemaining()) { socketChannel.write(buffer); } } }; } // 创建mainReactor线程, 只负责处理serverSocketChannel for (int i = 0; i < mainReactorThreads.length; i++) { mainReactorThreads[i] = new ReactorThread() { AtomicInteger incr = new AtomicInteger(0); @Override public void handler(SelectableChannel channel) throws Exception { ServerSocketChannel serverSocketChannel = (ServerSocketChannel) channel; SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); // 收到连接建立的通知之后,分发给I/O线程继续去读取数据 int index = incr.getAndIncrement() % subReactorThreads.length; ReactorThread workEventLoop = subReactorThreads[index]; workEventLoop.doStart(); /**注册channel 信息**/ SelectionKey selectionKey = workEventLoop.register(socketChannel); selectionKey.interestOps(SelectionKey.OP_READ); System.out.println(Thread.currentThread().getName() + "收到新连接 : " + socketChannel.getRemoteAddress()); } }; } } /** * 初始化channel,并且绑定一个eventLoop线程 * * @throws IOException IO异常 */ private void initAndRegister() throws Exception { // 1、 创建ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); // 2、 将serverSocketChannel注册到selector int index = new Random().nextInt(mainReactorThreads.length); mainReactorThreads[index].doStart(); SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel); //注册接收事件 selectionKey.interestOps(SelectionKey.OP_ACCEPT); } /** * 绑定端口 * * @throws IOException IO异常 */ private void bind() throws IOException { // 1、 正式绑定端口,对外服务 serverSocketChannel.bind(new InetSocketAddress(8080)); System.out.println("启动完成,端口8080"); } public static void main(String[] args) { ReactorServer reactorServer = new ReactorServer(); try { //初始化线程组 reactorServer.newGroup(); //初始化链接 reactorServer.initAndRegister(); //绑定端口号 reactorServer.bind(); } catch (Exception e) { e.printStackTrace(); } }}// Thread-8收到新连接 : /127.0.0.1:51577// Thread-0收到数据,IP ADDRESS:/127.0.0.1:51577 CONTENT:GET / HTTP/1.1// Host: 127.0.0.1:8080// Connection: keep-alive// Cache-Control: max-age=0// Upgrade-Insecure-Requests: 1// User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36// Sec-Fetch-User: ?1// Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9//Sec-Fetch-Site: cross-site//Sec-Fetch-Mode: navigate//Accept-Encoding: gzip, deflate, br//Accept-Language: zh-CN,zh;q=0.9//////Thread-8收到新连接 : /127.0.0.1:51578//---------------执行业务数据---------------//Thread-0收到数据,IP ADDRESS:/127.0.0.1:51577 CONTENT:GET /favicon.ico HTTP/1.1//Host: 127.0.0.1:8080//Connection: keep-alive//Pragma: no-cache//Cache-Control: no-cache//User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36//Accept: image/webp,image/apng,image/*,*/*;q=0.8// Sec-Fetch-Site: same-origin// Sec-Fetch-Mode: no-cors// Referer: http://127.0.0.1:8080/// Accept-Encoding: gzip, deflate, br// Accept-Language: zh-CN,zh;q=0.9////// ---------------执行业务数据---------------

  

转载地址:http://dujdi.baihongyu.com/

你可能感兴趣的文章
[互联网学习]如何提高网站的GooglePR值
查看>>
[关注大学生]求职不可不知——怎样的大学生不受欢迎
查看>>
[关注大学生]读“贫困大学生的自白”
查看>>
[互联网关注]李开复教大学生回答如何学好编程
查看>>
[关注大学生]李开复给中国计算机系大学生的7点建议
查看>>
[关注大学生]大学毕业生择业:是当"鸡头"还是"凤尾"?
查看>>
[茶余饭后]10大毕业生必听得歌曲
查看>>
gdb调试命令的三种调试方式和简单命令介绍
查看>>
C++程序员的几种境界
查看>>
VC++ MFC SQL ADO数据库访问技术使用的基本步骤及方法
查看>>
VUE-Vue.js之$refs,父组件访问、修改子组件中 的数据
查看>>
Vue-子组件改变父级组件的信息
查看>>
Python自动化之pytest常用插件
查看>>
Python自动化之pytest框架使用详解
查看>>
【正则表达式】以个人的理解帮助大家认识正则表达式
查看>>
性能调优之iostat命令详解
查看>>
性能调优之iftop命令详解
查看>>
非关系型数据库(nosql)介绍
查看>>
移动端自动化测试-Windows-Android-Appium环境搭建
查看>>
Xpath使用方法
查看>>