apply_async 多进程异步调用,子任务不执行,居然没找到原因

2019-01-06 12:59:35 +08:00
 tqknight
问题:为啥多传一个 queue 参数,子任务函数 perform_task(), 没有执行?

代码如下:

from multiprocessing import Pool, Queue
from time import time, sleep
import random


def perform_task(id, queue):
print(" [{}号] 子进程 [开始] 执行任务".format(id))
begin = time()
sleep(random.random() * 5)
print(" [{}号] 子进程 [结束] 执行任务".format(id))
end = time()
cost = end - begin
res = " [{}号] 子进程执行任务耗时:{}".format(id, cost)
queue.put(res)
print(res)



if __name__ == "__main__":
# 进程结果集
queue = Queue()

# 进程池中进程最大数量
pool_count = 5

# 创建进程池
pool = Pool(pool_count)
print("进程池准备就绪, 多进程开始执行任务,等待结束...")

#
for i in range(pool_count):
pool.apply_async(perform_task, args=(i, queue) )
# 关闭进程池,使其不再接受新的任务
pool.close()
# 主进程阻塞机制
pool.join()
print("所有进程池中的任务完成")


正常结果应该是:

进程池准备就绪, 多进程开始执行任务,等待结束...
[ 0 号] 子进程 [开始] 执行任务
[ 1 号] 子进程 [开始] 执行任务
[ 2 号] 子进程 [开始] 执行任务
[ 3 号] 子进程 [开始] 执行任务
[ 4 号] 子进程 [开始] 执行任务
[ 3 号] 子进程 [结束] 执行任务
[ 3 号] 子进程执行任务耗时:0.24634218215942383
[ 4 号] 子进程 [结束] 执行任务
[ 4 号] 子进程执行任务耗时:0.41087770462036133
[ 2 号] 子进程 [结束] 执行任务
[ 2 号] 子进程执行任务耗时:0.8377587795257568
[ 1 号] 子进程 [结束] 执行任务
[ 1 号] 子进程执行任务耗时:2.044529914855957
[ 0 号] 子进程 [结束] 执行任务
[ 0 号] 子进程执行任务耗时:2.7406675815582275
所有进程池中的任务完成
7848 次点击
所在节点    Python
5 条回复
tqknight
2019-01-06 13:00:15 +08:00
求大佬指点
fanhaipeng0403
2019-01-06 15:40:10 +08:00
Note that with asynchronous programming you don't need to manually deal with result queues - apply_async returns a AsyncResult instance which can be used to get the result: result.get(). This uses an underlying result (out-) queue and so you simply need to return in your target function. Also if you use result.get() and you passed a Queue instance as an argument to the target function it will raise a RuntimeError
fanhaipeng0403
2019-01-06 15:40:15 +08:00
bihuchao
2019-01-06 16:23:54 +08:00
multiprocessing 中的 Queue 不能被序列化。
目的应该是这样吧
``` Python
#! /usr/bin/python3

import time
import random
import multiprocessing

queue = multiprocessing.Queue()

def perform_task(id):
print("{0}# process start".format(id))
begin = time.time()
time.sleep(random.random()*5)
print("{0}# process end".format(id))

res = "{0}# process time : {1}".format(id, time.time()-begin)
print(res)

queue.put(res)

return

if __name__ == "__main__":
poolCount = 5
pool = multiprocessing.Pool(poolCount)

for i in range(poolCount):
pool.apply_async(perform_task, args=(i, ))

pool.close()
pool.join()

print("End tasking, print results:")

while True:
res = queue.get()
print(res)
if queue.empty():
break

```
执行结果:
```
0# process start
1# process start
2# process start
3# process start
4# process start
0# process end
0# process time : 0.5990872383117676
1# process end
1# process time : 2.662280559539795
3# process end
3# process time : 3.903242826461792
2# process end
2# process time : 4.440236330032349
4# process end
4# process time : 4.543649435043335
End tasking, print results:
0# process time : 0.5990872383117676
1# process time : 2.662280559539795
3# process time : 3.903242826461792
2# process time : 4.440236330032349
4# process time : 4.543649435043335
```
zn
2019-01-06 16:31:08 +08:00
为啥我看到函数名就直觉是蟒蛇呢?我基本没怎么学过蟒蛇,同时百分百确定没用过这个函数。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/524336

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX