EventBus
介绍
分布式事件总线是框架提供的通信方式之一,该通信方式与 Guava EventBus、Redis 发布订阅、MQ ... 等产品类似。
如果使用 Redis、MQ ...等中间件,需要开发者额外的安装这些中间件,并支付所占用机器的费用。 使用 Guava EventBus 则只能在当前进程中通信,无法实现跨进程。 而框架提供的分布式事件总线,拥有上述两者的优点。 此外,还可以有效的帮助企业节省云上 Redis、 MQ 这部分的支出。
事件发布后,除了当前进程所有的订阅者能接收到,远程的订阅者也能接收到(支持跨机器、跨进程、跨不同类型的多个逻辑服)。 可以代替 redis pub sub 、 MQ ,并且具备全链路调用日志跟踪,这点是中间件产品做不到的。
ionet 分布式事件总线,特点
- 使用方式与 Guava EventBus 类似
- 具备全链路调用日志跟踪。(这点是中间件产品做不到的)
- 支持跨多个机器、多个进程通信
- 支持与多种不同类型的多个逻辑服通信
- 纯 javaSE,不依赖其他服务,耦合性低。(不需要安装任何中间件)
- 事件源和事件监听器之间通过事件进行通信,从而实现了模块之间的解耦
- 当没有任何远程订阅者时,将不会触发网络请求。(这点是中间件产品做不到的)
虽然分布式事件总线强调了跨进程、跨机器的能力,实际上同进程中的订阅者也是生效的。 值得称赞的是,如果没有任何远程订阅者,将不会触发网络请求。 简单的说,当事件发布后,当其他进程(其他机器)没有相关订阅者时,只会在内存中传递事件给当前进程的相关订阅者。 所以,你可以将该通信方式当作 Guava EventBus 来使用。
使用场景
关于使用场景可以搜索一些 Guava EventBus 相关的内容。 分布式事件总线的强大之处在于事件发布后,即使订阅者在不同的机器、不同的进程、不同的逻辑服中,都能接收到事件源。
场景-1
玩家登录后,发布一个登录事件。其他进程的逻辑服中的订阅者接收到该事件后,可以做一些业务,如
- 记录玩家登录时间。
- 根据当前时间与上次登录的时间做一些奖励计算,如离线奖励之类的。
- 其他各种
场景-2
比如我们将统计信息相关的单独放到一个【统计信息收集逻辑服】中,并添加相关订阅者。
框架中内置了一些统计插件,开发者收集好这些统计信息后,可以将统计信息以事件的方式发布。 无论【统计信息收集逻辑服】在什么地方,都能接收到该事件的统计信息。 接收到统计信息后就可以将这些数据入库或记录到日志中。
概念介绍
框架提供的分布式事件总线在使用上是简单的,只有 3 个简单的概念,分别是
- EventSource : 事件源
- Subscriber : 订阅者
- Fire : 发布事件
定义事件源
事件源由开发者定义,事件源是业务数据载体等,其主要目的是用于业务数据的传输。
@ProtobufClass
public class UserLoginEventMessage {
public long userId;
public static UserLoginEventMessage of(long userId) {
var message = new UserLoginEventMessage();
message.userId = userId;
return message;
}
}
事件源需要添加 @ProtobufClass 注解,因为可能涉及到远程调用。
订阅者
订阅者由开发者定义,作用是接收事件源并处理业务逻辑。 我们只需要在方法上添加 EventSubscribe 注解,就表示该方法是一个订阅者。
框架为订阅者提供了两个注解标记,分别是:
- EventBusSubscriber ,作用在类上(该注解不是必须的)。
- EventSubscribe ,作用在方法上。
@EventBusSubscriber
public class EmailEventBusSubscriber {
@EventSubscribe
public void mail(UserLoginEventMessage message) {
log.info("EmailEventBus {}", message.userId);
}
}
@EventBusSubscriber
public class UserEventBusSubscriber {
@EventSubscribe
public void userLogin(UserLoginEventMessage message) {
log.info("UserEventBus {}", message.userId);
}
}
我们在 EmailEventBusSubscriber、UserEventBusSubscriber 类中,分别提供了 UserLoginEventMessage 事件源的订阅者。 当有 UserLoginEventMessage 相关的事件触发,相关的订阅者都能接收到事件源。
订阅者类上的 EventBusSubscriber 注解不是必须的,即使不添加也不影响。 该注解只是起到一个标记作用,目的是结合开发工具后,能快速找到所有的订阅者类。
订阅者的创建规则
- 方法必须是 public void。
- 订阅者必须且只能有一个参数,用于接收事件源。
- 方法需要添加 EventSubscribe 注解。
如何使用
EventBus(事件总线)、业务框架、逻辑服三者是 1:1:1 的关系。
通过业务框架的 addRunner 方法来添加分布式事件总线(Runner 机制), EventBus 支持注册多个订阅者,现在我们将 EmailEventBusSubscriber 注册到 EventBus 中。
- code 5~8,启用 EventBus,并将 EmailEventBusSubscriber 注册到 EventBus 中。
public final class EmailLogicServer implements LogicServer {
@Override
public void settingBarSkeletonBuilder(BarSkeletonBuilder builder) {
// add EventBusRunner
builder.addRunner((EventBusRunner) (eventBus, _) -> {
// event bus
eventBus.register(new EmailEventBusSubscriber());
});
}
}
发布事件
作用:将事件源发送给相关的订阅者。
对于发布事件,EventBus 提供了同步、异步的发布方法。
| Async | Sync | Description |
|---|---|---|
| fire | fireSync | 发送事件给订阅者,这些订阅者包括 1. 给当前进程所有逻辑服的订阅者发送事件消息 2. 给其他进程的订阅者发送事件消息 |
| fireLocal | fireLocalSync | 给当前进程所有逻辑服的订阅者发送事件消息 |
| fireMe | fireMeSync | 仅给当前 EventBus 的订阅者发送事件消息 |
| fireAny | fireAnySync | 发送事件给订阅者,这些订阅者包括 1. 给当前进程所有逻辑服的订阅者发送事件消息 2. 给其他进程的订阅者发送事件消息 当有同类型的多个逻辑服时,只会给同类型其中的一个逻辑服发送事件。 |
fireMe 仅给当前 EventBus 的订阅者发送事件消息。 每个逻辑服有自己的 EventBus 对象,当只想给当前逻辑服的订阅者发送事件时,那么此方法就能派上用场了。
public void demo() {
FlowContext flowContext = ...
var message = UserLoginEventMessage.of(userId);
flowContext.fireMe(message);
flowContext.fireMeSync(message);
}
fireLocal 给当前进程所有逻辑服的订阅者发送事件消息,不会给其他进程的逻辑服发送事件。
举例:如果用户逻辑服、邮件逻辑服是在同一进程中启动的,那么 fireLocal 会给这两个逻辑服发送事件。
public void demo() {
FlowContext flowContext = ...
var message = UserLoginEventMessage.of(userId);
flowContext.fireLocal(message);
flowContext.fireLocalSync(message);
}
fire 方法发布事件后,无论是当前进程的订阅者,还是其他机器的订阅者都能接收到。
public void demo() {
FlowContext flowContext = ...
var message = UserLoginEventMessage.of(userId);
flowContext.fire(message);
flowContext.fireSync(message);
}
虽然 fireSync 提供了同步的方法,但这里的同步仅指当前进程订阅者的同步,对其他进程中的订阅者无效(处理远程订阅者使用的是异步处理)。
所以,当调用 fireSync 方法时, 会使用同步阻塞的方式的给当前进程所有逻辑服的订阅者发送事件消息,并等待订阅者执行完成, 并以异步的方式给其他进程的订阅者发送事件消息。
fireAny 方法发布事件后,无论是当前进程的订阅者,还是远程进程的订阅者都能接收到。 当有同类型的多个逻辑服时,只会给同类型其中的一个逻辑服发送事件。
举例: 假设现在有一个发放奖励的邮件逻辑服,我们启动了两个(或者说多个)邮件逻辑服实例来处理业务。 当我们使用 fireAny 方法发送事件时,只会给其中一个实例发送事件。
public void demo() {
FlowContext flowContext = ...
var message = new UserLoginEventMessage(userId);
flowContext.fireAny(message);
flowContext.fireAnySync(message);
}
EventBus
上面的关于事件发布,我们是通过 FlowContext 来完成的,方法内部最终还是通过 EventBus 来完成的。 有时,我们在处理一些特殊业务时,需要通过 EventBus 来完成,下面是关于如何获取 EventBus 对象的示例。
通过 FlowContext
public class EventBusAction {
private void settingEventBus(FlowContext flowContext) {
MyKit.eventBus = flowContext.getEventBus();
}
}
public class MyKit {
public static EventBus eventBus;
}
通过 LogicServer
public final class EmailLogicServer implements LogicServer {
@Override
public void settingBarSkeletonBuilder(BarSkeletonBuilder builder) {
builder.addRunner((EventBusRunner) (eventBus, skeleton) -> {
MyKit.eventBus = = eventBus;
// or
MyKit.eventBus = = skeleton.eventBus;
});
}
}
EventBus 与 FlowContext 如何选择?
通常情况下,如果发布的事件与玩家业务相关的,推荐使用 FlowContext 中提供的事件发布方法。 因为 FlowContext 发布事件时,具备全链路调用日志跟踪。 与玩家无关的业务,可以使用 EventBus。
订阅者线程执行器
该小节的内容不是必须的,只有一些特殊业务才用得上,比如线程编排相关的、订阅者执行顺序编排等。
分布式事件总线支持线程执行器的选择策略,订阅者可以指定由哪个线程执行器来消费业务。
分布式事件总线提供的线程执行器策略有以下几种
- userExecutor (使用 UserThreadExecutorRegion 线程执行器)【默认策略】
- userVirtualExecutor (使用 UserVirtualThreadExecutorRegion 线程执行器)
- methodExecutor(使用 SimpleThreadExecutorRegion 线程执行器)
- simpleExecutor(使用 SimpleThreadExecutorRegion 线程执行器)
- customExecutor(使用 SimpleThreadExecutorRegion 线程执行器)
UserVirtualThreadExecutorRegion、 UserThreadExecutorRegion、 SimpleThreadExecutorRegion 是框架内置的线程执行器。
之前的内容已经介绍过 EventSubscribe 注解了,当方法上添加了该注解时,就表示该方法是一个订阅者。 EventSubscribe 提供了两个可选属性,分别是
- value,执行器选择策略,默认值是
userExecutor。 - order,订阅者的执行顺序,值越大执行优先级越高。
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface EventSubscribe {
ExecutorSelector value() default ExecutorSelector.userExecutor;
int order() default 0;
}
userExecutor Strategy(Default)
该策略是线程安全的,userExecutor 是默认策略, 该策略会使用 UserThreadExecutorRegion 用户线程执行器来执行订阅者业务, 内部通过 userId 来得到对应的线程执行器。
该策略将使用 action 的线程执行器,可确保同一个用户在消费事件和消费 action 时, 使用的是相同的线程执行器,以避免并发问题。
建议使用场景
如果订阅者中的业务是与用户相关的,并且存在并发问题的,可以使用该策略。 当使用该策略后,该订阅者将与用户的 action 在同一线程中消费,以避免并发问题。
@EventBusSubscriber
public class UserEventBusSubscriber {
@EventSubscribe
public void userLogin(UserLoginEventMessage message) {
// your biz code
}
}
不要在此执行器中做耗时的业务,以免阻塞 action 的消费。
userVirtualExecutor Strategy
该策略会使用 UserVirtualThreadExecutorRegion(虚拟线程执行器)执行器来执行订阅者业务,内部通过 userId 来得到对应的虚拟线程执行器。
建议使用场景
有耗时相关的业务可选择此策略,如 DB 入库、IO ...等相关耗时操作。
@EventBusSubscriber
public class EmailEventBusSubscriber {
@EventSubscribe(ExecutorSelector.userVirtualExecutor)
public void mail(UserLoginEventMessage message) {
// your biz code
}
}
methodExecutor Strategy
该策略是线程安全的,该策略会使用 SimpleThreadExecutorRegion 线程执行器来执行订阅者业务。
下面的示例代码中,订阅者无论由哪个用户触发,都将在同一个线程中执行。
@EventBusSubscriber
public class EmailEventBusSubscriber {
@EventSubscribe(ExecutorSelector.methodExecutor)
public void methodExecutor(EmailMethodEventMessage message) {
// your biz code
}
}
这里的线程安全指的是以订阅者方法来划分的,相同的订阅者总是使用相同的线程执行器来执行订阅者业务。
simpleExecutor Strategy
该策略是线程安全的,该策略与 userExecutor 类似,但使用的是独立的线程执行器(SimpleThreadExecutorRegion)。 该策略会使用 SimpleThreadExecutorRegion 线程执行器来执行订阅者业务,内部通过 userId 来得到对应的线程执行器。
customExecutor Strategy
customExecutor 是预留给开发者的自定义策略,默认情况下与 simpleExecutor 策略一致。
如果上述策略都不能满足业务的,开发者可以通过实现 SubscribeExecutorStrategy 接口来做自定义扩展。
builder.addRunner((EventBusRunner) (eventBus, _) -> {
eventBus.setSubscribeExecutorStrategy(new YourSubscribeExecutorStrategy());
});
订阅者执行顺序
同一个事件源可能会被多个订阅者订阅,某些场景下,订阅者之间可能存在着执行顺序优先级的问题,EventSubscribe 注解支持优先级的设置。
下面根据注解 EventSubscribe order 属性来设置优先级,值越大执行优先级越高。
@EventBusSubscriber
public class CustomSubscriber {
@EventSubscribe(order = 1, value = ExecutorSelector.simpleExecutor)
public void myMessage1(MyMessage message) {
log.info("###myMessage1 : {}", message);
}
@EventSubscribe(order = 3, value = ExecutorSelector.simpleExecutor)
public void myMessage3(MyMessage message) {
log.info("###myMessage3 : {}", message);
}
@EventSubscribe(order = 2, value = ExecutorSelector.simpleExecutor)
public void myMessage2(MyMessage message) {
log.info("###myMessage2 : {}", message);
}
}
@ProtobufClass
public class MyMessage {
public String name;
}
EventBusListener
EventBusListener 监听接口可以处理一些特殊业务,如异常、空订阅者...等。
- invokeException 方法,当订阅者在消费事件源时,如果触发了异常就会进入此方法。
- emptySubscribe 方法,当事件源发布时,如果没有任何订阅者订阅该事件源就会进入此方法。
关于 emptySubscribe 方法,这里说个小案例。 比如现在我们有一个独立的奖励逻辑服,并且订阅了登录事件源。 玩家在登录(登录逻辑服)时会发布一个事件源。 此时,如果奖励逻辑服如果没有上线,或者说正好下线了,那么就会进入 emptySubscribe 方法(因为没有任何订阅者订阅该事件)。
开发者可以利用该特性来做个记录,并在后续处理该事件。
final class MyEventBusListener implements EventBusListener {
@Override
public void invokeException(Throwable e, Object eventSource, EventBusMessage eventBusMessage) {
log.error(e.getMessage(), e);
}
@Override
public void emptySubscribe(EventBusMessage eventBusMessage, EventBus eventBus) {
Class<?> clazz = eventBusMessage.getTopicClass();
String simpleName = eventBusMessage.getTopic();
log.warn("[No subscribers: {} - {}", clazz.getSimpleName(), simpleName);
}
}
设置自定义监听
BarSkeletonBuilder builder = ...;
builder.addRunner((EventBusRunner) (eventBus, _) -> {
...
eventBus.setEventBusListener(new MyEventBusListener());
});
小结
ionet 提供的分布式事件总线在使用上是简单的,只有 3 个简单的概念,分别是
- EventSource : 事件源
- Subscriber : 订阅者
- fire : 发布事件
分布式事件总线,特点
- 使用方式与 Guava EventBus 类似
- 具备全链路调用日志跟踪。(这点是中间件产品做不到的)
- 支持跨多个机器、多个进程通信
- 支持与多种不同类型的多个逻辑服通信
- 纯 javaSE,不依赖其他服务,耦合性低。(不需要安装任何中间件)
- 事件源和事件监听器之间通过事件进行通信,从而实现了模块之间的解耦
- 当没有任何远程订阅者时,将不会触发网络请求。(这点是中间件产品做不到的)
虽然分布式事件总线强调了跨进程、跨机器的能力,实际上同进程中的订阅者也是生效的。 值得称赞的是,如果没有任何远程订阅者,将不会触发网络请求。 简单的说,事件发布后,当其他进程(其他机器)没有相关订阅者时,只会在内存中传递事件给当前进程的相关订阅者。 所以,可以将该通信方式当作 guava EventBus 来使用。
EventBus 为事件发布提供了 fireMe、fireLocal、fire、fireAny 等同步、异步的方法。
订阅者支持线程执行器选择策略和执行优先级的设置。 默认情况下,使用的是用户线程执行器策略。 订阅者的执行顺序(优先级),值越大执行优先级越高。
想要确保按顺序执行,订阅者需要使用相同的线程执行器。 比如可以搭配 userExecutor、simpleExecutor 等策略来使用。
与 Spring 结合
如果你的订阅者类是由 Spring 管理的,可以先从 Spring ApplicationContext 中得到该订阅者类后,再将该类注册到 EventBus 中。
更多案例
上面的文档中,我们举了一些关于分布式事件总线的使用案例了,但都缺乏一些想象力。 在这小节中,我们将介绍一些更具想象力的使用案例。 这些案例的特别之处在于,我们可以把分布式事件总线用活。
热更
我们知道,当事件 fire 来发布事件时,所有的订阅者都能接收到事件。 利用好这一特性,我们可以批量的为系统做一些热更。
@ProtobufClass
public class MyHotMessage {
public byte[] hotCode;
}
@EventBusSubscriber
public class HotEventBusSubscriber {
@EventSubscribe(ExecutorSelector.userVirtualExecutor)
public void hot(MyHotMessage message) {
byte[] hotCode = message.hotCode;
...
}
}
当我们需要热更某些业务时,可以通过 GM 后台来发布一个 HotMessage 事件源,hotCode 存放的是需要热更的字节码(提前编译好的 class)。 因为我们是通过 fire 来发布的事件,那么所有的相关订阅者都能接收到事件消息。 此时,即使有数百个逻辑服实例,也能轻松的实现批量更新。
全局配置文件
该案例与热更类似,模拟全局配置的更新。
@ProtobufClass
public class MyConfigMessage {
... your config property
}
@EventBusSubscriber
public class ConfigEventBusSubscriber {
@EventSubscribe(ExecutorSelector.userVirtualExecutor)
public void updateConfig(MyConfigMessage message) {
...
}
}
可插拔
我们知道,在分布式事件总线中,如果没有任何远程订阅者,将不会触发网络请求。 通过该特点,我们可以做一些特殊业务,比如一些临时活动。
该临时活动的业务内容如下: 活动期间内,玩家每日在线 60 分钟,系统将奖励一些物品。(玩家每次登录做一次检测,即在玩家的登录方法中发布事件)
我们可以为该临时活动单独创建一个逻辑服,即【临时活动逻辑服】。该逻辑服只配置了一些订阅者,订阅玩家登录的事件源。 这里想表达的是,当该临时活动结束后,我们只需要将该逻辑服下线即可,无需做其他更多的工作了。
当【临时活动逻辑服】下线后,就意味着没有任何订阅者了。 即使发布了事件,也不会触发任何网络请求。 所以,通过分布式事件总线的这一特性,可以让我们的系统具备可插拔的能力。 如果将来某个时间点我们需要重新开启该临时活动,只需要将该逻辑服启动即可。
从这里你可以发现,我们只是在登录业务中做了一个埋点(发布事件)。 之后通过启动或关闭【临时活动逻辑服】就能扩展业务,而该逻辑服的启动或关闭都不会影响我们的主业务。 最重要的是,只有在该逻辑服在线时,事件才会发起网络请求。 而当该逻辑服关闭后,将不会触发任何网络请求。 这一点是 redis pub sub、MQ ...等之类的中间件产品所做不到的。
小结
最后,发挥你的想象力,把分布式事件总线用活。 只有把东西用活了,才更有意义。
当我们的系统中有了可插拔的能力后,我们可以将更多的功能拆分成一个个的功能逻辑服,以备后续使用。 比如,当触发特殊业务时,临时启动这些功能逻辑服,可使游戏游戏的趣味性得到更多的提升。
Example Source Code
see https://github.com/iohao/ionet-cookbook-code-enterprise
- EnterpriseOneApplication
- EnterpriseOneApplication2
- EnterpriseOneApplication3
- EventBusAction
- EventBusRegion
模拟客户端
模拟客户端示例提供了多个请求命令,开发者可输入对应的命令来测试。 比如输入 20-2,会触发 fireAny,事件总线只会给其中一个 email 逻辑服发送事件。
