使用 Java 将数据批量插入 IoTDB

发布时间: 2023-11-23 14:53:53  作者: 代码吴彦祖
package com.xlkh.kafka;

import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;


@Slf4j
@Component("com.xlkh.kafka.DataLoaderKafkaConsumer")
public class DataLoaderKafkaConsumer {

    /**
     * Device id under which every kks measurement is written. IoTDB treats every path node except
     * the last one as the device id, so {@code root.param.demo.<kks>} means device
     * {@code root.param.demo} and measurement {@code <kks>}.
     */
    private static final String DEVICE_ID = "root.param.demo";

    /** Format of the {@code time} field of each measurement entry. */
    private static final String TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    @Autowired
    private SessionPool sessionPool;

    /**
     * Full time-series paths already known to exist in IoTDB (found or created by this class).
     * Cached so a known path is not checked against the server again; it is a concurrent set
     * because it is static and shared by every listener invocation.
     */
    private static final Set<String> STATIC_PATHS = Sets.newConcurrentHashSet();

    /**
     * Batch listener for the {@code fluent_data} topic: parses the batch, makes sure every
     * measurement's time series exists, then writes the points into IoTDB.
     *
     * <p>Checked exceptions from parsing or insertion are rethrown via {@code @SneakyThrows}, so a
     * failing batch propagates to the listener container.
     *
     * @param records the polled batch; {@code null} or empty batches are ignored
     */
    @KafkaListener(topics = "fluent_data", groupId = "fluent_data_demo", containerFactory = "batchFactory")
    @SneakyThrows
    public void listenBatchByFluent(List<ConsumerRecord<String, String>> records) {
        if (records == null || records.isEmpty()) {
            // Guard: the offset log below would otherwise throw on records.get(0).
            return;
        }
        // Routine progress message, so INFO rather than ERROR.
        log.info("从kafka消费fluent数据{}条,当前偏移量:{}", records.size(), records.get(0).offset());

        // Parse once; schema creation and insertion both work from the same grouped data,
        // so they agree on which entries count as measurements.
        Map<String, SeriesBuffer> seriesByKks = parseRecords(records);

        // Create missing time series; series that already exist are not re-created.
        createTimeseriesIfNotExist(seriesByKks.keySet());

        log.info("开始把数据放到iotdb-----------------------");

        insertIotdbByKafka(seriesByKks);

        log.info("数据成功插入到iotdb-----------------------插入的数据量大小为:{}", records.size());
    }

    /**
     * Groups every entry that carries a {@code big_v} reading by its kks code.
     *
     * <p>Expected record value layout, inferred from the keys read below (TODO confirm with the
     * producer): {@code [ {"msg": [ {"kks": ..., "time": "yyyy-MM-dd HH:mm:ss", "big_v": ...}, ... ]}, ... ]}.
     * The {@code time} string is interpreted in the JVM's default time zone.
     *
     * @param records non-empty Kafka batch
     * @return kks code to buffered timestamps/values, in first-seen order
     * @throws ParseException if an entry's {@code time} does not match {@link #TIME_PATTERN}
     */
    private Map<String, SeriesBuffer> parseRecords(List<ConsumerRecord<String, String>> records) throws ParseException {
        Map<String, SeriesBuffer> seriesByKks = new LinkedHashMap<>();
        // SimpleDateFormat is not thread-safe; a fresh, method-local instance keeps it confined.
        SimpleDateFormat dateFormat = new SimpleDateFormat(TIME_PATTERN);
        for (ConsumerRecord<String, String> record : records) {
            JSONArray envelopes = JSON.parseArray(record.value());
            if (envelopes == null) {
                // e.g. a tombstone (null value); previously this crashed the whole batch with an NPE.
                log.warn("Skipping fluent_data record with empty value at offset {}", record.offset());
                continue;
            }
            for (int i = 0; i < envelopes.size(); i++) {
                JSONArray entries = envelopes.getJSONObject(i).getJSONArray("msg");
                if (entries == null) {
                    log.warn("Skipping fluent_data element without msg at offset {}", record.offset());
                    continue;
                }
                for (int j = 0; j < entries.size(); j++) {
                    Map<String, Object> entry = entries.getJSONObject(j);
                    if (!entry.containsKey("big_v")) {
                        // Only entries with a reading become IoTDB points (and time series).
                        continue;
                    }
                    String kks = entry.get("kks").toString();
                    long timestamp = dateFormat.parse(entry.get("time").toString()).getTime();
                    float value = Float.parseFloat(String.valueOf(entry.get("big_v")));
                    seriesByKks.computeIfAbsent(kks, key -> new SeriesBuffer()).add(timestamp, value);
                }
            }
        }
        return seriesByKks;
    }

    /**
     * Writes each kks series as its own single-measurement {@link Tablet} under {@link #DEVICE_ID}.
     *
     * @param seriesByKks parsed points grouped by kks code (measurement name)
     */
    private void insertIotdbByKafka(Map<String, SeriesBuffer> seriesByKks) throws IoTDBConnectionException, StatementExecutionException {
        for (Map.Entry<String, SeriesBuffer> series : seriesByKks.entrySet()) {
            SeriesBuffer buffer = series.getValue();

            // One FLOAT measurement named after the kks code.
            List<MeasurementSchema> schemas = new ArrayList<>();
            schemas.add(new MeasurementSchema(series.getKey(), TSDataType.FLOAT));
            String measurementId = schemas.get(0).getMeasurementId();

            Tablet tablet = new Tablet(DEVICE_ID, schemas, buffer.size());
            for (int row = 0; row < buffer.size(); row++) {
                int rowIndex = tablet.rowSize++;
                tablet.addTimestamp(rowIndex, buffer.timestamps.get(row));
                tablet.addValue(measurementId, rowIndex, buffer.values.get(row));
            }
            // Batch-insert all points of this series.
            sessionPool.insertTablet(tablet);
        }
    }

    /**
     * Ensures a FLOAT / RLE / SNAPPY time series exists for each kks code under {@link #DEVICE_ID},
     * creating the missing ones in a single batch call and caching known paths in
     * {@link #STATIC_PATHS}.
     *
     * <p>Best effort: IoTDB errors are logged and swallowed, and the insert that follows is still
     * attempted.
     *
     * @param kksCodes distinct measurement names seen in the current batch
     */
    private void createTimeseriesIfNotExist(Collection<String> kksCodes) {
        List<String> notExistPaths = Lists.newArrayList();
        List<TSDataType> tsDataTypes = Lists.newArrayList();
        List<TSEncoding> tsEncodings = Lists.newArrayList();
        List<CompressionType> compressionTypes = Lists.newArrayList();
        try {
            for (String kks : kksCodes) {
                String path = DEVICE_ID + "." + kks;
                if (STATIC_PATHS.contains(path)) {
                    continue;
                }
                if (sessionPool.checkTimeseriesExists(path)) {
                    STATIC_PATHS.add(path);
                } else {
                    notExistPaths.add(path);
                    tsDataTypes.add(TSDataType.FLOAT);
                    tsEncodings.add(TSEncoding.RLE);
                    compressionTypes.add(CompressionType.SNAPPY);
                }
            }
            if (CollectionUtil.isNotEmpty(notExistPaths)) {
                // Create all missing series in one round trip, then remember them.
                sessionPool.createMultiTimeseries(notExistPaths, tsDataTypes, tsEncodings, compressionTypes, null, null, null, null);
                STATIC_PATHS.addAll(notExistPaths);
            }
        } catch (IoTDBConnectionException | StatementExecutionException e) {
            log.error(e.getMessage(), e);
        }
    }

    /** Timestamps (epoch millis) and readings collected for one kks measurement, index-aligned. */
    private static final class SeriesBuffer {
        private final List<Long> timestamps = new ArrayList<>();
        private final List<Float> values = new ArrayList<>();

        /** Appends one point, keeping both lists the same length. */
        void add(long timestamp, float value) {
            timestamps.add(timestamp);
            values.add(value);
        }

        /** Number of buffered points. */
        int size() {
            return timestamps.size();
        }
    }
}

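 切记：对于 IoTDB 来说，时间序列路径中从第一层节点到倒数第二层节点都属于设备 ID（device id），只有最后一层节点才是测点（measurement，即你的属性）。例如路径 root.param.demo.<kks> 中，设备 ID 是 root.param.demo，测点是 <kks>。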