关于 asyncio 执行 IO 密集型操作的不解

2021-11-20 22:17:36 +08:00
 firejoke

有一个读文件然后写数据库的操作,想尝试使用协程。
使用协程的:

async def parse_text(file_path: Path, context_qs: [asyncio.Queue]):
    ql = len(context_qs)
    i = 0
    # 每一个 Queue 放 step 个数据就切换下一个
    step = 2
    with open(file_path, encoding="utf8") as f:
        for text in f:
            if i // step == ql:
                i = 0
            context_q = context_qs[i // step]
            context = {}
            text = re.findall(r"\d+", text)
            if text:
                context = {"解析然后组装成 dict"}
                await context_q.put(context)
                # 这里如果不 join ,会一直在这个 for 循环里不出去
                await context_q.join()
                i = i + 1
        else:
            await context_q.put("结束标记")
            return


async def write_db(context_q: asyncio.Queue, model: ModelBase):
    async with AsyncSession() as session:
        while 1:
            context = await context_q.get()
            if context["结束标记"] == "end":
                return
            info, obj = None, None
            try:
                if context["info"]:
                    info = await session.execute(
                        select(InfoModel).filter(
                            InfoModel.attr == context["info"]
                        )
                    )
                    info = info.scalars().one_or_none()
                    if not info:
                        info = InfoModel(attr=context["info"])
                        session.add(info)
                if context["header"]:
                    obj = await session.execute(
                        select(model).filter(
                            model.header == context["header"]
                        ).options(selectinload(getattr(model, "info")))
                    )
                    obj = obj.scalars().one_or_none()
                    if not obj:
                        obj = model(header=context["header"])
                        session.add(obj)
                if obj or info:
                    if info not in obj.info:
                        obj.info.append(info)
                        session.add(obj)
                    await session.commit()
            except Exception as e:
                await session.rollback()
                raise e
            else:
                context_q.task_done()


async def main():
	# 每个读取文件并解析的方法对应 c_q_count 个写数据库的方法
    c_q_count = 3
    a_context_qs = [asyncio.Queue() for i in range(c_q_count)]
    b_context_qs = [asyncio.Queue() for i in range(c_q_count)]
    tasks = [
        asyncio.create_task(
            parse_text(Path("a.txt"), a_context_qs)
        ),
        asyncio.create_task(
            parse_text(Path("b.txt"), b_context_qs)
        ),
    ]
    for i in range(c_q_count):
        tasks.append(asyncio.create_task(write_db(a_context_qs[i], AModel)))
        tasks.append(asyncio.create_task(write_db(b_context_qs[i], BModel)))
    await asyncio.gather(*tasks)



if __name__ == '__main__':
    asyncio.run(main(), debug=settings.DEBUG)

不使用协程的:

def sync_read_file():
    af = Path("a.txt").open(encoding="utf8")
    bf = Path("b.txt").open(encoding="utf8")
    with Session() as session:
        while 1:
            if af:
                try:
                    text = af.readline()
                    context = parse_text(text)
                    sync_write_db(session, context, AModel)
                except IOError:
                    af.close()
                    af = None
            if bf:
                try:
                    text = bf.readline()
                    context = parse_text(text)
                    sync_write_db(session, context, BModel)
                except IOError:
                    bf.close()
                    bf = None
            if not af and not bf:
                return


def sync_write_db(session, context, model):
    info, obj = None, None
    try:
        if context["info"]:
            info = session.execute(
                select(Info).filter(
                    Info.attr == context["info"]
                )
            )
            info = info.scalars().one_or_none()
            if not info:
                info = Info(attr=context["info"])
                session.add(info)
        if context["header"]:
            obj = session.execute(
                select(model).filter(model.info == context["info"]))
            obj = obj.scalars().one_or_none()
            if not obj:
                obj = model(info=context["info"])
                session.add(obj)
        if obj or info:
            if info not in obj.info:
                obj.info.append(info)
                session.add(obj)
            session.commit()
    except Exception as e:
        session.rollback()
        raise e


if __name__ == '__main__':
    sync_read_file()

这个协程的方法,每秒每个表可以写 400 多行,改为同步单线程的还是每秒写 400 多行。
不知道是我协程的用法有问题?还是说有别的什么原因?

3936 次点击
所在节点    Python
36 条回复
firejoke
2021-11-21 12:41:23 +08:00
@Contextualist #14 看文档的意思,是说用异步文件 IO ,在从内存读取时反倒会变慢,在从磁盘读取的时候会加快,在不同环境下其结果是不可预测的。那我如果单独用一个进程读取文件到内存,然后另一个进程从内存读取然后再操作,应该可以绕开这个问题。
firejoke
2021-11-21 12:48:27 +08:00
@locoz #20 我昨天最后也是改成用多进程了,一个进程专门读文件,然后放进队列,其他子进程从队列读,然后操作数据库,那看来我思路没跑偏。还有其他的解法吗?多进程和协程的结合,一般都是以多进程为主吗?
Contextualist
2021-11-21 14:01:03 +08:00
又看了一下你贴出来文件的部分,你是不是就两个大文件(就是说不是大量小文件),那文件 IO 就基本不可能是你的瓶颈,你看到磁盘读取没跑满很有可能是你下游的处理速度没跟上。

多进程和协程,感觉你自己也总结出来了。协程得用在有长时间等待系统调用 (syscall) 的地方(比如网络、子进程、定时任务)。CPU 密集的操作得用多线程或多进程,但在 Python 里有 GIL ,就只能用多进程。
firejoke
2021-11-21 14:17:56 +08:00
@Contextualist #23 是的,就是两个大文件,所以我也觉得文件 IO 不是我这里的瓶颈,协程在这个场景中没体现出他的优势,我已经改成了多进程了。
Contextualist
2021-11-21 14:26:48 +08:00
我对数据库不熟,不过我猜对于很多数据库并发写是不会有性能提升的,用单线程就可以了,但你可能需要 batch / bulk 操作,用来一次性插入数十条、数百条数据,而不是一次插入一条。
O5oz6z3
2021-11-21 15:05:20 +08:00
虽然不懂,看完楼上感觉原因之一在于 asyncio 的上限就是单线程,而单线程吞吐量不如多线程?
firejoke
2021-11-21 16:06:46 +08:00
@Contextualist #25 对欸!资源是消耗在每一条查询和写入的操作上,如果批量写,就可以降低写入频率,至于查询,我已经在查询字段上加了索引,我改一下试试。感谢~
然后我看到你之前提到的 trio ,看他的文档像是涉及到异步操作的都有涉及,感觉非常不错啊。
firejoke
2021-11-21 16:15:30 +08:00
@O5oz6z3 #26 不是,当不存在较长的 io 等待的时候,协程和单线程没差。
yufpga
2021-11-21 20:01:53 +08:00
大概看了下, 这瓶颈显然不是在 parse_text 中的文件读,就算再怎么阻塞,读写本地文件也不至于到每秒才 400 行的程度. 而在 write_db 中, 出现好几处 await 的地方, 这些地方可都是要同步等待结果返回的呀. 一个很好容易验证的方法就是把 write_db 中的 await 用 await asyncio.sleep 替换掉, 尝试不同的 sleep 时间. 实际上上面的问题在于每一次 while 1 的循环循环是同步的, 你必须要先处理完队列中的前一条数据, 才能继续处理下一条数据. 所以处理也很简单, 把每一次的循环异步化掉.
hustlibraco
2021-11-22 00:36:59 +08:00
用```async for```替代```for```可以吗?
firejoke
2021-11-22 09:23:13 +08:00
@hustlibraco #30 换成异步文件读,就可以换成 async for 了。
firejoke
2021-11-22 09:33:38 +08:00
@yufpga #29 我看日志里,我同时开了好多个 task ,这个 task 的循环里 await query 或 add 或 commit ,就会跳到另一个 task 的循环里的 query 或 add 或 commit 。
yufpga
2021-11-22 10:17:06 +08:00
@firejoke 是我看差了, 我以为只有一个 queue, 而你的代码里是两个 context, 各自 3 个 queue, 也就是总共 6 个 queue, 对应 6 个 write_db 的 task. 当遇到 await 的时候, 确实是会跳转到别的 task 里面执行. 确实比较奇怪,但我仍然觉得瓶颈不大可能在 parse_text, 你可以试着记录一下队列写入数据的速率, 如果这个速率也在 400/s 左右, 那说明确实有可能是 parse_text 慢了
ohayoo
2021-11-22 23:33:57 +08:00
@firejoke 老哥可以分享下多进程版本的代码吗?
firejoke
2021-11-23 22:00:11 +08:00
@ohayoo #34 还在调试多进程和协程的组合,后面会贴一下的。
mlbjay
299 天前
python 的 asyncio 是纯用户态线程,同步 io 会阻塞整个线程及其中的所有协程。
Golang 的 MPG 模型就解决的这个问题。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/816841

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX