
本教程详细阐述了如何在 dask dataframe 中对分组数据执行模式(mode)聚合。由于 dask 不直接提供 `groupby.agg` 的模式函数,文章通过自定义 `dask.dataframe.aggregation` 类,实现 `chunk`、`agg` 和 `finalize` 阶段的逻辑,从而有效地在分布式环境中计算分组模式,并提供完整的示例代码和注意事项。
在数据分析中,查找一组数据的众数(mode)是一项常见操作。Pandas DataFrame 提供了 Series.mode() 方法,并且可以方便地与 groupby().agg() 结合使用,以计算每个分组的众数。然而,在处理大规模数据集时,Dask DataFrame 成为一个强大的分布式计算工具。尽管 Dask 提供了丰富的聚合功能,但其内置的 groupby().aggregate() 方法并不直接支持像 Pandas Series.mode 这样的聚合操作。这意味着,如果我们需要在 Dask DataFrame 中进行分组众数计算,就需要自定义聚合逻辑。
在深入 Dask 的自定义聚合之前,我们首先回顾一下在 Pandas 中如何轻松实现这一功能。这有助于理解我们希望在 Dask 中复制的行为。
import pandas as pd
import numpy as np
# 示例数据
data_pandas = pd.DataFrame({
'status': ['pending', 'pending', 'pending', 'canceled', 'canceled', 'canceled', 'confirmed', 'confirmed', 'confirmed'],
'clientId': ['A', 'B', 'C', 'A', 'D', 'C', 'A', 'B', 'C'],
'partner': ['A', np.nan, 'C', 'A', np.nan, 'C', 'A', np.nan, 'C'],
'product': ['afiliates', 'pre-paid', 'giftcard', 'afiliates', 'pre-paid', 'giftcard', 'afiliates', 'pre-paid', 'giftcard'],
'brand': ['brand_4', 'brand_2', 'brand_3', 'brand_1', 'brand_2', 'brand_3', 'brand_1', 'brand_3', 'brand_3'],
'gmv': [100, 100, 100, 100, 100, 100, 100, 100, 100]
})
data_pandas = data_pandas.astype({
"partner": "category",
"status": "category",
"product": "category",
"brand": "category"
})
# 使用 Pandas 计算分组模式
mode_pandas = data_pandas.groupby(["clientId", "product"], observed=True).agg({"brand": pd.Series.mode})
print("Pandas Groupby Mode Result:")
print(mode_pandas)Pandas 的 Series.mode 能够返回一个 Series,其中包含所有频率最高的值(如果存在多个众数)。
Dask 提供了一个 dask.dataframe.Aggregation 类,允许用户定义自定义的分布式聚合操作。这个类需要三个核心函数:chunk、agg 和 finalize,它们分别对应分布式计算的不同阶段。
chunk 函数:局部计数chunk 函数在 Dask 的每个分区(chunk)上独立运行。它的目标是为每个分组键计算目标列中每个值的频率。对于众数计算,这意味着在每个分区内,我们需要统计每个 brand 值出现的次数。pd.Series.value_counts() 是实现这一目标的理想工具。
def chunk(s):
"""
在每个 Dask 分区上执行,计算每个值的频率。
输入是一个 Pandas Series。
"""
return s.value_counts()agg 函数:合并中间结果agg 函数负责合并 chunk 函数在不同分区上产生的中间结果。由于 chunk 函数返回的是每个值及其计数的 Series,agg 函数需要将这些 Series 合并,并对相同的值的计数进行求和,从而得到全局的频率统计。
def agg(s0):
"""
合并来自不同分区的中间结果(频率计数)。
输入是一个 Pandas Series,其索引包含分组键和值,值是计数。
"""
# _selected_obj 是 Dask 内部结构,代表了聚合的 Series。
# groupby(level=s0._selected_obj.index.names) 确保按原始分组键和值进行求和。
_intermediate = s0._selected_obj.groupby(level=s0._selected_obj.index.names).sum()
# 过滤掉计数为0或负数的情况
_intermediate = _intermediate[_intermediate > 0]
return _intermediatefinalize 函数:确定最终模式finalize 函数在所有 agg 操作完成后运行,它接收合并后的全局频率计数,并从中确定最终的众数。这个函数需要能够处理可能存在多个众数的情况,即返回所有频率最高的值。
def finalize(s):
"""
从合并后的频率计数中确定最终的众数。
输入是一个 Pandas Series,其索引包含分组键和值,值是合并后的计数。
"""
# 获取原始分组键的层级(不包括聚合列的值本身)
level = list(range(s.index.nlevels - 1))
# 对每个分组,找出频率最高的项
# s.groupby(level=level) 按原始分组键重新分组
# apply(lambda x: x[x == x.max()]) 找出每个组内频率等于最大频率的所有项
return s.groupby(level=level, group_keys=False).apply(lambda x: x[x == x.max()])定义好 chunk、agg 和 finalize 函数后,我们可以将它们封装到 dask.dataframe.Aggregation 对象中,然后将其传递给 Dask DataFrame 的 groupby().aggregate() 方法。
import dask.dataframe as dd
from dask.dataframe import Aggregation
# 将 Pandas DataFrame 转换为 Dask DataFrame
df_dask = dd.from_pandas(data_pandas, npartitions=1) # npartitions=1 简化示例,实际应用中可根据数据大小调整
# 定义自定义的 Dask 模式聚合
mode_dask_agg = Aggregation(
name="mode", # 聚合的名称
chunk=chunk,
agg=agg,
finalize=finalize,
)
# 应用自定义聚合
mode_dask_result = df_dask.groupby(["clientId", "product"], observed=True, dropna=True).aggregate(
{"brand": mode_dask_agg}
).compute() # .compute() 触发计算并返回 Pandas DataFrame
print("\nDask Groupby Mode Result:")
print(mode_dask_result)以下是包含所有步骤的完整示例代码:
from pandas import DataFrame, Series, NA
import pandas as pd
from dask.dataframe import from_pandas, Aggregation
import dask.dataframe as dd
import numpy as np
# 1. 准备数据
data = DataFrame(
{
"status": [
"pending", "pending", "pending", "canceled", "canceled", "canceled", "confirmed", "confirmed", "confirmed",
],
"clientId": ["A", "B", "C", "A", "D", "C", "A", "B", "C"],
"partner": ["A", NA, "C", "A", NA, "C", "A", NA, "C"],
"product": [
"afiliates", "pre-paid", "giftcard", "afiliates", "pre-paid", "giftcard", "afiliates", "pre-paid", "giftcard",
],
"brand": [
"brand_4", "brand_2", "brand_3", "brand_1", "brand_2", "brand_3", "brand_1", "brand_3", "brand_3",
],
"gmv": [100, 100, 100, 100, 100, 100, 100, 100, 100],
}
)
data = data.astype(
{
"partner": "category",
"status": "category",
"product": "category",
"brand": "category",
}
)
# 2. Pandas 模式聚合(作为对比)
mode_pandas = data.groupby(["clientId", "product"], observed=True).agg(
{"brand": Series.mode}
)
print("Pandas Groupby Mode Result:")
print(mode_pandas)
# 3. 转换为 Dask DataFrame
df_dask = from_pandas(data, npartitions=1)
# 4. 定义 Dask 自定义聚合函数的三个阶段
def chunk(s):
"""在每个 Dask 分区上执行,计算每个值的频率。"""
return s.value_counts()
def agg(s0):
"""合并来自不同分区的中间结果(频率计数)。"""
_intermediate = s0._selected_obj.groupby(level=s0._selected_obj.index.names).sum()
_intermediate = _intermediate[_intermediate > 0]
return _intermediate
def finalize(s):
"""从合并后的频率计数中确定最终的众数。"""
level = list(range(s.index.nlevels - 1))
return s.groupby(level=level, group_keys=False).apply(lambda x: x[x == x.max()])
# 5. 创建 dask.dataframe.Aggregation 对象
mode_dask_agg = Aggregation(
name="mode",
chunk=chunk,
agg=agg,
finalize=finalize,
)
# 6. 在 Dask DataFrame 上应用自定义聚合
mode_dask_result = df_dask.groupby(["clientId", "product"], observed=True, dropna=True).aggregate(
{"brand": mode_dask_agg}
).compute()
print("\nDask Groupby Mode Result:")
print(mode_dask_result)尽管上述自定义聚合旨在模拟 Pandas Series.mode 的行为,但在某些特定情况下,Dask 的结果可能与 Pandas 略有不同。这通常发生在以下情况:
在实际应用中,建议对比 Dask 和 Pandas 在小规模数据集上的结果,以验证自定义聚合的正确性,并理解任何潜在的差异。
通过 dask.dataframe.Aggregation 类,我们成功地为 Dask DataFrame 的 groupby 操作实现了自定义的模式聚合功能。这种方法不仅解决了 Dask 不直接支持 Series.mode 的问题,也展示了 Dask 框架在处理复杂分布式聚合任务时的灵活性和可扩展性。理解 chunk、agg 和 finalize 三个阶段的工作原理是构建高效、正确自定义聚合的关键。
以上就是Dask DataFrame groupby 模式(Mode)聚合的实现指南的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号