Elastic-Job介绍
ElasticJob 诞生于 2015 年,当时业界虽然有 QuartZ 等出类拔萃的定时任务框架,但缺乏分布式方面的探索
分布式调度云平台产品的缺失,使得 ElasticJob 从出现便备受关注,它有效的弥补了作业在分布式领域的短板,并且提供了一站式的自动化运维管控端。
# quartz的不足
我们上面详细的介绍了 Quartz 的架构原理以及应用实践,虽然 Quartz 也可以通过集群方式来保证服务高可用,但是它也有弊端,那就是服务节点数量的增加,并不能提升任务的执行效率,即不能实现水平扩展!
之所以产生这样的结果,是因为 Quartz 在分布式集群环境下是通过数据库锁方式来实现有且只有一个有效的服务节点来运行服务,从而保证服务在集群环境下定时任务不会被重复调用!
如果需要运行的定时任务很少的话,使用 Quartz 不会有太大的问题,但是如果 现在有这么一个需求,例如一个考勤系统,每天9点需要对昨天的用户打卡进行统计,计算是否正常出勤,涉及到各种审批,请假等等,如果一个公司有几十万员工,如果在一个实例上面运行,可能需要连续跑上好几天才能完成任务。
类似这样场景还有很多很多,很显然 Quartz 很难满足我们这种大批量、任务执行周期长的任务调度!
ElasticJob 在技术选型时,选择站在了巨人的肩膀上而不是重复制造轮子的理念,将定时任务事实标准的 QuartZ 与 分布式协调的利器 ZooKeeper 完美结合,快速而稳定的搭建了全新概念的分布式调度框架。
# 基本介绍
Elastic-Job提供了一种轻量级,无中心化解决方案。
没有统一的调度中心,集群的每个节点都是对等的, 节点之间通过注册中心进行分布式协调。E-Job 存在主节点的概念,但是主节点没有调度 的功能,而是用于处理一些集中式任务,如分片,清理运行时信息等。
Elastic-Job 最开始只有一个 elastic-job-core 的项目,在 2.X 版本以后主要分为 Elastic-Job-Lite 和 Elastic-Job-Cloud 两个子项目
Elastic-Job-Lite 定位为轻量级无中心化解决方案,使用 jar 包的形式提供分布式任务的协调服务。而 Elastic-Job-Cloud 使用 Mesos + Docker 的解决方案,额外提供资源治理、应用分发以及进程隔离等服务,跟 Lite 的区别只是部署方式不同,他们使用相同的 API,只要开发一次
比较项 | Elastic-Job-Lite | Elastic-Job-Cloud |
---|---|---|
无中心化 | 是 | 否 |
资源分配 | 不支持 | 支持 |
作业模式 | 常驻 | 常驻 + 瞬时 |
部署依赖 | ZooKeeper | ZooKeeper + Mesos |
# 功能列表
- 分布式调度协调
- 弹性扩容缩容
- 失效转移
- 错过执行作业重触发
- 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
- 自诊断并修复分布式不稳定造成的问题
- 支持并行调度
- 支持作业生命周期操作
- 丰富的作业类型
- Spring整合以及命名空间提供
- 运维平台
# 整体架构图
# App
应用程序,内部包含任务执行业务逻辑和Elastic-Job-Lite组件,其中执行任务需要实现ElasticJob接口完成,与Elastic-Job-Lite组件的集成,并进行任务的相关配置。应用程序可启动多个实例,也就出现了多个任务执行实例
# Elastic-Job-Lite
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务,此组件负责任务的调度,并产生日志及任务调度记录。 无中心化,是指没有调度中心这一概念,每个运行在集群中的作业服务器都是对等的,各个作业节点是自治的、平等的、节点之间通过注册中心进行分布式协调
# Registry
以Zookeeper作为Elastic-Job的注册中心组件,存储了执行任务的相关信息,同时Elastic-Job利用该 组件进行执行任务实例的选举
# Console
Elastic-Job提供了运维平台,它通过读取Zookeeper数据展现任务执行状态,或更新Zookeeper数据修 改全局配置,通过Elastic-Job-Lite组件产生的数据来查看任务执行历史记录。
elastic-job、zk 和 quartz 关系如下
# 安装部署
参考文档
# 弹性调度
弹性调度是 ElasticJob 最重要的功能,能够让任务通过分片进行水平扩展的任务处理。
# 任务分片
任务的分片执行是指一个批量任务如果由一台服务执行速度会比较慢,那么对任务进行分片交给多台服务器进行执行,这样执行的效率就会得到提高。
ElasticJob 中任务分片项的概念,使得任务可以在分布式的环境下运行,每台任务服务器只运行分配给该服务器的分片,随着服务器的增加或宕机,ElasticJob 会近乎实时的感知服务器数量的变更,从而重新为分布式的任务服务器分配更加合理的任务分片项,使得任务可以随着资源的增加而提升效率。
任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
举例说明,如果作业分为 4 片,用两台服务器执行,则每个服务器分到 2 片,分别负责作业的 50% 的负载,如下图所示。
# 个性化分片参数
ElasticJob 可以设置分片项和自定义分片参数,个性化参数可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码
例如按照地区进行统计数据,北京=1,上海=2,广州=3,如果仅仅按照1、2、3进行分片,对开发者来说很不友好,需要了解具体数字所代表的含义,而使用个性化参数可以让代码的可读性更高,如果使用以下配置
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
那么在代码中可以更清晰的理解具体分片键的含义,或者使用枚举类型来让业务代码的可读性更高
# 分片策略
框架默认提供了三种分片策略,所有的分片策略都是接口
JobShardingStrategy
的实现
# AverageAllocationJobShardingStrategy
策略说明
基于平均分配算法的分片策略,也是默认的分片策略,如果分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器
- 如果有
3
台服务器,分成9
片,则每台服务器分到的分片是:1=[0,1,2], 2=[3,4,5], 3=[6,7,8]
- 如果有
3
台服务器,分成8
片,则每台服务器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]
- 如果有
3
台服务器,分成10
片,则每台服务器分到的分片是:1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]
# OdevitySortByNameJobShardingStrategy
策略说明
根据作业名的哈希值奇偶数决定
IP
升降序算法的分片策略,用于不同的作业平均分配负载至不同的服务器。
- 作业名的哈希值为奇数则
IP
升序。 - 作业名的哈希值为偶数则
IP
降序。
特点
AverageAllocationJobShardingStrategy
的缺点是,一旦分片数小于作业服务器数,作业将永远分配至IP
地址靠前的服务器,导致IP
地址靠后的服务器空闲。而OdevitySortByNameJobShardingStrategy
则可以根据作业名称重新分配服务器负载。
- 如果有
3
台服务器,分成2
片,作业名称的哈希值为奇数,则每台服务器分到的分片是:1=[0], 2=[1], 3=[]
- 如果有
3
台服务器,分成2
片,作业名称的哈希值为偶数,则每台服务器分到的分片是:3=[0], 2=[1], 1=[]
# RotateServerByNameJobShardingStrategy
策略说明
根据作业名的哈希值对服务器列表进行轮转的分片策略
# 动态调度
ElasticJob 可以根据节点的数量动态进行任务的分派,可以提高业务的执行效率以及提高吞吐量
当新增加作业服务器时,ElasticJob 会通过注册中心的临时节点的变化感知到新服务器的存在,并在下次任务调度的时候重新分片,新的服务器会承载一部分作业分片,如下图所示。
如果将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。
例如三台服务器,分成 10 片,则分片项分配结果为服务器 A = 0,1,2;服务器 B = 3,4,5;服务器 C = 6,7,8,9
如果服务器 C 崩溃,则分片项分配结果为服务器 A = 0,1,2,3,4; 服务器 B = 5,6,7,8,9。 在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。
# 高可用
当定时任务服务器宕机时,注册中心同样会通过临时节点感知,并将在下次运行时将分片转移至仍存活的服务器,以达到作业高可用的效果
本次由于服务器宕机而未执行完的作业,则可以通过失效转移的方式继续执行。如下图所示。
# 作业类型
elastic-job提供了三种类型的作业类型,分别是
Simple
,Dataflow
,ScriptJob
# Simple
SimpleJob需要实现SimpleJob接口,意为简单实现,未经过任何封装,与quartz原生接口相似,比如示例代码中所使用的job
# Dataflow
Dataflow类型用于处理数据流,需实现DataflowJob接口,适用于不间歇的数据处理。
该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据
# 注意事项
- 可通过DataflowJobConfiguration配置是否流式处理。
- 流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。
- 如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。
# 代码案例
这里模拟定时处理订单状态,拉取订单然后进行处理订单。
public class MyDataFlowJob implements DataflowJob<Order> {
Logger log = LoggerFactory.getLogger(MyDataFlowJob.class);
//模拟100个未处理订单
private static List<Order> orders = new ArrayList<>();
{
for (int i = 0; i < 100; i++) {
Order order = new Order();
order.setOrderId(i);
order.setStatus(0);
orders.add(order);
}
}
@Override
public List<Order> fetchData(ShardingContext shardingContext) {
//订单号%分片总数==当前分片项
List<Order> orderList = orders.stream().filter(o -> o.getStatus() == 0)
.filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem())
.collect(Collectors.toList());
List<Order> subList = null;
if (orderList != null && orderList.size() > 0) {
int endIndex = 10;
if (orderList.size() < 10) {
endIndex = orderList.size() - 1;
}
subList = orderList.subList(0, endIndex);
}
//由于抓取数据过快,为更好看出效果,此处休眠一会儿
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("分片项:{},我抓取的数据:{}", shardingContext.getShardingItem(), subList);
return subList;
}
@Override
public void processData(ShardingContext shardingContext, List<Order> list) {
list.forEach(o -> o.setStatus(1));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("分片项:{},处理中.....", shardingContext.getShardingItem());
}
}
# 事件跟踪
在elastic-job中,有一块很重要的功能,与作业的执行密切相关但是又不能影响作业的执行,这就是事件跟踪,定时任务执行一个事件,需要记录定时任务执行时间,状态等等信息,这些都可以通过事件跟踪来完成
# 控制台
Elastic-job控制台能够对elasticjob的作业做运维工作,比如暂停定时任务,修改定时任务执行时间,分片策略等等
# 设计理念
- elastic-job控制台和Elastic Job并无直接关系,是通过读取Elastic Job的注册中心数据展现作业状态,或更新注册中心数据修改全局配置。
- 控制台只能控制作业本身是否运行,但不能控制作业进程的启停,因为控制台和作业本身服务器是完全分布式的,控制台并不能控制作业服务器。
# 支持功能
- 查看作业以及服务器状态
- 快捷的修改以及删除作业设置
- 启用和禁用作业
- 跨注册中心查看作业
- 查看作业运行轨迹和运行状态
# 安装部署
# 下载ElasticJob-UI
访问
https://shardingsphere.apache.org/elasticjob/current/cn/downloads/
地址,找到ElasticJob-UI 的tar包下载即可,注意下载ElasticJob-Lite-UI
的tar包
# 启动服务
解压后,找到
bin
目录下的启动类,启动即可
# 访问测试
ElasticJob-UI 默认启动端口是8088,可以在
application.properties
配置文件进行修改,启动后出现如下访问界面,用户名密码都是root
登录后就可以进入主界面
# 配置注册中心
ElasticJob-UI因为设计的时候就是和
Elastic Job
服务相分离的,通过zk
来查看和控制作业的状态,所以使用elastic-job
第一步就是配置注册中心,如下图添加注册中心
# 配置项说明
- 注册中心名称:可以随意命名,注册中心的名称
- 注册中心地址:也就是zookeeper地址,我们的地址时
localhost:2181
- 命名空间地址:这个一般是我们在
elastic-job
配置的命名空间的地址我们配置的是elasticjob-lite-springboot
- 登录凭证:zk的登录凭证,默认没有配置可以忽略
# 作业操作
可以在作业操作栏目对作业进行操作,比如暂停以及触发任务,和修改任务的执行计划等等