FlowContext
介绍
FlowContext 是业务框架 action 流程处理的上下文,生命周期存在于这一次的 action 请求。 如果你之前只接触过 web 开发,可以暂时将其理解为 Cookie。
FlowContext 功能包括但不限于如下:
- userId 的获取。
 - 具备动态属性能力。
 - 提供通信能力,为各通讯方式提供了同步、异步、异步回调 ...等便捷 api, 并具备全链路调用日志跟踪特性。
 - 支持元信息-附加信息的扩展(开发者可扩展玩家数据的存储)。
 - 支持自定义扩展 (开发者可定制符合团队、符合业务的使用 API)。
 - 得到当前 FlowContext 对应的线程执行器,可轻松实现无锁高并发。
 
由于 FlowContext 具备了通信能力,并且提供了同步、异步、异步回调 ...等便捷 api, 并具备全链路调用日志跟踪特性。 所以你可以使用简洁的代码编写跨服访问,或给自己发送数据。
业务框架流程简图
下图是关于 action 的处理流程 , FlowContext 会经过图中的每个环节,如果开发者想对某个环节做一些特殊的定制扩展的,可以配合上动态属性,几乎可以做任意扩展。
如何获取 FlowContext
只需要在 action 的方法参数中定义该变量即可,业务框架会自动识别。
@ActionController(1)
public class DemoAction {
    @ActionMethod(1)
    public void demo(FlowContext flowContext) {
    }
}
userId
玩家登录后,可以通过 flowContext 来获取 userId。
long userId = flowContext.getUserId();
通信能力
FlowContext 内部整合了通讯模型, 并提供了同步、异步、异步回调的便捷使用方式。
更详细的内容,请阅读 javadoc - FlowContext。
request/response,请求/响应
示例代码中提供了同步、异步回调、异步的编码风格,其中,异步回调写法具备全链路调用日志跟踪。
示例方法说明
- invokeModuleMessageSync 同步演示
 - invokeModuleMessageAsync 异步回调演示
 - invokeModuleMessageFuture 异步演示
 
...
void invokeModuleMessageSync() {
    // code - 1
    ResponseMessage responseMessage = flowContext.invokeModuleMessage(cmdInfo);
    RoomNumMsg roomNumMsg = responseMessage.getData(RoomNumMsg.class);
    log.info("{}", roomNumMsg.roomCount);
    // code - 2
    ResponseMessage responseMessage2 = flowContext.invokeModuleMessage(cmdInfo, yourData);
    RoomNumMsg roomNumMsg2 = responseMessage2.getData(RoomNumMsg.class);
    log.info("{}", roomNumMsg2.roomCount);
}
void invokeModuleMessageAsync() {
    // code - 1
    flowContext.invokeModuleMessageAsync(cmdInfo, responseMessage -> {
        RoomNumMsg roomNumMsg = responseMessage.getData(RoomNumMsg.class);
        log.info("{}", roomNumMsg.roomCount);
    });
    // code - 2
    flowContext.invokeModuleMessageAsync(cmdInfo, yourData, responseMessage -> {
        RoomNumMsg roomNumMsg = responseMessage.getData(RoomNumMsg.class);
        log.info("{}", roomNumMsg.roomCount);
    });
}
void invokeModuleMessageFuture() {
    CompletableFuture<ResponseMessage> future;
    try {
        // code - 1
        future = flowContext.invokeModuleMessageFuture(cmdInfo);
        ResponseMessage responseMessage = future.get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
    // code - 2
    future = flowContext.invokeModuleMessageFuture(cmdInfo, yourData);
}
request/void,请求/无响应
无阻塞、适合不需要接收响应的业务。 被请求的 action 通常是声明为 void 的,因为知道是无响应的,所以只提供了无阻塞的调用。
...
void invokeModuleVoidMessage() {
    // code - 1
    flowContext.invokeModuleVoidMessage(cmdInfo);
    // code - 2
    flowContext.invokeModuleVoidMessage(cmdInfo, yourData);
}
request/multiple_response,请求同类型多个逻辑服
可接收多个游戏逻辑服的响应,示例代码中提供了同步、异步回调、异步的编码风格。 其中,异步回调写法具备全链路调用日志跟踪。
示例方法说明
- invokeModuleCollectMessageSync 同步演示
 - invokeModuleCollectMessageAsync 异步回调演示
 - invokeModuleCollectMessageFuture 异步演示
 
void invokeModuleCollectMessageSync() {
    // code - 1
    ResponseCollectMessage response = flowContext.invokeModuleCollectMessage(cmdInfo);
    for (ResponseCollectItemMessage message : response.getMessageList()) {
        RoomNumMsg roomNumMsg = message.getData(RoomNumMsg.class);
        log.info("{}", roomNumMsg.roomCount);
    }
    // code - 2
    ResponseCollectMessage response2 = flowContext.invokeModuleCollectMessage(cmdInfo, yourData);
    log.info("{}", response2.getMessageList());
}
void invokeModuleCollectMessageAsync() {
    // code - 1
    flowContext.invokeModuleCollectMessageAsync(cmdInfo, responseCollectMessage -> {
        List<ResponseCollectItemMessage> messageList = responseCollectMessage.getMessageList();
        for (ResponseCollectItemMessage message : messageList) {
            RoomNumMsg roomNumMsg = message.getData(RoomNumMsg.class);
            log.info("{}", roomNumMsg.roomCount);
        }
    });
    // code - 2
    flowContext.invokeModuleCollectMessageAsync(cmdInfo, yourData, responseCollectMessage -> {
        log.info("{}", responseCollectMessage.getMessageList());
    });
}
void invokeModuleCollectMessageFuture() {
    CompletableFuture<ResponseCollectMessage> future;
    try {
        // code - 2
        future = flowContext.invokeModuleCollectMessageFuture(cmdInfo);
        ResponseCollectMessage response = future.get();
        log.info("{}", response.getMessageList());
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
    // code - 2
    future = flowContext.invokeModuleCollectMessageFuture(cmdInfo, yourData);
}
broadcast,广播
代码说明
- code 3,全服广播。
 - code 6,广播消息给单个用户。
 - code 11,广播消息给指定用户列表。
 - code 13,给自己发送消息。
 - code 14,给自己发送消息,路由则使用当前 action 的路由。
 
@ActionMethod(1)
public void broadcast(FlowContext flowContext) {
    flowContext.broadcast(cmdInfo, yourData);
    long userId = 100;
    flowContext.broadcast(cmdInfo, yourData, userId);
    List<Long> userIdList = new ArrayList<>();
    userIdList.add(100L);
    userIdList.add(200L);
    flowContext.broadcast(cmdInfo, yourData, userIdList);
    flowContext.broadcastMe(cmdInfo, yourData);
    flowContext.broadcastMe(yourData);
}
flowContext.broadcastMe 方法需要玩家登录后才能使用。
EventBus,分布式事件总线
分布式事件总线是通讯方式之一,该通讯方式与 Guava EventBus、Redis 发布订阅、MQ ... 等产品类似。
...
void eventBus() {
    int userId = 100;
    YourEventMessage yourEventMessage = new YourEventMessage(userId);
    flowContext.fire(yourEventMessage);
}
访问游戏对外服
只会访问玩家所在的游戏对外服(即使启动了多个游戏对外服)。 示例代码中提供了同步、异步回调、异步的编码风格,其中,异步回调写法具备全链路调用日志跟踪。
示例方法说明
- invokeExternalModuleCollectMessageSync 同步演示
 - invokeExternalModuleCollectMessageAsync 异步回调演示
 - invokeExternalModuleCollectMessageFuture 异步演示
 
...
void invokeExternalModuleCollectMessageSync() {
    ResponseCollectExternalMessage response;
    // code - 1
    response = flowContext.invokeExternalModuleCollectMessage(bizCode);
    log.info("{}", response);
    // code - 2
    response = flowContext.invokeExternalModuleCollectMessage(bizCode, yourData);
    log.info("{}", response);
}
void invokeExternalModuleCollectMessageAsync() {
    // code - 1
    flowContext.invokeExternalModuleCollectMessageAsync(bizCode, response -> {
        response.optionalAnySuccess().ifPresent(itemMessage -> {
            Serializable data = itemMessage.getData();
            log.info("{}", data);
        });
    });
    // code - 2
    flowContext.invokeExternalModuleCollectMessageAsync(bizCode, yourData, response -> {
    });
}
void invokeExternalModuleCollectMessageFuture() {
    CompletableFuture<ResponseCollectExternalMessage> future;
    try {
        // code - 1
        future = flowContext.invokeExternalModuleCollectMessageFuture(bizCode);
        ResponseCollectExternalMessage response = future.get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    }
    // code - 2
    future = flowContext.invokeExternalModuleCollectMessageFuture(bizCode, yourData);
}
自定义 FlowContext
框架提供了 FlowContext 的扩展,如果 FlowContext 现有的功能无法满足你现有的业务, 可以通过扩展的方式增强 FlowContext。
MyFlowContext 是 FlowContext 子类,我们在 MyFlowContext 中重写了 getAttachment 方法(稍后在元信息小节中会另外做介绍)。
FlowContextFactory 是业务框架 flow 上下文工厂,用于创建自定义的 FlowContext。 在业务框架中,添加自定义的 FlowContextFactory 生产工厂
code 7,设置一个自定义的 FlowContextFactory
@UtilityClass
public class MyBarSkeletonConfig {
    public BarSkeleton createBarSkeleton() {
        var config = new BarSkeletonBuilderParamConfig();
        BarSkeletonBuilder builder = config.createBuilder()
            .setFlowContextFactory(MyFlowContext::new);
        return builder.build();
    }
}
public class MyFlowContext extends FlowContext {
    MyAttachment attachment;
    @Override
    @SuppressWarnings("unchecked")
    public MyAttachment getAttachment() {
        if (Objects.isNull(attachment)) {
            this.attachment = this.getAttachment(MyAttachment.class);
        }
        return this.attachment;
    }
    public String getNickname() {
        MyAttachment attachment = this.getAttachment();
        return attachment.nickname;
    }
}
在 action 中的使用
自定义的 MyFlowContext 与框架默认提供的 FlowContext 使用方式是一样的
@ActionController(1)
public class SchoolAction {
    @ActionMethod(1)
    public String here(MyFlowContext flowContext) {
        log.info("{}", flowContext.getClass());
        return "MyFlowContext";
    }
}
小结 - FlowContext
自定义 FlowContext 是简单的,只需要两个步骤
- 自定义 MyFlowContext 类。
 - 设置 FlowContextFactory 到业务框架中。
 
元信息-附加信息
FlowContext 对元信息-附加信息的更新、获取上提供了便捷的使用方式。
元信息更新 - 1
该示例展示了通过 FlowContext 更新元信息,提供了同步、异步的更新方法。
代码说明
- code 2,获取元信息。
 - code 5,同步更新,将元信息同步到玩家所在的游戏对外服中。
 - code 6,异步无阻塞更新,将元信息同步到玩家所在的游戏对外服中。
 
void test(FlowContext flowContext) {
    MyAttachment attachment = flowContext.getAttachment(MyAttachment.class);
    attachment.nickname = "Michael Jackson";
    flowContext.updateAttachment(attachment);
    flowContext.updateAttachmentAsync(attachment);
}
public class MyAttachment implements Attachment {
    @Getter
    long userId;
    String nickname;
}
元信息更新 - 2
通过重写 getAttachment 方法,能更加便捷的使用元信息。
代码说明
- code 2,获取元信息。
 - code 5,同步更新,将元信息同步到玩家所在的游戏对外服中。
 - code 6,异步无阻塞更新,将元信息同步到玩家所在的游戏对外服中。
 - code 12,重写 getAttachment 方法。
 
void test(MyFlowContext flowContext) {
    MyAttachment attachment = flowContext.getAttachment();
    attachment.nickname = "渔民小镇";
    flowContext.updateAttachment();
    flowContext.updateAttachmentAsync();
}
public class MyFlowContext extends FlowContext {
    MyAttachment attachment;
    @Override
    @SuppressWarnings("unchecked")
    public MyAttachment getAttachment() {
        if (Objects.isNull(attachment)) {
            this.attachment = this.getAttachment(MyAttachment.class);
        }
        return this.attachment;
    }
}
动态属性使用
FlowContext 动态的添加属性,这在定制插件或处理特殊业务时很有用。
public final class MyDebugInOut implements ActionMethodInOut {
    ...
    // attr key
    final FlowOption<Long> timeKey = FlowOption.valueOf("ExecuteTimeInOutStartTime");
    @Override
    public void fuckIn(final FlowContext flowContext) {
        flowContext.option(timeKey, System.currentTimeMillis());
    }
    @Override
    public void fuckOut(final FlowContext flowContext) {
        long currentTimeMillis = System.currentTimeMillis();
        Long time = flowContext.option(timeKey);
        long ms = currentTimeMillis - time;
        log.info("{}", ms);
    }
}
线程执行器 - 无锁高并发
FlowContext 提供了获取与玩家所关联线程执行器,包括用户虚拟线程执行器和用户线程执行器。 关于线程相关内容,可阅读 ioGame 线程相关。
ioGame 提供了优雅、独特的线程执行器设计。通过该特性,开发者能轻易的编写出无锁高并发的代码。
用户线程执行器
将任务提交到用户所关联的用户线程执行器中执行,该线程执行器也是消费 action 业务的线程执行器。 因为提交的任务与消费玩家 action 业务的是同一个线程执行器,所以可以很好的避免并发问题。
代码说明
- code 2,使用用户线程执行任务,该方法具备全链路调用日志跟踪。
 - code 6,提交任务到用户线程执行器中。
 
void executor() {
    flowContext.execute(() -> {
        // your code
    });
    flowContext.getExecutor().execute(() -> {
        // your code
    });
}
不要在该线程执行器中执行耗时的任务,以免阻塞其他玩家。
虚拟线程执行器
将任务提交到用户所关联的用户虚拟线程执行器中执行。
代码说明
- code 2,使用虚拟线程执行任务,该方法具备全链路调用日志跟踪。
 - code 6,提交任务到虚拟线程执行器中。
 
void executeVirtual() {
    flowContext.executeVirtual(() -> {
        // your code
    });
    flowContext.getVirtualExecutor().execute(() -> {
        // your code
    });
    // for example
    flowContext.executeVirtual(() -> {
        flowContext.updateAttachment();
    });
}
小结
FlowContext 是当前 action 的请求上下文,通过 FlowContext 几乎可以拿到当前逻辑服的所有信息,这些信息包括:逻辑服、业务框架 ... 等。
FlowContext 的动态属性,方便开发者扩展的 option 系列方法(参考 FlowAttr.java)。
框架提供了 FlowContext 的扩展,如果 FlowContext 现有的功能无法满足你现有的业务,可以通过扩展的方式增强 FlowContext。
FlowContext 提供了便捷的通信能力,并提供了同步、异步、异步回调的便捷使用方式。
FlowContext 对元信息的更新、获取上提供了便捷的使用方式。 对于元信息的更新,提供了同步、异步的便捷使用方法,分别是
- updateAttachment 同步更新
 - updateAttachmentAsync 异步更新
 
FlowContext 还提供了获取与玩家所关联线程执行器,包括:用户虚拟线程执行器和用户线程执行器。