专业的JAVA编程教程与资源

网站首页 > java教程 正文

JAVA 线程池 通过jdk1.8源码深入理解

temp10 2024-09-19 03:57:09 java教程 14 ℃ 0 评论

引言

线程池顾名思义就是存放线程的池子(容器),需要的时候从池中获取线程不用自行创建,使用完毕不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销。

在阿里巴巴JAVA编程规范中并发处理的章节中也强制规定线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。因为自行通过继承Thread或者实现Runable接口的方式来创建线程,必然会存在线程创建以及销毁的资源消耗问题、线程上下文切换问题以及过度创建线程引发资源耗尽等问题。从阿里的规范中,也可以看出线程池的重要性。

JAVA 线程池 通过jdk1.8源码深入理解


线程池的参数

线程池的构造函数有7个参数,分别是

  • corePoolSize
  • maximumPoolSize
  • keepAliveTime
  • unit
  • workQueue
  • threadFactory
  • handler

下面会对这7个参数一一解释。

corePoolSize

这个参数的意思是核心线程的数量,线程池将长期保证这些线程处于存活状态,即使线程处于闲置状态,也不会被销毁。除非配置了allowCoreThreadTimeOut将不会保证长期存活于线程池内,在闲置超过keepAliveTime后被销毁。


maximumPoolSize

最大线程数,在核心线程数的基础上可能会额外增加一些非核心线程,需要注意的是只有当workQueue队列填满时才会创建多于corePoolSize的线程(线程池总线程数不超过maxPoolSize)


keepAliveTime

保证存活时间,若线程数超过了corePoolSize,线程闲置时间超过了保证存活时间,该线程将被销毁。

unit

keepAliveTime的时间单位


workQueue

用于保存任务的队列,可以为无界、有界、同步移交三种队列类型之一,当池子里的工作线程数大于corePoolSize时,这时新进来的任务会被放到队列中


threadFactory

创建线程的工厂。默认使用Executors.defaultThreadFactory(),也可以使用guava库的ThreadFactoryBuilder来创建

handler

线程池无法继续接收任务(队列已满且线程数达到maximunPoolSize)时的饱和策略,取值有AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy。

常用的拒绝策略包括 :

  • ThreadPoolExecutor.AbortPolicy: 抛出 RejectedExecutionException 来拒绝新任务的处理,是 Spring 中使用的默认拒绝策略。
  • ThreadPoolExecutor.CallerRunsPolicy: 线程调用运行该任务的 execute 本身,也就是直接在调用 execute 方法的线程中运行 (run) 被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度,但可能造成延迟。若应用程序可以承受此延迟且不能丢弃任何一个任务请求,可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。


线程池的工作顺序和工作逻辑基本可以参考如下的流程:



线程池的使用方法

线程池的创建使用可通过java并发包中的Executors类完成,它提供了创建线程池的常用方法。

  • newFixedThreadPool
  • newSingleThreadExecutor
  • newCachedThreadPool



FixThreadPool 固定线程池

FixThreadPool :可重用固定线程数的线程池。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(
            nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            threadFactory);
    }

执行机制 :

  • 若当前运行的线程数小于 corePoolSize,来新任务时,就创建新的线程来执行任务;
  • 当前运行的线程数等于 corePoolSize 后,如果再来新任务的话,会将任务加到 LinkedBlockingQueue;
  • 线程池中的线程执行完手头的工作后,会在循环中反复从 LinkedBlockingQueue 中获取任务来执行。

FixThreadPool 使用的是无界队列 LinkedBlockingQueue(队列容量为 Integer.MAX_VALUE),而它会给线程池带来如下影响 :

  • 当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize;
  • 由于使用的是一个无界队列,所以 maximumPoolSize 将是一个无效参数,因为不可能存在任务队列满的情况,所以 FixedThreadPool 的 corePoolSize、maximumPoolSize 被设置为同一个值,且 keepAliveTime 将是一个无效参数;
  • 运行中的 FixedThreadPool(指未执行 shutdown() 或 shutdownNow() 的)不会拒绝任务,因此在任务较多的时候可能会导致 OOM。


SingleThreadExecutor 单一线程池

SingleThreadExecutor 是只有一个线程的线程池。

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(
                    1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(),
                    threadFactory));
}

除了池中只有一个线程外,其他和 FixThreadPool 是基本一致的。

CachedThreadPool 缓存线程池

CachedThreadPool 是一个会根据需要创建新线程的线程池,但会在先前构建的线程可用时重用它。

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(
            0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            threadFactory);
}

其 corePoolSize 被设置为 0,maximumPoolSize 被设置为 Integer.MAX.VALUE,也就是无界的。虽然是无界,但由于该线程池还存在一个销毁机制,即如果一个线程 60 秒内未被使用过,则该线程就会被销毁,这样就节省了很多资源。

但是,如果主线程提交任务的速度高于 maximunPool 中线程处理任务的速度,CachedThreadPool 将会源源不断地创建新的线程,从而依然可能导致 CPU 耗尽或内存溢出。

执行机制 :

  • 首先执行 offer 操作,提交任务到任务队列。若当前 maximumPool 中有空闲线程正在执行 poll 操作,且主线程的 offer 与空闲线程的 poll 配对成功时,主线程将把任务交给空闲线程执行,此时视作 execute() 方法执行完成;否则,将执行下面的步骤。
  • 当初始 maximum 为空,或 maximumPool 中没有空闲线程时,将没有线程执行 poll 操作。此时,CachedThreadPool 会创建新线程执行任务,execute() 方法执行完成。


为什么推荐使用 ThreadPoolExecutor 来创建线程?

规约一 :线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。

使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

规约二 :强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

Executors 返回线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能会堆积大量请求,从而导致 OOM。
CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE,可能会创建大量线程,从而导致 OOM。


如何拟定线程池的大小?

上下文切换

多线程变编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用。为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。

概括来说就是,当前任务在执行完 CPU 时间片切换到另一个任务之前,会先保存自己的状态,以便下次再切换回这个任务时,可以直接加载到上次的状态。任务从保存到再加载的过程就是一次上下文切换。

上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。

Linux 相比与其他操作系统(包括其他类 Unix 系统)有许多,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。

简单的拟定判断

CPU 密集型任务(N+1):

这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

I/O 密集型任务(2N):

这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。


线程池源码分析

常用变量

   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;//32-3=29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//536870911

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;// -536870912
    private static final int SHUTDOWN   =  0 << COUNT_BITS;//0
    private static final int STOP       =  1 << COUNT_BITS;//536870912
    private static final int TIDYING    =  2 << COUNT_BITS;//1073741824
    private static final int TERMINATED =  3 << COUNT_BITS;//1610612736

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

原子变量ctl可以表示线程池的运行状态和线程数量,其中高3位表示线程池的运行状态,低29位表示线程池中线程的数量。

1、RUNNING:-1 << COUNT_BITS, -1在Java底层是由32个1表示的,左移29位的话,即111 00000 00000000 00000000 00000000,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
2、SHUTDOWN: 0 << COUNT_BITS,在Java底层是由32个0表示的,无论左移多少位,还是32个0,即000 00000 00000000 00000000 00000000,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
3、STOP : 1 << COUNT_BITS, 1在Java底层是由前面的31个0和1个1组成的,左移29位的话,即001 00000 00000000 00000000 00000000,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
4、TIDYING : 2 << COUNT_BITS, 2在Java底层是由前面的30个0和1个10组成的,左移29位的话,即010 00000 00000000 00000000 00000000,即高3位为010, 所有的任务都已经终止;
5、TERMINATED: 3 << COUNT_BITS,2在Java底层是由前面的30个0和1个11组成的,左移29位的话,即011 00000 00000000 00000000 00000000,即高3位为011, terminated()方法已经执行完成


execute()方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {  
    //workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建新线程执行command任务
       if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // double check: c, recheck
    // 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // recheck and if necessary 回滚到入队操作前,即倘若线程池shutdown状态,就remove(command)
        //如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行reject方法处理任务
        if (! isRunning(recheck) && remove(command))
            reject(command);
       // 因为corePoolSize可能等于0,因此进入到此分支时,workerCountOf(c) 可能大于等于0,
      //如果等于0,那么则需要添加非核心线程。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 往线程池中创建新的线程失败,则reject任务
    else if (!addWorker(command, false))
        reject(command);
}

为什么需要double check线程池的状态?
在多线程环境下,线程池的状态时刻在变化,而ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将command加入workqueue是线程池之前的状态。倘若没有double check,万一线程池处于非running状态(在多线程环境下很有可能发生),那么command永远不会执行。

除此以外,下面这句话也让人非常费解

else if (workerCountOf(recheck) == 0)
addWorker(null, false);

大多数网上的解释就是草草的说如果线程数等于0,那么就添加一个线程。

可是线程数为什么会等于0呢?

第一个if语句workerCountOf(c) < corePoolSize,然后去添加线程,如果失败才会进入第二个if,失败说明线程池达到了corepoolsize,说明线程还是有一定数量的。怎么会到了这里线程数一下子变成0了呢?

这里就是一个特殊情况,当设置corepoolsize=0时,workerCountOf(c) < corePoolSize不成立,这样子进入第二个if的时候,就可能会出现任务添加到workqueue中,但是线程数还是等于0。回想一下CachedThreadPool就是这样的。因此下面这句话addWorker(null, false);也是去添加一个非核心线程。


addWorker方法

从方法execute的实现可以看出:addWorker主要负责创建新的线程并执行任务
线程池创建新线程执行任务时,需要 获取全局锁:

private final ReentrantLock mainLock = new ReentrantLock();

接下来是addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
       // CAS更新线程池数量
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

   /*
       *由上面的一些线程池状态常量值可知,running<shutdown<stop<tidying<terminated
        *若rs>=shutdown,则表明线程池处于stop、tidying、terminated三种状态的一种
        *若rs>=shutdown成立,则进行后面判断,
         *1、线程池处于shutdown状态
         *  1.1、firstTask不为null,则返回false,也即是线程池已经处于shutdown状态,还要添加新的线程,被直接驳回(拒绝)
         *  1.2、firstTask为null
         *     1.2.1、此时意味着线程池状态为shutdown状态,且first为null,若阻塞队列为空,则返回false
         *2、线程处于大于shutdown的状态,则直接返回false
    */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            /*
             *进入内循环以下两种情况会跳出该内循环,否则一直会循环
             *1、当工作线程数量超过一定阈值,会直接返回false
             *2、添加工作线程成功,即ctl的值进行了加一
            */
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
//若进行到了此步操作,则表明工作线程数量加了1
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;//该w.thread为worker内部新创建的thread
            if (t != null) {
                // 线程池重入锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
//获取锁后,再次获取线程池的状态
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();  // 线程启动,执行任务(Worker.thread(firstTask).start());
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker方法实际上主要做了两件事情,

  • 循环使用CAS对线程数+1
  • 新建一个线程并启用

其中新建线程是通过新建worker对象实现的,最终会把新建的Worker对象添加到workers这个Set容器中

private final HashSet<Worker> workers = new HashSet<Worker>();

Worker类的定义如下

 private final class Worker
         extends AbstractQueuedSynchronizer
         implements Runnable{
     Worker(Runnable firstTask) {
         setState(-1); // inhibit interrupts until runWorker
         this.firstTask = firstTask;
         this.thread = getThreadFactory().newThread(this); // 创建线程
     }
     /** Delegates main run loop to outer runWorker  */
     public void run() {
         runWorker(this);
     }
     // ...


可以看到worker类自身实现了Runable接口中,其含有一个thread属性,这个属性会将自身传入进去。并且Worker类重写了run方法,所有只要调用thread.start就可以调用到run方法。


在上面addWorker中,这里的t.start启动线程,并最终调用Worker类中重写的run方法。

if (workerAdded) {
        t.start();  // 线程启动,执行任务(Worker.thread(firstTask).start());
         workerStarted = true;
    }


Worker Set如下图所示:



Worker类的runworker方法

从Woker类中的run方法中看出,其最终调用了runworker方法

runWorker方法是线程池的核心:
1. 线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行可中断;
2. Worker执行firstTask或从workQueue中获取任务:
2.1. 进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)
2.2. 检查线程池状态,倘若线程池处于中断状态,当前线程将中断。
2.3. 执行beforeExecute
2.4 执行任务的run方法
2.5 执行afterExecute方法
2.6 解锁操作

通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 允许中断
        boolean completedAbruptly = true;// 是否因为异常退出循环
        try {
        // 如果task为空,则通过getTask来获取任务
        // getTask()方法循环获取工作队列的任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                  //任务执行前的hook函数
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                      //任务执行后的hook函数
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }


getTask方法

getTask方法从阻塞队列中获取等待的任务

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

注意这里一段代码是keepAliveTime起作用的关键:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();1234

allowCoreThreadTimeOut为false,线程即使空闲也不会被销毁;倘若为ture,在keepAliveTime内仍空闲则会被销毁。
如果线程允许空闲等待而不被销毁timed == false,workQueue.take任务:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
如果线程不允许无休止空闲timed == true, workQueue.poll任务:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;


参考文章:

https://blog.csdn.net/programmer_at/article/details/79799267

https://blog.ityuan.com/3432

https://www.cnblogs.com/warehouse/p/10720781.html

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表