mysql 实现任务队列的疑问

2016-05-14 13:10:54 +08:00
 Keshawn

请教各位,我现在有个系统,想由原来的单个 worker 处理任务扩展成多个 worker ,有些问题没考虑明白,希望能够得到指点。

场景

用户请求对应一个作业 job ,存放在 Mysql 的 job 表中,通过 status 字段判断状态( pending :待处理, finished :已完成,等等),后台有多个 worker 轮询 job 表,读取 pending 状态的 job ,然后进行处理

问题

  1. worker 轮询的条件仅仅是根据 job 的状态,如何保证多个 worker 不会读到同一条记录?读到后立即 job 状态吗?
  2. 因为不限制的话, worker 会把当前所有 pending job 读取出来,假如让 worker 每次只读取一条记录,如何实现?需要什么判断条件?
  3. 有没有必要使用 redis 来实现任务队列?
  4. 不知道还有没有其他更好的方案
7342 次点击
所在节点    MySQL
22 条回复
des
2016-05-14 13:45:03 +08:00
如果不多的话可以用锁,
多了可以开一个 worker 专门用来分发任务
spance
2016-05-14 14:09:34 +08:00
用 RDBMS 做共享通信,代价太大,还不合适。
这不就是典型的消息中间件的应用场景么, activeMQ, rabbitMQ 还有很多。
非要用 RDBMS 基本上就是锁表了,如果你要严格的并发一致性的话,基本上就没什么性能可言了。
neoblackcap
2016-05-14 14:57:08 +08:00
redis 去做消息队列比你 MySQL 强多了。
redis 是在内存里面跑,还没有锁,一个 pub/sub 即可搞定。
当然你对消息要求更高的话就上各种消息队列中间件, rabbitMQ 什么的,能对你的消息进行持久化。
cevincheung
2016-05-14 15:16:38 +08:00
轻量级消息队列 beanstalkd
lecher
2016-05-14 15:31:18 +08:00
redis 做起来更方便, lpub/lsub 都是原子操作。
业务逻辑实现起来更简单,代码量也少。

mysql 一定要做,可以考虑这样的操作:
update task set status='process' where status='pending' limit 1;
只要并发不是很恐怖,这个操作可以当成是原子操作,只有符合 status='pending'的才会被更新,拿到成功更新的状态就说明成功取到任务,然后取具体的数据进行处理。
armoni
2016-05-14 15:33:07 +08:00
如上, MQ 即可
miaoever
2016-05-14 15:55:16 +08:00
我们线上某个系统就使用了 db 作为消息队列(各种历史原因吧)。 系统有多个 slave 作为生产者会向 db 中插入消息( insert ), 同时,根据(固定的)消费者数量,插入的时候还会给每条消息上的某个字段填上任意一个消费者的唯一标示。有一个 master 会起若干线程作为消费者去 db 中捞取属于自己的消息进行消费( select, delete )。可能是目前消息量还不够大?(每天百万级别没到千万),遇到的问题基本都不在 db 的性能上。最大的困扰是为了实现很多 MQ 基本的语义(比如上面提到的 consume once ),你需要自己在业务上做很多工作,而你会发现自己在业务代码中来保证语义正确同时兼顾水平扩展性,确实不是件容易的事情。
miaoever
2016-05-14 16:02:37 +08:00
再补充一下我的观点,遇到需要做持久化的场景,数据库的性能很有竞争力。更多需要考虑的是业务上的场景和需求。
yangyaofei
2016-05-14 17:46:25 +08:00
内存里维护一份 job 然后加锁不行么?
yangyaofei
2016-05-14 18:06:30 +08:00
最近也在做类似的东西,但是我的 job 的数量会很少....

我是直接进程间队列传输 job(我叫 task)的参数和要执行的 action,给 taskManager,启动一个进程,或者终结一个进程等等任务.然后确定启动或者终止后会在相应的 taskList 中添加或者除去这个 task.

处理的结果和 task 运行中的状态由 task 自己写入数据库

获取结果的时候直接从另一个进程中获取


当然我这个简单,没有考虑到 task 很多需要排队的问题,关于这个我是这样设想的.用信号量(其实一个 queue 啊什么的都一样),信号量初始值是 process_MAX,taskManager 将其-1 一次就生成一个 task,等于 0 就 block,task 完成就将这个信号量加 1.
另外有一个进程专门将没有开始的 job 扔进一个 queue 里面,到了 max_size 就 block.taskManager 每次从这个 queue 里面取一个 task 执行就好了


当然我这个是小量的时候管用,大量了锁使瓶颈吧...可以将这个开 N 倍,每个部分负责 task_ID%N=1,2,3,4 等等部分就可以了吧....或者还可以精简构架,但是大概意思就是这个意思
gamexg
2016-05-14 18:16:58 +08:00
用过数据库做过任务队列,不过我每个任务最少需要半个小时,所以不用在意性能。
当时是使用的事务+ select update + limit 1 来取任务,在事务内更新状态。
realpg
2016-05-14 18:53:09 +08:00
非要用 mysql
select for update
做好 limit 即可
mahone3297
2016-05-15 17:24:51 +08:00
大家都说锁。
锁的问题是,当你多个 worker 起来去消费这个消息的时候,大家都在争这个消息,最后只有 1 个人得到了,其他的人都没得到,你多个 worker 的效果是完全没有的。
还是最好要限制某个消息只能某个 worker 能获取比较好。
kimmykuang
2016-05-15 22:33:47 +08:00
我们现在线上的做法是开了 10 张 queue 表,起 10 个进程来消费这 10 张表,其实对于每个 queue 都是单进程消费,所以不存在竞争的问题,然后就是有个调度器来负责将 job 分配到这 10 个 queue 里去,但是还是有一些可优化的地方的,如 job 的优先级等,只能说目前满足了线上业务的需求,也一直想换成 redis 或者干脆用 MQ ,但是一直没有这个时间。
wengang285
2016-05-16 09:50:09 +08:00
1 、读取之后立马 update set status = 处理中 where status= pending
2 、 limit 1
3 、有,但是需要自己监控队列长度,达到一定长度之后拒绝写入,否则容易吃满内存
4 、 kafka
realpg
2016-05-16 12:29:26 +08:00
@wengang285
高并发系统是不能这么搞的。
1ms 的时间差就会被处理掉。
之前有一个超高并发的系统用 MYISAM ,没有事务支持,没有 select for update
当时我做的方案是以 update 代 select
字段加 status1 status2 类型 unsigned bigint 20
从程序生成两个随机数 a1 a2 如果要消费 5 个 那就
update xxxxxx set status1=a1 status2=a2 limit 5
然后再二次 select where status1=a1 status2=a2
这样才能保证不会出问题
wengang285
2016-05-16 14:38:29 +08:00
@realpg 我不知道你说的哪条有问题,如果是第一条, staus 新增一个处理中的状态,一个 worker 读取一条记录之后,立马带 where 条件 update staus 为处理中,如果失败,表示这条记录已经被别的 worker 修改为处理中或者处理完成了,那么流程退出;如果 update 成功,那么这条记录就不会被别的 worker 拿到了,可以满足 lz 的要求啊,而且这种方法我在你说的高并发环境下也用过,并没有什么问题啊。
你说的方法,有点把简单问题复杂化,想象一下这样一种情况,如果两个 worker 随机的 a1 和 a2 是一样的,那你的代码会不会一次消耗 10 个呢?
changwei
2016-05-16 15:34:06 +08:00
这个要看用户规模吧, t.changwei.me/2 这种云端抢二楼应用也是用 mysql 实现类似于任务队列的功能,用户数量不过几百人,执行成功率之类的数据看起来还行。
changwei
2016-05-16 15:38:43 +08:00
@kimmykuang 但是如果后期要扩展 worker ,那么表数量也要跟着改,是不是有紧耦合问题,还有就是新增任务的时候也要考虑到十张表都是均衡插入,不会出现一张表加多了任务,另一张表加少了任务?
kimmykuang
2016-05-16 17:19:08 +08:00
@changwei 一张表多几个任务另一张表少几个任务在目前还是可以容忍的点,基本是轮询分配任务的

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/278597

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX