关于 API 限流:令牌桶到底怎么写才稳?

8 小时 9 分钟前
 matters

最近在重构一套多策略的量化执行层逻辑,又绕不开 Rate Limit 这个问题。分享一点在生产环境实现平滑令牌桶( Token Bucket )时的经验:

1. 为什么弃用定时器?

早期写限流,直觉是用 Ticker 每隔一段时间加 Token 。

但当你的系统需要维护几百个交易对、几千个独立桶时(尤其是在处理多账号或多币种权重不同时),系统上下文切换和定时器开销会变得非常显著。

——优化点: 改用延迟计算。

不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。

2. 重视“权重( Weight )”

有的交易所 API 文档里,限流单位往往不是“请求次数”,而是“权重值”。

所以在设计令牌桶接口时,consume() 方法必须强制带上 weight 参数。如果你的限流器还停留在 count++阶段,在实战中基本没法用。

3. 处理网络抖动( Jitter )带来的假限流

理论上本地限流 10 次/秒,API 限制也是 10 次/秒,但因为网络抖动的存在,请求可能在某一毫秒“堆叠”到达服务端。

——避坑经验: 本地限流一定要比官方文档**保守 5%-10%**。同时,要在封装层实现一个简单的指数退避,捕获到 429 后立即收紧本地阈值,而不是死磕。

4. Python 示例

以下是抽象出来的一个最小原型,去掉了繁琐的业务逻辑,核心就是原子操作和延迟计算:

import time
import threading

class AllTickLimiter:
    def __init__(self, capacity: float, rate: float):
        """
        :param capacity: 桶容量(最大允许的突发请求权重)
        :param rate: 令牌恢复速率(每秒恢复的权重数)
        """
        self.capacity = float(capacity)
        self.rate = float(rate)
        self.tokens = float(capacity)
        self.last_tick = time.monotonic()
        self._lock = threading.Lock()

    def allow(self, weight: float = 1.0) -> bool:
        """
        检查当前令牌是否足够支付本次请求的权重
        """
        with self._lock:
            now = time.monotonic()
            # 1. 延迟计算:计算自上次请求以来生成的令牌
            delta = (now - self.last_tick) * self.rate
            self.tokens = min(self.capacity, self.tokens + delta)
            self.last_tick = now

            # 2. 尝试消费
            if self.tokens >= weight:
                self.tokens -= weight
                return True
            return False

    def sync_from_header(self, server_remaining: float):
        """
        利用响应头中的权威剩余量进行校准
        防止本地计算与服务端由于网络延迟导致的偏差
        """
        with self._lock:
            # 强制同步服务端返回的剩余额度
            self.tokens = min(self.capacity, server_remaining)
            self.last_tick = time.monotonic()

# --- 实战调用示例 —

# 假设你的 API 套餐是每秒 10 个 Token
limiter = AllTickLimiter(capacity=20, rate=10)

def get_market_data(symbol: str):
    # 假设查询实时报价权重为 1
    weight = 1.0
    
    if limiter.allow(weight):
        # 模拟 AllTick API 请求
        # response = requests.get(f"https://api.alltick.co/v1/quote?symbol={symbol}")
        # data = response.json()
        
        print(f"[{symbol}] 请求成功")
        
        # 进阶操作:从 Header 获取服务端权威数据进行同步
        # remaining = float(response.headers.get("X-RateLimit-Remaining", 20))
        # limiter.sync_from_header(remaining)
    else:
        print(f"[{symbol}] 触发本地限流,请求被拦截")

# 模拟快速并发请求
for i in range(15):
    get_market_data("BTCUSDT")

5. 分布式下的抉择

如果是单机策略,上面的逻辑足够。如果是多机集群,建议直接上 Redis + Lua 脚本。千万不要在分布式环境下尝试用各节点同步变量的方式做限流,一致性带来的延迟抖动会很折磨。

1322 次点击
所在节点    程序员
11 条回复
lp7631010
7 小时 53 分钟前
ai 给你刷刷刷写出来了
aababc
7 小时 48 分钟前
好像之前见过一个 GCRA 的算法可以参考一下
AlanAdam
4 小时 25 分钟前
用纯血 opus4.8 帮你优化
echoechoin
4 小时 14 分钟前
“不用主动去加 Token ,而是在 Request 到达时,根据 CurrentTime - LastRequestTime 动态计算。这样即便你有 10,000 个桶,不活跃的桶也不会占用任何 CPU 。” 我还以为都是这么做的
defunct9
4 小时 11 分钟前
```python
import time
import threading
from typing import Tuple

# 定义三色标记常量
COLOR_GREEN = "GREEN" # C 桶充足,完美放行
COLOR_YELLOW = "YELLOW" # C 桶不足但 E 桶充足,超额放行(突发)
COLOR_RED = "RED" # 双桶皆不足,拒绝请求

class DualBucketThreeColorLimiter:
def __init__(self, bc: float, be: float, cir: float):
"""
:param bc: Committed Burst Size (C 桶容量,承诺突发量)
:param be: Excess Burst Size (E 桶容量,超额突发量)
:param cir: Committed Information Rate (承诺信息速率,每秒恢复的 Token 数)
"""
self.bc = float(bc)
self.be = float(be)
self.cir = float(cir)

# 初始状态:双桶皆满
self.tokens_c = float(bc)
self.tokens_e = float(be)

self.last_tick = time.monotonic()
self._lock = threading.Lock()

def consume(self, weight: float = 1.0) -> Tuple[bool, str]:
"""
尝试消费指定权重的令牌,返回 (是否放行, 流量颜色)
遵循 MEF 10 / RFC 2698 标准的双速率三色标记算法逻辑变体
"""
with self._lock:
now = time.monotonic()
delta = now - self.last_tick
self.last_tick = now

# 1. 延迟计算:向桶内补充令牌
delta_tokens = delta * self.cir

# C 桶溢出的令牌会流入 E 桶
overflow_c = max(0.0, (self.tokens_c + delta_tokens) - self.bc)
self.tokens_c = min(self.bc, self.tokens_c + delta_tokens)
self.tokens_e = min(self.be, self.tokens_e + overflow_c)

# 2. 三色评估与消费逻辑
# 情况 🟢:C 桶令牌足够
if self.tokens_c >= weight:
self.tokens_c -= weight
return True, COLOR_GREEN

# 情况 🟡:C 桶不够,但 E 桶足够(借用突发额度)
elif self.tokens_e >= weight:
self.tokens_e -= weight
return True, COLOR_YELLOW

# 情况 🔴:双桶都不够
else:
return False, COLOR_RED

def sync_from_header(self, server_c_remaining: float, server_e_remaining: float):
"""
量化实战进阶:从交易所 Response Header 权威同步双桶状态
"""
with self._lock:
self.tokens_c = min(self.bc, server_c_remaining)
self.tokens_e = min(self.be, server_e_remaining)
self.last_tick = time.monotonic()
```
seanxx
2 小时 59 分钟前
AI 写呗 直接给几个方案跑跑找个你觉得合适的
Ayanokouji
2 小时 43 分钟前
套一层 apisix ,或者参考 apisix 的插件实现
longaiwp
2 小时 37 分钟前
问一下 AI ,得到几个方案,思考一下要哪个,这种东西烂大街了
lxxzml
2 小时 28 分钟前
@echoechoin 我也以为都是这样,因为我之前项目都是这样写的
aw2350
1 小时 38 分钟前
go rate + redis lua
Mandelo
15 分钟前
技术问题发帖前,可以先问问 AI

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1217807

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX