请教一下 Webflux Reactor 编程

2020-05-18 11:44:04 +08:00
 binbinyouliiii
public People getPeople(String name) {
    People people = getCache(name);
    if (people != null) {
        return people;
    }
    //Mono<People> people = httpService.monoRequest("http://www.xxx.com/path", People.class);
    people = httpService.syncRequest("http://www.xxx.com/path/info?name=" + name, People.class);
    setCache(msisdn, operatorInfo);
    return people;
}

private People getCache(String name) {
    String str = redisTemplate.opsForValue().get(name);
    if (str == null) {
        return null;
    }
    return deserialize(str);
}

private void setCache(String name, People people) {
    String str = serialize(people);
    redisTemplate.opsForValue().set(name, str);
}

以上是一个非常简单的带有缓存的获取信息的方法,我尝试使用 Spring Webflux 来做,但是写起来发现举步维艰,尤其对于 null 值的流式条件判断根本想不出有什么办法去写


以下是我自己的理解写的,看起来非常不优雅,而且没有 null 判断,请问哪位大佬可以给个优雅的样例吗?网上的例子少得可怜

public Mono<People> getPeople(String name) {
    Mono<People> peopleMono = httpService
            .monoRequest("http://www.xxx.com/path/info?name=" + name, People.class);
    Mono<Boolean> setCacheMono = peopleMono.flatMap(people -> setCache(name, people));
    return peopleMono
            .and(setCacheMono)
            .then(peopleMono);
}

private Mono<People> getnCache(String name) {
    return reactiveRedisTemplate.opsForValue()
            .get(name)
            .map(this::deserialize);
}

private Mono<Boolean> setCache(String name, People people) {
    return reactiveRedisTemplate.opsForValue()
            .set(name, serialize(people));
}
2602 次点击
所在节点    Java
10 条回复
jaylee4869
2020-05-18 11:51:56 +08:00
Optional
TtTtTtT
2020-05-18 12:08:28 +08:00
其实是一样的。。

People people = getCache(name);
if (people != null) {
return people;
}

=>

getCahe(name).switchIfEmpty(<after logic>)
TtTtTtT
2020-05-18 12:24:13 +08:00
换个姿势解释一下,传统代码是这样子的:
statement1;
statement2;
statement3;

那么对于 Reactor,其根本目标就是在于让 statement1 和 statement2 之间可以执行一些额外的代码,来保证代码是可调度的,进而起到所谓的 IO 优化。

那么首先,就需要改造成:

(statement1 as Mono).then(statement2 as Mono).then(statement3 as Mono)

这样就能在每个 Mono 之间插入一些调度的代码。

另一方面,为了兼容其他的语法,比如 return,比如 null,Mono 就提供了大量的成员方法来简化这个 then,让它写起来更爽。
比如 null 对应了 Mono.empty(),return 对应了 Mono<T>里的 T 。

以上,你需要考虑每个逻辑对应的是 Mono 的哪个方法。
比如 if(x != null) { return x;} <after statement>对应的就是 switchIfEmpty(<after statement> as Mono);
比如 if(x != null) { return x;} else { return y; } 对应的就是 defaultIfEmpty(y)。
azcvcza
2020-05-18 12:34:24 +08:00
如果可以的话,康康 js 的 promise 可能会有点帮助
hantsy
2020-05-18 12:53:34 +08:00
Mono 中 Empty 时用 SwitchIfEmpty 或者 defaultIfEmpty 。

https://github.com/hantsy/spring-reactive-sample

ReactiveStreams 实现太多了,Reactor,Rxjava2/3, SmallRye Munity, Microprofile Reactive message Operators,Helidon 也包含了一个实现。
hantsy
2020-05-18 12:55:06 +08:00
@azcvcza js 世界的 Rxjs 才是用的 ReactiveStreams 标准。Promise 算是老一代流式处理,没有订阅模式,通知机制。
binbinyouliiii
2020-05-18 14:24:55 +08:00
@TtTtTtT #3 看了以下 switchIfEmpty 看起来像是为了保证不出错,碰到 empty 的时候以补偿的形式继续执行下去的,走到 setCache 的时候,因为无法判断是否是从缓存拿出来的,所以只能一股脑继续执行 setCahe 了。
azcvcza
2020-05-18 15:41:05 +08:00
@hantsy keyi
TtTtTtT
2020-05-18 17:14:41 +08:00
@binbinyouliiii 不不不,switchIfEmpty 应该这么用:
getFromCache(key).switchIfEmpty(
getFromDb(key).flatMap(value -> setCache(key,value))
)
如果设置缓存是纯异步的,也可以:
getFromCache(key).switchIfEmpty(
getFromDb(key).doOnSuccess(value -> setCache(key,value).subscribe())
)
jinzhongyuan
2020-05-19 20:10:43 +08:00
活捉大佬

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/672822

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX