高并发如何保证微信 access_token 的有效

2016-09-23 09:45:47 +08:00
 swfbarhr

本文源地址:高并发如何保证微信 access_token 的有效,求 star

前言

消失了快 2 个月,俺又回来了。最近换比较忙,好久没写博客,但是学习的脚步一直没停下。前段时间在cnode上看到一个关于微信 access_token 的问题:高并发如何保证微信 token 的有效。其中本人也在上面回复了一下,但是觉得解决方案还是不够好,于是就有了本篇:本文主要以渐进的方式,来一步一步解决高并发情况下 access_token 的获取与保存。

前提条件

由于本文讨论是基于微信公众平台开发展开的,所以如果对微信公众平台开发不熟悉的同学可以先去看下微信公众平台的开发文档

需要解决的问题

本文讨论的其实是 access_token 获取与保存在高并发情况下的边界问题:

(单进程模式下)思路

首先我们来讨论,如何解决单进程模式下高并发遇到的问题。

具体实现代码:

var Emitter = require('events').Emitter;
var util = require('util');

var access_token, flag;

function TokenEmitter() {
  Emitter.call(this);
}
util.inherits(TokenEmitter, Emitter);

myEmitter = new TokenEmitter();
// 消除警告
myEmitter.setMaxListeners(0);

function getAccessToken(appID, appSecret, callback) {
  // 将 callback 缓存到事件队列中,等待触发
  myEmitter.once('token', callback);

  // 判断 access_token 是否过期
  if (!isValid(access_token) && !flag) {
    // 标记已经向微信服务器发送获取 access_token 的请求
    flag = true;

    // 向微信服务器请求新的 access_token
    requestForAccessToken(appID, appSecret, function(err, newToken) {
      if (err) {
        // 通知出错
        return myEmitter.emit('token', err);
      }

      // 更新 access_token
      access_token = newToken;
      // 触发所有监听回调函数
      myEmitter.emit('token', null, newToken.access_token);
      // 还原标记
      flag = false;
    });
  } else {
    process.nextTick(function(){
      callback(null, access_token.access_token);
    });
  }
}

以上代码主要的思路就是利用, node 自带的事件监听器,也就是代码中的'myEmitter.once()'方法,在 access_token 失效的情况下把所有调用的回调方法添加为'token'事件监听函数。并且只有第一个调用者可以去更新 access_token 的值(主要用 flag 来控制)。当获得新的 access_token 后,以新 access_token 为参数,去触发'token'事件。此时,所有监听了'token'事件的函数都会被调用,也就是说,所有调用者的回调函数都会被调用。这样,我们就实现了高并发情况下,防止 access_token 被多次更新的问题,也就是解决了问题 1 。

(多进程模式下)思路

解决了单进程模式下的问题,可以说我们多进程问题也解决了一部分。在多进程模式下,我们的主题思路还是与单进程一直,将调用缓存到事件队列中。但是,多进程的各个进程是不共享内存的,所以我们的 access_token 和 flag 标记不可以存储在变量中,因此需要引入外部存储: redis 。使用 redis 作为外部存储有以下几个原因:

统一存储 access_token

这一点大家都应该没什么疑问, access_token 统一存储的好处就是不需要面对复杂的进程见通信。

锁媒介

当我们标记“正在请求微信服务器”的 flag 标志不可以放在代码的变量中时,那就要寻求代码之外的解决方法,其实我们可以存在 mongodb 、 mysql 等等可以存储的媒介中,甚至可以存放在文本文件中。但是为了保证速度,我还是考虑将其存放在速度更快的 redis 中。

redis 发布订阅功能

当然,如果我们的程序使用的是 node 的 cluster 模块开启的多进程模式,进程间通信还是相对容易一些:每个 worker 都可以向 master 发送 message ,利用这一点把 master 当做中心,来交换数据。但是如果我们是使用 pm2 开启了多实例, pm2 虽然提供了实例间通信的 API ,但是使用起来各种不顺畅,最终选择 redis 来作为各个实例接受通知的发起方。

以上思路的实现代码大致如下:

1.第一步需要做的就是判断 access_token 是否过期(为了方便起见,直接用 appID + appSecret 作为存储 access_token 的键):从 redis 获取键为 appID + appSecret 的内容,因为我们在设置 access_token 时,是将其设为了过期键(设置过程涉及到锁,将在之后给出),所以只要能取到值,就说明 access_token 没有过期。代码如下:

function isValid(appID, appSecret, callback) {
  redis.get(appID + appSecret, function(err, token) {
    if (err) {
      return callback(err);
    }

    // 可以取到值
    if (tokenInfo) {
      return callback(null, token);
    }

    // 未取到值
    callback(null);
  });
}

2.如果在第一步的判断中,我们得出结论: access_token 已经过期,那么我们需要做的下一步就是设置一个代码级别的锁,防止之后的程序访问之后的代码:

function aquireLock(callback) {
  redis.setnx('lock', callback);
}

function releaseLock(callback) {
  redis.del('lock', callback);
}

这 2 个函数,一个用于设置锁,一个用于释放锁。我们设置锁是利用了 redis 的 setnx 命令原理: setnx 只可以设置不存在的 key ,即使同一时间有多个 setnx 命令来设置同一个 key ,最终只有一个客户端可以成功设置'lock'键,也就是说只有一个请求获得了锁的权限。这样就控制了并发产生的问题。

3.最后我们将所有程序写入主函数中:

function getAccessToken(appID, appSecret, callback) {
  // 将 callback 缓存到事件队列中,等待触发
  myEmitter.once('token', callback);

  // 处理订阅消息
  subscribe.on('message', (channel, message) => {
    switch (channel) {
      case 'new_token':
        myEmitter.emit('token', null, message);
        break;
      case 'new_token_err':
        myEmitter.emit('token', new Error(message));
        break;
      default:
        break;
    }
  });

  // 判断 access_token 是否过期
  isValid(appID, appSecret, function(err, token) {
    // 出错
    if (err) {
      return myEmitter.emit('token', err);
    }

    // token 正常
    if (token) {
      return myEmitter.emit('token', null, token.access_token);
    }

    // token 已过期,获取锁
    aquireLock(function(err, result) {
      // 如果获取锁成功,则开始更新 access_token ,如果未得到锁,等待'token'触发
      if (result) {
        // 向微信服务器请求新的 access_token
        requestForAccessToken(appID, appSecret, function(err, newToken) {
          if (err) {
            // 释放锁标记
            releaseLock();
            // 通知出错
            return myEmitter.emit('token', err);
          }

          // 更新 access_token ,将新的 access_token 保存到 redis ,并且提前 5 分钟过期
          redis.setex(appID + appSecret, (newToken.expires_in - 300), newToken.access_token);
          // 发布更新
          publish.publish('new_token', newToken.access_token);
          // 释放锁标记
          releaseLock();
        });
      }
    });

    // 订阅
    subscribe.subscribe('new_token');
  });
}

进一步思考

到此,一个简单多进程控制 access_token 并发的解决方法已经呈现在眼前,但是我们还需要考虑一下边界情况:

function aquireLock(callback) {
  redis.watch('lock');
  redis.multi().setnx('lock').expire('lock', 2).exec(callback);
}

由于设置锁和设置锁的过期时间需要同一时间完成,所以这里我使用了 redis 的事务来保证了原子性。

更进一步的思考

虽然我们解决了锁问题,但是此时所有未获得锁的请求还处于 pending 状态,等待着 access_token 的到来,但是由于获得锁的请求已经走在天堂的路上,已经无法再来给其他这些个请求触发事件了。所以为了解决此类问题,我们需要引入另一个超时,那就是函数调用超时,在一定时间内未完成的话,我们就回调超时错误给调用者:

function getAccessToken(appID, appSecret, callback) {
  // 将 callback 缓存到事件队列中,等待触发
  myEmitter.once('token', callback);

  // 设置函数调用超时
  setTimeout(function () {
    callback(null, new Error('time out'));
  }, 2000);
  
  // ...
}

总结

其实在使用 redis 的订阅功能之前,我还考虑过tjaxon作为进程通信的手段,但是由于 axon 初始化过程有一定的延迟,不符合我的预期,所以放弃了。但是不得不说 axon 是一个非常好的项目,有条件的话可以用在项目当中。好了,以上就是我对高并发下处理 access_token 的一些自己的看法。

24564 次点击
所在节点    Node.js
54 条回复
marvinwilliam
2016-09-23 13:25:43 +08:00
每 7000 秒刷新一次 access_token,为什么一定要等过期了才去刷新啊,不能提前刷新么?
pubby
2016-09-23 13:46:32 +08:00
@faceair 是的,应用服务器得到的超时要提前十秒。而 token 服务器会提前 1 分钟更新 token 。所以有 50 秒的时间窗口去更新 token
goofansu
2016-09-23 13:47:20 +08:00
起一个服务去定时刷新到 redis 不就行了
swfbarhr
2016-09-23 13:50:56 +08:00
@marvinwilliam 首先我认为定时服务去刷新 access_token 没有问题,我也承认这是最简单的解决方法。但是我要讨论的是边界性问题,也就是考虑到各种意外情况,在程序可控的范围内最大限度的去保证 API 的可用性。如你所说, 7000 秒刷新一次就 OK ,但是想想,谁又能保证刷新程序就一定能长远的运行呢?我这边的前提其实是如果我们刷新服务不可用的情况下,如何还能保证期间的请求可以正确的执行。但是如果 PM 或者用户能接受可能出现的一段时间的服务不可用,其实使用刷新服务就已经满足需求了。
reus
2016-09-23 14:43:22 +08:00
@swfbarhr 我们是 3600 秒,出现问题的话,有一个小时的时间可用于处理,处理故障期间,服务还是可用的, token 还未过期。服务都用 systemd 管理,退出了就自动重启,一直出错就报警通知技术人员。服务不可用这种情况,除非是微信方面的问题,否则不可能出现。就一个 GET 请求,实现复杂度基本为零,可能出问题的只是微信服务器那边。

文档里说的做法已经是最佳的了:
1 、为了保密 appsecrect ,第三方需要一个 access_token 获取和刷新的中控服务器。而其他业务逻辑服务器所使用的 access_token 均来自于该中控服务器,不应该各自去刷新,否则会造成 access_token 覆盖而影响业务;

根本就没有什么并发的事情,你这就是“各自去刷新”,所以才需要锁之类。
如果认为 3600 秒还不够,甚至可以减少到几百秒。 access token 每日限额是 2000 ,你可以算算间隔可以到多少。
这事其实很简单,不用搞得太复杂,用分布式锁之类的,根本就是降低可用性。
dwood
2016-09-23 14:43:29 +08:00
这么细心地题主写出来的程序一定是没有 bug 的。。。。
swfbarhr
2016-09-23 14:58:46 +08:00
@dwood 我也不是说我写的东西没有问题,我只是认为我们可以做的,代价也不是很大,那为什么不做一下呢?
swfbarhr
2016-09-23 15:05:25 +08:00
@reus 你说的没有错,可能是我考虑的太多了,我是假设刷新服务不可用的情况下。但是生产环境中可能会出现各种不可预期的问题,做好 2 手防备岂不是更好?
xiaolongyuan
2016-09-23 15:22:15 +08:00
@swfbarhr 万一 redis 挂了呢
swfbarhr
2016-09-23 15:23:15 +08:00
@xiaolongyuan 哈哈,那就真没得玩了
magicdawn
2016-09-23 16:12:23 +08:00
function aquireLock(callback) {
redis.watch('lock');
redis.multi().setnx('lock').expire('lock', 2).exec(callback);
}


setnx lock
expire lock 2

1. redis 事务, 执行出错的话, 还是会继续执行
2. setnx exists-key value, 不会出错, 结果是 0
3. 导致一直 exipre 2s

个人愚见, 不对请指正!
AlexaZhou
2016-09-23 16:12:52 +08:00
微信推荐通过一个中央服务器来刷新 Token , 也就是控制在一个地方刷新,避免了并发的问题

Ps: 我写了个 TokenBoy 专门用来解决这个问题, 直接拿去用就行 , 见 Github
marvinwilliam
2016-09-23 17:11:15 +08:00
@swfbarhr 那就让集群的节点刷新喽
缓存中保存一个对象,包含 access_token,过期时间,刷新锁
当有节点请求并判断快到过期阈值时,设置刷新锁为 true,其他节点正常访问,但是不能刷新,这个节点获取新的 access_token 后写回缓存,重置过期时间和刷新锁
swfbarhr
2016-09-23 17:12:13 +08:00
@magicdawn 貌似这边是有问题,折衷的方法只能是在 setnx 的回调里面去设置过期时间
有一点就是,如果 redis 事务抛处异常,那么事务不会继续执行下去( redis 事务是保证原子性的),同时感谢你指出我的错误
function aquireLock(callback) {
redis.setnx('lock', function(err, result){
// 处理 err...

if(result > 0){
// 设置超时
redis.expire('lock', 2, function(err, result){
// 处理 err...

// 回调成功
});
}

// 设置未成功...
});
}
magicdawn
2016-09-23 17:17:53 +08:00
在应用层去判断 setnx 结果, 然后去 expire 我觉得没有问题, 不会那么巧执行了 setnx / 然后 expire 没吧
magicdawn
2016-09-23 17:18:55 +08:00
有 lua script

--[[
setnxAndExpire
]]

-- get args
local key, value, expire = KEYS[1], ARGV[1], ARGV[2]

-- sennx
local nxresult = redis.call('SETNX', key, value)

-- expire
if nxresult == 1 then
redis.call('EXPIRE', key, expire)
end

-- return
return nxresult


redis.defineCommand('setnx_and_expire', {
lua,
numberOfKeys: 1,
});
magicdawn
2016-09-23 17:20:27 +08:00
之前用过

let val = yield redis.incr key
if( val = 1) {
redis.expire key timeout
}

细想下来也没啥问题, 不会那么巧
swfbarhr
2016-09-23 17:25:50 +08:00
@magicdawn 放在 lua 里面肯定再好不过了,毕竟刻意直接在 redis 玩
qiukun
2016-09-23 20:54:37 +08:00
@reus amazon 倒是不让直接暴露数据库
tairan2006
2016-09-23 21:49:43 +08:00
一个分布式锁也能写这么多…而且文档里也有最佳方案啊,其实更简单的方法是在 redis 机器跑 crontab ,一个小时更新一次什么的,简单暴力。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/308349

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX