TaskKit 任务相关
介绍
是一个任务、时间、延时监听、超时监听...等相结合的一个工具模块。
使用场景
- 超时处理任务
- 定时器模拟
- 任务调度
- 处理 io 任务
- 延时任务
单次任务的消费
处理一些 io 任务,或者不想阻塞业务线程的任务。
- code 3,使用 CacheThreadPool 线程池执行任务
- code 7,使用虚拟线程执行任务
public void execute() {
TaskKit.execute(() -> {
log.info("CacheThreadPool consumer task");
});
TaskKit.executeVirtual(() -> {
log.info("Virtual consumer task");
});
}
TaskListener 任务监听回调接口
TaskListener 是与时间相关的任务监听回调接口。
使用场景
一次性延时任务、任务调度、轻量可控的延时任务、轻量的定时入库辅助功能 ...等其他扩展场景。
TaskListener 是任务监听回调接口,接口提供几个方法。 只有 onUpdate 是必须实现的,其他方法可按需重写。
方法 | 修饰符 | 描述 |
---|---|---|
onUpdate | 监听回调 | |
triggerUpdate | default | 是否触发 onUpdate() 监听回调方法 |
onException(Throwable) | default | 异常回调。在执行 triggerUpdate() 和 onUpdate() 方法时,如果触发了异常,异常将被该方法捕获。 |
getExecutor | default | 指定执行器来执行上述方法,目的是不占用业务线程。 |
方法描述
- onUpdate,这里编写你的业务逻辑
- triggerUpdate,true 表示执行 onUpdate 方法,false 将不会执行。
- onException,当 triggerUpdate 或 onUpdate 方法抛出异常时,将会被该方法捕获。 方便统一处理,任务之间相互不影响,起到隔离的作用。
- getExecutor,指定执行器来消费当前任务,目的是不占用默认的业务线程。
@Slf4j
public class MyTaskListener implements TaskListener {
@Override
public void onUpdate() {
... your code
}
@Override
public boolean triggerUpdate() {
return true;
}
@Override
public void onException(Throwable e) {
log.error(e.getMessage(), e);
}
@Override
public Executor getExecutor() {
// return TaskKit.getCacheExecutor();
return TaskKit.getVirtualExecutor();
}
}
OnceTaskListener 超时、延时处理任务
超时任务处理、延时处理,只执行一次。适合执行一些没有 io 操作的任务。
类型 | 参数名 | 描述 |
---|---|---|
OnceTaskListener | taskListener | 监听器 |
long | delay | 延时时间 |
TimeUnit | unit | 时间单位 |
代码描述
- code 2,只执行一次,2 秒后执行
- code 3,只执行一次,1 分钟后执行
- code 4,只执行一次,500 milliseconds 后
- code 15,只执行一次,1500 Milliseconds后执行,当 theTriggerUpdate 为 true 时,才执行 onUpdate
public void runOnce() {
TaskKit.runOnce(() -> log.info("2 Seconds"), 2, TimeUnit.SECONDS);
TaskKit.runOnce(() -> log.info("1 Minute"), 1, TimeUnit.MINUTES);
TaskKit.runOnce(() -> log.info("500 delayMilliseconds"), 500, TimeUnit.MILLISECONDS);
boolean theTriggerUpdate = RandomKit.randomBoolean();
TaskKit.runOnce(new OnceTaskListener() {
@Override
public void onUpdate() {
log.info("1500 delayMilliseconds");
}
@Override
public boolean triggerUpdate() {
return theTriggerUpdate;
}
}, 1500, TimeUnit.MILLISECONDS);
}
示例综合练习
这里将使用一个案例来描述该工具的使用。
业务场景描述:
一些技能的释放持续,或者说一些桌游在对局中的倒计时,如炉石传说、三国杀、斗地主、麻将 ...等。
当轮到玩家操作时,通常会在游戏界面显示一个倒计时,玩家需要在规定的时间内完成相关操作。 一些常规的做法是使用定时器,如果玩家在规定的时间内完成了操作,还需要我们去取消该定时器。
而框架提供的 TaskKit.runOnce 方法,可以模拟一个定时器。 同时这种使用方法,可使我们的业务代码更加清晰。 我们可以通过扩展 OnceTaskListener 接口,来满足我们的业务(取消定时器),同时这种设计方式也更符合单一职责。
代码中,我们定义了两个类,分别是 Room、OperationTask。
- Room 是当前房间类,用于存放一些对局中信息和相关玩家信息。 我们在类中定义了一个 round 属性,用于记录当前所轮的次数(我们会利用该属性来控制定时器是否执行)。 每当玩家操作完成后,round 属性会变动(+1)。
- OperationTask 是一个定时器任务监听类,用于辅助我们的业务,其关键代码在 triggerUpdate 方法中。 如果当前 OperationTask currentRound 与 room round 值相同时,才会触发 onUpdate 方法。 这种使用方式不需要我们显示的去取消定时器,而是通过业务逻辑来控制是否执行 onUpdate。
public void test() {
Room room = new Room();
// 开始下一轮
room.startNextRound();
// 开始下一轮,上一个任务将不会执行
room.startNextRound();
}
record OperationTask(int currentRound, Room room) implements OnceTaskListener {
@Override
public void onUpdate() {
room.notice();
}
@Override
public boolean triggerUpdate() {
return currentRound == room.round.get();
}
}
class Room {
AtomicInteger round = new AtomicInteger(1);
long currentOperationPlayerId;
void notice() {
this.startNextRound();
}
void startNextRound() {
int currentRound = this.round.incrementAndGet();
this.currentOperationPlayerId = 100;
TaskKit.runOnce(new OperationTask(currentRound, this), 1, TimeUnit.SECONDS);
}
}
IntervalTaskListener 间隔执行、任务调度
该方法主要用于模拟 ScheduledThreadPoolExecutor, 如果有一些需要间隔执行的任务(如 每 N 秒、每 N 分钟、每 N 小时之类的间隔调度),可以使用此方法来代替。
可消费多次的任务,可控制是否执行任务,支持取消任务。
类型 | 参数名 | 描述 |
---|---|---|
IntervalTaskListener | taskListener | 任务监听 |
long | tick | tick 时间间隔;每 tick 时间间隔,会调用一次监听 |
TimeUnit | timeUnit | tick 时间单位 |
与 OnceTaskListener 类大致类似,这里就不重复了。 与 OnceTaskListener 相比,该接口多了一个 isActive 方法。 当 isActive 方法返回 false 时,将会从调度器中移除。
IntervalTaskListener 接口实现任务调度,支持
- 任务调度(间隔时间和时间单位)
- 任务移除
- 任务跳过
- 指定执行器
- code 2,每 2 秒调用一次
- code 3,每 30 分钟调用一次
public void runInterval() {
TaskKit.runInterval(() -> log.info("tick 2 Seconds"), 2, TimeUnit.SECONDS);
TaskKit.runInterval(() -> log.info("tick 30 Minute"), 30, TimeUnit.MINUTES);
}
- code 12,当返回 false 则表示不活跃,会从监听列表中移除当前 Listener
public void runInterval() {
TaskKit.runInterval(new IntervalTaskListener() {
int hp = 2;
@Override
public void onUpdate() {
hp--;
}
@Override
public boolean isActive() {
return hp != 0;
}
}, 1, TimeUnit.SECONDS);
}
- code 12,当返回值为 true 时,才会执行 onUpdate 方法
public void runInterval() {
TaskKit.runInterval(new IntervalTaskListener() {
int hp;
@Override
public void onUpdate() {
// your code
}
@Override
public boolean triggerUpdate() {
hp++;
return hp % 2 == 0;
}
}, 1, TimeUnit.SECONDS);
}
- code 12,指定执行器来执行当前回调的 onUpdate 方法,以避免阻塞其他任务。
public void runInterval() {
ExecutorService executorService = TaskKit.getCacheExecutor();
TaskKit.runInterval(new IntervalTaskListener() {
@Override
public void onUpdate() {
// your DB code
}
@Override
public Executor getExecutor() {
return executorService;
}
}, 1, TimeUnit.SECONDS);
}
小结
TaskKit 提供了多种消费任务的方式,分别是
- execute 系列,只消费一次的任务。
- runOnce 系列,只消费一次的任务;支持延时执行任务,支持取消任务。
- runInterval 系列,可消费多次的任务;可控制是否执行任务,支持取消任务。