0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

爱谁谁
发布: 2025-07-18 08:00:26
原创
227人浏览过

在《0基础学习pyflink——个数滚动窗口(tumbling count windows)》一文中,我们了解到如果窗口内元素个数未达到设定窗口大小,计算个数的函数不会被触发。例如,下图中红色部分的元素(b,2)和(d,5)不会被计算:

0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

为了让这些元素也能被计算,我们可以使用时间滚动窗口(Tumbling Time Windows)。这种窗口不依赖于元素的数量,而是基于时间进行触发。只要时间窗口到达,无论窗口内有多少元素,计算都会进行。

我们可以稍作修改《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》的示例,将元素集中在“A”上。以下是修改后的代码:

豆绘AI
豆绘AI

豆绘AI是国内领先的AI绘图与设计平台,支持照片、设计、绘画的一键生成。

豆绘AI 485
查看详情 豆绘AI

map代码语言:javascript

class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        print(*inputs, window)
        return [(key,  len([e for e in inputs]))]
<p>word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),
("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]</p><p>def word_count():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)</p><h1>write all the data to one file</h1><pre class="brush:php;toolbar:false;"><code>env.set_parallelism(1)
source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
# define the source
# mappging
source = env.from_collection(word_count_data, source_type_info)
# source.print()
# keying
keyed=source.key_by(lambda i: i[0])</code></pre><p>reduce代码语言:javascript</p><pre><code class="javascript">    # reducing
reduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \
                .apply(SumWindowFunction(),
                    Types.TUPLE([Types.STRING(), Types.INT()]))
        # # define the sink
reduced.print()
# submit for execution
env.execute()</code></pre><p>在这个例子中,我们使用了时间滚动窗口,窗口大小设置为2毫秒(<code>Time.milliseconds(2)</code>)。运行这段代码时,由于基于时间触发计算,每个元素都会被计算,输出结果可能会有所不同:</p><p><img src="/uploads/20250428/1745839071680f63df670f9.jpg" alt="0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)" /></p><p>或</p><p><img src="/uploads/20250428/1745839072680f63e00f547.jpg" alt="0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)" /></p><p>或</p><p><img src="/uploads/20250428/1745839073680f63e1a7f86.jpg" alt="0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)" /></p><p>可以看出,结果并不稳定,但每条数据都会被计算,而不是像个数滚动窗口那样某些数据可能不会被触发。</p><p>完整代码如下:</p><pre><code class="javascript">from typing import Iterable
登录后复制

import time from pyflink.common import Types, Time from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction from pyflink.datastream.window import TimeWindow, TumblingProcessingTimeWindows

class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]): def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]): print(*inputs, window) return [(key, len([e for e in inputs]))]

word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10), ("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]

def word_count(): env = StreamExecutionEnvironment.get_execution_environment() env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

write all the data to one file

<code>env.set_parallelism(1)
source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
# define the source
# mappging
source = env.from_collection(word_count_data, source_type_info)
# source.print()
# keying
keyed=source.key_by(lambda i: i[0])

# reducing
reduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \
                .apply(SumWindowFunction(),
                    Types.TUPLE([Types.STRING(), Types.INT()]))
        # # define the sink
reduced.print()
# submit for execution
env.execute()</code>
登录后复制

if name == 'main': word_count()

参考资料:https://www.php.cn/link/dc61c1317e2c1637f0f8d2de7fd8da9b

以上就是0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号