多线程设计模式
# 1 Single Threaded Execution模式
所谓Single Threaded Execution模式,指的是“以一个线程执行”。该模式用于设置限制,以确保同一时间只能让一个线程执行处理。
Single Threaded Execution有时也称为临界区(critical section)或临界域(critical region)。Single Threaded Execution名称侧重于执行处理的线程,临界区或临界域侧重于执行范围。
# 1.1 示例程序
package com.lagou.concurrent.demo;
public class ConcurrentDemo {
public static void main(String[] args) {
NumberResource resource = new NumberResource();
new UserThread(resource).start();
new UserThread(resource).start();
}
}
package com.lagou.concurrent.demo;
public class UserThread extends Thread {
private NumberResource resource;
public UserThread(NumberResource resource) {
this.resource = resource;
}
@Override
public void run() {
while (true) {
resource.showNumber();
}
}
}
package com.lagou.concurrent.demo;
public class NumberResource {
private Integer number = 0;
private Integer printIndex = 0;
private void add() {
this.number++;
}
private Integer get() {
return this.number;
}
public void showNumber() throws InterruptedException {
// 大概打印100次,退出虚拟机
if (printIndex > 100) System.exit(0);
Integer number1 = this.get();
this.add();
Thread.sleep(5);
Integer number2 = this.get();
if ((number1 + 1) == number2) {
System.out.println(Thread.currentThread().getName() + " => 递增--正确--:" + number1 + " ***** " + number2);
} else {
System.out.println(Thread.currentThread().getName() + " => 递增**异常**:" + number1 + " ***** " + number2);
}
// 增加计数
printIndex++;
}
}
运行效果:
上述代码之所以递增异常,是因为showNumber方法是一个临界区,其中对数字加一,但又不能保证原子性,在多线程执行的时候,就会出现问题。
线程安全的NumberResource类:
package com.lagou.concurrent.demo;
public class NumberResource {
private Integer number = 0;
private Integer printIndex = 0;
private void add() {
this.number++;
}
private Integer get() {
return this.number;
}
public synchronized void showNumber() throws InterruptedException {
// 大概打印100次,退出虚拟机
if (printIndex > 100) System.exit(0);
Integer number1 = this.get();
this.add();
Thread.sleep(5);
Integer number2 = this.get();
if ((number1 + 1) == number2) {
System.out.println(Thread.currentThread().getName() + " => 递增--正确--:" + number1 + " ***** " + number2);
} else {
System.out.println(Thread.currentThread().getName() + " => 递增**异常**:" + number1 + " ***** " + number2);
}
// 增加计数
printIndex++;
}
}
# 1.2 Single Threaded Execution模式总结
- SharedResource(共享资源)
Single Threaded Execution模式中出现了一个发挥SharedResource(共享资源)作用的类。在示例程序中,由NumberResource类扮演SharedResource角色。
SharedResource角色是可以被多个线程访问的类,包含很多方法,但这些方法主要分为如下两
类:
- safeMethod:多个线程同时调用也不会发生问题的方法。
- unsafeMethod:多个线程同时访问会发生问题,因此必须加以保护的方法。
safeMethod,无需考虑。
对于unsafeMethod,在被多个线程同时执行时,实例状态有可能发生分歧。这时就需要保护该方法,使其不被多个线程同时访问。
Single Threaded Execution模式会保护unsafeMethod,使其同时只能由一个线程访问。java则是通过unsafeMethod声明为synchronized方法来进行保护。
我们将只允许单个线程执行的程序范围称为临界区。
# 1,3 Single Threaded Execution类图
# 1.4 何时使用Single Threaded Execution模式
- 多线程时
在单线程程序中使用synchronized关键字并不会破坏程序的安全性,但是调用synchronized方法要比调用一般方法花费时间,稍微降低程序性能。
- 多个线程访问时
当SharedResource角色的实例有可能被多个线程同时访问时,就需要使用Single Threaded
Execution模式。
即便是多线程程序,如果所有线程都是完全独立操作的,也无需使用Single Threaded Execution模式。这种状态称为线程互不干涉。
在某些处理多个线程的框架中,有时线程的独立性是由框架控制的。此时,框架的使用者就无需考虑是否使用Single Threaded Execution模式。
- 状态有可能变化时
之所以需要使用Single Threaded Execution模式,是因为SharedResource角色的状态会发生变化。
如果在创建实例后,实例的状态再也不发生变化,就无需使用Single Threaded Execution模式。
- 需要确保安全性时
只有在需要确保安全性时,才需要使用Single Threaded Execution模式。
Java的集合类大多数都是非线程安全的。这是为了在不需要考虑安全性的时候提高程序运行速度。
用户在使用类时,需要考虑自己要用的类是否时线程安全的。
# 1.5 死锁
使用Single Threaded Execution模式时,存在发生死锁的危险。
死锁是指两个线程分别持有锁,并相互等待对方释放锁的现象。发生死锁的线程都无法再继续运行,程序卡死。
两个人吃饭,都需要刀和叉,但刀叉又只有一套。某时刻,其中一个人拿了刀,另一个拿了叉,而且两人都在等待对方让出自己需要的叉或刀。这种情形下,两个人都只能一直等待下去,这就是发生了死锁。
在Single Threaded Execution模式中,满足下列条件时,会发生死锁:
- 存在多个SharedResource角色
- 线程在持有某个SharedResource角色锁的同时,还想获取其他SharedResource角色的锁
- 获取SharedResource角色的锁的顺序不固定(SharedResource角色是对称的)
# 1.6 临界区的大小和性能
一般情况下,Single Threaded Execution模式会降低程序性能:
- 获取锁花费时间
进入synchronized方法时,线程需要获取对象的锁,该处理会花费时间。
如果SharedResource角色的数量减少了,那么要获取的锁的数量也会相应地减少,从而就能够抑制性能的下降了。
- 线程冲突引起的等待
当线程执行临界区内的处理时,其他想要进入临界区的线程会阻塞。这种状况称为线程冲突。发生冲突时,程序的整体性能会随着线程等待时间的增加而下降。
# 2 Immutable模式
Immutable就是不变的、不发生改变。Immutable模式中存在着确保实例状态不发生改变的类。在访问这些实例时不需要执行耗时的互斥处理。如果能用好该模式,就可以提高程序性能。
如String就是一个不可变类,immutable的。
# 2.1 示例程序
package com.lagou.concurrent.demo;
public class ConcurrentDemo {
public static void main(String[] args) {
User user = new User(1001, "张三", "张三是一个好人");
new UserThread(user).start();
new UserThread(user).start();
new UserThread(user).start();
new UserThread(user).start();
new UserThread(user).start();
}
}
package com.lagou.concurrent.demo;
public class User {
private final Integer userId;
private final String username;
private final String desc;
public User(Integer userId, String username, String desc) {
this.userId = userId;
this.username = username;
this.desc = desc;
}
public Integer getUserId() {
return userId;
}
public String getUsername() {
return username;
}
public String getDesc() {
return desc;
}
@Override
public String toString() {
return "User{" +
"userId=" + userId +
", username='" + username + '\'' +
", desc='" + desc + '\'' +
'}';
}
}
package com.lagou.concurrent.demo;
public class UserThread extends Thread {
private Integer index = 0;
private User user;
public UserThread(User user) {
this.user = user;
}
@Override
public void run() {
while (true) {
if (index >= 100) {
System.exit(0);
}
System.out.println(Thread.currentThread().getName() + " ===> " +
user);
index++;
}
}
}
执行效果:
类图
在Single Threaded Execution模式,将修改或引用实例状态的地方设置为临界区,该区只能由一个线程执行。对于本案例的User类,实例的状态绝对不会发生改变,即使多个线程同时对该实例执行处理,实例也不会出错,因为实例的状态不变。如此也无需使用synchronized关键字来保护实例。
# 2.2 Immutable模式中的角色
- Immutable
Immutable角色是一个类,该角色中的字段值不可修改,也不存在修改字段内容的方法。无需对Immutable角色应用Single Threaded Execution模式。无需使用synchronized关键字。就是本案例的User类。
# 2.3 何时使用Immutable模式
- 创建实例后,状态不再发生改变
必须是实例创建后,状态不再发生变化的。实例的状态由字段的值决定。即使字段是final的且不存在setter,也有可能不是不可变的。因为字段引用的实例有可能发生变化。
- 实例是共享的,且被频繁访问时
Immutable模式的优点是不需要使用synchronized关键字进行保护。意味着在不失去安全性和生存性的前提下提高性能。当实例被多个线程共享,且有可能被频繁访问时,Immutable模式优点明显。
注意:
StringBuffer类表示字符串的可变类,String类表示字符串的不可变类。String实例表示的字符串不可以修改,执行操作的方法都不是synchronized修饰的,引用速度更快。
如果需要频繁修改字符串内容,则使用StringBuffer;如果不需要修改字符串内容,只是引用内
容,则使用String。
# 2.4 JDK中的不可变模式
- java.lang.String
- java.math.BigInteger
- java.math.Decimal
- java.util.regex.Pattern
- java.lang.Boolean
- java.lang.Byte
- java.lang.Character
- java.lang.Double
- java.lang.Float
- java.lang.Integer
- java.lang.Long
- java.lang.Short
- java.lang.Void
# 3 Guarded Suspension模式
Guarded表示被守护、被保卫、被保护。Suspension表示暂停。如果执行现在的处理会造成问题,就让执行处理的线程进行等待——这就是Guarded Suspension模式。
Guarded Suspension模式通过让线程等待来保证实例的安全型。
Guarded Suspension也称为guarded wait、spin lock等名称。
# 3.1 示例程序:
package com.lagou.concurrent.demo;
public class Request {
private final String name;
public Request(String name) {
this.name = name;
}
@Override
public String toString() {
return "Request{" +
"name='" + name + '\'' +
'}';
}
}
package com.lagou.concurrent.demo;
import java.util.LinkedList;
import java.util.Queue;
public class RequestQueue {
private final Queue<Request> queue = new LinkedList<>();
public synchronized Request getRequest() {
while (queue.peek() == null) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return queue.remove();
}
public synchronized void putRequest(Request request) {
queue.offer(request);
notifyAll();
}
}
package com.lagou.concurrent.demo;
import java.util.Random;
public class ServerThread extends Thread {
private final Random random;
private final RequestQueue requestQueue;
public ServerThread(RequestQueue requestQueue, String name, long seed) {
super(name);
this.requestQueue = requestQueue;
random = new Random(seed);
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
Request request = requestQueue.getRequest();
System.out.println(Thread.currentThread().getName() + " 处理 " +
request);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.lagou.concurrent.demo;
import java.util.Random;
public class ClientThread extends Thread {
private final Random random;
private final RequestQueue requestQueue;
public ClientThread(RequestQueue requestQueue, String name, long seed) {
super(name);
this.requestQueue = requestQueue;
this.random = new Random(seed);
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
Request request = new Request("请求:" + i);
System.out.println(Thread.currentThread().getName() + " 请求 " +
request);
requestQueue.putRequest(request);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.lagou.concurrent.demo;
public class ConcurrentDemo {
public static void main(String[] args) {
RequestQueue requestQueue = new RequestQueue();
new ClientThread(requestQueue, "client-1", 432432L).start();
new ServerThread(requestQueue, "server-1", 9988766L).start();
}
}
执行效果:
应用保护条件进行保护:
上图中,getRequest方法执行的逻辑是从queue中取出一个Request实例,即 queue.remove() ,但是要获取Request实例,必须满足条件: queue.peek() != null 。该条件就是Guarded Suspension模式的守护条件(guard condition)。
当线程执行到while语句时:
- 若守护条件成立,线程不进入while语句块,直接执行queue.remove()方法,线程不会等待。
- 若守护条件不成立,线程进入while语句块,执行wait,开始等待。
若守护条件不成立,则线程等待。等待什么?等待notifyAll()唤醒该线程。
守护条件阻止了线程继续向前执行,除非实例状态发生改变,守护条件成立,被另一个线程唤醒。
该类中的synchronized关键字保护的是queue字段,getRequest方法的synchronized保护该方法只能由一个线程执行。
线程执行this.wait之后,进入this的等待队列,并释放持有的this锁。
notify、notifyAll或interrupt会让线程退出等待队列,实际继续执行之前还必须再次获取this的锁线程才可以继续执行。
# 3.2 时序图
# 3.3 Guarded Suspension模式中的角色
- GuardedObject(被保护的对象)
GuardedObject角色是一个持有被保护(guardedMethod)的方法的类。当线程执行
guardedMethod方法时,若守护条件成立,立即执行;当守护条件不成立,等待。守护条件随着GuardedObject角色的状态不同而变。
除了guardedMethod之外,GuardedObject角色也可以持有其他改变实例状态
(stateChangingMethod)的方法。
java中,guardedMethod通过while语句和wait方法来实现,stateChangingMethod通过
notify/notifyAll方法实现。
在本案例中,RequestQueue为GuardedObject,getRequest方法为guardedMethod,
putRequest为stateChangingMethod。
可以将Guarded Suspension理解为多线程版本的if。
# 3.4 LinkedBlockingQueue
可以使用LinkedBlockingQueue替代RequestQueue。
package com.lagou.concurrent.demo;
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueRequestQueue {
// private final Queue<Request> queue = new LinkedList<>();
private final LinkedBlockingQueue<Request> queue = new
LinkedBlockingQueue<>();
public Request getRequest() {
Request request = null;
try {
request = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return request;
}
public void putRequest(Request request) {
try {
queue.put(request);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
# 3.5 类图
# 4 Balking模式
所谓Balk,就是停止并返回的意思。
Balking模式与Guarded Suspension模式一样,也存在守护条件。在Balking模式中,如果守护条件不成立,则立即中断处理。而Guarded Suspension模式一直等待直到可以运行。
# 4.1 示例程序
两个线程,一个是修改线程,修改之后,等待随机时长,保存文件内容。
另一个是保存线程,固定时长进行文件内容的保存。
如果文件需要保存,则执行保存动作
如果文件不需要保存,则不执行保存动作。
package com.lagou.concurrent.demo;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
public class Data {
private final String filename;
private String content;
private boolean changed;
public Data(String filename, String content) {
this.filename = filename;
this.content = content;
}
public synchronized void change(String newContent) {
this.content = newContent;
this.changed = true;
}
public synchronized void save() throws IOException {
if (changed) {
doSave();
changed = false;
} else {
System.out.println(Thread.currentThread().getName() + "不需要保
存");
}
}
private void doSave() throws IOException {
System.out.println(Thread.currentThread().getName() + " 调用doSave,
内容为:" + content);
Writer writer = new FileWriter(filename);
writer.write(content);
writer.close();
}
}
package com.lagou.concurrent.demo;
import java.io.IOException;
import java.util.Random;
public class ChangerThread extends Thread {
private final Data data;
private final Random random = new Random();
public ChangerThread(String name, Data data) {
super(name);
this.data = data;
}
@Override
public void run() {
for (int i = 0; true; i++) {
data.change("第 " + i + " 次修改");
try {
Thread.sleep(random.nextInt(2000));
data.save();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package com.lagou.concurrent.demo;
import java.io.IOException;
public class SaverThread extends Thread {
private final Data data;
public SaverThread(String name, Data data) {
super(name);
this.data = data;
}
@Override
public void run() {
while (true) {
try {
data.save();
Thread.sleep(1000);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
执行效果:
# 4.2 Balking模式中的角色
- GuardedObject(受保护对象)
GuardedObject角色是一个拥有被保护的方法(guardedMethod)的类。当线程执行
guardedMethod时,若保护条件成立,则执行实际的处理,若不成立,则不执行实际的处理,直接返回。
保护条件的成立与否随着GuardedObject角色状态的改变而变动。
除了guardedMethod之外,GuardedObject角色还有可能有其他改变状态的方法
(stateChangingMethod)。
在此案例中,Data类对应于GuardedObject,save方法对应guardedMethod,change方法对应stateChangingMethod方法。
保护条件是changed字段为true。
# 4.3 类图
# 4.4 何时使用Balking模式
- 不需要执行时
在此示例程序中,content字段的内容如果没有修改,就将save方法balk。之所以要balk,是因为content已经写文件了,无需再写了。如果并不需要执行,就可以使用Balking模式。此时可以提高程序性能。
- 不需要等待守护条件成立时
Balking模式的特点就是不等待。若条件成立,就执行,若不成立,就不执行,立即进入下一个操作。
- 守护条件仅在第一次成立时
当“守护条件仅在第一次成立”时,可以使用Balking模式。
比如各种类的初始化操作,检查一次是否初始化了,如果初始化了,就不用执行了。如果没有初始化,则进行初始化。
# 4.5 balk结果的表示
- 忽略balk
最简单的方式就是不通知调用端“发生了balk”。示例程序采用的就是这种方式。
- 通过返回值表示balk
通过boolean值表示balk。若返回true,表示未发生balk,需要执行并执行了处理。若false,则表示发生了balk,处理已执行,不再需要执行。
有时也会使用null来表示“发生了balk”。
- 通过异常表示balk
有时也通过异常表示“发生了balk”。即,当balk时,程序并不从方法return,而是抛异常。
# 4.6 Balking和Guarded Suspension模式之间
介于“直接balk并返回”和“等待到守护条件成立为止“这两种极端之间的还有一种”在守护条件成立之前等待一段时间“。在守护条件成立之前等待一段时间,如果到时条件还未成立,则直接balk。
这种操作称为计时守护(guarded timed)或超时(timeout)。
# 4.7 java.util.concurrent中的超时
- 通过异常通知超时
当发生超时抛出异常时,不适合使用返回值表示超时,需要使用
java.util.concurrent.TimeoutException异常。
如:
java.util.concurrent.Future的get方法;
java.util.concurrent.Exchanger的exchange方法;
java.util.concurrent.Cyclicarrier的await方法
java.util.concurrent.CountDownLatch的await方法。
- 通过返回值通知超时
当执行多次try时,则不使用异常,而使用返回值表示超时。
如:
java.util.concurrent.BlockingQueue接口,当offer方法的返回值为false,或poll方法的返回值为null,表示发生了超时。
java.util.concurrent.Semaphore类,当tryAcquire方法的返回值为false时,表示发生了超时。
java.util.concurrent.locks.Lock接口,当tryLock方法的返回值为false时,表示发生了超时。
# 5 Producer-Consumer模式
生产者安全地将数据交给消费者。
当生产者和消费者以不同的线程运行时,两者之间的处理速度差异会有问题。
生产者消费者模式用于消除线程间处理速度的差异带来的问题。
在该模式中,生产者和消费者都有多个,当生产者和消费者只有一个时,我们称为管道(Pipe)模式。
# 5.1 示例程序
package com.lagou.concurrent.demo;
public class Table {
private final String[] buffer;
private int tail;
private int head;
private int count;
public Table(int count) {
this.buffer = new String[count];
this.head = 0;
this.tail = 0;
this.count = 0;
}
public synchronized void put(String steamedBread) throws
InterruptedException {
System.out.println(Thread.currentThread().getName() + " 蒸出来 " +
steamedBread);
while (count >= buffer.length) {
wait();
}
buffer[tail] = steamedBread;
tail = (tail + 1) % buffer.length;
count++;
notifyAll();
}
public synchronized String take() throws InterruptedException {
while (count <= 0) {
wait();
}
String steamedBreak = buffer[head];
head = (head + 1) % buffer.length;
count--;
notifyAll();
System.out.println(Thread.currentThread().getName() + " 取走 " +
steamedBreak);
return steamedBreak;
}
}
package com.lagou.concurrent.demo;
import java.util.Random;
public class CookerThread extends Thread {
private final Random random;
private final Table table;
private static int id = 0;
public CookerThread(String name, Table table, long seed) {
super(name);
this.table = table;
this.random = new Random(seed);
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(random.nextInt(1000));
String steamedBread = "[ Steamed bread No. " + nextId() + "
by " + getName() + "]";
table.put(steamedBread);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static synchronized int nextId() {
return id++;
}
}
package com.lagou.concurrent.demo;
import java.util.Random;
public class EaterThread extends Thread {
private final Random random;
private final Table table;
public EaterThread(String name, Table table, long seed) {
super(name);
this.table = table;
this.random = new Random(seed);
}
@Override
public void run() {
try {
while (true) {
String steamedBread = table.take();
Thread.sleep(random.nextInt(1000));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package com.lagou.concurrent.demo;
public class ConcurrentDemo {
public static void main(String[] args) {
Table table = new Table(3);
new CookerThread("厨师-1", table, 12345L).start();
new CookerThread("厨师-2", table, 23456L).start();
new CookerThread("厨师-3", table, 34567L).start();
new EaterThread("饭桶-1", table, 45678L).start();
new EaterThread("饭桶-2", table, 56789L).start();
new EaterThread("饭桶-3", table, 67890L).start();
}
}
执行效果:
# 5.2 关于put方法
put方法会抛出InterruptedException异常。如果抛出,可以理解为”该操作已取消“。
put方法使用了Guarded Suspension模式。
tail和count的更新采取buffer环的形式。
notifyAll方法唤醒正在等待馒头的线程来吃。
# 5.3 关于take方法
take方法会抛出InterruptedException异常,表示该操作已取消。
take方法采用了Guarded Suspension模式。
head和count的更新采用了buffer环的形式。
notifyAll唤醒等待的厨子线程开始蒸馒头。
# 5.4 时序图:
# 5.5 Producer-Consumer模式中的角色
- Data
Data角色由Producer角色生成,供Consumer角色使用。在本案例中,String类的馒头对应于Data角色。
- Producer
Producer角色生成Data角色,并将其传递给Channel角色。本案例中,CookerThread对应于
Producer角色。
- Consumer
Consumer角色从Channel角色获取Data角色并使用。本案例中,EaterThread对应于Consumer角色。
- Channel角色
Channel角色管理从Producer角色获取的Data角色,还负责响应Consumer角色的请求,传递Data角色。为了安全,Channel角色会对Producer角色和Consumer角色进行互斥处理。
当producer角色将Data角色传递给Channel角色时,如果Channel角色状态不能接收Data角色,则Producer角色将一直等待,直到Channel可以接收Data角色为止。
当Consumer角色从Channel角色获取Data角色时,如果Channel角色状态没有可以传递的Data角色,则Consumer角色将一直等待,直到Channel角色状态转变为可以传递Data角色为止。
当存在多个Producer角色和Consumer角色时,Channel角色需要对它们做互斥处理。
# 5.6 类图:
# 5.7 守护安全性的Channel角色(可复用)
在生产者消费者模型中,承担安全守护责任的是Channel角色。Channel角色执行线程间的互斥处理,确保Producer角色正确地将Data角色传递给Consumer角色。
# 5.8 不要直接传递
Consumer角色想要获取Data角色,通常是因为想使用这些Data角色来执行某些处理。如果
Producer角色直接调用Consumer的方法,执行处理的就不是Consumer的线程,而是Producer角色的线程了。这样一来,异步处理变同步处理,会发生不同Data间的延迟,降低程序的性能。
# 5.9 传递Data角色的顺序
- 队列——先生产先消费
- 栈——先生产后消费
- 优先队列——”优先“的先消费
# 5.10 Channel意义
线程的协调要考虑”放在中间的东西“
线程的互斥要考虑”应该保护的东西“
为了让线程协调运行,必须执行互斥处理,以防止共享的内容被破坏。线程的互斥处理时为了线程的协调运行而执行的。
# 5.11 JUC包和Producer-Consumer模式
JUC中提供了BlockingQueue接口及其实现类,相当于Producer-Consumer模式中的Channel角色。
- BlockingQueue接口——阻塞队列
- ArrayBlockingQueue——基于数组的BlockingQueue
- LinkedBlockingQueue——基于链表的BlockingQueue
- PriorityBlockingQueue——带有优先级的BlockingQueue
- DelayQueue——一定时间之后才可以take的BlockingQueue
- SynchronousQueue——直接传递的BlockingQueue
- ConcurrentLinkedQueue——元素个数没有最大限制的线程安全队列
# 6 Read-Write Lock模式
当线程读取实例的状态时,实例的状态不会发生变化。实例的状态仅在线程执行写入操作时才会发生变化。
从实例状态变化来看,读取和写入有本质的区别。
在本模式中,读取操作和写入操作分开考虑。在执行读取操作之前,线程必须获取用于读取的锁。
在执行写入操作之前,线程必须获取用于写入的锁。
可以多个线程同时读取,读取时不可写入。
当线程正在写入时,其他线程不可以读取或写入。
一般来说,执行互斥会降低程序性能。如果把写入的互斥和读取的互斥分开考虑,则可以提高性
能。
# 6.1 示例程序
# 入口程序
package com.lagou.concurrent.demo;
public class ConcurrentDemo {
public static void main(String[] args) {
Data data = new Data(10);
// 启动读取线程,读取同一个对象中的数据
new ReaderThread(data).start();
new ReaderThread(data).start();
new ReaderThread(data).start();
new ReaderThread(data).start();
new ReaderThread(data).start();
new ReaderThread(data).start();
// 启动写线程,两个线程一个写大写,另一个写小写
new WriterThread(data, "ABCDEFGHIJKLMNOPQRSTUVWXYZ").start();
new WriterThread(data, "abcdefghijklmnopqrstuvwxyz").start();
}
}
# 数据对象
package com.lagou.concurrent.demo;
public class Data {
private final char[] buffer;
// Data在线程之间共享
// Data保有读写锁
private final ReadWriteLock lock = new ReadWriteLock();
public Data(int size) {
this.buffer = new char[size];
for (int i = 0; i < buffer.length; i++) {
buffer[i] = '*';
}
}
public char[] read() throws InterruptedException {
lock.readLock();
try {
return doRead();
} finally {
lock.readUnlock();
}
}
public void write(char c) throws InterruptedException {
// 加锁,当前线程执行写操作。
lock.writeLock();
try {
// 写操作
doWrite(c);
} finally {
// 写入完成,释放锁,同时唤醒其他等待的线程
lock.writeUnlock();
}
}
private char[] doRead() {
char[] newbuf = new char[buffer.length];
for (int i = 0; i < buffer.length; i++) {
newbuf[i] = buffer[i];
}
slowly();
return newbuf;
}
private void doWrite(char c) {
for (int i = 0; i < buffer.length; i++) {
buffer[i] = c;
slowly();
}
}
private void slowly() {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
# 读写锁
package com.lagou.concurrent.demo;
public class ReadWriteLock {
private int readingReaders = 0;
private int waitingWriters = 0;
private int writingWriters = 0;
private boolean preferWriter = true;
/**
* 对于读操作:
* 1. 如果有正在写入的writer,则当前线程等待writer写入完成
* 2. 如果没有正在写入的writer,但是有正在等待写入的writer,同时偏向写操作
* 则当前线程等待writer写入完成
* @throws InterruptedException
*/
public synchronized void readLock() throws InterruptedException {
while (writingWriters > 0 || (preferWriter && waitingWriters > 0)) {
// 当前线程等待
wait();
}
readingReaders++;
}
public synchronized void readUnlock() {
readingReaders--;
preferWriter = true;
// 唤醒在当前对象上等待的其他线程
notifyAll();
}
public synchronized void writeLock() throws InterruptedException {
waitingWriters++;
try {
// 如果有正在读取的线程,或者有正在写入的线程,则当前线程等待被唤醒
while (readingReaders > 0 || writingWriters > 0) {
wait();
}
} finally {
waitingWriters--;
}
writingWriters++;
}
public synchronized void writeUnlock() {
writingWriters--;
preferWriter = false;
// 当前线程写入完成,唤醒其他等待的线程进行读写操作
notifyAll();
}
}
# 写线程
package com.lagou.concurrent.demo;
import java.util.Random;
public class WriterThread extends Thread {
private static final Random RANDOM = new Random();
private final Data data;
private final String filler;
private int index = 0;
public WriterThread(Data data, String filler) {
this.data = data;
this.filler = filler;
}
@Override
public void run() {
try {
while (true) {
// 使用轮询的方法,读取当前一个字节
char c = nextchar();
// 写字节
data.write(c);
Thread.sleep(RANDOM.nextInt(3000));
}
} catch (InterruptedException e) {
}
}
private char nextchar() {
char c = filler.charAt(index);
index++;
if (index >= filler.length()) {
index = 0;
}
return c;
}
}
# 读取线程
package com.lagou.concurrent.demo;
public class ReaderThread extends Thread {
private final Data data;
public ReaderThread(Data data) {
this.data = data;
}
@Override
public void run() {
try {
while (true) {
char[] readbuf = data.read();
System.out.println(Thread.currentThread().getName() + " 读取
了 " + String.valueOf(readbuf));
}
} catch (InterruptedException e) {
}
}
}
# 6.2 守护条件
readLock方法和writeLock方法都是用了Guarded Suspension模式。Guarded Suspension模式的重点是守护条件。
# 6.3 readLock方法:
读取线程首先调用readLock方法。当线程从该方法返回,就可以执行实际的读取操作。
当线程开始执行实际的读取操作时,只需要判断是否存在正在写入的线程,以及是否存在正在等待的写入线程。
不考虑读取线程。
如果存在正在写入的线程或者存在正在等待的写线程,则等待。
# 6.4 writeLock方法:
在线程开始写入之前,调用writeLock方法。当线程从该方法返回后,就可以执行实际的写入操作。
开始执行写入的条件:如果有线程正在执行读取操作,出现读写冲突;或者如果有线程正在执行写入的操作,引起写冲突,当前线程等待。
# 6.5 Read-Write Lock模式中的角色:
Reader
该角色对共享资源角色执行读取操作。
Writer
该角色对共享资源角色执行写操作。
SharedResource
共享资源角色表示Reader角色和Writer角色共享的资源。共享资源角色提供不修改内部状态的操作(读取)和修改内部状态的操作(写)。
当前案例中对应于Data类。
ReadWriteLock
读写锁角色提供了共享资源角色实现读操作和写操作时需要的锁,即当前案例中的readLock和
readUnlock,以及writeLock和writeUnlock。对应于当前案例中ReadWriteLock类。
# 6.6 要点
- 利用读取操作的线程之间不会冲突的特性来提高程序性能
Read-Write Lock模式利用了读操作的线程之间不会冲突的特性。由于读取操作不会修改共享
资源的状态,所以彼此之间无需加锁。因此,多个Reader角色同时执行读取操作,从而提高
程序性能。
- 适合读取操作负载较大的情况
如果单纯使用Single Threaded Execution模式,则read也只能运行一个线程。如果read负载
很重,可以使用Read-Write Lock模式。
- 适合少写多读
Read-Write Lock模式优点是Reader之间不会冲突。如果写入很频繁,Writer会频繁停止
Reader的处理,也就无法体现出Read-Write Lock模式的优势了。
# 6.7 锁的含义
synchronized可以用于获取实例的锁。java中同一个对象锁不能由两个以上的线程同时获取。
用于读取的锁和用于写入的锁与使用synchronized获取的锁是不一样的。开发人员可以通过修改ReadWriteLock类来改变锁的运行。
ReadWriteLock类提供了用于读取的锁和用于写入的锁两个逻辑锁,但是实现这两个逻辑锁的物理锁只有一个,就是ReadWriteLock实例持有的锁。
# 6.8 JUC包和Read-Write Lock模式
java.util.concurrent.locks包提供了已实现Read-Write Lock模式的ReadWriteLock接口和
ReentrantReadWriteLock类。
java.util.concurrent.locks.ReadWriteLock接口的功能和当前案例中的ReadWriteLock类类似。不同之处在于该接口用于读取的锁和用于写入的锁是通过其他对象来实现的。
当前案例的ReadWriteLock | java.util.concurrent.locks中的ReadWriteLock接口 |
---|---|
readLock() | readLock().lock() |
readLock().lock() | readLock().unlock() |
writeLock() | writeLock().lock() |
writeUnlock() | writeLock().unlock() |
java.util.concurrent.locks.ReentrantReadWriteLock类实现了ReadWriteLock接口。其特征如下:
- 公平性
当创建ReentrantReadWriteLock类的实例时,可以选择锁的获取顺序是否要设置为fair的。如
果创建的实例是公平的,那么等待时间久的线程将可以优先获取锁。
- 可重入性
ReentrantReadWriteLock类的锁是可重入的。Reader角色的线程可以获取用于写入的锁,
Writer角色的线程可以获取用于读取的锁。
- 锁降级
ReentrantReadWriteLock类可以按如下顺序将用于写入的锁降级为用于读取的锁:
用于读取的锁不能升级为用于写入的锁。
- 快捷方法
ReentrantReadWriteLock类提供了获取等待中的线程个数的方法 getQueueLength ,以及检
查是否获取了用于写入锁的方法 isWriteLocked 等方法。
# 7 Thread-Per-Message模式
该模式可以理解为“每个消息一个线程”。消息这里可以理解为命令或请求。每个命令或请求分配一个线程,由这个线程来处理。
这就是Thread-Per-Message模式。
在Thread-Per-Message模式中,消息的委托方和执行方是不同的线程。
# 7.1 示例程序
在此示例程序中,ConcurrentDemo类委托Host来显示字符。Host类会创建一个线程,来处理委托。启动的线程使用Helper类来执行实际的显示。
# 主入口类
package com.lagou.concurrent.demo;
public class ConcurrentDemo {
public static void main(String[] args) {
System.out.println("主线程 -- 开始执行");
Host host = new Host();
host.request(10, 'A');
host.request(20, 'B');
host.request(30, 'C');
System.out.println("主线程 -- 执行结束");
}
}
# 处理器类
package com.lagou.concurrent.demo;
public class Host {
private final Helper helper = new Helper();
public void request(final int count, final char c) {
System.out.println("\t请求:【" + count + "," + c + "】开始。。。");
new Thread() {
@Override
public void run() {
helper.handle(count, c);
}
}.start();
System.out.println("\t请求:【" + count + "," + c + "】结束!!!");
}
}
# 工具类
package com.lagou.concurrent.demo;
public class Helper {
public void handle(int count, char c) {
System.out.println("\t\t处理:【" + count + "," + c + "】开始。。。");
for (int i = 0; i < count; i++) {
slowly();
System.out.print(c);
}
System.out.println("");
System.out.println("\t\t处理:【" + count + "," + c + "】结束!!!");
}
private void slowly() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
# 7.2 Thread-Per-Message模式中的角色
# Client(委托方)
Client角色向Host角色发起请求,而不用关心Host角色如何实现该请求处理。
当前案例中对应于ConcurrentDemo类。
# Host
Host角色收到Client角色请求后,创建并启用一个线程。新建的线程使用Helper角色来处理请求。
当前案例中对应于Host类。
# Helper
Helper角色为Host角色提供请求处理的功能。Host角色创建的新线程调用Helper角色。
当前案例中对应于Helper类。
# 7.3 类图1
# 7.4 要点
- 提高响应性,缩短延迟时间
Thread-Per-Message模式能够提高与Client角色对应的Host角色的响应性,降低延迟时间。
尤其是当handle操作非常耗时或者handle操作需要等待输入/输出时,效果很明显。
为了缩短线程启动花费的时间,可以使用Worker Thread模式。
- 适用于操作顺序没有要求时
在Thread-Per-Message模式中,handle方法并不一定按照request方法的调用顺序来执行。
- 适用于不需要返回值时
在Thread-Per-Message模式中,request方法并不会等待handle方法的执行结束。request得
不到handle的结果。
当需要获取操作结果时,可以使用Future模式。
- 应用于服务器
JUC包和Thread-Per-Message模式
# 7.5 JUC包和Thread-Per-Message模式
java.lang.Thread类
最基本的创建、启动线程的类
java.lang.Runnable接口
线程锁执行的任务接口
java.util.concurrent.ThreadFactory接口
将线程创建抽象化的接口
java.util.concurrent.Executors
用于创建实例的工具类
java.util.concurrent.Executor接口
将线程执行抽象化的接口
java.util.concurrent.ExecutorService接口
将被复用的线程抽象化的接口
java.util.concurrent.ScheduledExecutorService类
将被调度线程的执行抽象化的接口
# 7.6 类图2
# 8 Worker Thread模式
在Worker Thread模式中,工人线程(worker thread)会逐个取回工作并进行处理。当所有工作全部完成后,工人线程会等待新的工作到来。
Worker Thread模式也被称为Background Thread模式。有时也称为Thread Pool模式。
# 8.1 示例程序
ClientThread类的线程会向Channel类发送工作请求(委托)。
Channel类的实例有五个工人线程进行工作。所有工人线程都在等待工作请求的到来。
当收到工作请求后,工人线程会从Channel获取一项工作请求并开始工作。工作完成后,工人线程回到Channel那里等待下一项工作请求。
# 8.2 类图
# 8.3 时序图
package com.lagou.concurrent.demo;
public class ConcurrentDemo {
public static void main(String[] args) {
Channel channel = new Channel(5);
channel.startWorkers();
new ClientThread("张三", channel).start();
new ClientThread("李四", channel).start();
new ClientThread("王五", channel).start();
}
}
package com.lagou.concurrent.demo;
public class Channel {
private static final int MAX_REQUEST = 100;
private final Request[] requestQueue;
private int tail;
private int head;
private int count;
private final WorkerThread[] threadPool;
public Channel(int threads) {
this.requestQueue = new Request[MAX_REQUEST];
this.head = 0;
this.tail = 0;
this.count = 0;
threadPool = new WorkerThread[threads];
for (int i = 0; i < threadPool.length; i++) {
threadPool[i] = new WorkerThread("Worker-" + i, this);
}
}
public void startWorkers() {
for (int i = 0; i < threadPool.length; i++) {
threadPool[i].start();
}
}
public synchronized void putRequest(Request request) {
while (count >= requestQueue.length) {
try {
wait();
} catch (InterruptedException e) {
}
}
requestQueue[tail] = request;
tail = (tail + 1) % requestQueue.length;
count++;
notifyAll();
}
public synchronized Request takeRequest() {
while (count <= 0) {
try {
wait();
} catch (InterruptedException e) {
}
}
Request request = requestQueue[head];
head = (head + 1) % requestQueue.length;
count--;
notifyAll();
return request;
}
}
package com.lagou.concurrent.demo;
import java.util.Random;
public class ClientThread extends Thread {
private final Channel channel;
private static final Random RANDOM = new Random();
public ClientThread(String name, Channel channel) {
super(name);
this.channel = channel;
}
@Override
public void run() {
try {
for (int i = 0; true; i++) {
Request request = new Request(getName(), i);
channel.putRequest(request);
Thread.sleep(RANDOM.nextInt(1000));
}
} catch (InterruptedException e) {
}
}
}
package com.lagou.concurrent.demo;
import java.util.Random;
public class Request {
private final String name;
private final int number;
private static final Random RANDOM = new Random();
public Request(String name, int number) {
this.name = name;
this.number = number;
}
public void execute() {
System.out.println(Thread.currentThread().getName() + " 执行 " +
this);
try {
Thread.sleep(RANDOM.nextInt(1000));
} catch (InterruptedException e) {
}
}
@Override
public String toString() {
return "Request{" +
"name='" + name + '\'' +
", number=" + number +
'}';
}
}
package com.lagou.concurrent.demo;
public class WorkerThread extends Thread {
private final Channel channel;
public WorkerThread(String name, Channel channel) {
super(name);
this.channel = channel;
}
@Override
public void run() {
while (true) {
Request reqeust = channel.takeRequest();
reqeust.execute();
}
}
}
# 8.4 Worker Thread模式中的角色
- Client(委托者)
Client角色创建Request角色并将其传递给Channel角色。在本例中,ClientThread对应Client
角色。
- Channel
Channel角色接收来自Client角色的Request角色,并将其传递给Worker角色。在本例中,
Channel类对应Channel角色。
- Worker
Worker角色从Channel角色中获取Request角色,并执行其逻辑。当一项工作结束后,继续从
Channel获取另外的Request角色。本例中,WorkerThread类对应Worker角色。
- Request
Request角色表示工作。Request角色中保存了工作的逻辑。本例中,Request类对应Request
角色。
# 8.5 Worker Thread模式的优点
- 提高吞吐量
如果将工作交给其他线程,当前线程就可以处理下一项工作,称为Thread Per Message模
式。
由于启动新线程消耗时间,可以通过Worker Thread模式轮流和反复地使用线程来提高吞吐
量。
- 容量控制
Worker角色的数量在本例中可以传递参数指定。
Worker角色越多,可以并发处理的逻辑越多。同时增加Worker角色会增加消耗的资源。必须
根据程序实际运行环境调整Worker角色的数量。
- 调用与执行的分离
Worker Thread模式和Thread Per Message模式一样,方法的调用和执行是分开的。方法的
调用是invocation,方法的执行是execution。
这样,可以:
- 提高响应速度;
- 控制执行顺序,因为执行不受调用顺序的制约;
- 可以取消和反复执行;
- 进行分布式部署,通过网络将Request角色发送到其他Woker计算节点进行处理。
- Runnable接口的意义
java.lang.Runnable 接口有时用作Worker Thread模式的Request角色。即可以创建
Runnable接口的实现类对象表示业务逻辑,然后传递给Channel角色。
Runnable对象可以作为方法参数,可以放到队列中,可以跨网络传输,也可以保存到文件
中。如此则Runnable对象不论传输到哪个计算节点,都可以执行。
- 多态的Request角色
本案例中,ClientThread传递给Channel的只是Request实例。但是WorkerThread并不知道
Request类的详细信息。
即使我们传递的是Request的子类给Channel,WorkerThread也可以正常执行execute方法。
通过Request的多态,可以增加任务的种类,而无需修改Channel角色和Worker角色。
# 8.6 JUC包和Worker Thread模式
- ThreadPoolExecutor类
java.util.concurrent.ThreadPoolExecutor 类是管理Worker线程的类。可以轻松实现
Worker Thread模式。
- 通过 java.util.concurrent 包创建线程池
java.util.concurrent.Executors 类就是创建线程池的工具类。
# 9 Future模式
Future的意思是未来。假设由一个方法需要长时间执行才能获取结果,则一般不会让调用的程序等待,而是先返回给它一张“提货卡”。获取提货卡并不消耗很多时间。该“提货卡”就是Future角色。
获取Future角色的线程稍后使用Future角色来获取运行结果。
# 9.1 示例程序
类名 | 说明 |
---|---|
Main | 向Host发出请求并获取数据 |
Host | 接收请求,并向客户端返回FutureData |
Data | 访问数据方法的接口。 |
FutureData | RealData的“提货单”。 |
RealData | 实际数据的类。构造函数会花费很长时间。 |
# Host类
package com.lagou.concurrent.demo;
public class Host {
public Data request(final int count, final char c) {
System.out.println("\trequest(" + count + ", " + c + ") 开始");
// 创建FutureData对象
final FutureData future = new FutureData();
// 启动新线程,创建RealData对象
new Thread() {
@Override
public void run() {
RealData realData = new RealData(count, c);
future.setRealData(realData);
}
}.start();
System.out.println("\trequest(" + count + ", " + c + ") 结束");
// 返回提货单
return future;
}
}
# Data接口:
package com.lagou.concurrent.demo;
public interface Data {
String getContent();
}
# FutureData类:
package com.lagou.concurrent.demo;
public class FutureData implements Data {
private RealData realData = null;
private boolean ready = false;
public synchronized void setRealData(RealData realData) {
// balking,如果已经准备好,就返回
if (ready) {
return;
}
this.realData = realData;
this.ready = true;
notifyAll();
}
@Override
public synchronized String getContent() {
// guarded suspension
while (!ready) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return realData.getContent();
}
}
# RealData类:
package com.lagou.concurrent.demo;
public class RealData implements Data {
private final String content;
public RealData(int count, char c) {
System.out.println("\t组装RealData(" + count + ", " + c + ") 开始");
char[] buffer = new char[count];
for (int i = 0; i < count; i++) {
buffer[i] = c;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("\t\t组装RealData(" + count + ", " + c + ") 结
束");
this.content = new String(buffer);
}
@Override
public String getContent() {
return content;
}
}
# Main类:
package com.lagou.concurrent.demo;
public class Main {
public static void main(String[] args) {
Host host = new Host();
Data data1 = host.request(10, 'A');
Data data2 = host.request(20, 'B');
Data data3 = host.request(30, 'C');
System.out.println("等待一会儿再获取结果");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("data1 = " + data1.getContent());
System.out.println("data2 = " + data2.getContent());
System.out.println("data3 = " + data3.getContent());
}
}
# 9.2 流程图
# 9.3 Future模式中的角色
- Client(请求者)
Client角色向Host角色发出请求,并立即接收到请求的处理结果——VirtualData角色,也就是
Future角色。
Client角色不必知道返回值是RealData还是Future角色。稍后通过VirtualData角色来操作。
本案例中,对应Main类。
- Host
Host角色创建新的线程,由新线程创建RealData角色。同时,Host角色将Future角色(当做
VirtualData角色)返回给Client角色。本案例中对应Host类。
- VirtualData(虚拟数据)
VirtualData角色是让Future角色与RealData角色具有一致性的角色。本案例中对应Data接口。
- RealData(真实数据)
RealData角色是表示真实数据的角色。创建该对象需要花费很多时间。本案例中对应
RealData类。
- Future
Future角色是RealData角色的“提货单”,由Host角色传递给Client角色。对Client而言,
Future角色就是VirtualData角色。当Client角色操作Future角色时线程会wait,直到RealData
角色创建完成。
Future角色将Client角色的操作委托给RealData角色。
本案例中,对应于FutureData类。
# 9.4 要点:
- 使用Thread Per Message模式,可以提高程序响应性,但是不能获取结果。Future模式也可
以提高程序响应性,还可以获取处理结果。
- 利用Future模式异步处理特性,可以提高程序吞吐量。虽然并没有减少业务处理的时长,但是如果考虑到I/O,当程序进行磁盘操作时,CPU只是处于等待状态。CPU有空闲时间处理其他
的任务。
- “准备返回值”和“使用返回值”的分离。
- 如果想等待处理完成后获取返回值,还可以考虑采用回调处理方式。即,当处理完成后,由
Host角色启动的线程调用Client角色的方法,进行结果的处理。此时Client角色中的方法需要
线程安全地传递返回值。
# 9.5 JUC包与Future模式
java.util.concurrent包提供了用于支持Future模式的类和接口。
java.util.concurrent.Callable接口将“返回值的某种处理调用”抽象化了。Callable接口声明了call方法。call方法类似于Runnable的run方法,但是call方法有返回值。Callable<String>
表示Callable接口的call方法返回值类型为String类型。
java.util.concurrent.Future接口相当于本案例中的Future角色。Future接口声明了get方法来获取结果,但是没有声明设置值的方法。设置值的方法需要在Future接口的实现类中声明。Future<String>
表示“Future接口的get方法返回值类型是String类型”。除了get方法,Future接口还声明了用于中断运行的cancel方法。
java.util.concurrent.FutureTask类是实现了Future接口的标准类。FutureTask类声明了用于获取值的get方法、用于中断运行的cancel方法、用于设置值的set方法,以及用于设置异常的setException方法。由于FutureTask类实现了Runnable接口,还声明了run方法。