2022. 08. 02(화) Spark & Hadoop 초격차 7일차

Dylan·2022년 8월 3일
0
post-thumbnail

Reduce-side join 실습 2

순서

  1. 정렬하고자 하는 값을 포함하는 복합 키 클래스 정의
  2. 정의한 복합키를 통해서 어느 리듀스에게 전달될지 결정하는 파티셔너 클래스 정의
  3. 리듀스에 입력될 값을 그룹핑해주는 클래스 정의
  4. 리듀스에 입력을 키를 기준으로 정렬해 주기위한 클래스를 정의

1. TextText 클래스 - 복합 키 클래스 정의

package com.fastcampus.hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class TextText implements WritableComparable<TextText> {
    private Text first;
    private Text second;

    // 생성자 정의
    public TextText() {
        set(new Text(), new Text());
    }

    public TextText(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextText(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public int compareTo(TextText o) {
        // 첫번 째 들어있는 first 값들을 기준으로 비교를 하고
        int cmp = first.compareTo(o.first);
        // 일치하지 않을 때에
        if (cmp != 0) {
            return cmp;
        }

        // 일치할땐 second 값들을 리턴
        return second.compareTo(o.second);
    }

    /** Writable 인터페이스 밑에 있는 두개의 메소드를 재정의 */
    // 직렬화를 위한
    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    // 직렬화된 데이터를 읽을 때
    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    // Hashcode 재정의
    @Override
    public int hashCode() {
        // 해쉬코드 값을 재정의 할때는 소수를 많이 곱해줌
        return first.hashCode() * 163 + second.hashCode();
    }

    // equals 재정의
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof TextText) {
            TextText tt = (TextText) obj;
            return first.equals(tt.first) && second.equals(tt.second);
        }

        return false;
    }

    // toString 재정의
    @Override
    public String toString() {
        return first.toString() + ", " + second.toString();
    }
}

2. KeyPartitioner 클래스 - 파티셔너 클래스 정의

// 파티셔너 클래스 정의
    public static class KeyPartitioner extends Partitioner<TextText, Text> {
        @Override
        public int getPartition(TextText key, Text value, int numPartitions) {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

3. GroupComparator 클래스 - 그룹핑 클래스 정의

// 그룹핑 클래스 정의
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(TextText.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            TextText t1 = (TextText) a;
            TextText t2 = (TextText) b;
            return t1.getFirst().compareTo(t2.getFirst());
        }
    }

4. KeyComparator 클래스 - 키 정렬 클래스 정의

// 키 정렬 클래스 정의
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(TextText.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            TextText t1 = (TextText) a;
            TextText t2 = (TextText) b;
            int cmp = t1.getFirst().compareTo(t2.getFirst());
            if (cmp != 0) {
                return cmp;
            }
            return t1.getSecond().compareTo(t2.getSecond());
        }
    }

전체 코드

package com.fastcampus.hadoop;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class ReduceSideJoinCustomkey extends Configured implements Tool {
    static enum DataType {
        DEPARTMENT("a"), EMPLOYEE("b");

        DataType(String value) {
            this.value = value;
        }
        private final String value;
        public String value() {
            return value;
        }
    }

    public static class DepartmentMapper extends Mapper<LongWritable, Text, TextText, Text> {
        TextText outKey = new TextText();
        Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // dept_no, dept_name
            String[] split = value.toString().split(",");

            outKey.set(new Text(split[0]), new Text(DataType.DEPARTMENT.value));
            outValue.set(split[1]);
            context.write(outKey, outValue);
        }
    }

    public static class EmployeeMapper extends  Mapper<LongWritable, Text, TextText, Text> {
        TextText outKey = new TextText();
        Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // emp_no, birth_date, first_name, last_name, gender, hire_date, dept_no
            String[] split = value.toString().split(",");

            outKey.set(new Text(split[6]), new Text(DataType.EMPLOYEE.value));
            outValue.set(new Text(split[0] + "\t" + split[2] + "\t" + split[4]));
            context.write(outKey, outValue);
        }
    }

    public static class ReduceJoinReducer extends Reducer<TextText, Text, Text, Text> {
        Text outKey = new Text();
        Text outValue = new Text();

        @Override
        protected void reduce(TextText key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> iter = values.iterator();

            // 항상 values에는 values로  값이 전달 될땐 두번째의 키 값으로 정렬이 되어 있기 떄문에 항상 department text가 먼저 오게 된다.
            String departmentText = iter.next().toString();

            while (iter.hasNext()) {
                Text employeeText = iter.next();
                String[] employeeSplit = employeeText.toString().split("\t");
                outKey.set(employeeSplit[0]);
                outValue.set(employeeSplit[1] + "\t" + employeeSplit[2] + "\t" + departmentText);
                context.write(outKey, outValue);
            }
        }
    }

    // 키 정렬 클래스 정의
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(TextText.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            TextText t1 = (TextText) a;
            TextText t2 = (TextText) b;
            int cmp = t1.getFirst().compareTo(t2.getFirst());
            if (cmp != 0) {
                return cmp;
            }
            return t1.getSecond().compareTo(t2.getSecond());
        }
    }

    // 그룹핑 클래스 정의
    public static class GroupComparator extends WritableComparator {
        protected GroupComparator() {
            super(TextText.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            TextText t1 = (TextText) a;
            TextText t2 = (TextText) b;
            return t1.getFirst().compareTo(t2.getFirst());
        }
    }

    // 파티셔너 클래스 정의
    public static class KeyPartitioner extends Partitioner<TextText, Text> {
        @Override
        public int getPartition(TextText key, Text value, int numPartitions) {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // 드라이버 정의
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "ReduceSideJoinCustomKey");

        job.setJarByClass(ReduceSideJoinCustomkey.class);
        job.setReducerClass(ReduceJoinReducer.class);
        job.setMapOutputKeyClass(TextText.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setPartitionerClass(KeyPartitioner.class);
        job.setSortComparatorClass(KeyComparator.class);
        job.setGroupingComparatorClass(GroupComparator.class);

        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, EmployeeMapper.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, DepartmentMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new ReduceSideJoinCustomkey(), args);
        System.exit(exitCode);
    }
}


실행 결과

0개의 댓글