[ Java ] CountDownLatch 运用场景(1)

2018-08-28 21:54:41 +08:00
 Acceml

基本功能

CountDownLatch 也叫闭锁,使得一(多)个主线程必须等待其他线程完成操作后再执行。 使用的方式是:CountDownLatch 内部维护一个计数器,主线程先执行 await 方法,如果此时计数器大于 0,则阻塞等待。当一个线程完成任务后,计数器值减 1。直到计数器为 0 时,表示所有的线程已经完成任务,等待的主线程被唤醒继续执行。 应用场景:应用程序的主线程希望在负责启动框架服务的线程已经完成之后再执行。

应用:缓存加载

在广告的核心引擎中,我们的服务需要加载很多缓存数据,加载完成之后,主线程才能启动对外提供服务。这个时候我们就用到了 CountDownLatch 来定时加载缓存。缓存加载的东西我们之后再单独开帖子讲,这里先看 CountDownLatch 的使用。

public abstract class BaseCacheUpdateJob {
    //job 的名字
    public String name() {
        return this.getClass().getSimpleName();
    }
    //job 的执行周期
    public long getPeriodInSecond() {
        return PERIOD_ONE_HOUR;
    }
    //job 的重要性
    public boolean isEssential() {
        return false;
    }
    //job 的具体内容
    public abstract boolean update();
}
//加载 App 数据的 cache.
@Component
public class AppCache extends BaseCacheUpdateJob {

    private Map<String, String> map = new HashMap<>();

    @Autowired
    public AppCache() {
    }

    @Override
    public long getPeriodInSecond() {
        return PERIOD_ONE_MINUTE;
    }

    public String getValueByKey(String appId) {
        return map.getOrDefault(appId, "not find in appCache");
    }

    @Override
    public boolean update() {
        map.put("add", "0");
        return true;
    }
}

//加载广告数据的 cache.
@Component
public class AdCache extends BaseCacheUpdateJob {

    private Map<String, String> map = new HashMap<>();

    @Autowired
    public AdCache() {
    }

    @Override
    public long getPeriodInSecond() {
        return PERIOD_ONE_MINUTE;
    }

    public String getValueByKey(String appId) {
        return map.getOrDefault(appId, "not find in AdCache");
    }

    @Override
    public boolean update() {
        map.put("add", "0");
        return true;
    }
}

// 加载用户画像的 cache
// 加载 Ctr 预估模型的 cache
// 加载黑白名单的 cache
// 加载配置项的 cache
// ...

上面两步我们定义好了我们服务启动的时候需要干什么事情,那么具体怎么干,就交给了 CountDownLatch

@Service
@Slf4j
public final class InterCacheService { 
    //这里 spring 的自动注入会把定义好的 Bean 全部注入进来内存
    @Autowired
    private List<BaseCacheUpdateJob> cacheUpdateJobs;
    
    @PostConstruct
    private void start() {
        //定义线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(cacheUpdateJobs.size());
        CountDownLatch completeTaskLatch = new CountDownLatch(cacheUpdateJobs.size());
        
        for(BaseCacheUpdateJob job : cacheUpdateJobs) {
            boolean loadStatus = job.update();
            if (loadStatus) {
                countDownLatch.countDown();
            }
        }
        //阻塞住,等待上面的加载完,才会执行主线程.
        completeTaskLatch.await();
        //缓存加载到内存中了,主线程可以继续加载其他 bean,完成之后提供服务.
    }
}

这里只是举个例子,简化了很多代码,这种代码肯定不能在生产环境跑的,这么跑肯定会出问题的。

比如:

1.考虑这么一个场景,如果缓存是一个不那么重要的,你的服务其实是可以起来的。那么如何管理这种状态呢?

2.还有可能出现缓存依赖的问题,加载 AdCache 需要依赖于 AppCache,加载 AppCache 需要依赖 BlackListCache,怎么管理这种状态呢?

3.缓存没加载成功,我什么时候去尝试呢?隔多久?

4.缓存都在同一个时间点去加载,导致我线上的 GC 压力比较大怎么办?

5.缓存一般是多线程访问的公共资源,那么怎么在线程安全和性能之间做取舍呢?

我后面单独开几篇帖子讲缓存,有兴趣的小伙伴可以先看下这个小框架的源码。

源码

https://github.com/Acceml/local_cache_manager

  1. 如果你校招 /1-3 年的社招,简历上没有项目的话,把这个项目吃透,可以拿出去和面试官吹牛逼的。前提是你理解我们为什么这么写。这个可比你写个什么爬虫有技术含量得多。
  2. 如果你是工作的小伙伴,你们的服务缓存有很好的管理机制吗? 可以参考下我们的实现方法。

热门阅读


最近在刷题,有一起的可以加手撕代码群:805423079

1955 次点击
所在节点    Java
3 条回复
skypyb
2018-08-29 09:13:18 +08:00
闭锁啊..用过几次,一般都是必须等待某一些线程 /任务执行完毕然后进行一些操作才用得上把 = =
Acceml
2018-08-29 09:15:02 +08:00
@skypyb 缓存加载就是这么一个典型场景啊。网上举的例子都是什么开会要等待会议人员到场啥的,我觉得还是用一个生产环境的例子比较好。
skypyb
2018-08-29 09:42:12 +08:00
@Acceml 闭锁的使用还是比较简单的。说的简单点,在线程 /任务类中接收一个闭锁,在线程执行完毕或者 finally 块里面调用 countDown()。
需要等待其余线程执行完毕的主线程使用 await()等待闭锁打开就行了

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/484081

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX