Celery 可以启动多个线程嘛=-=

2019-01-03 10:57:47 +08:00
 YuuuZeee

RT =-=小白想要提升一下现在的一个任务的性能

假设我们有一个 api 多次调用的需求在一个 celery 的 task 里面

最普通的方法可以是这样:

@task
def foo():
	for i in range(20):
    	call_api_here(i)
    do_something_else()

但是这个很耗时因为会卡在 api 调用上

然后如果我们用 multiprocessing (不是一个好的注意,但是粗暴), celery 有自己的 library 叫做 billiard.

然后我们可以这样做

@task
def foo():
	with Pool(5) as p:
    	p.map(call_api_here, some_params)
    do_something_else()    

但是这样的问题是,每一个 task 都会开启一个 pool 然后结束后这个 pool 并没有被释放,最终导致内存被各种占用

所以我想了另外一个办法,用 multithreading

def foo()
    threads = []
    for i in range(20):
            t = threading.Thread(target=call_api_here, args=(i,))
            threads.append(t)

    for t in threads:
        t.start()

    for t in threads:
        t.join()

但是试了下发现,虽然线程会随着进程结束而被销毁,但是貌似在每个 task 里面只有前几个线程执行了。。。后面的都 gg 了。。。

想问问各位大佬有什么好的方法嘛😂

4062 次点击
所在节点    Python
10 条回复
fanhaipeng0403
2019-01-03 11:16:30 +08:00
from time import sleep
from concurrent.futures import ThreadPoolExecutor\ ProcessPoolExecutor
def child_1():
sleep(9)
print(1)


def child_2():
sleep(2)
print(2)



def child_3():
sleep(3)
print(3)



def child_4():
sleep(1)
print(4)


def child_5():
sleep(2)
print(5)



with ThreadPoolExecutor\ProcessPoolExecutor(max_workers=5) as executor:
executor.submit(child_1)
executor.submit(child_2)
executor.submit(child_3)
executor.submit(child_4)
executor.submit(child_5)


t1= executor.submit(child_1)
t2=executor.submit(child_2)
t3=executor.submit(child_3)
t4=executor.submit(child_4)
t5=executor.submit(child_5)

print(t1.result())
YuuuZeee
2019-01-03 11:17:22 +08:00
@fanhaipeng0403 这样的话每个任务都会创建一个 pool 还是占着内存呀
fanhaipeng0403
2019-01-03 11:17:28 +08:00
y worker -A app.tasks.celery -l INFO -Q default -c 20 (每个队列多搞几个 worker ) -n default_worker.%%i
fanhaipeng0403
2019-01-03 11:20:17 +08:00
import asyncio

async def slow_operation(n):
await asyncio.sleep(n)
print('Slow operation {} complete'.format(n))
return n


loop = asyncio.get_event_loop()
done, _ = loop.run_until_complete(
asyncio.wait([
slow_operation(1),
slow_operation(2),
slow_operation(9),
slow_operation(2),
slow_operation(1),
slow_operation(2),
slow_operation(3),
]))
for fut in done:
print("return value is {}".format(fut.result()))

然后用 uvloop
Hstar
2019-01-03 11:20:48 +08:00
关键词 celery worker
ipwx
2019-01-03 11:21:43 +08:00
不能拆成多个 celery 任务,让 celery 去管嘛?
YuuuZeee
2019-01-03 11:23:38 +08:00
@ipwx 你的意思是对 api 的调用也拆成 celery 任务嘛=-=?这个思路也有考虑
优点是 每个任务都可以追踪
缺点是 任务颗粒度太高,很吃存储
wizardoz
2019-01-03 11:24:32 +08:00
同意楼上的拆成很多任务
ohyeah521
2019-01-11 20:34:46 +08:00
遇到同样的问题,一个 task 启动了,然后在 task 里面读取一个目录下面的所有文件,查找文件内容是否包含我要查找的字符串,如果不用 threading,就是一个文件一个文件的读取,效率真的很低,请问各位大佬该怎么办?


每个文件启动一个 worker 感觉也很 low 啊
YuuuZeee
2019-01-12 07:34:58 +08:00
@ohyeah521 用 threading

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/523378

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX