Vue3 + Express 实现大文件分片上传、断点续传、秒传

发布时间 2023-11-29 10:20:26作者: 柯基与佩奇

前言

在日常开发中,文件上传是常见的操作之一。文件上传技术使得用户可以方便地将本地文件上传到 Web 服务器上,这在许多场景下都是必需的,比如网盘上传、头像上传等。 但是当需要上传比较大的文件的时候,容易碰到以下问题:

  1. 上传时间比较久;
  2. 中间一旦出错就需要重新上传;
  3. 一般服务端会对文件的大小进行限制。

这几个问题会导致上传时候的用户体验是很不好的,针对存在的这些问题,可以通过分片上传来解决。

原理

分片上传的原理就像是把一个大蛋糕切成小块一样。

首先,将要上传的大文件分成许多小块,每个小块大小相同,比如每块大小为 1MB。然后,逐个上传这些小块到服务器。上传的时候,可以同时上传多个小块,也可以一个一个地上传。上传每个小块后,服务器会保存这些小块,并记录它们的顺序和位置信息。

所有小块上传完成后,服务器会把这些小块按照正确的顺序拼接起来,还原成完整的大文件。最后,就成功地上传了整个大文件。

image-20231103220204032.png

分片上传的好处在于它可以减少上传失败的风险。如果在上传过程中出现了问题,只需要重新上传出错的那个小块,而不需要重新上传整个大文件。此外,分片上传还可以加快上传速度。因为可以同时上传多个小块,充分利用网络的带宽。这样就能够更快地完成文件的上传过程。

实现

项目搭建

要实现大文件上传,还需要后端的支持,所以就用nodejs来写后端代码。

前端:vue3 + vite

后端:express 框架,用到的工具包:nodemoncorsconnect-multipartyfs-extrabody-parser

获取文件

通过监听 input 标签的 change 事件,当选择本地文件后,可以在回调函数中获取对应的文件。(也可以使用第三方 UI 库的上传组件获取到对应的文件)

<script>
// 绑定上传事件
const handleUpload = async (e: Event) => {
  const file = (e.target as HTMLInputElement).files?.[0];
  if (!file) return;
};
</script>

<template>
  <h1>大文件上传</h1>
  <input type="file" @change="handleUpload" />
</template>

文件分片

文件分片的核心是用Blob 对象的 slice方法,在上一步获取到选择的文件是一个File对象,它是继承于Blob,所以就可以用 slice 方法对文件进行分片。

// 绑定上传事件
const handleUpload = async (e: Event) => {
  ...

+  // 创建文件分片
+  const chunks = createChunks(file);
};
const CHUNK_SIZE = 1024 * 1024; // 1MB

// 创建文件分片
const createChunks = (file: File) => {
  let start = 0;
  const chunks = [];
  while (start < file.size) {
    chunks.push(file.slice(start, start + CHUNK_SIZE));
    start += CHUNK_SIZE;
  }
  return chunks;
};

hash 计算

先来思考一个问题,在向服务器上传文件时,怎么去区分不同的文件呢?如果根据文件名去区分的话可以吗?

答案是不可以,因为文件名可以是随便修改的,所以不能根据文件名去区分。但是每一份文件的文件内容都不一样,可以根据文件的内容去区分,具体怎么做呢?

可以根据文件内容生产一个唯一的 hash 值,大家应该都见过用 webpack 打包出来的文件的文件名都有一串不一样的字符串,这个字符串就是根据文件的内容生成的 hash 值,文件内容变化,hash 值就会跟着发生变化。在这里,也可以用这个办法来区分不同的文件。而且通过这个办法,还可以实现秒传的功能,怎么做呢?

就是服务器在处理上传文件的请求的时候,要先判断下对应文件的 hash 值有没有记录,如果 A 和 B 先后上传一份内容相同的文件,所以这两份文件的 hash 值是一样的。当 A 上传的时候会根据文件内容生成一个对应的 hash 值,然后在服务器上就会有一个对应的文件,B 再上传的时候,服务器就会发现这个文件的 hash 值之前已经有记录了,说明之前已经上传过相同内容的文件了,所以就不用处理 B 的这个上传请求了,给用户的感觉就像是实现了秒传。

那么怎么计算文件的 hash 值呢?可以通过一个工具:spark-md5,所以得先安装它。

const fileHash = ref(""); // 文件hash

// 绑定上传事件
const handleUpload = async (e: Event) => {
  ...

  // 创建文件分片
  const chunks = createChunks(file);

+  // 计算文件内容hash值
+  fileHash.value = await calculateHash(file);
};
// 计算文件内容hash值
const calculateHash = (file: File): Promise<string> => {
  return new Promise((resolve) => {
    const fileReader = new FileReader();
    fileReader.readAsArrayBuffer(file);
    fileReader.onload = function (e) {
      const spark = new SparkMD5.ArrayBuffer();
      spark.append((e.target as FileReader).result as ArrayBuffer);
      resolve(spark.end());
    };
  });
};

文件上传

前端实现

前面已经完成了上传的前置操作,接下来就来看下如何去上传这些切片。

以 1G 的文件来分析,假如每个分片的大小为 1M,那么总的分片数将会是 1024 个,如果同时发送这 1024 个分片,浏览器肯定处理不了,原因是切片文件过多,浏览器一次性创建了太多的请求。这是没有必要的,拿 chrome 浏览器来说,默认的并发数量只有 6,过多的请求并不会提升上传速度,反而是给浏览器带来了巨大的负担。因此,有必要限制前端请求个数。

怎么做呢,要创建最大并发数的请求,比如 6 个,那么同一时刻就允许浏览器只发送 6 个请求,其中一个请求有了返回的结果后再发起一个新的请求,依此类推,直至所有的请求发送完毕。

上传文件时一般还要用到FormData对象,需要将要传递的文件还有额外信息放到这个FormData对象里面。

// 绑定上传事件
const handleUpload = async (e: Event) => {
  ...

+  // 上传文件分片
+  uploadChunks(chunks);
};
// 上传文件分片
const uploadChunks = async (chunks: Array<Blob>) => {
  const formDatas = chunks
    .map((chunk, index) => ({
      fileHash: fileHash.value,
      chunkHash: fileHash.value + "-" + index,
      chunk,
    }))
    .map((item) => {
      const formData = new FormData();
      formData.append("fileHash", item.fileHash);
      formData.append("chunkHash", item.chunkHash);
      formData.append("chunk", item.chunk);
      return formData;
    });

  const taskPool = formDatas.map(
    (formData) => () =>
      fetch("http://localhost:3000/upload", {
        method: "POST",
        body: formData,
      })
  );

  // 控制请求并发
  await concurRequest(taskPool, 6);
};
// 控制请求并发
const concurRequest = (
  taskPool: Array<() => Promise<Response>>,
  max: number
): Promise<Array<Response | unknown>> => {
  return new Promise((resolve) => {
    if (taskPool.length === 0) {
      resolve([]);
      return;
    }

    const results: Array<Response | unknown> = [];
    let index = 0;
    let count = 0;

    const request = async () => {
      if (index === taskPool.length) return;
      const i = index;
      const task = taskPool[index];
      index++;
      try {
        results[i] = await task();
      } catch (err) {
        results[i] = err;
      } finally {
        count++;
        if (count === taskPool.length) {
          resolve(results);
        }
        request();
      }
    };

    const times = Math.min(max, taskPool.length);
    for (let i = 0; i < times; i++) {
      request();
    }
  });
};

后端实现

后端处理文件时需要用到connect-multiparty这个工具,所以也是得先安装,然后再引入它。

在处理每个上传的分片的时候,应该先将它们临时存放到服务器的一个地方,方便合并的时候再去读取。为了区分不同文件的分片,就用文件对应的那个 hash 为文件夹的名称,将这个文件的所有分片放到这个文件夹中。

const multipart = require("connect-multiparty");
const multipartMiddleware = multipart();

// 所有上传的文件存放在该目录下
const UPLOADS_DIR = path.resolve("uploads");

/**
 * 上传
 */
app.post("/upload", multipartMiddleware, (req, res) => {
  const { fileHash, chunkHash } = req.body;

  // 如果临时文件夹(用于保存分片)不存在,则创建
  const chunkDir = path.resolve(UPLOADS_DIR, fileHash);
  if (!fse.existsSync(chunkDir)) {
    fse.mkdirSync(chunkDir);
  }

  // 如果临时文件夹里不存在该分片,则将用户上传的分片移到临时文件夹里
  const chunkPath = path.resolve(chunkDir, chunkHash);
  if (!fse.existsSync(chunkPath)) {
    fse.moveSync(req.files.chunk.path, chunkPath);
  }

  res.send({
    success: true,
    msg: "上传成功",
  });
});

写完前后端代码后就可以来试下看看文件能不能实切片的上传,如果没有错误的话,服务器的 uploads 文件夹下应该就会多一个文件夹,这个文件夹里面就是存储的所有文件的分片了。

文件合并

上一步已经实现了将所有切片上传到服务器了,上传完成之后,就可以将所有的切片合并成一个完整的文件了,下面就一块来实现下。

前端实现

前端只需要向服务器发送一个合并的请求,并且为了区分要合并的文件,需要将文件的 hash 值给传过去。

+  const fileName = ref(""); // 文件名称

  // 绑定上传事件
  const handleUpload = async (e: Event) => {
    const file = (e.target as HTMLInputElement).files?.[0];
    if (!file) return;

+    fileName.value = file.name;

    ...
  };
// 上传文件分片
const uploadChunks = async (
  chunks: Array<Blob>,
) => {
  ...

+  // 合并分片请求
+  mergeRequest();
};
// 合并分片请求
const mergeRequest = () => {
  fetch("http://localhost:3000/merge", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      fileHash: fileHash.value,
      fileName: fileName.value,
    }),
  });
};

后端实现

在之前已经可以将所有的切片上传到服务器并存储到对应的目录里面去了,合并的时候需要从对应的文件夹中获取所有的切片,然后利用文件的读写操作,就可以实现文件的台并了。合并完成之后,将生成的文件以 hash 值命名存放到对应的位置就可以了。

/**
 * 合并
 */
app.post("/merge", async (req, res) => {
  const { fileHash, fileName } = req.body;

  // 最终合并的文件路径
  const filePath = path.resolve(UPLOADS_DIR, fileHash + path.extname(fileName));
  // 临时文件夹路径
  const chunkDir = path.resolve(UPLOADS_DIR, fileHash);

  // 读取临时文件夹,获取该文件夹下“所有文件(分片)名称”的数组对象
  const chunkPaths = fse.readdirSync(chunkDir);

  // 读取临时文件夹获得的文件(分片)名称数组可能乱序,需要重新排序
  chunkPaths.sort((a, b) => a.split("-")[1] - b.split("-")[1]);

  // 遍历文件(分片)数组,将分片追加到文件中
  const pool = chunkPaths.map(
    (chunkName) =>
      new Promise((resolve) => {
        const chunkPath = path.resolve(chunkDir, chunkName);
        // 将分片追加到文件中
        fse.appendFileSync(filePath, fse.readFileSync(chunkPath));
        // 删除分片
        fse.unlinkSync(chunkPath);
        resolve();
      })
  );
  await Promise.all(pool);
  // 等待所有分片追加到文件后,删除临时文件夹
  fse.removeSync(chunkDir);

  res.send({
    success: true,
    msg: "合并成功",
  });
});

到这里,就已经实现了大文件的分片上传的基本功能了,但是没有考虑到如果上传相同的文件的情况,而且如果中间网络断了,就得重新上传所有的分片,这些情况在大文件上传中也都需要考虑到,下面,就来解决下这两个问题。

秒传&断点续传

在上面有提到,如果内容相同的文件进行 hash 计算时,对应的 hash 值应该是一样的,而且在服务器上给上传的文件命名的时候就是用对应的 hash 值命名的,所以在上传之前是不是可以加一个判断,如果有对应的这个文件,就不用再重复上传了,直接告诉用户上传成功,给用户的感觉就像是实现了秒传。接下来,就来看下如何实现的。

前端实现

前端在上传之前,需要将对应文件的 hash 值告诉服务器,看看服务器上有没有对应的这个文件,如果有,就直接返回,不执行上传分片的操作了。

// 校验文件、文件分片是否存在
const verify = (fileHash: string, fileName: string) => {
  return fetch("http://localhost:3000/verify", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      fileHash,
      fileName,
    }),
  }).then((res) => res.json());
};
// 绑定上传事件
const handleUpload = async (e: Event) => {
  ...

  // 计算文件内容hash值
  fileHash.value = await calculateHash(file);

+  // 校验文件、文件分片是否存在
+  const verifyRes = await verify(fileHash.value, fileName.value);
+  const { existFile, existChunks } = verifyRes.data;
+  if (existFile) return;

  ...
};

后端实现

因为在合并文件时,文件名是根据该文件的 hash 值命名的,所以只需要看看服务器上有没有对应的这个 hash 值的文件就可以判断了。

/**
 * 校验
 */
app.post("/verify", (req, res) => {
  const { fileHash, fileName } = req.body;

  // 判断服务器上是否存在该hash值的文件
  const filePath = path.resolve(UPLOADS_DIR, fileHash + path.extname(fileName));
  const existFile = fse.existsSync(filePath);

  res.send({
    success: true,
    msg: "校验文件",
    data: {
      existFile,
    },
  });
});

完成上面的步骤后,当再上传相同的文件,即使改了文件名,也会提示秒传成功了,因为服务器上已经有对应的那个文件了。上面解决了重复上传的文件,但是对于网络中断需要重新上传的问题没有解决,那该如何解决呢?

如果之前已经上传了一部分分片了,只需要再上传之前拿到这部分分片,然后再过滤掉是不是就可以避免去重复上传这些分片了,也就是只需要上传那些上传失败的分片,所以,再上传之前还得加一个判断。

前端实现

还是在那个/verify的接口中去获取已经上传成功的分片,然后在上传分片前进行一个过滤。

// 绑定上传事件
const handleUpload = async (e: Event) => {
  ...

  // 上传文件分片
-  uploadChunks(chunks);
+  uploadChunks(chunks, existChunks);
};
// 上传文件分片
const uploadChunks = async (
  ...
+  existChunks: Array<string>
) => {
  const formDatas = chunks
    .map((chunk, index) => ({
      fileHash: fileHash.value,
      chunkHash: fileHash.value + "-" + index,
      chunk,
    }))
+    .filter((item) => !existChunks.includes(item.chunkHash))
    .map((item) => {
      const formData = new FormData();
      formData.append("fileHash", item.fileHash);
      formData.append("chunkHash", item.chunkHash);
      formData.append("chunk", item.chunk);
      return formData;
    });

  ...
};

后端实现

只需要在/verify这个接口中加上已经上传成功的所有切片的名称就可以,因为所有的切片都存放在以文件的 hash 值命名的那个文件夹,所以需要读取这个文件夹中所有的切片的名称就可以。

/**
 * 校验
 */
app.post("/verify", (req, res) => {
  const { fileHash, fileName } = req.body;

  // 判断服务器上是否存在该hash值的文件
  const filePath = path.resolve(UPLOADS_DIR, fileHash + path.extname(fileName));
  const existFile = fse.existsSync(filePath);

+  // 获取已经上传到服务器的文件分片
+  const chunkDir = path.resolve(UPLOADS_DIR, fileHash);
+  const existChunks = [];
+  if (fse.existsSync(chunkDir)) {
+    existChunks.push(...fse.readdirSync(chunkDir));
+  }

  res.send({
    success: true,
    msg: "校验文件",
    data: {
      existFile,
+      existChunks,
    },
  });
});

完整代码

前端部分

<script setup lang="ts">
import { ref } from "vue";
import SparkMD5 from "spark-md5";

const CHUNK_SIZE = 1024 * 1024; // 1MB
const fileName = ref(""); // 文件名称
const fileHash = ref(""); // 文件hash

// 创建文件分片
const createChunks = (file: File) => {
  let start = 0;
  const chunks = [];
  while (start < file.size) {
    chunks.push(file.slice(start, start + CHUNK_SIZE));
    start += CHUNK_SIZE;
  }
  return chunks;
};

// 计算文件内容hash值
const calculateHash = (file: File): Promise<string> => {
  return new Promise((resolve) => {
    const fileReader = new FileReader();
    fileReader.readAsArrayBuffer(file);
    fileReader.onload = function (e) {
      const spark = new SparkMD5.ArrayBuffer();
      spark.append((e.target as FileReader).result as ArrayBuffer);
      resolve(spark.end());
    };
  });
};

// 控制请求并发
const concurRequest = (
  taskPool: Array<() => Promise<Response>>,
  max: number
): Promise<Array<Response | unknown>> => {
  return new Promise((resolve) => {
    if (taskPool.length === 0) {
      resolve([]);
      return;
    }

    const results: Array<Response | unknown> = [];
    let index = 0;
    let count = 0;

    const request = async () => {
      if (index === taskPool.length) return;
      const i = index;
      const task = taskPool[index];
      index++;
      try {
        results[i] = await task();
      } catch (err) {
        results[i] = err;
      } finally {
        count++;
        if (count === taskPool.length) {
          resolve(results);
        }
        request();
      }
    };

    const times = Math.min(max, taskPool.length);
    for (let i = 0; i < times; i++) {
      request();
    }
  });
};

// 合并分片请求
const mergeRequest = () => {
  fetch("http://localhost:3000/merge", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      fileHash: fileHash.value,
      fileName: fileName.value,
    }),
  });
};

// 上传文件分片
const uploadChunks = async (
  chunks: Array<Blob>,
  existChunks: Array<string>
) => {
  const formDatas = chunks
    .map((chunk, index) => ({
      fileHash: fileHash.value,
      chunkHash: fileHash.value + "-" + index,
      chunk,
    }))
    .filter((item) => !existChunks.includes(item.chunkHash))
    .map((item) => {
      const formData = new FormData();
      formData.append("fileHash", item.fileHash);
      formData.append("chunkHash", item.chunkHash);
      formData.append("chunk", item.chunk);
      return formData;
    });

  const taskPool = formDatas.map(
    (formData) => () =>
      fetch("http://localhost:3000/upload", {
        method: "POST",
        body: formData,
      })
  );

  // 控制请求并发
  await concurRequest(taskPool, 6);

  // 合并分片请求
  mergeRequest();
};

// 校验文件、文件分片是否存在
const verify = (fileHash: string, fileName: string) => {
  return fetch("http://localhost:3000/verify", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      fileHash,
      fileName,
    }),
  }).then((res) => res.json());
};

// 绑定上传事件
const handleUpload = async (e: Event) => {
  const file = (e.target as HTMLInputElement).files?.[0];
  if (!file) return;

  fileName.value = file.name;

  // 创建文件分片
  const chunks = createChunks(file);

  // 计算文件内容hash值
  fileHash.value = await calculateHash(file);

  // 校验文件、文件分片是否存在
  const verifyRes = await verify(fileHash.value, fileName.value);
  const { existFile, existChunks } = verifyRes.data;
  if (existFile) return;

  // 上传文件分片
  uploadChunks(chunks, existChunks);
};
</script>

<template>
  <h1>大文件上传</h1>
  <input type="file" @change="handleUpload" />
</template>

后端部分

const express = require("express");
const cors = require("cors");
const bodyParser = require("body-parser");
const fse = require("fs-extra");
const path = require("path");
const multipart = require("connect-multiparty");
const multipartMiddleware = multipart();

const app = express();

app.use(cors());
app.use(bodyParser.json());

// 所有上传的文件存放在该目录下
const UPLOADS_DIR = path.resolve("uploads");

/**
 * 上传
 */
app.post("/upload", multipartMiddleware, (req, res) => {
  const { fileHash, chunkHash } = req.body;

  // 如果临时文件夹(用于保存分片)不存在,则创建
  const chunkDir = path.resolve(UPLOADS_DIR, fileHash);
  if (!fse.existsSync(chunkDir)) {
    fse.mkdirSync(chunkDir);
  }

  // 如果临时文件夹里不存在该分片,则将用户上传的分片移到临时文件夹里
  const chunkPath = path.resolve(chunkDir, chunkHash);
  if (!fse.existsSync(chunkPath)) {
    fse.moveSync(req.files.chunk.path, chunkPath);
  }

  res.send({
    success: true,
    msg: "上传成功",
  });
});

/**
 * 合并
 */
app.post("/merge", async (req, res) => {
  const { fileHash, fileName } = req.body;

  // 最终合并的文件路径
  const filePath = path.resolve(UPLOADS_DIR, fileHash + path.extname(fileName));
  // 临时文件夹路径
  const chunkDir = path.resolve(UPLOADS_DIR, fileHash);

  // 读取临时文件夹,获取该文件夹下“所有文件(分片)名称”的数组对象
  const chunkPaths = fse.readdirSync(chunkDir);

  // 读取临时文件夹获得的文件(分片)名称数组可能乱序,需要重新排序
  chunkPaths.sort((a, b) => a.split("-")[1] - b.split("-")[1]);

  // 遍历文件(分片)数组,将分片追加到文件中
  const pool = chunkPaths.map(
    (chunkName) =>
      new Promise((resolve) => {
        const chunkPath = path.resolve(chunkDir, chunkName);
        // 将分片追加到文件中
        fse.appendFileSync(filePath, fse.readFileSync(chunkPath));
        // 删除分片
        fse.unlinkSync(chunkPath);
        resolve();
      })
  );
  await Promise.all(pool);
  // 等待所有分片追加到文件后,删除临时文件夹
  fse.removeSync(chunkDir);

  res.send({
    success: true,
    msg: "合并成功",
  });
});

/**
 * 校验
 */
app.post("/verify", (req, res) => {
  const { fileHash, fileName } = req.body;

  // 判断服务器上是否存在该hash值的文件
  const filePath = path.resolve(UPLOADS_DIR, fileHash + path.extname(fileName));
  const existFile = fse.existsSync(filePath);

  // 获取已经上传到服务器的文件分片
  const chunkDir = path.resolve(UPLOADS_DIR, fileHash);
  const existChunks = [];
  if (fse.existsSync(chunkDir)) {
    existChunks.push(...fse.readdirSync(chunkDir));
  }

  res.send({
    success: true,
    msg: "校验文件",
    data: {
      existFile,
      existChunks,
    },
  });
});

const server = app.listen(3000, () => {
  console.log(`Example app listening on port ${server.address().port}`);
});