可以将 asyncio 与 celery 一起使用吗?

2019-12-09 09:33:29 +08:00
 Harlaus

一个是 asyncio run_forever (我希望它一直运行,维护一个队列)的协程函数, 用 celery 装饰成 task,这样做有效吗,代码像这样的:

import asyncio
improt celery

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

def worker():
   loop = asyncio.get_event_loop()
   for _ in range(_concurrency):
       loop.create_task(do_some_work(x))
   loop.run_forever()

@app.task
def run(msg):
    worker()

5823 次点击
所在节点    Python
12 条回复
dingyaguang117
2019-12-09 09:35:57 +08:00
- - 这样做的意义是啥?
Harlaus
2019-12-09 09:54:01 +08:00
@dingyaguang117 asyncio 的代码写好了,不想改
superrichman
2019-12-09 10:05:17 +08:00
没有试过 celery, 我用 apscheduler 的 AsyncIOScheduler 可以和 asyncio 一起使用
hustlibraco
2019-12-09 10:10:36 +08:00
run_forever 肯定不行,run_until_complete 应该可以,但是这样做很不好,还是不要偷懒了。python 代码改起来也不是很麻烦。
rogwan
2019-12-09 10:17:10 +08:00
程序是拿来解决问题的,不是拿来玩死自己的。
forrestshuang
2019-12-09 10:19:13 +08:00
celery 本身就是异步的
ClericPy
2019-12-09 10:19:47 +08:00
协程可以挂在任何线程上跑, 非主线程上跑事件循环没什么经验的话有一定可能出问题, 还是那句, 何必呢

把多个任务用 gather 合起来, run_forever 换成 run_until_complete 放里面跑吧, 有时候 running 的 Loop 还不让, 就得 new 一个 Loop
Harlaus
2019-12-09 10:21:02 +08:00
@hustlibraco 也不完全是偷懒的原因,我用 asyncio 和 concurrent.futures 写了一个 work with 队列的代码( https://gitee.com/Harlaus/pipenode/blob/master/pipenode/server.py)见笑了非常粗糙,我想用它集成我写的一些任务,同时我又不想放弃 celery,就酱
hustlibraco
2019-12-09 10:28:44 +08:00
@Harlaus 你可以把队列的部分去掉,替换成 celery,loop 只能有一个,然后去调度 celery 的 task。
Harlaus
2019-12-09 10:37:19 +08:00
@ClericPy
‘把多个任务用 gather 合起来, run_forever 换成 run_until_complete 放里面跑吧, 有时候 running 的 Loop 还不让, 就得 new 一个 Loop'
现在已经是了
lolizeppelin
2019-12-09 15:22:52 +08:00
至少之前我看 kombu 代码的时候还不支持 asyncio

asyncio 就在大量库支持之前根本没法用,还不如老老实实 eventlet
18620610600
2020-04-23 17:19:03 +08:00
celery 官方要 5.0 才支持 asyncio
我的是这么实现在 celery 中跑 async def 的
```
import asyncio

def run_async(coro):
return asyncio.run(coro)

@app.task
def celery_task(*args, **kwargs):
return run_async(async_func(*args, **kwargs))

async def async_func(*args, **kwargs):
rv = await sub_func()
# do sth
return rv


async def sub_func():
return 1


# Usage:
def view(request):
task = celery_task.delay(request)
return Response({'task_id': task.id})
```

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/627140

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX