记Hadoop2.5.0线上mapreduce任务执行map任务划分的一次问题解决

前言

近日在线上发现有些mapreduce作业的执行时间很长,我们需要解决这个问题。输入文件的大小是5G,采用了lzo压缩,整个集群的默认block大小是128M。本文将详细描述这次线上问题的排查过程。

现象

线上有一个脚本,为了便于展示,我将这个脚本重新copy了一份并重命名为zzz。这个脚本实际是使用Hadoop streaming运行一个mapreduce任务,在线上执行它的部分输出内容如下:


可以看到map任务划分为1个。这个执行过程十分漫长,我将中间的一些信息省略,map与reduce任务的执行进度如下:

16/05/16 10:22:16 INFO mapreduce.Job:  map 0% reduce 0%
16/05/16 10:22:32 INFO mapreduce.Job:  map 1% reduce 0%
。。。
16/05/16 10:44:14 INFO mapreduce.Job:  map 99% reduce 0%
16/05/16 10:44:20 INFO mapreduce.Job:  map 100% reduce 0%
16/05/16 10:44:33 INFO mapreduce.Job:  map 100% reduce 2%
16/05/16 10:44:34 INFO mapreduce.Job:  map 100% reduce 19%
。。。
16/05/16 10:44:57 INFO mapreduce.Job:  map 100% reduce 99%
16/05/16 10:44:58 INFO mapreduce.Job:  map 100% reduce 100%
从以上内容可以看到map任务执行一共耗时22分钟左右,而reduce任务只耗用了30多秒。

分析

根据以上现象分析,我们知道耗时主要发生在map任务执行的阶段。我们首先查看下这个map任务的输入内容,看到它的大小为5GB且使用lzo压缩:


如此大的输入仅仅在一个map任务中处理显然是进度缓慢的主要原因,我们需要对mapreduce的任务划分进行干预。我们查看下mapreduce任务的InputFormat,以便确定干预的手段,打开我们的脚本其中有以下代码片段:

     hadoop jar $streaming_jar \
          -D mapred.reduce.tasks=30 \
          -D mapreduce.job.maps=100 \
          -D mapreduce.input.fileinputformat.split.minsize=100000000 \
          -inputformat TextInputFormat \
          -file mr.py \
          -input $member_input \
          -output $output \
          -mapper "python mr.py map" \
          -reducer "python mr.py reduce"
我们看到甚至在脚本中还配置了mapreduce.job.maps,根据《Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)》一文的分析,我们知道此参数实际不会对map任务划分产生任何影响。查看到mapreduce作业的input format是TextInputFormat,TextInputFormat的实现见代码清单1。

代码清单1 TextInputFormat的实现

/** An {@link InputFormat} for plain text files.  Files are broken into lines.
 * Either linefeed or carriage-return are used to signal end of line.  Keys are
 * the position in the file, and values are the line of text.. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  //省略无关代码

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    return codec instanceof SplittableCompressionCodec;
  }

}

我们看到TextInputFormat继承了FileInputFormat,因而根据《Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)》一文的内容,真正影响使用FileInputFormat的map任务划分的参数有:

  • dfs.blockSize
  • mapreduce.input.fileinputformat.split.minsize
  • mapreduce.input.fileinputformat.split.maxsize
我们首先来看看dfs.blockSize的大小,由于参数使用了Hadoop集群的默认配置,查看信息如下:


可以看到blockSize的大小是128M,我们不太可能为了一个mapreduce任务去更改整个Hadoop集群的配置,所以我们的重点讲放在后两个参数上。按照《Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)》一文的分析,我对mapreduce.input.fileinputformat.split.minsize和mapreduce.input.fileinputformat.split.maxsize的大小进行了各种尝试,但在运行的过程中发现map任务数量依然只有一个。难道我之前文章的内容分析的有问题?我一度陷入困境。
我休息了一会,重新查看代码清单1,发现TextInputFormat的isSplitable方法。isSplitable方法用于判断TextInputFormat是否可以进行map任务划分。由于输入文件是经过sqoop从关系型数据库抽取的,采用了lzo进行压缩,而Hadoop默认不支持压缩算法lzo,需要单独安装hadoop-lzo,查看Hadoop集群配置,发现我们之前已经做好了这方面的工作。

<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
  <source>core-site.xml</source>
</property>
这说明TextInputFormat的isSplitable方法在从压缩算法工厂类CompressionCodecFactory中获取到的CompressionCodec不为null,那么TextInputFormat是否支持map任务划分就取决于com.hadoop.compression.lzo.LzoCodec是否实现了SplittableCompressionCodec接口。我们看看com.hadoop.compression.lzo.LzoCodec的实现:
/**
 * A {@link org.apache.hadoop.io.compress.CompressionCodec} for a streaming
 * <b>lzo</b> compression/decompression pair.
 * http://www.oberhumer.com/opensource/lzo/
 *
 */
public class LzoCodec extends org.anarres.lzo.hadoop.codec.LzoCodec {
}
看来com.hadoop.compression.lzo.LzoCodec继承了org.anarres.lzo.hadoop.codec.LzoCodec,再来看看org.anarres.lzo.hadoop.codec.LzoCodec:
/**
 * A {@link org.apache.hadoop.io.compress.CompressionCodec} for a streaming
 * <b>lzo</b> compression/decompression pair.
 * http://www.oberhumer.com/opensource/lzo/
 * 
 */
public class LzoCodec extends Configured implements CompressionCodec {
可以看到org.anarres.lzo.hadoop.codec.LzoCodec直接实现了CompressionCodec,并没有实现SplittableCompressionCodec接口,SplittableCompressionCodec接口实际也继承了CompressionCodec接口:
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface SplittableCompressionCodec extends CompressionCodec {
因此codec instanceof SplittableCompressionCodec这条Java语句将返回false,采用lzo压缩算法的输入文件将导致map任务不可划分,也就是不会生成多个map任务。

解决方法

使用hadoop-lzo-0.4.20-SNAPSHOT.jar提供的LzoTextInputFormat类,它实现了SplittableCompressionCodec接口。



后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。


京东:http://item.jd.com/11846120.html 

当当:http://product.dangdang.com/23838168.html 



本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。

Hadoop+Hive部署安装配置 - 2016-07-15 17:07:47

最近结合具体的项目,搭建了Hadoop+Hive,在运行Hive之前要首先搭建好Hadoop,关于Hadoop的搭建有三种模式,在以下的介绍中,我主要的采用的是Hadoop的伪分布安装模式。写下来给各位分享。 准备工作: 以上所有的下载的安装包和解压后文件均在/usr/local/hadoop目录 1、 分别ssh到每台服务器上 ,在root用户下修改hostname su root vim /etc/sysconfig/network 如上图所示,HOSTNAME=master vim /etc/hos
本文主要结合Spark-1.6.0的源码,对Spark中任务调度模块的执行过程进行分析。Spark Application在遇到Action操作时才会真正的提交任务并进行计算。这时Spark会根据Action操作之前一系列Transform操作的关联关系,生成一个DAG,在后续的操作中,对DAG进行Stage划分,生成Task并最终运行。整个过程如下图所示,DAGScheduler用于对Application进行分析,然后根据各RDD之间的依赖关系划分Stage,根据这些划分好的Stage,对应每个Sta

Hadoop 2.0工作原理学习 - 2016-07-15 17:07:13

1 HDFS简介 1.1 Hadoop 2.0介绍 Hadoop是Apache的一个分布式系统基础架构,可以为海量数据提供存储和计算。Hadoop 2.0即第二代Hadoop系统,其框架最核心的设计是HDFS、MapReduce和YARN。其中,HDFS为海量数据提供存储,MapReduce用于分布式计算,YARN用于进行资源管理。 Hadoop 1.0和Hadoop 2.0的结构对比: Hadoop 2.0的主要改进有: 1、通过YARN实现资源的调度与管理,从而使Hadoop 2.0可以运行更多种类的
    本文借鉴官文,添加了一些解释和看法,其中有些理解,写的比较粗糙,有问题的地方希望大家指出。写这篇文章,是想把一些官文和资料中基础、重点拿出来,能总结出便于大家理解的话语。与大多数“wordcount”代码不同的是,并不会有如何运行第一storm代码等内容,只有在运行完代码后,发现需要明白:“知其然,并知其所以然”。 Storm是什么?为什么要用Storm?为什么不用Spark? 第一个问题,以下概念足以解释: Storm是 基于数据流的实时处理系统 ,提供了大吞吐量的实时计算能力。通过数据入口获取
本文要解决的问题: 从源码级别对Spark Streaming进行简单学习。 Summarize Spark Streaming实现了对实时流数据的高吞吐量、低容错的数据处理API。它的数据来源有很多种:Kafka、Flume、Twitter、ZeroMQ、TCP Scoket等。架构图如下: Streaming接收实时流输入的数据,将其按批划分,然后交给Spark Enigne分批处理。如下图所示: StreamingContext 和SparkContext相似。要使用Spark的流处理就必须创建St
1.前言 HBase是云计算环境下最重要的NOSQL数据库,提供了基于Hadoop的数据存储、索引、查询,其最大的优点就是可以通过硬件的扩展从而几乎无限的扩展其存储和检索能力。但是HBase与传统的基于SQL语言的关系数据库无论从理念还是使用方式上都相去甚远,以至于要将基于SQL的项目移植到HBase时往往需要重写整个项目。 为了解决这个问题,很多开源项目提供了HBase的类SQL中间件,意即提供一种在HBase上使用的类SQL语言,使得程序员能够像使用关系数据库一样使用HBase, Apache Pho
为什么要用VXLAN 随着云计算数据中心的大规模建设与运营,传统的依赖VLAN技术的二层网络技术面临着越来越多的问题: vlan的数量限制   4096个vlan远不能满足大规模云计算数据中心的需求 物理网络基础设施的限制    基于IP子网的区域划分限制了需要二层网络连通性的应用负载的部署 TOR交换机MAC表耗尽     虚拟化以及东西向流量导致更多的MAC表项 多租户场景 租户可以自定义网络,且无需考虑与其他租户IP地址的重叠。 目前解决这些问题的主要方案是基于overlay的大二层网络技术。典型的
本节我们讨论 volume 的 Backup 操作。 Backup 是将 volume 备份到别的地方(备份设备),将来可以通过 restore 操作恢复。 Backup VS Snapshot 初看 backup 功能好像与 snapshot 很相似,都可以保存 volume 的当前状态,以备以后恢复。但二者在用途和实现上还是有区别的,具体表现在: Snapshot 依赖于源 volume,不能独立存在;而 backup 不依赖源 volume,即便源 volume 不存在了,也可以 restore。

LibSVM在Java中的简单应用 - 2016-07-14 18:07:36

首先,在这里首先感谢台湾林智仁先生的开源工具包libsvm。使SVM算法更加普及。大家可以到下面的libsvm官网去了解相关的信息。 Libsvm官方网站- https://www.csie.ntu.edu.tw/~cjlin/libsvm/ 其次,我在使用过程中发现,先生svm_scale文件中无法将经过规约的文件输出到本地txt文件中,只能在控制台重定向,而我并不想在程序运行中打开控制台进行较为繁琐的操作。 所以我改造了svm_scale文件,实现了文件的写入,在这里可以和大家分享一下。 改造后新增参
最新消息 Docker在上周的DockerCon技术大会上发布了1.12版核心产品Docker Engine,最大的新特性是Docker Swarm已经被整合到了Docker Engine里面而不再是一个单独的工具了,这样就可以更容易的把多个Docker主机组合成一整个规模更大可靠性更高的逻辑单元。Docker的掌舵者 Adrian Mouat相信这种新的集群模式可以大大增强Docker在相关领域的竞争力。 把Docker Swarm整合进Docker Engine是一个重大改进,但它也只是一个附加功能,