请问有必要用专门的消息队列吗? all in sqldb 是否可行?

83 天前
 Need4more

首先是 message 表:

字段名 类型及描述
id 主键
status 消息状态(待处理,进行中,成功,失败)
try_count 消费者重试次数
lock_expires 锁过期时间
created 创建时间
data 消息数据

然后消费端轮询:

select *
from message
where status in ('PENDING', 'STARTED', 'FAILED')
and try_count < max_tries
and lock_expires < now()
order by created
limit 1 for update
skip locked;

解释下 sql:

  1. 状态筛选 (status IN ('PENDING', 'STARTED', 'FAILED')) 只选取未开始或已开始但未完成的任务(可能因崩溃需重启)。
  2. 重试保护 (try_count < max_tries) 确保任务重试次数未超过允许的最大值(避免无限重试失败任务)。
  3. 锁过期检查 (lock_expires < NOW()) 任务被处理时会加锁(设置未来过期时间),此条件筛选锁已过期的任务(说明此前处理进程异常退出)。
  4. 排序与限量 (ORDER BY created LIMIT 1) 按创建时间排序后取最旧的一条任务,实现公平调度。
  5. 并发控制 (FOR UPDATE SKIP LOCKED)
    • FOR UPDATE:锁定所选行,防止其他进程修改。
    • SKIP LOCKED:跳过已被其他进程锁定的行,避免阻塞等待,提升并发效率。
    • 此语法在 postgresql 、mysql(8.0)均支持。

利用关系数据库持久化消息,支持索引,可以灵活检索,关键是无需额外引入组件,请问这种方案是否可行呢?

3271 次点击
所在节点    程序员
31 条回复
adgfr32
82 天前
超时时间也不是很合理, 因为对于不同的任务, 能忍受的长度不一样, 感觉还是约定一个心跳间隔, 只要消息还在被处理, 就定时更新这条记录某个字段的时间. 这样当这个字段超过心跳间隔没有更新, 视为处理的进程已经无了.
Need4more
82 天前
select for update 会锁住记录,确保只会被单个地方消费,skip locked 确保不会阻塞消费者
@z1829909
adgfr32
82 天前
@Need4more GET, 那你更新 status 和 select 是在一个事务里对么
kaffka
82 天前
可以考虑添加成熟的 PG mq 拓展
spritecn
82 天前
没 redis leftPop+rightPush 好用,还有 3/5/10/15 秒后重试你这个不太好实现,fro update 如果消息处理时间稍长,一直占着连接是个问题
fionasit007
82 天前
@HENQIGUAI #7 用的阿里云 redis ,六七年的数据都还在,不过因为数据量太大不知道有没有丢数据的情况
spritecn
82 天前
@fionasit007 阿里 1G 一年 500 块,我好像 10 分之 1 都用不到
fionasit007
82 天前
@spritecn #27 个人的话我是肯定不用,挺贵的,不过这样的云服务用着就是省心,阿里云 redis 反正这么多次事故好像都幸免于难
mightybruce
82 天前
如果就是简单的消息队列需要落盘, 不如采用 redis + rocksdb 持久化的项目 kvrocks
https://github.com/apache/kvrocks

https://kvrocks.apache.org/docs/supported-commands/ 支持 redis 消息操作。
darrenxyli
81 天前
我们现在就是这么干的,做过压测,瓶颈全在数据库能够支持的 QPS 上面
Need4more
81 天前
@darrenxyli 性能能抗住吗?

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1142852

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX