跳到主要内容

TaskKit 任务相关

介绍

是一个任务、时间、延时监听、超时监听...等相结合的一个工具模块。

使用场景

  • 超时处理任务
  • 定时器模拟
  • 任务调度
  • 处理 io 任务
  • 延时任务

单次任务的消费

处理一些 io 任务,或者不想阻塞业务线程的任务。

  • code 3,使用 CacheThreadPool 线程池执行任务
  • code 7,使用虚拟线程执行任务
public void execute() {
TaskKit.execute(() -> {
log.info("CacheThreadPool consumer task");
});

TaskKit.executeVirtual(() -> {
log.info("Virtual consumer task");
});
}

TaskListener 任务监听回调接口

TaskListener 是与时间相关的任务监听回调接口。

使用场景

一次性延时任务、任务调度、轻量可控的延时任务、轻量的定时入库辅助功能 ...等其他扩展场景。


TaskListener 是任务监听回调接口,接口提供几个方法。 只有 onUpdate 是必须实现的,其他方法可按需重写。

方法修饰符描述
onUpdate监听回调
triggerUpdatedefault是否触发 onUpdate() 监听回调方法
onException(Throwable)default异常回调。在执行 triggerUpdate() 和 onUpdate() 方法时,如果触发了异常,异常将被该方法捕获。
getExecutordefault指定执行器来执行上述方法,目的是不占用业务线程。

方法描述

  • onUpdate,这里编写你的业务逻辑
  • triggerUpdate,true 表示执行 onUpdate 方法,false 将不会执行。
  • onException,当 triggerUpdate 或 onUpdate 方法抛出异常时,将会被该方法捕获。 方便统一处理,任务之间相互不影响,起到隔离的作用。
  • getExecutor,指定执行器来消费当前任务,目的是不占用默认的业务线程。
@Slf4j
public class MyTaskListener implements TaskListener {
@Override
public void onUpdate() {
... your code
}

@Override
public boolean triggerUpdate() {
return true;
}

@Override
public void onException(Throwable e) {
log.error(e.getMessage(), e);
}

@Override
public Executor getExecutor() {
// return TaskKit.getCacheExecutor();
return TaskKit.getVirtualExecutor();
}
}

OnceTaskListener 超时、延时处理任务

超时任务处理、延时处理,只执行一次。适合执行一些没有 io 操作的任务。

类型参数名描述
OnceTaskListenertaskListener监听器
longdelay延时时间
TimeUnitunit时间单位

代码描述

  • code 2,只执行一次,2 秒后执行
  • code 3,只执行一次,1 分钟后执行
  • code 4,只执行一次,500 milliseconds 后
  • code 15,只执行一次,1500 Milliseconds后执行,当 theTriggerUpdate 为 true 时,才执行 onUpdate
public void runOnce() {
TaskKit.runOnce(() -> log.info("2 Seconds"), 2, TimeUnit.SECONDS);
TaskKit.runOnce(() -> log.info("1 Minute"), 1, TimeUnit.MINUTES);
TaskKit.runOnce(() -> log.info("500 delayMilliseconds"), 500, TimeUnit.MILLISECONDS);

boolean theTriggerUpdate = RandomKit.randomBoolean();
TaskKit.runOnce(new OnceTaskListener() {
@Override
public void onUpdate() {
log.info("1500 delayMilliseconds");
}

@Override
public boolean triggerUpdate() {
return theTriggerUpdate;
}

}, 1500, TimeUnit.MILLISECONDS);
}

示例综合练习

这里将使用一个案例来描述该工具的使用。

业务场景描述:

一些技能的释放持续,或者说一些桌游在对局中的倒计时,如炉石传说、三国杀、斗地主、麻将 ...等。

当轮到玩家操作时,通常会在游戏界面显示一个倒计时,玩家需要在规定的时间内完成相关操作。 一些常规的做法是使用定时器,如果玩家在规定的时间内完成了操作,还需要我们去取消该定时器。

而框架提供的 TaskKit.runOnce 方法,可以模拟一个定时器。 同时这种使用方法,可使我们的业务代码更加清晰。 我们可以通过扩展 OnceTaskListener 接口,来满足我们的业务(取消定时器),同时这种设计方式也更符合单一职责。

代码中,我们定义了两个类,分别是 Room、OperationTask。

  • Room 是当前房间类,用于存放一些对局中信息和相关玩家信息。 我们在类中定义了一个 round 属性,用于记录当前所轮的次数(我们会利用该属性来控制定时器是否执行)。 每当玩家操作完成后,round 属性会变动(+1)。
  • OperationTask 是一个定时器任务监听类,用于辅助我们的业务,其关键代码在 triggerUpdate 方法中。 如果当前 OperationTask currentRound 与 room round 值相同时,才会触发 onUpdate 方法。 这种使用方式不需要我们显示的去取消定时器,而是通过业务逻辑来控制是否执行 onUpdate。
public void test() {
Room room = new Room();
// 开始下一轮
room.startNextRound();
// 开始下一轮,上一个任务将不会执行
room.startNextRound();
}

record OperationTask(int currentRound, Room room) implements OnceTaskListener {
@Override
public void onUpdate() {
room.notice();
}

@Override
public boolean triggerUpdate() {
return currentRound == room.round.get();
}
}

class Room {
AtomicInteger round = new AtomicInteger(1);
long currentOperationPlayerId;

void notice() {
this.startNextRound();
}

void startNextRound() {
int currentRound = this.round.incrementAndGet();
this.currentOperationPlayerId = 100;
TaskKit.runOnce(new OperationTask(currentRound, this), 1, TimeUnit.SECONDS);
}
}

IntervalTaskListener 间隔执行、任务调度

该方法主要用于模拟 ScheduledThreadPoolExecutor, 如果有一些需要间隔执行的任务(如 每 N 秒、每 N 分钟、每 N 小时之类的间隔调度),可以使用此方法来代替。

可消费多次的任务,可控制是否执行任务,支持取消任务。

类型参数名描述
IntervalTaskListenertaskListener任务监听
longticktick 时间间隔;每 tick 时间间隔,会调用一次监听
TimeUnittimeUnittick 时间单位

与 OnceTaskListener 类大致类似,这里就不重复了。 与 OnceTaskListener 相比,该接口多了一个 isActive 方法。 当 isActive 方法返回 false 时,将会从调度器中移除。

IntervalTaskListener 接口实现任务调度,支持

  • 任务调度(间隔时间和时间单位)
  • 任务移除
  • 任务跳过
  • 指定执行器

  • code 2,每 2 秒调用一次
  • code 3,每 30 分钟调用一次
public void runInterval() {
TaskKit.runInterval(() -> log.info("tick 2 Seconds"), 2, TimeUnit.SECONDS);
TaskKit.runInterval(() -> log.info("tick 30 Minute"), 30, TimeUnit.MINUTES);
}

  • code 12,当返回 false 则表示不活跃,会从监听列表中移除当前 Listener
public void runInterval() {
TaskKit.runInterval(new IntervalTaskListener() {
int hp = 2;

@Override
public void onUpdate() {
hp--;
}

@Override
public boolean isActive() {
return hp != 0;
}
}, 1, TimeUnit.SECONDS);
}

  • code 12,当返回值为 true 时,才会执行 onUpdate 方法
public void runInterval() {
TaskKit.runInterval(new IntervalTaskListener() {
int hp;

@Override
public void onUpdate() {
// your code
}

@Override
public boolean triggerUpdate() {
hp++;
return hp % 2 == 0;
}
}, 1, TimeUnit.SECONDS);
}

  • code 12,指定执行器来执行当前回调的 onUpdate 方法,以避免阻塞其他任务。
public void runInterval() {
ExecutorService executorService = TaskKit.getCacheExecutor();

TaskKit.runInterval(new IntervalTaskListener() {
@Override
public void onUpdate() {
// your DB code
}

@Override
public Executor getExecutor() {
return executorService;
}
}, 1, TimeUnit.SECONDS);

}

小结

TaskKit 提供了多种消费任务的方式,分别是

  1. execute 系列,只消费一次的任务。
  2. runOnce 系列,只消费一次的任务;支持延时执行任务,支持取消任务。
  3. runInterval 系列,可消费多次的任务;可控制是否执行任务,支持取消任务。