webman内核分析一:启动流程与select系统调用

Posted by

一、写在前面

21年的时候有机会第一次接触了webman,初次接触这种模式的框架时,感觉还是挺新颖的。目前已经使用了1年多,感觉挺不错的。之前在webman的论坛看到了第三方项目中对PHPForker的介绍,于是有机会学习了一下,并重新回过头来看webman的启动流程,并理解下关于Linux下select调用的模式。

二、说明

以下流程图是个人理解,有不对的地方请指出来,我将修正它。
同时声明下该流程的几个点

  1. 该流程核心关注接收并响应请求的流程,针对定时器部分暂未列出
  2. 默认处理流程涉及的系统为Linux
  3. 涉及信号的部分未列出
  4. 监听类型为tcp

三、流程图

如果图片查看不完整的话,看这里img

webman启动流程图

四、一个简单的Server服务

顺带上Timer

<?php
declare(strict_types=1);

namespace Stream;

class Timer
{
    /**
     * @var SplPriorityQueue
     */
    private static SplPriorityQueue $queue;

    public function __construct()
    {
        // 初始化优先队列
        self::$queue = new SplPriorityQueue();
        // 定义extra
        self::$queue->setExtractFlags(SplPriorityQueue::EXTR_BOTH);
    }

    public function init(): void
    {
        // 注册信号
        pcntl_signal(SIGALRM, [$this, 'handler'], false);
    }


    /**
     * @param callable $func 回调函数
     * @param float $interval
     * @return void
     */
    public function addTimer(callable $func, float $interval): void
    {
        // 如果不存在任务,则创建一个信号
        if (self::$queue->count() === 0) {
            pcntl_alarm(1);
        }
        $now = hrtime(true) / 1e-9;
        $nextRunTime = $now + $interval;
        self::$queue->insert(
            [
                'interval' => $interval,
                'func' => $func
            ],
            -$nextRunTime
        );
        $count = self::$queue->count();
        printf('queue count:%d' . PHP_EOL, $count);
    }

    public function handler(): void
    {
        pcntl_alarm(1);
        $this->tick();
    }

    public function tick(): void
    {
        $count = self::$queue->count();
        $now = hrtime(true) / 1e-9;
        while ($count--) {
            $data = self::$queue->top();
            $runTime = -$data['priority'];
            if ($runTime <= $now) {
                self::$queue->extract();
                call_user_func($data['data']['func']);
                $this->addTimer($data['data']['func'], $data['data']['interval']);
            }
        }
    }

}

class Server
{
    private string $server = 'tcp://127.0.0.1:5501';

    public function listenSelect(): void
    {
        $mainSocket = stream_socket_server($this->server, $errorCode, $errorMsg);

        stream_set_blocking($mainSocket, false);

        $read = [];
        $write = $except = null;

        $read[$this->server] = $mainSocket;

        while (true) {
		    // 调用等待信号的处理器,Timer部分
            pcntl_signal_dispatch();

            $tmpRead = $read;
            $tmpWrite = $write;

            try {
                $select = stream_select($tmpRead, $tmpWrite, $except, 1, 0);
            } catch (\Throwable $e) {
                printf($e->getMessage() . PHP_EOL);
                continue;
            }

            if ($select === false) {
                continue;
            }

            foreach ($tmpRead as $sid => $socket) {
                if ($socket === $mainSocket) {
                    // 说明有新的链接进入
                    $newSocket = stream_socket_accept($socket, 0, $newSocketPeer);
                    if ($newSocket === false) {
                        print '接受connection失败:' . $newSocketPeer . PHP_EOL;
                        continue;
                    }
                    $socketAddress = 'tcp://' . stream_socket_get_name($newSocket, true);
                    $read[$socketAddress] = $newSocket;
                    print '接受connection成功:' . $newSocketPeer . PHP_EOL;
                }
                else {
                    // 从客户端读取数据,如何确定是当前这个client发来的数据?
                    // 因为select拿到数据,说明一定是有新的数据被读到(返回的read是有新数据到达的socket)
                    $msg = fread($socket, 65535);
                    if ($msg === '' || $msg === false) {
                        // 移除read
                        foreach ($read as $k => $v) {
                            if ($socket === $v) {
                                unset($read[$k]);
                            }
                        }
                        fclose($socket);
                        print '关闭connection:' . $sid . PHP_EOL;
                    } else {
                        // 打印信息,并写回
                        print '收到connection消息:' . $msg . PHP_EOL;
                        if (in_array($socket, $read, true)) {
                            fwrite($socket, '已收到信息,' . date('H:i:s') . PHP_EOL);
                        }
                    }
                }
            }

//            usleep(100000);
        }

    }
}

$timer = new Timer();
$timer->addTimer(
    static function() {
        $time = time();
        printf('this is timer: hello world: %d' . PHP_EOL, $time);
    },
    1
);

$timer->init();
(new Server())->listenSelect();

启动后,通过telnet连接该地址

启动Server
发起连接
接收到请求,同时写入数据

四、中途遇到的几个问题

4.1、关于stream_select一直返回

起初在本地测试的过程中,针对stream_select(&$read, &$write, &$except, $tv_sec, $tv_usec),我学着的PHPForker的做法,将新连接进来的socket不仅放入$read中,同时也放入$write中,这样一旦启动之后新连接进入后,原本的stream_select应该阻塞直到超时的,却并没有被阻塞。通过debug调试发现,这时每次select都有write返回,也就是该新加进去的socket。

这里起初,我针对stream_select的什么时候返回的理解,除了超时场景下返回外,另外的就是监听的$read、$write、$except的文件描述符有新变化才返回。所以当第一次将把主动监听的socket放进到read时(这里后面统一称为mainSocket),如果此时有客户端连接进来,那么下一次在select监听的时候,就会发现read中的mainSocket有就绪的状态,所以这时我们将受理连接,也就是通过stream_socket_accept去接收,得到一个客户端的socket。如果此时将该socket不仅放入$read去监听,同时也放到$write去监听,那接下来每次select的时候,都会立即返回。为什么呢?

后面我翻到了有其他人也有同样的疑惑https://www.workerman.net/q/1307,看到这个我才大概明白。前面放进去的$write一直处于可写状态,所以每次select都能拿到。

4.2、理解Linux下的select、poll、epoll模型

这里找到了几篇解说挺好的文章,可以参考下,它有一个系列PHP socket初探

五、推荐文章

主要是select部分

  1. php socket通信中stream_select方法的理解
  2. stream_select ($read, $write, $except, $timeout ); 函数问题
  3. PHP Socket初探—-先从一个简单的socket服务器开始

Leave a Reply

您的电子邮箱地址不会被公开。 必填项已用*标注