질문거리
스레드풀, fork 의 정확한 역할, trySplit() 메서드 구현 부분
자바 7 이전에는 데이터 컬렉션을 병렬로 처리하기가 어려웠다.
데이터를 서브 파트로 분할 후 분할된 서브파트를 각각의 스레드로 할당해야됐다.
스레드로 할당한 다음에는 의도치 않은 race condition 이 발생하지 않도록 적절한 동기화를 추가해야하고, 마지막 결과를 합쳐야 됐다.
하지만 자바 7 등장 이후 자바는 쉽게 병렬화를 수행하면서 에러를 최소화 할 수 있는 포크/조인 프레임워크 기능을 제공한다.
이 글에서는 포크/조인 프레임워크 기능을 살펴보기 전에 어떻게 스프림으로 데이터 컬렉션 관련 동작을 손쉽게 병렬로 실행할 수 있는지 알아보고, 더 나아가 자바 7에 추가된 포크/조인 프레임워크와 내부적인 병렬 스트림 처리의 관계를 알아보고자 한다. 추가로 병렬로 처리하기 위해 청크를 나누는 원리에 대해서도 알아보고자 한다.
병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다. 따라서 병렬 스트림을 이용하면 여러 개의 프로세서가 여러 개의 청크를 동시에 처리할 수 있도록 할 수 있다.
숫자 n을 인수로 받아서 1부터 n까지 모든 숫자의 합계를 반환하는 메서드를 알아보자
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.reduce(0L, Long::sum);
}
위의 순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산이 병렬로 처리된다.
public long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel() //병렬 스트림으로 변환
.reduce(0L, Long::sum);
}
스트림이 여러 청크로 분할되어 각각 리듀싱 연산을 수행한 후 다시 리듀싱 연산으로 합쳐져 결과를 도출한다.
parallel 을 호출하면 스트림 자체에 변화가 생기는 것이 아니라 이후 연산이 병렬로 수행해야 함을 의미하는 불리언 프래그가 설정되는 것이다.
이런식으로 sequential 로 병렬 스트림을 순차 스트림으로 바꿀 수도 있다.
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();
병렬 스트림에서 사용하는 스레드 풀 설정
스트림의 parallel 메서드에서 병렬로 작업을 수행하는 스레드의 생성장소, 몇 개나 생성되는지, 그리고 이 과정을 커스터마이즈도 가능하다.
병렬 스트림은 내부적으로 ForkjoinPool 을 이용하는데, 기본적으로 프로세서 수즉,Runtime.getRuntime().availableProcessors()
가 반환하는 값에 상응하는 스레드를 가진다.
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12")
일반적으로 이 값은 기기의 프로세서 수와 같으므로 이 기본값을 권장한다.
병렬화를 이용하는 것이 정말로 성능에 도움이 되는지 성능을 측정했다.
일반적으로 JVM 으로 실행되는 프로그램을 벤치마크하는 작업은 쉽지 않다고 한다. 핫스팟이 바이트 코드를 최적화하는데 필요한 준비시간, 가비지 컬렉터로 인한 오버헤드 등과 같은 여러 요소를 고려해야 되기 때문이다. 하지만 메이븐 빌드 도구를 사용하고 몇가지 의존성을 추가한다면 JMH라는 라이브러리를 이용해 안정적으로 JVM을 대상으로 하는 벤치마크를 구현할 수 있다고 한다.
책의 내용을 우선 서술하고, 나중에 직접 성능 테스트를 해볼 것이다. 🐸
@BenchmarkMode(Mode.AverageTime) //벤치마크 대상 메서드를 실행하는데 걸린 평균 시간 측정
@OutputTimeUtil(TimeUnit.MILLISECONDS) //벤치마크 결과를 ms 단위로 출력
@Fork(2, jvmArgs = {"-Xms4G", "-Xmx4G"}) //4GB의 힙 공간을 제공한 환경에서 2번의 벤치마크를 수행
public class ParallelStreamBenchmark {
private static final long N = 10_000_000L;
@Benchmark
public long sequentialSum() {
return Stream.iterate(1L, i -> i + 1).limit(N)
.reduce(0L, Long::sum);
}
@TearDown(Level.Invocation) //매 벤치마크 실행한 후에 가비지 컬렉터 동작 시도
public void tearDown() {
System.gc();
}
}
전통적인 for 루프를 사용해 반복하는 방법이 더 저수준으로 동작할 뿐 아니라 기본값을 박싱하거나 언박싱할 필요가 없으므로 더 빠를 것이라 예상할 수 있다.
@Benchmark
public long iterativeSum() {
long result = 0;
for (long i = 1L; i <= N; i++) {
result += i;
}
return result;
}
벤치마크 결과도 역시 순차적 스트림을 사용할 때보다 4배 이상 빠르게 측정되었다.
하지만 병렬스트림의 벤치마크 결과는 순차스트림에 비해 5배나 느린 결과가 나왔다.
아래와 같은 이유를 들 수 있다.
두번째 이유에 주목해야하는데, 우리는 병렬로 수행될 수 있는 스트림 모델이 필요하기 때문이다. 이 문제는 이전 연산의 결과에 따라 다음 함수의 입력이 달라지기에 Iterate 연산을 청크로 분할하기 어려워서 발생한 것이다.
위의 그림과 같은 리듀싱 연산이 수행되지 않는데, 리듀싱 시작 시점에 전체 숫자 리스트가 준비되지 않았으므로 스트림을 병렬로 처리할 수 있도록 청크로 분할할 수 없는 것이다. 결국에 스트림이 병렬처리를 지시해 각각 다른 스레드에서 수행되었지만 순차 처리 방식과 다를게 없어 스레드를 할당하는 오버헤드만 더 증가한 꼴이다.
이처럼 병렬 프로그래밍은 오용하면 오히려 프로그램의 성능이 더 나빠질 수도 있기에 우리는 parrallel 메서드를 호출했을때 내부적으로 어떤 일이 일어나는지 꼭 이해해야한다.
멀티코어 프로세서를 활용해서 효과적으로 병렬 연산을 실행하려면 어떻게 해야할까?
5장에서 LongStream.rangeClosed라는 메서드를 소개했다.
@Benchmark
public long parallelRangedSum() {
return LongStream.rangeClosed(1, N)
.parallel()
.reduce(0L, Long::sum);
}
벤치마크 결과 드디어, 순차로 수행했을때보다 병렬로 수행했을때 더 높은 성능을 보였다. 올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘한다는 사실을 알수 있다.
병렬화를 이용하려면 스트림을 재귀적으로 분할해야하고, 각 서브 스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고 이들 결과를 하나의 값으로 합쳐야한다. 멀티 코아 간의 데이터 이동은 생각보다 비싸서, 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직하다.
공유된 상태를 바꾸는 알고리즘을 병렬 스트림으로 사용하면 문제가 발생한다.
public long sideEffectSum(long n) {
Accumlator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).forEach(accumulator::add);
return acculator.total;
}
public class Accumulator {
public long total = 0;
public void add(long value) { total += value; }
}
메서드의 성능 뿐만 아니라 결과값도 올바르게 나오지 않는다.
병렬 스트림이 올바르게 동작하기 위해서는 공유된 가변 상태를 피해야한다.
어떤 상황에서 병렬 스트림을 사용해야 적절한 성능 개선을 얻을 수 있는지 살펴보자
확신이 서지 않을때는 순차 스트림과 병렬 스트림 구현 시의 성능을 직접 측정해라
자동 박싱과 언박싱은 성능을 크게 저하시킬 수 있는 요소이므로 주의해서 사용해야 하며, 기본형 특화 스트림(IntStream, LongStream, DoubleStream)을 사용하는 것이 좋다.
limit이나 findFirst처럼 요소의 순서에 의존하는 연산은 병렬 스트림에서 성능이 더 떨어진다. 요소의 순서가 상관없다면 unordered를 호출해서 비정렬된 스트림을 얻은 후 limit을 호출하는 것이 더 효율적이다.
스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하자. 요소 수가 많고 요소 당 연산 비용이 높다면 병렬 스트림으로 성능을 개선할 여지가 있다.
병렬화 과정의 부가 비용을 상쇄하지 못할 정도의 소량의 데이터에서는 병렬스트림이 도움되지 않는다.
스트림을 구성하는 자료구조가 적절한지 확인한다. ArrayList는 요소를 탐색하지 않고도 분할할 수 있지만 LinkedList는 모든 요소를 탐색해야 분할할 수 있다.
LinkedList는 모든 요소를 탐색해야 분할이 가능하다.
range 팩토리 메서드로 만든 기본형 스트림이나 커스텀 Spliterator를 구현하면 쉽게 분해할 수 있다.
*스트림의 특성과 파이프라인 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다. SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 병렬 처리가 가능하다. 반면 필터 연산이 있으면 스트림의 길이를 예측할 수 없으므로 병렬 처리가 어려워진다.
최종 연산의 병합 과정 비용이 비싸다면 병렬 스트림으로 얻은 이익이 상쇄될 수 있다.
아래 표는 스트림 소스와 분해성을 나타낸 것이다. 분해성이 훌륭할 수록 병렬화에 적합한 자료구조이다.
이제 병렬 스트림을 제대로 사용하기 위해 병렬 스트림의 내부구조에 대해 알아보자
병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할하여 서브태스크로 처리한 뒤, 각각의 결과를 합쳐서 전체 결과로 만드는 방식
서브 테스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산할당 하는 ExecutorService 인터페이스를 구현한다.
스레드풀을 이용하려면 RecursiveTask<R>
의 서브클래스를 만들어야하고, 추상메서드 compute를 구현해야 한다.
protected abstract R compute();
compute 메서드 구현 형식은 분할 정복 알고리즘의 병렬화 버전을 사용한다.
if(태스크가 충분히 작거나 더이상 분할할 수 없으면) {
순차적으로 태스크 계산
} else {
태스크를두 서브태스크로 분할
태스크가 다시 서브태스크로 분할되도록 메시지를 재귀적으로 호출
모든 서브태스크의 연산이 왑료될때까지 대기
각 서브태스크의 결과를 합침
}
아래는 포크/조인 과정이다.
public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long>{
private final long [] numbers;
private final int start;
private final int end;
private final long THRESHOLD = 10_000;
//main task 생성시 public 생성자
public ForkJoinSumCalculator(long[] numbers){
this(numbers, 0, numbers.length);
}
//recursive subtask 생성시 non public 생성자
private ForkJoinSumCalculator(long[] numbers, int start, int end){
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute(){
int length = end - start; // task에서 더할 배열의 길이
if (length <= THRESHOLD){
return computeSequentially(); // 기준값보다 작으면 순차적으로 결과를 계산
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
leftTask.fork(); // ForkJoinPool의 다른 스레드로 새로 생성한 태스크를 비동기로 실행
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
Long rightResult = rightTask.compute(); // 두 번째 서브태스크를 동기 실행, 추가 분할 일어날 수 있음.
Long leftResult = leftTask.join(); // 첫 번째 서브태스크의 결과를 읽거나 아직 없으면 기다린다.
return leftResult + rightResult;
}
//분할 더 이상 안될 때 서브태스크 결과 계산해주는 알고리즘.
private Long computeSequentially(){
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}
다음 코드처럼 ForkJoinSumCalculator의 생성자로 원하는 수의 배열을 넘겨줄 수도 있다.
public static long forkJoinSum(long n){
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
위의 코드에서 ForkJoinPool 은 소프트웨어의 필요한 곳에서 언제든 가져다 쓸 수 있도록 ForkJoinPool 을 한 번만 인스턴스화해서 정적 필드에 싱글턴으로 저장한다. 그래서 인수가 없는 디폴트 생성자를 이용해서 JVM 에서 이용할 수 있는 모든 프로세서가 자유롭게 풀에 접근할 수 있음을 의도했다.
위 코드를 실행해보면 병렬 스트림을 이용할 때 보다 성능이 더 나빠졌다. 그 이유는 ForkJoinSumCalculator 태스크에서 사용할 수 있도록 전체 스트림을 long[]으로 변환했기 때문이다.
포크/조인 분할 전략에서는 주어진 서브테스크를 더 분할할 것인지 결정할 기준을 정해야한다. 이는 작업 훔치기를 통해서 일어난다.
이론적으로는 CPU의 코어 개수만큼 병렬화된 태스크로 작업부하를 분할하면 모든 코어에서 태스크를 실행할 것이고, 같은 시간에 종료될 것이라고 생각할 수 있다. 하지만 다양한 이유로 각각의 서브태스크의 작업완료 시간이 크게 달라질 수 있다.
포크/조인 프레임워크에서는 작업훔치기(work stealing)라는 기법으로 이 문제를 해결한다. 작업 훔치기 기법에서는 ForkJoinPool의 모든 스레드를 거의 공정하게 분할한다. 각각의 스레드는 자신에게 할당된 테스크를 포함하는 이중 연결 리스트를 참조하면서 작업이 끝날때마다 큐의 헤드에서 다른 테스크를 가져와 작업을 처리한다.
이제, 위처럼 분할 로직을 개발하지 않고 병렬 스트림을 이용해 자동으로 스트림을 분할해주는 과정을 살펴보자.
Spliterator는 '분할할 수 있는 반복자'라는 의미이다. Iterator처럼 소스의 요소 탐색 기능을 제공하며, 병렬 작업에 특화되어 있다.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어난다. 1단계에서 첫 번째 Spliterator에서 trySplit을 호출하면 두 번째 Spliterator가 생성되고, 2단계에서 두 번째 Spliterator에서 trySplit을 호출하면 네 개의 Spliterator가 생성된다. 이는 trySplit가 null이 될때까지 반복한다.
반복형으로 단어 수를 세는 메서드
public int countWordsIteratively(String s) {
int counter = 0;
boolean lastSpace = true;
for (char c : s.toCharArray()) { //문자열의 모든 문자를 하나씩 탐색
if (Character.isWhiteSpace(c)) {
lastSpace = true;
} else {
if (lastSpace) conter++; //공백문자 탐색 시 이전까지의 문자를 단어를 간주하여 단어 수 counting
lastSpace = false;
}
}
return counter;
}
함수형으로 단어 수를 세는 메서드
class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
public WordCounter accumulate(Character c) {
if (Character.isWhitespace(c)) {
return lastSpace ? this : new WordCounter(counter, true);
} else {
return lastSpace ? new WordCounter(counter+1, false) : this;
}
}
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(counter + wordCounter.conter, wordCounter.lastSpace);
}
public int getConter() {
return conter;
}
}
위 연산을 병렬 스트림으로 처리하면 원하는 결과가 나오지 않는다. 원래 문자열을 임의의 위치에서 둘로 나누다보니 하나의 단어를 둘로 계산하는 상황이 발생할 수 있기 때문이다.
따라서 문자열을 임의의 위치에서 분할하지 않고 단어가 끝나는 위치에서만 분할하도록 trySplit() 메서드를 구현해주면 된다.