[하둡 완벽 가이드] 2. 맵리듀스

jake·2023년 2월 28일
0

2.0

맵리듀스는 데이터 처리를 위한 프로그래밍 모델이다. 맵리듀스는 태생 자체가 병행성을 고려하여 설계되었고, 누구든지 이를 이용해 데이터 분석을 할 수 있다. 맵리듀스는 대용량 데이터셋에서 진가가 드러난다.

2.1 기상 데이터셋

예시로 쓰일 기상 데이터는 https://www.ncei.noaa.gov/ 에서 가져오면 된다.

연도별 최고 기온이 얼마인지 궁금하다.
기상 데이터를 살짝 뜯어보면 기상관측소 식별자, 관측 날짜, 관측 시간, 위도, 경도, 바람 뱡향, 고도 등 수많은 정보들이 담겨있다. 이 모든 정보들은 필요하지 않고 필요한 정보(기온, 연도)만 뽑기 위해 전처리 작업을 수행해야 한다.

2.2 유닉스 도구로 데이터 분석하기

for year in all/*
do
  echo -ne `basename $year .gz`"\t"
  gunzip -c $year | \
    awk '{ temp = substr($0, 88, 5) + 0;
           q = substr($0, 93, 1);
           if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }
         END { print max }'
done

이 스크립트는 압축된 연도별 파일을 살펴본다. 단순하게 보면 데이터에서 기온을 뽑아낸 후 유효한 값이면 계속해서 최고 기온과 비교하여 최고 기온을 갱신하는 방법이다.

이러한 방법으로 ec2 고성능 인스턴스에서 실행한 결과 42분이 걸렸다. 처리 속도를 높이기 위해 프로그램의 각 부분을 병렬로 수행할 필요가 느껴진다. 그러나 문제점이 있다.

첫째, 일을 동일한 크기로 나누는 것은 언제나 쉽고 명확하지 않다.
어떠한 프로세스가 유난히 긴 데이터를 맡는다면 전체 수행 시간은 가장 긴 파일을 처리하는 프로세스의 처리 시간에 의해 결정된다.

둘째, 독립적으로 처리한 프로세스의 결과를 합치는데 더 시간이 필요할 수 있다.

셋째, 단일 머신의 처리 능력은 여전히 한계가 있다. 단일 머신에서 여러 개의 프로세서로 처리해도 최적의 수행시간을 넘어설 수 없다. 또한 데이터셋이 단일 머신의 처리능력을 초과할 수 있다.

이러한 문제점들도 병렬로 처리하는 것은 실제로 아주 어렵다. 이러한 이슈를 위해 하둡같은 프레임워크를 사용하는 것은 도움이 된다.

2.3 하둡으로 데이터 분석하기

하둡이 제공하는 맵리듀스를 사용하면 병렬 처리의 이점을 이용할 수 있다. 맵리듀스 작업은 크게 맵 단계와 리듀스 단계로 구분된다. 각 단계는 입력과 출력으로 키-값 쌍을 가지며 타입은 프로그래머가 정한다.

(1) 데이터가 한줄씩 들어와 각 행은 키-값 쌍으로 변환되어 맵 함수에 입력된다.
19500112111200 -> 0,19500112111200
19500322125622 -> 106,19500322125622
195010300355-11 -> 212,195010300355-11
194908011212111 -> 318, 194908011212111
19490705231278 -> 424, 19490705231278

(2) 맵 함수는 연도와 기온만 추출하여 내보낸다.
(1950, 0)
(1950, 22)
(1950 -11)
(1949, 111)
(1949, 78)

(3) 리듀스 함수로 가기 전 키값을 기준으로 셔플한다. 셔플한 데이터를 리듀스 함수로 보낸다.
(1949, [111, 78])
(1950. [0, 22, -11])

(4) 연도별로 측정된 모든 기온값이 하나로 묶인다. 리듀스 함수는 전체를 반복하여 최고 측정값을 추출한다.
1949, 111
1950, 22

map

import re
import sys

for line in sys.stdin:
  val = line.strip()
  (year, temp, q) = (val[15:19], val[87:92], val[92:93])
  if (temp != "+9999" and re.match("[01459]", q)):
    print "%s\t%s" % (year, temp)

reduce

import sys

(last_key, max_val) = (None, -sys.maxint)
for line in sys.stdin:
  (key, val) = line.strip().split("\t")
  if last_key and last_key != key:
    print "%s\t%s" % (last_key, max_val)
    (last_key, max_val) = (key, int(val))
  else:
    (last_key, max_val) = (key, max(max_val, int(val)))

if last_key:
  print "%s\t%s" % (last_key, max_val)

2.4 분산형으로 확장하기

0개의 댓글