[Java] Stream이 밀려온다

곰민·2022년 12월 31일
2
post-thumbnail

Stream


Java8 에서 추가된 Stream에 대해서 다시한번 공부하고 정리한 것을 간단한 특징들과 사용시 유의해야 하는 부분 이렇게 두 가지로 포스팅 하려고 합니다.

Stream API


Stream API는 순차적 or 병렬적으로 다량의 데이터 처리 작업을 돕고자 Java8에 추가되었습니다.
이 Stream API가 제공하는 추상 개념 중 핵심적인 것은 두 가지입니다.

  1. 스트림(Stream)

    데이터의 유한 혹은 무한한 일련의 시퀀스를 의미합니다.

    • Stream으로 넘어온 데이터를 Stream으로 이어받아서 유한하게 처리하거나 무제한으로 처리가 가능합니다.
  2. 스트림 파이프라인(Stream Pipe Line)

    원소들로 수행하는 연산 단계를 표현하는 개념입니다.

    • 기본값 타입으로 int, long, double 세 가지를 지원하며 스트림의 원소들은 어디로부터든 올 수 있습니다.
    • 대표적으로 컬렉션, 배열, 파일, 정규 표현식 패턴 매처(Matcher), 난수 생성기, 또다른 스트림 등이 있습니다.

Stream & Stream Pipe Line 특징


출처 : https://www.logicbig.com/tutorials/core-java-tutorial/java-util-stream/stream-api-intro.html

  • Source Stream에서 시작해 최종 연산(terminal operation)으로 끝나며 그 사이에 하나 이상의 중간 연산(Intermediate Operation)가 있을 수 있습니다.

🚀 중간연산(Intermediate Operation)


  • Stream을 전달받아서 다른 Stream으로 변환하고 Stream을 리턴한다.
  • ex) filter, map, limit, skip, sorted …
    • 변환 전 스트림 원소 타입과 변환 후 스트림 원소 타입이 같을 수도 있고 다를 수도 있습니다.
  • 자주 사용되는 filter와 map에 대한 예시를 간단하게 확인하겠습니다.
    • filter는 주어진 조건에 맞는 요소만으로 구성된 새로운 스트림을 반환합니다.
    • 다중 filter 예제
      public void whenUsingMultipleFilters_dataShouldBeFiltered() {
      		List<Student> studentList = new ArrayList<>();
      		studentList.add(new Student(14 , 50, "PHYSICS"));
      		List<Student> filteredStream = studentList.stream()
      			.filter(s -> s.getAge() > 13)
      			.filter(s -> s.getScore() > 30)
      			.filter(not(s -> Objects.equals(s.getSubject(), "PHYSICS")))
      			.toList();
      }
      //not은 Predicate static method not을 static import 한것
    • 단일 filter
      @Test
      public void whenUsingSingleComplexFilter_dataShouldBeFiltered() {
      		List<Student> studentList = new ArrayList<>();
      		studentList.add(new Student(14 , 50, "PHYSICS"));
          List<Student> filteredStream = students.stream()
            .filter(s -> s.getScore() > 50 
              && s.getAge() > 13 
              && s.getSubject() == "PHYSICS")
            .toList();
      
      }
    • filter 내부는 이전 함수형 인터페이스 포스팅에서 확인했었던 Predicate로 이루어져 있습니다.
  • Map은 Stream Source에 주어진 함수를 적용한 결과로 구성된 Stream을 반환합니다.
    • people 객체에서 이름만 추출하여 String .List로 반환하는 예시

      List<String> nameList = people.stream()
                      .map(p -> p.getName())
                      .collect(Collectors.toList());
    • 들어올 때는 People 객체 타입이었지만 최종 연산을 거치고 반환 시에는 List<String>으로 반환됩니다

    • 즉 Map은 내부에서 주어진 함수에 따라서 들어오는 타입과 나가는 타입이 같을 수도 있고 같지 않고 달라질 수 도 있습니다.

🚀 최종 연산(Terminal Operation)


  • Stream을 리턴하지 않으며 연산 결과가 Stream이 아니므로 Stream Pipe Line에서 한 번만 가능합니다.

  • ex)collect, allMatch, count, forEach, min, max …

  • foreach 예시

    List<Integer> list = Arrays.asList(5,1,1,2,3,4,5); //Source Stream
    Stream<Integer> intStream = list.stream();
    intStream.distinct().sorted().forEach(System.out::print);
    //중간 연산 distinct()중복제거, sorted()정렬 //최종 연산 foreach() 내부 메서드 레퍼런스 전부 출력
    //12345

🚀 Stream Pipe Line 연산 과정


  1. 스트림 생성 (데이터 소스 → 데이터의 연속적인 흐름)
  2. 중간 연산(0 ~ n번)
  3. 최종 연산(1번)

🚀 Fluent API


  • 스트림은 .매서드.매서드로 이어서 사용 가능한 매서드 연쇄를 지원하는 fluent API 입니다.
    • 위 예시와 같이 Stream Pipe Line 하나를 구성하는 모든 호출을 연결하여 단 하나의 표현식으로 완성할 수 있습니다.

스트림으로 처리하는 데이터는 오직 한 번만 처리된다.


Stream Pipe Line을 한번 쭉 지나가면서 한 번만 처리됩니다.

🚀 지연 평가(lazy evaluation)


  • 스트림 파이프라인의 중계형 연산자는 지연 평가(lazy evaluation)가 됩니다.
    • map() 과 같은 중계형 연산자는 collect() 과 같은 terminal operator가 오기 전까지는 실행을 하지 않습니다.

      List<String>names = new ArrayList<>();
      names.add("gom");
      Stream<String> stringStream = names.stream().map(String::toUppperCase);
      //중간연산 소문자 대문자로 변경 //최종 연산 x
      names.forEach(System.out::println);
      //gom
      //여전히 소문자로 남아있습니다.
    • 위 예제를 보면 최종 연산(terminal operation)을 하지 않았고 중계형 연산자가 Stream Source를 실질적으로 처리하지 않았기 때문에 여전히 stringStream은 소문자로 출력 됩니다.

      List<Integer> list = Arrays.asList(5,1,1,2,3,4,5);
      Stream<Integer> intStream = list.stream();
      intStream.distinct().sorted().forEach(System.out::print);
      //중간 연산 중복제거, 정렬 //최종 연산 메서드 레퍼런스 전부 출력
      //12345
    • 위 예시와 같이 최종 연산이 있는 경우 중간 연산이 전부 적용되어서 잘 출력 됩니다.

    • 즉 evaluation은 terminal operation 시점에 lazy 하게 이루어 집니다.

🚀 Stream의 무제한 처리와 Short-circuiting Operations


  • Stream 으로 넘어온 데이터를 Stream으로 이어받아서 계속 무제한으로 처리가 가능하다고 했었습니다.

    IntStream.iterate(0, i -> i + 2).forEach(System.out::println);
    //중간 연산 i+2 // 최종 연산 Foreach
    //계속 찍힙니다.
    • 계속 끝이 없이 찍힙니다.
    • 이러한 무제한 처리를 Short-circuiting Operations를 통해서 제한할 수 있습니다.
  • Short-circuiting Operations란?

    Short-circuiting은 결과가 결정되자마자 expression에 대한 평가가 멈추는 것을 의미합니다.

    • 예시

      if(a == b || c = d || e == f){
      	//Do something
      }
    • condition 문에 대한 결과가 a == b 가 true면 이미 결정되었기 때문에 c == d 와 e == f는 evaluated되지 않습니다.

  • Stream Shoort-circuiting Operations

    출처 : https://www.logicbig.com/tutorials/core-java-tutorial/java-util-stream/short-circuiting.html

  • Stream 에서 limit()을 활용하여 제한해 보도록 하겠습니다

IntStream.iterate(0, i -> i + 2).limit(100).forEach(System.out::println);
//중간 연산 i+2, limit() 100개까지만 제한 // 최종 연산 Foreach
//100개만 찍힙니다
  • limit()를 활용하여 100개까지만 제한하였고 이후의 것들은 evaluated 되지 않기 때문에 무제한 stream을 제한해서 사용할 수 있습니다.

🚀 Stream 은 손쉽게 병렬 처리를 할 수 있다


for (String bookName : bookNames){
		if(bookname.startWith("Harry potter")) {
				System.out.println(bookName.toUpperCase());ß
		}
}
  • 반복을 돌면서 특정 작업을 하는 for 문의 경우 parallel이 아닌 serial로 동작하게 됩니다.
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.stream().forEach(number ->
    System.out.println(number + " " + Thread.currentThread().getName())
);

//main
//main
//main
//main
  • 기본적으로 병렬로 명시하지 않는 한 모든 stream은 순차적으로 작동하며 단일 쓰레드를 사용하여 stream pipe line을 처리합니다.
List<String> collect = bookNames.parallelStream().map((s) -> {
		System.out.println(s + " " + Thread.currentThread().getName());
		return s.toUpperCase();}).collect(Collectors.toList());
collect.forEach(System.out::println);
//bookNames + Thread name 출력
  • Stream 의 경우 parallelStream() 을 활용하여 손쉽게 병렬 처리를 할 수 있습니다.

  • parallelStream을 사용하면 별도의 코어에서 병렬로 코드를 실행할 수 있으며 최종 결과는 각 개별 결과의 조합입니다.

    • parallelStream은 Java7에서 추가된 fork-join framework로 스레드 간에 소스 데이터를 분할하고 작업 완료시 콜백 처리를 합니다.

    • 단 실행 순서는 통제할 수 없으며 프로그램을 실행할 때마다 변경될 수 있습니다.

    • 특정 연산이 순차 연산과 병렬 연산에서 결과 값이 다를 수도 있습니다.

      • 정수의 합을 병렬로 처리하는 예시 (시작 합계에 5 추가로 더함.)

        List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
        int parallelSum = listOfNumbers.parallelStream().reduce(5, Integer::sum);
        System.out.println("parallelSum = " + parallelSum);
        int serialSum = listOfNumbers.stream().reduce(5, Integer::sum);
        System.out.println("serialSum = " + serialSum);
        
        //parallelSum = 30
        //serialSum = 15
        • parallelStream 의 연산 결과값은 30입니다.

        • common fork-join pool에서 사용하는 스레드에 수에 따라서 값은 달라질 수 있습니다.

          • common fork-join pool은 보통 프로세서 코어수 -1입니다.
          • jvm 설정값을 수정하여 전역 설정으로 parallelstream의 스레드 수를 정해둘 수 있지만 권장하지 않습니다.
        • 값이 동일하게 나오려면 parallel stream 외부로 빼야 합니다

          List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
          int parallelSum = listOfNumbers.parallelStream().reduce(Integer::sum) + 5;
          System.out.println("parallelSum = " + parallelSum);
        • 어떤 연산에 parallelStream()을 활용할지 잘 생각 해봐야 합니다.

  • 처리할 데이터의 양이 많은 경우 parallelStream()을 활용한 병렬 처리의 성능 효과를 누릴 수 있습니다.

    • 하지만 parallelStream()의 사용은 주의해야 하며 순차처리를 잘 사용하다가 성능 이슈가 생기는 대용량 처리에 있어서 serial 처리와 parallell 처리의 성능 측정을 통한 더 나은 방식을 찾는 것이 중요하다고 생각합니다.

참조


Stream (Java Platform SE 8 )

Short-circuit evaluation - Wikipedia

더 자바, Java 8 - 인프런 | 강의

What is short circuiting and how is it used when programming in Java?

baeldung multiple-filter

baeldung parallel-stream

profile
더 나은 개발자로 성장하기 위해 퀘스트 🧙‍♂🧙‍♂ 깨는 중입니다. 관심사는 back-end와 클라우드입니다.

0개의 댓글