异步方法(线程池)的实现
# 异步方法(线程池)的实现
在日常开发中,我们会遇到一些任务(比如进行群发邮件),它们可能很耗时,但是对于他们的运行结果,并不会影响到我们主程序的执行,不论他们的执行结果是成功还是失败(即使其中有些失败了,再次执行该方法,对失败的重发即可)。
此时,我们就没必要等待他们执行完毕,而是可以直接继续执行后续的代码。传统的方式,我们是需要等待他们执行完毕,才能继续执行。如果要想实现上述的不等待,则可以使用异步
的形式来执行方法(或者称 异步方法),也就是使用新的线程
来执行对应方法。
在springboot中也对此功能进行了实现。Spring Boot中实现异步方法主要依赖于Spring框架提供的@Async
注解。以下是对应步骤:
# 通过@Async实现
# 开启异步支持
为了让Spring能够识别并处理带有@Async
注解的方法,你需要在某个配置类上启用异步支持。这通常通过在主类或配置类上添加@EnableAsync
注解来完成:
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
2
3
4
5
6
7
8
9
10
11
# 创建异步方法
接下来,你可以在任何Spring管理的Bean中定义异步方法。只需要在方法上加上@Async
注解即可:
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
public class AsyncService {
@Async
public void executeAsyncTask() throws InterruptedException{
// 这里是异步执行的任务
System.out.println("执行异步任务:" + Thread.currentThread().getName());
Thread.sleep(1000);
TimeUnit.SECONDS.sleep(2);
}
@Async
public Future<String> executeAsyncTaskWithResult() throws InterruptedException {
// 异步任务,有返回值
long duration = (long) (Math.random() * 5);
System.out.println("异步任务耗时(秒): " + duration);
Thread.sleep(duration * 1000);
return new AsyncResult<>("异步任务完成");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 调用异步方法
最后,你可以在需要的地方调用这些异步方法。请注意,如果异步方法有返回值(如上面的executeAsyncTaskWithResult
),你需要使用java.util.concurrent.Future
来接收结果:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AsyncController {
@Autowired
private AsyncService asyncService;
@GetMapping("/async")
public String callAsyncMethod() {
asyncService.executeAsyncTask();
// 这里就是异步任务,不会发生阻塞情况 尽管该方法会暂停几秒
try {
String result = asyncService.executeAsyncTaskWithResult().get();
return "调用完成,结果为: " + result;
} catch (InterruptedException | ExecutionException e) {
return "调用失败: " + e.getMessage();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 本质
Spring 使用AOP(Aspect-Oriented Programming)来实现 @Async
注解的功能。当一个方法被 @Async
标注时,Spring会为这个方法创建一个代理对象。这个代理对象负责在不同的线程中调用实际的方法。
具体来说,Spring使用CGLIB
或JDK
动态代理来生成代理对象。当调用一个带有 @Async
注解的方法时,实际上是在调用代理对象的方法,而不是直接调用原始方法。代理对象会在一个新的线程中调用原始方法,从而实现异步执行。
# 注意事项
了解的本质后,也就是所谓的了解了特点后。我们也可以推测出他的一些注意事项,比如什么情况下会失效。
因为它是基于aop实现代理对象,所以如果无法实现代理的话,则会发生即使加了注解,也无法实现异步调用。
# 自调用问题
在一个类中,如果一个带有 @Async
注解的方法被同一个类中的其他方法调用,那么异步功能可能不会生效。这是因为Spring的AOP代理机制只对通过接口或代理对象的调用生效,而不是对类内部的直接调用。
@Service
public class AsyncService {
@Async
public void asyncMethod() {
// 异步执行的代码
}
public void someMethod() {
asyncMethod(); // 这里不会异步执行
}
}
2
3
4
5
6
7
8
9
10
11
12
当一个带有
@Async
注解的方法在同一个类的其他方法中被调用时,实际上是直接调用了类的实例方法,而不是通过代理对象调用的。因此,Spring的AOP机制无法拦截到这个方法调用,也就无法应用异步执行的逻辑。
# 方法不是公共的
@Async
注解的方法必须是公共的(public
)。如果方法是私有的(private
)或受保护的(protected
),Spring将无法为其创建代理,从而导致异步功能失效。
# 所属类没被IOC管理
如果异步方法,没被加入IOC容器,也就是成为一个bean,那该注解也会失效。
# 主启动类没启用
如果没有在配置类上添加 @EnableAsync
注解,Spring将不会启用异步方法的支持。确保你的配置类中包含 @EnableAsync
# 事务管理
如果异步方法在事务管理的上下文中调用,可能会导致一些意外的行为。确保事务管理配置正确,并且不会干扰异步方法的执行。
# 能效问题
尽管 @Async
注解在Spring框架中提供了一种方便的方式来实现异步方法,但在某些情况下,开发人员可能会选择禁用或不使用 @Async
注解。
其根本原因,在于它对线程的创建问题上。因为使用@Async
注解,默认使用的是 SimpleAsyncTaskExecutor
作为线程池。然而,SimpleAsyncTaskExecutor
并不是一个真正的线程池,因为它不会重用线程,而是每次执行异步任务时都会创建一个新的线程。这在高并发场景下可能会导致性能问题和资源浪费,比如:可能会出现oom的情况。
因此不建议直接使用@Async
注解。
# 优雅的使用多线程
既然出问题的地方是默认线程池导致的,那我们是否可以给它换一个线程池呢?答案是ok的,@Async
注解支持替换使用的线程池。
只需要加上,线程池的名称即可。
@Async("customTaskExecutor")
public void asyncMethod() {
System.out.println("异步方法执行:" + Thread.currentThread().getName());
}
2
3
4
那么在springboot中定义一个线程池有什么特别的要求吗?或者怎么实现
# 自定义线程池
以下仅是一个例子,你完全可以根据需求来定制更适合你的线程池。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean(name = "customTaskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); // 核心线程数
executor.setMaxPoolSize(10); // 最大线程数
executor.setQueueCapacity(20); // 队列容量
executor.setThreadNamePrefix("CustomExecutor-"); // 线程名称前缀
// 拒绝策略
executor.setRejectedExecutionHandler(new ThreadExecutor.DiscardPolicy());
executor.initialize();
return executor;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
确保在配置类或主类上添加 @EnableAsync
注解,以启用异步方法的支持。
# ThreadPoolTaskExecutor
细心的朋友们,应该会发现定义线程池时上述用的是ThreadPoolTaskExecutor
,一时间可能有点懵,因为它并不属于任何一个jdk下的线程池。不要慌,它是spring自带的一个线程池。下面就一起来看下,它的优势与不足:
- 简化线程池管理:Spring 提供了简单的配置方式来创建和管理线程池。
- 集成 Spring 管理:可以将线程池作为 Spring Bean 进行管理,方便注入和使用。
- 灵活的配置:可以通过属性文件或代码配置线程池的各种参数,如核心线程数、最大线程数、队列容量等。
- 异常处理:可以方便地处理未捕获的异常。
- 支持多种任务类型:支持 Runnable 和 Callable 任务。
关于集成这里就不再赘述,上方已经给出。
下面来看看它的灵活配置:
你也可以通过 application.yml
或 application.properties
文件来配置 ThreadPoolTaskExecutor
。
spring:
task:
execution:
pool:
core-size: 5
max-size: 10
queue-capacity: 20
thread-name-prefix: my-task-
2
3
4
5
6
7
8
然后在配置类中自动配置 ThreadPoolTaskExecutor
即可,不必再重新设置参数。
接下来我们再来看看异常捕获:
如果一个任务在 ThreadPoolExecutor
中抛出未捕获的异常,线程池会简单地打印堆栈跟踪,并且该任务会被标记为已完成。这可能会导致一些问题,例如你可能希望记录异常、重试任务或采取其他措施来处理这些异常。或者说处理它们会稍微有些复杂。
而如果我们使用ThreadPoolTaskExecutor来捕获异常时————Spring 的 ThreadPoolTaskExecutor
允许你设置一个全局的 Thread.UncaughtExceptionHandler
,并且还提供了一些额外的方法来处理异常。
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class ThreadPoolConfig {
@Value("${thread.pool.core-pool-size}")
private int corePoolSize;
@Value("${thread.pool.max-pool-size}")
private int maxPoolSize;
@Value("${thread.pool.queue-capacity}")
private int queueCapacity;
@Value("${thread.pool.keep-alive-seconds}")
private int keepAliveSeconds;
@Value("${thread.pool.thread-name-prefix}")
private String threadNamePrefix;
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setThreadNamePrefix(threadNamePrefix);
// 设置全局的 UncaughtExceptionHandler
executor.setThreadFactory(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, threadNamePrefix + "-" + (executor.getThreadPoolExecutor().getPoolSize() + 1));
t.setUncaughtExceptionHandler((thread, throwable) -> {
System.err.println("Uncaught exception in thread " + thread.getName() + ": " + throwable.getMessage());
// 可以在这里进行日志记录、重试等操作
});
return t;
}
});
executor.initialize();
return executor;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
使用springboot时,为什么推荐使用ThreadPoolTaskExecutor来实现线程池?
其实根本在于其实它也是————继承自
ThreadPoolExecutor
类的,但它同时还进行了一定的封装与 Spring 进行了良好的整合,并提供了额外的功能。跟spring做了良好的集成。在使用上可能更方便,同时它也支持自定义各种参数。当然,你也可以继续用
ThreadPoolExecutor
来自定义线程池。
# 扩展-获得返回值
对于异步方法的返回值,只允许两种
- void
- CompletableFuture<?>
更多待补充