guangzhou



shanghai

Recent posts:
Blog index
About
RSS

Hadoop in Action–运行实例

September 06, 2015     Hadoop   711   

原创文章,转载请注明: 转载自工学1号馆

在上篇文章《Hadoop in Action–获取专利数据集》中,获取了实验所需的数据,这篇文章来运行一个mapreduce实例

MapReduce介绍

将任务细化,让不同的节点处理不同部分。处理完后,再把各自的结果进行统一。它通过键值对来处理数据。但键和值的类型都有要求。
通过JAVA编写的 MapReduce 程序中。值的类型必须实现了 Writable, 因为它是要被写入文件中的。而实现了 WritableComparable 接口的类,即可以是键也可以是值。因为它是可写的,所以可以是值;而它又是可进行比较的,所以作为键。
一个可执行的 MapReduce JAVA 程序应该包括如下内容:
1.一个 main 方法,用来定义整个流程,接收的参数等。
2.定义 Mapper, 定义如何划分数据生成的值也是键值对形式。
3.定义 Reducer, 定义如何处理结果。最后生成的值也是键值对形式。
4.定义一个 Job, 将 Mapper, Reducer 等必要的值设置进去。
MapReduce 通过操作 键/值 对来处理数据,一般形式是:
map: 将 (K1, V1) 的输入转化成 list(K2, V2) 的输出
reduce: 将 (K2, list(V2)) 的输入转化成 list(K3, V3) 的输出
比如,一个网络游戏,有多个区,而角色又分几个种族。这时要分析每个区每个种族分别有多少。
整个分析的输入就是所有的这些数据,可能是数据库数据,CVS 形式的数据表。
经过 Map 方法后,数据分析任务分配给不同的 DataNode 。每个 DataNode 上的数据可能是:A区 X 族: 5W 人; A区 Y 族: 6W 人; B 区 X 族 4W 人…
当然,这些数据是 键/值 对形式存储的。
然后,这些数据再经过一个被称为洗牌的过程,将不同种族的 键/值 对放到不同的 DataNode 上。
再经过 Reduce, 得到最终结果:X 族 8W; Y 族: 6W ;当然,结果形式还是 键/值 对。
Mapper
—————————————————–
Mapper 的定义是由一个类来实现的。它必须继承 MapReduceBase 基类并实现 Mapper 接口。
Mapper 只有一个方法:map用于处理单独的键值对。运行的时候键值对是由 Task 传递过来的。所以这里只需要定义如何处理就行,不用关心谁调用。
Reducer
—————————————————–
Reducer 的定义是由一个类来实现的。它必须继承 MapReduceBase 基类并实现 Reducer 接口。
当 Reducer 任务接收来自各个 mapper 的输出时,它按键值对中的键,对数据进行排序,并将相同的值的值归并。然后调用 reduce() 方法。

实例

需求: 解析各个专利被其它哪些专利引用了.
上传要解析的文件到 HDFS:
wu@ubuntu:~/opt/hadoop-1.0.1$ bin/hadoop dfs -put cite75_99.txt cite75_99.txt

查看是否上传成功:

wu@ubuntu:~/opt/hadoop-1.0.1$ bin/hadoop dfs -ls
得到的结果如下:
 Found 3 items
-rw-r–r– 1 wu supergroup 264075431 2015-09-05 03:07 /user/wu/cite75_99.txt
drwxr-xr-x – wu supergroup 0 2015-06-29 01:05 /user/wu/in
drwxr-xr-x – wu supergroup 0 2015-06-29 01:15 /user/wu/out
说明上传成功!
代码如下:
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool{
	public static class MapClass extends MapReduceBase
		implements Mapper<Text, Text, Text, Text> {
		public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
			output.collect(value, key);
		}
	}
	public static class Reduce extends MapReduceBase 
		implements Reducer<Text, Text, Text, Text> {
		public void reduce(Text key, Iterator<Text> values,
							OutputCollector<Text, Text> output,
							Reporter reporter) throws IOException {
			String csv = "";
			while(values.hasNext()) {
				if(csv.length() > 0) csv += ",";
				csv += values.next().toString();
			}
			output.collect(key, new Text(csv));
		}
	}
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		JobConf job = new JobConf(conf, MyJob.class);
		
		Path in = new Path(args[0]);
		Path out = new Path(args[1]);
		FileInputFormat.setInputPaths(job, in);
		FileOutputFormat.setOutputPath(job, out);
		
		job.setJobName("MyJob");
		job.setMapperClass(MapClass.class);
		job.setReducerClass(Reduce.class);
		
		job.setInputFormat(KeyValueTextInputFormat.class);
		job.setOutputFormat(TextOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.set("key.value.separator.in.input.line", ",");
		JobClient.runJob(job);
		
		return 0;
	}
	public static void main(String[] args) throws Exception{
		int res = ToolRunner.run(new Configuration(), new MyJob(), args);
		System.exit(res);
	}
}
编译 JAVA 代码;
wu@ubuntu:~$ javac -classpath ~/opt/hadoop-1.0.1/hadoop-core-1.0.1.jar -d ~/opt/myjob/class ~/opt/myjob/src/MyJob.java

生成 JAR 包:

wu@ubuntu:~$ jar -cvf ~/opt/myjob/myjob.jar -C ~/opt/myjob/class/ .

Hadoop 执行 JAR:

wu@ubuntu:~$ ~/opt/hadoop-1.0.1/bin/hadoop jar ~/opt/myjob/myjob.jar MyJob /user/wu/cite75_99.txt /user/wu/myjob

执行过程如下:

15/09/05 19:25:45 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/05 19:25:45 INFO mapred.JobClient: Running job: job_201509050137_0003
15/09/05 19:25:46 INFO mapred.JobClient: map 0% reduce 0%
15/09/05 19:26:07 INFO mapred.JobClient: map 9% reduce 0%
15/09/05 19:26:12 INFO mapred.JobClient: map 13% reduce 0%
15/09/05 19:26:15 INFO mapred.JobClient: map 19% reduce 0%
15/09/05 19:26:18 INFO mapred.JobClient: map 22% reduce 0%
15/09/05 19:26:22 INFO mapred.JobClient: map 25% reduce 0%
15/09/05 19:26:24 INFO mapred.JobClient: map 31% reduce 0%
15/09/05 19:26:27 INFO mapred.JobClient: map 35% reduce 0%
15/09/05 19:26:30 INFO mapred.JobClient: map 41% reduce 0%
15/09/05 19:26:33 INFO mapred.JobClient: map 46% reduce 0%
15/09/05 19:26:36 INFO mapred.JobClient: map 49% reduce 0%
15/09/05 19:27:59 INFO mapred.JobClient: map 54% reduce 0%
15/09/05 19:28:03 INFO mapred.JobClient: map 57% reduce 0%
15/09/05 19:28:05 INFO mapred.JobClient: map 60% reduce 0%
15/09/05 19:28:08 INFO mapred.JobClient: map 63% reduce 0%
15/09/05 19:28:11 INFO mapred.JobClient: map 66% reduce 0%
15/09/05 19:28:16 INFO mapred.JobClient: map 70% reduce 0%
15/09/05 19:28:19 INFO mapred.JobClient: map 73% reduce 0%
15/09/05 19:28:21 INFO mapred.JobClient: map 76% reduce 0%
15/09/05 19:28:24 INFO mapred.JobClient: map 78% reduce 0%
15/09/05 19:28:29 INFO mapred.JobClient: map 81% reduce 8%
15/09/05 19:28:33 INFO mapred.JobClient: map 86% reduce 8%
15/09/05 19:28:35 INFO mapred.JobClient: map 89% reduce 8%
15/09/05 19:28:38 INFO mapred.JobClient: map 92% reduce 8%
15/09/05 19:28:44 INFO mapred.JobClient: map 98% reduce 8%
15/09/05 19:28:49 INFO mapred.JobClient: map 99% reduce 8%
15/09/05 19:28:58 INFO mapred.JobClient: map 100% reduce 8%
15/09/05 19:29:15 INFO mapred.JobClient: map 100% reduce 16%
15/09/05 19:29:43 INFO mapred.JobClient: map 100% reduce 25%
15/09/05 19:29:53 INFO mapred.JobClient: map 100% reduce 66%
15/09/05 19:29:56 INFO mapred.JobClient: map 100% reduce 70%
15/09/05 19:29:59 INFO mapred.JobClient: map 100% reduce 72%
15/09/05 19:30:02 INFO mapred.JobClient: map 100% reduce 75%
15/09/05 19:30:05 INFO mapred.JobClient: map 100% reduce 77%
15/09/05 19:30:08 INFO mapred.JobClient: map 100% reduce 79%
15/09/05 19:30:11 INFO mapred.JobClient: map 100% reduce 81%
15/09/05 19:30:15 INFO mapred.JobClient: map 100% reduce 83%
15/09/05 19:30:18 INFO mapred.JobClient: map 100% reduce 85%
15/09/05 19:30:21 INFO mapred.JobClient: map 100% reduce 87%
15/09/05 19:30:24 INFO mapred.JobClient: map 100% reduce 90%
15/09/05 19:30:27 INFO mapred.JobClient: map 100% reduce 93%
15/09/05 19:30:30 INFO mapred.JobClient: map 100% reduce 95%
15/09/05 19:30:33 INFO mapred.JobClient: map 100% reduce 97%
15/09/05 19:30:40 INFO mapred.JobClient: map 100% reduce 100%
15/09/05 19:30:50 INFO mapred.JobClient: Job complete: job_201509050137_0003
15/09/05 19:30:51 INFO mapred.JobClient: Counters: 30
15/09/05 19:30:51 INFO mapred.JobClient: Job Counters
15/09/05 19:30:51 INFO mapred.JobClient: Launched reduce tasks=1
15/09/05 19:30:51 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=441444
15/09/05 19:30:51 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
15/09/05 19:30:51 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
15/09/05 19:30:51 INFO mapred.JobClient: Launched map tasks=4
15/09/05 19:30:51 INFO mapred.JobClient: Data-local map tasks=4
15/09/05 19:30:51 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=190932
15/09/05 19:30:51 INFO mapred.JobClient: File Input Format Counters
15/09/05 19:30:51 INFO mapred.JobClient: Bytes Read=264087722
15/09/05 19:30:51 INFO mapred.JobClient: File Output Format Counters
15/09/05 19:30:51 INFO mapred.JobClient: Bytes Written=158078539
15/09/05 19:30:51 INFO mapred.JobClient: FileSystemCounters
15/09/05 19:30:51 INFO mapred.JobClient: FILE_BYTES_READ=735648003
15/09/05 19:30:51 INFO mapred.JobClient: HDFS_BYTES_READ=264088106
15/09/05 19:30:51 INFO mapred.JobClient: FILE_BYTES_WRITTEN=1032876240
15/09/05 19:30:51 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=158078539
15/09/05 19:30:51 INFO mapred.JobClient: Map-Reduce Framework
15/09/05 19:30:51 INFO mapred.JobClient: Map output materialized bytes=297120333
15/09/05 19:30:51 INFO mapred.JobClient: Map input records=16522439
15/09/05 19:30:51 INFO mapred.JobClient: Reduce shuffle bytes=297120333
15/09/05 19:30:51 INFO mapred.JobClient: Spilled Records=57431615
15/09/05 19:30:51 INFO mapred.JobClient: Map output bytes=264075431
15/09/05 19:30:51 INFO mapred.JobClient: Total committed heap usage (bytes)=658063360
15/09/05 19:30:51 INFO mapred.JobClient: CPU time spent (ms)=169550
15/09/05 19:30:51 INFO mapred.JobClient: Map input bytes=264075431
15/09/05 19:30:51 INFO mapred.JobClient: SPLIT_RAW_BYTES=384
15/09/05 19:30:51 INFO mapred.JobClient: Combine input records=0
15/09/05 19:30:51 INFO mapred.JobClient: Reduce input records=16522439
15/09/05 19:30:51 INFO mapred.JobClient: Reduce input groups=3258984
15/09/05 19:30:51 INFO mapred.JobClient: Combine output records=0
15/09/05 19:30:51 INFO mapred.JobClient: Physical memory (bytes) snapshot=706838528
15/09/05 19:30:51 INFO mapred.JobClient: Reduce output records=3258984
15/09/05 19:30:51 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1894498304
15/09/05 19:30:51 INFO mapred.JobClient: Map output records=16522439

执行完后再看HDFS中的文件:

wu@ubuntu:~/opt/hadoop-1.0.1$ bin/hadoop fs -lsr

-rw-r–r– 1 wu supergroup 264075431 2015-09-05 03:07 /user/wu/cite75_99.txt
drwxr-xr-x – wu supergroup 0 2015-06-29 01:05 /user/wu/in
-rw-r–r– 1 wu supergroup 72 2015-06-05 01:35 /user/wu/in/1001.txt
-rw-r–r– 1 wu supergroup 11 2015-06-29 01:05 /user/wu/in/f1.txt
drwxr-xr-x – wu supergroup 0 2015-09-05 19:30 /user/wu/myjob
-rw-r–r– 1 wu supergroup 0 2015-09-05 19:30 /user/wu/myjob/_SUCCESS
drwxr-xr-x – wu supergroup 0 2015-09-05 19:25 /user/wu/myjob/_logs
drwxr-xr-x – wu supergroup 0 2015-09-05 19:25 /user/wu/myjob/_logs/history
-rw-r–r– 1 wu supergroup 22986 2015-09-05 19:25 /user/wu/myjob/_logs/history/job_201509050137_0003_1441506345429_wu_MyJob
-rw-r–r– 1 wu supergroup 20331 2015-09-05 19:25 /user/wu/myjob/_logs/history/job_201509050137_0003_conf.xml
-rw-r–r– 1 wu supergroup 158078539 2015-09-05 19:29 /user/wu/myjob/part-00000
drwxr-xr-x – wu supergroup 0 2015-09-05 19:25 /user/wu/out

发现结果文件 myjob 已经有了,将它里面的 part-00000 下载到本地文件系统中:

wu@ubuntu:~/opt/hadoop-1.0.1$ bin/hadoop fs -get /user/wu/myjob/part-00000 .

你会发现文件 part-00000已经下载到当前路径文件夹

查看前十行:

“CITED” “CITING”
1 3964859,4647229
10000 4539112
100000 5031388
1000006 4714284
1000007 4766693
1000011 5033339
1000017 3908629
1000026 4043055
1000033 4190903,4975983

可以看到,4190903, 4975983 两个专利都引用了 1000033 专利。

这时,回过头来看前面的 JAVA 代码
执行整个逻辑的类是 MyJob, Hadoop 要求 Mapper 和 Reducer 必须是它们自身的静态类,所以在 MyJob 里面分别定义了这两个类。而且将它定义成了MyJob 的内部类。这样是为了简单代码,方便管理。
代码逻辑中,核心代码在 run() 中。它实例化一个 JobConf 对象,并通过 JobClient.runJob(job);启动作业。实际上是 JobClient 和 JobTracker 通信,让该作业在集群上启动。JobConf 保存作业运行所需的全部配置参数。
Mapper 类的核心方法是 map(), Reducer 的是 reduce() 方法。
每个 map() 方法的调用都会传入一个类型为 K1, V1 的键值对,它是由 mapper 生成,并通过 OutputCollector 对象的 collect() 方法来输出,所以一定要调用:
output.collect((K2) k, (V2) v);
Reducer 中的 reduce() 方法的每次调用都被赋予 K2 类型的键以及 V2 类型的一组值。K2, V2 的类型必须和 mapper 中输出的类型一样。reduce() 可能会遍历 V2 列表中的所有值:
while (values.hasNext()) {
      if (csv.length() > 0) {
      csv += ",";
    }
    csv += values.next().toString();
}

处理完后,reduce() 使用 OutputCollector 来输出 K3/V3 的键/值对:output.collect(key, new Text(csv));

要注意的是 mapper 和 reducer 中使用的数据类型必须和定义 JobConf 时设置的一样:
job.setInputFormat(KeyValueTextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

 

如果文章对您有帮助,欢迎点击下方按钮打赏作者

Comments

No comments yet.
To verify that you are human, please fill in "七"(required)