Java分批将List数据导入数据库

一、项目场景:

在工作中的一个需求中,需要创建一张新的表,表格的初始数据需要从之前的多张表格中联查出来并且添加到当前表格中。由于在生产环境中数据量级达到了百万级别,因此在插入数据到MySQL中时需要分批次进行导入,我写了三种方法进行数据的导入,最后采用了第三种方法来进行数据导入,将实现过程在此进行记录。

在文章中,我将使用User来作为示例对象用于演示


二、为什么要分批次?

待更…


三、解决方案:

1. MyBatisPlus原生方法导入

// 获取到要插入数据库的集合,数据量很大
List<User> list = new ArrayList<>();
// 插入数据到MySQL中
userService.saveBatch(list);

2. List分组导入

(1)UserServiceImpl类中导入方法

@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
	@Autowired
	private UserMapper userMapper;

	@Override
	public void insert() {
    	// 通过一系列操作获取到要插入的集合,在此使用list代替
    	List<User> list = new ArrayList<>();
        
    	// 每次插入的数量
    	int batchSize = 1000;
    	// 计算需要分多少批插入数据库
    	int batch = list.size() / batchSize;
    	// 计算最后一批的大小
    	int lastSize = list.size() % batchSize;

    	// 将筛选出的结果分批次添加到表中
    	for (int i = batchSize; i <= batch * batchSize; i = i + batchSize) {
        	// 截取本次要添加的数据
        	List<User> insertList = list.subList(i - batchSize, i);
        	// 添加本批次数据到数据库中
        	userMapper.batchInsert(insertList);
    	}
    	// 最后一批元素的大小是否为0
    	if (lastSize != 0) {
        	// 如果元素有剩余则将所有元素作为一个子列表一次性插入
        	List<User> lastList = list.subList(batchSize * batch, list.size());
        	// 添加集合到数据库中
        	userMapper.batchInsert(lastList);
    	}
	}
	
}

代码解析:

先将列表分成每个1000个元素一批的子列表,然后使用自定义的 batchInsert() 方法对子列表进行批量插入操作。如果列表大小不是 1000 的倍数,则将剩余元素全部一次性插入。具体实现细节如下:

  1. 首先定义每一批次的插入数量 batchSize ,算出需要分几批插入变量 batch ,以及最后一批插入数量(集合中元素总量如果不是1000倍数时最后一批的剩余数量)的大小 lastSize 。
  2. 通过 for 循环,将列表分成每个 1000 个元素一批的子列表,随后使用 subList() 方法来获取当前批次要插入的元素
  3. 对于每一批要插入的子列表,使用自定义的 batchInsert() 方法进行批量插入操作。
  4. 判断最后一批的大小 lastSize 是否为 0,如果不为 0,则使用 subList() 方法将剩余所有元素作为一个子列表进行一次性插入。

在这里对最后一批插入元素时,下标为什么要使用 batchSize(每一批次的大小) * batch(批次数量) 来作为起始变量进行解析:

解析: 使用 batchSize * batch 作为集合的起始位置,是因为在 for 循环中已经将前 a 个元素作为起始位置插入到数据库中了,因此下一个起始位置应该是 batchSize 的倍数,即 batchSize * batch 。这样可以避免重复插入已经插入过的元素

(2)UserMapper数据持久化接口

将集合作为参数传递到Mapper层中

/**
 * 用户数据持久化接口
 *
 * @author Dream_飞翔
 * @since 2023/5/16
 */
public interface UserMapper extends BaseMapper<User> {
	/**
     * 添加指定集合内的数据到数据库中
     *
     * @param insertList 要添加的内容
     * @return 受影响的行数
     */
    Integer batchInsert(@Param("insertList") List<User> insertList);
}

(3)UserMapper.xml映射文件

<?xml version="1.0" encoding="utf-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.zrkizzy.data.mapper.UserMapper">
	<!-- 批量添加数据到数据库中 -->
	<insert id="batchInsert">
        INSERT INTO tb_user (id, username, password)
        VALUES
        <foreach collection ="userList" item="user" separator =",">
            (#{user.id}, #{user.username}, #{user.password})
        </foreach>
    </insert>
</mapper>

3. 多线程分批次插入

在第二种方法中,使用了分批次处理的数据导入方式,但是在数据量特别大的情况下,单线程的压力还是很大,因此使用多线程是比较好的一种方式。

(1)UserServiceImpl类中导入方法

@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {
	@Autowired
	private UserMapper userMapper;

	@Override
	public void insert() {
    	// 通过一系列操作获取到要插入的集合,在此使用list代替
    	List<User> list = new ArrayList<>();
    	    
        // 获取虚拟机可用的最大处理器数量
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        // 获取要添加的数据集合大小
        int total = list.size();
        // 每次插入的数量
        int batchSize = 1000;
        // 计算需要分多少批插入数据库(向上取整)
        int totalBatch = (total + batchSize - 1) / batchSize;
        // 手动创建线程池
        ExecutorService executor = new ThreadPoolExecutor(
                // 线程池核心线程数量
                availableProcessors,
                // 线程池最大数量
                availableProcessors + 1000,
                // 空闲线程存活时间
                1000,
                // 时间单位
                TimeUnit.MILLISECONDS,
                // 线程池所使用的缓冲队列
                new ArrayBlockingQueue<>(100),
                // 线程池对拒绝任务的处理策略
                new ThreadPoolExecutor.CallerRunsPolicy());

        // 将筛选出的结果分批次添加到表中
        for (int batchIndex = 0; batchIndex < totalBatch; batchIndex++) {
            // 当前插入批次的起始索引
            int startIndex = batchIndex * batchSize;
            // 当前插入批次的结束索引
            int endIndex = Math.min((batchIndex + 1) * batchSize, total);
            // 截取本次要添加的数据
            List<LuckyDrawHistory> insertList = list.subList(startIndex, endIndex);
            // 将每个批次的插入逻辑封装成一个Runnable对象
            Runnable task = () -> {
                // 添加本批次数据到数据库中
                userMapper.batchInsert(insertList);
            };
            // 提交添加任务
            executor.submit(task);
        }
        // 关闭线程池释放资源
        executor.shutdown();
    }
	
}

代码解析: 待更…