Java 재활 훈련 13일차 - Stream

0

java

목록 보기
13/18

Stream 요소 처리

Java8부터 collection 및 배열의 요소를 반복 처리하기 위해 Stream을 사용할 수 있다. stream은 요소들이 하나씩 흘러가면서 처리된다는 의미를 가지고 있다. List collection에서 요소를 반복 처리하기 위해 stream을 사용하면 다음과 같다.

Stream<String> stream = list.stream();
stream.forEach(item -> // iterm처리 );

List collection의 stream 메서드로 Stream 객체를 얻고, forEach 메서드로 요소를 어떻게 처리 할지를 람다식으로 제공한다. 아래는 Set collection의 요소를 하나씩 읽고 출력하기 위해 stream을 사용한다.

  • Main
public class Main {
    public static void main(String[] args) {
        Set<String> set = new HashSet<>();
        set.add("Hong");
        set.add("Shin");
        set.add("Kim");

        Stream<String> stream = set.stream();
        stream.forEach(name -> System.out.println(name)); // Hong Shin Kim
    }
}

StreamIterator와 비슷한 반복자이지만, 다음과 같은 차이점이 있다.
1. 내부 반복자이므로 처리 속도가 빠르고 병렬 처리에 효율적이다.
2. 람다식으로 다양한 요소 처리를 정의할 수 있다.
3. 중간 처리와 최종 처리를 수행하도록 파이프 라인을 형성할 수 있다.

내부 반복자

for문과 Iterator는 collection의 요소를 collection 밖으로 가여와서 처리하는 방식이다. 이를 외부 반복자라고 하는데, 다음과 같다.

[데이터 처리]           [데이터 저장소]
개발자 코드             Collection
    -------next()-----> element1
                       /
                      /
element1 처리  <------
    \
     \
      ------next()----> element2
                       /
                      /
element2 처리  <------

    ...                 ...

반면 stream은 요소 처리 방법을 collection 내부로 주입시켜서 요소를 반복 처리하는데, 이를 내부 반복자라고한다.

개발자 코드                collection
    |                          |
    | ----데이터 처리----> element1 처리
    |                          |
    |                     element2 처리
    |                       ...
    v

내부 반복자의 경우 개발자 코드에서 제공한 데이터 처리 코드를 가지고 collection 내부에서 요소(element)를 반복 처리한다.

내부 반복자는 multi-core CPU를 최대한 활용하기 위해서 요소들을 분배시켜 병렬 작업을 할 수 있다. 하나씩 처리하는 순차적 외부 반복자보다는 효율적으로 요소를 만복시킬 수 있다는 장점이 있다.

개발자 코드                     collection
-------------------------------------------------
    |                        core1           core2
    |                          |               |
    | ----데이터 처리----> element1 처리     element1 처리
    |                          |               |
    |                     element2 처리     element2 처리
    |                       ...                ...
    v

아래는 List collection 내부 반복자를 이용해서 병렬 처리하는 방법을 보여준다. parallelStream 메서드로 병렬 처리 스트림을 얻고, forEach 메서드를 호출할 때, 요소 처리 방법인 람다식을 제공한다. 람다식은 처리되는 요소가 무엇이고, 어떤 thread가 처리하는 지를 출력한다.

  • Main
public class Main {
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        list.add("Hong");
        list.add("Shin");
        list.add("Kim");
        list.add("Ram");
        list.add("Park");

        Stream<String> parallelStream = list.parallelStream();
        parallelStream.forEach(name -> {
            System.out.println("Thread name: " + Thread.currentThread().getName() + ", elem: " + name);
        });
    }
}

결과는 아래와 같다.

Thread name: ForkJoinPool.commonPool-worker-2, elem: Hong
Thread name: ForkJoinPool.commonPool-worker-3, elem: Park
Thread name: ForkJoinPool.commonPool-worker-1, elem: Shin
Thread name: main, elem: Kim
Thread name: ForkJoinPool.commonPool-worker-4, elem: Ram

각 element마다 core에 할당된 thread들이 병렬적으로 순회하여 처리하는 것을 볼 수 있다.

중간 처리와 최종 처리

Stream은 하나 이상 연결될 수 있다. 아래는 collection의 original stream 뒤에 filtering 동작을 하는 중간 Stream이 연결될 수 있고, 그 뒤에 매핑 중간 역할을 하는 중간 stream이 연결될 수 있다. 이와 같이 stream이 연결되어 있는 것을 stream pipeline이라고 한다.

------------------                     [            중간 처리           ][  최종 처리  ]
|collection/Array|---Original Stream---Filtering Stream---Mapping Stream---Aggregation--->Result
------------------

아래는 Student object를 요소로 가지는 collection에서 Student stream을 얻고, 중간 처리를 통해 score stream으로 변환한 후 최종 집계 처리로 score 평균을 구하는 과정을 나타낸 것이다.

-----------------
|List Collection|
|    Student1   |                       [중간 처리] [최종 처리] 
|    Student2   |---->Student1--->(변환) Score ----> 평균 계산 ---> 평균 점수
|      ...      |[Student Stream]      [Score Stream]
|    StudentN   |
-----------------

code로 표현하면 다음과 같다.

// student stream
Stream<Student> studentStream = list.stream();
// score stream
IntStream scoreStream = studentStream.mapToInt(student -> student.getScore());
// 평균 계산
double avg = scoreStream.average().getAsDouble();

mapToInt 메서드를 통해서 객체를 int값으로 매핑해 IntStream으로 변환시킨 것이다. 어떤 객체를 어떤 int 값으로 매핑할 것인지는 람다식으로 제공해야한다. student -> student.getScoreStudent 객체를 getScore의 반환값으로 매핑한다. IntStream은 최종 처리를 위해 다양한 메서드를 제공하는데, average메서드는 요소들의 평균 값을 계산한다.

메서드 체이닝 패턴을 이용하면 앞의 코드를 다음과 같이 간결하게 작성할 수 있다.

double avg = list.stream().mapToInt(student -> student.getScore()).average().getAsDouble();

stream 파이프라인으로 구성할 때 주의할 점은 파이프라인의 맨 끝에는 반드시 최종 처리 부분이 있어야 한다는 것이다. 최종 처리가 없다면 오리지널 및 중간 처리 스트림은 동작하지 않는다. 즉, 위 코드에서 average 이하를 생략하면 stream, mapToInt는 동작하지 않는다.

  • Student
public class Student {
    private String name;
    private int score;

    public Student (String name, int score) {
        this.name = name;
        this.score = score;
    }

    public String getName() { return name; }
    public int getScore() { return score; }
}
  • Main
public class Main {
    public static void main(String[] args) {
        List<Student> list = Arrays.asList(
                new Student("Hong", 10),
                new Student("Shin", 20),
                new Student("You", 30)
        );

        double avg = list.stream().mapToInt(student -> student.getScore()).average().getAsDouble();
        System.out.println("Average Score: "+ avg); // Average Score: 20.0
    }
}

resource로부터 stream 얻기

다음은 BaseStream interface를 부모로 한 자식 인터페이스들은 다음과 같은 상속 관계를 이루고 있다.

            BaseStream
                ^
                |
    ---------------------------------
    |       |           |           |
Stream  IntStream   LongStream  DoubleStream

BaseStream에는 모든 스트림에서 사용할 수 있는 공통 메서드들이 정의되어 있다. Stream은 객체 요소를 처리하는 stream이고, IntStream, LongStream, DoubleStream은 각각 기본 타입인 int, long, double 요소를 처리하는 스트림이다.

이 스트림 인터페이스들의 구현 객체는 다양한 resource들로 부터 얻을 수 있다. 주로 collection과 배열에서 얻지만 int, long, directroy, text file, random 수 등등 다양한 resource로부터 stream 구현 객체를 얻을 수 있다.

아래는 List<Product> collection에서 Product stream을 얻는 방법을 보여준다.

  • Product
public class Product {
    public int pno;
    public String name;
    public String company;
    public int price;

    public Product(int pno, String name, String company, int price) {
        this.pno = pno;
        this.name = name;
        this.company = company;
        this.price = price;
    }

    @Override
    public String toString() {
        return "Product{" +
                "pno=" + pno +
                ", name='" + name + '\'' +
                ", company='" + company + '\'' +
                ", price=" + price +
                '}';
    }
}
  • Main
public class Main {
    public static void main(String[] args) {
        List<Product> list = new ArrayList<>();
        for(int i = 1; i <= 5; i++) {
            Product product = new Product(
                    i, "product " + i,
                    "company",
                    (int) (10000*Math.random())
            );

            list.add(product);
        }

        Stream<Product> stream = list.stream();
        stream.forEach(p -> System.out.println(p));
    }
}

위는 collection으로부터 stream을 받는 경우이다.

결과는 아래와 같다.

Product{pno=1, name='product 1', company='company', price=407}
Product{pno=2, name='product 2', company='company', price=2724}
Product{pno=3, name='product 3', company='company', price=4265}
Product{pno=4, name='product 4', company='company', price=9385}
Product{pno=5, name='product 5', company='company', price=3225}

배열로부터 stream을 얻어보도록 하자. Arrays class를 이용하면 다양한 종류의 배열로부터 stream을 얻을 수 있다. 다음은 문자열 배열과 정수 배열로부터 stream을 얻는 방법을 보여준다.

  • Main
public class Main {
    public static void main(String[] args) {
        String[] strArray = {"Hong", "Shin", "Kim"};
        Stream<String> strStream = Arrays.stream(strArray);
        strStream.forEach(item -> System.out.println(item + ", ")); // Hong, Shin, Kim, 
        System.out.println();

        int[] intArray = {1,2,3,4,5};
        IntStream intStream = Arrays.stream(intArray);
        intStream.forEach(item -> System.out.println(item + ", ")); // 1, 2, 3, 4, 5,
        System.out.println();
    }
}

숫자 범위로부터도 stream을 얻을 수 있다. IntStream, LongStream의 정적 메서드인 range()rangeClosed() 메서드를 이용하면 특정 범위의 정수 stream을 얻을 수 있다. 첫 번째 매개값은 시작 수이고, 두 번째 매개값은 끝 수인데, 끝 수를 포함하지 않으면 range(), 포함하면 rangeClosed()를 사용한다.

  • Main
public class Main {
    public static int sum;

    public static void main(String[] args) {
        IntStream stream = IntStream.rangeClosed(1, 100);
        stream.forEach(a -> sum += a);
        System.out.println("total: " + sum); // total: 5050
    }
}

file로부터도 stream을 얻을 수 있다. Fileslines() 메서드를 이용하면 텍스트 파일의 행 단위 stream을 얻을 수 있다. 이는 텍스트 파일에서 한 행씩 읽고 처리할 때 유용하게 사용할 수 있다.

다음은 data.txt 파일이 있고, 한 행에 하나씩 상품에 대한 정보를 담고 있다.

  • data.txt
{"pno": 1, "name": "product1", "company": "company1", "price": 1558}
{"pno": 2, "name": "product2", "company": "company1", "price": 4671}
{"pno": 3, "name": "product3", "company": "company1", "price": 470}
{"pno": 4, "name": "product4", "company": "company1", "price": 9584}
{"pno": 5, "name": "product5", "company": "company1", "price": 6868}

data.txt 파일을 Main.java 파일 옆에 두도록 하자. 아래는 file을 읽고 stream으로 처리하는 것을 보여준다.

public class Main {
    public static void main(String[] args) throws Exception {
        Path path = Paths.get(Main.class.getResource("data.txt").toURI());
        Stream<String> stream = Files.lines(path, Charset.defaultCharset());

        stream.forEach(line -> System.out.println(line));
        stream.close();
    }
}

결과는 아래와 같다.

{"pno": 1, "name": "product1", "company": "company1", "price": 1558}
{"pno": 2, "name": "product2", "company": "company1", "price": 4671}
{"pno": 3, "name": "product3", "company": "company1", "price": 470}
{"pno": 4, "name": "product4", "company": "company1", "price": 9584}
{"pno": 5, "name": "product5", "company": "company1", "price": 6868}

Filtering

filtering은 stream에서 요소를 걸러내는 중간 처리 기능이다. filtering method에는 다음과 같이 distinctfilter가 있다.

  1. distinct(): 중복 제거
  2. filter(): 조건 필터링, 매개 타입은 요소 타입에 따른 함수형 인터페이스이므로 람다식으로 작성 가능

distinct 메서드는 stream 요소의 중복을 제거한다. 객체 stream인 경우 equals 메서드의 반환값이 true이면 동일한 요소로 판단한다. IntStream, LongStream, DoubleStream은 같은 값일 경우 중복을 제거한다.

---original stream----                 ----new stream----
|        B A B A     |---distinct()--->|    B   A   ->
----------------------                 ------------------

filter는 매개변수로 주어진 Predicatetrue를 반환하는 요소만 filtering에 통과한다.

---original stream----      filter()         ----new stream----
|        B A B A     |----  A -> true    --->| C A --->
----------------------      B -> false       ------------------
                            C -> true

Predicate는 함수형 interface로 다음과 같은 종류가 있다.

interface추상 메서드설명
Predicateboolean test(T t)객체 T를 조사
IntPredicateboolean test(int value)int값을 조사
LongPredicateboolean test(long value)long값을 조사
DoublePredicateboolean test(double value)double값을 조사

모든 Predicate는 매개값을 조사한 후 boolean을 반환하는 test() 메서드를 가지고 있다.

                -------
--- 매개값 ---> | test | --- boolean -->
                -------

Predicate<T>을 람다식으로 표현하면 다음과 같다.

T -> { ... return true }
또는
T -> true

아래의 예제는 List에서 중복된 이름을 제거하고 출력한다. 이어서 성이 'Shin'인 이름만 필터링해서 출력한다.

public class Main {
    public static void main(String[] args) throws Exception {
        List<String> list = new ArrayList<>();
        list.add("Hong gildong");
        list.add("Shin younggweon");
        list.add("Shin younggweon");
        list.add("Kim java");
        list.add("Shin java");

        list.stream().distinct().forEach(n -> System.out.println(n));
        System.out.println();

        list.stream().filter(n -> n.startsWith("Shin")).forEach(n -> System.out.println(n));
    }
}

결과는 다음과 같다.

Hong gildong
Shin younggweon
Kim java
Shin java

Shin younggweon
Shin younggweon
Shin java

중복된 값들이 제거된 결과와 그 이후에 Shin 성을 가진 사람들만 filtering하였다.

요소 변환(매핑)

mapping은 stream의 요소를 다른 요소로 변환하는 중간 처리가 가능하다. mapping method는 mapXxx(), asDoubleStream(), asLongStream(), boxed(), flatMapXxx() 등이 있다.

mapXxx 메서드는 요소를 다른 요소로 변환한 새로운 stream을 반환한다. 다음 그림처럼 원래 stream의 A요소는 C요소로, B요소는 D요소로 변환해서 C, D요소를 가지는 새로운 stream을 생성한다.

---original stream----    A --> C     ----new stream----
|        B A         |--- B --> D --->|  C  D --->
----------------------                ------------------

maxXxx 메서드들의 종류가 타입 별로 굉장히 많지만 하나만 알면 모두 쓸 수 있다.
1. map(Function<T, R>): T -> R로 변환한다.

매개타입인 Function은 함수형 인터페이스로 다양한 종류들이 있지만, 이것 또한 하나만 알면 된다.
1. Function<T, R>: R apply(T t)를 추상 메서드로 가지고 있으며 매개변수 T를 받아서 반환 타입인 R로 변환한다.

다음은 Student stream을 score stream으로 변환하고 점수를 console에 반환하는 것이다.

  • Student
public class Student {
    private String name;
    private int score;

    public Student (String name, int score) {
        this.name = name;
        this.score = score;
    }

    public String getName() { return name; }
    public int getScore() { return score; }
}
  • Main
public class Main {
    public static void main(String[] args) {
        List<Student> studentList = new ArrayList<Student>();
        studentList.add(new Student("Hone", 85));
        studentList.add(new Student("Hone", 92));
        studentList.add(new Student("Hone", 87));

        studentList.stream().map(student -> student.getScore()).forEach(score -> System.out.println(score));
        // 85 92 87
    }
}

Student라는 객체를 받은 stream이 map을 거치면서 요소들이 Interger 타입을 가진 stream으로 변환된 것이다.

만약, 기본 타입 간의 변환 이거나 기본 타입 요소(int, float, double 등)를 wrapper 객체 요소로 변환하려면 다음과 같은 간편화된 메서드를 사용할 수도 있다.

메서드설명
LongStream asLongStreamint -> long
DoubleStream asDoubleStreamint -> double, long -> double
Stream boxed()int -> Interger
Stream boxed()long -> Long
Stream boxed()double -> Double

다음은 정수 스트림을 실수 스트림으로 변환하고, 기본 타입 스트림을 wrapper 스트림으로 변환하는 방법을 보여준다.

public class Main {
    public static void main(String[] args) {
        int[]  intArray = {1,2,3,4,5};

        IntStream intStream = Arrays.stream(intArray);
        intStream.asDoubleStream().forEach(d -> System.out.println(d));

        System.out.println();

        intStream = Arrays.stream(intArray);
        intStream.boxed().forEach(obj -> System.out.println(obj.intValue()));
    }
}

결과는 다음과 같다.

1.0
2.0
3.0
4.0
5.0

1
2
3
4
5

flatMap 메서드는 하나의 요소를 복수 개의 요소들로 변환하는 새로운 stream을 만든다. 더불어 map처럼 타입도 바꿔줄 수 있다.

---original stream----    A --> A1, A2       ----new stream----
|        B A         |--- B --> B1, B2 --->  |  B1, B2, A1, A2
----------------------                       ------------------

위의 예제처럼 하나의 요소인 A를 두 개로 쪼개어 A1, A2 이렇게 만들어줄 수 있다.

다양한 flatMap 메서드 종류가 있지만, 하나만 알면 된다.
1. Stream<R> flatMap(Function<T, Stream<>R>): T -> Stream<R> 로 변환하는 것이다.

반환 타입이 Stream인 것에 유의하자.

아래의 예제는 문장 stream을 단어 stream으로 바꾸고, 문자열 숫자 목록 stream을 숫자 stream으로 변환한다.

  • Main
public class Main {
    public static void main(String[] args) {
        List<String> list1 = new ArrayList<String>();
        list1.add("this is java");
        list1.add("i am a best developer");

        list1.stream().flatMap(data -> Arrays.stream(data.split(" ")))
                .forEach(word -> System.out.println(word));

        System.out.println();

        List<String> list2 = Arrays.asList("10, 20, 30", "40, 50");
        list2.stream().flatMapToInt(data -> {
            String[] strArr = data.split(",");
            int[] intArr = new int[strArr.length];
            for(int i =0; i< strArr.length; i++) {
                intArr[i] = Integer.parseInt(strArr[i].trim());
            }
            return Arrays.stream(intArr);
        }).forEach(number -> System.out.println(number));
    }
}

첫번째 flatMap은 주어진 문자열에 대해서 split을 진행한 후에 단어 단위의 stream을 만들어낸 것이다. 두번째 flatMapToInt는 문자열로 된 배열을 분할하여 정수값으로 바꿔주고 IntStream으로 바꿔주는 것이다.

결과는 다음과 같다.

this
is
java
i
am
a
best
developer

10
20
30
40
50

요소 정렬

정렬은 요소를 오름차순, 내림차순으로 정렬하는 중간 처리 기능이다.

  1. Stream<T> sorted(): Comparable 요소를 정렬한 새로운 stream 생성
  2. Stream<T> sorted(Comparator<T>): 요소를 Comparator에 따라 정렬한 새 stream 생성

stream의 요소가 객체일 경우, 객체가 Comparable을 구현하고 있어야만, sorted 메서드를 사용하여 정렬할 수 있다. 그렇지 않다면 ClassCastException이 발생한다.

만약 내림차순으로 정렬하고 싶다면 다음과 같이 Comparator.reverseOrder() 메서드가 리턴하는 Comparator를 매개값으로 제공하면 된다.

Stream<Xxx> reverseOrderedStream = stream.sorted(Comparator.reverseOrder());

다음은 Student stream을 score기준으로 오름차순 또는 내림차순으로 정렬한 새로운 Student stream을 생성하는 방법을 보여준다. 정렬을 하기 위해 Student class가 Comparable을 구현하고 있는 것을 볼 수 있다.

  • Student
public class Student implements Comparable<Student> {
    private String name;
    private int score;

    public Student (String name, int score) {
        this.name = name;
        this.score = score;
    }

    public String getName() { return name; }
    public int getScore() { return score; }

    @Override
    public int compareTo(Student student) {
        return Integer.compare(score, student.score);
    }
}
  • Main
public class Main {
    public static void main(String[] args) {
        List<Student> studentList = new ArrayList<>();
        studentList.add(new Student("Hong", 30));
        studentList.add(new Student("Shin", 10));
        studentList.add(new Student("You", 20));

        studentList.stream().sorted().forEach(s -> System.out.println(s.getName() + ": " + s.getScore()));
        System.out.println();

        studentList.stream().sorted(Comparator.reverseOrder()).forEach(
                s -> System.out.println(s.getName() + ": " + s.getScore())
        );
    }
}

아래와 같이 오름차순으로 정렬한 결과와 내림차순으로 정렬한 결과가 나온다.

Shin: 10
You: 20
Hong: 30

Hong: 30
You: 20
Shin: 10

만약 요소 객체가 Comparable을 구현하고 있지 않다면, Comparator를 제공하면 요소를 정렬시킬 수 있다. ComparatorComparator interface를 구현한 객체를 말한다.

sorted((o1, o2) -> { ... });

아래는 sortedComparator를 제공하여 정렬을 제공하는 모습이다.

public class Main {
    public static void main(String[] args) {
        List<Student> studentList = new ArrayList<>();
        studentList.add(new Student("Hong", 30));
        studentList.add(new Student("Shin", 10));
        studentList.add(new Student("You", 20));

        studentList.stream().sorted((s1, s2) -> Integer.compare(s1.getScore(), s2.getScore())).forEach(
                s -> System.out.println(s.getName()  + ": " + s.getScore())
        );
    }
}

sorted에 람다식으로 Comparator를 정의해주면 된다.

결과는 아래와 같다.

Shin: 10
You: 20
Hong: 30

요소 조건 만족 여부(매칭)

매칭은 요소들이 특정 조건에 만족하는 지 여부를 조사하는 최종 처리 기능이다. 매칭과 관련된 메서드들은 다음과 같다.

  1. boolean allMatch(Predicate<T> predicate): 모든 요소가 만족하는 지 여부
  2. boolean anyMatch(Predicate<T> predicate): 최소한 하나의 요소가 만족하는 지 여부
  3. boolean noneMatch(Predicate<T> predicate): 모든 요소가 만족하지 않는 지 여부

3 메서드 모두 주어진 Predicate가 반환하는 값에 따라 true 또는 false를 반환한다. 가령 allMatch는 모든 요소의 Predicatetrue를 반환해야만 true를 반환한다.

아래 예제는 정수 스트림에서 모든 요소가 2의 배수인지, 하나라도 3의 배수가 존재하는 지, 또는 모든 요소가 3의 배수가 아닌 지를 조사한다.

  • Main
public class Main {
    public static void main(String[] args) {
        int[] intArr = {2,4,6};

        boolean res = Arrays.stream(intArr).allMatch(a -> a%2 ==0);
        System.out.println("모두 2의 배수인가: " + res);

        res = Arrays.stream(intArr).anyMatch(a -> a%3 == 0);
        System.out.println("하나라도 3의 배수가 있는가: " + res);

        res = Arrays.stream(intArr).noneMatch(a -> a%3 == 0);
        System.out.println("3의 배수가 없는가: " + res);
    }
}

결과는 다음과 같다.

모두 2의 배수인가: true
하나라도 3의 배수가 있는가: true
3의 배수가 없는가: false

요소 집계 함수

stream은 sum, average, count, max, min 등을 제공하지만, 집계 결과물을 만들 수 있도록 reduce 메서드를 제공한다.

int sum = stream.reduce((a,b) -> a+b).getAsInt(); // 요소들의 합
int sum = stream.reduce(0,(a,b) -> a+b); // stream에 요소들이 없을 경우 0 반환

두번째 사용 예는 첫번째 인자가 초기값 역할을 하는 것이다. 첫번째 경우에 요소가 없으면 NoSuchElementException을 발생시킨다.

참고로 (a, b)에서 a가 누적값이고, b가 새로운 값이다.

92 95 88

이렇게 있으면 맨 처음에 a는 default로 0이고, b는 92이다.

a(누적합)       b
0              92
92             95
187            88

결과로 275가 나온다.

reduce 메서드는 n개의 요소들을 받아서 1개의 결과를 만들어준다는 점에 유념하도록 하자.

다음은 기본 집계 메서드인 sum과 동일한 결과를 산출하는 reduce 메서드 사용 방법을 알려준다.

  • Student
public class Student {
    private String name;
    private int score;

    public Student (String name, int score) {
        this.name = name;
        this.score = score;
    }

    public String getName() { return name; }
    public int getScore() { return score; }
}
  • Main
public class Main {
    public static void main(String[] args) {
        List<Student> studentList = Arrays.asList(
                new Student("Hong", 92),
                new Student("Shin", 95),
                new Student("Kim", 88)
        );

        // method1
        int sum1 = studentList.stream().mapToInt(s -> s.getScore()).sum();

        // reduce method
        int sum2 = studentList.stream().map(s -> s.getScore()).reduce(0, (a,b) -> a+b);
        System.out.println("sum1: " + sum1); // sum1: 275
        System.out.println("sum2: " + sum2); // sum2: 275
    }
}

요소 수집

stream은 요소들을 filtering 또는 mapping한 후 요소들을 수집하는 최종 처리 메서드인 collect를 제공한다. 이 method를 이용하면 필요한 요소만 collection에 담을 수 있고, 요소들을 그룹핑한 후에 집계도 할 수 있다.

collect(Collector<T,A,R> collector) 메서드는 필터링 또는 매핑된 요소들을 새로운 컬렉션에 수집하고, 이 컬렉션을 리턴한다. 매개값인 Collector는 어떤 요소를 어떤 컬렉션에 수집할 것인지를 결정한다.

T는 요소, A는 누적기(Accumulator), R은 요소가 저장될 collection이다. 즉, T 요소를 A 누적기가 R에 저장한다는 의미이다.

Collectors 클래스에는 toList, toSet, toMap과 같은 정적 메서드를 제공하여 T요소를 어떤 collection에 저장할 지 결정한다. List, Set, Map에 저장할 때는 A인 누적기가 필요없기 때문에 사용되지 않는다.

다음은 Student stream에서 남학생만 필터링해서 별도의 List로 생성하는 코드이다.

List<Student> maleList = totalList.stream().filter(s -> s.getSex().equals("남")).collect(Collectors.toList()); 

다음은 Student stream에서 이름을 키로, 점수를 값으로 갖는 Map collection을 생성하는 코드이다.

Map<String, Integer> map = totalList.stream().collect(Collectors.toMap(s -> s.getName(), s-> s.getScore()));

collect 메서드는 단순히 요소를 수집하는 기능 이외에 요소들을 그룹핑해서 Map 객체를 생성하는 기능도 제공한다. Collectors.groupingBy() 메서드에서 얻은 Collectorcollect 메서드를 호출할 때 제공하면 된다.

  1. Collector<T, ? , Map<K, List<T>>> groupingBy(Function<T,K> classifier)

groupingByFunction을 이용해서 TK로 매핑하고 K를 key로 하고 List<T>를 값으로 Map collection을 만든다.

다음은 '남', '여'를 key로 설정하여 List<Student>로 그룹핑하는 코드이다.

Map<String, List<Student>> map = totalList.stream().collect(Collectors.groupingBy(s->s.getSex()));

Collectors.groupingBy 메서드는 그룹핑 후 매핑 및 집계(평균, 카운팅, 최대, 최소, 합계)를 구할 수 있도록 두 번째 매개값인 Collector를 가질 수 있다. 다음은 두 번째 매개값으로 사용될 Collector를 얻을 수 있는, Collectors 정적 메서드이다.

  1. Collector mapping(Function, Collector): 매핑
  2. Collector averagingDouble(ToDoubleFunction): 평균값
  3. Collector counting(): 요소 수
  4. Collector maxBy(Comparator): 최대값
  5. Collector minBy(Comparator): 최소값
  6. Collector reducing(BinaryOperator<T>): 커스텀 집계 값
  7. Collector reducing(T identity,BinaryOperator<T>): 커스텀 집계 값

다음은 학생들을 성별로 그룹핑하고 각각의 평균 점수를 구해서 Map으로 얻은 코드이다.

Map<String, Double> map = totalList.stream().collect(
    Collectors.groupingBy(
        s -> s.getSex(),
        Collectors.averagingDouble(s->getScore())
    )
);

요소 병렬 처리

요소 병렬 처리(parallel operation)이란 multi code CPU 환경에서 전체 요소를 분할해 각각의 code가 병렬적으로 처리하는 것을 말한다. 자바는 요소 병렬 처리를 위해 병렬 스트림을 제공한다.

병렬성은 데이터 병렬성(Data parallelism)과 작업 병렬성(task parallelism)으로 구분할 수 있다.

데이터 병렬성은 전체 데이터를 분할해서 sub dataset으로 만들고 이 sub dataset들을 병렬 처리해서 작업을 빨리 끝내는 것을 말한다. 자바 병렬 스트림은 데이터 병렬성을 구현한 것이다.

작업 병렬성은 서로 다른 작업을 병렬 처리하는 것을 말한다. 가령, 하나의 서버에서 여러 요청을 받으면서 클라이언에게 응답을 보내는 것이 있을 각각의 thread로 나누어 처리하는 것이 대표적이다.

자바 병렬 스트림은 요소들을 병렬 처리하기 위해 fork-join framework를 사용한다. fork-join framework는 fork 단계에서 전체 요소들을 sub 요소셋으로 분할하고, 각각의 sub 요소셋을 멀티 코어에서 병렬로 처리한다. join 단계에서 sub 결과를 결합해서 최종 결과를 만들어낸다.

가령, 쿼드 코어 CPU에서 병렬 스트림으로 요소들을 처리할 경우, 먼저 fork 단계에서 stream의 전체 요소들을 4개의 sub 요소셋으로 분할한다. 그리고 각각의 sub 요소셋을 개별 code에서 처리하고, join 단계에서 3번의 결합 과정을 거쳐 최종 결과를 산출한다.

                           요소(1~N)
                 ----------------------------------------
                /           |           \               \
[Fork]   요소셋1         요소셋2         요소셋3         요소셋4
            |               |             |              |
        core1 작업      core2 작업      core3 작업      core4 작업
            \               |             |             /
             ---------------              ---------------
[Join]            결합1                         결합2
                    \                           /
                     ---------------------------
                                결합 

병렬 처리 스트림은 fork 단계에서 요소를 순서대로 분할하지 않는다. 내부적으로 알고리즘에 따라 분할한다.

fork-join framework는 병렬 처리를 위해 threadpool을 사용한다. 각각의 core에서 sub 요소셋을 처리하는 것은 작업 thread가 해야 하므로 thread 관리가 필요하다. fork-join framework는 ExecutorService의 구현 객체인 ForkJoinPool을 사용해서 작업 thread들을 관리하고 있는 것이다.

병렬 stream 사용

자바 병렬 스트림을 이용할 때 백그라운드에서 fork-join framework가 사용되기 때문에 개발자는 매우 쉽게 병렬 처리를 할 수 있다. 병렬 스트림은 다음 두 가지 메서드로 얻을 수 있다.

  1. parallelStream(): 메서드는 collection(List, Set)으로 부터 병렬 스트림을 바로 반환ㄴ
  2. parallel(): 기존 스트림을 병렬 처리 스트림으로 변환하는 것이다.

다음은 1억 개의 점수에 대한 평균을 얻을 때 일반 스트림과 병렬 스트림의 처리 시간을 측정한 것이다. 실행 결과를 보면 병렬 스트림에서 요소 처리 시간이 더 빠르다는 것을 알 수 있다.

  • Main
public class Main {
    public static void main(String[] args) {
        Random random = new Random();

        List<Integer> scores = new ArrayList<>();
        for(int i = 0; i < 100000000; i++) {
            scores.add(random.nextInt(101));
        }

        double avg = 0;
        long startTime = 0, endTime = 0, time = 0;

        Stream<Integer> stream = scores.stream();
        startTime = System.nanoTime();
        avg = stream.mapToInt(i -> i.intValue()).average().getAsDouble();
        endTime = System.nanoTime();

        time = endTime - startTime;
        System.out.println("stream processing time: " + time); // stream processing time: 97913869

        Stream<Integer> parallelStream = scores.parallelStream();
        startTime = System.nanoTime();
        avg = parallelStream.mapToInt(i -> i.intValue()).average().getAsDouble();
        endTime = System.nanoTime();

        time = endTime - startTime;
        System.out.println("Parallel stream processing time: " + time); // Parallel stream processing time: 47020814
    }
}

약 2배의 차이가 나는 것을 볼 수 있다.

그런, stream 병렬 처리가 스트림 순차 처리보다 항상 실행 성능이 좋다고 판단해서는 안된다. 그 전에 먼저 병렬 처리에 영향을 미치는 다음 3가지 요인들을 잘 살펴보아야 한다.

  1. 요소의 수와 요소당 처리 시간: 요소의 수가 적고, 처리 시간이 짧으면 일반 스트림이 fork-join 과정을 거치는 병렬 스트림보다 빠르다.

  2. stream source 종류: ArrayList 배열은 인덱스로 요소를 관리하기 때문에 fork-join 단계에서 요소를 쉽게 분리할 수 있지만, HashSet, TreeSet은 요소 분리가 쉽지 않고 LinkedList 역시 링크를 따라가야하므로 요소 분리가 쉽지 않다.

  3. Core 수: CPU core가 많으면 많을 수록 병렬 스트림 성능이 좋아지지만 core수가 적다면 병렬 스트림이 오히려 순차 스트림보다 느리다. 이는 동시성을 처리해야하기 때문이다.

0개의 댓글