Python 异步任务如何检测运行时间

2021-01-20 17:19:21 +08:00
 fangwenxue
    async def run_task(self, task_name, task_fun):
        self.logger.info(f'{task_name}')
        begin_time = int(time.time())
        self.task_state = False
        # t = threading.Thread(target=self.check_task, args=(begin_time, task_name))
        # t.start()
        # t.join()

		
        try:
            await getattr(self, task_fun)()
            await asyncio.sleep(1)
            self.logger.warning(f'{task_name} -> DONE.')
            self.task_state = True
        except Exception as e:
            self.logger.error(e)
            self.task_state = True
        finally:
            await self.close_page()
            await asyncio.sleep(1)

    def check_task(self, begin_time, task_name):
        while True:
            if self.task_state:
                return True

            now_time = int(time.time())
            n = now_time - begin_time
            if n > 120:
                self.logger.warning(f'{task_name} runtime {n}s.')
                return False
            time.sleep(1)
           
643 次点击
所在节点    问与答
3 条回复
ysc3839
2021-01-20 17:29:55 +08:00
我不了解 Python 的 async,如果是 js 的话可以用 Promise.race,然后取消 task 运行。
Vegetable
2021-01-20 17:31:32 +08:00
想得倒是挺多,不过你没感觉这是一个很常见的需求吗?

https://docs.python.org/zh-cn/3/library/asyncio-task.html#timeouts

async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')

asyncio.run(main())
linw1995
2021-01-21 14:47:44 +08:00
如果不是同步堵塞住的话,用 `asyncio.wait_for` 就好了。同步堵塞的话,建议修改这个函数,通过 `loop.run_in_executor` 用副线程去跑,这样就不会堵塞住了。同时在外部用 `asyncio.wait_for`,且通过在 coro 内部去 try-except 这个超时而抛出的 `asyncio.CancelError` ,再通过 theading.Event 等方式,把取消事件传递给副线程的函数

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/746758

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX