首页 > 分布式 > Hadoop > Hadoop MapReduce编程模型
2015
06-26

Hadoop MapReduce编程模型

本文的行文思路如下:

05. MapReduce编程模型32

一、MapReduce

1、什么是MapReduce?

MapReduce是由Google提出的一个分布式计算模型,用来解决海量数据的计算问题。举个例子说明其解决问题的思想:

05. MapReduce编程模型122

MapReduce由两个阶段组成:Map和Reduce。在Map阶段,将一个大任务分解成小任务,并分发给每个节点,每个节点并行处理这些任务,处理速度很快;最后在Reduce阶段对Map的结果汇总即可,在不要求全局汇总的情况下Reduce阶段也可以并发。

2、Hadoop MapReduce

Hadoop中的MapReduce模块(简称Hadoop MapReduce)是对MapReduce思想的具体实现,主要有两部分组成:编程模型和运行时环境。

  • 编程模型:即Hadoop MapReduce对外提供的编程接口
  • 运行时环境:做一些较为复杂的工作,如任务分发、节点间的通信、节点失效、数据切分等,用户无需关心这些细节

借助Hadoop MapReduce提供的编程接口,我们可以快速的写出分布式计算程序(如WordCount程序中只需实现map()和reduce()两个函数即可),而无需关注分布式环境的一些实现细节(即运行时环境)。

编程模型为用户提供了5个可编程组件,分别是InputFormat、Mapper、Partitioner、Reducer、OutputFormat。【还有一个组件是Combiner、但它实际上也是一个Reducer】

Hadoop MapReduce实现了很多可直接使用的InputFormat、Partitioner、OutputFormat,大部分情况下,用户只需编写Mapper和Reducer即可。

二、Hadoop MapReduce基础组件

1、Hadoop MapReduce原理

下图展示了Hadoop MapReduce原理:

05. MapReduce编程模型817

对上图的说明如下:

1)Mapper任务

① 读取输入文件内容(可以来自于本地文件系统,或HDFS文件系统等),对输入文件的每一行,解析成key-value对[K1,V1]。K1表示行起始偏移量,V1表示读取的一行内容。

② 调用map()方法,将[K1,V1]作为参数传入。在map()方法中封装了数据处理的逻辑,对输入的key、value进行处理。

③ map()方法处理的结果也用key-value的方式进行输出,记为[K2, V2]。

2)Reducer任务

① 在执行Reducer任务之前,有一个shuffle的过程对多个mapper任务的输出进行合并、排序,输出[K2, {V2, ...}]。

② 调用reduce()方法,将[K2, {V2, ...}]作为参数传入。在reducer()方法中封装了数据汇总的逻辑,对输入的key、value进行汇总处理。

③ reduce()方法的输出被保存到指定的目录下。

2、第一个MapReduce程序:WordCount

WordCount程序之于MapReduce就像HelloWorld程序之于任何编程语言,都是初学者接触的第一个程序。这儿不再赘述WordCount程序实现的功能。利用MapReduce实现的WordCount程序的思想如下图:

05. MapReduce编程模型1377

下面是Hadoop MapReduce版的WordCount程序的开发过程:

1)搭建MapReduce开发环境:

本程序在CentOS虚拟机上开发,环境的搭建包括安装CentOS虚拟机、JDK、Eclipse以及网络配置,这些不再赘述。

在Eclipse中新建wordcount项目,添加项目所需jar包,有两种方式:

  • 通过build path直接添加jar包:MapReduce程序需要Hadoop Common模块、MapReduce模块和YARN模块的支持,因此需要添加HADOOP_HOME/share/hadoop/common、mapreduce和yarn目录下的jar包。
  • 如果你的项目是个Maven项目,直接在pom.xml文件中添加hadoop-common、mapreduce和yarn的依赖即可。

本文程序采用第一种方式管理jar包。

2)编写Mapper类:

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

	/**
	 * 在执行Mapper任务之前,MapReduce框架中有一个组件(FileInputFormat的一个实例对象)会读取文件中的一行,将这一行的起始偏移量和行内容封装成key-value,作为参数传入map()方法。
	 */
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		//拿到一行的内容
		String line = value.toString();
		//将行内容切分成单词数组
		String[] words = StringUtils.split(line, ' ');

		for (String word : words) {
			//输出  <单词,1> 这样的key-value对
			context.write(new Text(word), new LongWritable(1L));
		}
	}
}

编写Mapper类应注意:

  • Mapper类中的四个泛型分别表示KEYIN、VALUEIN、KEYOUT和VALUEOUT。其中KEYIN、VALUEIN的类型默认为LongWritable和Text,表示MR程序输入文件中一行的起始偏移量和行内容。KEYOUT和VALUEOUT同map阶段的输出key-value类型一致。
  • Hadoop中的LongWritable、Text就相当于Java中的Long和String类型,它是Hadoop利用自己的序列化机制对long和string的封装。在Hadoop中有自己的序列化机制(实现Writable接口),它比Java中的序列化机制更加简洁高效。

3)编写WordCountReducer类

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

	/**
	 * 在调用reduce方法之前有个shuffle过程
	 * reduce()方法的输入数据来自于shuffle的输出,格式形如:<hello, {1,1,……}>
	 */
	protected void reduce(Text key, Iterable<LongWritable> values, Context context)
			throws IOException, InterruptedException {
		//定义一个累加计数器
		long counter = 0;

		//统计单词出现次数
		for (LongWritable value : values) {
			counter += value.get();
		}

		//输出key表示的单词的统计结果
		context.write(key, new LongWritable(counter));
	}

}

下图是Mapper、Reducer类中泛型的说明

05. MapReduce编程模型3194

4)编写WordCountRunner类

/**
 * 定义一个MapReduce程序的入口
 * 本类中有一个Job对象,该对象用于描述一个MapReduce作业,如Mapper类、Reducer类,以及map/reduce过程的输出类型等等
 */
public class WordCountRunner {
	public static void main(String[] args) throws Exception {
		//实例化一个Job对象
		Configuration conf = new Configuration(); //加载配置文件
		Job job = Job.getInstance(conf);

		//设置Job作业所在jar包
		job.setJarByClass(WordCountRunner.class);

		//设置本次作业的Mapper类和Reducer类
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		//设置Mapper类的输出key-value类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		//设置Reducer类的输出key-value类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		//指定本次作业要处理的原始文件所在路径(注意,是目录)
		FileInputFormat.setInputPaths(job, new Path("/home/hadoop/wordcount/words-file"));
		//指定本次作业产生的结果输出路径(也是目录)
		FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/wordcount/dest"));

		//提交本次作业,并打印出详细信息
		job.waitForCompletion(true);
	}
}

编写WordCountRunner类应注意:

  • 如果没有单独设置Mapper类的输出key-value类型,那么setOutputKeyClass()、setOutputValueClass()方法对Mapper类也奏效。
  • 在本例中MapReduce作业读写的是本地文件,但MR程序也可以读写HDFS上的文件,而且后者使用的更多。
  • setInputPaths()指定输入的原始文件的路径时,是指文件所在的目录,该目录下的所有文件都会被当成输入文件;setOutputPath()方法指定输出文件的目录,一定要注意的是,不能指定一个已存在的目录作为输出目录,否则会报FileAlreadyExistsException的错误。
  • 本例直接在程序中指定了路径仅仅是说明方便。。。。。但是实际开发时一般不直接在程序中指定路径,而是将输入输出路径作为main方法的参数传入(☆)。

5)运行WordCount程序

  • 在Eclipse中直接运行WordCountRunner类:该模式下可以对MapReduce程序进行断点调试。
  • 打成Jar包,提交到集群上运行:将程序打成jar包,命名为wordcount.jar。执行该jar包的命令为:hadoop jar xxx.jar MainClass。此时一定要注意路径的问题!!!

执行结束后,在setOutputPath指定的目录下可以看到MR作业的输出文件:_SUCCESS和part-r-00000。part-r-00000文件内容如下图:

This 1
WordCount 1
a 1
file 1
flyne 1
hello 3
is 1
jack 1
just 1
of 1
program. 1
test 1
tom 1

Question:setInputPaths和setOutputPath中的路径是本地路径还是HDFS路径由什么决定?即判断一个MR作业使用何种文件系统

Answer:MR作业即job对象,一个job对象是通过Job.getInstance(conf)获得,在conf中可以设置fs.defaultFS属性,该属性即表示了MR作业所使用的文件系统。默认情况下MR作业使用本地文件系统,如果设置了fs.defaultFS为hdfs://xxxx:9000,则使用HDFS文件系统。

本例中的输入输出路径明显是Linux下的本地路径,如果非要将本例中的wordcount.jar提交到hadoop集群上执行,必须要在HDFS上创建对应的目录并提交原始单词文件。执行命令如下图:

05. MapReduce编程模型5301

MR程序中可以使用本地文件系统和HDFS,这也表明MR程序和底层文件系统是解耦的,实际上在MR框架中是通过使用不同的FileSystem实现类访问不同的文件系统。


Hadoop MapReduce编程模型》有 1 条评论

  1. 小恒 说:

    MAKING-H无人售货机加盟http://www.makingh.com

留下一个回复

你的email不会被公开。