Netty(四)NIO多线程优化

发布时间 2023-11-13 15:11:37作者: Tod4

Netty(四)NIO多线程优化


​ 前面的代码都只有一个选择器,没有充分利用多核CPU,因此可以分两组选择器

  • boss:单线程配一个选择器,专门处理accept事件,不负责数据的读写
  • worker:创建CPU核心数的线程,每个线程配一个选择器,轮流处理read事件
image-20231031160258298

1 多线程问题分析

  • 关键是这一部分的代码,需要保证register在worker线程启动之前执行,因为如果worker线程先执行的话就会导致在selector.select()处堵塞,从而导致使用了这个worker selector的、运行在boss线程的register也会被堵塞无法完成socketChannel的注册,进而worker线程也一直无法关注到读事件从而被一直堵塞

    	worker.register(sc); // 创建selector 启动worker-0线程	监听read事件
    	sc.register(worker.selector, SelectionKey.OP_READ, null); // boss线程,注册连接得到的socketChannel搭到创建的selector上,这行代码先执行的话会堵塞在select() 
    
  • 虽然这么写boss线程先执行不会出现这个问题,但是在第一个客户端连接之后,boss线程处于select()堵塞的状态,此时再有新客户端连接的话,仍然会因为select()方法而导致影响register出现上面的问题

​ 服务器MultiThreadServer

    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("boss");

        var ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

        var boss = Selector.open();
        var sscKey = ssc.register(boss, 0, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT);

        var worker = new Worker("worker-0");

        while (true) {
            boss.select();
            var iter = boss.selectedKeys().iterator();
            while (iter.hasNext()) {
                var key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    var channel = (ServerSocketChannel) key.channel();
                    var sc = channel.accept();
                    sc.configureBlocking(false);

                    log.debug("connected {}", sc.getRemoteAddress());
                    log.debug("before register {}", sc.getRemoteAddress());

                    worker.register(); // 创建selector 启动worker-0线程
                    log.debug("{} uses selector", Thread.currentThread().getName());
                    sc.register(worker.selector, SelectionKey.OP_READ, null); // boss线程
                    log.debug("after register {}", sc.getRemoteAddress());

                }
            }
        }

    }

​ Worker selector类

	static class Worker implements Runnable {
        private String name;
        private Thread thread;
        private Selector selector;

        private boolean start = false;

        public Worker(String name) {
            this.name = name;
        }

        public void register() throws IOException {
            if (!this.start) {
                this.thread = new Thread(this, this.name);
                this.selector = Selector.open();
                this.thread.start();
                this.start = true;
            }
        }

        @Override
        public void run() {
            while(true) {
                try {
                    log.debug("{} uses selector", Thread.currentThread().getName());
                    selector.select();
                    var iter = this.selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        var key = iter.next();
                        iter.remove();
                        if(key.isReadable()) {
                            var buffer = ByteBuffer.allocate(16);
                            var sc = (SocketChannel)key.channel();
                            sc.read(buffer);
                            debugAll(buffer);
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

2 netty源码针对该问题的解决思路(使用堵塞队列+selector wakeup)

  • 上面的代码中,在触发accept事件后,得到连接的socketChannel并将其注册到worker selector上,其实还是在boss线程上执行的,因此需要先将这个注册的步骤放到worker启动的线程里面

  • 总结上面的问题,就是在多线程环境下,可能会出现线程执行selector.select()方法被堵塞,导致其他线程无法进行register也就是channel无法注册同一个selector的情况

  • 因此需要将两个操作都放到worker线程,一般会使用堵塞队列完成线程间的通信:

            private LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    
            public Worker(String name) {
                this.name = name;
            }
    
            public void register(SocketChannel sc) throws IOException {
                if (!this.start) {
                    this.thread = new Thread(this, this.name);
                    this.selector = Selector.open();
                    this.thread.start();
                    this.start = true;
                }
    
                this.queue.add(() -> {
                    try {
                        sc.register(this.selector, SelectionKey.OP_READ, null); // boss线程
                    } catch (ClosedChannelException e) {
                        throw new RuntimeException(e);
                    }
                });
                
                selector.wakeup();
            }
    
  • 这样,每当出现需要进行新客户端连接(也就是需要需要在selector上注册socketChannel的时候),就会在堵塞队列中添加需要注册socketChannel的任务,然后通过wakeup方法唤醒selector,最后worker线程就会从阻塞队列中取到任务执行注册,避免了worker线程注册的时候select方法堵塞导致boss线程无法注册的情况

3 多worker进行负载均衡优化

  • 类似使用一个worker数组实现一个负载均衡,每次出现一个连接就去轮流使用worker数组的一个worker

  • 代码如下:

        public static void main(String[] args) throws IOException {
            Thread.currentThread().setName("boss");
    
            var ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.bind(new InetSocketAddress(8080));
    
            var boss = Selector.open();
            var sscKey = ssc.register(boss, 0, null);
            sscKey.interestOps(SelectionKey.OP_ACCEPT);
    
    
            var workers = new Worker[3];
            for(var i = 0; i < workers.length; i++) {
                workers[i] = new Worker("worker-" + i);
            }
            AtomicInteger count = new AtomicInteger();
    
            while (true) {
                boss.select();
                var iter = boss.selectedKeys().iterator();
                while (iter.hasNext()) {
                    var key = iter.next();
                    iter.remove();
                    if (key.isAcceptable()) {
                        var channel = (ServerSocketChannel) key.channel();
                        var sc = channel.accept();
                        sc.configureBlocking(false);
    
                        log.debug("connected {}", sc.getRemoteAddress());
                        log.debug("before register {}", sc.getRemoteAddress());
    
                        var index = count.get();
                        workers[index].register(sc); // 创建selector 启动worker-0线程
                        index = (index + 1) % workers.length;
                        count.set(index);
    
                        log.debug("after register {}", sc.getRemoteAddress());
    
                    }
                }
            }
    
        }
    
  • 一般情况下会设置线程数为cpu的核心数:

            log.debug("cpu core nums:{}", Runtime.getRuntime().availableProcessors());
            var workers = new Worker[Runtime.getRuntime().availableProcessors()];
    

    注意:Runtime.getRuntime().availableProcessors()如果工作在docker容器下,因为容器不是物理隔离的,因此会去拿物理cpu的个数,而不是docker被分配的cpu个数

    这个问题直到jdk10才解决,使用jvm参数UseContainerSupport配置可以默认开启获取容器配置