MapReduce shuffle过程剖析及调优

更新记录

  • 2017-07-18 初稿

MapReduce简介

在Hadoop MapReduce中,框架会确保reduce收到的输入数据是根据key排序过的。数据从Mapper输出到Reducer接收,是一个很复杂的过程,框架处理了所有问题,并提供了很多配置项及扩展点。一个MapReduce的大致数据流如下图:

这里写图片描述

更详细的MapReduce介绍参考Hadoop MapReduce原理与实例

Mapper的输出排序、然后传送到Reducer的过程,称为shuffle。本文详细地解析shuffle过程,深入理解这个过程对于MapReduce调优至关重要,某种程度上说,shuffle过程是MapReduce的核心内容。

Mapper端

当map函数通过context.write()开始输出数据时,不是单纯地将数据写入到磁盘。为了性能,map输出的数据会写入到缓冲区,并进行预排序的一些工作,整个过程如下图:

这里写图片描述

环形Buffer数据结构

每一个map任务有一个环形Buffer,map将输出写入到这个Buffer。环形Buffer是内存中的一种首尾相连的数据结构,专门用来存储Key-Value格式的数据:

这里写图片描述

Hadoop中,环形缓冲其实就是一个字节数组:

// MapTask.java
private byte[] kvbuffer;  // main output buffer

kvbuffer = new byte[maxMemUsage - recordCapacity]; 

kvbuffer包含数据区和索引区,这两个区是相邻不重叠的区域,用一个分界点来标识。分界点不是永恒不变的,每次Spill之后都会更新一次。初始分界点为0,数据存储方向为向上增长,索引存储方向向下:

这里写图片描述

bufferindex一直往上增长,例如最初为0,写入一个int类型的key之后变为4,写入一个int类型的value之后变成8。

索引是对key-value在kvbuffer中的索引,是个四元组,占用四个Int长度,包括:

  • value的起始位置
  • key的起始位置
  • partition值
  • value的长度
private static final int VALSTART = 0;    // val offset in acct
private static final int KEYSTART = 1;    // key offset in acct
private static final int PARTITION = 2;   // partition offset in acct
private static final int VALLEN = 3;      // length of value
private static final int NMETA = 4;       // num meta ints
private static final int METASIZE = NMETA * 4; // size in bytes
 // write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));

kvmeta的存放指针kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。比如kvindex初始位置是-4,当第一个key-value写完之后,(kvindex+0)的位置存放value的起始位置、(kvindex+1)的位置存放key的起始位置、(kvindex+2)的位置存放partition的值、(kvindex+3)的位置存放value的长度,然后kvindex跳到-8位置。

缓冲区的大小默认为100M,但是可以通过mapreduce.task.io.sort.mb这个属性来配置。

Spill

map将输出不断写入到这个缓冲区中,当缓冲区使用量达到一定比例之后,一个后台线程开始把缓冲区的数据写入磁盘,这个写入的过程叫spill。开始spill的Buffer比例默认为0.80,可以通过mapreduce.map.sort.spill.percent配置。在后台线程写入的同时,map继续将输出写入这个环形缓冲,如果缓冲池写满了,map会阻塞直到spill过程完成,而不会覆盖缓冲池中的已有的数据。

在写入之前,后台线程把数据按照他们将送往的reducer进行划分,通过调用PartitionergetPartition()方法就能知道该输出要送往哪个Reducer。假设作业有2个reduce任务,则数据在内存中被划分为reduce1和reduce2:

这里写图片描述

并且针对每部分数据,使用快速排序算法(QuickSort)对key排序。

如果设置了Combiner,则在排序的结果上运行combine。

排序后的数据被写入到mapreduce.cluster.local.dir配置的目录中的其中一个,使用round robin fashion的方式轮流。注意写入的是本地文件目录,而不是HDFS。Spill文件名像sipll0.out,spill1.out等。

不同Partition的数据都放在同一个文件,通过索引来区分partition的边界和起始位置。索引是一个三元组结构,包括起始位置、数据长度、压缩后的数据长度,对应IndexRecord类:

public class IndexRecord {
  public long startOffset;
  public long rawLength;
  public long partLength;

  public IndexRecord() { }

  public IndexRecord(long startOffset, long rawLength, long partLength) {
    this.startOffset = startOffset;
    this.rawLength = rawLength;
    this.partLength = partLength;
  }
}

每个mapper也有对应的一个索引环形Buffer,默认为1KB,可以通过mapreduce.task.index.cache.limit.bytes来配置,索引如果足够小则存在内存中,如果内存放不下,需要写入磁盘。
Spill文件索引名称类似这样 spill110.out.index, spill111.out.index。

Spill文件的索引事实上是 org.apache.hadoop.mapred.SpillRecord的一个数组,每个Map任务(源码中的MapTask.java类)维护一个这样的列表:

final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>();

创建一个SpillRecord时,会分配(Number_Of_Reducers * 24)Bytes缓冲:

public SpillRecord(int numPartitions) {
  buf = ByteBuffer.allocate(
      numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
  entries = buf.asLongBuffer();
}

numPartitions是Partition的个数,其实也就是Reducer的个数:

public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;

// ---

partitions = jobContext.getNumReduceTasks();
final SpillRecord spillRec = new SpillRecord(partitions);

默认的索引缓冲为1KB,即1024*1024 Bytes,假设有2个Reducer,则每个Spill文件的索引大小为2*24=48 Bytes,当Spill文件超过21845.3时,索引文件就需要写入磁盘。

索引及spill文件如下图示意:

这里写图片描述

Spill的过程至少需要运行一次,因为Mapper的输出结果必须要写入磁盘,供Reducer进一步处理。

合并Spill文件

在整个map任务中,一旦缓冲达到设定的阈值,就会触发spill操作,写入spill文件到磁盘,因此最后可能有多个spill文件。在map任务结束之前,这些文件会根据情况合并到一个大的分区的、排序的文件中,排序是在内存排序的基础上进行全局排序。下图是合并过程的简单示意:

这里写图片描述

相对应的索引文件也会被合并,以便在Reducer请求对应Partition的数据的时候能够快速读取。

另外,如果spill文件数量大于mapreduce.map.combiner.minspills配置的数,则在合并文件写入之前,会再次运行combiner。如果spill文件数量太少,运行combiner的收益可能小于调用的代价。

mapreduce.task.io.sort.factor属性配置每次最多合并多少个文件,默认为10,即一次最多合并10个spill文件。最后,多轮合并之后,所有的输出文件被合并为唯一一个大文件,以及相应的索引文件(可能只在内存中存在)。

压缩

在数据量大的时候,对map输出进行压缩通常是个好主意。要启用压缩,将mapreduce.map.output.compress设为true,并使用mapreduce.map.output.compress.codec设置使用的压缩算法。

通过HTTP暴露输出结果

map输出数据完成之后,通过运行一个HTTP Server暴露出来,供reduce端获取。用来相应reduce数据请求的线程数量可以配置,默认情况下为机器内核数量的两倍,如需自己配置,通过mapreduce.shuffle.max.threads属性来配置,注意该配置是针对NodeManager配置的,而不是每个作业配置。

同时,Map任务完成后,也会通知Application Master,以便Reducer能够及时来拉取数据。

通过缓冲、划分(partition)、排序、combiner、合并、压缩等过程之后,map端的工作就算完毕:

这里写图片描述

Reducer端

各个map任务运行完之后,输出写入运行任务的机器磁盘中。Reducer需要从各map任务中提取自己的那一部分数据(对应的partition)。每个map任务的完成时间可能是不一样的,reduce任务在map任务结束之后会尽快取走输出结果,这个阶段叫copy。
Reducer是如何知道要去哪些机器去数据呢?一旦map任务完成之后,就会通过常规心跳通知应用程序的Application Master。reduce的一个线程会周期性地向master询问,直到提取完所有数据(如何知道提取完?)。

数据被reduce提走之后,map机器不会立刻删除数据,这是为了预防reduce任务失败需要重做。因此map输出数据是在整个作业完成之后才被删除掉的。

reduce维护几个copier线程,并行地从map任务机器提取数据。默认情况下有5个copy线程,可以通过mapreduce.reduce.shuffle.parallelcopies配置。

如果map输出的数据足够小,则会被拷贝到reduce任务的JVM内存中。mapreduce.reduce.shuffle.input.buffer.percent配置JVM堆内存的多少比例可以用于存放map任务的输出结果。如果数据太大容不下,则被拷贝到reduce的机器磁盘上。

内存中合并

当缓冲中数据达到配置的阈值时,这些数据在内存中被合并、写入机器磁盘。阈值有2种配置方式:

  • 配置内存比例: 前面提到reduce JVM堆内存的一部分用于存放来自map任务的输入,在这基础之上配置一个开始合并数据的比例。假设用于存放map输出的内存为500M,mapreduce.reduce.shuffle.merger.percent配置为0.80,则当内存中的数据达到400M的时候,会触发合并写入。
  • 配置map输出数量: 通过mapreduce.reduce.merge.inmem.threshold配置。

在合并的过程中,会对被合并的文件做全局的排序。如果作业配置了Combiner,则会运行combine函数,减少写入磁盘的数据量。

Copy过程中磁盘合并

在copy过来的数据不断写入磁盘的过程中,一个后台线程会把这些文件合并为更大的、有序的文件。如果map的输出结果进行了压缩,则在合并过程中,需要在内存中解压后才能给进行合并。这里的合并只是为了减少最终合并的工作量,也就是在map输出还在拷贝时,就开始进行一部分合并工作。合并的过程一样会进行全局排序。

最终磁盘中合并

当所有map输出都拷贝完毕之后,所有数据被最后合并成一个排序的文件,作为reduce任务的输入。这个合并过程是一轮一轮进行的,最后一轮的合并结果直接推送给reduce作为输入,节省了磁盘操作的一个来回。最后(所以map输出都拷贝到reduce之后)进行合并的map输出可能来自合并后写入磁盘的文件,也可能来及内存缓冲,在最后写入内存的map输出可能没有达到阈值触发合并,所以还留在内存中。

每一轮合并并不一定合并平均数量的文件数,指导原则是使用整个合并过程中写入磁盘的数据量最小,为了达到这个目的,则需要最终的一轮合并中合并尽可能多的数据,因为最后一轮的数据直接作为reduce的输入,无需写入磁盘再读出。因此我们让最终的一轮合并的文件数达到最大,即合并因子的值,通过mapreduce.task.io.sort.factor来配置。

假设现在有50个map输出文件,合并因子配置为10,则需要5轮的合并。最终的一轮确保合并10个文件,其中包括4个来自前4轮的合并结果,因此原始的50个中,再留出6个给最终一轮。所以最后的5轮合并可能情况如下:

这里写图片描述

前4轮合并后的数据都是写入到磁盘中的,注意到最后的2格颜色不一样,是为了标明这些数据可能直接来自于内存。

MemToMem合并

除了内存中合并和磁盘中合并外,Hadoop还定义了一种MemToMem合并,这种合并将内存中的map输出合并,然后再写入内存。这种合并默认关闭,可以通过reduce.merge.memtomem.enabled打开,当map输出文件达到reduce.merge.memtomem.threshold时,触发这种合并。

最后一次合并后传递给reduce方法

合并后的文件作为输入传递给Reducer,Reducer针对每个key及其排序的数据调用reduce函数。产生的reduce输出一般写入到HDFS,reduce输出的文件第一个副本写入到当前运行reduce的机器,其他副本选址原则按照常规的HDFS数据写入原则来进行,详细信息请参考这里

通过从map机器提取结果,合并,combine之后,传递给reduce完成最后工作,整个过程也就差不多完成。最后再感受一下下面这张图:

这里写图片描述

性能调优

如果能够根据情况对shuffle过程进行调优,对于提供MapReduce性能很有帮助。相关的参数配置列在后面的表格中。

一个通用的原则是给shuffle过程分配尽可能大的内存,当然你需要确保map和reduce有足够的内存来运行业务逻辑。因此在实现Mapper和Reducer时,应该尽量减少内存的使用,例如避免在Map中不断地叠加。

运行map和reduce任务的JVM,内存通过mapred.child.java.opts属性来设置,尽可能设大内存。容器的内存大小通过mapreduce.map.memory.mbmapreduce.reduce.memory.mb来设置,默认都是1024M。

map优化

在map端,避免写入多个spill文件可能达到最好的性能,一个spill文件是最好的。通过估计map的输出大小,设置合理的mapreduce.task.io.sort.*属性,使得spill文件数量最小。例如尽可能调大mapreduce.task.io.sort.mb

map端相关的属性如下表:

属性名 值类型 默认值 说明
mapreduce.task.io.sort.mb int 100 用于map输出排序的内存大小
mapreduce.map.sort.spill.percent float 0.80 开始spill的缓冲池阈值
mapreduce.task.io.sort.factor int 10 合并文件数最大值,与reduce共用
mapreduce.map.combine.minspills int 3 运行combiner的最低spill文件数
mapreduce.map.out.compress boolean false 输出是否压缩
mapreduce.map.out.compress 类名 DefaultCodec 压缩算法
mapreduce.shuffle.max.threads int 0 服务于reduce提取结果的线程数量

reduce优化

在reduce端,如果能够让所有数据都保存在内存中,可以达到最佳的性能。通常情况下,内存都保留给reduce函数,但是如果reduce函数对内存需求不是很高,将mapreduce.reduce.merge.inmem.threshold(触发合并的map输出文件数)设为0,mapreduce.reduce.input.buffer.percent(用于保存map输出文件的堆内存比例)设为1.0,可以达到很好的性能提升。在2008年的TB级别数据排序性能测试中,Hadoop就是通过将reduce的中间数据都保存在内存中胜利的。

reduce端相关属性:

属性名 值类型 默认值 说明
mapreduce.reduce.shuffle.parallelcopies int 5 提取map输出的copier线程数
mapreduce.reduce.shuffle.maxfetchfailures int 10 提取map输出最大尝试次数,超出后报错
mapreduce.task.io.sort.factor int 10 合并文件数最大值,与map共用
mapreduce.reduce.shuffle.input.buffer.percent float 0.70 copy阶段用于保存map输出的堆内存比例
mapreduce.reduce.shuffle.merge.percent float 0.66 开始spill的缓冲池比例阈值
mapreduce.reduce.shuffle.inmem.threshold int 1000 开始spill的map输出文件数阈值,小于等于0表示没有阈值,此时只由缓冲池比例来控制
mapreduce.reduce.input.buffer.percent float 0.0 reduce函数开始运行时,内存中的map输出所占的堆内存比例不得高于这个值,默认情况内存都用于reduce函数,也就是map输出都写入到磁盘

通用优化

Hadoop默认使用4KB作为缓冲,这个算是很小的,可以通过io.file.buffer.size来调高缓冲池大小。

参考

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。

Spark的广播和累加器的使用 - 2016-07-18 14:07:48

一、广播变量和累加器 1.1 广播变量: 广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。 Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分
引言: 对于刚接触ES的童鞋,经常搞不明白ES的各个概念的含义。尤其对“索引”二字更是与关系型数据库混淆的不行。本文通过对比关系型数据库,将ES中常见的增、删、改、查操作进行图文呈现。能加深你对ES的理解。同时,也列举了kibana下的图形化展示。 ES Restful API GET、POST、PUT、DELETE、HEAD含义: 1)GET:获取请求对象的当前状态。 2)POST:改变对象的当前状态。 3)PUT:创建一个对象。 4)DELETE:销毁对象。 5)HEAD:请求获取对象的基础信息。 M
设计原理 kafka的 设计 初衷是希望作为一个 统一的信息收集平台 , 能够实时的收集反馈信息 ,并需要 能够支撑较大的数据量 , 且具备良好的容错能力. 持久性 kafka 使用文件存储消息 ,这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化几乎没有可能.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时 为了减少磁盘写入的次数,broker 会将消息暂时buffer起来,当消息的个数(
前言 人们对人脸识别的研究已经有很长一段时间,起初局限于获取基础的人脸信息,随着机器学习领域的发展,人脸识别的应用更加广泛,已经可以被用于人脸搜索、人脸鉴权等相关应用。本文将针对微软认知服务中提供的人脸识别API的调用方法进行一些初步的讲解。 Face API Face API中提供了3方面功能: 人脸检测 人脸分组 人脸识别(搜索) 首先是人脸检测,主要是指传统概念上的人脸识别功能,识别图片中的人的面孔,给出人脸出现的坐标区域,并根据识别出来的人脸分析出一些基本的信息(例如年龄)。 其次是人脸分组,可以

Hadoop MapReduce原理及实例 - 2016-07-17 17:07:30

MapReduce是用于数据处理的一种编程模型,简单但足够强大,专门为并行处理大数据而设计。 1. 通俗理解MapReduce MapReduce的处理过程分为两个步骤:map和reduce。每个阶段的输入输出都是key-value的形式,key和value的类型可以自行指定。map阶段对切分好的数据进行并行处理,处理结果传输给reduce,由reduce函数完成最后的汇总。 例如从大量历史数据中找出往年最高气温,NCDC公开了过去每一年的所有气温等天气数据的检测,每一行记录一条观测记录,格式如下: 为了
前几天有幸参加了OpenStack Days China的两天技术峰会,集合了全球及国内顶尖的OpenStack技术专家,为我们分享了许多关于OpenStack的技术报告。 有许多人参加类似技术峰会都有这些感受: 1、一般主会场的领导和院士发言基本没有什么干货,也就是对我们实际工作没有太大帮助 2、一般讲的不错的都是公司的CEO、CTO等,但是他们都是公司商业因素占据很多,技术并不是他们实干,好像也没有什么收获 3、一般分享干货的实践者往往讲的水平一般,枯燥乏味,太干了,干的让人无法消化 当然,我觉得现如

Flume性能测试报告 - 2016-07-16 17:07:11

1. 测试环境 1.1 硬件 CPU:Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz(8核) 内存:16G 1.2 软件 Flume:1.6.0 Hadoop:2.6.0-cdh5.5.0 Kfaka:2.11-0.9.0.1 JDK:1.8.0_91-b14 64位 1.3 测试文件 文件大小:107M ,共490010条记录 1.4 Flume配置 (1)Source配置 Flume Source采用spooldir的方式,直接读取预先准备好的测试文件。 agent .
我最近研究了hive的相关技术,有点心得,这里和大家分享下。 首先我们要知道hive到底是做什么的。下面这几段文字很好的描述了hive的特性: 1.hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。 2.Hive是建立在 Hadoop 上的数据仓

webSocket使用教程 - 2016-07-16 14:07:11

webSocket使用教程 Gson简介 GSON是Google开发的Java API,用于转换Java对象和Json对象 gson和其他现有java json类库最大的不同时gson需要序列化的实体类不需要使用annotation来标识需要序列化的字段,同时gson又可以 通过使用annotation来灵活配置需要序列化的字段。在Java开发中,有时需要保存一个数据结构成字符串,可能你会考虑用Json,但是当 Json字符串转换成Java对象时,转换成的是JsonObject,并不是你想要的Class类
目录 目录 前言 Openstack基金委员会 Openstack贡献者须知 注册Openstack In Launchpad 生成并上传OpenPGP密钥 生成并上传SSH公钥 Join The OpenStack Foundation 签署CLA贡献者协议 参考资料 前言 由Openstack基金委员会管理的Openstack社区,现在已经成为了全球第二大开源社区仅次于Linux社区,所以也有人将Openstack定义为下一个Linux。就从我个人角度出发,我认为Openstack和Linux不属于同