컬렉션에 paralleStream을 호출하면 병렬 스트림이 생성된다.
병렬 스트림이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림이다. 따라서 병렬 스트림을 이용하면 모든 멀티코어 프로세스가 각각의 청크를 처리하도록 할당할 수 있다.
간단한 예제를 보자
public long sequentialSum(long n){
return Stream.iterate(1L, i-> i+1) // 무한 자연수 스트림 생성
.limit(n) // n개 이하로 제한
.reduce(0L, Long:sum); // 모든 숫자를 더하는 스트림 리듀싱 연산
public long iterativeSum(long n){
long result = 0;
for(long i = 1L; i<=n; i++){
result+=i;
}
return result;
}
n이 커진다면 이 연산을 병렬로 처리하는 것이 좋을 것이다.
병렬로 처리한다고 할 때 생각해야 할 지점들은
등등이다. 병렬 스트림을 사용하면 걱정하지 않아도 된다.
순차 스트림에 parallel 메서드를 호출하면 기존의 함수형 리듀싱 연산(숫자 합계 계산)이 병렬로 처리된다.
public long parallelSum(long n){
return Stream.iterate(1L, i-> i+1)
.limit(n)
.parallel() // 스트림을 병렬 스트림으로 변환
.reduce(0L, Long:sum);
리듀싱 연산으로 생성된 부분 결과를 다시 리듀싱 연산으로 합쳐서 전체 스트림의 리듀싱 결과를 도출한다.
순차 스트림에 parallel을 호출해도 스트림 자체에는 아무 변화도 일어나지 않는다.
내부적으로는 parallel을 호출하면 이후 연산이 병렬로 수행해야 함을 의미하는 불리언 플래그가 설정된다.
반대로 sequential로 병렬 스트림을 순차 스트림으로 바꿀 수도 있다.
이 두 메서드를 이용해서 어떤 연산을 병렬로 실행하고 어떤 연산을 순차로 실행할지 제어 할 수 있다.
stream.parallel()
.filter()
.sequential()
.map()
.parallel()
.reduce();
parallel과 sequential 두 메서드 중 최종적으로 호출된 메서드가 전체 파이프라인에 영향을 미친다. 위 예제에서는 마지막 호출은 parallel이므로 파이프라인은 전체적으로 병렬로 실행된다.
병렬 스트림은 내부적으로 ForkJoinPool(포크/조인 프레임워크)을 사용한다. 기본적으로 ForkJoinPool은 프로세서 수, 즉 Runtime.getRuntime().availableProcessors()가 반환하는 값에 상응하는 스레드를 갖는다.
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
이 예제는 전역 설정 코드이므로 이후의 모든 병렬 스트림 연산에 영향을 준다.
현재는 하나의 병렬 스트림에 사용할 수 있는 특정한 값을 지정할 수 없다.
일반적으로는 기본값을 사용할 것이 권장된다.
지금까지 반복형, 순차 리듀싱, 병렬 리듀싱 세 방법으로 실행하는 법을 알아봤다. 어느 것이 제일 빠른지 알아보자!
성능 최적화에서 중요한 것은 "측정"이다. 측정 하지 않고 예측 하는 것은 삼가하자.
JMH라이브러리를 통해서 벤치마크를 구현해보겠음.(벤치마크는 컴퓨터나 소프트웨어 시스템의 성능을 측정하기 위한 테스트나 절차를 의미)
메이븐 설정을 하고 나서 아래와 같이 코드를 짜면 된다.
@BenchmarkMode(Mode.AverageTime) // 벤치마크 대상 메서드를 실행하는 데 걸린 평균 시간 측정
@OutputTimeUnit(TimeUnit.MILLISECONDS) // 벤치마크 결과를 밀리초 단위로 출력
@Fork(2,jvmArgs = {"-Xms4G","-Xmx4G"}) // 4Gb의 힙공간을 제공한 환경에서 두 번 벤치마크를 수행해 결과의 신뢰성 확보
public calss ParallelStreamBenchmark{
private static final long N = 10_000_000L;
@Benchmark
public long sequentialSum(long n){
return Stream.iterate(1L, i-> i+1)
.limit(n)
.reduce(0L, Long::sum);
}
@TearDown(Level.Invocation)//매 번 벤치마크를 실행한 다음에는 가비지 컬렉터 동작 시도
public void tearDown(){
System.gc();
}
}
벤치마크가 가능한 가비지 컬렉터의 영향을 받지 않도록 힙의 크기를 충분하게 설정했을 뿐 아니라, 벤치마크가 끝날 때마다 가비지 컬렉터가 실행되도록 강제했다. 이렇게 주의를 기울였지만, 여전히 결과는 정확하지 않을 수 있음을 기억하자.
기계가 지원하는 코어의 갯수 등이 실행 시간에 영향을 미칠 수 있기 때문이다.
그럼 위 3가지의 결과를 보자!!
Benchmark Mode Cnt Score Error Units
Sample.iterativeSum avgt 0.085 ms/op
Sample.parallelSum avgt 6.385 ms/op
Sample.sequentialSum avgt 1.858 ms/op
병렬 버전이 쿼드코어 CPU를 활용하지 못하고 순차 버전에 비해 다섯 배나 느린 실망스러운 결과가 나왔다. 이 의외의 결과를 어떻게 설명할 수 있을까? 두가지 문제를 발견할 수 있다.
이전 연산의 결과에 따라 다음 함수의 입력이 달라지기 때문에 Iterate 연산을 청크로 분할하기가 어렵다. 리듀싱을 시작하는 시점에 전체 숫자 리스트가 준비되어 있지 않으므로 스트림을 병렬로 처리할 수 있도록 청크로 분할할 수 없다.
결국 순차처리 방식과 크게 다른 점이 없으므로 스레드를 할당하는 오버헤드만 증가하게 된다.
멀티코어 프로세서를 활용해서 효과적으로 합계 연산을 병렬로 실행하려면 어떻게 해야할까?
5장에서 LongStream.rangeClosed라는 메서드를 소개했다. 이 메서드는 iterate에 비해 다음과 같은 장점을 제공한다.
@Benchmark
public long rangeSum(MyState myState){
return LongStream.rangeClosed(1, myState.value)
.reduce(0L, Long::sum);
}
@Benchmark
public long parallelRangeSum(MyState myState){
return LongStream.rangeClosed(1, myState.value)
.parallel()
.reduce(0L, Long::sum);
}
Benchmark Mode Cnt Score Error Units
Sample.iterativeSum avgt 158.391 ms/op
Sample.parallelRangeSum avgt 37.148 ms/op
Sample.rangeSum avgt 163.999 ms/op
Sample.sequentialSum avgt 3345.309 ms/op
드디어 리듀싱 연산이 병렬로 처리되면서 제일 빠른 결과를 내놓았다.
하지만 병렬화가 완전 공짜는 아니라는 사실을 기억하자. 병렬화를 사용하려면 스트림을 재귀적으로 분할해야하고, 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당하고, 이들 결과를 하나의 값으로 합쳐야 한다. 멀티코어 간의 데이터 이동은 우리 생각보다 비싸다. 따라서 코어 간에 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 다른 코어에서 수행하는 것이 바람직하다.
또한, 상황에 따라 쉽게 병렬화를 이용할 수 있거나, 아예 이용할 수 없는 경우도 많다.
병렬 스트림을 잘못 하용하면서 발생하는 많은 문제는 공유된 상태를 바꾸는 알고리즘을 사용하기 때문에 일어난다. 다음은 n까지의 자연수를 더하면서 공유된 누적자를 바꾸는 프로그램 코드다.
public long sideEffectParallelSum(long n){
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1,n).parallel().forEach(accumulator::add);
return accumulator.total;
}
public class Accumulator{
public long total = 0;
public void add(long value){total+=value;}
}
위 코드는 본질적으로 순차 실행할 수 있도록 구현되어 있으므로 병렬로 실행하면 참사가 일어난다. 특히 total에 접근할 때마다 데이터 레이스 문제가 일어난다. 동기화로 문제를 해결하다보면 결국 병렬화라는 특성이 없어져 버릴 것이다.
System.out.println(
measurePerf(ParallelStreams::sideEffectParallelSum, 100000L) + "mesecs");
결과는 다음과 같아진다.
Result : 5959989000692
Result : 7425264100768
Result : 6827235020033
...
메서드 성능은 둘째치고, 올바른 결과값이 나오지 않는다. 여러 스레드에서 동시에 누적자, 즉 total+=value를 실행하면서 이런 문제가 발생한다.
우선은 병렬 스트림을 올바로 동작하려면 공유된 가변 상태를 피해야 한다는 사실만 기억하자.
'천개 이상의 요소가 있을 때만 병렬 스트림을 사용하라'와 같이 양을 기준으로 병렬 스트림 사용을 결정하는 것은 적절하지 않다. 정해진 기기에서 정해진 연산을 수행할 때는 이와 같은 기준을 사용할 수 있지만, 상황이 달라지면 이와 같은 기준이 제 역할을 하지 못한다.
확신이 서지 않는다면 (벤치마크로) 직접 측정하라.
박싱을 주의해라. 자동박싱과 언박싱은 성능을 크게 저하시킬 수 있는 요소이다. 되도록이면 기본형 특화 스트림을 사용하는 것이 좋다.
순차스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다. 특히 limit나 findFirst처럼 요소의 순서에 의존하는 연산을 병렬 스트림에서 수행하려면 비싼 비용을 치러야 한다.
스트림 전체 파이프라인에서, 하나의 요소를 처리하는데 드는 비용이 높아질수록 병렬 스트림으로 성능을 개선할 수 있는 가능성이 있음을 의미한다.
소량의 데이터에서는 병렬 스트림이 도움이 되지 않는다. (병렬화를 수행하는데 드는 부가 비용 > 이득)
스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해과정의 성능이 달라질 수 있습니다. 가령 SIZED 스트림은 정확히 같은 크기의 두 스트림으로 분할할 수 있으므로 효과적으로 스트림을 병렬 처리할 수 있다. 반면 filter의 경우 스트림의 길이를 예측할 수 없으므로, 효과적으로 병렬처리할 수 없을 수 있다.
최종연산의 병합과정 비용을 생각하라. 병합 과정의 비용이 비싸다면 병렬 스트림으로 얻는 성능의 이익이 서브스트리므이 부분 결과를 합치는 과정에서 상쇄될 수 있다.
위 프레임워크는 병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 다음에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계되었다.
해당 프레임워크에서는 서브태스크를 스레드 풀의 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.
스레드 풀을 이용하려면 RecursiveTask< R>의 서브클래스를 만들어야 한다. 여기서 R은 병렬화된 태스크가 생성하는 결과 형식 또는 결과가 없을 때(결과가 없더라도 다른 비지역 구조를 바꿀 수 있다)는 RecursiveAction 형식이다. RecursiveAction를 정의하려면 추상 메서드 compute를 구현해야 한다.
protected abastract R compute();
compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의한다. 따라서 대부분의 compute 메서드 구현은 다음과 같은 의사코드 형식을 유지한다.
if(태스크가 충분히 작거나 더 이상 분할할 수 없으면){
순차적으로 태스크 계산
} else{
태스크를 두 서브 태스크로 분할
태스크가 다시 서브태스크로 분할 되도록 이 메서드를 재귀적으로 호출함
모든 서브태스크의 연산이 완료될 때까지 기다림
각 서브태스크의 결과를 합침
}
다음 그림은 재귀적인 테스크 분할 과정을 보여준다.
이 알고리즘은 분할 후 정복 알고리즘의 병렬화 버전이다.
포크/ 조인 프레임워크를 이요해서 병렬 합계를 수행해보자.
public class ForkJoinSumCalculator
extends java.util.concurrent.RecursiveTask<Long>{
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 10000;
public ForkJoinSumCalculator(long[] numbers){
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end){
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute(){
int length = end - start;
if(length < THRESHOLD){
return computeSequentially();
}
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start+length/2);
leftTack.fort();
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start + length/2, end);
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
return leftResult + rightResult;
}
private long computeSequentially(){
long sum = 0;
for( int i = 0; i<end; i++){
sum+= numbers[i];
}
return sum;
}
}
다음 코드처럼 ForkJoinSumCalculator의 생성자로 원하는 수의 배열을 넘겨줄 수 있다.
public static long forkJoinSum(long n){
long[] numbers = LongStream.rangeClosed(1,n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
ForkJoinSumCalculator를 ForkJoinPool로 전달하면 풀의 스레드가 ForkJoinSumCalculator의 compute 메서드를 실행하면서 작업을 수행한다. compute 메서드는 병렬로 실행할 수 있을 만큼 테스크의 크기가 충분히 작아졌는지 확인하며, 아직 태스크의 크기가 크다고 판단되면 숫자 배열을 반으로 분할해서 두개의 새로운 ForkJoinSumCalculator로 할당한다.
그러면 다시 ForkJoinPool이 새로 생성된 ForkJoinSumCalculator의를 실행한다. 결국 이과정이 재귀적으로 반복되면서 주어진 조건을(예제에서는 덧셈을 수행할 항목이 만개 이하여야함) 만족할 때까지 태스크 분할을 반복한다.
이제 각 서브태스크는 순차적으로 처리되며 포킹 프로세스로 만들어진 이진트리의 태스크를 루트에서 역순으로 방문한다. 즉, 각 서브태스크의 부분 결과를 합쳐서 태스크의 최종 결과를 계산한다. 아래 그림은 위 과정을 보여준다.
메서드 성능은 다음과 같다
ForkJoin sum done in : 41 msecs
병렬 스트림일 때보다 성능이 나빠졌지만, 이는 ForkJoinSumCalculator 태스크에서 사용할 수 있도록 전체 스트림을 long[]으로 변환했기 때문이다.
분할 크기와 관련된 힌트를 얻어보자.
기준값을 바꿔가면서 실험해보는 것 외에 뾰족한 방법은 없다.
위 예제 보다 복잡한 시나리오가 사용되는 현실에서는 각각의 서브테스크의 작업완료 시간이 크게 달라질 수 있다. 분할 기법이 효율적이지 않았기 때문일 수도 있고, 아니면 예기치 않게 디스크 접근 속도가 저하되었거나 외부 서비스와 협력하는 과정에서 지연이 생길 수 있기 때문이다.
이러한 문제를 포크/ 조인 프레임워크에서는 작업 훔치기(work stealing)라는 것을 통해 해결한다. 작업 훔치기 기법은 ForkJoinPool의 모든 스레드를 거의 공정하게 분할한다. 각각의 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하면서 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와서 작업을 처리한다. 다른 스레드보다 더 빨리 일을 처리한 스레드는 유휴상태로 가는 것이 아니라, 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다. 모든 테스크가 작업을 끝낼 때까지, 즉 모든 큐가 빌때까지 이 과정을 반복한다. 따라서 태스크 크기를 작게 나누어야 작업자 스레드 간의 작업부하를 비슷한 수준으로 유지할 수 있다.
자동으로 스트림을 분할하는 기법이다. (분할할 수 있는 반복자라는 의미이다.)
Iterator처럼 Spliterator는 소스의 요소 탐색 기능을 제공한다는 점에서 같지만, 병렬 작업에 특화되어 있다.
여기서 T는 Spliterator가 탐색하는 요소의 형식이다.
tryAdvance 메서드는 Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 참을 반환한다.
반면 trySplit메서드는 Spliterator의 일부 요소(자신이 반환한 요소)를 분할해서 두 번째 Spliterator를 생성하는 메서드다.
Spliterator에서는 estimateSize 메서드로 탐색해야 할 요소 수 정보를 제공할 수 있다.
특히 탐색해야 할 요소 수가 정확하진 않더라도 제공된 값을 이용해서 더 쉽고 공평하게 Spliterator를 분할할 수 있다.
스트림을 여러 스트림으로 분할하는 과정은 재귀적으로 일어난다.
이 분할 과정은 characteristics 메서드로 정의하는 Spliterator의 특성에 영향을 받는다.
Spliterator는 characteristics라는 추상 메서드도 정의한다. characteristics 메서드는 Spliterator자체의 특성 집합을 포함하는 int를 반환한다. Spliterator를 이용하는 프로그램은 이들 특성을 참고해서 Spliterator를 더 잘 제어하고 최적화할 수 있다.
지금까지는 Spliterator 인터페이스와 Spliterator가 정의하는 메서드를 살펴봤다. 이번에는 커스텀 Spliterator를 구현해보자.