Apache Flink Client生成StreamGraph

概述

上文我们分析提交流程时,RemoteStreamEnvironment类的execute方法的第一步就是生成StreamGraph

StreamGraph是用于表示流的拓扑结构的数据结构,它包含了生成JobGraph的必要信息。它的类继承关系图如下:

StreamGraph-class-diagram

如果你按照StreamGraph的继承链向上追溯,最终会发现它实现了接口FlinkPlan。Flink在这里效仿的是数据库的执行SQL是产生执行计划的机制,FlinkPlan定义在Flink的优化器相关的包中,针对流应用的计划是StreamingPlan

针对Batch类的应用的计划类是OptimizedPlan。Flink会对Batch类的应用进行优化(这点我们后面会分析),而当前针对Streaming类的应用没有优化措施。

StreamGraph的形象化表示如下图:

Flink-StreamGraph

Flink官方提供了一个计划可视化器来图形化执行计划

节点和边

上面的图是由“节点”和“边”组成的。节点在Flink中对应的数据结构是StreamNode,而边在Flink中对应的数据结构是StreamEdgeStreamNodeStreamEdge之间存在着组合的依赖关系,依赖关系可见下图:

StreamNode-StreamEdge-relationship

StreamEdge包含了其连接的源节点sourceVertex和目的节点targetVertex,而StreamNode中包含了与其连接的入边集合inEdges和出边集合outEdgesStreamEdgeStreamNode都有唯一的编号进行标识,但是各自编号的生成规则并不相同。

StreamNode的编号id的生成是通过调用StreamTransformation的静态方法getNewNodeId获得的,其实现是一个静态计数器:

// This is used to assign a unique ID to every StreamTransformation
protected static Integer idCounter = 0;

public static int getNewNodeId() {   
    idCounter++;   
    return idCounter;
}

StreamEdge的编号edgeId是字符串类型,其生成的规则为:

this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames + "_" + outputPartitioner;

它是由多个段连接起来的,语义的文字表述如下:

源顶点_目的顶点_输入类型数量_输出选择器的名称_输出分区器

edgeId除了用来实现StreamEdge的hashCode及equals方法之外并没有其他实际意义。

StreamNode其实是表示operator的数据结构,了解这一点很重要。从Flink开始生成StreamGraph开始,source、sink都是图中的一个节点都是operator,都通过StreamNode这一数据结构来表示,我们常将它们单独拎出来讲是因为它们是流的的输入和输出,但在数据结构层面上它们是一致的。

StreamNode除了存储了输入端和输出端的StreamEdge集合,还封装了operator的其他关键属性,基于这不是我们关注的重点,所以不再赘述。

回过头来我们看JobGraph就不是那么难理解了。它包含了表述整个流拓扑的所有必要信息(比如所有的节点集合、所有的source集合、所有的sink集合、虚拟输出选择节点、虚拟分区节点)。同时还包含了大量操作这些信息的方法。

生成StreamGraph

了解了基础的数据结构之后,我们来分析如何生成JobGraph。定位到getStreamGraph的实现:


public StreamGraph getStreamGraph() {   
    if (transformations.size() <= 0) {      
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");   
    }   

    return StreamGraphGenerator.generate(this, transformations);
}

它依赖于transformations集合,该集合中存储着一个Streaming程序中所有的转换操作对应的StreamTransformation对象。

每当在DataStream对象上调用transform方法或者调用已经被实现了的一些转换操作(如map、flter等,这些转换操作在内部也调用了transform方法),这些调用都会被加入到transformations集合中。

StreamTransformation表示创建DataStream的操作,其实每个DataStream底层都对应着一个StreamTransformation。DataStream持有执行环境对象的引用,当调用transform方法时,它会调用执行环境对象的addOperator方法,将特定的StreamTransformation对象加入到transformations集合中去,这就是transformations集合中元素的来源。

到目前为止我们提到了多个名词,它们之前拥有着强依赖关系,为了避免混淆,我们以flatMap转换操作为例图示各种对象之间的构建关系:

Stream-Object-relationship

在源码中,其实Flink自身的命名也并不是那么准确,比如上图中的SingleOutputStreamOperator其实是一种DataStream,但却以Operator结尾,让人匪夷所思。这种情况下,鉴定它们类型的方式可以通过查看它们的继承链来进行识别。

StreamGraph的生成依赖于生成器StreamGraphGenerator,每调用一次静态方法generate才会在内部创建一个StreamGraphGenerator的实例,一个实例对应着一个StreamGraph对象。StreamGraphGenerator调用内部的实例方法generateInternal来遍历transformations集合的每个对象:


private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {   
    for (StreamTransformation<?> transformation: transformations) {
        transform(transformation);   
    }   

    return streamGraph;
}

transform方法中,它枚举了Flink中每一种转换类型,并对当前传入的转换类型进行判断,然后将其分发给特定的转换方法进行转换,最终返回当前StreamGraph对象中跟该转换有关的节点编号集合。

你可以将整个过程看作是玩拼图游戏,每遍历完一个转换对象,就离构建完整的StreamGraph更近一步。所有类型各异的转换操作各自持有整个StreamGraph的一部分小图片,根据不同的转换操作类型,它们为StreamGraph提供的“部件”并不完全相同,有的转换只构建节点(如SourceTransformation),有的转换除了构建节点还构建边(如SinkTransformation),有的只构建虚拟节点(如PartitionTransformationSplitTransformationSelectTransformation)。

关于虚拟节点,这里需要说明的是并非所有转换操作都具有实际的物理意义(即物理上对应operator)。有些转换操作只具有逻辑概念,例如unionsplitselectpartition。这些转换操作不会构建真实的StreamNode对象。比如某个流处理应用对应的转换树如下图:

StreamTransformation-demo

但在运行时,其生成的执行计划,这里也就等同于StreamGraph却是下图这种形式:

StreamGraph-demo

从图中可以看到,转换图中对应的一些逻辑操作在产生的执行计划时并不存在,Flink将这些逻辑转换操作转换成了虚拟节点,它们的信息会被绑定到从sourcemap转换的这条边上。

在给StreamGraph创建并添加一个operator时,需要给该operator指定slotSharingGroup,这时需要调用方法determineSlotSharingGroup来获得SlotSharingGroup的名称:

private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {   
    if (specifiedGroup != null) {      
        return specifiedGroup;   
    } else {      
        String inputGroup = null;      
        for (int id: inputIds) {         
            String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);         
            if (inputGroup == null) {            
                inputGroup = inputGroupCandidate;         
            } else if (!inputGroup.equals(inputGroupCandidate)) {            
                return "default";         
            }      
        }      

        return inputGroup == null ? "default" : inputGroup;   
    }
}

当用户指定了组名,则直接使用用户指定的名称。如果用户没有指定特定的名称,则需要结合输入节点来做决定:第一种情况如果所有的输入节点都拥有相同的slotSharingGroup名称,那么就使用该组名;否则组名将被命名为default

Flink当前对于流处理的应用是不作优化的,所以其执行计划就是StreamGraph。Flink提供了一个执行计划的可视化器,它将客户端生成的执行计划以图形的方式展示出来,就像本节开始我们展示的那幅图就是可视化器生成的。那么我们怎么来查看我们自己编写的程序的执行计划呢?其实很简单,我们以Flink的flink-examples-streaming包中的SocketTextStreamWordCount为例,来看一下如何生成执行计划。

我们将SocketTextStreamWordCount最后一行代码注释掉:

env.execute("WordCount from SocketTextStream Example");

然后将其替换成下面这句:

System.out.println(env.getExecutionPlan());

这行语句的作用是打印当前这个程序的执行计划,它将在控制台产生该执行计划的JSON格式表示:

{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream",
"parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":2,
"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":4,"type":"Keyed Aggregation",
"pact":"Operator","contents":"Keyed Aggregation","parallelism":2,"predecessors":[{"id":2,
"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Unnamed","pact":"Data Sink",
"contents":"Sink: Unnamed","parallelism":2,"predecessors":[{"id":4,"ship_strategy":"FORWARD",
"side":"second"}]}]}System.out.println(env.getExecutionPlan());

把上面这段JSON复制到Flink的执行计划可视化器,点击下方的Draw按钮,即可生成。

小结

本文我们谈论了StreamGraph的数据结构以及StreamGraphGenerator如何生成StreamGraph。鉴于StreamEdgeStreamNode是组成StreamGraph不可或缺的部分,我们还对这两个数据结构进行了简单的分析。当然,StreamGraph还有一个关键的实例方法:getJobGraph,它用于获取流处理程序的JobGraph(该方法继承自StreamingPlan)。至于什么是JobGraph以及如何获取它,我们将在下文进行讨论。


微信扫码关注公众号:Apache_Flink

apache_flink_weichat


QQ扫码关注QQ群:Apache Flink学习交流群(123414680)

qrcode_for_apache_flink_qq_group

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。

zabbix wechat 报警 - 2016-07-24 22:07:29

监控在运维工作中是比不可少的一环,那伴随着监控也同时会涉及到告警机制,一般的监控到的结果是成功或者失败,如Ping不通、访问网页出错、连接不到Socket,发生时这些称之为故障,故障是最优先的告警。那针对于 zabbix 的告警可以有多种方式去做: zabbix三种常见报警介质: 短信:它的好处是不用联网手机有信号就行,但是需要有短信网关,需要花钱。 邮件:它也可以做到手机短信通知,基本现在邮箱都有这个功能(如果你使用的是移动的手机号,可以让zabbix将报警信息发送到139邮箱,再通过139绑定到手机号
J2EE进阶(十一)SSH框架整合常见问题汇总(二) 问题 8       java.lang.ClassCastException : java.lang.String cannot be cast to java.lang.Boolean      解决       数据库中userdetail表的映射文件如下,可见xb字段数据类型为boolean类型,而自己在userdetail模型类中定义的类型为String类型。为此可以得出这样的结论。模型类中的数据及类型必须与数据表映射文件中的字段信息保持一致
背景: 为了方便整体产品的发布,希望通过docker实现增量发布。大致的思路如下: is-there-a-way-to-add-only-changed-files-to-a-docker-image-as-a-new-layer-with 。本博文对这种方式进行了尝试,与此同时简单介绍如何通过Dockerfile来创建Docker镜像。 前期准备: 解决centos的网络问题 【can not find a valid baseurl for repo: base/7/x86_64】 ,使用dhclie

pcap文件的python解析实例 - 2016-07-24 19:07:52

最近一直在分析数据包。 同时也一直想学python。 凑一块儿了...于是,便开工了。座椅爆炸! 正文 首先要说的是,我知道python有很多解析pcap文件的库,这里不使用它们的原因是为了理解pcap文件的格式细节。使用tcpdump你可以很容易抓取到一系列的数据包,然而tcpdump并没有分析数据包的功能,如果想从这个抓包文件中分析出一些端倪,比如重传情况,你必须使用wireshark之类的软件,用wireshark打开tcpdump抓取的pcap文件,如果你看到了一堆堆的深红色(类似静脉血管里流出的猪

CPU和内存监测 - 2016-07-24 19:07:07

CPU和内存监测 vmstat命令的VM模式 vmstat可以监测给定时间间隔的服务器的状态值,包括CPU的使用率,内存的使用,虚拟内存的交换情况,IO读写情况。 主要从/proc/meminfo,/proc/stat和/proc/*/stat中获取数据 常用手段vmstat [采样的时间间隔秒数] [采样的次数],举例如下: [root@dtbase-master- 2 /root] #vmstat 5 10 procs -----------memory---------- ---swap-- ---
正文 为了弥补pcap文件的缺陷,让抓包文件可以容纳更多的信息,pcapng格式应运而生。关于它的介绍详见《 PCAP Next Generation Dump File Format 》         当前的wireshark/tshark抓取的包默认都被保存为pcapng格式。         形而上的论述就不多谈了,直接给出一个pcapng数据包文件的例子: 然后我强烈建议,对着《 PCAP Next Generation Dump File Format 》来把一个实际抓取的pcapng文件里面
PS:历史原因作者账号名为:ymh198816,但事实上作者的生日并不是1988年1月6日 今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型。当然这个架构模型只是实时分析技术的一 个简单的入门级架构,实际生产环境中的大数据实时分析技术还涉及到很多细节的处理, 比如使用Storm的ACK机制保证数据都能被正确处理, 集群的高可用架构, 消费数据时如何处理重复数据或者丢失数据等问题,根据不同的业务场景,对数据的可靠性要求以及系统的复杂度的要求也会不同
一 协议端口 如果把IP地址比作一间房子 ,端口就是出入这间房子的门。真正的房子只有几个门,但是一个IP地址的端口可以有65536(即:2^16)个之多!端口是通过端口号来标记的,端口号只有整数,范围是从0 到65535(2^16-1)。 在Internet上,各主机间通过TCP/IP协议发送和接收数据包,各个数据包根据其目的主机的ip地址来进行互联网络中的路由选择,把数据包顺利的传送到目的主机。大多数操作系统都支持多程序(进程)同时运行,那么目的主机应该把接收到的数据包传送给众多同时运行的进程中的哪一个
MyBatis真正的强大,在于其映射语句的魔力。 SQL 映射文件有很少的几个顶级元素(按照它们应该被定义的顺序): (1)cache  给定命名空间的配置缓存。 (2)cache-ref  其他命名空间缓存配置的引用。 (3)resultMap  是最复杂也是最强大的元素,用来描述如何从数据库结果集中来加载对象 (4)sql 可被其他语句引用的可重用语句块。 (5)insert 映射插入语句 (6)update  映射更新语句 (7)delete   映射删除语句 (8)select   映射查询语句
1、Maven构建Spring Boot 创建Maven Web工程,引入spring-boot-starter-parent依赖 project xmlns = "http://maven.apache.org/POM/4.0.0" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/m