首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python 学习手册
Python Cookbook
Python 基础教程
Python Sites
PyPI - Python Package Index
http://www.simple-is-better.com/
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
hanssx
V2EX  ›  Python

celery worker 多线程执行完后卡住假死

  •  
  •   hanssx · 140 天前 · 2293 次点击
    这是一个创建于 140 天前的主题,其中的信息可能已经有所发展或是发生改变。

    celery 任务代码大致如下,此任务未加 soft_time_limit 或 time_limit 其中 Session 是根据以下四行代码得来:

    SQLALCHEMY_DATABASE_URI = 'mysql://xx'
    some_engine = create_engine(SQLALCHEMY_DATABASE_URI, echo=False, pool_pre_ping=True)
    session_factory = sessionmaker(autocommit=False, bind=some_engine)  # autoflush=False,
    Session = scoped_session(session_factory)
    

    celery 任务大致代码:

    '''
    class NmapThread_(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue
    
        def run(self):
            while True:
                try:
                    ip = self.queue.get(block=False)
                except SoftTimeLimitExceeded as e:
                    raise
                except Exception as e:
                    logging.error(e, exc_info=True)  # 记录线程退出
                    break
                try:
                    ns = nmap.scan(ip)  # 使用 subprocess.Popen()调用外部 nmap 扫描该 ip 并将结果入库
                    ns.run()
                except SoftTimeLimitExceeded as e:
                    raise
                except Exception as e:
                    logging.error(e, exc_info=True)
                finally:
                    Session.remove()
                    self.queue.task_done()
    
    
    class IpScan:
        def __init__(self):
            self.queue = Queue()
            self.thread_count = 2
    
        def run(self):
            # 此处入队列的 IP 个数非常多,大概需要多线程运行 1 周
            self.queue.put('1.1.1.1')
            self.queue.put('2.2.2.2')
            self.queue.put('3.3.3.3')
            self.queue.put('4.4.4.4')
            self.queue.put('5.5.5.5')
            self.queue.put('6.6.6.6')
            self.scan_start()
    
        def scan_start(self):
            for i in range(self.thread_count):
                t = NmapThread_(self.queue)
                t.setDaemon(True)
                t.start()
            self.queue.join()
    
    def web_fingerprint_discern(*args, **kwargs):
        print('web_fingerprint_discern begin!')
        print(args)
        print(kwargs)
    
    @ce.task(name='default.test4queue', bind=True)
    def test4queue(self):
        ips = IpScan()
        ips.run()
        web_fingerprint_discern()
    '''
    

    现在的问题是多线程运行期间一直没有问题,直到最后多线程执行完(2019-08-31),抛出异常 break 跳出各线程之后,celry worker 就卡住假死了(2019-08-31),具体表现在:

    • celery worker 卡住假死,没有再执行接下来的 web_fingerprint_discern(),这个函数里面其实只有 3 个 print()
    • celery flower 显示 celery worker 已离线
    • rabbitMQ 显示队列中的任务还在执行之中

    然后我今天(2019-09-02)上班的时候强制 ctrl+c 之后,输出了一些日志, nC3UzR.md.png

    大家可以看到, web_fingerprint_discern()的 3 条 print 语句,第 1 条发生 2019-08-31,后 2 条发生在 2019-09-02 我 ctrl+c 的时候,其他附加表现:

    • 最后报错了 broken pipe
    • rabbitMQ 队列中的任务已经回滚到 Ready 阶段,也就是说如果重启 celery worker 任务会重新执行

    一直不能解决这个 celery 问题,由于时间原因也不方便换其他类 celery 架构,而且调试发现如果不是扫描任务就不会假死(当然测试时间肯定没有一周那么长,只有几分钟,所以如果完整模拟测试非常耗时,想寻找可能出现的问题点修复后再行测试)

    求各位大佬帮忙解决,若能解决,50 红包奉上以表谢意。

    17 回复  |  直到 2019-09-09 11:33:48 +08:00
    hanssx
        1
    hanssx   140 天前
    celery 版本
    ```
    (asset) [[email protected]_9_196_centos asset]# celery --version
    4.3.0 (rhubarb)
    ```

    多线程应该是结束了,我在日志中收到了 break 线程的 while 循环之前,打印出的日志,一共 30 个日志,我也启了 30 个线程。
    ```
    [2019-08-31 22:47:31] [ERROR] - (asscan.py:38)
    Traceback (most recent call last):
    File "/root/python/asset/scan/asscan.py", line 34, in run
    task_host = self.queue.get(block=False)
    File "/usr/local/python3/lib/python3.7/queue.py", line 167, in get
    raise Empty
    _queue.Empty
    [2019-08-31 22:48:12] [ERROR] - (asscan.py:38)
    Traceback (most recent call last):
    File "/root/python/asset/scan/asscan.py", line 34, in run
    task_host = self.queue.get(block=False)
    File "/usr/local/python3/lib/python3.7/queue.py", line 167, in get
    raise Empty
    _queue.Empty
    [2019-08-31 22:48:12] [WARNING] - web_fingerprint_discern begin! (log.py:235)
    [2019-09-02 10:21:21] [WARNING] - () (log.py:235)
    [2019-09-02 10:21:21] [WARNING] - {'task_uuid': '95c3dda4-cb04-11e9-a344-52540
    ```
    neoblackcap
        2
    neoblackcap   140 天前   ♥ 1
    没记错的话,celery 自身实现是对 fork 之类有限制的,所以你不应该在任务里面进行类似 fork 之类的操作,线程 pthread_create 同理了。
    而且线程的支持我记得已经被 celery 自身抛弃的,所以应该是有缺陷的,建议不使用线程。

    根据我以前的做法,我一般都是将网络 IO 与逻辑处理分离。celery 对 gevent 跟进程支持都相当好,因此我会选用个 gevent 处理所有网络 IO (网络 IO,通过 IO 复用,几百万个任务都可以轻松搞定,前提是不能有任何 CPU 密集型处理)。然后通过跟进程型任务结合,组成流水线,在 celery 对应 chain 操作。那么就可以稳定地运行。

    因为 gevent 是处理网络是不堵塞的,所以你还是可以继续发任务给该 worker

    可以参考一下
    hanssx
        3
    hanssx   140 天前
    @neoblackcap 谢谢 neoblackcap 师父指点,我还有几点想请教师父,
    1. 总体而言就是不使用线程而使用进程或 gevent,是吧?
    2. 之前每个线程执行的内容是使用 subprocess.Popen()调用外部 nmap 扫描该 ip 并将结果入库,主要是两个方面:一是调用 busprocess.Popen(),也就是终究还是会使用子进程;二是会有入库操作,这个应该不算是 CPU 密集型处理?怎么界定 CPU 密集型处理呢?
    3. 我这个 nmap 扫描应属于网络 IO 密集型,是不是使用 gevent 比较好?
    4. 我对 gevent 不熟悉,不知改动量大不大,师父能提供一些更改的方法吗?
    sazima
        4
    sazima   140 天前
    用 task.apply_async() 可以吗, 同样是异步的.
    hanssx
        5
    hanssx   140 天前
    @sazima 嗯,本身触发任务的时候,就是用的 apply_async(),应该和这个关系不大。
    neoblackcap
        6
    neoblackcap   140 天前   ♥ 1
    @hanssx cpu 密集型是相对的,关键是你的任务类型不能堵塞整个处理逻辑,凡是耗时长的,不需要 IO 的任务都是 IO 密集型

    看了一下你用 subprocess.Popen 去调用 nmap,你如果要改的话,请使用 gevent 的网络接口实现你 nmap 的功能,如果不会的话,此方法无解,你还是另寻他法吧。
    hanssx
        7
    hanssx   140 天前
    @neoblackcap 谢谢,gevent 实现 nmap 的功能基本不可能,必须得使用 subprocess.Popen 去调用 nmap,这种情况下,我使用多进程代替多线程可以吗?你之前说进程或者 gevent 都可以。
    neoblackcap
        8
    neoblackcap   140 天前 via iPhone
    @hanssx 不可以,可以的前提是你改得动网络请求的部分
    lovedebug
        9
    lovedebug   140 天前 via Android
    我碰到的卡住都是程序自己的 bug,异常没抓住。
    hanssx
        10
    hanssx   140 天前
    @neoblackcap 我不明白为啥不可以,我先试试用多进程,你也说了 celery 对线程支持有缺陷,网络请求的阻塞是必然的。
    xixijun
        11
    xixijun   139 天前   ♥ 1
    我之前也写过 celery 调度 nmap 的扫描器,答案是可以的。
    celery task 里面用 subprocess.Popen 调用 nmap
    celery 的启动用默认的 execute pool 即 prefork。
    还要注意 soft time limit 的设置,超时时需要手动 kill nmap 子进程,防止孤儿进程和僵尸进程
    hanssx
        12
    hanssx   139 天前
    @xixijun 感谢 xixijun 师父的回答,请问师父你说的可以,是指使用多进程来代替多线程吗?我这边扫的是公司全网,就是扫完之后 celery worker 就卡住假死了,具体详情可查看一下问题描述。
    hanssx
        13
    hanssx   139 天前
    2019-09-03 Update:
    已经修改为 multiprocessing 多线程,但实际测试时,celery 不能直接使用 multiprocessing,解决方案参见 https://stackoverflow.com/questions/30624290/celery-daemonic-processes-are-not-allowed-to-have-children
    因为使用 cavas 改动比较大,所以我直接使用得 import billiard as multiprocessing
    目前运行中,持续观察。
    hanssx
        14
    hanssx   138 天前
    多进程可以解决这个问题,之前 @崔庆才师父说可能是 logging 死锁的问题,很有可能,待下一步确定。
    hanssx
        15
    hanssx   134 天前
    已确定为 logging 死锁问题,50 块钱由崔庆才师父和 @neoblackcap 师父平分,
    @neoblackcap 师父,加我一下扣扣 9 六 14 六 2392,把支付宝账号发我即可。
    neoblackcap
        16
    neoblackcap   134 天前 via iPhone   ♥ 1
    @hanssx 现在我想起来了,我建议你还用 nmap 的 Python 封装库,而不是直接用 subprocess,这样就比较少一些问题,好像叫 Python-nmap,搜一下就可以了。钱就不需要了
    hanssx
        17
    hanssx   133 天前
    @neoblackcap 嗯,我使用得是你说的 nmap 的 Python 封装库,源码里面使用得也是 subprocess.Popen(),额,需要时可加我扣扣,随时欢迎师父加我。
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   1000 人在线   最高记录 5168   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.3 · 24ms · UTC 20:20 · PVG 04:20 · LAX 12:20 · JFK 15:20
    ♥ Do have faith in what you're doing.