LiteFlow — 并行编排与异步超时

发布时间 2023-07-17 21:49:52作者: DaleLee

本文使用的 LiteFlow 版本为 2.10.5

LiteFlow 简介

LiteFlow 是一个编排式的规则引擎框架,组件编排,帮助解耦业务代码,让每一个业务片段都是一个组件。

LiteFlow 官网 https://liteflow.yomahub.com

LiteFlow 使用 WHEN 关键字定义并行编排。

<chain name="chain1">
    WHEN(a, b, c);
</chain>

这样 a、b、c三个组件就是以并行方式运行()

组件定义见 https://liteflow.yomahub.com/pages/8486fb/

LiteFlow还提供了一些自关键字细化并行编排的控制。

  • ignoreError(默认为false)来提供忽略错误的特性。
  • any(默认为false)用来提供并行流程中,任一条分支先执行完即忽略其他分支的特性。
<chain name="chain1">
    THEN(
        a,
        WHEN(b, c, d).ignoreError(true),
        e
    );
</chain>
<chain name="chain2">
    THEN(
        a,
        WHEN(b, THEN(c, d), e).any(true),
        f
    );
</chain>

更多细节见 https://liteflow.yomahub.com/pages/b3446a

源码分析

异步超时

LiteFlow 的并行编排是通过 WhenCondition 实现的。

/**
 * 并行器
 *
 * @author Bryan.Zhang
 */
public class WhenCondition extends Condition {
	// 只在when类型下有效,以区分当when调用链调用失败时是否继续往下执行 默认false不继续执行
	private boolean ignoreError = false;

	// 只在when类型下有效,为true的话说明在多个并行节点下,任意一个成功,整个when就成功
	private boolean any = false;

	// when单独的线程池名称
	private String threadExecutorClass;
	// ...
}

其并行逻辑定义在方法 executeAsyncCondition()。其主要实现过程如下:
获取并行线程池。

// 此方法其实只会初始化一次Executor,不会每次都会初始化。Executor是唯一的
ExecutorService parallelExecutor = ExecutorHelper.loadInstance()
	.buildWhenExecutor(this.getThreadExecutorClass());

获得最大超时等待时间。

Integer whenMaxWaitTime;
TimeUnit whenMaxWaitTimeUnit;
if (ObjectUtil.isNotNull(liteflowConfig.getWhenMaxWaitSeconds())){
	whenMaxWaitTime = liteflowConfig.getWhenMaxWaitSeconds();
	whenMaxWaitTimeUnit = TimeUnit.SECONDS;
	}else{
	whenMaxWaitTime = liteflowConfig.getWhenMaxWaitTime();
	whenMaxWaitTimeUnit = liteflowConfig.getWhenMaxWaitTimeUnit();
}

LiteFlow 的并行编排是有最大超时等待时间的,可以在配置文件中进行设置,默认为 15 秒。

接下来就到并行编排的核心逻辑了。并行编排在底层是使用 CompleteableFuture 实现的。

List<CompletableFuture<WhenFutureObj>> completableFutureList = this.getExecutableList()
		.stream()
		.filter(executable -> !(executable instanceof PreCondition) && !(executable instanceof FinallyCondition))
		.filter(executable -> {
			try {
				return executable.isAccess(slotIndex);
			} catch (Exception e) {
				LOG.error("there was an error when executing the when component isAccess", e);
				return false;
			}
		})
		.map(executable -> CompletableFutureTimeout.completeOnTimeout(
				WhenFutureObj.timeOut(executable.getId()),
				CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex),
						parallelExecutor),
				whenMaxWaitTime, whenMaxWaitTimeUnit))
		.collect(Collectors.toList());

上面这段代码主要做了以下工作:

  1. 首先遍历 WhenConditon 中的可执行组件(比如上面的 a、b、c),过滤掉前置组件、后置组件以及不可访问的组件。
  2. 将每一个可执行组件Executable 转换为 CompletableFuture<WhenFutureObj>,执行组件并进行超时控制。
  3. 将流处理的结果转化为 List。

下面的代码会涉及到 CompleteableFuture 的以下方法。

方法 参数 描述
supplyAsync Supplier<U> 执行异步任务并返回结果
applyToEither CompletableFuture<T>,T-> V 当其中一个对象的结果可用时,将结果传递给指定的函数
exceptionally Throwable -> T 处理产生的异常
completeExceptionally Throwable 如果任务未完成,则导致对 get() 和相关方法的调用抛出给定的异常
static allOf CompletableFuture<?> ... 在所有Future对象结束后结束,并返回 Object 结果
static allOf CompletableFuture<?> ... 在任何一个Future对象结束后结束,并返回 Void 结果

异步超时的核心在于 CompletableFutureTimeout.completeOnTimeout()方法。该方法定义如下:

// 哪个先完成就 apply 哪一个结果 这是一个关键的 API, exceptionally 出现异常后返回默认值
public static <T> CompletableFuture<T> completeOnTimeout(T t, CompletableFuture<T> future, long timeout,
		TimeUnit unit) {
	final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit);
	return future.applyToEither(timeoutFuture, Function.identity()).exceptionally((throwable) -> t);
}

这就涉及到了 LiteFlow 的异步超时机制。方法注释中是这样介绍的:

Java 8 的 CompletableFuture 并没有 timeout 机制,虽然可以在 get 的时候指定 timeout,是一个同步堵塞的操作。怎样让 timeout 也是异步的呢?Java 8 内有内建的机制支持,一般的实现方案是启动一个 ScheduledThreadpoolExecutor 线程在 timeout 时间后直接调用 CompletableFuture.completeExceptionally(new TimeoutException()),然后用 acceptEither() 或者 applyToEither() 看是先计算完成还是先超时。

timeoutAfter() 方法定义如下,返回一个 CompletableFuture, timeout 时间后抛出一个超时异常。

public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
	CompletableFuture<T> result = new CompletableFuture<T>();
	// timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
	CompletableFutureTimeout.Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()),
			timeout, unit);
	return result;
}

LiteFlow 通过这种方法完成了异步的超时控制。先将组件任务和超时任务组合,如果组件任务晚于超时任务完成,timeoutFuture 就会抛出超时异常,exceptionally() 方法接收该异常并转为参数 t;如果组件任务早于超时任务完成,就会返回组件任务原有的返回对象,类型为 T。

继续分析completeOnTimeout()的各参数。当 timeoutFuture 抛出超时异常后,会被转为为 t。WhenCondition 的参数 t 由 WhenFutureObj.timeOut() 方法提供。
WhenFutureObj 定义如下,记录组件任务的执行结果,有成功、失败、超时三种结果。其 timeout() 方法会返回一个带有 WhenTimeOutExption 超时信息的 WhenFutureObj。也就是超时信息被统一处理为了WhenTimeOutExption

public class WhenFutureObj {
	private boolean success;
	private boolean timeout;
	private String executorName;
	private Exception ex;
	// ...
	public static WhenFutureObj timeOut(String executorName) {
		WhenFutureObj result = new WhenFutureObj();
		result.setSuccess(false);
		result.setTimeout(true);
		result.setExecutorName(executorName);
		result.setEx(new WhenTimeoutException(
				StrUtil.format("Timed out when executing the component[{}]",executorName)));
		return result;
	}

第二个参数 future 也是运行组件任务的 CompleteableFuture<WhenFutureObj>,其传入的 ParallelSupplier 实现了 Supplierget() 方法,该方法运行组件并将结果包装成一个WhenFutureObj

/**
 * 并行异步worker对象,提供给CompletableFuture用
 *
 * @author Bryan.Zhang
 * @since 2.6.4
 */
public class ParallelSupplier implements Supplier<WhenFutureObj> {
    // ...
	@Override
	public WhenFutureObj get() {
		try {
			executableItem.setCurrChainId(currChainId);
			executableItem.execute(slotIndex);
			return WhenFutureObj.success(executableItem.getId());
		}
		catch (Exception e) {
			return WhenFutureObj.fail(executableItem.getId(), e);
		}
	}
}

剩余的参数就是超时时间和单位了。

细节处理

上述步骤会得到一个CompletableFuture列表。

List<CompletableFuture<WhenFutureObj>> completableFutureList

接下来判断执行方式,是否要等待任务全部完成

// 如果 any 为 false,说明这些异步任务全部执行好或者超时,才返回
// 如果 any 为 true,说明这些异步任务只要任意一个执行完成,就返回
if (this.isAny()) {
	// 把这些CompletableFuture通过anyOf合成一个CompletableFuture
	resultCompletableFuture = CompletableFuture
		.anyOf(completableFutureList.toArray(new CompletableFuture[] {}));
} else {
	// 把这些CompletableFuture通过allOf合成一个CompletableFuture
	resultCompletableFuture = CompletableFuture
		.allOf(completableFutureList.toArray(new CompletableFuture[] {}));
}

静态方法 allof 和 anyof 可以接受一组 CompleteableFuture 对象。allOf 返回一个CompletableFuture<Void>结果,等待 Future 对象全部结束;anyOf 返回一个CompletableFuture<Object>对象,获得最先完成的结果。

之后会执行 get() 方法,等待任务执行完毕。

// 进行执行,这句执行完后,就意味着所有的任务要么执行完毕,要么超时返回
resultCompletableFuture.get();

之后还有一些异常处理,这里就先不展开了。