900万大数据量 查询 更新 使用redis 多线程请求

发布时间 2023-04-19 18:15:16作者: 何苦->

表A中有900多万条数据，需要根据这些数据查询出对应的数据，并插入B表。

  • 常规做法：创建一个定时任务，定时查询配置条件并执行查询更新（查询更新使用多线程），再根据预估耗时设置请求时间和频率。
    但我不想这么做，而是希望：
  • 可以随时查看数据执行进度

思路:

  • 设置一个运行队列 runList 里面是待执行的id
  • 设置一个失败队列 failList 里面是执行失败的id
  • 展示进度：进度 = (最大id - 开始id - runList队列大小) / (最大id - 开始id)
  • 展示预估剩余时间：剩余时间 = 最近一次循环的执行耗时 × 剩余循环次数
  1. 请求“设置开始id”接口，设置开始id
  2. 请求“设置结束id”接口，设置最大id及每次循环处理的id数量，并生成运行队列：把开始id~结束id之间的id按每次循环处理的数量切分成若干批（即 List<List<id>>），依次写入 runList
  3. 主程序每次从 runList 取出一批id，开启多线程请求数据并存入数据库，逐批重复，可通过停止位中止

Controller


	/**
	 * Entry point of the id-queue processing job.
	 *
	 * <p>Writes a start/end timestamp to the XXL-Job log around a call to
	 * {@code miDuoService.organizeMiduoCode()} and returns once that call returns.
	 *
	 * @return {@link ReturnT#SUCCESS} when the service call completes without throwing
	 */
	@GetMapping("/importOrganizeMiduoCode")
	public ReturnT<String> importOrganizeMiduoCode() {
		long startedAt = System.currentTimeMillis();
		XxlJobHelper.log("========= importOrganizeMiduoCode  start : {} ===============", startedAt);
		miDuoService.organizeMiduoCode();
		long finishedAt = System.currentTimeMillis();
		XxlJobHelper.log("========= importOrganizeMiduoCode  end : {} ===============", finishedAt);
		ReturnT<String> result = ReturnT.SUCCESS;
		return result;
	}

	/**
	 * Sets the first id of the range to process by delegating to the service.
	 *
	 * @param start first id; the request parameter defaults to "1" when omitted
	 * @return {@link ReturnT#SUCCESS} when the service call completes without throwing
	 */
	@GetMapping("/importOrganizeMiduoCodeSetStart")
	public ReturnT<String> importOrganizeMiduoCodeSetStart(@RequestParam(name = "start", defaultValue = "1") String start) {
		long startedAt = System.currentTimeMillis();
		XxlJobHelper.log("========= importOrganizeMiduoCodeSetStart  start : {} ===============", startedAt);
		miDuoService.importOrganizeMiduoCodeSetStart(start);
		long finishedAt = System.currentTimeMillis();
		XxlJobHelper.log("========= importOrganizeMiduoCodeSetStart  end : {} ===============", finishedAt);
		ReturnT<String> result = ReturnT.SUCCESS;
		return result;
	}

	/**
	 * Sets the end id and the per-batch size, then has the service rebuild the run queue
	 * (delegates to {@code miDuoService.refreshRedisCache}).
	 *
	 * @param end  last id; the request parameter defaults to "1" when omitted
	 * @param step number of ids per batch; the request parameter defaults to "1000" when omitted
	 * @return {@link ReturnT#SUCCESS} when the service call completes without throwing
	 */
	@GetMapping("/importOrganizeMiduoCodeSetEnd")
	public ReturnT<String> importOrganizeMiduoCodeSetEnd(@RequestParam(name = "end", defaultValue = "1") String end, @RequestParam(name = "step", defaultValue = "1000") String step) {
		long startedAt = System.currentTimeMillis();
		XxlJobHelper.log("========= importOrganizeMiduoCodeSetEnd  start : {} ===============", startedAt);
		miDuoService.refreshRedisCache(end, step);
		long finishedAt = System.currentTimeMillis();
		XxlJobHelper.log("========= importOrganizeMiduoCodeSetEnd  end : {} ===============", finishedAt);
		ReturnT<String> result = ReturnT.SUCCESS;
		return result;
	}

	/**
	 * Updates the run/stop flag that the batch loop checks between batches.
	 *
	 * @param flag new flag value; the request parameter defaults to "1" when omitted
	 * @return {@link ReturnT#SUCCESS} when the service call completes without throwing
	 */
	@GetMapping("/importOrganizeMiduoCodeSetFlag")
	public ReturnT<String> importOrganizeMiduoCodeSetFlag(@RequestParam(name = "flag", defaultValue = "1") String flag) {
		long startedAt = System.currentTimeMillis();
		XxlJobHelper.log("========= importOrganizeMiduoCodeSetFlag  start : {} ===============", startedAt);
		miDuoService.importOrganizeMiduoCodeSetFlag(flag);
		long finishedAt = System.currentTimeMillis();
		XxlJobHelper.log("========= importOrganizeMiduoCodeSetFlag  end : {} ===============", finishedAt);
		ReturnT<String> result = ReturnT.SUCCESS;
		return result;
	}

	/**
	 * Returns the current progress of the id-queue job.
	 *
	 * <p>The log lines are labelled with this endpoint's own name so that progress polls can be
	 * told apart from actual job runs in the XXL-Job log.
	 *
	 * @return the service's progress map wrapped in {@code R}
	 */
	@GetMapping("/importOrganizeMiduoCodeProgress")
	public R importOrganizeMiduoCodeProgress() {
		XxlJobHelper.log("========= importOrganizeMiduoCodeProgress  start : {} ===============", System.currentTimeMillis());
		Map<String, Object> progress = miDuoService.importOrganizeMiduoCodeProgress();
		XxlJobHelper.log("========= importOrganizeMiduoCodeProgress  end : {} ===============", System.currentTimeMillis());
		return R.data(progress);
	}

Service

// ---- Redis keys: range configuration ----

/** First id (inclusive) of the range to process. */
private static final String MIDUO_START_NUM = "miduo:startNum";
/** Last id of the range to process. */
private static final String MIDUO_TOTAL_NUM = "miduo:totalNum";
/** Number of ids handled per batch. */
private static final String MIDUO_STEP = "miduo:step";

// ---- Redis keys: work queues ----

/** Ids still waiting to be processed. */
private static final String MIDUO_RUN_LIST = "miduo:runList";
/** Ids whose processing failed. */
private static final String MIDUO_FAIL_LIST = "miduo:failList";

// ---- Redis keys: run control and metrics ----

/** Stop flag: the batch loop stops once this holds a value other than 1. */
private static final String MIDUO_FLAG = "miduo:flag";
/** Start timestamps (epoch millis) of the two most recent batches. */
private static final String MIDUO_TIME_LIST = "miduo:timeList";
/** Remaining batch count; initialised when the queue is built and decremented once per batch. */
private static final String MIDUO_TIME_COUNT = "miduo:timeCount";
/** Present (with a TTL) while a batch has recently started. */
private static final String MIDUO_RUN_STATUS = "miduo:runStatus";


/**
 * Stores the first id (inclusive) of the range to process.
 *
 * <p>The value is validated before it is written. Otherwise a bad value would be stored and
 * handed to {@code NumberUtil.parseLong} on every later read of the start id.
 *
 * @param start first id; surrounding whitespace is ignored
 * @throws BusinessException if {@code start} is null, blank or not a whole number
 */
@Override
public void importOrganizeMiduoCodeSetStart(String start) {
	String trimmed = start == null ? "" : start.trim();
	try {
		Long.parseLong(trimmed);
	} catch (NumberFormatException e) {
		throw new BusinessException("开始id必须是整数: " + start);
	}
	// NOTE(review): these setters mutate the shared RedisTemplate on every call; ideally configure it once
	redisTemplate.setDefaultSerializer(new FastJsonRedisSerializer<>(Object.class));
	redisTemplate.setValueSerializer(new GenericFastJsonRedisSerializer());
	redisTemplate.opsForValue().set(MIDUO_START_NUM, trimmed);
}

/**
 * Sets the end id and batch size, then rebuilds the run queue with every id from the start id to
 * the end id (both inclusive).
 *
 * <p>{@code NumberUtil.parseLong} yields 0 (not null) for blank input, so "not provided" is
 * detected as {@code <= 0}:
 * <ul>
 *   <li>a missing/non-positive end falls back to the largest {@code MiduoCodeNew} id;</li>
 *   <li>a missing/non-positive step falls back to 1000. A zero step would make
 *       {@code appendRange} loop forever.</li>
 * </ul>
 *
 * @param endStr  last id to queue (inclusive)
 * @param stepStr number of ids per batch
 * @throws BusinessException if no end id can be determined, or the start id is greater than the end id
 */
@Override
public void refreshRedisCache(String endStr, String stepStr) {
	// Use the same serializers as the other entry points so all keys are written in one format
	redisTemplate.setDefaultSerializer(new FastJsonRedisSerializer<>(Object.class));
	redisTemplate.setValueSerializer(new GenericFastJsonRedisSerializer());

	Long end = NumberUtil.parseLong(endStr);
	Long step = NumberUtil.parseLong(stepStr);
	if (end == null || end <= 0) {
		MiduoCodeNew one = miduoCodeNewService.getOne(Wrappers.<MiduoCodeNew>lambdaQuery().orderByDesc(MiduoCodeNew::getId).last("LIMIT 1"));
		if (one == null || one.getId() == null) {
			throw new BusinessException("无法确定结束id: MiduoCodeNew 表为空");
		}
		end = one.getId();
	}
	if (step == null || step <= 0) {
		step = 1000L;
	}
	Long startNum = getStartNumLong();
	if (startNum > end) {
		throw new BusinessException("开始id不能大于结束id: " + startNum + " > " + end);
	}

	// step: ids per batch; flag: 1 = keep running; totalNum: last id of the range
	redisTemplate.opsForValue().set(MIDUO_STEP, step.toString());
	redisTemplate.opsForValue().set(MIDUO_FLAG, "1");
	redisTemplate.opsForValue().set(MIDUO_TOTAL_NUM, end.toString());

	// First id of every batch: startNum, startNum + step, ... <= end
	List<Long> batchStarts = new ArrayList<>();
	appendRange(startNum, end, step, batchStarts);
	redisTemplate.delete(MIDUO_RUN_LIST);
	redisTemplate.opsForValue().set(MIDUO_TIME_COUNT, batchStarts.size());

	byte[] runListKey = MIDUO_RUN_LIST.getBytes(StandardCharsets.UTF_8);
	for (Long batchStart : batchStarts) {
		// Cap the last batch at end so no ids beyond the configured range are queued
		long batchEnd = Math.min(batchStart + step - 1, end);
		int count = (int) (batchEnd - batchStart + 1);
		byte[][] ids = new byte[count][];
		for (int k = 0; k < count; k++) {
			ids[k] = String.valueOf(batchStart + k).getBytes(StandardCharsets.UTF_8);
		}
		// One variadic RPUSH per batch instead of one command per id
		redisTemplate.executePipelined(
			(RedisCallback<Object>) connection -> {
				connection.rPush(runListKey, ids);
				return null;
			}
		);
	}
}

/**
 * Reads the configured start id from Redis.
 *
 * <p>"Unset" is tested explicitly. {@code NumberUtil.parseLong} yields 0 for blank input rather
 * than null, and {@code ObjectUtil.isNotEmpty} on a boxed number is always true, so the default
 * was never applied before.
 *
 * @return the stored start id, or 1 when nothing (or a blank value) is stored
 */
private Long getStartNumLong() {
	Object startObj = redisTemplate.opsForValue().get(MIDUO_START_NUM);
	String startStr = StrUtil.toString(startObj);
	if (startObj == null || StrUtil.isBlank(startStr) || "null".equals(startStr)) {
		return 1L;
	}
	return NumberUtil.parseLong(startStr);
}


/**
 * Writes the run/stop flag.
 *
 * <p>The batch loop reads this key after each batch and stops when it no longer holds 1.
 *
 * @param flag new flag value, stored as given
 */
@Override
public void importOrganizeMiduoCodeSetFlag(String flag) {
	// Serializers are (re)installed on the shared template before writing
	redisTemplate.setValueSerializer(new GenericFastJsonRedisSerializer());
	redisTemplate.setDefaultSerializer(new FastJsonRedisSerializer<>(Object.class));
	String flagValue = flag;
	redisTemplate.opsForValue().set(MIDUO_FLAG, flagValue);
}


/**
 * Main job: processes the run queue built by {@code refreshRedisCache}.
 *
 * <p>Expected call order:
 * <ol>
 *   <li>set the start id;</li>
 *   <li>set the end id, which also builds the run queue;</li>
 *   <li>call this method.</li>
 * </ol>
 *
 * @throws BusinessException if the run queue is missing or empty
 */
@Override
public void organizeMiduoCode() {
	redisTemplate.setDefaultSerializer(new FastJsonRedisSerializer<>(Object.class));
	redisTemplate.setValueSerializer(new GenericFastJsonRedisSerializer());

	// LLEN returns 0 (not null) for a missing key, so both must be treated as "empty"
	Long size = redisTemplate.opsForList().size(MIDUO_RUN_LIST);
	if (size == null || size <= 0) {
		throw new BusinessException("运行队列为空");
	}

	// Ids per batch; fall back to 1000 when unset or non-positive
	Object stepObj = redisTemplate.opsForValue().get(MIDUO_STEP);
	String stepStr = StrUtil.toString(stepObj);
	stepStr = "null".equals(stepStr) ? "1000" : stepStr;
	Long batchCount = NumberUtil.parseLong(stepStr);
	if (batchCount == null || batchCount <= 0) {
		batchCount = 1000L;
	}

	// The start id is only used for log messages inside refresh
	Long startNum = getStartNumLong();
	refresh(startNum, batchCount);
}



/**
 * Reports progress of the current run.
 *
 * <p>The returned map contains:
 * <ul>
 *   <li>{@code flag}: 1 while the run-status key (written by the batch loop with a 600 s TTL)
 *       exists, otherwise 2;</li>
 *   <li>{@code round}: percentage (0-100, two decimals) of the configured id range that is no
 *       longer in the run queue;</li>
 *   <li>{@code formatBetween}: estimated remaining time, computed as the gap between the two
 *       most recent batch starts times the remaining batch count.</li>
 * </ul>
 *
 * @return progress map with the keys above
 */
@Override
public Map<String, Object> importOrganizeMiduoCodeProgress() {
	redisTemplate.setDefaultSerializer(new FastJsonRedisSerializer<>(Object.class));
	redisTemplate.setValueSerializer(new GenericFastJsonRedisSerializer());

	Object totalObj = redisTemplate.opsForValue().get(MIDUO_TOTAL_NUM);
	String totalStr = StrUtil.toString(totalObj);
	totalStr = "null".equals(totalStr) ? "" : totalStr;
	Long end = NumberUtil.parseLong(totalStr);
	// Resolve the start id the same way refreshRedisCache does, so the range matches the queue
	Long start = getStartNumLong();

	Object countObj = redisTemplate.opsForValue().get(MIDUO_TIME_COUNT);
	String countStr = StrUtil.toString(countObj);
	countStr = "null".equals(countStr) ? "" : countStr;
	// The counter is decremented once per batch and may drop below zero; never estimate negative time
	long remainingBatches = Math.max(NumberUtil.parseLong(countStr), 0L);

	int flag = ObjectUtil.isNotEmpty(redisTemplate.opsForValue().get(MIDUO_RUN_STATUS)) ? 1 : 2;

	// The time list holds the two most recent batch-start timestamps in epoch millis.
	// They must be parsed as long: epoch millis overflow an int.
	long lastBatchMillis = 0L;
	List<Object> range = redisTemplate.opsForList().range(MIDUO_TIME_LIST, 0, -1);
	if (ObjectUtil.isNotEmpty(range) && range.size() > 1) {
		String newerStr = StrUtil.toString(range.get(0));
		String olderStr = StrUtil.toString(range.get(1));
		long newer = NumberUtil.parseLong("null".equals(newerStr) ? "" : newerStr);
		long older = NumberUtil.parseLong("null".equals(olderStr) ? "" : olderStr);
		lastBatchMillis = Math.abs(newer - older);
	}
	String formatBetween = DateUtil.formatBetween(lastBatchMillis * remainingBatches, BetweenFormatter.Level.MILLSECOND);

	// The id range is inclusive, so it spans end - start + 1 ids.
	// Clamp the queue size into [0, span] so the percentage stays within 0-100.
	Long size = redisTemplate.opsForList().size(MIDUO_RUN_LIST);
	BigDecimal round = BigDecimal.ZERO;
	Long span = end - start + 1;
	if (end > 0 && span > 0) {
		Long remaining = size == null ? 0L : Math.min(Math.max(size, 0L), span);
		round = NumberUtil.round(NumberUtil.mul(NumberUtil.div(NumberUtil.sub(span, remaining), span), 100), 2);
	}

	Map<String, Object> map = new HashMap<>();
	map.put("flag", flag);
	map.put("round", round);
	map.put("formatBetween", formatBetween);
	return map;
}

/**
 * Drains the run queue batch by batch on a single worker pool.
 *
 * <p>Each iteration does the following:
 * <ol>
 *   <li>pops up to {@code batchCount} ids from the run queue;</li>
 *   <li>records timing via {@code reLog()};</li>
 *   <li>runs one {@code MiduoTask} per id;</li>
 *   <li>pushes ids whose result has status 3 to the fail list.</li>
 * </ol>
 * The loop ends when the queue is empty or the stop flag no longer holds 1.
 *
 * <p>This is a loop rather than recursion, because millions of ids would otherwise need
 * thousands of stack frames. One pool is reused for all batches instead of one pool per batch.
 *
 * @param j          id the first batch nominally starts at; used only in log messages
 * @param batchCount maximum ids popped per batch; null or non-positive falls back to 1000
 * @throws BusinessException if a task fails or the wait is interrupted; every id of that batch
 *                           is then pushed to the fail list
 */
private void refresh(Long j, Long batchCount) {
	long batchSize = (batchCount == null || batchCount <= 0) ? 1000L : batchCount;
	Long cursor = (j == null) ? 0L : j;
	// The queue holds a whole batch, so invokeAll on a full batch is never rejected
	int queueCapacity = (int) Math.min(batchSize, Integer.MAX_VALUE);
	ThreadPoolExecutor myThreadPoolExecutor = new ThreadPoolExecutor(8, 10, 3600, TimeUnit.SECONDS,
		new ArrayBlockingQueue<>(queueCapacity), new MiduoFactory(new AtomicInteger()));
	try {
		while (true) {
			List<Object> ids = popRunBatch(batchSize);
			if (ids.isEmpty()) {
				log.info("[MiDuoServiceImpl][organizeMiduoCode] 运行队列已处理完毕");
				return;
			}
			reLog();
			processRunBatch(myThreadPoolExecutor, ids);

			if (isStopRequested()) {
				log.info("[MiDuoServiceImpl][organizeMiduoCode][paragraph] ==================== 查询 中止了 {} ========================", NumberUtil.toStr(cursor));
				return;
			}
			Long next = cursor + batchSize;
			log.info("[MiDuoServiceImpl][organizeMiduoCode][paragraph] ==================== 查询区间 {} - {} ========================", NumberUtil.toStr(cursor), NumberUtil.toStr(next));
			cursor = next;
		}
	} finally {
		myThreadPoolExecutor.shutdown();
		// The loop is no longer running; don't let the progress endpoint report it as running until the TTL expires
		redisTemplate.delete(MIDUO_RUN_STATUS);
	}
}

/**
 * Pops up to {@code batchSize} ids from the run queue in one pipeline.
 * LPOP on an exhausted queue yields null, and those entries are dropped.
 */
private List<Object> popRunBatch(long batchSize) {
	byte[] runListKey = MIDUO_RUN_LIST.getBytes(StandardCharsets.UTF_8);
	// executePipelined opens and closes the pipeline itself
	List<Object> popped = redisTemplate.executePipelined(
		(RedisCallback<Object>) connection -> {
			for (long i = 0; i < batchSize; i++) {
				connection.lPop(runListKey);
			}
			return null;
		}
	);
	log.info("[MiDuoServiceImpl][organizeMiduoCode]批次取ids {}", JSONUtil.toJsonStr(popped));
	List<Object> ids = new ArrayList<>(popped.size());
	for (Object id : popped) {
		if (id != null) {
			ids.add(id);
		}
	}
	return ids;
}

/** Runs one MiduoTask per id and pushes ids whose result has status 3 to the fail list. */
private void processRunBatch(ThreadPoolExecutor pool, List<Object> ids) {
	List<MiduoTask> tasks = new ArrayList<>(ids.size());
	for (Object id : ids) {
		tasks.add(new MiduoTask(id));
	}
	try {
		List<Future<MiduoBasicDataVO>> futures = pool.invokeAll(tasks);
		for (Future<MiduoBasicDataVO> future : futures) {
			MiduoBasicDataVO result = future.get();
			if (result != null && result.getStatus() == 3) {
				redisTemplate.opsForList().rightPush(MIDUO_FAIL_LIST, result.getId());
			}
		}
	} catch (InterruptedException e) {
		// Restore the interrupt status before failing the batch
		Thread.currentThread().interrupt();
		failWholeBatch(ids, e);
	} catch (ExecutionException e) {
		failWholeBatch(ids, e);
	}
}

/** Pushes every id of the batch to the fail list, logs the cause with its stack trace, and aborts the run. */
private void failWholeBatch(List<Object> ids, Exception cause) {
	byte[] failListKey = MIDUO_FAIL_LIST.getBytes(StandardCharsets.UTF_8);
	redisTemplate.executePipelined(
		(RedisCallback<Object>) connection -> {
			for (Object id : ids) {
				connection.rPush(failListKey, id.toString().getBytes(StandardCharsets.UTF_8));
			}
			return null;
		}
	);
	log.error("利用线程池批量查询生成数据:", cause);
	throw new BusinessException("查询生成数据失败");
}

/** True when the stop flag is present and does not parse to 1; a missing or blank flag means "keep going". */
private boolean isStopRequested() {
	Object flagObj = redisTemplate.opsForValue().get(MIDUO_FLAG);
	String flagStr = StrUtil.toString(flagObj);
	if (flagObj == null || StrUtil.isBlank(flagStr) || "null".equals(flagStr)) {
		return false;
	}
	return NumberUtil.parseInt(flagStr) != 1;
}

/**
 * Records bookkeeping for the batch that is about to run (best effort).
 *
 * <ul>
 *   <li>Keeps only the two most recent batch-start timestamps (epoch millis) in the time list,
 *       newest first.</li>
 *   <li>Decrements the remaining-batch counter.</li>
 *   <li>Refreshes the run-status key with a 600-second TTL.</li>
 * </ul>
 *
 * <p>Failures are logged with their cause but never abort the batch.
 */
private void reLog(){
	try {
		Long size = redisTemplate.opsForList().size(MIDUO_TIME_LIST);
		if (size != null && size > 1) {
			redisTemplate.opsForList().rightPop(MIDUO_TIME_LIST);
		}
		redisTemplate.opsForList().leftPush(MIDUO_TIME_LIST, System.currentTimeMillis());
		redisTemplate.opsForValue().decrement(MIDUO_TIME_COUNT);
		redisTemplate.opsForValue().set(MIDUO_RUN_STATUS, "lock", 600, TimeUnit.SECONDS);
	} catch (Exception e) {
		// Deliberately best-effort, but keep the stack trace so the failure can be diagnosed
		log.warn("[MiDuoServiceImpl][organizeMiduoCode] ==================== 记录时间报错了 ========================", e);
	}
}


/**
 * Appends {@code start, start ± step, start ± 2*step, ...} to {@code values}, moving from
 * {@code start} toward {@code stop} and never passing it.
 *
 * <p>The direction is taken from {@code start}/{@code stop}; the sign of {@code step} is ignored.
 * {@code stop} is included only when it lies exactly on the progression. Iteration also ends if
 * the next value would overflow {@code long}, instead of wrapping around and looping forever.
 *
 * @param start  first value (inclusive)
 * @param stop   bound (inclusive when hit exactly)
 * @param step   distance between consecutive values; must be non-zero unless start equals stop
 * @param values collection to append to
 * @return {@code values}, for chaining
 * @throws IllegalArgumentException if {@code step} is null or 0 while {@code start != stop}
 */
public static Collection<Long> appendRange(Long start, Long stop, Long step, Collection<Long> values) {
	long from = start;
	long to = stop;
	if (from == to) {
		values.add(start);
		return values;
	}
	if (step == null || step == 0L) {
		throw new IllegalArgumentException("step must be non-zero when start != stop, got " + step);
	}
	long stride = from < to ? Math.abs(step) : -Math.abs(step);

	long current = from;
	while (stride > 0 ? current <= to : current >= to) {
		values.add(current);
		long next = current + stride;
		// Stop on overflow: adding a positive stride must not decrease the value, and vice versa
		if (stride > 0 ? next < current : next > current) {
			break;
		}
		current = next;
	}
	return values;
}