V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
uti6770werty
V2EX  ›  Python

请教 concurrent.Future()线程池重启的问题

  •  
  •   uti6770werty · 2020-07-08 10:07:32 +08:00 · 2394 次点击
    这是一个创建于 1401 天前的主题,其中的信息可能已经有所发展或是发生改变。

    v2 的跟帖似乎无法用 markdown 语法? 所以就重新开个帖子请教这个问题。。。

    # coding=utf-8
    import codecs
    import difflib
    import os.path
    import re
    import time
    import string
    import chardet
    import shutil
    import copy
    from concurrent.futures import ProcessPoolExecutor
    
    import datetime
    
    def PrintCount(PName):
        if PName == '甲':
            DelayTime = 0.9
        if PName == '乙':
            DelayTime = 1.2
        if PName == '丙':
            DelayTime = 1.7
        if PName == '丁':
            DelayTime = 2
        if PName == '戊':
            DelayTime = 2.5
    
        countt = 0
        while True:
            countt += 1
            time.sleep(DelayTime)
            print(f'{PName}池:->',f'{countt}')
    
    
    if __name__ == '__main__':
    
        StartTime = time.clock()
        FutureDict = {}
        FutureRetDict = {}
        FutureTimeRecoderDict = {}
    
        PoolNameList = ["甲", "乙", "丙", "丁", "戊"]
    
        # 初始化进程池
        for i in range(len(PoolNameList)):
            # 进程
            FutureDict.update({PoolNameList[i]: ProcessPoolExecutor(max_workers=1)})
            # 进程 Ret
            FutureRetDict.update({PoolNameList[i]: futures.Future()})
            # 进程启动时间
            FutureTimeRecoderDict.update({PoolNameList[i]: None})
    
        # 模拟测试清空进程池的信号
        closeflag = True
    
        # 开始工作
        while True:
            ProcessNum = 0
    
            # 增加任务
            for ProcessName,FutureRet in FutureRetDict.items():
    
                # 模拟 40 秒后终结 [乙] 进程池
                if closeflag == True:
                    if time.clock() - StartTime >= 40: # 在启动 40 秒后触发
                        print(f"开始强制结束 [乙] 进程池")
                        for pid, process in FutureDict['乙']._processes.items():
                            process.terminate()
                            FutureDict['乙'].shutdown()
                        closeflag = False
                        time.sleep(15)
    
                # 如果进程池在运行
                if FutureRet.running() == True:
                    pass
                else:
                    # 增加任务
                    FutureRetDict[ProcessName] = FutureDict[ProcessName].submit(PrintCount,ProcessName)
                    print(f'{ProcessName} 进程池提交了开始.')
                    time.sleep(2)
                    break
            time.sleep(3)
    
    

    40 秒之后,乙进程池的确被停了,但是再向乙进程池提交任务的时候,会提示:

    甲池:-> 57
    甲池:-> 58
    丙池:-> 25
    戊池:-> 13
    甲池:-> 59
    丁池:-> 19
    甲池:-> 60
    丙池:-> 26
    Traceback (most recent call last):
    File "D:/TestForMu.py", line 80, in <module>
    FutureRetDict[ProcessName] = FutureDict[ProcessName].submit(PrintCount,ProcessName)
    File "C:\Users\Administrator\AppData\Local\Programs\Python\Python36\lib\concurrent\futures\process.py", line 452, in submit
    raise BrokenProcessPool('A child process terminated '
    concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
    甲池:-> 61
    戊池:-> 14
    丁池:-> 20
    丙池:-> 27
    

    应该要做些什么事情,再次启用?
    原谅我这样设计流程,也许不是科学的。。。^_^ 谢谢大家解答!

    6 条回复    2020-07-16 09:31:51 +08:00
    Latin
        1
    Latin  
       2020-07-08 11:05:08 +08:00
    标题不严谨
    已经停止少了一个,肯定有一个提交不了
    或者重新实例化一个进程池提交一个乙进程
    linw1995
        2
    linw1995  
       2020-07-08 11:05:11 +08:00
    pool 就是为了复用多个进程,创建那么多个 pool 干嘛……只要一个就好了
    uti6770werty
        3
    uti6770werty  
    OP
       2020-07-08 12:48:13 +08:00
    @Latin 哦,原来是这样的,请教一个问题,被.shutdown()后的进程池,一直存在吗,系统会自动回收资源吗?

    @linw1995 流程的设计,max_workers=1,想法上来讲,一池一个比较好控制,有其中一个原因是,执行函数有些不稳定,一个进程崩溃了,如 @Latin 大大所说,推倒,再实例一个就应该 OK 了,如果一个池里有 N 个,很难收敛,也不好统计和校验殃及了多少个进程需要重来。。。
    uti6770werty
        4
    uti6770werty  
    OP
       2020-07-08 12:51:38 +08:00
    @linw1995 还有一个就是,Pool 实例自带了 Pool 中止的方法,如果一个 Pool 有 N 个进程,就放飞了。。。
    uti6770werty
        5
    uti6770werty  
    OP
       2020-07-08 19:31:59 +08:00
    报告一下,下午做了个测试,在 process.terminate()后,对 Pool.shutdown(),系统看到进程池开的进程消失了,重新开实例继续也没问题,系统资源应该是被回收了。
    oahebky
        6
    oahebky  
       2020-07-16 09:31:51 +08:00
    看了这个帖子楼主的逻辑。

    楼主自己已经有答案了。对于怎么解决问题我就不多说了。

    ======

    说一个我个人看这个贴子的程序设计的看法。
    我觉得用 concurrent.futures.ProcessPoolExecutor 来达到楼主的目的,有种拿着锤子看什么问题都是钉子的感觉。

    其实担心子进程异常死掉(比如 hang 死)之类的,不如直接使用 multiprocessing.Process 来创建子进程执行 work 。


    既然是 concurrent.futures 包里面的类,其实已经说明了,这个类是为了 future 思想设计的。
    说得简单点就是即提供高度封装的多进程 API,更重要的是为了兼容异步编程,和其它异步库同用。


    楼主的程序设计其实重点不在异步思想上面( future ),所以楼主的这种并发编程其实可以考虑其它库(非 concurrent.futures ),因为在考虑到对子进程的“精细控制”上,concurrent.futures.ProcessPoolExecutor 提供的 API 实质上并不是很洽合这种场景。

    当然,这是我个人的一点看法,仅供参考。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5901 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 30ms · UTC 01:37 · PVG 09:37 · LAX 18:37 · JFK 21:37
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.