源码解析 Flink UDAF 背后做了什么
0x00 摘要本文涉及到Flink SQL UDAF,Window 状态管理等部分,希望能起到抛砖引玉的作用,让大家可以借此深入了解这个领域。
0x01 概念1.1 概念大家知道,Flink的自定义聚合函数(UDAF)可以将多条记录聚合成1条记录,这功能是通过accumulate方法来完成的,官方参考指出:
但是实时计算还有一些特殊的场景,在此场景下,还需要提供merge方法才能完成。
1.2 疑问之前因为没亲身操作,所以一直忽略merge的特殊性。最近无意中看到了一个UDAF的实现,突然觉得有一个地方很奇怪,即 accumulate 和 merge 这两个函数不应该定义在一个类中。因为这是两个完全不同的处理方法。应该定义在两个不同的类中。
比如用UDAF做word count,则:
accumulate 是在一个task中累积数字,其实就相当于 map;merge 是把很多task的结果再次累积起来,就相当于 reduce;然后又想出了一个问题:Flink是如何管理 UDAF的accumulator?其状态存在哪里?
看起来应该是Flink在背后做了一些黑魔法,把这两个函数从一个类中拆分了。为了验证我们的推测,让我们从源码入手来看看这些问题:
Flink SQL转换/执行计划生成阶段,如何处理在 "同一个类中" 的不同类型功能函数 accumulate 和 merge?Flink runtime 如何处理 merge?Flink runtime 如何处理 UDAF的accumulator的历史状态?1.3 UDAF示例代码示例代码摘要如下 :
代码语言:javascript代码运行次数:0运行复制<pre class="brush:php;toolbar:false;">public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> { //定义存放count UDAF状态的accumulator的数据的结构。 public static class CountAccum { public long total; } //初始化count UDAF的accumulator。 public CountAccum createAccumulator() { CountAccum acc = new CountAccum(); acc.total = 0; return acc; } //accumulate提供了,如何根据输入的数据,更新count UDAF存放状态的accumulator。 public void accumulate(CountAccum accumulator, Object iValue) { accumulator.total++; } public void merge(CountAccum accumulator, Iterable<CountAccum> its) { for (CountAccum other : its) { accumulator.total += other.total; } }}批处理相对简单,因为数据是有边界的,其逻辑比较清晰。
2.1 代码首先给出测试代码
代码语言:javascript代码运行次数:0运行复制<pre class="brush:php;toolbar:false;">val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))// register the DataSet as a view "WordCount"tEnv.createTemporaryView("WordCount", input, 'word, 'frequency)tEnv.registerFunction("countUdaf", new CountUdaf())// run a SQL query on the Table and retrieve the result as a new Tableval table = tEnv.sqlQuery("SELECT word, countUdaf(frequency), SUM(frequency) FROM WordCount GROUP BY word")case class WC(word: String, frequency: Long)在
DataSetAggregate.translateToPlan
于是我们推断,这很有可能就是 combineGroup 调用accumulate,reduceGroup 调用 merge。
关于combineGroup,如果有兴趣,可以看看我之前文章 [源码解析] Flink的groupBy和reduce究竟做了什么 以及 源码解析 GroupReduce,GroupCombine 和 Flink SQL group by](https://cloud.tencent.com/developer/article/1693307)
代码语言:javascript代码运行次数:0运行复制<pre class="brush:php;toolbar:false;">override def translateToPlan(tableEnv: BatchTableEnvImpl, queryConfig: BatchQueryConfig): DataSet[Row] = { if (grouping.length > 0) { // grouped aggregation if (preAgg.isDefined) { // 执行到这里 inputDS // pre-aggregation .groupBy(grouping: _*) .combineGroup(preAgg.get) // 第一阶段 .returns(preAggType.get) .name(aggOpName) // final aggregation .groupBy(grouping.indices: _*) .reduceGroup(finalAgg.right.get) // 第二阶段 .returns(rowTypeInfo) .name(aggOpName) } }}SQL语句对应的执行计划大致为:
![[源码解析] Flink UDAF 背后做了什么](https://img.php.cn/upload/article/001/503/042/175738027194599.jpg)
在执行看,确实对应了两个阶段。
阶段 1 确实是 GroupReduceCombineDriver 调用到了 accumulate。
代码语言:javascript代码运行次数:0运行复制<pre class="brush:php;toolbar:false;">//堆栈如下accumulate:25, CountUdaf (mytest)accumulate:-1, DataSetAggregatePrepareMapHelper$5combine:71, DataSetPreAggFunction (org.apache.flink.table.runtime.aggregate)sortAndCombine:213, GroupReduceCombineDriver (org.apache.flink.runtime.operators)run:188, GroupReduceCombineDriver (org.apache.flink.runtime.operators) //SQL UDAF生成的代码如下 function = {DataSetAggregatePrepareMapHelper$5@10085} function_mytest$CountUdaf$5ae272a09e5f36214da5c4e5436c4c48 = {CountUdaf@10079} "CountUdaf" function_org$apache$flink$table$functions$aggfunctions$LongSumAggFunction$a5214701531789b3139223681d = {LongSumAggFunction@10087} "LongSumAggFunction" 阶段 2 中 GroupReduceDriver 调用到了 merge
代码语言:javascript代码运行次数:0运行复制<pre class="brush:php;toolbar:false;">//堆栈如下merge:29, CountUdaf (mytest)mergeAccumulatorsPair:-1, DataSetAggregateFinalHelper$6reduce:71, DataSetFinalAggFunction (org.apache.flink.table.runtime.aggregate)run:131, GroupReduceDriver (org.apache.flink.runtime.operators) //SQL UDAF生成的代码如下 function = {DataSetAggregateFinalHelper$6@10245} function_mytest$CountUdaf$5ae272a09e5f36214da5c4e5436c4c48 = {CountUdaf@10238} "CountUdaf" function_org$apache$flink$table$functions$aggfunctions$LongSumAggFunction$a5214701531789b3139223681d = {LongSumAggFunction@10247} "LongSumAggFunction" Flink对用户定义的UDAF代码分别生成了两个不同的功能类:
DataSetAggregatePrepareMapHelper : 用于Combine阶段,调用了accumulateDataSetAggregateFinalHelper :用于Reduce阶段,调用了merge2.4 状态管理UDAF有一个accumulator,这个会在程序运行过程中始终存在,Flink是如何管理这个accumulator呢?
GroupReduceCombineDriver类有一个成员变量 combiner,
代码语言:javascript代码运行次数:0运行复制<pre class="brush:php;toolbar:false;">public class GroupReduceCombineDriver<IN, OUT> implements Driver<GroupCombineFunction<IN, OUT>, OUT> { private GroupCombineFunction<IN, OUT> combiner;}而 combiner 被赋予了 DataSetPreAggFunction 类的一个实例。
代码语言:javascript代码运行次数:0运行复制<pre class="brush:php;toolbar:false;">class DataSetPreAggFunction(genAggregations: GeneratedAggregationsFunction) extends AbstractRichFunction{ private var accumulators: Row = _ //这里存储历史状态 private var function: GeneratedAggregations = _}Flink就是把 UDAF的accumulator 存储在
combiner.accumulators
<pre class="brush:php;toolbar:false;">combiner = {DataSetPreAggFunction@10063} genAggregations = {GeneratedAggregationsFunction@10070} accumulators = {Row@10117} "mytest.CountUdaf$CountAccum@1e343db7,(0,false)" function = {DataSetAggregatePrepareMapHelper$5@10066} // function是包含用户代码的功能类。 function_mytest$CountUdaf$5ae272a09e5f36214da5c4e5436c4c48 = {CountUdaf@10076} "CountUdaf" 让我们总结一下,批处理被分成两个阶段:
combineGroup :根据用户UDAF代码生成功能类 DataSetAggregatePrepareMapHelper,用于Combine阶段,调用了accumulate;reduceGroup :根据用户UDAF代码生成功能类 DataSetAggregateFinalHelper,用于Reduce阶段,调用了 merge;Flink在GroupReduceCombineDriver类的成员变量 combiner 中存储 accumulator历史状态。
0x03 流处理流处理则是和批处理完全不同的世界,下面我们看看流处理背后有什么奥秘。
在流计算场景中,数据没有边界源源不断的流入的,每条数据流入都可能会触发计算,比如在进行count或sum这些操作是如何计算的呢?
是选择每次触发计算将所有流入的历史数据重新计算一遍?还是每次计算都基于上次计算结果进行增量计算呢?如果选择增量计算,那么上一次的中间计算结果保存在哪里?内存?3.1 示例代码代码语言:javascript代码运行次数:0运行复制<pre class="brush:php;toolbar:false;">val query: Table = tableEnv.sqlQuery( """ |SELECT |countUdaf(num) |FROM tb_num |GROUP BY TUMBLE(proctime, INTERVAL '10' SECOND) """.stripMargin)
DataStreamGroupWindowAggregateBase.translateToPlan
WindowedStream
WindowAssigner
WindowedStream
KeyedStream
AllWindowedStream
AllWindowedStream
DataStream
windowAll(...)
我们的示例代码是基于Key的,所以走
WindowedStream
<pre class="brush:php;toolbar:false;">// grouped / keyed aggregationif (grouping.length > 0) { // 有key,所以是 WindowedStream,我们示例走这里 val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(...) val keySelector = new CRowKeySelector(grouping, inputSchema.projectedTypeInfo(grouping)) val keyedStream = timestampedInput.keyBy(keySelector) val windowedStream = createKeyedWindowedStream(queryConfig, window, keyedStream) .asInstanceOf[WindowedStream[CRow, Row, DataStreamWindow]] val (aggFunction, accumulatorRowType) = AggregateUtil.createDataStreamGroupWindowAggregateFunction(...) windowedStream .aggregate(aggFunction, windowFunction, accumulatorRowType, outRowType) .name(keyedAggOpName)}// global / non-keyed aggregationelse { // 没有key,所以是AllWindowedStream val windowFunction = AggregateUtil.createAggregationAllWindowFunction(...) val windowedStream = createNonKeyedWindowedStream(queryConfig, window, timestampedInput) .asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]] val (aggFunction, accumulatorRowType) = AggregateUtil.createDataStreamGroupWindowAggregateFunction(...) windowedStream .aggregate(aggFunction, windowFunction, accumulatorRowType, outRowType) .name(nonKeyedAggOpName)}SQL语句对应的执行计划大致如下,我们能看出来 accumulate & merge 都在 Window 中处理。
![[源码解析] Flink UDAF 背后做了什么](https://img.php.cn/upload/article/001/503/042/175738027122774.jpg)
可以看到,流处理对UDAF的管理,就完全是进入了Window的地盘,而UDAF历史状态管理其实就是Flink Window状态管理的领域了。
我们以基于key的WindowedStream为例继续进行研究。
3.3.1 接受到一个新输入当Window接受到一个输入item时候,item会被分配到一个key,由KeySelector完成。WindowOperator 类首先使用用户选择的 windowAssigner 将流入的数据分配到响应的window中,有可能是1个,0个甚至多个window。这里就会做accumulate。
本例
windowAssigner = {TumblingProcessingTimeWindows}可以看到,是 windowState 添加元素时候,调用到State的API,然后间接调用到了UDAF。
3.3.2 windowState & UDAF执行windowState 以 window 为 namespace,以隔离不同的window的context。这里虽然叫做 windowState 。但是可以发现,该类存储的是不同window中的对应的原始数据(processWindowFunction情况)或结果(ReduceFunction/AggregateFunction情况)。我们此例中,存储的是执行结果。
本例用到的 window process 是 Incremental Aggregation Functions。即 ReduceFunction 与 AggregateFunction ,其特点是无需保存 window 中的所有数据,一旦新数据进入,便可与之前的中间结果进行计算,因此这种 window 中其状态仅需保存一个结果便可。
因此这里我们拿到的是 HeapReducingState, HeapAggregatingState,当执行到
windowState.add(element.getValue());
在flink中state用来存放计算过程的节点中间结果或元数据。在flink内部提供三种state存储实现
内存HeapStateBackend:存放数据量小,用于开发测试使用;生产不建议使用HDFS的FsStateBackend :分布式文件持久化,每次都会产生网络io,可用于大state,不支持增量;可用于生产RocksDB的RocksDBStateBackend:本地文件 + 异步hdfs持久化,也可用于大state数据量,唯一支持增量,可用于生产;我们这里拿到的是 HeapAggregatingState。
3.3.4 State 存储结构以三元组的形式存储保存数据,即 key, namespace, value。
代码语言:javascript代码运行次数:0运行复制<pre class="brush:php;toolbar:false;">public abstract class StateTable<K, N, S>implements StateSnapshotRestore, Iterable<StateEntry<K, N, S>> { /** * Map for holding the actual state objects. The outer array represents the key-groups. * All array positions will be initialized with an empty state map. */protected final StateMap<K, N, S>[] keyGroupedStateMaps;}// 真实中变量摘录如下keyGroupedStateMaps = {StateMap[1]@9266} 0 = {CopyOnWriteStateMap@9262} // 这里就是将要保存用户accumulator的地方 stateSerializer = {RowSerializer@9254} snapshotVersions = {TreeSet@9277} size = 0 primaryTable = {CopyOnWriteStateMap$StateMapEntry[128]@9278} incrementalRehashTable = {CopyOnWriteStateMap$StateMapEntry[2]@9280} lastNamespace = {TimeWindow@9239} "TimeWindow{start=1593934200000, end=1593934210000}"在上面提及的
3.1.2)stateMap.transform(key, namespace, value, transformation);
<pre class="brush:php;toolbar:false;">@Overridepublic <T> void transform( K key, N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception { final StateMapEntry<K, N, S> entry = putEntry(key, namespace); // copy-on-write check for state entry.state = transformation.apply( (entry.stateVersion < highestRequiredSnapshotVersion) ? getStateSerializer().copy(entry.state) : entry.state, value); // 当执行完用户代码之后,数据会存储在这里,这个就是CopyOnWriteStateMap的一个Entry entry.stateVersion = stateMapVersion;流处理对UDAF的管理,就完全是进入了Window的地盘,而UDAF历史状态管理其实就是Flink Window状态管理的领域了。
window接受到新输入,就会往 windowState 添加元素。windowState 添加元素时候,调用到State的API,然后间接调用到了UDAFwindowState 在本例存储的是UDAF执行结果。具体存储是在HeapAggregatingState中完成。0xFF 参考Flink - 当数据流入window时,会发生什么
Flink SQL 自定义UDAF
自定义聚合函数(UDAF)
Apache Flink - 常见数据流类型
Flink-SQL源码解读(一)window算子的创建的源码分析
从udaf谈flink的state
Apache Flink - 常见数据流类型
Flink状态管理(二)状态数据结构和注册流程
以上就是[源码解析] Flink UDAF 背后做了什么的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号