RedCloud Help

Flink

概念透析

Flink为流式/批式处理应用程序的开发提供了不同级别的抽象

  • FlinkAPI最底层的抽象为有状态实时流处理。其抽象实现是Process Function,并且Process Function被Flink框架集成到了DataStream API中来为我们使用。它允许用户在应用长须中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。此外,用户可以在此层抽象中注册事件事件(event time)和处理事件(Processing time)回调方法,从而允许程序可以实现复杂计算。

  • Flink API第二层抽象是Core APIs。实际上,许多应用程序不需要使用上述最底层抽象的API,而是可以使用Core APIs进行变成:其中包含DataStream API(应用于有界/无界数据流场景)。Core APIs提供的流式API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。此层API中处理的数据类型在每个编程语言中都有其对应的类。
    Process Function这类带超过抽象和DataStreamAPI的相互集成使得用户可以选择使用更底层的抽象API来实现自己的需求。DataSet API还额外提供了一些源语,比如循环/迭代(loop/iteration)操作。

  • Flink API第三层抽象是Table API。Table API是以表(Table)为中心的声明式边长(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API遵循(扩展)关系模型:即表拥有schema(类似于关系型数据库中的schema),并且Table API也提供了类似关系模型中的操作,比如select、project、join、group-by和aggregate等。Table API程序是以声明的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。尽管Table API使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比Core API的表达能力差。此外,Table API程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。
    表和DataStream/DataSet可以进行无缝切换,Flink允许用户在编写应用程序时将Table API与DataStream/DataSet API混合使用。

  • Flink API最顶层抽象是SQL。这层抽象在语义和程序表达式上都类似于Table API,带锯床其程序实现都是SQL查询表达式。SQL抽象与Table API抽象之间的关联是非常紧密的,并且SQL查询语句可以在TableAPI中定义的表上执行。

有状态流处理

什么是状态

数据流中的许多操作一次只查看一个单独的时间(例如时间解析器),而有些操作会记住多个时间的信息(例如窗口操作符)。这些操作被称为有状态操作。 有状态操作的一些示例

  • 当应用程序搜索某些事件模式时,状态将存储迄今为止遇到的时间序列。

  • 当按分钟/小时/天汇总事件时,状态会保存待汇总的事件。

  • 在通过数据流训练机器学习模型时,状态保存着当前版本的模型参数。

  • 当需要管理历史数据时,该状态允许高效访问过去发生的事件。

Flink需要了解状态,以便使用检查点和保存点实现容错。 有关状态的知识还允许重新扩展Flink应用程序,着意味着Flink会在并行实例间重新分配状态。 在处理状态时,阅读Flink的状态的后端可能也会有帮助。Flink提供了不同的状态后端,指定了状态的存储方式和位置。

键状态

秘钥状态保存在可视为嵌入式秘钥/值存储库中。状态与有状态操作符读取的数据流一起被严格分区和分发。因此,对键/值状态的访问只能在有键流中进行,即在有键/分区数据交换后进行,并且仅限于与当前事件的键相关联的值。对齐流和状态的键可确保所有状态更新都是本地操作,从而保证一致性,而不会产生事务开销。这种对齐还允许FLink以透明的方式重新分配状态和调整流分区。

image_1.png
键状态被进一步组织成所谓的秘钥组。键组是Flink重新分配键状态的的原子单元;键组的数量与定义的最大并行度相同。在执行过程中,键控运算符的每个并行实例都要处理一个或多个键组的键。

状态持久性

Flink采用流重播和检查点相结合的方式实现容错。检查点标记每个输入流中的一个特定点以及每个操作符的相应状态。通过恢复运算符的状态并从检查点开始重放记录,可以从检查点恢复流数据,同时保持一致性(精确一次处理语义)。 检查点间隔是在执行期间容错开销与恢复时间(需要重放的记录数量)之间进行权衡的一种手段。 容错机制可持续绘制分布式流数据的快照。对于状态较小的流式应用程序来读偶,这些快照非常轻量级,可以频繁绘制而不会对性能造成太大影响。流应用程序的状态存储在一颗可配置的位置,通常是分布式文件系统。 如果程序出现故障(由于机器、网络或软件故障),Flink会停止分布式流数据流。然后,系统会重启操作,并将其重置为最新的成功检查点。输入流被重置到状态快照点。作为重新启动的并行数据流的一部分而处理的任何记录,都保证不会影响之前的检查点状态。

Checkpointing检查点

Flink容错机制的核心部分是绘制分布式数据流和操作状态的一致快照。这些快照可作为一致的检查点,在发生故障时系统可以回退到这些快照。Flink绘制这些快照的机制在“Lightweight Asynchronous Snapshots for Distributed Dataflows”(分布式数据流的轻量级异步快照)疑问中有所描述。它的灵感来源于标准的分布式快照Chandy-Lamport)算法,并专门针对Flink的执行模型进行了定制。 请记住,所有与检查点有关的操作都可以异步完成。检查点障碍不会锁步移动,操作可以异步快照其状态。 自Flink1.11版起,检查点可以在对齐或不对齐的情况下进行。在本节中,我们将首先介绍对齐检查点。

Barriers障碍

Flink分布式快照的核心要素是流屏障。这些屏障被注入数据流,并作为数据流的一部分与记录一起流动。屏障永远不会超越记录,而是严格按照顺序流动。屏障将数据流中的记录分为当前快照的记录集和下一个快照的记录集。每个分隔符都带有快照的Id,快照中的记录会被推到分隔符前面。屏障不会中断数据流,因此非常轻便。来自不同快照的多个分隔符可以同时出现在数据流中,这意味着各种快照可能会同时发生。

image_2.png
流屏障在流源处注入并行数据流。快照n的屏障注入点(我们成为Sn)作为报告给检查点协调器(Flink的JobManager)。 然后,这些屏障流向下游。当中间操作符从其所有输入流中接受到快照n的屏障后,它就会向其所有输入流发送快照n的屏障。一旦一个汇操作符(流DAG的末端)从其所有输入流中接收到屏障n,它就会向检查点协调器确定快照n。所有汇操作在确认快照后,该快照即被视为已完成。 快照n完成后,任务将不在向源请求Sn之前的记录,因为此时这些记录(机器后代记录)以通过整个数据流拓扑。
image_3.png
接受多个输入流的操作员需要在快照屏障上对其输入流。上图对此进行了说明:

  • 一旦操作员从输入数据流中接受带出快照屏障n,就不能再处理该数据流中的任何记录,直到也从其他输入数据流中接受到屏障n。否则,它将把属于快照n的记录和属于快照n+1的记录混在一起。

  • 当最后一个数据流烧到第n个障碍后,操作符会发生所有待处理的传出记录,然后自己发出第n个障碍的快照。

  • 它会快照状态并回复处理所有输入数据流的记录,在处理数据流记录之前先处理输入缓冲区的记录。

  • 最后,操作员将状态异步写入状态后台。

需要注意的是,所有具有多个输入的运算符和经过洗牌的运算符在使用多个上游任务的输出流时都需要对齐。

Snapshotting Operator State操作员状态快照

当操作符包括任何形式的状态时,这种状态也必须是快照的一部分。 操作符从输入流中接收到所有快照屏障后,在向输入流中发送屏障前,会对其状态进行快照。此时,所有来自屏障前记录的状态更新都以完成,而依赖于屏障后记录的更新还未应用。由于快照的状态可能很大,因此会存储在一个配置的状态后端中。默认情况下,这个JobManager的内存,但在生产使用中应配置分布式可靠存储(如HDFS)。状态存储完成后,操作员会确定检查点,将快照屏障发送到输出流中,然后继续操作。 生成的快照现在包含

  • 对于每个并行数据流数据源,快照开始时在数据流中的偏移量/位置。

  • 对于每个操作符,都有一个指向作为快照一部分存储的状态的指针。

    image_5.png

Recovery恢复

这种机制下的恢复非常简单:故障发生时,Flink会选择最新完成的检查点k。然后,系统会重新部署整个分布式数据流,并为每个操作员提供作为检查点k一部分的快照状态。源被设置为从位置Sk开始读取流。例如,在Apache Kafka中,这意味着告诉消费者从偏移量Sk开始获取数据。 如果状态是以增量方式快照的,操作员会从最新完整快照的状态开始,然后对该状态应用一系列增量快照更新。 更多信息,请参阅重新启动策略。

Unaligned Checkpointing不对齐检查点

检查点也可以不对齐执行。其基本思想是,只要飞行中的数据成为操作员状态的一部分,检查点就可以超越所有飞行中的数据。 请注意,这种方法实际上更接近Chandy-Lamport算法,但Flink仍会在源中插入屏障,以避免检查点协调器超载。

image_6.png
图中描述了运算符如何处理未对齐的检查点障碍:

  • 运算器对其输入缓冲器中存储的did一个障碍做出反应。

  • 它会立即将障碍物添加到输出缓冲器的末端,从而将其转发给下游操作员。

  • 运算符会将所有被超越的记录标记为异步存储,并创建自身状态的快照。

因此,操作员只需短暂停止处理输入以标记缓冲区、转发屏障并创建其他状态的快照。 非对齐检查点可却从不障碍以最快速度到达汇。它尤其适用于至少有一条缓慢移动数据路径的应用,在这种情况下,对齐时间可能长达数小时。不过,由于它增加了额外的I/O压力,当状态后端的I/O成为瓶颈时,它就无能为力了。有关其他限制,请参阅操作中更深入的讨论。 请注意,保存点将始终对齐。

Unaligned Recovery 不对齐恢复

操作员首先恢复运行中的数据,然后才开始处理非对齐检查点来自上游操作员的任何数据。除此之外,它还执行与恢复对其检查点时相同的步骤。

State Backends状态后端

存储键/值索引的具体数据结构取决于所选的状态后端。一种状态后端将数据存储在内存中的哈希映射中,另一种状态后端则使用RocksDB作为键/值存储。出了定义保存状态的数据结构外,状态后端还实现了对键/值状态进行数据点快照的逻辑,并将快照作为检查点的一部分进行存储。状态后端可以在不改变应用程序逻辑的情况下进行配置。

image_7.png

Savepoints 保存点

所有使用检查点的程序都能从保存点恢复执行。保存允许更新程序和Flink集群,而不会丢失状态。 Savepoints是手动触发的检查点,它能获取程序快照并将其写入状态后端。为此,他们依赖常规的检查点机制。 保存点(Savepoints)与检查点(Checkpoints)类似,只是他们由用户触发,不会在新的检查点完成后自动失效。为了正确使用保存点,了解checkpoints与savepoints之间的区别非常重要,checkpoints与savepoints中对此进行了描述。

Exactly Once vs At Least Once一次或多次

对其步骤可能会增加流程序的延迟。通常情况下,这种额外的延迟只有几毫秒,但我们也看到过一些异常值的延迟明显增加的情况。对于需要所有记录都保持超低延迟(几毫秒)的应用程序,Flink提供了一个开关,可在检查点期间跳过流对齐。一旦操作员看到每个输入的检查点屏障,仍然绘制检查点快照。 当跳过对齐时,运算符会继续处理所有输入,即使在检查点n的某些检查点障碍到达后也是如此。这样,运算符也会处理检查点n的状态快照拍摄之前属于检查点n+1的元素。在还原时,这些记录将作为重复记录出现,因此它们都包含在检查点n的状态快照中,并将在检查点n后作为数据的一部分重放。

对齐只发生在有多个潜质操作(连接)和有多个发送方的操作(流重新分区/洗牌后)上。正因为如此,只有令人尴尬的并行流操作的数据流(map() ,flatMap(),filter(),....)即使在至少一次模式下,实际上也只能保证一次。

State and Fault Tolerance in Batch Programs 批处理程序中的状态和容错

在Batch ExecutionMode(批处理执行模式中),Flink将批处理程序作为流式程序的一种特例来执行,在这种模式下,流是有边界的(元素数量有限)。因此,上述概念适用于批处理程序的方式与适用于流式程序的方式相同,但有一些小的例外:

  • 批处理程序的容错不使用检查点。恢复是通过完全重放数据流来实现的。这是可能得,因为输入是有边界的。这使得会分手的成本更高,但由于避免了检查点,常规处理的成本更低。

  • 批量执行模式下的状态后端使用简化的内存/核外数据结构,而不是键/值索引。

及时流处理

介绍

及时流处理事有状态流处理的扩展,其中时间在计算中发挥一定作用。除此之外,当你进行时间序列分析、根据特定时间段(通常成为窗口)进行聚合时,或者当你事件发生事件很重要的事件处理时,就会出现这种情况。 在以下部分中,我们将重点介绍你在使用Flink应用程序时应考虑的一些主题。

Notions of Time:Event Time and Processing Time 时间概念:事件时间和处理时间

当在流程序中引用时间时(例如定义窗口),而昆虫引用不同的时间概念:

  • 处理时间:处理时间是指正在执行相应操作的是机器的系统时间。

  • 当流程序在处理时间上运行时,所有基于时间的操作(如时间窗口)将使用运行相应运算符的机器的系统时钟。每小时处理时间窗口将包括在系统时钟指示整点时间之间到达特定操作员的所有记录。例如,如果应用程序在上午9:15开始运行,则第一个每小时处理时间窗口将包括上午9:15到10:00之间处理的事件,下一个窗口将包括上午10:00到11:00之间处理的事件,以此类推。
    处理事件是最简单的时间概念,不需要流和机器之间的协调。它提供最佳性能和最低延迟。然而,在分布式和异步环境中,处理时间并不能提供确定性,因为它容易受到记录到达系统的速度(例如从消息队列)以及记录在系统内部运算符之间流动的速度的影响。以及停电(计划内或其他情况)。

  • 事件时间:事件时间是每个单独事件在其生成设备上发生的事件。该时间通常在记录进入Flink之前嵌入到记录中,并且可以从每个记录中提取该事件时间戳。在事件时间中,时间的进展取决于数据,而不是任何挂钟。事件时间程序必须指定如何生成事件时间水印,这是表示事件时间进度的机制。该水印机制将在下面的后面部分中描述。
    在完美的世界中,事件时间处理将产生完全一致和确定性的结果,无论事件何时到达或其顺序如何。但是,除非已知事件按顺序到达(按时间戳),否则事件时间处理在等待无序事件时产生一些延迟。由于只有等待有限的时间,这限制了事件时间应用程序的确定性。
    假设所有的数据均已达到,事件事件操作将按预期运行,即使在处理无序或延迟事件或重新处理历史数据时,也会产生正确且已知的结果。例如,每小时事件时间窗口将包含带有该小时内事件时间戳的所有记录,无论它们到达的顺序或处理时间如何。(有关详细信息,请参阅有关迟到的部分)。

请注意,有时当时间程序实时处理实时数据时,它们会使用一些处理时间操作,以保证它们及时处理。

image_8.png

Event Time And Watermarks 活动时间和水印

注意:Flink实现了数据流模型中的许多技术,有关事件时间和水印的详细介绍,请查看一下文章。

  • Streaming 101 by Tyler Akidau](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101)

  • 数据流论文模型

支持事件时间的流处理器需要一种方法来测量事件时间的进度。例如,当事件时间超过一小时结束时,需要通知构建每小时窗口的窗口操作员,以便操作员可以关闭正在进行的窗口。

事件时间可以独立于处理时间(通过挂钟测量)进行。例如,在一个程序中,操作员的当前事件时间可以稍微落后于处理时间(考虑到接受事件的延迟),而两者都以相同的速度进行。另一方面,另一个流程序可能通过快速已经缓冲在Kafka主题(或另一个消息队列)中的一些历史数据,只需要几秒钟的处理就可以完成数周的事件时间。

下图显示了带有(逻辑)时间戳和内联流动水印的事件流。在此示例中,事件是按顺序排列的(相对于它们的时间戳),这意味着只是流中的周期性标记。

image_9.png
水印对于无序流至关重要,如下所示,其中事件不按时间戳排序。一般来说,水印是一种生命,表明在流是的该点,特定时间戳之前的所有事件都应该已到达。一旦水印到达操作员,操作员就可以将其内部事件时钟提前到水印的值。
image_10.png
请注意,新创建的流元素(或多个元素)从生成它们的事件或从触发这些元素创建的水印继承事件时间。

Watermarks in Parallel Streams 并行流中的水印

水印是在源函数处或直接在源函数之后生成的。源函数的每个并行子任务通常独立生成其水印。这些水印定义了该特定并行源的事件时间。 当水印流过流媒体程序时,它们会提前到达操作员的事件时间。每当操作符提前其事件时间时,它都会为其后续操作符下游生成一个新的水印。 一些运算符消耗多个输入流;例如,联合,或keyBy(...) 或 partition(...) 函数后面的运算符。此类运算符的当前事件时间使其输入流事件时间的最小值。当其输入流更新其事件时间时,运算符也会更新。 下面显示了流并行流的事件和水印以及跟踪事件时间的运算符的实例。

image_11.png

lateness 迟到

某些元素可能会违反水印条件,这意味着即使在Watermark(t)发生之后,也会出现更多时间戳t'<=t的元素。事实上,在许多现实世界的设置中,某些元素可以任意延迟,从而无法指定某个事件时间戳的所有元素都发生的时间。此外,即使可以限制延长,将水印延迟太多通常也是不可取的,因为这会导致事件时间窗口的评估出现太多延迟。 因此,流式传输程序可能会明确期望一些后期元素。迟到元素是在系统事件时钟(由水印指示)已经过了迟到元素时间戳的时间之后到达的元素。有关如何在事件时间窗口中使用延迟元素的更多信息,请参阅允许延迟。

Windowing 窗口化

聚合事件(例如计数、求和)在流上的工作方式与批处理中的工作方式不同。例如,不可能对流中的所有元素进行计数,因为流通常是无限的(无界)。相反,流上的聚合(计数、总和等)由窗口限定范围,例如“过去5分钟的计数”或“最后100个元素的总和”。 Windows可以是时间驱动的(例如:每30秒)或数据驱动的(例如:每100个元素)。人们通常会区分不同类型的窗口,例如翻滚窗口(无重叠)、滑动窗口(有重叠)和会话窗口(中间有不活动的间隙)。

image_12.png
请查看此博客文件获取其他窗口示例,或查看DataStreamAPI的窗口文档。

Flink是一个分布式系统而,需要有效分配和管理计算资源才能执行流应用程序。它继承了所有常见的集群资源管理器,例如Hadoop YARN,但也可以设置作为独立集群甚至库运行。 本节概述Flink架构,并且描述了其主要组件如何交互以执行应用程序和从故障中恢复。

Flink运行时两种类型的进程组成:一个JobManager和一个或者多个TaskManager。 Client不是运行时和程序执行的一部分,而是用于准备数据流将其发送给JobManager。之后,客户端可以断开链接(分离模式),或保持链接来接受进程报告(附加模式)。客户端可以作为触发执行Java/Scala程序的一部分运行,也可以在命令行进程./bin/flink run ...中运行。 可以通过多种方式启动JobManager和TaskManager:直接在机器上作为standalone集群启动、在容器中启动、或者通过YARN等资源框架管理并启动。TaskManager链接到JobManagers,宣布自己可用,并被分配工作。

JobManager

JobManager具有许多与协调Flink应用程序的分布式执行有关的职责:它决定何时调度下一个task(或一组task)、对完成的task或执行失败做出反应、协调checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:

  • ResourceManager ResourceManager负责Flink集群中的资源提供、回收、分配-它管理task slots,这是FLink集群中资源调度的单位(请参考TaskManager)。Flink为不同的环境和资源提供者(例如YARN、Kubernetes和standalone部署)实现了对应的ResourceManager。在standalone设置中,ResourceManager只能分配可用TaskManager的slots,而不能自行启动新的TaskManager。

  • Dispatcher Dispatcher提供一个REST接口,用来提交Flink应用程序执行,并为每个提交的作业启动一个新的JobMaster。它还运行Flink WebUI用来提供作业执行信息。

  • JobMaster JobMaster负责管理单个JobGraph的执行。Flink集群中可以同时运行多个作业,每个作业都有自己的Jobmaster。

始终至少有一个JobManager。高可用(HA)设置中可能有多个JobManager,其中一个始终是leader,其他的则是standby(请参考高可用(HA))。

TaskManagers

TaskManager(也称为worker)执行作业流的task,并且缓存和交换数据流。 必须始终至少有一个TaskManager。在TaskManager中资源调度的最小单位是task slot。TaskManager中task slot的数量表示并发处理task的数量。请注意一个task slot中可以执行多个算子(请参考Tasks和算子链)

Tasks和算子链

对于分布式执行,Flink将算子的subtasks连接成tasks。每个task由一个线程执行。将算子连接成task是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加吞吐量。链行为是可以配置的;请参考连文档以获取详细信息。 下图中样例数据流用5个subtask执行,因此有5个并行线程。

flink001.png

Task Slots和资源

每个worker(taskManager)都是一个JVM进程,可以在单独的线程中兴一个或多个subtask。为了控制一个TaskManager中接受多少个task,就有了所谓的task slots(至少一个)。 每个task slot代表TaskManager中资源的固定子集。例如,具有3个slot的TaskManager,会将其托管内存1/3用于每个slot。分配资源意味着subtask不会与其他作业的subtask竞争托管内存,而是具有一定数量的保留托管内存。注意此处没有CPU隔离;当前slot仅分离task的托管内存。 通过吊证task slot的数量,用户可以定义subtask如何互相隔离。每个TaskManager有一个slot,这意味着每个task组都在单独的JVM中运行(例如,可以在单独的容器中启动)。具有多个slot意味着更多subtask共享同一个JVM。同一个JVM中的task共享TCP链接(通过多路复用)和心跳信息。它们还可以共享数据集和数据结构,从而减少了每个task的开销。

flink002.png
默认情况下,Flink允许subtask共享slot,即便它们是不同的task的subtask,只要是来自同一作业即可。结果就是一颗slot可以持有整个作业管道。允许slot共享有两颗主要优点:

  • Flink集群所需的task slot和作业中使用的最大并行度恰好一样。无需计算程序总共包含多个task(具有不同并行度)。

  • 容易获得更好的资源利用。如果没有slot共享,非密集subtask(source/map() )将阻塞和密集型subtask(window)一样多的资源。通过slot共享,我们示例中的基本并行度从2增加到6,可以充分利用分配的资源,同时确保繁重的subtask在TaskManager之间公平分配。

    image.png

Flink应用歘是是从其main() 方法缠身的一个或多个FLink作业的任何用户程序。这些作业的执行可以在本地JVM(LocalEnvironment)中进行,或具有多台机器的集群的远程设置(RemoteEnvironment)中进行。对于每个程序,ExecutionEnvironment提供了一些方法来控制作业执行(例如设置并行度)并与外界交互(请参考Flink教程剖析)。 Flink应用歘是的作业可以被提交到长期运行的Flink Session集群、专用的Flink Job集群或Flink Application集群。这些选项之间的差异主要与集群的生命周期的资源隔离保证有关。

  • 集群生命周期:在Flink Session集群中暑,客户端链接到一个预先存在的、长期运行的集群,该集群可以介绍多个作业提交。即使所有作业完成后,集群(和JobManager)仍将继续运行知道手动停止session为止。因此,Flink Session集群的寿命不受任何Flink作业寿命的约束。

  • 资源隔离:TaskManager slot由ResourceManager在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些急症-例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果TaskManager崩溃,则在此TaskManager上运行task的所有作业都将失败;类似的,如果JobManager上发生一些致命错误,它将影响集群中正在运行的所有作业。

其他注意事项:拥有一颗预先存在的集群可以节省大量时间申请资源和启动TaskManager。有种场景很重要,作业执行时间短并且启动时间长会对端到端的用户体验产生负面影响-就像对简短查询的交互式分析一样,希望作业可以使用现有资源快速执行计算。

  • 集群生命周期:在Flink job集群中,可用的集群管理器(例如Yarn)用于为每个提交的作业启动一个集群,并且该集群仅可用于该作业。在这里,客户端首先在集群管理器请求资源启动JobManager,然后将作业提交给在这个进程中运行的Dispatcher。然后根据作业的资源请求惰性的分配TaskManager。一旦作业完成,Flink Job集群将被拆除。

  • 资源隔离:JobManager中的致命错误仅影响在Flink job集群中运行的一个作业。

  • 其他注意事项:由于ResourceManager必须应用并等待外部资源管理组件来启动TaskManager进程和分配资源,因此Flink Job集群更适合长期运行、具有高稳定性要求且对较长的启动时间不敏感的大型作业。

  • 集群生命周期:Flink Application 集群是专用的Flink集群,仅从Flink应用程序执行作业,并且main() 方法在集群上而不是客户端运行。提交作业是一个单步骤过程:无需先启动Flink集群,然后将作业提交到现有的session集群;相反,将应用程序逻辑和依赖打包成一个可执行的作业JAR中,并且集群入口(ApplicationClusterEntryPoint)负责调用main() 方法来提取JobGraph。例如,这允许你像在Kubernetes上部署任何其他应用程序一样部署Flink应用程序。因此,Flink Application集群的寿命与Flink应用程序的寿命相关。

  • 资源隔离:在Flink Application集群中,ResourceManager和Dispatcher作用于单个的Flink应用程序,相比于Flink Session集群,它提供了更好的隔离。

词汇表

Flink Application集群是专用的Flink Cluster,仅从Flink Application 执行Flink Jobs。Flink Cluster的寿命与Flink Application的寿命有关。

Flink Job集群是专用的Flink Cluster,仅执行一个Flink Job。Flink Cluster的寿命与Flink Job的寿命有关。

一般情况下,Flink集群是由一个Flink JobManager和一个或多个Flink TaskManager进程组成的分布式系统。

Event

Event是对应用程序建模的域的状态更改的声明。它可以同时为流或批处理应用程序的input和output,也可以单独是input或者output中的一种。Event是特殊类型的Record。

ExecutionGraph

Function

Function是由用户实现的,并封装了Flink程序的应用程序逻辑。大多数Function都由相应的Operator封装。

Instance

Instance常用于描述运行时的特定类型(通常是Operator或者Function)的一个具体实例,由于Apache Flink主要是用Java编写的,所以,这与Java中的Instance或Object定义相对应。在ApacheFlink的上下文中,parallel instance也常用于强调同一Operator或者Function的多个instance以并行的方式运行。

一个Flink应用程序是一个java应用程序,它从main()方法(或通过一些其他方式)提交一个或多个Flink Jobs。提交jobs通常是通过调用ExecutionEnvironment的execute()方法来完成的。 一个应用程序的作业可以提交给一个长期运行的Flink Session Cluster,或者提交一个专用的Flink Application Cluster,或提交到Flink Job Cluster。

Flink Job表示为runtime的logical graph(通常也称为数据流图),通过在Flink Application中调用execute()方法创建和提交。

JobGraph

Logical Graph

Flink JobManager是Flink Cluster的主节点。它包含三个不同的组件:Flink Resource Manager、Flink Dispatcher、运行每个Flink Job的Flink JobMaster。

JobMaster是在Flink JobManager运行中的组件之一。JobManager负责监督单个作业Task的执行。以前,整个Flink JobManager都叫做JobManager。

JobResultStore

JobResultStore是一个Flink组件,它将全局终止(已完成的,已取消的或失败的)作业的结果保存到文件系统中,从而使结果比已完成的作业更长久。这些结果然后被Flink用来确定作业是否应该在高可用集群中被恢复。

Logical Graph

逻辑图是一种有向图,其中顶点是算子,边定义算子的输入/输出关系,并对应数据流或数据集通过Flink Application提交作业来创建逻辑图。 逻辑图通常也成为数据流图。

Managed State

Managed State描述了已在框架中注册的应用程序的托管状态。对于托管状态,Apache Flink来负责持久化和重伸缩等事宜。

Operator

Logical Graph的节点。算子执行某种操作,该操作通常由Function执行。Source和Sink是数据输入和数据输出的特殊算子。

Operator Chain

算子链由两个或多个连续的Operator组成,两者之间没有任何的重新分区。同一算子链内的算子可以彼此直接传递record,而无需通过序列化或Flink的网格栈。

Partition

分区是整个数据流或数据集的独立子集。通过将每个Record分配给一个或多个分区,来把数据流或数据集划分为多个分区。在运行期间,Task会消费数据流或数据集的分区。改变数据流或数据集分区方式的转换通常成为重分区。

Physical Graph

Physical graph是一颗在分布式运行时,把Logical Graph 转换为可执行的结果。节点是Task,便表示数据流或数据集的输入/输出关系或partition。

Record

Record的数据集或数据流的组成元素。Operator和Function接受record作为输入,并将record作为输出发出。

长时间运行的Flink Cluster,它可以接受多个Flink Job的执行。此Flink Cluster的生命周期不受任何FLink Job生命周期的约束限制。以前,Flink Session Cluster也称为session mode的Flink Cluster,和FLink Application Cluster相对应。

State Backend

对于流处理程序,Flink Job的State Backend决定了其state是如何存储在每个TaskManager上的(TaskManager的Java堆栈或嵌入式RocksDB),以及它在checkpoint时的写入位置(Flink JobManager的java堆或者Fliesystem)。

Sub-Task

Sub-Task是负责处理数据流Partition的Task。“Sub-Task”强调的是同一个Operator或者Operator Chain具有多个并行的Task。

Task

Task是Physical Graph的节点。它是基本的工作单元,由Flink的runtime来执行。Task正好封装了一个Operator或者Operator Chain的parallel instance。

TaskManager是Flink Cluster的工作进程。Task被调度到TaskManager上执行。TaskManager相互通信,只为在后续的Task之间交换数据。

Transformation

Transformation应用于一个或多个数据流或数据集,并产生一个或多个输出数据流或数据集。Transformation可能会在每个记录的基础上更改数据流或数据集,但也可以只更改其分区或执行聚合。虽然Operator和Function是FlinkAPI的物理部分,但Transformation只是一个API概念。具体来说,大多数(但不是全部)Transformation是由某些Operator实现的。

UID

操作员的唯一标识符, 由用户提供或根据任务结构确定。提交申请时,该标识将转换为UID哈希值。

UID Hash

运行时操作符的唯一标识符,也称为“操作符ID“或”顶点ID“,由UID生成。它通常在日志、REST API或度量中暴露,最重要的是,它是保存点钟识别操作符的方式。

FlinkCEP是Flink上层实现的复杂事件处理库。它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分。 本页讲述了FlinkCEP中可用的API,我们首先讲述模式API,它可以让你指定想在数据流中检测的模式,然后讲述如何检测匹配的事件序列并进行处理。再然后我们讲述Flink在按照事件时间处理迟到事件时的假设,以及如何从旧版本的Flink向1.13之后的版本迁移作业。

开始

如果你想现在开始尝试,创建一个Flink程序,添加FlinkCEP的依赖到项目的pom.xml文件中。

<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep</artifactId> <version>1.20-SNAPSHOT</version> </dependency>

现在可以开始使用PatternAPI写你的第一个CEP程序了

DataStream<Event> input = ...; Pattern<Event, ?> pattern = Pattern.<Event>begin("start") .where(SimpleCondition.of(event -> event.getId() == 42)) .next("middle") .subtype(SubEvent.class) .where(SimpleCondition.of(subEvent -> subEvent.getVolume() >= 10.0)) .followedBy("end") .where(SimpleCondition.of(event -> event.getName().equals("end"))); PatternStream<Event> patternStream = CEP.pattern(input, pattern); DataStream<Alert> result = patternStream.process( new PatternProcessFunction<Event, Alert>() { @Override public void processMatch( Map<String, List<Event>> pattern, Context ctx, Collector<Alert> out) throws Exception { out.collect(createAlertFrom(pattern)); } });

模式API

模式API可以让你定义想从输入流中抽取的复杂模式序列。 每个复杂的模式序列包括多个简单的模式,比如,寻找拥有相同属性事件的模式。从现在开始,我们把这些简单的模式成为模式,把我们在数据流中最终寻找的复杂模式序列称为模式序列,你可以把模式序列看做是这样的模式构成的图,这些模式基于用户指定的条件从一个模式转换到另外一个,比如event.getName().equals("end").一个匹配是输入事件的一个序列,这些事件通过一系列有效的模式转换,能够访问到复杂模式图中的所有模式。

单个模式

一个模式可以是一颗单利或者循环模式.单利模式值接受一个时间,循环模式可以介绍多个时间.在匹配表达式中,循环模式可以介绍多个事件.在模式匹配表达式中,模式"a b+ c ? d"(或者"a",后面跟着一个或者多个"b",在往后选择的跟着一个"c",最后跟着一个"d"),a,c?,和d都是单例模式,b+是一个循环模式,默认情况下,模式都单利的,你可以通过使用量词把它们转换成循环模式.每个模式可以有一颗或者多个条件来决定它接受那些事件.

量词

在FlinkCEP中,你可一通过这些方法指定循环模式:pattern.oneOrMore(),指定期望一个给定时间出现一次或者多次的模式(例如前面提到的b+模式);pattern.times(#ofTimes),指定期望一个给定事件出现特定次数的模式,例如出现4次a;pattern.times(#fromTimes,#toTimes),指定期望一个给定事件出现次数在一颗最小值和最大值中间的模式,比如出现2-4次a. 你可以使用pattern.greedy()分发让循环模式变成贪心的,单现在还不能让模式组贪心.你可以使用pattern.optional()方法让所有的模式变成可选的,不管是否是循环模式. 对一个命名为start的模式,以下量词是有效的:

// 期望出现4次 start.times(4); // 期望出现0或者4次 start.times(4).optional(); // 期望出现2、3或者4次 start.times(2, 4); // 期望出现2、3或者4次,并且尽可能的重复次数多 start.times(2, 4).greedy(); // 期望出现0、2、3或者4次 start.times(2, 4).optional(); // 期望出现0、2、3或者4次,并且尽可能的重复次数多 start.times(2, 4).optional().greedy(); // 期望出现1到多次 start.oneOrMore(); // 期望出现1到多次,并且尽可能的重复次数多 start.oneOrMore().greedy(); // 期望出现0到多次 start.oneOrMore().optional(); // 期望出现0到多次,并且尽可能的重复次数多 start.oneOrMore().optional().greedy(); // 期望出现2到多次 start.timesOrMore(2); // 期望出现2到多次,并且尽可能的重复次数多 start.timesOrMore(2).greedy(); // 期望出现0、2或多次 start.timesOrMore(2).optional(); // 期望出现0、2或多次,并且尽可能的重复次数多 start.timesOrMore(2).optional().greedy();

条件

对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,例如,它的value字段应该大于5,或者大于前面接受的事件的平均值.指定判断事件属性的条件可以通过pattern.where(),pattern.or()或者pattern.until()方法.这些可以是IterativeCondition或者SimpleCondition. 迭代条件:这是最普遍的条件类型.使用它可以指定一个基于前面已经被接受的事件的属性或者他们的一个子集的统计数据来决定是否接受时间序列的条件. 下面是一个迭代条件的代码,它介绍"middle"模式下一个事件的名称开头是"foo",并且前面已经匹配到的事件加上这个事件的价格小于5.0.迭代条件非常强大,尤其是跟循环模式结合使用时.

middle.oneOrMore() .subtype(SubEvent.class) .where(new IterativeCondition<SubEvent>() { @Override public boolean filter(SubEvent value, Context<SubEvent> ctx) throws Exception { if (!value.getName().startsWith("foo")) { return false; } double sum = value.getPrice(); for (Event event : ctx.getEventsForPattern("middle")) { sum += event.getPrice(); } return Double.compare(sum, 5.0) < 0; } });

描述的上下文提供了获取事件时间属性的方法.更多细节可以看时间上下文. 简单条件:这种类型的条件扩展了前面提供的IterativeCondition类,它确定是否接受一个事件只取决于事件自身的属性.

start.where(SimpleCondition.of(value -> value.getName().startsWith("foo")));

最后,你可以通过pattern.subtype(subClass)方法限制接受的事件类型是初始事件的子类型.

start.subtype(SubEvent.class) .where(SimpleCondition.of(value -> ... /*一些判断条件*/));

组合条件:如上所示,你可以使用subtype条件和其他的条件结合起来使用.这适用于任何条件,你可以通过以此调用where()来组合条件.最终的结果是每个单一条件的结果是逻辑AND.如果想使用OR来组合条件,你可以像下面这样使用or()条件.

pattern .where(SimpleCondition.of(value -> ... /*一些判断条件*/)) .or(SimpleCondition.of(value -> ... /*一些判断条件*/));

停止条件:如果使用循环模式(oneOrMore()和oneOrMore().optional()),你可以指定一个停止条件,例如,接受事件的值大于5直到值的和小于50. 为了更好的理解它,看下面的例子.给定

  • 模式如"(a+ until b)"(一个或者更多的"a"直到"b")

  • 到来的事件序列"a1" "c" "a2" "b" "a3"

  • 输出结果会是: .

你可以看到 和由于停止条件 没有被输出.

where(condition)

为当前模式定义一个条件.为了匹配这个模式,一个事件必须满足某些条件.多个连续的where()语句取与组成判断条件.

pattern.where(new IterativeCondition<Event>(){ @Override public boolean filter(Event value,Context ctx)throws Exception{ return ... // 一些判断条件 } })
or(condition)

增加一个新的判断,和当前的判断取或.一个事件只要满足至少一个判断条件就匹配到模式.

pattern.where(new IterativeCondition<Event>(){ @verride public boolean filter(Event value,Context ctx)throws Exception{ return ...; //一些判断条件 } }).or(new IterativeCondition<Event>(){ @Override public boolean filter(Event value,Context ctx)throw Exception{ return ...; //替代条件 } })
util(condition)

为循环模式指定一个停止条件.意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了.只适合于和oneOrMore()同时使用.NOTE:在基于事件的条件中,它可用于清理对应模式的状态.

pattern.oneOrMore().until(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context ctx) throws Exception { return ...; // 替代条件 } });
subtype(subClass)

为当前模式定义一个子类型条件.一个事件只有是这个子类型的时候才能匹配到模式.

pattern.subtype(SubEvent.class);
oneOrMore()

指定模式期望匹配到的事件至少出现一次.默认(在子事件间)使用松散的内部连续性.关于内部连续性的更多消息可以参考连续性.推荐使用until()或者within()来清理状态.

pattern.oneOrMore()
timesOrMore(#times)

指定模式期望匹配到的事件至少出现#times次.默认(在子事件间)使用松散的内部连续性.关于内部连续性的更多信息可以参考连续性.

pattern.timesOrMore(2);
times(#ofTimes)

指定模式期望匹配到的事件正好出现的次数.默认(在子事件间)使用松散的内部连续性.关于内部连续性的更多信息可以参考连续性.

pattern.times(2);
times(#fromTimes,#toTImes)

指定模式期望匹配到的事件出现次数在#fromTimes和#toTimes之间.默认(在子事件间)使用松散的内部连续性.关于内部连续性的更多信息可以参阅连续性.

pattern.times(2,4);
optional()

指定这个模式是可选的,也就是说,它可能根本不出现.这对所有之前提到的量词都适用.

pattern.oneOrMore().optional();
greedy()

指定这个模式是贪心的,也就是说,它会重复尽可能多的次数.这只对量词使用,现在还不支持模式组.

pattern.oneOrMore().greedy();

组合模式

现在你已经看到单个的模式是什么样的了,该去看看如何把他们链接起起来组成一个完整的模式序列. 模式序列由一个初始模式作为开头,如下所示:

Pattern<Event,?>start = Pattern.<Event>begin("start");

接下来,你可以增加更多的模式到模式序列中并指定他们之间所需的连续条件.FlinkCEP支持事件之间如下形式的连续策略:

  1. 严格连续:期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件.

  2. 松散连续: 忽略匹配的事件之间的不匹配的事件.

  3. 不确定的松散连续: 更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配.

可以使用下面的方法来指定模式之间的连续策略:

  1. next(),指定严格连续.

  2. followedBy(),指定松散连续

  3. followedByAny(),指定不确定的松散连续

或者

  1. notNext(),如果不想后面直接连着一个特定事件

  2. notFollowedBy(),如果不想一个特定事件发生在两个事件之间的任何地方.

// 严格连续 Pattern<Event, ?> strict = start.next("middle").where(...); // 松散连续 Pattern<Event, ?> relaxed = start.followedBy("middle").where(...); // 不确定的松散连续 Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...); // 严格连续的NOT模式 Pattern<Event, ?> strictNot = start.notNext("not").where(...); // 松散连续的NOT模式 Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);

松散连续意味着跟着的事件中,只有第一个可匹配的事件会被匹配上,而不确定的松散连续情况下,有着同样起始的多个匹配会被输出.举例来说,模式"a b",给定事件序列"a","c","b1","b2",会产生如下的结果:

  1. "a"和"b"之间严格连续:{}(没有匹配),"a"之后的"c"导致"a"被丢弃.

  2. "a"和"b"之间松散连续:,松散连续会"跳过不匹配的事件知道匹配上的事件".

  3. "a"和"b"之间不确定的松散连续:,,这是最常见的情况.

也可以为模式定义一个有效时间约束.例如,你可以通过pattern.within()方法指定一个模式应该在10秒内发生.这种时间模式支持处理时间和事件时间.

next.within(Time.seconds(10));

注意定义过时间约束的模式允许以notFollowedBy()结尾.例如,可以定义如下的模式:

Pattern.<Event>begin("start") .next("middle") .where(SimpleCondition.of(value -> value.getName().equals("a"))) .notFollowedBy("end") .where(SimpleCondition.of(value -> value.getName().equals("b"))) .within(Time.seconds(10));
循环模式中的连续性

你可以在循环模式中使用和前面章节讲过的同样的连续性.连续性会被运用在被接受进入模式而事件之间.用这个例子来说明上面所说的连续性,一个模式序列"a b+c"("a"后面跟着一个或多个(不确定连续的)"b",然后跟着一个"c")输入为"a","b1","d1","b2","d2","b3","c",输出结果如下:

  1. 严格连续: , , - 没有相邻的 "b" 。

  2. 松散连续: ,,,,, - "d"都被忽略了。

  3. 不确定松散连续: ,,,,,, - 注意,这是因为"b"之间是不确定松散连续产生的。

对于循环模式(例如oneOrMore()和times()),默认是松散连续.如果想使用严格连续,你需要使用consecutive()方法明确指定,如果想使用不确定松散连续,你可以使用allowCombinations()方法.

03 May 2025