
本教程详细介绍了如何在pyspark中对dataframe的所有列同时应用多个聚合函数(如`min`和`max`),并以行式结构(每行代表一个聚合结果)展示。通过结合使用`select`进行初步聚合、`cache`优化性能以及`unionbyname`进行结果重塑,实现了灵活且高效的数据分析,避免了直接`agg`函数无法满足特定输出格式的问题。
在PySpark进行数据分析时,一个常见的需求是对DataFrame中的所有或指定列应用多个聚合函数,例如同时计算每列的最小值和最大值。虽然DataFrame.agg()方法能够轻松实现多列多函数的聚合,但其默认输出是将所有聚合结果展平为单行,这往往无法满足将不同聚合类型(如最小值和最大值)作为独立行呈现的需求。
例如,直接使用df.agg(*exprs)的方式,其中exprs = [min(c).alias(c), max(c).alias(c) for c in df.columns],会生成一个包含所有列的最小值和最大值,但这些值都将并列在同一行中,而不是我们期望的“一行是所有列的最小值,另一行是所有列的最大值”的结构。
为了实现这种行式输出的聚合结果,我们需要一种更为精细的策略,结合PySpark的select、cache和unionByName等操作。
以下步骤将详细演示如何通过分阶段处理来达到目标输出格式:
让我们通过一个具体的PySpark代码示例来演示上述过程:
import operator
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# 初始化Spark会话
spark = SparkSession.builder.appName("MultiFunctionAggregate").getOrCreate()
# 示例数据
_data = [
(4, 123, 18, 29),
(8, 5, 26, 187),
(2, 97, 18, 29),
]
_schema = ['col_1', 'col2', 'col3', 'col_4']
df = spark.createDataFrame(_data, _schema)
print("原始DataFrame:")
df.show()
# +-----+----+----+-----+
# |col_1|col2|col3|col_4|
# +-----+----+----+-----+
# | 4| 123| 18| 29|
# | 8| 5| 26| 187|
# | 2| 97| 18| 29|
# +-----+----+----+-----+
# 1. 初步聚合所有最小值和最大值
# 构建min聚合表达式列表,并为结果列添加'min_'前缀
min_vals = [F.min(c).alias(f'min_{c}') for c in df.columns]
# 构建max聚合表达式列表,并为结果列添加'max_'前缀
max_vals = [F.max(c).alias(f'max_{c}') for c in df.columns]
# 使用select执行所有聚合,结果是一个单行DataFrame
df_agg_raw = df.select(min_vals + max_vals)
print("初步聚合结果 (单行):")
df_agg_raw.show()
# +-------+-------+-------+--------+-------+-------+-------+--------+
# |min_col_1|min_col2|min_col3|min_col_4|max_col_1|max_col2|max_col3|max_col_4|
# +-------+-------+-------+--------+-------+-------+-------+--------+
# | 2| 5| 18| 29| 8| 123| 26| 187|
# +-------+-------+-------+--------+-------+-------+-------+--------+
# 2. 缓存中间结果
# 缓存df_agg_raw以提高后续操作的性能
df_agg_raw.cache()
# 3. 重塑结果为行式结构
# 为最小值行构建选择表达式:添加'agg_type'列,并将min_前缀的列重命名回原始列名
min_cols = operator.add(
[F.lit('min').alias('agg_type')], # 添加一个字面量列,标识聚合类型为'min'
[F.col(f'min_{c}').alias(c) for c in df.columns] # 选取带有'min_'前缀的列,并将其别名改回原始列名
)
# 为最大值行构建选择表达式,原理同上
max_cols = operator.add(
[F.lit('max').alias('agg_type')], # 添加一个字面量列,标识聚合类型为'max'
[F.col(f'max_{c}').alias(c) for c in df.columns] # 选取带有'max_'前缀的列,并将其别名改回原始列名
)
# 从缓存的df_agg_raw中选择并重命名列,创建最小值DataFrame
min_df = df_agg_raw.select(min_cols)
# 从缓存的df_agg_raw中选择并重命名列,创建最大值DataFrame
max_df = df_agg_raw.select(max_cols)
print("重塑后的最小值DataFrame:")
min_df.show()
# +--------+-----+----+----+-----+
# |agg_type|col_1|col2|col3|col_4|
# +--------+-----+----+----+-----+
# | min| 2| 5| 18| 29|
# +--------+-----+----+----+-----+
print("重塑后的最大值DataFrame:")
max_df.show()
# +--------+-----+----+----+-----+
# |agg_type|col_1|col2|col3|col_4|
# +--------+-----+----+----+-----+
# | max| 8| 123| 26| 187|
# +--------+-----+----+----+-----+
# 4. 合并结果
# 使用unionByName合并两个DataFrame,确保按列名匹配
result = min_df.unionByName(max_df)
print("最终结果DataFrame:")
result.show()
# +--------+-----+----+----+-----+
# |agg_type|col_1|col2|col3|col_4|
# +--------+-----+----+----+-----+
# | min| 2| 5| 18| 29|
# | max| 8| 123| 26| 187|
# +--------+-----+----+----+-----+
# 停止Spark会话
spark.stop()通过上述方法,我们能够灵活地控制PySpark聚合结果的输出格式,满足将不同聚合类型以行式结构呈现的特定分析需求,同时兼顾了性能优化。
以上就是PySpark DataFrame多列多函数聚合与结果重塑教程的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号