请问下面异步嵌套的 await 为什么会导致 mongo 入库失败

2020-11-25 16:16:27 +08:00
 yagamil

请问下面异步嵌套的 await 为什么会导致 Task <Task pending coro=<fetch() running at xxxx.py:45> cb=[gather.<locals>._done_callback() 回调失败 ?

加入了异步插入 mongo 后导致的,去掉入库那一段就没问题。


mongo_client = AsyncIOMotorClient(connect_uri)
collection = mongo_client['db_stock']['new_stock_ttjj']

def parse_json(content):
   # 解析
    content += ';function getV(){return hsEnHLwG;}'
    ctx = execjs.compile(content)
    result = ctx.call('getV')
    return result

async def update_data(data):
   # 异步 mongo 入库 
    code = data['securitycode']
    found =  await collection.find_one({'securitycode':code})
    if not found:
        await collection.insert_one(data)


async def fetch(session,page):

    async with session.get(home_url.format(page),headers=headers) as resp:
        content = await resp.text()

        try:
            js_content = parse_json(content)
            for stock_info in js_content['data']:
                securityshortname = stock_info['securityshortname']

                print(securityshortname)
                await update_data(stock_info)
        except Exception as e:
            print(e)


async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get(home_url.format(1), headers=headers) as resp:

            content = await resp.text()
            js_data = parse_json(content)
            pages = js_data['pages']
            tasks =[]
            for page in range(1,pages+1):
                task = asyncio.ensure_future(fetch(session,page))
                tasks.append(task)

            await asyncio.gather(*tasks)

asyncio.run(main())

1341 次点击
所在节点    Python
4 条回复
keepeye
2020-11-25 16:26:50 +08:00
task = asyncio.ensure_future(fetch(session,page))
似乎可以改成
task = fetch(session,page)
yagamil
2020-11-25 16:39:09 +08:00
@keepeye 问题不在这里。
yagamil
2020-11-25 17:12:23 +08:00
找到原因:
def run(main, *, debug=False):
if events._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")

if not coroutines.iscoroutine(main):
raise ValueError("a coroutine was expected, got {!r}".format(main))

loop = events.new_event_loop()

asyncio.run 内部定义了一个 loop,
moto 初始化的时候内部也用了这个。

用 loop 定义,motor 的定义放在 loop 后面。
n37r09u3
2020-11-25 17:25:26 +08:00
放后面是用同一个 loop 了 吧?

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/729160

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX