1. ThreadPoolExecutor란
Java에서 스레드풀을 관리하는 주요 클래스로,
대량의 작업을 효율적으로 처리하기 위해 미리 생성된 스레드풀을 활용할 수 있도록 설계되었다.
여러개의 작업을 병렬로 실행할 때 성능을 최적화하고, 스레드 생성과 제거를 관리하여 리소스 사용을 최적화하는 역할을 한다.
2. 생성자(주요 매개변수)
public ThreadPoolExecutor(
int corePoolSize, //코어 스레드 개수
int maximumPoolSize, //최대 스레드 개수
long keepAliveTime, //유휴 스레드 유지 시간
TimeUnit unit, //시간 단위(초,밀리초 등)
BlockingQueue<Runnable> workQueue, //작업큐
ThreadFactory threadFactory, //스레드 생성 방법 설정
RejectedExecutionHandler handler // 작업 거부 정책
) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
3. ThreadPoolExecutor 작동 방식
작업이 제출되면 다음 순서대로 처리됨.
- 현재 활성 스레드 개수가 corePoolSize보다 작으면 새로운 스레드를 생성하여 실행.
- corePoolSize만큼 스레드가 모두 실행 중이라면, workQueue에 작업을 추가.
- workQueue가 꽉 차면 maximumPoolSize까지 추가로 스레드 생성.
- maximumPoolSize에도 도달하면 RejectedExecutionHandler에 정의된 정책에 따라 작업을 처리.
유휴 스레드 관리
- corePoolSize 이상의 스레드는 keepAliveTime이 지나면 제거됨
4. 테스트 예시 코드
package com.croquis.catalog.api.controller
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
fun main() {
val executor = ThreadPoolExecutor(
2, // corePoolSize (항상 유지되는 최소 스레드)
4, // maximumPoolSize (최대 스레드 개수)
10, // keepAliveTime (초)
TimeUnit.SECONDS,
LinkedBlockingQueue(2), // 작업 큐 (최대 2개까지 저장)
ThreadPoolExecutor.AbortPolicy() // 거부 정책
)
// 작업 제출
for (i in 1..7) {
executor.execute {
println("Task $i 실행, Thread: ${Thread.currentThread().name}")
try {
Thread.sleep(3000) // 3초간 실행
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
}
executor.shutdown()
}
[실행 결과 및 설명]
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.croquis.catalog.api.controller.ChowonControllerKt$$Lambda/0x00000008000c2a00@5ae63ade rejected from java.util.concurrent.ThreadPoolExecutor@7791a895[Running, pool size = 4, active threads = 4, queued tasks = 2, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1376)
at com.croquis.catalog.api.controller.ChowonControllerKt.main(ChowonController.kt:19)
at com.croquis.catalog.api.controller.ChowonControllerKt.main(ChowonController.kt)
Task 6 실행, Thread: pool-1-thread-4
Task 1 실행, Thread: pool-1-thread-1
Task 2 실행, Thread: pool-1-thread-2
Task 5 실행, Thread: pool-1-thread-3
Task 3 실행, Thread: pool-1-thread-4
Task 4 실행, Thread: pool-1-thread-1
- 프로그램 실행 시 - pool size = 0
- Task 1 실행 - 스레드풀에 스레드가 1개 생성되며 처리(core pool size = 1)
- Task 2 실행 - 스레드풀에 스레드가 2개 생성되며 처리(core pool size = 2)
- Task 3 실행 - corePoolSize=2 이므로 작업 큐에서 대기 (workQueue = 1)
- Task 4 실행 - corePoolSize=2 이므로 작업 큐에서 대기 (workQueue = 2)
- Task 5 실행 - 작업큐가 가득 찼음(workQueue=2). maximumPoolSize=4 만큼 허용됨므로 새로운 스레드 만들어서 처리
- Task 6 실행 - 작업큐가 가득 찼음(workQueue=2). maximumPoolSize=4 만큼 허용됨므로 새로운 스레드 만들어서 처리
- Task7 실행 - 작업큐도 가득 찼고, maximumPoolSize도 한계에 이르렀으므로 거부 정책(AbortPolicy)에 의해서 RejectedExecutionException 발생
5. RejectedExecutionHandler (거부 정책)
작업 큐에도 가득 차고 스레드풀에도 한계가 왔을 때는 거부 정책에 의해 들어온 작업이 처리 된다.
java.util.concurrent.ThreadPoolExecutor에서 제공하는 기본적인 정책은 4가지이다.
1. AbortPolicy (기본값)
- 거부된 작업을 즉시 예외(RejectedExecutionException)로 던짐.
- 기본 정책이라 별도로 설정하지 않으면 AbortPolicy로 적용됨.
- 사용 예시
- 작업이 유실되면 안 되는 경우
- 실패 시 즉각적인 대응이 필요한 경우
2. CallerRunsPolicy
- 현재 작업을 실행하려고 했던 스레드가 직접 실행함.
- 즉, 스레드 풀이 가득 차면 새로운 작업을 대기열에 넣는 대신,현재 실행하려던 스레드(보통 호출한 스레드)가 해당 작업을 직접 수행함.
- 사용 예시
- 작업을 유실하지 않고 가능한 한 처리하고 싶을 때
- 성능 저하를 감수하더라도 작업을 보장해야 하는 경우
3. DiscardPolicy
- 새로운 작업을 조용히 버림. (예외 발생 X)
- 작업이 거부되더라도 예외를 던지지 않으므로,실패를 감지하고 싶다면 주의가 필요함.
- 사용 예시
- 중요하지 않은 로그 작업
- 실시간성이 중요하고 일부 작업이 유실되어도 괜찮은 경우
4. DiscardOldestPolicy
- 대기열에서 가장 오래된 작업을 삭제하고 새로운 작업을 추가함.
- 즉, 대기열이 꽉 차면, 가장 오래 기다린 작업을 버리고 새로운 작업을 큐에 넣음.
- 사용 예시
- 최신 작업을 더 중요하게 처리해야 하는 경우
- 오래된 데이터는 의미가 없는 경우 (예: 실시간 모니터링, 주식 거래 시스템 등)
이 중, 내가 현재 현업에서 개발 중인 앱서비스인 경우에는 성능 저하를 감수하더라도 작업을 보장하는 것이 가장 적절한 대응이라 생각하여 CallerRunsPolicy를 별도로 테스트 해보았다. 위 테스트 예시 코드에서 거부 정책만 AbortPolicy -> CallerRunsPolicy 로 바꿔주었다.
[실행 결과]
마지막 7번 작업은 스레드풀에서 처리되지 않고 해당 작업을 호출한 main 스레드에서 처리되었다.
Task 6 실행, Thread: pool-1-thread-4
Task 5 실행, Thread: pool-1-thread-3
Task 7 실행, Thread: main
Task 2 실행, Thread: pool-1-thread-2
Task 1 실행, Thread: pool-1-thread-1
Task 4 실행, Thread: pool-1-thread-1
Task 3 실행, Thread: pool-1-thread-4
어떤걸 선택해야 할까?
정책 | 예외 발생 | 작업 보장 | 적절한 사용 사례 |
AbortPolicy | ✅ 예외 발생 | ❌ 유실 가능 | 중요 작업, 장애 감지가 필요한 경우 |
CallerRunsPolicy | ❌ 예외 없음 | ✅ 최대한 처리 | 성능 저하를 감수하더라도 작업을 보장해야 할 때 |
DiscardPolicy | ❌ 예외 없음 | ❌ 유실 가능 | 로그, 메트릭 수집 등 일부 유실 가능 작업 |
DiscardOldestPolicy | ❌ 예외 없음 | ❌ 유실 가능 | 최신 데이터가 중요한 작업 (ex. 실시간 모니터링) |
6. 자체 QnA
Q-1) 프로그램이 실행하자마자 corePoolSize 만큼 스레드풀에 스레드를 생성해 놓는가?
A-1) 아니다.
ThreadPoolExecutor는 무조건 maximumPoolSize만큼의 스레드를 미리 생성해두는 방식이 아니다.
스레드 생성은 아래와 같은 규칙에 따라 동적으로 이루어진다.
1. 초기에는 아무 스레드도 생성되지 않는다.
- 처음에는 작업이 제출될 때까지 스레드는 생성되지 않는다.
ThreadPoolExecutor를 생성했다고 해서 곧바로 corePoolSize 만큼의 스레드가 만들어지는 게 아니라, 작업이 들어오면 스레드가 생성된다.
2. 작업이 들어오면 corePoolSize까지 스레드를 만든다.
- 예를 들어 corePoolSize = 5라면 처음 5개의 작업이 들어올 때까지 스레드를 하나씩 생성하면서 실행된다.
3. 코어 스레드가 모두 바쁘면 큐에 작업을 넣는다.
- corePoolSize 개수만큼 스레드가 이미 실행 중이면, 추가 작업은 workQueue에 저장된다.
4. 큐가 가득 차면 maximumPoolSize까지 추가로 스레드를 만든다.
- workQueue가 가득 차면 maximumPoolSize까지 추가적인 스레드를 생성하면서 실행된다.
5. 사용하지 않는 스레드는 keepAliveTime 후 제거된다.
- corePoolSize를 초과하여 생성된 스레드는 일정 시간(keepAliveTime) 동안 유휴 상태이면 제거된다.
결론적으로 말하면, 처음부터 corePoolSize 또는 maximumPoolSize만큼의 스레드를 만들어 두는 것이 아닌 작업이 들어오면 그때 스레드를 하나씩 생성하는 방식이다.
Q-2) 작업큐의 크기를 지정하지 않으면(LinkedBlockingQueue 기본 생성자 사용) maximumPoolSize만큼의 스레드는 언제 생성되는가?
A-2) 생성되지 않는다. 기본적으로 corePoolSize 만큼의 스레드만 생성되고, maximumPoolSize까지 확장되지 않는다.
큐가 가득찬 경우에만 maximumPoolSize까지 확장된다. 하지만 큐 크기를 지정하지 않으면 LinkedBlockingQueue 의 기본 capacity는 Integer.MAX_VALUE 로 거의 무한대이다.
즉, 큐가 가득 찰 일이 없으므로 maximumPoolSize 까지 확장 될 일이 없다.
[테스트 코드]
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
fun main() {
val executor = ThreadPoolExecutor(
2, // corePoolSize (항상 유지되는 최소 스레드)
4, // maximumPoolSize (최대 스레드 개수)
10, // keepAliveTime (초)
TimeUnit.SECONDS,
LinkedBlockingQueue(), // 작업 큐 (무제한으로 설정)
ThreadPoolExecutor.CallerRunsPolicy() // 거부 정책
)
// 작업 제출
for (i in 1..7) {
println("[$i] 스레드 개수: ${executor.poolSize}")
executor.execute {
println("Task $i 실행, Thread: ${Thread.currentThread().name}")
try {
Thread.sleep(3000) // 3초간 실행
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
}
executor.shutdown()
}
[실행 결과]
처리할 작업 개수와 상관없이 생성된 스레드의 개수는 최대 2개(corePoolSize)이지 4개(maximumPoolSize)까지 증가하지 않았다.
[1] 스레드 개수: 0
[2] 스레드 개수: 1
[3] 스레드 개수: 2
[4] 스레드 개수: 2
[5] 스레드 개수: 2
[6] 스레드 개수: 2
[7] 스레드 개수: 2
Task 2 실행, Thread: pool-1-thread-2
Task 1 실행, Thread: pool-1-thread-1
Task 3 실행, Thread: pool-1-thread-2
Task 4 실행, Thread: pool-1-thread-1
Task 5 실행, Thread: pool-1-thread-2
Task 6 실행, Thread: pool-1-thread-1
Task 7 실행, Thread: pool-1-thread-1
따라서 적정한 큐 크기를 지정하고 그에 따라 maximumPoolSize도 함께 지정해주어야 한다.
그렇지 않으면 추가 작업은 모두 무제한 큐에 쌓이고 새로운 스레드는 생성되지 않는다.
'개발 > Java' 카테고리의 다른 글
Java의 volatile 키워드 (0) | 2021.10.21 |
---|---|
JAVA 버전별 정리 (0) | 2021.07.22 |