原创文章,转载请注明: 转载自工学1号馆
本文将介绍DistributedCache在Hadoop中的使用方法及实现原理。
DistributedCache是Hadoop提供的文件缓存工具,它能够自动将指定的文件分发到各个节点上,缓存到本地,供用户程序读取使用。它具有以下几个特点:缓存的文件是只读的,修改这些文件内容没有意义;用户可以调整文件可见范围(比如只能用户自己使用,所有用户都可以使用等),进而防止重复拷贝现象;按需拷贝,文件是通过HDFS作为共享数据中心分发到各节点的,且只发给任务被调度到的节点。
Hadoop DistributedCache有以下几种典型的应用场景:
1)分发字典文件,一些情况下Mapper或者Reducer需要用到一些外部字典,比如黑白名单、词表等;
2)map-side join:当多表连接时,一种场景是一个表很大,一个表很小,小到足以加载到内存中,这时可以使用DistributedCache将小表分发到各个节点上,以供Mapper加载使用;
3)自动化软件部署:有些情况下,MapReduce需依赖于特定版本的库,比如依赖于某个版本的PHP解释器,一种做法是让集群管理员把这个版本的PHP装到各个机器上,这通常比较麻烦,另一种方法是使用DistributedCache分发到各个节点上,程序运行完后,Hadoop自动将其删除。
Hadoop提供了两种DistributedCache使用方式,一种是通过API,在程序中设置文件路径,另外一种是通过命令行(-files,-archives或-libjars)参数告诉Hadoop,建议使用第二种方式,该方式可使用以下三个参数设置文件:
(1)-files:将指定的本地/hdfs文件分发到各个Task的工作目录下,不对文件进行任何处理;
(2)-archives:将指定文件分发到各个Task的工作目录下,并对名称后缀为“.jar”、“.zip”,“.tar.gz”、“.tgz”的文件自动解压,默认情况下,解压后的内容存放到工作目录下名称为解压前文件名的目录中,比如压缩包为dict.zip,则解压后内容存放到目录dict.zip中。为此,你可以给文件起个别名/软链接,比如dict.zip#dict,这样,压缩包会被解压到目录dict中。
(3)-libjars:指定待分发的jar包,Hadoop将这些jar包分发到各个节点上后,会将其自动添加到任务的CLASSPATH环境变量中。
使用DistributedCache
DistributedCache第一个比较方便的作用就是来完成分布式文件共享这件事,第二个比较有用的场景,就是在执行一些join操作时,将小表放入cache中,来提高连接效率。
下面我们先通过一个表格来看下,在hadoop中,使用全局变量或全局文件共享的几种方法
序号 | 方法 |
1 | 使用Configuration的set方法,只适合数据内容比较小的场景 |
2 | 将共享文件放在HDFS上,每次都去读取,效率比较低 |
3 | 将共享文件放在DistributedCache里,在setup初始化一次后,即可多次使用,缺点是不支持修改操作,仅能读取 |
接下来将介绍,使用DistributedCache的方法,来共享一些全局配置文件或变量,通过DistributedCache的addCacheFile方法,把HDFS上的一些文件,在hadoop任务启动时,就载入缓存里面,以供全局使用,使用这个方法时,我们需要注意几点,首先我们的本地文件,需要上传到HDFS上,然后再这个方法里写入加载路径,接下来就可以在setup初始化时读取出其内容,然后在map或reduce方法执行时,就可以实时的使用这个文件的一些内容了。
测试共享的一个文件内容如下:
f1.txt,内容“wu yu dong”
import java.io.File; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.Scanner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //import org.apache.log4j.pattern.LogEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; //import com.qin.operadb.WriteMapDB; public class TestDistributed { private static Logger logger=LoggerFactory.getLogger(TestDistributed.class); private static class FileMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ Path path[]=null; /** * Map函数前调用 * * */ @Override protected void setup(Context context) throws IOException, InterruptedException { logger.info("开始启动setup......"); Configuration conf=context.getConfiguration(); path=DistributedCache.getLocalCacheFiles(conf); System.out.println("获取的路径是: " + path[0].toString()); FileSystem fsopen= FileSystem.getLocal(conf); FSDataInputStream in = fsopen.open(path[0]); Scanner scan=new Scanner(in); while(scan.hasNext()){ System.out.println(Thread.currentThread().getName()+"扫描的内容: "+scan.next()); } scan.close(); } @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { System.out.println("map里输出了"); context.write(new Text(""), new IntWritable(0)); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { logger.info("清空任务了。。。。。。"); } } private static class FileReduce extends Reducer<Object, Object, Object, Object>{ @Override protected void reduce(Object arg0, Iterable<Object> arg1, Context arg2)throws IOException, InterruptedException { System.out.println("我是reduce里面的东西"); } } public static void main(String[] args)throws Exception { JobConf conf=new JobConf(TestDistributed.class); //注意这行代码放在最前面,进行初始化,否则会报 String inputPath="hdfs://localhost:9000/user/wu/in/"; String outputPath="hdfs://localhost:9000/user/wu/out/"; Job job=new Job(conf, "a"); DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/user/wu/in/f1.txt"), job.getConfiguration()); job.setJarByClass(TestDistributed.class); System.out.println("运行模式: "+conf.get("mapred.job.tracker")); /**设置输出表的的信息 第一个参数是job任务,第二个参数是表名,第三个参数字段项**/ FileSystem fs=FileSystem.get(job.getConfiguration()); Path pout=new Path(outputPath); if(fs.exists(pout)){ fs.delete(pout, true); System.out.println("存在此路径, 已经删除......"); } /**设置Map类**/ job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setMapperClass(FileMapper.class); job.setReducerClass(FileReduce.class); FileInputFormat.setInputPaths(job, new Path(inputPath)); //输入路径 FileOutputFormat.setOutputPath(job, new Path(outputPath));//输出路径 System.exit(job.waitForCompletion(true) ? 0 : 1); } }
部分输出内容如下:
获取的路径是: /tmp/hadoop-wu/mapred/local/archive/8979102494836235044_338147689_1046069009/localhost/user/wu/in/f1.txt
Thread-2扫描的内容: wu
Thread-2扫描的内容: yu
Thread-2扫描的内容: dong
map里输出了
map里输出了
map里输出了
15/06/29 01:14:58 INFO TestDistributed: 清空任务了。。。。。。
15/06/29 01:14:58 INFO mapred.MapTask: Starting flush of map output
15/06/29 01:14:58 INFO mapred.MapTask: Finished spill 0
15/06/29 01:14:58 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
15/06/29 01:14:58 INFO mapred.JobClient: map 100% reduce 0%
15/06/29 01:15:00 INFO mapred.LocalJobRunner:
15/06/29 01:15:00 INFO mapred.Task: Task ‘attempt_local_0001_m_000001_0’ done.
15/06/29 01:15:01 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@d9660d
15/06/29 01:15:01 INFO mapred.LocalJobRunner:
15/06/29 01:15:01 INFO mapred.Merger: Merging 2 sorted segments
15/06/29 01:15:01 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 67 bytes
15/06/29 01:15:01 INFO mapred.LocalJobRunner:
我是reduce里面的东西
Comments