多线程 J.U.C
在spring这种web开发环境下,尽量使用方法内的变量,尽量不要定义类变量,如果要使用定义类变量要考虑线程安全性。
# 1 线程池
# 1.1 概念
# 1.1.1 回顾线程创建方式
- 继承Thread
- 实现Runnable
# 1.1.2 线程的状态
- NEW:刚刚创建,没做任何操作
Thread thread = new Thread();
System.out.println(thread.getState());
- RUNNABLE:
调用run,可以执行,但不代表一定在执行(RUNNING,READY)
thread.start();
System.out.println(thread.getState());
- BLOCKED:抢不到锁
final byte[] lock = new byte[0];
new Thread(new Runnable() {
public void run() {
synchronized (lock){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
Thread thread2 = new Thread(new Runnable() {
public void run() {
synchronized (lock){
}
}
});
thread2.start();
Thread.sleep(1000);
System.out.println(thread2.getState());
- WAITING
Thread thread2 = new Thread(new Runnable() {
public void run() {
LockSupport.park();
}
});
thread2.start();
Thread.sleep(500);
System.out.println(thread2.getState());
LockSupport.unpark(thread2);
Thread.sleep(500);
System.out.println(thread2.getState());
- TIMED_WAITING
Thread thread3 = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread3.start();
Thread.sleep(500);
System.out.println(thread3.getState());
- TERMINATED
//等待1s后再来看
Thread.sleep(1000);
System.out.println(thread.getState());
# 1.1.3 线程池基本概念
根据上面的状态,普通线程执行完,就会进入TERMINATED销毁掉,而线程池就是创建一个缓冲池存放线程,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等候下次任务来临,这使得线程池比手动创建线程有着更多的优势:
- 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;
- 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行;
- 方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM
- 节省cpu切换线程的时间成本(需要保持当前执行线程的现场,并恢复要执行线程的现场)。
- 提供更强大的功能,延时定时线程池。(Timer vs ScheduledThreadPoolExecutor)
# 1.1.4 常用线程池类结构
可以通过idea查看到 ( 查看: ScheduledThreadPoolExecutor , ForkJoinPool 类图 )
说明:
- 最常用的是ThreadPoolExecutor
- 调度用ScheduledThreadPoolExecutor
- 任务拆分合并用ForkJoinPool
- Executors是工具类,协助你创建线程池的
# 1.1.2 核心参数
Java 提供的线程池相关的工具类中,最核心的是 ThreadPoolExecutor,我们首先来看它的类体系及构造
核心构造:
public class ThreadPoolExecutor extends AbstractExecutorService {
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
//核心的构造函数,其他构造函数都是调用该构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
线程池核心参数介绍:
参数名 | 作用 |
---|---|
corePoolSize | 核心线程池基本大小,核心线程数 |
maximumPoolSize | 线程池最大线程数 |
keepAliveTime | 线程空闲后的存活时间 |
TimeUnit unit | 线程空闲后的存活时间单位 |
BlockingQueue workQueue | 存放任务的阻塞队列 |
ThreadFactory threadFactory | 创建线程的工厂 |
RejectedExecutionHandler handler | 当阻塞队列和最大线程池都满了之后的饱和策略 |
- corePoolSize:核心线程数量
1:线程池刚创建时,线程数量为0,当每次执行execute
添加新的任务时会在线程池创建一个新的线程,直到线程数量达到corePoolSize
为止。
2:核心线程会一直存活,即使没有任务需要执行,当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
3:设置allowCoreThreadTimeout=true
(默认false)时,核心线程超时会关闭
- workQueue:阻塞队列
1:当线程池正在运行的线程数量已经达到corePoolSize
,那么再通过execute
添加新的任务则会被加workQueue
队列中,在队列中排队等待执行,而不会立即执行。
一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue
,LinkedBlockingQueue
,SynchronousQueue
;
- maximumPoolSize:最大线程数
1:当池中的线程数>=corePoolSize
,且任务队列已满时。线程池会创建新线程来处理任务
2:当池中的线程数=maximumPoolSize
,且任务队列已满时,线程池会拒绝处理任务而抛出异常
- keepAliveTime:线程空闲时间
1:当线程空闲时间达到keepAliveTime
时,线程会退出,直到线程数量=corePoolSize
2:如果allowCoreThreadTimeout=true
,则会直到线程数量=0
- threadFactory:线程工厂,主要用来创建线程
- rejectedExecutionHandler:任务拒绝处理器,两种情况会拒绝处理任务
1:当线程数已经达到maxPoolSize
,且队列已满,会拒绝新任务
2:当线程池被调用shutdown()
后,会等待线程池里的任务执行完毕,再shutdown
。如果在调用shutdown()
和线程池真正shutdown
之间提交任务,会拒绝新任务
3:当拒绝处理任务时线程池会调用rejectedExecutionHandler
来处理这个任务。如果没有设置默认是AbortPolicy
,另外在ThreadPoolExecutor
类有几个内部实现类来处理这类情况
ThreadPoolExecutor.AbortPolicy
:丢弃任务并抛出RejectedExecutionException
异常。
ThreadPoolExecutor.CallerRunsPolicy
:由调用线程处理该任务
ThreadPoolExecutor.DiscardPolicy
:也是丢弃任务,但是不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
# 1.2 工作机制
在线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部协调空闲的线程,如果有,则将任务交给某个空闲的线程。一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。
# 1.2.1 线程池状态
- **RUNNING:**初始化状态是RUNNING。线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。RUNNING状态下,能够接收新任务,以及对已添加的任务进行处理。
- **SHUTDOWN:**SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
//shutdown后不接受新任务,但是task1,仍然可以执行完成
ExecutorService poolExecutor = Executors.newFixedThreadPool(5);
poolExecutor.execute(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
System.out.println("finish task 1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
poolExecutor.shutdown();
poolExecutor.execute(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("ok");
- **STOP:**不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由(RUNNING 或 SHUTDOWN ) -> STOP
注意:运行中的任务还会打印,直到结束,因为调的是 Thread.interrupt
//改为shutdownNow后,任务立马终止,sleep被打断,新任务无法提交,task1停止
poolExecutor.shutdownNow();
- **TIDYING:**所有的任务已终止,队列中的”任务数量”为0,线程池会变为TIDYING。线程池变为TIDYING状态时,会执行钩子函数terminated(),可以通过重载terminated()函数来实现自定义行为
//自定义类,重写terminated方法
public class MyExecutorService extends ThreadPoolExecutor {
public MyExecutorService(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void terminated() {
super.terminated();
System.out.println("treminated");
}
//调用 shutdownNow, ternimated方法被调用打印
public static void main(String[] args) throws InterruptedException {
MyExecutorService service = new MyExecutorService(1,2,10000,TimeUnit.SECONDS,new
LinkedBlockingQueue<Runnable>(5));
service.shutdownNow();
}
}
- TERMINATED:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED
# 1.2.2 结构说明
(源码查看:两个集合,一个 queue ,一个 hashset )
# 1.2.3 任务的提交
- 添加任务,如果线程池中线程数没达到coreSize,直接创建新线程执行
- 达到core,放入queue
- queue已满,未达到maxSize继续创建线程
- 达到maxSize,根据reject策略处理
- 超时后,线程被释放,下降到coreSize
# 1.3 源码剖析
execute详解
ThreadPoolExecutor
的最基本使用方式就是通过execute
方法提交一个Runnable
任务,首先看图理解execute
的执行逻辑
//任务提交阶段:(4个if条件路线)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断工作数,如果小于coreSize,addWork,注意第二个参数core=true
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//否则,如果线程池还在运行,offer到队列
if (isRunning(c) && workQueue.offer(command)) {
//再检查一下状态
int recheck = ctl.get();
//如果线程池已经终止,直接移除任务,不再响应
if (! isRunning(recheck) && remove(command))
reject(command);
//否则,如果没有线程干活的话,创建一个空work,该work会从队列获取任务去执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//队列也满,继续调addWork,但是注意,core=false,开启到maxSize的大门
//超出max的话,addWork会返回false,进入reject
else if (!addWorker(command, false))
reject(command);
}
//线程创建
private boolean addWorker(Runnable firstTask, boolean core) {
//第一步,计数判断,不符合条件打回false
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//判断线程数,注意这里!
//也就说明线程池的线程数是不可能设置任意大的。
//最大29位(CAPACITY=29位二进制)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re‐read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//第二步,创建新work放入线程集合works(一个HashSet)
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//符合条件,创建新的work并包装task
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//在这里!!!
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//注意,只要是成功add了新的work,那么将该新work立即启动,任务得到执行
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//任务获取与执行
//在worker执行runWorker()的时候,不停循环,先查看自己有没有携带Task,如果有,执行
while (task != null || (task = getTask()) != null)
//如果没用,会调用getTask,从队列获取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//判断是不是要超时处理,重点!!!决定了当前线程要不要被释放
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//线程数超出max,并且上次循环中poll等待超时了,那么说明该线程已终止
//将线程队列数量原子性减
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//重点!!!
//如果线程可被释放,那就poll,释放的时间为:keepAliveTime
//否则,线程是不会被释放的,take一直被阻塞在这里,知道来了新任务继续工作
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//到这里说明可被释放的线程等待超时,已经销毁,设置该标记,下次循环将线程数减少
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
完整流程回顾:
# 1.4 Executors 工具
以上构造函数比较多,为了方便使用,提供了一个Executors工具类
1)newCachedThreadPool() : 弹性线程数
2)newFixedThreadPool(int nThreads) : 固定线程数
3)newSingleThreadExecutor() : 单一线程数
4)newScheduledThreadPool(int corePoolSize) : 可调度,常用于定时
# 1.5 注意点
# 1.5.1 线程池是如何保证线程不被销毁的呢?
答案:如果队列中没有任务时,核心线程会一直阻塞在获取任务的方法,直到返回任务。而任务执行完后,又会进入下一轮 work.runWork()中循环
验证:秘密就藏在核心源码里 ThreadPoolExecutor.getTask()
//work.runWork():
while (task != null || (task = getTask()) != null)
//work.getTask():
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
# 1.5.2 那么线程池中的线程会处于什么状态?
答案:TIMED_WAITING,RUNNABLE,WAITING
验证:起一个线程池,放置一个任务sleep,debug查看结束前后的状态
//debug add watcher:
((ThreadPoolExecutor) poolExecutor).workers.iterator().next().thread.getState()
ThreadPoolExecutor poolExecutor = Executors.newFixedThreadPool(5);
poolExecutor.execute(new Runnable() {
public void run() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println("ok");
# 1.5.3 核心线程与非核心线程有区别吗?
答案:没有。被销毁的线程和创建的先后无关。即便是第一个被创建的核心线程,仍然有可能被销毁
验证:看源码,每个works在runWork的时候去getTask,在getTask内部,并没有针对性的区分当前work是否是核心线程或者类似的标记。只要判断works数量超出core,就会调用poll(),否则take()
# 2 锁
# 2.1 概述
锁是一种互斥的机制,在多线程环境中实现对资源的协调与控制,凡是有资源被多线程共享,涉及到你改我改的情况就要考虑锁的加持。
从一个案例看起,在写代码的时候,不注意往往会遇到以下代码...
# 2.1.1 糟糕的实现
package com.itheima;
public class BadCounter {
private static int i=0;
public int get(){
return i;
}
public void inc(){
int j=get();
try {
Thread.sleep(100);
j++;
} catch (InterruptedException e) {
e.printStackTrace();
}
i=j;
}
public static void main(String[] args) throws InterruptedException {
final BadCounter counter = new BadCounter();
//不使用线程10次,对比使用线程10次,看结果
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
counter.inc();
}
}).start();
}
Thread.sleep(3000);
//理论上10才对。可是....
System.out.println(counter.i);
}
}
出问题了....
在遍地spring bean的年代,单例模式下的类变量尤其要注意!
# 2.2 实现方式
# 2.2.1 synchronized
//加synchronized,再测试
public synchronized void inc()
# 2.2.2 Lock
//换lock方式测试
Lock lock = new ReentrantLock();
public void inc() {
lock.lock();
//...
lock.unlock();
}
无论哪种方式加锁均能实现正确计数,但是这个性能实在是感人,后面调优还会提到。
# 2.3 锁的分类及详解
# 2.3.1 乐观锁 / 悲观锁
乐观锁顾名思义,很乐观的认为每次读取数据的时候总是认为没人动过,所以不去加锁。但是在更新的时候回去对比一下原来的值,看有没有被别人更改过。适用于读多写少的场景。
mysql中类比version号更新 update xxx set a=aaa where id=xx and version=1
java中的atomic包属于乐观锁实现,即CAS(下节会详细介绍)
悲观锁在每次读取数据的时候都认为其他人会修改数据,所以读取数据的时候也加锁,这样别人想拿的时候就会阻塞,直到这个线程释放锁,这就影响了并发性能。适合写操作比较多的场景。
mysql中类比 select xxx for update; update xx set a = aaa
案例中synchronized实现就是悲观锁(1.6之后优化为锁升级机制),悲观锁书写不当很容易影响性能(性能部分会讲到)
# 2.3.2 独享锁 / 共享锁
很好理解,独享锁是指该锁一次只能被一个线程所持有,而共享锁是指该锁可被多个线程所持有。
- 案例一:ReentrantLock,独享锁
package com.itheima;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class PrivateLock {
Lock lock = new ReentrantLock();
long start = System.currentTimeMillis();
void read() {
lock.lock();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
System.out.println("read time = "+(System.currentTimeMillis() ‐ start));
}
public static void main(String[] args) {
final PrivateLock lock = new PrivateLock();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
lock.read();
}
}).start();
}
}
}
结果分析:每个线程结束的时间点逐个上升,锁被独享,一个用完下一个,依次获取锁
- 案例二:ReadWriteLock,read共享,write独享
package com.itheima;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SharedLock {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock lock = readWriteLock.readLock();
long start = System.currentTimeMillis();
void read() {
lock.lock();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
System.out.println("end time = "+(System.currentTimeMillis() ‐ start));
}
public static void main(String[] args) {
final SharedLock lock = new SharedLock();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
lock.read();
}
}).start();
}
}
}
结果分析:每个线程独自跑,各在100ms左右,证明是共享的
- 案例三:同样是上例,换成writeLock
Lock lock = readWriteLock.writeLock();
结果分析:恢复到了1s时长,变为独享
小节:
- 读锁的共享锁可保证并发读是非常高效的,读写,写读 ,写写的过程是互斥的。
- 独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。
# 2.3.3 分段锁
从Map一家子说起....
HashMap是线程不安全的,在多线程环境下,使用HashMap进行put操作时,可能会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。
于是有了HashTable,HashTable是线程安全的。但是HashTable线程安全的策略实在不怎么高明,将get/put等所有相关操作都整成了synchronized的。
那有没有办法做到线程安全,又不这么粗暴呢?基于分段锁的ConcurrentHashMap诞生...
ConcurrentHashMap使用Segment(分段锁)技术,将数据分成一段一段的存储,Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,Segment数组中每一个元素一把锁,每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的数据存储结构一样。当访问其中一个段数据被某个线程加锁的时候,其他段的数据也能被其他线程访问,这就使得ConcurrentHashMap不仅保证了线程安全,而且提高了性能。
但是这也引来一个负面影响:ConcurrentHashMap 定位一个元素的过程需要进行两次Hash操作,第一次 Hash定位到 Segment,第二次 Hash 定位到元素所在的链表。所以 Hash 的过程比普通的 HashMap 要长。
备注: JDK1.8ConcurrentHashMap 中抛弃了原有的 Segment 分段锁,而采用了CAS + synchronized安全性。
# 2.3.4 可重入锁
可重入锁指的获取到锁后,如果同步块内需要再次获取同一把锁的时候,直接放行,而不是等待。其意义在于防止死锁。前面使用的synchronized 和ReentrantLock 都是可重入锁。
实现原理实现是通过为每个锁关联一个请求计数器和一个占有它的线程。如果同一个线程再次请求这个锁,计数器将递增,线程退出同步块,计数器值将递减。直到计数器为0锁被释放。
场景见于父类和子类的锁的重入(调super方法),以及多个加锁方法的嵌套调用。
- 案例一:父子可重入
package com.itheima;
public class ParentLock {
byte[] lock = new byte[0];
public void f1(){
synchronized (lock){
System.out.println("f1 from parent");
}
}
}
package com.itheima;
public class SonLock extends ParentLock {
public void f1() {
synchronized (super.lock){
super.f1();
System.out.println("f1 from son");
}
}
public static void main(String[] args) {
SonLock lock = new SonLock();
lock.f1();
}
}
- 案例二:内嵌方法可重入
package com.itheima;
public class NestedLock {
public synchronized void f1(){
System.out.println("f1");
}
public synchronized void f2(){
f1();
System.out.println("f2");
}
public static void main(String[] args) {
NestedLock lock = new NestedLock();
//可以正常打印 f1,f2
lock.f2();
}
}
- 案例三:不可重入锁的典型错误,不要这么做!!!
先看代码,猜一猜结果?
public class BadLock{
Lock lock = new Lock();
public void f1(){
System.out.println("f1");
lock.lock();
f2();
lock.unlock();
}
public void f2(){
lock.lock();
System.out.println("f2");
lock.unlock();
}
public static void main(String[] args) {
BadLock badLock = new BadLock();
//理论上,会打印 f1 和 f2
//实际上,这个错误的设计会导致卡死在f1
badLock.f1();
}
//自定义的锁,现实中不要这么做!!!
class Lock{
private boolean isLocked = false;
public synchronized void lock(){
try {
//想要拿锁,一直判断标记,如果被占就wait等待
while(isLocked) {
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
//一旦被唤醒,退出while了,自己拿到锁,将标记改为true(已占用)
isLocked = true;
}
public synchronized void unlock(){
//占用标记改成false
isLocked = false;
//同时唤醒等待锁的线程
notify();
}
}
}
# 2.3.5 公平锁 / 非公平锁
基本概念:
常见于AQS,公平锁就是在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,直到按照FIFO的规则从队列中取到自己。
非公平锁与公平锁基本类似,只是在放入队列前先判断当前锁是否被线程持有。如果锁空闲,那么他可以直接抢占,而不需要判断当前队列中是否有等待线程。只有锁被占用的话,才会进入排队。
在现实中想象一下游乐场旋转木马插队现象.....
优缺点:
公平锁的优点是等待锁的线程不会饿死,进入队列规规矩矩的排队,迟早会轮到。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。
非公平锁的性能要高于公平锁,因为线程有几率不阻塞直接获得锁。ReentrantLock默认使用非公平锁就是基于性能考量。但是非公平锁的缺点是可能引发队列中的线程始终拿不到锁,一直排队被饿死。
编码方式:
很简单,ReentrantLock支持创建公平锁和非公平锁(默认),想要实现公平锁,使用new
ReentrantLock(true)。
背后原理:
AQS,后面还会详细讲到。AQS中有一个state标识锁的占用情况,一个队列存储等待线程。
state=0表示锁空闲。如果是公平锁,那就看看队列有没有线程在等,有的话不参与竞争乖乖追加到尾部。如果是非公平锁,那就直接参与竞争,不管队列有没有等待者。
state>0表示有线程占着锁,这时候无论公平与非公平,都直接去排队(想抢也没有)
备注:
因为 ReentrantLock 是可以定义公平非公平锁,次数。所以是 >0 而不是简单的 0 和 1
而 synchronized 只能是非公平锁
# 2.3.6 锁升级
java中每个对象都可作为锁,锁有四种级别,按照量级从轻到重分为:无锁、偏向锁、轻量级锁、重量级锁。
如何理解呢?A占了锁,B就要阻塞等。但是,在操作系统中,阻塞就要存储当前线程状态,唤醒就要再恢复,这个过程是要消耗时间的...
如果A使用锁的时间远远小于B被阻塞和挂起的执行时间,那么我们将B挂起阻塞就相当的不合算。
于是出现自旋:自旋指的是锁已经被其他线程占用时,当前线程不会被挂起,而是在不停的试图获取锁(可以理解为不停的循环),每循环一次表示一次自旋过程。显然这种操作会消耗CPU时间,但是相比线程下文切换时间要少的时候,自旋划算。
而偏向锁、轻量锁、重量锁就是围绕如何使得cpu的占用更划算而展开的。
举个生活的例子,假设公司只有一个会议室(共享资源)
偏向锁:
前期公司只有1个团队,那么什么时候开会都能满足,就不需要询问和查看会议室的占用情况,直接进入使用状态。会议室门口挂了个牌子写着A使用,A默认不需要预约(ThreadID=A)
轻量级锁:
随着业务发展,扩充为2个团队,B团队肯定不会同意A无法无天,于是当AB同时需要开会时,两者竞争,谁抢到谁算谁的。偏向锁升级为轻量级锁,但是未抢到者在门口会不停敲门询问(自旋,循环),开完没有?开完没有?
重量级锁:
后来发现,这种不停敲门的方式很烦,A可能不理不睬,但是B要不停的闹腾。于是锁再次升级。
如果会议室被A占用,那么B团队直接闭嘴,在门口安静的等待(wait进入阻塞),直到A用完后会通知B(notify)。
注意点:
- 上面几种锁都是JVM自己内部实现,我们不需要干预,但是可以配置jvm参数开启/关闭自旋锁、偏向锁。
- 锁可以升级,但是不能反向降级:偏向锁→轻量级锁→重量级锁
- 无锁争用的时候使用偏向锁,第二个线程到了升级为轻量级锁进行竞争,更多线程时,进入重量级锁阻塞
偏重场景:
# 2.3.7 互斥锁 / 读写锁
- 典型的互斥锁:synchronized,ReentrantLock,读写锁:ReadWriteLock 前面都用过了
- 互斥锁属于独享锁,读写锁里的写锁属于独享锁,而读锁属于共享锁
案例:互斥锁用不好可能会失效,看一个典型的锁不住现象!
package com.itheima;
public class ObjectLock {
public static Integer i=0;
public void inc(){
synchronized (this){
int j=i;
try {
Thread.sleep(100);
j++;
} catch (InterruptedException e) {
e.printStackTrace();
}
i=j;
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
//重点!
new ObjectLock().inc();
}
}).start();
}
Thread.sleep(3000);
//理论上10才对。可是....
System.out.println(ObjectLock.i);
}
}
结果分析:每个线程内都是new对象,所以this不是同一把锁,结果锁不住,输出1
- this,换成static的 i 变量试试?
- 换成ObjectLock.class 试试? 可以
- 换成String.class 可以
- 去掉synchronized块,外部方法上加 static synchronized
# 2.4 AQS
# 2.4.1 前言
如果要想真正的理解JUC
下的并发工具的实现原理,我们必须要来学习AQS
,因为它是JUC
下很多类的基石。
在讲解AQS之前,如果老板让你自己写一个SDK层面的锁,给其他同事去使用,你会如何写呢?
1、搞一个状态标记,用来表示持有或未持有锁,但得是volatile
类型的保证线程可见性。
2、编写一个lock
,unlock
函数用于抢锁和释放锁,就是对状态标记的修改操作
3、lock
函数要保证并发下只能有一个线程能抢到锁,其他线程要等待获取锁(阻塞式),可以采用CAS+自旋的方式实现
初步实现如下:
public class MyLock {
// 定义一个状态变量status:为1表示锁被持有,为0表示锁未被持有
private volatile int status;
private static final Unsafe unsafe = reflectGetUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(MyLock.class.getDeclaredField("status"));
} catch (Exception ex) { throw new Error(ex); }
}
private static Unsafe reflectGetUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 阻塞式获取锁
* @return
*/
public boolean lock() {
while (!compareAndSet(0,1)) {
}
return true;
}
// cas 设置 status
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* 释放锁
*/
public void unlock() {
status = 0;
}
}
问题:获取不到锁自旋时,是空转,浪费CPU
1、使用yield
让出CPU执行权,等待调度
public boolean lock() {
while (!compareAndSet(0,1)) {
Thread.yield();//yield+自旋,尽可能的防止CPU空转,让出CPU资源
}
return true;
}
或者可以采用线程休眠的方式,但是休眠时间不太好确定,太长太短都不好。
2、采用等待唤醒机制,但是这里由于没有使用synchronized
关键字,所以也无法使用wait/notify
,但是我们可以使用park/unpark
,获取不到锁的线程park
并且去队列排队,释放锁时从队列拿出一个线程unpark
private static final Queue<Thread> QUEUE = new LinkedBlockingQueue<>();
public boolean lock() {
while (!compareAndSet(0,1)) {
QUEUE.offer(Thread.currentThread());
LockSupport.park();//线程休眠
}
return true;
}
public void unlock() {
status = 0;
LockSupport.unpark(QUEUE.poll());
}
# 2.4.2 AQS概述
AQS(AbstractQueuedSynchronizer):抽象队列同步器,定义了一套多线程访问共享资源的同步器框架,提供了SDK层面的锁机制,JUC中的很多类譬如:ReentrantLock/Semaphore/CountDownLatch......等都是基于它。
通过查阅作者的对于该类的文档注释可以得到如下核心信息:
1、AQS用一个volatile int state;
属性表示锁状态,1表示锁被持有,0表示未被持有,具体的维护由子类去维护,但是提供了修改该属性的三个方法:getState()
,setState(int newState)
,compareAndSetState(int expect, int update)
,其中CAS方法是核心。
2、框架内部维护了一个FIFO的等待队列,是用双向链表实现的,我们称之为CLH队列,
3、框架也内部也实现了条件变量Condition
,用它来实现等待唤醒机制,并且支持多个条件变量
4、AQS支持两种资源共享的模式:独占模式(Exclusive)和共享模式(Share),所谓独占模式就是任意时刻只允许一个线程访问共享资源,譬如ReentrantLock;而共享模式指的就是允许多个线程同时访问共享资源,譬如Semaphore/CountDownLatch
5、使用者只需继承AbstractQueuedSynchronizer
并重写指定的方法,在方法内完成对共享资源state
的获取和释放,至于具体线程等待队列的维护,AQS已经在顶层实现好了,在那些final
的模板方法里。
* <p>To use this class as the basis of a synchronizer, redefine the
* following methods, as applicable, by inspecting and/or modifying
* the synchronization state using {@link #getState}, {@link
* #setState} and/or {@link #compareAndSetState}:
*
* <ul>
* <li> {@link #tryAcquire}
* <li> {@link #tryRelease}
* <li> {@link #tryAcquireShared}
* <li> {@link #tryReleaseShared}
* <li> {@link #isHeldExclusively}
* </ul>
*
* Each of these methods by default throws {@link
* UnsupportedOperationException}. Implementations of these methods
* must be internally thread-safe, and should in general be short and
* not block. Defining these methods is the <em>only</em> supported
* means of using this class. All other methods are declared
* {@code final} because they cannot be independently varied.
6、AQS底层使用了模板方法模式,给我们提供了许多模板方法,我们直接使用即可。
API | 说明 | |
---|---|---|
final void acquire(int arg) | 独占模式获取锁,AQS顶层已实现,内部调用了tryAcquire | 模板方法 |
boolean tryAcquire(int arg) | 独占模式 尝试获取锁,AQS中未实现,由子类去实现,获取到锁返回true | |
final boolean release(int arg) | 释放独占锁,AQS顶层已实现,内部调用了tryRelease | 模板方法 |
boolean tryRelease(int arg) | 尝试释放独占锁,AQS中未实现,由子类去实现,成功释放返回true | |
final void acquireShared(int arg) | 共享模式获取锁,AQS顶层已实现,内部调用了tryAcquireShared | 模板方法 |
int tryAcquireShared(int arg) | 尝试获取共享锁,返回负数表示失败,0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源,AQS中未实现,由子类实现 | |
final boolean releaseShared(int arg) | 释放共享锁,返回true代表释放成功,AQS中已实现,内部调用了tryReleaseShared | 模板方法 |
boolean tryReleaseShared(int arg) | 尝试释放锁,释放后允许唤醒后续等待结点返回true,否则返回false,AQS中未实现,需要由子类实现 | |
boolean isHeldExclusively() | 共享资源是否被独占 |
# 2.4.3 基本使用
此时老板给你加了需求,要求你实现一个基于AQS的锁,那该怎么办呢?
在AbstractQueuedSynchronizer
的类注释中给出了使用它的基本方法,我们按照它的写法尝试即可
/**
* 基于 aqs实现锁
*/
public class MyLock implements Lock {
//同步器
private Syn syn = new Syn();
@Override
public void lock() {
//调用模板方法
syn.acquire(1);
}
@Override
public void unlock() {
//调用模板方法
syn.release(0);
}
// 其他接口方法暂时先不实现 省略
// 实现一个独占同步器
class Syn extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0,arg)) {
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
setState(arg);
return true;
}
}
}
# 2.4.4 原理解析
自己实现的锁在使用过程中发现一个问题,就是有时候有的线程特别容易抢到锁,而有的线程老是抢不到锁,虽说线程们抢锁确实看命,但能不能加入一种设计,让各个线程机会均等些,起码不要出现某几个线程总是特倒霉抢不到锁的情况吧!
这其实就是涉及到锁是否是公平的,那么什么是公平锁什么是非公平锁呢?
这我们就不得不深入我们使用的模板方法中看一眼了
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//结合我自己写的尝试获取锁的方法
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0,arg)) {
return true;
}
return false;
}
这里大概描述如下:
1、线程一来首先调用tryAcquire
,在tryAcquire
中直接CAS获取锁,如果获取不成功通过addWaiter
加入等待队列,然后走acquireQueued
让队列中的某个等待线程去获取锁。
2、不公平就体现在这里,线程来了也不先看一下等待队列中是否有线程在等待,如果没有线程等待,那直接获取锁没什么 问题,如果有线程等待就直接去获取锁不就相当于插队么?
那如何实现这种公平性呢?这就不得不探究一下AQS的内部的实现原理了,下面我们依次来看:
1、查看AbstractQueuedSynchronizer
的类定义,虽然它里面代码很多,但重要的属性就那么几个,
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
//其他不重要的略
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
public class ConditionObject implements Condition, java.io.Serializable {...}
}
结合前面讲的AQS的类文档注释不难猜到,内部类 Node
以及其类型的变量 head
和 tail
就表示 AQS 内部的一个等待队列,而剩下的 state
变量就用来表示锁的状态。
等待队列应该就是线程获取锁失败时,需要临时存放的一个地方,用来等待被唤醒并尝试获取锁。再看 Node
的属性我们知道,Node
存放了当前线程的指针 thread
,也即可以表示当前线程并对其进行某些操作,prev
和 next
说明它构成了一个双向链表,也就是为某些需要得到前驱或后继节点的算法提供便利。
2、AQS加锁最核心的代码就是如下,我们要来探究它的实现原理
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
它的原理及整个过程我们以图的形式说明如下:
3、原理搞懂了,那如何让自定义的锁是公平的呢?
其实导致不公平的原因就是线程每次调用acquire
时,都会先去tryAcquire
,而该方法目前的实现时直接去抢锁,也不看现在等待队列中有没有线程在排队,如果有线程在排队,那岂不是变成了插队,导致不公平。
所以现在的解决办法就是,在tryAcquire
时先看一下等待队列中是否有在排队的,如果有那就乖乖去排队,不插队,如果没有则可以直接去获取锁。
那如何知道线程AQS等待队列中是否有线程排队呢?其实AQS顶层已经实现好了,它提供了一个hasQueuedPredecessors
函数:如果在当前线程之前有一个排队的线程,则为True; 如果当前线程位于队列的头部(head.next
)或队列为空,则为false。
protected boolean tryAcquire(int arg) {
//先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
return true;
}
return false;
}
4、现在已经有公平锁了,但是成年人的世界不是做选择题,而是都想要,自己编写的锁既能支持公平锁,也支持非公平锁,让使用者可以自由选择,怎么办?
其实只要稍微改造一下即可,
public class MyLock implements Lock {
//同步器
private Sync syn ;
MyLock () {
syn = new NoFairSync();
}
MyLock (boolean fair) {
syn = fair ? new FairSync():new NoFairSync();
}
@Override
public void lock() {
//调用模板方法
syn.acquire(1);
}
@Override
public void unlock() {
//调用模板方法
syn.release(0);
}
// Lock接口其他方法暂时先不实现 略
// 实现一个独占同步器
class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryRelease(int arg) {
setState(arg);
return true;
}
}
class FairSync extends Sync {
@Override
protected boolean tryAcquire(int arg) {
//先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
return true;
}
return false;
}
}
class NoFairSync extends Sync {
@Override
protected boolean tryAcquire(int arg) {
//直接去获取锁
if (compareAndSetState(0,arg)) {
return true;
}
return false;
}
}
}
5、现在锁的公平性问题解决了,但是老板又出了新的需求,要求我们的锁支持可重入,因为它写了如下一段代码,发现一直获取不到锁
static Lock lock = new MyLock();
static void test3() {
lock.lock();
try {
System.out.println("test3 get lock,then do something ");
test4();
} finally {
lock.unlock();
}
}
static void test4() {
lock.lock();
try {
System.out.println("test4 get lock,then do something ");
} finally {
lock.unlock();
}
}
那如何让锁支持可重入呢?也就是说如果一个线程持有锁之后,还能继续获取锁,也就是说让锁只对不同线程互斥。
查看AbstractQueuedSynchronizer
的定义我们发现,它还继承自另一个类:AbstractOwnableSynchronizer
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {...}
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {...}
protected final Thread getExclusiveOwnerThread(){...}
}
看到这我们明白了,原来AQS
中有个变量是可以保存当前持有独占锁的线程的。那好办了,当我们获取锁时,如果发现锁被持有不要着急放弃,先看看持有锁的线程是否时当前线程,如果是还能继续获取锁。
另外关于可重入锁,还要注意一点,锁的获取和释放操作是成对出现的,就像下面这样
lock
lock
lock
lock
....
unlock
unlock
unlock
unlock
所以对于重入锁不仅要能记录锁被持有,还要记录重入的次数,释放的时候也不是直接将锁真实的释放,而是先减少重入次数,能释放的时候在释放。
故此时状态变量state
不在只有两个取值0,1
,某线程获取到锁state=1
,如果当前线程重入获取只需增加状态值state=2
,依次同理,锁释放时释放一次状态值-1
,当state=0
时才真正释放,其他线程才能继续获取锁。
修改我们锁的代码如下:公平非公平在可重入上的逻辑是一样的
public class MyLock implements Lock {
//同步器
private Sync syn ;
MyLock () {
syn = new NoFairSync();
}
MyLock (boolean fair) {
syn = fair ? new FairSync():new NoFairSync();
}
@Override
public void lock() {
//调用模板方法
syn.acquire(1);
}
@Override
public void unlock() {
//调用模板方法
syn.release(1);
}
// Lock接口其他方法暂时先不实现 略
// 实现一个独占同步器
class Sync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryRelease(int arg) {
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
boolean realRelease = false;
int nextState = getState() - arg;
if (nextState == 0) {
realRelease = true;
setExclusiveOwnerThread(null);
}
setState(nextState);
return realRelease;
}
}
class FairSync extends Sync {
@Override
protected boolean tryAcquire(int arg) {
final Thread currentThread = Thread.currentThread();
int currentState = getState();
if (currentState == 0 ) { // 可以获取锁
//先判断等待队列中是否有线程在排队 没有线程排队则直接去获取锁
if (!hasQueuedPredecessors() && compareAndSetState(0,arg)) {
setExclusiveOwnerThread(currentThread);
return true;
}
}else if (currentThread == getExclusiveOwnerThread()) {
//重入逻辑 增加 state值
int nextState = currentState + arg;
if (nextState < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextState);
return true;
}
return false;
}
}
class NoFairSync extends Sync {
@Override
protected boolean tryAcquire(int arg) {
final Thread currentThread = Thread.currentThread();
int currentState = getState();
if (currentState ==0 ) { // 可以获取锁
//直接去获取锁
if (compareAndSetState(0,arg)) {
setExclusiveOwnerThread(currentThread);
return true;
}
}else if (currentThread == getExclusiveOwnerThread()) {
//重入逻辑 增加 state值
int nextState = currentState + arg;
if (nextState < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextState);
return true;
}
return false;
}
}
}
好了至此我们已经掌握了AQS
的核心原理以及它的一个经典实现ReentrantLock
几乎全部的知识点,此时打开ReentrantLock
的源码你会发现一切都很清爽!!!
# 3 原子操作 (atomic)
# 3.1 概念
原子(atom)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为"不可被中断的一个或一系列操作" 。类比于数据库事务,redis的multi。
# 3.2 CAS
Compare And Set(或Compare And Swap),翻译过来就是比较并替换,CAS操作包含三个操作数——内存位置(V)、预期原值(A)、新值(B)。从第一视角来看,理解为:我认为位置 V 应该是 A,如果是A,则将 B 放到这个位置;否则,不要更改,只告诉我这个位置现在的值即可。
CAS(Compare-and-Swap/Exchange),即比较并替换,是一种实现并发常用到的技术。CAS的整体架构如下:
计数器问题发生归根结底是取值和运算后的赋值中间,发生了插队现象,他们不是原子的操作。前面的计数器使用加锁方式实现了正确计数,下面,基于CAS的原子类上场....
juc中提供了Atomic开头的类,基于cas实现原子性操作,最基本的应用就是计数器
package com.itheima;
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicCounter {
private static AtomicInteger i = new AtomicInteger(0);
public int get(){
return i.get();
}
public void inc(){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
i.incrementAndGet();
}
public static void main(String[] args) throws InterruptedException {
final AtomicCounter counter = new AtomicCounter();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
counter.inc();
}
}).start();
}
Thread.sleep(3000);
//同样可以正确输出10
System.out.println(counter.i.get());
}
}
CAS虽然很高效的解决了原子操作问题,但是CAS仍然存在三大问题。
- 自旋(循环)时间长开销很大,如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销,注意这里的自旋是在用户态/SDK 层面实现的。
- 只能保证一个共享变量的原子操作,对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁来保证原子性。
- ABA问题,在使用CAS前要考虑清楚“ABA”问题是否会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比CAS更高效。
# 3.3 atomic
上面展示了AtomicInteger,关于atomic包,还有很多其他类型:
基本类型
- AtomicBoolean:以原子更新的方式更新boolean;
- AtomicInteger:以原子更新的方式更新Integer;
- AtomicLong:以原子更新的方式更新Long;
引用类型
- AtomicReference : 原子更新引用类型
- AtomicReferenceFieldUpdater :原子更新引用类型的字段
- AtomicMarkableReference : 原子更新带有标志位的引用类型
数组
- AtomicIntegerArray:原子更新整型数组里的元素。
- AtomicLongArray:原子更新长整型数组里的元素。
- AtomicReferenceArray:原子更新引用类型数组里的元素。
字段
- AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
- AtomicLongFieldUpdater:原子更新长整型字段的更新器。
- AtomicStampedReference:原子更新带有版本号的引用类型。
# 3.4 注意!
使用atomic要注意原子性的边界,把握不好会起不到应有的效果,原子性被破坏。
案例:原子性被破坏现象
package com.itheima;
import java.util.concurrent.atomic.AtomicInteger;
public class BadAtomic {
AtomicInteger i = new AtomicInteger(0);
static int j=0;
public void badInc(){
int k = i.incrementAndGet();
try {
Thread.sleep(new Random().nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
j=k;
}
public static void main(String[] args) throws InterruptedException {
BadAtomic atomic = new BadAtomic();
for (int i = 0; i < 10; i++) {
new Thread(()‐>{
atomic.badInc();
}).start();
}
Thread.sleep(3000);
System.out.println(atomic.j);
}
}
结果分析:
- 每次都不一样,总之不是10
- i是原子性的,没问题。但是再赋值,变成了两部操作,原子性被打破
- 在badInc上加synchronized,问题解决
volatile 只能保可见性,保证不了原子性。
# 4 Fork/Join
# 4.1 概念
一个大的任务,可能拆出来多个子任务之后,分配给多个线程去处理,处理完成后进行合并结果
ForkJoin是由JDK1.7后提供多线并发处理框架。ForkJoinPool由Java大师Doug Lea主持编写,处理逻辑大概分为两步。
1.任务分割:Fork(分岔),先把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割。
2.合并结果:join,分割后的子任务被多个线程执行后,再合并结果,得到最终的完整输出。
# 4.2 组成
- ForkJoinTask:主要提供fork和join两个方法用于任务拆分与合并;多数使用RecursiveAction(无返回值的任务)和RecursiveTask(需要返回值)来实现compute方法。
- **ForkJoinPool:**调度ForkJoinTask的线程池;
- **ForkJoinWorkerThread:**Thread的子类,存放于线程池中的工作线程(Worker);
- **WorkQueue:**任务队列,用于保存任务;
# 4.3 基本使用
一个典型的例子:计算1-1000的和
package com.itheima.thread;
import java.util.concurrent.*;
public class SumTask {
private static final Integer MAX = 100; //每隔100创建一个子任务做这个事情
static class SubTask extends RecursiveTask<Integer> {
// 子任务开始计算的值
private Integer start;
// 子任务结束计算的值
private Integer end;
// 框架会不断边界赋值
public SubTask(Integer start , Integer end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if(end ‐ start < MAX) {
//小于边界,开始计算
System.out.println("start = " + start + ";end = " + end);
Integer totalValue = 0;
for(int index = this.start ; index <= this.end ; index++) {
totalValue += index;
}
return totalValue;
}else {
//否则,中间劈开继续拆分
SubTask subTask1 = new SubTask(start, (start + end) / 2);
subTask1.fork();
SubTask subTask2 = new SubTask((start + end) / 2 + 1 , end);
subTask2.fork();
return subTask1.join() + subTask2.join();
}
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
Future<Integer> taskFuture = pool.submit(new SubTask(1,1000));
try {
Integer result = taskFuture.get();
System.out.println("result = " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace(System.out);
}
}
}
# 4.4 设计思想
- 普通线程池内部有两个重要集合:工作线程集合,和任务队列。
- ForkJoinPool也类似,工作集合里放的是特殊线程ForkJoinWorkerThread,任务队列里放的是特殊任务ForkJoinTask
- 不同之处在于,普通线程池只有一个队列。而ForkJoinPool的工作线程ForkJoinWorkerThread每个线程内都绑定一个双端队列。
- 在fork的时候,也就是任务拆分,将拆分的task会被当前线程放到自己的队列中。
- 队列中的任务被线程执行时,有两种模式,默认是同步模式(asyncMode==false)从队尾取任务(LIFO)
- 窃取:当自己队列中执行完后,工作线程会到其他队列的队首获取任务(FIFO),取到后如果任务再次fork,拆分会被放入当前线程的队列,依次扩张
# 4.5 注意点
使用ForkJoin将相同的计算任务通过多线程执行。但是在使用中需要注意:
- 注意任务切分的粒度,也就是fork的界限。并非越小越好
- 判断要不要使用ForkJoin。任务量不是太大的话,串行可能优于并行。因为多线程会涉及到上下文的切换
# 5 volatile
# 5.1基本概念
回顾Java 内存模型中的可见性、原子性和有序性:
- 可见性,是指线程之间的可见性,一个线程修改的状态对另一个线程是可见的
- 原子性,指的是这个操作是原子不可拆分的,不允许别的线程中间插队操作
- 有序性指的是你写的代码的顺序要和最终执行的指令保持一致。因为在Java内存模型中,允许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。
volatile要解决的就是可见性和有序性问题,无法解决原子性。
volatile用处,加在变量上的,也就是一个线程更新了这个变量,其他线程可以及时可见。
volatile要解决的就是可见性和有序性问题,无法解决原子性
如果保证原子性需要搭配锁。
# 5.2 使用方式
先看一个经典案例:
public class VolatileTest extends Thread {
private static boolean flag = true;
public void run() {
while (flag) ;
System.out.println("finish");
}
public static void main(String[] args) throws Exception {
new VolatileTest().start();
Thread.sleep(2000);
flag = false;
}
}
猜一猜结果?
给flag加上 volatile再试试......对于新启 的线程内变量是可见的,终止了循环。
# 5.3 原理
Java内存模型分为主内存和线程工作内存两大类。
- 主内存:多个线程共享的内存。方法区和堆属于主内存区域。
- 线程工作内存:每个线程独享的内存。虚拟机栈、本地方法栈、程序计数器属于线程独享的工作内存。
Java内存模型规定,所有变量都需要存储在主内存中,线程需要时,在自己的工作内存保存变量的副本,线程对变量的所有操作都在工作内存中进行,执行结束后再同步到主内存中去。这里必然会存在时间差,在这个时间差内,该线程对副本的操作,对于其他线程是不见的,从而造成了可见性问题。
但是,当对volatile变量进行写操作的时候,JVM会向处理器发送一条lock前缀的指令,将这个缓存中的变量回写到系统主存中。
同时,在多处理器下,为了保证各个处理器的缓存是一致的,就会实现缓存一致性协议。每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期,一旦发现过期就会将当前处理器的缓存行设置成无效状态,强制从主内存读取,这就保障了可见性。
而volatile变量,通过内存屏障(JMM课程)可以禁止指令重排。从而实现指令的有序性。
1.当对volatile变量进行写操作的时候(比如工作内存B),会将变量的新值写回到主内容的共享变量里。
2。每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期(工作内存A),一旦发现过期就会将当前处理器的缓存行设置成无效状态,强制从主内存读取,这就保障了可见性、
# 5.4 注意!
volatile不能保证锁的原子性。
案例:给前面的计数器案例里加上volatile试试
package com.itheima;
public class BadVolatile {
private static volatile int i=0;
public int get(){
return i;
}
public void inc(){
int j=get();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
i=j+1;
}
public static void main(String[] args) throws InterruptedException {
final BadVolatile counter = new BadVolatile();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
public void run() {
counter.inc();
}
}).start();
}
Thread.sleep(3000);
//理论上10才对。可是....
System.out.println(counter.i);
}
}
达不到目的。说明原子性无法保障。
# 6 ConcurrentHashMap
# 6.1 基本使用
很简单,new创建即可:
public static void main(String[] args) throws InterruptedException {
//定义ConcurrentHashMap
Map map = new ConcurrentHashMap();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
//多线程下的put可以放心使用
map.put(UUID.randomUUID().toString(), "1");
}
}).start();
}
Thread.sleep(3000);
System.out.println(map);
}
# 6.2 实现原理
1.7是分段锁,上面阐述过,1.8采用的是cas + synchronized 操作,具体看代码:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());//计算hash
int binCount = 0;
for (Node<K,V>[] tab = table;;) {//自旋,确保插入成功
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable();//表为空的话,初始化表
else if ((f = tabAt(tab, i = (n ‐ 1) & hash)) == null) {
//否则,插入元素,看下面的 casTabAt 方法
//cas 在这里!比较是否为null,如果null才会设置并break,否则到else
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
//...
else {
V oldVal = null;
//其他情况下,加锁保持
//synchronized 在这里!
//加锁,锁的是当前插槽上的头节点f(类似分段锁)
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
//当前插槽上的节点数量
binCount = 1;
//沿着Node链往后找
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果找到相同key,说明之前put过,覆盖
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
//put方法上,onlyIfAbsent=false
//即要不要覆盖?
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//否则,新key,新Node插入到最后
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
//如果是红黑树,说明已经转化过,按树的规则放入Node
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
//如果节点数达到临界值,链表转成树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//计数
addCount(1L, binCount);
return null;
}
//compareAndSetObject,比较并插入,典型CAS操作
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//get取值
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
//判断table是不是空的,当前桶上是不是空的
//如果为空,返回null
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n ‐ 1) & h)) != null) {
//找到对应hash槽的第一个node,如果key相等,返回value
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果正在扩容,不影响,继续顺着node找即可
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//其他情况,逐个遍历,比对key,找到后返回value
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
# 总结:
put过程:
1.根据key的hash值定位到桶位置
2.如果table为空if,先初始化table。
3.如果table当前桶里没有node,cas添加元素。成功则跳出循环,失败则进入下一轮for循环。
4.判断是否有其他线程在扩容,有则帮忙扩容,扩容完成再添加元素。
5.如果桶的位置不为空,遍历该桶的链表或者红黑树,若key已存在,则覆盖,不存在则将key插入到链表或红黑树的尾部。
get过程:
1.根据key的hash值定位到桶位置。
2.map是否初始化,没有初始化则返回null
3.定位的桶是否有头结点,没有返回null
4.是否有其他线程在扩容,有的话调用find方法沿node指针往后查找。扩容与find可以并行,因为node的next指针不会变
5.若没有其他线程在扩容,则遍历桶对应的链表或者红黑树,使用equals方法进行比较。key相同则返回value,不存在则返回null
# 6.3 注意!
注意正确理解ConcurrentHashMap线程安全这个问题。看一个典型案例:
package com.itheima;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class BadConcurrent {
public static void main(String[] args) throws InterruptedException {
Map<String,Integer> map = new ConcurrentHashMap();
map.put("val",0);
for (int i = 0; i < 10; i++) {
new Thread(()‐>{
int v = map.get("val");
v++;
try {
Thread.currentThread().sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put("val",v);
}).start();
}
Thread.sleep(3000);
System.out.println(map);
}
}
猜一猜结果? 会不会累加到10?
结果是达不到效果:输出 {val=1}
# 7 并发容器
除了上面提到的ConcurrentHashMap,还有很多其他的并发容器,本节统一汇总。
# 7.1 背景
java中的集合类非常丰富(ArrayList,HashMap之类),在单线程下用的顺风顺水,但这些集合类都是非线程安全的,即在多线程的环境下,都需要其他额外的手段来保证数据的正确性。常见手段有两种:
- 自己通过synchronized关键字将所有使用到非线程安全的容器代码全部同步执行
- Vector、Stack、HashTable、Collections.synchronized等同步容器法,在早期的jdk中用的比较多,实现方式和上面几乎一样,而且多步操作时如果外面不额外加一层synchronized,依然锁不住。实际效果还不如上面
于是,并发容器诞生......
# 7.2 清单
1.ConcurrentHashMap
对应:HashMap
目标:代替Hashtable、synchronizedMap,使用最多,前面详细介绍过
原理:JDK7中采用Segment分段锁,JDK8中采用CAS+synchronized
2.CopyOnWriteArrayList
对应:ArrayList
目标:代替Vector、synchronizedList
原理:高并发往往是读多写少的特性,读操作不加锁,而对写操作加Lock独享锁,先复制一份新的集合,在新的集合上面修改,然后将新集合赋值给旧的引用,并通过volatile 保证其可见性。
查看源码: volatile array , lock 加锁,数组复制
3.CopyOnWriteArraySet
对应:HashSet
目标:代替synchronizedSet
原理:与CopyOnWriteArrayList实现原理类似。
4.ConcurrentSkipListMap
对应:TreeMap
目标:代替synchronizedSortedMap(TreeMap)
原理:基于Skip list(跳表)来代替平衡树,按照分层key上下链接指针来实现。
附加:跳表(数据结构)
5.ConcurrentSkipListSet
对应:TreeSet
目标:代替synchronizedSortedSet(TreeSet)
原理:内部基于ConcurrentSkipListMap实现,原理一致
6.ConcurrentLinkedQueue
对应:LinkedList
对应:无界线程安全队列
原理:通过队首队尾指针,以及Node类元素的next实现FIFO队列
7.BlockingQueue
对应:Queue
特点:拓展了Queue,增加了可阻塞的插入和获取等操作
原理:通过ReentrantLock实现线程安全,通过Condition实现阻塞和唤醒
实现类:
- LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列
- ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列
- PriorityBlockingQueue:按优先级排序的队列