本文来自 9 月 1 日在成都举行的 Apache Flink China Meetup,云邪的分享。

作者:云邪

整理:李泽聚(Flink China 社区志愿者)

校对:云邪 / 韩非(Flink China 社区志愿者)

Flink 介绍

Flink 是一款分布式的计算引擎,它可以用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时地处理一些实时数据流,实时地产生数据的结果;也可以用来做一些基于事件的应用,比如说滴滴通过 Flink CEP 实现实时监测用户及司机的行为流来判断用户或司机的行为是否正当。

总而言之,Flink 是一个 Stateful Computations Over Streams,即数据流上的有状态的计算。这里面有两个关键字,一个是 Streams,Flink 认为有界数据集是无界数据流的一种特例,所以说有界数据集也是一种数据流,事件流也是一种数据流。Everything is streams,即 Flink 可以用来处理任何的数据,可以支持批处理、流处理、AI、MachineLearning 等等。另外一个关键词是 Stateful,即有状态计算。有状态计算是最近几年来越来越被用户需求的一个功能。举例说明状态的含义,比如说一个网站一天内访问 UV 数,那么这个 UV 数便为状态。Flink 提供了内置的对状态的一致性的处理,即如果任务发生了 Failover,其状态不会丢失、不会被多算少算,同时提供了非常高的性能。

那 Flink 的受欢迎离不开它身上还有很多的标签,其中包括性能优秀(尤其在流计算领域)、高可扩展性、支持容错,是一种纯内存式的一个计算引擎,做了内存管理方面的大量优化,另外也支持 eventime 的处理、支持超大状态的 Job(在阿里巴巴中作业的 state 大小超过 TB 的是非常常见的)、支持 exactly-once 的处理。

Flink 基石

Flink 之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。

首先是 Checkpoint 机制,这是 Flink 最重要的一个特性。Flink 基于 Chandy-Lamport 算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。Chandy-Lamport 算法实际上在 1985 年的时候已经被提出来,但并没有被很广泛的应用,而 Flink 则把这个算法发扬光大了。Spark 最近在实现 Continue streaming,Continue streaming 的目的是为了降低它处理的延时,其也需要提供这种一致性的语义,最终采用 Chandy-Lamport 这个算法,说明 Chandy-Lamport 算法在业界得到了一定的肯定。

提供了一致性的语义之后,Flink 为了让用户在编程时能够更轻松、更容易地去管理状态,还提供了一套非常简单明了的 State API,包括里面的有 ValueState、ListState、MapState,近期添加了 BroadcastState,使用 State API 能够自动享受到这种一致性的语义。

除此之外,Flink 还实现了 Watermark 的机制,能够支持基于事件的时间的处理,或者说基于系统时间的处理,能够容忍数据的延时、容忍数据的迟到、容忍乱序的数据

另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink 提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。

Flink API

Flink 分层 API 主要有三层,如下图:

最底层是 ProcessFunction,它能够提供非常灵活的功能,它能够访问各种各样的 State,用来注册一些 timer,利用 timer 回调的机制能够实现一些基于事件驱动的一些应用。

之上是 DataStream API,最上层是 SQL/Table API 的一种 High-level API。

Flink 的用途

Flink 能用来做什么?回顾一下 Flink up 前几站的分享,有非常多的嘉宾分享了他们在自己公司里面基于 Flink 做的一些实践,包括携程、唯品会、饿了么、滴滴、头条等等。他们的应用场景包括实时的机器学习,实时的统计分析,实时的异常监测等等。这些实践案例的共同点就是都用来做实时性的任务。

Flink Title 的变化

早期 Flink 是这样介绍自己的:“我是一个开源的流批统一的计算引擎”,当时跟 Spark 有点类似。后来 Spark 改成了一长串的文字,里面有各种各样的形容词:“我是一个分布式的、高性能的、高可用的、高精确的流计算系统”。最近 Spark 又进行了修改:“我是一个数据流上的有状态的计算”。

通过观察这个变化,可以发现 Flink 社区重心的变迁,即社区现在主要精力是放在打造它的流计算引擎上。先在流计算领域扎根,领先其他对手几年,然后借助社区的力量壮大社区,再借助社区的力量扩展它的生态。

阿里巴巴 Flink 是这样介绍自己的:“Flink 是一个大数据量处理的统一的引擎”。这个“统一的引擎”包括流处理、批处理、AI、MachineLearning、图计算等等。

Flink 过去与现在

Flink High-Level API 的历史变迁

在 Flink 1.0.0 时期,Table API 和 CEP 这两个框架被首次加入到仓库里面,同时社区对于 SQL 的需求很大。SQL 和 Table API 非常相近,都是一种处理结构化数据的一种 High-Level 语言,实现上可以共用很多内容。所以在 1.1.0 里面,社区基于 Apache Calcite 对整个非 Table 的 Module 做了重大的重构,使得 Table API 和 SQL 共用了大部分的代码,同时进行了支持。

在 Flink 1.2.0 时期,在 Table API 和 SQL 上支持 Tumbling Window、Sliding Window、Session Window 这些窗口。

在 Flink 1.3.0 时期,首次引用了 Dynamic Table 这个概念,借助 Dynamic Table,流和批之间是可以相互进行转换的。流可以是一张表,表也可以是一张流,这是流批统一的基础之一。Retraction 机制是 Dynamic Table 最重要的一个功能,基于 Retraction 才能够正确地实现多级 Application、多级 Join,才能够保证语意与结果的一个正确性。同时该版本支持了 CEP 算子的可控性。

在 Flink 1.5.0 时期,支持了 Join 操作,包括 window Join 以及非 window Join,还添加了 SQL CLI 支持。SQL CLI 提供了一个类似 shell 命令的对话框,可以交互式执行查询。

Flink API 的历史变迁

在 Flink 1.0.0 时期,加入了 State API,即 ValueState、ReducingState、ListState 等等。State API 主要方便了 DataStream 用户,使其能够更加容易地管理状态。

在 Flink 1.1.0 时期,提供了对 SessionWindow 以及迟到数据处理的支持。

在 Flink 1.2.0 时期,提供了 ProcessFunction,一个 Low-level 的 API。基于 ProcessFunction 用户可以比较灵活地实现基于事件的一些应用。

在 Flink 1.3.0 时期,提供了 Side outputs 功能。一般算子的输出只有一种输出的类型,但是有些时候可能需要输出另外的类型,比如把一些异常数据、迟到数据以侧边流的形式进行输出,并交给异常节点进行下一步处理,这就是 Side outputs。

在 Flink 1.5.0 时期,加入了 BroadcastState。BroadcastState 用来存储上游被广播过来的数据,这个节点上的很多 N 个并发上存在的 BroadcastState 里面的数据都是一模一样的,因为它是从上游广播来的。基于这种 State 可以比较好地去解决不等值 Join 这种场景。比如一个 Query 里面写的“SLECECT * FROM L JOIN R WHERE L.a > R.b”,也就是说我们需要把左表和右表里面所有 A 大于 B 的数据都关联输出出来。在以前的实现中,由于没有 Join 等值条件,就无法按照等值条件来做 KeyBy 的 Shuffle,只能够将所有的数据全部汇集到一个节点上,一个单并发的节点上进行处理,而这个单并发的节点就会成为整个 Job 的瓶颈。而有了 BroadcastState 以后就可以做一些优化:因为左表数据量比较大,右表数据量比较小,所以选择把右表进行广播,把左表按照它某一个进行均匀分布的 key,做 keyby shuffle,shuffle 到下游的 N 个 Join 的节点,Join 的节点里面会存两份 State,左边 state 和右边 state,左边 state 用来存左边数据流的 state,是一个 keyedState,因为它是按它某一个 key 做 keyby 分发下来的。右边 State 是一个 BroadcastState,所有的 Join 节点里面的 BroadcastState 里面存的数据都是一模一样的,因为均为从上游广播而来。所有 keyedState 进行并发处理,之后将 keyedState 集合进行合并便等于左边数据流的全集处理结果。于是便实现了这个 Join 节点的可扩充,通过增加 join 节点的并发,可以比较好地提升 Job 处理能力。除了不等值 Join 场景,BroadcastState 还可以比较有效地解决像 CAP 上的动态规则。

在 Flink 1.6.0 时期,提供了 State TTL 参数、DataStream Interval Join 功能。State TTL 实现了在申请某个 State 时候可以在指定一个 TTL 参数,指定该 state 过了多久之后需要被系统自动清除。在这个版本之前,如果用户想要实现这种状态清理操作需要使用 ProcessFunction 注册一个 Timer,然后利用 Timer 的回调手动把这个 State 清除。从该版本开始,Flink 框架可以基于 TTL 原生地解决这件事情。DataStream Interval Join 功能即含有区间间隔的 Join,比如说左流 Join 右流前后几分钟之内的数据,这种叫做 Interval Join。

Flink Checkpoint & Recovery 的历史变迁

Checkpoint 机制在 Flink 很早期的时候就已经支持,是 Flink 一个很核心的功能,Flink 社区也一直致力于努力把 Checkpoint 效率提升,以及换成 FailOver 之后它的 Recallable 效率的提升。

在 Flink 1.0.0 时期,提供了 RocksDB 的支持,这个版本之前所有的状态都只能存在进程的内存里面,这个内存总有存不下的一天,如果存不下则会发生 OOM。如果想要存更多数据、更大量 State 就要用到 RocksDB。RocksDB 是一款基于文件的嵌入式数据库,它会把数据存到磁盘,但是同时它又提供高效读写能力。所以使用 RocksDB 不会发生 OOM 这种事情。在 Flink1.1.0 里面,提供了纯异步化的 RocksDB 的 snapshot。以前版本在做 RocksDB 的 snapshot 时它会同步阻塞主数据流的处理,很影响吞吐量,即每当 checkpoint 时主数据流就会卡住。纯异步化处理之后不会卡住数据流,于是吞吐量也得到了提升。

在 Flink 1.2.0 时期,引入了 Rescalable keys 和 operate state 的概念,它支持了一个 Key State 的可扩充以及 operator state 的可扩充。 在 Flink 1.3.0 时期,引入了增量的 checkpoint 这个比较重要的功能。只有基于增量的 checkpoint 才能更好地支持含有超大 State 的 Job。在阿里内部,这种上 TB 的 State 是非常常见。如果每一次都把全量上 TB 的 State 都刷到远程的 HDFS 上那么这个效率是很低下的。而增量 checkpoint 只是把 checkpoint 间隔新增的那些状态发到远程做存储,每一次 checkpoint 发的数据就少了很多,效率得到提高。在这个版本里面还引入了一个细粒度的 recovery,细粒度的 recovery 在做恢复的时候,有时不需要对整个 Job 做恢复,可能只需要恢复这个 Job 中的某一个子图,这样便能够提高恢复效率。

在 Flink 1.5.0 时期,引入了 Task local 的 State 的 recovery。因为基于 checkpoint 机制,会把 State 持久化地存储到某一个远程存储,比如 HDFS,当发生 Failover 的时候需要重新把这个数据从远程 HDFS 再 download 下来,如果这个状态特别大那么该 download 操作的过程就会很漫长,导致 Failover 恢复所花的时间会很长。Task local state recovery 提供的机制是当 Job 发生 Failover 之后,能够保证该 Job 状态在本地不会丢失,进行恢复时只需在本地直接恢复,不需从远程 HDFS 重新把状态 download 下来,于是就提升了 Failover recovery 的效率。

Flink Runtime 的历史变迁

Runtime 的变迁历史是非常重要的。

在 Flink 1.2.0 时期,提供了 Async I/O 功能。如果任务内部需要频繁地跟外部存储做查询访问,比如说查询一个 HBase 表,在该版本之前每次查询的操作都是阻塞的,会频繁地被 I/O 的请求卡住。当加入异步 I/O 之后就可以同时地发起 N 个异步查询的请求,这样便提升了整个 job 的吞吐量,同时 Async I/O 又能够保证该 job 的 Async 语义。

在 Flink 1.3.0 时期,引入了 HistoryServer 的模块。HistoryServer 主要功能是当 job 结束以后,它会把 job 的状态以及信息都进行归档,方便后续开发人员做一些深入排查。

在 Flink 1.4.0 时期,提供了端到端的 exactly once 的语义保证,Flink 中所谓 exactly once 一般是指 Flink 引擎本身的 exactly once。如果要做到从输入到处理再到输出,整个端到端整体的 exactly once 的话,它需要输出组件具备 commit 功能。在 kafka 老版本中不存在 commit 功能,从最近的 1.1 开始有了这个功能,于是 Flink 很快便实现了端到端 exactly once。

在 Flink 1.5.0 时期,Flink 首次对外正式地提到新的部署模型和处理模型。新的模型开发工作已经持续了很久,在阿里巴巴内部这个新的处理模型也已经运行了有两年以上,该模型的实现对 Flink 内部代码改动量特别大,可以说是自 Flink 项目建立以来,Runtime 改动最大的一个改进。简而言之,它的一个特性就是它可以使得在使用 YARN、Mesos 这种调度系统时,可以更加更好地动态分配资源、动态释放资源、提高资源利用性,还有提供更好的 jobs 之间的隔离。最后是在这个版本中,Flink 对其网络站进行了一个基本重构。

Flink 网络栈重构

在流计算中有两个用来衡量性能的指标:延迟和吞吐。一般来讲如果想要更高吞吐就要牺牲一些延迟,如果想要更低的延迟就要牺牲一定的吞吐。但是网络栈的重构却实现了延迟和吞吐的同时提升,这主要得益于它两方面的工作:第一个是基于信用的流控,另一个是基于事件的 I/O。一个用来提高它的吞吐,另一个用来降低它的延迟。

在介绍流控之前需要先介绍一下现有的网络栈。Flink 中 TaskManager 就是用来管理各个 task 的角色,它是以进程为单位;task 用来执行用户代码,以线程为单位。当 tasks 之间有数据传输的交互的时候就要建立网络的连接,如果 2 秒之间都建立一个 TCP 连接的话,那么这个 TCP 连接会被严重浪费,所以 Flink 在两个 TaskManager 之间建立一个 TCP 连接,即两个进程之间只存在一个连接。各个 task 之间以 TCP channel 的方式来共享 TCP 的连接,这样整个 job 中就不会有太多的 TCP 连接。

Flink 反压

反压的意思是当某一个 task 的处理性能跟不上输入速率的时候,其输入端的 Buffer 就会被填满,当输入端 Buffer 被填满的时候就会导致 TCP 的读取被暂停。TCP 的读取被暂停之后,就会导致上游输出端的 Buffer 池越积越多,因为下游此时已经不再进行消费。当上游输出端的 Buffer 池也堆满的时候, TCP 通道就会被关闭,其内部所有的 TCP channel 也会被关闭。从而上游 task 就会逐级的向上游进行反压,这是整体的反压流程,所以说 Flink 以前的反压机制是比较原生态、比较粗暴的,因为其控制力度很大,整个 TCP 中一旦某一个 Task 性能跟不上,就会把整个 TCP 连接关掉。如下图所示:

右下角的 task 虽然处理跟不上了,但上面的 task 仍然可以继续进行处理。左边这些上游数据可以继续发给右上角的 task 进行处理。但是由于现在整个的 TCP 连接都被关闭,导致右上角 task 同样收不到数据,整体吞吐量实际上是下降的趋势。为了优化这个功能就需要做到更加细密度的流控,目前是关闭整个 TCP 连接,优化措施就是需要对 TCP channel 进行控制,当某个 task 处理不过来时只需要该 Task 对应的 TCP channel,其它 TCP channel 不受影响。优化实现方式就是基于信用的流控。

基于信用的流控的核心思想就是基于信用额度的消费。比如银行做贷款,为了防止坏账太多,它会对每一个人评估其信用额度,当发放贷款时贷款不会超过这个人能承受的额度。基于这种方式,它能够一方面不会产生太多坏账,另一方面可以充分地把银行的资金利用起来。基于信用的流控就是基于这种思想,Flink 中所谓的信用额度,就是指这个下游消费端的可用的 Buffer 数。如下图:

该图左边是指发送端,有四个输出的队列,每个队列里面的方块代表输出 Buffer,即准备丢给下游处理的 Buffer。右边是消费端,消费端也有四个队列,这四个队列里面也有一些 Buffer 块,这些 Buffer 块是空闲的 Buffer,准备用来接收上游发给自己的数据。

上面提到基于数据的流控中所谓的信用就是指这个消费端它可用的 Buffer 数,代表当前还能够消费多少数据,消费端首先会向上游反馈当前的信用是多少, producer 端只会向信用额度大于 0 的下游进行发送,对于信用额度如果为 0 的就不再发送数据。这样整个网络的利用率便得到了很大的提升,不会发生某些 Buffer 被长时间的停留在网络的链路上的情况。基于信用的流控主要有以下两方面的优化提升:一个是当某一个 task 发生反压处理跟不上的时候,不会发生所有的 task 都卡住,这种做法使吞吐量得到了很大的提升,在阿里内部用双 11 大屏作业进行测试,这种新的流控算法会得到 20% 的提升;另一个是基于事件的 I/O,Flink 在网络端写数据时会先往一个 Buffer 块里面写数据,这个 Buffer 块是一个 32K 的长度的单位,即 32K 的大小,当这个 Buffer 块被填满的时候就会输出到网络里面,或者如果数据流比较慢,没办法很快填满的话,那么会等待一个超时,默认一个 100 毫秒,即如果 100 毫秒内还没被填满那么这个 Buffer 也会被输出到网络里面。此时若是在以前版本中 Flink 延迟可能是在 100 毫秒以内,最差的情况下是到 100 毫秒,因为需要到 100 毫秒等这个 Buffer 发出去。如果要得到更低的延时,现在的做法就会将这个 Buffer 直接加入到输出的队列,但是还是保持继续往这个 Buffer 块里面写数据,当网络里面有容量时这个 Buffer 块便会立刻被发出去,如果网络现在也比较繁忙,那就继续填充这个 Buffer,这样吞吐也会比较好一点。基于这种算法,Flink 的延时几乎是完美的,可以看到它的曲线基本上是低于 10 毫秒的,这也充分利用了网络的容量,几乎对吞吐没有影响。

作者简介

活动预告

11 月 4 日,Apache Flink China 社区第二季 Meetup 巡展开启。

来自阿里、汇智、菜鸟、袋鼠云、有赞的技术专家,将为你展现:

  1. 如何扩展 Flink SQL 实现流与维表的 join
  2. 如何通过平台提高运维的效率,降低调优的成本
  3. Flink 批处理与 ML 的尝试
  4. Apache RocketMQ 与 Apache Flink 的集成 …………

报名传送门:www.huodongxing.com/event/14632…

感谢    赞同    分享    收藏    关注    反对    举报    ...