深入剖析 Java 中线程池的多种实现方式
wptr33 2025-09-19 03:56 2 浏览
在当今高度并发的互联网软件开发领域,高效地管理和利用线程资源是提升程序性能的关键。Java 作为一种广泛应用于后端开发的编程语言,为我们提供了丰富的线程池实现方式。今天,就让我们深入探讨 Java 中线程池的实现方式,帮助广大互联网软件开发人员更好地优化自己的代码。
使用 Executors 工具类创建线程池
Executors 工具类为我们提供了便捷的方法来创建常见类型的线程池,大大简化了线程池的创建过程。
(一)固定大小线程池(Fixed Thread Pool)
通过
Executors.newFixedThreadPool(int nThreads)方法创建。其特点是线程池中的线程数始终保持固定的nThreads个。当有新任务提交时,如果有空闲线程,任务会立即被分配给空闲线程执行;若所有线程都在忙碌,新任务则会被放入任务队列中等待。这种线程池适用于任务量已知且相对固定的场景,比如服务器端处理固定数量并发请求的业务逻辑。它能够有效控制资源的使用,避免线程过多导致的上下文切换开销。例如:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 10; i++) {
fixedThreadPool.submit(() -> {
// 具体任务逻辑
System.out.println("任务在固定大小线程池的线程 " + Thread.currentThread().getName() + " 中执行");
});
}
在这个例子中,我们创建了一个固定大小为 4 的线程池,并向其中提交了 10 个任务。这些任务会在 4 个固定的线程中依次或并发执行,具体取决于任务的执行时间和线程的繁忙程度。
(二)缓存线程池(Cached Thread Pool)
使用
Executors.newCachedThreadPool()创建。此线程池的线程数量会根据任务的提交情况动态调整。当有新任务提交时,如果有空闲线程则立即执行任务;若没有空闲线程,且当前线程数小于最大线程数(理论上为Integer.MAX_VALUE),则会创建新的线程来执行任务。而空闲线程如果在 60 秒内没有新任务,将会被终止并从缓存中移除。这种线程池适合执行大量短期异步任务的场景,比如处理大量的短连接请求。它能够快速响应任务,减少线程创建和销毁的开销。示例代码如下:
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
cachedThreadPool.submit(() -> {
// 具体任务逻辑
System.out.println("任务在缓存线程池的线程 " + Thread.currentThread().getName() + " 中执行");
});
}
这里,我们创建了一个缓存线程池并提交 10 个任务。由于任务可能是短时间内大量提交,缓存线程池会根据需要灵活创建和回收线程,以高效处理这些任务。
(三)单线程池(Single Thread Executor)
通过
Executors.newSingleThreadExecutor()创建。该线程池只有一个线程,所有任务会按照提交的顺序依次执行,保证了任务的顺序性和互斥性。适用于需要按顺序执行任务的场景,例如一些对数据一致性要求较高的操作。如下代码所示:
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
singleThreadExecutor.submit(() -> {
// 具体任务逻辑
System.out.println("任务在单线程池的线程 " + Thread.currentThread().getName() + " 中执行");
});
}
在这个单线程池的例子中,10 个任务会一个接一个地在唯一的线程中执行,不会出现并发冲突。
(四)调度线程池(Scheduled Thread Pool)
分为两种:
Executors.newScheduledThreadPool(int corePoolSize)创建一个固定大小的线程池,支持定时或周期性任务;
Executors.newSingleThreadScheduledExecutor()创建一个单线程的线程池,同样支持定时或周期性任务。以newScheduledThreadPool为例,它适用于需要定时或周期性执行任务的场景,如定时数据备份、定时发送消息提醒等。代码示例如下:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);
scheduledThreadPool.scheduleAtFixedRate(() -> {
// 具体任务逻辑
System.out.println("定时任务在调度线程池的线程 " + Thread.currentThread().getName() + " 中执行");
}, 1, 3, TimeUnit.SECONDS);
(五)工作窃取线程池(Work Stealing Pool)
通过
Executors.newWorkStealingPool(int parallelism)创建(若不传入参数,默认使用 CPU 核心数作为并行度)。它使用ForkJoinPool实现,支持并行处理大量任务。线程池中的线程会尝试 “窃取” 其他线程队列中的任务来执行,从而充分利用多核处理器的性能。适用于需要并行处理大量任务的场景,例如大规模数据计算。示例代码如下:
ExecutorService workStealingPool = Executors.newWorkStealingPool();
for (int i = 0; i < 10; i++) {
workStealingPool.submit(() -> {
// 具体任务逻辑
System.out.println("任务在工作窃取线程池的线程 " + Thread.currentThread().getName() + " 中执行");
});
}
在工作窃取线程池中,各个线程会积极地从其他线程的任务队列中窃取任务执行,提高整体的任务处理效率。
直接使用 ThreadPoolExecutor 创建线程池
虽然 Executors 工具类创建线程池很方便,但在某些场景下,我们需要更精细地控制线程池的行为,这时就可以直接使用ThreadPoolExecutor类。ThreadPoolExecutor提供了多个重载的构造方法,其中参数最全的构造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心线程数,是线程池中保持活动的最小线程数,即使这些线程处于空闲状态。当有新任务提交时,如果当前线程数小于corePoolSize,即使有空闲线程,线程池也会创建一个新线程来处理任务。
- maximumPoolSize:线程池中允许的最大线程数,包括核心线程和非核心线程。当任务队列已满且当前线程数小于maximumPoolSize时,线程池会创建新的线程来处理任务。
- keepAliveTime:用于控制线程池中非核心线程的生命周期。如果超过这个时间没有新任务,非核心线程将被终止。
- unit:keepAliveTime参数的时间单位,如TimeUnit.SECONDS表示秒。
- workQueue:任务队列,当所有核心线程都在忙碌时(任务数超过了corePoolSize),任务首先会被放入这个队列中,然后由工作线程从队列中取出并执行。常见的任务队列有LinkedBlockingQueue(基于链表的无界阻塞队列)、ArrayBlockingQueue(基于数组的有界阻塞队列)、SynchronousQueue(不存储元素的阻塞队列,每个插入操作必须等待一个相应的移除操作)等。
- threadFactory:用于创建新线程的工厂,通过它可以定制新创建线程的属性,比如名称、优先级等。
- rejectedExecutionHandler:拒绝策略,当任务队列已满且线程数达到maximumPoolSize时,新提交的任务将被拒绝,由拒绝策略来处理这些被拒绝的任务。常见的拒绝策略有AbortPolicy(默认策略,直接抛出RejectedExecutionException异常)、CallerRunsPolicy(将被拒绝的任务交给调用者线程来执行)、DiscardPolicy(直接丢弃被拒绝的任务)、DiscardOldestPolicy(丢弃任务队列中最老的任务,然后尝试提交新任务)。
例如,我们创建一个自定义的线程池:
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
ExecutorService threadPool = new ThreadPoolExecutor(
2,
4,
10,
unit,
workQueue,
Executors.defaultThreadFactory(),
handler);
for (int i = 0; i < 10; i++) {
threadPool.submit(() -> {
// 具体任务逻辑
System.out.println("任务在线程 " + Thread.currentThread().getName() + " 中执行");
});
}
在这个例子中,我们创建了一个核心线程数为 2,最大线程数为 4,非核心线程存活时间为 10 秒,任务队列容量为 2,采用CallerRunsPolicy拒绝策略的线程池。通过这种方式,我们可以根据具体业务场景灵活调整线程池的各项参数,以达到最优的性能。
ForkJoinPool—— 并行处理的利器
自 Java 7 引入的ForkJoinPool,是为解决并行计算问题而生的特殊线程池。它专为 “分治”(Fork/Join)并行计算设计,能够将一个大任务分解成多个小任务并行处理,然后把结果合并起来。
(一)工作原理
分治思想:
- 拆分(Fork):把一个大任务拆成若干个小任务。例如,在计算一个大数组的总和时,可以将数组拆分成多个小数组,每个小数组由一个子任务负责计算。
- 执行:当任务足够小时直接计算。比如,当拆分后的小数组元素数量较少时,直接计算该小数组的和。
- 合并(Join):把所有小任务的结果合并起来。将各个小数组的计算结果相加,得到最终大数组的总和。
工作窃取(Work - Stealing)机制:
- 每个工作线程都有自己的双端队列(deque)存储任务。
- 线程从自身队列尾部获取任务(后进先出,LIFO),从其他线程队列头部 “窃取” 任务(先进先出,FIFO)。
- 每个工作线程的任务队列是线程私有的,自身取任务(尾部 LIFO)和窃取任务(头部 FIFO)均通过无锁的 CAS 操作实现,避免了全局锁开销,这是ForkJoinPool高并发效率的关键原因。通过这种机制,确保所有线程都保持忙碌状态,避免资源浪费。
(二)适用场景
计算密集型任务:如矩阵运算、图像处理、排序算法等。这些任务通常需要大量的计算资源,通过并行处理可以充分利用多核 CPU 的性能,显著提高计算速度。
可拆分的问题:任务能被拆分为相互独立的子任务。只有任务可以合理拆分,才能发挥ForkJoinPool的优势。
合并成本低:子任务结果合并的时间远小于计算时间。如果合并结果的操作过于复杂或耗时,会抵消并行计算带来的优势。
任务规模可调:可以控制任务的拆分粒度。根据实际情况调整任务的拆分程度,以达到最佳的性能。
(三)与 ThreadPoolExecutor 的区别
ThreadPoolExecutor适合处理独立的任务,每个任务之间没有依赖关系;而ForkJoinPool适合处理可以分解的计算密集型任务,通过分治和工作窃取机制提高并行处理效率。
(四)示例代码
以使用ForkJoinPool实现归并排序为例:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class ParallelMergeSort extends RecursiveAction {
private static final int THRESHOLD = 1000;
private final int[] array;
private final int start;
private final int end;
public ParallelMergeSort(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
// 直接进行插入排序
for (int i = start + 1; i < end; i++) {
int value = array[i];
int j = i - 1;
while (j >= start && array[j] > value) {
array[j + 1] = array[j];
j--;
}
array[j + 1] = value;
}
} else {
int mid = (start + end) / 2;
ParallelMergeSort leftTask = new ParallelMergeSort(array, start, mid);
ParallelMergeSort rightTask = new ParallelMergeSort(array, mid, end);
leftTask.fork();
rightTask.compute();
leftTask.join();
merge(start, mid, end);
}
}
private void merge(int start, int mid, int end) {
int[] temp = new int[end - start];
int i = start;
int j = mid;
int k = 0;
while (i < mid && j < end) {
if (array[i] <= array[j]) {
temp[k++] = array[i++];
} else {
temp[k++] = array[j++];
}
}
while (i < mid) {
temp[k++] = array[i++];
}
while (j < end) {
temp[k++] = array[j++];
}
System.arraycopy(temp, 0, array, start, temp.length);
}
public static void main(String[] args) {
int[] array = {5, 4, 6, 2, 7, 1, 3};
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(new ParallelMergeSort(array, 0, array.length));
for (int num : array) {
System.out.print(num + " ");
}
}
}
在这个例子中,我们通过ForkJoinPool实现了并行归并排序。将大数组不断拆分成小数组进行排序,最后合并结果,大大提高了排序效率。
CompletableFuture—— 异步任务的高层次抽象
CompletableFuture提供了异步任务的更高层次抽象,并支持链式处理和组合。它可以与线程池结合使用,进一步提升异步任务的处理能力。例如:
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture.supplyAsync(() -> {
// 异步任务逻辑
return "任务结果";
}, executorService)
.thenApply(result -> {
// 对结果进行处理
return result + " 经过处理";
})
.thenAccept(System.out::println);
在这段代码中,我们首先创建了一个固定大小为 4 的线程池,然后使用
CompletableFuture.supplyAsync方法提交一个异步任务到线程池中执行。任务执行完成后,通过thenApply方法对结果进行转换,最后使用thenAccept方法消费最终的结果。通过这种方式,我们可以方便地管理和组合多个异步任务,提高代码的可读性和可维护性。
综上所述,Java 中线程池的实现方式多种多样,每种方式都有其适用的场景和特点。在实际的互联网软件开发中,我们需要根据具体的业务需求、任务类型和系统资源等因素,选择合适的线程池实现方式,以实现高效的并发编程,提升软件系统的性能和稳定性。希望本文能为大家在使用 Java 线程池方面提供有益的参考和帮助。
相关推荐
- 高性能并发队列Disruptor使用详解
-
基本概念Disruptor是一个高性能的异步处理框架,是一个轻量的Java消息服务JMS,能够在无锁的情况下实现队列的并发操作Disruptor使用环形数组实现了类似队列的功能,并且是一个有界队列....
- Disruptor一个高性能队列_java高性能队列
-
Disruptor一个高性能队列前言说到队列比较熟悉的可能是ArrayBlockingQueue、LinkedBlockingQueue这两个有界队列,大多应用在线程池中使用能保证线程安全,但其安全性...
- 谈谈防御性编程_防御性策略
-
防御性编程对于程序员来说是一种良好的代码习惯,是为了保护自己的程序在不可未知的异常下,避免带来更大的破坏性崩溃,使得程序在错误发生时,依然能够云淡风轻的处理,但很多程序员入行很多年,写出的代码依然都是...
- 有人敲门,开水开了,电话响了,孩子哭了,你先顾谁?
-
前言哎呀,这种情况你肯定遇到过吧!正在家里忙活着,突然——咚咚咚有人敲门,咕噜咕噜开水开了,铃铃铃电话响了,哇哇哇孩子又哭了...我去,四件事一起来,人都懵了!你说先搞哪个?其实这跟我们写Java多线...
- 面试官:线程池如何按照core、max、queue的执行顺序去执行?
-
前言这是一个真实的面试题。前几天一个朋友在群里分享了他刚刚面试候选者时问的问题:"线程池如何按照core、max、queue的执行循序去执行?"。我们都知道线程池中代码执行顺序是:co...
- 深入剖析 Java 中线程池的多种实现方式
-
在当今高度并发的互联网软件开发领域,高效地管理和利用线程资源是提升程序性能的关键。Java作为一种广泛应用于后端开发的编程语言,为我们提供了丰富的线程池实现方式。今天,就让我们深入探讨Java中...
- 并发编程之《彻底搞懂Java线程》_java多线程并发解决方案详解
-
目录引言一、核心概念:线程是什么?...
- Redis怎么实现延时消息_redis实现延时任务
-
一句话总结Redis可通过有序集合(ZSET)实现延时消息:将消息作为value,到期时间戳作为score存入ZSET。消费者轮询用ZRANGEBYSCORE获取到期消息,配合Lua脚本保证原子性获取...
- CompletableFuture真的用对了吗?盘点它最容易被误用的5个场景
-
在Java并发编程中,CompletableFuture是处理异步任务的利器,但不少开发者在使用时踩过这些坑——线上服务突然雪崩、异常悄无声息消失、接口响应时间翻倍……本文结合真实案例,拆解5个最容易...
- 接口性能优化技巧,有点硬_接口性能瓶颈
-
背景我负责的系统到2021年初完成了功能上的建设,开始进入到推广阶段。随着推广的逐步深入,收到了很多好评的同时也收到了很多对性能的吐槽。刚刚收到吐槽的时候,我们的心情是这样的:...
- 禁止使用这5个Java类,每一个背后都有一段"血泪史"
-
某电商平台的支付系统突然报警:大量订单状态异常。排查日志发现,同一笔订单被重复支付了三次。事后复盘显示,罪魁祸首竟是一行看似无害的SimpleDateFormat代码。在Java开发中,这类因使用不安...
- 无锁队列Disruptor原理解析_无锁队列实现原理
-
队列比较队列...
- Java并发队列与容器_java 并发队列
-
【前言:无论是大数据从业人员还是Java从业人员,掌握Java高并发和多线程是必备技能之一。本文主要阐述Java并发包下的阻塞队列和并发容器,其实研读过大数据相关技术如Spark、Storm等源码的,...
- 线程池工具及拒绝策略的使用_线程池处理策略
-
线程池的拒绝策略若线程池中的核心线程数被用完且阻塞队列已排满,则此时线程池的资源已耗尽,线程池将没有足够的线程资源执行新的任务。为了保证操作系统的安全,线程池将通过拒绝策略处理新添加的线程任务。...
- 【面试题精讲】ArrayBlockingQueue 和 LinkedBlockingQueue 区别?
-
有的时候博客内容会有变动,首发博客是最新的,其他博客地址可能会未同步,认准...
- 一周热门
-
-
C# 13 和 .NET 9 全知道 :13 使用 ASP.NET Core 构建网站 (1)
-
程序员的开源月刊《HelloGitHub》第 71 期
-
详细介绍一下Redis的Watch机制,可以利用Watch机制来做什么?
-
假如有100W个用户抢一张票,除了负载均衡办法,怎么支持高并发?
-
如何将AI助手接入微信(打开ai手机助手)
-
Java面试必考问题:什么是乐观锁与悲观锁
-
SparkSQL——DataFrame的创建与使用
-
redission YYDS spring boot redission 使用
-
一文带你了解Redis与Memcached? redis与memcached的区别
-
如何利用Redis进行事务处理呢? 如何利用redis进行事务处理呢英文
-
- 最近发表
- 标签列表
-
- git pull (33)
- git fetch (35)
- mysql insert (35)
- mysql distinct (37)
- concat_ws (36)
- java continue (36)
- jenkins官网 (37)
- mysql 子查询 (37)
- python元组 (33)
- mybatis 分页 (35)
- vba split (37)
- redis watch (34)
- python list sort (37)
- nvarchar2 (34)
- mysql not null (36)
- hmset (35)
- python telnet (35)
- python readlines() 方法 (36)
- munmap (35)
- docker network create (35)
- redis 集合 (37)
- python sftp (37)
- setpriority (34)
- c语言 switch (34)
- git commit (34)