跳到主要内容

分布式事件总线

介绍

分布式事件总线是 ioGame 提供的通讯方式之一。 该通讯方式与 Guava EventBus、Redis 发布订阅、MQ ... 等产品类似。

如果使用 Redis、MQ ...等中间件,需要开发者额外的安装这些中间件,并支付所占用机器的费用。 使用 Guava EventBus 则只能在当前进程中通信,无法实现跨进程。

而 ioGame 提供的分布式事件总线,拥有上述两者的优点。 此外,还可以有效的帮助企业节省云上 Redis、 MQ 这部分的支出。

事件发布后,除了当前进程所有的订阅者能接收到,远程的订阅者也能接收到(支持跨机器、跨进程、跨不同类型的多个逻辑服)。 可以代替 redis pub sub 、 MQ ,并且具备全链路调用日志跟踪,这点是中间件产品做不到的。


ioGame 分布式事件总线,特点

  • 使用方式与 Guava EventBus 类似
  • 具备全链路调用日志跟踪。(这点是中间件产品做不到的)
  • 支持跨多个机器、多个进程通信
  • 支持与多种不同类型的多个逻辑服通信
  • 纯 javaSE,不依赖其他服务,耦合性低。(不需要安装任何中间件)
  • 事件源和事件监听器之间通过事件进行通信,从而实现了模块之间的解耦
  • 当没有任何远程订阅者时,将不会触发网络请求。(这点是中间件产品做不到的)

虽然分布式事件总线强调了跨进程、跨机器的能力,实际上同进程中的订阅者也是生效的。 值得称赞的是,如果没有任何远程订阅者,将不会触发网络请求。 简单的说,当事件发布后,当其他进程(其他机器)没有相关订阅者时,只会在内存中传递事件给当前进程的相关订阅者。 所以,你可以将该通讯方式当作 Guava EventBus 来使用。

使用场景

关于使用场景可以搜索一些 Guava EventBus 相关的内容。 分布式事件总线的强大之处在于事件发布后,即使订阅者在不同的机器、不同的进程、不同的逻辑服中,都能接收到事件源。


场景-1

玩家登录后,发布一个登录事件。其他进程的游戏逻辑服中的订阅者接收到该事件后,可以做一些业务,如

  • 记录玩家登录时间。
  • 根据当前时间与上次登录的时间做一些奖励计算,如离线奖励之类的。
  • 其他各种

场景-2

比如我们将统计信息相关的单独放到一个【统计信息收集逻辑服】中,并添加相关订阅者。

ioGame 中内置了一些统计插件,开发者收集好这些统计信息后,可以将统计信息以事件的方式发布。 无论【统计信息收集逻辑服】在什么地方,都能接收到该事件的统计信息。 接收到统计信息后就可以将这些数据入库或记录到日志中。

Example Source Code

see https://github.com/iohao/ioGameExamples

path : SimpleExample/example-multiple-eventbus

  • EventbusOneApplication (启动游戏对外服、Broker 游戏网关、User 逻辑服)
  • EmailAnyApp1 和 EmailAnyApp2 (启动两个 Email 逻辑服,用于测试 fireAny)
  • EventbusOneClient (模拟游戏客户端)

启动后的效果

启动上述内容后,会有 4 个进程。我们可以在 EventbusOneClient 的控制台中发送一些模拟客户请求,来观察服务器内容。

An


模拟客户端

模拟客户端示例提供了多个请求命令,开发者可输入对应的命令来测试。 比如输入 1-3,会触发 fireEventAny,事件总线只会给其中一个 email 逻辑服发送事件。

An

概念介绍

ioGame 提供的分布式事件总线在使用上是简单的,只有 3 个简单的概念,分别是

  • EventSource : 事件源
  • Subscriber : 订阅者
  • Fire : 发布事件

定义事件源

事件源由开发者定义,事件源是业务数据载体等,其主要目的是用于业务数据的传输。

public class UserLoginEventMessage implements Serializable {
final long userId;
public UserLoginEventMessage(long userId) {
this.userId = userId;
}
}
注意

事件源需要实现 Serializable 接口

发布事件

作用:将事件源发送给相关的订阅者

@ActionController(UserCmd.cmd)
public class UserAction {
@ActionMethod(UserCmd.fireEvent)
public void fireEventUser(FlowContext flowContext) {
long userId = flowContext.getUserId();
var message = new UserLoginEventMessage(userId);
flowContext.fire(message);
}
}

订阅者

订阅者由开发者定义,作用是接收事件源,并处理业务逻辑。

框架为订阅者提供了两个注解标记,分别是:

  1. EventBusSubscriber ,作用在类上(该注解不是必须的)。
  2. EventSubscribe ,作用在方法上。

我们只需要在方法上添加 EventSubscribe 注解,就表示该方法是一个订阅者。

@Slf4j
@EventBusSubscriber
public class EmailEventBusSubscriber {
@EventSubscribe
public void mail(UserLoginEventMessage message) {
long userId = message.getUserId();
log.info("event - 玩家[{}]登录,发放 email 奖励", userId);
}
}

@Slf4j
@EventBusSubscriber
public class UserEventBusSubscriber {
@EventSubscribe
public void userLogin(UserLoginEventMessage message) {
log.info("event - 玩家[{}]登录,记录登录时间", message.getUserId());
}
}

我们在 EmailEventBusSubscriber、UserEventBusSubscriber 类中,分别提供了 UserLoginEventMessage 事件源的订阅者。

当有 UserLoginEventMessage 相关的事件触发,订阅者都能接收到事件。 别忘记,当前介绍的是分布式事件总线。 所以,即使订阅者在不同的进程中,也能接收到事件。

订阅者类上的 EventBusSubscriber 注解不是必须的(即使不添加也不影响)。 该注解只是起到一个标记作用,目的是结合开发工具后,能快速找到所有的订阅者类。

在 EmailEventBusSubscriber 订阅者类中,虽然只定义了一个订阅者,如果有需要,我们可以在该类中创建多个订阅者。

提示

订阅者的创建规则

  1. 方法必须是 public void
  2. 订阅者必须且只能有一个参数,用于接收事件源。
  3. 方法需要添加 EventSubscribe 注解

开启事件总线

提示

EventBus(事件总线)、业务框架、逻辑服三者是 1:1:1 的关系。

通过业务框架的 addRunner 方法来添加分布式事件总线(Runner 机制),我们将 EmailEventBusSubscriber 注册到 EventBus 中。

EmailEventBusSubscriber 是订阅者类,EventBus 是支持注册多个订阅者类的。

  • code 8 ~ 14,是关键代码。启用 EventBus,并将 EmailEventBusSubscriber 注册到 EventBus 中。
public class EmailLogicStartup extends AbstractBrokerClientStartup {
...
@Override
public BarSkeleton createBarSkeleton() {
BarSkeletonBuilder builder = ...

// add EventBusRunner
builder.addRunner(new EventBusRunner() {
@Override
public void registerEventBus(EventBus eventBus, BarSkeleton skeleton) {
// register EmailEventBusSubscriber
eventBus.register(new EmailEventBusSubscriber());
}
});

return builder.build();
}
}
注意

即使你的逻辑服没有任何的订阅者,只是发送事件,也需要配置 EventBusRunner ,因为事件总线是按需要加载的功能。

按需加载有很多好处,比如 email 逻辑服后续的业务不想参与任何订阅了,那么把这个 Runner 注释掉就行了。 其他代码不用改,这样也不会占用资源。

发布事件的方法

对于发布事件,EventBus 提供了同步、异步的发布方法。

AsyncSyncDescription
firefireSync发送事件给订阅者,这些订阅者包括 1. 给当前进程所有逻辑服的订阅者发送事件消息 2. 给其他进程的订阅者发送事件消息
fireLocalfireLocalSync给当前进程所有逻辑服的订阅者发送事件消息
fireMefireMeSync仅给当前 EventBus 的订阅者发送事件消息
fireAnyfireAnySync发送事件给订阅者,这些订阅者包括 1. 给当前进程所有逻辑服的订阅者发送事件消息 2. 给其他进程的订阅者发送事件消息 当有同类型的多个逻辑服时,只会给同类型其中的一个逻辑服发送事件。

fireMe

fireMe 仅给当前 EventBus 的订阅者发送事件消息。

每个逻辑服有自己的 EventBus 对象,当只想给当前逻辑服的订阅者发送事件时,那么此方法就能派上用场了。

public void demo() {
FlowContext flowContext = ...
var message = new UserLoginEventMessage(userId);
flowContext.fireMe(message);
flowContext.fireMeSync(message);
}

fireLocal

fireLocal 给当前进程所有逻辑服的订阅者发送事件消息,不会给其他进程的逻辑服发送事件。

举例:如果用户逻辑服、邮件逻辑服是在同一进程中启动的,那么 fireLocal 会给这两个逻辑服发送事件。

public void demo() {
FlowContext flowContext = ...
var message = new UserLoginEventMessage(userId);
flowContext.fireLocal(message);
flowContext.fireLocalSync(message);
}

fire

fire 方法发布事件后,无论是当前进程的订阅者,还是其他机器的订阅者都能接收到。

public void demo() {
FlowContext flowContext = ...
var message = new UserLoginEventMessage(userId);
flowContext.fire(message);
flowContext.fireSync(message);
}
注意

虽然 fireSync 提供了同步的方法,但这里的同步仅指当前进程订阅者的同步,对其他进程中的订阅者无效(处理远程订阅者使用的是异步处理)。

所以,当调用 fireSync 方法时, 会使用同步阻塞的方式的给当前进程所有逻辑服的订阅者发送事件消息,并等待订阅者执行完成, 并以异步的方式给其他进程的订阅者发送事件消息。

fireAny

fireAny 方法发布事件后,无论是当前进程的订阅者,还是远程进程的订阅者都能接收到。 当有同类型的多个逻辑服时,只会给同类型其中的一个逻辑服发送事件。

举例: 假设现在有一个发放奖励的邮件逻辑服,我们启动了两个(或者说多个)邮件逻辑服实例来处理业务。 当我们使用 fireAny 方法发送事件时,只会给其中一个实例发送事件。

public void demo() {
FlowContext flowContext = ...
var message = new UserLoginEventMessage(userId);
flowContext.fireAny(message);
flowContext.fireAnySync(message);
}

小结 - 事件发布

对于事件的发布,提供了多种

  1. fireMe 当前 EventBus 中的订阅者。
  2. fireLocal 当前进程中的所有订阅者。
  3. fire 当前进程和远程进程中所有的订阅者。
  4. fireAny 当前进程和远程进程中所有的订阅者。即使同类型逻辑服启动了多个实例,也只会给其中一个实例发送事件。

这些发布事件的方法有对应同步方法,fireMeSync、fireLocalSync、fireSync、fireAnySync, 同步系列方法会在当前线程中执行订阅者中的业务。

EventBus

提示

EventBus(事件总线)、业务框架、逻辑服三者是 1:1:1 的关系。

上面的关于事件发布,我们是通过 FlowContext 来完成的,方法内部最终还是通过 EventBus 来完成的。


获取 EventBus 实例

  1. 通过 FlowContext 获取对应的 eventBus
  2. 通过逻辑服 id 获取对应的 eventBus
// example-1
EventBus eventBus = flowContext.getEventBus();

// example-2
BrokerClientContext brokerClientContext = flowContext.getBrokerClientContext();
String id = brokerClientContext.getId();
EventBus eventBus = EventBusRegion.getEventBus(id);

// example-3
BarSkeleton barSkeleton = ...
EventBus eventBus = barSkeleton.option(SkeletonAttr.eventBus);

提示:除了上述介绍的获取方式外,开发者还可以在游戏逻辑服初始化时,自己保存一下引用

public class MyKit {
public static EventBus eventBus;
}

public BarSkeleton createBarSkeleton() {
var builder = ...

builder.addRunner(new EventBusRunner() {
@Override
public void registerEventBus(EventBus eventBus, BarSkeleton skeleton) {
MyKit.eventBus = eventBus;
}
});
}
提示

EventBus 与 FlowContext 如何选择?

通常情况下,如果发布的事件与玩家业务相关的,推荐使用 FlowContext 中提供的事件发布方法。 因为 FlowContext 发布事件时,具备全链路调用日志跟踪。

与玩家无关的业务,可以使用 EventBus。

订阅者线程执行器

提示

该小节的内容不是必须的,只有一些特殊业务才用得上,比如线程编排相关的、订阅者执行顺序编排等。

ioGame 分布式事件总线支持线程执行器的选择策略,订阅者可以指定由哪个线程执行器来消费业务。


分布式事件总线提供的线程执行器策略有以下几种

  1. userExecutor (使用 UserThreadExecutorRegion 线程执行器)【默认策略】
  2. userVirtualExecutor (使用 UserVirtualThreadExecutorRegion 线程执行器)
  3. methodExecutor(使用 SimpleThreadExecutorRegion 线程执行器)
  4. simpleExecutor(使用 SimpleThreadExecutorRegion 线程执行器)
  5. customExecutor(使用 SimpleThreadExecutorRegion 线程执行器)

UserVirtualThreadExecutorRegion、 UserThreadExecutorRegion、 SimpleThreadExecutorRegion 是框架内置的线程执行器。


之前的内容已经介绍过 EventSubscribe 注解了,当方法上添加了该注解时,就表示该方法是一个订阅者。

EventSubscribe 提供了两个可选属性,分别是

  1. value,执行器选择策略,默认值是 userExecutor
  2. order,订阅者的执行顺序,值越大执行优先级越高。
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface EventSubscribe {

ExecutorSelector value() default ExecutorSelector.userExecutor;

int order() default 0;
}

userExecutor 策略(默认)

该策略是线程安全的,userExecutor 是默认策略, 该策略会使用 UserThreadExecutorRegion 用户线程执行器来执行订阅者业务, 内部通过 userId 来得到对应的线程执行器。

该策略将使用 action 的线程执行器,可确保同一个用户在消费事件和消费 action 时, 使用的是相同的线程执行器,以避免并发问题。

注意

不要在此执行器中做耗时的业务,以免阻塞 action 的消费。


什么场景下使用该策略呢?

如果订阅者中的业务是与用户相关的,并且存在并发问题的,可以使用该策略。 当使用该策略后,该订阅者将与用户的 action 在同一线程中消费,以避免并发问题。

@EventBusSubscriber
public class UserEventBusSubscriber {
@EventSubscribe
public void userLogin(UserLoginEventMessage message) {
// your biz code
}
}

userVirtualExecutor 策略

该策略会使用 UserVirtualThreadExecutorRegion(虚拟线程执行器)执行器来执行订阅者业务,内部通过 userId 来得到对应的虚拟线程执行器。


什么场景下使用该策略呢?

有耗时相关的业务可选择此策略,如 DB 入库、IO ...等相关耗时操作。

@EventBusSubscriber
public class EmailEventBusSubscriber {
@EventSubscribe(ExecutorSelector.userVirtualExecutor)
public void mail(UserLoginEventMessage message) {
// your biz code
}
}

methodExecutor 策略

该策略是线程安全的,该策略会使用 SimpleThreadExecutorRegion 线程执行器来执行订阅者业务, 框架会为每个订阅者分配一个唯一 id,内部通过该唯一 id 来得到对应的线程执行器。

注意

这里的线程安全指的是以订阅者方法来划分的(即添加了 EventSubscribe 注解的方法),相同的订阅者总是使用相同的线程执行器来执行订阅者业务。

下面的示例代码中订阅者,无论由哪个用户触发,订阅者都将在同一个线程中执行。

@EventBusSubscriber
public class EmailEventBusSubscriber {
@EventSubscribe(ExecutorSelector.methodExecutor)
public void methodExecutor(EmailMethodEventMessage message) {
// your biz code
}
}

simpleExecutor 策略

该策略是线程安全的,该策略与 userExecutor 类似,但使用的是独立的线程执行器(SimpleThreadExecutorRegion)。 该策略会使用 SimpleThreadExecutorRegion 线程执行器来执行订阅者业务,内部通过 userId 来得到对应的线程执行器。

customExecutor 策略

customExecutor 是预留给开发者的,默认情况下与 simpleExecutor 策略一致。

提示

如果上述策略都不能满足业务的,开发者可以通过实现 SubscribeSelectorStrategy 接口来做自定义扩展。

builder.addRunner(new EventBusRunner() {
@Override
public void registerEventBus(EventBus eventBus, BarSkeleton skeleton) {
eventBus.setSubscribeSelectorStrategy(new YourSubscribeSelectorStrategy());
}
});

订阅者执行顺序

同一个事件源可能会被多个订阅者订阅,某些场景下,订阅者之间可能存在着执行顺序优先级的问题,EventSubscribe 注解支持优先级的设置。

下面根据注解 EventSubscribe order 属性来设置优先级。

@EventBusSubscriber
public class EmailOrderEventBusSubscriber {
@EventSubscribe(order = 1, value = ExecutorSelector.simpleExecutor)
public void order1(EmailOrderEventMessage message) {
log.info("order1 [{}]", message.userId);
}

@EventSubscribe(order = 2, value = ExecutorSelector.simpleExecutor)
public void order2(EmailOrderEventMessage message) {
log.info("order2 [{}]", message.userId);
}

@EventSubscribe(order = 3, value = ExecutorSelector.simpleExecutor)
public void order3(EmailOrderEventMessage message) {
log.info("order3 [{}]", message.userId);
}
}

public void demo() {
FlowContext flowContext = ...
var message = new EmailOrderEventMessage(userId);
flowContext.fire(message);
}

public class EmailOrderEventMessage implements Serializable {
public final long userId;

public EmailOrderEventMessage(long userId) {
this.userId = userId;
}
}

小结 - 执行器策略

ioGame 分布式事件总线提供了多种线程执行器选择策略,开发者可根据自身业务,为订阅者选择合适的策略。 如果你对线程知识不是很熟悉的,只需要记住 userVirtualExecutor、userExecutor 这两种策略就可以了。

userVirtualExecutor 是虚拟线程执行器策略,主要用于处理耗时 io 相关的业务。 如果业务确定与用户无关的,可无脑选择该策略。 如果业务与用户相关的,且逻辑是直接入库的,也可以使用该策略。

userExecutor 则是同一用户使用的是同一个线程执行器,也是执行 action 业务的执行器。 订阅者使用该策略,相当于在一个线程中执行用户的所有业务。

监听 EventBusListener

EventBusListener 监听接口可以处理一些特殊业务,如异常、空订阅者...等。

  • invokeException 方法,当订阅者在消费事件源时,如果触发了异常就会进入此方法。
  • emptySubscribe 方法,当事件源发布时,如果没有任何订阅者订阅该事件源就会进入此方法。

关于 emptySubscribe 方法,这里说个小案例。 比如现在我们有一个独立的奖励逻辑服,并且订阅了登录事件源。 玩家在登录(登录逻辑服)时会发布一个事件源。 此时,如果奖励逻辑服如果没有上线,或者说正好下线了,那么就会进入 emptySubscribe 方法(因为没有任何订阅者订阅该事件)。

开发者可以利用该特性来做个记录,并在后续处理该事件。

final class MyEventBusListener implements EventBusListener {
@Override
public void invokeException(Throwable e, Object eventSource, EventBusMessage eventBusMessage) {
log.error(e.getMessage(), e);
}

@Override
public void emptySubscribe(EventBusMessage eventBusMessage, EventBus eventBus) {
Class<?> clazz = eventBusMessage.getTopicClass();
String simpleName = eventBusMessage.getTopic();
log.warn("[No subscribers: {} - {}", clazz.getSimpleName(), simpleName);
}
}

设置自定义监听

BarSkeletonBuilder builder = ...;

builder.addRunner(new EventBusRunner() {
@Override
public void registerEventBus(EventBus eventBus, BarSkeleton skeleton) {
eventBus.setEventBusListener(new YourEventBusListener());
}
});

小结

ioGame 提供的分布式事件总线在使用上是简单的,只有 3 个简单的概念,分别是

  • EventSource : 事件源
  • Subscriber : 订阅者
  • fire : 发布事件

ioGame 分布式事件总线,特点

  • 使用方式与 Guava EventBus 类似
  • 具备全链路调用日志跟踪。(这点是中间件产品做不到的)
  • 支持跨多个机器、多个进程通信
  • 支持与多种不同类型的多个逻辑服通信
  • 纯 javaSE,不依赖其他服务,耦合性低。(不需要安装任何中间件)
  • 事件源和事件监听器之间通过事件进行通信,从而实现了模块之间的解耦
  • 当没有任何远程订阅者时,将不会触发网络请求。(这点是中间件产品做不到的)

虽然分布式事件总线强调了跨进程、跨机器的能力,实际上同进程中的订阅者也是生效的。 值得称赞的是如果没有任何远程订阅者,将不会触发网络请求。 简单的说,事件发布后,当其他进程(其他机器)没有相关订阅者时,只会在内存中传递事件给当前进程的相关订阅者。 所以,可以将该通讯方式当作 guava EventBus 来使用。

EventBus 为事件发布提供了 fireMe、fireLocal、fire、fireAny 等同步、异步的方法。

订阅者支持线程执行器选择策略和执行优先级的设置。 默认情况下,使用的是用户线程执行器策略。 订阅者的执行顺序(优先级),值越大执行优先级越高。

提示

想要确保按顺序执行,订阅者需要使用相同的线程执行器。 比如可以搭配 userExecutor、simpleExecutor 等策略来使用。

注意事项

由于自定义的事件源实现了 Serializable 接口,所以我们需要将自定义事件源放到一个公共包中, 之后在游戏对外服、Broker(游戏网关)、游戏逻辑服的 pom.xml 中添加这个公共包。

see https://github.com/iohao/ioGameExamples/tree/main/SimpleExample/example-multiple-eventbus

  • the-eventbus-broker/pom.xml
  • the-eventbus-external/pom.xml

与 Spring 结合

提示

如果你的订阅者类是由 Spring 管理的,可以先从 Spring ApplicationContext 中得到该订阅者类后,再将该类注册到 EventBus 中。

see https://github.com/iohao/ioGame/issues/67

更多案例

上面的文档中,我们举了一些关于分布式事件总线的使用案例了,但都缺乏一些想象力。 在这小节中,我们将介绍一些更具想象力的使用案例。 这些案例的特别之处在于,我们可以把分布式事件总线用活。


热更

我们知道,当事件 fire 来发布事件时,所有的订阅者都能接收到事件。 利用好这一特性,我们可以批量的为系统做一些热更。

@Data
public class MyHotMessage implements Serializable {
byte[] hotCode;
}

@EventBusSubscriber
public class HotEventBusSubscriber {
@EventSubscribe(ExecutorSelector.userVirtualExecutor)
public void hot(MyHotMessage message) {
byte[] hotCode = message.getHotCode();
...
}
}

当我们需要热更某些业务时,可以通过 GM 后台来发布一个 HotMessage 事件源,hotCode 存放的是需要热更的字节码(提前编译好的 class)。 因为我们是通过 fire 来发布的事件,那么所有的相关订阅者都能接收到事件消息。 此时,即使有数百个游戏逻辑服实例,也能轻松的实现批量更新。


全局配置文件

该案例与热更类似,模拟全局配置的更新。

@Data
public class MyConfigMessage implements Serializable {
... your config property
}

@EventBusSubscriber
public class ConfigEventBusSubscriber {
@EventSubscribe(ExecutorSelector.userVirtualExecutor)
public void updateConfig(MyConfigMessage message) {
...
}
}

可插拔

我们知道,在分布式事件总线中,如果没有任何远程订阅者,将不会触发网络请求。 通过该特点,我们可以做一些特殊业务,比如一些临时活动。

该临时活动的业务内容如下: 活动期间内,玩家每日在线 60 分钟,系统将奖励一些物品。(玩家每次登录做一次检测,即在玩家的登录方法中发布事件)

我们可以为该临时活动单独创建一个逻辑服,即【临时活动逻辑服】。该逻辑服只配置了一些订阅者,订阅玩家登录的事件源。 这里想表达的是,当该临时活动结束后,我们只需要将该逻辑服下线即可,无需做其他更多的工作了。

当【临时活动逻辑服】下线后,就没有任何订阅者了。 也就意味着不会触发网络请求(即使在玩家的登录方法中发布了事件)。 所以,通过分布式事件总线的这一特性,可以让我们的系统具备可插拔的能力。 同时,这也意味着将来某个时间点我们需要重新开启该临时活动,只需要将【临时活动逻辑服】启动即可。

从这里你可以发现,我们只是在登录业务中做了一个埋点(发布事件)。 之后通过启动或关闭【临时活动逻辑服】就能扩展业务,而该逻辑服的启动或关闭都不会影响我们的主业务。 最重要的是,只有在该逻辑服在线时,事件才会发起网络请求。 而当该逻辑服关闭后,将不会触发任何网络请求。 这一点是 redis pub sub、MQ ...等之类的中间件产品所做不到的


小结

最后,发挥你的想象力,把分布式事件总线用活。 只有把东西用活了,才更有意义。

当我们的系统中有了可插拔的能力后,我们可以将更多的功能拆分成一个个的功能逻辑服,以备后续使用。 比如,当触发特殊业务时,临时启动这些功能逻辑服,可使游戏游戏的趣味性得到更多的提升。

统计案例

现在,我们用一个统计分析案例,该案例涉及内容如下

  • EventBus 来发布事件。因为该案例的业务与玩家无关,所以使用了 EventBus。
  • userVirtualExecutor,使用了虚拟线程执行器策略。

场景描述

该场景会收集一些系统信息,我们将展示一个收集 action 访问次数的信息,并通过分布式事件总线将收集的信息发布到数据分析逻辑服中。


创建游戏逻辑服

我们给游戏逻辑服添加了相关的订阅者和两个插件

public class UserLogicStartup extends AbstractBrokerClientStartup {
...
@Override
public BarSkeleton createBarSkeleton() {
var builder = ...

builder.addInOut(new StatActionInOut()).addInOut(new TraceIdInOut());

builder.addRunner(new EventBusRunner() {
@Override
public void registerEventBus(EventBus eventBus, BarSkeleton skeleton) {
eventBus.register(new UserEventBusSubscriber());
MyKit.eventStatActionInOut(skeleton, eventBus, appId, appName);
}
});
}
}

public class EmailLogicStartup extends AbstractBrokerClientStartup {
...
@Override
public BarSkeleton createBarSkeleton() {
var builder = ...

builder.addInOut(new StatActionInOut()).addInOut(new TraceIdInOut());

builder.addRunner(new EventBusRunner() {
@Override
public void registerEventBus(EventBus eventBus, BarSkeleton skeleton) {
eventBus.register(new EmailEventBusSubscriber());
MyKit.eventStatActionInOut(skeleton, eventBus, appId, appName);
}
});
}
}

工具类

工具类的 eventStatActionInOut 方法,则会定时的收集 action 相关调用统计信息。 最后将所有信息收集到 ActionStatEventMessageRegion 事件源中后,通过 EventBus.fire 发布事件。

@UtilityClass
public class MyKit {
public void eventStatActionInOut(BarSkeleton skeleton, EventBus eventBus, String appId, String appName) {
InOutManager inOutManager = skeleton.getInOutManager();

inOutManager.getOptional(StatActionInOut.class).ifPresent(statActionInOut -> {
TaskKit.runInterval(() -> {
StatActionInOut.StatActionRegion statActionRegion = statActionInOut.getRegion();
// Collect Statistics.
// cn: 收集统计数据
var messageList = statActionRegion.stream().map(statAction -> {
ActionStatEventMessage message = new ActionStatEventMessage();

message.setCmdMerge(statAction.getCmdInfo().getCmdMerge());
// Action Execution Count Statistics.
// cn: action 执行次数统计
message.setExecuteCount(statAction.getExecuteCount().sum());
// Action Exception Trigger Count
// cn: action 异常触发次数
message.setErrorCount(statAction.getErrorCount().sum());

// Maximum Time Consumption per Call
// cn: 单次调用中的最大耗时
message.setMaxTime(statAction.getMaxTime());
// Average Time Consumption
// cn: 平均耗时
message.setAvgTime(statAction.getAvgTime());
// Total Time Consumption
// cn: 总耗时
message.setTotalTime(statAction.getTotalTime().sum());

return message;
}).collect(Collectors.toList());

// Statistics Region
// cn: 统计数据 region
ActionStatEventMessageRegion messageRegion = new ActionStatEventMessageRegion();
messageRegion.setAppId(appId);
messageRegion.setAppName(appName);
messageRegion.setMessageList(messageList);

eventBus.fire(messageRegion);

}, 10, TimeUnit.SECONDS);
});
}
}

public class ActionStatEventMessage implements Serializable {
int cmdMerge;
long executeCount;
long totalTime;
long errorCount;
long maxTime;
long avgTime;
}

public class ActionStatEventMessageRegion implements Serializable {
String appId;
String appName;
List<ActionStatEventMessage> messageList;
}

数据分析逻辑服

【数据分析逻辑服】比较简单,添加了一个订阅者,用于接收其他逻辑服发布的事件。 订阅者使用了虚拟线程执行器策略,因为我们需要将数据入库。

public class AnalyseLogicStartup extends AbstractBrokerClientStartup {
...

@Override
public BarSkeleton createBarSkeleton() {
var builder = ...

builder.addInOut(new TraceIdInOut());

builder.addRunner(new EventBusRunner() {
@Override
public void registerEventBus(EventBus eventBus, BarSkeleton skeleton) {
eventBus.register(new AnalyseEventBusSubscriber());
}
});

return builder.build();
}
}

@EventBusSubscriber
public class AnalyseEventBusSubscriber {
@EventSubscribe(ExecutorSelector.userVirtualExecutor)
public void analyseAction(ActionStatEventMessageRegion messageRegion) {
... your db code
}
}

小结

该示例演示了在非用户业务中事件总线的使用案例。 案例中,【user、email 逻辑服】收集统计信息后, 通过分布式事件总线将收集好的数据交给【数据分析逻辑服】来处理, 数据分析逻辑服可以做一些 DB 入库相关的业务,这样游戏逻辑服就没有 DB 相关的操作了。

虽然上面只演示了一个业务,但在实际当中,我们可以将各种需要收集的数据交由【数据分析逻辑服】来集中处理。 也就是说,游戏逻辑服只收集数据,之后通过分布式事件总线将收集好的数据交给专门的【数据分析逻辑服】处理。