[asyncio] 多线程中子线程使用主线程 event loop,这样是否与直接用主线程去进行事件循环无差别

2021-03-01 14:44:07 +08:00
 opengo

使用concurrent.futures创建线程池然后使用asynciorun_in_executor方法去利用线程池执行某异步任务,具体代码如下:

# -*- coding: utf-8 -*-
import asyncio
import concurrent.futures
import threading
import time
import logging

from datetime import datetime


logger = logging.getLogger(__file__)
logger.setLevel('INFO')


async def run_task(val):
    logger.info(f"run_task Owning thread - {threading.currentThread().native_id} {threading.currentThread().getName()}")
    await asyncio.sleep(val)
    return datetime.now()


def thread_task(loop, val):
    logger.info(f"thread_task Start thread - {threading.currentThread().ident} {threading.currentThread().getName()}")

    v = asyncio.run_coroutine_threadsafe(run_task(val), loop)
    result = v.result()

    logger.info(f"thread - {threading.currentThread().ident}  {threading.currentThread().getName()} done ==== {result}")


async def main(loop):
    result = [0.1, 0.2, 0.3, 0.4, 0.5]
    logger.info(f"main thread :: {threading.currentThread().ident} {threading.currentThread().getName()}")

    with concurrent.futures.ThreadPoolExecutor() as pool:
        blocking_tasks = [
            loop.run_in_executor(
                pool, thread_task, loop, val
            ) for val in result
        ]
        await asyncio.gather(*blocking_tasks)


if __name__ == '__main__':

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

使用run_in_executor调用thread_task方法,再在子线程中使用 asyncio.run_coroutine_threadsafe 方法去调用真正的异步任务,而实际上源码中 asyncio.run_coroutine_threadsafe 方法通过ensure_future 创建了一个future 添加到主线程的 event loop 中并绑定当前线程 future,子线程的异步任务在主线程中被循环,循环完成后再从子线程中去获取结果


感觉上不如直接在主线程中使用asyncio.gather 或者 asyncio.as_completed 去并发执行这些任务,另一种办法是每个子线程再设置一个 loop 去进行事件循环,但是实际测试中这几种方案性能相差并不多

# 使用 asyncio.gather 或 asyncio.as_completed
task_list = []
    
    for val in result:
        task_list.append(asyncio.create_task(run_task(val)))
        
    await asyncio.gather(*task_list)

    for f in asyncio.as_completed(task_list, loop=loop):
        results = await f


# 设置新的 loop
def thread_task(val):

    logger.info(f"thread_task Start thread - {threading.currentThread().ident} {threading.currentThread().getName()}")

    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        v = loop.run_until_complete(asyncio.gather(run_task(val)))
        logger.info(f"thread - {threading.currentThread().ident}  {threading.currentThread().getName()} done ==== {result}")
    finally:
        loop.close()


大家有什么好的多线程+协程的实现方案吗,这里对应的场景是同时处理多个文件的 io 任务

1387 次点击
所在节点    Python
4 条回复
liuxingdeyu
2021-03-02 10:09:01 +08:00
没太搞明白,线程加协程的意义
opengo
2021-03-02 10:19:07 +08:00
@liuxingdeyu 可以多线程去进行网络请求,文件 IO,这里我想利用多线程去处理文件 IO,利用协程去提高性能,多进程+协程的方式面对大量文件 IO 操作性能更好,但是资源开销也更大,所以想尝试多线程+协程的最优方案
linw1995
2021-03-02 11:37:47 +08:00
应该等同,但会更慢
opengo
2021-03-02 23:06:19 +08:00
@linw1995 确实不如单线程+协程,应该是线程池化+上下文切换消耗的

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/757268

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX