线程池与任务调度
# 线程池与任务调用
# 线程池介绍
线程池是 Java 并发编程中最重要的概念之一,它是一种管理和复用线程的机制,用于优化线程的使用和管理。
线程池是一种线程管理机制,它预先创建一组线程并保持它们处于就绪状态,当有任务需要执行时,从池中分配一个线程来执行任务,任务完成后线程返回池中等待下一个任务,而不是被销毁。
# 核心作用与价值
# 1. 降低资源消耗
- 减少线程创建和销毁的开销:线程的创建和销毁需要消耗系统资源
- 复用已有线程:避免频繁创建新线程的性能成本
# 2. 提高响应速度
- 任务到达时立即执行:线程已预先创建好,无需等待线程创建
- 减少启动延迟:特别适合大量短耗时任务
# 3. 提高线程的可管理性
- 统一管理线程资源:可以控制并发数量、监控线程状态
- 提供多种管理策略:如任务队列、拒绝策略等
# 4. 防止资源耗尽
- 限制最大线程数:防止创建过多线程导致系统崩溃
- 提供过载保护:通过拒绝策略处理过多任务
# Java 中的线程池体系
Java 通过 java.util.concurrent
包提供了强大的线程池支持:
// 线程池的核心接口和类
Executor // 执行器接口
ExecutorService // 执行服务接口(扩展了Executor)
ThreadPoolExecutor // 线程池执行器(最核心的实现)
ScheduledExecutorService // 支持调度的执行服务
// 工具类
Executors // 线程池工厂工具类
2
3
4
5
6
7
8
# 任务调度-练习题
使用ScheduledThreadPoolExecutor
实现一个定时任务:每5秒输出一次当前时间,并在1分钟后终止任务。
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class ScheduledTaskExample {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
System.out.println("Current Time: " + new SimpleDateFormat("HH:mm:ss").format(new Date()));
}, 0, 5, TimeUnit.SECONDS);
// 1分钟后终止任务
Thread.sleep(60_000);
future.cancel(true);
scheduler.shutdown();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
创建了一个调度线程池,线程池大小是 1
:
ScheduledExecutorService
是 JDK 提供的定时任务调度器;Executors.newScheduledThreadPool(1)
表示只有一个线程执行定时任务。
scheduler.scheduleAtFixedRate(...)
- 安排一个任务以**固定速率(Fixed Rate)**周期性执行。
- 第一次延迟后开始执行,每隔固定时间间隔再次执行,无论任务执行花多少时间,间隔是相对于开始时间计算的。
- 和
scheduleWithFixedDelay
不同,scheduleWithFixedDelay
是“执行完后延迟一段时间再执行”。
返回值:ScheduledFuture<?>
ScheduledFuture
是一个可以代表未来执行结果的句柄。- 通过
future.cancel(true)
可以取消任务。 - 泛型
<?>
表示任务没有返回结果(因为Runnable
没有返回值)。
# 任务调度
# CompletableFuture
CompletableFuture 是 Java 8 引入的一个非常强大的工具,用于异步编程。它实现了 Future 接口,并在此基础上增加了更丰富的特性,使其在处理并发和并行任务时更加灵活和方便。
# CompletableFuture 的核心特性
- 异步执行: 允许你启动一个任务,然后在后台执行,而不会阻塞主线程。
- 结果聚合: 可以将多个异步任务的结果组合成一个新的结果。
- 异常处理: 提供了更完善的异常处理机制,可以优雅地捕获和处理异步任务中发生的异常。
- 链式调用: 支持链式操作,使得多个异步步骤可以非常流畅地连接起来,形成一个处理流程。
- 非阻塞: 大多数操作都是非阻塞的,提高了系统资源的利用率。
# 常用方法
以下是常用方法
supplyAsync(Supplier<U> supplier): 异步执行一个带返回值的任务。
runAsync(Runnable runnable): 异步执行一个不带返回值的任务。
thenApply(Function<? super T,? extends U> fn): 当前阶段正常完成时,用其结果执行一个新的函数,并返回一个新的 CompletableFuture。
thenAccept(Consumer<? super T> action): 当当前阶段正常完成时,用其结果执行一个 Consumer 操作,不返回结果。
thenRun(Runnable action): 当当前阶段正常完成时,执行一个 Runnable 操作,不使用其结果。
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn): 类似于 thenApply,但其函数参数返回一个 CompletionStage,并将其展平。常用于连接两个独立的 CompletableFuture。
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn): 组合两个 CompletableFuture 的结果,当两者都完成后,用它们的合并结果执行一个 BiFunction。
allOf(CompletableFuture<?>... cfs): 返回一个新的 CompletableFuture,当所有给定的 CompletableFuture 都完成时,它才会完成。
anyOf(CompletableFuture<?>... cfs): 返回一个新的 CompletableFuture,当任何一个给定的 CompletableFuture 完成时,它就会完成。
exceptionally(Function<Throwable, ? extends T> fn): 当当前阶段发生异常时,执行一个异常处理函数,并返回一个新的 CompletableFuture。
handle(BiFunction<? super T, Throwable, ? extends U> fn): 无论当前阶段是正常完成还是异常完成,都会执行一个函数。
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 业务场景
只是记忆它的各种api是很难深入理解的,一定要在对应的例子中使用它,才能有深刻的印象,才能知道使用它的注意事项。下面就让我们一起来看看它的一些使用场景吧。
CompletableFuture 在许多需要处理并发和异步操作的场景中都非常有用
。
为什么说它有用,是因为,它是并行执行任务,
- 串行执行:主线程依次调用多个接口,总耗时等于所有接口耗时之和(如 300ms)。
- 并行执行:多个接口同时调用,总耗时接近最慢接口的耗时(如 100ms)。
并行的耗时更少。
# 场景一:获取各种参数
- 并行数据查询/聚合:
场景: 用户请求一个页面,该页面需要从多个不同的服务或数据库中获取数据(例如:用户基本信息、订单列表、推荐商品)。
应用: 可以同时发起多个 CompletableFuture 去查询这些数据,然后使用 allOf 或 thenCombine 等方法将结果聚合起来,最后渲染页面。这比串行查询能显著减少响应时间。
- 例如:获取用户信息 + 获取用户订单 + 获取用户积分。
// 并行执行(总耗时 ≈ 100ms)
CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> getUserInfo(userId));
CompletableFuture<OrderList> orderFuture = CompletableFuture.supplyAsync(() -> getOrders(userId));
CompletableFuture<Points> pointsFuture = CompletableFuture.supplyAsync(() -> getPoints(userId));
CompletableFuture<Void> allFutures = CompletableFuture.allOf(userFuture, orderFuture, pointsFuture);
allFutures.join(); // 等待所有任务完成
UserInfo userInfo = userFuture.get();
OrderList orders = orderFuture.get();
Points points = pointsFuture.get();
2
3
4
5
6
7
8
9
10
- 主线程快速、串行地提交了三个任务到线程池。
- 提交完成后,主线程立即执行到
allFutures.join()
并被阻塞。 - 与此同时,线程池的多个线程开始并行执行那三个耗时方法。
- 当最后一个耗时方法执行完毕后,
allFutures
被标记为完成。 - 主线程从
join()
的阻塞中恢复,继续串行地执行后面的.get()
方法获取结果。
以上就是执行流程。
那么新问题来了,上述代码中哪些是串行,哪些是并行呢?
操作 | 执行方式 | 执行线程 |
---|---|---|
创建并提交三个 supplyAsync 任务 | 串行 | 主线程 |
执行 getUserInfo , getOrders , getPoints | 并行 | 线程池中的多个工作线程 |
allFutures.join() | 阻塞等待 | 主线程 |
userFuture.get() 等 | 串行获取结果 | 主线程 |
注意:上述例子中使用的线程池是默认线程池 —— ForkJoinPool.commonPool
- 如果您没有指定线程池(如代码所示),它会使用默认的
ForkJoinPool.commonPool()
。ForkJoinPool.commonPool()
是一个共享的线程池,其大小默认为 CPU 核心数 - 1(例如,在 4 核机器上,默认大小为 3)。- 这三个任务被提交后,线程池会从其内部的工作线程中分配可用线程来执行它们。
- 如果线程池中有至少 3 个可用线程,那么这三个任务会立即被 3 个不同的线程同时执行。
- 如果线程池中可用线程不足 3 个(比如默认大小只有2),那么其中两个任务会先被执行,第三个任务会暂时排队等待,直到有线程空闲出来再执行。
- 因此,从逻辑上讲,您发起了3个异步执行的任务。从物理上讲,JVM 的线程池会使用最多 3 个线程来并行处理它们,但实际使用的线程数取决于线程池的当前状态和大小。
上述代码,这种模式是标准的 “分治-聚合” 模式:
- 分 (Fork):将三个独立任务分发出去并行执行。
- 合 (Join):等待所有并行任务完成。
- 处理结果:聚合所有结果。
# 使用自定义线程池
刚刚的例子使用的是默认的线程池,那如果想使用自定义的呢?
需要在每个 supplyAsync()
调用中显式传入你的线程池实例。以下是修改后的代码:
// 1. 创建自定义线程池
ExecutorService customThreadPool = Executors.newFixedThreadPool(10); // 示例:创建固定大小为10的线程池
// 2. 使用自定义线程池执行异步任务
CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> getUserInfo(userId), customThreadPool);
CompletableFuture<OrderList> orderFuture = CompletableFuture.supplyAsync(() -> getOrders(userId), customThreadPool);
CompletableFuture<Points> pointsFuture = CompletableFuture.supplyAsync(() -> getPoints(userId), customThreadPool);
// 3. 组合所有任务,等待它们全部完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(userFuture, orderFuture, pointsFuture);
// 4. 阻塞主线程,直到所有任务完成
allFutures.join();
// 5. 逐个获取结果
UserInfo userInfo = userFuture.get();
OrderList orders = orderFuture.get();
Points points = pointsFuture.get();
// 6. 最后不要忘记关闭线程池(根据应用程序结构决定关闭时机)
customThreadPool.shutdown();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
关于线程池如何设置,你是否熟悉呢?
# ForkJoinPool
# 介绍
之前说,completableFuture的默认线程池是ForkJoinPool
,那它是什么呢?有什么特性。
ForkJoinPool 是 Java 7 引入的一个特殊的线程池
,它旨在优化那些可以被分解成更小的、独立子任务的工作,然后将这些子任务的结果合并起来的并行计算。这种编程模型被称为“分治法”(Divide and Conquer)。
# 它与普通线程池的不同
要理解 ForkJoinPool
与普通线程池(如 ThreadPoolExecutor)的区别,必须先理解其工作窃取算法。这是它与普通线程池最根本的不同。
工作窃取(Work-Stealing)算法:
- ForkJoinPool:它使用一个双端队列(Deque)来存储任务。每个工作线程都有自己的双端队列。当一个线程完成了自己队列中的任务时,它可以从其他线程的队列尾部“窃取”任务来执行。这大大减少了线程之间的竞争,提高了CPU的利用率,尤其是在任务分配不均匀时。
- 普通线程池:通常使用一个共享的任务队列。所有工作线程都从这个共享队列中获取任务。这可能导致任务竞争激烈,锁开销大,特别是在高并发场景下。
工作窃取的优势:
- 减少竞争:大部分时间,线程只操作自己的队列,避免了多线程争抢同一个队列锁的开销。
- 最大化CPU利用率:它有效地平衡了线程之间的工作量。繁忙的线程不会被打扰,而空闲的线程会主动去找活干,极大地减少了线程空闲的可能性,在高负载下能更好地利用硬件资源。
任务类型:
- ForkJoinPool:主要用于 ForkJoinTask 的子类,例如 RecursiveAction(无返回值)和 RecursiveTask(有返回值)。这些任务被设计成可以递归地分解和合并。
- 普通线程池:通常用于 Runnable 和 Callable 任务。
RecursiveAction
和 RecursiveTask
是 Java 并行编程中 Fork/Join 框架 的核心抽象类,专为 分治算法(Divide and Conquer)设计。
ForkJoinTask:是所有 Fork/Join 任务的基类,定义了 fork()
(异步执行)和 join()
(等待结果)方法。而它们的之类之二,就是
RecursiveAction
:无返回值的任务。RecursiveTask
:有返回值的任务。
特性 | RecursiveAction | RecursiveTask< T > |
---|---|---|
是否返回值 | 无返回值 | 有返回值(泛型 T) |
适用场景 | 数据处理、无结果聚合的任务(如遍历、修改数据) | 结果需要聚合的任务(如求和、统计) |
核心方法 | protected void compute() | protected T compute() |
任务分解逻辑 | 拆分任务并行执行,无需返回结果 | 拆分任务并行执行,合并子任务结果 |
示例 | 并行排序、批量数据修改 | 数组求和、斐波那契数列计算 |
# 使用场景
ForkJoinPool 最适合处理那些具有以下特征的问题:
- 可分解性:任务可以被递归地分解成更小的、独立的子任务。
- 可合并性:子任务的结果可以被合并以产生最终结果。
- 计算密集型:任务主要是进行计算,而不是I/O密集型操作。因为工作窃取机制在计算密集型任务中效果最好,可以充分利用CPU。
# 不适用的场景
- I/O密集型任务:
- ForkJoinPool 的设计目标是最大限度地利用CPU。如果任务涉及大量的I/O操作(例如,读写文件、网络请求、数据库查询),这些操作会导致线程频繁阻塞。ForkJoinPool 的工作窃取机制在这种情况下效果不佳,因为它不能很好地处理阻塞的线程,可能会导致大量线程阻塞,而没有足够的计算任务来窃取,反而降低效率。
- 建议:对于I/O密集型任务,更适合使用普通的 ThreadPoolExecutor,并根据I/O的并发量来调整线程池大小,通常是线程数大于CPU核心数,以弥补I/O等待造成的CPU空闲。
- 任务之间有强依赖关系且难以分解:
- 如果任务之间有非常复杂的依赖关系,或者一个任务必须等待另一个任务的特定中间结果才能继续,那么分解和合并的开销可能会抵消并行带来的好处。
- 建议:对于这类任务,可能更适合使用同步机制(如 CountDownLatch、CyclicBarrier)或响应式编程。
- 任务执行时间极短且数量巨大:
- 如果任务非常小,并且创建、分解 ForkJoinTask 的开销,以及管理双端队列和工作窃取的开销,可能会比任务本身的执行时间还要长。这被称为“任务粒度过细”。
- 建议:对于这种场景,可以考虑将多个小任务合并成一个更大的任务再提交给线程池,或者使用其他更轻量级的并发工具。
- 简单的、非分治型任务:
- 如果任务只是简单的、一次性的操作,不需要分解和合并,使用 ForkJoinPool 会引入不必要的复杂性,并且性能可能不如普通线程池。
- 建议:对于这种任务,直接使用 ThreadPoolExecutor 的 execute() 或 submit() 方法即可。
# 代码示例
业务场景:假设你需要对一个非常大的整数数组进行求和。如果数组非常大,单线程计算会很慢。使用 ForkJoinPool 可以将数组分成多个小段,并行计算每个小段的和,最后将这些局部和合并。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.Random;
// 定义一个RecursiveTask来计算数组元素的和
class SumArrayTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 1000; // 任务分解的阈值
private int[] array;
private int start;
private int end;
public SumArrayTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 如果任务足够小,直接计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 否则,将任务分解成两个子任务
int mid = start + (end - start) / 2;
SumArrayTask leftTask = new SumArrayTask(array, start, mid);
SumArrayTask rightTask = new SumArrayTask(array, mid, end);
// 异步执行左子任务
leftTask.fork();
// 同步执行右子任务(或者也可以fork,然后join)
Long rightResult = rightTask.compute();
// 等待左子任务完成,并获取其结果
Long leftResult = leftTask.join();
// 合并结果
return leftResult + rightResult;
}
}
}
public class ForkJoinSumExample {
public static void main(String[] args) {
int[] data = generateRandomArray(10_000_000); // 生成一个千万级别的大数组
// 创建ForkJoinPool,通常使用默认构造函数,它会根据CPU核心数创建线程
ForkJoinPool forkJoinPool = new ForkJoinPool();
long startTime = System.currentTimeMillis();
// 提交主任务到线程池
SumArrayTask task = new SumArrayTask(data, 0, data.length);
Long result = forkJoinPool.invoke(task); // invoke会阻塞直到任务完成
long endTime = System.currentTimeMillis();
System.out.println("并行计算结果: " + result);
System.out.println("并行计算耗时: " + (endTime - startTime) + " ms");
// 验证结果(可选,单线程计算)
long singleThreadSum = 0;
long singleStartTime = System.currentTimeMillis();
for (int value : data) {
singleThreadSum += value;
}
long singleEndTime = System.currentTimeMillis();
System.out.println("单线程计算结果: " + singleThreadSum);
System.out.println("单线程计算耗时: " + (singleEndTime - singleStartTime) + " ms");
forkJoinPool.shutdown();
}
private static int[] generateRandomArray(int size) {
Random random = new Random();
int[] array = new int[size];
for (int i = 0; i < size; i++) {
array[i] = random.nextInt(100); // 0-99的随机数
}
return array;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
业务场景:在大数据分析中,经常需要在大型数据集中快速找到最大/最小值。
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.Random;
class FindMaxTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 1000;
private int[] array;
private int start;
private int end;
public FindMaxTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
// 基本任务:直接查找最大值
int max = Integer.MIN_VALUE;
for (int i = start; i < end; i++) {
if (array[i] > max) {
max = array[i];
}
}
return max;
} else {
// 分解任务
int mid = start + (end - start) / 2;
FindMaxTask leftTask = new FindMaxTask(array, start, mid);
FindMaxTask rightTask = new FindMaxTask(array, mid, end);
leftTask.fork(); // 异步执行左侧
int rightMax = rightTask.compute(); // 同步执行右侧
int leftMax = leftTask.join(); // 等待左侧结果
return Math.max(leftMax, rightMax); // 合并结果
}
}
}
public class ForkJoinFindMaxExample {
public static void main(String[] args) {
int[] data = generateRandomArray(20_000_000); // 2千万个元素
ForkJoinPool forkJoinPool = new ForkJoinPool();
long startTime = System.currentTimeMillis();
FindMaxTask task = new FindMaxTask(data, 0, data.length);
Integer maxResult = forkJoinPool.invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("并行查找最大值: " + maxResult);
System.out.println("并行查找耗时: " + (endTime - startTime) + " ms");
// 验证(单线程)
long singleStartTime = System.currentTimeMillis();
int singleMax = Integer.MIN_VALUE;
for (int value : data) {
if (value > singleMax) {
singleMax = value;
}
}
long singleEndTime = System.currentTimeMillis();
System.out.println("单线程查找最大值: " + singleMax);
System.out.println("单线程查找耗时: " + (singleEndTime - singleStartTime) + " ms");
forkJoinPool.shutdown();
}
private static int[] generateRandomArray(int size) {
Random random = new Random();
int[] array = new int[size];
for (int i = 0; i < size; i++) {
array[i] = random.nextInt(1_000_000); // 0-999999的随机数
}
return array;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80