用 netty 编写代理服务器,切换出口 IP,不能及时生效

23 小时 36 分钟前
 montaro2017

公司有一台服务器,有很多个公网 IP ,就想着能不能利用起来。

然后现在有一个任务是用浏览器打开指定网址,返回网页源代码,我就打算把这个服务器做成代理服务器。

本来是计划每个 IP 做一个代理服务器,代理服务器根据入口 IP 用对应的 IP 连接目标服务器。

开启每个浏览器的时候设置代理地址,比如 1.2.3.4:8639, 1.2.3.5:8639 这样 。

然后发现多开浏览器非常吃性能,要充分利用所有的公网 IP 得开几十个浏览器,这时候已经卡到动不了了,肯定不行。

所以我就想开 4 个浏览器,每个浏览器设置一个代理,然后通过接口去切换代理后端的出口。

代理服务器是用 netty 写的,逻辑改成了绑定不同端口,然后通过接口指定端口号和出口 IP ,来切换不同端口对应代理的出口 IP 。

其实就是存了一个 Map<Integer, String>,调接口修改这个 map ,netty 代理服务器连接目标服务器的时候使用出口地址去连接。

现在问题在于,调用接口切换出口 IP 后,日志显示已经使用新的出口 IP 了,但是访问查询 IP 的网站,还是使用之前的 IP ,好像要等一段时间才生效,这是什么问题,求各位大佬指教

@Log4j2
public class ProxyFrontendHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private final AddressFunction addressFunction;

    public ProxyFrontendHandler(AddressFunction addressFunction) {
        this.addressFunction = addressFunction;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) {
        if (HttpMethod.CONNECT.equals(req.method())) {
            handleConnectRequest(ctx, req);
            return;
        }
        handleHttpRequest(ctx, req);
    }

    private void handleConnectRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        List<String> split = StrUtil.split(req.uri(), ":");
        String host = CollUtil.getFirst(split);
        int port = Convert.toInt(CollUtil.get(split, 1), 443);

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(ctx.channel().eventLoop())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new RelayHandler(ctx.channel()));
                    }
                });

        ChannelFuture connectFuture;
        InetSocketAddress remoteAddress = new InetSocketAddress(host, port);
        InetSocketAddress sourceAddress = addressFunction.apply(ctx);
        if (sourceAddress != null) {
            log.info("Using outbound: {} | host: {}", sourceAddress, remoteAddress.getHostString());
            connectFuture = bootstrap.connect(remoteAddress, sourceAddress);
        } else {
            connectFuture = bootstrap.connect(remoteAddress);
        }
        connectFuture.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                Channel outboundChannel = future.channel();
                DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                        HttpVersion.HTTP_1_1,
                        HttpResponseStatus.OK
                );
                response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
                ctx.writeAndFlush(response).addListener((ChannelFutureListener) f -> {
                    try {
                        ctx.pipeline().remove(HttpServerCodec.class);
                        ctx.pipeline().remove(HttpObjectAggregator.class);
                        ctx.pipeline().addLast(new RelayHandler(outboundChannel));
                    } catch (Exception ignored) {
                    }
                });
            } else {
                sendErrorResponse(ctx, "无法连接到目标服务器");
                closeOnFlush(ctx.channel());
            }
        });
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        String host = req.headers().get(HttpHeaderNames.HOST);
        if (host == null) {
            sendErrorResponse(ctx, "缺少 Host 头");
            closeOnFlush(ctx.channel());
            return;
        }
        String[] hostParts = host.split(":");
        String targetHost = hostParts[0];
        int targetPort = hostParts.length > 1 ? Integer.parseInt(hostParts[1]) : 80;
        // 修改请求 URI 为绝对路径
        req.setUri(req.uri().replace("http://" + host, ""));
        req.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

        // 复制请求以避免在异步操作期间被释放
        FullHttpRequest copiedReq = req.copy();
        // 创建到目标服务器的连接
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(ctx.channel().eventLoop())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new HttpClientCodec());
                        ch.pipeline().addLast(new HttpObjectAggregator(1024 * 1024)); // 增加到 1MB
                        ch.pipeline().addLast(new RelayHandler(ctx.channel()));
                    }
                });
        ChannelFuture connectFuture;
        InetSocketAddress remoteAddress = new InetSocketAddress(targetHost, targetPort);
        InetSocketAddress sourceAddress = addressFunction.apply(ctx);
        if (sourceAddress != null) {
            log.info("Using outbound: {} | host: {}", sourceAddress, remoteAddress.getHostString());
            connectFuture = bootstrap.connect(remoteAddress, sourceAddress);
        } else {
            connectFuture = bootstrap.connect(remoteAddress);
        }
        connectFuture.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                future.channel().writeAndFlush(copiedReq);
            } else {
                closeOnFlush(ctx.channel());
            }
            if (copiedReq.refCnt() != 0) {
                copiedReq.release();
            }
        });
    }

    private void sendErrorResponse(ChannelHandlerContext ctx, String message) {
        FullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1,
                HttpResponseStatus.BAD_GATEWAY,
                Unpooled.wrappedBuffer(message.getBytes())
        );
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        if (cause instanceof SocketException) {
            closeOnFlush(ctx.channel());
            return;
        }
        log.error(cause.getMessage());
        closeOnFlush(ctx.channel());
    }

    private void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}
@Log4j2
public class RelayHandler extends ChannelInboundHandlerAdapter {

    private final Channel relayChannel;

    public RelayHandler(Channel relayChannel) {
        this.relayChannel = relayChannel;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.read();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (relayChannel.isActive()) {
            relayChannel.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    ctx.read(); // 继续读取数据
                } else {
                    future.channel().close();
                }
            });
        } else {
            closeOnFlush(ctx.channel());
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error(cause);
        closeOnFlush(ctx.channel());
    }

    private void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

1077 次点击
所在节点    Java
15 条回复
aladdinding
23 小时 28 分钟前
浏览器连接池的问题吧
Ipsum
23 小时 14 分钟前
我猜没有保存 conn ,切换时,没有关闭旧链接。
montaro2017
23 小时 10 分钟前
@Ipsum #2
ChannelFuture connectFuture;
InetSocketAddress remoteAddress = new InetSocketAddress(targetHost, targetPort);
InetSocketAddress sourceAddress = addressFunction.apply(ctx);
if (sourceAddress != null) {
log.info("Using outbound: {} | host: {}", sourceAddress, remoteAddress.getHostString());
connectFuture = bootstrap.connect(remoteAddress, sourceAddress);
} else {
connectFuture = bootstrap.connect(remoteAddress);
}
这里在连接目标服务器的时候获取了出口地址,然后指定用这个地址去连接的,日志有打印出来,只是连接的时候不知道为什么用的还是之前的出口地址
montaro2017
22 小时 57 分钟前
@aladdinding #1 还真是这个问题,我用 curl 试了,IP 立马就变了
montaro2017
22 小时 48 分钟前
@aladdinding #1 但为啥我日志里显示是用新的出口 IP
cq65617875
22 小时 24 分钟前
浏览器的缓存 用 curl
cq65617875
22 小时 23 分钟前
@cq65617875 或者尝试每次测试都新起一个隐私窗口
montaro2017
22 小时 16 分钟前
@cq65617875 #7 就是因为每次开新窗口开销大,才选择代理后端切换的,浏览器是用 selenium 控制的
5waker
22 小时 7 分钟前
```rust
5waker
22 小时 2 分钟前
@montaro2017 我刚好昨天也遇到了类似的问题,我的做法是在一个长连接里不断检测 header ,然后根据内容再做转发
```rust
loop {
// 读取到完整头部
let header_end = loop {
if let Some(pos) = headers_end_pos(&buf) {
break Some(pos);
}
let n = client_stream.read(&mut tmp).await?;
if n == 0 {
// 客户端关闭或无更多数据
if buf.is_empty() {
return Ok(());
} else {
return Ok(());
}
}
buf.extend_from_slice(&tmp[..n]);
if buf.len() > 128 * 1024 {
error!("请求头过大,终止连接");
return Ok(());
}
};
let header_end = header_end.unwrap();
let headers_vec: Vec<u8> = buf[..header_end].to_vec();

let virtual_env = parse_virtual_env_from_headers(&headers_vec);
let content_length = parse_content_length(&headers_vec);
let chunked = is_chunked(&headers_vec);

// 读取完整正文
let body_len = if let Some(cl) = content_length {
while buf.len() < header_end + cl {
let n = client_stream.read(&mut tmp).await?;
if n == 0 {
error!("Content-Length 指定但连接提前关闭");
return Ok(());
}
buf.extend_from_slice(&tmp[..n]);
}
cl
} else if chunked {
loop {
if let Some(end) = chunked_body_end_pos(&buf[header_end..]) {
break end;
}
let n = client_stream.read(&mut tmp).await?;
if n == 0 {
error!("chunked 正文未完整但连接已关闭");
return Ok(());
}
buf.extend_from_slice(&tmp[..n]);
}
} else {
0
};

let request_end = header_end + body_len;
let body = &buf[header_end..request_end];
let mut new_headers = rewrite_connection_close(&headers_vec);
new_headers.extend_from_slice(body);

// 路由:每个请求一个目标连接;响应仅回写到客户端
if let Some(env) = virtual_env {
let ctrl_opt = {
let envs = (*VIRTUAL_ENVS).lock().unwrap();
envs.get(&env).cloned()
};
if let Some(mut ctrl) = ctrl_opt {
match ctrl.open_stream().await {
Ok(mut sub) => {
sub.write_all(&new_headers).await?;
let _ = tokio::io::copy(&mut sub, &mut client_stream).await;
}
Err(e) => {
error!("打开虚拟环境 {} 的子流失败: {:?}", env, e);
let mut upstream_stream = TcpStream::connect(upstream_addr).await?;
upstream_stream.write_all(&new_headers).await?;
let _ = tokio::io::copy(&mut upstream_stream, &mut client_stream).await;
}
}
} else {
let mut upstream_stream = TcpStream::connect(upstream_addr).await?;
upstream_stream.write_all(&new_headers).await?;
let _ = tokio::io::copy(&mut upstream_stream, &mut client_stream).await;
}
} else {
let mut upstream_stream = TcpStream::connect(upstream_addr).await?;
upstream_stream.write_all(&new_headers).await?;
let _ = tokio::io::copy(&mut upstream_stream, &mut client_stream).await;
}

// 删除已消费的请求字节,保留后续请求(若已到达)
if request_end < buf.len() {
let remaining = buf.split_off(request_end);
buf = remaining;
} else {
buf.clear();
}
}
```
montaro2017
21 小时 57 分钟前
@5waker #10 我要用浏览器自动化来获取网页源代码,所以只能用代理服务器
Gilfoyle26
21 小时 26 分钟前
用 c 写,解决一切烦恼
ronyin
21 小时 22 分钟前
这是搞爬虫么。。。
montaro2017
21 小时 7 分钟前
@ronyin #13 对
testFor
12 小时 48 分钟前
写这个 netty 不如 go 好用

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1169319

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX