Flink流计算编程--状态与检查点

1、Exactly_once简介

Exactly_once语义是Flink的特性之一,那么Flink到底提供了什么层次的Excactly_once?有人说是是每个算子保证只处理一次,有人说是每条数据保证只处理一次。其实理解这个语义并不难,直接在官方文档中就可以看出:
这里写图片描述

从图中可以看出:Exactly_once是为有状态的计算准备的!

换句话说,没有状态的算子操作(operator),Flink无法也无需保证其只被处理Exactly_once!为什么无需呢?因为即使失败的情况下,无状态的operator(map、filter等)只需要数据重新计算一遍即可。例如:

dataStream.filter(_.isInNYC)

当机器、节点等失败时,只需从最近的一份快照开始,利用可重发的数据源重发一次数据即可,当数据经过filter算子时,全部重新算一次即可,根本不需要区分哪个数据被计算过,哪个数据没有被计算过,因为没有状态的算子只有输入和输出,没有状态可以保存。

2、Flink的恢复机制

Flink的失败恢复依赖于“检查点机制+可部分重发的数据源”。

2.1、检查点机制:检查点定期触发,产生快照,快照中记录了(1)当前检查点开始时数据源(例如Kafka)中消息的offset,(2)记录了所有有状态的operator当前的状态信息(例如sum中的数值)。

2.2、可部分重发的数据源:Flink选择最近完成的检查点K。然后系统重放整个分布式的数据流,然后给予每个operator他们在检查点k快照中的状态。数据源被设置为从位置Sk开始重新读取流。例如在Apache Kafka中,那意味着告诉消费者从偏移量Sk开始重新消费。

容错特性:
这里写图片描述

3、检查点与保存点

3.1、检查点

Flink的检查点机制实现了标准的Chandy-Lamport算法,并用来实现分布式快照。在分布式快照当中,有一个核心的元素:Barrier。

屏障作为数据流的一部分随着记录被注入到数据流中。屏障永远不会赶超通常的流记录,它会严格遵循顺序。屏障将数据流中的记录隔离成一系列的记录集合,并将一些集合中的数据加入到当前的快照中,而另一些数据加入到下一个快照中。每一个屏障携带着快照的ID,快照记录着ID并且将其放在快照数据的前面。屏障不会中断流处理,因此非常轻量级。来自不同快照的多个屏障可能同时出现在流中,这意味着多个快照可能并发地发生。

单流的barrier:
这里写图片描述
多流的barrier:
这里写图片描述

不止一个输入流的的operator需要在快照屏障上对齐(align)输入流。

在stream source中,流屏障被注入到并发数据流中。快照n被注入屏障的点(简称为Sn),是在source stream中的数据已被纳入该快照后的位置。例如,在Apache Kafka中,该位置将会是partition中最后一条记录的offset。这个Sn的位置将被报告给检查点协调器(Flink JobManager)。

屏障接下来会流向下游。当一个中间的operator从所有它的输入流中接收到一个来自快照n的屏障,它自身发射一个针对快照n的屏障到所有它的输出流。一旦一个sink operator(流DAG的终点)从它所有的输入流中接收到屏障n,它将会像检查点协调器应答快照n。在所有的sink应答该快照后,它才被认为是完成了。

程序中如何设置检查点?

val env = StreamExecutionEnvironment.getExecutionEnvironment()

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

3.2、保存点

保存点本质上就是一次检查点,但它与检查点的不同在于:
(1)手动触发
(2)不会过期,除非用户明确的处理

先来看一张图:
这里写图片描述

保存点仅仅是一个指向检查点的指针;同时,其默认保存在JobManager的memory中,但为了高可用,建议保存到hdfs上。通入如下参数调整:

savepoints.state.backend: filesystem
savepoints.state.backend.fs.dir: hdfs:///flink/savepoints

保存点在什么时候使用?

(1)应用程序升级
(2)Flink版本升级
(3)系统升级或系统迁移
(4)程序的模拟仿真情况
(5)A/B测试

如何手动触发及恢复保存点?

CLI方式:

触发:

flink savepoint <JobID>

恢复:

flink run -s <pathToSavepoint> <jobJar> ...

4、状态简介

Flink流处理中的算子操作,是可以有状态的,这也是区别于其他流计算引擎的显著标志之一。

Flink提供了Exactly_once特性,是依赖于带有barrier的分布式快照+可部分重发的数据源功能实现的。而分布式快照中,就保存了operator的状态信息。

4.1、如何定义快照?
(1)使用window操作,基于EventTime、ProcessingTime、基于Count的窗口以及自定义的窗口。
(2)使用检查点接口,可以注册任何类型的java/scala对象。
(3)使用key/value状态接口,通过key来分区使用state。

4.2、重点说说如何使用基于key/value状态接口来定义state

既然是基于key/value的状态接口,那么这些状态只能用于keyedStream之上。keyedStream上的operator操作可以包含window或者map等算子操作。

key/value下可用的状态接口:
(1)ValueState : 状态保存的是一个值,可以通过update(T)来更新,T.value()获取。
(2)ListState : 状态保存的是一个列表,通过add(T)添加数据,Iterable.get获取。
(3)ReducingState : 状态保存的是一个经过聚合之后的值的列表,通过add(T)添加数据,通过指定的聚合方法获取。

通过创建一个StateDescriptor,可以得到一个包含特定名称的状态句柄,可以分别创建ValueStateDescriptor、 ListStateDescriptor或ReducingStateDescriptor状态句柄。

注意:状态是通过RuntimeContext来访问的,因此只能在RichFunction中访问状态。这就要求UDF时要继承Rich函数,例如RichMapFunction、RichFlatMapFunction等。

无状态的流与有状态的流的对比:
这里写图片描述

4.3、状态保存在哪里

状态终端用来对状态进行持久化存储,Flink支持多个状态终端:

(1)MemoryStateBackend
(2)FsStateBackend
(3)RocksDBStateBackend(第三方开发者实现)

五、带状态的operator例子

这里以flink-training上的例子作为样例:

keyBy之后是一个keyedStream,然后进行flatMap操作,转换为dataStream。定义状态就是在flatMap中实现。

.keyBy("rideId")
// compute the average speed of a ride
.flatMap(new SpeedComputer)

继承RichFlatMapFunction而非FlatMapFunction,此例中state是一个基于key/value接口的ValueState方法。而RichFlatMapFunction又继承了AbstractRichFunction,其中要覆写open方法;同时覆写RichFlatMapFunction中的flatMap方法。

class SpeedComputer extends RichFlatMapFunction[TaxiRide, (Long, Float)] {

    var state: ValueState[TaxiRide] = null

    override def open(config: Configuration): Unit = {
      state = getRuntimeContext.getState(new ValueStateDescriptor("ride", classOf[TaxiRide], null))
    }

    override def flatMap(ride: TaxiRide, out: Collector[(Long, Float)]): Unit = {

      if(state.value() == null) {
        // first ride
        state.update(ride)
      }
      else {
        // second ride
        val startEvent = if (ride.isStart) ride else state.value()
        val endEvent = if (ride.isStart) state.value() else ride

        val timeDiff = endEvent.time.getMillis - startEvent.time.getMillis
        val speed = if (timeDiff != 0) {
          (endEvent.travelDistance / timeDiff) * 60 * 60 * 1000
        } else {
          -1
        }
        // emit average speed
        out.collect( (startEvent.rideId, speed) )

        // clear state to free memory
        state.update(null)
      }
    }
  }

通过这个例子,可以知道如何在operator中实现state。

六、总结

最后说一下我对Flink中有状态的算子在恢复时是如何进行的:

假设场景Job:1个Source(Kafka)+1个不带state的operator+1个带state的operator+1个sink。
如果失败,则Flink选择最近的一份检查点开始恢复,检查点中记录了这次检查点开始时数据源(kafka)中对应的topic的offset,从offset开始重新发送数据,当数据流到1个不带operator的算子时,数据全部应用在这个算子上;接着数据流向1个带有operator的算子,由于快照中记录着这个operator的状态的值,因此,数据重新计算时只从记录着状态的值的地方开始计算,而不会从头开始计算,例如key0=2,那么只从key0=2开始计算。随后进行sink。由于失败时可能有些数据已经sink了,那么根据幂等性原则,即使中间输出的结果存在异常,但是重发之后再次sink是正确的,最终的结果还是正确的。

由于sink一般都是外围系统,因此sink的设计一般都没有状态,但是如果保证幂等性,最终的结果也没问题。
这里写图片描述

对Flink的快照以及状态的理解也许有不准确的地方,欢迎大家指出!

参考
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state_backends.html
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
http://flink.apache.org/features.html
http://blog.csdn.net/yanghua_kobe/article/details/51714388
http://data-artisans.com/how-apache-flink-enables-new-streaming-applications/#more-608
https://yq.aliyun.com/articles/57828?spm=5176.100239.blogcont57826.5.noEJP1
http://mp.weixin.qq.com/s?__biz=MzI2MjE0MDUzNg==&mid=2652914398&idx=1&sn=9a72035a1ea208b096299684fe637dda&scene=1&srcid=0711fENhBenSH5wu7aOyWHPL&from=groupmessage&isappinstalled=0#wechat_redirect%20%E6%9D%8E%E5%91%88%E7%A5%A5

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。
简单工厂模式是指专门定义一个类来负责创建其他类的实例,被创建的实例通常都具有共同的父类。 从图中我们可以清楚的看到,该模式中主要包含下面3种 角色: 工厂(Creator)角色 它是工厂模式的核心,负责实现创建所有势力的内部逻辑。工厂类可以被外界直接调用,创建所需的产品的对象。 抽象(Product)角色 简单工厂模式所创建的所有对象的父类,负责描述所有实例所共有的公共接口。 具体产品(Concrete Product)角色 是该模式的创建目标,所有创建的对象都是充当这个角色的某个具体类的实例。一般来讲是

Mybatis学习第一天 - 2016-07-22 17:07:07

  Mybatis第一天 2      MyBatis介绍 MyBatis 本是apache的一个开源项目iBatis, 2010年这个项目由apache software foundation迁移到了google code,并且改名为MyBatis 。2013年11月迁移到Github。 MyBatis是一个优秀的持久层框架,它对jdbc的操作数据库的过程进行封装,使开发者只需要关注 SQL 本身,而不需要花费精力去处理例如注册驱动、创建connection、创建statement、手动设置参数、结果集

IP数据报的分片和组装过程 - 2016-07-22 17:07:07

        一份数据从一个主机通过以太网发送到里一个主机时,是要经过很多层路由转发的。其中过程相对比较的复杂,在这里我们要讨论的是IP在路由中转发时是以怎样的形式转发的和目的主机在接受到这写数据报时又是怎样处理的。        首先我们需要了解的是整个 IP数据报的格式 : IP的转发控制都是由IP数据报的头部决定的。在这里我们就不详细的讨论首部的所有字段,我们就讨论一下个分片有关的总长度字段。        在IP数据报中,总长度是16位的字段,依次数据报的最大长度为2^16-1=65535字节,

Java的纤程库 - Quasar - 2016-07-22 17:07:06

最近遇到的一个问题大概是微服务架构中经常会遇到的一个问题: 服务  A  是我们开发的系统,它的业务需要调用  B  、  C  、  D  等多个服务,这些服务是通过http的访问提供的。 问题是  B  、  C  、  D  这些服务都是第三方提供的,不能保证它们的响应时间,快的话十几毫秒,慢的话甚至1秒多,所以这些服务的Latency比较长。幸运地是这些服务都是集群部署的,容错率和并发支持都比较高,所以不担心它们的并发性能,唯一不爽的就是就是它们的Latency太高了。 系统A会从Client接收

操作系统知识点整理 - 2016-07-22 17:07:03

作业 用户在一次解题或一个事务处理过程中要求计算机系统所做工作的集合。它包括用户程序、所需要的数据及控制命令等。作业是由一系列有序的步骤组成的。 进程 一个程序在一个数据集合上的一次运行过程。所以一个程序在不同数据集合上运行,乃至一个程序在同样数据集合的多次运行都是不同的进程。 线程 线程是进程中的一个实体,是被系统独立调度和执行的基本单位。 进程和线程的区别 进程是程序的一次执行。线程可以理解为进程中执行的一段程序片段。 进程是独立的,这表现在内存空间、上下文环境上;线程运行在进程空间内。一般来讲(不适

Spring MVC Web简单入门实例 - 2016-07-22 17:07:52

本文通过一个简单的用户登录例子带你入门Spring MVC Web开发。 开发环境 1、STS 3.7.3(Spring Tool Suit), 下载 。STS其实是一个包装过的Eclipse,由Spring小组开发的,专门用于Spring项目的开发。老规矩,安装之前先要安装jdk,并配置好环境变量。 2、Tomcat 7, 下载Tomcat 7 。sts已经集成了一个叫Pivotal tc Server的web服务器,不过我们一般都使用Tomcat作为我们的Web服务器。 Tomcat配置 。 创建项目
1 简介 Ubuntu分区方案一般有下面三种: /boot 200M、/ 30G、/home 剩余全部空间、swap 8G / 剩余全部空间、swap 8G / 30G、/home 剩余全部空间、swap 8G 第一种分区方案是为了将/boot独立出来,防止操作系统无法启动,这样分的好处博主没体会到,好像/boot没什么用,而且把磁盘搞得支离破碎的,所以博主一般不用这种分法。 第二种分区方案是懒人方案,或者说新手方案,简单粗暴,对于装系统像家常便饭一样的博主来说,这样分区最快啦,毕竟在实体机上操作分区需要

7个变革DevOps的工具 - 2016-07-22 17:07:34

1. 简介 随着公司业务的不断迅速增长,使得管理复杂的IT基础设施需求变得更为艰难。解决应对这一复杂变幻的挑战的最佳方法是让开发团队和运维团队紧密协作,实现灵活应对。拥有一个DevOps专家团队可以实现在最少时间服务中断的情况下实现IT基础设施的动态伸缩。 DevOps团队执行各种任务, 如: 新虚拟机的配置 配置网络设备和服务器 应用程序部署 收集和聚合的日志 性能监视服务、网络和应用程序 报警和自动修复的问题 服务器和服务可用性监控 如果不使用正确的工具集来执行这些任务将会是一件即费时又费钱的事。某些
为了降低tomcat服务的压力,把页面上的图片采用windows版的nginx进行加载,由于有些图片比较大,加载特别的慢,所以在nginx中打开了gzip的压缩功能。加载图片的速度快了很多。 通过站长工具中的"网页GZIP压缩检测"工具检测图片的压缩率达到了69.53%,如下图: 下面介绍nginx.conf文件是怎么配置的: 1、打开nginx.conf配置文件 ; 2、找到#gzip on这句,如下图: 3.在把#gzip on 改成下面代码: #开启Gzip gzip on; #不压缩临界值,大于1
一、前言        最近开发程序的时候,出现数据库自增id跳数字情况,无奈之下dba遍查操作日志,没有delete记录。才开始慢慢来查询事物问题。多久以来欠下的账,今天该还给spring事物。 希望大家有所收获。2016年07月19日22:32:38 二、spring嵌套事物       1、展示项目代码--简单测springboot项目 整体项目就这么简单,为了方便。这里就只有biz层与service层,主要作为两层嵌套,大家只要看看大概就ok。后面会给出git项目地址,下载下来看一看就明白,力求最