Java之线程池
感受线程池的好处
需求:查看文档,请求文档,返回“文档处理中,请稍后在试”,服务器异步处理将Word文档转为PDF
data:image/s3,"s3://crabby-images/f0bc9/f0bc90d9ec29b21e5a878b690c61c06123a0207d" alt="image-20201102192817914"
代码实现
新老实现方式的对比
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| package com.java.example.threadpool;
import org.testng.annotations.Test;
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class ThreadVs {
@Test public void newHandle() { ExecutorService threadPool = Executors.newFixedThreadPool(10); for (int request = 1; request < 100; request++) { threadPool.execute(() -> { System.out.println("文档处理开始!"); try { Thread.sleep(1000L * 30); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("文档处理结束!"); }); }
try { Thread.sleep(1000L * 1000); } catch (InterruptedException e) { e.printStackTrace(); } }
@Test public void oldHandle() { for (int request = 1; request < 100; request++) { new Thread(() -> { System.out.println("文档处理开始!"); try { Thread.sleep(1000L * 30); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("文档处理结束!"); }).start(); }
try { Thread.sleep(1000L * 1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
|
线程池简介
什么是线程池?
线程池顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销。
线程池带来的好处
- 降低资源消耗
- 通过对重复利用已创建的资源,包括线程,来降低线程创建和销毁造成的消耗,包括数据库连接池也是一样的
- 提高响应速度
- 当有任务到达时,任务不需要等待线程的创建,就能立即执行
- 提高线程的可管理性
- 线程本就是有限制资源,无限制的创建不仅会消耗系统的资源还会降低系统的稳定性,严重会使系统崩溃,使用线程池就可以观察线程有多少,可以动态的分配,包括可进行优化
简单线程池设计
面试题:独立设计一个简单线程池
data:image/s3,"s3://crabby-images/77a3c/77a3c672a17f360bd010ce74efe669d47e841866" alt="image-20201103114614198"
优化版的线程池
data:image/s3,"s3://crabby-images/912b8/912b8d696bf73850d194371c20930480d8c07c2c" alt="image-20201103114656578"
线程池参数与处理流程
线程池的核心参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTIme, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){}
|
线程池的处理流程
data:image/s3,"s3://crabby-images/b4b1b/b4b1b1c433ff2e271f65dfd0d8d8016eeb5556ab" alt="image-20201103120619849"
线程池可选择的阻塞队列
什么是阻塞队列?
阻塞队列就是一个支持两个附加操作的队列,这两个附加操作队列是阻塞的插入和移除的方法。
- 阻塞插入:当队列满时,队列会阻塞插入元素的线程,直到队列不满
- 阻塞移除:当队列为空时,获取元素的线程会等待队列变为非空(会等待队列中又数据)
阻塞队列
- 无界队列
- 有界队列
- 同步移交队列(不存储元素的阻塞队列,每个插入的操作必须要等到另个一线程调用移除操作才能成功,否则插入的操作一直处于阻塞状态)
实战案例:三种实现阻塞队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| package com.java.example.threadpool;
import org.testng.annotations.Test;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue;
public class QueueTest {
@Test public void testArrayBlockingQueue() { ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(10);
for (int i = 0; i < 20; i++) { try { queue.put(i); System.out.println("向队列中添加值:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } }
@Test public void testLinkedBlockingQueue() { LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
for (int i = 0; i < 20; i++) { try { queue.put(i); System.out.println("向队列中添加值:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } }
@Test public void test() { SynchronousQueue<Integer> queue = new SynchronousQueue<>();
new Thread(() -> { try { queue.put(1); System.out.println("插入成功"); } catch (InterruptedException e) { e.printStackTrace(); } }).start();
new Thread(() -> { try { queue.take(); System.out.println("删除成功"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
}
|
线程池可选择的饱和策略与执行示意图
饱和策略
- AbortPolicy终止策略(默认)
- DiscardPolicy抛弃策略
- DiscardOldestPolicy抛弃就任务策略
- CallerRunsPolicy调用者运行策略
data:image/s3,"s3://crabby-images/80610/8061002b5260f6d628a49f7062abcd81df070f2a" alt="image-20201103180330511"
执行示意图
data:image/s3,"s3://crabby-images/f7efc/f7efcae78c3231f76803833fd466f2443ad98b56" alt="image-20201103181528868"
实战案例:饱和策略之终止策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| package com.java.example.threadpool;
import org.testng.annotations.AfterMethod; import org.testng.annotations.Test;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;
public class PolicyTest {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));
class Task implements Runnable {
private String taskName;
public Task(String taskName) { this.taskName = taskName; }
@Override public void run() { System.out.println("线程[" + Thread.currentThread().getName() + "]正在执行[" + this.taskName + "]任务!"); try { Thread.sleep(1000L * 5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程[" + Thread.currentThread().getName() + "]已执行完成[" + this.taskName + "]任务!"); } }
@Test public void abortPolicyTest() { executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); for (int i = 1; i <= 10; i++) { try { executor.execute(new Task("线程任务" + i)); } catch (Exception e) { System.err.println(e); } } executor.shutdown(); }
@AfterMethod public void after() throws InterruptedException { Thread.sleep(1000L * 10); } }
|
执行结果:
data:image/s3,"s3://crabby-images/22965/229652d0d978d9db1f24246aa9e65e4a3823f768" alt="image-20201105133845821"
实战案例:其他三种饱和策略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
| package com.java.example.threadpool;
import org.testng.annotations.AfterMethod; import org.testng.annotations.Test;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;
public class PolicyTest {
private static ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));
class Task implements Runnable {
private String taskName;
public Task(String taskName) { this.taskName = taskName; }
@Override public void run() { System.out.println("线程[" + Thread.currentThread().getName() + "]正在执行[" + this.taskName + "]任务!"); try { Thread.sleep(1000L * 5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程[" + Thread.currentThread().getName() + "]已执行完成[" + this.taskName + "]任务!"); } }
@Test public void discardPolicyTest() { executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); for (int i = 1; i <= 10; i++) { try { executor.execute(new Task("线程任务" + i)); } catch (Exception e) { System.err.println(e); } } executor.shutdown(); }
@Test public void discardOldestPolicyTest() { executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); for (int i = 1; i <= 10; i++) { try { executor.execute(new Task("线程任务" + i)); } catch (Exception e) { System.err.println(e); } } executor.shutdown(); }
@Test public void callerRunsPolicyTest() { executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); for (int i = 1; i <= 10; i++) { try { executor.execute(new Task("线程任务" + i)); } catch (Exception e) { System.err.println(e); } } executor.shutdown(); }
@AfterMethod
public void after() throws InterruptedException { Thread.sleep(1000L * 10); } }
|
常用线程池
newCachedThreadPool
1 2 3 4 5 6 7 8 9 10
|
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
|
newFixedThreadPool
1 2 3 4 5 6 7 8 9 10 11
|
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
|
newSingleThreadExecutor
1 2 3 4 5 6 7 8 9 10 11
|
public static ExecutorService newSingleThreadExecutor() { return new Executors.FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
|
实战案例:向线程池提交任务
向线程池提交任务的两种方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| package com.java.example.threadpool;
import org.testng.annotations.Test;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;
public class RunTest {
@Test public void submitTest() { ExecutorService executorService = Executors.newCachedThreadPool();
Future<Integer> future = executorService.submit(() -> { Thread.sleep(1000L * 10); return 2 * 5; });
Integer num = 0; try { num = future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }
System.out.println("执行结果:" + num); }
@Test public void executeTest() { ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(() -> { try { Thread.sleep(1000L * 10); } catch (InterruptedException e) { e.printStackTrace(); }
Integer num = 2 * 5; System.out.println("执行结果:" + num); });
try { Thread.sleep(1000L * 10); } catch (InterruptedException e) { e.printStackTrace(); } } }
|
线程池的状态
五种状态的切换
data:image/s3,"s3://crabby-images/76a60/76a605f658ca86824fb64c18d8f49ea5dcd3fa6e" alt="image-20201103185627286"