JAVA高效率 (秒级) 将千万条数据导入数据库

发布时间 2023-06-01 10:05:47作者: chen1777
package com.chen.controller;

import com.chen.utils.BaseDao;
import com.chen.utils.DataImport;
import com.chen.utils.StreamUtil;
import org.junit.Test;

import java.io.File;
import java.sql.Connection;
import java.util.List;

public class BigData1000wTest {

    @Test
    public void BigData1000wTest() throws Exception {
        //   通过JDBCUtil工具类获取数据库连接对象
        Connection conn = BaseDao.getConn();
        //   StreamUtil是已经封装好的使用流读取文件的工具类
        List<String> list = StreamUtil.readingLineFormTextFile(new File("D://milliondatatest//test(500W).csv"));
        String sql = "insert into mysqltest values(?,?,?,?)";   //  定义要导入数据的sql,无需主键将第一个?设置为null
        long start = System.currentTimeMillis();   //   获取方法开始执行前的时间(单位:毫秒)
        //  调用刚刚封装好的工具类
        DataImport.dispose(conn, list, 0, true, 1000000, sql);
        long end = System.currentTimeMillis();     //   获取方法执行结束后的时间
        //   相减即可得到插入所有数据的耗时   秒=毫秒/1000;
        System.out.println("成功导入" + list.size() + "条数据!!时长:" + (end - start) / 1000 + "秒");
    }
}

 

package com.chen.utils;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class BaseDao {  //  静态工具类,用于创建数据库连接对象和释放资源,方便调用
    //    导入驱动jar包或添加Maven依赖(这里使用的是Maven,Maven依赖代码附在文末)
    static {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    //  获取数据库连接对象
    public static Connection getConn() {
        Connection conn = null;
        try {
            //  rewriteBatchedStatements=true,一次插入多条数据,只插入一次
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/million-test?rewriteBatchedStatements=true", "root", "123456");
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
        return conn;
    }

    //  释放资源
    public static void closeAll(AutoCloseable... autoCloseables) {
        for (AutoCloseable autoCloseable : autoCloseables) {
            if (autoCloseable != null) {
                try {
                    autoCloseable.close();
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
}
package com.chen.utils;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

public class DataImport {
    //   参数一:数据库连接对象、参数二:流文件读取出的集合、参数三:从第几条数据开始读取,目的是排除表头、参数四:是否包含主键、参数五:每次批量执行添加数据的数量、参数六:sql语句
    public static void dispose(Connection conn, List<String> list, Integer startRows, boolean includePrimaryKey, Integer size, String sql) {
        try {
            conn.setAutoCommit(false);  //  设置事物手动提交
            PreparedStatement ps = conn.prepareStatement(sql);
            String[] split = null;
            if (includePrimaryKey) {    //  包含主键,只需判断一次
                for (int i = startRows; i < list.size(); i++) {
                    //  按逗号切割字符串,-1代表忽略数组长度,避免数组长度越界异常
                    split = list.get(i).split(",", -1);
                    /*下方代码产生警告提示的原因:同一项目中,有重复的代码块(idea很好的提示。但是这里无法将判断放在循环内,不然会多出百万次判断使程序缓慢)*/
                    for (int j = 0; j < split.length; j++) {   //  遍历刚刚获取的数组
                        //   对集合中的每条数据进行处理,将字符串中多出的引号去掉,避免录入数据库时因字段类型不匹配而导致的格式转换异常
                        ps.setObject(j + 1, split[j].replace("\"", ""));    //  循环赋值
                    }
                    ps.addBatch();   //  将所有数据转为一条sql
                    if (i % size == 0 && i != 0) {   //  如果i能整除size,即执行循环体
                        ps.executeBatch();           //  批量执行sql
                        conn.commit();               //  事物手动提交
                        conn.setAutoCommit(false);   //  重新设置事物为手动提交
                        ps = conn.prepareStatement(sql);   //  再次为ps对象赋值
                    }
                }
            } else {    //  不包含主键
                for (int i = startRows; i < list.size(); i++) {
                    String s = list.get(i);
                    //  将集合中的对象从第一个逗号切割,substring包头不包尾,因此此处需加1
                    split = s.substring(s.indexOf(",") + 1).split(",", -1);
                    for (int j = 0; j < split.length; j++) {
                        ps.setObject(j + 1, split[j].replace("\"", ""));
                    }
                    ps.addBatch();
                    if (i % size == 0 && i != 0) {
                        ps.executeBatch();
                        conn.commit();
                        conn.setAutoCommit(false);
                        ps = conn.prepareStatement(sql);
                    }
                }
            }
            ps.executeBatch();  //  循环外提交是因为可能会出现循环内条件不成立而未提交过的情况
            conn.commit();      //  提交事物,避免脏数据(事物太长也有弊端)
            ps.close();         //  关闭资源
            conn.close();
        } catch (Exception throwables) {
            throwables.printStackTrace();
        }
    }
}
package com.chen.utils;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;

public class StreamUtil {

    // 批量关闭流
    public static void closings(AutoCloseable... closeables) throws Exception {
        // 非空判断
        if (closeables != null) {
            // 循环关闭
            for (AutoCloseable auto : closeables) {
                // 非空判断
                if (auto != null) {
                    auto.close();
                }
            }
        }
    }

    // 按行读取文本文件(返回List集合)
    // 默认UTF-8格式
    public static List<String> readingLineFormTextFile(File textFile) throws Exception {
        return readingLineFormTextFile(textFile, "UTF-8");
    }

    // 自定义编码格式
    public static List<String> readingLineFormTextFile(File textFile, String encode) throws Exception {
        // 存放结果
        List<String> list = new ArrayList<String>();
        // 字符输入流
        FileReader fr = new FileReader(textFile);
        // 缓冲字符输入流
        BufferedReader br = new BufferedReader(fr);
        // 用于存入复制的数据
        String str = null;
        // 复制,如果读取的不为空,证明读取到了数据
        while ((str = br.readLine()) != null) {
            list.add(str);
        }
        // 关流
        closings(br);
        closings(fr);
        return list;
    }

}

 

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.27</version>
        </dependency>