Original article. If you repost it, please credit 工学1号馆.
This post, part of the Hadoop job-submission series, walks through how the job files are submitted.
Start with the comment at the top of the JobClient class, translated below:
JobClient is the primary interface through which a user's job interacts with the JobTracker. JobClient provides facilities for submitting jobs, tracking their progress, accessing the reports and logs of the component tasks, getting the status of the Map-Reduce cluster, and so on. The job submission process involves the following:
1. Checking the input and output specifications of the job
2. Computing the InputSplits for the job (covered in detail in a later post)
3. Setting up the requisite accounting information for the job's DistributedCache, if necessary
4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file system
5. Submitting the job to the JobTracker and monitoring its status
Normally a user creates the application, describes the various facets of the job via JobConf, and then uses JobClient to submit the job and monitor its progress.
Here is an example of how to use JobClient:
// Create a new JobConf
JobConf job = new JobConf(new Configuration(), MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
JobClient.runJob(job);
Sometimes a user may chain map-reduce jobs to accomplish complex tasks that cannot be done with a single map-reduce job. This is fairly easy, since a job's output typically goes to the distributed file system and can in turn serve as the input of another job.
However, this also means that the onus of ensuring that jobs have completed (successfully or not) rests on the client. In that case the job-control options are:
runJob(JobConf): submits the job and returns only after the job has completed.
submitJob(JobConf): only submits the job; the client then polls the returned RunningJob handle to query status and make scheduling decisions (a small sketch of this style follows the list).
JobConf#setJobEndNotificationURI(String): registers a notification on job completion, thus avoiding polling.
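As a quick illustration of the submit-and-poll style, here is a minimal sketch. The class name SubmitAndPoll, the "in"/"out" paths and the 5-second interval are made up for this example; mapper/reducer setup is omitted, so the old-API identity defaults apply, and error handling is left out:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class SubmitAndPoll {
  public static void main(String[] args) throws Exception {
    // "in" and "out" are placeholder paths for this sketch.
    JobConf conf = new JobConf();
    conf.setJobName("myjob");
    FileInputFormat.setInputPaths(conf, new Path("in"));
    FileOutputFormat.setOutputPath(conf, new Path("out"));

    JobClient client = new JobClient(conf);
    RunningJob running = client.submitJob(conf);   // returns as soon as the job is submitted

    // Poll the RunningJob handle until the job finishes, reporting progress on the way.
    while (!running.isComplete()) {
      System.out.printf("map %.0f%% reduce %.0f%%%n",
          running.mapProgress() * 100, running.reduceProgress() * 100);
      Thread.sleep(5000);
    }
    System.out.println(running.isSuccessful() ? "job succeeded" : "job failed");
  }
}

The same pattern is what makes chaining jobs straightforward: submit the first job, wait until it succeeds, then point the next job's input at its output directory.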
The source of the submitJob method:
public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                                IOException {
  try {
    return submitJobInternal(job);
  } catch (InterruptedException ie) {
    throw new IOException("interrupted", ie);
  } catch (ClassNotFoundException cnfe) {
    throw new IOException("class not found", cnfe);
  }
}
Since this method simply delegates to submitJobInternal, the rest of this post focuses on JobClient.submitJobInternal.
The Javadoc comment at the head of the method:

/**
 * Internal method for submitting jobs to the system.
 * @param job the configuration of the job to submit
 * @return a proxy object for the running job
 * @throws FileNotFoundException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
The full method is as follows:
public RunningJob submitJobInternal(final JobConf job
                                    ) throws FileNotFoundException,
                                             ClassNotFoundException,
                                             InterruptedException,
                                             IOException {
  /*
   * configure the command line options correctly on the submitting dfs
   */
  return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
    public RunningJob run() throws FileNotFoundException,
                                   ClassNotFoundException,
                                   InterruptedException,
                                   IOException {
      JobConf jobCopy = job;
      Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
          jobCopy);
      JobID jobId = jobSubmitClient.getNewJobId();
      Path submitJobDir = new Path(jobStagingArea, jobId.toString());
      jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
      JobStatus status = null;
      try {
        populateTokenCache(jobCopy, jobCopy.getCredentials());

        copyAndConfigureFiles(jobCopy, submitJobDir);

        // get delegation token for the dir
        TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                            new Path [] {submitJobDir},
                                            jobCopy);

        Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
        int reduces = jobCopy.getNumReduceTasks();
        InetAddress ip = InetAddress.getLocalHost();
        if (ip != null) {
          job.setJobSubmitHostAddress(ip.getHostAddress());
          job.setJobSubmitHostName(ip.getHostName());
        }
        JobContext context = new JobContext(jobCopy, jobId);

        jobCopy = (JobConf)context.getConfiguration();

        // Check the output specification
        if (reduces == 0 ? jobCopy.getUseNewMapper() :
            jobCopy.getUseNewReducer()) {
          org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
            ReflectionUtils.newInstance(context.getOutputFormatClass(),
                jobCopy);
          output.checkOutputSpecs(context);
        } else {
          jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
        }

        // Create the splits for the job
        FileSystem fs = submitJobDir.getFileSystem(jobCopy);
        LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
        int maps = writeSplits(context, submitJobDir);
        jobCopy.setNumMapTasks(maps);

        // write "queue admins of the queue to which job is being submitted"
        // to job file.
        String queue = jobCopy.getQueueName();
        AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
        jobCopy.set(QueueManager.toFullPropertyName(queue,
            QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

        // Write job file to JobTracker's fs
        FSDataOutputStream out =
          FileSystem.create(fs, submitJobFile,
              new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

        try {
          jobCopy.writeXml(out);
        } finally {
          out.close();
        }

        //
        // Now, actually submit the job (using the submit name)
        //
        printTokens(jobId, jobCopy.getCredentials());
        status = jobSubmitClient.submitJob(
            jobId, submitJobDir.toString(), jobCopy.getCredentials());
        JobProfile prof = jobSubmitClient.getJobProfile(jobId);
        if (status != null && prof != null) {
          return new NetworkedJob(status, prof, jobSubmitClient);
        } else {
          throw new IOException("Could not launch job");
        }
      } finally {
        if (status == null) {
          LOG.info("Cleaning up the staging area " + submitJobDir);
          if (fs != null && submitJobDir != null)
            fs.delete(submitJobDir, true);
        }
      }
    }
  });
}
Step by step, the process is:
1. Ask the jobtracker for a new job ID
JobID jobId = jobSubmitClient.getNewJobId();
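For reference, the returned job ID encodes the JobTracker's start time plus a per-JobTracker sequence number. The value below is made up purely for illustration:

// "job_201408161410_0002" is a hypothetical example: a JobTracker started at
// 2014-08-16 14:10, and this is the second job submitted to it. The string
// form can be parsed back into a JobID object:
JobID parsed = JobID.forName("job_201408161410_0002");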
2. Check the job's output specification. If, for example, the output directory has not been specified or it already exists, the job is not submitted and an error is thrown back to the MapReduce program.
if (reduces == 0 ? jobCopy.getUseNewMapper() :
    jobCopy.getUseNewReducer()) {
  org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
    ReflectionUtils.newInstance(context.getOutputFormatClass(), jobCopy);
  output.checkOutputSpecs(context);
} else {
  jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
}
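For the common FileOutputFormat case, checkOutputSpecs boils down to roughly the following. This is a simplified sketch of the behaviour (fail if no output directory is set, fail if it already exists), written in the context of the code above, not the actual Hadoop source:

// Simplified sketch of FileOutputFormat-style output checking: fail fast if no
// output directory was configured or if it already exists on the target file system.
Path outDir = FileOutputFormat.getOutputPath(jobCopy);
if (outDir == null) {
  throw new InvalidJobConfException("Output directory not set in JobConf.");
}
if (outDir.getFileSystem(jobCopy).exists(outDir)) {
  throw new FileAlreadyExistsException("Output directory " + outDir + " already exists");
}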
3. Compute the input splits for the job. If they cannot be computed, for example because the input path does not exist, the job is not submitted and an error is thrown back to the MapReduce program.
FileSystem fs = submitJobDir.getFileSystem(jobCopy);
LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
int maps = writeSplits(context, submitJobDir);
jobCopy.setNumMapTasks(maps);
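For a job that uses the new API, writeSplits essentially asks the job's InputFormat for the splits, writes them into the submission directory, and returns the split count, which becomes the number of map tasks. The fragment below is only a rough sketch of that idea in the context of the code above (it assumes java.util.List and the usual Hadoop imports), not the real method body:

// Rough sketch of the new-API path inside writeSplits: the InputFormat computes
// the splits, the splits are later serialized into the submission directory,
// and one map task is created per split.
org.apache.hadoop.mapreduce.InputFormat<?, ?> input =
    ReflectionUtils.newInstance(context.getInputFormatClass(), jobCopy);
List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(context);
int maps = splits.size();   // one map task per split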
4. Copy the resources needed to run the job to the jobtracker's filesystem, into a directory named after the job ID. The copying itself is done by the copyAndConfigureFiles call seen earlier; the job JAR is stored with a high number of replicas, so that while the job's tasks run there are plenty of copies across the cluster for the tasktrackers to access. The snippet below then writes the job configuration file (job.xml) into that same submission directory:
FSDataOutputStream out =
    FileSystem.create(fs, submitJobFile,
        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
try {
  jobCopy.writeXml(out);
} finally {
  out.close();
}
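For orientation, the other files in the submission directory are derived through the same JobSubmissionFiles helper. The sketch below reflects the Hadoop 1.x layout (job.xml, job.jar, job.split, job.splitmetainfo); treat the helper names and file names here as illustrative rather than authoritative:

// Illustrative only: mapping the per-job submission directory to the files the
// JobTracker will later read.
Path jobConfPath  = JobSubmissionFiles.getJobConfPath(submitJobDir);      // .../job.xml
Path jobJarPath   = JobSubmissionFiles.getJobJar(submitJobDir);           // .../job.jar
Path jobSplitFile = JobSubmissionFiles.getJobSplitFile(submitJobDir);     // .../job.split
Path jobSplitMeta = JobSubmissionFiles.getJobSplitMetaFile(submitJobDir); // .../job.splitmetainfo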
5. Tell the jobtracker that the job is ready for execution
printTokens(jobId, jobCopy.getCredentials());
status = jobSubmitClient.submitJob(
    jobId, submitJobDir.toString(), jobCopy.getCredentials());
JobProfile prof = jobSubmitClient.getJobProfile(jobId);
if (status != null && prof != null) {
  return new NetworkedJob(status, prof, jobSubmitClient);
} else {
  throw new IOException("Could not launch job");
}
} finally {
  if (status == null) {
    LOG.info("Cleaning up the staging area " + submitJobDir);
    if (fs != null && submitJobDir != null)
      fs.delete(submitJobDir, true);
  }
}
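jobSubmitClient is the client-side RPC proxy for the JobTracker. Judging from the call site above, the method it invokes on the JobSubmissionProtocol interface is declared roughly as follows (inferred from the call, not copied from the interface source):

// Approximate declaration inferred from
// jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials()):
// the JobTracker registers the job and returns its initial JobStatus.
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException;

If submission succeeds, the returned JobStatus and the JobProfile fetched right afterwards are wrapped in a NetworkedJob, which is the RunningJob handle the caller can poll; if anything went wrong (status is still null), the finally block cleans up the staging directory.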