MapReduce入门案例——wordcount词频统计分析

发布时间 2023-12-19 12:29:22作者: 智慧的骆驼

        说实话,wordcount这个案例挺土的,但是作为入门案例,还是值得学习的,本篇就通过MapReduce来对词频进行一个统计分析,并写出核心代码。

一:案例介绍:

        

  • Input : 读取文本文件;
  • Splitting : 将文件按照文件块(block)或者行进行拆分,此时得到的K1为偏移量,V1表示对应行的文本内容
  • Mapping : 并行将每一行按照空格进行拆分,拆分得到的 List(K2,V2),其中 K2 代表每一个单词,由于是做词频统计,所以 V2 的值为 1,代表出现 1 次;
  • Shuffling:由于 Mapping 操作可能是在不同的机器上并行处理的,所以需要通过 shuffling 将相同 key 值的数据分发到同一个节点上去合并,这样才能统计出最终的结果,此时得到 K2 为每一个单词,List(V2) 为可迭代集合,V2就是 Mapping 中的V2;
  • Reducing : 这里的案例是统计单词出现的总次数,所以 Reducing 对 List(V2) 进行归约求和操作,最终输出。

 

二:代码实现:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 // Mapper实现
/**
 * KEYIN: Map任务读数据的key类型,offset,是每行数据起始位置的偏移量,Long
 * VALUEIN:Map任务读数据的value类型,其实就是一行行的字符串,String
 *
 * hello world welcome
 * hello welcome
 *
 * KEYOUT: map方法自定义实现输出的key的类型,String
 * VALUEOUT: map方法自定义实现输出的value的类型,Integer
 *
 *
 * 词频统计:相同单词的次数   (word,1)
 *
 * Long,String,String,Integer是Java里面的数据类型
 * Hadoop自定义类型:序列化和反序列化
 *
 * LongWritable,Text
 *
 */

public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		// 把value对应的行数据按照指定的分隔符拆开
        String[] words = value.toString().split("\t");
        for(String word : words) {
            // (hello,1)  (world,1)
            context.write(new Text(word.toLowerCase()), new IntWritable(1));
        }
	}
}

// Reducer实现

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text,IntWritable, Text,IntWritable>{

/**
* (hello,1) (world,1)
* (hello,1) (world,1)
* (hello,1) (world,1)
* (welcome,1)
*
* map的输出到reduce端,是按照相同的key分发到一个reduce上去执行
*
* reduce1: (hello,1)(hello,1)(hello,1) ==> (hello, <1,1,1>)
* reduce2: (world,1)(world,1)(world,1) ==> (world, <1,1,1>)
* reduce3 (welcome,1) ==> (welcome, <1>)
*
*
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int count = 0;

Iterator<IntWritable> iterator = values.iterator();

//<1,1,1>
while (iterator.hasNext()) {
IntWritable value = iterator.next();
count += value.get();
}

context.write(key, new IntWritable(count));
}

}

 

Combiner操作:map端的聚合操作(这个聚合操作和reducer逻辑是完全一样的)
 优点:能减少IO,提升作业的执行性能,节省网络开销
 局限性:求平均数

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

// Combiner实现
/** * Combiner操作:map端的聚合操作(这个聚合操作和reducer逻辑是完全一样的) * 优点:能减少IO,提升作业的执行性能,节省网络开销 * 局限性:求平均数 */ public class WordCountCombinerLocalApp { public static void main(String[] args) throws Exception{ Configuration configuration = new Configuration(); // 创建一个Job Job job = Job.getInstance(configuration); // 设置Job对应的参数: 主类 job.setJarByClass(WordCountCombinerLocalApp.class); // 设置Job对应的参数: 设置自定义的Mapper和Reducer处理类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 添加Combiner的设置即可 job.setCombinerClass(WordCountReducer.class); // 设置Job对应的参数: Mapper输出key和value的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 设置Job对应的参数: Reduce输出key和value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置Job对应的参数: 作业输入和输出的路径 FileInputFormat.setInputPaths(job, new Path("input/wc.input")); FileOutputFormat.setOutputPath(job, new Path("output/wc")); // 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : -1); } }