Dask DataFrame groupby 模式(Mode)聚合的实现指南

碧海醫心
发布: 2025-11-15 12:13:37
原创
667人浏览过

Dask DataFrame groupby 模式(Mode)聚合的实现指南

本教程详细阐述了如何在 dask dataframe 中对分组数据执行模式(mode)聚合。由于 dask 不直接提供 `groupby.agg` 的模式函数,文章通过自定义 `dask.dataframe.aggregation` 类,实现 `chunk`、`agg` 和 `finalize` 阶段的逻辑,从而有效地在分布式环境中计算分组模式,并提供完整的示例代码和注意事项。

引言:Dask Groupby 模式聚合的挑战

在数据分析中,查找一组数据的众数(mode)是一项常见操作。Pandas DataFrame 提供了 Series.mode() 方法,并且可以方便地与 groupby().agg() 结合使用,以计算每个分组的众数。然而,在处理大规模数据集时,Dask DataFrame 成为一个强大的分布式计算工具。尽管 Dask 提供了丰富的聚合功能,但其内置的 groupby().aggregate() 方法并不直接支持像 Pandas Series.mode 这样的聚合操作。这意味着,如果我们需要在 Dask DataFrame 中进行分组众数计算,就需要自定义聚合逻辑。

Pandas 中的模式聚合(作为参考)

在深入 Dask 的自定义聚合之前,我们首先回顾一下在 Pandas 中如何轻松实现这一功能。这有助于理解我们希望在 Dask 中复制的行为。

import pandas as pd
import numpy as np

# 示例数据
data_pandas = pd.DataFrame({
    'status': ['pending', 'pending', 'pending', 'canceled', 'canceled', 'canceled', 'confirmed', 'confirmed', 'confirmed'],
    'clientId': ['A', 'B', 'C', 'A', 'D', 'C', 'A', 'B', 'C'],
    'partner': ['A', np.nan, 'C', 'A', np.nan, 'C', 'A', np.nan, 'C'],
    'product': ['afiliates', 'pre-paid', 'giftcard', 'afiliates', 'pre-paid', 'giftcard', 'afiliates', 'pre-paid', 'giftcard'],
    'brand': ['brand_4', 'brand_2', 'brand_3', 'brand_1', 'brand_2', 'brand_3', 'brand_1', 'brand_3', 'brand_3'],
    'gmv': [100, 100, 100, 100, 100, 100, 100, 100, 100]
})

data_pandas = data_pandas.astype({
    "partner": "category",
    "status": "category",
    "product": "category",
    "brand": "category"
})

# 使用 Pandas 计算分组模式
mode_pandas = data_pandas.groupby(["clientId", "product"], observed=True).agg({"brand": pd.Series.mode})
print("Pandas Groupby Mode Result:")
print(mode_pandas)
登录后复制

Pandas 的 Series.mode 能够返回一个 Series,其中包含所有频率最高的值(如果存在多个众数)。

自定义 Dask 聚合函数:dask.dataframe.Aggregation

Dask 提供了一个 dask.dataframe.Aggregation 类,允许用户定义自定义的分布式聚合操作。这个类需要三个核心函数:chunk、agg 和 finalize,它们分别对应分布式计算的不同阶段。

  1. chunk 函数:局部计数chunk 函数在 Dask 的每个分区(chunk)上独立运行。它的目标是为每个分组键计算目标列中每个值的频率。对于众数计算,这意味着在每个分区内,我们需要统计每个 brand 值出现的次数。pd.Series.value_counts() 是实现这一目标的理想工具。

    def chunk(s):
        """
        在每个 Dask 分区上执行,计算每个值的频率。
        输入是一个 Pandas Series。
        """
        return s.value_counts()
    登录后复制
  2. agg 函数:合并中间结果agg 函数负责合并 chunk 函数在不同分区上产生的中间结果。由于 chunk 函数返回的是每个值及其计数的 Series,agg 函数需要将这些 Series 合并,并对相同的值的计数进行求和,从而得到全局的频率统计。

    def agg(s0):
        """
        合并来自不同分区的中间结果(频率计数)。
        输入是一个 Pandas Series,其索引包含分组键和值,值是计数。
        """
        # _selected_obj 是 Dask 内部结构,代表了聚合的 Series。
        # groupby(level=s0._selected_obj.index.names) 确保按原始分组键和值进行求和。
        _intermediate = s0._selected_obj.groupby(level=s0._selected_obj.index.names).sum()
        # 过滤掉计数为0或负数的情况
        _intermediate = _intermediate[_intermediate > 0]
        return _intermediate
    登录后复制
  3. finalize 函数:确定最终模式finalize 函数在所有 agg 操作完成后运行,它接收合并后的全局频率计数,并从中确定最终的众数。这个函数需要能够处理可能存在多个众数的情况,即返回所有频率最高的值。

    聚好用AI
    聚好用AI

    可免费AI绘图、AI音乐、AI视频创作,聚集全球顶级AI,一站式创意平台

    聚好用AI 115
    查看详情 聚好用AI
    def finalize(s):
        """
        从合并后的频率计数中确定最终的众数。
        输入是一个 Pandas Series,其索引包含分组键和值,值是合并后的计数。
        """
        # 获取原始分组键的层级(不包括聚合列的值本身)
        level = list(range(s.index.nlevels - 1))
        # 对每个分组,找出频率最高的项
        # s.groupby(level=level) 按原始分组键重新分组
        # apply(lambda x: x[x == x.max()]) 找出每个组内频率等于最大频率的所有项
        return s.groupby(level=level, group_keys=False).apply(lambda x: x[x == x.max()])
    登录后复制

在 Dask DataFrame 中应用自定义模式聚合

定义好 chunk、agg 和 finalize 函数后,我们可以将它们封装到 dask.dataframe.Aggregation 对象中,然后将其传递给 Dask DataFrame 的 groupby().aggregate() 方法。

import dask.dataframe as dd
from dask.dataframe import Aggregation

# 将 Pandas DataFrame 转换为 Dask DataFrame
df_dask = dd.from_pandas(data_pandas, npartitions=1) # npartitions=1 简化示例,实际应用中可根据数据大小调整

# 定义自定义的 Dask 模式聚合
mode_dask_agg = Aggregation(
    name="mode", # 聚合的名称
    chunk=chunk,
    agg=agg,
    finalize=finalize,
)

# 应用自定义聚合
mode_dask_result = df_dask.groupby(["clientId", "product"], observed=True, dropna=True).aggregate(
    {"brand": mode_dask_agg}
).compute() # .compute() 触发计算并返回 Pandas DataFrame

print("\nDask Groupby Mode Result:")
print(mode_dask_result)
登录后复制

完整示例代码

以下是包含所有步骤的完整示例代码:

from pandas import DataFrame, Series, NA
import pandas as pd
from dask.dataframe import from_pandas, Aggregation
import dask.dataframe as dd
import numpy as np

# 1. 准备数据
data = DataFrame(
    {
        "status": [
            "pending", "pending", "pending", "canceled", "canceled", "canceled", "confirmed", "confirmed", "confirmed",
        ],
        "clientId": ["A", "B", "C", "A", "D", "C", "A", "B", "C"],
        "partner": ["A", NA, "C", "A", NA, "C", "A", NA, "C"],
        "product": [
            "afiliates", "pre-paid", "giftcard", "afiliates", "pre-paid", "giftcard", "afiliates", "pre-paid", "giftcard",
        ],
        "brand": [
            "brand_4", "brand_2", "brand_3", "brand_1", "brand_2", "brand_3", "brand_1", "brand_3", "brand_3",
        ],
        "gmv": [100, 100, 100, 100, 100, 100, 100, 100, 100],
    }
)

data = data.astype(
    {
        "partner": "category",
        "status": "category",
        "product": "category",
        "brand": "category",
    }
)

# 2. Pandas 模式聚合(作为对比)
mode_pandas = data.groupby(["clientId", "product"], observed=True).agg(
    {"brand": Series.mode}
)
print("Pandas Groupby Mode Result:")
print(mode_pandas)

# 3. 转换为 Dask DataFrame
df_dask = from_pandas(data, npartitions=1)

# 4. 定义 Dask 自定义聚合函数的三个阶段
def chunk(s):
    """在每个 Dask 分区上执行,计算每个值的频率。"""
    return s.value_counts()

def agg(s0):
    """合并来自不同分区的中间结果(频率计数)。"""
    _intermediate = s0._selected_obj.groupby(level=s0._selected_obj.index.names).sum()
    _intermediate = _intermediate[_intermediate > 0]
    return _intermediate

def finalize(s):
    """从合并后的频率计数中确定最终的众数。"""
    level = list(range(s.index.nlevels - 1))
    return s.groupby(level=level, group_keys=False).apply(lambda x: x[x == x.max()])

# 5. 创建 dask.dataframe.Aggregation 对象
mode_dask_agg = Aggregation(
    name="mode",
    chunk=chunk,
    agg=agg,
    finalize=finalize,
)

# 6. 在 Dask DataFrame 上应用自定义聚合
mode_dask_result = df_dask.groupby(["clientId", "product"], observed=True, dropna=True).aggregate(
    {"brand": mode_dask_agg}
).compute()

print("\nDask Groupby Mode Result:")
print(mode_dask_result)
登录后复制

注意事项与 Dask/Pandas 差异

尽管上述自定义聚合旨在模拟 Pandas Series.mode 的行为,但在某些特定情况下,Dask 的结果可能与 Pandas 略有不同。这通常发生在以下情况:

  • 多个众数(Multi-mode): 当一个分组中存在多个值具有相同的最高频率时,Pandas 的 Series.mode 会返回一个包含所有这些众数的 Series。我们自定义的 finalize 函数也尝试处理这种情况,返回所有具有最大频率的值。
  • 数据类型和 NaN 处理: Dask 和 Pandas 在处理分类数据或 NaN 值时可能存在细微差异。dropna=True 参数在 Dask 的 groupby 中可以控制是否在分组前删除 NaN 值。在自定义聚合函数中,也需要确保对 NaN 的处理逻辑符合预期。
  • 性能考量: 自定义聚合虽然功能强大,但其性能可能不如 Dask 内置的、高度优化的聚合函数。对于非常大的数据集,应评估其性能影响。

在实际应用中,建议对比 Dask 和 Pandas 在小规模数据集上的结果,以验证自定义聚合的正确性,并理解任何潜在的差异。

总结

通过 dask.dataframe.Aggregation 类,我们成功地为 Dask DataFrame 的 groupby 操作实现了自定义的模式聚合功能。这种方法不仅解决了 Dask 不直接支持 Series.mode 的问题,也展示了 Dask 框架在处理复杂分布式聚合任务时的灵活性和可扩展性。理解 chunk、agg 和 finalize 三个阶段的工作原理是构建高效、正确自定义聚合的关键。

以上就是Dask DataFrame groupby 模式(Mode)聚合的实现指南的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号