Workerman官方教程学习笔记

发布时间 2023-03-30 16:33:04作者: 白開水

视频教程
文档手册
教程基于 workerman 3.3

基础教程

Worker类的使用

WorkerMan中有两个重要的类Worker与Connection。
worker 对象实际上是一个容器,它可以以特定的协议去监听某个端口。当客户端连接到这个容器监听端口之后,会在这个 worker 容器内部产生一个 connection 对象。
在 WorkerMan 中通过操作这个 connection 对象来完成对客户端发送数据以及接收客户端数据的操作。

<?php

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('http://0.0.0.0:8686');

$worker->onWorkerStart = function ($worker) {
};
$worker->onConnect = function ($connection) {
    echo "connection success" . PHP_EOL;
};
$worker->onMessage = function (TcpConnection $connection, Request $request) {
    $connection->send("hello");
};
$worker->onClose = function ($connection) {
    echo "connection close" . PHP_EOL;
};
$worker->onWorkerStop = function ($worker) {
};

// 运行worker
Worker::runAll();

image.png

<?php

use Workerman\Worker;
use Workerman\Connection\TcpConnection;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('websocket://0.0.0.0:8686');

$worker->onMessage = function (TcpConnection $connection, $data) {
    $connection->send("hello");
};

// 运行worker
Worker::runAll();

image.png

<?php

use Workerman\Worker;
use Workerman\Connection\TcpConnection;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('tcp://0.0.0.0:80');

$worker->onMessage = function (TcpConnection $connection, $data) {
    $connection->send("hello world");
};

// 运行worker
Worker::runAll();

image.png

<?php

use Workerman\Worker;
use Workerman\Connection\UdpConnection;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('udp://0.0.0.0:80');

$worker->onMessage = function (UdpConnection $connection, $data) {
    $connection->send("hello world");
};

// 运行worker
Worker::runAll();

image.png

Connection类的使用

TcpConnection类和AsyncTcpConnection类。AsyncTcpConnection 类是 TcpConnection 类的子类。
当我们在 Workerman 中需要访问外部的某个服务的时候,通过 AsyncTcpConnection 类异步的发起一个 tcp 连接,去连接远程的服务端,异步通讯。

<?php

use Workerman\Connection\ConnectionInterface;
use Workerman\Worker;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('tcp://0.0.0.0:8686');

$worker->onConnect = function (ConnectionInterface $connection) {
    echo "connection success" . PHP_EOL;
    //类似于IP白名单
    if ($connection->getRemoteIp() !== '127.0.0.1') {
        $connection->close('bad ip');
    }
};
$worker->onMessage = function (ConnectionInterface $connection, $data) {
    $connection->send("hello");
};
$worker->onClose = function ($connection) {
    echo "connection close" . PHP_EOL;
};

// 运行worker
Worker::runAll();
<?php

use Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
use Workerman\Worker;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('http://0.0.0.0:8686');

$worker->onConnect = function (TcpConnection $connection) {
    // 作为客户端去链接其他服务端(代理)
    // 发起异步的 TCP 连接 通过实现 onConnect、onMessage、onClose 实现业务逻辑
    $connectionBaidu = new AsyncTcpConnection('ssl://www.baidu.com:443');
  	// 当百度返回数据后,转发给浏览器客户端
    $connectionBaidu->onMessage = function ($connectionBaidu, $data) use ($connection) {
        $connection->send($data);
    };
  	// 浏览器客户端发来数据后转发给百度
    $connection->onMessage = function ($connection, $data) use ($connectionBaidu) {
        $connectionBaidu->send($data);
    };
  	// 执行连接
    $connectionBaidu->connect();
};

// 运行worker
Worker::runAll();

Timer类的使用

<?php

require_once './vendor/autoload.php';

use Workerman\Worker;
use Workerman\Lib\Timer;

$worker = new Worker('websocket://0.0.0.0:8686');
$worker->onConnect = function ($connection) {
    // 为每个客户端创建10s的账户认证
    Timer::add(10, function () use ($connection) {
        if (!isset($connection->name)) {
            $connection->close('auth timeout and close');
        }
    }, null, false);
};

$worker->onMessage = function ($connection, $data) {
    if (!isset($connection->name)) {
        $data = json_decode($data, true);
        if (!isset($data['name']) || !isset($data['password'])) {
            $connection->close('auth fail and close');
            return;
        }
        //假设此处进行数据库验证
        $connection->name = $data['name'];
        broadcast($connection->name . " login");
        return;
    }
    broadcast($connection->name . " said:" . $data);
};

function broadcast($msg)
{
    global $worker;
    foreach ($worker->connections as $connection) {
        if (!isset($connection->name)) {
            continue;
        }
        $connection->send($msg);
    }
}

Worker::runAll();

image.png

WebServer的使用

Workerman 4.x 版本去掉了 WebServer,推荐使用 webman
image.png
image.png

原理解析

Stream 函数

Worker类

<?php

/**
 * 单进程IO复用select
 * 同时处理多个连接
 * ab -n100000 -c100 -k http://127.0.0.1:1215/
 */
class Worker
{
    // 连接事件回调
    public $onConnect = null;
    // 消息事件回调
    public $onMessage = null;
    // 连接关闭事件回调
    public $onClose = null;
    // 监听端口的socket
    protected $socket = null;
    // 所有socket,包括客户端socket和监听端口socket
    protected $allSockets = [];

    // 构造函数
    public function __construct($address)
    {
        // 创建监听socket
        $this->socket = stream_socket_server($address, $errno, $errstr);
        echo "listen $address\r\n";
        // 设置为非阻塞
        // 阻塞:当读取一个socket时,如果对方没有发来任何数据,这个读取操作会一直卡着,直到这个socket超时
        // 非阻塞:当读取一个socket时,如果对方没有发来任何数据,读取操作会立刻返回,返回空数据
        // 在workerman中不想因为读取某个客户端的连接,而导致整个workerman卡住,所以设置非堵塞
        stream_set_blocking($this->socket, 0);
        // 将监听socket放入allSockets
        $this->allSockets[(int)$this->socket] = $this->socket;
    }

    // 运行
    public function run()
    {
        while (1) {
            // 这里不监听socket可写事件和外带数据可读事件
            $write = $except = null;
            // 监听所有socket可读事件,包括客户端socket和监听端口的socket
            $read = $this->allSockets;
            // stream_select是个IO复用函数,可以监听一个socket集合的可读、可写。
            // 整个程序阻塞在这里,等待$read里面的socket可读,这里$read是个引用参数,也就是说stream_select()返回后$read是会被重新赋值
            // $read值的内容就是所有状态可读的socket集合
            stream_select($read, $write, $except, 60);
            // $read被重新赋值,遍历所有状态为可读的socket
            foreach ($read as $index => $socket) {
                // 如果是监听socket可读,说明有新连接
                if ($socket === $this->socket) {
                    // 通过stream_socket_accept获取新连接(客户端)
                    $new_conn_socket = stream_socket_accept($this->socket);
                    if (!$new_conn_socket) continue;
                    // 如果有onConnect事件回调,则尝试触发
                    if ($this->onConnect) {
                        call_user_func($this->onConnect, $new_conn_socket);
                    }
                    // 将新的客户端连接socket放入allSockets,以便stream_select监听其可读事件
                    $this->allSockets[(int)$new_conn_socket] = $new_conn_socket;
                } else { 
                    // 如果是客户端连接可读,说明对应连接的客户端有数据发来
                    // 读数据
                    $buffer = fread($socket, 65535);
                    // 数据为空,代表连接已经断开
                    if ($buffer === '' || $buffer === false) {
                        // 尝试触发onClose回调
                        if ($this->onClose) {
                            call_user_func($this->onClose, $socket);
                        }
                        fclose($socket);
                        // 从allSockets里删除对应的连接,不再监听这个socket可读事件
                        unset($this->allSockets[(int)$socket]);
                        continue;
                    }
                    // 尝试触发onMessage回调
                    call_user_func($this->onMessage, $socket, $buffer);
                }
            }
        } //end while
    }
}

$server = new Worker('tcp://0.0.0.0:1215');

$server->onConnect = function ($conn) {
    echo "onConnect\n";
};

$server->onMessage = function ($conn, $msg) {
    fwrite($conn, "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nServer: workerman/1.1.4\r\nContent-length:5\r\n\r\nhello");
};

$server->onClose = function ($conn) {
    echo "onClose\n";
};

$server->run();

image.png

TcpConnection类

<?php

/**
 * 单进程IO复用select
 * 同时处理多个连接
 * ab -n100000 -c100 -k http://127.0.0.1:1215/
 */
class Worker
{
    // 连接事件回调
    public $onConnect = null;
    // 消息事件回调
    public $onMessage = null;
    // 连接关闭事件回调
    public $onClose = null;
    // 监听端口的socket
    protected $socket = null;
    // 所有socket,包括客户端socket和监听端口socket
    protected $allSockets = [];
    // 所有tcpconnection类的实例,也就是所有客户端的连接对象
    public $connections = [];

    // 构造函数
    public function __construct($address)
    {
        // 创建监听socket
        $this->socket = stream_socket_server($address, $errno, $errstr);
        echo "listen $address\r\n";
        // 设置为非阻塞
        // 阻塞:当读取一个socket时,如果对方没有发来任何数据,这个读取操作会一直卡着,直到这个socket超时
        // 非阻塞:当读取一个socket时,如果对方没有发来任何数据,读取操作会立刻返回,返回空数据
        stream_set_blocking($this->socket, 0);
        // 将监听socket放入allSockets
        $this->allSockets[(int)$this->socket] = $this->socket;
    }

    // 运行
    public function run()
    {
        while (1) {
            // 这里不监听socket可写事件和外带数据可读事件
            $write = $except = null;
            // 监听所有socket可读事件,包括客户端socket和监听端口的socket
            $read = $this->allSockets;
            // stream_select是个IO复用函数。整个程序阻塞在这里,等待$read里面的socket可读,这里$read是个引用参数
            stream_select($read, $write, $except, 60);
            // $read被重新赋值,遍历所有状态为可读的socket
            foreach ($read as $index => $socket) {
                // 如果是监听socket可读,说明有新连接
                if ($socket === $this->socket) {
                    // 通过stream_socket_accept获取新连接
                    $new_conn_socket = stream_socket_accept($this->socket);
                    if (!$new_conn_socket) continue;
                    $conn = new TcpConnection($new_conn_socket);
                    $this->connections[(int)$new_conn_socket] = $conn;
                    // 如果有onConnect事件回调,则尝试触发
                    if ($this->onConnect) {
                        call_user_func($this->onConnect, $conn);
                    }
                    // 将新的客户端连接socket放入allSockets,以便stream_select监听其可读事件
                    $this->allSockets[(int)$new_conn_socket] = $new_conn_socket;
                } else { // 是客户端连接可读,说明对应连接的客户端有数据发来
                    // 读数据
                    $buffer = fread($socket, 65535);
                    // 数据为空,代表连接已经断开
                    if ($buffer === '' || $buffer === false) {
                        // 尝试触发onClose回调
                        if ($this->onClose) {
                            call_user_func($this->onClose, $this->connections[(int)$socket]);
                        }
                        fclose($socket);
                        // 从allSockets里删除对应的连接,不再监听这个socket可读事件
                        unset($this->allSockets[(int)$socket], $this->connections[(int)$socket]);
                        continue;
                    }
                    // 尝试触发onMessage回调
                    call_user_func($this->onMessage, $this->connections[(int)$socket], $buffer);
                }
            }
        } //end while
    }
}

问题:

  • 上述 onConnect、onMessage、onClose 回调中需要使用 php 原始 api 来操作 socket
  • TcpConnection 其实是对这些 socket 的二次封装
<?php

class TcpConnection
{
    protected $_socket = null;

    public function __construct($socket)
    {
        $this->_socket = $socket;
    }

    public function send($buffer)
    {
        //检测socket是否断开
        if (feof($this->_socket)) return false;
        return fwrite($this->_socket, $buffer);
    }
}
<?php

require_once 'Worker.php';
require_once 'TcpConnection.php';

$server = new Worker('tcp://0.0.0.0:1215');

$server->onConnect = function ($conn) {
    $conn->send('input your name: ');
};

$server->onMessage = function ($conn, $msg) {
    if (!isset($conn->name)) {
        $conn->name = trim($msg);
        broadcast("{$conn->name} come");
        return;
    }
    broadcast("{$conn->name} said: $msg");
};

$server->onClose = function ($conn) {
    broadcast("{$conn->name} logout");
};

function broadcast($msg)
{
    global $server;
    foreach ($server->connections as $conn) {
        $conn->send("$msg\r\n");
    }
}

$server->run();

image.png

源码解析

客户端与worker进程的关系
image.png

  • 连接哪个Worker进程由操作系统根据系统的负载情况自动分配

主进程与worker子进程关系
image.png

Workerman                      // workerman内核代码
    ├── Connection                 // socket连接相关
    │   ├── ConnectionInterface.php// socket连接接口
    │   ├── TcpConnection.php      // Tcp连接类
    │   ├── AsyncTcpConnection.php // 异步Tcp连接类
    │   └── UdpConnection.php      // Udp连接类
    ├── Events                     // 网络事件库
    │   ├── EventInterface.php     // 网络事件库接口
    │   ├── Event.php              // Libevent网络事件库
    │   ├── Ev.php                 // Libev网络事件库
    │   ├── Swoole.php             // Swoole网络事件库
    │   └── Select.php             // Select网络事件库
    ├── Lib                        // 常用的类库
    │   ├── Constants.php          // 常量定义
    │   └── Timer.php              // 定时器
    ├── Protocols                  // 协议相关
    │   ├── ProtocolInterface.php  // 协议接口类
    │   ├── Http                   // http协议相关
    │   │   ├── Chunk.php    // http chunk类
    │   │   ├── Request.php  // http 请求类
    │   │   ├── Response.php  // http响应类
    │   │   ├── ServerSentEvents.php  // SSE类
    │   │   ├── Session
    │   │   │   ├── FileSessionHandler.php  // session文件存储
    │   │   │   └── RedisSessionHandler.php // session redis存储
    │   │   ├── Session.php  // session类
    │   │   └── mime.types   // mime映射文件
    │   ├── Http.php               // http协议实现
    │   ├── Text.php               // Text协议实现
    │   ├── Frame.php              // Frame协议实现
    │   └── Websocket.php          // websocket协议的实现
    ├── Worker.php                 // Worker
    ├── WebServer.php              // WebServer
    └── Autoloader.php             // 自动加载类

事件循环

  • event
    • 这是一个扩展,有效地调度I/O,时间和信号基础事件使用最好的可用于特定平台的I/O通知机制。这是将libevent移植到PHP基础设施的一个端口。
  • libevent
    • Libevent 是一个库,它提供了一种机制,在文件描述符上发生特定事件时或超时后执行回调函数。
    • PHP8 开始在文档中被移除。。。
  • select
    • stream_select - 在给定的流数组上运行与 select() 系统调用等效的操作,超时由 tv_sec 和 tv_usec 指定
protected static $_availableEventLoops = array(
  'event'    => '\Workerman\Events\Event',
  'libevent' => '\Workerman\Events\Libevent'
);

/**
 * Get event loop name.
 *
 * @return string
 */
protected static function getEventLoopName()
{
    if (static::$eventLoopClass) {
        return static::$eventLoopClass;
    }

    if (!\class_exists('\Swoole\Event', false)) {
        unset(static::$_availableEventLoops['swoole']);
    }

    $loop_name = '';
    foreach (static::$_availableEventLoops as $name=>$class) {
        if (\extension_loaded($name)) {
            $loop_name = $name;
            break;
        }
    }

    if ($loop_name) {
        static::$eventLoopClass = static::$_availableEventLoops[$loop_name];
    } else {
        static::$eventLoopClass =  '\Workerman\Events\Select';
    }
    return static::$eventLoopClass;
}

疑难问题定位