ThreadPoolTaskExecutor
- 스레드 풀을 사용하는 Executor
- java.util.concurrent.Executor를 Spring에서 구현한 것
- org.springframework.scheduling.concurrent 패키지에서 제공
- 주로 spring에서 비동기처리를 위해 사용
- 스레드풀을 사용하여 멀티스레드 구현을 쉽게 해줌
- 기본 생성자 하나만 존재
Configuration
Pool size configuration
1
2
3
4
5
6
7
@Bean("simpleTaskExecutor")
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5); // 기본 스레드 수
taskExecutor.setMaxPoolSize(10); // 최대 스레드 수
return taskExecutor;
}
- core와 max 사이즈를 설정할 수 있다.
- 최초 core사이즈만큼 동작하다가 더 이상 처리할 수 없을 경우 max사이즈만큼 스레드가 증가할 것이라 일반적으로 생각하지만 사실 그렇지 않다.
- 내부적으로는 Integer.MAX_VALUE사이즈의 LinkedBlockingQueue를 생성해서 core사이즈만큼의 스레드에서 task를 처리할 수 없을 경우 queue에서 대기하게 됩니다. queue가 꽉 차게 되면 그때 max 사이즈만큼 스레드를 생성해서 처리하게 된다.
Capacity configuration
1
2
3
4
5
6
7
8
@Bean("simpleTaskExecutor")
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5); // 기본 스레드 수
taskExecutor.setMaxPoolSize(10); // 최대 스레드 수
taskExecutor.setQueueCapacity(100); // Queue 사이즈
return taskExecutor;
}
- core 사이즈 보다 많은 요청이 발생할 경우 Integer.MAX_VALUE 사이즈만큼의 queue의 용량이 너무 크다고 생각된다면 queueCapacity사이즈를 변경할 수 있다.
- 위의 예시 코드와 같이 설정한다면 최초 5개의 스레드에서 처리하다가 처리 속도가 밀릴 경우 100개 사이즈 queue에서 대기하고 그보다 많은 요청이 들어올 경우 최대 10개의 스레드까지 생성해서 처리하게 된다.
RejectedExecutionHandler configuration
- max 스레드까지 생성하고 queue까지 꽉 찬 상태에서 추가 요청이 오면 RejectedExecutionException 예외가 발생합니다. 더 이상 처리할 수 없다는 오류이다.
- 오류가 발생하는 걸 손 놓고 지켜봐야만 하는게 아니라 몇 가지 예외 정책을 설정해줘야합니다.
- 기본적으로 RejectedExecutionHandler 인터페이스를 구현한 몇 가지 클래스가 존재한다.
- AbortPolicy
- 기본 설정(Default)
- RejectedExecutionException을 발생시킨다
- DiscardOldestPolicy
- 오래된 작업을 skip
- 모든 task가 무조건 처리되어야 할 필요가 없을 경우 사용
- DiscardPolicy
- 처리하려는 작업을 skip
- 역시 모든 task가 무조건 처리되어야 할 필요가 없을 경우 사용
- CallerRunsPolicy
- shutdown 상태가 아니라면 ThreadPoolTaskExecutor에 요청한 thread에서 직접 처리
- 예외와 누락 없이 최대한 처리하려면 CallerRunsPolicy로 설정하는 것이 좋음
1 2 3 4 5 6 7 8 9
@Bean("simpleTaskExecutor") public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); // 기본 스레드 수 taskExecutor.setMaxPoolSize(10); // 최대 스레드 수 taskExecutor.setQueueCapacity(100); // Queue 사이즈 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return taskExecutor; }
- AbortPolicy
Shutdown configuration
- 위와 같이 정의한 스레드 풀에서 열심히 작업이 이루어이고 있을 때 애플리케이션 종료를 요청하면 어떻게 될까?
- Spring Boot Actuator를 이용해서 종료를 시켜보면 호출 즉시 애플리케이션이 바로 종료 되는 것을 확인할 수 있다.
1
POST http://localhost:8888/actuator/shutdown
- 이렇게 즉시 종료되면 아직 처리되지 못한 task는 유실된다. 유실 없이 마지막까지 다 처리하고 종료되길 원한다면 설정을 추가해야 합니다.
- waitForTasksToCompleteOnShutdown을 true로 하게 되면 queue에 남아 있는 모든 작업이 완료될 때 까지 기다리게 된다.
1 2 3 4 5 6 7 8 9 10
@Bean("simpleTaskExecutor") public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); // 기본 스레드 수 taskExecutor.setMaxPoolSize(10); // 최대 스레드 수 taskExecutor.setQueueCapacity(100); // Queue 사이즈 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); return taskExecutor; }
Timeout configuration
- 만약 모든 작업이 처리되길 기다리기 힘든 경우라면 setAwaitTerminationSeconds메서드를 통해 최대 종료 대기 시간을 설정할 수 있다.
1 2 3 4 5 6 7 8 9 10 11
@Bean("simpleTaskExecutor") public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(5); // 기본 스레드 수 taskExecutor.setMaxPoolSize(10); // 최대 스레드 수 taskExecutor.setQueueCapacity(100); // Queue 사이즈 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); // shutdown 최대 60초 대기 return taskExecutor; }
기타 Configuration
메서드 | 설명 | 기본값 |
---|---|---|
setAllowCoreThreadTimeout | 코어 스레드의 타임아웃을 허용할 것인지에 대한 설정 메서드. true로 설정할 경우 코어 슬데ㅡ를 10으로 설정했어도 일정시간(keepAliveSeconds)이 지나면 코어 스레드 개수가 줄어듦 | false |
setkeepAliveSeconds | 코어 스레드 타임아웃을 허용했을 경우 사용되는 설정값으로, 여기 설정된 식나이 지날 때까지 코어 스레드 풀의 스레드가 사용되지 않을 경우 해당 스레드는 terminate된다. | 60초 |
프로젝트에 적용하기
Autowired로 ThreadPoolTaskExecutor 사용하기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.example.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
@Service
public class TestService {
@Autowired
@Qualifier("executor")
private ThreadPoolTaskExecutor executor;
public void executeThreads() {
System.out.println("executing threads");
for(int i=0;i<10;i++) {
executor.execute(new Job());
}
}
class Job implements Runnable {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
- TestService라는 서비스 클래스에서 멤버변수 executor에 위에서 빈으로 등록한 객체를 쓰도록 @Qualifier로 executor이름을 명시주었다.
- execute메소드로 Runnable을 수행한 것을 알 수 있다.
@Async로 ThreadPoolTaskExecutor 사용하기
- 1) ThreadPoolTaskExecutor bean 설정 클래스에 @EnableAsync도 추가해야 함
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.example.configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class TaskExecutorConfig {
@Bean(name = "executor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5); // 기본 스레드 수
taskExecutor.setMaxPoolSize(10); // 최대 스레드 수
taskExecutor.setQueueCapacity(100); // Queue 사이즈
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60); // shutdown 최대 60초 대기
return taskExecutor;
}
}
- 2) 서비스의 메서드에 @Async(“스레드풀네임”)어노테이션 추가
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.example.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
@Service
public class TestService2 {
@Async("executor")
public void executeThreads() {
System.out.println("executing threads");
try {
Thread.sleep(3000);
System.out.println("[TestService2]" + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
정리
- 1)@Configuration으로 등록한 클래스에 executor @Bean추가 (@Async를 이용할 경우 @EnableSync도 추가)
- 2)@Autowired, @Qualifier로 주입하여 사용하거나 또는 메서드 레벨에 @Async를 붙여 사용
@Async 어노테이션 사용시 주의사항
Async is not a silver bullet(@Async는 탄환이 아니다)
- private 메서드에는 적용이 안된다. public만 된다.
- self-invocation(자가 호출)해서는 안된다. -> 같은 클래스 내부의 메서드를 호출하는 것은 안된다
예시
- TestController.java
1
2
3
4
5
6
7
8
9
10
11
@RestController
public class TestController {
@Autowired private TestService testService;
@GetMapping("/test2")
public void test2() {
for(int i=0;i<10000;i++) {
testService.innerMethodCall(i);
}
}
}
- TestService.java
1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class TestService {
private static final Logger logger
= LoggerFactory.getLogger(TestService.class);
@Async public void innerMethod(int i) {
logger.info("async i = " + i);
}
public void innerMethodCall(int i) {
innerMethod(i);
}
}
- 위 코드로 테스트 해보면 controller에서 testService.innerMethodCall()를 동기로 호출하지만 내부에서 하는 작업이 비동기 @Async가 걸린 innerMethod를 호출하니까 결국에는 비동기로 로그가 찍힐 것을 예상할 수 있다.
- 하지만 틀렸다. 아래 처럼 하나의 스레드로 동기 처리됨을 볼 수 있다.
위와 같은 결과가 발생하는 이유
- https://dzone.com/articles/effective-advice-on-spring-async-part-1
- 위의 출처에서 제대로 설명해준다.
결론부터 말하면 AOP가 적용되어 spring context에 등록되어 있는 빈 객체의 메서드가 호출되었을 때 스프링이 끼어들 수 있고 @Async가 적용되어 있따면 스프링이 메서드를 가로채서 다른 스레드(풀)에서 실행시켜주는 메커니즘이라는 것이다.
- 그렇기 때문에 위에 제약조건이었던 것들이 이해가 된다.
- public이어야 가로챈 스프링의 다른 클래스에서 호출이 가능할 것이고, self-invocation이 불가능 했던 이유도 spring context에 등록된 메소드 호출이어야 프록시를 적용받을 수 있기에 내부 메서드 호출은 프록시 영향을 받지 않기 때문이다.
해결책
1
2
3
4
5
6
@Service public class AsyncService {
@Async
public void run(Runnable runnable) {
runnable.run();
}
}
- 위의 코드와 같이 AsyncService를 하나 두고 해당 서비스는 유틸 클래스처럼 전역에서 사용하도록 두는 것이다.
- @Async메서드 run을 통해 들어오는 Runnable을 그냥 실행만 해주는 메서드다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Service
public class TestService {
private static final Logger logger = LoggerFactory.getLogger(TestService.class);
@Autowired
private AsyncService asyncService;
public void innerMethod(int i) {
logger.info("async i = " + i);
}
public void innerMethodCall(int i) {
asyncService.run(()->innerMethod(i));
}
}
- 그 다음에 비동기 메서드 호출이 필요할 때 해당 서비스로 메서드를 호출해버리는 것이다.
- 저렇게 하니깐 결과도 비동기로 처리하는 모습을 확인할 수 있다.
- 위와 같은 해결책은 service의 메서드는 동기로 호출되길 원하지만 내부에서 하는 기능(동작)에서 일부만 비동기로 실행되기를 원할때 사용하면 좋다
- 참고한 블로그 작성자님의 생각으로는 차라리 CompletableFuture를 쓰되 해당 스레드 풀에서 실행되기를 바라면 아래와 같이 Executor를 주입받고 호출하는 것이 나을 것 같다고 한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@Service public class TestService { private static final Logger logger = LoggerFactory.getLogger(TestService.class); @Autowired private Executor executor; public void innerMethod(int i) { logger.info("async i = " + i); } public void innerMethodCall(int i) { CompletableFuture.runAsync(()->innerMethod(i),executor); } }
- 위 코드를 실행해도 executor로 등록한 스레드풀이 주입되어 해당 풀에서 작업들이 수행된다.
- 참고로, void나 future가 아닌 String 리턴 값을 가진 메서드에 @Async를 달았는데도 잘 수행되었다.