询问本地 flink 应用写入到阿里云主机时遇到写入不了的问题

291 天前
 summerlv

问题是这样的,本来是在本地开发 flink 应用,然后想连接云主机 Doris 把内容写入到 Doris 中,但是一开始报错是返回了远程 Doris 运行时的一个内网地址。

于是改了以下源码的这个类:BackendV2 在其中增加了 convertToHostname() 方法将内网地址映射为公网地址

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.rest.models;

import com.car.common.Constant;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Objects;

/**
 * Be response model
 **/
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendV2 {

    @JsonProperty(value = "backends")
    private List<BackendRowV2> backends;

    public void setBackends(List<BackendRowV2> backends) { this.backends = backends; }

    public static class BackendRowV2 {
        @JsonProperty("ip")
        public String ip;
        @JsonProperty(value="http_port")
        public int httpPort;
        @JsonProperty("is_alive")
        public boolean isAlive;
        
        public String getIp() {
            return ip;
        }

        public void setIp(String ip) {
            this.ip = ip;
        }

        public int getHttpPort() {
            return httpPort;
        }

        public void setHttpPort(int httpPort) {
            this.httpPort = httpPort;
        }

        public boolean isAlive() {
            return isAlive;
        }

        public void setAlive(boolean alive) {
            isAlive = alive;
        }
        
        public String convertToHostname(String ip) {
            System.out.println("================"+ip);
            this.ip = ip;
            if(Objects.equals(ip,"192.168.0.107")) {
                return ip= Constant.HADOOP102;
            } else if(Objects.equals(ip,"192.168.0.108")) {
                return ip = Constant.HADOOP103;
            } else if (Objects.equals(ip,"192.168.0.109")) {
                return ip = Constant.HADOOP104;
            } else {
                return null;
            }
        }
        public String toBackendString(){
			return convertToHostname(ip) + ":" + httpPort;
            //return ip + ":" + httpPort;
        }

    }
}


本来的源码是这样的:

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.rest.models;

import com.car.common.Constant;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Objects;

/**
 * Be response model
 **/
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendV2 {

    @JsonProperty(value = "backends")
    private List<BackendRowV2> backends;

    public void setBackends(List<BackendRowV2> backends) { this.backends = backends; }

    public static class BackendRowV2 {
        @JsonProperty("ip")
        public String ip;
        @JsonProperty(value="http_port")
        public int httpPort;
        @JsonProperty("is_alive")
        public boolean isAlive;
        
        public String getIp() {
            return ip;
        }

        public void setIp(String ip) {
            this.ip = ip;
        }

        public int getHttpPort() {
            return httpPort;
        }

        public void setHttpPort(int httpPort) {
            this.httpPort = httpPort;
        }

        public boolean isAlive() {
            return isAlive;
        }

        public void setAlive(boolean alive) {
            isAlive = alive;
        }
        
       
        public String toBackendString(){
            return ip + ":" + httpPort;
        }

    }
}

但是替换完以后报错如下:

Caused by: org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "http_port" (class org.apache.doris.flink.rest.models.BackendV2$BackendRowV2), not marked as ignorable (4 known properties: "httpPort", "isAlive", "alive", "ip"])
 at [Source: (String)"{"backends":[{"ip":"192.168.0.109","http_port":7040,"is_alive":true}]}"; line: 1, column: 52] (through reference chain: org.apache.doris.flink.rest.models.BackendV2["backends"]->java.util.ArrayList[0]->org.apache.doris.flink.rest.models.BackendV2$BackendRowV2["http_port"])
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1127)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2023)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1700)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1678)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:319)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:355)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:313)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:176)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4674)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3629)
	at org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3597)
	at org.apache.doris.flink.rest.RestService.parseBackendV2(RestService.java:380)

替换前报错如下:

2023-07-25 20:54:43 WARN (org.apache.doris.flink.sink.writer.DorisWriter:tryHttpConnection) - Failed to connect to backend:http://192.168.0.109:7040
java.net.ConnectException: Connection timed out: connect
	at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
	at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
	at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:607)
	at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
	at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
	at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
	at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
	at sun.net.www.http.HttpClient.New(HttpClient.java:339)
	at sun.net.www.http.HttpClient.New(HttpClient.java:357)
	at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1228)
	at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1162)
	at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
	at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
	at org.apache.doris.flink.sink.writer.DorisWriter.tryHttpConnection(DorisWriter.java:259)
	at org.apache.doris.flink.sink.writer.DorisWriter.getAvailableBackend(DorisWriter.java:245)
	at org.apache.doris.flink.sink.writer.DorisWriter.initializeLoad(DorisWriter.java:108)
	at org.apache.doris.flink.sink.DorisSink.createWriter(DorisSink.java:64)
	at org.apache.flink.streaming.api.transformations.SinkV1Adapter.createWriter(SinkV1Adapter.java:77)
	at org.apache.flink.streaming.api.transformations.SinkV1Adapter$PlainSinkAdapter.createWriter(SinkV1Adapter.java:306)
	at org.apache.flink.streaming.api.transformations.SinkV1Adapter$StatefulSinkAdapter.createWriter(SinkV1Adapter.java:315)
	at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.lang.Thread.run(Thread.java:750)

想问问大家有没遇到过本地 flink 应用远程连接 Doris 并写入数据的问题啊? 感谢大家!!!

664 次点击
所在节点    程序员
1 条回复
liprais
291 天前
端口转发到本地不就完了

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/959686

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX