
在数据处理和分析中,移除重复记录是数据清洗的关键步骤之一,尤其是在处理大规模数据集时。pyspark作为大数据处理的强大框架,提供了高效的机制来识别和消除dataframe中的重复行。然而,由于pyspark生态系统的发展,目前存在两种主要的dataframe类型,它们各自拥有不同的去重api:原生的pyspark.sql.dataframe和基于pandas api的pyspark.pandas.dataframe。理解这两种类型的差异及其对应的去重方法,对于编写健壮且高效的pyspark代码至关重要。
pyspark.sql.DataFrame是PySpark的核心数据结构,它提供了类似于关系型数据库表的操作接口。对于这种类型的DataFrame,去重操作通过dropDuplicates()方法实现。
dropDuplicates()函数可以接受一个可选的列名列表作为参数,用于指定在哪些列上进行重复检查。如果不指定任何列,则默认会检查所有列。
DataFrame.dropDuplicates(subset=None)
假设我们有一个包含客户ID的PySpark SQL DataFrame,我们希望移除重复的客户ID。
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# 初始化SparkSession
spark = SparkSession.builder.appName("DropDuplicatesSQL").getOrCreate()
# 创建一个示例PySpark SQL DataFrame
data = [("C001", "Alice"), ("C002", "Bob"), ("C001", "Alice"), ("C003", "Charlie"), ("C002", "Bob")]
columns = ["CUSTOMER_ID", "NAME"]
df_sql = spark.createDataFrame(data, columns)
print("原始 PySpark SQL DataFrame:")
df_sql.show()
# 1. 对所有列进行去重
df_distinct_all = df_sql.dropDuplicates()
print("所有列去重后的 DataFrame:")
df_distinct_all.show()
# 2. 仅根据 'CUSTOMER_ID' 列进行去重
# 注意:当仅根据子集去重时,对于重复的子集行,Spark会保留其中任意一行,其非子集列的值可能不确定。
# 在此示例中,由于(C001, Alice)是完全重复的,所以行为一致。
# 但如果数据是 (C001, Alice) 和 (C001, David),则去重后会保留其中一个。
df_distinct_id = df_sql.dropDuplicates(subset=["CUSTOMER_ID"])
print("根据 'CUSTOMER_ID' 列去重后的 DataFrame:")
df_distinct_id.show()
# 停止SparkSession
spark.stop()输出示例:
原始 PySpark SQL DataFrame: +-----------+-------+ |CUSTOMER_ID| NAME| +-----------+-------+ | C001| Alice| | C002| Bob| | C001| Alice| | C003|Charlie| | C002| Bob| +-----------+-------+ 所有列去重后的 DataFrame: +-----------+-------+ |CUSTOMER_ID| NAME| +-----------+-------+ | C001| Alice| | C002| Bob| | C003|Charlie| +-----------+-------+ 根据 'CUSTOMER_ID' 列去重后的 DataFrame: +-----------+-------+ |CUSTOMER_ID| NAME| +-----------+-------+ | C001| Alice| | C002| Bob| | C003|Charlie| +-----------+-------+
PySpark Pandas API(pyspark.pandas)旨在为熟悉Pandas库的用户提供一个在Spark上运行的相似接口。对于通过pyspark.pandas创建或转换而来的DataFrame,其去重方法与Pandas中的drop_duplicates()保持一致。
drop_duplicates()函数提供了更丰富的参数,以控制去重行为,例如保留哪个重复项(第一个、最后一个或不保留)。
DataFrame.drop_duplicates(subset=None, keep='first', inplace=False, ignore_index=False)
import pyspark.pandas as ps
from pyspark.sql import SparkSession
# 初始化SparkSession (pyspark.pandas 会自动使用现有的SparkSession)
spark = SparkSession.builder.appName("DropDuplicatesPandas").getOrCreate()
# 创建一个示例PySpark Pandas DataFrame
data = {"CUSTOMER_ID": ["C001", "C002", "C001", "C003", "C002"],
"NAME": ["Alice", "Bob", "Alice", "Charlie", "Bob"]}
psdf = ps.DataFrame(data)
print("原始 PySpark Pandas DataFrame:")
print(psdf)
# 1. 对所有列进行去重 (默认 keep='first')
psdf_distinct_all = psdf.drop_duplicates()
print("所有列去重后的 DataFrame:")
print(psdf_distinct_all)
# 2. 仅根据 'CUSTOMER_ID' 列进行去重,保留第一个
psdf_distinct_id_first = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep='first')
print("根据 'CUSTOMER_ID' 列去重 (保留第一个) 后的 DataFrame:")
print(psdf_distinct_id_first)
# 3. 仅根据 'CUSTOMER_ID' 列进行去重,保留最后一个
psdf_distinct_id_last = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep='last')
print("根据 'CUSTOMER_ID' 列去重 (保留最后一个) 后的 DataFrame:")
print(psdf_distinct_id_last)
# 4. 仅根据 'CUSTOMER_ID' 列进行去重,删除所有重复项
psdf_distinct_id_false = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep=False)
print("根据 'CUSTOMER_ID' 列去重 (删除所有重复项) 后的 DataFrame:")
print(psdf_distinct_id_false)
# 停止SparkSession (如果需要,但通常在脚本结束时自动停止)
spark.stop()输出示例:
原始 PySpark Pandas DataFrame: CUSTOMER_ID NAME 0 C001 Alice 1 C002 Bob 2 C001 Alice 3 C003 Charlie 4 C002 Bob 所有列去重后的 DataFrame: CUSTOMER_ID NAME 0 C001 Alice 1 C002 Bob 3 C003 Charlie 根据 'CUSTOMER_ID' 列去重 (保留第一个) 后的 DataFrame: CUSTOMER_ID NAME 0 C001 Alice 1 C002 Bob 3 C003 Charlie 根据 'CUSTOMER_ID' 列去重 (保留最后一个) 后的 DataFrame: CUSTOMER_ID NAME 2 C001 Alice 4 C002 Bob 3 C003 Charlie 根据 'CUSTOMER_ID' 列去重 (删除所有重复项) 后的 DataFrame: CUSTOMER_ID NAME 3 C003 Charlie
选择dropDuplicates()还是drop_duplicates()的核心在于你正在操作的DataFrame类型。
DataFrame类型识别:
你可以通过type(df)或df.__class__.__name__来检查DataFrame的类型。
API一致性:
功能差异:
性能考量: 两种方法在底层都会触发Spark的distinct或groupBy操作,这通常涉及到数据的shuffle(混洗),对于大规模数据集而言,shuffle是计算密集型操作。因此,无论使用哪种方法,都应注意其对性能的影响。
PySpark提供了两种强大且高效的方法来处理DataFrame中的重复数据:pyspark.sql.DataFrame的dropDuplicates()和pyspark.pandas.DataFrame的drop_duplicates()。理解它们各自的适用场景和功能特性是编写高效PySpark代码的关键。在实践中,务必根据你当前操作的DataFrame类型来选择正确的去重函数。当需要更精细地控制重复项的保留策略时,pyspark.pandas.DataFrame.drop_duplicates()的keep参数提供了更大的灵活性。始终牢记,去重操作可能涉及数据混洗,因此在处理超大规模数据集时,应评估其性能影响。
以上就是PySpark中高效移除重复数据的两种策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号