首页   注册   登录
 hanssx 最近的时间轴更新

hanssx

V2EX 第 350569 号会员,加入于 2018-09-19 11:30:43 +08:00
celery worker 多线程执行完后卡住假死
Python  •  hanssx  •  94 天前  •  最后回复来自 hanssx
17
celery 到底能不能动态添加周期任务
Python  •  hanssx  •  107 天前  •  最后回复来自 hanssx
13
vmware manjaro network 上不了网
Linux  •  hanssx  •  111 天前  •  最后回复来自 hanssx
9
sqlalchemy 中 column 类型为 list 的情况
Flask  •  hanssx  •  171 天前  •  最后回复来自 strict
7
使用 pycharm 对 Python 项目调试遇到非常诡异的现象
Python  •  hanssx  •  272 天前  •  最后回复来自 EdwardChu
8
logging format 如何优雅地对齐
Python  •  hanssx  •  2018-11-30 14:47:14 PM  •  最后回复来自 hanssx
5
hanssx 最近回复了
我之前写的用 join 的方法不对,join 方法虽然也有 wait 的功能,但是如果单个进程 join 还好,多个进程的话可能在 join 之前进程就已经结束变为僵尸进程,现在的方法是处理 SIGCHILD 信号,这样保证不会有僵尸进程了,至于还会不会有其他问题,下下周再跑程序时反馈,感谢各位的帮助。如果有更好的办法,请告知,红包以表谢意。
这回复不支持 markdown 也有点难受。
@ytymf 确实是的,这个父进程是 celery worker 的,如果不用 celery,这套东西是没问题的,我修改了如下代码:
```
def run(self):
self.scan_list = sorted(self.scan_list) # ['http://2.2.2.2:22', 'http://www.baidu.com', ......]
for url in self.scan_list:
self.queue.put(url)
time.sleep(3) # KLD 将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法 empty()才会返回 False。 参考: https://docs.python.org/zh-cn/3/library/multiprocessing.html
self.scan_start()
```
改为
```
def run(self):
self.scan_list = sorted(self.scan_list) # ['http://2.2.2.2:22', 'http://www.baidu.com', ......]
scan_list_len = len(self.scan_list)
# self.total_domain_cnt = len(self.scan_list)
for url in self.scan_list:
self.queue.put(url)
while self.queue.qsize() != scan_list_len:
time.sleep(3) # KLD 将一个对象放入一个空队列后,可能需要极小的延迟,队列的方法 empty()才会返回 False。 参考: https://docs.python.org/zh-cn/3/library/multiprocessing.html
self.scan_start()
```

```
def scan_start(self):
"""
进程池扫描启动
:return:
"""
for i in range(self.process_count):
t = WebFingerprintDiscernProcess(self.config, self.queue, self.lock)
t.daemon = True
t.start()
self.queue.join()
```
改为
```
def scan_start(self):
"""
进程池扫描启动
:return:
"""
process_list = []
for i in range(self.process_count):
# As far as possible one should try to avoid shifting large amounts of data between processes.
# https://docs.python.org/3/library/multiprocessing.html#multiprocessing-programming
t = WebFingerprintDiscernProcess(self.config, self.queue, self.lock)
t.daemon = True
t.start()
process_list.append(t)
for p in process_list:
p.join()
self.queue.join()
```
不过我的用法确实太奇怪,celery 官方是不推荐在里面使用多进程的,billiard 看 issue 里面好像也要换的意思。
@lolizeppelin 额,spawn 模式启动不起来 celery worker,师父你知道哪儿的问题吗,能说一下吗。
你说的基础知识,我理解可能是信号处理,你是说子进程结束的时候发送 SIGCHILD KILL 之类的信号,如果父进程不处理就会使子进程变为僵尸进程是吧,这个我是知道的,我也避免了,加上了子进程.join()
@lolizeppelin 那个是这样的,本身使用 multiprocessing 没啥问题,在 celery worker 里面使用就会有问题,celery 也给了一个 billiard,算是 multiprocessing 的 patch。
@ytymf 试了一下,celery worker 直接启动不起来,考虑用 canvas 重构一下,还是感谢师父,学到了。
import billiard as multiprocessing
multiprocessing.set_start_method('spawn')
@ytymf 多谢师父,晚上我改一下试试。
@ytymf 多谢指教,学到了。我这个主进程是 celery worker 产生的进程,我代码中并没有用线程,更没有用多线程。僵尸进程产生的原因应该子进程结束之后,父进程仍然存在且没有 wait 或 communicate 子进程,也就是没处理子进程发来的 SIGCHILD KILL 那个信号。
@hhbcarl 谢谢师父,你说的这个 canvas 之前有了解,但是现在项目代码不太容易改,周末尝试一下。
----------------------------------------------------------------------------------------------------------------------------------------
@ClericPy 是僵尸进程,Z 代表 Zombie,僵尸进程产生的原因是因为子进程结束之后,父进程仍然存在且没有 wait 或 communicate 子进程,产生原因我倒是清楚,只是不明白为什么一开始就有子进程结束,因为队列里面的东西很多,不可能一开始子进程就结束的。
----------------------------------------------------------------------------------------------------------------------------------------
@ytymf “multiprocessing 一定要确保父进程是单线程的”,这个是啥意思呀,是确保父进程是单进程吧? celery worker 里面看确实是单进程。
48 天前
回复了 zbl430 创建的主题 Linux QQ For Linux 我哭了,官方版
要是有企业微信就好了。
关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   1165 人在线   最高记录 5043   ·     Select Language
创意工作者们的社区
World is powered by solitude
VERSION: 3.9.8.3 · 20ms · UTC 18:39 · PVG 02:39 · LAX 10:39 · JFK 13:39
♥ Do have faith in what you're doing.