最近在做实时行情相关的项目,刚开始觉得很简单:连上、收消息、处理数据就完事了。结果线上跑几天后发现各种问题:
后来重新整理了一套比较稳的方案,核心有 3 个:
WebSocket 建立连接不一定就一直在线。运营商网络、NAT 设备、负载均衡器都可能清理长时间无流量的连接。所以一定要做心跳。 如果服务端支持 ping/pong ,直接使用协议层心跳即可。 如果不支持,可以定时发送业务心跳消息。 例如:
async def heartbeat(ws):
while True:
try:
await ws.ping()
await asyncio.sleep(30)
except Exception:
break
30 秒左右发一次通常就够用了。
WebSocket 断开是常态,不是异常。所以不要把代码写成:
await websocket.recv()
然后指望它永远不掉线。 我觉得应该是:
while True:
try:
await connect()
except Exception as e:
print(e)
await asyncio.sleep(5)
连接断开以后自动重新建立连接。这样即使服务端重启或者网络闪断,也能自动恢复。
一个常见错误可能是⬇️
断线:
while True:
try:
connect()
except:
pass
结果:
这时候就需要退避机制,也就是每次失败后逐渐增加等待时间。例如:
delay = 1
while True:
try:
await connect()
delay = 1
except Exception:
await asyncio.sleep(delay)
delay = min(delay \* 2, 60)
等待时间变成 1s 、2s 、4s 、8s 、16s 、32s 、60s...
连接恢复后再重置。这样会稳定很多。
连接建立后订阅需要的品种即可,以 BTC 为例:
import asyncio
import json
import websockets
WS\_URL = "wss://quote.alltick.io/quote-b-ws-api"
API\_TOKEN = "YOUR\_API\_TOKEN"
async def subscribe():
async with websockets.connect(WS\_URL) as ws:
sub\_msg = {
"cmd\_id": 22002,
"seq\_id": 1,
"trace": "test",
"data": {
"symbol\_list": \[
{
"code": "BTCUSDT"
}
\]
},
"token": API\_TOKEN
}
await ws.send(json.dumps(sub\_msg))
while True:
msg = await ws.recv()
print(msg)
asyncio.run(subscribe())
如果是生产环境,我一般会把:
全部封装到一个 ConnectionManager 里面。业务层只负责消费消息,不关心连接状态。这样后面维护会轻松很多。
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.