BIO

发布时间 2023-08-24 22:14:04作者: JavaCoderPan

BIO

1BIO介绍

传统阻塞Java IO编程,其相关的类和接口在Java.io 包中

BIO(blocking I/O)同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善 (实现多个客户连接服务器)

2工作机制

3BIO传统实例

网络编程的基本模型是Client/Server模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定IP地址和端口),客户端通过连接操作向服务端监听的端口地址发起连接请求,基于TCP协议下进行三次握手连接,连接成功后,双方通过网络套接字(Socket)进行通信。

传统的同步阻塞模型开发中,服务端ServerSocket负责绑定IP地址,启动监听端口;客户端Socket负责发起 连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。

基于BIO模式下的通信,客户端-服务端是完全同步,完全藕合的。

客户端案例如下:

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;

/*** 客户端 */
public class Client {
    public static void main(String[] args) {
        try {//1.创建Socket对象请求服务端的连接
            Socket socket = new Socket("127.0.0.1", 9999);
            //2.从Socket对象中获取一个字节输出流
            OutputStream os = socket.getOutputStream();
            //3.把字节输出流包装成一个打印流
            PrintStream ps = new PrintStream(os);
            // 单词发送消息
//            ps.print("hello World! 服务端,你好");
//            ps.println("hello World! 服务端,你好");
//            ps.flush();
            // 发送多条消息
            Scanner sc = new Scanner(System.in);
            while (true) {
                System.out.print("请说:");
                String msg = sc.nextLine();
                ps.println(msg);
                ps.flush();
            }
        } catch (
                IOException e) {
            e.printStackTrace();
        }
    }
}

服务端案例如下:

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * TODO 服务端
 * 服务端会一直等待客户端的消息,如果客户端没有进行消息的发送,服务端将一直进入阻塞状态
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 11:09
 */
public class Server {
    public static void main(String[] args) {
        try{
            System.out.println("===服务端启动===");
            // 1、定义一个ServerSocket对象进行服务端的端口注册
            ServerSocket ss = new ServerSocket(9999);
            // 2、监听客户端的Socket连接请求
            Socket socket = ss.accept();
            // 3、从socket管道中得到一个字节输入流对象
            InputStream is = socket.getInputStream();
            // 4、把字节输入流包装成一个缓存字符输入流
            BufferedReader br = new BufferedReader(new InputStreamReader(is));
            String msg;
            // 接收多条消息
            while ((msg = br.readLine()) != null){
                System.out.println("服务端收到:" + msg);
            }
            // 接收一条消息
//            if ((msg = br.readLine()) != null){
//                System.out.println("服务端收到:" + msg);
//            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

小结

  • 在以上通信中,服务端会一直等待客户端的消息,如果客户端没有进行消息的发送,服务端将一直进入阻塞状态

  • 同时服务端是按照行获取消息的,这意味育客户端也必须按照行进行消息的发送,否则服务端将进入等待消息的阻塞状态!

4BIO模式下接收多个客户端

在上述的案例中,一个服务端只能接收一个客户端的通信请求,那么如果服务端需要处理很多个客户端的消息通信请求应该如何处理呢,此时我们就需要在服务端引入线程了,也就是说客户端每发起一个请求,服务端就创建一个新的线程来处理这个客户端请求,这样就实现了一个客户端一个线程的模型

图解如下:

客户端代码:

import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;

/**
 * TODO
 *
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 11:20
 */
public class Client {
    public static void main(String[] args) {
        try{
            // 1、请求与服务器的Socket对象连接
            Socket socket = new Socket("localhost", 9999);
            // 2、得到一个打印流
            PrintStream ps = new PrintStream(socket.getOutputStream());
            // 3、循环不断的发送消息给服务器端接收
            Scanner sc = new Scanner(System.in);
            while (true){
                System.out.print(" [root@localhost]:");
                String msg = sc.nextLine();
                ps.println(msg);
                ps.flush();
            }
        }catch (Exception e){

        }
    }
}

服务端代码:

import java.net.ServerSocket;
import java.net.Socket;

/**
 * TODO 服务端可以实现同时接收多个客户端的Socket通信需求
 *
 * 服务端每接收到一个客户端Socket请求对象之后都交给一个独立的线程来处理客户端的数据交互需求
 *
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 11:28
 */
public class Server {
    public static void main(String[] args) {
        try {
            // 1、注册端口
            ServerSocket ss = new ServerSocket(9999);
            // 2、循环接收客户端的Socket连接请求
            while (true) {
                Socket socket = ss.accept();
                // 3、创建一个独立的线程来处理与这个客户端的socket通信需求
                new ServerThreadReader(socket).start();
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
}

线程类:

public class ServerThreadReader extends Thread{
    private Socket socket;

    public ServerThreadReader(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try{
            // 1、从socket对象中得到一个字节输入流
            InputStream is = socket.getInputStream();
            // 2、使用缓存字符输入流包装字节输入流
            BufferedReader br = new BufferedReader(new InputStreamReader(is));
            String msg;
            // 3、循环打印客户端的消息
            while ((msg = br.readLine()) != null){
                System.out.println(msg);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

小结:

  • 每个Socket接收到,都会创建一个线程,线程的竞争、切换上下文影响性能

  • 每个线程都会占用栈空间和CPU资源

  • 并不是每个socket都进行lO操作,无意义的线程处理

  • 客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务

5伪异步I/O

在上述案例中,客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发送线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。

接下来我们采用一个伪异步I/O的通信框架,采用线程池和任务队列实现,当客户端接入时,将客户端的Socket封装成一个Task(该任务实现Java. lang. Runnable(线程任务接口)交给后端的线程池中进行处理。JDK的线程池维护一个消息队列和N个活跃的线程,对消息队列中Socket任务进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

客户端代码:

import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;

/**
 * TODO 客户端
 *
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 11:42
 */
public class Client {
    public static void main(String[] args) {
        try{
            // 1、请求与服务器端的Socket对象连接
            Socket socket = new Socket("localhost", 9999);
            // 2、得到一个打印流
            PrintStream ps = new PrintStream(socket.getOutputStream());
            // 3、使用循环不断向服务端发送消息
            Scanner sc = new Scanner(System.in);
            while (true){
                System.out.print(" [root@localhost]:");
                String msg = sc.nextLine();
                ps.println(msg);
                ps.flush();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

线程池处理类:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * TODO 线程池处理类
 *
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 11:46
 */
public class HandlerSocketServerPool {
    // 1、创建一个线程池的成员变量用于存储一个线程池对象
    private ExecutorService executorService;

    /**
     * 2、创建这个类的对象的时候就需要初始化线程池对象
     * public ThreadPoolExecutor(int corePoolSize,
     *                               int maximumPoolSize,
     *                               long keepAliveTime,
     *                               TimeUnit unit,
     *                               BlockingQueue<Runnable> workQueue)
     */
    public HandlerSocketServerPool(int maxThreadNum, int queueSize) {
        executorService = new ThreadPoolExecutor(
                3,
                maxThreadNum,
                120,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize));
    }
    /**
     * 3、提供一个方法来提交任务给线程池的任务队列来暂存,等待线程池来处理
     */
    public void execute(Runnable target){
        executorService.execute(target);
    }
}

Socket 任务类:

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;

/**
 * TODO 线程任务类
 *
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 11:52
 */
public class ServerRunnableTarget implements Runnable{
    private Socket socket;

    public ServerRunnableTarget(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        // 处理接收到客户端socket通信需求
        try{
            // 1、从socket管道中得到一个字节输入流对象
            InputStream is = socket.getInputStream();
            // 2、把字节输入流包装成一个缓存字符输入流
            BufferedReader br = new BufferedReader(new InputStreamReader(is));
            String msg;
            while ((msg = br.readLine()) != null){
                System.out.println("服务端收到 => " + msg);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

服务端代码:

import java.net.ServerSocket;
import java.net.Socket;

/**
 * TODO
 *
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 11:54
 */
public class Server {
    public static void main(String[] args) {
        try{
            // 1、注册端口
            ServerSocket ss = new ServerSocket(9999);
            // 2、定义死循环,负责不断接收客户端的Socket请求
            // 初始化一个线程池对象
            HandlerSocketServerPool pool = new HandlerSocketServerPool(3, 10);
            while (true){
                Socket socket = ss.accept();
                // 3、把socket对象交给一个线程池进行处理
                ServerRunnableTarget target = new ServerRunnableTarget(socket);
                pool.execute(target);
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

6BIO文件上传

支持任意类型文件形式的上传

客户端代码:

import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.Socket;

/**
 * TODO
 *
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 14:58
 */
public class Client {
    public static void main(String[] args) {
        try(
                InputStream is = new FileInputStream("/Library/Soft/data/io/1.jpg");
                ){
            // 1、请求与服务端的Socket连接
            Socket socket = new Socket("localhost", 8888);
            // 2、把字节输出流包装成一个数据输出流(DataOutputStream可以做分段数据发送)
            DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
            // 3、先发送上传文件的后缀给服务器
            dos.writeUTF(".jpg");
            // 4、把文件数据发送给服务端进行接收
            byte[] buffer = new byte[1024];
            int len;
            while ((len = is.read(buffer)) > 0){
                dos.write(buffer,0,len);
            }
            dos.flush();
            socket.shutdownOutput();// 通知服务端,客户端发送完毕
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

服务端代码:

public class Server {
    public static void main(String[] args) {
        try{
            ServerSocket ss = new ServerSocket(8888);
            while (true){
                Socket socket = ss.accept();
                // 交给一个独立的线程来处理与这个客户端的文件通信需求
                new ServerReadThread(socket).start();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

Socket 线程处理类:

import java.io.DataInputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.UUID;

/**
 * TODO
 *
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 14:46
 */
public class ServerReadThread extends Thread{
    private Socket socket;

    public ServerReadThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try{
            // 1、得到一个数据输入流来读取客户端发送过来的睡
            DataInputStream dis = new DataInputStream(socket.getInputStream());
            // 2、读取客户端发送过来的文件类型
            String suffix = dis.readUTF();
            System.out.println("服务端已经成功接收到了文件类型:" + suffix);
            // 3、定义一个字节输出管道,负责把客户端发来的文件数据写出去
            OutputStream os = new FileOutputStream("/Library/Soft/data/io/" + UUID.randomUUID().toString() + suffix);
            // 4、从数据输入流中读取文件数据,写出到字节输出流中去
            byte[] buffer = new byte[1024];
            int len;
            while ((len = dis.read(buffer)) > 0){
                os.write(buffer,0,len);
            }
            os.close();
            System.out.println("服务端接收文件保存成功");
        }catch (Exception e){

        }
    }
}

7BIO 端口转发

需求:实现一个群聊,即一个客户端消息可以发送给所有客户端接收

客户端代码:

import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;

/**
 * TODO
 *
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 15:28
 */
public class Client {
    public static void main(String[] args) {
        try{
            // 1、请求与服务端的Socket对象连接
            Socket socket = new Socket("localhost", 9999);
            // 收消息
            Thread clientThread = new ClientReaderThread(socket);
            clientThread.start();

            while (true){
                // 发消息
                OutputStream os = socket.getOutputStream();
                PrintStream ps = new PrintStream(os);
                // 3、使用循环不断发送消息给服务端接收
                Scanner sc = new Scanner(System.in);
                String msg = sc.nextLine();
                ps.println(msg);
                ps.flush();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

客户端线程处理类:

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;

/**
 * TODO 客户端线程处理
 *
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 15:31
 */
public class ClientReaderThread extends Thread {

    private Socket socket;

    public ClientReaderThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try{
            while (true) {
                InputStream is = socket.getInputStream();
                BufferedReader br = new BufferedReader(new InputStreamReader(is));
                String msg;
//                while ((msg = br.readLine()) != null){
//                    System.out.println(msg);
//                }
                if ((msg = br.readLine()) != null){
                    System.out.println(msg);
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

服务端代码:

import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/**
 * TODO
 *
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 15:46
 */
public class Server {
    // 定义一个静态集合,用于存放在线的用户
    public static List<Socket> allSocketOnLine = new ArrayList<>();

    public static void main(String[] args) {
        try{
            ServerSocket ss = new ServerSocket(9999);
            while (true){
                Socket socket = ss.accept();
                // 登录成功的客户端存入在线集合
                allSocketOnLine.add(socket);
                // 为当前登录成功的socket分配一个独立的线程来处理与之通信
                new ServerReaderThread(socket).start();
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

服务端线程处理类:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

/**
 * TODO
 *
 * @author ss_419
 * @version 1.0
 * @date 2023/8/23 15:38
 */
public class ServerReaderThread extends Thread {
    private Socket socket;

    public ServerReaderThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try{
           // 1、从socket中获取当前客户端的输入流
            BufferedReader br = new BufferedReader(
                                        new InputStreamReader(
                                            socket.getInputStream()));
            String msg;
            while ((msg = br.readLine()) != null){
                System.out.println("服务器收到消息 :" + msg);
                // 2、服务端收到了客户端的消息后,需要推送给所有的当前在线的socket
                sendMsgToAllClient(msg,socket);
            }
        }catch (Exception e){
            e.printStackTrace();
            System.out.println("当前有人下线了!");
            // 从在线socket集合中移出本socket
            Server.allSocketOnLine.remove(socket);
        }
    }

    /**
     * 把当前客户端发送来的消息推送给全部在线的socket
     * @param msg
     * @param socket
     */
    private void sendMsgToAllClient(String msg, Socket socket) throws IOException {
        for (Socket sk : Server.allSocketOnLine){
            // 只发送给除自己意外的其他客户端
            if (socket != sk){
                PrintStream ps = new PrintStream(sk.getOutputStream());
                ps.println(msg);
                ps.flush();
            }
        }
    }
}

8BIO即时通讯

基于BIO模式下的即时通信,我们需要解决客户端到客户端的通信,也就是需要实现客户端与客户端的端口消息转发逻辑