用 Golang 实现基于 Redis 的安全高效 RPC 通信

2020-03-17 09:28:11 +08:00
 tikazyq

前言

RPC ( Remote Procedure Call ),翻译过来为“远程过程调用”,是一种分布式系统中服务或节点之间的有效通信机制。通过 RPC,某个节点(或客户端)可以很轻松的调用远端(或服务端)的方法或服务,就像在本地调用一样简单。现有的很多 RPC 框架都要求暴露服务端地址,也就是需要知道服务器的 IP 和 RPC 端口。而本篇文章将介绍一种不需要暴露 IP 地址和端口的 RPC 通信方式。这种方式是基于 Redis BRPOP/BLPOP 操作实现的延迟队列,以及 Golang 中的 goroutine 协程异步机制,整个框架非常简单和易于理解,同时也很高效、稳定和安全。这种方式已经应用到了 Crawlab 中的节点通信当中,成为了各节点即时传输信息的主要方式。下面我们将从 Crawlab 早期节点通信方案 PubSub 开始,介绍当时遇到的问题和解决方案,然后如何过渡到现在的 RPC 解决方案,以及它是如何在 Crawlab 中发挥作用的。

基于 PubSub 的方案

PubSub

早期的 Crawlab 是基于 Redis 的 PubSub,也就是发布订阅模式。这是 Redis 中主要用于一对多的单向通信的方案。其用法非常简单:

  1. 订阅者利用SUBSCRIBE channel1 channel2 ...来订阅一个或多个频道;
  2. 发布者利用PUBLISH channelx message来发布消息给该频道的订阅者。

Redis 的PubSub可以用作广播模式,即一个发布者对应多个订阅者。而在 Crawlab 中,我们只有一个订阅者对应一个发布者的情况(主节点->工作节点:nodes:<node_id>)或一个订阅者对应多个发布者的情况(工作节点->主节点:nodes:master>)。这是为了方便双向通信。

以下为节点通信原理示意图。

各个节点会通过 Redis 的PubSub功能来做相互通信。

所谓PubSub,简单来说是一个发布订阅模式。订阅者( Subscriber )会在 Redis 上订阅( Subscribe )一个通道,其他任何一个节点都可以作为发布者( Publisher )在该通道上发布( Publish )消息。

通信架构

在 Crawlab 中,主节点会订阅 nodes:master 通道,其他节点如果需要向主节点发送消息,只需要向 nodes:master 发布消息就可以了。同理,各工作节点会各自订阅一个属于自己的通道 nodes:<node_id>node_id 是 MongoDB 里的节点 ID,是 MongoDB ObjectId ),如果需要给工作节点发送消息,只需要发布消息到该通道就可以了。

一个网络请求的简单过程如下:

  1. 客户端(前端应用)发送请求给主节点( API );
  2. 主节点通过 Redis PubSub<nodes:<node_id> 通道发布消息给相应的工作节点;
  3. 工作节点收到消息之后,执行一些操作,并将相应的消息通过 <nodes:master> 通道发布给主节点;
  4. 主节点收到消息之后,将消息返回给客户端。

不是所有节点通信都是双向的,也就是说,主节点只会单方面对工作节点通信,工作节点并不会返回响应给主节点,所谓的单向通信。以下是 Crawlab 的通信类型。

changoroutine

如果您在阅读 Crawlab 源码,会发现节点通信中有大量的 chan 语法,这是 Golang 的一个并发特性。

chan 表示为一个通道,在 Golang 中分为无缓冲和有缓冲的通道,我们用了无缓冲通道来阻塞协程,只有当 chan 接收到信号(chan <- "some signal"),该阻塞才会释放,协程进行下一步操作)。在请求响应模式中,如果为双向通信,主节点收到请求后会起生成一个无缓冲通道来阻塞该请求,当收到来自工作节点的消息后,向该无缓冲通道赋值,阻塞释放,返回响应给客户端。

go 命令会起一个 goroutine(协程)来完成并发,配合 chan,该协程可以利用无缓冲通道挂起,等待信号执行接下来的操作。

基于延迟队列的方案

PubSub 方案的问题

PubSub 这种消息订阅-发布设计模式是一种有效的实现节点通信的方式,但是它有两个问题:

  1. PubSub 的数据是即时的,会随着 Redis 宕机而丢失;
  2. 写基于 PubSub 的通信服务会要求用到 goroutinechannel,这加大了开发难度,降低了可维护性。

其中,第二个问题是比较棘手的。如果我们希望加入更多的功能,需要写大量的异步代码,这会加大系统模块间的耦合度,造成扩展性很差,而且代码阅读起来很痛苦。

因此,为了解决这个问题,我们采用了基于 Redis 延迟消息队列的 RPC 服务。

延迟队列架构

下图是基于延迟队列架构的 RPC 实现示意图。

每一个节点都有一个客户端( Client )和服务端( Server )。客户端用于发送消息到目标节点( Target Node )并接收其返回的消息,服务端用于接收、处理源节点( Source Node )的消息并返回消息给源节点的客户端。

整个 RPC 通信的流程如下:

  1. 源节点的客户端通过 LPUSH 将消息推送到 Redis 的 nodes:<node_id> 中,并执行 BRPOP nodes:<node_id>:<msg_id> 阻塞并监听这个消息队列;
  2. 目标节点的服务端通过 BRPOP 一直在监听 nodes:<node_id>,收到消息后,通过消息中的 Method 字段执行对应的程序;
  3. 目标节点执行完毕后,服务端通过 LPUSH 将消息推送到 Redis 的 nodes:<node_id>:<msg_id> 中;
  4. 由于源节点客户端一直在监听 nodes:<node_id>:<msg_id> 这个消息队列,当目标节点服务端推送消息到这个队列后,源节点客户端将立即收到返回的消息,再做后续处理。

这样,整个节点的通信流程就通过 Redis 完成了。这样做的好处在于不用暴露 HTTP 的 IP 地址和端口,只需要知道节点 ID 即可完成 RPC 通信。

这样设计后的 RPC 代码比较容易理解和维护。每次需要扩展新的通信类别时,只需要继承 rpc.Service 类,实现 ClientHandle(客户端处理方法)和 ServerHandle(服务端处理方法)方法就可以了。

这里多说一下 BRPOP。它将移出并获取消息队列的最后一个元素, 如果消息队列没有元素会阻塞队列直到等待超时或发现可弹出元素为止。因此,使用 BRPOP 命令相对于轮训或其他方式,可以避免不间断的请求 Redis,避免浪费网络和计算资源。

如果对 Redis 的操作命令不熟悉的,可以参考一下掘金小册《 Redis 深度历险:核心原理与应用实践》,这本小册深入介绍了 Redis 的原理以及工程实践,对于应用 Redis 到实际开发中非常实用。

代码实践

讲了这么多理论知识,我们还是需要看看代码的。老师常教育我们:“Talk is cheap. Show me the code.”

由于 Crawlab 后端是 Golang 开发的,要理解以下代码需要一些 Golang 的基础知识。

消息数据结构

首先我们需要定一个传输消息的数据结构。代码如下。

package entity

type RpcMessage struct {
	Id      string            `json:"id"`      // 消息 ID
	Method  string            `json:"method"`  // 消息方法
	NodeId  string            `json:"node_id"` // 节点 ID
	Params  map[string]string `json:"params"`  // 参数
	Timeout int               `json:"timeout"` // 超时
	Result  string            `json:"result"`  // 结果
	Error   string            `json:"error"`   // 错误
}

这里,我们定义了消息 ID、方法、节点 ID、参数等字段。消息 ID 是 UUID,保证了消息 ID 的唯一性。

基础接口

首先,我们定义一个抽象基础接口,方便让实际业务逻辑模块继承。服务端的处理逻辑在 ServerHandle 中,返回 entity 里的 RpcMessage,而客户端的逻辑在 ClientHandle 中。

// RPC 服务基础类
type Service interface {
	ServerHandle() (entity.RpcMessage, error)
	ClientHandle() (interface{}, error)
}

客户端通用方法

当我们调用客户端的通用方法的时候,需要实现两个逻辑:

  1. 发送消息:生成消息 ID,将消息序列化为 JSON,LPUSH 推入 Redis 消息队列;
  2. 通过 BRPOP 延迟获取返回的消息,返回给调用方。

以下是实现的代码。

// 客户端处理消息函数
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
	return func() (replyMsg entity.RpcMessage, err error) {
		// 请求 ID
		msg.Id = uuid.NewV4().String()

		// 发送 RPC 消息
		msgStr := utils.ObjectToString(msg)
		if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
			log.Errorf("RpcClientFunc error: " + err.Error())
			debug.PrintStack()
			return replyMsg, err
		}

		// 获取 RPC 回复消息
		dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
		if err != nil {
			log.Errorf("RpcClientFunc error: " + err.Error())
			debug.PrintStack()
			return replyMsg, err
		}

		// 反序列化消息
		if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
			log.Errorf("RpcClientFunc error: " + err.Error())
			debug.PrintStack()
			return replyMsg, err
		}

		// 如果返回消息有错误,返回错误
		if replyMsg.Error != "" {
			return replyMsg, errors.New(replyMsg.Error)
		}

		return
	}
}

服务端处理

服务端处理的逻辑如下,大致的逻辑是:

  1. 在一个循环中,通过 BRPOP 获取该节点对应的消息;
  2. 当获取到消息时,生成一个 goroutine 异步处理该消息;
  3. 继续等待。

您可以在 InitRpcService 这个方法中看到上述逻辑。私有方法 handleMsg 实现了序列化、调用服务端 RPC 服务方法、发送返回消息的逻辑。如果需要拓展 RPC 方法类型,在工厂类方法 GetService 里添加就可以了。

// 获取 RPC 服务
func GetService(msg entity.RpcMessage) Service {
	switch msg.Method {
	case constants.RpcInstallLang:
		return &InstallLangService{msg: msg}
	case constants.RpcInstallDep:
		return &InstallDepService{msg: msg}
	case constants.RpcUninstallDep:
		return &UninstallDepService{msg: msg}
	case constants.RpcGetLang:
		return &GetLangService{msg: msg}
	case constants.RpcGetInstalledDepList:
		return &GetInstalledDepsService{msg: msg}
	}
	return nil
}

// 处理 RPC 消息
func handleMsg(msgStr string, node model.Node) {
	// 反序列化消息
	var msg entity.RpcMessage
	if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
	}

	// 获取 service
	service := GetService(msg)

	// 根据 Method 调用本地方法
	replyMsg, err := service.ServerHandle()
	if err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
	}

	// 发送返回消息
	if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
	}
}

// 初始化服务端 RPC 服务
func InitRpcService() error {
	go func() {
		for {
			// 获取当前节点
			node, err := model.GetCurrentNode()
			if err != nil {
				log.Errorf(err.Error())
				debug.PrintStack()
				continue
			}

			// 获取获取消息队列信息
			msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
			if err != nil {
				if err != redis.ErrNil {
					log.Errorf(err.Error())
					debug.PrintStack()
				}
				continue
			}

			// 处理消息
			go handleMsg(msgStr, node)
		}
	}()
	return nil
}

调用方法例子

Crawlab 的节点上经常需要为爬虫安装一些第三方依赖,例如 pymongo、requests 等。而其中,我们也需要直到某个节点上是否已经安装了某个依赖,这需要跨服务器通信,也就是需要在分布式网络中进行双向通信。而这个逻辑是通过 RPC 实现的。主节点向目标节点发起 RPC 调用,目标节点运行被调用方法,将运行结果也就是安装的依赖列表返回给客户端,客户端再返回给调用者。

下面的代码实现了获取目标节点上已安装的依赖的 RPC 方法。

// 获取已安装依赖服务
// 继承 Service 基础类
type GetInstalledDepsService struct {
	msg entity.RpcMessage
}

// 服务端处理方法
// 重载 ServerHandle
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
	lang := utils.GetRpcParam("lang", s.msg.Params)
	deps, err := GetInstalledDepsLocal(lang)
	if err != nil {
		s.msg.Error = err.Error()
		return s.msg, err
	}
	resultStr, _ := json.Marshal(deps)
	s.msg.Result = string(resultStr)
	return s.msg, nil
}

// 客户端处理方法
// 重载 ClientHandle
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {
	// 发起 RPC 请求,获取服务端数据
	s.msg, err = ClientFunc(s.msg)()
	if err != nil {
		return o, err
	}

	// 反序列化
	var output []entity.Dependency
	if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
		return o, err
	}
	o = output

	return
}

发起调用

写好了 RPC 服务端和客户端处理方法,就可以轻松编写调用逻辑了。以下是调用获取远端已安装依赖列表的方法。首先由 GetService 工厂类获取之前定义好的 GetInstalledDepsService,再调用其客户端处理方法 ClientHandle,然后返回结果。这就像在本地调用方法一样。是不是很简单?

// 获取远端已安装依赖
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
	params := make(map[string]string)
	params["lang"] = lang
	s := GetService(entity.RpcMessage{
		NodeId:  nodeId,
		Method:  constants.RpcGetInstalledDepList,
		Params:  params,
		Timeout: 60,
	})
	o, err := s.ClientHandle()
	if err != nil {
		return
	}
	deps = o.([]entity.Dependency)
	return
}

结语

本篇文章主要介绍了一种基于 Redis 延迟队列的 RPC 通信方式,这种方式不用暴露各个节点或服务的 IP 地址或端口,是一种非常安全的方式。而且,这种方式已经用 Golang 在 Crawlab 中实现了双向通信,特别是 Golang 中的天生支持异步的 goroutine,让这种方式的实现变得简单。实际上,这种方式理论上是非常高效的,能够支持高并发数据传输。

但是,在 Crawlab 的实现中还存在一些隐患,也就是它并没有限制服务端的处理并发数量。因此如果传输消息过多时,服务端资源会被占满,导致处理速度变慢甚至宕机的风险。修复方式是在服务端限制并发数量。另外,限于时间的原因,作者还没有来得及测试这种 RPC 通信方式的实际传输效率,容错机制也没有加入。因此总的来说还有很大的提升和优化空间。

虽然如此,这种方式对于 Crawlab 的低并发远程通信来说是足够的了,在实际使用中也没有出现问题,非常稳定。对于隐秘性有要求、希望不暴露地址信息的开发者,我们也推荐将该种方式在实际应用中尝试。

参考

4735 次点击
所在节点    程序员
34 条回复
xiaofan2
2020-03-17 09:48:32 +08:00
想法不错 不过适用范围有限
tikazyq
2020-03-17 10:05:34 +08:00
@xiaofan2 确实,不过在我们的 scenario 里还是可行的
jelipo
2020-03-17 10:16:20 +08:00
原理还是使用队列实现,为什么不直接用现成的队列中间件
dapang1221
2020-03-17 10:18:35 +08:00
所以就是给 redis 套了个壳再封装一次吗…
bbao
2020-03-17 10:20:07 +08:00
看了标题,瞬间拉倒评论;

楼主 GRPC 不香么?要自己造轮子
chendy
2020-03-17 10:34:02 +08:00
基于 Redis 的 RPC ???
tikazyq
2020-03-17 10:45:42 +08:00
@jelipo 项目中直接用了 Redis,不想再多 kafka 之类的其他组建,加重框架
tikazyq
2020-03-17 10:45:56 +08:00
@dapang1221 有不套壳的实现?
tikazyq
2020-03-17 10:46:21 +08:00
@bbao 主要是 grpc 不是很适合现在的分布式架构,不想暴露地址信息
tikazyq
2020-03-17 10:46:40 +08:00
@chendy 14 年就有老外这么做了
gowk
2020-03-17 10:51:52 +08:00
图画的不错,我想知道用什么工具画的
tikazyq
2020-03-17 10:52:34 +08:00
@gowk processon
ayavvv
2020-03-17 10:59:24 +08:00
基于 redis 套壳做了消息队列,完了又基于消息队列做了 rpc。。。老哥你真的是人才。。
tikazyq
2020-03-17 11:12:22 +08:00
@ayavvv 请教下问题主要出现在哪儿?
index90
2020-03-17 11:17:12 +08:00
这种设计是有价值的,把 client 和 server 当作是一个节点上的 reciver 和 sender,就实现了一个并行处理服务了。利用多进程代替了多线程,从而可以方便进行调度管理。微信的后台就有类似这样的实现
pkwenda
2020-03-17 12:07:54 +08:00
思路不错,可以举一反三
myzWILLmake
2020-03-17 12:59:27 +08:00
前公司的游戏服务端消息队列+rpc 就是这么实现的,实际用下来还可以。
tikazyq
2020-03-17 13:02:13 +08:00
@myzWILLmake 也是用 Redis 么,性能怎么样
kaneg
2020-03-17 13:19:49 +08:00
最近做了一个项目需要解决两个服务之间相互调用的问题。服务之间互相都不知道对方的地址,唯一有联系的是它们都连同一个 Kafka。最后的方案也是在 Kafka 上套了个 rpc 的壳。
如果不看实际场景,这种方案是挺奇怪的。但是它给我们的项目在不可能完成的的情况下创造了一种可能性。
iyaozhen
2020-03-17 13:20:30 +08:00
你这感觉实际有很多问题

1. 用队列没有常规 rpc 调用响应快
2. redis 容易单点故障,而且一旦服务异常,redis 内存就爆了(亲身经历)
3. 消息序列化使用 json 性能也不好,最好用 pb。json 另外一个坏处就是格式是松散的,没有严格限制,开发效率也不高,没有代码提示

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/653446

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX