Netty(三)网络编程

发布时间 2023-11-13 15:11:37作者: Tod4

Netty(三)网络编程


1 阻塞和非阻塞

堵塞

  • 在没有数据可读的时候,包括数据复制的过程,线程必须堵塞等待,不会占用CPU但是线程相当于闲置
  • 在单线程下,两个堵塞的方法会相互影响,必须使用多线程,32位JVM一个线程320K,64位JVM一个线程1024K,为了减少线程数,需要采用线程池技术
  • 但是即便使用了线程池,如果有很多连接建立,但长时间inactive,仍然会阻塞线程池中的所有线程

非堵塞

  • 在某个Channel没有可读事件的时候,线程不必堵塞,可以去处理其他有可读事件的channel
  • 但是数据复制过程中,实际上还是堵塞的(AIO进行了改进)
  • 写数据的时候,线程只是等待数据写入Channel即可,无需等待Channel通过网络把数据发送出去
1.1 单线程下阻塞模式存在的问题

​ 如下就实现了服务器和客户端的交互过程,其中ServerSocketChannel的accept方法(用于和客户端建立连接)和SocketChannel的read方法(通过Channel读取客户端传来的数据)都会堵塞当前线程,使其放弃处理机停止运行

​ 这样存在的问题就是,如果同时有两个客户端建立连接,一个客户端连接后服务器走到read方法堵塞,此时第二个客户端的连接请求就不能够得到相应了,只有等到第一个客户端数据发送完毕,服务端线程才能够获得处理机处理第二个客户端的连接

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
//        1 创建socketChannel
        var ssc = ServerSocketChannel.open();
//        2 绑定端口号
        ssc.bind(new InetSocketAddress(8080));
//        3 创建channel读取缓冲区
        var buffer = ByteBuffer.allocate(16);

        var channels = new ArrayList<SocketChannel>();

        while(true) {
            log.debug("before connect...");
//            4 建立与客户端连接
            var sc = ssc.accept();
            channels.add(sc);
            log.debug("connected {}", sc);

            for(var channel : channels) {
//                5 遍历已经链接到ServerSocket的socketChannel,读取channel数据到buffer缓冲区
                channel.read(buffer);
//                切换缓冲区读模式
                buffer.flip();
                debugRead(buffer);
                buffer.flip();
            }

        }
    }
}
public class Client {
    public static void main(String[] args) throws IOException {
        var sc = SocketChannel.open();

        sc.connect(new InetSocketAddress("localhost", 8080));
        System.out.println("waiting");
    }
}

1.2 使用非堵塞模式解决
  • ServerSocketChannel在非堵塞模式下,accept方法在没有客户端连接的时候会返回一个null并且不会堵塞线程
  • SocketChannel在非堵塞模式下,read方法同样会返回0不会堵塞线程
  • 如此一来,while(true)内不断重复执行,不断检查两个channel的变化使其不受对方的影响
  • 存在的问题就是虽然不会导致线程的堵塞,但是线程一直占用CPU空转,白白浪费性能
@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
//        1 创建socketChannel
        var ssc = ServerSocketChannel.open();
//        2 绑定端口号
        ssc.bind(new InetSocketAddress(8080));
//        设置为非堵塞模式
        ssc.configureBlocking(false);
//        3 创建channel读取缓冲区
        var buffer = ByteBuffer.allocate(16);

        var channels = new ArrayList<SocketChannel>();

        while (true) {
//            4 建立与客户端连接
            var sc = ssc.accept();
//            设置socketChannel为非堵塞模式,如果没有客户端连接则accept会返回null
//            以保证线程的正常执行
            if (sc != null) {
                log.debug("connected {}", sc);
                sc.configureBlocking(false);
                channels.add(sc);
            }

            for (var channel : channels) {
//                5 遍历已经链接到ServerSocket的socketChannel,读取channel数据到buffer缓冲区
                var read = channel.read(buffer);
//              如果客户端没有发送数据,则会返回0,保证线程的正常执行
                if (read > 0) {
                    //                切换缓冲区读模式
                    buffer.flip();
                    debugRead(buffer);
                    buffer.flip();
                }
            }

        }
    }
}

2 IO多路复用

  • Selector负责管理、监听channel,当channel没有触发事件的时候就会堵塞当前线程,触发后则归还处理机
  • Selector能够监听的事件类型分为四种:
    • accept:会在服务器端有连接请求的时候触发
    • connect:会在客户端有连接请求的时候触发
    • read:可读事件
    • write:可写事件
2.1 Selector处理accept

​ 步骤为:

  • 1 创建selector
  • 2 在创建的selector上注册需要监听的ServerSocketChannel
  • 3 设置需要监听的事件类型(1:read,4:write,8:connection, 16:accept)
  • 4 select()会在没有事件发生的时候阻塞线程
  • 5 获取事件集合,进而获取事件对应的channel
public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

//       1 创建selector
        Selector selector = Selector.open();
//       2 在创建的selector上注册需要监听的ServerSocketChannel
        SelectionKey sscKey = ssc.register(selector, 0, null);
//       3 设置需要监听的事件类型(1:read,4:write,8:connection, 16:accept)
        sscKey.interestOps(SelectionKey.OP_ACCEPT);

        while(true) {
            log.debug("selector begin");
//            select()会在没有事件发生的时候阻塞线程
            selector.select();
//            获取事件集合
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()) {
//                打印事件
                SelectionKey key = iterator.next();
                log.debug("selected key {}", key);
//                获取事件的channel
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                SocketChannel sc = channel.accept();
                log.debug("connected {}", key);
            }
        }
    }
}
2.2 Selector cancel取消事件

​ selector.select()方法的工作原理是检查新事件,如果不存在则堵塞,存在则不会堵塞线程,因此如果selector的事件得不到处理,会重新被添加到selectedKeys中,导致一直无法堵塞线程,也就是:如果事件未处理则Selector是非堵塞的

​ 如下代码就会导致非堵塞,while(true)一直在执行:

@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

//       1 创建selector
        Selector selector = Selector.open();
//       2 在创建的selector上注册需要监听的ServerSocketChannel
        SelectionKey sscKey = ssc.register(selector, 0, null);
//       3 设置需要监听的事件类型(1:read,4:write,8:connection, 16:accept)
        sscKey.interestOps(SelectionKey.OP_ACCEPT);

        while(true) {
            log.debug("selector begin");
//            select()会在没有事件发生的时候阻塞线程
            selector.select();
//            获取事件集合
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()) {
//                打印事件
                SelectionKey key = iterator.next();
                log.debug("selected key {}", key);
//                获取事件的channel
//                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
//                SocketChannel sc = channel.accept();
//                log.debug("connected {}", key);
            }
        }
    }
}

​ 这时候可以对未处理事件key进行取消,以避免继续添加到Selector的selectedKeys造成非堵塞

key.cancel();
2.3 处理read
  • selector负责所有channel的监视和管理,每个channel在selector上注册之后会得到一个selectionKey,通过selectionKey就能够获得触发的事件
  • 因此在发生事件的时候,selector会向相应selectionKey中的selectedKeys中填充key,但是不会删除
  • 这样后面整体的流程就是通过Selector监视所有的事件对线程进行堵塞,发生事件之后再判断事件类型,然后根据事件类型获取触发的事件再进行相应的操作
  • 这样完全避免了堵塞模式下两种事件类型的互相影响以及非堵塞模式下的CPU空转消耗问题,真的牛
image-20231030094234707
@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.configureBlocking(false);

        Selector selector = Selector.open();
        SelectionKey sscKey = ssc.register(selector, 0, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT);

        while(true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()) {
                SelectionKey key = iterator.next();
                log.debug("key: {}", key);
                if(key.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    // 处理连接事件
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);

                    // 注册SocketChannel处理read事件
                    SelectionKey scKey = sc.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ);

                } else if(key.isReadable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    // 处理读事件
                    channel.read(buffer);
                    buffer.flip();
                    debugRead(buffer);
                    buffer.clear();
                }
            }
        }

    }
}
2.4 事件key处理后不删除导致null pointer问题

​ 上面的代码在一个客户端连接发送请求后,会在sc.configureBlocking(false);出出现空指针问题,这是因为上面所说过的selector在处理事件的时候只会添加key而不会删除key,这样的话:

  • channel在accept、read处理事件的时候,selector只会标记事件key为已处理但是不会删除
  • 在客户端发送数据的时候,selectionKey的集合其实还是能够遍历到两个key,一个是用于处理客户端连接的,一个则是处理这次发送的数据的,但是在accept第一个连接key的时候,并没有真正的客户端连接(因为之前的连接已经被accept,也就是被selector标记为处理完毕的了),所以这时候执行accept方法会导致该ServerSocketChannel返回一个null,进而导致后面的空指针报错
  • 解决的方法就是需要手动在每次处理key之后删除selectionKey集合中的对应key就可以了
@Slf4j
public class Server {
    public static void main(String[] args) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.configureBlocking(false);

        Selector selector = Selector.open();
        SelectionKey sscKey = ssc.register(selector, 0, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT);

        while(true) {
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // 清除key
                iterator.remove();
                log.debug("key: {}", key);
                if(key.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    // 处理连接事件
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);

                    // 注册SocketChannel处理read事件
                    SelectionKey scKey = sc.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ);

                } else if(key.isReadable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    // 处理读事件
                    channel.read(buffer);
                    buffer.flip();
                    debugRead(buffer);
                    buffer.clear();
                }
            }
        }
    }
}
2.5 客户端强制关闭带来的问题
  • 客户端的关闭实际上也会触发一个读事件,而在selector监听到该读事件的时候,会唤醒被堵塞的线程,然后在代码34行处理读事件,也是就读取channel数据到缓冲区的时候,由于这时候连接已经关闭了所以会触发IOException

  • 可以使用异常捕获的方式处理该IOException,但是在客户端关闭连接后,服务器端会一直循环报错IOException,也就是一直在循环处理上面出现的问题。

    解决方法是取消处理该事件,虽然上面已经在迭代的时候删除了key,但是在通过该key的channel获取数据到缓冲区出现异常后,又会被重新添加到SelectionKey集合中,在while true循环中重复被处理

                        try {
                            SocketChannel channel = (SocketChannel) key.channel();
                            // 处理读事件
                            channel.read(buffer);
                            buffer.flip();
                            debugRead(buffer);
                            buffer.clear();
                        } catch (IOException e) {
    //                        e.printStackTrace();
                            key.cancel();
                        }
    
  • 更加优雅的方式则是使用jdk17的try-with-resourse,关闭channel的同时也会自动cancel key

                    } else if(key.isReadable()) {
                        try (SocketChannel channel = (SocketChannel) key.channel()){
                            // 处理读事件
                            channel.read(buffer);
                            buffer.flip();
                            debugRead(buffer);
                            buffer.clear();
                        } catch (IOException e) {
    
                        }
                    }
    
2.6 客户端正常断开带来的问题
  • 如果是第一种写法,还需要在正常断开的时候进行判断,如果读取到数据的返回结果为-1,则表示连接断开应该取消该事件
  • 否则也会被重新添加到SelectionKey集合从而导致循环处理的
                } else if(key.isReadable()) {
                    try {
                        SocketChannel channel = (SocketChannel) key.channel();
                        // 处理读事件
                        var read = channel.read(buffer);
                        if(read != -1) {
                            buffer.flip();
                            debugRead(buffer);
                            buffer.clear();
                        } else {
                            key.cancel();
                        }

                    } catch (IOException e) {
                        key.cancel();
                    }
                }
2.7 消息边界与处理
  • 消息边界问题:字符集的默认编码(即操作系统编码)为uft-8mb3,而当我们把byteBuffer的大小设置为4的时候,就会导致出现两次读取(read只有通过channel读取全部数据到buffer才能标记事件处理完成),进而出现一个中文字符占三个字节但是读取了四个进而出现的乱码问题

    image-20231030135029683
  • 解决方案:

    • 固定消息长度,即使数据包的大小一样,服务器就会按照预定的长度读取,缺点是浪费带宽

    • 按照分隔符拆分,思路和之前的半包解决相同,缺点是效率较低

          public static void main(String[] args) throws IOException {
              var buffer = ByteBuffer.allocate(16);
              ServerSocketChannel ssc = ServerSocketChannel.open();
              ssc.bind(new InetSocketAddress(8080));
              ssc.configureBlocking(false);
      
              Selector selector = Selector.open();
              SelectionKey sscKey = ssc.register(selector, 0, null);
              sscKey.interestOps(SelectionKey.OP_ACCEPT);
      
              while (true) {
                  selector.select();
                  Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                  while (iterator.hasNext()) {
                      SelectionKey key = iterator.next();
                      // 清除key
                      iterator.remove();
                      log.debug("key: {}", key);
                      if (key.isAcceptable()) {
                          ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                          // 处理连接事件
                          SocketChannel sc = channel.accept();
                          sc.configureBlocking(false);
      
                          // 注册SocketChannel处理read事件
                          SelectionKey scKey = sc.register(selector, 0, null);
                          scKey.interestOps(SelectionKey.OP_READ);
      
                      } else if (key.isReadable()) {
                          try {
                              SocketChannel channel = (SocketChannel) key.channel();
      
                              // 处理读事件
                              var read = channel.read(buffer);
                              if (read != -1) {
                                  split(buffer);
                              } else {
                                  key.cancel();
                              }
                          } catch (IOException e) {
                              key.cancel();
                          }
                      }
                  }
              }
          }
      
          private static void split(ByteBuffer buffer) {
              buffer.flip();
      
              for(var i = 0; i < buffer.limit(); i++) {
                  if(buffer.get(i) == '\n') {
                      var len = i + 1 - buffer.position();
                      var allocate = ByteBuffer.allocate(len);
                      while(len-- > 0) {
                          allocate.put(buffer.get());
                      }
                      debugAll(allocate);
                  }
              }
      
              buffer.compact();
          }
      
    • TLV格式,即Type、Length、Value,类型、长度已知的情况下,就可以方便地获取消息的大小,分配合适的buffer,缺点则是buffer需要提前分配,如果内容过大则会影响server的吞吐量

      • HTTP1.1是TLV格式
      • HTTP2.0之后则变成了LTV格式
  • 消息边界容量超出问题:

    如下面的代码,客户端发送的数据超过了缓冲区的大小,并且buffer设置为了局部变量,在第一次读取channel数据写入byteBuffer写满了之后,不会标记该读事件已完成,那么重新触发selector.select()获取读事件,由于是局部变量会重新创建一个新的ByteBuffer写入剩下的内容,因此会输出最后一次的读取数据:

                    try {
                        SocketChannel channel = (SocketChannel) key.channel();
                        var buffer = ByteBuffer.allocate(16);
                        // 处理读事件
                        var read = channel.read(buffer);
                        if (read != -1) {
                            split(buffer);
                        } else {
                            key.cancel();
                        }
                    } catch (IOException e) {
                        key.cancel();
                    }
    

    输出为:

    +--------+-------------------- all ------------------------+----------------+
    position: [5], limit: [6]
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 33 33 33 33 0a 00                               |3333..          |
    +--------+-------------------------------------------------+----------------+
    

    因此需要对ByteBuffer进行手动扩容,注意扩容的逻辑:在第一次read读取最大容量的数据,然后在第二次读取的时候发现超出容量就会去扩容,先把原来缓冲区的内容拷贝到新ByteBuffer,最后读入新数据

    这就要求两次读取的时候的ByteBuffer的引用指向不能发生变化,也就不能是局部变量,因为如果是新创建的变量的话,扩容的时候就无法拷贝原有数据了,从而导致只会打印最后一次读取的部分

    image-20231030182546211

    并且同时不应该直接在最外面使用全局变量,因为那样的话每一个channel都使用的同一个ByteBuffer,因此可以使用下面的attachment附件解决这一问题

2.8 attachment附件
  • 可以利用附件+手动扩容的方式解决消息边界的问题,附件就是为每一个channel添加的一个附件变量,可以通过selectionKey.attachment()获取到附加的attachment

  • 实现:首先SocketChannel在注册到Selector的时候,为其添加ByteBuffer附件

                        var buffer = key.attachment();
                        // 注册SocketChannel处理read事件
                        SelectionKey scKey = sc.register(selector, 0, buffer);
    

    然后在处理读事件的时候,判断是否需要扩容,从而进行扩容操作

    					   if (read != -1) {
                                split(buffer);
                                if(buffer.position() == buffer.limit()) {
                                    var newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                    // 注意buffer需要切换为读模式
                                    buffer.flip();
                                    newBuffer.put(buffer);
                                    key.attach(newBuffer);
                                }
                            }
    

    在写模式下position指针等于limit指针则说明需要扩容(因为表示没有发生数据的读取,数据读取的话compact函数会把已经读取的数据前移,也就是小于limit的

        private static void split(ByteBuffer buffer) {
            buffer.flip();
    
            for(var i = 0; i < buffer.limit(); i++) {
                if(buffer.get(i) == '\n') {
                    var len = i + 1 - buffer.position();
                    var allocate = ByteBuffer.allocate(len+1);
                    while(len-- > 0) {
                        allocate.put(buffer.get());
                    }
                    debugAll(allocate);
                }
            }
    
            buffer.compact();
        }
    

    如此一来就解决了消息读取超出边界的问题,整体代码为:

    public class Server {
        public static void main(String[] args) throws IOException {
    
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.bind(new InetSocketAddress(8080));
            ssc.configureBlocking(false);
    
            Selector selector = Selector.open();
            SelectionKey sscKey = ssc.register(selector, 0, ByteBuffer.allocate(16));
            sscKey.interestOps(SelectionKey.OP_ACCEPT);
    
            while (true) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    // 清除key
                    iterator.remove();
                    log.debug("key: {}", key);
                    if (key.isAcceptable()) {
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        // 处理连接事件
                        SocketChannel sc = channel.accept();
                        sc.configureBlocking(false);
    
                        var buffer = key.attachment();
                        // 注册SocketChannel处理read事件
                        SelectionKey scKey = sc.register(selector, 0, buffer);
                        scKey.interestOps(SelectionKey.OP_READ);
    
                    } else if (key.isReadable()) {
                        try {
                            SocketChannel channel = (SocketChannel) key.channel();
                            // 处理读事件
                            var buffer = (ByteBuffer) key.attachment();
                            var read = channel.read(buffer);
                            if (read != -1) {
                                split(buffer);
                                if(buffer.position() == buffer.limit()) {
                                    var newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                    // 注意buffer需要切换为读模式
                                    buffer.flip();
                                    newBuffer.put(buffer);
                                    key.attach(newBuffer);
                                }
                            } else {
                                key.cancel();
                            }
                        } catch (IOException e) {
                            key.cancel();
                        }
                    }
                }
            }
        }
    
        private static void split(ByteBuffer buffer) {
            buffer.flip();
    
            for(var i = 0; i < buffer.limit(); i++) {
                if(buffer.get(i) == '\n') {
                    var len = i + 1 - buffer.position();
                    var allocate = ByteBuffer.allocate(len+1);
                    while(len-- > 0) {
                        allocate.put(buffer.get());
                    }
                    debugAll(allocate);
                }
            }
    
            buffer.compact();
        }
    }
    
2.9 ByteBuffer大小分配
  • 因为ByteBuffer不能被多个channel共同使用,因此每个channel都需要自己去记录可能被切分的消息,需要为每一个channel维护一个独立的ByteBuffer
  • ByteBuffer不能太大,比如一个设计为1Mb,那么百万连接就需要1Tn内存,需要设计一种可变的ByteBuffer
    • 一种思路是先使用一种小容量的buffer,如果发现数据不够再进行扩容将之前buffer的内容拷贝到扩容之后的ByteBuffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能
    • 另一种思路是用多个数组组成buffer,一个数组不够就把多出来的内容写入新的数组,与前面的区别是消息不连续解析复杂,优点是避免了拷贝引起的性能消耗
2.10 写入内容过多的问题

​ 下面的WriteServerWriteClient就模拟了服务器端写入内容过多的场景,首先在服务器端创建了一个50M的写入缓冲区,然后数据会首先写到网卡中的socket发送缓冲区,因此取决于网络情况也就是操作系统能够分配的缓冲区大小,服务器端的数据可能不会一次性发送完成,因此这里进行了判断然后循环写入。

WriteClient则循环读取到读取缓冲区,直到没有数据的时候就会出现read阻塞当前线程

public class WriteServer {
    public static void main(String[] args) throws IOException {
        var ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

        var selector = Selector.open();
        var sscKey = ssc.register(selector, 0, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT);

        while(true) {
            selector.select();
            var iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()) {
                var key = iterator.next();
                var channel = (ServerSocketChannel)key.channel();
                SocketChannel sc = channel.accept();
                sc.configureBlocking(false);

                var sb = new StringBuilder();
                for(var i = 0; i < 5000000; i++) {
                    sb.append("a");
                }

                var buffer = Charset.defaultCharset().encode(sb.toString());
                while(buffer.hasRemaining()) {
                    var write = sc.write(buffer);
                    System.out.println(write);
                }
            }
        }

    }
}

​ WriteClient

public class WriteClient {
    public static void main(String[] args) throws IOException {
        var sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));

        var count = 0;
        var buffer = ByteBuffer.allocate(1024 * 16);
        while(true) {
            count += sc.read(buffer);
            System.out.println(count);
            buffer.clear();
        }
    }
}

2.11 处理可写事件

如果网卡的socket写入缓冲区一直处于忙碌状态,那么上面WriteServer的26行就会一直在while循环尝试写入,导致处理机空转导致性能浪费,上面代码可进行优化:

  • 不需要非得一次性写入,因为socketChannel的write在没有写完缓冲区的时候会重新触发一个写事件
  • 那么可以判断如果没有写完的话可以设置socketChannel对应的selectionKey的关注类型,添加写事件关注
  • 那么等到socket写入去能够触发写事件的时候,就能够通过selector.select();唤醒当前线程进行数据的写入,如果这次仍然写不完则继续等待写事件触发
  • 注意写完之后需要清除掉附件,并且取消对写事件的关注
public class WriteServer {
    public static void main(String[] args) throws IOException {
        var ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

        var selector = Selector.open();
        var sscKey = ssc.register(selector, 0, null);
        sscKey.interestOps(SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();
            var iterator = selector.selectedKeys().iterator();

            while (iterator.hasNext()) {
                var key = iterator.next();
                iterator.remove();

                if (key.isAcceptable()) {
                    var channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);

                    var scKey = sc.register(selector, 0, null);
                    scKey.interestOps(SelectionKey.OP_READ);

                    var sb = new StringBuilder();
                    for (var i = 0; i < 5000000; i++) {
                        sb.append("a");
                    }

                    var buffer = Charset.defaultCharset().encode(sb.toString());
                    var write = sc.write(buffer);
                    System.out.println(write);

                    // sc.write 没写完缓冲区就会触发可写事件
                    if (buffer.hasRemaining()) {
                        scKey.interestOps(scKey.interestOps() + SelectionKey.OP_WRITE);
                        scKey.attach(buffer);
                    }
                } else if (key.isWritable()) {
                    var buffer = (ByteBuffer) key.attachment();
                    var sc = (SocketChannel) key.channel();
                    var write = sc.write(buffer);
                    System.out.println(write);

                    if(!buffer.hasRemaining()) {
                        key.attach(null);
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                    }
                }
            }
        }
    }
}