Quartz
Quartz是一个老牌的定时任务框架,目前在很多企业中目前还会用到,所以我们有必要讲解一下Quartz;
# 1 Quartz概述
Quartz 是一个功能强大的任务调度框架,它可以满足更多更复杂的调度需求,可以在JavaSE项目中使用,也可以在JavaEE的项目中使用,可以方便的和Spring,Spring Boot进行整合。是目前使用范围最广的定时任务框架。单体应用以及大型互联网项目中都可以看到他的身影。
特点:
- 方便和Spring及Spring Boot进行整合
- 支持集群
- 不支持分布式
# 2 Quartz在javaSE项目中使用
创建项目引入依赖
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2</version>
</dependency>
编写Job的实现类(任务)
public class MyJob implements Job {
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("任务执行了..." + new Date());
}
}
创建Scheduler(调度器)
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
编写JobDetail(封装任务)
JobDetail jb = JobBuilder.newJob(MyJob.class).withIdentity("myJob","group1").build();
编写Trigger(定义任务的触发时机)
Trigger trigger = TriggerBuilder.newTrigger().withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).repeatForever()).build();
注册JobDetail和Trigger到Schedule中;
scheduler.scheduleJob(jb,trigger);
scheduler.start();
编写quartz的配置文件(定义定时任务的配置信息)
#调度器名称
org.quartz.scheduler.instanceName=scheduler
#线程池,线程数量,配置成2,表示支持2个任务并行执行;
org.quartz.threadPool.threadCount=2
#任务信息存储在内存中
org.quartz.jobStore.class=org.quartz.simpl.RAMJobStore
# 3 体系结构图
Quartz 设计有四个核心类,分别是Scheduler(调度器)、Job(任务) 、Trigger(触发器)、JobDetail(任务详情),他们是使用Quartz的关键。
调度器作为作业的总指挥,触发器作为作业的操作者,作业为应用的功能模块,其关系如下图所示:
# Job接口
定时任务的接口,具体定时任务需要实现该接口
定义需要执行的任务,该类是一个接口,只定义了一个方法execute(JobExecutionContext context)
,在实现类的execute
方法中编写所需要定时执行的Job(任务),JobExcutionContext
类提供了调度应用的一些信息。Job运行时的信息保存在JobDataMap实例中。
public class MyJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("开始执行定时任务...");
}
}
# Trigger接口
负责设置调度策略,该类是一个接口,描述触发job执行的时间触发规则
有以下这些子类,其中经常用到的是cronTigger
# 公共属性
- triggerKey:表示Trigger身份的属性
- jobKey:Trigger触发时被执行的Job的身份
- startTime:Trigger第一次触发的时间
- endTime:Trigger失效的时间点
- 优先级(priority):如果Trigger很多,或者Quartz线程池的工作线程太少,Quartz可能没有足够的资源同时触发所有的Trigger,这种情况下,如果希望某些Trigger优先被触发,就需要给它设置优先级,Trigger默认的优先级为5,优先级priority属性的值可以是任意整数,正数、负数都可以。(只有同时触发的Trigger之间才会比较优先级)
# SimpleTrigger
指定从某一个时间开始,以一定时间间隔(单位:毫秒)执行的任务
关键属性
- repeatInterval:重复间隔
- repeatCount:重复次数,实际执行次数是repeatCount+1(因为在startTime的时候一定会执行一次)
代码示例
TriggerBuilder.newTrigger()
//设置Trigger的name以及group
.withIdentity("my_job_tigger", "my_job_tigger_group")
//trigger 开始生效时间
.startAt(new Date(System.currentTimeMillis() + 5000))
//调度策略
.withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).withRepeatCount(10))
//trigger开始失效时间
.endAt(new Date(System.currentTimeMillis() + 15000))
//任务名词
.forJob("自定义JOB")
.build();
# CalendarIntervalTrigger
类似于SimpleTrigger,指定从某一个时间开始,以一定的时间间隔执行的任务
但是不同的是SimpleTrigger指定的时间间隔为毫秒,没办法指定每隔一个月执行一次(每月的时间间隔不是固定值),而CalendarIntervalTrigger支持的间隔单位有秒,分钟,小时,天,月,年,星期
优点
- 更方便,比如每隔1小时执行,你不用自己去计算1小时等于多少毫秒
- 支持不是固定长度的间隔,比如间隔为月和年。但劣势是精度只能到秒
关键属性
- interval 执行间隔:intervalUnit 执行间隔的单位(秒,分钟,小时,天,月,年,星期)
代码示例
TriggerBuilder.newTrigger()
//设置Trigger的name以及group
.withIdentity("my_job_tigger", "my_job_tigger_group")
//trigger 开始生效时间,马上生效
.startNow()
//调度策略
.withSchedule(CalendarIntervalScheduleBuilder.calendarIntervalSchedule().withInterval(10, DateBuilder.IntervalUnit.SECOND))
//trigger开始失效时间
.endAt(new Date(System.currentTimeMillis() + 15000))
//任务名词
.forJob("calendar_tigger_test")
.build();
# DailyTimeIntervalTrigger
指定每天的某个时间段内,以一定的时间间隔执行任务,并且它可以支持指定星期
关键属性
- startTimeOfDay:每天开始时间
- endTimeOfDay:每天结束时间
- daysOfWeek:需要执行的星期
代码案例
TriggerBuilder.newTrigger()
//设置Trigger的name以及group
.withIdentity("my_job_tigger", "my_job_tigger_group")
//trigger 开始生效时间
.startNow()
//调度策略
.withSchedule(DailyTimeIntervalScheduleBuilder.dailyTimeIntervalSchedule()
//早上10点开始执行
.startingDailyAt(TimeOfDay.hourAndMinuteOfDay(10, 0))
//晚上8点停止执行
.endingDailyAt(TimeOfDay.hourAndMinuteOfDay(20, 0))
// 周一到周四执行,不写即每天执行
.onDaysOfTheWeek(DateBuilder.MONDAY, DateBuilder.TUESDAY, DateBuilder.WEDNESDAY, DateBuilder.THURSDAY)
//一小时执行一次
.withIntervalInHours(1)
//重复执行10次,总共执行11次
.withRepeatCount(10)
)
//trigger开始失效时间
.endAt(new Date(System.currentTimeMillis() + 15000))
//任务名词
.forJob("calendar_tigger_test")
.build();
# CronTrigger
适合于更复杂的任务,它支持类型于Linux Cron的语法(并且更强大)
代码案例
TriggerBuilder.newTrigger()
//设置Trigger的name以及group
.withIdentity("my_job_tigger", "my_job_tigger_group")
//trigger 开始生效时间
.startNow()
//调度策略 每隔5S执行一次
.withSchedule(CronScheduleBuilder.cronSchedule("*/5 * * * * ?"))
//trigger开始失效时间
.endAt(new Date(System.currentTimeMillis() + 15000))
//任务名词
.forJob("calendar_tigger_test")
.build();
# JobDetail
描述Job的实现类及其它相关的静态信息,如:Job名字、描述、关联监听器等信息
Quartz在每次执行Job时,都重新创建一个Job实例,所以它不直接接受一个Job的实例,相反它接收一个Job实现类,以便运行时通过newInstance()的反射机制实例化Job。
因此需要通过一个类来描述Job的实现类及其它相关的静态信息,如Job名字、描述、关联监听器等信息,JobDetail承担了这一角色,JobDetail 用来保存我们作业的详细信息。
一个JobDetail可以有多个Trigger,但是一个Trigger只能对应一个JobDetail
JobBuilder.newJob(MyJob.class).withIdentity("MyJob_1", "JobGroup_1").build();
# Scheduler
调度器就相当于一个容器,装载着任务和触发器
Scheduler负责管理Quartz的运行环境,Quartz它是基于多线程架构的,它启动的时候会初始化一套线程,这套线程会用来执行一些预置的作业。
Trigger和JobDetail可以注册到Scheduler中,Scheduler可以将Trigger绑定到某一JobDetail中,这样当Trigger触发时,对应的Job就被执行
Scheduler拥有一个SchedulerContext,它类似于ServletContext,保存着Scheduler上下文信息,Job和Trigger都可以访问SchedulerContext内的信息。Scheduler使用一个线程池作为任务运行的基础设施,任务通过共享线程池中的线程提高运行效率
# 创建调度器
Scheduler接口有两个实现类,分别为StdScheduler(标准默认调度器)和RemoteScheduler(远程调度器),我们重点介绍下StdScheduler实例,StdScheduler只提供了一个带参构造方法,此构造需要传递QuartzScheduler和SchedulingContext两个实例参数
public StdScheduler(QuartzScheduler sched, SchedulingContext schedCtxt)
然而我们一般不使用构造方法去创建调度器,而是通过调度器工厂来创建,调度器工厂接口SchedulerFactory
提供了两种不同类型的工厂实现,分别是DirectSchedulerFactory
和StdSchedulerFactory
而DirectSchedulerFactory
一般用的比较少,更多的场景下我们使用StdSchedulerFactory
工厂来创建
创建方式
StdSchedulerFactory提供三种方式创建调度器实例
- 通过java.util.Properties属性实例
- 通过外部属性文件提供
- 通过有属性文件内容的 java.io.InputStream 文件流提供
public static void main(String[] args) {
try {
StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
// 第一种方式 通过Properties属性实例创建
Properties props = new Properties();
props.put(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, "org.quartz.simpl.SimpleThreadPool");
props.put("org.quartz.threadPool.threadCount", 5);
schedulerFactory.initialize(props);
// 第二种方式 通过传入文件名
// schedulerFactory.initialize("my.properties");
// 第三种方式 通过传入包含属性内容的文件输入流
// InputStream is = new FileInputStream(new File("my.properties"));
// schedulerFactory.initialize(is);
// 获取调度器实例
Scheduler scheduler = schedulerFactory.getScheduler();
} catch (Exception e) {
e.printStackTrace();
}
}
执行流程:
Trigger和JobDetail 注册到Scheduler,由Scheduler根据Trigger中的触发规则管理调度Job,
Scheduler每次调度任务都会通过反射 实例化一个Job对象,然后调用execute方法执行任务;
# 4 Job参数传递
# Job在运行时如何传递参数
Job实例是Scheduler通过反射创建的,有时候我们需要在Job运行的时候,需要传递参数,如何传递呢?
在JobDetail中定义Job运行传递的参数。
usingJobData("name","itcast")
在Job中就可以获取参数。
方式1:在通过Job的形参jobExecutionContext获取JobDetail,然后即可获取JobDetail定义参数
String name = jobExecutionContext.getJobDetail().getJobDataMap().getString("name");
方式2: 在Job中定义成员变量,并且提供set方法,Scheduler注入JobDetail中定义参数;
private String name;
public void setName(String name) {
this.name = name;
}
# Job运行时如何修改参数
在前面的课程中我们给大家讲解了,如何在Job运行的时候传递参数,可以通过在JobDetail中设置参数,在Job中获取参数。但是在Job运行过程中,可能需要对参数发生改变。
案例:第一次运行Job,传递name参数为itcast.后面运行希望是ithima;
System.out.println(Thread.currentThread().getName() + "任务开始执行..." + new Date());
System.out.println(name);
System.out.println(Thread.currentThread().getName() +"任务执行结束..." + new Date());
jobExecutionContext.getJobDetail().getJobDataMap().put("name","itheima");
@PersistJobDataAfterExecution
# 5 定时任务并发处理
什么是定时任务的并发呢?
场景1:假如我们有一个任务每隔5秒执行一次,3秒即可执行完毕,这种情况下当前任务是不存在并发的。
场景2:假如我们有一个任务每隔5秒执行一次,7秒才能执行完毕。这个时候,在5秒~7秒的时候及10秒到12秒的时候,当前的任务存在并发执行(也就是在内存中有2个线程同时执行任务);并发会导致业务的重复执行;
演示和解决方案:
System.out.println(Thread.currentThread().getName() + "任务开始执行..." + new Date());
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +"任务执行结束..." + new Date());
scheduler_Worker-1任务开始执行...Sun Jun 21 16:24:41 CST 2020 41秒开启一个新线程,第一次执行任务
scheduler_Worker-2任务开始执行...Sun Jun 21 16:24:46 CST 2020 46秒开启一个新线程,第二次执行任务
scheduler_Worker-1任务执行结束...Sun Jun 21 16:24:48 CST 2020 48秒新线1程执行任务结束
scheduler_Worker-3任务开始执行...Sun Jun 21 16:24:51 CST 2020 51秒开启一个新线程3执行任务
scheduler_Worker-2任务执行结束...Sun Jun 21 16:24:53 CST 2020 ...
scheduler_Worker-1任务开始执行...Sun Jun 21 16:24:56 CST 2020 ...
scheduler_Worker-3任务执行结束...Sun Jun 21 16:24:58 CST 2020 ...
scheduler_Worker-2任务开始执行...Sun Jun 21 16:25:01 CST 2020
scheduler_Worker-1任务执行结束...Sun Jun 21 16:25:03 CST 2020
scheduler_Worker-3任务开始执行...Sun Jun 21 16:25:06 CST 2020
@DisallowConcurrentExecution
scheduler_Worker-1任务开始执行...Sun Jun 21 16:30:23 CST 2020 23秒开启一个新线程,第一次执行任务
scheduler_Worker-1任务执行结束...Sun Jun 21 16:30:30 CST 2020 30秒新线1的任务执行完毕
scheduler_Worker-2任务开始执行...Sun Jun 21 16:30:30 CST 2020 30秒开启一个新线2第二次开始执行任务
按理说应该第二次执行任务在28秒执行 任务,但是由于28s第一次的任务没有执
行完毕,所以在30秒第一次任务执行
完成后,立即执行28秒的任务,
这个在Quartz中成为misfire,如果错 过了任务执行时机,下次立即执行
scheduler_Worker-2任务执行结束...Sun Jun 21 16:30:37 CST 2020
scheduler_Worker-3任务开始执行...Sun Jun 21 16:30:37 CST 2020
scheduler_Worker-3任务执行结束...Sun Jun 21 16:30:44 CST 2020
scheduler_Worker-1任务开始执行...Sun Jun 21 16:30:47 CST 2020
scheduler_Worker-1任务执行结束...Sun Jun 21 16:30:54 CST 2020
scheduler_Worker-2任务开始执行...Sun Jun 21 16:30:54 CST 2020
# 6 Trigger
# 6.1 Trigger的优先级
什么是Trigger的优先级,比如如果程序有需要有5个任务在某个时刻同时执行,但是线程池中线程的数量只有3个,优先执行那个Trigger对应的任务;
优先级使用时机:线程池中的线程数量小于同时要执行的任务的数量;
withPriority
# 6.2 SimpleTrigger
可以在指定的开始时间,指定的结束时间,以一定的时间间隔执行任务,也可以设置执行的次数;
Date startTime = DateBuilder.futureDate(10, DateBuilder.IntervalUnit.SECOND); //当前时间5s以后
Date endTime = DateBuilder.futureDate(3, DateBuilder.IntervalUnit.HOUR); //当前时间3h以后
Trigger trigger1 = TriggerBuilder.newTrigger().
startAt(startTime).
endAt(endTime).
withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).withRepeatCount(5)).build();
当前时间超过结束时间或者执行次数大于设置的次数,任务都将不会执行;
# 6.3 CronTrigger
可以通过Cron表达式设置触发时机的Trigger;
Trigger trigger1 = TriggerBuilder.newTrigger().
startAt(startTime).
endAt(endTime).
withSchedule(CronScheduleBuilder.cronSchedule("3,8,10,20 * * * * ?")).build();
# 6.4 Missfire
所谓的Missfire就是错过触发,本应该在某个时间点触发的任务,由于一些原因没有触发。Missfire就是用来解决这个问题的。
原因:
在某个时刻触发的任务超过线程池中线程的数量,导致优先级低的Trigger没有触发;
eg:在某个时刻触发了10个任务,但是线程池中线程的数量只有3个,有7个Trigger没有触发;
任务执行时长,超过触发的时间。
eg:每隔3秒执行一次任务,但是任务执行需要7秒;
CronTrigger和SimpleTrigger都定义了MissFire机制方法。本案例以CronTrigger为例
默认情况下Quartz认为超过5s,才认定错过触发;
org.quartz.jobStore.misfireThreshold=1000
withMisfireHandlingInstructionDoNothing():下一次触发时间进行;
withMisfireHandlingInstructionFireAndProceed():任务执行完毕立即执行;
withMisfireHandlingInstructionIgnoreMisfires():eg:一个每隔15秒触发一次的Trigger,如果5分钟没有触发分钟,一旦有机会触发,它将触发20次。
# 7 Spring Boot整合Quartz
在实际开发中使用Quartz,通常都会和spring框架进行整合。Quartz除了方便和Spring框架整合以外,也可以方便和Spring boot整合。
# (1)引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<!--quartz的起步依赖-->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
spring:
quartz:
scheduler-name: myScheduler
# (2) 编写Job类
public class MyJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("任务执行了" + new Date());
}
}
# (3) 编写配置类,将JobDetail和Trigger交由spring管理,在Trigger中需要注入JobDetail。
@Bean
public JobDetail jobDetail() {
return JobBuilder.
newJob(MyJob.class).
withIdentity("myJob", "group1").
storeDurably().
build();
}
@Bean
public Trigger trigger1(JobDetail jobDetail) {
return TriggerBuilder.newTrigger().
withIdentity("trigger1","group1").
startNow().
withSchedule(CronScheduleBuilder.cronSchedule("0/5 * * * * ?")).
forJob(jobDetail).
build();
}
# 8 将任务的相关信息存入数据库
# (1) 引入JPA和依赖和MySQL的驱动,底层是通过Spring Data Jpa生成相关表
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
# (2)yml文件中配置数据库的连接信息
spring:
quartz:
scheduler-name: myScheduler #schedular的名称
job-store-type: jdbc #Job的信息使用jdbc存储
jdbc:
initialize-schema: always #always是否每次启动服务都执行脚本创建新的数据库,never,如果已经创建好数据库
comment-prefix: "#" #sql脚本的注释
wait-for-jobs-to-complete-on-shutdown: true #停止scheduler需要等待任务执行完成
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/quartz?
useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
username: root
password: 123
# (3) 程序启动后根据数据库的连接信息,找到对应的数据库脚本执行,底层使用的就是Spring Data JPA
org/quartz/impl/jdbcjobstore/tables_mysql_innodb.sql
qrtz_cron_triggers------->CronTrigger信息
qrtz_simple_triggers------->SimpleTrigger信息 qrtz_job_details---------->JobDetail信息
qrtz_triggers--------->trigger和jobdetail关联信息
# 9 Quartz集群搭建
在实际开发中为了防止单点故障,通常我们都会搭建集群。但是作为定时任务的集群,我们需要考虑一个问题,就是如何防止任务重跑。
# 9.1 Quartz集群的特点
支持失败转移:当某个节点产生了宕机,任务会转移到其他可用的节点执行;
不支持分布式:任务是不可拆分的单元,一个任务只能由一个确定的节点执行;不能将任务再进行拆分。
# 9.2 Quartz集群的体系架构
这幅图是Quartz集群的体系架构,首先有一个数据库,Quartz集群是依赖于数据库的,Quartz服务需要从数据库中读取任务信息;这也就是为什么在上一节课我们需要将任务相关信息存储到数据库中;
再一个就是Quartz服务,每一个任务都会落在指定的服务中。eg:我们图中描述的任务A和任务B落在第一个Quartz服务中;任务D和任务E落在第二Quartz服务中,任务H和任务M落在第二Quartz服务中。
如果有一个服务产生宕机,eg:第一个Quartz服务宕机,那么任务A和任务B就会落在第二个和第三个Quartz服务上;具体落在那个服务由Quartz来确定;(这个就是Quartz集群的整体性架构)
# 9.2 Quartz集群方案
# 初始化数据库
docker run -itd --name mysql-quartz -p 3306:3306 -v /opt/scheduleTask/quartz:/opt -e MYSQL_ROOT_PASSWORD=123456 mysql:5.7
docker exec -it mysql-quartz bash
mysql> create database quartz default charset 'utf8';
mysql> use quartz;
mysql> source /opt/tables_mysql.sql
为Quartz集群依赖于数据库,所以必须首先创建Quartz数据库表,Quartz发布包中包括了所有被支持的数据库平台的SQL脚本
这些SQL脚本存放于<quartz_home>/docs/dbTables 目录下找到对应数据库的SQL文件这里采用的是tables_mysql.sql
对应表简单含义如下
表明 | 功能 |
---|---|
QRTZ_CALENDARS | 以 Blob 类型存储 Quartz 的 Calendar 信息 |
QRTZ_CRON_TRIGGERS | 存储 Cron Trigger,包括 Cron 表达式和时区信息 |
QRTZ_FIRED_TRIGGERS | 存储与已触发的 Trigger 相关的状态信息,以及相联 Job 的执行信息 |
QRTZ_PAUSED_TRIGGER_GRPS | 存储已暂停的 Trigger 组的信息 |
QRTZ_SCHEDULER_STATE | 存储少量的有关 Scheduler 的状态信息,和别的 Scheduler 实例(假如是用于一个集群中) |
QRTZ_LOCKS | 存储程序的悲观锁的信息(假如使用了悲观锁) |
QRTZ_JOB_DETAILS | 存储每一个已配置的 Job 的详细信息 |
QRTZ_SIMPLE_TRIGGERS | 存储简单的 Trigger,包括重复次数,间隔,以及已触发的次数 |
QRTZ_BLOG_TRIGGERS | Trigger 作为 Blob 类型存储(用于 Quartz 用户用 JDBC 创建他们自己定制的 Trigger 类型,JobStore 并不知道如何存储实例的时候) |
QRTZ_TRIGGER_LISTENERS | 存储已配置的 TriggerListener 的信息 |
QRTZ_TRIGGERS | 存储已配置的 Trigger 的信息 |
# 引入pom
将需要的pom文件引入
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-lang/commons-lang -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
# 编辑quartz.properties
# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#
#集群配置
org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
org.quartz.jobStore.misfireThreshold: 60000
#============================================================================
# Configure JobStore
#============================================================================
#默认配置,数据保存到内存
#org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
#持久化配置
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties:true
#数据库表前缀
org.quartz.jobStore.tablePrefix:qrtz_
org.quartz.jobStore.dataSource:qzDS
#============================================================================
# Configure Datasources
#============================================================================
#JDBC驱动
org.quartz.dataSource.qzDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.qzDS.URL:jdbc:mysql://192.168.10.30:3306/quartz
org.quartz.dataSource.qzDS.user:root
org.quartz.dataSource.qzDS.password:123456
org.quartz.dataSource.qzDS.maxConnection:10
# 整合SpringBoot
# 注册Quartz注册工厂
该类是将quartz自己创建的类交给spring进行管理以及自动注入
@Component
public class QuartzJobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
//调用父类的方法
Object jobInstance = super.createJobInstance(bundle);
//进行注入
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
# 注册调度工厂
@Configuration
public class QuartzConfig {
@Autowired
private QuartzJobFactory jobFactory;
@Bean
public SchedulerFactoryBean schedulerFactoryBean() throws IOException {
//获取配置属性
PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties"));
//在quartz.properties中的属性被读取并注入后再初始化对象
propertiesFactoryBean.afterPropertiesSet();
//创建SchedulerFactoryBean
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setQuartzProperties(propertiesFactoryBean.getObject());
factory.setJobFactory(jobFactory);//支持在JOB实例中注入其他的业务对象
factory.setApplicationContextSchedulerContextKey("applicationContextKey");
factory.setWaitForJobsToCompleteOnShutdown(true);//这样当spring关闭时,会等待所有已经启动的quartz job结束后spring才能完全shutdown。
factory.setOverwriteExistingJobs(false);//是否覆盖己存在的Job
factory.setStartupDelay(10);//QuartzScheduler 延时启动,应用启动完后 QuartzScheduler 再启动
return factory;
}
/**
* 通过SchedulerFactoryBean获取Scheduler的实例
*
* @return
* @throws IOException
* @throws SchedulerException
*/
@Bean(name = "scheduler")
public Scheduler scheduler() throws IOException, SchedulerException {
Scheduler scheduler = schedulerFactoryBean().getScheduler();
return scheduler;
}
}
# 配置Quartz数据源
默认 Quartz 的数据连接池是 c3p0,由于性能不太稳定,不推荐使用,因此我们将其改成
driud
数据连接池
public class DruidConnectionProvider implements ConnectionProvider {
/**
* 常量配置,与quartz.properties文件的key保持一致(去掉前缀),同时提供set方法,Quartz框架自动注入值。
*
* @return
* @throws SQLException
*/
//JDBC驱动
public String driver;
//JDBC连接串
public String URL;
//数据库用户名
public String user;
//数据库用户密码
public String password;
//数据库最大连接数
public int maxConnection;
//数据库SQL查询每次连接返回执行到连接池,以确保它仍然是有效的。
public String validationQuery;
private boolean validateOnCheckout;
private int idleConnectionValidationSeconds;
public String maxCachedStatementsPerConnection;
private String discardIdleConnectionsSeconds;
public static final int DEFAULT_DB_MAX_CONNECTIONS = 10;
public static final int DEFAULT_DB_MAX_CACHED_STATEMENTS_PER_CONNECTION = 120;
//Druid连接池
private DruidDataSource datasource;
@Override
public Connection getConnection() throws SQLException {
return datasource.getConnection();
}
@Override
public void shutdown() throws SQLException {
datasource.close();
}
@Override
public void initialize() throws SQLException {
if (this.URL == null) {
throw new SQLException("DBPool could not be created: DB URL cannot be null");
}
if (this.driver == null) {
throw new SQLException("DBPool driver could not be created: DB driver class name cannot be null!");
}
if (this.maxConnection < 0) {
throw new SQLException("DBPool maxConnectins could not be created: Max connections must be greater than zero!");
}
datasource = new DruidDataSource();
try {
datasource.setDriverClassName(this.driver);
} catch (Exception e) {
try {
throw new SchedulerException("Problem setting driver class name on datasource: " + e.getMessage(), e);
} catch (SchedulerException e1) {
}
}
datasource.setUrl(this.URL);
datasource.setUsername(this.user);
datasource.setPassword(this.password);
datasource.setMaxActive(this.maxConnection);
datasource.setMinIdle(1);
datasource.setMaxWait(0);
datasource.setMaxPoolPreparedStatementPerConnectionSize(DEFAULT_DB_MAX_CONNECTIONS);
if (this.validationQuery != null) {
datasource.setValidationQuery(this.validationQuery);
if (!this.validateOnCheckout) {
datasource.setTestOnReturn(true);
} else {
datasource.setTestOnBorrow(true);
}
datasource.setValidationQueryTimeout(this.idleConnectionValidationSeconds);
}
}
public String getDriver() {
return driver;
}
public void setDriver(String driver) {
this.driver = driver;
}
public String getURL() {
return URL;
}
public void setURL(String URL) {
this.URL = URL;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getMaxConnection() {
return maxConnection;
}
public void setMaxConnection(int maxConnection) {
this.maxConnection = maxConnection;
}
public String getValidationQuery() {
return validationQuery;
}
public void setValidationQuery(String validationQuery) {
this.validationQuery = validationQuery;
}
public boolean isValidateOnCheckout() {
return validateOnCheckout;
}
public void setValidateOnCheckout(boolean validateOnCheckout) {
this.validateOnCheckout = validateOnCheckout;
}
public int getIdleConnectionValidationSeconds() {
return idleConnectionValidationSeconds;
}
public void setIdleConnectionValidationSeconds(int idleConnectionValidationSeconds) {
this.idleConnectionValidationSeconds = idleConnectionValidationSeconds;
}
public DruidDataSource getDatasource() {
return datasource;
}
public void setDatasource(DruidDataSource datasource) {
this.datasource = datasource;
}
public String getDiscardIdleConnectionsSeconds() {
return discardIdleConnectionsSeconds;
}
public void setDiscardIdleConnectionsSeconds(String discardIdleConnectionsSeconds) {
this.discardIdleConnectionsSeconds = discardIdleConnectionsSeconds;
}
}
创建完成之后,还需要在
quartz.properties
配置文件中设置以下数据源
#数据库连接池,将其设置为druid
org.quartz.dataSource.qzDS.connectionProvider.class=cn.itcast.config.DruidConnectionProvider
{
"jobName":"myJob",
"groupName":"default",
"jobClass":"cn.itcast.quartz.MyJob",
"cronExpression":"0/5 * * * * ?",
"param":{
"hello":"world"
}
}
# 任务管理
默认quartz的功能是有限的,我们可以自己实现quartz的任务管理,比如添加、删除、暂停、运行定时任务
# 管理接口
该接口是定时任务的管理接口,可以对定时任务进行管理
public interface QuartzJobService {
/**
* 添加任务可以传参数
* @param clazzName
* @param jobName
* @param groupName
* @param cronExp
* @param param
*/
void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param);
/**
* 暂停任务
* @param jobName
* @param groupName
*/
void pauseJob(String jobName, String groupName);
/**
* 恢复任务
* @param jobName
* @param groupName
*/
void resumeJob(String jobName, String groupName);
/**
* 立即运行一次定时任务
* @param jobName
* @param groupName
*/
void runOnce(String jobName, String groupName);
/**
* 更新任务
* @param jobName
* @param groupName
* @param cronExp
* @param param
*/
void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param);
/**
* 删除任务
* @param jobName
* @param groupName
*/
void deleteJob(String jobName, String groupName);
/**
* 启动所有任务
*/
void startAllJobs();
/**
* 暂停所有任务
*/
void pauseAllJobs();
/**
* 恢复所有任务
*/
void resumeAllJobs();
/**
* 关闭所有任务
*/
void shutdownAllJobs();
}
# 管理实现类
该类是定时任务的具体实现,是实现了quartz的各种操作
@Service
public class QuartzJobServiceImpl implements QuartzJobService {
private static final Logger log = LoggerFactory.getLogger(QuartzJobServiceImpl.class);
@Autowired
private Scheduler scheduler;
@Override
public void addJob(String clazzName, String jobName, String groupName, String cronExp, Map<String, Object> param) {
try {
// 启动调度器,默认初始化的时候已经启动
// scheduler.start();
//构建job信息
Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(clazzName);
JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(jobName, groupName).build();
//表达式调度构建器(即任务执行的时间)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp);
//按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(jobName, groupName).withSchedule(scheduleBuilder).build();
//获得JobDataMap,写入数据
if (param != null) {
trigger.getJobDataMap().putAll(param);
}
scheduler.scheduleJob(jobDetail, trigger);
} catch (Exception e) {
log.error("创建任务失败", e);
}
}
@Override
public void pauseJob(String jobName, String groupName) {
try {
scheduler.pauseJob(JobKey.jobKey(jobName, groupName));
} catch (SchedulerException e) {
log.error("暂停任务失败", e);
}
}
@Override
public void resumeJob(String jobName, String groupName) {
try {
scheduler.resumeJob(JobKey.jobKey(jobName, groupName));
} catch (SchedulerException e) {
log.error("恢复任务失败", e);
}
}
@Override
public void runOnce(String jobName, String groupName) {
try {
scheduler.triggerJob(JobKey.jobKey(jobName, groupName));
} catch (SchedulerException e) {
log.error("立即运行一次定时任务失败", e);
}
}
@Override
public void updateJob(String jobName, String groupName, String cronExp, Map<String, Object> param) {
try {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, groupName);
CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
if (cronExp != null) {
// 表达式调度构建器
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExp);
// 按新的cronExpression表达式重新构建trigger
trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
}
//修改map
if (param != null) {
trigger.getJobDataMap().putAll(param);
}
// 按新的trigger重新设置job执行
scheduler.rescheduleJob(triggerKey, trigger);
} catch (Exception e) {
log.error("更新任务失败", e);
}
}
@Override
public void deleteJob(String jobName, String groupName) {
try {
//暂停、移除、删除
scheduler.pauseTrigger(TriggerKey.triggerKey(jobName, groupName));
scheduler.unscheduleJob(TriggerKey.triggerKey(jobName, groupName));
scheduler.deleteJob(JobKey.jobKey(jobName, groupName));
} catch (Exception e) {
log.error("删除任务失败", e);
}
}
@Override
public void startAllJobs() {
try {
scheduler.start();
} catch (Exception e) {
log.error("开启所有的任务失败", e);
}
}
@Override
public void pauseAllJobs() {
try {
scheduler.pauseAll();
} catch (Exception e) {
log.error("暂停所有任务失败", e);
}
}
@Override
public void resumeAllJobs() {
try {
scheduler.resumeAll();
} catch (Exception e) {
log.error("恢复所有任务失败", e);
}
}
@Override
public void shutdownAllJobs() {
try {
if (!scheduler.isShutdown()) {
// 需谨慎操作关闭scheduler容器
// scheduler生命周期结束,无法再 start() 启动scheduler
scheduler.shutdown(true);
}
} catch (Exception e) {
log.error("关闭所有的任务失败", e);
}
}
}
# API接口
通过实现该接口可以通过外部API对定时任务进行管理
@RestController
@RequestMapping("/quartz")
public class QuartzController {
private static final Logger log = LoggerFactory.getLogger(QuartzController.class);
@Autowired
private QuartzJobService quartzJobService;
/**
* 添加新任务
*
* @param configDTO
* @return
*/
@RequestMapping("/addJob")
public Object addJob(@RequestBody QuartzConfigDTO configDTO) {
quartzJobService.addJob(configDTO.getJobClass(), configDTO.getJobName(), configDTO.getGroupName(), configDTO.getCronExpression(), configDTO.getParam());
return HttpStatus.OK;
}
/**
* 暂停任务
*
* @param configDTO
* @return
*/
@RequestMapping("/pauseJob")
public Object pauseJob(@RequestBody QuartzConfigDTO configDTO) {
quartzJobService.pauseJob(configDTO.getJobName(), configDTO.getGroupName());
return HttpStatus.OK;
}
/**
* 恢复任务
*
* @param configDTO
* @return
*/
@RequestMapping("/resumeJob")
public Object resumeJob(@RequestBody QuartzConfigDTO configDTO) {
quartzJobService.resumeJob(configDTO.getJobName(), configDTO.getGroupName());
return HttpStatus.OK;
}
/**
* 立即运行一次定时任务
*
* @param configDTO
* @return
*/
@RequestMapping("/runOnce")
public Object runOnce(@RequestBody QuartzConfigDTO configDTO) {
quartzJobService.runOnce(configDTO.getJobName(), configDTO.getGroupName());
return HttpStatus.OK;
}
/**
* 更新任务
*
* @param configDTO
* @return
*/
@RequestMapping("/updateJob")
public Object updateJob(@RequestBody QuartzConfigDTO configDTO) {
quartzJobService.updateJob(configDTO.getJobName(), configDTO.getGroupName(), configDTO.getCronExpression(), configDTO.getParam());
return HttpStatus.OK;
}
/**
* 删除任务
*
* @param configDTO
* @return
*/
@RequestMapping("/deleteJob")
public Object deleteJob(@RequestBody QuartzConfigDTO configDTO) {
quartzJobService.deleteJob(configDTO.getJobName(), configDTO.getGroupName());
return HttpStatus.OK;
}
/**
* 启动所有任务
*
* @return
*/
@RequestMapping("/startAllJobs")
public Object startAllJobs() {
quartzJobService.startAllJobs();
return HttpStatus.OK;
}
/**
* 暂停所有任务
*
* @return
*/
@RequestMapping("/pauseAllJobs")
public Object pauseAllJobs() {
quartzJobService.pauseAllJobs();
return HttpStatus.OK;
}
/**
* 恢复所有任务
*
* @return
*/
@RequestMapping("/resumeAllJobs")
public Object resumeAllJobs() {
quartzJobService.resumeAllJobs();
return HttpStatus.OK;
}
/**
* 关闭所有任务
*
* @return
*/
@RequestMapping("/shutdownAllJobs")
public Object shutdownAllJobs() {
quartzJobService.shutdownAllJobs();
return HttpStatus.OK;
}
}
# 测试
可以通过Postman通过接口动态对定时任务进行管理
# 添加定时任务
通过PostMan添加任务
添加完成后,可以在控制台看到任务正在执行
补充其他
(一):任务信息的存储必须选择jdbc,而不能是memory。集群的节点之间需要依靠数据库来共享任务信息,并且使用数据库作为分布式锁。
(二): 必须在配置文件中开启集群。
保证多个服务scheduler-name名称一致;
properties: #集群节点的ID必须唯一,由quartz自动生成 org.quartz.scheduler.instanceId: AUTO #通知服务加入集群 org.quartz.jobStore.isClustered: true
(三):必须生成任务相关的表;
(四):启动2个服务
-Dserver.port=8888
# 9.3 Quartz集群中运行多个任务
public class MyJob2 extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
System.out.println("任务2执行了" + new Date());
}
}
@Bean
public JobDetail jobDetail2() {
return JobBuilder.
newJob(MyJob2.class).
withIdentity("myJob2", "group1").
storeDurably().
build();
}
@Bean
public Trigger trigger2(JobDetail jobDetail2) {
return TriggerBuilder.newTrigger().
withIdentity("trigger2","group1").
startNow().
withSchedule(CronScheduleBuilder.cronSchedule("0/3 * * * * ?")).
forJob(jobDetail2).
build();
}
# 9.3 Quartz集群原理
# (一) Quartz集群原理如何保证,集群环境下防止任务重复执行。
1.1 每一个Quartz服务只有一个Scheduler实例;
1.2 每一个服务Scheduler实例要想执行任务,获取qrtz_locks表中行锁。
锁被占用;需要等待其他服务Scheduler释放锁;
获取到锁:
获取qrtz_triggers表中Waiting状态,并且即将出发的trigger;
将Trigger状态由Waiting修改为Acquired
将trigger信息插入qrtz_fire_triggers表
public static final String SELECT_FOR_LOCK = "SELECT * FROM " + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";
1.3 qrtz_fired_triggers 修改为executing
执行Job
修改qrtz_triggers中下次触发时间
将状态修改为Waiting
# (二) 故障判断
每个Quartz服务中Scheduler会定时(org.quartz.jobStore.clusterCheckinInterval这个时间)更新qrtz_scheduler_state表中的LAST_CHECK_TIME字段,当集群的一个Scheduler执行更新时,他会查看是否有其他节点Scheduler实例在到达他们预期的时间还未更新,则认为该节点故障。
# (三)失败迁移
检测到故障节点,就会更新触发器的状态,状态更新如下:
删除qrtz_scheduler_state表中对应的schedular信息;将任务可以由其他schedular执行