[ gev ] Go 语言优雅处理 TCP 粘包

2019-11-01 09:24:26 +08:00
 xuxu555

https://github.com/Allenxuxu/gev

gev 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库,支持自定义协议,轻松快速搭建高性能服务器。

TCP 为什么会粘包

TCP 本身就是面向流的协议,就是一串没有界限的数据。所以本质上来说 TCP 粘包是一个伪命题。

TCP 底层并不关心上层业务数据,会套接字缓冲区的实际情况进行包的划分,一个完整的业务数据可能会被拆分成多次进行发送,也可能会将多个小的业务数据封装成一个大的数据包发送( Nagle 算法)。

gev 如何优雅处理

gev 通过回调函数 OnMessage 通知用户数据到来,回调函数中会将用户数据缓冲区( ringbuffer )通过参数传递过来。

用户通过对 ringbuffer 操作,来进行数据解包,获取到完整用户数据后再进行业务操作。这样又一个明显的缺点,就是会让业务操作和自定义协议解析代码堆在一起。

所以,最近对 gev 进行了一次较大改动,主要是为了能够以插件的形式支持各种自定义的数据协议,让使用者可以便捷处理 TCP 粘包问题,专注于业务逻辑。

做法如下,定义一个接口 Protocol

// Protocol 自定义协议编解码接口
type Protocol interface {
	UnPacket(c *Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte)
	Packet(c *Connection, data []byte) []byte
}

用户只需实现这个接口,并注册到 server 中,当客户端数据到来时,gev 会首先调用 UnPacket 方法,如果缓冲区中的数据足够组成一帧,则将数据解包,并返回真正的用户数据,然后在回调 OnMessage 函数并将数据通过参数传递。

下面,我们实现一个简单的自定义协议插件,来启动一个 Server:

| 数据长度 n |  payload |
|  4 字节    |  n 字节   |
// protocol.go
package main

import (
	"encoding/binary"
	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/ringbuffer"
	"github.com/gobwas/pool/pbytes"
)

const exampleHeaderLen = 4

type ExampleProtocol struct{}

func (d *ExampleProtocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (interface{}, []byte) {
	if buffer.VirtualLength() > exampleHeaderLen {
		buf := pbytes.GetLen(exampleHeaderLen)
		defer pbytes.Put(buf)
		_, _ = buffer.VirtualRead(buf)
		dataLen := binary.BigEndian.Uint32(buf)

		if buffer.VirtualLength() >= int(dataLen) {
			ret := make([]byte, dataLen)
			_, _ = buffer.VirtualRead(ret)

			buffer.VirtualFlush()
			return nil, ret
		} else {
			buffer.VirtualRevert()
		}
	}
	return nil, nil
}

func (d *ExampleProtocol) Packet(c *connection.Connection, data []byte) []byte {
	dataLen := len(data)
	ret := make([]byte, exampleHeaderLen+dataLen)
	binary.BigEndian.PutUint32(ret, uint32(dataLen))
	copy(ret[4:], data)
	return ret
}
// server.go
package main

import (
	"flag"
	"log"
	"strconv"

	"github.com/Allenxuxu/gev"
	"github.com/Allenxuxu/gev/connection"
)

type example struct{}

func (s *example) OnConnect(c *connection.Connection) {
	log.Println(" OnConnect: ", c.PeerAddr())
}
func (s *example) OnMessage(c *connection.Connection, ctx interface{}, data []byte) (out []byte) {
	log.Println("OnMessage:", data)
	out = data
	return
}

func (s *example) OnClose(c *connection.Connection) {
	log.Println("OnClose")
}

func main() {
	handler := new(example)
	var port int
	var loops int

	flag.IntVar(&port, "port", 1833, "server port")
	flag.IntVar(&loops, "loops", -1, "num loops")
	flag.Parse()

	s, err := gev.NewServer(handler,
		gev.Address(":"+strconv.Itoa(port)),
		gev.NumLoops(loops),
		gev.Protocol(&ExampleProtocol{}))
	if err != nil {
		panic(err)
	}

	log.Println("server start")
	s.Start()
}

完整代码地址

当回调 OnMessage 函数的时候,会通过参数传递已经拆好包的用户数据。

当我们需要使用其他协议时,仅仅需要实现一个 Protocol 插件,然后只要 gev.NewServer 时指定即可:

gev.NewServer(handler, gev.NumLoops(2), gev.Protocol(&XXXProtocol{}))

基于 Protocol Plugins 模式为 gev 实现 WebSocket 插件

得益于 Protocol Plugins 模式的引进,我可以将 WebSocket 的实现做成一个插件( WebSocket 协议构建在 TCP 之上),独立于 gev 之外。

package websocket

import (
	"log"

	"github.com/Allenxuxu/gev/connection"
	"github.com/Allenxuxu/gev/plugins/websocket/ws"
	"github.com/Allenxuxu/ringbuffer"
)

// Protocol websocket
type Protocol struct {
	upgrade *ws.Upgrader
}

// New 创建 websocket Protocol
func New(u *ws.Upgrader) *Protocol {
	return &Protocol{upgrade: u}
}

// UnPacket 解析 websocket 协议,返回 header,payload
func (p *Protocol) UnPacket(c *connection.Connection, buffer *ringbuffer.RingBuffer) (ctx interface{}, out []byte) {
	upgraded := c.Context()
	if upgraded == nil {
		var err error
		out, _, err = p.upgrade.Upgrade(buffer)
		if err != nil {
			log.Println("Websocket Upgrade :", err)
			return
		}
		c.SetContext(true)
	} else {
		header, err := ws.VirtualReadHeader(buffer)
		if err != nil {
			log.Println(err)
			return
		}
		if buffer.VirtualLength() >= int(header.Length) {
			buffer.VirtualFlush()

			payload := make([]byte, int(header.Length))
			_, _ = buffer.Read(payload)

			if header.Masked {
				ws.Cipher(payload, header.Mask, 0)
			}

			ctx = &header
			out = payload
		} else {
			buffer.VirtualRevert()
		}
	}
	return
}

// Packet 直接返回
func (p *Protocol) Packet(c *connection.Connection, data []byte) []byte {
	return data
}

具体的实现,可以到仓库的 plugins/websocket 查看。

相关文章

项目地址

https://github.com/Allenxuxu/gev

7035 次点击
所在节点    分享创造
64 条回复
tabris17
2019-11-01 10:37:10 +08:00
请问能不能解决 TCP 叉骚包?
wysnylc
2019-11-01 10:39:48 +08:00
请问肉包涨价了吗
xuxu555
2019-11-01 10:40:13 +08:00
@laoyur 过于真实。。
scukmh
2019-11-01 10:40:58 +08:00
坐等被喷,喷子还有多久到位?
heiheidewo
2019-11-01 10:48:40 +08:00
楼主说的粘包,就是大一学生也知道是指应用层协议的分包,不知道有啥好喷的
heiheidewo
2019-11-01 10:50:08 +08:00
@xuxu555 是的,但是代码少,一眼就可以看懂,所以敢用
b821025551b
2019-11-01 10:52:11 +08:00
@heiheidewo #25 对不起,我大一净学高数大物之类的了,还挂了。
laminux29
2019-11-01 11:12:23 +08:00
1.TCP 根本没粘包这个说法。如果出现所谓的粘包,本质是程序员没有学习计算机网络知识,把不同业务的 TCP 数据混在一起乱发。

2.如果程序员不想去啃计算机网络书籍,建议还是用成熟框架去做通信,比如 thrift、WCF 等等。
xuxu555
2019-11-01 11:20:33 +08:00
@laminux29 “TCP 本身就是面向流的协议,就是一串没有界限的数据。所以本质上来说 TCP 粘包是一个伪命题。” 文中指出了的。
hpeng
2019-11-01 11:28:13 +08:00
不懂就问,直接 TCP_NODELAY 不行么?
laminux29
2019-11-01 11:28:24 +08:00
所以题干与标题相冲突,并且是楼主自己指出来的?那楼主干嘛提这个问题..66666
xuxu555
2019-11-01 11:30:15 +08:00
@laminux29 标题没加引号。
xuxu555
2019-11-01 11:31:22 +08:00
@hpeng 数据流还是得自己切包的
hpeng
2019-11-01 11:34:30 +08:00
@xuxu555 我以为大家都知道 tcp 这个。为了减少时延,关掉 Nagle 不就好了么?噢,我好像明白了,题文无关是么。(手动捂脸
ipwx
2019-11-01 11:36:28 +08:00
@hpeng 不不不,你理解错了。所谓的 TCP 粘包虽然是民科说法,但这个问题确实存在。我重新组织一下正规说法:

设我有一列字节流,我有一个 packet protocol specification,要求我根据 protocol specification 把字节流切分成 packet。

比如 packet specification 是:

|Header: body_size(int)|Body: content(bytes[body_size])|

那么我就要把每 4 + body_size 个 bytes 当成一个 packet 返回给上层应用程序。这就是所谓的 TCP 粘包处理,和打开关闭 TCP 连接没有关系。
xuxu555
2019-11-01 11:37:03 +08:00
@hpeng 嗯。。 我深刻反思了下。。。标题该加引号的
hpeng
2019-11-01 11:39:04 +08:00
@ipwx 关了那个算法,还会有这种情况吗?不会组装大包了之后?
xuxu555
2019-11-01 11:41:56 +08:00
@ipwx 不懂就问,这种问题 学名 叫啥
xuxu555
2019-11-01 11:43:33 +08:00
@hpeng 就算关了那个算法,网络延迟啥的,也可能会让你面临这个问题
ipwx
2019-11-01 11:46:13 +08:00
@xuxu555 其实不仅是 expect_length,还有 expect_delimeter,比如读取直到遇见 \r\n (方便解析 HTTP Header )。

这样的话 Protocol 类型只要写成一个状态机就行了。什么 early disconnection,缓冲区什么的管理,都是你的库来处理,上层应用就相当方便了。

顺便我这也不是什么新思路,很多库都有这种接口。

C++ Boost::Asio:

https://www.boost.org/doc/libs/1_71_0/doc/html/boost_asio/reference/async_read.html
https://www.boost.org/doc/libs/1_71_0/doc/html/boost_asio/reference/async_read_until.html

Python Tornado:

https://www.tornadoweb.org/en/stable/iostream.html#tornado.iostream.BaseIOStream.read_into
https://www.tornadoweb.org/en/stable/iostream.html#tornado.iostream.BaseIOStream.read_until

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/615096

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX