一个可以在 PHP -FPM 环境来并发访问 HTTP 接口的工具类

2021-04-26 20:13:20 +08:00
 sun2920989

https://github.com/CodeApePro/TcpMockHttp

这并不是一个开源的项目,只是个人在使用的一段工具类代码.

原理是通过 sockets 扩展创建 tcp 连接,在 tcp 连接上传输符合 http1.1 协议的数据,从而将发送数据与读取结果分开.

发出来以供遇到类似场景时参考.

2301 次点击
所在节点    PHP
45 条回复
sun2920989
2021-05-08 18:31:08 +08:00
@sxbxjhwm #20 好的.
sun2920989
2021-05-08 18:45:46 +08:00
@sxbxjhwm #20 另外,在 curl_multi_select 返回值为-1 时,建议主动增加延迟.在某些情况下可以防止 cpu 使用率异常过高的问题.
sun2920989
2021-05-08 19:00:32 +08:00
@sxbxjhwm #20 一个可能可行的方案是,在一个 curl_multi 实例中只添加一个 curl 句柄,发送请求时执行 curl_multi_init,curl_init,curl_multi_add_handle 和 curl_multi_exec,读取结果时执行 curl_multi_select 和 curl_multi_getcontent 及关闭连接.也可以基本实现类似于 tcp 连接时将发送和读取分开的效果.经过包装后也可以实现我示例代码的效果.可是这样对于每一次请求,都需要一个 curl 实例和一个 curl_multi 实例.我觉得这不是一个好的思路.除此之外,我没有其他的思路.
sun2920989
2021-05-08 19:43:59 +08:00
已实测 23 楼的想法实际性能很差.
sun2920989
2021-05-08 20:18:43 +08:00
更换了一个思路,使用全局唯一的 curl_multi_init,每次请求时创建 curl_init 并添加,然后执行一个循环的 curl_multi_exec,获取数据时执行 curl_multi_select 和 curl_multi_getcontent.整体性能和我使用 tcp 连接基本差不多.但是有两个问题,一是此时等待数据时针对某一个变量进行等待其实已经没有意义,需要等待所有结果返回时第一个 wait 才能返回,然后后面的 wait 就是立即返回.二是此时没有很好的方式来控制并发数,因为我无法在一个 curl_init 中不获取返回值的情况下再传输一次 http 请求数据,所以此时如果判断超过了并发限制,只能报错,无法自动处理.而使用 tcp 时由于 http1.1 的管道,我可以不读取第一次的响应结果直接再次发送第二个请求,然后按照顺序来获取每一个响应.
sun2920989
2021-05-08 20:57:14 +08:00
找到一个参数,CURLMOPT_PIPELINING,也许有一些作用,周一尝试一下。感谢您的回复,让我对 curl 有了更多的理解。
sxbxjhwm
2021-05-10 02:26:47 +08:00
@sun2920989 https://gist.github.com/jshensh/b7f05cc2e1ebf600a1eedbf4a30bc346

我这边比较偷懒是直接在原来 Multi 类的基础上修改的,而且因为我原先的设计返回数据的 key 需要与传入的 key 对应,用了 next 直接操作数组内部指针,生怕追加任务指针会被 reset,所以就没有给 class 再加 push 方法,理论上后续再加,再次执行 exec 都是没问题的。我这边输出的结果很明显的展示了在执行 curl_multi_exec 后进行任意操作都不会影响已发出的请求,传入 callback 也可以预处理每一个返回值,至于并发控制这块,因为需要达到精准控制数量所以只能写在 while 段里所以不能很好的避免阻塞:

root@debian:~/php-curl-class# php test.php
Start: 1620584188.6943
curl_multi_exec: 1620584188.6959
sleep: 1620584191.6963
test1 received data at: 1620584193.697
test3 received data at: 1620584193.6971
test4 received data at: 1620584193.6979
test2 received data at: 1620584193.6989
test5 received data at: 1620584198.6994
test6 received data at: 1620584198.6994
test7 received data at: 1620584198.7006
test8 received data at: 1620584198.7009
array(8) {
["test1"]=>
string(40) "strlen: 259 data: float(1620584193.6961)"
["test3"]=>
string(40) "strlen: 259 data: float(1620584193.6966)"
["test4"]=>
string(40) "strlen: 259 data: float(1620584193.6969)"
["test2"]=>
string(40) "strlen: 259 data: float(1620584193.6965)"
["test5"]=>
string(40) "strlen: 259 data: float(1620584198.6985)"
["test6"]=>
string(39) "strlen: 258 data: float(1620584198.699)"
["test7"]=>
string(40) "strlen: 259 data: float(1620584198.6994)"
["test8"]=>
string(37) "strlen: 256 data: float(1620584198.7)"
}
Done: 1620584198.7015
sun2920989
2021-05-10 09:00:59 +08:00
@sxbxjhwm 好的 感谢。我参考一下。
sxbxjhwm
2021-05-10 09:04:45 +08:00
@sun2920989 curl_multi_exec 输出之前都可以抽出来作为 push 方法的内容,sleep 那部分是假装你执行自己需要的功能,sleep 之后是真正的 exec
sun2920989
2021-05-10 09:09:35 +08:00
@sxbxjhwm 嗯看到了。问题还是那个,如果请求不是一起添加进去的,而是一个一个添加的话,程序将要么在超过并发限制时报错,要么在这时阻塞。没有其他的好办法。虽然 http1.1 的管道是一个建议最好不要使用或者少使用的特性,但是确实可以解决我遇到的问题。
sxbxjhwm
2021-05-10 09:53:53 +08:00
@sun2920989 如果是我处理这样的需求,可能会选择直接用 cli 起个 daemon 进程,用 redis 负责队列,cli 自己从 redis 取任务 + 控制并发,这样通用性更强一些
sun2920989
2021-05-10 10:09:56 +08:00
@sxbxjhwm #31 我这边需要在 fpm 环境来实现这些效果.加速一些页面的打开效率.所以不方便使用后台脚本的方式.除非再做一次 fpm 进程与后台脚本之间的信息交互,总之还是非常感谢您的持续回复,极大的拓宽了我对 curl_multi 的了解.
sxbxjhwm
2021-05-10 10:21:43 +08:00
@sun2920989 阿我原先还以为你做的是一个采集爬虫,没想到是 fpm 。。curl_multi_exec 这东西其实本身是异步的,只不过你需要在同步的 php 里调用只能在适当的地方写阻塞,其实不光是 curl,其他类似的队列要实现并发控制也得用阻塞的方法来控制,php 不像 node 一类的语言,可以通过触发事件来实现完全的异步,这个是最大的问题
sun2920989
2021-05-10 10:30:50 +08:00
@sxbxjhwm #33 是的,php-fpm 本身是同步的,总有一个地方要阻塞来获取网络请求的结果.
sxbxjhwm
2021-05-10 10:43:29 +08:00
@sun2920989 之前做过一个 cli 的爬虫,当时是用 swoole 解决的。做法就是专门起一个负责 task 的进程,用 \Swoole\Server 监听一个 sock 文件,具体文档 https://wiki.swoole.com/#/consts?id=socket-%e7%b1%bb%e5%9e%8b 。做的时候有些偷懒没有用 redis,而是直接用 \Swoole\Table 做基础的共享数据存储,sock 负责队列控制。虽然用 swoole 实现在语法上很好看,用起来却比 curl_multi_exec 要麻烦得多,后续再也没有用过这个方案,在这边讨论这个方案稍微有点超纲,算是之前踩过的坑提醒一下,附一个当时写的部分源码:

```php
protected function execute(Input $input, Output $output)
{
/**
* $initDataTable 基础配置表,用于实例化爬虫类
* |------------------------------------|
* | key | value |
* |------------------------------------|
* | service | service |
* | baseUrl | https:// |
* | cookieJar | json_encode(cookieJar) |
* |------------------------------------|
*/
$initDataTable = new \Swoole\Table(4);
$initDataTable->column('value', \Swoole\Table::TYPE_STRING, 512);
$initDataTable->create();

/**
* $acquiredListTable 记录已由 getList 方法执行过的操作
* |------------------------------|
* | key | value |
* |------------------------------|
* | getDatacenters | 0 |
* | getIpsGroupList | 0 |
* | getIpsList | 0 |
* | getSwitchList | 0 |
* | getHardwareModelList | 0 |
* | getPurchaseList | 0 |
* | getServerList | 0 |
* | getHardwareList | 0 |
* |------------------------------|
*/
$acquiredListTable = new \Swoole\Table(16);
$acquiredListTable->column('count', \Swoole\Table::TYPE_INT, 1);
$acquiredListTable->create();

$workingAtomic = new \Swoole\Atomic();
$successedAtomic = new \Swoole\Atomic();
$failedAtomic = new \Swoole\Atomic();

$serv = new \Swoole\Server(Env::get('runtime_path') . 'task.sock', 0, SWOOLE_PROCESS, SWOOLE_SOCK_UNIX_STREAM);
$serv->table = ['initData' => $initDataTable, 'acquiredList' => $acquiredListTable];
$serv->atomic = ['working' => $workingAtomic, 'successed' => $successedAtomic, 'failed' => $failedAtomic];

$serv->set(array('task_worker_num' => 15));

$serv->on('receive', function($serv, $fd, $from_id, $data) use ($output) {
$this->receive($serv, $fd, $from_id, $data, $output);
});

$serv->on('task', function ($serv, $task_id, $from_id, $data) use ($output) {
$this->task($serv, $task_id, $from_id, $data, $output);
});

$serv->on('finish', function ($serv, $task_id, $data) use ($output) {
$this->finish($serv, $task_id, $data, $output);
});

$serv->start();
}
```
sun2920989
2021-05-10 11:00:17 +08:00
@sxbxjhwm #35 好的.我了解一下.
sun2920989
2021-05-10 11:15:09 +08:00
@sxbxjhwm #35 找到三个参数,按照 libcurl 文档的描述,搭配使用可以实现通过 http1.1 管道来控制并发的效果.CURLMOPT_PIPELINING,CURLMOPT_MAX_HOST_CONNECTIONS,CURLMOPT_MAX_PIPELINE_LENGTH.但是这三个参数如果搭配使用对 curl 的版本有着严格的要求,需要大于等于 7.30 小于 7.62.我这边版本和线上一致不符合这个区间.所以没有测试出全部效果.不过设置 CURLMOPT_PIPELINING 为 1 来启用管道是可以实现的.
sun2920989
2021-05-10 11:17:32 +08:00
@sxbxjhwm #35 测试时还发现一个有意思的事情,在设置了启用管道时,我的倾向是期望在没有超过并发数之前尽可能使用新的连接,虽然建连也有一点损耗但是由于获取数据比较慢,所以也会比复用已有连接更快.但是 curl 库的默认方式似乎是优先尽可能复用已有的链接,直到一条连接上传输的请求超过阈值之后,才会去开一个新的连接.鉴于这个倾向的不同,所以即使我的 curl 版本符合这个区间,也不是我期望的效果.
sxbxjhwm
2021-05-10 11:48:37 +08:00
@sun2920989 我其实有一点不理解,既然是同步的 fpm,为什么不等到获取数据后统一再处理呢,效率上没差的
sun2920989
2021-05-10 13:16:52 +08:00
@sxbxjhwm 主要是使我的工具类可以在项目内尽量通用,尽量复用现有逻辑。而不是将每个地方都做很多修改。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/773442

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX