定时任务的应用场景十分广泛,如定时清理文件、定时生成报表、定时数据同步备份等。
jdk自带的库中,有两种技术可以实现定时任务,一种是Timer,另一种是
ScheduledThreadPoolExecutor
Timer是一个线程,控制执行TimerTask所需要执行的内容
public class Timer {
/**
* The timer task queue. This data structure is shared with the timer
* thread. The timer produces tasks, via its various schedule calls,
* and the timer thread consumes, executing timer tasks as Appropriate,
* and removing them from the queue when they're obsolete.
*/
private final TaskQueue queue = new TaskQueue();
/**
* The timer thread.
*/
private final TimerThread thread = new TimerThread(queue);
。。。。。。
}
其中,需要注意,Timer类有几个方法创建不同的线程执行:
//其中的delay是延时时间,表示多少毫秒后执行一次task
public void schedule(TimerTask task, long delay) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
sched(task, System.currentTimeMillis()+delay, 0);
}
//到达指定时间time的时候执行一次task
public void schedule(TimerTask task, Date time) {
sched(task, time.getTime(), 0);
}
//经过delay毫秒后按每period毫秒执行一次的周期执行task
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}
//到达指定时间firstTime之后按照每period毫秒执行一次的周期执行task
public void schedule(TimerTask task, Date firstTime, long period) {
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, firstTime.getTime(), -period);
}
TimerTask是一个实现了Runable接口的类,所以能够放到线程去执行:
public abstract class TimerTask implements Runnable {
/**
* This object is used to control access to the TimerTask internals.
*/
final Object lock = new Object();
。。。。。。
}
示例:
public class JavaTimerJob {
public static void mAIn(String[] args) {
Timer timer = new Timer();
Task task = new Task();
//当前时间开始,每1秒执行一次
timer.schedule(task, new Date(),1000);
}
}
class Task extends TimerTask {
@Override
public void run() {
System.out.println(new Date()+": This is my job...");
}
}
执行结果:
Tue May 30 13:45:47 CST 2022: This is my job...
Tue May 30 13:45:48 CST 2022: This is my job...
Tue May 30 13:45:49 CST 2022: This is my job...
Tue May 30 13:45:50 CST 2022: This is my job...
。。。。
弊端:Timer是单线程的,一旦定时任务中某一过程时刻抛出异常,将会导致整体线程停止,定时任务停止。
继承了ThreadPoolExecutor,,是一个基于线程池的调度器 通过实现ScheduledExecutorService接口方法去实现任务调度,主要方法如下:
//command是待执行的线程,delay表示延时时长,unit代表时间单位
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
//command是待执行的线程,initialDelay表示延时时长,period代表执行间隔时长,unit代表时间单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
//command是待执行的线程,initialDelay表示延时时长,delay代表每次执行线程前的延时时长,unit代表时间单位
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
示例:
public class JavaScheduledThreadPoolExecutor {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(8);
//延时1秒后开始执行,每3秒执行一次
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(new Date()+": This is my job...");
}
}, 1, 3, TimeUnit.SECONDS);
}
}
执行结果:
Tue May 30 15:05:16 CST 2022: This is my job...
Tue May 30 15:05:19 CST 2022: This is my job...
Tue May 30 15:05:22 CST 2022: This is my job...
Tue May 30 15:05:25 CST 2022: This is my job...
。。。。。
Timer
ScheduledThreadPoolExecutor
Spring原生定时任务主要依靠@Scheduled注解实现:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
String CRON_DISABLED = "-";
String cron() default ""; //类似于corn表达式,可以指定定时任务执行的延迟及周期规则
String zone() default ""; //指明解析cron表达式的时区。
long fixedDelay() default -1; //在最后一次调用结束和下一次调用开始之间以固定周期(以毫秒为单位)执行带注解的方法。(要等待上次任务完成后)
String fixedDelayString() default ""; //同上面作用一样,只是String类型
long fixedRate() default -1; //在调用之间以固定的周期(以毫秒为单位)执行带注解的方法。(不需要等待上次任务完成)
String fixedRateString() default ""; //同上面作用一样,只是String类型
long initialDelay() default -1; //第一次执行fixedRate()或fixedDelay()任务之前延迟的毫秒数 。
String initialDelayString() default ""; //同上面作用一样,只是String类型
}
Spring静态定时任务示例:
@Slf4j
@Component
public class TestJob {
//每40秒执行一次
@Scheduled(cron = "0/40 * * * * ?")
public void logJob(){
if(log.isDebugEnabled()){
log.debug("现在是:{}",LocalDateTime.now());
}
}
}
执行结果:
现在是:2022-05-30T16:03:40.006
现在是:2022-05-30T16:04
现在是:2022-05-30T16:04:40.003
①项目启动扫描带有注解@Scheduled的所有方法信息由
ScheduledAnnotationBeanPostProcessor的postProcessAfterInitialization方法实现功能:
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass)) {
//获取定时任务的方法
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->
//调用processScheduled方法将定时任务方法存放到任务队列中
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
②调用processScheduled方法将定时任务方法存放到任务队列中
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
//创建任务线程
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
//解析任务执行初始延迟
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value "" + initialDelayString + "" - cannot parse into long");
}
}
}
//解析cron表达式
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
initialDelay = 0;
}
//解析fixedDelay参数
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
//存放任务到任务队列中
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value "" + fixedDelayString + "" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
//解析fixedRate参数
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value "" + fixedRateString + "" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// 断言检查
Assert.isTrue(processedSchedule, errorMessage);
//并发控制将任务队列存入注册任务列表
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
③将任务解析并添加到任务队列后,交由ScheduledTaskRegistrar类的scheduleTasks方法添加(注册)定时任务到环境中:
protected void scheduleTasks() {
if (this.taskScheduler == null) {
//获取ScheduledExecutorService对象,实际上都是使用ScheduledThreadPoolExecutor执行定时任务调度
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (this.triggerTasks != null) {
for (TriggerTask task : this.triggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
private void addScheduledTask(@Nullable ScheduledTask task) {
if (task != null) {
this.scheduledTasks.add(task);
}
}
由上述源码可以看出,Spring原生定时任务的大概步骤如下: 1.扫描带@Scheduled注解的类和方法(
ScheduledAnnotationBeanPostProcessor.postProcessAfterInitialization(........)) 2.将定时任务解析完成后加入任务队列(ScheduledAnnotationBeanPostProcessor.processScheduled(........)) 3.将定时任务注册到当前运行环境,等待执行(ScheduledTaskRegistrar.scheduleTasks(.......)) 且@Scheduled的底层调度实现是ScheduledThreadPoolExecutor
作者:MrDong先生
链接:
https://juejin.cn/post/7111992569034178574
来源:稀土掘金