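Original article. If you repost it, please credit the source: 工学1号馆.
In the previous article, "Hadoop in Action: Getting the Patent Data Set", we obtained the data needed for the experiment. This article runs a MapReduce example on that data.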
Introduction to MapReduce
Example
First, upload the citation data set to HDFS:
wu@ubuntu:~/opt/hadoop-1.0.1$ bin/hadoop dfs -put cite75_99.txt cite75_99.txt
Check whether the upload succeeded:
wu@ubuntu:~/opt/hadoop-1.0.1$ bin/hadoop dfs -ls
-rw-r--r-- 1 wu supergroup 264075431 2015-09-05 03:07 /user/wu/cite75_99.txt
drwxr-xr-x - wu supergroup 0 2015-06-29 01:05 /user/wu/in
drwxr-xr-x - wu supergroup 0 2015-06-29 01:15 /user/wu/out
cite75_99.txt appears in the listing, so the upload succeeded.
The source code of the job, MyJob.java:

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.KeyValueTextInputFormat;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class MyJob extends Configured implements Tool {

        // Inverts each record: the input pair (citing, cited) is emitted as (cited, citing).
        public static class MapClass extends MapReduceBase
                implements Mapper<Text, Text, Text, Text> {

            public void map(Text key, Text value,
                            OutputCollector<Text, Text> output,
                            Reporter reporter) throws IOException {
                output.collect(value, key);
            }
        }

        // Joins all patents that cite the same patent into one comma-separated list.
        public static class Reduce extends MapReduceBase
                implements Reducer<Text, Text, Text, Text> {

            public void reduce(Text key, Iterator<Text> values,
                               OutputCollector<Text, Text> output,
                               Reporter reporter) throws IOException {
                String csv = "";
                while (values.hasNext()) {
                    if (csv.length() > 0) {
                        csv += ",";
                    }
                    csv += values.next().toString();
                }
                output.collect(key, new Text(csv));
            }
        }

        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            JobConf job = new JobConf(conf, MyJob.class);

            // args[0] is the input path in HDFS, args[1] the output directory.
            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.setInputPaths(job, in);
            FileOutputFormat.setOutputPath(job, out);

            job.setJobName("MyJob");
            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormat(KeyValueTextInputFormat.class);
            job.setOutputFormat(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            // Split each input line into key and value at the first comma.
            job.set("key.value.separator.in.input.line", ",");

            JobClient.runJob(job);
            return 0;
        }

        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new MyJob(), args);
            System.exit(res);
        }
    }

Compile the source file:
wu@ubuntu:~$ javac -classpath ~/opt/hadoop-1.0.1/hadoop-core-1.0.1.jar -d ~/opt/myjob/class ~/opt/myjob/src/MyJob.java
Build the JAR package:
wu@ubuntu:~$ jar -cvf ~/opt/myjob/myjob.jar -C ~/opt/myjob/class/ .
Run the JAR with Hadoop:
wu@ubuntu:~$ ~/opt/hadoop-1.0.1/bin/hadoop jar ~/opt/myjob/myjob.jar MyJob /user/wu/cite75_99.txt /user/wu/myjob
The job runs as follows:
15/09/05 19:25:45 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/05 19:25:45 INFO mapred.JobClient: Running job: job_201509050137_0003
15/09/05 19:25:46 INFO mapred.JobClient: map 0% reduce 0%
15/09/05 19:26:07 INFO mapred.JobClient: map 9% reduce 0%
15/09/05 19:26:12 INFO mapred.JobClient: map 13% reduce 0%
15/09/05 19:26:15 INFO mapred.JobClient: map 19% reduce 0%
15/09/05 19:26:18 INFO mapred.JobClient: map 22% reduce 0%
15/09/05 19:26:22 INFO mapred.JobClient: map 25% reduce 0%
15/09/05 19:26:24 INFO mapred.JobClient: map 31% reduce 0%
15/09/05 19:26:27 INFO mapred.JobClient: map 35% reduce 0%
15/09/05 19:26:30 INFO mapred.JobClient: map 41% reduce 0%
15/09/05 19:26:33 INFO mapred.JobClient: map 46% reduce 0%
15/09/05 19:26:36 INFO mapred.JobClient: map 49% reduce 0%
15/09/05 19:27:59 INFO mapred.JobClient: map 54% reduce 0%
15/09/05 19:28:03 INFO mapred.JobClient: map 57% reduce 0%
15/09/05 19:28:05 INFO mapred.JobClient: map 60% reduce 0%
15/09/05 19:28:08 INFO mapred.JobClient: map 63% reduce 0%
15/09/05 19:28:11 INFO mapred.JobClient: map 66% reduce 0%
15/09/05 19:28:16 INFO mapred.JobClient: map 70% reduce 0%
15/09/05 19:28:19 INFO mapred.JobClient: map 73% reduce 0%
15/09/05 19:28:21 INFO mapred.JobClient: map 76% reduce 0%
15/09/05 19:28:24 INFO mapred.JobClient: map 78% reduce 0%
15/09/05 19:28:29 INFO mapred.JobClient: map 81% reduce 8%
15/09/05 19:28:33 INFO mapred.JobClient: map 86% reduce 8%
15/09/05 19:28:35 INFO mapred.JobClient: map 89% reduce 8%
15/09/05 19:28:38 INFO mapred.JobClient: map 92% reduce 8%
15/09/05 19:28:44 INFO mapred.JobClient: map 98% reduce 8%
15/09/05 19:28:49 INFO mapred.JobClient: map 99% reduce 8%
15/09/05 19:28:58 INFO mapred.JobClient: map 100% reduce 8%
15/09/05 19:29:15 INFO mapred.JobClient: map 100% reduce 16%
15/09/05 19:29:43 INFO mapred.JobClient: map 100% reduce 25%
15/09/05 19:29:53 INFO mapred.JobClient: map 100% reduce 66%
15/09/05 19:29:56 INFO mapred.JobClient: map 100% reduce 70%
15/09/05 19:29:59 INFO mapred.JobClient: map 100% reduce 72%
15/09/05 19:30:02 INFO mapred.JobClient: map 100% reduce 75%
15/09/05 19:30:05 INFO mapred.JobClient: map 100% reduce 77%
15/09/05 19:30:08 INFO mapred.JobClient: map 100% reduce 79%
15/09/05 19:30:11 INFO mapred.JobClient: map 100% reduce 81%
15/09/05 19:30:15 INFO mapred.JobClient: map 100% reduce 83%
15/09/05 19:30:18 INFO mapred.JobClient: map 100% reduce 85%
15/09/05 19:30:21 INFO mapred.JobClient: map 100% reduce 87%
15/09/05 19:30:24 INFO mapred.JobClient: map 100% reduce 90%
15/09/05 19:30:27 INFO mapred.JobClient: map 100% reduce 93%
15/09/05 19:30:30 INFO mapred.JobClient: map 100% reduce 95%
15/09/05 19:30:33 INFO mapred.JobClient: map 100% reduce 97%
15/09/05 19:30:40 INFO mapred.JobClient: map 100% reduce 100%
15/09/05 19:30:50 INFO mapred.JobClient: Job complete: job_201509050137_0003
15/09/05 19:30:51 INFO mapred.JobClient: Counters: 30
15/09/05 19:30:51 INFO mapred.JobClient: Job Counters
15/09/05 19:30:51 INFO mapred.JobClient: Launched reduce tasks=1
15/09/05 19:30:51 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=441444
15/09/05 19:30:51 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
15/09/05 19:30:51 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
15/09/05 19:30:51 INFO mapred.JobClient: Launched map tasks=4
15/09/05 19:30:51 INFO mapred.JobClient: Data-local map tasks=4
15/09/05 19:30:51 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=190932
15/09/05 19:30:51 INFO mapred.JobClient: File Input Format Counters
15/09/05 19:30:51 INFO mapred.JobClient: Bytes Read=264087722
15/09/05 19:30:51 INFO mapred.JobClient: File Output Format Counters
15/09/05 19:30:51 INFO mapred.JobClient: Bytes Written=158078539
15/09/05 19:30:51 INFO mapred.JobClient: FileSystemCounters
15/09/05 19:30:51 INFO mapred.JobClient: FILE_BYTES_READ=735648003
15/09/05 19:30:51 INFO mapred.JobClient: HDFS_BYTES_READ=264088106
15/09/05 19:30:51 INFO mapred.JobClient: FILE_BYTES_WRITTEN=1032876240
15/09/05 19:30:51 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=158078539
15/09/05 19:30:51 INFO mapred.JobClient: Map-Reduce Framework
15/09/05 19:30:51 INFO mapred.JobClient: Map output materialized bytes=297120333
15/09/05 19:30:51 INFO mapred.JobClient: Map input records=16522439
15/09/05 19:30:51 INFO mapred.JobClient: Reduce shuffle bytes=297120333
15/09/05 19:30:51 INFO mapred.JobClient: Spilled Records=57431615
15/09/05 19:30:51 INFO mapred.JobClient: Map output bytes=264075431
15/09/05 19:30:51 INFO mapred.JobClient: Total committed heap usage (bytes)=658063360
15/09/05 19:30:51 INFO mapred.JobClient: CPU time spent (ms)=169550
15/09/05 19:30:51 INFO mapred.JobClient: Map input bytes=264075431
15/09/05 19:30:51 INFO mapred.JobClient: SPLIT_RAW_BYTES=384
15/09/05 19:30:51 INFO mapred.JobClient: Combine input records=0
15/09/05 19:30:51 INFO mapred.JobClient: Reduce input records=16522439
15/09/05 19:30:51 INFO mapred.JobClient: Reduce input groups=3258984
15/09/05 19:30:51 INFO mapred.JobClient: Combine output records=0
15/09/05 19:30:51 INFO mapred.JobClient: Physical memory (bytes) snapshot=706838528
15/09/05 19:30:51 INFO mapred.JobClient: Reduce output records=3258984
15/09/05 19:30:51 INFO mapred.JobClient: Virtual memory (bytes) snapshot=1894498304
15/09/05 19:30:51 INFO mapred.JobClient: Map output records=16522439
After the job finishes, list the files in HDFS again:
wu@ubuntu:~/opt/hadoop-1.0.1$ bin/hadoop fs -lsr
-rw-r--r-- 1 wu supergroup 264075431 2015-09-05 03:07 /user/wu/cite75_99.txt
drwxr-xr-x - wu supergroup 0 2015-06-29 01:05 /user/wu/in
-rw-r--r-- 1 wu supergroup 72 2015-06-05 01:35 /user/wu/in/1001.txt
-rw-r--r-- 1 wu supergroup 11 2015-06-29 01:05 /user/wu/in/f1.txt
drwxr-xr-x - wu supergroup 0 2015-09-05 19:30 /user/wu/myjob
-rw-r--r-- 1 wu supergroup 0 2015-09-05 19:30 /user/wu/myjob/_SUCCESS
drwxr-xr-x - wu supergroup 0 2015-09-05 19:25 /user/wu/myjob/_logs
drwxr-xr-x - wu supergroup 0 2015-09-05 19:25 /user/wu/myjob/_logs/history
-rw-r--r-- 1 wu supergroup 22986 2015-09-05 19:25 /user/wu/myjob/_logs/history/job_201509050137_0003_1441506345429_wu_MyJob
-rw-r--r-- 1 wu supergroup 20331 2015-09-05 19:25 /user/wu/myjob/_logs/history/job_201509050137_0003_conf.xml
-rw-r--r-- 1 wu supergroup 158078539 2015-09-05 19:29 /user/wu/myjob/part-00000
drwxr-xr-x - wu supergroup 0 2015-09-05 19:25 /user/wu/out
The output directory myjob now exists. Download the part-00000 file inside it to the local file system:
wu@ubuntu:~/opt/hadoop-1.0.1$ bin/hadoop fs -get /user/wu/myjob/part-00000 .
The file part-00000 is now in the current directory.
Its first ten lines are:
"CITED" "CITING"
1 3964859,4647229
10000 4539112
100000 5031388
1000006 4714284
1000007 4766693
1000011 5033339
1000017 3908629
1000026 4043055
1000033 4190903,4975983
As the last line shows, patents 4190903 and 4975983 both cite patent 1000033.
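To make the inversion concrete, here is a minimal sketch that reproduces the same map, group, and reduce steps in plain Java, without Hadoop. The class and method names are hypothetical, and the three input rows are reconstructed from the output shown above rather than copied from cite75_99.txt. A TreeMap stands in for the sort and shuffle phase, on the assumption that sorting strings of digits gives the same order as sorting Text keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CitationInversionSketch {

    // Mimics MapClass.map plus the shuffle: each "citing,cited" line is split
    // at the first comma, then the citing patent is grouped under the cited one.
    public static Map<String, List<String>> mapAndShuffle(List<String> lines) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String line : lines) {
            int comma = line.indexOf(',');
            if (comma < 0) {
                continue; // this sketch simply skips lines without a comma
            }
            String citing = line.substring(0, comma);
            String cited = line.substring(comma + 1);
            groups.computeIfAbsent(cited, k -> new ArrayList<>()).add(citing);
        }
        return groups;
    }

    // Mimics Reduce.reduce: joins the citing patents with commas.
    public static String reduce(List<String> citing) {
        return String.join(",", citing);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "4190903,1000033",
                "4975983,1000033",
                "4539112,10000");
        for (Map.Entry<String, List<String>> e : mapAndShuffle(lines).entrySet()) {
            System.out.println(e.getKey() + "\t" + reduce(e.getValue()));
        }
    }
}
```

Running main prints the line for 10000 first and then "1000033" followed by "4190903,4975983", matching the corresponding lines of part-00000. In this sketch the values keep their input order; a real Hadoop job does not guarantee the order of values within a key.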
    while (values.hasNext()) {
        if (csv.length() > 0) {
            csv += ",";
        }
        csv += values.next().toString();
    }

After processing all the values, reduce() uses the OutputCollector to emit the K3/V3 key/value pair: output.collect(key, new Text(csv));
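The loop above builds the list with String +=, which creates a new string on every iteration. For a patent with many citations, a StringBuilder is a common alternative. The following is only a sketch of that variant, not part of the original job: it uses an Iterator<String> instead of Iterator<Text> so it runs without Hadoop, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;

public class CsvJoinSketch {

    // Same joining logic as the loop in Reduce.reduce, but appending to a
    // StringBuilder instead of concatenating strings.
    public static String joinWithCommas(Iterator<String> values) {
        StringBuilder csv = new StringBuilder();
        while (values.hasNext()) {
            if (csv.length() > 0) {
                csv.append(',');
            }
            csv.append(values.next());
        }
        return csv.toString();
    }

    public static void main(String[] args) {
        Iterator<String> citing = Arrays.asList("4190903", "4975983").iterator();
        System.out.println(joinWithCommas(citing)); // prints 4190903,4975983
    }
}
```

Inside the real reducer, the equivalent change would be to append values.next().toString() to the builder and pass new Text(csv.toString()) to output.collect().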
    job.setInputFormat(KeyValueTextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
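KeyValueTextInputFormat splits every input line into a key and a value at the first occurrence of the separator, which MyJob sets to "," through key.value.separator.in.input.line. The sketch below imitates that split in plain Java to show what the mapper receives. It is an illustration, not Hadoop's implementation, and the class and method names are hypothetical.

```java
public class KeyValueSplitSketch {

    // Splits a line at the first occurrence of the separator.
    // If the separator is absent, the whole line becomes the key
    // and the value is empty.
    public static String[] split(String line, char separator) {
        int pos = line.indexOf(separator);
        if (pos < 0) {
            return new String[] { line, "" };
        }
        return new String[] { line.substring(0, pos), line.substring(pos + 1) };
    }

    public static void main(String[] args) {
        String[] kv = split("4190903,1000033", ',');
        // MapClass.map then swaps the pair: output.collect(value, key)
        System.out.println(kv[1] + "\t" + kv[0]);
    }
}
```

For the line "4190903,1000033" the key is the citing patent 4190903 and the value is the cited patent 1000033, so after the swap the cited patent becomes the key that the reducer groups on.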