celery worker 多线程执行完后卡住假死

2019-09-02 12:11:45 +08:00
 hanssx

celery 任务代码大致如下,此任务未加 soft_time_limit 或 time_limit 其中 Session 是根据以下四行代码得来:

SQLALCHEMY_DATABASE_URI = 'mysql://xx'
some_engine = create_engine(SQLALCHEMY_DATABASE_URI, echo=False, pool_pre_ping=True)
session_factory = sessionmaker(autocommit=False, bind=some_engine)  # autoflush=False,
Session = scoped_session(session_factory)

celery 任务大致代码:

'''
class NmapThread_(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            try:
                ip = self.queue.get(block=False)
            except SoftTimeLimitExceeded as e:
                raise
            except Exception as e:
                logging.error(e, exc_info=True)  # 记录线程退出
                break
            try:
                ns = nmap.scan(ip)  # 使用 subprocess.Popen()调用外部 nmap 扫描该 ip 并将结果入库
                ns.run()
            except SoftTimeLimitExceeded as e:
                raise
            except Exception as e:
                logging.error(e, exc_info=True)
            finally:
                Session.remove()
                self.queue.task_done()


class IpScan:
    def __init__(self):
        self.queue = Queue()
        self.thread_count = 2

    def run(self):
        # 此处入队列的 IP 个数非常多,大概需要多线程运行 1 周
        self.queue.put('1.1.1.1')
        self.queue.put('2.2.2.2')
        self.queue.put('3.3.3.3')
        self.queue.put('4.4.4.4')
        self.queue.put('5.5.5.5')
        self.queue.put('6.6.6.6')
        self.scan_start()

    def scan_start(self):
        for i in range(self.thread_count):
            t = NmapThread_(self.queue)
            t.setDaemon(True)
            t.start()
        self.queue.join()

def web_fingerprint_discern(*args, **kwargs):
    print('web_fingerprint_discern begin!')
    print(args)
    print(kwargs)

@ce.task(name='default.test4queue', bind=True)
def test4queue(self):
    ips = IpScan()
    ips.run()
    web_fingerprint_discern()
'''

现在的问题是多线程运行期间一直没有问题,直到最后多线程执行完(2019-08-31),抛出异常 break 跳出各线程之后,celry worker 就卡住假死了(2019-08-31),具体表现在:

然后我今天(2019-09-02)上班的时候强制 ctrl+c 之后,输出了一些日志,

大家可以看到, web_fingerprint_discern()的 3 条 print 语句,第 1 条发生 2019-08-31,后 2 条发生在 2019-09-02 我 ctrl+c 的时候,其他附加表现:

一直不能解决这个 celery 问题,由于时间原因也不方便换其他类 celery 架构,而且调试发现如果不是扫描任务就不会假死(当然测试时间肯定没有一周那么长,只有几分钟,所以如果完整模拟测试非常耗时,想寻找可能出现的问题点修复后再行测试)

求各位大佬帮忙解决,若能解决,50 红包奉上以表谢意。

7078 次点击
所在节点    Python
17 条回复
hanssx
2019-09-02 12:26:44 +08:00
celery 版本
```
(asset) [root@VM_9_196_centos asset]# celery --version
4.3.0 (rhubarb)
```

多线程应该是结束了,我在日志中收到了 break 线程的 while 循环之前,打印出的日志,一共 30 个日志,我也启了 30 个线程。
```
[2019-08-31 22:47:31] [ERROR] - (asscan.py:38)
Traceback (most recent call last):
File "/root/python/asset/scan/asscan.py", line 34, in run
task_host = self.queue.get(block=False)
File "/usr/local/python3/lib/python3.7/queue.py", line 167, in get
raise Empty
_queue.Empty
[2019-08-31 22:48:12] [ERROR] - (asscan.py:38)
Traceback (most recent call last):
File "/root/python/asset/scan/asscan.py", line 34, in run
task_host = self.queue.get(block=False)
File "/usr/local/python3/lib/python3.7/queue.py", line 167, in get
raise Empty
_queue.Empty
[2019-08-31 22:48:12] [WARNING] - web_fingerprint_discern begin! (log.py:235)
[2019-09-02 10:21:21] [WARNING] - () (log.py:235)
[2019-09-02 10:21:21] [WARNING] - {'task_uuid': '95c3dda4-cb04-11e9-a344-52540
```
neoblackcap
2019-09-02 12:38:10 +08:00
没记错的话,celery 自身实现是对 fork 之类有限制的,所以你不应该在任务里面进行类似 fork 之类的操作,线程 pthread_create 同理了。
而且线程的支持我记得已经被 celery 自身抛弃的,所以应该是有缺陷的,建议不使用线程。

根据我以前的做法,我一般都是将网络 IO 与逻辑处理分离。celery 对 gevent 跟进程支持都相当好,因此我会选用个 gevent 处理所有网络 IO (网络 IO,通过 IO 复用,几百万个任务都可以轻松搞定,前提是不能有任何 CPU 密集型处理)。然后通过跟进程型任务结合,组成流水线,在 celery 对应 chain 操作。那么就可以稳定地运行。

因为 gevent 是处理网络是不堵塞的,所以你还是可以继续发任务给该 worker

可以参考一下
hanssx
2019-09-02 13:26:56 +08:00
@neoblackcap 谢谢 neoblackcap 师父指点,我还有几点想请教师父,
1. 总体而言就是不使用线程而使用进程或 gevent,是吧?
2. 之前每个线程执行的内容是使用 subprocess.Popen()调用外部 nmap 扫描该 ip 并将结果入库,主要是两个方面:一是调用 busprocess.Popen(),也就是终究还是会使用子进程;二是会有入库操作,这个应该不算是 CPU 密集型处理?怎么界定 CPU 密集型处理呢?
3. 我这个 nmap 扫描应属于网络 IO 密集型,是不是使用 gevent 比较好?
4. 我对 gevent 不熟悉,不知改动量大不大,师父能提供一些更改的方法吗?
sazima
2019-09-02 13:27:31 +08:00
用 task.apply_async() 可以吗, 同样是异步的.
hanssx
2019-09-02 13:33:38 +08:00
@sazima 嗯,本身触发任务的时候,就是用的 apply_async(),应该和这个关系不大。
neoblackcap
2019-09-02 16:30:44 +08:00
@hanssx cpu 密集型是相对的,关键是你的任务类型不能堵塞整个处理逻辑,凡是耗时长的,不需要 IO 的任务都是 IO 密集型

看了一下你用 subprocess.Popen 去调用 nmap,你如果要改的话,请使用 gevent 的网络接口实现你 nmap 的功能,如果不会的话,此方法无解,你还是另寻他法吧。
hanssx
2019-09-02 16:47:12 +08:00
@neoblackcap 谢谢,gevent 实现 nmap 的功能基本不可能,必须得使用 subprocess.Popen 去调用 nmap,这种情况下,我使用多进程代替多线程可以吗?你之前说进程或者 gevent 都可以。
neoblackcap
2019-09-02 18:25:30 +08:00
@hanssx 不可以,可以的前提是你改得动网络请求的部分
lovedebug
2019-09-02 18:39:57 +08:00
我碰到的卡住都是程序自己的 bug,异常没抓住。
hanssx
2019-09-02 18:46:11 +08:00
@neoblackcap 我不明白为啥不可以,我先试试用多进程,你也说了 celery 对线程支持有缺陷,网络请求的阻塞是必然的。
xixijun
2019-09-03 10:19:33 +08:00
我之前也写过 celery 调度 nmap 的扫描器,答案是可以的。
celery task 里面用 subprocess.Popen 调用 nmap
celery 的启动用默认的 execute pool 即 prefork。
还要注意 soft time limit 的设置,超时时需要手动 kill nmap 子进程,防止孤儿进程和僵尸进程
hanssx
2019-09-03 11:08:28 +08:00
@xixijun 感谢 xixijun 师父的回答,请问师父你说的可以,是指使用多进程来代替多线程吗?我这边扫的是公司全网,就是扫完之后 celery worker 就卡住假死了,具体详情可查看一下问题描述。
hanssx
2019-09-03 16:15:57 +08:00
2019-09-03 Update:
已经修改为 multiprocessing 多线程,但实际测试时,celery 不能直接使用 multiprocessing,解决方案参见 https://stackoverflow.com/questions/30624290/celery-daemonic-processes-are-not-allowed-to-have-children
因为使用 cavas 改动比较大,所以我直接使用得 import billiard as multiprocessing
目前运行中,持续观察。
hanssx
2019-09-04 22:49:49 +08:00
多进程可以解决这个问题,之前 @崔庆才师父说可能是 logging 死锁的问题,很有可能,待下一步确定。
hanssx
2019-09-08 10:27:04 +08:00
已确定为 logging 死锁问题,50 块钱由崔庆才师父和 @neoblackcap 师父平分,
@neoblackcap 师父,加我一下扣扣 9 六 14 六 2392,把支付宝账号发我即可。
neoblackcap
2019-09-08 12:53:43 +08:00
@hanssx 现在我想起来了,我建议你还用 nmap 的 Python 封装库,而不是直接用 subprocess,这样就比较少一些问题,好像叫 Python-nmap,搜一下就可以了。钱就不需要了
hanssx
2019-09-09 11:33:48 +08:00
@neoblackcap 嗯,我使用得是你说的 nmap 的 Python 封装库,源码里面使用得也是 subprocess.Popen(),额,需要时可加我扣扣,随时欢迎师父加我。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/597143

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX