
在数据处理领域,合并大型文件是常见的需求。例如,您可能需要每周将一个新增的CSV文件与一个已有的、高达50GB的CSV归档文件进行合并。传统的做法是尝试将文件全部加载到内存中进行处理,但这对于数十GB甚至更大的文件来说,会导致严重的内存溢出问题,甚至使程序崩溃。
为了解决这一挑战,我们可以借鉴归并排序算法中的“合并”步骤,采用一种流式处理的方法。这种方法的核心思想是逐行读取两个已排序的输入文件,比较当前行并按序写入到输出文件,而不是一次性加载所有数据。Go语言凭借其强大的并发原语和高效的I/O操作,非常适合实现这种流式合并方案。
流式合并的效率源于其对内存的极低占用。它不需要将整个文件读入内存,而是每次只在内存中保留两个输入文件各一行的数据,以及一个用于写入的缓冲区。其工作流程如下:
这种方法确保了输出文件也是排序的,并且整个过程是高度内存效率的。
立即学习“go语言免费学习笔记(深入)”;
以下是使用Go语言实现流式合并的详细代码及解释。
main函数是程序的入口点,负责处理命令行参数、文件操作和协调合并流程。
package main
import (
"encoding/csv"
"io"
"log"
"os"
"fmt" // 用于示例中的日志输出
)
const outFile = "merged_output.csv" // 定义输出文件名
func main() {
// 确保程序接收到两个输入文件路径作为命令行参数
if len(os.Args) != 3 {
log.Fatalf("\nUsage: %s <file1.csv> <file2.csv>\nExample: %s archive.csv weekly_update.csv", os.Args[0], os.Args[0])
}
// 打开第一个输入文件
f1, err := os.Open(os.Args[1])
if err != nil {
log.Fatalf("\nError opening first file %s: %v", os.Args[1], err)
}
defer f1.Close() // 确保文件在函数结束时关闭
// 打开第二个输入文件
f2, err := os.Open(os.Args[2])
if err != nil {
log.Fatalf("\nError opening second file %s: %v", os.Args[2], err)
}
defer f2.Close() // 确保文件在函数结束时关闭
// 创建输出文件
w, err := os.Create(outFile)
if err != nil {
log.Fatalf("\nError creating output file %s: %v", outFile, err)
}
defer w.Close() // 确保文件在函数结束时关闭
// 包装文件读取器为CSV读取器
cr1 := csv.NewReader(f1)
cr2 := csv.NewReader(f2)
// 包装输出文件写入器为CSV写入器
cw := csv.NewWriter(w)
defer cw.Flush() // 确保所有缓冲数据在程序退出前写入文件说明:
在进入主循环之前,我们需要从两个文件中各读取第一行数据。然后,在一个无限循环中,根据 compare 函数的结果,决定写入哪一行,并从对应的文件读取下一行。
// 初始化读取两行数据
line1, b1 := readline(cr1)
if !b1 {
// 如果第一个文件为空或无CSV行,直接复制第二个文件剩余内容
log.Printf("File 1 (%s) is empty or has no CSV lines. Copying remaining lines from File 2.", os.Args[1])
copyRemaining(cr2, cw)
return // 结束程序
}
line2, b2 := readline(cr2)
if !b2 {
// 如果第二个文件为空或无CSV行,直接复制第一个文件剩余内容
log.Printf("File 2 (%s) is empty or has no CSV lines. Copying remaining lines from File 1.", os.Args[2])
writeline(cw, line1) // 写入之前读取的line1
copyRemaining(cr1, cw)
return // 结束程序
}
// 核心合并逻辑
for {
// 比较两行数据,决定哪一行应该先写入
if compare(line1, line2) {
writeline(cw, line1)
line1, b1 = readline(cr1) // 读取下一个line1
if !b1 {
// 如果文件1已读完,将文件2的剩余内容全部复制
writeline(cw, line2) // 写入最后读取的line2
copyRemaining(cr2, cw)
break // 退出循环
}
} else {
writeline(cw, line2)
line2, b2 = readline(cr2) // 读取下一个line2
if !b2 {
// 如果文件2已读完,将文件1的剩余内容全部复制
writeline(cw, line1) // 写入最后读取的line1
copyRemaining(cr1, cw)
break // 退出循环
}
}
}
log.Printf("CSV files merged successfully to %s", outFile)
}说明:
以上就是Go语言高效合并大型排序CSV文件:流式处理教程的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号