首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python 学习手册
Python Cookbook
Python 基础教程
Python Sites
PyPI - Python Package Index
http://www.simple-is-better.com/
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
V2EX  ›  Python

Celery 可以启动多个线程嘛=-=

  •  
  •   YuuuZeee · 336 天前 · 1805 次点击
    这是一个创建于 336 天前的主题,其中的信息可能已经有所发展或是发生改变。

    RT =-=小白想要提升一下现在的一个任务的性能

    假设我们有一个 api 多次调用的需求在一个 celery 的 task 里面

    最普通的方法可以是这样:

    @task
    def foo():
    	for i in range(20):
        	call_api_here(i)
        do_something_else()
    

    但是这个很耗时因为会卡在 api 调用上

    然后如果我们用 multiprocessing (不是一个好的注意,但是粗暴), celery 有自己的 library 叫做 billiard.

    然后我们可以这样做

    @task
    def foo():
    	with Pool(5) as p:
        	p.map(call_api_here, some_params)
        do_something_else()    
    

    但是这样的问题是,每一个 task 都会开启一个 pool 然后结束后这个 pool 并没有被释放,最终导致内存被各种占用

    所以我想了另外一个办法,用 multithreading

    def foo()
        threads = []
        for i in range(20):
                t = threading.Thread(target=call_api_here, args=(i,))
                threads.append(t)
    
        for t in threads:
            t.start()
    
        for t in threads:
            t.join()
    
    

    但是试了下发现,虽然线程会随着进程结束而被销毁,但是貌似在每个 task 里面只有前几个线程执行了。。。后面的都 gg 了。。。

    想问问各位大佬有什么好的方法嘛😂

    10 回复  |  直到 2019-01-12 07:34:58 +08:00
        1
    fanhaipeng0403   336 天前
    from time import sleep
    from concurrent.futures import ThreadPoolExecutor\ ProcessPoolExecutor
    def child_1():
    sleep(9)
    print(1)


    def child_2():
    sleep(2)
    print(2)



    def child_3():
    sleep(3)
    print(3)



    def child_4():
    sleep(1)
    print(4)


    def child_5():
    sleep(2)
    print(5)



    with ThreadPoolExecutor\ProcessPoolExecutor(max_workers=5) as executor:
    executor.submit(child_1)
    executor.submit(child_2)
    executor.submit(child_3)
    executor.submit(child_4)
    executor.submit(child_5)


    t1= executor.submit(child_1)
    t2=executor.submit(child_2)
    t3=executor.submit(child_3)
    t4=executor.submit(child_4)
    t5=executor.submit(child_5)

    print(t1.result())
        2
    YuuuZeee   336 天前
    @fanhaipeng0403 这样的话每个任务都会创建一个 pool 还是占着内存呀
        3
    fanhaipeng0403   336 天前
    y worker -A app.tasks.celery -l INFO -Q default -c 20 (每个队列多搞几个 worker ) -n default_worker.%%i
        4
    fanhaipeng0403   336 天前
    import asyncio

    async def slow_operation(n):
    await asyncio.sleep(n)
    print('Slow operation {} complete'.format(n))
    return n


    loop = asyncio.get_event_loop()
    done, _ = loop.run_until_complete(
    asyncio.wait([
    slow_operation(1),
    slow_operation(2),
    slow_operation(9),
    slow_operation(2),
    slow_operation(1),
    slow_operation(2),
    slow_operation(3),
    ]))
    for fut in done:
    print("return value is {}".format(fut.result()))

    然后用 uvloop
        5
    Hstar   336 天前
    关键词 celery worker
        6
    ipwx   336 天前
    不能拆成多个 celery 任务,让 celery 去管嘛?
        7
    YuuuZeee   336 天前
    @ipwx 你的意思是对 api 的调用也拆成 celery 任务嘛=-=?这个思路也有考虑
    优点是 每个任务都可以追踪
    缺点是 任务颗粒度太高,很吃存储
        8
    wizardoz   336 天前
    同意楼上的拆成很多任务
        9
    ohyeah521   328 天前
    遇到同样的问题,一个 task 启动了,然后在 task 里面读取一个目录下面的所有文件,查找文件内容是否包含我要查找的字符串,如果不用 threading,就是一个文件一个文件的读取,效率真的很低,请问各位大佬该怎么办?


    每个文件启动一个 worker 感觉也很 low 啊
        10
    YuuuZeee   327 天前 via Android
    @ohyeah521 用 threading
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   1186 人在线   最高记录 5043   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.3 · 50ms · UTC 17:59 · PVG 01:59 · LAX 09:59 · JFK 12:59
    ♥ Do have faith in what you're doing.