2022. 08. 04(목) Spark & Hadoop 초격차 9일차

Dylan·2022년 8월 4일
0
post-thumbnail

첫 번째 MapReduce - Join

MovieAverageRateTopK

package com.fastcampus.hadoop;

import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class MovieAverageRateTopK extends Configured implements Tool {
    public static class MovieMapper extends Mapper<Object, Text, Text, Text> {
        private Text movieId = new Text();
        private Text outValue = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] columns = value.toString().split(",");
            if (columns[0].equals("movieId")) {
                return;
            }
            movieId.set(columns[0]);
            outValue.set("M" + columns[1]);
            context.write(movieId, outValue);
        }
    }

    public static class RatingMapper extends Mapper<Object, Text, Text, Text> {
        private Text movieId = new Text();
        private Text outValue = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] colums = value.toString().split(",");
            if (colums[0].equals("userId")) {
                return;
            }
            movieId.set(colums[1]);
            outValue.set("R" + colums[2]);
            context.write(movieId, outValue);
        }
    }

    public static class MovieRatingJoinReducer extends Reducer<Text, Text, Text, Text> {
        private List<String> ratingList = new ArrayList<>();
        private Text movieName = new Text();
        private Text outValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            ratingList.clear();

            for (Text value : values) {
                if (value.charAt(0) == 'M') {
                    movieName.set(value.toString().substring(1));
                } else if (value.charAt(0) == 'R') {
                    ratingList.add(value.toString().substring(1));
                }
            }

            double average = ratingList.stream().mapToDouble(Double::parseDouble).average().orElse(0.0);
            outValue.set(String.valueOf(average));
            context.write(movieName, outValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "MovieAverageRateTopK First");
        job.setJarByClass(MovieAverageRateTopK.class);
        job.setReducerClass(MovieRatingJoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, MovieMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, RatingMapper.class);
        return 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MovieAverageRateTopK(), args);
        System.exit(exitCode);
    }
}

첫 번째 MapReduce - Unit Test

MovieAverageRateTopKTest

package com.fastcampus.hadoop;

import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;

/**
 * MRUnit tests for the mappers and the join reducer nested in {@code MovieAverageRateTopK}.
 */
public class MovieAverageRateTopKTest {
    /** The header line is dropped and each movie line becomes (movieId, "M" + title). */
    @Test
    public void movieMapTest() throws IOException {
        // Parameterized (not raw) driver so key/value types are checked at compile time.
        MapDriver<Object, Text, Text, Text> mapDriver = new MapDriver<Object, Text, Text, Text>()
                .withMapper(new MovieAverageRateTopK.MovieMapper())
                .withInput(new LongWritable(0), new Text("movieId,title,genres"))
                .withInput(new LongWritable(1), new Text("1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy"))
                .withInput(new LongWritable(2), new Text("2,Jumanji (1995),Adventure|Children|Fantasy"));

        mapDriver.withOutput(new Text("1"), new Text("MToy Story (1995)"))
                .withOutput(new Text("2"), new Text("MJumanji (1995)"))
                .runTest();
    }

    /** The header line is dropped and each rating line becomes (movieId, "R" + rating). */
    @Test
    public void ratingMapTest() throws IOException {
        new MapDriver<Object, Text, Text, Text>()
                .withMapper(new MovieAverageRateTopK.RatingMapper())
                .withInput(new LongWritable(0), new Text("userId,movieId,rating,timestamp"))
                .withInput(new LongWritable(1), new Text("1,1,4.0,964982703"))
                .withInput(new LongWritable(2), new Text("7,1,4.5,1106635946"))
                .withInput(new LongWritable(3), new Text("8,2,4.0,839463806"))
                .withInput(new LongWritable(4), new Text("18,2,3.0,1455617462"))
                .withOutput(new Text("1"), new Text("R4.0"))
                .withOutput(new Text("1"), new Text("R4.5"))
                .withOutput(new Text("2"), new Text("R4.0"))
                .withOutput(new Text("2"), new Text("R3.0"))
                .runTest();
    }

    /** Tagged values of one movie id are joined into (title, average of its ratings). */
    @Test
    public void movieRatingJoinReduceTest() throws IOException {
        new ReduceDriver<Text, Text, Text, Text>()
                .withReducer(new MovieAverageRateTopK.MovieRatingJoinReducer())
                .withInput(new Text("1"), Arrays.asList(new Text("MToy Story (1995)"), new Text("R4.0"), new Text("R4.5")))
                .withInput(new Text("2"), Arrays.asList(new Text("MJumanji (1995)"), new Text("R4.0"), new Text("R3.0")))
                .withOutput(new Text("Toy Story (1995)"), new Text("4.25"))
                .withOutput(new Text("Jumanji (1995)"), new Text("3.5"))
                .runTest();
    }
}

0개의 댓글