celery 执行自定义 inspect command 的问题

2019-09-25 16:22:58 +08:00
 40huo

有个项目需要用到后台任务,选了 celery,但在一个场景下遇到了问题。

需要定时收集所有 worker 的一些信息,比如 CPU 负载之类,相当于一个广播出去的任务,所有 worker 执行一下然后将结果回传,但 celery 中的 broadcast task 由于 taskid 是一样的,结果只能收到一个。

celery 提供了另外一种远程控制 worker 的方式,celery inspect xxx,可以自定义一个inspect_command来执行,这在命令行下是可以正常使用,但在定时任务中遇到了问题。

@inspect_command()
def resource_usage(state):
    result = get_resource_usage()
    result = result.__dict__
    logger.warning(result)
    return {"a": result}


# 这是个定时任务,比如每分钟收集一次
@shared_task()
def get_worker_stats():
    from tasks import app

    i = app.control.inspect()
    results = i.resource_usage()
    logger.info(results)
    # 其他逻辑,存入 DB 等

直接执行 celery -A tasks inspect resource_usage 可以正常运行,但在定时任务中运行时会出现

AttributeError: 'Inspect' object has no attribute 'resource_usage'

这样的错误,似乎是自定义的 command 没有注册,但文档里也没有提到如何强制注册。。。

这种情况应该如何处理,如果 celery 实现不了别的库也行。

2049 次点击
所在节点    Python
0 条回复

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/604085

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX