一、写在前面
21年的时候有机会第一次接触了webman,初次接触这种模式的框架时,感觉还是挺新颖的。目前已经使用了1年多,感觉挺不错的。之前在webman的论坛看到了第三方项目中对PHPForker的介绍,于是有机会学习了一下,并重新回过头来看webman的启动流程,并理解下关于Linux下select调用的模式。
二、说明
以下流程图是个人理解,有不对的地方请指出来,我将修正它。
同时声明下该流程的几个点
- 该流程核心关注接收并响应请求的流程,针对定时器部分暂未列出
- 默认处理流程涉及的系统为Linux
- 涉及信号的部分未列出
- 监听类型为tcp
三、流程图
如果图片查看不完整的话,看这里img
四、一个简单的Server服务
顺带上Timer
<?php
declare(strict_types=1);
namespace Stream;
class Timer
{
/**
* @var SplPriorityQueue
*/
private static SplPriorityQueue $queue;
public function __construct()
{
// 初始化优先队列
self::$queue = new SplPriorityQueue();
// 定义extra
self::$queue->setExtractFlags(SplPriorityQueue::EXTR_BOTH);
}
public function init(): void
{
// 注册信号
pcntl_signal(SIGALRM, [$this, 'handler'], false);
}
/**
* @param callable $func 回调函数
* @param float $interval
* @return void
*/
public function addTimer(callable $func, float $interval): void
{
// 如果不存在任务,则创建一个信号
if (self::$queue->count() === 0) {
pcntl_alarm(1);
}
$now = hrtime(true) / 1e-9;
$nextRunTime = $now + $interval;
self::$queue->insert(
[
'interval' => $interval,
'func' => $func
],
-$nextRunTime
);
$count = self::$queue->count();
printf('queue count:%d' . PHP_EOL, $count);
}
public function handler(): void
{
pcntl_alarm(1);
$this->tick();
}
public function tick(): void
{
$count = self::$queue->count();
$now = hrtime(true) / 1e-9;
while ($count--) {
$data = self::$queue->top();
$runTime = -$data['priority'];
if ($runTime <= $now) {
self::$queue->extract();
call_user_func($data['data']['func']);
$this->addTimer($data['data']['func'], $data['data']['interval']);
}
}
}
}
class Server
{
private string $server = 'tcp://127.0.0.1:5501';
public function listenSelect(): void
{
$mainSocket = stream_socket_server($this->server, $errorCode, $errorMsg);
stream_set_blocking($mainSocket, false);
$read = [];
$write = $except = null;
$read[$this->server] = $mainSocket;
while (true) {
// 调用等待信号的处理器,Timer部分
pcntl_signal_dispatch();
$tmpRead = $read;
$tmpWrite = $write;
try {
$select = stream_select($tmpRead, $tmpWrite, $except, 1, 0);
} catch (\Throwable $e) {
printf($e->getMessage() . PHP_EOL);
continue;
}
if ($select === false) {
continue;
}
foreach ($tmpRead as $sid => $socket) {
if ($socket === $mainSocket) {
// 说明有新的链接进入
$newSocket = stream_socket_accept($socket, 0, $newSocketPeer);
if ($newSocket === false) {
print '接受connection失败:' . $newSocketPeer . PHP_EOL;
continue;
}
$socketAddress = 'tcp://' . stream_socket_get_name($newSocket, true);
$read[$socketAddress] = $newSocket;
print '接受connection成功:' . $newSocketPeer . PHP_EOL;
}
else {
// 从客户端读取数据,如何确定是当前这个client发来的数据?
// 因为select拿到数据,说明一定是有新的数据被读到(返回的read是有新数据到达的socket)
$msg = fread($socket, 65535);
if ($msg === '' || $msg === false) {
// 移除read
foreach ($read as $k => $v) {
if ($socket === $v) {
unset($read[$k]);
}
}
fclose($socket);
print '关闭connection:' . $sid . PHP_EOL;
} else {
// 打印信息,并写回
print '收到connection消息:' . $msg . PHP_EOL;
if (in_array($socket, $read, true)) {
fwrite($socket, '已收到信息,' . date('H:i:s') . PHP_EOL);
}
}
}
}
// usleep(100000);
}
}
}
$timer = new Timer();
$timer->addTimer(
static function() {
$time = time();
printf('this is timer: hello world: %d' . PHP_EOL, $time);
},
1
);
$timer->init();
(new Server())->listenSelect();
启动后,通过telnet连接该地址
四、中途遇到的几个问题
4.1、关于stream_select一直返回
起初在本地测试的过程中,针对stream_select(&$read, &$write, &$except, $tv_sec, $tv_usec),我学着的PHPForker的做法,将新连接进来的socket不仅放入$read中,同时也放入$write中,这样一旦启动之后新连接进入后,原本的stream_select应该阻塞直到超时的,却并没有被阻塞。通过debug调试发现,这时每次select都有write返回,也就是该新加进去的socket。
这里起初,我针对stream_select的什么时候返回的理解,除了超时场景下返回外,另外的就是监听的$read、$write、$except的文件描述符有新变化才返回。所以当第一次将把主动监听的socket放进到read时(这里后面统一称为mainSocket),如果此时有客户端连接进来,那么下一次在select监听的时候,就会发现read中的mainSocket有就绪的状态,所以这时我们将受理连接,也就是通过stream_socket_accept去接收,得到一个客户端的socket。如果此时将该socket不仅放入$read去监听,同时也放到$write去监听,那接下来每次select的时候,都会立即返回。为什么呢?
后面我翻到了有其他人也有同样的疑惑https://www.workerman.net/q/1307,看到这个我才大概明白。前面放进去的$write一直处于可写状态,所以每次select都能拿到。
4.2、理解Linux下的select、poll、epoll模型
这里找到了几篇解说挺好的文章,可以参考下,它有一个系列PHP socket初探
五、推荐文章
主要是select部分
- php socket通信中stream_select方法的理解
- stream_select ($read, $write, $except, $timeout ); 函数问题
- PHP Socket初探—-先从一个简单的socket服务器开始