Hadoop YARN Tool Interface Integration

Published: 2023-12-17 21:43:34, Author: SpringCore

For project setup, see the earlier post:
Java API Operations on Hadoop HDFS
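
If the project is managed with Maven (an assumption; the referenced post covers setup in detail), the code below needs roughly the following dependency, with the version matched to your cluster:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <!-- example version; use the one matching your cluster -->
    <version>3.1.3</version>
</dependency>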

1. Driver Class

package cn.coreqi.mapreduce.tool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.Arrays;

public class WordCountDriver {
    private static Tool tool;
    public static void main(String[] args) throws Exception {
        // Create the configuration
        Configuration conf = new Configuration();
        // The first command-line argument selects which tool to run
        switch (args[0]) {
            case "wordcount":
                tool = new WordCount();
                break;
            default:
                throw new RuntimeException("no such tool " + args[0]);
        }
        // Run the selected tool; ToolRunner parses generic options before calling run()
        int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));
        System.exit(run);
    }
}
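
A key benefit of going through ToolRunner is that GenericOptionsParser strips generic Hadoop options (such as -D property=value) from the command line and applies them to the Configuration before run() is invoked, so job settings can be overridden at submit time without recompiling. A typical invocation might look like the following (jar name, queue name, and paths are placeholders):

hadoop jar wordcount.jar cn.coreqi.mapreduce.tool.WordCountDriver wordcount -D mapreduce.job.queuename=root.test /input /output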

2. Core Logic

package cn.coreqi.mapreduce.tool;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

import java.io.IOException;

public class WordCount implements Tool {
    private Configuration conf;
    // Core driver logic (conf must be injected via setConf before run() is called)
    @Override
    public int run(String[] strings) throws Exception {
        // 1. Get the Job instance from the injected configuration
        Job job = Job.getInstance(conf);

        // 2. Set the jar path
        job.setJarByClass(WordCountDriver.class);   // Locate the containing jar via reflection on the given class

        // 3. Associate the mapper and reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Set the mapper's output key/value types (generic type erasure requires specifying them explicitly)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the final (reducer) output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Set the input and output paths (taken from the command-line arguments, e.g. when running on a cluster)
        FileInputFormat.setInputPaths(job, new Path(strings[0]));
        FileOutputFormat.setOutputPath(job, new Path(strings[1]));

        // 7. Submit the job; 'true' prints progress information while waiting for completion
        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    // Mapper: tokenizes each input line and emits (word, 1) pairs
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        private Text outK = new Text();
        private IntWritable outV = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // Convert the line's content to a string
            String line = value.toString();
            // Split the line into words on single spaces
            String[] words = line.split(" ");
            // Emit each word with a count of 1
            for (String word : words) {
                outK.set(word);
                context.write(outK,outV);
            }
        }
    }
    // Reducer: sums the counts emitted for each word
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        private IntWritable outV = new IntWritable();
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            // Accumulate the counts
            for (IntWritable value : values) {
                sum += value.get();
            }
            outV.set(sum);

            // Emit the word and its total count
            context.write(key,outV);
        }
    }
}
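
For reference, ToolRunner.run() itself is a thin wrapper: it runs the arguments through GenericOptionsParser (which applies generic options to the Configuration), injects the configuration into the tool via setConf(), and then calls run() with the leftover arguments. Below is a simplified sketch of that flow, not the exact Hadoop source:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;

public class SimpleToolRunner {
    public static int run(Configuration conf, Tool tool, String[] args) throws Exception {
        if (conf == null) {
            conf = new Configuration();
        }
        // Parse generic options (-D, -files, -libjars, ...) into the configuration
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        // Hand the populated configuration to the tool
        tool.setConf(conf);
        // Invoke the tool with the remaining, tool-specific arguments
        return tool.run(parser.getRemainingArgs());
    }
}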