一些多进程多线程对文件操作的一些总结

2018-05-12 22:16:10 +08:00
 firejoke
读写文件,多进程和多线程

# coding:utf-8
"""
把大文件分块
big_file.txt 是一个 500M 的 5 位数的乱序文档
多线程并没有提升速度
"""

import time
txtfile = ''
import threading
def txtmap(txtup):
with open('big_file.txt','r') as f:
i = 0
while i < 100000:
txt = f.read(1)
i += 1 if txt == ',' else 0
txtup += txt
# txtmap(txtfile)
start = time.time()
for i in range(100):
txti = threading.Thread(target = txtmap(txtfile))
txti.start()
txti.join()
print(time.time() - start)
def txtmap2(txtup):
with open('big_file.txt','r') as f:
i = 0
while i < 1000000:
txt = f.read(1)
i += 1 if txt == ',' else 0
txtup += txt
start = time.time()
for i in range(10):
txtmap2(txtfile)
print(time.time() - start)


多进程和队列初试
import time
from multiprocessing import Process
from multiprocessing import Queue
num = 0
qnum = Queue()
qnum.put(num)
def testnum(num):
num += 1
qnum.put(num)
for i in range(10):
p = Process(target = testnum,args = (qnum.get(),))
p.start()
p.join()
# testnum(num)
print(qnum.get(),qnum.empty())
在这里,qnum 属于实例化对象,不需要用 global 标记



多次测试,发现多进程加队列,必须把文件指针位置也放进去,不然下一个读取位置就会乱跳
with open('big_file.txt','r') as f:
q.put ((txtfile3,f.tell()))
def txtmap(qget):
txtup = qget[0]
i = 0
f.seek(qget[1])
while i < 10:
txt = f.read(1)
i += 1 if txt == ',' else 0
txtup += txt
q.put((txtup,f.tell()))
start = time.time()
for i in range(10):
txtp2i = Process(target = txtmap,args =(q.get(),))
txtp2i.start()
txtp2i.join()
print('多进程加队列',time.time() - start,'\n',q.get())

以及这个 args 的赋值真是烦人,明明从 q.get()出来的就是元组,它非要你=后面必须是一个元组的形式才行


# coding:utf-8
"""
把大文件分块
big_file.txt 是一个 500M 的 5 位数的乱序文档
多进加队列速度 < 多进程全局变量 < 多线程加队列 < 多线程加全局变量 < 普通全局变量
多进程加队列不稳定 多进程因为进程间通讯必须借助队列 Queue 或者管道 pipe 否则改变不了全局变量
"""

import os
import time
from multiprocessing import Process
from multiprocessing import Queue
import threading
txtfile = ''
txtfile2 = ''
txtfile3 = ''
txtfile4 = ''
q = Queue()

#因为本子是 4 核的,所以我只创建了 4 个进程,python 的 GIL 的限制决定 4 核只能同时跑 4 个 python 进程,多了就要等待
with open('big_file.txt','r') as f:
def txtmap():
i = 0
global txtfile
while i < 25:
txt = f.read(1)
i += 1 if txt == ',' else 0
txtfile += txt
print(txt)
print (os.getpid ())
print(txtfile)
start = time.time()
for i in range(4):
txtpi = Process(target = txtmap)
txtpi.start()
txtpi.join()
print('多进程全局变量',time.time() - start,'\n',txtfile)
if txtfile:
print(True)
else:
print(False)

with open('big_file.txt','r') as f:
def txtmap():
i = 0
global txtfile2
while i < 10:
txt = f.read(1)
i += 1 if txt == ',' else 0
txtfile2 += txt
start = time.time()
for i in range(10):
txtti = threading.Thread(target = txtmap)
txtti.start()
txtti.join()
print('多线程全局变量',time.time() - start,'\n',txtfile2)


with open('big_file.txt','r') as f:
q.put ((txtfile3,f.tell()))
def txtmap(qget):
txtup = qget[0]
i = 0
f.seek(qget[1])
while i < 25:
txt = f.read(1)
i += 1 if txt == ',' else 0
txtup += txt
print (os.getpid ())
q.put((txtup,f.tell()))
start = time.time()
for i in range(4):
txtp2i = Process(target = txtmap,args =(q.get(),))
txtp2i.start()
txtp2i.join()
print('多进程加队列',time.time() - start,'\n',q.get()[0])

#因为队列 q 内的消息已被取完,所以再放进去一次,不然会一直处于阻塞状态等待插入消息
q.put(txtfile3)
with open('big_file.txt','r') as f:
def txtmap(txtup):
i = 0
while i < 10:
txt = f.read(1)
i += 1 if txt == ',' else 0
txtup += txt
q.put(txtup)
start = time.time()
for i in range(10):
txtt2i = threading.Thread(target = txtmap,args = (q.get(),))
txtt2i.start()
txtt2i.join()
print('多线程加队列',time.time() - start,'\n',q.get())

with open('big_file.txt','r') as f:
def txtmap2():
i = 0
global txtfile4
while i < 10:
txt = f.read(1)
i += 1 if txt == ',' else 0
txtfile4+= txt
start = time.time()
for i in range(10):
txtmap2()
print('普通全局变量',time.time() - start,'\n',txtfile4)

#os.path.geisize 返回的文件大小就是 seek 指针的最后一个位置
print(os.path.getsize('big_file.txt'))

with open('big_file.txt','r') as f:
print(f.seek(0,2))



#终于看到了多进程同时进行,哦呼!甚是欢喜!而且和文件的指针并不会冲突
from multiprocessing import Process
import time
with open('test.txt', 'r') as f:
def readseek(num):
f.seek(num)
print(time.time(),f.read(10))
time.sleep(10)
p1 = Process(target = readseek, args = (20,))
p2 = Process(target = readseek, args = (60,))
p3 = Process(target = readseek, args = (120,))
p4 = Process(target = readseek, args = (160,))
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()
print(time.time())
3562 次点击
所在节点    Python
16 条回复
firejoke
2018-05-12 22:37:45 +08:00
第一次发帖,改了大概 3 次后发现不能更改了......
想再多进程全局变量那里加一个:多进程并不能改变全局变量,只能通过队列 Queue,或者管道 Pipe
testsec
2018-05-12 22:39:51 +08:00
Python 多线程只适用 io 密集场景
firejoke
2018-05-12 22:59:48 +08:00
@testsec 是的,但是同一时刻也是只能运行一个线程,都是在等待,所以我觉得,要是文件真的超过 1G 的那种,仍然要用和核心数一样多的进程数,这样就可以同时运行和核心数一样多的线程
neoblackcap
2018-05-12 23:11:56 +08:00
@firejoke 我建议你贴到外面的网站上,比如 github gist。你贴出来的代码实在影响大家的交流,缩进丢失,非常难以理解。
我觉得你说的多线程没有用大概是实现的一些地方不是那么好,有其他因素抵消了多线程的优势。IO 本身是会释放 GIL,跟是不是多进程没关系
lukefan
2018-05-12 23:12:23 +08:00
Python 的 IO 操作以及部分在底层释放 GIL 锁的类库并不受 GIL 限制.

一般起 CPU 核数的进程针对的是 CPU 密集型操作, 并不是 IO 密集型操作.

本地的文件 IO 一般情况下搞个单独的消费者线 /进程就够了, 多了性能反而差.

一般爆协程、线程处理的 IO 密集型操作的 IO 主要是 socket、PIPE 这类, 并不是文件 IO.
laqow
2018-05-12 23:14:02 +08:00
感觉 python 的文本处理性能就是屎,特别是 utf8 这种变长编码的,既然不用管内容的话按二进制文件处理会快很多
Applenice
2018-05-12 23:58:09 +08:00
像楼上们说的 Python 线程还是在 I/O 密集型应用上用的多,另外没有缩进看起来真的难受....0.0
wspsxing
2018-05-13 04:50:31 +08:00
依赖缩进的代码不要乱贴,如果不支持代码块就放到 github 之类,gist 在国内被 q 了。

另外,不要用随机读写来加速磁盘访问。
firejoke
2018-05-13 12:51:35 +08:00
@neoblackcap 第一次发帖,改了几次发现还是改不了,我贴到 GitHub 看下
firejoke
2018-05-13 12:53:13 +08:00
@lukefan 我设想的是把大文件分四个块分别给四个进程处理,这样不就可以缩短时间吗?
firejoke
2018-05-13 12:53:47 +08:00
@laqow 这个我没想过,我试试用 pickle 弄一下
firejoke
2018-05-13 12:54:39 +08:00
@Applenice 第一贴 第一贴 第一贴,谅解一下下~我来发到 GitHub
firejoke
2018-05-13 12:57:52 +08:00
@wspsxing 随机读写?读倒不是随机,就是命名的时候用的随机
RicardoScofileld
2018-05-14 13:24:29 +08:00
可以试试 Dpark
firejoke
2018-05-17 14:02:33 +08:00
firejoke
2018-05-18 08:47:57 +08:00
@RicardoScofileld 第一次知道这个框架,好像很好用,我仔细看看

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/454371

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX