跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 任务调度

    • 任务调度的概述
    • crontab
    • Timer、ScheduledExecutor
    • Quartz
    • 分布式-Elastic-Job

      • Elastic-Job介绍
      • Elastic-Job基础使用
      • Elastic-Job高级用法
        • 1 Elastic-Job自定义分片策略
        • 2 Elastic-Job监听器
        • 4 Elastic-Job架构原理
          • 4.1 Elastic-Job架构图
          • 4.2 作业分片
          • 4.3 弹性扩容
          • 4.4 高可用
          • 4.5 失效转移
          • 4.6 幂等性
          • 4.7 总结
      • Elastic-Job运维管理平台
    • xxl-job
  • 消息队列

  • Zookeeper

  • java组件
  • 任务调度
  • 分布式-Elastic-Job
Revin
2023-06-15
目录

Elastic-Job高级用法

# 1 Elastic-Job自定义分片策略

分片策略:将一作业任务拆分成若干个分片后,分片按照那种规则分配到服务中。

Elastic-Job官方提供的分片策略:

平均分片(默认策略):将分片平均分配到指定服务中;

eg:有一个作业任务拆分成10个分片,有3个运行服务。

第一个服务:0,1,2,3分片

第二个服务:4,5,6分片

第三个服务:7,8,9分

根据作业名的哈希值的奇偶数决定IP升降序算法的分片策略.

image-20230615225855357

首先会计算作业任务名称的hash值。

判断hash值是奇数还是偶数

奇数:按照ip升序进行平均分片;

偶数:按照ip降序进行平均分片;

根据作业名的哈希值对服务器列表进行轮转的分片策略

image-20230615225919851

首先会计算作业任务名称的hash值。

对服务数量取余

根据余数决定从哪个服务开始进行平均分片;

自定义分片策略

需求:10个分片,如果有3个服务。

第一个服务[0 3 6 9]

第二个服务[1 4 7]

第三个服务[2 5 8]

(一)定义类实现JobShardingStrategy接口,重写sharding方法

(二)sharding方法中可以获取服务列表,作业任务名称,总分片项

(三)返回Map<服务,分片项集合>

@Override
 public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount) {
   if (jobInstances.isEmpty()) {
     return Collections.emptyMap();
   }
   Map<JobInstance,List<Integer>> map = new HashMap<>();
   //1.生成分片项的列表
   Queue<Integer> queue = new ArrayDeque<>();
   for (int i = 0; i < shardingTotalCount; i++) {
     // 0 1 2 3 4 5 6 7 8 9
     queue.add(i);
   }
   //1.迭代服务列表
   for (int i = 0; i < jobInstances.size(); i++) {
     //获取服务
     JobInstance jobInstance = jobInstances.get(i);
     //获取该服务对应的分片项;
     List<Integer> shardingItemList = getShardingItemList(i, jobInstances.size(), queue);
     map.put(jobInstance,shardingItemList);
   }
   System.out.println(map);
   return map;
 }
 private List<Integer> getShardingItemList(int jobInstanceIndex,int size,Queue<Integer> queue) {
   //定义jobInstanceIndex 对应的分片列表
   List<Integer> shardingItemList = new ArrayList<>();
   for (int shardingItem : queue) {
     if(shardingItem % size == jobInstanceIndex) {
       shardingItemList.add(shardingItem);
     }
   }
   return shardingItemList;
 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

(四)、使用自定义分片策略

Spring整合Elastic-Job

<job:simple id="mySimpleJob" registry-center-ref="zk" cron="0/10 * * * * ?" sharding-total-count="10"         class="cn.itcast.MySimpleJob" overwrite="true" job-sharding-strategy-class="cn.itcast.strategy.MyJobShardingStrategy"></job:simple>
1

SpringBoot整合Elastic-Job

修改注解,让用户传递分片策略

@Target(ElementType.TYPE) //注解定义位置
@Retention(RetentionPolicy.RUNTIME) //源码 编译 运行都有效
public @interface ElasticSimpleJob {
 String name();
 String cron();
 int shardingTotalCount() default 1;
 boolean overwrite() default false;
 Class jobShardingStrategyClass() default AverageAllocationJobShardingStrategy.class;
}
1
2
3
4
5
6
7
8
9

设置分片策略

LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(overwrite).jobShardingStrategyClass(jobShardingStrategyClass.getCanonicalName()).build();
1
@Slf4j
@Component
@ElasticSimpleJob(jobName = "mySimpleJob",corn = "0/5 * * * * ?", shardingTotalCount = 10,overwrite = true)
public class MySimpleJob implements SimpleJob {
 @Override
 public void execute(ShardingContext shardingContext) {
   int totalCount = shardingContext.getShardingTotalCount(); //分片总数量
   int item = shardingContext.getShardingItem();//当前分片
   log.info("作业任务被执行了" + new Date() + "总分片数量" + totalCount + "当前分片:" + item);
 }
}
1
2
3
4
5
6
7
8
9
10
11

# 2 Elastic-Job监听器

概念:监听任务作业执行前和执行后。

可通过配置多个任务监听器,在任务执行前和执行后执行监听的方法。

监听器分类:

同一个作业任务的每个节点都执行的监听器。

image-20230615230009324

分布式环境中,只在一个节点执行监听器。

image-20230615230025927

第一类监听器的实现

@Slf4j
public class MyListener implements ElasticJobListener {
 @Override
 public void beforeJobExecuted(ShardingContexts shardingContexts) {
   String jobName = shardingContexts.getJobName();
   log.info(jobName + "任务执行之前执行");
 }
 @Override
 public void afterJobExecuted(ShardingContexts shardingContexts) {
   String jobName = shardingContexts.getJobName();
   log.info(jobName + "任务执行之后执行");
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

spring整合ElasticJob配置监听器

<job:simple id="mySimpleJob" registry-center-ref="zk" cron="0/10 * * * * ?" sharding-total-count="3"
       class="cn.itcast.job.MySimpleJob" overwrite="true" >
   <job:listener class="cn.itcast.lisenter.My"lisenter></job:listener>
</job:simple>
1
2
3
4

spring boot整合ElasticJob

(一)修改ElasticSimpleJob注解,让使用者将需要的listener传递进来

@Retention(RetentionPolicy.RUNTIME) //源码 编译 运行都有效
public @interface ElasticSimpleJob {
 String name();
 String cron();
 int shardingTotalCount() default 1;
 boolean overwrite() default false;
 Class jobShardingStrategyClass() default
AverageAllocationJobShardingStrategy.class;
 Class<? extends ElasticJobListener>[] listeners() default {};
}
1
2
3
4
5
6
7
8
9
10

配置:修改ElasticSimpleJobConfiguration,传递listener

LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(overwrite).jobShardingStrategyClass(jobShardingStrategyClass.getCanonicalName()).build();
        ElasticJobListener[] elasticJobListeners = new ElasticJobListener[listeners.length];
        for (int i = 0; i < listeners.length; i++) {
          elasticJobListeners[i] = (ElasticJobListener)listeners[i].newInstance();
       }
        new SpringJobScheduler(bean,coordinatorRegistryCenter, simpleJobRootConfig,elasticJobListeners).init();
1
2
3
4
5
6

分布式环境,只有一个节点监听代码实现:(略)

代码有Bug

# 4 Elastic-Job架构原理

# 4.1 Elastic-Job架构图

下面这张图是Elastic-Job的整体架构图。

image-20230615230052317

绿色部分:elastic-job的jar包,和应用是部署在一起。

zookeeper:主要完成的工作是服务注册和服务发现。

运维平台:通过使用restful的API,从Zookeeper 获取作业操作和作业状态。发布作业轨迹到数据库或者ELK;

# 4.2 作业分片

image-20230615230126277

步骤1:服务首先要将自己的信息暴露给注册中心。;

步骤2:服务通过选举的方式,来选择一个主节点;

步骤3:主节点通过调用分片策略进行分片。eg:分为4片,0,1分片的服务A。2 ,3属于服务B。

步骤4:将分片信息存储到注册中心;

# 4.3 弹性扩容

Elastic-Job如何进行弹性扩容。

image-20230615230141005

1.当有新的服务启动后,比如有一个新的服务服务C启动,首先需要将信息暴露到注册中心。暴露到注册中心后会在Zookeeper产生新的节点。

2.zookeepr中的节点监听器会监听到新服务的注册即服务发现。

  1. 一旦有新的服务注册,服务会进行重新选举,选出新的主节点。
  2. 由主节点调用分片策略,重新分片
  3. 将重新分片的信息存储到注册中心。

注意:分片调整的时机,并不是立即来做,而是先在Zookeeper中异步记录状态,然后在下一次任务执行之前,由主节点调用分片策略重新分片。

image-20230615230159779

每隔1小时执行一次任务,如果在执行过程中有新的服务加入,或者服务宕机。并不是在作业执行过程中进行重新分片,重新分片的时机在下一次任务执行之前。

# 4.4 高可用

弹性扩容讲解完毕后,理解高可用就非常的容易。

如果作业在运行过程中,我们将服务进行了下线,或者机器出现了故障。分片就会指向存活的节点。

image-20230615230216176

假如我们的作业非常重要,我们也可以这么做:eg:我们可以把作业任务分成10个分片。使用20的服务来运行。

这20个服务中,其中有10个运行,10个在等待,等到有服务宕机,就可以进行候补

image-20230615230231964

# 4.5 失效转移

失效转移解决的问题:在作业运行过程中可能中,某些分片所在的服务产生宕机。导致一部分分片执行成功,一部分分片执行失败。

image-20230615230255769

Elastic-Job的解决方案:开启失效转移

spring整合Elastic-Job

<job:simple id="mySimpleJob" registry-center-ref="zk" cron="0/10 * * * * ?"
sharding-total-count="3"
class="cn.itcast.job.MySimpleJob" overwrite="true" failover="true" >
</job:simple>
1
2
3
4

spring boot整合Elastic-Job

JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(name, cron, shardingTotalCount).failover(true).build();
1

一旦我们设置failover为true后,Elastic-Job中FailoverListenerManager(失效转移监听管理器)中的一个监听器(JobCrashedJobListener)就开启。

该监听器会监听Zookeeper中instances下节点的删除事件。如果其中某个被删除。说明对应服务宕机。就会触发失败转移的逻辑。 失败转移逻辑

首先:在leader下创建一个failover/items节点。并且在items节点下记录宕机服务原来分配的分片(即失败的分片)。

接下来:其他存活的服务,都会监听到Zookeeper中节点删除的时间。存活的服务就会争抢

failover/items下面失败的分片。 为了保证失败的分片不会被重复执行。在Elastic-Job中采用了分布式锁解决。

最后:分片执行完毕后,会删除failover/items节点下失效的分片;

# 4.6 幂等性

image-20230615230316990

假如:任务调度周期为每1小时执行一次,正常每次调度任务处理需要耗时30分钟,如果在某一段时间由于数据库压力变大,导致原本只需要30分钟就能处理完成的任务,现在需要1.10小时才能运行,在一批数据处理未完成的情况下,每1小时又会触发一次调度,这样同一条任务数据可能将被多次处理,如果业务方法未实现幂等,则会引发非常严重的问题,那ElasticJob如何避免这个问题呢。

elasticJob提供了一个配置参数:

monitorExecution=true,开启幂等性
1

在下一个调度周期到达之后,只要发现这个任务的任何一个分片正在执行(running),则为该任务的所有分片都设置为mis-fire,然后忽略本次任务触发。

在任务执行完成后,如果开启misfire

misfire=true
1

ElasticJob检查Zookeeper是否存在misfire状态分片,如果存在,则首先清除misfie相关的信息,然后补偿执行任务。

如果没有开启misfire,任务执行完成后,会等待下一个调度周期到达后触发。

# 4.7 总结

失效转移和幂等性比较适合于作业时间相对较长的任务,作业一次只处理一块数据,比如报表统计,金额汇总,数据一次都不能丢,否则数据可能不完整。

对于一些几个秒执行一次的作业任务。比如超时订单处理。失效转移和幂等性其实可以不使用。

上次更新: 2025/04/03, 11:07:08
Elastic-Job基础使用
Elastic-Job运维管理平台

← Elastic-Job基础使用 Elastic-Job运维管理平台→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式