200 元有偿求助,使用 Java 的 rsocket 上传文件

88 天前
 shuang

服务端是第三方的,我方需要按照接口文档上传文件。
1 、分片上传,实体为 Flux<DataBuffer>
2 、需要携带 header ,媒体类型为 application/json
目前可以确认,服务端是 ok 的,问题出在客户端上传文件的代码。

以下是客户端上传文件的代码:

CompositeByteBuf compositeMetadata = ByteBufAllocator.DEFAULT.compositeBuffer();

// 1. 创建路由元数据
ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(
        ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute()));
 compositeMetadata.addComponent(true, ByteBufAllocator.DEFAULT.buffer().writeBytes(routeMetadata));

// 2. 按文档要求添加请求 header
Map<String, String> uploadFileHeader = new HashMap<>();
uploadFileHeader.put("token", token);
uploadFileHeader.put("fileType", "jpg");
uploadFileHeader.put("fileName", "random-file-name");
ByteBuf customMetadata = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader));
CompositeMetadataCodec.encodeAndAddMetadata(compositeMetadata, ByteBufAllocator.DEFAULT, 
        WellKnownMimeType.APPLICATION_JSON, customMetadata);

// 读取本地文件
Flux<DataBuffer> dataBufferFlux = DataBufferUtils.read(Paths.get(filePath), new DefaultDataBufferFactory(), 1024 * 8);

// 合并 Payloads  将每个 DataBuffer 转换为 Payload ,并附加 metadata
Flux<Payload> requestPayloads = dataBufferFlux.map(dataBuffer -> {
    // 将每个 DataBuffer 转换为 Payload ,并附加 metadata
    return ByteBufPayload.create(Unpooled.wrappedBuffer(dataBuffer.asByteBuffer()), compositeMetadata);
});

rsocket.requestChannel(requestPayloads)
        .doOnNext(payload -> log.error("=====> doOnNext"))
        .doOnError(error -> log.error("=====> doOnError", error))
        .doOnComplete(() -> log.info("=====> doOnComplete"))
        .subscribe();

注:由于对 rsocket 完全不熟,所以以上代码任何地方都有可能是错的。

目前一直报错,缺少请求头:Missing header 'upload-file-header' for method parameter type , 从报错分析,已经请求到接口了,说明路由 metadata 没问题,但是请求头传递不正确。文件数据流是否传递正确还未知。

希望寻求有过 rsocket 相关开发经验的人,帮忙看下代码哪里有问题。
解决后发微信红包 200 元作为报酬。

1763 次点击
所在节点    外包
17 条回复
sioncheng
87 天前
有点好奇,从报错信息来看,上传方是不是没有明确接收方需要的 upload-file-header 信息。还有,是不是可以先一般 java 代码去实现上传功能,确保明确了解了接收方的接口文档,然后再将一般 java 代码改为 rsocket 。
lervard358
87 天前
我接了 怎么联系 加我  YWxwaGEtZW5naW5lZXJpbmc=
larisboy
87 天前
uploadFileHeader 加上 upload-file-header 看看
shuang
87 天前
@larisboy 不知道怎么加,试了几种写法都不对
shuang
87 天前
@sioncheng 目前问题就在于如何用 rsocket 与服务端交互,不知道这个请求头该如何传递
lbbdefy
87 天前
ByteBuf 大小端的问题要先确认
sioncheng
86 天前
@shuang 再探讨下。我意思是 rsocket 只是一个技术手段吧,rsocket 能做到的,其他 java 方式应该也能做到;理解清楚对方的接收协议才是本质,对方是标准的 multipart/form-data 协议还是自定义协议呢,这样才能对症解决问题吧。
shuang
86 天前
@sioncheng
没太明白你的意思。
对方的接口文档里就是要求按照 rsocket 的方式上传文件。你所说的 multipart/form-data ,应该是常规的 http 协议的文件上传,服务端是 tcp 协议的。两者技术手段不同。
shuang
86 天前
@lbbdefy 第一次听说这个词,我去搜一下什么是大小端
sioncheng
86 天前
@shuang 懂了,就是必须 rsocket ,并且对方也不是常见的 http multipart/form-data 协议。头疼,哈哈。
shuang
86 天前
```
// 元数据
CompositeByteBuf composite = ByteBufAllocator.DEFAULT.compositeBuffer();

// 1. 创建路由元数据
ByteBuf routeContent = TaggingMetadataCodec.createTaggingContent(
ByteBufAllocator.DEFAULT,
Collections.singletonList(platformConfig.getFileUploadRoute()));

CompositeMetadataCodec.encodeAndAddMetadata(
composite,
ByteBufAllocator.DEFAULT,
WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
routeContent);

// 2. 创建上传文件头
ByteBuf headerContent = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, getHeaderContent(token));
CompositeMetadataCodec.encodeAndAddMetadata(
composite,
ByteBufAllocator.DEFAULT,
WellKnownMimeType.fromString("message/x.upload-file-header"),
headerContent);

// 读取本地文件
Flux<DataBuffer> dataBufferFlux = DataBufferUtils.read(Paths.get(filePath), new DefaultDataBufferFactory(), 1024 * 8);

Flux<Payload> requestPayloads = dataBufferFlux.map(buf -> ByteBufPayload.create(
Unpooled.wrappedBuffer(buf.asByteBuffer()),
composite
));

rsocket.requestChannel(requestPayloads)
.doOnNext(payload -> log.error("=====> doOnNext"))
.doOnError(error -> log.error("=====> doOnError", error))
.doOnComplete(() -> log.info("=====> doOnComplete"))
.subscribe();
```

报错说 Missing header 'upload-file-header' ,我又换了种写法,还是不行
skyyan
85 天前
第三方提供的 api 接口文档能发下不
kvolongoto
85 天前
// 1. 创建路由元数据 (保持不变)
ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(
ByteBufAllocator.DEFAULT, Collections.singletonList(platformConfig.getFileUploadRoute()));
compositeMetadata.addComponent(true, ByteBufAllocator.DEFAULT.buffer().writeBytes(routeMetadata));

// 2. 添加服务器要求的 upload-file-header (新增)
ByteBuf uploadFileHeaderBuf = ByteBufUtil.writeUtf8(
ByteBufAllocator.DEFAULT, "your-header-value-here"); // 替换为实际值
CompositeMetadataCodec.encodeAndAddMetadata(
compositeMetadata,
ByteBufAllocator.DEFAULT,
"upload-file-header", // 必须与服务器注解名称一致
uploadFileHeaderBuf
);

// 3. 添加其他元数据 (JSON 格式)
Map<String, String> uploadFileHeader = new HashMap<>();
uploadFileHeader.put("token", token);
uploadFileHeader.put("fileType", "jpg");
uploadFileHeader.put("fileName", "random-file-name");
ByteBuf customMetadata = ByteBufUtil.writeUtf8(
ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader));
CompositeMetadataCodec.encodeAndAddMetadata(
compositeMetadata,
ByteBufAllocator.DEFAULT,
WellKnownMimeType.APPLICATION_JSON,
customMetadata
);
shuang
85 天前
@kvolongoto
感谢答复。
没看明白,your-header-value-here 这里应该传什么
shuang
85 天前
@kvolongoto
文档里说:请求 header 参数,使用键值对,媒体类型为:application/json 。参数有 token 、fileName 、fileType
shuang
81 天前
已解决。分享一下:

方案一:使用底层的 rsocket ,更灵活,但代码比较繁琐,适合对 rsocket 原理和 api 比较熟悉的人。
RSocketClientConfig.java 关键代码:
@Bean
public RSocket rsocket() {
ClientTransport transport = TcpClientTransport.create(platformConfig.getServerHost(), platformConfig.getServerPort());

RSocket rsocket = RSocketConnector.create()
// 设置 metadata MIME Type ,方便服务端根据 MIME 类型确定 metadata 内容
.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString())
.dataMimeType(WellKnownMimeType.APPLICATION_JSON.getString())
// 认证相关的参数
.setupPayload(getSetupPayload())
// 接收服务器发送的响应
.acceptor(new SocketAcceptorImpl())
// 设置重连策略
.reconnect(Retry.backoff(2, Duration.ofMillis(500)))
.connect(transport)
.block();

// 检查连接是否成功
if (rsocket == null || rsocket.isDisposed()) {
throw new IllegalStateException("RSocket 连接失败");
}

return rsocket;
}

上传附件的单元测试代码:
@Test
public void testFileSimpleUpload() {
String token = getToken();

// 复合元数据
CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer();

// 1. 创建路由元数据
ByteBuf routeContent = TaggingMetadataCodec.createTaggingContent(
ByteBufAllocator.DEFAULT,
Collections.singletonList(platformConfig.getFileUploadRoute()));
CompositeMetadataCodec.encodeAndAddMetadata(
compositeByteBuf,
ByteBufAllocator.DEFAULT,
WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
routeContent);

// 2. 创建上传文件头
Map<String, String> uploadFileHeader = new HashMap<>();
uploadFileHeader.put("platformCode", platformConfig.getPlatformCode());
uploadFileHeader.put("token", token);
String fileName = fileResource.getFilename();
uploadFileHeader.put("fileType", fileName.substring(fileName.lastIndexOf(".") + 1));
uploadFileHeader.put("fileName", UUID.randomUUID().toString().replaceAll("-", ""));
ByteBuf headerContent = ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, JSONUtil.toJsonStr(uploadFileHeader));
CompositeMetadataCodec.encodeAndAddMetadata(
compositeByteBuf,
ByteBufAllocator.DEFAULT,
WellKnownMimeType.APPLICATION_JSON,
headerContent);

// 2. 读取本地文件
Flux<Payload> requestPayloads = DataBufferUtils
.read(fileResource, new DefaultDataBufferFactory(), 1024 * 8)
.map(buf -> ByteBufPayload.create(Unpooled.wrappedBuffer(buf.asByteBuffer())))
.startWith(ByteBufPayload.create(Unpooled.EMPTY_BUFFER, compositeByteBuf));

rsocket.requestChannel(requestPayloads)
.doOnNext(payload -> log.info("=====> doOnNext {}", payload.getDataUtf8()))
.doOnError(error -> log.error("=====> doOnError", error))
.doOnComplete(() -> log.info("=====> doOnComplete"))
.blockLast(Duration.ofSeconds(10));
}
shuang
81 天前
方案二:使用 spring 封装的 RSocketRequester ,代码简洁易懂
RSocketClientConfig.java 关键代码:
@Bean
public RSocketRequester rsocketRequester(RSocketRequester.Builder builder) {
return RSocketRequester.wrap(
rsocket(),
MimeTypeUtils.APPLICATION_JSON,
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString()),
rsocketStrategies());
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2JsonEncoder()))
.decoders(decoders -> decoders.add(new Jackson2JsonDecoder()))
.build();
}

上传附件的单元测试代码:
@Test
public void uploadFile() {
this.rSocketRequester
.route(platformConfig.getFileUploadRoute())
.metadata(spec -> spec.metadata(getUploadFileHeader(), MimeTypeUtils.APPLICATION_JSON))
.data(DataBufferUtils.read(fileResource, new DefaultDataBufferFactory(), 1024 * 8))
.retrieveFlux(String.class)
.doOnNext(payload -> log.info("=====> doOnNext {}", payload))
.doOnError(error -> log.error("=====> doOnError", error))
.doOnComplete(() -> log.info("=====> doOnComplete"))
.blockLast();
}

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1137709

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX