컬렉션에 parallelStream
을 호출하면 병렬 스트림
이 생성된다. 병렬 스트림
이란 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림을 말한다. parrallel()
이나 sequential()
로도 병렬 혹은 순차 스트림으로 중간에 변경이 가능하지만, 이는 내부적으로 병렬 혹은 순차 여부를 나타내는 boolean
값만 변경해주는데 의미가 있다.
전체적인 동작의 병렬 혹은 순차 여부는 마지막에 호출되는 스트림 동작 방식에 의해 결정된다. 다음 코드들을 통해 이해해보자.
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()
.reduce();
parallel()
이므로 이 코드는 병렬로 실행된다.private static void streamSequential() {
getTime(t -> {
return t.reduce(0L, Long::sum);
}, Stream.iterate(1L, i -> i + 1).limit(N), "streamSequential()");
}
private static void streamParallel() {
getTime(t -> {
return t.reduce(0L, Long::sum);
}, Stream.iterate(1L, i -> i + 1).limit(N).parallel(), "streamParallel()");
}
private static void longStreamSequential() {
getTime(t -> {
return t.reduce(0L, Long::sum);
}, LongStream.rangeClosed(1, N), "longStreamSequential()");
}
private static void longStreamParallel() {
getTime(t -> {
return t.reduce(0L, Long::sum);
}, LongStream.rangeClosed(1, N).parallel(), "longStreamParallel()");
}
private static <T, R> void getTime(Function<T, R> function, T t, String msg) {
System.out.print(msg + " = [");
long startTime = System.currentTimeMillis();
function.apply(t);
long endTIme = System.currentTimeMillis();
System.out.println(endTIme - startTime + "ms]");
}
streamParallel() = [604ms]
streamSequential() = [192ms]
longStreamParallel() = [33ms]
longStreamSequential() = [32ms]
위 Stream의 결과를 보면 알 수 있듯, 항상 병렬 처리가 더 빠른 것만은 아니다. Parralel
보다 Sequential
이 더 빠른 성능을 보인 점은, 주어진 Stream.iterate가 병렬 처리를 위해 청크 단위로 분할하기가 어렵기 때문이다.
반면 LongStream
을 사용한 stream이 훨씬 빠른 성능을 보였는데, 그 이유는 박싱과 언박싱 오버헤드가 사라졌기 때문이다. (LongStream
은 primitive type인 long을 직접 사용한다.) 그리고 rangeClosed
는 쉽게 청크로 분할할 수 있는 숫자 범위를 생산해주기 때문에 병렬 처리를 위한 독립적인 청크를 효과적으로 생성할 수 있었기 때문이다.
limit
, findFirst
같이 요소의 순서에 의존하는 연산을 수행하기 위해서는 비싼 비용을 치러야만 한다. 반면 findAny
는 요소의 순서와 상관없으므로 성능이 좋다.ArrayList
는 LinkedList
보다 더 효율적인데, 그 이유는 LinkedList
같은 경우, 분할하기 위해 모든 요소를 탐색해야만 하기 때문이다.포크 조인 프레임워크는 작업 훔치기
라는 기법을 통해 모든 스레드를 거의 공정하게 분할하고 각각의 스레드가 유휴 상태에 빠지지 않게 다른 스레드의 큐에 있는 작업을 훔쳐와서 작업을 수행
한다. 모든 태스크가 작업을 끝날 때 까지 이 과정을 반복한다.
포크 조인 프레임워크를 사용하기 위해서는 이미 구현되어 있는 추상클래스 RecursiveAction
나 RecursiveTask
를 사용하면 된다.
RecursiveAction
: 작업의 결과를 반환하지 않는다.RecursiveTask
: 작업의 결과를 반환한다.public class ForkJoinSumCalculator extends RecursiveTask<Long>{
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 500;
public ForkJoinSumCalculator(long[] numbers) {
this(0, numbers.length, numbers);
}
private ForkJoinSumCalculator(int start, int end, long[] numbers) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if(length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator f1 = new ForkJoinSumCalculator(start, start + length / 2, numbers);
ForkJoinSumCalculator f2 = new ForkJoinSumCalculator(start + length / 2, end, numbers);
f1.fork();
Long rightResult = f2.compute();
Long leftResult = f1.join();
return leftResult + rightResult;
}
private long computeSequentially() {
long sum = 0;
for(int i = start ; i < end ; i++) sum += numbers[i];
return sum;
}
}
join
메서드를 태스크에 호출하면 태스크의 결과가 리턴될 때 까지 블록된다. 따라서 join
은 두 서브태스크가 모두 시작된 다음에 호출해야 한다. 그렇지 않으면 서브태스크가 다른 태스크가 끝날때까지 기다리는 일이 발생하면서 더 느려질 수 있다.RecursiveTask
내에서는 invoke
메서드를 사용하면 안된다. 대신 compute
나 fork
메서드를 직접 호출할 수 있다. 순차 코드에서 병렬 계산을 시작할 때만 invoke
를 사용한다.fork
를 호출하는게 자연스럽지만, 같은 스레드를 재사용하기 위해 한쪽에는 fork
, 다른 한쪽에는 compute
를 호출하는 것이 스레드 사용 효율면에서 효율적이다.fork
라 불리는 다른 스레드에서 compute
를 호출하기 때문에 스택 트레이스가 도움이 되지 않기 때문이다.Spliterator
는 분할할 수 있는 반복자
라는 의미이다. Iterator 처럼 소스의 요소 탐색 기능을 제공한다는 점은 같지만 Spliterator
는 병렬 작업에 특화되어 있다.
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}
tryAdvance
: Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아있으면 TRUE
을 반환한다.trySplit
: Spliterator의 일부 요소를 분할해서 두 번째 Spliterator를 생성한다. 더 이상 분할할 요소가 없어서null
이 반환될 때 까지 진행된다.estimateSize
: 메서드로 탐색해야 할 요소 수 정보를 제공한다.characteristics
: Spliterator 자체의 특성 집합을 포함하는 int를 반환한다.public class SumSpliterator implements Spliterator<Integer>{
private final Integer[] numbers;
private final int THRESHOLD = 10;
private int index = 0;
public SumSpliterator(Integer[] numbers) {
this.numbers = numbers;
}
@Override
public boolean tryAdvance(Consumer<? super Integer> action) {
action.accept(numbers[index++]);
return index < numbers.length;
}
@Override
public Spliterator<Integer> trySplit() {
int length = numbers.length - index;
if(length <= THRESHOLD) {
return null;
}
Integer[] newNumbers = Arrays.copyOfRange(numbers, index, Math.min(numbers.length, index + THRESHOLD));
this.index = Math.min(numbers.length, index + THRESHOLD);
return new SumSpliterator(newNumbers);
}
@Override
public long estimateSize() {
return numbers.length - index;
}
@Override
public int characteristics() {
return SIZED + NONNULL + IMMUTABLE;
}
}
public static void main(String[] args) {
Integer[] list = IntStream.rangeClosed(0, 100).boxed().toArray(Integer[]::new);
Spliterator<Integer> spliterator = new SumSpliterator(list);
Stream<Integer> stream = StreamSupport.stream(spliterator, true);
System.out.println(stream.reduce(0, Integer::sum, Integer::sum)); // reduce(identity, accumulator, combiner)
}