Flink状态
1. 通俗理解什么是状态?
状态被称作state,在Flink中,是用来 保存 中间的 计算结果 或者 缓存数据。 根据状态是否需要保存中间结果,分为 无状态计算 和 有状态计算。 对于流计算而言,事件持续产生,如果每次计算相互独立,不依赖上下游的事件,则相同输入,可以得到相同输出,是无状态计算。 如果计算需要依赖于之前或者后续事件,则被称为有状态计算。有状态计算如 sum求和,数据累加等。2. Flink状态类型包含哪些?
(1) 按照由 Flink管理 还是 用户管理,状态可以分为 原始状态(Raw State)和 托管状态(ManagedState)
- 托管状态(ManagedState):由Flink 自行进行管理的State。
- 原始状态(Raw State):由用户自行进行管理。
从状态管理方式的方式来说,Managed State 由Flink Runtime 管理,自动存储,自动恢复,在内存管理上有优化;而 Raw State 需要用户自己管理,需要自己序列化,Flink 不知道 State 中存入的数据是什么结构,只有用户自己知道,需要最终序列化为可存储的数据结构。
从状态数据结构来说,Managed State 支持已知的数据结构,如Value、List、Map等。而 Raw State 只支持 **字节 **数组,所有状态都要转换为二进制字节数组才可以。
从推荐使用场景来说,Managed State 大多数情况下均可使用,而Raw State 是当 Managed State 不够用时,比如需要自定义Operator 时,才会使用 Raw State。在实际生产过程中,只推荐使用 Managed State 。
(2)State 按照是否有 key 划分为 KeyedState 和 OperatorState 两种。
keyedState特点:
只能用在keyedStream上的算子中,状态跟特定的key绑定。
keyStream流上的每一个key 对应一个state 对象。若一个operator 实例处理多个key,访问相应的多个State,可对应多个state。
keyedState 保存在StateBackend中
通过RuntimeContext访问,实现Rich Function接口。
支持多种数据结构:ValueState、ListState、ReducingState、AggregatingState、MapState.
OperatorState特点:
- 可以用于所有算子,但整个算子只对应一个state。
2.并发改变时有多种重新分配的方式可选:均匀分配;
实现CheckpointedFunction或者 ListCheckpointed 接口。
目前只支持 ListState数据结构。
这里的fromElements会调用FromElementsFunction的类,其中就使用了类型为List state 的 operator state
3. Flink广播状态了解吗?
Flink中,广播状态中叫作 BroadcastState。 在广播状态模式中使用。所谓广播状态模式, 就是来自一个流的数据需要被广播到所有下游任务,在算子本地存储,在处理另一个流的时候依赖于广播的数据.下面以一个示例来说明广播状态模式。4. Flink状态接口包含哪些?
在Flink中使用状态,包含两种状态接口: (1)状态操作接口:使用状态对象本身存储,写入、更新数据。(2)状态访问接口:从StateBackend获取状态对象本身。
状态操作接口
Flink 中的 状态操作接口 面向两类用户,即 应用开发者 和 Flink 框架本身。 所有Flink设计了两套接口
1、面向开发者State接口
面向开发的State接口只提供了对State中数据的增删改基本操作接口,用户无法访问状态的其他运行时所需要的信息。接口体系如下图:
2、面向内部State接口
内部State 接口 是给 Flink 框架使用,提供更多的State方法,可以根据需要灵活扩展。除了对State中数据的访问之外,还提供内部运行时信息,如State中数据的序列化器,命名空间(namespace)、命名空间的序列化器、命名空间合并的接口。内部State接口命名方式为InternalxxxState。状态访问接口 有了状态之后,开发者自定义UDF时,应该如何访问状态?
状态会被保存在StateBackend中,但StateBackend 又包含不同的类型。所有Flink中抽象了两个状态访问接口: OperatorStateStore 和 KeyedStateStore,用户在编写UDF时,就无须考虑到底是使用哪种 StateBackend类型接口。
OperatorStateStore 接口原理:
OperatorState数据以Map形式保存在内存中,并没有使用RocksDBStateBackend和HeapKeyedStateBackend。
KeyedStateStore 接口原理:Flink状态存储
5. Flink状态如何存储?
在Flink中, 状态存储被叫做 StateBackend , 它具备两种能力:(1)在计算过程中提供访问State能力,开发者在编写业务逻辑中能够使用StateBackend的接口读写数据。 (2)能够将State持久化到外部存储,提供容错能力。
Flink状态提供三种存储方式:
(1)内存:MemoryStateBackend,适用于验证、测试、不推荐生产使用。
(2)文件:FSStateBackend,适用于长周期大规模的数据。
(3)RocksDB : RocksDBStateBackend,适用于长周期大规模的数据。
1、内存型 StateBackend
MemoryStateBackend,运行时所需的State数据全部保存在 TaskManager JVM堆上内存中, KV类型的State、窗口算子的State 使用HashTable 来保存数据、触发器等。执行检查点的时候,会把 State 的快照数据保存到**JobManager进程的内存中**。 MemoryStateBackend 可以使用异步的方式进行快照,(也可以同步),推荐异步,避免阻塞算子处理数据。 基于内存的 Stateßackend 在生产环境下不建议使用,可以在本地开发调试测试 。注意点如下 :
1) State 存储在 JobManager 的内存中.受限于 JobManager的内存大小。 2) 每个 State默认5MB,可通过 MemoryStateBackend 构造函数调整 3) 每个 Stale 不能超过 Akka Frame 大小。2、文件型 StateBackend
FSStateBackend,运行时所需的State数据全部保存在 TaskManager 的内存中,** 执行检查点的时候,会把 State 的快照数据保存到配置的文件系统中**。 可以是分布式或者本地文件系统,路径如: HDFS路径:“hdfs://namenode:40010/flink/checkpoints” 本地路径:“file:///data/flink/checkpoints”。 FSStateBackend适用于处理小状态、短窗口、或者小键值状态的有状态处理任务。注意点如下 :
1) State 数据首先被存在 TaskManager 的内存中。 2) State大小不能超过TM内存。 3) TM异步将State数据写入外部存储。MemoryStateBackend 和FSStateBackend 都依赖于HeapKeyedStateBackend,HeapKeyedStateBackend 使用 State存储数据。
3、RocksDBStateBackend
RocksDBStateBackend 跟内存型和文件型都不同 。
RocksDBStateBackend 使用**嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中**,不会受限于TaskManager 的内存大小,在执行检查点的时候,再将整个 RocksDB 中保存的State数据全量或者增量持久化到配置的文件系统中,
在 JobManager 内存中会存储少量的检查点元数据。RocksDB克服了State受内存限制的问题,同时又能够持久化到远端文件系统中,比较适合在生产中使用。
缺点:
RocksDBStateBackend 相比基于内存的StateBackend,访问State的成本高很多,可能导致数据流的吞吐量剧烈下降,甚至可能降低为原来的 1/10。
适用场景
1)最适合用于处理大状态、长窗口,或大键值状态的有状态处理任务。 2)RocksDBStateBackend 非常适合用于高可用方案。 3) RocksDBStateBackend 是目前唯一支持增量检查点的后端。 增量检查点非常适用于超 大状态的场景。注意点
1)总 State 大小仅限于磁盘大小,不受内存限制 2)RocksDBStateBackend 也需要配置外部文件系统,集中保存State 。 3)RocksDB的 JNI API 基于 byte 数组,单 key 和单 Value 的大小不能超过 8 字节 4)对于使用具有合并操作状态的应用程序,如ListState ,随着时间可能会累积到超过 2*31次方字节大小,这将会导致在接下来的查询中失败。6. Flink状态如何持久化?
首选,Flink的状态最终都要持久化到第三方存储中,确保集群故障或者作业挂掉后能够恢复。 RocksDBStateBackend 持久化策略有两种:增量持久化策略 RocksIncementalSnapshotStrategy
全量持久化策略 RocksFullSnapshotStrategy
1、全量持久化策略
每次将全量的State写入到状态存储中(HDFS)。内存型、文件型、RocksDB类型的StataBackend 都支持全量持久化策略。快照保存策略类体系
在执行持久化策略的时候,使用异步机制,每个算子启动1个独立的线程,将自身的状态写入分布式存储可靠存储中。在做持久化的过程中,状态可能会被持续修改,基于内存的状态后端使用 CopyOnWriteStateTable 来保证线程安全,RocksDBStateBackend则使用RocksDB的快照机制,使用快照来保证线程安全。
2、增量持久化策略
增量持久化就是每次持久化增量的State,只有RocksDBStateBackend 支持增量持久化。Flink 增量式的检查点以 RocksDB为基础, RocksDB是一个基于LSM-Tree的KV存储。新的数据保存在内存中, 称为memtable。如果Key相同,后到的数据将覆盖之前的数据,一旦memtable写满了,RocksDB就会将数据压缩并写入磁盘。memtable的数据持久化到磁盘后,就变成了不可变的 sstable。
因为 sstable 是不可变的,Flink对比前一个检查点创建和删除的RocksDB sstable 文件就可以计算出状态有哪些发生改变。
为了确保 sstable 是不可变的,Flink 会在RocksDB 触发刷新操作,强制将 memtable 刷新到磁盘上 。在Flink 执行检查点时,会将新的sstable 持久化到HDFS中,同时保留引用。这个过程中 Flink 并不会持久化本地所有的sstable,因为本地的一部分历史sstable 在之前的检查点中已经持久化到存储中了,只需增加对 sstable文件的引用次数就可以。RocksDB会在后台合并 sstable 并删除其中重复的数据。然后在RocksDB删除原来的 sstable,替换成新合成的 sstable.。新的 sstable 包含了被删除的 sstable中的信息,通过合并历史的sstable会合并成一个新的 sstable,并删除这些历史sstable. 可以减少检查点的历史文件,避免大量小文件的产生。
7. Flink状态过期后如何清理?
1、DataStream中状态过期
可以对DataStream中的每一个状态设置 清理策略 StateTtlConfig,可以设置的内容如下:- 过期时间:超过多长时间未访问,视为State过期,类似于缓存。
- 过期时间更新策略:创建和写时更新、读取和写时更新。
- State可见性:未清理可用,超时则不可用。
2、Flink SQL中状态过期****
Flink SQL 一般在流Join、聚合类场景使用State,如果State不定时清理,则导致State过多,内存溢出。清理策略配置如下:
StreamQueryConfig qConfig = ...
//设置过期时间为 min = 12小时 ,max = 24小时
qConfig.withIdleStateRetentionTime(Time.hours(12),Time.hours(24));