java api 设置kettle 日志到数据库

发布时间 2023-05-31 13:44:25作者: vx_guanchaoguo0

设置到数据库

       transMeta.addDatabase(getDatabaseMeta(config));

        TransLogTable origTransLogTable = transMeta.getTransLogTable();
        origTransLogTable.setConnectionName(config.getName());
        origTransLogTable.setTableName("r_log_trans");
        origTransLogTable.setLogFieldUsed(true);
        transMeta.setTransLogTable(origTransLogTable);

        StepLogTable origStepLogTable = transMeta.getStepLogTable();
        origStepLogTable.setConnectionName(config.getName());
        origStepLogTable.setTableName("r_log_trans_step");
        transMeta.setStepLogTable(origStepLogTable);
        
        PerformanceLogTable origPerformanceLogTable = transMeta.getPerformanceLogTable();
        origPerformanceLogTable.setConnectionName(config.getName());
        origPerformanceLogTable.setTableName("r_log_trans_running");
        transMeta.setPerformanceLogTable(origPerformanceLogTable);

        ChannelLogTable origChannelLogTable = transMeta.getChannelLogTable();
        origChannelLogTable.setConnectionName(config.getName());
        origChannelLogTable.setTableName("r_log_channel");
        transMeta.setChannelLogTable(origChannelLogTable);

        MetricsLogTable origMetricsLogTable = transMeta.getMetricsLogTable();
        origMetricsLogTable.setConnectionName(config.getName());
        origMetricsLogTable.setTableName("r_log_trans_metrics");
        transMeta.setMetricsLogTable(origMetricsLogTable);

完整操作服务文件

package com.sugon.dataexchangeplatform.services;

import cn.hutool.core.io.resource.ClassPathResource;
import com.sugon.dataexchangeplatform.domain.DataBaseMetaConfig;
import com.sugon.dataexchangeplatform.domain.RequestExecuteTrans;
import com.sugon.dataexchangeplatform.domain.TransJobEntryDomain;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.*;
import org.pentaho.di.job.JobHopMeta;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.job.entries.special.JobEntrySpecial;
import org.pentaho.di.job.entries.success.JobEntrySuccess;
import org.pentaho.di.job.entry.JobEntryCopy;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryCreationHelper;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepIOMetaInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.mergerows.MergeRowsMeta;
import org.pentaho.di.trans.steps.synchronizeaftermerge.SynchronizeAfterMergeMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Optional;

import static org.pentaho.di.core.logging.LogLevel.BASIC;


@Service
public class KettleService {

    public void RunTransByName(RequestExecuteTrans executeTrans) throws KettleException {
        KettleEnvironment.init();
        KettleDatabaseRepository repository = RepositoryCon();

        RepositoryDirectoryInterface dir = repository.loadRepositoryDirectoryTree().findDirectory(executeTrans.getDirectory());
        ObjectId objectId = repository.getTransformationID(executeTrans.getTransName(), dir);
        if (objectId == null) {
            System.out.println("找不到任务!");
        }
        TransMeta transMeta = repository.loadTransformation(objectId, null);
        setTransLog(transMeta);

        Trans trans = new Trans(transMeta);
        trans.setLogLevel(BASIC);

        trans.execute(null);
        trans.waitUntilFinished();

//        JobResult result = new JobResult();
//        result.setBatchId(job.getBatchId());
//        result.setErrors(job.getErrors());
//        result.setPassedBatchId(job.getPassedBatchId());
//        result.setStatus(job.getStatus());
//        result.setParams(en);
//        return result;

        if (trans.getErrors() > 0) {
            throw new RuntimeException("There were errors during transformation execution.");
        }
    }

    public void setTransLog(TransMeta transMeta) {

        transMeta.addDatabase(getDatabaseMeta(config));

        TransLogTable origTransLogTable = transMeta.getTransLogTable();
        origTransLogTable.setConnectionName(config.getName());
        origTransLogTable.setTableName("r_log_trans");
        origTransLogTable.setLogFieldUsed(true);
        transMeta.setTransLogTable(origTransLogTable);

        StepLogTable origStepLogTable = transMeta.getStepLogTable();
        origStepLogTable.setConnectionName(config.getName());
        origStepLogTable.setTableName("r_log_trans_step");
        transMeta.setStepLogTable(origStepLogTable);

        PerformanceLogTable origPerformanceLogTable = transMeta.getPerformanceLogTable();
        origPerformanceLogTable.setConnectionName(config.getName());
        origPerformanceLogTable.setTableName("r_log_trans_running");
        transMeta.setPerformanceLogTable(origPerformanceLogTable);

        ChannelLogTable origChannelLogTable = transMeta.getChannelLogTable();
        origChannelLogTable.setConnectionName(config.getName());
        origChannelLogTable.setTableName("r_log_channel");
        transMeta.setChannelLogTable(origChannelLogTable);

        MetricsLogTable origMetricsLogTable = transMeta.getMetricsLogTable();
        origMetricsLogTable.setConnectionName(config.getName());
        origMetricsLogTable.setTableName("r_log_trans_metrics");
        transMeta.setMetricsLogTable(origMetricsLogTable);
    }

    public void TransStatus(String transName, String path) {
        try {
            KettleEnvironment.init();
            KettleDatabaseRepository repository = RepositoryCon();

            RepositoryDirectoryInterface dir = repository.loadRepositoryDirectoryTree().findDirectory(path);
            ObjectId objectId = repository.getTransformationID(transName, dir);
        } catch (KettleException e) {
            throw new RuntimeException(e);
        }
    }

    public void SaveXmlStreamTransToRepo(InputStream xmlStream, String repoPath) {
        try {
            KettleEnvironment.init();
            KettleDatabaseRepository repository = RepositoryCon();
            TransMeta transMeta = new TransMeta(xmlStream, repository, false, null, null);
            RepositoryDirectoryInterface dir = repository.loadRepositoryDirectoryTree().findDirectory(repoPath);

            RepositoryDirectoryInterface rootDir = repository.loadRepositoryDirectoryTree().findDirectory("/");
            String dirName = "task";
            RepositoryDirectoryInterface subDir = repository.findDirectory(dirName);
            if (!Optional.ofNullable(subDir).isPresent()) {
                RepositoryDirectoryInterface newDir = repository.createRepositoryDirectory(rootDir, dirName);
                repository.createRepositoryDirectory(newDir, "trans");
            }

            String transName = transMeta.getName();
            if (!Optional.ofNullable(transName).isPresent()) {
                throw new RuntimeException("xml格式错误!");
            }

            ObjectId objectId = repository.getTransformationID(transMeta.getName(), dir);
            if (Optional.ofNullable(objectId).isPresent()) {
                throw new RuntimeException("任务已经存在!");
            }

            transMeta.setRepositoryDirectory(dir);
            repository.save(transMeta, "");
        } catch (KettleException e) {
            throw new RuntimeException(e);
        }
    }

    public void SaveXmlFileTransToRepo(String fileName) {
        try {
            ClassPathResource resource = new ClassPathResource("trans/template.ktr");
            String path = resource.getAbsolutePath();

            KettleEnvironment.init();
            TransMeta transMeta = new TransMeta(path);
            KettleDatabaseRepository repository = RepositoryCon();
            repository.save(transMeta, "");

        } catch (KettleException e) {
            throw new RuntimeException(e);
        }
    }


    @Async
    public void RunJob(TransJobEntryDomain job) {
        try {
            KettleEnvironment.init();
            KettleDatabaseRepository repository = RepositoryCon();

            RepositoryDirectoryInterface dir = repository.loadRepositoryDirectoryTree().findDirectory("/");
            ObjectId objectId = repository.getTransformationID(job.getTransName(), dir);
            TransMeta transMeta;
            if (objectId == null) {
                transMeta = GenerateTransformation(job, repository);
                repository.save(transMeta, "");
            } else {
                transMeta = repository.loadTransformation(objectId, null);
            }

            Trans trans = new Trans(transMeta);
            trans.execute(null);
            trans.waitUntilFinished();

            String strLog = KettleLogStore.getAppender().getBuffer().toString();
            System.out.println("==========开始打印日志==========");

            System.out.println(KettleLogStore.getAppender().getBuffer().toString());
            System.out.println("==========日志打印结束==========");
            System.out.println("getLastProcessed:" + trans.getStatus());

            String substring = strLog.substring(strLog.lastIndexOf("I=") + 1);
            String successCount = substring.substring(substring.lastIndexOf("W=") + 2, substring.lastIndexOf("W=") + 3);
            System.out.println("成功数:" + successCount);

            System.out.println("errors:" + trans.getErrors());
            if (trans.getErrors() != 0) {
                System.out.println("执行失败!");
            }
        } catch (KettleException e) {
            throw new RuntimeException(e);
        }
    }


    public DataBaseMetaConfig getConfig() {
        return config;
    }

    @Autowired
    DataBaseMetaConfig config;

    public void CreateRepo() throws KettleException {
        if (!KettleEnvironment.isInitialized()) {
            KettleEnvironment.init();
        }
        KettleDatabaseRepositoryMeta kettleDatabaseRepositoryMeta = new KettleDatabaseRepositoryMeta();
        kettleDatabaseRepositoryMeta.setConnection(getDatabaseMeta(config));
        KettleDatabaseRepository repository = new KettleDatabaseRepository();
        repository.init(kettleDatabaseRepositoryMeta);
        repository.connectionDelegate.connect(true, true);
        KettleDatabaseRepositoryCreationHelper helper = new KettleDatabaseRepositoryCreationHelper(repository);
        helper.createRepositorySchema(null, false, new ArrayList<>(), false);
    }

    public TransMeta GenerateTransformation(TransJobEntryDomain job, KettleDatabaseRepository repository) throws KettleException {

        DatabaseMeta targetDatabaseMeta = getDatabaseMeta(job.getTargetDatabaseMeta());
        DatabaseMeta sourceDatabaseMeta = getDatabaseMeta(job.getSourceDatabaseMeta());
        TransMeta transMeta = buildTransMeta(repository, job.getTransName(), sourceDatabaseMeta, targetDatabaseMeta);

        StepMeta targetTableInput = buildTableInputStep(transMeta, job.getTargetDatabaseMeta().getName(), job.getTargetTableName(), job.getTargetInputName(), job.getTargetInputUseDraw(), job.getTargetInputLocationX(), job.getTargetInputLocationY());
        transMeta.addStep(targetTableInput);

        StepMeta sourceTableInput = buildTableInputStep(transMeta, job.getSourceDatabaseMeta().getName(), job.getSourceTableName(), job.getSourceInputName(), job.getSourceInputUseDraw(), job.getSourceInputLocationX(), job.getSourceInputLocationY());
        transMeta.addStep(sourceTableInput);

        StepMeta mergeStepMeta = buildMergeRowsMeta(job, sourceTableInput, targetTableInput, job.getMergeRowUseDraw(), job.getMergeRowLocationX(), job.getMergeRowLocationY());
        transMeta.addStep(mergeStepMeta);
        transMeta.addTransHop(new TransHopMeta(targetTableInput, mergeStepMeta));
        transMeta.addTransHop(new TransHopMeta(sourceTableInput, mergeStepMeta));

        StepMeta synStepMeta = buildSynchronizeAfterMergeMeta(job, targetDatabaseMeta, job.getSynchronizeUseDraw(), job.getSynchronizeLocationX(), job.getSynchronizeLocationY());
        transMeta.addStep(synStepMeta);
        transMeta.addTransHop(new TransHopMeta(mergeStepMeta, synStepMeta));

        return transMeta;
    }


    public JobMeta GenerateJob(TransJobEntryDomain job, Repository repository) throws KettleException {

        JobMeta jobMeta = buildJobMeta(job);

        JobEntryCopy jobMetaStart = buildStartJobEntry(job);
        jobMeta.addJobEntry(jobMetaStart);

        JobEntryCopy transJobEntry = buildTransJobEntry(job, repository);
        jobMeta.addJobEntry(transJobEntry);

        JobEntryCopy successJobEntry = buildSuccessJobEntry(job);
        jobMeta.addJobEntry(successJobEntry);

        jobMeta.addJobHop(new JobHopMeta(jobMetaStart, transJobEntry));
        jobMeta.addJobHop(new JobHopMeta(transJobEntry, successJobEntry));


//        JobMeta jobMeta = buildJobMeta(job);
//        JobEntryCopy start = buildStartJobEntry(job);
//        JobEntryCopy tran = buildTransJobEntry(job, repository);
//        JobEntryCopy success = buildSuccessJobEntry(job);
//
//        jobMeta.addJobEntry(start);
//        jobMeta.addJobEntry(tran);
//        jobMeta.addJobEntry(success);
//
//        jobMeta.addJobHop(new JobHopMeta(start, tran));
//        jobMeta.addJobHop(new JobHopMeta(tran, success));

        return jobMeta;
    }


    public TransMeta buildTransMeta(KettleDatabaseRepository repository, String transName, DatabaseMeta newDatabaseMeta, DatabaseMeta oldDatabaseMeta) throws KettleException {
        TransMeta transMeta = new TransMeta();
        transMeta.setName(transName);
        transMeta.setRepository(repository);
        transMeta.setName(transName);
        transMeta.setRepositoryDirectory(repository.findDirectory("/"));
        transMeta.addDatabase(repository.getDatabaseMeta());
        transMeta.addDatabase(newDatabaseMeta);
        transMeta.addDatabase(oldDatabaseMeta);
        return transMeta;
    }

    public JobMeta buildJobMeta(TransJobEntryDomain job) {
        JobMeta jobMeta = new JobMeta();
        jobMeta.setName(job.getJobName());
        jobMeta.setJobstatus(0);

        return jobMeta;
    }

    public StepMeta buildTableInputStep(TransMeta transMeta, String inputDataBaseName, String inputTableName, String inputName, Boolean inputUseDraw, Integer inputLocationX, Integer inputLocationY) {

        TableInputMeta targetTableInput = new TableInputMeta();
        DatabaseMeta database = transMeta.findDatabase(inputDataBaseName);
        targetTableInput.setDatabaseMeta(database);
        String old_select_sql = "SELECT * FROM " + inputTableName + " order by id";
        targetTableInput.setSQL(old_select_sql);

        StepMeta inputMetaStep = new StepMeta(inputName, targetTableInput);
        if (inputUseDraw) {
            inputMetaStep.setDraw(true);
            inputMetaStep.setLocation(inputLocationX, inputLocationY);
        }

        return inputMetaStep;

    }

    public static DatabaseMeta getDatabaseMeta(DataBaseMetaConfig meta) {
        DatabaseMeta databaseMeta = new DatabaseMeta(meta.getName(), meta.getType(), meta.getAccess(), meta.getHost(), meta.getDb(), meta.getPort(), meta.getUser(), meta.getPass());
        databaseMeta.addExtraOption(meta.getType(), "useSSL", "false");
        databaseMeta.addExtraOption(meta.getType(), "serverTimezone", "Asia/Shanghai");
        databaseMeta.addExtraOption(meta.getType(), "characterEncoding", "utf8");
        return databaseMeta;
    }

    public StepMeta buildMergeRowsMeta(TransJobEntryDomain job, StepMeta sourceStepMeta, StepMeta targetStepMeta, Boolean inputUseDraw, Integer inputLocationX, Integer inputLocationY) {

        MergeRowsMeta mergeRowsMeta = new MergeRowsMeta();
        StepIOMetaInterface stepIOMeta = mergeRowsMeta.getStepIOMeta();
        stepIOMeta.getInfoStreams().get(0).setStepMeta(sourceStepMeta);
        stepIOMeta.getInfoStreams().get(1).setStepMeta(targetStepMeta);
        mergeRowsMeta.setFlagField(job.getMergeRowFlagField());
        mergeRowsMeta.setKeyFields(job.getMergeRowKeyFields());
        mergeRowsMeta.setValueFields(job.getColumns());

        StepMeta mergeRowsStep = new StepMeta(job.getMergeRowName(), mergeRowsMeta);
        if (inputUseDraw) {
            mergeRowsStep.setDraw(true);
            mergeRowsStep.setLocation(inputLocationX, inputLocationY);
        }

        return mergeRowsStep;
    }

    public StepMeta buildSynchronizeAfterMergeMeta(TransJobEntryDomain job, DatabaseMeta targetDatabaseMeta, Boolean inputUseDraw, Integer inputLocationX, Integer inputLocationY) {

        SynchronizeAfterMergeMeta synchronizeAfterMergeMeta = new SynchronizeAfterMergeMeta();
        synchronizeAfterMergeMeta.setCommitSize(job.getSynchronizeCommitSize());
        synchronizeAfterMergeMeta.setDatabaseMeta(targetDatabaseMeta);
        synchronizeAfterMergeMeta.setSchemaName(job.getSynchronizeSchemaName());
        synchronizeAfterMergeMeta.setTableName(job.getTargetTableName());
        synchronizeAfterMergeMeta.setUseBatchUpdate(job.getSynchronizeUseBatchUpdate());
        synchronizeAfterMergeMeta.setKeyLookup(job.getSynchronizeKeyLookup());
        synchronizeAfterMergeMeta.setKeyStream(job.getSynchronizeKeyStream());
        synchronizeAfterMergeMeta.setKeyStream2(job.getSynchronizeKeyStream2());
        synchronizeAfterMergeMeta.setKeyCondition(job.getSynchronizeKeyCondition());

        Boolean[] updateOrNot = new Boolean[job.getColumns().length];
        for (int i = 0; i < job.getColumns().length; i++) {
            updateOrNot[i] = true;
        }
        updateOrNot[0] = false;
        synchronizeAfterMergeMeta.setUpdateLookup(job.getColumns());
        synchronizeAfterMergeMeta.setUpdateStream(job.getColumns());
        synchronizeAfterMergeMeta.setUpdate(updateOrNot);

        synchronizeAfterMergeMeta.setOperationOrderField(job.getSynchronizeOperationOrderField());
        synchronizeAfterMergeMeta.setOrderInsert(job.getSynchronizeOrderInsert());
        synchronizeAfterMergeMeta.setOrderUpdate(job.getSynchronizeOrderUpdate());
        synchronizeAfterMergeMeta.setOrderDelete(job.getSynchronizeOrderDelete());
        StepMeta mergeRowsStep = new StepMeta(job.getSynchronizeName(), synchronizeAfterMergeMeta);

        if (inputUseDraw) {
            mergeRowsStep.setDraw(true);
            mergeRowsStep.setLocation(inputLocationX, inputLocationY);
        }

        return mergeRowsStep;
    }

    public JobEntryCopy buildStartJobEntry(TransJobEntryDomain job) {

        JobEntrySpecial jobEntrySpecial = new JobEntrySpecial();
        jobEntrySpecial.setName(job.getJobEntryStartName());
        jobEntrySpecial.setStart(job.getJobEntryStartUseStart());
        if (job.getSynchronizeUseRepeat()) {
            jobEntrySpecial.setRepeat(job.getSynchronizeUseRepeat());
            jobEntrySpecial.setIntervalSeconds(job.getSynchronizeIntervalSeconds());
        }

        JobEntryCopy jobEntryCopy = new JobEntryCopy(jobEntrySpecial);
        if (job.getSynchronizeUseDraw()) {
            jobEntryCopy.setDrawn();
            jobEntryCopy.setLocation(job.getSynchronizeLocationX(), job.getSynchronizeLocationY());
        }

        return jobEntryCopy;

    }

    public JobEntryCopy buildSuccessJobEntry(TransJobEntryDomain job) {

        JobEntrySuccess jobEntrySuccess = new JobEntrySuccess();
        jobEntrySuccess.setName(job.getJobEntrySuccessName());
        JobEntryCopy jobEntryCopy = new JobEntryCopy(jobEntrySuccess);
        if (job.getJobEntrySuccessUseDraw()) {
            jobEntryCopy.setDrawn();
            jobEntryCopy.setLocation(job.getJobEntrySuccessLocationX(), job.getJobEntrySuccessLocationY());
        }
        return jobEntryCopy;
    }

    public JobEntryCopy buildTransJobEntry(TransJobEntryDomain job, Repository repository) {

        JobEntrySpecial jobEntrySpecial = new JobEntrySpecial();
        jobEntrySpecial.setName(job.getJobEntryStartName());
        jobEntrySpecial.setStart(job.getJobEntryStartUseStart());
        if (job.getSynchronizeUseRepeat()) {
            jobEntrySpecial.setRepeat(true);
            jobEntrySpecial.setIntervalSeconds(job.getSynchronizeIntervalSeconds());
        }

        JobEntryCopy jobEntryCopy = new JobEntryCopy(jobEntrySpecial);
        jobEntryCopy.setName(job.getJobEntryTransName());
        if (job.getJobEntryTransUseDraw()) {
            jobEntryCopy.setDrawn();
            jobEntryCopy.setLocation(job.getJobEntryTransLocationX(), job.getJobEntryTransLocationY());
        }

        return jobEntryCopy;
    }


    public KettleDatabaseRepository RepositoryCon() throws KettleException {

        if (!KettleEnvironment.isInitialized()) {
            try {
                KettleEnvironment.init();
            } catch (KettleException e) {
                e.printStackTrace();
            }
        }
        KettleDatabaseRepositoryMeta kettleDatabaseRepositoryMeta = new KettleDatabaseRepositoryMeta();
        kettleDatabaseRepositoryMeta.getRepositoryCapabilities();

        DatabaseMeta kettleDatabase = getDatabaseMeta(config);
        kettleDatabaseRepositoryMeta.setConnection(kettleDatabase);
        KettleDatabaseRepository kettleDatabaseRepository = new KettleDatabaseRepository();
        kettleDatabaseRepository.init(kettleDatabaseRepositoryMeta);
//        kettleDatabaseRepository.connect("admin", "sugon@666#");
        kettleDatabaseRepository.connect("admin", "admin");
        if (kettleDatabaseRepository.isConnected()) {
            System.out.println("连接成功");
            return kettleDatabaseRepository;
        } else {
            System.out.println("连接失败");
            return null;
        }
    }
}