跳到主要内容

Domain-event 领域事件

介绍

Disruptor 是一个开源的并发框架,也可以将 Disruptor 理解为轻量级单机最快 MQ。

由英国外汇交易公司LMAX开发的一个高性能队列,并且大大的简化了并发程序开发的难度,获得 2011Duke’s 程序框架创新奖。

领域事件模块是对 disruptor 的封装使用。 通过领域事件模块,可为你的系统实现类似 Guava-EventBus、Spring 事件驱动模型 ApplicationEvent、业务解耦、规避并发、不阻塞主线程...等,各种浪操作。


领域事件特点

  1. 领域驱动设计,基于LMAX架构。
  2. 单一职责原则,可以给系统的可扩展、高伸缩、低耦合达到极致。
  3. 异步高并发、线程安全的,使用 disruptor 环形数组来消费业务。
  4. 使用事件消费的方式编写代码,即使业务在复杂也不会使得代码混乱,维护代码成本更低
  5. 可灵活的定制业务线程模型。
  6. 插件形式提供事件领域,做到了可插拔,就像玩乐高积木般有趣。

性能数据

see https://lmax-exchange.github.io/disruptor/user-guide/index.html

Multiple Producer
Run 0Disruptor=26,553,372 ops/sec
Run 1Disruptor=28,727,377 ops/sec
Run 2Disruptor=29,806,259 ops/sec
Run 3Disruptor=29,717,682 ops/sec
Run 4Disruptor=28,818,443 ops/sec
Run 5Disruptor=29,103,608 ops/sec
Run 6Disruptor=29,239,766 ops/sec
Single Producer
Run 0Disruptor=89,365,504 ops/sec
Run 1Disruptor=77,579,519 ops/sec
Run 2Disruptor=78,678,206 ops/sec
Run 3Disruptor=80,840,743 ops/sec
Run 4Disruptor=81,037,277 ops/sec
Run 5Disruptor=81,168,831 ops/sec
Run 6Disruptor=81,699,346 ops/sec

使用场景

场景 1

假设有这么一种业务场景,业务为【用户注册】处理完后, 同时触发【邮件通知】业务、【赠送积分】业务的执行,在不利用 MQ 的情况下,会有什么样的解决思路?

可能的解决思路有如下

  1. 业务【用户注册】处理后,开启线程处理【邮件通知】、【赠送积分】的业务
  2. 使用 disruptor 进行处理
  3. 生产消费者模式
  4. 观察者模式 ... 解决的思路有很多。

这里用 disruptor 实现类似 Spring ApplicationEvent 事件驱动模型 ,这里称为领域事件。 事件驱动模式与观察者模式在某些方面极为相似,当一个主体发生改变时,所有依属体都得到通知。 不过,观察者模式与单个事件源关联,而事件驱动模式则可以与多个事件源关联。



场景 2

一个领域事件可以看成是一个线程,计算密集型的业务推荐使用。 比如斗地主中的出牌,所有牌桌可使用同一个领域事件来处理,当牌局结算涉及 IO 业务时, 将结算业务放到其他线程处理,这样就不会阻塞出牌业务线程。

如何安装

由于领域事件是单独的、按需选择的功能模块,使用时需要在 pom.xml 中引入

see https://central.sonatype.com/artifact/com.iohao.game/light-domain-event

<dependency>
<groupId>com.iohao.game</groupId>
<artifactId>light-domain-event</artifactId>
<version>${ioGame.version}</version>
</dependency>

在使用的角度,只需要关注两件事

  1. 定义业务数据载体(领域消息)
  2. 处理该业务数据载体的事件 (领域事件)

定义领域消息

  1. 定义领域实体 - 并实现 Eo 接口
  2. Eo 接口是框架提供,领域消息由开发者自定义, 建议以 Eo 结尾.
public record StudentEo(int id) implements Eo {
}

定义领域事件

定义领域事件需要实现 DomainEventHandler 接口。 用于处理领域消息,一个事件消费类只处理一件事件(单一职责原则)。

public final class StudentEmailEventHandler1 implements DomainEventHandler<StudentEo> {
@Override
public void onEvent(StudentEo studentEo, boolean endOfBatch) {
log.info("Send email: {}", studentEo);
}
}

测试

  • code 7,停止领域事件。
  • code 13,配置领域事件 - 给学生发生一封邮件。
  • code 14,配置领域事件 - 回家。
  • code 15,配置领域事件 - 让学生睡觉。
  • code 18,启动领域事件。
  • code 23,发送事件。
public class StudentDomainEventTest {

DomainEventContext domainEventContext;

@After
public void tearDown() throws Exception {
domainEventContext.stop();
}

@Before
public void setUp() {
DomainEventContextParam contextParam = new DomainEventContextParam();
contextParam.addEventHandler(new StudentEmailEventHandler1());
contextParam.addEventHandler(new StudentGoHomeEventHandler2());
contextParam.addEventHandler(new StudentSleepEventHandler3());

domainEventContext = new DomainEventContext(contextParam);
domainEventContext.startup();
}

@Test
public void testEventSend() {
new StudentEo(1).send();
}
}
提示
  • 发送事件、上面只配置了一个事件。
  • 如果将来还需要记录学生今天上了什么课程,那么也是直接配置 (可扩展),这里的业务代码无需任何改动(松耦合)
  • 如果将来又不需要给学生发送 Email 的事件了,直接删除配置即可,这里还是无需改动代码。(高伸缩)
new StudentEo(1).send();

为第三方库提供领域事件

我们可以为第三方库的类提供领域事件,假设 UserLogin 是第三方库的类。

public record UserLogin(int id, String name) {
}

下面是我们编写的类,用于处理 UserLogin

...
@Test
public void testEventSend() {
var userLogin = new UserLogin(101, "Michael Jackson");
DomainEventPublish.send(userLogin);
}

public class UserLoginEmailEventHandler implements DomainEventHandler<UserLogin> {
@Override
public void onEvent(UserLogin userLogin, boolean endOfBatch) {
log.info("{}", userLogin);
}
}