真诚请教原因, RabbitMQ 增加多消费者导致生产者推送消息阻塞(10s~30s)

2022-06-21 09:47:04 +08:00
 withBruce

RabbitMQ 增加多消费者导致生产者推送消息阻塞(10s~30s) 设备上传数据到系统 A(netty),系统 A 根据数据类型推送消息到不同的队列,因为设备量增多的原因,之前单消费者开始处理的不及时,就想着多增加个消费者(和之前的消费者代码一样),然后系统 A 推送消息开始出现卡顿,数据帧应答的很慢,感觉不像是流控的事,管理端看着也没问题 相关代码: 系统 A:

channelRead(ChannelHandlerContext ctx,  Object msg){
    ....
    sendAck(ctx,ack);
    switch (data.getClass().getName()) {
        case "realTimeData":
            RabbitUtil.getInstance().publish(realTimeData);
    }
}
publish(RealTimeData realTimeData){
     .......
     Map<String, Object> header = new HashMap<String, Object>();
     header.put("DataType", "RealTimeData");
     BasicProperties props = new BasicProperties().builder().headers(header).build();
     channel.basicPublish(exchangeName, routeKey_CollectedData, props, CollectedRealTimeDataPackageTransform.toBytes(data));
}
channel init:
    private Channel channel;
    private ConnectionFactory factory = new ConnectionFactory();
    @PostConstruct
    public void init() {
        instance = this;
        factory.setUsername(mqUserName);
        factory.setPassword(mqPassword);
        factory.setHost(mqHost);
        factory.setVirtualHost(mqVirtualHost);
        factory.setPort(mqPort);
      }
     channel = factory.newConnection().createChannel();
}

消费者代码:

    @Autowired
    DataProcessor processor;
    @Autowired
    @Qualifier("threadpool")
    ThreadPoolExecutor threadPool;
@RabbitListener(queues = "${mq.queue.Original.CollectedData}", ackMode = "MANUAL")
 public void process(Message msg, Channel channel) {
        MessageProperties mp = msg.getMessageProperties();
        Map<String, Object> headers = mp.getHeaders();
        String dataType = (String) headers.get("DataType");
 switch (dataType) {
            case "RealTimeData":
                CompletableFuture.runAsync(() -> {
                    try {
                        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
                        CollectedRealTimeData crtd = CollectedRealTimeDataPackageTransform.fromBytes(msg.getBody());
                        processor.process(crtd);
                    } catch (Exception e) {
                        try {
                            channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
                        } catch (IOException ioException) {
                            ioException.printStackTrace();
                        }
                        e.printStackTrace();
                    }
                }, threadPool);
                break;
        }



 }
1561 次点击
所在节点    程序员
5 条回复
qW7bo2FbzbC0
2022-06-21 09:51:09 +08:00
用 rabbit 经常会遇到 server no response 的情况,只能手动 kill 进程重新启动,改用 kafka 后未出现同样问题。我们的场景比较简单,切换起来很快。我们这边观测 rabbit client 很多或消息大的话,对 server 有压力。和 rabbit 的性能宣传语不同,可能使我们的使用方式有问题吧
wupher
2022-06-21 10:27:15 +08:00
没碰到类似的情况,可能是量级未到?

之前写的一个系统,使用 RabbitMQ 进行多端通讯。日常大约在 5000 ~ 8000 个客户端进行数据交换,同步消息和异步消息都有。

消费者一直都是多节点通过 RabbitListener 连接 RabbitMQ 。刚才又看了一下 application.yml
批量获取一次 10 条,concurrency 5 max 10

之前未碰到过 publish 超时的情况。

你用的版本是?
withBruce
2022-06-21 12:09:20 +08:00
concurrency 这个属性开多线程不是消费的同一批数据把
concurrency 5 max 10 配置好这个问题解决了
还是自己对于 mq 没弄明白
谢谢了!
withBruce
2022-06-21 12:09:31 +08:00
@wupher 谢谢
withBruce
2022-06-21 12:09:45 +08:00
@qW7bo2FbzbC0 谢谢

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/861047

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX