工学1号馆

home

Hadoop作业提交深度剖析4–作业提交

Wu Yudong    June 28, 2015     Hadoop   689   

原创文章,转载请注明: 转载自工学1号馆

本文主要剖析Hadoop作业提交系列的作业文件提交过程

7201718

 

先看JobClient类的源代码开头的注释,翻译如下:

JobClient是一个为了将用户端作业与JobTrack连接的基础接口,JobClient提供了便利的提交作业、跟踪进度,访问组件任务的报告/日志,获取Map-Reduce集群的状态信息等等。作业提交过程涉及如下:

1、检查作业的输入输出规格

2、为作业计算InputSplits(后面的文章详细介绍)

3、如果有需要,为作业的DistributedCache计划必须的账户信息

4、将作业的jar包和配置信息复制到分布式文件系统分map-reduce的系统目录

5、向JobTrack提交作业并随时监控它的状态

通常,用户创建应用,通过JobConf类描述作业的不同方面,并使用JobClient提交作业和监控它的进度

下面是一个例子,关于如何使用JobClient:

// 创建一个新的JobConf对象
JobConf job = new JobConf(new Configuration(), MyJob.class);
     
// 指定不同的作业特征参数     
job.setJobName("myjob");
     
job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));
     
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// 提交作业,调查进程知道作业完成
JobClient.runJob(job);

有时候用户可能会链接map-reduce作业用来完成那些通过单一map-reduce不能完成的复杂的任务。这相当的简单,因为job通常输出到分布式文件系统,可以将其作为另一个作业的输入。

然而,这意味着确保作业是否完成(成功/失败)的责任落在客户端,这种情况下,不同的作业控制如下:

runJob(JobConf):提交作业和返回仅仅在作业完成之后

submitJob(JobConf):仅仅提交作业,然后轮询RunningJob返回的句柄来 查询状态和做调度决定

JobConf#setJobEndNotificationURI(String):设置一个作业完成的通知,因此避免轮询

submitJob的方法源代码:

  public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                                  IOException {
    try {
      return submitJobInternal(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    } catch (ClassNotFoundException cnfe) {
      throw new IOException("class not found", cnfe);
    }
  }

由于该方法调用的是submitJobInternal方法,所以接下来重点分析JobClient.submitJobInternal函数:

方法源代码的前面注释:

内部的方法,作用是将作业提交到系统
@param job 提交作业的配置参数
@return  为运行的作业返回一个动态代理对象
@throws FileNotFoundException
@throws ClassNotFoundException
@throws InterruptedException
@throws IOException

完整的函数代码如下:

  public 
  RunningJob submitJobInternal(final JobConf job
                               ) throws FileNotFoundException, 
                                        ClassNotFoundException,
                                        InterruptedException,
                                        IOException {
    /*
     * configure the command line options correctly on the submitting dfs
     */
    return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
      public RunningJob run() throws FileNotFoundException, 
      ClassNotFoundException,
      InterruptedException,
      IOException{
        JobConf jobCopy = job;
        Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
            jobCopy);
        JobID jobId = jobSubmitClient.getNewJobId();
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
        JobStatus status = null;
        try {
          populateTokenCache(jobCopy, jobCopy.getCredentials());

          copyAndConfigureFiles(jobCopy, submitJobDir);

          // get delegation token for the dir
          TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                              new Path [] {submitJobDir},
                                              jobCopy);

          Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          int reduces = jobCopy.getNumReduceTasks();
          InetAddress ip = InetAddress.getLocalHost();
          if (ip != null) {
            job.setJobSubmitHostAddress(ip.getHostAddress());
            job.setJobSubmitHostName(ip.getHostName());
          }
          JobContext context = new JobContext(jobCopy, jobId);

          jobCopy = (JobConf)context.getConfiguration();

          // Check the output specification
          if (reduces == 0 ? jobCopy.getUseNewMapper() : 
            jobCopy.getUseNewReducer()) {
            org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
              ReflectionUtils.newInstance(context.getOutputFormatClass(),
                  jobCopy);
            output.checkOutputSpecs(context);
          } else {
            jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
          }

          // Create the splits for the job
          FileSystem fs = submitJobDir.getFileSystem(jobCopy);
          LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
          int maps = writeSplits(context, submitJobDir);
          jobCopy.setNumMapTasks(maps);

          // write "queue admins of the queue to which job is being submitted"
          // to job file.
          String queue = jobCopy.getQueueName();
          AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
          jobCopy.set(QueueManager.toFullPropertyName(queue,
              QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

          // Write job file to JobTracker's fs        
          FSDataOutputStream out = 
            FileSystem.create(fs, submitJobFile,
                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

          try {
            jobCopy.writeXml(out);
          } finally {
            out.close();
          }
          //
          // Now, actually submit the job (using the submit name)
          //
          printTokens(jobId, jobCopy.getCredentials());
          status = jobSubmitClient.submitJob(
              jobId, submitJobDir.toString(), jobCopy.getCredentials());
          JobProfile prof = jobSubmitClient.getJobProfile(jobId);
          if (status != null && prof != null) {
            return new NetworkedJob(status, prof, jobSubmitClient);
          } else {
            throw new IOException("Could not launch job");
          }
        } finally {
          if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (fs != null && submitJobDir != null)
              fs.delete(submitJobDir, true);
          }
        }
      }
    });
  }

具体过程如下:

1、向jobtracker请求一个新的作业ID

JobID jobId = jobSubmitClient.getNewJobId();

2、检查作业的输出说明,如果没有指定输出目录或输出目录已经存在,作业就不提交,错误抛回给MapReduce程序

 if (reduces == 0 ? jobCopy.getUseNewMapper() : 
            jobCopy.getUseNewReducer()) {
            org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
              ReflectionUtils.newInstance(context.getOutputFormatClass(),
                  jobCopy);
            output.checkOutputSpecs(context);
          } else {
            jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
          }

3、计算作业的分片,如果分片无法计算,比如因为作业路径不存在,作业就不提交,,错误抛回给MapReduce程序

  FileSystem fs = submitJobDir.getFileSystem(jobCopy);
          LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
          int maps = writeSplits(context, submitJobDir);
          jobCopy.setNumMapTasks(maps);

4、将运行作业所需的资源复制到一个以作业ID命名的目录下jobtrackr的文件系统中,作业JAR的副本较多,因此在运行作业的任务时,集群中有很多个副本可供tasktracker访问

  FSDataOutputStream out = 
            FileSystem.create(fs, submitJobFile,
                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

          try {
            jobCopy.writeXml(out);
          } finally {
            out.close();
          }

5、告知jobtracker,作业准备执行

   printTokens(jobId, jobCopy.getCredentials());
          status = jobSubmitClient.submitJob(
              jobId, submitJobDir.toString(), jobCopy.getCredentials());
          JobProfile prof = jobSubmitClient.getJobProfile(jobId);
          if (status != null && prof != null) {
            return new NetworkedJob(status, prof, jobSubmitClient);
          } else {
            throw new IOException("Could not launch job");
          }
        } finally {
          if (status == null) {
            LOG.info("Cleaning up the staging area " + submitJobDir);
            if (fs != null && submitJobDir != null)
              fs.delete(submitJobDir, true);
          }
        }

 

如果文章对您有帮助,欢迎点击下方按钮打赏作者

Comments

No comments yet.
To verify that you are human, please fill in "七"(required)