跨境互联网 跨境互联网
首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)

Revin

首页
  • AI 工具

    • 绘图提示词工具 (opens new window)
    • ChatGPT 指令 (opens new window)
  • ChatGPT

    • ChatGP T介绍
    • ChatGPT API 中文开发手册
    • ChatGPT 中文调教指南
    • ChatGPT 开源项目
  • Midjourney

    • Midjourney 文档
  • Stable Diffusion

    • Stable Diffusion 文档
  • 其他

    • AIGC 热门文章
    • 账号合租 (opens new window)
    • 有趣的网站
  • Vue

    • Vue3前置
  • JAVA基础

    • Stream
    • Git
    • Maven
    • 常用第三方类库
    • 性能调优工具
    • UML系统建模
    • 领域驱动设计
    • 敏捷开发
    • Java 测试
    • 代码规范及工具
    • Groovy 编程
  • 并发编程&多线程

    • 并发编程
    • 高性能队列 Disruptor
    • 多线程并发在电商系统下的应用
  • 其他

    • 面试题
  • 消息中间中间件

    • Kafka
    • RabbitMQ
    • RocketMQ
  • 任务调度

    • Quartz
    • XXL-Job
    • Elastic-Job
  • 源码解析

    • Mybatis 高级使用
    • Mybatis 源码剖析
    • Mybatis-Plus
    • Spring Data JPA
    • Spring 高级使用
    • Spring 源码剖析
    • SpringBoot 高级使用
    • SpringBoot 源码剖析
    • Jdk 解析
    • Tomcat 架构设计&源码剖析
    • Tomcat Web应用服务器
    • Zookeeper 高级
    • Netty
  • 微服务框架

    • 分布式原理
    • 分布式集群架构场景化解决方案
    • Dubbo 高级使用
    • Dubbo 核心源码剖析
    • Spring Cloud Gateway
    • Nacos 实战应用
    • Sentinel 实战应用
    • Seata 分布式事务
  • 数据结构和算法的深入应用
  • 存储

    • 图和Neo4j
    • MongoDB
    • TiDB
    • MySQL 优化
    • MySQL 平滑扩容实战
    • MySQL 海量数据存储与优化
    • Elasticsearch
  • 缓存

    • Redis
    • Aerospike
    • Guava Cache
    • Tair
  • 文件存储

    • 阿里云 OSS 云存储
    • FastDF 文件存储
  • 基础

    • Linux 使用
    • Nginx 使用与配置
    • OpenResty 使用
    • LVS+Keepalived 高可用部署
    • Jekins
  • 容器技术

    • Docker
    • K8S
    • K8S
  • 01.全链路(APM)
  • 02.电商终极搜索解决方案
  • 03.电商亿级数据库设计
  • 04.大屏实时计算
  • 05.分库分表的深入实战
  • 06.多维系统下单点登录
  • 07.多服务之间分布式事务
  • 08.业务幂等性技术架构体系
  • 09.高并发下的12306优化
  • 10.每秒100W请求的秒杀架构体系
  • 11.集中化日志管理平台的应用
  • 12.数据中台配置中心
  • 13.每天千万级订单的生成背后痛点及技术突破
  • 14.红包雨的架构设计及源码实现
  • 人工智能

    • Python 笔记
    • Python 工具库
    • 人工智能(AI) 笔记
    • 人工智能(AI) 项目笔记
  • 大数据

    • Flink流处理框架
  • 加密区

    • 机器学习(ML) (opens new window)
    • 深度学习(DL) (opens new window)
    • 自然语言处理(NLP) (opens new window)
AI 导航 (opens new window)
  • 基础

  • 设计模式

  • 并发编程

    • 并发编程

      • 多线程&并发设计原理
      • J.U.C
      • 线程池与Future
      • ForkJoinPool
        • 1 ForkJoinPool用法
          • 1.1 例子1:快排
          • 1.2 例子2:求1到n个数的和
        • 2 核心数据结构
        • 3 工作窃取队列
        • 4 ForkJoinPool状态控制
          • 4.1 状态变量ctl解析
          • 4.2 阻塞栈Treiber Stack
          • 4.3 ctl变量的初始值
          • 4.4 ForkJoinWorkerThread状态与个数分析
        • 5 Worker线程的阻塞-唤醒机制
          • 5.1 阻塞–入栈
          • 5.2 唤醒–出栈
        • 6 任务的提交过程分析
          • 6.1 内部提交任务push
          • 6.2 外部提交任务
        • 7 工作窃取算法:任务的执行过程分析
        • 8 ForkJoinTask的fork/join
          • 8.1 fork
          • 8.2 join的嵌套
          • 8.2.1 join的层层嵌套阻塞原理
          • 8.2.2 ForkJoinTask的状态解析
          • 8.2.3 join的详细实现
          • 8.2.4 join的唤醒
        • 9 ForkJoinPool的优雅关闭
          • 9.1 工作线程的退出
          • 9.2 shutdown()与shutdownNow()的区别
      • 多线程设计模式
    • 多线程并发在电商系统下的应用

    • 高性能队列 Disruptor
    • 资料
  • JVM与性能调优

  • 字节码增强技术

  • java
  • 并发编程
  • 并发编程
Revin
2023-07-17
目录

ForkJoinPool

# 1 ForkJoinPool用法

ForkJoinPool就是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool看作一个单机版的Map/Reduce,多个线程并行计算。

相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。

假设有5个任务,在ThreadPoolExecutor中有5个线程并行执行,其中一个任务的计算量很大,其余4个任务的计算量很小,这会导致1个线程很忙,其他4个线程则处于空闲状态。

利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而实现任务计算的负载均衡。

# 1.1 例子1:快排

快排有2个步骤:

  1. 利用数组的第1个元素把数组划分成两半,左边数组里面的元素小于或等于该元素,右边数组

里面的元素比该元素大;

  1. 对左右的两个子数组分别排序。

左右两个子数组相互独立可以并行计算。利用ForkJoinPool,代码如下:

package com.lagou.concurrent.demo;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
public class ForkJoinPoolDemo01 {
  static class SortTask extends RecursiveAction {
    final long[] array;
    final int lo;
    final int hi;
    public SortTask(long[] array) {
      this.array = array;
      this.lo = 0;
      this.hi = array.length - 1;
   }
    public SortTask(long[] array, int lo, int hi) {
      this.array = array;
      this.lo = lo;
      this.hi = hi;
   }
    private int partition(long[] array, int lo, int hi) {
      long x = array[hi];
  int i = lo - 1;
      for (int j = lo; j < hi; j++) {
        if (array[j] <= x) {
          i++;
          swap(array, i, j);
       }
     }
      swap(array, i + 1, hi);
      return i + 1;
   }
    private void swap(long[] array, int i, int j) {
      if (i != j) {
        long temp = array[i];
        array[i] = array[j];
        array[j] = temp;
     }
   }
    @Override
    protected void compute() {
      if (lo < hi) {
        // 找到分区的元素下标
        int pivot = partition(array, lo, hi);
// 将数组分为两部分
        SortTask left = new SortTask(array, lo, pivot - 1);
        SortTask right = new SortTask(array, pivot + 1, hi);
       
        left.fork();
        right.fork();
        left.join();
        right.join();
     }
   }
 }
  public static void main(String[] args) throws InterruptedException {
    long[] array = {5, 3, 7, 9, 2, 4, 1, 8, 10};
    // 一个任务
    ForkJoinTask sort = new SortTask(array);
    // 一个pool
    ForkJoinPool pool = new ForkJoinPool();
    // ForkJoinPool开启多个线程,同时执行上面的子任务
    pool.submit(sort);
    // 结束ForkJoinPool
    pool.shutdown();
    // 等待结束Pool
    pool.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println(Arrays.toString(array));
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

# 1.2 例子2:求1到n个数的和

package com.lagou.concurrent.demo;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class ForkJoinPoolDemo02 {
  static class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10;
    private long start;
    private long end;
    public SumTask(long n) {
      this(1, n);
   }
    public SumTask(long start, long end) {
      this.start = start;
      this.end = end;
   }
    @Override
    protected Long compute() {
      long sum = 0;
// 如果计算的范围在threshold之内,则直接进行计算
      if ((end - start) <= THRESHOLD) {
        for (long l = start; l <= end; l++) {
          sum += l;
       }
     } else {
        // 否则找出起始和结束的中间值,分割任务
        long mid = (start + end) >>> 1;
        SumTask left = new SumTask(start, mid);
        SumTask right = new SumTask(mid + 1, end);
        left.fork();
        right.fork();
        // 收集子任务计算结果
        sum = left.join() + right.join();
     }
      return sum;
   }
 }
  public static void main(String[] args) throws ExecutionException,
InterruptedException {
    SumTask sum = new SumTask(100);
    ForkJoinPool pool = new ForkJoinPool();
    ForkJoinTask<Long> future = pool.submit(sum);
    Long aLong = future.get();
    System.out.println(aLong);
    pool.shutdown();
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

上面的代码用到了 RecursiveAction 和 RecursiveTask 两个类,它们都继承自抽象类

ForkJoinTask,用到了其中关键的接口 fork()、join()。二者的区别是一个有返回值,一个没有返回值。

WM_ConcurrentProgramming_Page224_001

WM_ConcurrentProgramming_Page224_002

RecursiveAction/RecursiveTask类继承关系:

WM_ConcurrentProgramming_Page224_003

在ForkJoinPool中,对应的接口如下:

WM_ConcurrentProgramming_Page224_004

# 2 核心数据结构

与ThreadPoolExector不同的是,除一个全局的任务队列之外,每个线程还有一个自己的局部队列。

WM_ConcurrentProgramming_Page225_001

核心数据结构如下所示:

public class ForkJoinPool extends AbstractExecutorService {
 
  // 状态变量,类似于ThreadPoolExecutor中的ctl变量。
  volatile long ctl;
  // 工作线程队列
  WorkQueue[] workQueues;
  // 工作线程工厂
  final ForkJoinWorkerThreadFactory factory;
  // 下一个worker的下标
  int indexSeed;
 
  static final class WorkQueue {
    volatile int source;    // source queue id, or sentinel
int id;           // 在ForkJoinPool的workQueues数组中的下标
int base;          // 队列尾部指针
int top;          // 队列头指针
volatile int phase;     // versioned, negative: queued, 1: locked
int stackPred;       // pool stack (ctl) predecessor link
int nsteals;        // number of steals
ForkJoinTask<?>[] array;  // 工作线程的局部队列
final ForkJoinPool pool;  // the containing pool (may be null)
final ForkJoinWorkerThread owner; // 该工作队列的所有者线程,null表示共享的
 }
}
public class ForkJoinWorkerThread extends Thread {
  // 当前工作线程所在的线程池,反向引用
  final ForkJoinPool pool;
  // 工作队列
  final ForkJoinPool.WorkQueue workQueue;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

下面看一下这些核心数据结构的构造过程。

WM_ConcurrentProgramming_Page226_001

public ForkJoinPool(int parallelism,
          ForkJoinWorkerThreadFactory factory,
          UncaughtExceptionHandler handler,
          boolean asyncMode,
          int corePoolSize,
          int maximumPoolSize,
          int minimumRunnable,
          Predicate<? super ForkJoinPool> saturate,
          long keepAliveTime,
          TimeUnit unit) {
  // check, encode, pack parameters
  if (parallelism <= 0 || parallelism > MAX_CAP ||
    maximumPoolSize < parallelism || keepAliveTime <= 0L)
    throw new IllegalArgumentException();
  if (factory == null)
    throw new NullPointerException();
  long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
  int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
  long c = ((((long)(-corep)    << TC_SHIFT) & TC_MASK) |
       (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
  int m = parallelism | (asyncMode ? FIFO : 0);
  int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
  int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
  int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
  //
  int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
  n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
  n = (n + 1) << 1; // power of two, including space for submission queues
  // 工作线程名称前缀
  this.workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
  // 初始化工作线程数组为n,2的幂次方
  this.workQueues = new WorkQueue[n];
  // worker线程工厂,有默认值
  this.factory = factory;
  this.ueh = handler;
  this.saturate = saturate;
  this.keepAlive = ms;
  this.bounds = b;
  this.mode = m;
  // ForkJoinPool的状态
  this.ctl = c;
  checkPermission();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# 3 工作窃取队列

关于上面的全局队列,有一个关键点需要说明:它并非使用BlockingQueue,而是基于一个普通的数组得以实现。

这个队列又名工作窃取队列,为 ForkJoinPool 的工作窃取算法提供服务。在 ForkJoinPool开篇的注释中,Doug Lea 特别提到了工作窃取队列的实现,其陈述来自如下两篇论文:"Dynamic Circular Work-Stealing Deque" by Chase and Lev,SPAA 2005与"Idempotent work stealing" by Michael,Saraswat,and Vechev,PPoPP 2009。读者可以在网上查阅相应论文。

所谓工作窃取算法,是指一个Worker线程在执行完毕自己队列中的任务之后,可以窃取其他线程队列中的任务来执行,从而实现负载均衡,以防有的线程很空闲,有的线程很忙。这个过程要用到工作窃取队列。

WM_ConcurrentProgramming_Page227_001

这个队列只有如下几个操作:

  1. Worker线程自己,在队列头部,通过对top指针执行加、减操作,实现入队或出队,这是单线程的。
  2. 其他Worker线程,在队列尾部,通过对base进行累加,实现出队操作,也就是窃取,这是多

线程的,需要通过CAS操作

这个队列,在Dynamic Circular Work-Stealing Deque这篇论文中被称为dynamic-cyclic-array。之所以这样命名,是因为有两个关键点:

  1. 整个队列是环形的,也就是一个数组实现的RingBuffer。并且base会一直累加,不会减小;

top会累加、减小。最后,base、top的值都会大于整个数组的长度,只是计算数组下标的时

候,会取top&(queue.length-1),base&(queue.length-1)。因为queue.length是2的整数次方,这里也就是对queue.length进行取模操作。

当top-base=queue.length-1 的时候,队列为满,此时需要扩容;

当top=base的时候,队列为空,Worker线程即将进入阻塞状态。

  1. 当队列满了之后会扩容,所以被称为是动态的。但这就涉及一个棘手的问题:多个线程同时在读写这个队列,如何实现在不加锁的情况下一边读写、一边扩容呢?

通过分析工作窃取队列的特性,我们会发现:在 base 一端,是多线程访问的,但它们只会使base变大,也就是使队列中的元素变少。所以队列为满,一定发生在top一端,对top进行累加的时候,这一端却是单线程的!队列的扩容恰好利用了这个单线程的特性!即在扩容过程中,不可能有其他线程对top进行修改,只有线程对base进行修改!

下图为工作窃取队列扩容示意图。扩容之后,数组长度变成之前的二倍,但top、base的值是不变的!通过top、base对新的数组长度取模,仍然可以定位到元素在新数组中的位置。

WM_ConcurrentProgramming_Page228_001

下面结合WorkQueue扩容的代码进一步分析。

WM_ConcurrentProgramming_Page228_002

WM_ConcurrentProgramming_Page228_003

WM_ConcurrentProgramming_Page229_001

final void growArray(boolean locked) {
  ForkJoinTask<?>[] newA = null;
  try {
    ForkJoinTask<?>[] oldA; int oldSize, newSize;
    // 当旧的array不是null,旧的array包含元素
    // 并且新的数组长度小于队列最大长度,并且新的长度大于0
    if ((oldA = array) != null && (oldSize = oldA.length) > 0 &&
     (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
      newSize > 0) {
      try {
        // 创建新数组
        newA = new ForkJoinTask<?>[newSize];
     } catch (OutOfMemoryError ex) {
     }
      if (newA != null) { // poll from old array, push to new
        int oldMask = oldSize - 1, newMask = newSize - 1;
        for (int s = top - 1, k = oldMask; k >= 0; --k) {
          // 逐个复制
          ForkJoinTask<?> x = (ForkJoinTask<?>)
            // 获取旧的值,将原来的设置为null
            QA.getAndSet(oldA, s & oldMask, null);
          if (x != null)
            newA[s-- & newMask] = x;
          else
            break;
       }
        array = newA;
        VarHandle.releaseFence();
     }
   }
 } finally {
    if (locked)
      phase = 0;
 }
  if (newA == null)
    throw new RejectedExecutionException("Queue capacity exceeded");
} 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 4 ForkJoinPool状态控制

# 4.1 状态变量ctl解析

类似于ThreadPoolExecutor,在ForkJoinPool中也有一个ctl变量负责表达ForkJoinPool的整个生命周期和相关的各种状态。不过ctl变量更加复杂,是一个long型变量,代码如下所示。

public class ForkJoinPool extends AbstractExecutorService {
  // ...
  // 线程池状态变量
  volatile long ctl;
  private static final long SP_MASK   = 0xffffffffL;
private static final long UC_MASK   = ~SP_MASK;
  private static final int  RC_SHIFT  = 48;
private static final long RC_UNIT   = 0x0001L << RC_SHIFT;
private static final long RC_MASK   = 0xffffL << RC_SHIFT;
  private static final int  TC_SHIFT  = 32;
private static final long TC_UNIT   = 0x0001L << TC_SHIFT;
private static final long TC_MASK   = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); //
sign
 
  // ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

WM_ConcurrentProgramming_Page230_001

ctl变量的64个比特位被分成五部分:

  1. AC:最高的16个比特位,表示Active线程数-parallelism,parallelism是上面的构造方法传进去的参数;
  2. TC:次高的16个比特位,表示Total线程数-parallelism;
  3. ST:1个比特位,如果是1,表示整个ForkJoinPool正在关闭;
  4. EC:15个比特位,表示阻塞栈的栈顶线程的wait count(关于什么是wait count,接下来解

释);

  1. ID:16个比特位,表示阻塞栈的栈顶线程对应的id。

WM_ConcurrentProgramming_Page231_001

# 4.2 阻塞栈Treiber Stack

什么叫阻塞栈呢?

要实现多个线程的阻塞、唤醒,除了park/unpark这一对操作原语,还需要一个无锁链表实现的阻塞队列,把所有阻塞的线程串在一起。

在ForkJoinPool中,没有使用阻塞队列,而是使用了阻塞栈。把所有空闲的Worker线程放在一个栈里面,这个栈同样通过链表来实现,名为Treiber Stack。前面讲解Phaser的实现原理的时候,也用过这个数据结构。

下图为所有阻塞的Worker线程组成的Treiber Stack。

WM_ConcurrentProgramming_Page231_002

首先,WorkQueue有一个id变量,记录了自己在WorkQueue[]数组中的下标位置,id变量就相当于每个WorkQueue或ForkJoinWorkerThread对象的地址;

WM_ConcurrentProgramming_Page231_003

其次,ForkJoinWorkerThread还有一个stackPred变量,记录了前一个阻塞线程的id,这个

stackPred变量就相当于链表的next指针,把所有的阻塞线程串联在一起,组成一个Treiber Stack。

最后,ctl变量的最低16位,记录了栈的栈顶线程的id;中间的15位,记录了栈顶线程被阻塞的次数,也称为wait count。

# 4.3 ctl变量的初始值

构造方法中,有如下的代码:

WM_ConcurrentProgramming_Page232_001

因为在初始的时候,ForkJoinPool 中的线程个数为 0,所以 AC=0-parallelism,TC=0-

parallelism。这意味着只有高32位的AC、TC 两个部分填充了值,低32位都是0填充。

# 4.4 ForkJoinWorkerThread状态与个数分析

在ThreadPoolExecutor中,有corePoolSize和maxmiumPoolSize 两个参数联合控制总的线程数,而在ForkJoinPool中只传入了一个parallelism参数,且这个参数并不是实际的线程数。那么,ForkJoinPool在实际的运行过程中,线程数究竟是由哪些因素决定的呢?

要回答这个问题,先得明白ForkJoinPool中的线程都可能有哪几种状态?可能的状态有三种:

  1. 空闲状态(放在Treiber Stack里面)。
  2. 活跃状态(正在执行某个ForkJoinTask,未阻塞)。
  3. 阻塞状态(正在执行某个ForkJoinTask,但阻塞了,于是调用join,等待另外一个任务的结果返回)。

ctl变量很好地反映出了三种状态:

高32位:u=(int) (ctl >>> 32),然后u又拆分成tc、ac 两个16位;

低32位:c=(int) ctl。

  1. c>0,说明Treiber Stack不为空,有空闲线程;c=0,说明没有空闲线程;
  2. ac>0,说明有活跃线程;ac<=0,说明没有空闲线程,并且还未超出parallelism;
  3. tc>0,说明总线程数 >parallelism。

在提交任务的时候:

WM_ConcurrentProgramming_Page232_002

WM_ConcurrentProgramming_Page233_001

WM_ConcurrentProgramming_Page233_002

WM_ConcurrentProgramming_Page233_003

在通知工作线程的时候,需要判断ctl的状态,如果没有闲置的线程,则开启新线程:

WM_ConcurrentProgramming_Page233_004

# 5 Worker线程的阻塞-唤醒机制

ForkerJoinPool 没有使用 BlockingQueue,也就不利用其阻塞/唤醒机制,而是利用了park/unpark原语,并自行实现了Treiber Stack。

下面进行详细分析ForkerJoinPool,在阻塞和唤醒的时候,分别是如何入栈的。

# 5.1 阻塞–入栈

当一个线程窃取不到任何任务,也就是处于空闲状态时就会阻塞入栈。

WM_ConcurrentProgramming_Page234_001

final void runWorker(WorkQueue w) {
  // 随机数
  int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
  // 初始化任务数组
  w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
  for (;;) {
    int phase;
    // 扫描是否有需要执行的一个或多个顶级任务
    // 其中包含了窃取的任务执行,以及线程局部队列中任务的执行
    // 如果发现了就执行,返回true
    // 如果获取不到任务,就需要将该线程入队列,阻塞
    if (scan(w, r)) {
      // 随机数
      r ^= r << 13; r ^= r >>> 17; r ^= r << 5;
   }
    // 如果是已经入队列阻塞的,因为phase大于0表示加锁
    else if ((phase = w.phase) >= 0) {   // enqueue, then rescan
      long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
      long c, nc;
     do {
        w.stackPred = (int)(c = ctl);
        // ForkJoinPool中status表示运行中的线程的,数字减一,因为入队列了。
        nc = ((c - RC_UNIT) & UC_MASK) | np;
        // CAS操作,自旋,直到操作成功
     } while (!CTL.weakCompareAndSet(this, c, nc));
   }
    else {                 // already queued
      int pred = w.stackPred;
      Thread.interrupted();       // clear before park
      w.source = DORMANT;        // enable signal
      long c = ctl;
      int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
      // 如果ForkJoinPool停止,则break,跳出循环
      if (md < 0)
        break;
      // 优雅关闭
      else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
          tryTerminate(false, false))
        break;
      else if (rc <= 0 && pred != 0 && phase == (int)c) {
        long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
        long d = keepAlive + System.currentTimeMillis();
        // 线程阻塞,计时等待
        LockSupport.parkUntil(this, d);
        //
        if (ctl == c &&        // drop on timeout if all idle
          d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
          CTL.compareAndSet(this, c, nc)) {
          // 不再扫描,需要入队列
          w.phase = QUIET;
          break;
       }
     }
      // phase为1,表示加锁,phase为负数表示入队列
      else if (w.phase < 0)
        // 如果phase小于0,表示阻塞,排队中
        LockSupport.park(this);
      w.source = 0;
   }
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 从一个队列中扫描一个或多个顶级任务,如果有,就执行
// 对于非空队列,执行任务,返回true
private boolean scan(WorkQueue w, int r) {
  WorkQueue[] ws; int n;
  // 如果workQueues不是null,并且workQueue的长度大于0,并且w非空,w是线程的workQueue
  if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
    // m是ws长度减一,获取ws顶部workQueue
    for (int m = n - 1, j = r & m;;) {
      WorkQueue q; int b;
      // 随机获取workQueue,如果该workQueue的顶指针和底指针不相等,表示有需要执行的任务
      if ((q = ws[j]) != null && q.top != (b = q.base)) {
        int qid = q.id;
        ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
        // 如果workQueue的任务队列不是null,并且元素非空
        if ((a = q.array) != null && (cap = a.length) > 0) {
          // 获取队列顶部任务
          t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
          // 如果q的base值没有被别的线程修改过,t不是null,并且将t从数组中移除成功
          // 即可在当前工作线程执行该任务
          if (q.base == b++ && t != null &&
            QA.compareAndSet(a, k, t, null)) {
            // base+1
            q.base = b;
            // 更改source为当前id
            w.source = qid;
            // 如果还有任务需要执行,通知其他闲置的线程执行
            if (q.top - b > 0)
              signalWork();
            // 让workQueue中的工作线程来执行不管是窃取来的,还是本地的任务,还是从queue中获取的其他任务
            // 公平起见,添加一个随机的边界;剩下的让别的线程来执行。
            w.topLevelExec(t, q,  // random fairness bound
                   r & ((n << TOP_BOUND_SHIFT) - 1));
         }
       }
        return true;
     }
      else if (--n > 0)
        j = (j + 1) & m;
      else
        break;
   }
 }
  return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

# 5.2 唤醒–出栈

在新的任务到来之后,空闲的线程被唤醒,其核心逻辑在signalWork方法里面。

final void signalWork() {
  for (;;) {
    long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
    if ((c = ctl) >= 0L)            // 足够的worker线程
      break;
    else if ((sp = (int)c) == 0) {       // 没有闲置的worker线程
      if ((c & ADD_WORKER) != 0L)      // worker线程太少
        tryAddWorker(c); // 尝试添加新的worker线程
      break;
   }
    else if ((ws = workQueues) == null)
      break;                 // 线程池没有启动或已经停止了
    else if (ws.length <= (i = sp & SMASK))
 break;                 // 线程池停止了
    else if ((v = ws[i]) == null)
      break;                 // 线程池正在停止中
    else {
      int np = sp & ~UNSIGNALLED;
      int vp = v.phase;
      long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
      Thread vt = v.owner;
      if (sp == vp && CTL.compareAndSet(this, c, nc)) {
        v.phase = np;
        // 如果栈顶元素存在,并且
        if (vt != null && v.source < 0)
          // 唤醒线程vt
          LockSupport.unpark(vt);
        break;
     }
   }
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 6 任务的提交过程分析

在明白了工作窃取队列、ctl变量的各种状态、Worker的各种状态,以及线程阻塞—唤醒机制之后,接下来综合这些知识,详细分析任务的提交和执行过程。

关于任务的提交,ForkJoinPool最外层的接口如下所示。

WM_ConcurrentProgramming_Page237_001

/**
* 将一个可能是外部任务的子任务入队列
*/
private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
  Thread t; ForkJoinWorkerThread w; WorkQueue q;
  // 任务为null,抛异常
  if (task == null)
    throw new NullPointerException();
  // 如果当前线程是ForkJoinWorkerThread类型的,并且该线程的pool就是当前对象
  // 并且当前pool的workQueue不是null,则将当前任务入队列。
  if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
   (w = (ForkJoinWorkerThread)t).pool == this &&
   (q = w.workQueue) != null)
    // 当前任务入队局部队列
    q.push(task);
 else
    // 否则该任务不是当前线程的子任务,调用外部入队方法,加入全局队列
    externalPush(task);
  return task;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

如何区分一个任务是内部任务,还是外部任务呢?

可以通过调用该方法的线程类型判断。

如果线程类型是ForkJoinWorkerThread,说明是线程池内部的某个线程在调用该方法,则把该任务放入该线程的局部队列;

否则,是外部线程在调用该方法,则将该任务加入全局队列。

# 6.1 内部提交任务push

内部提交任务,即上面的q.push(task),会放入该线程的工作窃取队列中,代码如下所示。

WM_ConcurrentProgramming_Page238_001

由于工作窃取队列的特性,操作是单线程的,所以此处不需要执行CAS操作。

# 6.2 外部提交任务

final void externalPush(ForkJoinTask<?> task) {
  int r;
  // 生成随机数
  if ((r = ThreadLocalRandom.getProbe()) == 0) {
    ThreadLocalRandom.localInit();
    r = ThreadLocalRandom.getProbe();
 }
  for (;;) {
    WorkQueue q;
    int md = mode, n;
    WorkQueue[] ws = workQueues;
    // 如果ForkJoinPool关闭,或者任务队列是null,或者ws的长度小于等于0,拒收任务
if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
      throw new RejectedExecutionException();
   
    // 如果随机数计算的workQueues索引处的元素为null,则添加队列
    // 即提交任务的时候,是随机向workQueue中添加workQueue,负载均衡的考虑。
    else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue
      // 计算新workQueue对象的id值
      int qid = (r | QUIET) & ~(FIFO | OWNED);
      // worker线程名称前缀
      Object lock = workerNamePrefix;
      // 创建任务数组
      ForkJoinTask<?>[] qa =
        new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
      // 创建WorkQueue,将当前线程作为
      q = new WorkQueue(this, null);
      // 将任务数组赋值给workQueue
      q.array = qa;
      // 设置workQueue的id值
      q.id = qid;
      // 由于是通过客户端线程添加的workQueue,没有前置workQueue
      // 内部提交任务有源workQueue,表示子任务
      q.source = QUIET;
      if (lock != null) {   // unless disabled, lock pool to install
        synchronized (lock) {
          WorkQueue[] vs; int i, vn;
          // 如果workQueues数组不是null,其中有元素,
          // 并且qid对应的workQueues中的元素为null,则赋值
          // 因为有可能其他线程将qid对应的workQueues处的元素设置了,
          // 所以需要加锁,并判断元素是否为null
          if ((vs = workQueues) != null && (vn = vs.length) > 0 &&
            vs[i = qid & (vn - 1) & SQMASK] == null)
            //
            vs[i] = q;
       }
     }
   }
    // CAS操作,使用随机数
    else if (!q.tryLockPhase()) // move if busy
      r = ThreadLocalRandom.advanceProbe(r);
    else {
      // 如果任务添加成功,通知线程池调度,执行。
      if (q.lockedPush(task))
        signalWork();
      return;
   }
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

lockedPush(task)方法的实现:

WM_ConcurrentProgramming_Page240_001

外部多个线程会调用该方法,所以要加锁,入队列和扩容的逻辑和线程内部的队列基本相同。最

后,调用signalWork(),通知一个空闲线程来取。

# 7 工作窃取算法:任务的执行过程分析

全局队列有任务,局部队列也有任务,每一个Worker线程都会不间断地扫描这些队列,窃取任务来执行。下面从Worker线程的run方法开始分析:

WM_ConcurrentProgramming_Page240_002

run()方法调用的是所在ForkJoinPool的runWorker方法,如下所示。

final void runWorker(WorkQueue w) {
  int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
  w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize
  for (;;) {
    int phase;
    if (scan(w, r)) {           // scan until apparently empty
      r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
   }
    else if ((phase = w.phase) >= 0) {   // enqueue, then rescan
      long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
      long c, nc;
      do {
        w.stackPred = (int)(c = ctl);
        nc = ((c - RC_UNIT) & UC_MASK) | np;
     } while (!CTL.weakCompareAndSet(this, c, nc));
   }
    else {                 // already queued
      int pred = w.stackPred;
      Thread.interrupted();       // clear before park
      w.source = DORMANT;        // enable signal
      long c = ctl;
      int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
      if (md < 0)            // terminating
        break;
      else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
          tryTerminate(false, false))
        break;             // quiescent shutdown
      else if (rc <= 0 && pred != 0 && phase == (int)c) {
        long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
        long d = keepAlive + System.currentTimeMillis();
        LockSupport.parkUntil(this, d);
        if (ctl == c &&        // drop on timeout if all idle
          d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
          CTL.compareAndSet(this, c, nc)) {
          w.phase = QUIET;
          break;
       }
     }
      else if (w.phase < 0)
        LockSupport.park(this);    // OK if spuriously woken
      w.source = 0;           // disable signal
   }
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

下面详细看扫描过程scan(w, a)。

private boolean scan(WorkQueue w, int r) {
  WorkQueue[] ws; int n;
  if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
    for (int m = n - 1, j = r & m;;) {
   WorkQueue q; int b;
      if ((q = ws[j]) != null && q.top != (b = q.base)) {
        int qid = q.id;
        ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
        if ((a = q.array) != null && (cap = a.length) > 0) {
          t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) &
b);
          if (q.base == b++ && t != null &&
            QA.compareAndSet(a, k, t, null)) {
            q.base = b;
            w.source = qid;
            if (q.top - b > 0)
              signalWork();
            w.topLevelExec(t, q,  // random fairness bound
                   r & ((n << TOP_BOUND_SHIFT) - 1));
         }
       }
        return true;
     }
      else if (--n > 0)
        j = (j + 1) & m;
      else
        break;
   }
 }
  return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 8 ForkJoinTask的fork/join

如果局部队列、全局中的任务全部是相互独立的,就很简单了。但问题是,对于分治算法来说,分解出来的一个个任务并不是独立的,而是相互依赖,一个任务的完成要依赖另一个前置任务的完成。

这种依赖关系是通过ForkJoinTask中的join()来体现的。且看前面的代码:

protected void compute() {
  if (lo < hi) {
    // 分区
    int pivot = partition(array, lo, hi);
    SortTask left = new SortTask(array, lo, pivot - 1);
    SortTask right = new SortTask(array, pivot + 1, hi);
    left.fork();
    right.fork();
    left.join();
    right.join();
 }
}
1
2
3
4
5
6
7
8
9
10
11
12

线程在执行当前ForkJoinTask的时候,产生了left、right 两个子Task。

fork是指把这两个子Task放入队列里面;

join则是要等待2个子Task完成。

而子Task在执行过程中,会再次产生两个子Task。如此层层嵌套,类似于递归调用,直到最底层的Task计算完成,再一级级返回。

# 8.1 fork

fork()的代码很简单,就是把自己放入当前线程所在的局部队列中。

如果是外部线程调用fork方法,则直接将任务添加到共享队列中。

WM_ConcurrentProgramming_Page243_001

# 8.2 join的嵌套

# 8.2.1 join的层层嵌套阻塞原理

join会导致线程的层层嵌套阻塞,如图所示:

WM_ConcurrentProgramming_Page243_002

线程1在执行 ForkJoinTask1,在执行过程中调用了 forkJoinTask2.join(),所以要等ForkJoinTask2 完成,线程1才能返回;

线程2在执行ForkJoinTask2,但由于调用了forkJoinTask3.join(),只有等ForkJoinTask3完成后,线程2才能返回;

线程3在执行ForkJoinTask3。

结果是:线程3首先执行完,然后线程2才能执行完,最后线程1再执行完。所有的任务其实组成一个有向无环图DAG。如果线程3调用了forkJoinTask1.join(),那么会形成环,造成死锁。

那么,这种层次依赖、层次通知的 DAG,在 ForkJoinTask 内部是如何实现的呢?站在

ForkJoinTask的角度来看,每个ForkJoinTask,都可能有多个线程在等待它完成,有1个线程在执行它。

所以每个ForkJoinTask就是一个同步对象,线程在调用join()的时候,阻塞在这个同步对象上面,执行完成之后,再通过这个同步对象通知所有等待的线程。

利用synchronized关键字和Java原生的wait()/notify()机制,实现了线程的等待-唤醒机制。调用join()的这些线程,内部其实是调用ForkJoinTask这个对象的wait();执行该任务的Worker线程,在任务执行完毕之后,顺便调用notifyAll()。

WM_ConcurrentProgramming_Page244_001

# 8.2.2 ForkJoinTask的状态解析

要实现fork()/join()的这种线程间的同步,对应的ForkJoinTask一定是有各种状态的,这个状态变量是实现fork/join的基础。

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
  // ...
  // 由ForkJoinPool和workers直接调用
  // 需要是volatile的
  volatile int status;
private static final int DONE   = 1 << 31; // 负值
private static final int ABNORMAL = 1 << 18; // 设置DONE的时候自动设置
private static final int THROWN  = 1 << 17; // 设置ABNORMAL的时候自动设置
private static final int SIGNAL  = 1 << 16; // 如果在调用join的线程正在等
待,则为true
private static final int SMASK   = 0xffff;  // short bits for tags
  // ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13

初始时,status=0。共有五种状态,可以分为两大类:

  1. 未完成:status>=0。
  2. 已完成:status<0。

所以,通过判断是status>=0,还是status<0,就可知道任务是否完成,进而决定调用join()的线程是否需要被阻塞。

# 8.2.3 join的详细实现

下面看一下代码的详细实现。

WM_ConcurrentProgramming_Page245_001

getRawResult()是ForkJoinTask中的一个模板方法,分别被RecursiveAction和RecursiveTask实现,前者没有返回值,所以返回null,后者返回一个类型为V的result变量。

WM_ConcurrentProgramming_Page245_002

WM_ConcurrentProgramming_Page245_003

阻塞主要发生在上面的doJoin()方法里面。在dojoin()里调用t.join()的线程会阻塞,然后等待任务t执行完成,再唤醒该阻塞线程,doJoin()返回。

注意:当 doJoin()返回的时候,就是该任务执行完成的时候,doJoin()的返回值就是任务的完成状态,也就是上面的几种状态。

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
  // ...
  private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    // 如果status < 0表示任务已经完成,不用阻塞,直接返回。
    return (s = status) < 0 ? s :
      // 否则判断线程是否是工作线程
     ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
     (w = (wt = (ForkJoinWorkerThread)t).workQueue).
      tryUnpush(this) && (s = doExec()) < 0 ? s :
      wt.pool.awaitJoin(w, this, 0L) :
      externalAwaitDone();
 }
  // ...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

上面的返回值可读性比较差,变形之后:

// 如果status < 0,直接返回s值
if ((s = status) < 0) {
  return s;
} else {
  // 如果线程是工作线程
  if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
    // 将任务能够从局部队列弹出,并调用doExec()方法执行成功
    if (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) &&
(s = doExec()) < 0) {
      // 返回s值
      return s;
   } else {
      // 否则等待,线程阻塞
      wt.pool.awaitJoin(w, this, 0L)
   }
 } else {
    // 如果线程不是工作线程,则外部等待任务完成,线程阻塞
    externalAwaitDone();
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

先看一下externalAwaitDone(),即外部线程的阻塞过程,相对简单。

private int externalAwaitDone() {
  int s = tryExternalHelp();
  if (s >= 0 && (s = (int)STATUS.getAndBitwiseOr(this, SIGNAL)) >= 0) {
    boolean interrupted = false;
    synchronized (this) {
      for (;;) {
        if ((s = status) >= 0) {
          try {
            // 如果任务还没有完成,阻塞
            wait(0L);
         } catch (InterruptedException ie) {
            interrupted = true;
         }
       }
        else {
          // 唤醒线程,开始执行
          notifyAll();
          break;
       }
     }
   }
    if (interrupted)
      Thread.currentThread().interrupt();
 }
  return s;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

内部Worker线程的阻塞,即上面的wt.pool.awaitJoin(w, this, 0L),相比外部线程的阻塞要做更多工作。它现不在ForkJoinTask里面,而是在ForkJoinWorkerThread里面。

final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
  int s = 0;
  int seed = ThreadLocalRandom.nextSecondarySeed();
  if (w != null && task != null &&
   (!(task instanceof CountedCompleter) ||
    (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) {
    // 尝试执行该任务
    w.tryRemoveAndExec(task);
    int src = w.source, id = w.id;
    int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
    s = task.status;
    while (s >= 0) {
      WorkQueue[] ws;
      int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1;
      while (n > 0) {
        WorkQueue q; int b;
        if ((q = ws[r & m]) != null && q.source == id &&
          q.top != (b = q.base)) {
          ForkJoinTask<?>[] a; int cap, k;
          int qid = q.id;
          if ((a = q.array) != null && (cap = a.length) > 0) {
            ForkJoinTask<?> t = (ForkJoinTask<?>)
              QA.getAcquire(a, k = (cap - 1) & b);
            if (q.source == id && q.base == b++ &&
              t != null && QA.compareAndSet(a, k, t, null)) {
              q.base = b;
              w.source = qid;
              // 执行该任务
              t.doExec();
              w.source = src;
           }
         }
          break;
       }
        else {
          r += step;
          --n;
       }
     }
      // 如果任务的status < 0,任务执行完成,则退出循环,返回s的值
      if ((s = task.status) < 0)
        break;
      else if (n == 0) { // empty scan
        long ms, ns; int block;
        if (deadline == 0L)
          ms = 0L;            // untimed
        else if ((ns = deadline - System.nanoTime()) <= 0L)
          break;             // timeout
        else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
          ms = 1L;            // avoid 0 for timed wait
        if ((block = tryCompensate(w)) != 0) {
          task.internalWait(ms);
         CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L);
       }
        s = task.status;
     }
   }
 }
  return s;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

上面的方法有个关键点:for里面是死循环,并且只有一个返回点,即只有在task.status<0,任务完成之后才可能返回。否则会不断自旋;若自旋之后还不行,就会调用task.internalWait(ms);阻塞。

task.internalWait(ms);的代码如下。

WM_ConcurrentProgramming_Page248_001

# 8.2.4 join的唤醒

调用t.join()之后,线程会被阻塞。接下来看另外一个线程在任务t执行完毕后如何唤醒阻塞的线程。

WM_ConcurrentProgramming_Page248_002

WM_ConcurrentProgramming_Page249_001

任务的执行发生在doExec()方法里面,任务执行完成后,调用一个setDone()通知所有等待的线程。

这里也做了两件事:

  1. 把status置为完成状态。
  2. 如果s != 0,即 s = SIGNAL,说明有线程正在等待这个任务执行完成。调用Java原生的

notifyAll()通知所有线程。如果s = 0,说明没有线程等待这个任务,不需要通知。

# 9 ForkJoinPool的优雅关闭

同ThreadPoolExecutor一样,ForkJoinPool的关闭也不可能是“瞬时的”,而是需要一个平滑的过渡过程。

# 9.1 工作线程的退出

对于一个Worker线程来说,它会在一个for循环里面不断轮询队列中的任务,如果有任务,则执

行,处在活跃状态;如果没有任务,则进入空闲等待状态。

这个线程如何退出呢?

/**
* 工作线程的顶级循环,通过ForkJoinWorkerThread.run调用
*/
final void runWorker(WorkQueue w) {
  int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
  w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // 初始化任务数组。
  for (;;) {
    int phase;
    if (scan(w, r)) {           // scan until apparently empty
      r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
   }
    else if ((phase = w.phase) >= 0) {   // enqueue, then rescan
      long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
      long c, nc;
      do {
        w.stackPred = (int)(c = ctl);
        nc = ((c - RC_UNIT) & UC_MASK) | np;
     } while (!CTL.weakCompareAndSet(this, c, nc));
   }
    else {                 // already queued
      int pred = w.stackPred;
      Thread.interrupted();       // clear before park
      w.source = DORMANT;        // enable signal
      long c = ctl;
      int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
     if (md < 0)            // terminating
        break;
      // 优雅退出
      else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
          tryTerminate(false, false))
        break;
      else if (rc <= 0 && pred != 0 && phase == (int)c) {
        long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
        long d = keepAlive + System.currentTimeMillis();
        LockSupport.parkUntil(this, d);
        if (ctl == c &&        // drop on timeout if all idle
          d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
          CTL.compareAndSet(this, c, nc)) {
          w.phase = QUIET;
          break;
       }
     }
      else if (w.phase < 0)
        LockSupport.park(this);    // OK if spuriously woken
      w.source = 0;           // disable signal
   }
 }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

(int) (c = ctl) < 0,即低32位的最高位为1,说明线程池已经进入了关闭状态。但线程池进入关闭状态,不代表所有的线程都会立马关闭。

# 9.2 shutdown()与shutdownNow()的区别

public void shutdown() {
  checkPermission();
  tryTerminate(false, true);
}
public List<Runnable> shutdownNow() {
  checkPermission();
  tryTerminate(true, true);
  return Collections.emptyList();
}
1
2
3
4
5
6
7
8
9

二者的代码基本相同,都是调用tryTerminate(boolean, boolean)方法,其中一个传入的是false,另一个传入的是true。tryTerminate意为试图关闭ForkJoinPool,并不保证一定可以关闭成功:

private boolean tryTerminate(boolean now, boolean enable) {
  int md; // 三个阶段:尝试设置为SHUTDOWN,之后STOP,最后TERMINATED
  while (((md = mode) & SHUTDOWN) == 0) {
    if (!enable || this == common)     // cannot shutdown
      return false;
    else
      // 将mode变量CAS操作设置为SHUTDOWN
     MODE.compareAndSet(this, md, md | SHUTDOWN);
 }
  while (((md = mode) & STOP) == 0) {    // try to initiate termination
    if (!now) {              // check if quiescent & empty
      for (long oldSum = 0L;;) {     // repeat until stable
        boolean running = false;
        long checkSum = ctl;
        WorkQueue[] ws = workQueues;
        if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
          // 还有正在运行的线程
          running = true;
        else if (ws != null) {
          WorkQueue w;
          for (int i = 0; i < ws.length; ++i) {
            if ((w = ws[i]) != null) {
              int s = w.source, p = w.phase;
              int d = w.id, b = w.base;
              if (b != w.top ||
               ((d & 1) == 1 && (s >= 0 || p >= 0))) {
                running = true;
                // 还正在运行
                break;
             }
              checkSum += (((long)s << 48) + ((long)p << 32) +
                     ((long)b << 16) + (long)d);
           }
         }
       }
        if (((md = mode) & STOP) != 0)
          break;         // already triggered
        else if (running)
          return false;
        else if (workQueues == ws && oldSum == (oldSum = checkSum))
          break;
     }
   }
    if ((md & STOP) == 0)
      // 如果需要立即停止,同时md没有设置为STOP,则设置为STOP
      MODE.compareAndSet(this, md, md | STOP);
 }
  // 如果mode还没有设置为TERMINATED,则进行循环
  while (((md = mode) & TERMINATED) == 0) { // help terminate others
    for (long oldSum = 0L;;) {       // repeat until stable
      WorkQueue[] ws; WorkQueue w;
      long checkSum = ctl;
      if ((ws = workQueues) != null) {
        for (int i = 0; i < ws.length; ++i) {
          if ((w = ws[i]) != null) {
            ForkJoinWorkerThread wt = w.owner;
            // 清空任务队列
            w.cancelAll();
            if (wt != null) {
              try {
                // 中断join或park的线程
                wt.interrupt();
             } catch (Throwable ignore) {
           }
           }
            checkSum += ((long)w.phase << 32) + w.base;
         }
       }
     }
      // 如果已经设置了TERMINATED,则跳出for循环,while循环条件为false,整个方
法返回true,停止
      if (((md = mode) & TERMINATED) != 0 ||
       (workQueues == ws && oldSum == (oldSum = checkSum)))
        break;
   }
    if ((md & TERMINATED) != 0)
      break;
    else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
      break;
    else if (MODE.compareAndSet(this, md, md | TERMINATED)) {
      synchronized (this) {
        // 通知调用awaitTermination的线程,关闭ForkJoinPool了
        notifyAll();
     }
      break;
   }
 }
  return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

总结:shutdown()只拒绝新提交的任务;shutdownNow()会取消现有的全局队列和局部队列中的任务,同时唤醒所有空闲的线程,让这些线程自动退出。

上次更新: 2025/04/03, 11:07:08
线程池与Future
多线程设计模式

← 线程池与Future 多线程设计模式→

最近更新
01
tailwindcss
03-26
02
PaddleSpeech
02-18
03
whisper
02-18
更多文章>
Theme by Vdoing | Copyright © 2019-2025 跨境互联网 | 豫ICP备14016603号-5 | 豫公网安备41090002410995号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式