原创文章,转载请注明: 转载自工学1号馆
本文主要剖析Hadoop作业提交后的初始化过程
调度器调用JobTracker.initJob()函数对新作业进行初始化,作业初始化的主要工作是构造Map Task和Reduce Task并对它们进行初始化。
Hadoop将每个作业分解成4种类型的任务,分别是Setup Task、Map Task、Reduce Task和Cleanup Task。它们的运行时信息由TaskInProgress类维护,因此,创建这些任务实际上是创建TaskInProgress对象。
上述4种任务的作用及创建过程如下。
Setup Task:作业初始化标识性任务。它进行一些非常简单的作业初始化工作,比如将运行状态设置为“setup”,调用OutputCommitter.setupJob()函数等。该任务运行完后,作业由PREP状态变为RUNNING状态,并开始运行Map Task。该类型任务又被分为Map Setup Task和Reduce Setup Task两种,且每个作业各有一个。它们运行时分别占用一个Map slot和Reduce slot。由于这两种任务功能相同,因此有且只有一个可以获得运行的机会(即只要有一个开始运行,另一个马上被杀掉,而具体哪一个能够运行,取决于当时存在的空闲slot种类及调度策略。相关代码如下:
…… // create two setup tips, one map and one reduce. setup = new TaskInProgress[2]; // setup map tip. This map doesn't use any split. Just assign an empty // split. setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks + 1, 1); setup[0].setJobSetupTask(); // setup reduce tip. setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks + 1, jobtracker, conf, this, 1); setup[1].setJobSetupTask(); ……
Map Task:Map阶段处理数据的任务。其数目及对应的处理数据分片由应用程序中的InputFormat组件确定。相关代码如下:
// read input splits and create a map per a split // TaskSplitMetaInfo[] splits = createSplits(jobId); if (numMapTasks != splits.length) { throw new IOException("Number of maps in JobConf doesn't match number of " + "recieved splits for job " + jobId + "! " + "numMapTasks=" + numMapTasks + ", #splits=" + splits.length); } numMapTasks = splits.length; //...... maps = new TaskInProgress[numMapTasks]; for(int i=0; i < numMapTasks; ++i) { inputLength += splits[i].getInputDataLength(); maps[i] = new TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf, this, i, numSlotsPerMap); }
Reduce Task:Reduce阶段处理数据的任务。其数目由用户通过参数mapred.reduce.tasks(默认数目为1)指定。考虑到Reduce Task能否运行依赖于Map Task的输出结果,因此,Hadoop刚开始只会调度Map Task,直到Map Task完成数目达到一定比例(由参数mapred.reduce.slowstart.completed.maps指定,默认是0.05,即5%)后,才开始调度Reduce Task。相关代码如下:
// // Create reduce tasks // this.reduces = new TaskInProgress[numReduceTasks]; for (int i = 0; i < numReduceTasks; i++) { reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i, jobtracker, conf, this, numSlotsPerReduce); nonRunningReduces.add(reduces[i]); }
Cleanup Task:作业结束标志性任务,主要完成一些清理工作,比如删除作业运行过程中用到的一些临时目录(比如_temporary目录)。一旦该任务运行成功后,作业由RUNNING状态变为SUCCESSED状态。相关代码如下:
public class JobInProgress { TaskInProgress cleanup[] = new TaskInProgress[0]; ... ... public synchronized void initTasks() { ... ... // create cleanup two cleanup tips, one map and one reduce. cleanup = new TaskInProgress[2]; // cleanup map tip. This map doesn't use any splits. Just assign an empty // split. TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT; cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, jobtracker, conf, this, numMapTasks, 1); cleanup[0].setJobCleanupTask(); // cleanup reduce tip. cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf, this, 1); cleanup[1].setJobCleanupTask(); ... ... }
Comments