
PySpark provides a powerful xpath function that lets you parse and extract data from structured XML strings using XPath expressions. This is essential when working with semi-structured datasets that embed XML. The function is typically called as xpath(xml_string_column, xpath_expression) and returns an array containing all matching results.
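As a minimal sketch of the call pattern (the column name and the tiny XML document here are made up for illustration):
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.appName("xpath_minimal_demo").getOrCreate()

# One-row DataFrame holding a toy XML document
df = spark.createDataFrame([("<a><b>1</b><b>2</b></a>",)], ["xml"])

# xpath returns ALL matches as an array; text() selects the node text
df.select(expr("xpath(xml, '/a/b/text()')").alias("values")).show()
# +------+
# |values|
# +------+
# |[1, 2]|
# +------+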
When using xpath to pull the text content of XML nodes, however, many users find that the result is an array of null values rather than the expected text. For example, when trying to extract "John Doe" from <Name>John Doe</Name>, writing the XPath expression as /Root/Customers/Customer/Name makes PySpark's xpath function return results like [null, null, null].
Example XML data:
Suppose we have a DataFrame column containing the following XML string:
<?xml version="1.0" encoding="utf-8"?>
<Root>
  <Customers>
    <Customer CustomerID="1">
      <Name>John Doe</Name>
      <Address>
        <Street>123 Main St</Street>
        <City>Anytown</City>
        <State>CA</State>
        <Zip>12345</Zip>
      </Address>
      <PhoneNo>123-456-7890</PhoneNo>
    </Customer>
    <Customer CustomerID="2">
      <Name>Jane Smith</Name>
      <Address>
        <Street>456 Oak St</Street>
        <City>Somecity</City>
        <State>NY</State>
        <Zip>67890</Zip>
      </Address>
      <PhoneNo>987-654-3210</PhoneNo>
    </Customer>
  </Customers>
  <Orders>
    <!-- ... other order data ... -->
  </Orders>
</Root>

Incorrect code example:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("XML_Extraction").getOrCreate()
# Simulate data read from a CSV file, with initial cleanup already applied
# Assume df_Customers_Orders has a string column named "Data" holding the XML above
data_row = [("""<?xml version="1.0" encoding="utf-8"?>
<Root>
  <Customers>
    <Customer CustomerID="1">
      <Name>John Doe</Name>
      <Address>
        <Street>123 Main St</Street>
        <City>Anytown</City>
        <State>CA</State>
        <Zip>12345</Zip>
      </Address>
      <PhoneNo>123-456-7890</PhoneNo>
    </Customer>
    <Customer CustomerID="2">
      <Name>Jane Smith</Name>
      <Address>
        <Street>456 Oak St</Street>
        <City>Somecity</City>
        <State>NY</State>
        <Zip>67890</Zip>
      </Address>
      <PhoneNo>987-654-3210</PhoneNo>
    </Customer>
    <Customer CustomerID="3">
      <Name>Bob Johnson</Name>
      <Address>
        <Street>789 Pine St</Street>
        <City>Othercity</City>
        <State>TX</State>
        <Zip>11223</Zip>
      </Address>
      <PhoneNo>456-789-0123</PhoneNo>
    </Customer>
  </Customers>
  <Orders>
    <Order>
      <CustomerID>1</CustomerID>
      <EmpID>100</EmpID>
      <OrderDate>2022-01-01</OrderDate>
      <Cost>100.50</Cost>
    </Order>
    <Order>
      <CustomerID>2</CustomerID>
      <EmpID>101</EmpID>
      <OrderDate>2022-01-02</OrderDate>
      <Cost>200.75</Cost>
    </Order>
  </Orders>
</Root>""",)]
df_Customers_Orders = spark.createDataFrame(data_row, ["Data"])
# If the XML string is wrapped in double quotes, clean it up first
# df_Customers_Orders = df_Customers_Orders.withColumn("Data", expr("substring(Data, 2, length(Data)-2)"))
# df_Customers_Orders = df_Customers_Orders.withColumn("Data", regexp_replace("Data", '""', '"'))
df_sample_CustomersOrders_incorrect = df_Customers_Orders.selectExpr(
"xpath(Data,'/Root/Customers/Customer/@CustomerID') as CustomerID",
"xpath(Data,'/Root/Customers/Customer/Name') as ContactName",
"xpath(Data,'/Root/Customers/Customer/PhoneNo') as PhoneNo",
)
df_sample_CustomersOrders_incorrect.show(truncate=False)

Output:
+----------+------------------+------------------+
|CustomerID|ContactName       |PhoneNo           |
+----------+------------------+------------------+
|[1, 2, 3] |[null, null, null]|[null, null, null]|
+----------+------------------+------------------+
As you can see, the ContactName and PhoneNo columns come back as arrays of null values.
PySpark's xpath function follows the standard XPath specification. In XPath, a plain node path such as /Root/Customers/Customer/Name selects the node itself, not its inner text content. To extract a node's text explicitly, you must append the text() function.
This explains why CustomerID (extracted via the @CustomerID attribute) resolves correctly while ContactName and PhoneNo (which point at element nodes) come back as nulls.
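To make the difference concrete, here is a small sketch (the one-customer document and alias names are made up for illustration) contrasting the three kinds of expressions:
# Hypothetical one-customer document, just to contrast the expressions
df_demo = spark.createDataFrame(
    [('<Customer CustomerID="1"><Name>John Doe</Name></Customer>',)], ["Data"]
)
df_demo.selectExpr(
    "xpath(Data, '/Customer/Name') as node_only",         # element node -> [null]
    "xpath(Data, '/Customer/Name/text()') as node_text",  # text content -> [John Doe]
    "xpath(Data, '/Customer/@CustomerID') as attr_value"  # attribute    -> [1]
).show(truncate=False)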
Correct code example:
Modify the code above by appending text() to every XPath expression that should return text content.
df_sample_CustomersOrders_correct = df_Customers_Orders.selectExpr(
"xpath(Data,'/Root/Customers/Customer/@CustomerID') as CustomerID",
"xpath(Data,'/Root/Customers/Customer/Name/text()') as ContactName",
"xpath(Data,'/Root/Customers/Customer/PhoneNo/text()') as PhoneNo",
)
df_sample_CustomersOrders_correct.show(truncate=False)

Output:
+----------+-----------------------------------+------------------------------------------+
|CustomerID|ContactName                        |PhoneNo                                   |
+----------+-----------------------------------+------------------------------------------+
|[1, 2, 3] |[John Doe, Jane Smith, Bob Johnson]|[123-456-7890, 987-654-3210, 456-789-0123]|
+----------+-----------------------------------+------------------------------------------+
Now the ContactName and PhoneNo columns correctly contain the extracted text.
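As an aside, if you only need the first match as a plain string rather than an array, Spark SQL also offers xpath_string (plus typed variants such as xpath_int). Unlike xpath, it returns the string value of the first matching node, so text() is not required. A sketch against the DataFrame above:
# xpath_string returns only the FIRST match, as a scalar string
df_Customers_Orders.selectExpr(
    "xpath_string(Data, '/Root/Customers/Customer[1]/Name') as FirstName"
).show(truncate=False)
# +---------+
# |FirstName|
# +---------+
# |John Doe |
# +---------+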
In real applications you will often want to explode the extracted arrays into rows, or process the data further. The complete example below walks through both steps.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
spark = SparkSession.builder.appName("XML_Extraction_Tutorial").getOrCreate()
# Simulate a CSV file that contains XML strings
# If the XML in the CSV is wrapped in double quotes or contains escaped characters, preprocess it first
# Here we create the DataFrame directly to keep the example simple; after read.csv you may need cleanup steps such as:
# df_Customers_Orders = spark.read.option("header", "true").csv("source.csv")
# df_Customers_Orders = df_Customers_Orders.withColumn("Data", expr("substring(Data, 2, length(Data)-2)"))
# df_Customers_Orders = df_Customers_Orders.withColumn("Data", regexp_replace("Data", '""', '"'))
xml_string = """<?xml version="1.0" encoding="utf-8"?>
<Root>
  <Customers>
    <Customer CustomerID="1">
      <Name>John Doe</Name>
      <Address>
        <Street>123 Main St</Street>
        <City>Anytown</City>
        <State>CA</State>
        <Zip>12345</Zip>
      </Address>
      <PhoneNo>123-456-7890</PhoneNo>
    </Customer>
    <Customer CustomerID="2">
      <Name>Jane Smith</Name>
      <Address>
        <Street>456 Oak St</Street>
        <City>Somecity</City>
        <State>NY</State>
        <Zip>67890</Zip>
      </Address>
      <PhoneNo>987-654-3210</PhoneNo>
    </Customer>
    <Customer CustomerID="3">
      <Name>Bob Johnson</Name>
      <Address>
        <Street>789 Pine St</Street>
        <City>Othercity</City>
        <State>TX</State>
        <Zip>11223</Zip>
      </Address>
      <PhoneNo>456-789-0123</PhoneNo>
    </Customer>
  </Customers>
  <Orders>
    <Order>
      <CustomerID>1</CustomerID>
      <EmpID>100</EmpID>
      <OrderDate>2022-01-01</OrderDate>
      <Cost>100.50</Cost>
    </Order>
    <Order>
      <CustomerID>2</CustomerID>
      <EmpID>101</EmpID>
      <OrderDate>2022-01-02</OrderDate>
      <Cost>200.75</Cost>
    </Order>
  </Orders>
</Root>"""
df_xml_data = spark.createDataFrame([(xml_string,)], ["Data"])
df_xml_data.show(truncate=False)
# Extract data with the xpath function
df_extracted_customers = df_xml_data.selectExpr(
"xpath(Data,'/Root/Customers/Customer/@CustomerID') as CustomerID_Array",
"xpath(Data,'/Root/Customers/Customer/Name/text()') as ContactName_Array",
"xpath(Data,'/Root/Customers/Customer/PhoneNo/text()') as PhoneNo_Array",
)
df_extracted_customers.show(truncate=False)
# Explode the array columns into rows for further processing.
# Spark allows only one generator (explode) per select clause, so zip the
# three arrays element-wise into a single array of structs first, then
# explode once (arrays_zip pads with null if the lengths ever differ).
df_flattened_customers = df_extracted_customers.select(
    explode(arrays_zip("CustomerID_Array", "ContactName_Array", "PhoneNo_Array")).alias("z")
).select(
    col("z.CustomerID_Array").alias("CustomerID"),
    col("z.ContactName_Array").alias("ContactName"),
    col("z.PhoneNo_Array").alias("PhoneNo")
)
df_flattened_customers.show(truncate=False)
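Alternatively, if you prefer an explicit element index instead of zipping, posexplode emits a (pos, value) pair per element, and the position can be used to index the remaining arrays. A sketch under the same column names as above:
# Alternative: posexplode yields the position of each match; use it to
# index the other arrays with SQL bracket syntax (0-based)
df_by_index = df_extracted_customers.select(
    posexplode("CustomerID_Array").alias("pos", "CustomerID"),
    "ContactName_Array",
    "PhoneNo_Array"
).selectExpr(
    "CustomerID",
    "ContactName_Array[pos] as ContactName",
    "PhoneNo_Array[pos] as PhoneNo"
)
df_by_index.show(truncate=False)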
# Write the result to a CSV file
# df_flattened_customers.write.format("csv").option("header", "true").mode("overwrite").save("path_to_output.csv")
spark.stop()

Notes:
Extracting data from XML strings with the xpath function is a common operation in PySpark. The key to avoiding arrays of nulls is understanding the difference between extracting a node's text content (text()) and an attribute value (@attributeName). With the guidance and code examples above, you can process XML data efficiently and accurately and steer clear of this common pitfall.