MQ 消费者阻塞如何处理?(ActiveMQ、RocketMQ)

2019-10-30 16:23:57 +08:00
 dunhanson

问题大概描述是:

邮件发送,消费者数量是 5-20,有时候会阻塞(问题还不清楚)导致消费者无法继续处理队列中的消息

我的处理方式是重启 tomcat,重启果然是万能的,重启后,就继续读取消息了。

但不可能天天守着看然后重启一下吧

于是乎,我就搜了相关的 ActiveMQ 的文章 https://blog.csdn.net/ma15732625261/article/details/81267963 里面讲了 SlowConsumerStrategy:慢速消费者策略,但是我配置了,无效果

<policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">               
    <slowConsumerStrategy>
        <abortSlowConsumerStrategy abortConnection="false"/>
    </slowConsumerStrategy>  
</policyEntry>

我用了下 RocketMQ,也遇到了类似的问题,consumeTimeout 也没效果

我的理解是:配置了 consumeTimeout,超时之后,就处理下一个消息

package cn.msb.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-1",
        consumerGroup = "my-consumer_test-topic-1",
        selectorExpression = "first",
        selectorType = SelectorType.TAG,
        consumeThreadMax = 1, consumeTimeout = 1000)
public class MyConsumer1 implements RocketMQListener<String> {
    public void onMessage(String message) {
        if(message.contains("1")) {
            try {
                System.out.println("1 阻塞中。。。");
                Thread.sleep(1000*60*60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.info("received message: {}", message);
    }
}

我想达到的效果是:消费者处理超时后就终止执行,让给下个消息进行处理

6899 次点击
所在节点    程序员
16 条回复
superrichman
2019-10-30 16:36:25 +08:00
consumeTimeout 的单位是不是分钟
dunhanson
2019-10-30 16:39:33 +08:00
@superrichman 毫秒,默认 30000
lucifer1108
2019-10-30 16:42:57 +08:00
让我想到了一个面试题,怎么限制一个方法的执行时间.
可以用 callable+executors 实现.
贴个 demo 代码
```java
Callable<String> call = new Callable<String>() {
public String call() throws Exception {
// 开始执行耗时操作
// Thread.sleep(1000 * 5);
// return "线程执行完成.";
// 响应时间较长的方法或接口调用,返回 String 类型
return getRecCourses(params);
}
};
try {
ExecutorService exec = Executors.newFixedThreadPool(1);
Future<String> future = exec.submit(call);
// csvData 为 call 方法里的返回值,也就是我们方法的返回值
csvData = future.get(1000 * 1, TimeUnit.MILLISECONDS); // 任务处理超时时间设为 1 秒
} catch (TimeoutException ex) {
// 捕获超时异常,超时处理,可以通过 ex 抛出异常,如果不抛出,则控制台不输出异常。
csvData = null;
LogUtil.warn(Module.COURSE, getClass(), "getCourseRecFromBI", "请求 Bi 推荐课程数据超时,使用原来推荐系统"ex);
} catch (Exception e) {
csvData = null;
LogUtil.warn(Module.COURSE, getClass(), "getCourseRecFromBI", "请求 Bi 推荐课程数据失败,使用原来推荐系统");
}
```
lucifer1108
2019-10-30 16:43:36 +08:00
@lucifer1108 什么鬼,是我用 md 的姿势不正确么
softtwilight
2019-10-30 16:44:53 +08:00
consumeThreadMax = 1, 单线程消费是业务需求吗? 改成多线程不会影响阻塞不会影响别的消费,但是阻塞的问题还是要解决
dunhanson
2019-10-30 17:24:00 +08:00
@lucifer1108 这个有点繁杂
dunhanson
2019-10-30 17:24:35 +08:00
@softtwilight 不是单线程消费,只是用单线程好模拟和控制
dunhanson
2019-10-30 17:27:27 +08:00
@lucifer1108 按道理 MQ 都应该有这个具体的配置的
x537196
2019-10-30 17:35:19 +08:00
为什么阻塞呢?其实我没怎么看懂问题,把消息取下来,放入线程池中执行响应业务不可以吗?
justfly
2019-10-30 17:35:48 +08:00
从根上解决问题,找到阻塞的原因。

根据我的经验,如果消费者突然拿不到消息,而队列又有消息堆积的话,从客户端和服务端两侧都看下 tcp 连接还在不在。

在某些低吞吐量的场景,tcp 连接长时间空闲,某些网络中间件会断掉连接而客户端没感知,就会 block 住了,再有大吞吐量后也不会恢复。

如果连接已经断了,设置 rabbitmq 的心跳,而且心跳时间要比 tcp 自身的 keep alive 间隔短一些,保证连接活跃。
dunhanson
2019-10-31 09:11:46 +08:00
@x537196 还没找
dunhanson
2019-10-31 09:12:02 +08:00
@justfly 问题确实要找的
Dabaicong
2019-10-31 09:30:38 +08:00
#9 楼说的对,拉下消息,放线程池中异步执行,执行成功回调。再加上守护线程,监视任务执行,超时的话,守护线程就干掉 。
jyounn
2019-10-31 13:51:41 +08:00
......
consumeThreadMax = 1, consumeTimeout = 1000)
......
Thread.sleep(1000*60*60);

消费线程数最大为 1,然后又让消费者线程 sleep 3600 秒?线程 sleep 是不会结束的,这个时候不会创建新的消费线程,导致无法创建新线程消费.消费者消费建议使用线程池,可以复用且好管理.另外你说的阻塞具体是什么现象呢?
dunhanson
2019-10-31 16:29:04 +08:00
@jyounn 我这个是模拟线上环境的阻塞状态
线上肯定不止一个消费者线程数的
阻塞情况是指,线上配置 5 个消费者线程池,然后刚好 5 个都在执行的过程中卡住了(问题还不清楚,你就理解为都因为某种原因 sleep 了)
jyounn
2019-11-01 10:29:59 +08:00
@dunhanson 根据你的描述,看下消费的逻辑中是否有导致无限等待的情况?可以搭一个 RocketMQ 控制台看下生产者消费者的状态,通过 debug 看看.

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/614521

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX