Java之线程池

感受线程池的好处

需求:查看文档,请求文档,返回“文档处理中,请稍后在试”,服务器异步处理将Word文档转为PDF

image-20201102192817914

代码实现

新老实现方式的对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.java.example.threadpool;

import org.testng.annotations.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author jingLv
* @date 2020/11/02
*/
public class ThreadVs {

/**
* 新的处理方式
*/
@Test
public void newHandle() {
// 开启了一个线程池:线程个数是10个
ExecutorService threadPool = Executors.newFixedThreadPool(10);
// 使用循环来模拟许多用户请求的场景
for (int request = 1; request < 100; request++) {
threadPool.execute(() -> {
System.out.println("文档处理开始!");
try {
// 将Word转换为PDF格式:处理时长很差的耗时过程
Thread.sleep(1000L * 30);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("文档处理结束!");
});
}

try {
Thread.sleep(1000L * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 老的处理方式
*/
@Test
public void oldHandle() {
// 使用循环来模拟许多用户请求的场景
for (int request = 1; request < 100; request++) {
new Thread(() -> {
System.out.println("文档处理开始!");
try {
// 将Word转换为PDF格式:处理时长很差的耗时过程
Thread.sleep(1000L * 30);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("文档处理结束!");
}).start();
}

try {
Thread.sleep(1000L * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

线程池简介

什么是线程池?

线程池顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销。

线程池带来的好处

  • 降低资源消耗
    • 通过对重复利用已创建的资源,包括线程,来降低线程创建和销毁造成的消耗,包括数据库连接池也是一样的
  • 提高响应速度
    • 当有任务到达时,任务不需要等待线程的创建,就能立即执行
  • 提高线程的可管理性
    • 线程本就是有限制资源,无限制的创建不仅会消耗系统的资源还会降低系统的稳定性,严重会使系统崩溃,使用线程池就可以观察线程有多少,可以动态的分配,包括可进行优化

简单线程池设计

面试题:独立设计一个简单线程池

image-20201103114614198

优化版的线程池

image-20201103114656578

线程池参数与处理流程

线程池的核心参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
*线程池核心参数
*
* @param corePoolSize 核心线程数量
* @param maximumPoolSize 最大线程数量
* @param keepAliveTIme 线程空闲的存活时间
* @param unit 时间单位
* @param workQueue 用于存放任务的阻塞队列
* @param threadFactory 线程工厂类
* @param handler 当队列和最大线程池都满了之后的饱和策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTIme,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){}

线程池的处理流程

image-20201103120619849

线程池可选择的阻塞队列

什么是阻塞队列?

阻塞队列就是一个支持两个附加操作的队列,这两个附加操作队列是阻塞的插入和移除的方法。

  • 阻塞插入:当队列满时,队列会阻塞插入元素的线程,直到队列不满
  • 阻塞移除:当队列为空时,获取元素的线程会等待队列变为非空(会等待队列中又数据)

阻塞队列

  • 无界队列
  • 有界队列
  • 同步移交队列(不存储元素的阻塞队列,每个插入的操作必须要等到另个一线程调用移除操作才能成功,否则插入的操作一直处于阻塞状态)

实战案例:三种实现阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.java.example.threadpool;

import org.testng.annotations.Test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

/**
* @author jingLv
* @date 2020/11/03
*/
public class QueueTest {

/**
* 基于数组的有界队列,队列容量为10
* 添加到10个元素后,队列就会阻塞
*/
@Test
public void testArrayBlockingQueue() {
ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);

// 循环向队列添加值
for (int i = 0; i < 20; i++) {
try {
queue.put(i);
System.out.println("向队列中添加值:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 基于链表的有界/无界队列,队列容量为10
* 有界--添加到10个元素后,队列就会阻塞
*/
@Test
public void testLinkedBlockingQueue() {
// 有界
// LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);
// 无界
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();


// 循环向队列添加值
for (int i = 0; i < 20; i++) {
try {
queue.put(i);
System.out.println("向队列中添加值:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/**
* 同步移交队列
* 没有存储元素的能力,每个插入的操作必须要等到另个一线程调用移除操作才能成功,否则插入的操作一直处于阻塞状态
*/
@Test
public void test() {
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

// 插入值
new Thread(() -> {
try {
queue.put(1);
System.out.println("插入成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

// 从队列中删除值
new Thread(() -> {
try {
queue.take();
System.out.println("删除成功");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

}

线程池可选择的饱和策略与执行示意图

饱和策略

  • AbortPolicy终止策略(默认)
  • DiscardPolicy抛弃策略
  • DiscardOldestPolicy抛弃就任务策略
  • CallerRunsPolicy调用者运行策略

image-20201103180330511

执行示意图

image-20201103181528868

实战案例:饱和策略之终止策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package com.java.example.threadpool;

import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 饱和策略
*
* @author jingLv
* @date 2020/11/05
*/
public class PolicyTest {

/**
* 线程池
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
// 核心线程数和最大线程数
2, 3,
// 线程空闲后的存活时间
60L, TimeUnit.SECONDS,
// 有界阻塞队列
new LinkedBlockingQueue<>(5));

/**
* 定义线程池中执行任务
*/
class Task implements Runnable {

/**
* 任务名称
*/
private String taskName;

public Task(String taskName) {
this.taskName = taskName;
}

@Override
public void run() {
System.out.println("线程[" + Thread.currentThread().getName() + "]正在执行[" + this.taskName + "]任务!");
try {
Thread.sleep(1000L * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程[" + Thread.currentThread().getName() + "]已执行完成[" + this.taskName + "]任务!");
}
}

/**
* 终止策略
* 线程池的执行过程
* 2个核心线程
* 5个任务队列
* 3个最大线程:除去2个核心线程,只有1个可用
* <p>
* 前2个任务,会占用2个核心线程
* 第3个到第7个任务,会暂存到任务队列中
* 第8个任务,会启动最大线程执行
* 第9个任务,没有线程可以去执行
*/
@Test
public void abortPolicyTest() {
// 设置饱和策略为终止策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
for (int i = 1; i <= 10; i++) {
try {
// 提交10个线程任务
executor.execute(new Task("线程任务" + i));
} catch (Exception e) {
System.err.println(e);
}
}
// 关闭线程池
executor.shutdown();
}

/**
* 单元测试执行完,主线程等待100秒,防止主线程退出
*
* @throws InterruptedException
*/
@AfterMethod
public void after() throws InterruptedException {
Thread.sleep(1000L * 10);
}
}

执行结果:

image-20201105133845821

实战案例:其他三种饱和策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package com.java.example.threadpool;

import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 饱和策略
*
* @author jingLv
* @date 2020/11/05
*/
public class PolicyTest {

/**
* 线程池
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
// 核心线程数和最大线程数
2, 3,
// 线程空闲后的存活时间
60L, TimeUnit.SECONDS,
// 有界阻塞队列
new LinkedBlockingQueue<>(5));

/**
* 定义线程池中执行任务
*/
class Task implements Runnable {

/**
* 任务名称
*/
private String taskName;

public Task(String taskName) {
this.taskName = taskName;
}

@Override
public void run() {
System.out.println("线程[" + Thread.currentThread().getName() + "]正在执行[" + this.taskName + "]任务!");
try {
Thread.sleep(1000L * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程[" + Thread.currentThread().getName() + "]已执行完成[" + this.taskName + "]任务!");
}
}

/**
* 抛弃策略
* 执行结果,不会有线程任务9和10,抛弃最新的任务
*/
@Test
public void discardPolicyTest() {
// 设置饱和策略为抛弃策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
for (int i = 1; i <= 10; i++) {
try {
// 提交10个线程任务
executor.execute(new Task("线程任务" + i));
} catch (Exception e) {
System.err.println(e);
}
}
// 关闭线程池
executor.shutdown();
}

/**
* 抛弃旧任务策略
* 执行结果看到,会有线程任务9和10,而没有线程任务3和4,相对于9和10,3和4是暂存到任务队列中还未执行的旧的任务,就将3好4抛弃,执行9和10
*/
@Test
public void discardOldestPolicyTest() {
// 设置饱和策略为抛弃旧任务策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 1; i <= 10; i++) {
try {
// 提交10个线程任务
executor.execute(new Task("线程任务" + i));
} catch (Exception e) {
System.err.println(e);
}
}
// 关闭线程池
executor.shutdown();
}

/**
* 调用者运行策略
* 借助于调用者的主线程来运行要被提交到线程池中的任务
* 调用者往线程池中提交任务,发现线程池有拒绝的情况或者已经超负载的情况,就会使用自己的线程(主线程)来执行任务运行,当执行完之后,再次尝试将其余的任务向线程池中提交
*/
@Test
public void callerRunsPolicyTest() {
// 设置饱和策略为调用者运行策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 1; i <= 10; i++) {
try {
// 提交10个线程任务
executor.execute(new Task("线程任务" + i));
} catch (Exception e) {
System.err.println(e);
}
}
// 关闭线程池
executor.shutdown();
}

/**
* 单元测试执行完,主线程等待100秒,防止主线程退出
*
* @throws InterruptedException
*/
@AfterMethod

public void after() throws InterruptedException {
Thread.sleep(1000L * 10);
}
}

常用线程池

newCachedThreadPool

1
2
3
4
5
6
7
8
9
10
/**
* 线程数量无限线程池
*
* @return
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

newFixedThreadPool

1
2
3
4
5
6
7
8
9
10
11
/**
* 线程数量固定线程池
*
* @param nThreads 核心线程数
* @return
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor

1
2
3
4
5
6
7
8
9
10
11
/**
* 单一线程线程池
*
* @return
*/
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

实战案例:向线程池提交任务

向线程池提交任务的两种方式

  • submit
  • execute
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.java.example.threadpool;

import org.testng.annotations.Test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
* @author jingLv
* @date 2020/11/03
*/
public class RunTest {

@Test
public void submitTest() {
// 创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();

// 利用submit方法提交任务,接收任务的返回结果
Future<Integer> future = executorService.submit(() -> {
Thread.sleep(1000L * 10);
return 2 * 5;
});

Integer num = 0;
try {
// 阻塞方法,直到任务有返回值后,才向下执行
num = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

System.out.println("执行结果:" + num);
}

@Test
public void executeTest() {
// 创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();

// 利用execute方法提交任务,没有返回结果
executorService.execute(() -> {
try {
Thread.sleep(1000L * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}

Integer num = 2 * 5;
System.out.println("执行结果:" + num);
});

try {
Thread.sleep(1000L * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

线程池的状态

五种状态的切换

image-20201103185627286