Kafka dynamic producer

Published: 2023-06-02 11:10:27 | Author: Aoul
package com.sunclouder.das.data.kafka.forward;

import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.sunclouder.das.data.kafka.entity.ConfigEntity;
import com.sunclouder.das.data.kafka.entity.DasConfigKafka;
import com.sunclouder.das.data.kafka.service.DasConfigKafkaService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Objects;
import java.util.Properties;

@Slf4j
@Service
public class KafkaForward {

    @Autowired
    private DasConfigKafkaService dasConfigKafkaService;

    /** Producer rebuilt on every forward call from the tenant's Kafka config; as shared state it is not safe under concurrent calls. */
    private KafkaProducer<String, String> producer;

    /** Target topic of the current forward call. */
    private String produceTopic;

    /** Connection settings assembled from the database record. */
    private ConfigEntity configEntity = new ConfigEntity();

    /** Hutool format template for "host:port". */
    private String ip = "{}:{}";
    public void forward(JSONObject dataEvent) {
        try {
            JSONObject entries = JSONUtil.parseObj(dataEvent.get("payload"));
            // Look up the Kafka forwarding config for this tenant/device.
            DasConfigKafka configKafka = dasConfigKafkaService.lambdaQuery()
                    .eq(DasConfigKafka::getTenantId, entries.get("tenant").toString())
                    .eq(DasConfigKafka::getDeviceId, entries.get("asset").toString())
                    .eq(DasConfigKafka::getGetInfoType, "mq")
                    .eq(DasConfigKafka::getIsDeleted, 1)
                    .one();
            if (Objects.nonNull(configKafka)) {
                String configPort = StrUtil.format(ip, configKafka.getKafkaIp(), configKafka.getKafkaPort());
                configEntity.setTopic(configKafka.getTopics());
                configEntity.setBootstrapServers(configPort);
                if (configKafka.getUserName() != null && configKafka.getPassWord() != null) {
                    configEntity.setUserName(configKafka.getUserName());
                    configEntity.setPassword(configKafka.getPassWord());
                }
                sendToSgl(configEntity, "", dataEvent, configKafka);
            }
        } catch (Exception e) {
            log.error("Forwarding failed", e);
        }
    }


    /**
     * Build a KafkaProducer from the dynamic config and forward the message.
     *
     * @param configEntity connection settings (bootstrap servers, topic, optional SASL credentials)
     * @param key          message key
     * @param value        message body to forward
     * @param configKafka  database record, updated with the remote cluster's online/offline status
     */
    public void sendToSgl(ConfigEntity configEntity, String key, JSONObject value, DasConfigKafka configKafka) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configEntity.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        if (configEntity.getUserName() != null && configEntity.getPassword() != null) {
            // SASL/PLAIN authentication; the JAAS template uses YH/MM as username/password placeholders.
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG, configEntity.getSaslJaasConfig()
                    .replace("YH", configEntity.getUserName())
                    .replace("MM", configEntity.getPassword()));
        }
        this.producer = new KafkaProducer<String, String>(props);
        this.produceTopic = configEntity.getTopic();
        startSend(key, value, configKafka);
    }

    /**
     * Send the message and track whether the remote cluster is reachable.
     *
     * @param vin         message key
     * @param value       message body
     * @param configKafka database record whose online flag is updated from the send result
     */
    public void startSend(String vin, JSONObject value, DasConfigKafka configKafka) {
        try {
            producer.send(new ProducerRecord<String, String>(produceTopic, vin, JSONUtil.toJsonStr(value)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        // Send succeeded: mark the target cluster as online if it was not already.
                        if (configKafka.getOnlineOrNot() != 1) {
                            configKafka.setOnlineOrNot(1);
                            dasConfigKafkaService.updateById(configKafka);
                        }
                        log.info("{} --> message forwarded successfully.", configKafka.getKafkaIp());
                    } else {
                        // Send failed: mark the target cluster as offline.
                        if (configKafka.getOnlineOrNot() != 0) {
                            configKafka.setOnlineOrNot(0);
                            dasConfigKafkaService.updateById(configKafka);
                        }
                        log.warn("{} --> message forwarding failed.", configKafka.getKafkaIp(), e);
                    }
                }
            });
            // Log the forwarded payload on a separate thread so the caller is not blocked.
            new Thread(() -> log.info("=============== message body ==================== " + value)).start();
        } catch (Exception e) {
            log.error("Send failed", e);
        } finally {
            // close() flushes buffered records before returning, so the in-flight send still completes.
            producer.close();
        }
    }
}
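
The ConfigEntity referenced above is not shown in the post. Purely for orientation, here is a minimal sketch of what it would need to expose, inferred from the getters and setters the forwarder calls; the field names, the Lombok @Data shortcut, and the default JAAS template are assumptions, not the original entity.

package com.sunclouder.das.data.kafka.entity;

import lombok.Data;

/**
 * Sketch of the connection settings the forwarder reads and writes.
 * Field names are inferred from the calls above; the real entity may differ.
 */
@Data
public class ConfigEntity {
    private String bootstrapServers;   // "host:port" built in forward()
    private String topic;              // target topic from DasConfigKafka
    private String userName;           // optional SASL username
    private String password;           // optional SASL password
    // Assumed JAAS template with YH/MM placeholders replaced at send time.
    private String saslJaasConfig =
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"YH\" password=\"MM\";";
}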
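
Likewise, forward() expects the incoming event to carry a payload object with tenant and asset keys, and it forwards the whole event rather than just the payload. A hypothetical invocation, assuming the bean is injected as kafkaForward and inventing the business fields, could look like this:

// Hypothetical caller, e.g. wired in from an upstream listener.
JSONObject payload = new JSONObject()
        .set("tenant", "tenant-001")   // matched against DasConfigKafka.tenantId
        .set("asset", "device-001")    // matched against DasConfigKafka.deviceId
        .set("value", 42);             // any additional business fields

JSONObject dataEvent = new JSONObject().set("payload", payload);

// kafkaForward is the Spring-injected KafkaForward bean.
kafkaForward.forward(dataEvent);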
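
Finally, note that sendToSgl builds a new KafkaProducer and closes it for every forwarded message. If throughput matters, one common refinement is to keep a long-lived producer per target cluster and reuse it; the sketch below only illustrates that idea (the class name and cache key are made up, not part of the original code):

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.producer.KafkaProducer;

/** Hypothetical cache: one long-lived producer per distinct target cluster, closed on shutdown. */
public class ProducerCache {

    private final Map<String, KafkaProducer<String, String>> cache = new ConcurrentHashMap<>();

    /** Reuse an existing producer for this cluster, or create one from the given properties. */
    public KafkaProducer<String, String> get(String bootstrapServers, Properties props) {
        return cache.computeIfAbsent(bootstrapServers, k -> new KafkaProducer<>(props));
    }

    /** Close all cached producers, flushing any buffered records. */
    public void closeAll() {
        cache.values().forEach(KafkaProducer::close);
        cache.clear();
    }
}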