有一个读文件然后写数据库的操作,想尝试使用协程。
使用协程的:  
async def parse_text(file_path: Path, context_qs: [asyncio.Queue]):
    ql = len(context_qs)
    i = 0
    # 每一个 Queue 放 step 个数据就切换下一个
    step = 2
    with open(file_path, encoding="utf8") as f:
        for text in f:
            if i // step == ql:
                i = 0
            context_q = context_qs[i // step]
            context = {}
            text = re.findall(r"\d+", text)
            if text:
                context = {"解析然后组装成 dict"}
                await context_q.put(context)
                # 这里如果不 join ,会一直在这个 for 循环里不出去
                await context_q.join()
                i = i + 1
        else:
            await context_q.put("结束标记")
            return
async def write_db(context_q: asyncio.Queue, model: ModelBase):
    async with AsyncSession() as session:
        while 1:
            context = await context_q.get()
            if context["结束标记"] == "end":
                return
            info, obj = None, None
            try:
                if context["info"]:
                    info = await session.execute(
                        select(InfoModel).filter(
                            InfoModel.attr == context["info"]
                        )
                    )
                    info = info.scalars().one_or_none()
                    if not info:
                        info = InfoModel(attr=context["info"])
                        session.add(info)
                if context["header"]:
                    obj = await session.execute(
                        select(model).filter(
                            model.header == context["header"]
                        ).options(selectinload(getattr(model, "info")))
                    )
                    obj = obj.scalars().one_or_none()
                    if not obj:
                        obj = model(header=context["header"])
                        session.add(obj)
                if obj or info:
                    if info not in obj.info:
                        obj.info.append(info)
                        session.add(obj)
                    await session.commit()
            except Exception as e:
                await session.rollback()
                raise e
            else:
                context_q.task_done()
async def main():
	# 每个读取文件并解析的方法对应 c_q_count 个写数据库的方法
    c_q_count = 3
    a_context_qs = [asyncio.Queue() for i in range(c_q_count)]
    b_context_qs = [asyncio.Queue() for i in range(c_q_count)]
    tasks = [
        asyncio.create_task(
            parse_text(Path("a.txt"), a_context_qs)
        ),
        asyncio.create_task(
            parse_text(Path("b.txt"), b_context_qs)
        ),
    ]
    for i in range(c_q_count):
        tasks.append(asyncio.create_task(write_db(a_context_qs[i], AModel)))
        tasks.append(asyncio.create_task(write_db(b_context_qs[i], BModel)))
    await asyncio.gather(*tasks)
if __name__ == '__main__':
    asyncio.run(main(), debug=settings.DEBUG)
不使用协程的:
def sync_read_file():
    af = Path("a.txt").open(encoding="utf8")
    bf = Path("b.txt").open(encoding="utf8")
    with Session() as session:
        while 1:
            if af:
                try:
                    text = af.readline()
                    context = parse_text(text)
                    sync_write_db(session, context, AModel)
                except IOError:
                    af.close()
                    af = None
            if bf:
                try:
                    text = bf.readline()
                    context = parse_text(text)
                    sync_write_db(session, context, BModel)
                except IOError:
                    bf.close()
                    bf = None
            if not af and not bf:
                return
def sync_write_db(session, context, model):
    info, obj = None, None
    try:
        if context["info"]:
            info = session.execute(
                select(Info).filter(
                    Info.attr == context["info"]
                )
            )
            info = info.scalars().one_or_none()
            if not info:
                info = Info(attr=context["info"])
                session.add(info)
        if context["header"]:
            obj = session.execute(
                select(model).filter(model.info == context["info"]))
            obj = obj.scalars().one_or_none()
            if not obj:
                obj = model(info=context["info"])
                session.add(obj)
        if obj or info:
            if info not in obj.info:
                obj.info.append(info)
                session.add(obj)
            session.commit()
    except Exception as e:
        session.rollback()
        raise e
if __name__ == '__main__':
    sync_read_file()
这个协程的方法,每秒每个表可以写 400 多行,改为同步单线程的还是每秒写 400 多行。
不知道是我协程的用法有问题?还是说有别的什么原因?
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.