Java网络编程:IO、NIO

发布时间 2023-08-21 10:03:50作者: 起风了oc

Socket

套接字(socket)是一个抽象层,应用程序可以通过它发送或接收数据,可对其进行像对文件一样的打开、读写和关闭等操作。套接字允许应用程序将I/O插入到网络中,并与网络中的其他应用程序进行通信。网络套接字是IP地址与端口的组合。

看不懂?别急,先回忆一下大学学的计算机网络。

物理层,就是电话线、光纤、双绞线这些东西。其中电话线、光纤传的都是模拟信号(连续),把这些线缆插到调制解调器(猫)上,可以转换为电流承载的数字信号(离散),然后通过双绞线传到电脑的网卡。

数据链路层,数据链路层就是网卡那一层,网卡将双绞线传来的信号处理一下,把里面一些转义的数据还原,然后输出到计算机的总线上,这时候操作系统会出现硬中断,过来把这些数据写入内存指定的地方,一般是协议栈的缓冲区。

网络层,是一个抽象的概念,由操作系统实现。数据链路层的数据帧过来之后,操作系统会把数据帧解包,将IP分组拿出来先存着。IP分组是传输层的报文拆分出来的小包,因此一般不会只收到一个,需要等一组全到齐了,才会合并送往上层。

传输层,包括TCP和UDP,是应用程序可以直接使用的一层。网络层的IP分组到齐了之后,会把所有分组的数据合并到一起,送到这一层。

好吧前面都是废话。

数据链路层由网卡和驱动实现,网络层和传输层由操作系统实现。Socket就是操作系统提供的传输层API,可以让应用程序通过访问文件的方式来访问计算机网络的传输层。一个IP地址一个端口组合起来,就是一个套接字,IP地址用来找到目标主机,端口用来找到目标进程

Java IO

由于不同OS提供的Socket API不同(比如Linux和windows),Java作为一个跨平台的解释型语言,需要独立设计一套通用的API,然后再使用操作系统提供的API实现,进而使用户的Java代码可以实现“一次编译,处处运行”。

Stream

Java IO是面向流(Stream)的,大体分为输入流(InputStream)和输出流(OutputStream)。输入流用于读取数据,输出流用于写入数据。根据传输的数据类型,又分为字节流和字符流,字节流一般叫stream,字符流叫reader/writer。这种设计的出发点其实是将数据看作是水流,我们的程序就是对水的处理节点。

以字节流为例,最底层的抽象基类有两个:InputStream 和 OutputStream。根据这两个基类,可以基于装饰器模式衍生出各种各样的子类。当数据在文件,可以用FileInputStream;当数据就在内存,可以用 ByteArrayInputStream;当数据在其他机器上,需要网络传过来,则需要先建立网络连接,然后获取一个 InputStream。如果需要加一个缓冲区来提高性能,可以用 BufferedInputStream。输出流同理。

阻塞IO模型

Java IO是一种阻塞型的 IO 。比如读取一个文件,发起IO请求之后,线程就会陷入阻塞,失去CPU的使用权。当数据到了,线程会被唤醒,继续执行后面的代码。如果写客户端代码,在 UI 线程上搞这种阻塞操作,UI 就会变得卡顿。

用这种传统 IO 进行服务端网络编程时,需要用到多线程,大致代码如下:

public static void main(String[] args) {
    // 获取一个线程池
    ExecutorService executor = Executors.newCachedThreadPool();
    // 初始化服务端socket并且绑定9999端口
    try {
        ServerSocket serverSocket = new ServerSocket(9999);
        while (true) {
            // 等待客户端的连接
            Socket socket = serverSocket.accept();
            // 后续处理
            executor.submit(() -> {
                try {
                    // 获取输入流
                    var reader = new InputStreamReader(socket.getInputStream());
                    var bufferedReader = new BufferedReader(reader);
                    // 读取一行数据
                    var str = bufferedReader.readLine();
                    //输出打印
                    System.out.println(str);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

ServerSocket 每次监听,都会阻塞当前线程,当有连接建立时,才会继续往下走。如果在 ServerSocket 监听的这个线程和连接进行通信,会将这个线程阻塞,导致代码走不到 accept 这一步,这段时间将导致其他连接无法建立,大幅降低程序并发量。因此需要专门分配一个线程用于监听,使用其他线程处理通信,可以来一个连接新建一个线程,也可以用线程池。

Java NIO

传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于批量监听通道事件(比如:连接打开,数据到达)。

非阻塞I/O

非阻塞I/O就是将网络I/O用到的各种系统调用设置为非阻塞模式。该模式下,这些函数调用之后,线程不会陷入阻塞。原来的read方法,调用之后线程会陷入阻塞,等待操作系统准备数据,当数据准备好之后,操作系统会将线程唤醒,让它继续执行后续操作。改为非阻塞之后,read方法一旦调用,会立即返回,线程不会陷入阻塞,不会等待操作系统准备数据。这么一来,就需要应用程序自己不断向操作系统询问:“我的数据好了没有?”

多路复用I/O

多路复用I/O是基于非阻塞I/O的一系列 I/O编程模型。多路,指的是很多socket,很多网络连接。复用,指的是将这些很多个socket放在同一个线程中处理。原来的阻塞I/O模型只能一个线程处理一个socket,现在,我想让一个线程处理多个socket,怎么办?于是出现了各种各样的手段来实现这一目标,这些手段统称为多路复用I/O模型。

select

学习 Selector 之前,我们至少要先了解一下操作系统的select系统调用。系统调用就是操作系统的提供的一系列 API,select 就是其中一个,它用于实现多路复用的I/O 操作。"select"的基本思想是允许一个线程同时监视多个文件描述符,从而可以由该线程确定哪些文件描述符已经就绪,然后我们可以只对就绪的文件描述符执行相应的操作,没有就绪的文件描述符就先不用管。由于它允许在多个 I/O 通道上复用同一个线程进行相应处理,被称为 I/O 多路复用。

Select 系统调用本质上就是让操作系统帮我们检查文件描述符的就绪状态。

poll

poll与 "select" 类似,"poll" 系统调用也可以用于实现多路复用。它接受一组文件描述符和超时时间,并在文件描述符就绪或超时发生时返回。

epoll(Linux )

在 Linux 系统上,"epoll" 是一种更高效的多路复用机制,相比于 "select" 和 "poll",它能够处理更大数量的文件描述符,并且在文件描述符就绪时不需要线性扫描。在使用 Selector 的时候,如果运行在支持 "epoll" 的 Linux 上,底层可能会使用 "epoll" 系统调用来实现多路复用。

IOCP(Windows)

Windows 操作系统上实现异步 I/O 的一套API。IOCP 的核心概念是 "完成端口"(Completion Port),是操作系统提供的资源,用于跟踪 I/O 操作的完成状态。应用程序向完成端口注册 I/O 操作,然后可以继续执行其他任务,当 I/O 操作完成时,操作系统会通知应用程序,并提供有关已完成操作的信息。

kqueue(BSD)

全称 Kernel Event Queue,在 BSD 系统(包括 MacOS)上实现异步 I/O 和事件通知的机制。它可以用于监视和处理多种类型的事件,如文件描述符上的读写、定时器、信号等,以及其他内核通知。kqueue 的设计目标是提供高效的事件驱动编程模型,允许应用程序以非阻塞方式处理多个事件源。

Channel(通道)

Channel和传统IO中的Stream类似。但Stream是单向的,或read only、或write only,譬如InputStreamOutputStream,而Channel是双向的,同一个channel,既可以读,又可以写。

  • FileChannel(文件IO)
  • DatagramChannel(UDP)
  • SocketChannel(TCP Client)
  • ServerSocketChannel(TCP Server)
Channel 接口的方法
  • isOpen(): 判断通道是否打开。
  • close(): 关闭通道。
  • read(ByteBuffer dst): 从通道读取数据到缓冲区。
  • write(ByteBuffer src): 将数据从缓冲区写入通道。
  • transferTo(long position, long count, WritableByteChannel target): 将数据从通道传输到另一个可写通道。
  • transferFrom(ReadableByteChannel src, long position, long count): 将数据从另一个可读通道传输到当前通道。
  • position() 和 position(long newPosition): 获取或设置通道的位置,用于文件通道等支持随机访问的通道。
  • size(): 获取通道的大小,仅适用于某些通道类型。
  • truncate(long size): 截断通道到指定大小,仅适用于某些通道类型。
  • force(boolean metaData): 强制将通道中的数据刷新到底层设备,仅适用于某些通道类型。
  • lock() 和 tryLock(): 获取通道的锁,用于文件通道的文件锁操作。
  • closeFuture(): 返回一个 Future 对象,用于在通道关闭时进行异步操作。
  • configureBlocking(boolean block): 设置通道为阻塞或非阻塞模式。
SocketChannel 的方法
  • open(): 静态方法,用于打开一个新的 SocketChannel。
  • connect(SocketAddress remote): 连接到远程服务器。
  • finishConnect(): 完成连接过程,确保连接已经建立。
  • configureBlocking(boolean block): 设置通道为阻塞或非阻塞模式。
  • isConnected(): 判断通道是否已连接。
  • isConnectionPending(): 判断是否正在进行连接。
ServerSocketChannel 的方法
  • open(): 静态方法,用于打开一个新的 ServerSocketChannel。
  • bind(SocketAddress local): 将通道绑定到特定的本地地址。
  • accept(): 接受一个客户端连接,返回一个 SocketChannel 用于与客户端通信。
  • isBound(): 判断通道是否已绑定到本地地址。
  • socket(): 获取关联的 ServerSocket 对象,用于设置服务器 socket 选项。

Buffer(缓冲区)

NIO 中的关键 Buffer 实现:ByteBufferCharBufferDoubleBufferFloatBufferIntBufferLongBufferShortBuffer

下面还有几个特殊的 Buffer

  • MappedByteBuffer

    MappedByteBuffer 继承自 ByteBuffer,内部维护了一个逻辑地址 address。可以将文件映射到虚拟内存,提高文件读写效率。

  • HeapByteBuffer

    JVM堆内内存。

  • DirectByteBuffer

    JVM堆外内存,也就是操作系统管理的内存。

缓冲区的四个核心属性
  • capacity: 容量,表示缓冲区中最大存储数据的容量。一旦声明不能更改。
  • limit: 限制,表示缓冲区中可以操作数据的大小。如果在写入buffer,则limit表示当前有多少空间可以被写入。如果在读取buffer,limit表示当前有多少数据可以被读取
  • position: 位置,表示缓冲区中正在操作数据的位置。
  • mark: 标记,表示记录当前 position 的位置。可以通过 reset() 恢复到 mark 的位置。
缓冲区方法
  • get() 方法系列: 从缓冲区读取数据,并将位置递增。例如,getInt()getDouble() 等。
  • put() 方法系列: 将数据写入缓冲区,并将位置递增。例如,putInt()putDouble() 等。
  • capacity(): 返回缓冲区的容量。
  • clear(): 清空缓冲区,重置 position 和 limit。
  • flip(): 反转缓冲区,将 limit 设置为当前 position,然后将 position 重置为 0,准备读取缓冲区数据。
  • rewind(): 将 position重置为 0,保持limit不变,使缓冲区可重新读取已写入的数据。
  • mark(): 标记当前 position,以便稍后通过调用 reset() 方法恢复到这个 position。
  • reset(): 将位置恢复到最后一次标记的位置。
  • hasRemaining(): 判断是否还有剩余未读取/未写入的数据。
  • remaining(): 返回剩余未读取/未写入数据的数量。
  • position() 和 position(int newPosition): 获取或设置当前位置。
  • limit() 和 limit(int newLimit): 获取或设置限制。
  • slice(): 创建一个新的缓冲区,共享原始缓冲区的数据,但有自己的位置、限制和容量。
  • duplicate(): 创建原始缓冲区的一个副本,共享相同的数据,但有自己的位置、限制和容量。
如何使用Buffer

nio的buffer,通过操作 capacity、limit、position 这三个参数,可以实现各种零拷贝的数据操作。

  • 写入数据

    用put方法写入数据。将buffer传递给channel,让channel进行写入时,channel也是调用buffer的put方法。每put一个数据,position都会跟着变化:position += 1,limit=capacity。

  • 读取数据

    当我们将buffer传递给channel,channel将一段数据写入了buffer,此时我们想要读取这段数据。需要调用一下flip():limit=position,position=0,然后再读取。同样,没调用一次get,position都会跟着变化:position += 1。

  • 重复读取

    当我们读取过这个buffer之后,想要再次读取,需要调用一下rewind():position=0。

  • 重新写入

    数据我们读取完了,里面的数据已经不重要了。我们需要向着个写入新的数据。这时候调用一下clear():position=0,limit=capacity。然后就可以传递给channel让它写数据了。

Selector(选择器)

Selector是 Java 实现 IO 多路复用的重要角色,在不同的操作系统上由不同的实现方式。Linux 上是基于 epoll 实现的。

select函数用于对文件描述符做批量管理。

使用 select,可以将我们手里持有的大量文件描述符交给内核分类,分成“可读”、“可写”、“异常”三组,然后我们可以再根据需要,分配线程去处理。

Java 的 selector 是对 select 函数的一个面向对象的封装。当我们持有大量 channel 时,可以它们注册到 selector上,交给 selector 管理,我们只需要关注其状态,准备好针对每种状态需要执行的操作即可。注册 channel 时需要指定一个操作,也就是 channel 的状态,比如 OP_READ,当某些 channel 数据到达且处于可读状态后,调用 selector.select() 时就会将这些 channel 封装为 SelectionKey 返回。

Demo

简单写个例子,服务器和客户端互相 ping pong 。

服务端
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;


public class NIOServer {
    public static void main(String[] args) throws Exception {

        // 创建Selector
        Selector selector = Selector.open();

        // 创建ServerSocketChannel,并绑定到指定端口
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(8888));
        serverSocketChannel.configureBlocking(false);

        // 注册ServerSocketChannel到Selector上,监听连接事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("Server started at port 8888...");

        while (true) {
            // 监听所有注册到Selector上的Channel,阻塞直到有事件发生
            int readyChannels = selector.select();

            if (readyChannels == 0) {
                continue;
            }

            // 处理所有发生事件的Channel
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();

                if (!key.isValid()) {
                    continue;
                }

                // 处理新连接
                if (key.isAcceptable()) {
                    ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                    SocketChannel clientChannel = serverChannel.accept();
                    clientChannel.configureBlocking(false);
                    clientChannel.register(selector, SelectionKey.OP_READ);
                    System.out.println("New client connected: " + clientChannel.getRemoteAddress());
                }

                // 处理读事件
                if (key.isReadable()) {
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead = clientChannel.read(buffer);
                    if (bytesRead > 0) {
                        buffer.flip();
                        byte[] bytes = new byte[buffer.remaining()];
                        buffer.get(bytes);
                        String message = new String(bytes).trim();
                        System.out.println("Message received from " + clientChannel.getRemoteAddress() + ": " + message);

                        // 回复消息
                        clientChannel.write(ByteBuffer.wrap("Hello !".getBytes()));
                    } else if (bytesRead < 0) {
                        // 客户端断开连接
                        key.cancel();
                        clientChannel.close();
                    }
                }
            }
        }
    }
}
客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;

public class NIOClient {
    public static void main(String[] args) {
        try {
            InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 8888);
            SocketChannel socketChannel = SocketChannel.open(serverAddress);

            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (true) {
                buffer.clear();
                buffer.put("Hello, I am client !".getBytes());
                buffer.flip();
                socketChannel.write(buffer);

                // 读取服务器的响应
                buffer.clear();
                int bytesRead = socketChannel.read(buffer);
                if (bytesRead > 0) {
                    buffer.flip();
                    String receivedMessage = new String(buffer.array(), 0, bytesRead);
                    System.out.println("Received from server: " + receivedMessage);
                }

                TimeUnit.SECONDS.sleep(2);
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}