请大佬帮忙瞄一眼我这个丑陋的异步协程代码

285 天前
 ohayoo

迫于机器配置太低,用多进程多线程,一秒钟才处理几百条 uri ,于是想着来用异步协程来试下,看着文档写出了这样一个丑陋的代码,搞了几万条 uri 测试了下,好像也没啥问题,不打印结果到屏幕的话,一秒钟差不多可以处理 1000 条,大概有这么几个步骤:

我现在的困惑是:

耽误大佬周五下午一点点时间,帮忙瞅一眼,不胜感激!

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import asyncio
from asyncio import Semaphore
from aiohttp import ClientSession
from itertools import islice


def get_lines_iterator(filename, n=1000):
    with open(filename) as fp:
        while True:
            lines = list(islice(fp, n))
            if lines:
                yield lines
            else:
                break


async def delete_file(uri: str,
                      session: ClientSession,
                      sem: Semaphore) -> int:
    headers = {'Authorization': 'xxxxxxxxxxx'}
    url = api + uri
    async with sem, session.delete(url, headers=headers) as response:
        return uri, response.status


# 写法 1:
# async def main(uris):
#     sem = Semaphore(100)
#     async with ClientSession() as session:
#         tasks = [delete_file(uri, session, sem) for uri in uris]
#         await asyncio.gather(*tasks)


# 写法 2:
async def main(uris):
    sem = Semaphore(100)
    async with ClientSession() as session:
        async with asyncio.TaskGroup() as group:
            result = [group.create_task(delete_file(
                uri, session, sem)) for uri in uris]
            return result


if __name__ == '__main__':
    for lines in get_lines_iterator("uris.txt"):
        uris = [uri.strip() for uri in lines]
        result = asyncio.run(main(uris))
        for x in result:
            print(x.result())
2197 次点击
所在节点    Python
19 条回复
fzzff
285 天前
以下是对代码进行优化的建议:

1. 使用异步文件读取:可以使用`aiofiles`库来实现异步文件读取,从而避免阻塞事件循环。这将使得文件的读取操作也能并发进行,提高效率。

2. 使用异步上下文管理器:`aiohttp`支持异步上下文管理器,你可以使用`async with`语法来创建`ClientSession`,这样会更加简洁,而且会在任务完成后自动关闭会话。

3. 使用`asyncio.as_completed`:在并发执行任务时,可以使用`asyncio.as_completed`来获取已完成的任务,而不是等待所有任务都完成再处理结果。这样可以更早地得到一部分结果,并在需要时立即处理。

4. 异常处理:对于异步代码,异常处理十分重要。可以在任务执行时捕获异常,并记录错误信息,以便后续分析和处理。

下面是优化后的代码:

```python
import asyncio
import aiofiles
from aiohttp import ClientSession

async def delete_file(session: ClientSession, sem: asyncio.Semaphore, uri: str):
headers = {'Authorization': 'xxxxxxxxxxx'}
url = api + uri
try:
async with sem:
async with session.delete(url, headers=headers) as response:
return uri, response.status
except Exception as e:
# 处理异常,比如记录错误日志
print(f"Error occurred while processing {uri}: {str(e)}")
return uri, None

async def main(uris):
sem = asyncio.Semaphore(100)
async with ClientSession() as session:
tasks = [delete_file(session, sem, uri) for uri in uris]
for future in asyncio.as_completed(tasks):
uri, status = await future
if status is not None:
print(f"{uri}: {status}")
else:
print(f"{uri}: Error")

async def read_uris(filename):
async with aiofiles.open(filename, mode='r') as fp:
async for line in fp:
yield line.strip()

if __name__ == '__main__':
asyncio.run(main(read_uris("uris.txt")))
```

在优化后的代码中,我们使用`aiofiles`库来异步读取文件,并使用`async for`来逐行获取 URI 。同时,我们使用`asyncio.as_completed`来处理已完成的任务,这样在某些任务执行较慢时,可以更早地输出结果,提高实时性。另外,我们在`delete_file`函数中增加了异常处理,确保在出现异常时不会导致整个任务中断。
fzzff
285 天前
以上是 chatgpt 给出的代码优化建议, 另外个人建议你把请求库替换成 httpx
zong400
285 天前
api 端只能并发 100 ?那你代码优化得再好也没用啊,多部署几个在不同 ip 的服务器吧
wuwukai007
285 天前
api 并发 100 ,线程 100 并发不是绰绰有余吗?
fzzff
285 天前
asyncio.run(main(uris))无论如何不应该放到循环里, 每次 asyncio.run()会创建一个新的事件循环, 你这个代码跑完创建了一堆事件循环同时运行性能必然受损, 而且你的 result 接收到的也不是完整的结果, 如果你想并发运行一组任务应该用 asyncio.gather
ohayoo
285 天前
@zong400 没有,api 端没有说限制 100 ,只是我自己觉得太高了,估计会 ban 我,所以先填个 100 试一试
ohayoo
285 天前
@fzzff 我也觉得放到 for 循环里面肯定不对
liuwei889
285 天前
@fzzff 我也想到 chatgpt 优化了,啊哈哈哈
ohayoo
285 天前
手动 @下 @ClericPy 大佬帮忙看看
zzl22100048
285 天前
fzinfz
285 天前
Maerd
285 天前
最好的方案是使用 asyncio.Queue 来实现协程队列,不要把 asyncio.run 放到循环里,最好是从循环中往队列中塞 url ,然后使用 asyncio.create_task 来控制并发协程数量,从协程中不断读取队列内容实现并发,Semaphore 的实现是较为丑陋且性能低下的,可以看出你的代码风格还是受到了往进程池线程池硬塞内容风格的影响(我同事也喜欢这么写)
ClericPy
285 天前
1. 每次读 1000 行,避免内存占用过多
1000 行内存太小了, 给你个简单的换算公式: 1 万条 URL 大约 1MB 内存
如果 api 响应较慢, 你会很多时候在等最后少数几条结束才能开始下一轮. 不如像他们说的丢异步队列, 然后开 100 个 task 去消费直到 Queue 空

2. 利用 Semaphore 来控制并发数量为 100 ,避免 API 端把我给 ban 了
100 并发实际上很大, 一般 nginx 上带点限流的, 一秒超过 2 个请求就给你 ban 了
其次还要考虑目标网站承载能力, 否则只会导致 dos 然后失败率骤增. 不要敲打服务器 是爬虫第一课
如果这服务器是自己家的... 千万行, 考虑改造下批量任务, 甚至这种请求量的就不像 HTTP 场景

3. 复用 session
没啥问题, 早期 aiohttp 还有 bug 不关 Session 一大堆事, 你分段来做也还好

4. asyncio.run 与 get_event_loop
建议找个 IDE 点 goto definition 看看源码, python 很多内置库源码都值得学习一下.
一般情况下 asyncio.run 只运行一次, 其他各种逻辑放到 main 函数里就好了. 启动多次也没啥事

挺简单个爬虫为啥那么纠结, 写个十几万行啥问题都没了. 你都用上 TaskGroup 了, 看看官方文档吧

PS: 不是大佬, 辨别乱 Q...
ohayoo
285 天前
@ClericPy #13 哈哈哈哈 因为经常看到你在 py 区安利异步协程,所以。。。。
ohayoo
285 天前
感谢以上大佬的指教,我结合各位提到的点 再去琢磨琢磨
julyclyde
284 天前
没看明白为什么还有 uris=[ ] 这一行
你输入文件里每行有多个 uri 吗?
ohayoo
281 天前
@Maerd #12 特别感谢大佬的指导,用 threading 的时候知道要用 queue ,用协程却不知道应该去看看 asyncio 的 Queue ,我的我的,感谢大佬点醒我
zyxbcde
280 天前
都是些伪需求啊
文件那么点完全没必要分开读。
也没必要卡信号量,aiohttp 在创建 session 时候有个参数可以限制最大连接数。
另外我个人不太喜欢用上下文管理 session ,都是自己手动关闭。
ohayoo
280 天前
@zyxbcde #18 后面用了 asyncio.Queue 设置队列长度,开固定数量的 task 去消费等等等等,反正 ok 了,后面复杂的需求直接找个任务队列算了

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/960555

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX