消费者怎么确保能取到数据,又能正确退出线程

2019-03-12 10:58:49 +08:00
 shayang888

现在是这样的

1.首先我最外面有个 quartz 定时器,每隔 N 秒执行一次

2.定时器里执行的内容是这样,里面有个线程池,线程池大小是 2 个线程,coresize,maxsize 都是 2

3.线程池里的 2 个线程,分别一个去执行生产者方法,一个去执行消费者方法

4.生产者和消费者中间用消息队列来临时存数据

现在有个问题,就是消费者这边,怎么能保证取到数据,又能正确的退出线程,进行到下一次定时器的执行

在这之前我做的蠢办法是消费者那边加了个 while(true),结果定时器执行了 2 次后,线程池就满了,然后拒绝

所以不知道有啥好办法

代码:

//线程池的
public class TestTask extends QuartzJobBean {

    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2,
            2,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(4));

    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) {
        threadPoolExecutor.execute(() -> {
            //生产者代码
            
            //数据放入消息队列
        });
        threadPoolExecutor.execute(() -> {
            while (true) {
                //取队列里的数据
                //消费者代码
            }
        });
    }
}
3214 次点击
所在节点    Java
31 条回复
jinksw
2019-03-12 11:09:43 +08:00
看样子你定时器的意思是 每两秒生产一次 然后消费一次

为啥要 2 个线程 你每次运行一个线程 先生产再消费不行吗
sarlanori
2019-03-12 11:15:02 +08:00
在 C#里,我一般是用信号量来等待和通知。
shayang888
2019-03-12 11:24:59 +08:00
@jinksw 那样不就是同步了吗?我是想生产和消费分隔开来
shayang888
2019-03-12 11:26:27 +08:00
@jinksw 另外定时器并不是 2 秒执行一次啊 定时器的执行时间随便设置的
passerbytiny
2019-03-12 12:05:56 +08:00
消费者在线程之上,而不是之内,拿到数据后再从线程池里开线程去执行后续处理。消费者不能用多线程+死循环来取数据,而应该是单线程异步监听+同步取值,再具体我也不知道了,因为基本都是直接调用 API。
Counter
2019-03-12 12:11:53 +08:00
机制是不是不太对,改成这样如何呢?
生产后的数据加锁,生产者方法和消费者方法排队存取
shayang888
2019-03-12 12:14:42 +08:00
@passerbytiny 拿到数据后再从线程池里开线程去执行后续处理吗 好像有点思路
shayang888
2019-03-12 12:16:46 +08:00
@passerbytiny 可是消费者怎么知道它啥时候能拿到数据
shayang888
2019-03-12 12:17:18 +08:00
@Counter 没有懂你的意思
limhiaoing
2019-03-12 12:21:34 +08:00
生产者、消费者线程一般用条件变量 Condition Variable 来通信。
shayang888
2019-03-12 12:23:59 +08:00
@passerbytiny 可能也没这么复杂 我想做的只是在定时器里 将生产者和消费者异步执行就行
shayang888
2019-03-12 12:24:05 +08:00
@Counter 可能也没这么复杂 我想做的只是在定时器里 将生产者和消费者异步执行就行
passerbytiny
2019-03-12 13:02:02 +08:00
@shayang888 #12 消费者也不要放在定时器里,它应该是一个常驻的、独立的单线程。我不知道你的消息队列是什么队列,但一般的消息队列都是提供消费者 API 的,可以直接使用,自己做消费者太难了。

如果用线程去看消费者 /监听,那么是类似 while(true) {if(收到数据) {……} else { Thread.sleep(0.0001);} },这要是用线程去做,要么系统受不了,要么延迟时间受不了。消费者 /监听的轮询,回采用操作系统层次的东西,高级程序员都没必要知道的太深入,自己设计是肯定设计不来的。

你可以参考下 java.net.ServerSocket#accept()。
autogen
2019-03-12 13:19:51 +08:00
生产者发送的 msg 封装一下,加个 ctrl 字段,消费者接收到 msg.ctrl=exit 就退出
NieKing
2019-03-12 13:29:38 +08:00
我想起了 Android 里的 RxJava
linjiayu
2019-03-12 13:33:10 +08:00
实现 callable
linjiayu
2019-03-12 13:35:12 +08:00
public void offer(Event event)
{
synchronized (eventQueue)
{
while (eventQueue.size() >= max)
{
try
{
console(" the queue is full.");
eventQueue.wait();
} catch (InterruptedException e)
{
e.printStackTrace();
}
}

console(" the new event is submitted");
eventQueue.addLast(event);
eventQueue.notifyAll();
}
}

public Event take()
{
synchronized (eventQueue)
{
while (eventQueue.isEmpty())
{
try
{
console(" the queue is empty.");
eventQueue.wait();
} catch (InterruptedException e)
{
e.printStackTrace();
}
}


Event event = eventQueue.removeFirst();
this.eventQueue.notifyAll();
console(" the event " + event + " is handled.");
return event;
}
}
shayang888
2019-03-12 13:46:28 +08:00
@autogen 我现在是多个生产者同时生产数据然后往队列里 push,然后只有一个消费者在从队列里消费 加字段的话 我给哪个生产者加这个字段呢
jingxyy
2019-03-12 13:48:43 +08:00
先不管怎么实现合理的问题
你是不是消费线程消费完了没退出啊?这样每一个 interval 之后你就有一个 while(true)的消费线程在跑,于是第 2 个周期后无法创建新的消费线程
ratel
2019-03-12 13:50:50 +08:00
使用消息中间件啊,消费者单独订阅消息消费,生产者用定时器就行了。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/543599

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX