【Azure 事件中心】Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially 异常

发布时间 2023-04-26 20:59:03作者: 路边两盏灯

问题描述

开发Java Spring Cloud应用,需要发送消息到Azure Event Hub中。使用 Spring Cloud Stream Event Hubs Binder 依赖,应用执行一会就会遇见报错:reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.

 

问题解答

从错误来看,这明显是多线程并发处理时,多个线程同时触发了onSubscribe 或 onNext 或 onError 或 onComplete 事件,而这些事件在与 Subscriber处理时只能一个一个串行处理。

因为SpringCloud的 Controller 并发请求时,会分配多个线程同时调用many.emitNext(),这时如果之前请求线程处理还未结束,新请求的线程会直接这样的报错。

异常产生的代码为:

private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult.equals(Sinks.EmitFailureHandler.FAIL_FAST);  

Sinks 对 FAIL_FAST 和 FAIL_NON_SERIALIZED的枚举值说明参考:reactor-core/Sinks.java at main · reactor/reactor-core · GitHub

  • FAIL_FAST:表示对失败不会进行任何重试,会马上触发异常处理机制,这里就是抛出EmissionException异常。( A pre-made handler that will not instruct to retry any failure and trigger the failure handling immediately.)
  • FAIL_NON_SERIALIZED:表示会持续重试,直至成功。

如果发送到Event Hub的消息允许丢失,可以通过Try Catch捕获异常后记录日志即可。

但是,如果发送的消息不能丢失,必须成功传递到Event Hub中,就可以使用 FAIL_NON_SERIALIZED 模式。

修改为:

private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED);  

 

 

参考资料

ReactorDispatcher: Making sure spec 1.3 is not violated and under race, signals are not lost upon concurrent ClosedChannelException : https://github.com/Azure/azure-sdk-for-java/issues/27320

FAIL NON SERIALIZED : 

https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java#L89

https://github.com/reactor/reactor-core/blob/main/reactor-core/src/main/java/reactor/core/publisher/Sinks.java#L118

/**
* Has successfully emitted the signal
*/
OK,
/**
* Has failed to emit the signal because the sink was previously terminated successfully or with an error
*/
FAIL_TERMINATED,
/**
* Has failed to emit the signal because the sink does not have buffering capacity left
*/
FAIL_OVERFLOW,
/**
* Has failed to emit the signal because the sink was previously interrupted by its consumer
*/
FAIL_CANCELLED,
/**
* Has failed to emit the signal because the access was not serialized
*/
FAIL_NON_SERIALIZED,
/**
* Has failed to emit the signal because the sink has never been subscribed to has no capacity
* to buffer the signal.
*/
FAIL_ZERO_SUBSCRIBER;