kafka学习之路(三)——高级

设计原理

kafka的设计初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力.

持久性

kafka使用文件存储消息,这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化几乎没有可能.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.

 


性能

需要考虑的影响性能点很多,除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对于kafka broker端,有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换. 其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka支持gzip/snappy等多种压缩方式.


生产者

负载均衡: producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".事实上,消息被路由到哪个partition上,由producer客户端决定.比如可以采用"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的.

其中partition leader的位置(host:port)注册在zookeeper中,producer作为zookeeper client,已经注册了watch用来监听partition leader的变更事件.

异步发送:将多条消息暂且在客户端buffer起来,并将他们批量的发送到broker,小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。不过这也有一定的隐患,比如说当producer失效时,那些尚未发送的消息将会丢失。


消费者

consumer端向broker发送"fetch"请求,并告知其获取消息的offset;此后consumer将会获得一定条数的消息;consumer端也可以重置offset来重新消费消息.

在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.不过在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息;这中模式有些优点,首先consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以良好的控制消息消费的数量,batch fetch.

其他JMS实现,消息消费的位置是由producer保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafkabroker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级.

 

消息传送机制

对于JMS实现,消息传输担保非常直接:有且只有一次(exactly once).在kafka中稍有不同:

1) at most once: 最多一次,这个和JMS中"非持久化"消息类似.发送一次,无论成败,将不会重发.

2) at least once: 消息至少发送一次,如果消息未能接受成功,可能会重发,直到接收成功.

3) exactly once: 消息只会发送一次.

at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理.那么此后"未处理"的消息将不能被fetch到,这就是"atmost once".

at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once",原因offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态.

exactly once: kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的.

通常情况下"at-least-once"是我们首选.(相比at most once而言,重复接收数据总比丢失数据要好).

 


6、复制备份

kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定.leader处理所有的read-write请求,follower需要和leader保持同步.follower和consumer一样,消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.(不同于其他分布式存储,比如hbase需要"多数派"存活才行)

当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,因此需要选择一个"up-to-date"的follower.选择follower时需要兼顾一个问题,就是新leaderserver上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡".


日志

如果一个topic的名称为"my_topic",它有2个partitions,那么日志将会保存在my_topic_0和my_topic_1两个目录中;日志文件中保存了一序列"log entries"(日志条目),每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每个partition在物理存储层面,有多个logfile组成(称为segment).segmentfile的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.

 

其中每个partiton中所持有的segments列表信息会存储在zookeeper中.

当segment文件尺寸达到一定阀值时(可以通过配置文件设定,默认1G),将会创建一个新的文件;当buffer中消息的条数达到阀值时将会触发日志信息flush到日志文件中,同时如果"距离最近一次flush的时间差"达到阀值时,也会触发flush到日志文件.如果broker失效,极有可能会丢失那些尚未flush到文件的消息.因为server意外实现,仍然会导致log文件格式的破坏(文件尾部),那么就要求当server启东是需要检测最后一个segment的文件结构是否合法并进行必要的修复.

获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可.

日志文件的删除策略非常简单:启动一个后台线程定期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的创建时间).为了避免删除文件时仍然有read操作(consumer消费),采取copy-on-write方式.

简单来说,在复制一个对象时并不是真的在内存中把原来对象的数据复制一份到另外一个地址,而是在新对象的内存映射表中指向同原对象相同的位置,并且把那块内存的 Copy-On-Write位设为 1。在对这个对象执行读操作的时候,内存数据没有变动,直接执行就可以。在写的时候,才真正将原始对象复制一份到新的地址,修改新对象的内存映射表到这个新的位置,然后往这里写。


分配

kafka使用zookeeper来存储一些meta信息,并使用了zookeeperwatch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)

1) Broker node registry: 当一个kafkabroker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.

格式: /broker/ids/[0...N] -->host:port;其中[0..N]表示broker id,每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),znode的值为此broker的host:port信息.

2) Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.

格式: /broker/topics/[topic]/[0...N] 其中[0..N]表示partition索引号.

3) Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡".

一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.

4) Consumer id Registry: 每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.

格式:/consumers/[group_id]/ids/[consumer_id]

仍然是一个临时的znode,此节点的值为{"topic_name":#streams...},即表示此consumer目前所消费的topic + partitions列表.

5) Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset.

格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value

此znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.

6) Partition Owner registry: 用来标记partition被哪个consumer消费.临时znode格式:

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id当consumer启动时,所触发的操作:

A) 首先进行"Consumer id Registry";

B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).

C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.

 

1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partitionleader建立socket连接并发送消息.

2) Broker端使用zookeeper用来注册broker信息,已及监测partitionleader存活性.

3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息.


本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。
前言 人们对人脸识别的研究已经有很长一段时间,起初局限于获取基础的人脸信息,随着机器学习领域的发展,人脸识别的应用更加广泛,已经可以被用于人脸搜索、人脸鉴权等相关应用。本文将针对微软认知服务中提供的人脸识别API的调用方法进行一些初步的讲解。 Face API Face API中提供了3方面功能: 人脸检测 人脸分组 人脸识别(搜索) 首先是人脸检测,主要是指传统概念上的人脸识别功能,识别图片中的人的面孔,给出人脸出现的坐标区域,并根据识别出来的人脸分析出一些基本的信息(例如年龄)。 其次是人脸分组,可以

Hadoop MapReduce原理及实例 - 2016-07-17 17:07:30

MapReduce是用于数据处理的一种编程模型,简单但足够强大,专门为并行处理大数据而设计。 1. 通俗理解MapReduce MapReduce的处理过程分为两个步骤:map和reduce。每个阶段的输入输出都是key-value的形式,key和value的类型可以自行指定。map阶段对切分好的数据进行并行处理,处理结果传输给reduce,由reduce函数完成最后的汇总。 例如从大量历史数据中找出往年最高气温,NCDC公开了过去每一年的所有气温等天气数据的检测,每一行记录一条观测记录,格式如下: 为了
前几天有幸参加了OpenStack Days China的两天技术峰会,集合了全球及国内顶尖的OpenStack技术专家,为我们分享了许多关于OpenStack的技术报告。 有许多人参加类似技术峰会都有这些感受: 1、一般主会场的领导和院士发言基本没有什么干货,也就是对我们实际工作没有太大帮助 2、一般讲的不错的都是公司的CEO、CTO等,但是他们都是公司商业因素占据很多,技术并不是他们实干,好像也没有什么收获 3、一般分享干货的实践者往往讲的水平一般,枯燥乏味,太干了,干的让人无法消化 当然,我觉得现如

Flume性能测试报告 - 2016-07-16 17:07:11

1. 测试环境 1.1 硬件 CPU:Intel(R) Core(TM) i7-6700 CPU @ 3.40GHz(8核) 内存:16G 1.2 软件 Flume:1.6.0 Hadoop:2.6.0-cdh5.5.0 Kfaka:2.11-0.9.0.1 JDK:1.8.0_91-b14 64位 1.3 测试文件 文件大小:107M ,共490010条记录 1.4 Flume配置 (1)Source配置 Flume Source采用spooldir的方式,直接读取预先准备好的测试文件。 agent .
我最近研究了hive的相关技术,有点心得,这里和大家分享下。 首先我们要知道hive到底是做什么的。下面这几段文字很好的描述了hive的特性: 1.hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。 2.Hive是建立在 Hadoop 上的数据仓

webSocket使用教程 - 2016-07-16 14:07:11

webSocket使用教程 Gson简介 GSON是Google开发的Java API,用于转换Java对象和Json对象 gson和其他现有java json类库最大的不同时gson需要序列化的实体类不需要使用annotation来标识需要序列化的字段,同时gson又可以 通过使用annotation来灵活配置需要序列化的字段。在Java开发中,有时需要保存一个数据结构成字符串,可能你会考虑用Json,但是当 Json字符串转换成Java对象时,转换成的是JsonObject,并不是你想要的Class类
目录 目录 前言 Openstack基金委员会 Openstack贡献者须知 注册Openstack In Launchpad 生成并上传OpenPGP密钥 生成并上传SSH公钥 Join The OpenStack Foundation 签署CLA贡献者协议 参考资料 前言 由Openstack基金委员会管理的Openstack社区,现在已经成为了全球第二大开源社区仅次于Linux社区,所以也有人将Openstack定义为下一个Linux。就从我个人角度出发,我认为Openstack和Linux不属于同
文本详细介绍了HDFS中的许多概念,对于理解Hadoop分布式文件系统很有帮助。 1. 介绍 在现代的企业环境中,单机容量往往无法存储大量数据,需要跨机器存储。统一管理分布在集群上的文件系统称为分布式文件系统。而一旦在系统中,引入网络,就不可避免地引入了所有网络编程的复杂性,例如挑战之一是如果保证在节点不可用的时候数据不丢失。 传统的网络文件系统(NFS)虽然也称为分布式文件系统,但是其存在一些限制。由于NFS中,文件是存储在单机上,因此无法提供可靠性保证,当很多客户端同时访问NFS Server时,很容
首先,在这里首先感谢台湾林智仁先生的开源工具包libsvm。使SVM算法更加普及。大家可以到下面的libsvm官网去了解相关的信息。 Libsvm官方网站- https://www.csie.ntu.edu.tw/~cjlin/libsvm/ 其次,我在使用过程中发现,先生svm_scale文件中无法将经过规约的文件输出到本地txt文件中,只能在控制台重定向,而我并不想在程序运行中打开控制台进行较为繁琐的操作。 所以我改造了svm_scale文件,实现了文件的写入,在这里可以和大家分享一下。 改造后新增参

Sqoop-1.4.5用户手册 - 2016-07-15 17:07:22

本文以 Sqoop User Guide (v1.4.5) 为主,对Sqoop-1.4.5的用户手册进行翻译,同时会结合一些实际操作中的注意事项一并写入。由于原文档很长,本文首先会以实际使用到的部分为主,逐步进行完善。 1、Introduction Sqoop是一个用于在Hadoop和关系型数据库之间流转数据的一个工具。可以使用Sqoop将数据从关系型数据库系统(RDBMS)比如MySQL或者Oracle导入到Hadoop分布式文件系统(HDFS)上,然后数据在Hadoop MapReduce上转换,以及