package com.xlkh.kafka; import cn.hutool.core.collection.CollectionUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.write.record.Tablet; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; import java.util.stream.Collectors; @Slf4j @Component("com.xlkh.kafka.DataLoaderKafkaConsumer") public class DataLoaderKafkaConsumer { @Autowired private SessionPool sessionPool; /** * 保存已经创建的时间序列 */ private final static Set<String> STATIC_PATHS = Sets.newConcurrentHashSet(); /** * fluent_data,批量消费 */ @KafkaListener(topics = "fluent_data", groupId = "fluent_data_demo", containerFactory = "batchFactory") @SneakyThrows public void listenBatchByFluent(List<ConsumerRecord<String, String>> records) { log.error("从kafka消费fluent数据" + records.size() + "条,当前偏移量:" + records.get(0).offset()); //创建时间序列,如果序列已经存在,不再重新创建 createTimeseriesIfNotExist(records); log.info("开始把数据放到iotdb-----------------------"); insertIotdbByKafka(records); } private void insertIotdbByKafka(List<ConsumerRecord<String, String>> records) throws ParseException, IoTDBConnectionException, StatementExecutionException { //key为kks的路径,value是时间戳集合 Map<String, 
List<Long>> timeStampMap = new HashMap<>(); //key为kks路径,value是具体的数据 Map<String, List<Float>> values = new HashMap<>(); //保存kks编码 Set<String> kksSet = new HashSet<>(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); for (ConsumerRecord<String, String> record : records) { String valueData = record.value(); JSONArray jsonArray = JSON.parseArray(valueData); for (int i = 0; i < jsonArray.size(); i++) { Map map = JSON.parseObject(String.valueOf(jsonArray.get(i)), Map.class); JSONArray jsonArray1 = JSON.parseArray(String.valueOf(map.get("msg"))); for (int j = 0; j < jsonArray1.size(); j++) { Map map1 = JSON.parseObject(String.valueOf(jsonArray1.get(j)), Map.class); if (map1.containsKey("big_v")){ String kks = map1.get("kks").toString(); Date date = dateFormat.parse(map1.get("time").toString()); Float val = Float.parseFloat(String.valueOf(map1.get("big_v"))); if(!kksSet.contains(kks)){ timeStampMap.put(kks, new ArrayList<>()); values.put(kks, new ArrayList<>()); kksSet.add(kks); } timeStampMap.get(kks).add(date.getTime()); values.get(kks).add(val); } } } } //遍历批量插入每个设备的数据 for (String kks : kksSet) { List<Long> longs = timeStampMap.get(kks); //声明Tablet对象设备属性 List<MeasurementSchema> schemas = new ArrayList<>(); schemas.add(new MeasurementSchema(kks,TSDataType.FLOAT)); Tablet tablet = new Tablet("root.param.demo", schemas, longs.size()); for (int row = 0; row < longs.size(); row++) { int rowIndex = tablet.rowSize++; //设备时间戳值 tablet.addTimestamp(rowIndex, longs.get(row)); //设置对应的值 tablet.addValue(schemas.get(0).getMeasurementId(), rowIndex, values.get(kks).get(row)); } //批量插入数据 sessionPool.insertTablet(tablet); } log.info("数据成功插入到iotdb-----------------------"+"插入的数据量大小为:"+records.size()); } /** * 创建时间序列,如果序列已经存在,不再重新创建 * * @param records 批量数据 */ private void createTimeseriesIfNotExist(List<ConsumerRecord<String, String>> records) { try { List<String> data = records.stream().map(ConsumerRecord::value).collect(Collectors.toList()); HashSet<String> 
paths = Sets.newHashSetWithExpectedSize(250); for (String msg : data) { JSONArray jsonArray = JSON.parseArray(msg); for (int i = 0; i <jsonArray.size() ; i++) { Map map = JSON.parseObject(String.valueOf(jsonArray.get(i)), Map.class); JSONArray jsonArray1 = JSON.parseArray(String.valueOf(map.get("msg"))); for (int j = 0; j < jsonArray1.size(); j++){ Map map1 = JSON.parseObject(String.valueOf(jsonArray1.get(j)), Map.class); String kks = map1.get("kks").toString(); String path = "root.param.demo." + kks; paths.add(path); } } } List<String> notExistPaths = Lists.newArrayList(); List<TSDataType> tsDataTypes = Lists.newArrayList(); List<TSEncoding> tsEncodings = Lists.newArrayList(); List<CompressionType> compressionTypes = Lists.newArrayList(); // List<Map<String, String>> propsList = Lists.newArrayList(); for (String path : paths) { if (!STATIC_PATHS.contains(path)) { if (sessionPool.checkTimeseriesExists(path)) { STATIC_PATHS.add(path); } else { notExistPaths.add(path); tsDataTypes.add(TSDataType.FLOAT); tsEncodings.add(TSEncoding.RLE); compressionTypes.add(CompressionType.SNAPPY); } } } if (CollectionUtil.isNotEmpty(notExistPaths)) { //批量创建时间序列 sessionPool.createMultiTimeseries(notExistPaths, tsDataTypes, tsEncodings, compressionTypes, null, null, null, null); //缓存时间序列 STATIC_PATHS.addAll(notExistPaths); } } catch (IoTDBConnectionException | StatementExecutionException e) { log.error(e.getMessage(), e); } } }
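// Remember: in IoTDB, every node of a path from the first level through the second-to-last level belongs to the device id; only the last level is the attribute (measurement).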