时间轮实现
时间轮这个数据结构大家可能不太熟悉,简单介绍一下:
时间轮(Time Wheel)是一种用于高效管理和调度定时任务的数据结构。它特别适用于需要处理大量定时任务的场景,时间轮的设计理念是通过将时间划分成多个槽(Slot),每个槽代表一个固定的时间间隔
当需要添加一个定时任务时,计算该任务的到期时间相对于当前时间的位置。根据到期时间,将任务插入到相应的槽中
指针每经过一个时间间隔(例如1秒)向前移动一个槽。当指针移动到某个槽时,检查该槽中的任务,并执行这些任务
有些同学可能要问,那如果我需要设置的时间离现在太远了,一轮根本装不下,怎么办,我总不能生成一个无限长度的轮子吧。好问题,我们可以用很多个轮子来存放资源,像时针分针秒针一样,任务资源会存放多个轮子的数据,只有所有数据都达标的时候,任务才会执行
时间轮通常用于实现 XX 时间后的延时任务(如定时任务、延迟 MQ 等),或周期性任务。
一个长度为8的单时间轮
其实本质上它就是一个环形的数组,如图所示,假设我们创建了一个长度为 8 的时间轮。
使用单时间轮用AI仿照quartz进行写了一个单时间轮的模式的
Job.java
package com.cqsym.myquartz;
/**
* 任务接口。所有需要被调度执行的任务都必须实现此接口。
*/
public interface Job {
/**
* 任务执行的具体逻辑。
* @param context 执行上下文,包含任务信息和触发时间。
* @throws JobExecutionException 任务执行过程中可能抛出的异常。
*/
void execute(JobExecutionContext context) throws JobExecutionException;
}
/**
* 任务执行异常。
*/
class JobExecutionException extends Exception {
public JobExecutionException(String message) {
super(message);
}
public JobExecutionException(String message, Throwable cause) {
super(message, cause);
}
}
CronExpression.java
package com.cqsym.myquartz;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.Set;
/**
* 简化的 Cron 表达式解析器,支持格式:秒 分 时 日 月 周
* 支持 * , - / 操作符。
* 周:1-7,1=周日,7=周六。
*/
public class CronExpression {
private final Set<Integer> seconds;
private final Set<Integer> minutes;
private final Set<Integer> hours;
private final Set<Integer> daysOfMonth;
private final Set<Integer> months;
private final Set<Integer> daysOfWeek;
public CronExpression(String expression) throws IllegalArgumentException {
String[] parts = expression.trim().split("\\s+");
if (parts.length != 6) {
throw new IllegalArgumentException("Cron expression must have 6 parts: 秒 分 时 日 月 周");
}
this.seconds = parseField(parts[0], 0, 59);
this.minutes = parseField(parts[1], 0, 59);
this.hours = parseField(parts[2], 0, 23);
this.daysOfMonth = parseField(parts[3], 1, 31);
this.months = parseField(parts[4], 1, 12);
this.daysOfWeek = parseField(parts[5], 1, 7);
}
// 解析单个字段,如 "0-10/2" 或 "*/15" 或 "1,2,3"
private Set<Integer> parseField(String field, int min, int max) {
Set<Integer> result = new HashSet<>();
if (field.equals("*")) {
for (int i = min; i <= max; i++) {
result.add(i);
}
return result;
}
String[] ranges = field.split(",");
for (String range : ranges) {
if (range.contains("/")) {
// 处理步长,如 */15 或 0-30/5
String[] parts = range.split("/");
int step = Integer.parseInt(parts[1]);
if (parts[0].equals("*")) {
for (int i = min; i <= max; i += step) {
result.add(i);
}
} else if (parts[0].contains("-")) {
String[] bounds = parts[0].split("-");
int start = Integer.parseInt(bounds[0]);
int end = Integer.parseInt(bounds[1]);
for (int i = start; i <= end; i += step) {
if (i >= min && i <= max) {
result.add(i);
}
}
}
} else if (range.contains("-")) {
// 处理范围,如 0-10
String[] bounds = range.split("-");
int start = Integer.parseInt(bounds[0]);
int end = Integer.parseInt(bounds[1]);
for (int i = start; i <= end; i++) {
if (i >= min && i <= max) {
result.add(i);
}
}
} else {
// 单个值,如 5
int value = Integer.parseInt(range);
if (value >= min && value <= max) {
result.add(value);
} else {
throw new IllegalArgumentException("Value " + value + " out of range [" + min + "," + max + "]");
}
}
}
return result;
}
/**
* 判断给定时间是否匹配此表达式。
* @param time LocalDateTime
* @return true if matches
*/
public boolean matches(LocalDateTime time) {
return seconds.contains(time.getSecond()) &&
minutes.contains(time.getMinute()) &&
hours.contains(time.getHour()) &&
daysOfMonth.contains(time.getDayOfMonth()) &&
months.contains(time.getMonthValue()) &&
daysOfWeek.contains(time.getDayOfWeek().getValue() % 7 + 1); // JDK: MON=1, SUN=7 -> 1=周日, 7=周六
}
// 用于调试
@Override
public String toString() {
return String.format("CronExpression{sec=%s, min=%s, hour=%s, dom=%s, mon=%s, dow=%s}",
seconds, minutes, hours, daysOfMonth, months, daysOfWeek);
}
}
JobExecutionContext.java
package com.cqsym.myquartz;
import java.time.LocalDateTime;
/**
* 任务执行上下文,提供任务执行时的环境信息。
*/
public class JobExecutionContext {
private final String jobName;
private final LocalDateTime fireTime;
public JobExecutionContext(String jobName, LocalDateTime fireTime) {
this.jobName = jobName;
this.fireTime = fireTime;
}
public String getJobName() {
return jobName;
}
public LocalDateTime getFireTime() {
return fireTime;
}
}
MyQuartzScheduler.java
package com.cqsym.myquartz;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 基于时间轮的简化 Quartz 调度器。
* 时间轮大小为 86400(一天的秒数),tick 为 1 秒。
*/
public class MyQuartzScheduler {
private static final int WHEEL_SIZE = 86400; // 24 * 60 * 60
private final List<JobEntry>[] timeWheel;
private final ScheduledExecutorService scheduler;
private final Map<String, JobEntry> jobRegistry;
private final AtomicBoolean running;
private int currentSecond;
public MyQuartzScheduler() {
this.timeWheel = new List[WHEEL_SIZE];
for (int i = 0; i < WHEEL_SIZE; i++) {
timeWheel[i] = new ArrayList<>();
}
this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "MyQuartz-Timer-Thread");
t.setDaemon(true);
return t;
});
this.jobRegistry = new ConcurrentHashMap<>();
this.running = new AtomicBoolean(false);
this.currentSecond = getCurrentSecondOfDay();
}
/**
* 启动调度器。
*/
public synchronized void start() {
if (running.get()) {
System.out.println("Scheduler is already running.");
return;
}
running.set(true);
System.out.println("MyQuartzScheduler started.");
// 每秒推进一次时间轮
scheduler.scheduleAtFixedRate(this::advanceWheel, 1000 - System.currentTimeMillis() % 1000, 1000, TimeUnit.MILLISECONDS);
}
/**
* 停止调度器。
*/
public synchronized void shutdown() {
if (!running.get()) {
System.out.println("Scheduler is not running.");
return;
}
running.set(false);
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("MyQuartzScheduler stopped.");
}
/**
* 添加一个任务。
* @param jobName 任务名称
* @param job 任务实例
* @param cronExpression Cron 表达式 (秒 分 时 日 月 周)
*/
public void scheduleJob(String jobName, Job job, String cronExpression) {
Objects.requireNonNull(jobName, "jobName cannot be null");
Objects.requireNonNull(job, "job cannot be null");
Objects.requireNonNull(cronExpression, "cronExpression cannot be null");
CronExpression expr;
try {
expr = new CronExpression(cronExpression);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid cron expression: " + cronExpression, e);
}
JobEntry entry = new JobEntry(jobName, job, expr);
jobRegistry.put(jobName, entry);
// 预计算未来24小时内的所有触发时间,并加入时间轮
preScheduleJob(entry);
System.out.println("Job scheduled: " + jobName + " with expression " + cronExpression);
}
/**
* 从调度器中移除任务。
* @param jobName 任务名称
*/
public void unscheduleJob(String jobName) {
JobEntry removed = jobRegistry.remove(jobName);
if (removed != null) {
System.out.println("Job unscheduled: " + jobName);
} else {
System.out.println("Job not found: " + jobName);
}
}
/**
* 推进时间轮指针。
*/
private void advanceWheel() {
if (!running.get()) return;
LocalDateTime now = LocalDateTime.now();
int nowSecond = (now.getHour() * 3600) + (now.getMinute() * 60) + now.getSecond();
// 处理时间跳跃(如手动修改系统时间)
int diff = nowSecond - currentSecond;
if (diff > 1) {
// 时间向前跳,可能错过了多个秒
for (int i = 1; i < diff; i++) {
int missedSecond = (currentSecond + i) % WHEEL_SIZE;
executeJobsAt(missedSecond);
}
} else if (diff < -WHEEL_SIZE / 2) {
// 时间向后跳(超过半天),重置
System.out.println("System time jumped backward significantly. Resetting wheel.");
// 理论上应清空轮子并重新预计算,此处简化处理
}
currentSecond = nowSecond;
executeJobsAt(currentSecond);
// 预计算下一个24小时周期的任务(每天凌晨执行一次)
if (now.getHour() == 0 && now.getMinute() == 0 && now.getSecond() == 0) {
jobRegistry.values().forEach(this::preScheduleJob);
}
}
/**
* 在指定秒执行该秒的所有任务。
* @param secondOfDay 0-86399
*/
private void executeJobsAt(int secondOfDay) {
List<JobEntry> jobs = timeWheel[secondOfDay];
if (jobs.isEmpty()) return;
LocalDateTime fireTime = LocalDateTime.now();
// 使用独立线程池执行任务,避免阻塞时间轮推进
ExecutorService jobExecutor = Executors.newCachedThreadPool();
for (JobEntry entry : jobs) {
jobExecutor.submit(() -> {
try {
JobExecutionContext context = new JobExecutionContext(entry.jobName, fireTime);
entry.job.execute(context);
} catch (Exception e) {
System.err.println("Job execution failed: " + entry.jobName);
e.printStackTrace();
}
});
}
jobExecutor.shutdown();
// 清空当前槽
jobs.clear();
}
/**
* 预计算未来24小时内的所有触发时间,并将任务加入时间轮。
* @param entry 任务条目
*/
private void preScheduleJob(JobEntry entry) {
LocalDateTime start = LocalDateTime.now().truncatedTo(java.time.temporal.ChronoUnit.DAYS);
LocalDateTime end = start.plusDays(1);
for (LocalDateTime time = start; time.isBefore(end); time = time.plusSeconds(1)) {
if (entry.expression.matches(time)) {
int secondOfDay = time.getHour() * 3600 + time.getMinute() * 60 + time.getSecond();
timeWheel[secondOfDay].add(entry);
}
}
}
/**
* 获取当前时间在一天中的秒数(0-86399)
*/
private int getCurrentSecondOfDay() {
LocalDateTime now = LocalDateTime.now();
return now.getHour() * 3600 + now.getMinute() * 60 + now.getSecond();
}
// 内部任务条目
private static class JobEntry {
final String jobName;
final Job job;
final CronExpression expression;
JobEntry(String jobName, Job job, CronExpression expression) {
this.jobName = jobName;
this.job = job;
this.expression = expression;
}
}
// --- 主函数用于测试 ---
public static void main(String[] args) throws InterruptedException {
MyQuartzScheduler scheduler = new MyQuartzScheduler();
// 定义一个简单任务
Job helloJob = context -> System.out.println("Hello from job: " + context.getJobName() +
" at " + context.getFireTime());
// 添加任务:每分钟的第10秒执行
scheduler.scheduleJob("helloJob", helloJob, "10 * * * * *");
// 添加任务:每10秒执行一次
scheduler.scheduleJob("every10Sec", context -> System.out.println("Tick! " + context.getFireTime()),
"*/10 * * * * *");
// 启动调度器
scheduler.start();
// 运行1分钟
Thread.sleep(60_000);
// 关闭调度器
scheduler.shutdown();
}
}
运行MyQuartzScheduler的结果