首页 > 分布式 > Hadoop > Hadoop MapReduce编程模型
2015
06-26

Hadoop MapReduce编程模型

3、第二个MapReduce程序:TrafficStatistics

业务场景:我们每天在使用智能手机时,都会产生大量有用的日志,流量日志就是其中的一种。流量日志记录了用户的上网行为,包括浏览内容、上行/下行流量等等。下图截取了某运营商流量日志的一部分:

05. MapReduce编程模型5514

其中倒数第二、第三行分别记录了用户的下行和上行流量。完整的流量日志文件请点此下载:百度网盘

由于运营商的流量日志内容巨多,下面考虑使用MapReduce统计用户上行/下行流量,即TrafficStatistics程序。程序中包含的Java类具体如下:

1)定义一个Bean,封装上行流量、下行流量和总流量

public class TrafficBean implements Writable{
	private long up_traffic; //上行流量
	private long down_traffic; //下行流量
	private long sum_traffic; //总流量

	public TrafficBean(long up_traffic, long down_traffic) {
		this.up_traffic = up_traffic;
		this.down_traffic = down_traffic;
		this.sum_traffic = up_traffic + down_traffic;
	}

	//反序列化时需要用到反射机制,所以必须有一个默认构造方法
	public TrafficBean() {}

	/**
	 * 序列化时,将对象的各个组成部分按byte流顺序写入output
	 */
	public void write(DataOutput out) throws IOException {
		out.writeLong(up_traffic);
		out.writeLong(down_traffic);
		out.writeLong(sum_traffic);
		//So, the flow sequence is: --> sum_traffic --> down_traffic --> up_traffic -->
	}

	/**
	 * 反序列化时,从一个数据输入流中按顺序读出对象中的各个组成部分
	 * 读取的顺序一定要与写入的顺序保持一致!!!
	 */
	public void readFields(DataInput in) throws IOException {
		up_traffic = in.readLong();
		down_traffic = in.readLong();
		sum_traffic = in.readLong();
	}

	public long getUp_traffic() {
		return up_traffic;
	}

	public long getDown_traffic() {
		return down_traffic;
	}

	public long getSum_traffic() {
		return sum_traffic;
	}

	/**
	 * 将TrafficBean对象写入文件时写入的内容
	 */
	public String toString() {
		return up_traffic + "\t" + down_traffic + "\t" + sum_traffic;
	}
}

关于Hadoop中序列化机制的说明:

  • Hadoop中并没有直接使用JAVA序列化机制,而是提供了Writable接口。Hadoop的序列化机制较Java中的序列化更为简洁高效,这可以提高系统中的网络IO效率。举个例子:JAVA中的对象序列化时,对象的继承结构也被序列化,反序列化后还会还原出其继承结构,因此序列化一个Dog,还需要序列化Animal。所以Java序列化一个类时,需要多序列化很多东西,在网络中传输时需要占用额外的带宽,而Hadoop序列化时就不需要序列化这种继承结构,因为Hadoop中传输的对象都是为了封装数据,没有太多的逻辑,也不需要继承结构。
  • Java基本类型对应的Hadoop中的序列化类如下图所示:

05. MapReduce编程模型7234

  • MapReduce中的任意key和value都要实现Writable接口,并且key还要实现Comparable接口。因此定义了一个WritableComparable接口,让key实现该接口即可。
public interface WritableComparable<T> extends Writable, Comparable<T> {
}

本例中将TrafficBean作为value进行传输,因此只需要实现Writable接口即可。

2)TrafficStatisticsMapper

public class TrafficStatisticsMapper extends
		Mapper<LongWritable, Text, Text, TrafficBean> {

	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		//拿到一行的内容
		String line = value.toString();
		//切分出各个字段
		String[] fields = StringUtils.split(line, '\t');

		//拿到代表号码、上行/下行流量的字段值
		String telNumer = fields[1];
		long up_traffic = Long.parseLong(fields[fields.length - 3]);
		long down_traffic = Long.parseLong(fields[fields.length - 2]);

		//封装到bean
		TrafficBean tb = new TrafficBean(up_traffic, down_traffic);

		//将这个手机号的号码和流量bean输出为一堆key-value
		context.write(new Text(telNumer), tb);
	}
}

3)TrafficStatisticsReducer

public class TrafficStatisticsReducer extends
		Reducer<Text, TrafficBean, Text, TrafficBean> {

	/**
	 * reduce输入的数据形如:<手机号,{trafficBean1, trafficBean2……}>
	 */
	protected void reduce(Text key, Iterable<TrafficBean> values, Context context)
			throws IOException, InterruptedException {
		//定义上下行流量的计数器
		long up_traffic_counter = 0;
		long down_traffic_counter = 0 ;

		//循环遍历values进行累加
		for (TrafficBean value : values) {
			up_traffic_counter = value.getUp_traffic();
			down_traffic_counter = value.getDown_traffic();
		}

		//将统计结果封装成trafficBean并输出
		TrafficBean resultBean = new TrafficBean(up_traffic_counter, down_traffic_counter);
		context.write(key, resultBean);
	}
}

4)TrafficStatisticsRunner

public class TrafficStatisticsRunner {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(TrafficStatisticsRunner.class);

		job.setMapperClass(TrafficStatisticsMapper.class);
		job.setReducerClass(TrafficStatisticsReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TrafficBean.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TrafficBean.class);

		FileInputFormat.setInputPaths(job, new Path("/home/hadoop/traffic-statistics/traffic-file"));
		FileOutputFormat.setOutputPath(job, new Path("/home/hadoop/traffic-statistics/dest"));

		job.waitForCompletion(true);
	}

}

5)运行TrafficStatistics

运行TrafficStatistics程序,结果输出到/home/hadoop/traffic-statistics/dest/part-r-00000文件中,如下:

13480253104 180 180 360
13502468823 7335 110349 117684
13560436666 1116 954 2070
13560439658 918 4938 5856
13602846565 1938 2910 4848
13660577991 6960 690 7650
13719199419 240 0 240
13726230503 2481 24681 27162
13726238888 2481 24681 27162
13760778710 120 120 240
13826544101 264 0 264
13922314466 3008 3720 6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 3659 3538 7197
15920133257 3156 2936 6092
15989002119 1938 180 2118
18211575961 1527 2106 3633
18320173382 9531 2412 11943
84138413 4116 1432 5548

三、Hadoop MapReduce其他组件

1、局部汇总:Combiner

mapper可能会产生大量的输出,此时带宽容易成为性能瓶颈,Combiner的作用就是在mapper任务的本地对输出先做一次合并,以减少系统中的数据传输量。

Combiner和Reducer写法完全一样,都是继承一个Reducer,但是它在mapper的本地运行,做一些局部汇总操作,减少中间结果。其作用有:①减少网络传输量(网络IO) ②减少Reducer的计算压力

在上面WordCount例子中,mapper任务会产生大量形如<hello,1><tom,1><hello,1>……的输出,使用Combiner就可以对每个mapper任务产生的结果进行一次本地汇总,输出<hello,12><tom,1>……以减少网络IO并减轻Reducer的计算压力。由于WordCount程序中所使用的Combiner的逻辑和Reducer的逻辑一致,因此可以直接使用Reducer作为Combiner。修改WordCountRunner如下:

//设置本次作业的Mapper类和Reducer类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//设置Combiner类
job.setCombinerClass(WordCountReducer.class);

使用Combiner时一定要注意,不是什么情况都能用Combiner,前提是不能影响最终逻辑运算结果才行。如下图所示的计算场景(求平均数)就不能使用Combiner:

05. MapReduce编程模型11022

2、结果分组:Partitioner

业务场景:在上面的TrafficStatistics程序中,最后的结果都会输出到一个文件中,每次查询都要查询整个结果集,如果数据特别多就很不方便。因此想到对结果分文件存储,如根据省份存储结果,在查询时,根据手机号所属省份直接查询对应的结果文件即可。这种需求可以利用Partitioner进行实现。

如何实现:MapReduce程序生成的结果文件数跟reduce任务个数有关,由于TrafficStatistics程序中只有一个reduce任务,因此所有的结果都输出到一个文件中(即part-r-00000)。如果对每个省份的数据分配一个reduce任务进行汇总,并且定义这样一个Partitioner——根据每条数据中的key值,选择对应的reduce任务,即可完成该场景中的需求。

下面的TrafficStatistics_Partition程序在上面TrafficStatistics程序的基础上进行修改:

1)新增AreaPartitioner类:定义分组的逻辑

public class AreaPartitioner extends Partitioner<Text, TrafficBean>{

	/**
	 * 此处用areaMap模拟“手机号-省份”的数据库
	 */
	private static HashMap<String,Integer> areaMap = new HashMap<String, Integer>();

	static{ //初始化areaMap
		areaMap.put("137", 0);
		areaMap.put("138", 1);
		areaMap.put("139", 2);
		areaMap.put("135", 3);
		areaMap.put("159", 4);
	}

	/**
	 * Partitioner中的逻辑:根据不同的“省份”,返回不同的数字,“数据库”找不到的返回5
	 */
	@Override
	public int getPartition(Text key, TrafficBean value, int numPartitions) {
		Integer areaCode = areaMap.get(key.toString().substring(0, 3));
		return areaCode!=null ? areaCode : 5;
	}

}

2)重写TrafficStatisticsRunner:将Partitioner类注册到MapReduce作业中,并指定reduce任务数。

public class TrafficStatisticsRunner {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(TrafficStatisticsRunner.class);

		job.setMapperClass(TrafficStatisticsMapper.class);
		job.setReducerClass(TrafficStatisticsReducer.class);

		//将Partitioner类注册到job对象中
		job.setPartitionerClass(AreaPartitioner.class);

		//设置reduce任务数(从args参数传入)
		job.setNumReduceTasks(Integer.parseInt(args[2]));

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TrafficBean.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TrafficBean.class);

		//MapReduce作业的输入输出路径从参数传入
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}

}

3)运行TrafficStatistics_Partition程序

将TrafficStatistics_Partition程序打成Jar包,命名为ts.jar。提交ts.jar到Hadoop集群运行,并指定reduce任务数为6,如下:

#在HDFS上创建输入路径,并上传日志文件
[hadoop@hadoop01 ~]$ hadoop fs -mkdir -p /ts/input
[hadoop@hadoop01 ~]$ hadoop fs -put traffic-statistics/traffic-file/traffic-log.txt /ts/input
#利用hadoop jar命令将任务提交到集群上。
[hadoop@hadoop01 ~]$ hadoop jar ts.jar org.flyne.mapreduce.trafficstatistics.partitioner.TrafficStatisticsRunner /ts/input /ts/output 6

运行完毕后查看程序的输出路径/ts/output,共生成6个输出文件,这和启动的reduce任务数一致。并随机检查一个文件中的内容(如part-r-00003),可以发现和我们期望的结果一致。

05. MapReduce编程模型13578

对结果分组的一些说明:

①本例中设置了reduce任务数为6:job.setNumReduceTasks(6),这和分组数目保持了一致。关于reduce任务数和分组数的关系说明如下:

  • 如果reduce任务数超过分组的数量,不报错,但是多余的reducer输出空文件
  • 如果reduce任务数少于分组的数量,会报错,因为有一些分组找不到对应的reducer
  • 如果reduce任务数为1,也不会报错,这时getPartition()方法失效,所有的分组都到一个reducer中去

②HashPartitioner是MapReduce指定的默认分组方式。其定义如下:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

3、排序sort:Comparable

在TrafficStatistics程序中提到:MR程序中的key除了要实现Writable接口外,还需要实现一个Comparable接口。注:Comparable接口不是Hadoop MapReduce提供的编程接口。

业务场景:在文件/home/hadoop/traffic-statistics/dest/part-r-00000(TrafficStatistics程序的输出文件)中,内容是按照手机号进行排序的,现在的业务场景是让最后的结果按照总流量从高到低的顺序排序。

如何实现:在shuffle的过程中有一个排序的过程,它是根据Mapper任务输出的key进行排序的,这也是前文中提到key需要实现Comparable接口的原因。由于本例需要根据总流量排序,因此让TrafficBean实现Comparable接口并作为key进行传输,可以实现该需求。

下面是TrafficStatistics_Sort程序的开发过程:

1)重新定义TrafficBean:增加手机号字段;增加实现Comparable接口

public class TrafficBean implements WritableComparable<TrafficBean>{
	//手机号
	private String phoneNbr;
	private long up_traffic;
	private long down_traffic;
	private long sum_traffic;

	public TrafficBean(String phoneNbr, long up_traffic, long down_traffic) {
		this.phoneNbr = phoneNbr;
		this.up_traffic = up_traffic;
		this.down_traffic = down_traffic;
		this.sum_traffic = up_traffic + down_traffic	;
	}

	public TrafficBean() {}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phoneNbr);
		out.writeLong(up_traffic);
		out.writeLong(down_traffic);
		out.writeLong(sum_traffic);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		phoneNbr = in.readUTF();
		up_traffic = in.readLong();
		down_traffic = in.readLong();
		sum_traffic = in.readLong();
	}

	public String getPhoneNbr(){
		return phoneNbr;
	}

	public long getUp_traffic() {
		return up_traffic;
	}

	public long getDown_traffic() {
		return down_traffic;
	}

	public long getSum_traffic() {
		return sum_traffic;
	}

	/**
	 * 根据sum_traffic降序排序的逻辑
	 */
	@Override
	public int compareTo(TrafficBean o) {
		return this.sum_traffic > o.getSum_traffic() ? -1 : 1;
	}

	public String toString() {
		return phoneNbr + "\t" + up_traffic + "\t" + down_traffic + "\t" + sum_traffic;
	}
}

2)SortMapper、SortReducer:将TrafficBean作为key传输

public class SortMapper extends Mapper<LongWritable, Text, TrafficBean, NullWritable>{

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		String[] fields = StringUtils.split(value.toString(), "\t");

		//提取需要的字段
		String phoneNbr = fields[0];
		long up_traffic = Long.parseLong(fields[1]);
		long down_traffic = Long.parseLong(fields[2]);

		TrafficBean tBean = new TrafficBean(phoneNbr, up_traffic, down_traffic);

		context.write(tBean, NullWritable.get());
	}
}

public class SortReducer extends Reducer<TrafficBean, NullWritable, TrafficBean, NullWritable>{

	@Override
	protected void reduce(TrafficBean key, Iterable<NullWritable> values, Context context)
			throws IOException, InterruptedException {
		context.write(key, NullWritable.get());
	}

}

需要指出的是,本程序依赖于TrafficStatistics程序,即输入数据来源于TrafficStatistics程序的输出文件。

3)SortRunner

public class SortRunner {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(SortRunner.class);

		job.setMapperClass(SortMapper.class);
		job.setReducerClass(SortReducer.class);

		job.setOutputKeyClass(TrafficBean.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}
}

4)运行TrafficStatistics_Sort程序

将程序打成jar包,命名为ts_sort.jar。提交ts.jar到Hadoop集群运行,如下:

#TrafficStatistics_Sort程序的输入数据即TrafficStatistics程序的输出文件
[hadoop@hadoop01 ~]$ hadoop fs -put traffic-statistics/dest/part-r-00000 /ts-sort/input
#提交任务至集群
[hadoop@hadoop01 ~]$ hadoop jar ts_sort.jar org.flyne.mapreduce.trafficstatistics.sort.SortRunner /ts-sort/input /ts-sort/output

程序输出的文件即为我们所需的排序好的结果。


Hadoop MapReduce编程模型》有 1 条评论

  1. 小恒 说:

    MAKING-H无人售货机加盟http://www.makingh.com

留下一个回复

你的email不会被公开。