V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
allenloong
V2EX  ›  Python

新人提问:快速 merge 大量 csv 文件

  •  
  •   allenloong · 2019-09-02 16:30:58 +08:00 · 3150 次点击
    这是一个创建于 873 天前的主题,其中的信息可能已经有所发展或是发生改变。
    最近在折腾把 1000+csv 文件进行合并,每个 csv 大约有 600 万条,4 列,合并完后最多有 2000 万条( 1000+列)。单个 csv,其中一列为 string,其余三列是 int。

    example_csv:

    col1| col2 |col3| col4
    ---------------------------
    str1 10001 16000 1
    str1 17000 17005 0
    str2 13333 23333 1

    合并以前三列为 index,已经尝试 pandas merge 和 join,但是速度慢,128G 的内存也不够用。尝试用了 pyspark,能够很快的 join 完 1000 个文件,内存也不会很夸张,但是没有办法把合并好的 dataframe 写出。

    想请教下,有没有什么的有效率的办法解决这个问题,谢谢。
    38 条回复    2019-09-03 18:29:29 +08:00
    lsvih
        1
    lsvih  
       2019-09-02 16:36:27 +08:00
    为啥不能把合并好的 df 写出呢
    allenloong
        2
    allenloong  
    OP
       2019-09-02 16:37:40 +08:00
    @lsvih #1 会直接报 stack overflow
    liprais
        3
    liprais  
       2019-09-02 16:38:54 +08:00
    spark 写的时候报啥错
    allenloong
        4
    allenloong  
    OP
       2019-09-02 16:41:37 +08:00
    @liprais #3 stack overflow, 是在一个 fat node 上跑的,增加了 Executor memory 也不行
    letking
        5
    letking  
       2019-09-02 16:45:58 +08:00
    用 saveAsTextFile 保存到 hdfs 文件夹(yarn 集群)或者本地文件夹(local 模式),然后直接 cat 把文件夹下所有文件写入一个文件就行了(要去除表头行)。
    或者用 toLocalIterator 方法,把数据都收集到 driver 上然后写入到一个文件里。
    wilimm
        6
    wilimm  
       2019-09-02 16:47:15 +08:00
    cat file1 file2 ... > file_merge
    autogen
        7
    autogen  
       2019-09-02 17:11:05 +08:00
    cat origin/*.cvs > merged.cvs
    MMMMMMMMMMMMMMMM
        8
    MMMMMMMMMMMMMMMM  
       2019-09-02 17:15:37 +08:00
    https://news.ycombinator.com/item?id=20848581

    这里有一车工具,或许楼主可以试试?
    allenloong
        9
    allenloong  
    OP
       2019-09-02 17:21:54 +08:00
    @letking #5 saveAsTextFile 还是会报 stack overflow
    momocraft
        10
    momocraft  
       2019-09-02 17:26:44 +08:00
    这格式是 tsv .. ?

    可能某处的代码试图在内存中生成整个文件了

    从例子看不出你的 merge 有多复杂,最坏情况总可以导入关系数据库,然后从 query 流成 tsv/csv
    allenloong
        11
    allenloong  
    OP
       2019-09-02 17:31:06 +08:00
    @momocraft #10 是的,就是 tsv,合并的时候没问题,一往外写就出问题了。(第一次用 pyspark,可能真的是自己的问题。


    BName = str(os.path.basename(bg_f[0]).split('.')[0])
    schema = StructType([
    StructField('CataID', StringType(), True),
    StructField('Start_Block', IntegerType(), True),
    StructField('End_Block', IntegerType(), True),
    StructField(BName, IntegerType(), True)
    ])
    temp = sqlContext.read.csv(bg_f[0], sep='\t', header=False, schema=schema)
    for p in bg_f[1:]:
    SName = str(os.path.basename(p).split('.')[0])
    schema = StructType([
    StructField('CataID', StringType(), True),
    StructField('Start_Block', IntegerType(), True),
    StructField('End_Block', IntegerType(), True),
    StructField(BName, IntegerType(), True)
    ])
    cur = sqlContext.read.csv(p, sep='\t', header=False, schema=schema)
    temp = temp.join(cur,
    on=['CataID', 'Start_Block', 'End_Block'],
    how='outer')
    temp = temp.drop('CataID', 'Start_Block', 'End_Block')
    letking
        12
    letking  
       2019-09-02 17:40:40 +08:00
    @allenloong spark 惰性运行的,合并操作可能没有立即执行而是在写结果的时候才执行。
    你这问题可能是合并时内存就不够。
    allenloong
        13
    allenloong  
    OP
       2019-09-02 17:45:14 +08:00
    @letking #12 那有什么好的方法推荐吗?
    liprais
        14
    liprais  
       2019-09-02 17:47:09 +08:00
    temp = temp.join(cur,
    on=['CataID', 'Start_Block', 'End_Block'],
    how='outer')
    temp = temp.drop('CataID', 'Start_Block', 'End_Block')
    这一段好像达不到你想要的效果
    optional
        15
    optional  
       2019-09-02 17:47:36 +08:00
    导入到数据库,然后再导出?
    ytmsdy
        16
    ytmsdy  
       2019-09-02 17:48:23 +08:00
    都这么大数据量了,写个脚本搞到数据库里面,然后再从数据库里面导出来。
    xypty
        17
    xypty  
       2019-09-02 17:49:59 +08:00
    行名称一致的话,导入数据库,然后导出,找个 csv 导入导出最快的数据库就行了
    allenloong
        18
    allenloong  
    OP
       2019-09-02 17:52:13 +08:00
    @xypty #17 每个文件的行数不一样,也不一致,列名只有前三列是一样的。
    allenloong
        19
    allenloong  
    OP
       2019-09-02 17:53:17 +08:00
    @liprais #14 我是想合并的时候用前三列做参考,输出的时候再扔掉前三列。
    allenloong
        20
    allenloong  
    OP
       2019-09-02 17:54:06 +08:00
    @optional #15 每一个文件应该都要单独导入...列名不一样
    xypty
        21
    xypty  
       2019-09-02 17:54:10 +08:00
    @allenloong 可以考虑一下 mongo,mongo 我觉得还可以
    tinybaby365
        22
    tinybaby365  
       2019-09-02 17:56:52 +08:00
    不知道为什么要合并,意义何在?如果为了方便存储管理,那就打成一个 tar 包。

    实在想不到合并成一个大文件的好处,以后完全没法并行处理啊……
    xomix
        23
    xomix  
       2019-09-02 17:57:17 +08:00
    这种建议是第一个文件直接复制,第二和第三个文件直接从固定字节开始读取(从第一个文件判断表头长度)后追加到复制的文件后。
    这样应该最快,再加上一些大文件复制的缓存等机制还能更快点。
    letking
        24
    letking  
       2019-09-02 17:58:49 +08:00
    @allenloong 依次导入每个文件为一张表,然后 join 导入的表,保存成中间表。跟你现在的流程一样。
    allenloong
        25
    allenloong  
    OP
       2019-09-02 18:00:15 +08:00
    @tinybaby365 #22 因为想用合并好的表去做后面的计算,但是计算要求的表就是这样。
    allenloong
        26
    allenloong  
    OP
       2019-09-02 18:03:34 +08:00
    @letking #24 还是用 spark 或者任意数据库?
    allenloong
        27
    allenloong  
    OP
       2019-09-02 18:04:12 +08:00
    @xomix #23 没有太明白 XD
    letking
        28
    letking  
       2019-09-02 18:07:04 +08:00
    @allenloong mysql 应该就可以。
    spark 之所以这么搞不行是因为 spark 是把所有数据都加载的内存里处理的,而数据库是把数据存磁盘的。
    如果你有 hadoop 集群的话可以用 spark 这么做。
    allenloong
        29
    allenloong  
    OP
       2019-09-02 18:08:33 +08:00
    @letking #28 明白了 我试试 谢谢你
    corefx
        30
    corefx  
       2019-09-02 18:13:58 +08:00
    楼上这么多回复,看的我一脸问号,斗胆问下下合并文件跟内存有什么关系?直接操作文件流,从源文件读取一行,向目标文件写入一行,全部内存开销就是 1 行文本啊!!!
    aheadlead
        31
    aheadlead  
       2019-09-02 18:18:12 +08:00
    分治?类似于归并排序?
    letking
        32
    letking  
       2019-09-02 18:23:39 +08:00
    @corefx 好好看看楼主的问题?
    cigarzh
        33
    cigarzh  
       2019-09-02 18:31:11 +08:00 via iPhone
    上数据库
    rrfeng
        34
    rrfeng  
       2019-09-02 18:48:02 +08:00
    如果有序,就是个归并排序,可以边排边写。
    如果无序,至少要把 index 存起来,可以用 hash 或者原始值,2000w 的 index 算一下内存需要多少?

    最后按 hash 输出即可。


    awk '{a[$1$2$3]+=$4}END{for(i in a) print i,a}' *.csv
    guyskk0x0
        35
    guyskk0x0  
       2019-09-03 09:53:25 +08:00 via Android
    遍历所有行,按 hash(col1,2,3) % N 分组写到 N 个文件,相同的 key 都在同一个文件。
    再对每个文件分别做合并。
    最终结果直接 concat 即可。
    allenloong
        36
    allenloong  
    OP
       2019-09-03 14:05:41 +08:00
    @guyskk0x0 #35 emmmm 如果按照 hash 把文件进行了拆分,合并的时候怎么保证每个小文件的列能够对应上呢?
    vimiix
        37
    vimiix  
       2019-09-03 17:55:56 +08:00
    尝试下 dpark?
    cassidyhere
        38
    cassidyhere  
       2019-09-03 18:29:29 +08:00
    先读所有文件的第一行构造出最终的 columns,再遍历文件
    关于   ·   帮助文档   ·   API   ·   FAQ   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   1426 人在线   最高记录 5497   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 31ms · UTC 18:11 · PVG 02:11 · LAX 10:11 · JFK 13:11
    ♥ Do have faith in what you're doing.