如何优雅的通知 multiprocessing.Pool 中的进程退出?

2018-04-15 00:21:28 +08:00
 Monad

目的是想在主进程中随时终止子进程的执行 目前的代码长这样 有个问题是如果我 Ctrl-C 的话 所有的子进程虽然会正常退出 但是主进程会一直挂起在 pool.join()上 求解决方案~

#!/usr/bin/env python2

from __future__ import print_function

import time
import signal
import logging
from multiprocessing import Manager
from multiprocessing.pool import Pool
from multiprocessing.queues import Queue


def Fn(n, q, ns):
  if ns.done:
    return
  try:
    q.put(n)
  finally:
    return


def main():
  handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
  pool = Pool(processes=5)
  signal.signal(signal.SIGINT, handler)

  total = 10000
  manager = Manager()
  q = manager.Queue()
  ns = manager.Namespace()
  ns.done = False

  for n in range(total):
    pool.apply_async(Fn, args=(n, q, ns))
  pool.close()
  try:
    received = 0
    while received != total:
      n = q.get(60)
      print('get {} from queue'.format(n))
      received += 1
  except KeyboardInterrupt:
    ns.done = True
    pass
  finally:
    pool.join()


if __name__ == '__main__':
  main()
  
7208 次点击
所在节点    Python
15 条回复
lolizeppelin
2018-04-15 09:37:48 +08:00
openstack 里缩水的代码, 稍微简化了下
https://github.com/lolizeppelin/simpleservice/blob/master/simpleservice/base.py

看懂了就知道怎么处理了
lolizeppelin
2018-04-15 09:41:28 +08:00
def _pipe_watcher(self):
# This will block until the write end is closed when the parent
# dies unexpectedly
self.readpipe.read(1)

LOG.info('Parent process has died unexpectedly, exiting')

if self.launcher:
self.launcher.stop()
sys.exit(1)
Monad
2018-04-15 12:11:59 +08:00
@lolizeppelin #2 这段代码在主进程创建 pipe, 然后设置进程退出时的 close fd, 通过这种方式来通知子进程 read 返回吧
但是我现在是 hang 在了主进程的 join 呢 而且目前来看 子进程(应该)是都正常退出了 主进程没有返回
lolizeppelin
2018-04-15 12:18:26 +08:00
pipe 是主进程退出 子进程也收到能退出 是个退出保险

主进程里确认子进程退出用 waitpid
lolizeppelin
2018-04-15 12:24:14 +08:00
啥叫“应该” 子进程是否结束是可以看到

multiprocessing 我记得默认是用 socket 来父子进程通信的
join 里应该是取了 socket 数据 并 wait 子进程结束

好好看看处理信号的部分就知道怎么让子进程 exit 了

当然如果你代码是 win 上的当我上面的的都没说
Monad
2018-04-15 12:36:38 +08:00
@lolizeppelin #5 我的意思是 ##应该正常##退出了 子进程是肯定退出了 ps 看过 至于是不是异常退出的 并没有确定
如果不用 Pool 手动创建管理的话 waitpid 应该是 OK 的
但是用 Pool 的话 惯用法应该不会去 os.getpid 然后手动 waitpid 吧
lolizeppelin
2018-04-15 13:12:41 +08:00
你代码有问题 子进程是不是正常退出的都不知道 直接 try 包一层都好啊

子进程有信号处理没
主进程收到 ctrl c 信号以后 给所有子进程发终止信号不就行了
lolizeppelin
2018-04-15 13:15:00 +08:00
打日志 好歹你要知道子进程怎么退出的
Monad
2018-04-15 13:18:04 +08:00
@lolizeppelin #7 不是子进程正不正常退出的问题 这只是一个描述问题 不用纠结 我用 try 能保证它退出 并且我上面贴的代码就是这样了

现在的问题是 子进程全部退出了 父进程仍然在 pool.join()没有返回 这个问题
Monad
2018-04-15 13:19:48 +08:00
@lolizeppelin #8 按照我的理解 如果用 C 的做法实现 pool.join 子进程无论怎么正常 /异常退出 父进程都应该能够通过 waitpid 感知到子进程退出 所有退出之后 join 就可以返回了 所以 python 现在这个现象让我很不理解
lolizeppelin
2018-04-15 14:45:04 +08:00
看了下, 和父子进程一点关系都没.....

自进程
ns.done 没有捕获异常只是小问题

主要在这 3 有问题
manager = Manager()
q = manager.Queue()
ns = manager.Namespace()

要解决得慢慢折腾里面代码 我随便弄了下不想弄了, 折腾 multiprocessing 不如自己写多进程代码还好控一点
lolizeppelin
2018-04-16 10:13:19 +08:00
@Monad
大致搞定了 给你代码弄蒙了

一开始叫你看信号是没错的,你信号用错了.................

except KeyboardInterrupt 这是不对的,你注册正确的拦截信号以后,是不会收到这个错误的

信号要处理 2 次,一次是在 fork 前,就是 multiprocessing 创建任务之前,拦截 SIGINT,拦截执行内容
def empty(signo, frame):
print 'do nothing!!!'
这里的目的是让 multiprocessing 里的代码不会因为收到 SIGINT 抛出异常

第二次处理时在 fork 后,拦截内容
def stop(signo, frame):
print 'stoped'
ns.done = True

这里拦截到信号以后设置 ns.done
lolizeppelin
2018-04-16 10:16:43 +08:00
顺便...这个 pool 用的有点问题 Fn 返回后还会生成新的 Fn 塞进去....具体你看看怎么停掉 pool 我就不看了
fool079
2018-04-16 10:17:30 +08:00
我印象中 setDaemon=True
也就是设为守护进程就可以随着父进程的关闭而关闭了。。
itfanr
2018-09-12 15:17:33 +08:00
参考 Samba 和 ctdb 代码吧

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/446911

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX