前言
领域驱动设计基于CQRS的事件架构, 可以使命令发起者和命令执行者解耦。通过一系列的事件的追加存储,可以对事件的追踪和溯源。采用事件架构模式,更加面向与业务职能,将复杂的业务场景拆分成不同事件执行,在一定程度上达到解耦和复用的目的。事件的发起者和执行分离,解耦下游的相关的系统,下游只需要监听关注的事件即可。
概念解释
Command:命令对象。
Event:事件接口。
EventStore:事件存储。
EventBus:消息总线。一般可以用MQ。
EventBody:事件存储实体。
INotificationPublisher:事件发布者。
INotificationListener:事件订阅消息者。
EventSourcingInfoDO:事件存储DO对象。
EventPublishedTrackerDO:事件发布追踪者DO对象。
总体设计架构图
事件驱动架构设计
事件持久化对象设计
EventPublishedTrackerDO
点击查看代码
public class EventPublishedTrackerDO {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
private Long id;
/**
* 事件溯源编号
*/
private Long mostRecentNotifictionId;
/**
* 事件溯源event_id
*/
private String mostRecentTrackerId;
/**
* 以标记名称作为事件发送分割标记
*/
private String tagName;
}
EventSourcingInfoDO
点击查看代码
public class EventSourcingInfoDO {
private static final long serialVersionUID = 1L;
/**
* 主键
*/
private Long id;
/**
* 事件标识
*/
private String eventId;
/**
* 标记名称-一般为表名或者领域名称
*/
private String tagName;
/**
* 持久化编号-每张表的主键或者领域标识
*/
private String persistenceId;
/**
* 事件类型
*/
private String eventType;
/**
* 事件实体
*/
private byte[] eventBody;
/**
* 事件版本
*/
private Integer eventVersion;
/**
* 事件发送者名称
*/
private String sender;
/**
* 事件发送者ID
*/
private Long senderId;
/**
* 事件状态
*/
private Integer eventState;
}
构建Command和Event
首先抽象command命令对象
点击查看代码
public abstract class AbstractDomainCommand implements DomainCommand {
@Default
private Date occurredOn = new Date();//发起时间
private UserId commder;//发起用户
}
interface DomainCommand {
Date occurredOn();
UserId commder();
}
接下来,我们以一个商品销项税的变更举例,销项税变更的ProductTaxCodeUpdateCommand对象
点击查看代码
public class ProductTaxCodeUpdateCommand extends AbstractDomainCommand {
private ProductId productId;//商品实体ID类
private String productTaxCode;//税码如:13
public ProductTaxCodeUpdateEvent toEvent(ProductAggregateRoot product) {
ProductTaxCodeUpdateEvent event = new ProductTaxCodeUpdateEvent();
event.eventBody(
ProductTaxCodeUpdateEventBody
.builder()
.persistenceId(product.identify())
.productTaxCode(product.productTaxCode())
.build());
event.sender(this.commder());
return event;
}
销项税SalesTax值对象
点击查看代码
//税码接口
public interface Tax {
default TaxCodeId id() {return null;}
default String code() {return null;}
default BigDecimal tax() {return null; }
}
//通用值对象类
public class ValueObject implements Serializable {
private static final long serialVersionUID = -5984778999781379523L;
}
//税码值对象
public class SalesTax extends ValueObject implements Tax {
private static final long serialVersionUID = -4283492714059316843L;
private TaxCodeId id;
private String code;
private BigDecimal tax;
}
税码更新事件ProductSaleTaxUpdatedEvent对象及ProductTaxCodeUpdateEventBody
点击查看代码
//领域事件接口
public interface DomainEvent<EventBody extends IEventBody> extends Serializable{
DomainEventId eventId();
String tag();
String eventType();
EventBody eventBody();
Date occurredOn();
UserId sender();
Integer eventVersion();
}
public class ProductTaxCodeUpdateEvent extends ProdcutEvent<ProductTaxCodeUpdateEventBody> implements DomainEvent {
private static final long serialVersionUID = -4770564663557952597L;
}
public class ProductTaxCodeUpdateEventBody extends ProdcutEventBody implements IEventBody{
private String productTaxCode;
}
构建聚合根对象
商品聚合根对象ProductAggregateRoot
点击查看代码
//聚合根抽象父类
public abstract class AggregateRoot<DomainIdentifyType extends DomainIdentify> implements
Serializable, IVersion {
private DomainIdentifyType identify;//领域实体ID对象
private Integer version;//版本
private UserId holder;//操作用户
}
//商品聚合根对象
public static class ProductAggregateRoot extends AggregateRoot<ProductId> {
/**
* 产品编码
*/
private String productCode;
/**
* 产品名称
*/
private String productName;
/**
* 产品税务编码
*/
private String productTaxCode;
....
/**
* 品牌编号,来自产品配置-品牌管理
*/
private BrandId brandId;
/**
* 备案税率
*/
private SalesTax salesTax;
....
}
构建领域服务
商品领域服务ProductService对象
点击查看代码
public final class ProductService {
@autoAuired
private ProductDomainRepository repository;
@autoAuired
private IMediator mediator;
....
//通过商品ID获取聚合根对象Option包装
private Option<ProductAggregateRoot> getState(ProductId id) {
return repository.getById(id);
}
//更新商品聚合根对象
public static Try<ProductAggregateRoot> updateProductTaxCode(ProductAggregateRoot product
, String productTaxCode) {
return Try.success(product.toBuilder()
.productTaxCode(productTaxCode)
.build());
}
//聚合根操作用户持有者(通用方法)
public static AggregateRoot build(AggregateRoot root, UserId holder) {
return root.toBuilder().holder(holder).build();
}
//商品销项税变更
public void when(ProductSaleTaxUpdateCommand aCommand) {
getState(aCommand.productId())
.peek(product -> {
updateSalesTax(product, aCommand.salesTax())
.map(product -> build(product, aCommand.commder()))
.andThen(repository::update)
.andThen(product -> mediator.saveAndPublish(aCommand.toEvent(product)))
.onFailure(ThrowableConsumer.COMMON);
});
}
....
}
事件存储和发送
事件存储值对象StoredEvent及仓库接口IEventStore
点击查看代码
/**
* 事件存储对象
*/
@Data
public class StoredEvent {
/**
* 事件标识
*/
private String eventId;
/**
* 标记名称-一般为表名或者领域名称
*/
private String tagName;
/**
* 持久化编号-每张表的主键或者领域标识
*/
private String persistenceId;
/**
* 事件类型
*/
private String eventType;
/**
* 事件实体
*/
private byte[] eventBody;
/**
* 事件版本
*/
private Integer eventVersion;
private String sender;
private Long senderId;
private Date storedDate;
}
//事件持久化仓储接口
public interface IEventStore {
void appand(DomainEvent event);
.....
}
IMediator接口是实现事件存储和事件发布中间协调者。