请问大佬 并发的 RPC 调用次数超过了 Hystrix 默认的线程数,怎么"慢慢"地调用?

2019-08-19 10:40:59 +08:00
 wuzhizuiguo

一个 service 方法里 for 循环一个列表,每条记录里 new Thread 在 run 方法里调用一次 payRPC 方法 例如

  orderList.forEach(x->{
              new Thread(){
                     public void run(){
                          payRPC.reset(x.getId(),x.getUserId());
                      }
                  }.start();
                 });

但是这种方式 报错了 "could not be queued for execution and no fallback available", 超出了 Hystrix 默认是 10 个线程.

请问一下能否改成 循环列表 把 rpc 调用放到线程池中去,让它去控制短时间内的 RPC 调用,这样是否可行? executorService 的队列 queue 大小是不是得小于 10 个(Hystrix 默认的线程数)

orderList.forEach(x->{
           executorService.execute(new Runnable() {
               @Override
               public void run() {
                   payRPC.reset(x.getId(),x.getUserId());
               }
           });
       });

或者 循环列表, 不使用 PRC 调用,直接利用 httpClient 这种直接调用 url 接口请求? 有提到 RabbitMQ(这个还没接触过)

就像这种情况 https://www.v2ex.com/t/481064, 300 万的列表,rpc 接口 但每次只能处理 300(超出报错),怎么去有限制地调用 rpc 接口 (我的数据量没这么大).

请问下大佬 我这种该怎么解决? 谢谢

4576 次点击
所在节点    Java
13 条回复
Raymon111111
2019-08-19 11:19:05 +08:00
线程池应该是分为执行线程的大小和等待队列的吧
DanielYao
2019-08-19 11:28:56 +08:00
你可以先把 id 全部放到 ConcurrentQueue<T> 里,然后定义一个 list<task>任务集合,循环从 ConcurrentQueue 取,放到 list<task>中,10 个一次执行
if (listT.Count % 10 == 0)
Task.WaitAll(listT.ToArray());
DanielYao
2019-08-19 11:30:29 +08:00
@DanielYao 最好还是能让 RPC 提供批量方法
519718366
2019-08-19 11:35:51 +08:00
不知道你这代码是为了配合问题描述还是业务就这样

如果一次服务调用就要 reset 多个 order,那就看看服务提供方能给你一个批量接口
for 循环调 rpc 服务会被批斗死

如果一次只是 reset 一个 order,你代码只是为了模拟多个用户并发的场景,你就正常调,那就是正常的业务产品问题了
wuzhizuiguo
2019-08-19 11:50:38 +08:00
@Raymon111111 谢谢. 好像是的.. 第一次用. ThreadPoolExecutor 的 corePoolSize 和 maximumPoolSize 小于 10 ,然后 BlockingQueue 大一点, 可能是先执行核心线程数大小的 rpc 调用,然后队列里的在等着. 测了 20 几个,没报异常,不知道列表大了会怎么样?
wuzhizuiguo
2019-08-19 11:51:01 +08:00
@DanielYao 这个 ConcurrentQueue 好像是 C++的, 我去看看对应 Java 的
wuzhizuiguo
2019-08-19 11:51:33 +08:00
@519718366 谢谢. 明白了, 是我自己写成了这样的.. 没有了解到这样循环调用 rpc 会出现问题.
519718366
2019-08-19 12:10:12 +08:00
@wuzhizuiguo 既然你的服务的任务量在这,不管线程池还是 mq,注意你自己的服务别超时
wuzhizuiguo
2019-08-19 13:14:10 +08:00
@519718366 好的. 没有那么大,很小.....
wdmx007
2019-08-19 14:59:33 +08:00
java 的线程池不是内置了一个 TaskQueue 嘛,不用再外面自己用 Queue 了。
passerbytiny
2019-08-19 16:09:26 +08:00
你的这个错误是“执行已经不能放到队列,即队列已满,并且这种情况也没有定义 fallback 处理”,受限制的是队列容量而不是线程数。队列容量一般是大于线程数的,但是 Hystrix 本来就是做熔断的,有相对极严格的超时要求,队列容量也不建议太大。

你这种情况实际上与 Hystrix 无关,调大 Hystrix 限制、换成 httpClient 都治标不治本。你的问题的本质是:一个 RPC 请求(对你 service 本身的 RPC )分裂成了多个后续的 RPC 请求(多个 payRPC.reset )。类似于 N+1 查询,但比 N+1 查询严重多了。处理方式有两个:
一,让 pay 服务提供批量 reset 的接口,这样不管 orderList 多长都只需要一个后续 RPC。前提是批量 reset 的时间足够短,如果批量 reset 时间很长,甚至等同于单个 reset 时间乘以个数,那么该方式不可用。
二,异步处理或最终一致性。比如说 pay 服务提供批量 reset 注册接口,调用该接口只是把 reset 加入 pay 服务的队列,并没有实际的 reset,pay 服务后续会自行处理队列,你当前的服务需要过段时间后再去询问执行结果。又比如说使用消息队列,你当前的服务只用把一个 reset 消息放到 RabbitMQ 等消息队列上,不用根 pay 服务直接打交道,pay 服务需要自行消费消息队列做后续处理。这种方式很复杂,一时半会说不清楚,你可以以“最终一致性”、“事件驱动”为关键字搜索资料。
wuzhizuiguo
2019-08-19 16:42:35 +08:00
@wdmx007 谢谢. 是这个参数吧 BlockingQueue.
wuzhizuiguo
2019-08-19 16:43:23 +08:00
@passerbytiny 谢谢,很详细, 学习了. 具体报错是博客上看到的" 超出了 Hystrix 的默认 10 个线程", 实际日志恰好显示执行的个数也是 10 个,Hystrix 还没有了解过. 确实我偷懒了,想当然了,以为循环 rpc 调一下就好了,没有写一个 RPC 批量处理. 刚刚试了下 ExecutorService 线程池 创建一个核心线程数小于 10 的(2,3,4..), 然后设置一定长度的队列 BlockingQueue(几千). 消息队列自己写的部分还没有接触到..格局很小,没有几十上百万的数据...

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/593054

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX