package com.chen.controller; import com.chen.utils.BaseDao; import com.chen.utils.DataImport; import com.chen.utils.StreamUtil; import org.junit.Test; import java.io.File; import java.sql.Connection; import java.util.List; public class BigData1000wTest { @Test public void BigData1000wTest() throws Exception { // 通过JDBCUtil工具类获取数据库连接对象 Connection conn = BaseDao.getConn(); // StreamUtil是已经封装好的使用流读取文件的工具类 List<String> list = StreamUtil.readingLineFormTextFile(new File("D://milliondatatest//test(500W).csv")); String sql = "insert into mysqltest values(?,?,?,?)"; // 定义要导入数据的sql,无需主键将第一个?设置为null long start = System.currentTimeMillis(); // 获取方法开始执行前的时间(单位:毫秒) // 调用刚刚封装好的工具类 DataImport.dispose(conn, list, 0, true, 1000000, sql); long end = System.currentTimeMillis(); // 获取方法执行结束后的时间 // 相减即可得到插入所有数据的耗时 秒=毫秒/1000; System.out.println("成功导入" + list.size() + "条数据!!时长:" + (end - start) / 1000 + "秒"); } }
package com.chen.utils; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; public class BaseDao { // 静态工具类,用于创建数据库连接对象和释放资源,方便调用 // 导入驱动jar包或添加Maven依赖(这里使用的是Maven,Maven依赖代码附在文末) static { try { Class.forName("com.mysql.cj.jdbc.Driver"); } catch (ClassNotFoundException e) { e.printStackTrace(); } } // 获取数据库连接对象 public static Connection getConn() { Connection conn = null; try { // rewriteBatchedStatements=true,一次插入多条数据,只插入一次 conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/million-test?rewriteBatchedStatements=true", "root", "123456"); } catch (SQLException throwables) { throwables.printStackTrace(); } return conn; } // 释放资源 public static void closeAll(AutoCloseable... autoCloseables) { for (AutoCloseable autoCloseable : autoCloseables) { if (autoCloseable != null) { try { autoCloseable.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
package com.chen.utils; import java.sql.Connection; import java.sql.PreparedStatement; import java.util.List; public class DataImport { // 参数一:数据库连接对象、参数二:流文件读取出的集合、参数三:从第几条数据开始读取,目的是排除表头、参数四:是否包含主键、参数五:每次批量执行添加数据的数量、参数六:sql语句 public static void dispose(Connection conn, List<String> list, Integer startRows, boolean includePrimaryKey, Integer size, String sql) { try { conn.setAutoCommit(false); // 设置事物手动提交 PreparedStatement ps = conn.prepareStatement(sql); String[] split = null; if (includePrimaryKey) { // 包含主键,只需判断一次 for (int i = startRows; i < list.size(); i++) { // 按逗号切割字符串,-1代表忽略数组长度,避免数组长度越界异常 split = list.get(i).split(",", -1); /*下方代码产生警告提示的原因:同一项目中,有重复的代码块(idea很好的提示。但是这里无法将判断放在循环内,不然会多出百万次判断使程序缓慢)*/ for (int j = 0; j < split.length; j++) { // 遍历刚刚获取的数组 // 对集合中的每条数据进行处理,将字符串中多出的引号去掉,避免录入数据库时因字段类型不匹配而导致的格式转换异常 ps.setObject(j + 1, split[j].replace("\"", "")); // 循环赋值 } ps.addBatch(); // 将所有数据转为一条sql if (i % size == 0 && i != 0) { // 如果i能整除size,即执行循环体 ps.executeBatch(); // 批量执行sql conn.commit(); // 事物手动提交 conn.setAutoCommit(false); // 重新设置事物为手动提交 ps = conn.prepareStatement(sql); // 再次为ps对象赋值 } } } else { // 不包含主键 for (int i = startRows; i < list.size(); i++) { String s = list.get(i); // 将集合中的对象从第一个逗号切割,substring包头不包尾,因此此处需加1 split = s.substring(s.indexOf(",") + 1).split(",", -1); for (int j = 0; j < split.length; j++) { ps.setObject(j + 1, split[j].replace("\"", "")); } ps.addBatch(); if (i % size == 0 && i != 0) { ps.executeBatch(); conn.commit(); conn.setAutoCommit(false); ps = conn.prepareStatement(sql); } } } ps.executeBatch(); // 循环外提交是因为可能会出现循环内条件不成立而未提交过的情况 conn.commit(); // 提交事物,避免脏数据(事物太长也有弊端) ps.close(); // 关闭资源 conn.close(); } catch (Exception throwables) { throwables.printStackTrace(); } } }
package com.chen.utils; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.util.ArrayList; import java.util.List; public class StreamUtil { // 批量关闭流 public static void closings(AutoCloseable... closeables) throws Exception { // 非空判断 if (closeables != null) { // 循环关闭 for (AutoCloseable auto : closeables) { // 非空判断 if (auto != null) { auto.close(); } } } } // 按行读取文本文件(返回List集合) // 默认UTF-8格式 public static List<String> readingLineFormTextFile(File textFile) throws Exception { return readingLineFormTextFile(textFile, "UTF-8"); } // 自定义编码格式 public static List<String> readingLineFormTextFile(File textFile, String encode) throws Exception { // 存放结果 List<String> list = new ArrayList<String>(); // 字符输入流 FileReader fr = new FileReader(textFile); // 缓冲字符输入流 BufferedReader br = new BufferedReader(fr); // 用于存入复制的数据 String str = null; // 复制,如果读取的不为空,证明读取到了数据 while ((str = br.readLine()) != null) { list.add(str); } // 关流 closings(br); closings(fr); return list; } }
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.27</version> </dependency>