网站首页 > java教程 正文
线程池是并发编程中必不可少的一种工具,也是面试高频话题。
线程池,即管理着若干线程的资源池(字面意思)。相比于为每个任务分配一个线程,在线程池中执行任务优势更多:
1.线程复用:线程池中的线程是可以复用的,省去了创建、销毁线程的开销,提高了资源利用率(创建、销毁等操作都是要消耗系统资源的)和响应速度(任务提交过来线程已存在就不用等待线程创建了);

2.合理利用资源:通过调整线程池大小,让所有处理器尽量保持忙碌,又能防止过多线程产生过多竞争浪费资源;
常用的线程池主要是ThreadPoolExecutor 和 ScheduledThreadPoolExecutor(定时任务线程池,继承ThreadPoolExecutor)。
Executor框架
在JAVA中,任务执行的主要抽象不是Thread,而是Executor。Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。
所谓Executor框架,其实就是定义了一个接口,我们常用的线程池ThreadPoolExecutor 就是对这个接口的一种实现。
1public interface Executor {
2
3 /**
4 * Executes the given command at some time in the future. The command
5 * may execute in a new thread, in a pooled thread, or in the calling
6 * thread, at the discretion of the {@code Executor} implementation.
7 *
8 * @param command 可执行的任务
9 * @throws RejectedExecutionException 任务可能被拒绝(当Executor处理不了的时候)
10 * @throws NullPointerException if command is null
11 */
12 void execute(Runnable command);
13}
Executors与常用线程池
Executors 其实就是Executor(加s)
Executors是一个Executor的工厂,有很多定义好的工厂方法,可以帮助懒惰的 开发者快速创建一个线程池。下面是几个常用的工厂方法:
- newFixedThreadPool 固定长度线程池,每次提交任务都会创建一个新线程,直到线程数量达到指定阈值则不再创建新的;
- newCachedThreadPool 可缓存线程池,每次提交任务都会创建一个新线程(理论上无限制),部分任务执行完后如果没有新的任务,导致某些线程无用武之地,它们将被终结;
- newSingleThreadExecutor 只有一个线程的线程池;
- newScheduledThreadPool 可以延时或者定时执行任务的线程池。
1public class Executors {
2
3 public static ExecutorService newFixedThreadPool(int nThreads) {
4 return new ThreadPoolExecutor(nThreads, nThreads,
5 0L, TimeUnit.MILLISECONDS,
6 new LinkedBlockingQueue<Runnable>());
7 }
8
9 public static ExecutorService newCachedThreadPool() {
10 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
11 60L, TimeUnit.SECONDS,
12 new SynchronousQueue<Runnable>());
13 }
14
15 public static ExecutorService newSingleThreadExecutor() {
16 return new FinalizableDelegatedExecutorService
17 (new ThreadPoolExecutor(1, 1,
18 0L, TimeUnit.MILLISECONDS,
19 new LinkedBlockingQueue<Runnable>()));
20 }
21
22 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
23 return new ScheduledThreadPoolExecutor(corePoolSize);
24 }
25}
如果查看上述工厂方法的源码,会发现只是 new 了一个线程池对象返回给调用 者而已,没什么花里胡哨的东西。不过看看构造参数还真不少,通过这种方式 比起我们自己 new 一个线程池要简单多了(才怪)。
线程池构造参数
了解线程池构造参数的意义,能让我们更清楚程序执行逻辑。
- int corePoolSize : 核心线程数,有新任务来时,如果当前线程小于核心线程,则新建一个线程来执行该任务
- int maximumPoolSize : 最大线程数,线程池最多拥有的线程数
- long keepAliveTime : 空闲线程存活时间
- TimeUnit unit : 空闲线程存活时间的单位
- BlockingQueue
- workQueue : 存放待执行任务的阻塞队列,新任务来时,若当前线程数>=最大核心线程数,则放到这个队列(具体逻辑更复杂,请看下面源码分析)
- ThreadFactory threadFactory : 创建新线程的工厂,一般用来给线程取个名字方便排查问题
- RejectedExecutionHandler handler : 任务被拒绝后的处理器,默认的处理器会直接抛出异常,建议重新实现
- 配合源码,效果更佳:
1public class ThreadPoolExecutor extends AbstractExecutorService {
2
3 // 构造函数
4 public ThreadPoolExecutor(int corePoolSize, // 核心线程数
5 int maximumPoolSize, // 最大线程数
6 long keepAliveTime, // 空闲线程存活时间
7 TimeUnit unit, // 空闲线程存活时间的单位
8 BlockingQueue<Runnable> workQueue, // 存放待执行任务的阻塞队列
9 ThreadFactory threadFactory, // 创建新线程的工厂
10 RejectedExecutionHandler handler // 任务被拒绝后的处理器
11 ) {
12 // ...
13 }
14
15 // 提交任务
16 public void execute(Runnable command) {
17 if (command == null)
18 throw new NullPointerException();
19 /*
20 * 没翻,懒得翻
21 * Proceed in 3 steps:
22 *
23 * 1. If fewer than corePoolSize threads are running, try to
24 * start a new thread with the given command as its first
25 * task. The call to addWorker atomically checks runState and
26 * workerCount, and so prevents false alarms that would add
27 * threads when it shouldn't, by returning false.
28 *
29 * 2. If a task can be successfully queued, then we still need
30 * to double-check whether we should have added a thread
31 * (because existing ones died since last checking) or that
32 * the pool shut down since entry into this method. So we
33 * recheck state and if necessary roll back the enqueuing if
34 * stopped, or start a new thread if there are none.
35 *
36 * 3. If we cannot queue task, then we try to add a new
37 * thread. If it fails, we know we are shut down or saturated
38 * and so reject the task.
39 */
40
41 // 当前状态值
42 int c = ctl.get();
43 // 当前线程数 = workerCountOf(c) 小于 核心线程数 的上限时
44 // 直接创建一个线程来执行任务
45 if (workerCountOf(c) < corePoolSize) {
46 // 并发提交场景下可能会失败
47 if (addWorker(command, true))
48 return; // 新增成功就可以结束了
49 // 失败就更新下线程池状态
50 c = ctl.get();
51 }
52 // 不能创建核心线程来执行,并不会直接创建非核心线程,而是把任务暂存到阻塞队列
53 // isRunning(c)判断线程池是否还在运行
54 // workQueue.offer(command)返回值表示是否成功提交到队列
55 if (isRunning(c) && workQueue.offer(command)) {
56 // 成功放到队列里了,再检查一下线程池状态
57 int recheck = ctl.get();
58 // 如果线程池已经没有运行了,则尝试把新增的任务从队列移除
59 // remove(command)返回值表示是否移除成功
60 if (! isRunning(recheck) && remove(command))
61 reject(command); // 移除成功后,执行拒绝策略
62 // 检查下当前线程数是否为0,如果是的话新建一个线程
63 else if (workerCountOf(recheck) == 0)
64 addWorker(null, false);
65 }
66 // 线程池没有运行,或者放入队列失败(比如队列已满)
67 // 则创建非核心线程去执行任务,这也失败就只能拒绝了
68 else if (!addWorker(command, false))
69 reject(command);
70 }
、
当对线程池的构造参数和任务处理逻辑有了以上大致的了解后,回想Executors 提供的几个工厂方法,或许会感到所谓提供便利性的方法并不那么便利。因为从方法的名字上来看很难和线程池的配置准确关联,想要清除地知道这些方法创建的线程池如何运作,就需要知道他们用了怎样的构造参数,那为什么不直接使用构造方法呢?
所以尽量使用构造方法是更好的编程习惯,这样不管是作者还是其他开发者,只要看看传了什么参数,就知道这个线程池是怎么运作的了。
线程池创建示例
1import java.util.concurrent.*;
2import java.util.concurrent.atomic.AtomicInteger;
3
4public class Main {
5
6 public static void main(String[] args) throws Exception {
7 AtomicInteger threadCount = new AtomicInteger();
8 ThreadPoolExecutor executor = new ThreadPoolExecutor(
9 5, // 核心线程数
10 10, // 最大线程数
11 1, // 空闲线程存活时间
12 TimeUnit.MINUTES, // 空闲线程存活时间单位
13 new ArrayBlockingQueue<>(100), // 一个指定上限的阻塞队列,存放待执行任务
14 new ThreadFactory() {
15 // 自定义一个线程工厂来给线程池里的线程取名字
16 @Override
17 public Thread newThread(Runnable r) {
18 return new Thread(r, "pool-thread-"
19 + threadCount.incrementAndGet());
20 }
21 },
22 new RejectedExecutionHandler() {
23 // 自定义一个拒绝处理策略,安慰被线程池拒之门外的小可怜
24 @Override
25 public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
26 System.out.println("线程池拒绝了任务: " + r);
27 }
28 }
29 );
30 }
31
32}
有返回值的提交方式
submit
ThreadPoolExecutor.execute() 方法是没有返回值的,也就是说把任务提交给线程池后,我们就失去了它的消息,除非你还保留着它的引用,并且在里面有维护状态。如果不想这么麻烦,可以使用ThreadPoolExecutor.submit()来提交任务,这个方法会返回一个 Future 对象,通过这个对象可以知道任务何时被执行完。
1import java.util.concurrent.*;
2import java.util.concurrent.atomic.AtomicInteger;
3
4public class Main {
5
6 public static void main(String[] args) throws Exception {
7 // 线程池定义
8 // ...
9
10 Future<?> future = executor.submit(new Runnable() {
11 @Override
12 public void run() {
13 try {
14 Thread.sleep(2000);
15 } catch (InterruptedException e) {
16 e.printStackTrace();
17 }
18 System.out.println("我要关注: 一杯82年的JAVA");
19 }
20 });
21 Object r = future.get();
22 System.out.println("返回:" + r);
23 executor.shutdown();
24 }
25
26}
27
28/* 输出:
29
30我要关注: 一杯82年的JAVA
31返回:null
32
33*/
可以看到 Future.get() 是有返回值的,但是上面的例子返回了 null,因为任务是 一个Runnable 实现,run 方法没有返回值。
submit Callable
如果想任务有返回值,可以使用 Callable 作为任务定义。
1import java.util.concurrent.*;
2import java.util.concurrent.atomic.AtomicInteger;
3
4public class Main {
5
6 public static void main(String[] args) throws Exception {
7 // 线程池定义
8 // ...
9
10 Future<String> future = executor.submit(new Callable<String>() {
11 @Override
12 public String call() throws Exception {
13 try {
14 Thread.sleep(2000);
15 } catch (InterruptedException e) {
16 e.printStackTrace();
17 }
18 System.out.println("I'm fine, and you?");
19 return "我要关注: 一杯82年的JAVA";
20 }
21 });
22 String r = future.get();
23 System.out.println("返回:" + r);
24 executor.shutdown();
25 }
26
27}
28
29/* 返回:
30
31I'm fine, and you?
32返回:我要关注: 一杯82年的JAVA
33
34*/
submit实现原理
为什么 submit 就可以让用户等待、获取任务返回?从源码讲起:
1public abstract class AbstractExecutorService implements ExecutorService {
2
3 public <T> Future<T> submit(Callable<T> task) {
4 if (task == null) throw new NullPointerException();
5 // 把任务用一个RunnableFuture又给包装了一下
6 RunnableFuture<T> ftask = newTaskFor(task);
7 // 最后还是调用了没有返回值的execute
8 execute(ftask);
9 return ftask;
10 }
11
12 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
13 return new FutureTask<T>(callable);
14 }
15}
16
17// 看看这个包装类
18public class FutureTask<V> implements RunnableFuture<V> {
19
20 private Callable<V> callable;
21 private volatile int state;
22
23 // 也是Runable的一种实现,所以能在线程池中被执行
24 public void run() {
25 // 有个表示状态的标识
26 if (state != NEW ||
27 !UNSAFE.compareAndSwapObject(this, runnerOffset,
28 null, Thread.currentThread()))
29 return;
30 try {
31 Callable<V> c = callable;
32 if (c != null && state == NEW) {
33 V result;
34 boolean ran;
35 try {
36 // 执行用户的逻辑,获得返回值
37 // 这个步骤可能需要点时间
38 result = c.call();
39 ran = true;
40 } catch (Throwable ex) {
41 result = null;
42 ran = false;
43 setException(ex);
44 }
45 if (ran)
46 set(result);
47 }
48 } finally {
49 // runner must be non-null until state is settled to
50 // prevent concurrent calls to run()
51 runner = null;
52 // state must be re-read after nulling runner to prevent
53 // leaked interrupts
54 int s = state;
55 if (s >= INTERRUPTING)
56 handlePossibleCancellationInterrupt(s);
57 }
58 }
59
60 // 获取执行结果,阻塞直到状态改变
61 public V get() throws InterruptedException, ExecutionException {
62 int s = state;
63 if (s <= COMPLETING)
64 s = awaitDone(false, 0L);
65 return report(s);
66 }
67}
小结:submit 时用一个FutureTask 把用户提交的Callable包装起来,再把FutureTask 提交给线程池执行,FutureTask.run 运行时会执行 Callable 中的业务代码,并且过程中 FutureTask 会维护一个状态标识,根据状态标识,可以知道任务是否执行完成,也可以阻塞到状态为完成获取返回值。
关闭线程池
为什么需要关闭线程池?
- 如果线程池里的线程一直存活,而且这些线程又不是守护线程,那么会导致虚拟机无法正常退出;
- 如果直接粗暴地结束应用,线程池中的任务可能没执行完,业务将处于未知状态;
- 线程中有些该释放的资源没有被释放。
怎么关闭线程池?
- shutdown 停止接收新任务(继续提交会被拒绝,执行拒绝策略),但已提交的任务会继续执行,全部完成后线程池彻底关闭;
- shutdownNow 立即停止线程池,并尝试终止正在进行的线程(通过中断),返回没执行的任务集合;
- awaitTermination 阻塞当前线程,直到全部任务执行完,或者等待超时,或者被中断。
由于 shutdownNow 的终止线程是通过中断,这个方式并不能保证线程会提前停止。(关于中断: 如何处理线程中断)
一般先调用 shutdown 让线程池停止接客,然后调用 awaitTermination 等待正在工作的线程完事。
1// 你的池子对我打了烊 2executor.shutdown(); 3 4// 等待一首歌的时间(bei~bei~~) 5// 如果超时还没结束返回false,你可以选择再等一首长点的歌,或者不等了 6boolean ok = executor.awaitTermination(4, TimeUnit.SECONDS);
扩展线程池
线程池提供了一些扩展的方法,通过重写这些方法可以添加前置、后置操作,让使用更灵活。如 beforeExecute、afterExecute、terminated …
总结
线程池很好用,但使用不当会造成严重的后果,了解它各个属性表示的含义以及执行的流程能帮助我们少踩坑。
举个例子:如果设置了核心线程 < 最大线程数不等(一般都这么设置),但是又设置了一个很大的阻塞队列,那么很可能只有几个核心线程在工作,普通线程一直没机会被创建,因为核心线程满了会优先放到队列里,而不是创建普通线程。
猜你喜欢
- 2024-09-16 java-线程池详解(java线程池入门)
- 2024-09-16 线程进阶:实战应用之Java线程池全面解析
- 2024-09-16 Java开发之高并发必备篇(七)——线程池
- 2024-09-16 Java-线程池专题(什么是线程池,如何使用,为什么要用)
- 2024-09-16 Java线程之Executors线程池的原理和使用
- 2024-09-16 Java的四种线程池的使用,以及自定义线程工厂
- 2024-09-16 Java线程池的概念及使用(java 线程池原理详解)
- 2024-09-16 Java线程池核心(十一):线程池状态
- 2024-09-16 Java 线程池(java线程池原理)
- 2024-09-16 技术进阶:Java线程池的使用和原理分析
欢迎 你 发表评论:
- 12-02win7系统怎么调节电脑屏幕亮度
- 12-02iphone14桌面下载(ios14下载桌面)
- 12-02硬盘恢复数据多少钱(硬盘恢复数据要多少钱)
- 12-02win7电脑恢复出厂(win7电脑恢复出厂设置怎么操作的)
- 12-02快手极速版下载app(11快手极速版下载安装)
- 12-02压缩文件损坏怎么修复(压缩文件损坏怎么修复后无法打开)
- 12-02戴尔服务器售后电话(dell 服务器售后电话)
- 12-02win10系统备份还原(win10系统备份还原方法)
- 最近发表
- 标签列表
-
- java反编译工具 (77)
- java反射 (57)
- java接口 (61)
- java随机数 (63)
- java7下载 (59)
- java数据结构 (61)
- java 三目运算符 (65)
- java对象转map (63)
- Java继承 (69)
- java字符串替换 (60)
- 快速排序java (59)
- java并发编程 (58)
- java api文档 (60)
- centos安装java (57)
- java调用webservice接口 (61)
- java深拷贝 (61)
- 工厂模式java (59)
- java代理模式 (59)
- java.lang (57)
- java连接mysql数据库 (67)
- java重载 (68)
- java 循环语句 (66)
- java反序列化 (58)
- java时间函数 (60)
- java是值传递还是引用传递 (62)

本文暂时没有评论,来添加一个吧(●'◡'●)