新人提问:快速 merge 大量 csv 文件

2019-09-02 16:30:58 +08:00
 allenloong
最近在折腾把 1000+csv 文件进行合并,每个 csv 大约有 600 万条,4 列,合并完后最多有 2000 万条( 1000+列)。单个 csv,其中一列为 string,其余三列是 int。

example_csv:

col1| col2 |col3| col4
---------------------------
str1 10001 16000 1
str1 17000 17005 0
str2 13333 23333 1

合并以前三列为 index,已经尝试 pandas merge 和 join,但是速度慢,128G 的内存也不够用。尝试用了 pyspark,能够很快的 join 完 1000 个文件,内存也不会很夸张,但是没有办法把合并好的 dataframe 写出。

想请教下,有没有什么的有效率的办法解决这个问题,谢谢。
3672 次点击
所在节点    Python
38 条回复
lsvih
2019-09-02 16:36:27 +08:00
为啥不能把合并好的 df 写出呢
allenloong
2019-09-02 16:37:40 +08:00
@lsvih #1 会直接报 stack overflow
liprais
2019-09-02 16:38:54 +08:00
spark 写的时候报啥错
allenloong
2019-09-02 16:41:37 +08:00
@liprais #3 stack overflow, 是在一个 fat node 上跑的,增加了 Executor memory 也不行
letking
2019-09-02 16:45:58 +08:00
用 saveAsTextFile 保存到 hdfs 文件夹(yarn 集群)或者本地文件夹(local 模式),然后直接 cat 把文件夹下所有文件写入一个文件就行了(要去除表头行)。
或者用 toLocalIterator 方法,把数据都收集到 driver 上然后写入到一个文件里。
wilimm
2019-09-02 16:47:15 +08:00
cat file1 file2 ... > file_merge
autogen
2019-09-02 17:11:05 +08:00
cat origin/*.cvs > merged.cvs
MMMMMMMMMMMMMMMM
2019-09-02 17:15:37 +08:00
https://news.ycombinator.com/item?id=20848581

这里有一车工具,或许楼主可以试试?
allenloong
2019-09-02 17:21:54 +08:00
@letking #5 saveAsTextFile 还是会报 stack overflow
momocraft
2019-09-02 17:26:44 +08:00
这格式是 tsv .. ?

可能某处的代码试图在内存中生成整个文件了

从例子看不出你的 merge 有多复杂,最坏情况总可以导入关系数据库,然后从 query 流成 tsv/csv
allenloong
2019-09-02 17:31:06 +08:00
@momocraft #10 是的,就是 tsv,合并的时候没问题,一往外写就出问题了。(第一次用 pyspark,可能真的是自己的问题。


BName = str(os.path.basename(bg_f[0]).split('.')[0])
schema = StructType([
StructField('CataID', StringType(), True),
StructField('Start_Block', IntegerType(), True),
StructField('End_Block', IntegerType(), True),
StructField(BName, IntegerType(), True)
])
temp = sqlContext.read.csv(bg_f[0], sep='\t', header=False, schema=schema)
for p in bg_f[1:]:
SName = str(os.path.basename(p).split('.')[0])
schema = StructType([
StructField('CataID', StringType(), True),
StructField('Start_Block', IntegerType(), True),
StructField('End_Block', IntegerType(), True),
StructField(BName, IntegerType(), True)
])
cur = sqlContext.read.csv(p, sep='\t', header=False, schema=schema)
temp = temp.join(cur,
on=['CataID', 'Start_Block', 'End_Block'],
how='outer')
temp = temp.drop('CataID', 'Start_Block', 'End_Block')
letking
2019-09-02 17:40:40 +08:00
@allenloong spark 惰性运行的,合并操作可能没有立即执行而是在写结果的时候才执行。
你这问题可能是合并时内存就不够。
allenloong
2019-09-02 17:45:14 +08:00
@letking #12 那有什么好的方法推荐吗?
liprais
2019-09-02 17:47:09 +08:00
temp = temp.join(cur,
on=['CataID', 'Start_Block', 'End_Block'],
how='outer')
temp = temp.drop('CataID', 'Start_Block', 'End_Block')
这一段好像达不到你想要的效果
optional
2019-09-02 17:47:36 +08:00
导入到数据库,然后再导出?
ytmsdy
2019-09-02 17:48:23 +08:00
都这么大数据量了,写个脚本搞到数据库里面,然后再从数据库里面导出来。
xypty
2019-09-02 17:49:59 +08:00
行名称一致的话,导入数据库,然后导出,找个 csv 导入导出最快的数据库就行了
allenloong
2019-09-02 17:52:13 +08:00
@xypty #17 每个文件的行数不一样,也不一致,列名只有前三列是一样的。
allenloong
2019-09-02 17:53:17 +08:00
@liprais #14 我是想合并的时候用前三列做参考,输出的时候再扔掉前三列。
allenloong
2019-09-02 17:54:06 +08:00
@optional #15 每一个文件应该都要单独导入...列名不一样

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/597244

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX