FastDFS并发问题的排查经历

发布时间 2023-03-28 19:39:53作者: HeavenTang

附件用的fastdf上传和下载的, 本地开发时就没考虑过多文件上传就会有并发的问题,比如多个只上传成功了一个或者上传了但是文档内容缺失了,变成0字节。

呵。。都是一次难忘的经历。

经过本地模拟大批量的上传下载, 发现fastdf是在启动时就初始化了tracker和stroge, 每次调用过他的接口后都会关闭连接, 这样就导致上传的不完整或者不成功。也是后面找的博客看到的,非常感谢这篇文章。https://blog.csdn.net/AFSGEFEGH/article/details/109034532?spm=1001.2101.3001.6650.3&utm_medium=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-3-109034532-blog-114929991.235^v27^pc_relevant_multi_platform_whitelistv3&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-3-109034532-blog-114929991.235^v27^pc_relevant_multi_platform_whitelistv3&utm_relevant_index=6

记得方法上面加上synchronized

点击查看代码

	@RequestMapping(value = "/batchDownloadForThesisCheck2", method = RequestMethod.POST)
	public synchronized void batchDownloadForThesisCheck2(@RequestBody List<FileInfoDto> fileInfoList)  {
		if(CollectionUtils.isEmpty(fileInfoList)){
			throw new EducationException("下载附件失败");
		}
		List<FileInfoDto> fileInfoDtos = differentFileName(fileInfoList);
//		List<FileInfoDto> fileInfoDtos = new ArrayList<>();

//		for(int fi=0;fi<300;fi++){
//			FileInfoDto it = new FileInfoDto();
//			it.setFileName("20210115_小鱼儿"+fi+"_jjlw.doc");
//			fileInfoDtos.add(it);
//		}
		String zipName = request.getParameter("zipName");
		if(StringUtils.isEmpty(zipName)) zipName = "批量下载";
		ZipOutputStream zipOS = null ;
		InputStream is = null;

		OutputStream os = null;
		// 计算百分值
		int index =1;
		int totalSize =CollectionUtils.isNotEmpty(fileInfoDtos) ? fileInfoDtos.size():1;


		try {
			response.setContentType("application/octet-stream; charset=UTF-8");
			response.setHeader("Access-Control-Expose-Headers", "fileName");
			response.setHeader("fileName", URLEncoder.encode(zipName, "UTF-8"));
			os = response.getOutputStream();
			zipOS = new ZipOutputStream(os);
			for (FileInfoDto info : fileInfoDtos) {
				// 机检论文换名字,学号_姓名_jjlw命名
				String itemFileName= info.getFileName();
//				itemFileName = "S20020804005_陈明鑫_jjlw .docx";
				int secondShowIndex = Common.findNumber(itemFileName,"_",2);
				int firstShowIndex = Common.findNumber(itemFileName,"_",1);
				if("1".equals(info.getPaperToName())){
					// 文件格式1:学校代码_学号_LW.doc 2:学号_姓名_jjlw
					itemFileName = "10356_"+itemFileName.substring(0,firstShowIndex)+"_LW"+itemFileName.substring(itemFileName.lastIndexOf("."),itemFileName.length());
				}else{
					itemFileName = itemFileName.substring(0,secondShowIndex)+"_jjlw"+itemFileName.substring(itemFileName.lastIndexOf("."),itemFileName.length());
				}
				logger.error("已下载学生:{} "  ,itemFileName);
				zipOS.putNextEntry(new ZipEntry(itemFileName));
				try{
					is = fastDFS.downloadFile(info.getFileId());
//					is = fastDFS.downloadFile("group1/M00/00/C0/wKgjdWQYEJ-AbefsAdOyZXrKanw028.doc");
					int len = 0;
					byte[] buffer = new byte[1024*8];
					while ((len = is.read(buffer)) != -1) {
						zipOS.write(buffer, 0, len);
					}
//					is.close();

					// 计算进度,向下取整
					double nowProcess = Math.floor((index*100)/totalSize);
					logger.error("已下载”{}",index);
//					createProcessDownFile(nowProcess,"已下载"+nowProcess+"%",info.getTimeId(),info.getUserId());
					index ++;
				}catch(Exception ignored){

				}
				zipOS.flush();
//				zipOS.closeEntry();
			}
		} catch (IOException e) {
//			createProcessDownFile(100d,"批量下载发生错误",fileInfoList.get(0).getTimeId(),fileInfoList.get(0).getUserId());
			logger.error("批量下载发生错误: " + e.getMessage(), e);
		} finally {
			try {
				if (zipOS != null) {
					zipOS.closeEntry();
					zipOS.close();
				}
				if (os != null) os.close();
//				createProcessDownFile(100d,"已下载100%",fileInfoList.get(0).getTimeId(),fileInfoList.get(0).getUserId());
				logger.warn("关闭机检下载 :"  );
			} catch (IOException e) {
//				createProcessDownFile(100d,"批量下载发生错误,关闭文件流失败",fileInfoList.get(0).getTimeId(),fileInfoList.get(0).getUserId());
				logger.warn("关闭文件流失败, cause by :" + e.getMessage());

			}
//			finally {
//				createProcessDownFile(100d,"已下载100%",fileInfoList.get(0).getTimeId(),fileInfoList.get(0).getUserId());
//				logger.warn("关闭机检下载 :"  );
//			}

		}
	}

下面是封装的上传FastDFS

点击查看代码
package fastdfs.config;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import javax.annotation.PostConstruct;

import org.apache.commons.lang.StringUtils;
import org.csource.common.NameValuePair;
import org.csource.fastdfs.ClientGlobal;
import org.csource.fastdfs.ProtoCommon;
import org.csource.fastdfs.StorageClient1;
import org.csource.fastdfs.StorageServer;
import org.csource.fastdfs.TrackerClient;
import org.csource.fastdfs.TrackerGroup;
import org.csource.fastdfs.TrackerServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;

import com.xx.commons.exception.EducationException;

import lombok.extern.slf4j.Slf4j;



/**
 * FastDfs文件系统工具类
 * 连接Fast
 * 上传图片 
 * 返回上传之后的路径 用此路径就能访问此图片
 *    group1/M00/00/01/wKjIgFWOYc6APpjAAAD-qk29i78248.jpg
 *
 */
@Slf4j
public class FastDFS {

    @Autowired
    private  FastDFSProperty fastDFSProperty;

    /**
     * 跟踪器
     */
    private TrackerServer trackerServer;
    
    /**
     * 存储器
     */
    private StorageServer storageServer;
    
    /**
     * 默认编码
     */
    private static final String DEFAULT_ENCODING = "UTF-8";

    @PostConstruct
    public void init(){
        try {
            ClientGlobal.setG_charset(DEFAULT_ENCODING);
            ClientGlobal.setG_connect_timeout(fastDFSProperty.getConnect_timeout());
            ClientGlobal.setG_network_timeout(fastDFSProperty.getNetwork_timeout());
            ClientGlobal.setG_secret_key(fastDFSProperty.getSecret_key());
            ClientGlobal.setG_tracker_http_port(fastDFSProperty.getTracker_http_port());

            String tracker_server = fastDFSProperty.getTracker_server();
            InetSocketAddress isadd = new InetSocketAddress(
                    tracker_server.substring(0, tracker_server.indexOf(':')), 
                    Integer.parseInt(tracker_server.substring(tracker_server.indexOf(':') + 1, tracker_server.length())));
            InetSocketAddress[] tracker_servers  = {isadd};
            ClientGlobal.setG_tracker_group(new TrackerGroup(tracker_servers));

            TrackerClient trackerClient = new TrackerClient(ClientGlobal.g_tracker_group);
            trackerServer = trackerClient.getConnection();
            if (trackerServer == null) {
                throw new EducationException("getConnection return null");
            }
            storageServer = trackerClient.getStoreStorage(trackerServer);
            if (storageServer == null) {
                throw new EducationException("getStoreStorage return null");
            }
            ProtoCommon.activeTest(storageServer.getSocket());
        } catch (Exception e) {
            throw new EducationException("初始化 fastdfs 配置失败", e);
        }
    }

    /**
     * 
     * @param file
     *            文件
     * @param fileName
     *            文件名
     * @return 返回Null则为失败
     */
    public String uploadFile(File file, String fileName) {
        InputStream fis = null;
        try {
            NameValuePair[] meta_list = null; 
            fis = Files.newInputStream(file.toPath());
            byte[] file_buff = new byte[1024];
            int len = fis.available();
            file_buff = new byte[len];
            while (fis.read(file_buff) > 0) {
                break;
            }
            StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
            String fileid = storageClient1.upload_file1(file_buff, getFileExt(fileName), meta_list);
            return fileid;
        } catch (Exception ex) {
            throw new EducationException("上传文件错误",ex);
        } finally{
            if (fis != null) {
                try {
                    fis.close();
                } catch (IOException e) {
                    log.error("Close {} InputStream failed", fis);
                }
            }
        }
    }

    /**
     * 上传文件
     * @param bytes
     * @param name
     * @param size
     * @return
     */
    public String uploadFile(byte[] bytes, String name, Long size) {
        try {
            //扩展名
            String ext = name.substring(name.lastIndexOf('.')+1);
            NameValuePair[] meta_list = new NameValuePair[3];
            meta_list[0] = new NameValuePair("filename",name);
            meta_list[1] = new NameValuePair("fileext",ext);
            meta_list[2] = new NameValuePair("filesize",String.valueOf(size));
            StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
            return storageClient1.upload_file1(bytes, ext, meta_list);
        } catch (Exception ex) {
            throw new EducationException("上传文件错误",ex);
        }
    }

    /**
     * 根据组名和远程文件名来删除一个文件
     * 
     * @param groupName
     *            例如 "group1" 如果不指定该值,默认为group1
     * @param fileName
     *            例如"M00/00/00/wKgxgk5HbLvfP86RAAAAChd9X1Y736.jpg"
     * @return 0为成功,非0为失败,具体为错误代码
     */
    public int deleteFile(String groupName, String fileName) {
        try {
            StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
            return storageClient1.delete_file(StringUtils.isBlank(groupName)? "group1" : groupName, fileName);
        } catch (Exception ex) {
            throw new EducationException("return null",ex);
        }
    }

    /**
     * 根据fileId来删除一个文件(我们现在用的就是这样的方式,上传文件时直接将fileId保存在了数据库中)
     * 
     * @param fileId
     *            file_id源码中的解释file_id the file id(including group name and filename);例如 group1/M00/00/00/ooYBAFM6MpmAHM91AAAEgdpiRC0012.xml
     * @return 0为成功,非0为失败,具体为错误代码
     */
    public int deleteFile(String fileId) {
        try {
            StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
            return storageClient1.delete_file1(fileId);
        } catch (Exception ex) {
            throw new EducationException("删除文件错误",ex);
        }
    }

    /**
     * 修改一个已经存在的文件
     * 
     * @param oldFileId
     *            原来旧文件的fileId, file_id源码中的解释file_id the file id(including group name and filename);例如 group1/M00/00/00/ooYBAFM6MpmAHM91AAAEgdpiRC0012.xml
     * @param file
     *            新文件
     * @param filePath
     *            新文件路径
     * @return 返回空则为失败
     */
    public String modifyFile(String oldFileId, File file, String filePath) {
        String fileid = null;
        try {
            // 先上传
            fileid = uploadFile(file, filePath);
            if (fileid == null) {
                return null;
            }
            // 再删除
            int delResult = deleteFile(oldFileId);
            if (delResult != 0) {
                return null;
            }
        } catch (Exception ex) {
            throw new EducationException("修改一个已经存在的文件错误",ex);
        }
        return fileid;
    }

    /**
     * 文件下载
     * 
     * @param fileId
     * @return 返回一个流
     */
    public InputStream downloadFile(String fileId) {
        try {
            StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
            byte[] bytes = storageClient1.download_file1(fileId);
            return new ByteArrayInputStream(bytes);
        } catch (Exception ex) {
            throw new EducationException("文件下载错误",ex);
        }
    }

    /**
     * 
     * @param fileId
     * @return 返回一个字节数组
     */
    public byte[] downloadFileToByte(String fileId) {
        try {
            StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
            return storageClient1.download_file1(fileId);
        } catch (Exception ex) {
            throw new EducationException("文件下载错误",ex);
        }
    }

    /**  
     * 批量文件下载,在map中给出文件名:fileName, 文件对应路径:filePath;   
     * 方法返回  由所有文件生成的压缩包ZIP   
     * @param fileList 在map中给出文件名:fileName, 文件对应路径:filePath;  
     * @return 返回一个流  
     */  
    public InputStream downloadFile(List<Map<String, String>> fileList){  
        if(CollectionUtils.isEmpty(fileList)){  
            throw new EducationException("文件下载错误,fileList为空!");  
        }  
        Date date = new Date();  
        long timeStr = date.getTime();  
        
        StorageClient1 storageClient1 = new StorageClient1(trackerServer, storageServer);
        
        try(ZipOutputStream zos = new ZipOutputStream(Files.newOutputStream(Paths.get(".", timeStr+".zip")));) {  
              
            ZipEntry entry;  
            int count, bufferLen = 1024;    
            byte data[] = new byte[bufferLen];  
            for(Map<String, String> map : fileList){  
                entry = new ZipEntry(map.get("fileName"));  
                  
                zos.putNextEntry(entry);  
                byte[] bytes = storageClient1.download_file1(map.get("filePath"));  
                try(BufferedInputStream bis = new BufferedInputStream(new ByteArrayInputStream(bytes));){  
                    while ((count = bis.read(data, 0, bufferLen)) != -1) {    
                        zos.write(data, 0, count);    
                    }    
                    zos.closeEntry();  
                }  
            }  
            return Files.newInputStream(Paths.get(".", timeStr+".zip"));  
        } catch (Exception ex) {  
            throw new EducationException("文件下载错误", ex);  
        }  
    } 

    /**
     * 获取文件后缀名(不带点).
     * 
     * @return 如:"jpg" or "".
     */
    private  String getFileExt(String fileName) {
        if (StringUtils.isBlank(fileName) || !fileName.contains(".")) {
            return "";
        } else {
            return fileName.substring(fileName.lastIndexOf('.') + 1); // 不带最后的点
        }
    }
}

fastdf源码中storageServer每次用完都会关闭

image