大数据从入门到实战 - 第3章 MapReduce基础实战——信息挖掘 - 挖掘父子关系

发布时间 2023-11-27 22:48:50作者: Changersh

输出一直顺序不正确,把正确答案和我自己写的混了混,目前感觉是mapper的问题
正确输出:

grand_child    grand_parent
Mark    Jesse
Mark    Alice
Philip    Jesse
Philip    Alice
Jone    Jesse
Jone    Alice
Steven    Jesse
Steven    Alice
Steven    Frank
Steven    Mary
Jone    Frank
Jone    Mary
package org.csh;
/**
 * @author :Changersh
 * @date : 2023/11/27 21:03
 */

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class simple_data_mining {
    public static int time = 0;

    /**
     *
     */
    // Map将输入文件按照空格分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要注意的是在输出的value中必须加上左右表区别标志
    // 要标记一下,哪个是反过来的
    /*
        一个人既是父亲又是儿子,那么倒过来的时候,ta的名字后跟着的就是ta的父子,这俩是爷孙,根据标记输出即可
     */
    public static class Map extends Mapper<Object, Text, Text, Text> {
        Text outK = new Text();
        Text outV = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            /********** Begin **********/

            String[] split = value.toString().split("\\s+");
            if (split[0].equals("child")) return;
            outK.set(split[0]);
//            outV.set("+" + split[1]); // 正序
            outV.set("2+" + split[0] + "+" + split[1]);
            context.write(outK, outV);
            outK.set(split[1]);
//            outV.set("-" + split[0]); // 倒序
            outV.set("1+" + split[0] + "+" + split[1]);
            context.write(outK, outV);


//            String line = value.toString();
//            String[] childAndParent = line.split(" ");
//            List<String> list = new ArrayList<>(2);
//            for (String childOrParent : childAndParent) {
//                if (!"".equals(childOrParent)) {
//                    list.add(childOrParent);
//                }
//            }
//            if (!"child".equals(list.get(0))) {
//                String childName = list.get(0);
//                String parentName = list.get(1);
//                String relationType = "1";
//                context.write(new Text(parentName), new Text(relationType + "+"
//                        + childName + "+" + parentName));
//                relationType = "2";
//                context.write(new Text(childName), new Text(relationType + "+"
//                        + childName + "+" + parentName));
//            }
            /********** End **********/
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        Text outK = new Text();
        Text outV = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            /********** Begin **********/

            //输出表头
            if (time == 0) {
                time = 1;
                outK.set("grand_child");
                outV.set("grand_parent");
                context.write(outK, outV);
            }

//
//            ArrayDeque<String> child = new ArrayDeque<>();
//            ArrayDeque<String> parent = new ArrayDeque<>();
//
//            //获取value-list中value的child
//            //获取value-list中value的parent
//            for (Text value : values) {
//                String s = value.toString();
//                if (s.startsWith("-")) child.addLast(s.substring(1));
//                else if (s.startsWith("+")) parent.addLast(s.substring(1));
//            }
//
//
//            //左表,取出child放入grand_child
//            //右表,取出parent放入grand_parent
//            //输出结果
//
//            for (String s : child) {
//                for (String s1 : parent) {
//                    outK.set(s);
//                    outV.set(s1);
//                    context.write(outK, outV);
//                }
//            }

            //获取value-list中value的child
            List<String> grandChild = new ArrayList<>();

            //获取value-list中value的parent
            List<String> grandParent = new ArrayList<>();

            //左表,取出child放入grand_child
            for (Text text : values) {
                String s = text.toString();
                String[] relation = s.split("\\+");
                String relationType = relation[0];
                String childName = relation[1];
                String parentName = relation[2];
                if ("1".equals(relationType)) {
                    grandChild.add(childName);
                } else {
                    grandParent.add(parentName);
                }
            }

            //右表,取出parent放入grand_parent
            int grandParentNum = grandParent.size();
            int grandChildNum = grandChild.size();
            if (grandParentNum != 0 && grandChildNum != 0) {
                for (int m = 0; m < grandChildNum; m++) {
                    for (int n = 0; n < grandParentNum; n++) {
                        //输出结果
                        context.write(new Text(grandChild.get(m)), new Text(
                                grandParent.get(n)));
                    }
                }
            }


            /********** End **********/

        }
    }

    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Single table join");
        job.setJarByClass(simple_data_mining.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        String inputPath = "D:\\DataFiles\\Hadoop\\Input\\Educoder";   //设置输入路径
        String outputPath = "D:\\DataFiles\\Hadoop\\Output\\Educoder\\ChildAndParent";   //设置输出路径
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}