领域驱动架构设计之事件篇

发布时间 2023-03-22 21:08:52作者: 曹化金

前言

领域驱动设计基于CQRS的事件架构, 可以使命令发起者和命令执行者解耦。通过一系列的事件的追加存储,可以对事件的追踪和溯源。采用事件架构模式,更加面向与业务职能,将复杂的业务场景拆分成不同事件执行,在一定程度上达到解耦和复用的目的。事件的发起者和执行分离,解耦下游的相关的系统,下游只需要监听关注的事件即可。

概念解释

Command:命令对象。
Event:事件接口。
EventStore:事件存储。
EventBus:消息总线。一般可以用MQ。
EventBody:事件存储实体。
INotificationPublisher:事件发布者。
INotificationListener:事件订阅消息者。
EventSourcingInfoDO:事件存储DO对象。
EventPublishedTrackerDO:事件发布追踪者DO对象。

总体设计架构图

事件驱动架构设计

事件持久化对象设计

EventPublishedTrackerDO

点击查看代码
public class EventPublishedTrackerDO {

    private static final long serialVersionUID = 1L;

    /**
     * 主键
     */
    private Long id;

    /**
     * 事件溯源编号
     */
    private Long mostRecentNotifictionId;

    /**
     * 事件溯源event_id
     */
    private String mostRecentTrackerId;

    /**
     * 以标记名称作为事件发送分割标记
     */
    private String tagName;
}

EventSourcingInfoDO

点击查看代码

public class EventSourcingInfoDO  {

  private static final long serialVersionUID = 1L;

  /**
   * 主键
   */
  private Long id;

  /**
   * 事件标识
   */
  private String eventId;

  /**
   * 标记名称-一般为表名或者领域名称
   */
  private String tagName;

  /**
   * 持久化编号-每张表的主键或者领域标识
   */
  private String persistenceId;

  /**
   * 事件类型
   */
  private String eventType;

  /**
   * 事件实体
   */
  private byte[] eventBody;

  /**
   * 事件版本
   */
  private Integer eventVersion;

  /**
   * 事件发送者名称
   */
  private String sender;

  /**
   * 事件发送者ID
   */
  private Long senderId;

  /**
   * 事件状态
   */
  private Integer eventState;

}

构建Command和Event

首先抽象command命令对象

点击查看代码
public abstract class AbstractDomainCommand implements DomainCommand {
  @Default
  private Date occurredOn = new Date();//发起时间
  private UserId commder;//发起用户
}

interface DomainCommand {

  Date occurredOn();

  UserId commder();
}

接下来,我们以一个商品销项税的变更举例,销项税变更的ProductTaxCodeUpdateCommand对象

点击查看代码

public class ProductTaxCodeUpdateCommand extends AbstractDomainCommand {
  private ProductId productId;//商品实体ID类
  private String productTaxCode;//税码如:13
  public ProductTaxCodeUpdateEvent toEvent(ProductAggregateRoot product) {
    ProductTaxCodeUpdateEvent event = new ProductTaxCodeUpdateEvent();
    event.eventBody(
        ProductTaxCodeUpdateEventBody
            .builder()
            .persistenceId(product.identify())
            .productTaxCode(product.productTaxCode())
            .build());
    event.sender(this.commder());
    return event;
  }

销项税SalesTax值对象

点击查看代码
//税码接口
public interface Tax {
  default TaxCodeId id() {return null;}
  default String code() {return null;}
  default BigDecimal tax() {return null; }
}
//通用值对象类
public class ValueObject implements Serializable {
  private static final long serialVersionUID = -5984778999781379523L;
}
//税码值对象
public class SalesTax extends ValueObject implements Tax {
  private static final long serialVersionUID = -4283492714059316843L;
  private TaxCodeId id;
  private String code;
  private BigDecimal tax;
}

税码更新事件ProductSaleTaxUpdatedEvent对象及ProductTaxCodeUpdateEventBody

点击查看代码
//领域事件接口
public interface DomainEvent<EventBody extends IEventBody> extends Serializable{

  DomainEventId eventId();

  String tag();

  String eventType();

  EventBody eventBody();

  Date occurredOn();

  UserId sender();

  Integer eventVersion();
}
public class ProductTaxCodeUpdateEvent extends ProdcutEvent<ProductTaxCodeUpdateEventBody> implements DomainEvent {
  private static final long serialVersionUID = -4770564663557952597L;
}
public class ProductTaxCodeUpdateEventBody extends ProdcutEventBody implements IEventBody{
  private String productTaxCode;
}

构建聚合根对象

商品聚合根对象ProductAggregateRoot

点击查看代码
//聚合根抽象父类
public abstract class AggregateRoot<DomainIdentifyType extends DomainIdentify> implements
    Serializable, IVersion {
  private DomainIdentifyType identify;//领域实体ID对象
  private Integer version;//版本
  private UserId holder;//操作用户
  }
  //商品聚合根对象
  public static class ProductAggregateRoot extends AggregateRoot<ProductId> {
         /**
     * 产品编码
     */
    private String productCode;
    /**
     * 产品名称
     */
    private String productName;
	 /**
     * 产品税务编码
     */
    private String productTaxCode;
	....
	    /**
     * 品牌编号,来自产品配置-品牌管理
     */
    private BrandId brandId;
	
    /**
     * 备案税率
     */
    private SalesTax salesTax;
	....
  }

构建领域服务

商品领域服务ProductService对象

点击查看代码
public final class ProductService {
   @autoAuired
   private  ProductDomainRepository repository;
   @autoAuired
   private IMediator mediator;
   ....
   //通过商品ID获取聚合根对象Option包装
   private Option<ProductAggregateRoot> getState(ProductId id) {
     return repository.getById(id);
   }
   //更新商品聚合根对象
   public static Try<ProductAggregateRoot> updateProductTaxCode(ProductAggregateRoot product
      , String productTaxCode) {
    return Try.success(product.toBuilder()
        .productTaxCode(productTaxCode)
        .build());
  }
 //聚合根操作用户持有者(通用方法)
 public static AggregateRoot build(AggregateRoot root, UserId holder) {
    return root.toBuilder().holder(holder).build();
  }
   //商品销项税变更
   public void when(ProductSaleTaxUpdateCommand aCommand) {
    getState(aCommand.productId())
        .peek(product -> {
          updateSalesTax(product, aCommand.salesTax())
              .map(product -> build(product, aCommand.commder()))
              .andThen(repository::update)
              .andThen(product -> mediator.saveAndPublish(aCommand.toEvent(product)))
              .onFailure(ThrowableConsumer.COMMON);
        });
  }
   ....

}

事件存储和发送

事件存储值对象StoredEvent及仓库接口IEventStore

点击查看代码
/**
 * 事件存储对象
 */
@Data
public class StoredEvent {

  /**
   * 事件标识
   */
  private String eventId;

  /**
   * 标记名称-一般为表名或者领域名称
   */
  private String tagName;

  /**
   * 持久化编号-每张表的主键或者领域标识
   */
  private String persistenceId;

  /**
   * 事件类型
   */
  private String eventType;

  /**
   * 事件实体
   */
  private byte[] eventBody;

  /**
   * 事件版本
   */
  private Integer eventVersion;

  private String sender;

  private Long senderId;

  private Date storedDate;

}

//事件持久化仓储接口
public interface IEventStore {
  void appand(DomainEvent event);
  .....
 }

IMediator接口是实现事件存储和事件发布中间协调者。

点击查看代码 //事件存储和发布接口 public interface IMediator { void publish(DomainEvent event); void saveAndPublish(DomainEvent event); } public class DefaultMediator implements IMediator { @autowired private final IEventStore eventStore;//事件存储仓库接口 @autowired private IRocketMQNotificationTemplate rocketMQNotificationTemplate;//RocketMQ实现 @Override public void saveAndPublish(DomainEvent event) { notificationRepository.appand(domainEvent);//事件追加 publishNotification(domainEvent);//事件发布 } @Override public void publish(DomainEvent event) { publishNotification(event); } private void publishNotification(DomainEvent event) { try{ SendResult sr = rocketMQNotificationTemplate.syncSendOrderly(event, String.format("%s-%s", event.getTagName(), event.getPersistenceId()), event.getEventId()); if(sr.getSendStatus().equals(SendStatus.SEND_OK)){ //事件状态更新 eventStore.updateEventStateComplete(Lists.newArrayList(event.getNotificationId())); } }catch (Exception e){ throw new RetryException("消息发送失败!注意重试幂等处理!eventId="+event.getEventId()); } } } ```

事件监听