专业的JAVA编程教程与资源

网站首页 > java教程 正文

Java线程池核心(十三):线程池是如何添加线程的?

temp10 2024-09-16 05:28:49 java教程 13 ℃ 0 评论
  • 难度:中级
  • 开发语言:Java
  • 学习时间:50分钟

1.添加线程

向线程池中添加线程是通过 boolean addWorker(Runnable firstTask, boolean core) 方法:

/**
 * 添加新线程。
 *
 * @param firstTask 线程运行的首个任务。
 *
 * @param core 如果为true,则使用corePoolSize作为约束,否则使用maximumPoolSize作为约束。 
 * @return 创建成功返回true。
 */
private boolean addWorker(Runnable firstTask, boolean core) {
  	// 外循环
    retry:
  	// ctl.get():获取当前线程池状态
    for (int c = ctl.get();;) {
        // runStateAtLeast(c, SHUTDOWN):当前线程池状态 >= SHUTDOWN
      	// (runStateAtLeast(c, STOP):当前线程池状态 >= STOP
      	// firstTask != null:任务不能为null
      	// workQueue.isEmpty():任务队列不能为空
      	// 综上所述,满足以下任意条件则添加失败:
      	// 1.当前线程池状态 >= SHUTDOWN 且 当前线程池状态 >= STOP
      	// 2.当前线程池状态 >= SHUTDOWN 且 任务不为null
      	// 3.当前线程池状态 >= SHUTDOWN 且 任务队列为空
      	// 当前线程池状态 >= SHUTDOWN 是线程池处于关闭状态。
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
          	// 添加失败
            return false;

      	// 无限循环
        for (;;) {
          	// workerCountOf(c):获取线程数。
          	// (core ? corePoolSize : maximumPoolSize) & COUNT_MASK):
          	// 当 core 为true时,取corePoolSize值;
          	// 当 core 为false时,取maximumPoolSize值。
          	// core:添加的新线程是否为核心线程。
          	// 综上所述,添加的新线程为核心线程时,如果当前线程数 >= corePoolSize,即核心线程已满,那么添加失败;
          	// 添加的新线程为非核心线程时,如果当前当前线程数 >= maximumPoolSize,即线程池中的线程已满,那么添加失败。
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
              	// 添加失败
                return false;
          	// 递增 ctl。
          	// ctl:用于记录线程池状态和线程数的属性。
          	// 递增成功表示添加一个线程进线程池。
            if (compareAndIncrementWorkerCount(c))
              	// 结束外循环retry
                break retry;
          	// 到这一步,说明递增失败,也就是说暂时还不能添加线程,得重新检查线程池状态。
          	// 重新获取线程池状态
            c = ctl.get();
          	// 当当前线程池状态 >= SHUTDOWN,即线程池处于关闭状态
            if (runStateAtLeast(c, SHUTDOWN))
              	// 跳过外循环
                continue retry;
        }
    }

  	// 如果上面代码执行没有问题,那么说明可以添加新线程
  
  	// 记录线程是否已经开始工作,初始值是没有。
    boolean workerStarted = false;
  	// 记录线程是否已经成功添加,初始化是没有。
    boolean workerAdded = false;
  	// 定义即将要添加的 Worker,Worker实际上就是要添加的线程,只不过线程在Worker内部。
    Worker w = null;
    try {
      	// 初始化一个Worker,并指定首个任务
        w = new Worker(firstTask);
      	// 获取Worker中的线程
        final Thread t = w.thread;
      	// 当线程不为null时
        if (t != null) {
          	// 获取线程池同步锁
            final ReentrantLock mainLock = this.mainLock;
          	// 加锁
            mainLock.lock();
            try {
                // 重新获取线程池状态
                int c = ctl.get();

              	// isRunning(c):线程池是否处于运行状态。
              	//(runStateLessThan(c, STOP):
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    // 当线程已经启动时,即线程已经调用过start方法。
										// 新添加线程一定时没有启动的,所以在这里判断线程有没有启动过很有必要。
										// 如果线程已经启动,那么就抛非法线程状态异常。
										if (t.isAlive())
  											// 抛出IllegalThreadStateException异常
  											// IllegalThreadStateException:非法线程状态异常
    										throw new IllegalThreadStateException();
                  	// 将新线程添加至线程集合中
                    workers.add(w);
                  	// 获取当前线程数
                    int s = workers.size();
                  	// 当当前线程数 > 自创建线程池以来最大的线程数时
                    if (s > largestPoolSize)
                      	// 将当前线程数记为自创建线程以来最大的线程数
                        largestPoolSize = s;
                  	// 新线程已添加
                    workerAdded = true;
                }
            } finally {
              	// 解锁
                mainLock.unlock();
            }
         
            // 当新Worker添加成功时
						if (workerAdded) {
						  	// 启动Worker中线程
						  	// 到这一步,才开始启动线程
						    t.start();
						  	// 此时Worker中的线程已启动
						    workerStarted = true;
						}
        }
    } finally {
        // 当Worker中的线程没有启动时
if (! workerStarted)
  	// 说明添加了一个失败的Worker
    addWorkerFailed(w);
    }
    return workerStarted;
}

整个添加过程分为两部分:for 循环for 循环以下

Java线程池核心(十三):线程池是如何添加线程的?

for 循环在说:能不能添加新线程。

for 循环以下在说:添加新线程成没成功。

2.能不能添加新线程

在能不能添加新线程部分,最主要的有两块。

第一块:添加的新线程是否为核心线程?

如果是,那么就判断当前线程数是否比核心线程数大;

如果不是,那么就判断当前线程数是否比允许的最大线程数大;

即,如下代码:

// workerCountOf(c):获取线程数。
// (core ? corePoolSize : maximumPoolSize) & COUNT_MASK):
// 当 core 为true时,取corePoolSize值;
// 当 core 为false时,取maximumPoolSize值。
// core:添加的新线程是否为核心线程。
// 综上所述,添加的新线程为核心线程时,如果当前线程数 >= corePoolSize,即核心线程已满,那么添加失败;
// 添加的新线程为非核心线程时,如果当前当前线程数 >= maximumPoolSize,即线程池中的线程已满,那么添加失败。
if (workerCountOf(c)
    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
  	// 添加失败
    return false;

第二块:递增 ctl 是否成功?

这一步决定了是否能添加新线程,如果递增成功,那么就可以添加;如果递增失败,那么就不可以添加。

即,如下代码:

// 递增 ctl。
// ctl:用于记录线程池状态和线程数的属性。
// 递增成功表示添加一个线程进线程池。
if (compareAndIncrementWorkerCount(c))
  	// 结束外循环retry
    break retry;

3.添加新线程成没成功

在添加新线程成没成功这一部分,最主要的有三块。

第一块:Worker中的线程是否已经启动过?

如果启动过,那么说明该线程的状态异常。

如果没启动过,那么说明该线程的状态正常。

即,如下代码:

// 当线程已经启动时,即线程已经调用过start方法。
// 新添加线程一定时没有启动的,所以在这里判断线程有没有启动过很有必要。
// 如果线程已经启动,那么就抛非法线程状态异常。
if (t.isAlive())
  	// 抛出IllegalThreadStateException异常
  	// IllegalThreadStateException:非法线程状态异常
    throw new IllegalThreadStateException();

我们新添加的线程一定是没有启动过的,崭新的。

第二块:新线程添加成功后启动。

新线程添加成功之后,它就会随即启动;

即,如下代码:

// 当新Worker添加成功时
if (workerAdded) {
  	// 启动Worker中线程
  	// 到这一步,才开始启动线程
    t.start();
  	// 此时Worker中的线程已启动
    workerStarted = true;
}

第三块:添加了一个失败的线程。

当新添加的线程没有启动时,说明添加新线程失败。

要调用 addWorkerFailed 方法去处理这个失败的线程。

即,如下代码:

// 当Worker中的线程没有启动时
if (! workerStarted)
  	// 说明添加了一个失败的Worker
    addWorkerFailed(w);

4.处理失败的线程

处理失败的线程需要用到 void addWorkerFailed(Worker w) 方法。

void addWorkerFailed(Worker w) 方法源码:

/**
 * 处理添加失败的线程。
 * - 从线程集合中删除该线程。
 * - 当前线程数-1。
 * - 调用一次关闭线程池方法,看看线程池没有关闭成功的原因是不是因为这个失败的线程所导致的。
 *
 * @param w 失败的线程。
 *
 */
private void addWorkerFailed(Worker w) {
  	// 获取线程池同步锁
    final ReentrantLock mainLock = this.mainLock;
  	// 加锁
    mainLock.lock();
    try {
      	// 当线程不为null时
        if (w != null)
          	// 从线程集合中移除该线程
            workers.remove(w);
      	// 递减线程数
        decrementWorkerCount();
      	// 调用一次关闭线程池方法,看看线程池没有关闭成功的原因是不是因为这个失败的线程所导致的。
        tryTerminate();
    } finally {
      	// 解锁
        mainLock.unlock();
    }
}

处理添加失败的线程有 3 步:

  1. 从线程集合中删除该线程。
  2. 当前线程数-1。
  3. 调用一次关闭线程池方法,看看线程池没有关闭成功的原因是不是因为这个失败的线程所导致的。

其中,递减线程数用到 void decrementWorkerCount() 方法:

/**
 * 线程数-1。
 */
private void decrementWorkerCount() {
    ctl.addAndGet(-1);
}

与此同时,还有另外两个方法也是和递增、递减线程数有关的:

/**
 * 递增线程数。
 */
private boolean compareAndIncrementWorkerCount(int expect) {
  	// 线程数+1
    return ctl.compareAndSet(expect, expect + 1);
}

/**
 * 递减线程数。
 */
private boolean compareAndDecrementWorkerCount(int expect) {
  	// 线程数-1
    return ctl.compareAndSet(expect, expect - 1);
}

总结

整个添加过程分为两部分:for 循环for 循环以下

for 循环在说:能不能添加新线程。

for 循环以下在说:添加新线程成没成功。


在能不能添加新线程部分,最主要的有两块。

第一块:添加的新线程是否为核心线程?

如果是,那么就判断当前线程数是否比核心线程数大;

如果不是,那么就判断当前线程数是否比允许的最大线程数大;

第二块:递增 ctl 是否成功?

这一步决定了是否能添加新线程,如果递增成功,那么就可以添加;如果递增失败,那么就不可以添加。


在添加新线程成没成功这一部分,最主要的有三块。

第一块:Worker中的线程是否已经启动过?

如果启动过,那么说明该线程的状态异常。

如果没启动过,那么说明该线程的状态正常。

我们新添加的线程一定是没有启动过的,崭新的。

第二块:新线程添加成功后启动。

新线程添加成功之后,它就会随即启动;

第三块:添加了一个失败的线程。

当新添加的线程没有启动时,说明添加新线程失败。

要调用 addWorkerFailed 方法去处理这个失败的线程。


处理失败的线程需要用到 void addWorkerFailed(Worker w) 方法。

处理添加失败的线程有 3 步:

  1. 从线程集合中删除该线程。
  2. 当前线程数-1。
  3. 调用一次关闭线程池方法,看看线程池没有关闭成功的原因是不是因为这个失败的线程所导致的。

答疑

如果大家有任何疑问,请在下方留言或评论。

上一章

Java线程池核心(十二):提交任务后做了什么?

下一章

Java线程池核心(十四):线程池是如何拒绝任务的?

学习小组

加入同步学习小组,共同交流与进步。

欢迎加入“人人都是程序员”编程圈子,与圈友一起交流讨论。

(此处已添加圈子卡片,请到今日头条客户端查看)

版权声明

原创不易,未经允许不得转载!

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表