Job和Task运行时信息的维护

JobTracker最重要的功能之一是状态监控,包括TaskTracker、Job和Task等运行时状态的监控,其中TaskTracker状态监控比较简单,只要记录其最近心跳汇报时间和健康状况(由TaskTracker端的监控脚本检测,并通过心跳将结果发送给JobTracker)即可。
作业描述模型
如下图所示
这里写图片描述
JobTracker在其内部以“三层多叉树”的方式描述和跟踪每个作业的运行状态。JobTracker为每个作业创建一个JobInProgress对象以跟踪和监控其运行状态。该对象存在于作业的整个运行过程中:它在作业提交时创建,作业运行完成时销毁。同时,为了采用分而治之的策略解决问题,JobTracker会将每个作业拆分成若干个任务,并为每个任务创建一个TaskInProgress对象以跟踪和监控其运行状态,而任务在运行过程中,可能会因为软件Bug、硬件故障等原因运行失败,此时JobTracker会按照一定的策略重新运行该任务,也就是说,每个任务可能会尝试运行多次,直到运行成功或者因超过尝试次数而失败。JobTracker将每运行一次任务称为一次“任务运行尝试”,即Task Attempt。对于某个任务,只要有一个Task Attempt运行成功,则相应的TaskInProgress对象会标注该任务运行成功,而当所有的TaskInProgress均标注其对应的任务运行成功后,JobInProgress对象会标识整个作业运行成功。
为了区分各个作业,如图所示
这里写图片描述
JobTracker会赋予每个作业一个唯一的ID。该ID由三部分组成:作业前缀字符串、JobTracker启动时间和作业提交顺序,各部分通过“_”连接起来组成一个完整的作业ID,比如 job_201208071706_0009,对应的三部分分别是“job”、“201208071706”和“009”(JobTracker运行以来第9个作业)。每个任务的ID继承了作业的ID,并在此基础上进行了扩展,它由三部分组成:作业ID(其中前缀字符串变为“task”)、任务类型(map还是reduce)和任务编号(从000000开始,一直到999999)。比如,task_201208071706_0009_m_000000,表示它的作业ID为task_201208071706_0009,任务类型为map,任务编号为000000。每个Task Attempt的ID继承了任务的ID,它由两部分组成:任务ID(其中前缀字符串变为“attempt”)和运行尝试次数(从0开始),比如,attempt_201208071706_0009_m_000000_0表示任务task_201208071706_0009_m_000000的第0次尝试。

JobInProgress
JobInProgress类主要用于监控和跟踪作业运行状态,并为调度器提供最底层的调度接口。
JobInProgress维护了两种作业信息:一种是静态信息,这些信息是作业提交之时就已经确定好的;另一种是动态信息,这些信息随着作业的运行而动态变化。
(1)作业静态信息
作业静态信息是指作业提交之时就已经确定好的属性信息,主要包括以下几项:
org.apache.hadoop.mapred.JobInProgress.java

//map task, reduce task , cleanup task和setup task对应的TaskInProgress
 TaskInProgress maps[] = new TaskInProgress[0];
  TaskInProgress reduces[] = new TaskInProgress[0];
  TaskInProgress cleanup[] = new TaskInProgress[0];
  TaskInProgress setup[] = new TaskInProgress[0];
  int numMapTasks = 0;//Map Task个数
  int numReduceTasks = 0;//Reduce Task个数
  final long memoryPerMap;//每个Map Task需要的内存量
  final long memoryPerReduce;//每个Reduce Task需要的内存量
  volatile int numSlotsPerMap = 1;//每个Map Task需要的slot个数
  volatile int numSlotsPerReduce = 1;//每个Reduce Task需要的slot个数
  /*允许每个TaskTracker上失败的Task个数,默认是4,通过参数mapred.max.tracker.failures设置。当该作业在某个TaskTracker上失败的个数超过该值时,会将该节点添加到该作业的黑名单中,调度器不再为该节点分配该作业的任务*/
  final int maxTaskFailuresPerTracker;

  private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;//当有5%的Map Task完成后,才可以调度Reduce Task
 int completedMapsForReduceSlowstart = 0;//多少Map Task完成后开始调度Reduce Task

 final int mapFailuresPercent;//允许的Map Task失败比例上限,通过参数mapred.max.map.failures.percent设置
 final int reduceFailuresPercent;//允许的Reduce Task失败比例上限,通过参数mapred.max.reduce.failures.percent设置

 JobPriority priority = JobPriority.NORMAL;//作业优先级

(2)作业动态信息
作业动态信息是指作业运行过程中会动态更新的信息。这些信息对于发现TaskTracker/Job/Task故障非常有用,也可以为调度器进行任务调度提供决策依据。

 int runningMapTasks = 0;//正在运行的Map Task数目
  int runningReduceTasks = 0;//正在运行的Reduce Task数目
  int finishedMapTasks = 0;//运行完成的Map Task数目
  int finishedReduceTasks = 0;//运行完成的Reduce Task数目
  int failedMapTasks = 0; //失败的Map Task Attempt数目
  int failedReduceTasks = 0;//失败的Reduce Task Attempt数目

  int speculativeMapTasks = 0;//正在运行的备份任务(MAP)数目
  int speculativeReduceTasks = 0;//正在运行的备份任务(REDUCE)数目

    int failedMapTIPs = 0;//失败的TaskInProgress(MAP)数目,这意味着对应的输入数据将被丢弃,不会产生最终结果
  int failedReduceTIPs = 0;//失败的TaskInProgress(REDUCE)数目
    private volatile boolean launchedCleanup = false;//是否已启动Cleanup Task
  private volatile boolean launchedSetup = false;//是否已启动Setup Task
  private volatile boolean jobKilled = false;//作业是否已被杀死
  private volatile boolean jobFailed = false;//作业是否已失败

 // NetworkTopology Node to the set of TIPs
  Map<Node, List<TaskInProgress>> nonRunningMapCache;//节点与TaskInProgress的映射关系,即TaskInProgress输入数据位置与节点对应关系

  // Map of NetworkTopology Node to set of running TIPs
  Map<Node, Set<TaskInProgress>> runningMapCache;//节点及其上面正在运行的Task映射关系

  // A list of non-local, non-running maps
  final List<TaskInProgress> nonLocalMaps;//不需要考虑数据本地性的Map Task,如果一个Map Task的InputSplit Location为空,则进行任务调度时不需要考虑本地性

  // Set of failed, non-running maps sorted by #failures
  final SortedSet<TaskInProgress> failedMaps;//按照失败次数进行排序的TIP集合

  // A set of non-local running maps
  Set<TaskInProgress> nonLocalRunningMaps;//未运行的Map Task集合

  // A list of non-running reduce TIPs
  Set<TaskInProgress> nonRunningReduces;//未运行的Reduce Task集合

  // A set of running reduce TIPs
  Set<TaskInProgress> runningReduces;//正在运行的Reduce Task集合

  // A list of cleanup tasks for the map task attempts, to be launched
  List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();//待清理的Map Task列表,比如用户直接通过命令“bin/hadoop job -kill”杀死的Task

  // A list of cleanup tasks for the reduce task attempts, to be launched
  List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();

long startTime;//作业提交时间
  long launchTime;//作业开始执行时间
  long finishTime;//作业完成时间

TaskInProgress
TaskInProgress类维护了一个Task运行过程中的全部信息。在Hadoop中,由于一个任务可能会推测执行或者重新执行,所以会存在多个Task Attempt,且同一时刻,可能有多个处理相同的任务尝试同时在执行,而这些任务被同一个TaskInProgress对象管理和跟踪,只要任何一个任务尝试运行成功,TaskInProgress就会标注该任务执行成功。

 private final TaskSplitMetaInfo splitInfo;//Task要处理的Split信息
  private int numMaps;//Map Task数目,只对Reduce Task有用
  private int partition;//该Task在task列表中的索引
  private JobTracker jobtracker;//JobTracker对象,用于获取全局时钟
  private TaskID id;//task ID,其后面加下标构成Task Attempt ID
  private JobInProgress job;//该TaskInProgress所在的JobInProgress
  private final int numSlotsRequired;//运行该Task需要的slot数目

  // Status of the TIP
  private int successEventNumber = -1;
  private int numTaskFailures = 0;//Task Attempt失败次数
  private int numKilledTasks = 0;//Task Attempt被杀死次数
  private double progress = 0;//任务运行进度
  private String state = "";//运行状态
  private long startTime = 0;//TaskInProgress对象创建时间
  private long execStartTime = 0;//第一个Task Attempt开始运行时间
  private long execFinishTime = 0;//最后一个运行成功的Task Attempt完成时间
  private int completes = 0;//Task Attempt运行完成数目,实际只有两个值:0和1
  private boolean failed = false;//该TaskInProgress是否运行失败
  private boolean killed = false;//该TaskInProgress是否被杀死
  private boolean jobCleanup = false; //该TaskInProgress是否为Cleanup Task
  private boolean jobSetup = false;//该TaskInProgress是否为Setup Task

  // The 'next' usable taskid of this tip
  int nextTaskId = 0;//该TaskInProgress的下一个可用Task Attempt ID

  // The taskid that took this TIP to SUCCESS
  private TaskAttemptID successfulTaskId;//使得该TaskInProgress运行成功的那个Task ID

  // The first taskid of this tip
  private TaskAttemptID firstTaskId;//第一个运行的Task Attemp的ID

  // Map from task Id -> TaskTracker Id, contains tasks that are
  // currently runnings
  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();//正在运行的Task ID与TaskTracker ID之间的映射关系
  // All attempt Ids of this TIP
  private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();//该TaskInProgress已运行的所有TaskAttempt ID,包括已经运行完成的和正在运行的
  /**
   * Map from taskId -> TaskStatus
   */
  private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
    new TreeMap<TaskAttemptID,TaskStatus>();//Task ID与TaskStatus映射关系

  // Map from taskId -> TaskTracker Id, 
  // contains cleanup attempts and where they ran, if any
  private TreeMap<TaskAttemptID, String> cleanupTasks =
    new TreeMap<TaskAttemptID, String>();//Cleanup Task ID与TaskTracker ID映射关系

  private TreeSet<String> machinesWhereFailed = new TreeSet<String>();//所有已经运行失败的Task所在的节点列表
  private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();//某个Task Attempt运行成功后,其他所有正在运行的Task Attempt保存在该集合中

  //list of tasks to kill, <taskid> -> <shouldFail> 
  private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();//待杀死的Task列表

  //task to commit, <taskattemptid>  
  private TaskAttemptID taskToCommit;//等待被提交的Task Attempt,该Task Attempt最终使得TaskInProgress运行成功

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。
错误一: 1.eclipse中, 当maven test的时候,报错: -Dmaven.multiModuleProjectDirectory=$M2_HOME) 2.解决办法: 可以设一个环境变量M2_HOME指向你的maven安装目录M2_HOME=D:\Apps\apache-maven-3.3.1然后在Window-Preference-Java-Installed JREs-Edit在Default VM arguments中设置-Dmaven.multiModuleProjectDirecto

CentOS7 安装 Java 8 以及Tomcat8 - 2016-05-10 14:05:32

安装 Java8 更新软件 yum update 查看是否安装了jdk java - version 如果以前已经安装就卸载 #查看内置的JDK rpm -qa | grep jdk #卸载内置的JDK yum remove java- 1.6 .0 -openjdkyum remove java- 1.7 .0 -openjdk 下载java8 1、在线安装 ### For 32 bit wget -- no -cookies -- no -check-certificate --header "Coo
Docker,云时代的程序交付方式 Docker — 云时代的程序分发方式 要说最近一年云计算业界有什么大事件?Google Compute Engine 的正式发布?Azure入华?还是AWS落地中国?留在每个人大脑中的印象可能各不相同,但要是让笔者来排名的话那么Docker绝对应该算是第一位的。 如果你之前听说过它的话,那么也许你会说“没错,就是它”,因为几乎世界各地的开发、运维都在谈论着Docker;如果你还没听说过Docker,那么我 真的建议你花上10分钟来阅读本文。 1. Docker简介 1

flume的source, channel, sink 列表 - 2016-05-10 14:05:24

Flume Source Source类型 说明 Avro Source 支持Avro协议(实际上是Avro RPC),内置支持 Thrift Source 支持Thrift协议,内置支持 Exec Source 基于Unix的command在标准输出上生产数据 JMS Source 从JMS系统(消息、主题)中读取数据,ActiveMQ已经测试过 Spooling Directory Source 监控指定目录内数据变更 Twitter 1% firehose Source 通过API持续下载Twitt
了解mapreduceV1(旧版本的mapreduce)与mapreduceV2(YARN)的区别我们需要先深入理解一下mapreduceV1的工作机制和设计思想。 首先看一下mapreduce V1的运行图解 MapReduce V1的组件及功能分别是: Client:客户端,负责编写mapreduce代码并配置和提交作业。 JobTracker:是整个mapreduce框架的核心,类似于springMVC中的DispatcherServlet负责初始化作业,分配作业并与TaskTracker进行心跳通
本讲内容: a. Exactly Once b. 输出不重复 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解。 上节回顾: 上节课 通过案例透视了Spark Streaming Job架构和运行机,并结合源码进行了详细解说;同时也了解了Spark Streaming Job的容错机制,包括 Executor 与 Driver两方面的容错机制。 也就是说Job的事务处理,主要是在Executor 与 Driver两个应用中展开 开讲 首先,我们必须知道什么是事务及
5.1 Centos环境准备 Hostname IP          flating ip func   Lxp-node1 192.168.11.10 10.33.41.135   Controller+network   Lxp-node2 192.168.11.11 10.33.41.136   Compute+storage   Lxp-node3 192.168.11.12 10.33.41.139   Compute+storage     /etc/hostname都要改 /etc/hos
前面两章主要讲解了完全分布式的搭建,这章主要讲解服务器单机完成伪分布的搭建,介绍Hadoop配置,启动,以及简单测试。我的机器:阿里云服务器,64位,Java64,Hadoop2.4.1(64)  所有软件下载 百度云 密码:uup8 讲在开头:对于笔者的完全分布式环境请见该文: Hadoop完全分布式安装 写文章不易,欢迎大家采我的文章,以及给出有用的评论,当然大家也可以关注一下我的 github ;多谢; 1,Hadoop简单介绍: Apache Hadoop 是一款支持数据密集型分布式应用,并以A
第一课:通过案例对SparkStreaming透彻理解三板斧之一:解密SparkStreaming另类实验及SparkStreaming本质解析   本期导读: 1 Spark源码定制选择从SparkStreaming入手; 2 Spark Streaming另类在线实验; 3 瞬间理解SparkStreaming本质。   1.    从Spark Streaming入手开始Spark源码版本定制之路 1.1           从Spark Streaming入手Spark源码版本定制之路的理由 从今
一.neutron环境 参考文档: http://www.aboutyun.com/thread-13108-1-1.html http://docs.openstack.org/mitaka/install-guide-ubuntu/neutron-controller-install.html 1.创建neutron数据库并授权 创建 mysql -u root -p CREATE DATABASE neutron; 授权 GRANT ALL PRIVILEGES ON neutron.* TO 'n