当前位置:首页 > CN2资讯 > 正文内容

时间轮算法在定时任务中的工作原理以及简单版本的quartz的实现

3天前CN2资讯

时间轮实现

时间轮这个数据结构大家可能不太熟悉,简单介绍一下:

时间轮(Time Wheel)是一种用于高效管理和调度定时任务的数据结构。它特别适用于需要处理大量定时任务的场景,时间轮的设计理念是通过将时间划分成多个槽(Slot),每个槽代表一个固定的时间间隔

当需要添加一个定时任务时,计算该任务的到期时间相对于当前时间的位置。根据到期时间,将任务插入到相应的槽中

指针每经过一个时间间隔(例如1秒)向前移动一个槽。当指针移动到某个槽时,检查该槽中的任务,并执行这些任务

有些同学可能要问,那如果我需要设置的时间离现在太远了,一轮根本装不下,怎么办,我总不能生成一个无限长度的轮子吧。好问题,我们可以用很多个轮子来存放资源,像时针分针秒针一样,任务资源会存放多个轮子的数据,只有所有数据都达标的时候,任务才会执行

时间轮通常用于实现 XX 时间后的延时任务(如定时任务、延迟 MQ 等),或周期性任务。


一个长度为8的单时间轮

其实本质上它就是一个环形的数组,如图所示,假设我们创建了一个长度为 8 的时间轮。



使用单时间轮用AI仿照quartz进行写了一个单时间轮的模式的

Job.java

package com.cqsym.myquartz; /** * 任务接口。所有需要被调度执行的任务都必须实现此接口。 */ public interface Job { /** * 任务执行的具体逻辑。 * @param context 执行上下文,包含任务信息和触发时间。 * @throws JobExecutionException 任务执行过程中可能抛出的异常。 */ void execute(JobExecutionContext context) throws JobExecutionException; } /** * 任务执行异常。 */ class JobExecutionException extends Exception { public JobExecutionException(String message) { super(message); } public JobExecutionException(String message, Throwable cause) { super(message, cause); } }

CronExpression.java

package com.cqsym.myquartz; import java.time.LocalDateTime; import java.util.HashSet; import java.util.Set; /** * 简化的 Cron 表达式解析器,支持格式:秒 分 时 日 月 周 * 支持 * , - / 操作符。 * 周:1-7,1=周日,7=周六。 */ public class CronExpression { private final Set<Integer> seconds; private final Set<Integer> minutes; private final Set<Integer> hours; private final Set<Integer> daysOfMonth; private final Set<Integer> months; private final Set<Integer> daysOfWeek; public CronExpression(String expression) throws IllegalArgumentException { String[] parts = expression.trim().split("\\s+"); if (parts.length != 6) { throw new IllegalArgumentException("Cron expression must have 6 parts: 秒 分 时 日 月 周"); } this.seconds = parseField(parts[0], 0, 59); this.minutes = parseField(parts[1], 0, 59); this.hours = parseField(parts[2], 0, 23); this.daysOfMonth = parseField(parts[3], 1, 31); this.months = parseField(parts[4], 1, 12); this.daysOfWeek = parseField(parts[5], 1, 7); } // 解析单个字段,如 "0-10/2" 或 "*/15" 或 "1,2,3" private Set<Integer> parseField(String field, int min, int max) { Set<Integer> result = new HashSet<>(); if (field.equals("*")) { for (int i = min; i <= max; i++) { result.add(i); } return result; } String[] ranges = field.split(","); for (String range : ranges) { if (range.contains("/")) { // 处理步长,如 */15 或 0-30/5 String[] parts = range.split("/"); int step = Integer.parseInt(parts[1]); if (parts[0].equals("*")) { for (int i = min; i <= max; i += step) { result.add(i); } } else if (parts[0].contains("-")) { String[] bounds = parts[0].split("-"); int start = Integer.parseInt(bounds[0]); int end = Integer.parseInt(bounds[1]); for (int i = start; i <= end; i += step) { if (i >= min && i <= max) { result.add(i); } } } } else if (range.contains("-")) { // 处理范围,如 0-10 String[] bounds = range.split("-"); int start = Integer.parseInt(bounds[0]); int end = Integer.parseInt(bounds[1]); for (int i = start; i <= end; i++) { if (i >= min && i <= max) { result.add(i); } } } else { // 单个值,如 5 int value = Integer.parseInt(range); if (value >= min && value <= max) { result.add(value); } else { throw new IllegalArgumentException("Value " + value + " out of range [" + min + "," + max + "]"); } } } return result; } /** * 判断给定时间是否匹配此表达式。 * @param time LocalDateTime * @return true if matches */ public boolean matches(LocalDateTime time) { return seconds.contains(time.getSecond()) && minutes.contains(time.getMinute()) && hours.contains(time.getHour()) && daysOfMonth.contains(time.getDayOfMonth()) && months.contains(time.getMonthValue()) && daysOfWeek.contains(time.getDayOfWeek().getValue() % 7 + 1); // JDK: MON=1, SUN=7 -> 1=周日, 7=周六 } // 用于调试 @Override public String toString() { return String.format("CronExpression{sec=%s, min=%s, hour=%s, dom=%s, mon=%s, dow=%s}", seconds, minutes, hours, daysOfMonth, months, daysOfWeek); } }

JobExecutionContext.java

package com.cqsym.myquartz; import java.time.LocalDateTime; /** * 任务执行上下文,提供任务执行时的环境信息。 */ public class JobExecutionContext { private final String jobName; private final LocalDateTime fireTime; public JobExecutionContext(String jobName, LocalDateTime fireTime) { this.jobName = jobName; this.fireTime = fireTime; } public String getJobName() { return jobName; } public LocalDateTime getFireTime() { return fireTime; } }

MyQuartzScheduler.java

package com.cqsym.myquartz; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; /** * 基于时间轮的简化 Quartz 调度器。 * 时间轮大小为 86400(一天的秒数),tick 为 1 秒。 */ public class MyQuartzScheduler { private static final int WHEEL_SIZE = 86400; // 24 * 60 * 60 private final List<JobEntry>[] timeWheel; private final ScheduledExecutorService scheduler; private final Map<String, JobEntry> jobRegistry; private final AtomicBoolean running; private int currentSecond; public MyQuartzScheduler() { this.timeWheel = new List[WHEEL_SIZE]; for (int i = 0; i < WHEEL_SIZE; i++) { timeWheel[i] = new ArrayList<>(); } this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "MyQuartz-Timer-Thread"); t.setDaemon(true); return t; }); this.jobRegistry = new ConcurrentHashMap<>(); this.running = new AtomicBoolean(false); this.currentSecond = getCurrentSecondOfDay(); } /** * 启动调度器。 */ public synchronized void start() { if (running.get()) { System.out.println("Scheduler is already running."); return; } running.set(true); System.out.println("MyQuartzScheduler started."); // 每秒推进一次时间轮 scheduler.scheduleAtFixedRate(this::advanceWheel, 1000 - System.currentTimeMillis() % 1000, 1000, TimeUnit.MILLISECONDS); } /** * 停止调度器。 */ public synchronized void shutdown() { if (!running.get()) { System.out.println("Scheduler is not running."); return; } running.set(false); scheduler.shutdown(); try { if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) { scheduler.shutdownNow(); } } catch (InterruptedException e) { scheduler.shutdownNow(); Thread.currentThread().interrupt(); } System.out.println("MyQuartzScheduler stopped."); } /** * 添加一个任务。 * @param jobName 任务名称 * @param job 任务实例 * @param cronExpression Cron 表达式 (秒 分 时 日 月 周) */ public void scheduleJob(String jobName, Job job, String cronExpression) { Objects.requireNonNull(jobName, "jobName cannot be null"); Objects.requireNonNull(job, "job cannot be null"); Objects.requireNonNull(cronExpression, "cronExpression cannot be null"); CronExpression expr; try { expr = new CronExpression(cronExpression); } catch (IllegalArgumentException e) { throw new IllegalArgumentException("Invalid cron expression: " + cronExpression, e); } JobEntry entry = new JobEntry(jobName, job, expr); jobRegistry.put(jobName, entry); // 预计算未来24小时内的所有触发时间,并加入时间轮 preScheduleJob(entry); System.out.println("Job scheduled: " + jobName + " with expression " + cronExpression); } /** * 从调度器中移除任务。 * @param jobName 任务名称 */ public void unscheduleJob(String jobName) { JobEntry removed = jobRegistry.remove(jobName); if (removed != null) { System.out.println("Job unscheduled: " + jobName); } else { System.out.println("Job not found: " + jobName); } } /** * 推进时间轮指针。 */ private void advanceWheel() { if (!running.get()) return; LocalDateTime now = LocalDateTime.now(); int nowSecond = (now.getHour() * 3600) + (now.getMinute() * 60) + now.getSecond(); // 处理时间跳跃(如手动修改系统时间) int diff = nowSecond - currentSecond; if (diff > 1) { // 时间向前跳,可能错过了多个秒 for (int i = 1; i < diff; i++) { int missedSecond = (currentSecond + i) % WHEEL_SIZE; executeJobsAt(missedSecond); } } else if (diff < -WHEEL_SIZE / 2) { // 时间向后跳(超过半天),重置 System.out.println("System time jumped backward significantly. Resetting wheel."); // 理论上应清空轮子并重新预计算,此处简化处理 } currentSecond = nowSecond; executeJobsAt(currentSecond); // 预计算下一个24小时周期的任务(每天凌晨执行一次) if (now.getHour() == 0 && now.getMinute() == 0 && now.getSecond() == 0) { jobRegistry.values().forEach(this::preScheduleJob); } } /** * 在指定秒执行该秒的所有任务。 * @param secondOfDay 0-86399 */ private void executeJobsAt(int secondOfDay) { List<JobEntry> jobs = timeWheel[secondOfDay]; if (jobs.isEmpty()) return; LocalDateTime fireTime = LocalDateTime.now(); // 使用独立线程池执行任务,避免阻塞时间轮推进 ExecutorService jobExecutor = Executors.newCachedThreadPool(); for (JobEntry entry : jobs) { jobExecutor.submit(() -> { try { JobExecutionContext context = new JobExecutionContext(entry.jobName, fireTime); entry.job.execute(context); } catch (Exception e) { System.err.println("Job execution failed: " + entry.jobName); e.printStackTrace(); } }); } jobExecutor.shutdown(); // 清空当前槽 jobs.clear(); } /** * 预计算未来24小时内的所有触发时间,并将任务加入时间轮。 * @param entry 任务条目 */ private void preScheduleJob(JobEntry entry) { LocalDateTime start = LocalDateTime.now().truncatedTo(java.time.temporal.ChronoUnit.DAYS); LocalDateTime end = start.plusDays(1); for (LocalDateTime time = start; time.isBefore(end); time = time.plusSeconds(1)) { if (entry.expression.matches(time)) { int secondOfDay = time.getHour() * 3600 + time.getMinute() * 60 + time.getSecond(); timeWheel[secondOfDay].add(entry); } } } /** * 获取当前时间在一天中的秒数(0-86399) */ private int getCurrentSecondOfDay() { LocalDateTime now = LocalDateTime.now(); return now.getHour() * 3600 + now.getMinute() * 60 + now.getSecond(); } // 内部任务条目 private static class JobEntry { final String jobName; final Job job; final CronExpression expression; JobEntry(String jobName, Job job, CronExpression expression) { this.jobName = jobName; this.job = job; this.expression = expression; } } // --- 主函数用于测试 --- public static void main(String[] args) throws InterruptedException { MyQuartzScheduler scheduler = new MyQuartzScheduler(); // 定义一个简单任务 Job helloJob = context -> System.out.println("Hello from job: " + context.getJobName() + " at " + context.getFireTime()); // 添加任务:每分钟的第10秒执行 scheduler.scheduleJob("helloJob", helloJob, "10 * * * * *"); // 添加任务:每10秒执行一次 scheduler.scheduleJob("every10Sec", context -> System.out.println("Tick! " + context.getFireTime()), "*/10 * * * * *"); // 启动调度器 scheduler.start(); // 运行1分钟 Thread.sleep(60_000); // 关闭调度器 scheduler.shutdown(); } }

运行MyQuartzScheduler的结果

    你可能想看:

    扫描二维码推送至手机访问。

    版权声明:本文由皇冠云发布,如需转载请注明出处。

    本文链接:https://www.idchg.com/info/20341.html

    分享给朋友:

    “时间轮算法在定时任务中的工作原理以及简单版本的quartz的实现” 的相关文章

    RackNerd优惠活动详解:如何享受高性价比虚拟主机和VPS折扣

    RackNerd是一家在2019年成立的美国主机商。虽然成立时间不久,它却迅速在市场上崭露头角,赢得了许多VPS用户的青睐。公司的数据中心分别位于洛杉矶、圣何塞、西雅图和纽约等地,这些地理位置的选择让它的服务在各个区域都有稳定的覆盖。从我个人的体验来说,RackNerd的性价比非常高,尤其在价格和服...

    Windows SSH Keygen 无法连接问题解决指南

    在现代网络环境中,SSH(Secure Shell)协议扮演着至关重要的角色,确保了远程登录的安全性与可靠性。在Windows操作系统中,了解SSH的基本知识是非常必要的。SSH不仅提供了加密的网络服务,还为我们在远程管理服务器时提供了安全的通道。 当我们谈到SSH的时候,首先想到的就是它的密钥认证...

    CMI香港:助力企业洞察市场与消费者需求的关键工具

    在了解CMI香港之前,首先需要弄清楚CMI的定义与作用。CMI,即客户市场信息(Customer Market Insight),专注于帮助企业深入理解市场动态与消费者需求。简单来说,CMI就像是企业在市场中找到导航指南,确保它们能够精准地把握客户的期望、习惯及其变化。 当我们把视角转向香港,相信大...

    如何实现Windows链接服务器的应用与配置

    在现代工作和生活中,远程连接的重要性日益凸显。Windows链接服务器作为一种强大的工具,帮助用户在不同的设备之间实现无缝的远程访问。它的定义其实就是这样一款可以让用户通过网络访问和管理远程Windows服务器的技术。这意味着无论是在办公室还是在家中,只要有网络连接,我都能方便地使用和维护我的服务器...

    腾讯云国际版:全球云服务的强者之选,满足您的业务需求

    腾讯云国际版概述 腾讯云国际版的定义与背景 腾讯云国际版是腾讯公司专为全球客户推出的一款云服务产品,旨在满足不同国家和地区用户的需求。它在功能上与国内版本相似,但根据国际市场的需求进行了优化,以确保服务的稳定性和流畅性。作为一名用户,我发现腾讯云国际版特别注重数据隐私和合规性,这对于希望拓展海外市场...

    VPS测速:优化服务器性能与用户体验的关键工具

    当谈到网络服务时,VPS(虚拟专用服务器)是一个绕不开的话题。它为用户提供了一个灵活且高效的网络环境,通过虚拟化技术将一台物理服务器分割成多个独立的虚拟服务器。用户可以拥有自己的操作系统、存储空间和带宽,这使得VPS成为了许多企业和个人的理想选择。 通过使用VPS,用户能够执行各种任务,比如搭建网站...