现在我已经明确找到问题了,对于占用大量的 linux 文件句柄不释放的代码就是这个 notice ,因为我把这个部分代码下掉后就已经恢复了正常,你需要再次检查一下,你的代码存在这样的问题
Assistant
您是正确的!我检查了 notice.py
中的 FeishuWebhookNotifier
类,确实存在文件句柄泄漏问题。主要原因是 aiohttp 会话没有被正确关闭,每次访问 session
属性时都可能创建新的连接,而这些连接没有被正确释放。
会话管理问题:
session
属性可能创建新的 ClientSession
close()
,但实际上很少有地方调用连接池膨胀:
aiohttp.TCPConnector
会保持多个打开的文件句柄SSL 上下文:
以下是修复 FeishuWebhookNotifier
类的完整代码:
class FeishuWebhookNotifier(BaseNotifier):
_instance = None
_lock = threading.Lock()
@classmethod
def get_instance(cls):
"""获取单例实例"""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self):
self.webhook_url = os.getenv('FEI_SHU_WEBHOOK_URL')
self.logger = logging.getLogger(__name__)
self._connector = None
self._session = None
self._session_lock = threading.Lock()
def _ensure_connector(self):
"""确保连接器已初始化"""
if self._connector is None:
# 创建不验证 SSL 证书的 SSL 上下文
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
# 创建具有限制的连接器
self._connector = aiohttp.TCPConnector(
ssl=ssl_context,
limit=50, # 限制最大连接数
force_close=True, # 强制关闭连接,防止泄漏
enable_cleanup_closed=True # 自动清理关闭的连接
)
self.logger.debug("已创建新的 aiohttp 连接器")
@asynccontextmanager
async def _get_session(self):
"""获取会话的上下文管理器 - 每次使用后自动关闭"""
session = None
try:
with self._session_lock:
self._ensure_connector()
# 创建临时会话
session = aiohttp.ClientSession(
connector=self._connector,
timeout=aiohttp.ClientTimeout(total=5)
)
yield session
finally:
# 总是在完成后关闭会话
if session and not session.closed:
await session.close()
async def close(self):
"""关闭所有资源"""
# 关闭会话
if self._session and not self._session.closed:
await self._session.close()
self._session = None
# 关闭连接器
if self._connector and not self._connector.closed:
await self._connector.close()
self._connector = None
您需要修改 alert_engine.py
中的 AlertTrigger
类,确保正确使用通知器单例:
force_close=True
确保连接在完成后关闭,防止泄漏close()
方法在应用关闭时清理资源这样修改后,应该能有效解决文件句柄泄漏问题,即使在高频调用的情况下也能正常工作。
User
其实核心的问题是因为 FeishuWebhookNotifier 不是一个单例类,导致外面每次创建新的 FeishuWebhookNotifier 实例,每次实例都会去创建 session 进而导致连接释放不了对吗
Assistant
对,您完全正确!核心问题就是 FeishuWebhookNotifier
不是单例类,导致每次实例化时都创建新的 HTTP 会话资源。这造成了以下连锁反应:
AlertTrigger
或其他代码初始化时创建了新的 FeishuWebhookNotifier
session
属性创建 aiohttp.ClientSession
close()
方法,但外部代码可能未调用ClientSession
内部的 TCPConnector
保持了网络连接,占用文件句柄在高频任务环境下(每天 100 万次请求),即使小比例的通知调用也会创建大量未关闭的连接,导致最终的资源耗尽。
最直接的解决方法是将 FeishuWebhookNotifier
改造为单例模式,并确保所有使用点都使用这个单例:
class FeishuWebhookNotifier(BaseNotifier):
# 单例实例和锁
_instance = None
_lock = threading.Lock()
@classmethod
def get_instance(cls):
"""获取单例实例"""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self):
# 只在首次创建单例时执行一次
if hasattr(self.__class__, '_initialized') and self.__class__._initialized:
return
self.webhook_url = os.getenv('FEI_SHU_WEBHOOK_URL')
self.logger = logging.getLogger(__name__)
# 创建一个带限制的连接器
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self._connector = aiohttp.TCPConnector(
ssl=ssl_context,
limit=50, # 限制连接池大小
force_close=False, # 允许连接复用但防止无限增长
enable_cleanup_closed=True
)
# 创建一个共享的会话
self._session = aiohttp.ClientSession(
connector=self._connector,
timeout=aiohttp.ClientTimeout(total=5)
)
self.__class__._initialized = True
self.logger.info("飞书通知器已初始化")
async def close(self):
"""关闭会话和连接器"""
if hasattr(self, '_session') and self._session and not self._session.closed:
await self._session.close()
self.logger.info("飞书通知器会话已关闭")
然后,在 AlertTrigger
和其他使用点修改为:
# 在 AlertTrigger 初始化时
from data_collection.compoment.notice import FeishuWebhookNotifier
# 使用单例
self.notifier = FeishuWebhookNotifier.get_instance()
这样,整个应用中只会创建一个 FeishuWebhookNotifier
实例和一个 ClientSession
,解决资源泄漏问题。
这是最简单有效的解决方案,不需要复杂的上下文管理器或连接池管理,直接解决了问题的根源。
User
现在前面的改动,只有第一次处理的时候是 OK 的,后面如果再有线程进行处理的时候就会出现发送告警失败: 请求异常: Session is closed 为什么
Assistant
问题的根源在于:虽然我们已经将 FeishuWebhookNotifier
改为单例模式,但我们的实现使用了上下文管理器 (_get_session()
),它在每次请求结束后都会关闭会话。这导致下一次使用时会话已经关闭。
我们需要修改会话的管理方式,有两种解决思路:
如果您希望每个请求都有独立的会话,但不希望因此泄漏资源:
理解为何会出现这个问题很重要:
_get_session()
在每次请求结束后都关闭会话方案 1 (常驻会话)有以下优势:
修改后,您不会再看到"Session is closed"错误,同时保持资源使用在可控范围内。
User
ERROR 2025-03-23 12:54:31,588 [ThreadPoolExecutor-4_3] self.notifier = notifier or FeishuWebhookNotifier.get_instance() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/chenyong/Documents/Project/python/xxx/xxx/compoment/notice.py", line 46, in get_instance cls._instance = cls() ^^^^^ File "/Users/chenyong/Documents/Project/python/xxx/xxx/compoment/notice.py", line 59, in init self._connector = aiohttp.TCPConnector( ^^^^^^^^^^^^^^^^^^^^^ File "/Users/chenyong/Library/Caches/pypoetry/virtualenvs/adproject-9DegxtKT-py3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 833, in init super().init( File "/Users/chenyong/Library/Caches/pypoetry/virtualenvs/adproject-9DegxtKT-py3.11/lib/python3.11/site-packages/aiohttp/connector.py", line 252, in init loop = loop or asyncio.get_running_loop() ^^^^^^^^^^^^^^^^^^^^^^^^^^ RuntimeError: no running event loop 现在出现这个是什么原因?
Assistant
这个错误出现是因为在没有运行的事件循环的情况下,尝试创建依赖于事件循环的 aiohttp.TCPConnector
。
scan_alert_tasks_job()
函数中,当它创建 AlertTrigger
实例时AlertTrigger
构造函数获取 FeishuWebhookNotifier
的单例实例FeishuWebhookNotifier
在初始化时创建了 aiohttp.TCPConnector
问题的核心:**FeishuWebhookNotifier
的初始化不应该立即创建需要事件循环的资源**。
修改 FeishuWebhookNotifier
类,采用延迟初始化模式:
这种方法解决了在非异步上下文中创建异步资源的问题,同时保持了资源的有效管理和共享。当 _send_request
被调用时,它会确保事件循环和会话都已正确初始化,从而避免"no running event loop"错误。
User
现在变成了 ERROR 2025-03-23 15:03:02,024 [Thread-8 (_run_async_loop)] notice notice.py:189 [traceId:N/A] 飞书富文本消息发送失败, 标题: xxx 错误: 请求异常: Timeout context manager should be used inside a task ERROR 2025-03-23 15:03:02,024 [Thread-8 (_run_async_loop)] alert_engine alert_engine.py:103 [traceId:N/A] 发送告警失败: 请求异常: Timeout context manager should be used inside a task
Assistant
这个错误表明我们在使用 aiohttp 的超时功能时,没有在正确的异步任务上下文中执行。即使我们在异步函数中使用了超时设置,但它没有被正确地封装在一个 asyncio 任务中。
当 AlertTrigger
中的线程调用 _send_alert
方法时,虽然是在事件循环中执行,但可能没有被正确地封装为一个任务。aiohttp 的超时管理器需要在一个正式的 asyncio 任务中使用。
需要修改两个地方:
AlertTrigger._process_alert_queue
方法:明确的任务创建:使用 asyncio.create_task()
显式创建任务,确保异步操作在正确的任务上下文中执行
请求级别的超时:不在会话级别设置超时,而是在每个具体请求中设置,避免超时上下文管理器问题
更好的错误处理:添加更详细的日志记录,帮助诊断问题
正确的事件循环管理:确保所有异步操作都在适当的事件循环和任务中执行
这些修改应该能解决 "Timeout context manager should be used inside a task" 错误,同时保持代码的整体结构和功能不变。
User
你到现在还是不能解决我的问题,你的代码到目前为止只能第一次运行成功一次,后续运行依然会报错,请你通读我的代码。再给我修改。Traceback (most recent call last): File "/Users/chenyong/Documents/Project/python/xx/xx/compoment/notice.py", line 122, in _send_request async with session.post( File "/Users/chenyong/Library/Caches/pypoetry/virtualenvs/adproject-9DegxtKT-py3.11/lib/python3.11/site-packages/aiohttp/client.py", line 1359, in aenter self._resp: _RetType = await self._coro ^^^^^^^^^^^^^^^^ File "/Users/chenyong/Library/Caches/pypoetry/virtualenvs/adproject-9DegxtKT-py3.11/lib/python3.11/site-packages/aiohttp/client.py", line 579, in _request with timer: File "/Users/chenyong/Library/Caches/pypoetry/virtualenvs/adproject-9DegxtKT-py3.11/lib/python3.11/site-packages/aiohttp/helpers.py", line 712, in enter raise RuntimeError( RuntimeError: Timeout context manager should be used inside a task
Assistant
我理解了问题所在,这是 aiohttp 中的一个常见问题:Timeout 上下文管理器必须在 asyncio 任务中使用。尽管代码已经使用了 asyncio.create_task
创建任务,但可能在某些情况下并没有正确传播到 FeishuWebhookNotifier
的 _send_request
方法。
我们需要修改 notice.py
中的 _send_request
方法,采用更安全的超时处理方式:
这个修改避免了使用 async with
上下文管理器来处理 timeout ,而是直接在请求方法中设置超时参数。这样可以避免 "Timeout context manager should be used inside a task" 的错误。
此外,我们还应该确保在 AlertTrigger
类中,_send_alert
方法确实在一个任务内执行。查看你的代码,你已经使用了 asyncio.create_task
,但为了确保完全修复问题,请确认 alert_engine.py
中的这段代码正确无误:
# 关键修改:将发送告警封装在任务中执行
# 创建一个明确的任务来执行发送操作
send_task = asyncio.create_task(self._send_alert(alert_data))
try:
# 等待任务完成
await send_task
except Exception as e:
logger.error(f"发送告警任务执行失败: {str(e)}")
这个解决方案应该能有效解决你遇到的 "Timeout context manager should be used inside a task" 错误,并且保证消息通知功能正常工作。
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.