请教一个 NATS-STRAMING 的问题,不知道是不是个 bug…如果先 publish 消息,然后再启动 QueueSubscriber 的话,相同队列中只有一个 Subscriber 能够消费消息?

2019-08-29 13:17:23 +08:00
 heavyrainn

PUBLISHER 代码:

func main() {
	nc,err := stan.Connect("test-cluster","idc",stan.NatsURL("nats://127.0.0.1:4222"))
	if err != nil{
		panic(err)
	}
	fmt.Println("connect succ")
	for i:=0;i<10;i++{
		fmt.Println("publishing:",i)
		err := nc.Publish("tp1",[]byte(strconv.Itoa(i)))
		if err != nil{
			panic(err)
		}
	}
	nc.Close()
}

QueueSubscriber 代码:

func main() {
	nc,err := stan.Connect("test-cluster","subscriber",stan.NatsURL("nats://localhost:4222"))
	if err != nil{
		panic(err)
	}
	defer nc.Close()
	subs := make([]stan.Subscription,3)
	for i:=0;i<3;i++{
		workername := "worker"+strconv.Itoa(i)
		fmt.Println(fmt.Sprintf("QueueSubscribe %s start",workername))
		sub,err := nc.QueueSubscribe("tp1","ch1", func(msg *stan.Msg) {
			fmt.Println(workername,"get msg:",string(msg.Data),"start doing something")
			time.Sleep(1*time.Second)
		},stan.DurableName("subscriber"),stan.AckWait(time.Hour*24))
		if err != nil{
			panic(err)
		}
		subs[i] = sub
	}
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, os.Kill)
	select{
	case <- c:
		fmt.Println("Subscriber CLOSE")
		for i,_ := range subs{
			subs[i].Close()
		}
		nc.Close()
		fmt.Println("quit")
	}
}

Publisher 输出:

connect succ
publishing: 0
publishing: 1
publishing: 2
publishing: 3
publishing: 4
publishing: 5
publishing: 6
publishing: 7
publishing: 8
publishing: 9

QueueSubscriber 输出:

QueueSubscribe worker0 start
QueueSubscribe worker1 start
worker0 get msg: 0 start doing something
QueueSubscribe worker2 start
worker0 get msg: 1 start doing something
worker0 get msg: 2 start doing something
worker0 get msg: 3 start doing something
worker0 get msg: 4 start doing something
worker0 get msg: 5 start doing something
worker0 get msg: 6 start doing something
worker0 get msg: 7 start doing something
worker0 get msg: 8 start doing something
worker0 get msg: 9 start doing something

请问朋友们是否有遇到过一样的问题呢?谢谢大家

2952 次点击
所在节点    Go 编程语言
4 条回复
freestyle
2019-08-30 12:44:52 +08:00
这是特性,queue 模式. 如果想每个订阅者都收到,设置不同的 queue 名字或普通方式订阅就行. 可以看下我的博客 https://imhanjm.com/2018/02/17/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3nats%20&%20nats%20streaming/
heavyrainn
2019-08-30 15:04:59 +08:00
@freestyle 额…不是,我的意思是,为啥其他的 worker 不工作,只有 worker0 在工作
freestyle
2019-08-31 07:16:06 +08:00
@heavyrainn 你这是同一个连接啊 一般 queueSub 是不同的进程即不同的连接 你试试给每个 worker 创建一个连接.
heavyrainn
2019-09-11 17:28:02 +08:00
@freestyle 我搞清楚了…是因为没有设置 MaxInflight 值的问题,派出的任务都最先启动的 worker 给接收了。设置 MaxInflight 值为 1 实现了正常分 worker 执行。谢谢啦

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/596175

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX