Java网络编程NIO实现简易多人聊天室

发布时间 2023-09-27 19:46:11作者: 万里阳光号船长

BIO模型

BIO即blocking IO,顾名思义是一种阻塞模型。当没有客户端连接时,服务端会一直阻塞,当有客户端新建连接时,服务端会新开一个线程去响应(不用多线程的话服务端同一时刻最多只能接收一个连接)。但不断的新开线程对服务器的压力是巨大的,为了缓解压力可以采用线程池技术实现线程复用,但这种做法治标不治本,本质还是一个连接一个线程。代码如下:

//服务端
public class BIOServer {

    private final static int PORT = 8888;

    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(PORT);
        ExecutorService pool = Executors.newFixedThreadPool(20);
        while(true) {
            Socket socket = server.accept();
            pool.execute(()->{
                handler(socket);
            });
        }
    }

    public static void handler(Socket socket) {
        try {
            InputStream inputStream = socket.getInputStream();
            byte[] bytes = new byte[1024];
            while(true) {
                int len = inputStream.read(bytes);
                if(len != -1) {
                    System.out.println("收到来自客户端的消息:" + new String(bytes,0, len));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}
//客户端
public class BIOClient {

    private final static String IP = "localhost";
    private final static int PORT = 8888;

    public static void main(String[] args) throws IOException {
        Socket socket = new Socket(IP,PORT);
        Scanner sc = new Scanner(System.in);
        BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
        while(sc.hasNext()) {
            String str = sc.nextLine();
            bw.write(str);
            bw.flush();
        }
    }

}

NIO模型

NIO即non-blocking IO,顾名思义是一种非阻塞模型。NIO的目的就是实现一个线程处理多个连接,故引入了几个重要的核心概念:

  • Channel,管道。Channel可以理解为连接,与BIO中Sokcet类似,一个连接对应一个Channel,但Channel中仍内置了一个Socket,可以调用socket()获取。
  • Selector,选择器。Selector类似一个调度中心,所有Channel都需要注册到选择器中,并绑定一个SelectionKey,绑定时还会指定要监听的事件,如:连接就绪、读就绪、写就绪等。可以调用Selector提供的API实现对发生监听事件的连接进行处理。
  • Buffer,缓冲区。Buffer底层是一个数组,供Channel实现对数据的读写。Buffer的position、limit、capacity分别指当前索引、读/写上限索引、数组容量。

三者之间的关系如下图所示(图比较乱......序号表示NIO流程执行的大概步骤,部分连线只为了便于理解,不等于实际调用):
在这里插入图片描述
下面是用NIO实现的简易多人聊天室代码:

//服务端
public class NIOServer {

    private final String IP = "localhost";
    private final int PORT = 8888;
    private ServerSocketChannel serverChannel;
    private Selector selector;
    private int count;

    public NIOServer init() {
        try {
            serverChannel = ServerSocketChannel.open();
            selector = Selector.open();
            serverChannel.bind(new InetSocketAddress(IP,PORT));
            serverChannel.configureBlocking(false);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            count = 0;
        } catch (IOException e) {
            System.out.println("初始化失败...");
        } finally {
            return this;
        }
    }

    public void start() {
        while(true) {
            try {
                //阻塞直到有任意通道发生任一事件
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    //监听到新用户连接
                    if (key.isAcceptable()) {
                        userConnect(key);
                    }
                    //监听到有用户发消息
                    if (key.isReadable()) {
                        userSendMsg(key);
                    }
                    iterator.remove();
                }
            } catch (IOException e) {
                System.out.println("未知的bug...");
            }
        }
    }

    public void userConnect(SelectionKey key) throws IOException {
        SocketChannel channel = serverChannel.accept();
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ);
        String msg = "有新的用户接入:" + channel.socket().getRemoteSocketAddress() +
                ",在线用户总数:" + ++count + "个";
        System.out.println(msg);
        transfer(msg, channel);
    }

    public void userSendMsg(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        String msg = null;
        try {
            //用户非正常关闭会直接在read这里抛IO异常
            int len = channel.read(buffer);
            //用户正常关闭连接需要手动抛IO异常,不然会一直空轮询分发读就绪事件
            if (len == -1)
                throw new IOException();
            msg = channel.socket().getPort() + ":" + new String(buffer.array(), 0, len);
        } catch (IOException e) {
            //有用户断开连接
            msg = channel.socket().getPort() + "断开连接,在线用户总数:" + --count + "个";
            key.cancel();
            channel.socket().close();
            channel.close();
        } finally {
            System.out.println(msg);
            transfer(msg, channel);
        }
    }

    public void transfer(String msg, SocketChannel self) throws IOException{
        Iterator<SelectionKey> iterator = selector.keys().iterator();
        ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
        while(iterator.hasNext()) {
            SelectionKey key = iterator.next();
            if(key.channel() instanceof ServerSocketChannel)
                continue;
            SocketChannel channel = (SocketChannel) key.channel();
            //转发用户消息
            if(self != key.channel()) {
                channel.write(buffer);
                buffer.clear();
            }
        }
    }

    public static void main(String[] args) {
        new NIOServer().init().start();
    }

}
public class NIOClient {

    private final static String IP = "localhost";
    private final static int PORT = 8888;
    private Selector selector;
    private SocketChannel channel;

    public void init() {
        try {
            selector = Selector.open();
            InetSocketAddress address = new InetSocketAddress(IP, PORT);
            channel = SocketChannel.open();
            channel.connect(address);
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            System.out.println("初始化失败...");
        }
    }

    public void sendMsg(String msg) {
        try {
            ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
            channel.write(buffer);
        } catch (IOException e) {
            System.out.println("服务器异常...");
        }
    }

    public void readMsg(ByteBuffer buffer) {
        try {
            //只绑定了一个channel且监听的读事件...
            if(selector.select() > 0) {
                int len = channel.read(buffer);
                if (len != -1) {
                    buffer.flip();
                    System.out.println(new String(buffer.array(), 0, len));
                }
                //这一步一定不能少,不然只能读一次消息
                selector.selectedKeys().clear();
            }
        } catch (IOException e) {
            System.out.println("服务器异常...");
        }
    }

    public static void main(String[] args) {
        NIOClient client = new NIOClient();
        client.init();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        new Thread(() -> {
            while (true) {
                buffer.clear();
                client.readMsg(buffer);
            }
        }).start();
        Scanner sc = new Scanner(System.in);
        while(sc.hasNext()) {
            client.sendMsg(sc.nextLine());
        }
        try {
            client.channel.socket().close();
            client.channel.close();
        } catch (IOException e) {
            System.out.println("退出异常...");
        }
    }

}

写代码时遇到的问题及个人理解

  • 当调用Channel的register方法注册到Selector前,一定要先把管道设置为非阻塞模式,即调用configureBlocking(false),否则会报模式错误。
  • SelectionKey与Channel是绑定关系,可以用SelectionKey获取绑定的Channel。在Selector中存储SelectionKey时其实有两个表:注册表与事件表。其中,注册表包含了所有注册到Selector的Channel对应的SelectionKey,而事件表只包含那些发生了监听事件的SelectionKey。
  • Selector提供的几个重要的API:select()、selectedKeys()、keys()。selectedKeys()返回的是事件表集合,keys()返回的是注册表集合。select()比较特殊,它返回的是发生监听事件的SelectionKey数目,并不是简单的返回事件表的大小,而是在注册表中存在但不在事件表中存在且此时发生了监听事件的SelectionKey数目(即从注册表新增到事件表的数目),且它会阻塞直到该数目>0。这几个方法弄清楚就可以解释为何每次遍历时需要移除当前元素。
  • 用户正常断开与非正常断开时在服务端的情况是不一样的。正常断开指客户端正常调用channel和socket的close方法关闭,此时服务端调用channel.read()时会返回-1,需要判断该情况并断开连接。非正常断开即客户端没有调用close方法关闭直接结束程序,此时服务端调用channel.read()会直接报IO异常,所以需要自己抓。
  • 多人聊天时客户端需要自己不断发消息同时不断接收服务端发送的消息,需使用两个线程分别处理。
  • 上述BIO实现的聊天室是主线程负责监听端口并建立连接,后丢给线程池中的线程负责每个连接的网络IO(一个连接一个线程);上述NIO实现的聊天室仅包含一个线程即主线程,负责监听端口建立连接以及每个连接的网络IO。实际上NIO也可扩展为多个selector的模式,效率更高,可参考Netty模型。