论文:Disaggregated State Management in Apache Flink 2.0 (VLDB 2025)

Flink 2.0 为什么要做存算分离?从大状态流处理看状态管理架构演进

在实时计算系统中,状态管理一直是一个绕不开的问题。

对于简单的流式任务来说,系统可能只需要对每条数据做过滤、转换或简单统计,状态规模并不大。但在真实业务中,很多流处理任务并不是“来一条算一条”这么简单。它们需要记住历史信息,需要等待另一条流的数据到来,需要在窗口中持续维护中间结果,还要在故障之后准确恢复计算进度。

随着业务规模增长,状态会越来越大。当状态从几 GB 增长到数百 GB,甚至 TB 级时,原本看起来合理的本地状态架构就会逐渐暴露瓶颈。Flink 2.0 中的存算分离状态管理,正是为了解决这个问题。

本文围绕 Flink 2.0 的状态管理架构展开,重点讨论三个问题:Flink 1.x 为什么会在大状态场景下遇到瓶颈;Flink 2.0 如何通过 AEC、ForSt 和 UFS 实现计算与状态解耦;以及这种架构在真实物流工作负载和 Nexmark Benchmark 中带来了哪些收益与代价。

从批处理到流处理:为什么状态会成为核心问题

数据处理大致可以分为批处理和流处理。

批处理面对的是已经收集完成的一批数据。系统先把数据积累起来,再统一进行统计、分析和输出。这种方式适合离线报表、日终结算和历史数据分析,优点是流程清晰、结果稳定,也便于复现和审计。但它通常需要等待数据积累到一定规模后再计算,因此实时性较弱。

流处理面对的则是持续产生、不断到达的数据。实时风控需要尽快识别异常交易,实时推荐需要根据用户当前行为调整结果,监控告警需要在异常发生后立即响应。在这些场景下,系统不能等到“数据收集完”之后再统一处理,而是要一边接收数据,一边持续计算。

Apache Flink 是典型的分布式流处理系统。它支持有状态计算、Exactly-once 语义和事件时间处理,因此被广泛应用在实时推荐、风险控制、物流追踪、实时数仓和监控告警等场景中。

但流处理越深入真实业务,状态就越重要。系统不仅要处理当前数据,还要维护用户画像、订单状态、窗口聚合结果、Join 缓存、去重集合等中间信息。状态一旦变大,就会影响计算资源、存储资源、Checkpoint、故障恢复和扩缩容。

在 Flink 1.x 中,一个流处理作业会被表示为由多个算子组成的有向无环图,也就是 DAG。典型作业通常包括 Source、Transformation 和 Sink。Source 负责接收 Kafka、日志系统或数据库中的数据;Transformation 执行过滤、聚合、窗口计算和 Join;Sink 将结果写入数据库、消息队列或文件系统。

在部署层面,Flink 主要由 JobManager 和 TaskManager 组成。JobManager 负责生成执行图、调度任务、管理作业生命周期、协调 Checkpoint,并在故障时触发恢复。TaskManager 是真正执行计算的工作节点,内部运行多个 Stream Task,通过网络通道接收上游数据,并把结果发送给下游。

Flink 1.x 的关键特点是:Checkpoint 文件会保存在 HDFS、S3 或 OSS 这类远程持久化系统中,但作业运行时频繁读写的活动状态,仍然主要保存在 TaskManager 本地。

换句话说,Flink 1.x 是“本地状态负责运行,远程状态负责恢复”。

对于大状态任务,Flink 1.x 通常使用 RocksDB 状态后端。RocksDB 采用 LSM-Tree 结构,可以把热数据放在内存中,把 SSTable 文件保存在本地磁盘上,因此能够支持大于内存容量的状态。

这种设计在小状态或中等状态场景下性能很好,因为本地状态访问延迟低,计算线程可以高效处理数据。但当状态规模达到数百 GB 甚至 TB 级时,本地状态架构会逐渐成为系统瓶颈。

大状态场景:实时物流订单生命周期追踪

论文中的真实业务场景是阿里巴巴的实时物流管理任务。这个任务要追踪订单从创建、发货、运输、清关,到最终签收的完整生命周期。

它的数据来源很多,包括天猫、淘宝、AliExpress 等电商平台的订单流,也包括物流运输状态、仓库状态和配送状态等更新流。从作业结构来看,这个场景主要包含两个有状态处理阶段:第一阶段是订单去重与聚合,系统需要根据订单 ID 保存最新版本的订单信息;第二阶段是订单状态与物流状态的实时 Join,系统需要把订单更新流和物流运输更新流匹配起来,生成更完整的物流记录。

这个任务的计算逻辑并不复杂,主要是聚合和 Join。真正困难的是状态规模。

物流状态不能很快删除。虽然大部分订单可能在一周内完成配送,但真实业务中会出现海外清关、极端天气、地址异常、二次配送、退货逆向物流、物流节点延迟上报等情况。为了保证订单生命周期完整,系统需要保存大约 60 天的订单和物流状态。

在 Join 算子内部,至少要维护订单更新状态和物流更新状态。状态规模会随着订单数量、物流事件数量和状态保留周期一起增长。普通时间段可能达到数百 GB,“双十一”等促销活动期间甚至可能达到 TB 级。

这类任务的特点是:每条记录的计算逻辑并不重,CPU 未必是瓶颈,但状态量非常大,本地磁盘会先成为限制因素。它是典型的磁盘受限型作业,而不是 CPU 受限型作业。

第一个瓶颈是本地磁盘容量限制。

在云平台中,计算资源往往按固定规格出售。例如,一个计算单元可能包含 1 个 CPU 核心、4 GB 内存和 20 GB 本地磁盘。如果一个作业总状态规模是 290 GB,即使它实际只需要 8 个 CPU 核心,也可能因为磁盘容量不够,被迫申请 16 个计算单元。

用户真正需要的是更多存储,但由于 CPU、内存和磁盘被绑定在一起,最终不得不同时购买更多 CPU 和内存。这不仅带来资源浪费,也降低了弹性扩展能力。状态增加时,不能只扩展存储,而必须增加 TaskManager 数量,并重新分配计算任务。

第二个瓶颈是后台状态操作会干扰前台计算。

RocksDB 使用 LSM-Tree 管理状态。随着状态不断更新,系统会产生大量 SSTable 文件,并周期性执行 Compaction。Compaction 会合并文件、清理过期版本,并优化后续读取性能。同时,Flink 还会周期性执行 Checkpoint,把本地状态复制到远程存储。

虽然 Compaction 和 Checkpoint 可以放在后台线程执行,但它们仍然运行在同一个 TaskManager 上,会和前台算子共享 CPU、本地磁盘、内存和网络带宽。当 Checkpoint 或 Compaction 触发时,TaskManager 容易出现 CPU 和磁盘 I/O 峰值,进而导致处理延迟上升、吞吐量波动,甚至引发反压。

第三个瓶颈是 Checkpoint、恢复和扩缩容时间长。

在 Flink 1.x 中,Checkpoint 通常包括两个阶段:同步阶段负责冻结或复制相关数据结构,生成本地快照;异步阶段负责把本地状态文件上传到 HDFS、S3 或 OSS。对于大状态任务,即使开启增量 Checkpoint,每次仍然可能产生较大的新增文件。

恢复和扩缩容时问题更加明显。TaskManager 需要从远程 Checkpoint 下载状态文件,在本地重建 RocksDB,并重新分配 Key Group。当状态越大,故障恢复、跨集群迁移、Scale-out、Scale-in 和作业重启就越慢。

因此,Flink 1.x 的问题不是单纯的“状态存不下”,而是计算和状态存储绑定在一起后,磁盘容量、后台资源干扰和状态迁移成本都会成为大状态流处理的瓶颈。

Flink 2.0 的核心设计思想可以概括为一句话:将计算与状态存储解耦,并把分布式文件系统作为主要状态存储。

在 Flink 1.x 中,活动状态主要保存在 TaskManager 本地,而 Checkpoint 状态保存在 HDFS、S3 或 OSS 中。正常运行依赖本地状态,故障恢复依赖远程状态,两者之间需要不断复制和迁移。

在 Flink 2.0 中,活动状态本身就直接写入 DFS,Checkpoint 也位于 DFS。本地内存和本地磁盘不再承担唯一持久化职责,而是更多作为缓存使用。

这样一来,TaskManager 的角色发生了变化。它不再是“计算和状态存储一体化”的节点,而是更偏向负责计算执行。状态的主要持久化能力交给远程分布式存储系统。

这种设计带来的目标包括:支持超过单节点磁盘容量的大状态;允许计算和存储相对独立扩展;减少 Checkpoint 中的数据复制;加快故障恢复和扩缩容。

不过,Flink 2.0 并不是完全推翻原有架构。JobManager、TaskManager 和 Checkpoint Coordinator 的基本模式仍然保留,新的状态后端 ForSt 也仍然运行在 TaskManager 内部。因此,它更像是一种兼容原有系统的渐进式架构演进。

AEC:让远程状态访问不再阻塞主线程

把状态放到远程 DFS 后,读取延迟不可避免地增加。如果仍然采用 Flink 1.x 的同步执行方式,那么一条记录发起远程状态读取后,主线程必须一直等待状态返回,才能继续处理下一条记录。

异步执行的作用,就是把状态访问从主线程关键路径中移出去。它不能降低单次远程 I/O 的物理延迟,但可以通过并发提高整体吞吐量,让 CPU 在等待远程 I/O 时继续处理其他记录。

Flink 2.0 引入了 AEC,也就是 Asynchronous Execution Controller。它将一条记录的处理拆成三个阶段。

第一阶段是非状态计算,由主线程执行,主要完成字段解析、格式转换、Key 提取和状态查询参数构造。

第二阶段是异步状态访问,状态请求会提交给 AEC 中的状态访问线程池。线程池可以访问内存缓存、本地磁盘缓存或远程 DFS。这个过程中,主线程不需要阻塞等待。

第三阶段是状态访问后的回调。当状态请求完成后,对应回调任务进入 Callback Channel。主线程再从回调队列中取出任务,完成 Join、聚合、状态更新和结果输出。

这样,Flink 2.0 可以把 CPU 计算和远程 I/O 重叠起来,提高远程状态场景下的吞吐量。

但异步执行也带来了语义问题。如果相同 Key 的多条记录并发访问和修改状态,就可能出现乱序更新。为此,AEC 引入了 Key Accounting Unit,用来记录当前正在处理的 Key。相同 Key 的记录必须串行执行,不同 Key 的记录可以尽量并发。

因此,AEC 的核心规则是:相同 Key 严格串行,不同 Key 尽量并发。

此外,异步执行还会影响 Checkpoint 和 Watermark。Checkpoint Barrier 到达时,Barrier 前面的记录可能还没有处理完成。Flink 2.0 通过异步排空机制解决这个问题:Barrier 到达后暂停处理后续新记录,等待 Barrier 之前所有状态请求、回调和阻塞缓冲中的记录处理完成,再生成 Checkpoint。

Watermark 的问题类似。如果 Watermark 已经到达,但它之前的某些事件还在等待异步状态读取,就不能立即向下游传播。Flink 2.0 使用事件时间 Epoch 机制,将两个连续 Watermark 之间的记录划分为一个 Epoch。只有队首 Epoch 中的所有记录都处理完成后,对应 Watermark 才能继续向下游发送。

也就是说,AEC 不只是为了提高并发,它还承担了维护 Flink 核心语义的职责:按 Key 顺序、Exactly-once 和 Watermark 事件时间语义都不能因为异步执行而被破坏。

ForSt:面向存算分离的状态后端

如果说 AEC 解决的是运行时问题,那么 ForSt 解决的是状态本身如何组织、存储和缓存的问题。

ForSt 的名称来自 For Streaming,是 Flink 2.0 中新的分离式状态后端。从数据结构上看,ForSt 延续了 RocksDB 的 LSM-Tree 设计,包括 MemTable、不同 Level 的 SSTable、Flush 和 Compaction。因此,它仍然适合高频写入和大规模状态管理。

ForSt 和传统 RocksDB 状态后端最大的区别在于:SSTable 文件不再长期保存在 TaskManager 本地,而是通过 UFS 直接写入远程 DFS。DFS 成为活动状态的主要存储位置,TaskManager 本地内存和磁盘主要作为缓存。

为了降低远程访问延迟,ForSt 使用两级本地缓存。第一级是内存 Block Cache,用于缓存经常访问的数据块。第二级是本地磁盘 File Cache,用于缓存完整的 SSTable 文件。

这里和 Flink 1.x 有一个关键区别:在 Flink 1.x 中,本地 SSTable 是主要状态文件;在 Flink 2.0 中,本地 SSTable 只是缓存。即使本地缓存丢失,也不会影响状态正确性,因为 DFS 中仍然保存完整状态。

ForSt 还可以将 Compaction 转移到独立的 Remote Compaction Service 中。Compactor 直接从 DFS 读取文件,完成合并后把新文件写回 DFS,并通知 ForSt 更新 LSM-Tree 元数据。这样可以减少 TaskManager 上的 CPU 峰值和磁盘 I/O 抖动,让后台状态维护和前台计算进一步解耦。

UFS:把 Checkpoint 从复制文件变成更新元数据

ForSt 要把状态写入远程存储,就必须面对不同 DFS 文件语义不一致的问题。HDFS 比较接近传统文件系统,而 S3、OSS 这类对象存储主要基于对象 Key,不一定支持真正的目录结构、原子 Rename 或 POSIX Hard Link。

因此,Flink 2.0 引入了 UFS,也就是 Unified File System。UFS 对上提供统一的逻辑文件视图,对下适配不同 DFS。它维护逻辑文件名、物理文件位置、引用计数、工作目录和 Checkpoint 目录之间的映射,以及文件生命周期。

UFS 最关键的能力是逻辑链接。即使底层对象存储不支持真正的 Hard Link,UFS 也可以通过元数据管理,让多个逻辑文件指向同一个物理对象。

这直接改变了 Checkpoint 的成本。

在 Flink 1.x 中,Checkpoint 的核心操作是复制和上传状态文件。活动状态在本地,恢复状态在远程,所以每次 Checkpoint 都需要把本地状态同步到 DFS。

在 Flink 2.0 中,活动状态文件本身已经位于 DFS。大部分数据在 Checkpoint 之前就已经持久化,不需要再次上传。因此,Checkpoint 主要变成了文件引用和元数据更新。

ForSt 先确定本次 Checkpoint 需要哪些 SSTable 文件,然后 UFS 在 Checkpoint 目录中为这些文件创建逻辑链接。工作目录和 Checkpoint 目录中的逻辑文件可以指向同一个物理文件。JobManager 只需要记录本次 Checkpoint 对应的文件引用和元数据。

旧 Checkpoint 删除时,JobManager 不会直接删除物理文件,而是通知 UFS 减少引用计数。只有当某个物理文件既不被活动状态引用,也不被任何 Checkpoint 引用时,UFS 才会真正删除它。

因此,Flink 2.0 把 Checkpoint 从数据密集型操作,转变成了元数据密集型操作。

恢复与扩缩容:从移动数据到建立引用

同样的思想也可以用于恢复和扩缩容。

在 Flink 1.x 中,恢复时需要从 DFS 下载 Checkpoint 文件,再复制到 TaskManager 本地,并重建 RocksDB。对于数百 GB 状态,这个过程可能需要几分钟甚至更久。

在 Flink 2.0 中,新的 ForSt 实例可以直接链接 DFS 中已经存在的状态文件。恢复过程主要变成:JobManager 读取 Checkpoint 元数据,为新的 ForSt 实例分配状态文件引用,在工作目录中建立逻辑链接,加载 LSM-Tree 元数据,然后恢复处理。

扩缩容过程也类似。当并行度变化时,系统需要重新分配 Key Group,并调整不同 ForSt 实例引用的文件和元数据。由于不再大规模搬迁状态数据,恢复和扩缩容时间就不再与状态总大小严格线性相关。

当然,这并不意味着恢复完全没有成本。系统仍然需要读取文件元数据并初始化 ForSt。对于对象存储来说,小文件和元数据读取仍然可能成为瓶颈。但总体来看,Flink 2.0 已经把恢复和扩缩容从“移动大量状态数据”,转变为“重新建立状态引用”。

实验结果:成本、Checkpoint、恢复与资源稳定性

论文实验主要分为两部分:真实物流工作负载和 Nexmark Benchmark。

真实物流工作负载的状态规模大约是 290 GB,用来对比 Flink 1.20 加 RocksDB 本地状态后端,以及 Flink 2.0 加 ForSt 分离式状态后端。实验关注最小计算单元数量、月度成本、Checkpoint 时间、故障恢复时间、Scale-out、Scale-in 和 CPU 利用率稳定性。

在成本方面,Flink 1.20 因为状态主要保存在 TaskManager 本地磁盘上,每个计算单元只提供 20 GB 本地磁盘,所以至少需要 16 个计算单元。Flink 2.0 把主要状态放到 DFS,不再受单个计算单元本地磁盘容量限制,只需要 8 个计算单元即可维持相同流量并避免反压。论文给出的月度成本从约 688 美元降到约 344 美元,降低约 50%。

在 Checkpoint 方面,实验中 Checkpoint 间隔设置为 1 分钟,并开启增量 Checkpoint。系统连续运行 5 小时,共记录 300 次 Checkpoint。Flink 2.0 在 HDFS 环境中的所有 Checkpoint 都可以在 3 秒内完成;而 Flink 1.20 有 19.7% 的 Checkpoint 超过 30 秒,还有超过 1.5% 的 Checkpoint 超过 50 秒。在 OSS 对象存储环境中,Flink 2.0 的 Checkpoint 也基本可以在 4 秒内完成。原因在于活动状态已经位于 DFS 中,Checkpoint 不再需要复制大量状态文件,而主要通过 UFS 创建逻辑链接并更新元数据。

在恢复和扩缩容方面,论文评估了跨集群迁移、Scale-out 和 Scale-in 三种场景。Flink 1.20 需要下载远程 Checkpoint 并在新 TaskManager 本地重建 RocksDB;Flink 2.0 则主要通过 UFS 链接已有状态文件。实验显示,Flink 2.0 在 HDFS 环境下能够在几十秒内完成恢复和扩缩容,部分场景最高可获得约 49 倍速度提升。

在资源稳定性方面,Flink 1.20 中可以观察到明显的周期性 CPU 峰值,而且这些峰值和 1 分钟一次的 Checkpoint 周期基本一致。Flink 2.0 的 TaskManager CPU 曲线更加平稳,因为 UFS 减少了 Checkpoint 中的数据复制,Remote Compaction 也可以减少 Compaction 对计算节点的干扰。

Nexmark 实验:异步执行和缓存不是免费午餐

Nexmark Benchmark 用来分析 Flink 2.0 在不同工作负载下的稳态吞吐量表现。

对于状态访问频繁、I/O 压力较大的查询,如果采用同步远程访问,主线程会不断等待远程状态返回,吞吐量会明显受限。开启异步执行后,多个状态请求可以并发提交,远程 I/O 延迟可以被不同记录之间的并行处理隐藏。进一步加入本地缓存后,热点 SSTable 或数据块可以直接从内存和本地磁盘读取,吞吐量还会继续提升。

论文中的部分查询状态规模达到 1 GB 到 4 GB。即使本地缓存只有 1 GB,Flink 2.0 的性能仍然可以达到甚至超过 Flink 1.20 的本地磁盘方案。这说明,在高 I/O 工作负载中,远程状态并不一定意味着性能下降。只要结合异步并发和热点缓存,就可以有效弥补远程存储访问延迟。

但异步执行并不是免费午餐。

对于状态规模很小,或者状态能够完全放入内存缓存的查询来说,远程 I/O 并不是主要瓶颈。此时异步执行带来的收益有限,反而可能增加额外开销。这些开销包括 Future 对象创建、主线程和状态线程之间的任务调度、线程上下文切换、Key 管理、回调队列操作,以及垃圾回收。

论文统计,在有状态算子中,Flink 2.0 的异步分离式状态模型平均会增加约 30% 的 CPU 使用。因此,Flink 2.0 并不是要求所有作业都使用异步模式。对于大状态、I/O 密集型作业,异步模式更有优势;对于状态很小、CPU 密集型作业,同步模式可能更合适。

局限性与未来方向

Flink 2.0 的存算分离状态管理适合大状态、磁盘受限、频繁扩缩容和云原生流处理任务,但它也带来了新的挑战。

首先,系统更加依赖远程存储和网络。当 DFS 或网络出现故障时,状态访问可能直接受到影响,因此需要更完善的重试、限流、缓存和降级机制。

其次,恢复时虽然不再复制大文件,但仍然需要读取大量 SSTable 元数据。对于对象存储来说,小文件和元数据访问可能成为新的瓶颈。

第三,AEC、Future、线程调度和回调机制会增加 CPU 与 GC 开销。对于小状态或 CPU 密集型任务,这部分开销可能抵消异步带来的收益。

第四,缓存策略仍有优化空间。未来可以考虑更智能的热点预测、预取机制和自适应缓存分配。

第五,Remote Compaction 仍处于实验阶段。实际部署时,还需要处理任务失败、重复执行、文件一致性和服务过载等问题。

最后,不同作业适合不同执行模式。未来系统可以根据状态规模、缓存命中率、CPU 利用率和远程访问延迟,动态选择同步或异步执行,而不是让所有作业使用统一配置。

总结

Flink 1.x 将计算与状态紧密绑定。在小状态场景下,这种设计可以获得很好的本地访问性能;但在大状态和云原生环境中,它会带来本地磁盘受限、Checkpoint 时间长、恢复慢和资源利用率低等问题。

Flink 2.0 的核心思路是将 DFS 作为主要状态存储,并使用本地内存和磁盘作为缓存,从而实现计算和状态的相对独立扩展。

在运行时层面,AEC 通过异步状态访问和不同 Key 之间的并发执行,隐藏远程 I/O 延迟;同时通过 Key Accounting、异步排空和 Epoch Manager,继续保证按 Key 顺序、Exactly-once 和 Watermark 语义。

在状态管理层面,ForSt 使用 LSM-Tree、UFS、多级缓存和 Remote Compaction 管理大规模状态。UFS 通过逻辑链接和引用计数,把 Checkpoint 从文件复制转化为轻量级元数据操作。

最终,Flink 2.0 在大状态物流任务中实现了更低成本、更快 Checkpoint、更快恢复和更稳定的资源利用率。它的价值并不是让所有任务都更快,而是让大状态流处理任务在云原生环境中更容易扩展、更容易恢复,也更容易控制成本。