SeaTunnel V2.3.1源码分析--zeta引擎启动过程分析

发布时间 2023-06-02 12:30:13作者: 田野与天

今天主要看SeaTunnel自研的数据同步引擎,叫Zeta。

首先,如果使用的是zeta引擎,那么第一步一定是运行bin/seatunnel-cluster.sh脚本,这个脚本就是启动zeta的服务端的。

打开seatunnel-cluster.sh看看,可以看到其实是去启动seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java中的main()方法

这个就是zeta的核心启动方法了。

大家可以先打开自己看看,看了之后可能会一头雾水,怎么就2句代码

ServerCommandArgs serverCommandArgs = CommandLineUtils.parse( args, new ServerCommandArgs(), EngineType.SEATUNNEL.getStarterShellName(), true);
SeaTunnel.run(serverCommandArgs.buildCommand());


就完事了?

其实应该先看看new ServerCommandArgs()这部分,这里返回了一个ServerCommandArgs类,
然后接着应该看看serverCommandArgs.buildCommand(),这里buildCommand()方法是关键。我们把代码贴上来

    @Override
    public Command<?> buildCommand() {
        return new ServerExecuteCommand(this);
    }

接着贴new  ServerExecuteCommand(this)的代码:

public class ServerExecuteCommand implements Command<ServerCommandArgs> {

    private final ServerCommandArgs serverCommandArgs;

    public ServerExecuteCommand(ServerCommandArgs serverCommandArgs) {
        this.serverCommandArgs = serverCommandArgs;
    }

    ······
}

这里ServerExecuteCommand()方法,返回了一个ServerExecuteCommand类。

回到最初的2句代码,可以看到返回的ServerExecuteCommand类会被传递给SeaTunnel.run()方法,这个方法其实最终调用的就是ServerExecuteCommand类的execute()方法。

ServerCommandArgs serverCommandArgs = CommandLineUtils.parse( args, new ServerCommandArgs(), EngineType.SEATUNNEL.getStarterShellName(), true);
SeaTunnel.run(serverCommandArgs.buildCommand());

所以接下来我们就继续分析ServerExecuteCommand类的execute()方法。

    @Override
    public void execute() {
        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        if (StringUtils.isNotEmpty(serverCommandArgs.getClusterName())) {
            seaTunnelConfig.getHazelcastConfig().setClusterName(serverCommandArgs.getClusterName());
        }
        HazelcastInstanceFactory.newHazelcastInstance(
                seaTunnelConfig.getHazelcastConfig(),
                Thread.currentThread().getName(),
                new SeaTunnelNodeContext(seaTunnelConfig));
    }

这个方法最重要的是执行了 HazelcastInstanceFactory.newHazelcastInstance( seaTunnelConfig.getHazelcastConfig(), Thread.currentThread().getName(), new SeaTunnelNodeContext(seaTunnelConfig));

这行代码比较复杂,最重要的就是这句代码中的new SeaTunnelNodeContext(seaTunnelConfig),这里会返回一个SeaTunnelNodeContext类,这个类是继承自Hazelcast这个组件的DefaultNodeContext类。在Hazelcast启动的过程中,会去调用DefaultNodeContext类的实现类的createNodeExtension()方法,在这里其实也就是SeaTunnelNodeContext类的createNodeExtension()方法。这里不具体展开讲解Hazelcast类,大家可以去查一下其他Hazelcast资料。

然后我们接着分析createNodeExtension()方法,贴一下代码:

@Override
public NodeExtension createNodeExtension(@NonNull Node node) {
    return new org.apache.seatunnel.engine.server.NodeExtension(node, seaTunnelConfig);
}

这里跟踪进去:

public NodeExtension(@NonNull Node node, @NonNull SeaTunnelConfig seaTunnelConfig) {
    super(node);
    extCommon = new NodeExtensionCommon(node, new SeaTunnelServer(seaTunnelConfig));
}

跟踪后发现进到了NodeExtension类的NodeExtension()方法,在NodeExtension()方法中,可以看到调用了new  SeaTunnelServer()方法,好了,这里其实就是SeaTunnelServer,也就是Zeta引擎的启动点了。

 

我们接着分析new SeaTunnelServer()方法:

public SeaTunnelServer(@NonNull SeaTunnelConfig seaTunnelConfig) {
    this.liveOperationRegistry = new LiveOperationRegistry();
    this.seaTunnelConfig = seaTunnelConfig;
    LOGGER.info("SeaTunnel server start...");
}

可以看到这里打印了 SeaTunnel  server  start...

SeaTunnelServer也是继承了Hazelcast的一些方法,比如init()方法,这个方法不用多想,肯定是会被Hazelcast在启动的过程中调用到的。我们接着看看init()方法

@Override
public void init(NodeEngine engine, Properties hzProperties) {
    this.nodeEngine = (NodeEngineImpl) engine;
    // TODO Determine whether to execute there method on the master node according to the deploy
    // type
    taskExecutionService = new TaskExecutionService(nodeEngine, nodeEngine.getProperties());
    nodeEngine.getMetricsRegistry().registerDynamicMetricsProvider(taskExecutionService);
    taskExecutionService.start();
    getSlotService();
    coordinatorService =
            new CoordinatorService(nodeEngine, this, seaTunnelConfig.getEngineConfig());
    monitorService = Executors.newSingleThreadScheduledExecutor();
    monitorService.scheduleAtFixedRate(
            this::printExecutionInfo,
            0,
            seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
            TimeUnit.SECONDS);

    seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) engine).getNode());
}

这里就看到TaskExecutionService类了,大家应该舒了一口气,因为我们看一下下面这个图片:

可以看到其实SeaTunnelServer最核心的是调用了TaskExecutionService类的start()方法。

紧接着,又会调用getSlotService(),这个方法用于获取哪些槽位可以执行任务。

这是最关键的2行代码,大家需要自己跟踪研究一下。

这里引用来自一篇官方文章的介绍文字:

 TaskExecutionService 

TaskExecutionService 是一个执行任务的服务,将在每个节点上运行一个实例。它从 JobMaster 接收 TaskGroup 并在其中运行 Task。并维护TaskID->TaskContext,对Task的具体操作都封装在TaskContext中。而Task内部持有OperationService,也就是说Task可以通过OperationService远程调用其他Task或JobMaster进行通信。

CoordinatorService 

CoordinatorService是一个充当协调器的服务,它主要负责处理客户端提交的命令以及切换master后任务的恢复。客户端在提交任务时会找到master节点并将任务提交到CoordinatorService服务上,CoordinatorService会缓存任务信息并等待任务执行结束。当任务结束后再对任务进行归档处理。

SlotService

SlotService是slot管理服务,用于管理集群的可用Slot资源。SlotService运行在所有节点上并定期向master上报资源信息。

 

 

接着分析taskExecutionService.start()方法

public void start() {
    runBusWorkSupplier.runNewBusWork(false);
}

该方法又调用了runNewBusWork(false)方法:

public boolean runNewBusWork(boolean checkTaskQueue) {
    if (!checkTaskQueue || taskQueue.size() > 0) {
        BlockingQueue<Future<?>> futureBlockingQueue = new LinkedBlockingQueue<>();
        CooperativeTaskWorker cooperativeTaskWorker =
                new CooperativeTaskWorker(taskQueue, this, futureBlockingQueue);
        Future<?> submit = executorService.submit(cooperativeTaskWorker);
        futureBlockingQueue.add(submit);
        return true;
    }
    return false;
}

接口看到上面代码中的new cooperativeTaskWorker(taskQueue,this,futureBlockingQueue)方法,

这里新建了一个cooperativeTaskWorker类,这个类对象会被提交到executorService去执行,代码就是下一行的executorService.submit(cooperativeTaskWorker)这个代码。

然后当cooperativeTaskWorker被提交到executorService上的时候,其实是会运行cooperativeTaskWorker这个类的run方法的。所以接下来我们要看cooperativeTaskWorker类中的run方法具体做了什么事情。

public void run() {
    thisTaskFuture = futureBlockingQueue.take();
    futureBlockingQueue = null;
    myThread = currentThread();
    while (keep.get() && isRunning) {
        TaskTracker taskTracker =
                null != exclusiveTaskTracker.get()
                        ? exclusiveTaskTracker.get()
                        : taskqueue.takeFirst();
        TaskGroupExecutionTracker taskGroupExecutionTracker =
                taskTracker.taskGroupExecutionTracker;
        if (taskGroupExecutionTracker.executionCompletedExceptionally()) {
            taskGroupExecutionTracker.taskDone(taskTracker.task);
            if (null != exclusiveTaskTracker.get()) {
                // If it's exclusive need to end the work
                break;
            } else {
                // No action required and don't put back
                continue;
            }
        }
        taskGroupExecutionTracker.currRunningTaskFuture.put(
                taskTracker.task.getTaskID(), thisTaskFuture);
        // start timer, if it's exclusive, don't need to start
        if (null == exclusiveTaskTracker.get()) {
            timer.timerStart(taskTracker);
        }
        ProgressState call = null;
        try {
            // run task
            myThread.setContextClassLoader(
                    executionContexts
                            .get(taskGroupExecutionTracker.taskGroup.getTaskGroupLocation())
                            .getClassLoader());
            call = taskTracker.task.call();
            synchronized (timer) {
                timer.timerStop();
            }
        } catch (InterruptedException e) {
            if (taskGroupExecutionTracker.executionException.get() == null
                    && !taskGroupExecutionTracker.isCancel.get()) {
                taskGroupExecutionTracker.exception(e);
            }
            taskGroupExecutionTracker.taskDone(taskTracker.task);
            logger.warning("Exception in " + taskTracker.task, e);
            if (null != exclusiveTaskTracker.get()) {
                break;
            }
        } catch (Throwable e) {
            // task Failure and complete
            taskGroupExecutionTracker.exception(e);
            taskGroupExecutionTracker.taskDone(taskTracker.task);
            // If it's exclusive need to end the work
            logger.warning("Exception in " + taskTracker.task, e);
            if (null != exclusiveTaskTracker.get()) {
                break;
            }
        } finally {
            // stop timer
            timer.timerStop();
            taskGroupExecutionTracker.currRunningTaskFuture.remove(
                    taskTracker.task.getTaskID());
        }
        // task call finished
        if (null != call) {
            if (call.isDone()) {
                // If it's exclusive, you need to end the work
                taskGroupExecutionTracker.taskDone(taskTracker.task);
                if (null != exclusiveTaskTracker.get()) {
                    break;
                }
            } else {
                // Task is not completed. Put task to the end of the queue
                // If the current work has an exclusive tracker, it will not be put back
                if (null == exclusiveTaskTracker.get()) {
                    taskqueue.offer(taskTracker);
                }
            }
        }
    }
}

这里的代码其实最核心的就是会启动一个while循环,不断去取future队列中是否已经有完成的task,所以刚开始的时候,一般不会进入到while循环,因为还没有启动完成,肯定是去不到future对象的。这里我们会后续再分析,只需要知道在此处启动了一个线程,会不断地去判断是否有task已经做完了,做完了就进入while循环进行后续一系列的处理。我们先回到启动过程的分析上来,这个方法我们后续分析任务执行的时候再深入分析把。

接着就是需要看

getSlotService();

这行代码了。

/** Lazy load for Slot Service */
public SlotService getSlotService() {
    if (slotService == null) {
        synchronized (this) {
            if (slotService == null) {
                SlotService service =
                        new DefaultSlotService(
                                nodeEngine,
                                taskExecutionService,
                                seaTunnelConfig.getEngineConfig().getSlotServiceConfig());
                service.init();
                slotService = service;
            }
        }
    }
    return slotService;
}

首先把getSlotService()方法全部贴上来。

这里会新建一个SlotService 类对象 service,然后会调用该对象的init()方法。

SlotService是一个接口类,它只有一个实现类叫DefaultSlotService类,所以调用SlotService的init()方法就是调用了DefaultSlotService类的init()方法。我们接着看看DefaultSlotService类的init()方法代码。

@Override
public void init() {
    initStatus = true;
    slotServiceSequence = UUID.randomUUID().toString();
    contexts = new ConcurrentHashMap<>();
    assignedSlots = new ConcurrentHashMap<>();
    unassignedSlots = new ConcurrentHashMap<>();
    unassignedResource = new AtomicReference<>(new ResourceProfile());
    assignedResource = new AtomicReference<>(new ResourceProfile());
    scheduledExecutorService =
            Executors.newSingleThreadScheduledExecutor(
                    r ->
                            new Thread(
                                    r,
                                    String.format(
                                            "hz.%s.seaTunnel.slotService.thread",
                                            nodeEngine.getHazelcastInstance().getName())));
    if (!config.isDynamicSlot()) {
        initFixedSlots();
    }
    unassignedResource.set(getNodeResource());
    scheduledExecutorService.scheduleAtFixedRate(
            () -> {
                try {
                    LOGGER.fine(
                            "start send heartbeat to resource manager, this address: "
                                    + nodeEngine.getClusterService().getThisAddress());
                    sendToMaster(new WorkerHeartbeatOperation(getWorkerProfile())).join();
                } catch (Exception e) {
                    LOGGER.warning(
                            "failed send heartbeat to resource manager, will retry later. this address: "
                                    + nodeEngine.getClusterService().getThisAddress());
                }
            },
            0,
            DEFAULT_HEARTBEAT_TIMEOUT,
            TimeUnit.MILLISECONDS);
}

该方法先创建了scheduledExecutorService对象线程池,具体线程池的数量要看配置文件中hz.xxxxxx.seaTunnel.slotService.thread的配置文件中的数量,xxxxxx是Hazelcast实例的名称。

然后创建一个定时器,每隔5秒运行sendToMaster(new WorkerHeartbeatOperation(getWorkerProfile())).join();方法。其实就是心跳。心跳是发给Master的,Master在SeaTunnel中也叫做Coordinater