Big Data Lab (MapReduce Programming 2)

Posted 2023-12-05 19:05:23 · Author: 201812

Code reference:

MapReduce实验 - CodeDancing - 博客园 (cnblogs.com)

Full implementation code:

Development environment: IntelliJ IDEA

Notes:

1. Change the value of cmd depending on which task you are running.

2. In conf.set("fs.default.name","hdfs://node1:8020");, substitute your own NameNode address. (On Hadoop 2.x and later the preferred key is fs.defaultFS; fs.default.name still works but is deprecated.)

3. System.setProperty("HADOOP_USER_NAME","root");

Without this line you will hit a permission error. When IDEA connects to the cluster it authenticates as your local Windows user; if your Windows account is 1234, HDFS sees user 1234, and no such user exists on HDFS. Setting the property makes the client act as root, which has the read and write permissions the job needs. A minimal connectivity check is sketched below.
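Before running the full job, you can verify the address and the user with a small standalone check like this (a sketch only; the class name HdfsCheck is just for illustration, and it assumes the same node1:8020 address and root user as the notes above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsCheck {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root"); // same workaround as note 3
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node1:8020");  // replace with your NameNode address
        FileSystem fs = FileSystem.get(conf);
        // If this listing fails, the address or the user is wrong
        for (FileStatus status : fs.listStatus(new Path("/user/root"))) {
            System.out.println(status.getPath());
        }
        fs.close();
    }
}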

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Merge {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // NameNode address; on Hadoop 2.x+ the preferred key is fs.defaultFS,
        // but the deprecated fs.default.name still works
        conf.set("fs.default.name","hdfs://node1:8020");
        // Act as the root user so the job may write to HDFS (see note 3)
        System.setProperty("HADOOP_USER_NAME","root");
        Job job = Job.getInstance(conf);
        job.setJarByClass(Merge.class);

        // 1 = merge and deduplicate, 2 = sort with rank, 3 = grandchild-grandparent join
        int cmd = 3;
//        String[] file = {"input" + cmd, "output" + cmd};
        // Note: the output directory must not exist before the job runs
        String[] file = {"/user/root/input", "/user/root/output"};
        if (cmd == 1) {
            // Task 1: merge the input files and remove duplicates
            job.setMapperClass(Merge.Mapper1.class);
            job.setCombinerClass(Merge.Reducer1.class);
            job.setReducerClass(Merge.Reducer1.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
        } else if (cmd == 2) {
            // Task 2: sort integers and emit ranks
            job.setMapperClass(Merge.Mapper2.class);
            job.setReducerClass(Merge.Reducer2.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
        } else {
            // Task 3: grandchild-grandparent join
            job.setMapperClass(Merge.Mapper3.class);
            job.setReducerClass(Merge.Reducer3.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
        }
        // The input and output paths are the same for all three tasks
        FileInputFormat.setInputPaths(job, new Path(file[0]));
        FileOutputFormat.setOutputPath(job, new Path(file[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }



    // Task 1: merge the input files and remove duplicate lines.
    // The whole line becomes the key, so the shuffle deduplicates for us.
    public static class Mapper1 extends Mapper<Object,Text,Text,Text>
    {
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException
        {
            // Emit the line itself as the key; the value carries no information
            context.write(value, new Text(""));
        }
    }

    public static class Reducer1 extends Reducer<Text,Text,Text,Text>
    {
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException
        {
            // Each distinct line arrives exactly once as a key, so writing it deduplicates
            context.write(key, new Text(""));
        }
    }

    // Task 2: parse one integer per line and emit it as the key;
    // the shuffle then sorts all numbers globally.
    public static class Mapper2 extends Mapper<Object,Text,IntWritable,IntWritable>
    {
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException
        {
            String line = value.toString().trim();
            if (line.isEmpty())
                return; // skip blank lines instead of crashing in parseInt
            IntWritable data = new IntWritable();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
        }
    }

    public static class Reducer2 extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>
    {
        // Running rank; static state assumes a single reduce task (the default)
        private static int sum = 1;
        public void reduce(IntWritable key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
        {
            // Keys arrive in ascending order; emit one (rank, number) pair per
            // occurrence, since the same number may appear more than once
            for(IntWritable num:values)
            {
                context.write(new IntWritable(sum),key);
                sum++;
            }
        }
    }

    // Task 3: single-table join on child-parent pairs. Each record is emitted
    // under both people's names, tagged so the reducer can tell the key
    // person's parents ("old#") from their children ("young#").
    public static class Mapper3 extends Mapper<Object,Text,Text,Text>
    {
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException
        {
            String[] splStr = value.toString().split(" ");
            String child = splStr[0];
            String parent = splStr[1];

            if(child.equals("child")&&parent.equals("parent"))
                return; // skip the header row
            context.write(new Text(child), new Text("old#"+parent));   // key person's parent
            context.write(new Text(parent), new Text("young#"+child)); // key person's child
        }
    }

    public static class Reducer3 extends Reducer<Text,Text,Text,Text>
    {
        // Write the header only once; static state assumes a single reduce task
        private static boolean head = true ;
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException
        {
            if(head)
            {
                context.write(new Text("grandchild"), new Text("grandparent"));
                head = false;
            }
            // For the person named by this key, collect their children and their parents
            ArrayList<String> grandchild = new ArrayList<String>();
            ArrayList<String> grandparent = new ArrayList<String>();
            String[] temp;
            for(Text val:values)
            {
                temp = val.toString().split("#");
                if(temp[0].equals("young"))
                    grandchild.add(temp[1]);  // a child of this person
                else
                    grandparent.add(temp[1]); // a parent of this person
            }
            if(grandchild.isEmpty()||grandparent.isEmpty())
                return;
            // Every (child of key, parent of key) pair is a grandchild-grandparent pair
            for(String gc:grandchild)
                for(String gp:grandparent)
                    context.write(new Text(gc), new Text(gp));
        }
    }

}
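One caveat: Reducer2's running rank and Reducer3's one-shot header both live in static fields, so they are only correct when the job runs a single reduce task. One reducer is the default, but it does no harm to state the assumption explicitly in main, right after Job.getInstance (a one-line sketch):

        job.setNumReduceTasks(1); // Reducer2's rank and Reducer3's header flag assume a single reducer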


Task 1

(1) Create A.txt and B.txt

(2) Upload them to HDFS (e.g., with hdfs dfs -put)

(3) Run the program and check the result

The code is the full program above; just change cmd to 1. A small example with made-up data is shown below.
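For instance, with hypothetical contents like these (the real lab data may differ):

A.txt:
20170101 x
20170102 y
20170103 x

B.txt:
20170101 y
20170102 y
20170103 x

the merged, deduplicated output would be:

20170101 x
20170101 y
20170102 y
20170103 x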

Task 2

(1) Prepare the data files text1.txt, text2.txt, and text3.txt, and upload them to HDFS

(2) Run the program

In the full program, change cmd to 2. A hypothetical example follows.
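As an illustration with made-up numbers: if text1.txt, text2.txt, and text3.txt together contain 33, 37, 12, 40, 16, and 12, the shuffle sorts the IntWritable keys and Reducer2 emits one rank per occurrence (key and value tab-separated, rank first):

1	12
2	12
3	16
4	33
5	37
6	40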


Task 3

Note: when writing the child-parent data, separate the two fields with a single space and nothing else, since Mapper3 splits on exactly one space.

(1) Prepare the data

(2) Run the program

Change cmd to 3 in the code. A hypothetical example follows.
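To make the format concrete, a hypothetical child-parent file (fields separated by a single space, including the header row that Mapper3 skips):

child parent
Tom Lucy
Tom Jack
Lucy Mary
Lucy Ben
Jack Alice

would produce output like this (tab-separated; row order may vary):

grandchild	grandparent
Tom	Alice
Tom	Mary
Tom	Ben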