2022. 08. 05(금) Spark & Hadoop 초격차 10일차

Dylan·2022년 8월 5일
0

두번째 MapReduce - Sorting

추가 된 소스

public static class TopKMapper extends Mapper<Object, Text, Text, Text> {
        // Key값을 기준으로 정렬이 되어있는 맵
        private TreeMap<Double, Text> topKMap = new TreeMap<>();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] columns = value.toString().split("\t");
            topKMap.put(Double.parseDouble(columns[1]), new Text(columns[0]));

            if (topKMap.size() > K) {
                topKMap.remove(topKMap.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Double k : topKMap.keySet()) {
                // 맵의 아웃풋으로 평점이 키로 출력이 되고, 두번째로 영화제목이 출력이 된다
                context.write(new Text(k.toString()), topKMap.get(k));
            }
        }
    }

    public static class TopKReducer extends Reducer<Text, Text, Text, Text> {
        private TreeMap<Double, Text> topKMap = new TreeMap<>();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                topKMap.put(Double.parseDouble(key.toString()), new Text(value));
                if (topKMap.size() > K) {
                    topKMap.remove(topKMap.firstKey());
                }
            }
        }

        @Override
        protected  void cleanup(Context context) throws IOException, InterruptedException {
            // 내림차순 키값으로 가져와서 처리
            for (Double k : topKMap.descendingKeySet()) {
                context.write(topKMap.get(k), new Text(k.toString()));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "MovieAverageRateTopK First");
        job.setJarByClass(MovieAverageRateTopK.class);
        job.setReducerClass(MovieRatingJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MovieMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, RatingMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        int returnCode = job.waitForCompletion(true) ? 0 : 1;

        // 정상적으로 완료가 됐을 시
        if (returnCode == 0) {
            Job job2 = Job.getInstance(getConf(), "MovieAverageRateTopK Second" );
            job2.setJarByClass(MovieAverageRateTopK.class);
            job2.setMapperClass(TopKMapper.class);
            job2.setReducerClass(TopKReducer.class);
            job2.setNumReduceTasks(1);
            job2.setOutputKeyClass(Text.class);
            job2.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job2, new Path(args[2]));
            FileOutputFormat.setOutputPath(job2, new Path(args[3]));

            return job2.waitForCompletion(true) ? 0 : 1;
        }
        return 1;
    }

두번째 MapReduce - Unit Test

추가된 소스

/** 두번째 MapReduce - Unit Test */
    @Test
    public void topKMapTest() throws IOException {
        // withOutput 위치를 바꿀 시 에러가 뜸 즉, 출력순서가 뒤바뀜
        // 작은 순서부터 실행이 되야 하는데 순서가 뒤바뀌면서 큰 순서부터 되버려서 에러가 생김
        new MapDriver<Object, Text, Text, Text>()
                .withMapper(new MovieAverageRateTopK.TopKMapper())
                .withInput(new LongWritable(0), new Text("Toy Story (1995)\t4.25"))
                .withInput(new LongWritable(1), new Text("Jumanji (1995)\t3.5"))
                .withOutput(new Text("3.5"), new Text("Jumanji (1995)"))
                .withOutput(new Text("4.25"), new Text("Toy Story (1995)"))
                .runTest();
    }

    @Test
    public void topKReduceTest() throws IOException {
        // 여기도 출력 결과의 순서가 중요함. 여긴 높은 평점부터 낮은평점순으로 순서가 되야 함.
        new ReduceDriver<Text, Text, Text, Text>()
                .withReducer(new MovieAverageRateTopK.TopKReducer())
                .withInput(new Text("3.5"), Arrays.asList(new Text("Jumanji (1995)")))
                .withInput(new Text("4.25"), Arrays.asList(new Text("Toy Story (1995)")))
                .withOutput(new Text("Toy Story (1995)"), new Text("4.25"))
                .withOutput(new Text("Jumanji (1995)"), new Text("3.5"))
                .runTest();
    }

영화 평점 Top 30 실행

명령어를 통해 코드 실행




실행 결과 1

실행 결과 2

잘 작동되는걸 확인 할 수 있다.

0개의 댓글