多线程&并发设计原理
hm
# 1 多线程回顾
# 1.1 Thread和Runnable
# 1.1.1 Java中的线程
创建执行线程有两种方法:
- 扩展Thread 类。
- 实现Runnable 接口。
扩展Thread类的方式创建新线程:
package com.lagou.concurrent.demo;
public class MyThread extends Thread {
@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + " 运行了");
try {
Thread.sleep(800);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.lagou.concurrent.demo;
public class Main {
public static void main(String[] args) {
MyThread thread = new MyThread();
thread.start();
}
}
实现Runnable接口的方式创建线程:
package com.lagou.concurrent.demo;
public class MyRunnable implements Runnable {
@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName() + " 运行了");
try {
Thread.sleep(800);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.lagou.concurrent.demo;
public class Main {
public static void main(String[] args) {
Thread thread = new Thread(new MyRunnable());
thread.start();
}
}
# 1.1.2 Java中的线程:特征和状态
所有的Java 程序,不论并发与否,都有一个名为主线程的Thread 对象。执行该程序时, Java虚拟机( JVM )将创建一个新Thread 并在该线程中执行main()方法。这是非并发应用程序中唯一的线程,也是并发应用程序中的第一个线程。
Java中的线程共享应用程序中的所有资源,包括内存和打开的文件,快速而简单地共享信息。但是必须使用同步避免数据竞争。
Java中的所有线程都有一个优先级,这个整数值介于Thread.MIN_PRIORITY(1)和
Thread.MAX_PRIORITY(10)之间,默认优先级是Thread.NORM_PRIORITY(5)。线程的
执行顺序并没有保证,通常,较高优先级的线程将在较低优先级的钱程之前执行。
- 在Java 中,可以创建两种线程:
- 守护线程。
- 非守护线程。
区别在于它们如何影响程序的结束。
Java程序结束执行过程的情形:
- 程序执行Runtime类的exit()方法, 而且用户有权执行该方法。
- 应用程序的所有非守护线程均已结束执行,无论是否有正在运行的守护线程。
守护线程通常用在作为垃圾收集器或缓存管理器的应用程序中,执行辅助任务。在线程start之前调用isDaemon()方检查线程是否为守护线程,也可以使用setDaemon()方法将某个线程确立为守护线程。
- Thread.States类中定义线程的状态如下:
- NEW:Thread对象已经创建,但是还没有开始执行。
- RUNNABLE:Thread对象正在Java虚拟机中运行。
- BLOCKED : Thread对象正在等待锁定。
- WAITING:Thread 对象正在等待另一个线程的动作。
- TIME_WAITING:Thread对象正在等待另一个线程的操作,但是有时间限制。
- TERMINATED:Thread对象已经完成了执行。
getState()方法获取Thread对象的状态,可以直接更改线程的状态。
在给定时间内, 线程只能处于一个状态。这些状态是JVM使用的状态,不能映射到操作系统的线程状态。
线程状态的源码:
# 1.1.3 Thread类和Runnable 接口
Runnable接口只定义了一种方法:run()方法。这是每个线程的主方法。当执行start()方法启动新线程时,它将调用run()方法。
Thread类其他常用方法:
获取和设置Thread对象信息的方法。
- getId():该方法返回Thread对象的标识符。该标识符是在钱程创建时分配的一个正整数。在线程的整个生命周期中是唯一且无法改变的。
- getName()/setName():这两种方法允许你获取或设置Thread对象的名称。这个名称是一个String对象,也可以在Thread类的构造函数中建立。
- getPriority()/setPriority():你可以使用这两种方法来获取或设置Thread对象的优先级。
- isDaemon()/setDaemon():这两种方法允许你获取或建立Thread对象的守护条件。
- getState():该方法返回Thread对象的状态。
interrupt():中断目标线程,给目标线程发送一个中断信号,线程被打上中断标记。
interrupted():判断目标线程是否被中断,但是将清除线程的中断标记。
isinterrupted():判断目标线程是否被中断,不会清除中断标记。
sleep(long ms):该方法将线程的执行暂停ms时间。
join():暂停线程的执行,直到调用该方法的线程执行结束为止。可以使用该方法等待另一个Thread对象结束。
setUncaughtExceptionHandler():当线程执行出现未校验异常时,该方法用于建立未校验异常的控制器。
currentThread():Thread类的静态方法,返回实际执行该代码的Thread对象。
join示例程序:
package com.lagou.concurrent.demo;
public class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("MyThread线程:" + i);
}
}
}
package com.lagou.concurrent.demo;
public class Main {
public static void main(String[] args) throws InterruptedException {
MyThread myThread = new MyThread();
myThread.start();
myThread.join();
System.out.println("main线程 - 执行完成");
}
}
# 1.1.4 Callable
Callable 接口是一个与Runnable 接口非常相似的接口。Callable 接口的主要特征如下。
- 接口。有简单类型参数,与call()方法的返回类型相对应。
- 声明了call()方法。执行器运行任务时,该方法会被执行器执行。它必须返回声明中指定类型的对象。
- call()方法可以抛出任何一种校验异常。可以实现自己的执行器并重载afterExecute()方法来处理这些异常。
package com.lagou.concurrent.demo;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class Main {
public static void main(String[] args) throws ExecutionException,
InterruptedException {
MyCallable myCallable = new MyCallable();
// 设置Callable对象,泛型表示Callable的返回类型
FutureTask<String> futureTask = new FutureTask<String>(myCallable);
// 启动处理线程
new Thread(futureTask).start();
// 同步等待线程运行的结果
String result = futureTask.get();
// 5s后得到结果
System.out.println(result);
}
}
package com.lagou.concurrent.demo;
import java.util.concurrent.Callable;
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(5000);
return "hello world call() invoked!";
}
}
package com.lagou.concurrent.demo;
import java.util.concurrent.*;
public class Main2 {
public static void main(String[] args) throws ExecutionException,
InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 5, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)
) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
}
};
Future<String> future = executor.submit(new MyCallable());
String s = future.get();
System.out.println(s);
executor.shutdown();
}
}
# 1.2 synchronized关键字
# 1.2.1 锁的对象
synchronized关键字“给某个对象加锁”,示例代码:
public Class MyClass {
public void synchronized method1() {
// ...
}
public static void synchronized method2() {
// ...
}
}
等价于:
public class MyClass {
public void method1() {
synchronized(this) {
// ...
}
}
public static void method2() {
synchronized(MyClass.class) {
// ...
}
}
}
实例方法的锁加在对象myClass上;静态方法的锁加在MyClass.class上。
# 1.2.2 锁的本质
如果一份资源需要多个线程同时访问,需要给该资源加锁。加锁之后,可以保证同一时间只能有一个线程访问该资源。资源可以是一个变量、一个对象或一个文件等。
锁是一个“对象”,作用如下:
- 这个对象内部得有一个标志位(state变量),记录自己有没有被某个线程占用。最简单的情况是这个state有0、1两个取值,0表示没有线程占用这个锁,1表示有某个线程占用了这个锁。
- 如果这个对象被某个线程占用,记录这个线程的thread ID。
- 这个对象维护一个thread id list,记录其他所有阻塞的、等待获取拿这个锁的线程。在当前线程释放锁之后从这个thread id list里面取一个线程唤醒。
要访问的共享资源本身也是一个对象,例如前面的对象myClass,这两个对象可以合成一个对象。代码就变成synchronized(this) {…},要访问的共享资源是对象a,锁加在对象a上。当然,也可以另外新建一个对象,代码变成synchronized(obj1) {…}。这个时候,访问的共享资源是对象a,而锁加在新建的对象obj1上。
资源和锁合二为一,使得在Java里面,synchronized关键字可以加在任何对象的成员上面。这意味着,这个对象既是共享资源,同时也具备“锁”的功能!
# 1.2.3 实现原理
锁如何实现?
在对象头里,有一块数据叫Mark Word。在64位机器上,Mark Word是8字节(64位)的,这64位中有2个重要字段:锁标志位和占用该锁的thread ID。因为不同版本的JVM实现,对象头的数据结构会有各种差异。
# 1.3 wait与notify
# 1.3.1 生产者−消费者模型
生产者-消费者模型是一个常见的多线程编程模型,如下图所示:
一个内存队列,多个生产者线程往内存队列中放数据;多个消费者线程从内存队列中取数据。要实现这样一个编程模型,需要做下面几件事情:
- 内存队列本身要加锁,才能实现线程安全。
- 阻塞。当内存队列满了,生产者放不进去时,会被阻塞;当内存队列是空的时候,消费者无事可做,会被阻塞。
- 双向通知。消费者被阻塞之后,生产者放入新数据,要notify()消费者;反之,生产者被阻塞之后,消费者消费了数据,要notify()生产者。
第1件事情必须要做,第2件和第3件事情不一定要做。例如,可以采取一个简单的办法,生产者放不进去之后,睡眠几百毫秒再重试,消费者取不到数据之后,睡眠几百毫秒再重试。但这个办法效率低下,也不实时。所以,我们只讨论如何阻塞、如何通知的问题。
1.如何阻塞?
办法1:线程自己阻塞自己,也就是生产者、消费者线程各自调用wait()和notify()。
办法2:用一个阻塞队列,当取不到或者放不进去数据的时候,入队/出队函数本身就是阻塞的。
2.如何双向通知?
办法1:wait()与notify()机制。
办法2:Condition机制。
单个生产者单个消费者线程的情形:
package com.lagou.concurrent.demo;
public class Main {
public static void main(String[] args) {
MyQueue myQueue = new MyQueue();
ProducerThread producerThread = new ProducerThread(myQueue);
ConsumerThread consumerThread = new ConsumerThread(myQueue);
producerThread.start();
consumerThread.start();
}
}
package com.lagou.concurrent.demo;
public class MyQueue {
private String[] data = new String[10];
private int getIndex = 0;
private int putIndex = 0;
private int size = 0;
public synchronized void put(String element) {
if (size == data.length) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
data[putIndex] = element;
++putIndex;
if (putIndex == data.length) putIndex = 0;
++size;
notify();
}
public synchronized String get() {
if (size == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String result = data[getIndex];
++getIndex;
if (getIndex == data.length) getIndex = 0;
--size;
notify();
return result;
}
}
package com.lagou.concurrent.demo;
import java.util.Random;
public class ProducerThread extends Thread {
private final MyQueue myQueue;
private final Random random = new Random();
private int index = 0;
public ProducerThread(MyQueue myQueue) {
this.myQueue = myQueue;
}
@Override
public void run() {
while (true) {
String tmp = "ele-" + index;
myQueue.put(tmp);
System.out.println("添加元素:" + tmp);
index++;
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.lagou.concurrent.demo;
import java.util.Random;
public class ConsumerThread extends Thread {
private final MyQueue myQueue;
private final Random random = new Random();
public ConsumerThread(MyQueue myQueue) {
this.myQueue = myQueue;
}
@Override
public void run() {
while (true) {
String s = myQueue.get();
System.out.println("\t\t消费元素:" + s);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
多个生产者多个消费者的情形:
package com.lagou.concurrent.demo;
public class Main2 {
public static void main(String[] args) {
MyQueue2 myQueue = new MyQueue2();
for (int i = 0; i < 3; i++) {
new ConsumerThread(myQueue).start();
}
for (int i = 0; i < 5; i++) {
new ProducerThread(myQueue).start();
}
}
}
package com.lagou.concurrent.demo;
public class MyQueue2 extends MyQueue {
private String[] data = new String[10];
private int getIndex = 0;
private int putIndex = 0;
private int size = 0;
@Override
public synchronized void put(String element) {
if (size == data.length) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
put(element);
} else {
put0(element);
notify();
}
}
private void put0(String element) {
data[putIndex] = element;
++putIndex;
if (putIndex == data.length) putIndex = 0;
++size;
}
@Override
public synchronized String get() {
if (size == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
return get();
} else {
String result = get0();
notify();
return result;
}
}
private String get0() {
String result = data[getIndex];
++getIndex;
if (getIndex == data.length) getIndex = 0;
--size;
return result;
}
}
# 1.3.2 为什么必须和synchronized一起使用
在Java里面,wait()和notify()是Object的成员函数,是基础中的基础。为什么Java要把wait()和
notify()放在如此基础的类里面,而不是作为像Thread一类的成员函数,或者其他类的成员函数呢?
先看为什么wait()和notify()必须和synchronized一起使用?请看下面的代码:
class MyClass1 {
private Object obj1 = new Object();
public void method1() {
synchronized(obj1) {
//...
obj1.wait();
//...
}
}
public void method2() {
synchronized(obj1) {
//...
obj1.notify();
//...
}
}
}
或者下面的代码:
public class MyClass1 {
public void synchronized method1() {
//...
this.wait();
//...
}
public void synchronized method2() {
//...
this.notify();
//...
}
}
然后,开两个线程,线程A调用method1(),线程B调用method2()。答案已经很明显:两个线程之间要通信,对于同一个对象来说,一个线程调用该对象的wait(),另一个线程调用该对象的notify(),该对象本身就需要同步!所以,在调用wait()、notify()之前,要先通过synchronized关键字同步给对象,也就是给该对象加锁。
synchronized关键字可以加在任何对象的实例方法上面,任何对象都可能成为锁。因此,wait()和notify()只能放在Object里面了。
# 1.3.3 为什么wait()的时候必须释放锁
当线程A进入synchronized(obj1)中之后,也就是对obj1上了锁。此时,调用wait()进入阻塞状态,一直不能退出synchronized代码块;那么,线程B永远无法进入synchronized(obj1)同步块里,永远没有机会调用notify(),发生死锁。
这就涉及一个关键的问题:在wait()的内部,会先释放锁obj1,然后进入阻塞状态,之后,它被另外一个线程用notify()唤醒,重新获取锁!其次,wait()调用完成后,执行后面的业务逻辑代码,然后退出synchronized同步块,再次释放锁。
wait()内部的伪代码如下:
wait() {
// 释放锁
// 阻塞,等待被其他线程notify
// 重新获取锁
}
# 1.3.4 wait()与notify()的问题
以上述的生产者-消费者模型来看,其伪代码大致如下:
public void enqueue() {
synchronized(queue) {
while (queue.full()) {
queue.wait();
}
//... 数据入列
queue.notify(); // 通知消费者,队列中有数据了。
}
}
public void dequeue() {
synchronized(queue) {
while (queue.empty()) {
queue.wait();
}
// 数据出队列
queue.notify(); // 通知生产者,队列中有空间了,可以继续放数据了。
}
}
生产者在通知消费者的同时,也通知了其他的生产者;消费者在通知生产者的同时,也通知了其他消费者。原因在于wait()和notify()所作用的对象和synchronized所作用的对象是同一个,只能有一个对象,无法区分队列空和列队满两个条件。这正是Condition要解决的问题。
# 1.4 InterruptedException与interrupt()方法
# 1.4.1 Interrupted异常
什么情况下会抛出Interrupted异常
假设while循环中没有调用任何的阻塞函数,就是通常的算术运算,或者打印一行日志,如下所示。
package com.lagou.concurrent.demo;
public class MyThread extends Thread {
@Override
public void run() {
while (true) {
boolean interrupted = isInterrupted();
System.out.println("中断标记:" + interrupted);
}
}
}
这个时候,在主线程中调用一句thread.interrupt(),请问该线程是否会抛出异常?不会。
package com.lagou.concurrent.demo;
public class Main42 {
public static void main(String[] args) throws InterruptedException {
MyThread42 myThread = new MyThread42();
myThread.start();
Thread.sleep(10);
myThread.interrupt();
Thread.sleep(100);
System.exit(0);
}
}
只有那些声明了会抛出InterruptedException的函数才会抛出异常,也就是下面这些常用的函数:
public static native void sleep(long millis) throws InterruptedException
{...}
public final void wait() throws InterruptedException {...}
public final void join() throws InterruptedException {...}
# 1.4.2 轻量级阻塞与重量级阻塞
能够被中断的阻塞称为轻量级阻塞,对应的线程状态是WAITING或者TIMED_WAITING;而像
synchronized 这种不能被中断的阻塞称为重量级阻塞,对应的状态是 BLOCKED。如图所示:调用不同的方法后,一个线程的状态迁移过程。
初始线程处于NEW状态,调用start()开始执行后,进入RUNNING或者READY状态。如果没有调用任何的阻塞函数,线程只会在RUNNING和READY之间切换,也就是系统的时间片调度。这两种状态的切换是操作系统完成的,除非手动调用yield()函数,放弃对CPU的占用。
一旦调用了图中的任何阻塞函数,线程就会进入WAITING或者TIMED_WAITING状态,两者的区别只是前者为无限期阻塞,后者则传入了一个时间参数,阻塞一个有限的时间。如果使用了synchronized关键字或者synchronized块,则会进入BLOCKED状态。
不太常见的阻塞/唤醒函数,LockSupport.park()/unpark()。这对函数非常关键,Concurrent包中Lock的实现即依赖这一对操作原语。
因此thread.interrupted()的精确含义是“唤醒轻量级阻塞”,而不是字面意思“中断一个线程”。
thread.isInterrupted()与Thread.interrupted()的区别
因为 thread.interrupted()相当于给线程发送了一个唤醒的信号,所以如果线程此时恰好处于
WAITING或者TIMED_WAITING状态,就会抛出一个InterruptedException,并且线程被唤醒。而如果线程此时并没有被阻塞,则线程什么都不会做。但在后续,线程可以判断自己是否收到过其他线程发来的中断信号,然后做一些对应的处理。
这两个方法都是线程用来判断自己是否收到过中断信号的,前者是实例方法,后者是静态方法。二者的区别在于,前者只是读取中断状态,不修改状态;后者不仅读取中断状态,还会重置中断标志位。
package com.lagou.concurrent.demo;
public class Main {
public static void main(String[] args) throws InterruptedException {
MyThread myThread = new MyThread();
myThread.start();
Thread.sleep(10);
myThread.interrupt();
Thread.sleep(7);
System.out.println("main中断状态检查-1:" + myThread.isInterrupted());
System.out.println("main中断状态检查-2:" + myThread.isInterrupted());
}
}
package com.lagou.concurrent.demo;
public class MyThread extends Thread {
@Override
public void run() {
int i = 0;
while (true) {
boolean interrupted = isInterrupted();
System.out.println("中断标记:" + interrupted);
++i;
if (i > 200) {
// 检查并重置中断标志。
boolean interrupted1 = Thread.interrupted();
System.out.println("重置中断状态:" + interrupted1);
interrupted1 = Thread.interrupted();
System.out.println("重置中断状态:" + interrupted1);
interrupted = isInterrupted();
System.out.println("中断标记:" + interrupted);
break;
}
}
}
}
# 1.5 线程的优雅关闭
# 1.5.1 stop与destory函数
线程是“一段运行中的代码”,一个运行中的方法。运行到一半的线程能否强制杀死?
不能。在Java中,有stop()、destory()等方法,但这些方法官方明确不建议使用。原因很简单,如果强制杀死线程,则线程中所使用的资源,例如文件描述符、网络连接等无法正常关闭。
因此,一个线程一旦运行起来,不要强行关闭,合理的做法是让其运行完(也就是方法执行完
毕),干净地释放掉所有资源,然后退出。如果是一个不断循环运行的线程,就需要用到线程间的通信机制,让主线程通知其退出。
# 1.5.2 守护线程
daemon线程和非daemon线程的对比:
package com.lagou.concurrent.demo;
public class Main {
public static void main(String[] args) {
MyDaemonThread myDaemonThread = new MyDaemonThread();
// 设置为daemon线程
myDaemonThread.setDaemon(true);
myDaemonThread.start();
// 启动非daemon线程,当非daemon线程结束,不管daemon线程是否结束,都结束JVM进程。
new MyThread().start();
}
}
package com.lagou.concurrent.demo;
public class MyDaemonThread extends Thread {
@Override
public void run() {
while (true) {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.lagou.concurrent.demo;
public class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
System.out.println("非Daemon线程");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
对于上面的程序,在thread.start()前面加一行代码thread.setDaemon(true)。当main(...)函数退出后,线程thread就会退出,整个进程也会退出。
当在一个JVM进程里面开多个线程时,这些线程被分成两类:守护线程和非守护线程。默认都是非守护线程。
在Java中有一个规定:当所有的非守护线程退出后,整个JVM进程就会退出。意思就是守护线程“不算作数”,守护线程不影响整个 JVM 进程的退出。
例如,垃圾回收线程就是守护线程,它们在后台默默工作,当开发者的所有前台线程(非守护线
程)都退出之后,整个JVM进程就退出了。
# 1.5.3 设置关闭的标志位
开发中一般通过设置标志位的方式,停止循环运行的线程。
代码1
package com.lagou.concurrent.demo;
public class MyThread extends Thread{
private boolean running = true;
@Override
public void run() {
while (running) {
System.out.println("线程正在运行。。。");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stopRunning() {
this.running = false;
}
public static void main(String[] args) throws InterruptedException {
MyThread myThread = new MyThread();
myThread.start();
Thread.sleep(5000);
myThread.stopRunning();
myThread.join();
}
}
但上面的代码有一个问题:如果MyThread t在while循环中阻塞在某个地方,例如里面调用了
object.wait()函数,那它可能永远没有机会再执行 while( ! stopped)代码,也就一直无法退出循环。
此时,就要用到InterruptedException()与interrupt()函数。
# 2 并发核心概念
# 2.1 并发与并行
在单个处理器上采用单核执行多个任务即为并发。在这种情况下,操作系统的任务调度程序会很快从一个任务切换到另一个任务,因此看起来所有的任务都是同时运行的。
同一时间内在不同计算机、处理器或处理器核心上同时运行多个任务,就是所谓的“并行”。
另一个关于并发的定义是,在系统上同时运行多个任务(不同的任务)就是并发。而另一个关于并行的定义是:同时在某个数据集的不同部分上运行同一任务的不同实例就是并行。
关于并行的最后一个定义是,系统中同时运行了多个任务。关于并发的最后一个定义是,一种解释程序员将任务和它们对共享资源的访问同步的不同技术和机制的方法。
这两个概念非常相似,而且这种相似性随着多核处理器的发展也在不断增强。
# 2.2 同步
在并发中,我们可以将同步定义为一种协调两个或更多任务以获得预期结果的机制。同步的方式有两种:
- 控制同步:例如,当一个任务的开始依赖于另一个任务的结束时,第二个任务不能再第一个任务 完成之前开始。
- 数据访问同步:当两个或更多任务访问共享变量时,再任意时间里,只有一个任务可以访问该变量。
与同步密切相关的一个概念时临界段。临界段是一段代码,由于它可以访问共享资源,因此再任何给定时间内,只能被一个任务执行。互斥是用来保证这一要求的机制,而且可以采用不同的方式来实现。
同步可以帮助你在完成并发任务的同时避免一些错误,但是它也为你的算法引入了一些开销。你必须非常仔细地计算任务的数量,这些任务可以独立执行,而无需并行算法中的互通信。这就涉及并发算法的粒度。如果算法有着粗粒度(低互通信的大型任务),同步方面的开销就会较低。然而,也许你不会用到系统所有的核心。如果算法有者细粒度(高互通信的小型任务),同步方面的开销就会很高,而且该算法的吞吐量可能不会很好。
并发系统中有不同的同步机制。从理论角度看,最流行的机制如下:
- 信号量(semaphore):一种用于控制对一个或多个单位资源进行访问的机制。它有一个用于存放可用资源数量的变量,而且可以采用两种原子操作来管理该变量。互斥(mutex,mutual exclusion的简写形式)是一种特殊类型的信号量,它只能取两个值(即资源空闲和资源忙),而且只有将互斥设置为忙的那个进程才可以释放它。互斥可以通过保护临界段来帮助你避免出现竞争条件。
- 监视器:一种在共享资源上实现互斥的机制。它有一个互斥、一个条件变量、两种操作(等待条件和通报条件)。一旦你通报了该条件,在等待它的任务中只有一个会继续执行。
如果共享数据的所有用户都受到同步机制的保护,那么代码(或方法、对象)就是线程安全的。数据的非阻塞的CAS(compare-and-swap,比较和交换)原语是不可变的,这样就可以在并发应用程序中使用该代码而不会出任何问题。
# 2.3 不可变对象
不可变对象是一种非常特殊的对象。在其初始化后,不能修改其可视状态(其属性值)。如果想修改一个不可变对象,那么你就必须创建一个新的对象。
不可变对象的主要优点在于它是线程安全的。你可以在并发应用程序中使用它而不会出现任何问题。
不可变对象的一个例子就是java中的String类。当你给一个String对象赋新值时,会创建一个新的String对象。
# 2.4 原子操作和原子变量
与应用程序的其他任务相比,原子操作是一种发生在瞬间的操作。在并发应用程序中,可以通过一个临界段来实现原子操作,以便对整个操作采用同步机制。
原子变量是一种通过原子操作来设置和获取其值的变量。可以使用某种同步机制来实现一个原子变量,或者也可以使用CAS以无锁方式来实现一个原子变量,而这种方式并不需要任何同步机制。
# 2.5 共享内存与消息传递
任务可以通过两种不同的方式来相互通信。第一种方法是共享内存,通常用于在同一台计算机上运行多任务的情况。任务在读取和写入值的时候使用相同的内存区域。为了避免出现问题,对该共享内存的访问必须在一个由同步机制保护的临界段内完成。
另一种同步机制是消息传递,通常用于在不同计算机上运行多任务的情形。当一个任务需要与另一个任务通信时,它会发送一个遵循预定义协议的消息。如果发送方保持阻塞并等待响应,那么该通信就是同步的;如果发送方在发送消息后继续执行自己的流程,那么该通信就是异步的。
# 3 并发的问题
# 3.1 数据竞争
如果有两个或者多个任务在临界段之外对一个共享变量进行写入操作,也就是说没有使用任何同步机制,那么应用程序可能存在数据竞争(也叫做竞争条件)。
在这些情况下,应用程序的最终结果可能取决于任务的执行顺序。
package com.lagou.concurrent.demo;
public class ConcurrentDemo {
private float myFloat;
public void modify(float difference) {
float value = this.myFloat;
this.myFloat = value + difference;
}
public static void main(String[] args) {
}
}
假设有两个不同的任务执行了同一个modify方法。由于任务中语句的执行顺序不同,最终结果也会不同。
modify方法不是原子的, ConcurrentDemo 也不是线程安全的。
# 3.2 死锁
当两个(或多个)任务正在等待必须由另一线程释放的某个共享资源,而该线程又正在等待必须由前述任务之一释放的另一共享资源时,并发应用程序就出现了死锁。当系统中同时出现如下四种条件时,就会导致这种情形。我们将其称为Coffman 条件。
- 互斥: 死锁中涉及的资源必须是不可共享的。一次只有一个任务可以使用该资源。
- 占有并等待条件: 一个任务在占有某一互斥的资源时又请求另一互斥的资源。当它在等待时,不会释放任何资源。
- 不可剥夺:资源只能被那些持有它们的任务释放。
- 循环等待:任务1正等待任务2 所占有的资源, 而任务2 又正在等待任务3 所占有的资源,以此类推,最终任务n又在等待由任务1所占有的资源,这样就出现了循环等待。
有一些机制可以用来避免死锁。
- 忽略它们:这是最常用的机制。你可以假设自己的系统绝不会出现死锁,而如果发生死锁,结果就是你可以停止应用程序并且重新执行它。
- 检测:系统中有一项专门分析系统状态的任务,可以检测是否发生了死锁。如果它检测到了死锁,可以采取一些措施来修复该问题,例如,结束某个任务或者强制释放某一资源。
- 预防:如果你想防止系统出现死锁,就必须预防Coffman 条件中的一条或多条出现。
- 规避:如果你可以在某一任务执行之前得到该任务所使用资源的相关信息,那么死锁是可以规避的。当一个任务要开始执行时,你可以对系统中空闲的资源和任务所需的资源进行分析,这样就可以判断任务是否能够开始执行。
# 3.3 活锁
如果系统中有两个任务,它们总是因对方的行为而改变自己的状态, 那么就出现了活锁。最终结果是它们陷入了状态变更的循环而无法继续向下执行。
例如,有两个任务:任务1和任务2 ,它们都需要用到两个资源:资源1和资源2 。假设任务1对资源1加了一个锁,而任务2 对资源2 加了一个锁。当它们无法访问所需的资源时,就会释放自己的资源并且重新开始循环。这种情况可以无限地持续下去,所以这两个任务都不会结束自己的执行过程。
# 3.4 资源不足
当某个任务在系统中无法获取维持其继续执行所需的资源时,就会出现资源不足。当有多个任务在等待某一资源且该资源被释放时,系统需要选择下一个可以使用该资源的任务。如果你的系统中没有设计良好的算法,那么系统中有些线程很可能要为获取该资源而等待很长时间。
要解决这一问题就要确保公平原则。所有等待某一资源的任务必须在某一给定时间之内占有该资源。可选方案之一就是实现一个算法,在选择下一个将占有某一资源的任务时,对任务已等待该资源的时间因素加以考虑。然而,实现锁的公平需要增加额外的开销,这可能会降低程序的吞吐量。
# 3.5 优先权反转
当一个低优先权的任务持有了一个高优先级任务所需的资源时,就会发生优先权反转。这样的话,低优先权的任务就会在高优先权的任务之前执行。
# 4 JMM内存模型
# 4.1 JMM与happen-before
# 4.1.1 为什么会存在“内存可见性”问题
下图为x86架构下CPU缓存的布局,即在一个CPU 4核下,L1、L2、L3三级缓存与主内存的布局。
每个核上面有L1、L2缓存,L3缓存为所有核共用。
因为存在CPU缓存一致性协议,例如MESI,多个CPU核心之间缓存不会出现不同步的问题,不会有“内存可见性”问题。
缓存一致性协议对性能有很大损耗,为了解决这个问题,又进行了各种优化。例如,在计算单元和L1之间加了Store Buffer、Load Buffer(还有其他各种Buffer),如下图:
L1、L2、L3和主内存之间是同步的,有缓存一致性协议的保证,但是Store Buffer、Load Buffer和L1之间却是异步的。向内存中写入一个变量,这个变量会保存在Store Buffer里面,稍后才异步地写入L1中,同时同步写入主内存中。
操作系统内核视角下的CPU缓存模型:
多CPU,每个CPU多核,每个核上面可能还有多个硬件线程,对于操作系统来讲,就相当于一个个的逻辑CPU。每个逻辑CPU都有自己的缓存,这些缓存和主内存之间不是完全同步的。
对应到Java里,就是JVM抽象内存模型,如下图所示:
# 4.1.2 重排序与内存可见性的关系
Store Buffer的延迟写入是重排序的一种,称为内存重排序(Memory Ordering)。除此之外,还有编译器和CPU的指令重排序。
重排序类型:
- 编译器重排序。
对于没有先后依赖关系的语句,编译器可以重新调整语句的执行顺序。
- CPU指令重排序。
在指令级别,让没有依赖关系的多条指令并行。
- CPU内存重排序。
CPU有自己的缓存,指令的执行顺序和写入主内存的顺序不完全一致。
在三种重排序中,第三类就是造成“内存可见性”问题的主因,如下案例:
线程1:
X=1
a=Y
线程2:
Y=1
b=X
假设X、Y是两个全局变量,初始的时候,X=0,Y=0。请问,这两个线程执行完毕之后,a、b的正确结果应该是什么?
很显然,线程1和线程2的执行先后顺序是不确定的,可能顺序执行,也可能交叉执行,最终正确的
结果可能是:
1. a=0,b=1
2. a=1,b=0
3. a=1,b=1
也就是不管谁先谁后,执行结果应该是这三种场景中的一种。但实际可能是a=0,b=0。
两个线程的指令都没有重排序,执行顺序就是代码的顺序,但仍然可能出现a=0,b=0。原因是线程1先执行X=1,后执行a=Y,但此时X=1还在自己的Store Buffer里面,没有及时写入主内存中。所以,线程2看到的X还是0。线程2的道理与此相同。
虽然线程1觉得自己是按代码顺序正常执行的,但在线程2看来,a=Y和X=1顺序却是颠倒的。指令没有重排序,是写入内存的操作被延迟了,也就是内存被重排序了,这就造成内存可见性问题。
# 4.1.3 内存屏障
为了禁止编译器重排序和 CPU 重排序,在编译器和 CPU 层面都有对应的指令,也就是内存屏障(Memory Barrier)。这也正是JMM和happen-before规则的底层实现原理。
编译器的内存屏障,只是为了告诉编译器不要对指令进行重排序。当编译完成之后,这种内存屏障就消失了,CPU并不会感知到编译器中内存屏障的存在。
而CPU的内存屏障是CPU提供的指令,可以由开发者显示调用。
内存屏障是很底层的概念,对于 Java 开发者来说,一般用 volatile 关键字就足够了。但从JDK 8开始,Java在Unsafe类中提供了三个内存屏障函数,如下所示。
public final class Unsafe {
// ...
public native void loadFence();
public native void storeFence();
public native void fullFence();
// ...
}
在理论层面,可以把基本的CPU内存屏障分成四种:
- LoadLoad:禁止读和读的重排序。
- StoreStore:禁止写和写的重排序。
- LoadStore:禁止读和写的重排序。
- StoreLoad:禁止写和读的重排序。
Unsafe中的方法:
- loadFence=LoadLoad+LoadStore
- storeFence=StoreStore+LoadStore
- fullFence=loadFence+storeFence+StoreLoad
# 4.1.4 as-if-serial语义
重排序的原则是什么?什么场景下可以重排序,什么场景下不能重排序呢?
# 1.单线程程序的重排序规则
无论什么语言,站在编译器和CPU的角度来说,不管怎么重排序,单线程程序的执行结果不能改变,这就是单线程程序的重排序规则。
即只要操作之间没有数据依赖性,编译器和CPU都可以任意重排序,因为执行结果不会改变,代码看起来就像是完全串行地一行行从头执行到尾,这也就是as-if-serial语义。
对于单线程程序来说,编译器和CPU可能做了重排序,但开发者感知不到,也不存在内存可见性问题。
# 2.多线程程序的重排序规则
编译器和CPU的这一行为对于单线程程序没有影响,但对多线程程序却有影响。
对于多线程程序来说,线程之间的数据依赖性太复杂,编译器和CPU没有办法完全理解这种依赖性并据此做出最合理的优化。
编译器和CPU只能保证每个线程的as-if-serial语义。
线程之间的数据依赖和相互影响,需要编译器和CPU的上层来确定。
上层要告知编译器和CPU在多线程场景下什么时候可以重排序,什么时候不能重排序。
# 4.1.5 happen-before是什么
使用happen-before描述两个操作之间的内存可见性。
java内存模型(JMM)是一套规范,在多线程中,一方面,要让编译器和CPU可以灵活地重排序;另一方面,要对开发者做一些承诺,明确告知开发者不需要感知什么样的重排序,需要感知什么样的重排序。然后,根据需要决定这种重排序对程序是否有影响。如果有影响,就需要开发者显示地通过volatile、synchronized等线程同步机制来禁止重排序。
关于happen-before:
如果A happen-before B,意味着A的执行结果必须对B可见,也就是保证跨线程的内存可见性。A happen before B不代表A一定在B之前执行。因为,对于多线程程序而言,两个操作的执行顺序是不确定的。happen-before只确保如果A在B之前执行,则A的执行结果必须对B可见。定义了内存可见性的约束,也就定义了一系列重排序的约束。
基于happen-before的这种描述方法,JMM对开发者做出了一系列承诺:
- 单线程中的每个操作,happen-before 对应该线程中任意后续操作(也就是 as-if-serial语义保证)。
- 对volatile变量的写入,happen-before对应后续对这个变量的读取。
- 对synchronized的解锁,happen-before对应后续对这个锁的加锁。
……
JMM对编译器和CPU 来说,volatile 变量不能重排序;非 volatile 变量可以任意重排序。
# 4.1.6 happen-before的传递性
除了这些基本的happen-before规则,happen-before还具有传递性,即若A happen-before B,B happen-before C,则A happen-before C。
如果一个变量不是volatile变量,当一个线程读取、一个线程写入时可能有问题。那岂不是说,在多线程程序中,我们要么加锁,要么必须把所有变量都声明为volatile变量?这显然不可能,而这就得归功于happen-before的传递性。
class A {
private int a = 0;
private volatile int c = 0;
public void set() {
a = 5; // 操作1
c = 1; // 操作2
}
public int get() {
int d = c; // 操作3
return a; // 操作4
}
}
假设线程A先调用了set,设置了a=5;之后线程B调用了get,返回值一定是a=5。为什么呢?
操作1和操作2是在同一个线程内存中执行的,操作1 happen-before 操作2,同理,操作3 happen-before操作4。又因为c是volatile变量,对c的写入happen-before对c的读取,所以操作2 happen-before操作3。利用happen-before的传递性,就得到:
操作1 happen-before 操作2 happen-before 操作3 happen-before操作4。
所以,操作1的结果,一定对操作4可见。
class A {
private int a = 0;
private int c = 0;
public synchronized void set() {
a = 5; // 操作1
c = 1; // 操作2
}
public synchronized int get() {
return a;
}
}
假设线程A先调用了set,设置了a=5;之后线程B调用了get,返回值也一定是a=5。
因为与volatile一样,synchronized同样具有happen-before语义。展开上面的代码可得到类似于下面的伪代码:
线程A:
加锁; // 操作1
a = 5; // 操作2
c = 1; // 操作3
解锁; // 操作4
线程B:
加锁; // 操作5
读取a; // 操作6
解锁; // 操作7
根据synchronized的happen-before语义,操作4 happen-before 操作5,再结合传递性,最终就会得到:
操作1 happen-before 操作2……happen-before 操作7。所以,a、c都不是volatile变量,但仍然有内存可见性。
# 4.2 volatile关键字
# 4.2.1 64位写入的原子性(Half Write)
如,对于一个long型变量的赋值和取值操作而言,在多线程场景下,线程A调用set(100),线程B调用get(),在某些场景下,返回值可能不是100。
public class MyClass {
private long a = 0;
// 线程A调用set(100)
public void set(long a) {
this.a = a;
}
// 线程B调用get(),返回值一定是100吗?
public long get() {
return this.a;
}
}
因为JVM的规范并没有要求64位的long或者double的写入是原子的。在32位的机器上,一个64位变量的写入可能被拆分成两个32位的写操作来执行。这样一来,读取的线程就可能读到“一半的值”。解决办法也很简单,在long前面加上volatile关键字。
# 4.2.2 重排序:DCL问题
单例模式的线程安全的写法不止一种,常用写法为DCL(Double Checking Locking),如下所示:
public class Singleton {
private static Singleton instance;
public static Singleton getInstance() {
if (instance == null) {
synchronized(Singleton.class) {
if (instance == null) {
// 此处代码有问题
instance = new Singleton();
}
}
}
return instance;
}
}
上述的 instance = new Singleton(); 代码有问题:其底层会分为三个操作:
- 分配一块内存。
- 在内存上初始化成员变量。
- 把instance引用指向内存。
在这三个操作中,操作2和操作3可能重排序,即先把instance指向内存,再初始化成员变量,因为二者并没有先后的依赖关系。此时,另外一个线程可能拿到一个未完全初始化的对象。这时,直接访问里面的成员变量,就可能出错。这就是典型的“构造方法溢出”问题。
解决办法也很简单,就是为instance变量加上volatile修饰。
volatile的三重功效:64位写入的原子性、内存可见性和禁止重排序。
# 4.2.3 volatile实现原理
由于不同的CPU架构的缓存体系不一样,重排序的策略不一样,所提供的内存屏障指令也就有差异。
这里只探讨为了实现volatile关键字的语义的一种参考做法:
- 在volatile写操作的前面插入一个StoreStore屏障。保证volatile写操作不会和之前的写操作重排序。
- 在volatile写操作的后面插入一个StoreLoad屏障。保证volatile写操作不会和之后的读操作重排序。
- 在volatile读操作的后面插入一个LoadLoad屏障+LoadStore屏障。保证volatile读操作不会和之后的读操作、写操作重排序。
具体到x86平台上,其实不会有LoadLoad、LoadStore和StoreStore重排序,只有StoreLoad一种重排序(内存屏障),也就是只需要在volatile写操作后面加上StoreLoad屏障。
# 4.2.4 JSR-133对volatile语义的增强
在JSR -133之前的旧内存模型中,一个64位long/ double型变量的读/ 写操作可以被拆分为两个32位的读/写操作来执行。从JSR -133内存模型开始 (即从JDK5开始),仅仅只允许把一个64位long/ double型变量的写操作拆分为两个32位的写操作来执行,任意的读操作在JSR -133中都必须具有原子性(即 任意读操作必须要在单个读事务中执行)。
这也正体现了Java对happen-before规则的严格遵守。
# 4.3 final关键字
# 4.3.1 构造方法溢出问题
考虑下面的代码:
public class MyClass {
private int num1;
private int num2;
private static MyClass myClass;
public MyClass() {
num1 = 1;
num2 = 2;
}
/**
* 线程A先执行write()
*/
public static void write() {
myClass = new MyClass();
}
/**
* 线程B接着执行read()
*/
public static void read() {
if (myClass != null) {
int num3 = myClass.num1;
int num4 = myClass.num2;
}
}
}
num3和num4的值是否一定是1和2?
num3、num4不见得一定等于1,2。和DCL的例子类似,也就是构造方法溢出问题。
myClass = new MyClass()这行代码,分解成三个操作:
- 分配一块内存;
- 在内存上初始化i=1,j=2;
- 把myClass指向这块内存。
操作2和操作3可能重排序,因此线程B可能看到未正确初始化的值。对于构造方法溢出,就是一个对象的构造并不是“原子的”,当一个线程正在构造对象时,另外一个线程却可以读到未构造好的“一半对象”。
# 4.3.2 final的happen-before语义
要解决这个问题,不止有一种办法。
办法1:给num1,num2加上volatile关键字。
办法2:为read/write方法都加上synchronized关键字。
如果num1,num2只需要初始化一次,还可以使用final关键字。
之所以能解决问题,是因为同volatile一样,final关键字也有相应的happen-before语义:
- 对final域的写(构造方法内部),happen-before于后续对final域所在对象的读。
- 对final域所在对象的读,happen-before于后续对final域的读。
通过这种happen-before语义的限定,保证了final域的赋值,一定在构造方法之前完成,不会出现另外一个线程读取到了对象,但对象里面的变量却还没有初始化的情形,避免出现构造方法溢出的问题。
# 4.3.3 happen-before规则总结
- 单线程中的每个操作,happen-before于该线程中任意后续操作。
- 对volatile变量的写,happen-before于后续对这个变量的读。
- 对synchronized的解锁,happen-before于后续对这个锁的加锁。
- 对final变量的写,happen-before于final域对象的读,happen-before于后续对final变量的读。
四个基本规则再加上happen-before的传递性,就构成JMM对开发者的整个承诺。在这个承诺以外的部分,程序都可能被重排序,都需要开发者小心地处理内存可见性问题。