专业的JAVA编程教程与资源

网站首页 > java教程 正文

Java线程池工具类Executors详解——多种创建线程池方式

temp10 2024-11-23 22:32:38 java教程 13 ℃ 0 评论

Executors工具类为Executor,ExecutorService,ScheduledExecutorService,ThreadFactory和Callable类提供了一些工具方法,类似于集合中的Collections类的功能。

Executors工具类提供了多种创建线程池方式

Java线程池工具类Executors详解——多种创建线程池方式

一、newCachedThreadPool

newCachedThreadPool创建一个可缓存线程池,该线程池是无界限,可以进行自动线程回收,在内存允许的情况下,可一直进行创建,但容易造成堆内存溢出。如果当前线程池的长度超过了处理的需要时,它可以灵活的回收空闲的线程,当需要增加时, 它可以灵活的添加新的线程,而不会对线程池的长度作任何限制。60s内线程如果没有被使用,将从线程池里移除回收掉,即60s内能够重用已创建的线程

代码样例:

package org.andy.effective.java.study;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NewCachedThreadPoolTest {

	public static void main(String[] args) {
		// 创建无限大小线程池、由JVM自动回收-默认60s内没有使用的线程将进行回收
		ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

		// 创建10个子线程执行任务--通过Thread.Sleep阻塞占用线程,以便每次都创建新的线程
		for (int i = 0; i < 10; i++) {
			final int index = i;
			newCachedThreadPool.execute(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}

					System.out.println(Thread.currentThread().getName() + ",index==" + index);
				}
			});
		}
	}
}

二、newFixedThreadPool

newFixedThreadPool创建一个有限定长线程数的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。超出的任务会在任务队列中等待,由于等待的任务队列是由LinkedBlockingQueue实现,所以阻塞队列是无界队列,如果有很多请求积压,阻塞队列越来越长,容易导致OOM(超出内存空间)。定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()方法是查看电脑CPU核心数量来确定。

代码示例:

package org.andy.effective.java.study;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NewFixedThreadPoolTest {

	public static void main(String[] args) {

		// 创建一个定长的线程池,线程数大小为系统CPU核心数量--我的系统是8核的,本次创建线程池定长为8
		ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

		// 创建20个执行任务--通过Thread.Sleep阻塞占用线程,以便产生有任务阻塞了
		for (int i = 0; i < 20; i++) {
			final int index = i;
			newFixedThreadPool.execute(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}

					System.out.println(Thread.currentThread().getName() + ",index==" + index);
				}
			});
		}

	}
}

三、newScheduledThreadPool

newScheduledThreadPool创建一个固定大小的定时调度线程池。该线程池可以延迟或定时的方式来执行任务。ScheduledThreadPool核心线程数量固定,任务无限的线程池。此线程池支持定时以及周期性执行任务的需求。图7源码发现ScheduledThreadPool创建线程池workQuene=DelayedWorkQueue,DelayedWorkQueue是一个无界阻塞队列所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中无意义。即newScheduledThreadPool的最大线程数是固定的,即为核心线程数量。如下示例,创建了1000个任务,交替使用6个核心线程进行执行。

代码示例:

package org.andy.effective.java.study;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class NewScheduledThreadPoolTest {

	public static void main(String[] args) {

		// 创建一个核心线程数为6的计划线程池
		ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(6);

		// 创建20个执行任务--通过Thread.Sleep阻塞占用线程,以便产生有任务阻塞了
		for (int i = 0; i < 1000; i++) {
			final int index = i;
			newScheduledThreadPool.schedule(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}

					System.out.println(Thread.currentThread().getName() + ",index==" + index);
				}
			}, 2, TimeUnit.SECONDS);// 延迟2秒执行

		}

	}
}

四、newSingleThreadExecutor

newSingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,他必须保证前一项任务执行完毕才能执行后一项。保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。LinkedBlockingQueue为阻塞无界队列,所以该线程池执行的任务也是无限的。

代码示例:

package org.andy.effective.java.study;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NewSingleThreadExecutorTest {

	public static void main(String[] args) {
		// 创建无限大小线程池、由JVM自动回收-默认60s内没有使用的线程将进行回收
		ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();

		// 创建20个任务执行任务--通过Thread.Sleep阻塞占用线程,观察执行情况
		for (int i = 0; i < 20; i++) {
			final int index = i;
			newSingleThreadExecutor.execute(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}

					System.out.println(Thread.currentThread().getName() + ",index==" + index);
				}
			});
		}
	}
}

五、newSingleThreadScheduledExecutor

newSingleThreadScheduledExecutor创建只有单个线程的定时调度线程池,它能按照定时计划依次执行所有的任务。该线程池的任务队列也是无界的。

代码示例:

package org.andy.effective.java.study;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class NewSingleThreadScheduledExecutorTest {

	public static void main(String[] args) {
		// 创建无限大小线程池、由JVM自动回收-默认60s内没有使用的线程将进行回收
		ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();

		// 创建20个任务执行任务--通过Thread.Sleep阻塞占用线程,观察执行情况
		for (int i = 0; i < 20; i++) {
			final int index = i;
			newSingleThreadScheduledExecutor.schedule(new Runnable() {
				@Override
				public void run() {
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}

					System.out.println(Thread.currentThread().getName() + ",index==" + index);
				}
			}, 3, TimeUnit.SECONDS);// 延迟3秒执行
		}
	}
}

六、newWorkStealingPool

newWorkStealingPool创建了一个新的具有抢占式操作的线程池,每个线程都有一个任务队列存放任务,先工作完成的线程可以去帮助没处理完的线程工程。以实现最快完成工作。

代码示例:

package org.andy.effective.java.study;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;

public class NewWorkStealingPoolTest {

	public static void main(String[] args) throws InterruptedException {
		// 创建一个具有抢占式操作的线程池 1.8 之后新增 每个线程都有一个任务队列存放任务
		ExecutorService newWorkStealingPool = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors());

		LinkedBlockingDeque<Future<String>> strings = new LinkedBlockingDeque<Future<String>>();

		CountDownLatch countDownLatch = new CountDownLatch(30);

		for (int i = 0; i < 30; i++) {
			final int index = i;
			Future<String> submit = newWorkStealingPool.submit(new Callable<String>() {
				@Override
				public String call() {
					try {
						countDownLatch.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}

					return Thread.currentThread().getName() + ",index==" + index;
				}
			});
			strings.offer(submit);

			countDownLatch.countDown();
		}

		System.out.println("over");

		strings.forEach(f -> {
			try {
				System.out.println(f.get());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		});
	}
}

七、ThreadPoolExecutor饱和策略

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任时,ThreadPoolTaskExecutor 定义了一些策略:
(1)ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。(默认采取这个策略)
(2)ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务。不会拒绝任务。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
(3)ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
(4)ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表