개발자는 "무엇을 할 것인가"에 집중하고, "어떻게 처리할 것인가"는 Stream API 내부에 맡긴다.
👉 Java 8의 스트림 API는 이러한 문제를 선언형 방식으로 간결하고 효율적으로 해결 가능
stream()
→ parallelStream()
으로 쉽게 병렬화filter
, map
, sorted
, collect
)을 조합해 데이터 처리 파이프라인 구성// 명령형 방식
List<Dish> lowCaloricDishes = new ArrayList<>();
for (Dish d : menu) {
if (d.getCalories() < 400) {
lowCaloricDishes.add(d);
}
}
Collections.sort(lowCaloricDishes, Comparator.comparing(Dish::getCalories));
List<String> names = new ArrayList<>();
for (Dish d : lowCaloricDishes) {
names.add(d.getName());
}
// 선언형 방식 (Java 8 스트림)
List<String> names = menu.stream()
.filter(d -> d.getCalories() < 400)
.sorted(comparing(Dish::getCalories))
.map(Dish::getName)
.collect(toList());
filter
, map
, sorted
)은 새로운 스트림을 반환 → 파이프라인 구성 가능// 외부 반복
for (Dish dish : menu) {
names.add(dish.getName());
}
// 내부 반복
List<String> names = menu.stream()
.map(Dish::getName)
.collect(toList());
👉 내부 반복은 병렬 처리 최적화와 선언형 코드 작성을 가능하게 함
비교 항목 | 컬렉션(Collection) | 스트림(Stream) |
---|---|---|
목적 | 데이터 저장 및 조작 | 데이터 처리(변환, 필터링, 매핑 등) |
데이터 저장 | 가능 | 불가능 |
재사용성 | 가능 | 불가능 (1회 사용 후 폐기) |
동시 사용 | 여러 소비자 가능 | 단일 소비자 전용 |
처리 방식 | 내부 저장 후 전처리 | 필요 시 처리 (지연 계산) |
병렬 처리 | 직접 구현 필요 | parallelStream()으로 병렬 처리 지원 |
무한 데이터 | 불가능 | 가능 (Stream.generate , Stream.iterate ) |
filter
, map
, limit
, distinct
, sorted
collect
, forEach
, count
, reduce
List<String> names = menu.stream()
.filter(d -> d.getCalories() > 300)
.map(Dish::getName)
.limit(3)
.collect(toList());
👉 최종 연산 전까지는 아무 연산도 수행하지 않음
filter
는 Predicate<T>
를 받아 해당 조건을 만족하는 요소들만으로 구성된 새로운 스트림을 반환distinct
는 요소의 equals
와 hashCode
를 기반으로 중복 제거를 수행Predicate 대신 람다 함수를 넣으면 투명하게 관리할 수 있다.
limit
)과 건너뛰기 (skip
)연산 유형 | 상태 없음 (Stateless) | 상태 있음 (Stateful) |
---|---|---|
예시 | map, filter, flatMap | sorted, distinct, limit |
특징 | 각 요소만으로 결과 결정 가능 | 이전 요소 또는 전체 필요 |
병렬처리 유리성 | 매우 유리 | 비효율 가능성 있음 |
IntStream
, DoubleStream
등은 박싱/언박싱 오버헤드를 줄이고 성능을 높인다.mapToInt
, summaryStatistics
, boxed
등 사용int totalCalories = menu.stream()
.mapToInt(Dish::getCalories)
.sum();
여러 방식으로 스트림 생성 가능:
Stream.of(...)
, Arrays.stream(...)
Stream.concat
flatMap
Stream<String> stream1 = Stream.of("A", "B");
Stream<String> stream2 = Stream.of("C", "D");
Stream<String> merged = Stream.concat(stream1, stream2);
limit()
과 함께 사용하지 않으면 무한 실행Stream<Integer> evenNumbers = Stream.iterate(0, n -> n + 2);
Stream<Double> randoms = Stream.generate(Math::random);
collect()
이고, 이를 유연하게 만들어주는 게 바로 Collector이다.커스텀 컬렉터를 개발
가능.collect
메서드는 스트림의 요소들을 원하는 형태로 모아주는 최종 연산. 이때 "어떻게 모을지"를 결정하는 것이 바로 컬렉터(Collector)collect
메서드에 컬렉터를 전달하면, collect
메서드는 내부적으로 리듀싱 연산을 수행하며, 최종 결과를 List, Array, Map, 문자열, 통계 등 다양한 형태로 뽑아낼 수 있다.컬렉터는 이 리듀싱 과정을 유연하게 설계할 수 있게 해준다. 즉, 개발자는 "무엇을 원하는지"만 명확히 명시하면, 스트림은 내부적으로 알아서 각 요소를 누적(축소)해서 최종 결과를 만든다.
reducing()
은 reduce()
를 Collector로 확장한 고급 기능groupingBy()
+ toSet()
같은 조합으로 그룹 내 결과 형태도 조절 가능이걸로 나만의 Collector 가능. 이를 통해 성능 최적화도 가능하다.
Collector 인터페이스 메서드
supplier()
: 결과 컨테이너 생성accumulator()
: 각 요소를 결과에 누적combiner()
: 병렬 처리 시 부분 결과 병합finisher()
: 최종 결과 변환characteristics()
: 병렬 가능 여부, 병합 방식 등 힌트 제공
예시
public Supplier<List<String>> supplier() {
return ArrayList::new; // 빈 ArrayList 생성
}
public BiConsumer<List<String>, String> accumulator() {
return List::add; // 요소를 리스트에 추가
}
public BinaryOperator<List<String>> combiner() {
return (list1, list2) -> {
list1.addAll(list2); // 두 리스트 합치기
return list1;
};
}
public Function<List<String>, List<String>> finisher() {
return Function.identity(); // IDENTITY_FINISH인 경우
}
public Set<Characteristics> characteristics() {
return Set.of(Characteristics.IDENTITY_FINISH); // 변환 없음을 명시
}
reduce()는 스트림 요소를 하나의 결과로 축소하는 가장 단순한 방식이고, Collectors.reducing()은 이를 Collector 형태로 확장한 고급 전략. 그리고 collect()는 다양한 Collector 전략 객체를 활용해 결과를 원하는 구조로 만들어내는 스트림의 핵심 최종 연산
1️⃣ reduce() – 스트림 요소를 하나의 결과로 축소
int sum = numbers.stream().reduce(0, Integer::sum);
2️⃣ Collectors.reducing() – 고급 리듀싱 Collector
// collect(reducing(...))은 reduce(...)와 동일한 동작이지만, Collector 문맥 안에서 사용할 수 있는 확장형
int totalCalories = menu.stream()
.collect(Collectors.reducing(0, Dish::getCalories, Integer::sum));
3️⃣ collect() – Collector 전략을 통해 다양한 결과물 생성
Collector
가 CONCURRENT
설정되어 있다면 병렬성 확보 가능List<String> names = menu.stream()
.map(Dish::getName)
.collect(Collectors.toList());
Map<Type, List<Dish>> dishesByType =
menu.stream().collect(Collectors.groupingBy(Dish::getType));
병렬 스트림은 선언적으로 병렬 처리를 가능하게 해주며, Fork/Join 프레임워크와 Spliterator와 같은 기반 기술을 통해 동작한다. 하지만, 무조건 빠르지 않으며 적절한 상황, 자료구조, 작업 종류에 따라 현명하게 사용하는 것이 중요! 성능을 높이기 위해 병렬화를 쓰는 것이지, 단순히 코드를 짧게 하기 위해 사용하는 것은 아니다.
정리
1. 적절한 경우에만 사용: 100만 개 이상 데이터, CPU 집약적 작업
2. 자료구조 신중 선택: ArrayList
> LinkedList
3. 상태 공유 금지: 순수 함수형 프로그래밍 원칙 준수
4. 성능 측정 필수: 실제 환경에서 벤치마크 진행
5. 병렬 처리 친화적 연산: range()
, filter()
, map()
.parallel()
메서드를 호출하면 간단히 변환할 수 있다.List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
// 순차 처리
long sumSequential = numbers.stream().mapToInt(i -> i).sum();
// 병렬 처리
long sumParallel = numbers.parallelStream().mapToInt(i -> i).sum();
병렬 스트림의 작동 원리는 SI 프로젝트의 업무 분할·병렬 진행·통합 과정에 비유 가능.
데이터 = 작업, 스레드 = 사람
잘못된 사용 사례
Stream.iterate()
처럼 순차적으로 생성되는 스트림은 병렬 처리에 부적합// 병렬 처리가 비효율적인 예 (Stream.iterate 사용)
long badParallelSum = Stream.iterate(1L, i -> i + 1)
.limit(10_000_000)
.parallel()
.reduce(0L, Long::sum);
iterate
는 순차적 생성 → 청크 분할 어려움 => 순차 처리보다 몇 배는 느리다.올바른 사용 사례
// 병렬 처리에 적합한 예 (LongStream.rangeClosed 사용)
long goodParallelSum = LongStream.rangeClosed(1, 10_000_000)
.parallel()
.sum();
실제로는 JMH와 같은 도구로 벤치마크 후 사용하는 것이 가장 정확
공유 상태 수정 금지
// 위험한 예: 공유 변수 사용
class Accumulator {
public long total = 0;
public void add(long value) { total += value; }
}
Accumulator acc = new Accumulator();
LongStream.rangeClosed(1, 10_000).parallel().forEach(acc::add);
total
에 대한 데이터 레이스 발생적절한 자료구조 선택
자료구조 | 병렬 적합성 | 이유 |
---|---|---|
ArrayList | ⭐️⭐️⭐️⭐️⭐️ | 랜덤 액세스, 쉬운 분할 |
LinkedList | ⭐️ | 순차 접근 필요 |
IntStream.range | ⭐️⭐️⭐️⭐️⭐️ | 고정 크기, 예측 가능한 분할 |
1. 기본형 스트림 사용
`// 박싱/언박싱 오버헤드 제거
LongStream.rangeClosed(1, 1_000_000) // 기본형
.parallel()
.sum();`
2. 순서 의존성 피하기
`// 비효율적
list.parallelStream()
.sorted() // 전체 데이터 수집 필요
.forEach(System.out::println);
// 효율적
list.parallelStream()
.unordered() // 순서 무시
.forEach(System.out::println);`
3. 적절한 작업 크기 설정
CPU 집약적 작업
// 이미지 처리
images.parallelStream()
.map(img -> applyFilter(img)) // 고비용 연산
.collect(Collectors.toList());
독립적인 작업
// 웹 페이지 크롤링
urls.parallelStream()
.map(url -> downloadContent(url)) // I/O 작업
.collect(Collectors.toList());
JMH를 이용한 정확한 측정
`@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class MyBenchmark {
@Benchmark
public void testMethod() { // 성능 측정할 코드
}
}`
예시: 1GB 넘는 JSON 로그 파일이 있음 (배열 형태로 수천 개 객체가 들어 있음). 이걸 한 번에 파싱하면 메모리도 터지고, 시간도 오래 걸림.
// RecursiveTask<T>는 결과를 반환하는 병렬 작업 클래스
class JsonParseTask extends RecursiveTask<List<MyData>> {
private List<String> lines; // 파싱 대상인 JSON 문자열 목록
protected List<MyData> compute() {
// 작업 크기가 작다면 그냥 순차적 처리
// Fork/Join은 너무 잘게 쪼개면 오히려 오버헤드가 크다.
// 작업 분할과 병합, 스레드 관리 등 병렬 처리 자체에 드는 부가 비용
if (lines.size() <= 1000) {
return lines.stream().map(JsonUtils::parseLine).toList();
}
// 리스트를 절반으로 나눠서 왼쪽/오른쪽 나눈다.
int mid = lines.size() / 2;
JsonParseTask left = new JsonParseTask(lines.subList(0, mid));
JsonParseTask right = new JsonParseTask(lines.subList(mid, lines.size()));
// 모든 작업을 fork 하면 스레드가 과하게 생성될 수 있음
left.fork(); // 왼쪽 작업 비동기 시작!
List<MyData> rightResult = right.compute(); // 오른쪽은 지금 이 스레드에서 처리
List<MyData> leftResult = left.join(); // 왼쪽 작업이 끝날 때까지 기다림
leftResult.addAll(rightResult);
return leftResult;
}
}
// 1. ForkJoinPool을 명시적으로 설정
public static void main(String[] args) {
List<String> lines = ... // JSON 라인 데이터 준비
// ForkJoinPool을 명시적으로 설정 (예: 4개의 스레드 사용)
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
// JsonParseTask 실행
JsonParseTask task = new JsonParseTask(lines);
List<MyData> result = forkJoinPool.invoke(task); // ForkJoinPool에서 병렬 실행
// 결과 처리
result.forEach(System.out::println);
}
// 2. parallelStream() 사용하기
// - parallelStream() 활용하면 ForkJoinPool을 내부적으로 사용하여 병렬 처리를 쉽게 구현
public static void main(String[] args) {
List<String> lines = ... // JSON 라인 데이터 준비
// parallelStream()을 사용해서 스트림을 병렬 처리
List<MyData> result = lines.parallelStream()
.map(JsonUtils::parseLine)
.collect(Collectors.toList());
// 결과 처리
result.forEach(System.out::println);
}
Spliterator
는 Java Stream이 데이터를 병렬로 쪼갤 수 있도록 지원하는 인터페이스예시: 아주 큰 log 파일 한 줄씩 병렬 처리
// `Spliterator`를 커스터마이징해서 병렬 처리에 필요한 제어를 직접 수행
public class LogFileSpliterator implements Spliterator<String> {
private final BufferedReader reader;
public LogFileSpliterator(BufferedReader reader) {
this.reader = reader;
}
// `Stream`이 데이터를 처리할 때 `BufferedReader`로 한 줄씩 읽어서 `Stream`에 넘긴다.
// 이 메서드는 Stream 내부에서 계속 호출된다.
@Override
public boolean tryAdvance(Consumer<? super String> action) {
try {
String line = reader.readLine();
if (line == null) return false; // 다음 줄이 없다면
action.accept(line);
return true;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
// 병렬 처리를 위해 일을 나눠주는 메서드
// 예시: 현재 reader 상태에서 일정 줄 수(1000줄)만큼 분할해서 새로운 Spliterator 생성
// => 이 덩어리를 Java가 병렬로 처리할 수 있도록 Spliterator로 반환
@Override
public Spliterator<String> trySplit() {
List<String> chunk = new ArrayList<>();
try {
for (int i = 0; i < 1000; i++) {
String line = reader.readLine();
if (line == null) break;
chunk.add(line);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
if (chunk.isEmpty()) return null;
return chunk.spliterator(); // chunk는 Stream에서 병렬 분할 가능
}
// 대충. `Stream`이 내부적으로 "데이터가 얼마나 있지?"를 예측할 수 있도록 힌트
// => Stream이 적절히 분할(split) 하려면 대략적인 데이터 양을 알아야 효율적으로 스레드를 배분할 수 있다.
@Override
public long estimateSize() {
return Long.MAX_VALUE;
}
@Override
public int characteristics() {
return ORDERED | NONNULL;
}
}
// 호출
BufferedReader reader = Files.newBufferedReader(Paths.get("large-log.txt"));
Stream<String> lines = StreamSupport.stream(new LogFileSpliterator(reader), true); // 병렬 처리
lines.filter(line -> line.contains("ERROR"))
.forEach(System.out::println);