基础教程
Worker类的使用
WorkerMan中有两个重要的类Worker与Connection。
worker 对象实际上是一个容器,它可以以特定的协议去监听某个端口。当客户端连接到这个容器监听端口之后,会在这个 worker 容器内部产生一个 connection 对象。
在 WorkerMan 中通过操作这个 connection 对象来完成对客户端发送数据以及接收客户端数据的操作。
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('http://0.0.0.0:8686');
$worker->onWorkerStart = function ($worker) {
};
$worker->onConnect = function ($connection) {
echo "connection success" . PHP_EOL;
};
$worker->onMessage = function (TcpConnection $connection, Request $request) {
$connection->send("hello");
};
$worker->onClose = function ($connection) {
echo "connection close" . PHP_EOL;
};
$worker->onWorkerStop = function ($worker) {
};
// 运行worker
Worker::runAll();
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('websocket://0.0.0.0:8686');
$worker->onMessage = function (TcpConnection $connection, $data) {
$connection->send("hello");
};
// 运行worker
Worker::runAll();
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('tcp://0.0.0.0:80');
$worker->onMessage = function (TcpConnection $connection, $data) {
$connection->send("hello world");
};
// 运行worker
Worker::runAll();
<?php
use Workerman\Worker;
use Workerman\Connection\UdpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('udp://0.0.0.0:80');
$worker->onMessage = function (UdpConnection $connection, $data) {
$connection->send("hello world");
};
// 运行worker
Worker::runAll();
Connection类的使用
TcpConnection类和AsyncTcpConnection类。AsyncTcpConnection 类是 TcpConnection 类的子类。
当我们在 Workerman 中需要访问外部的某个服务的时候,通过 AsyncTcpConnection 类异步的发起一个 tcp 连接,去连接远程的服务端,异步通讯。
<?php
use Workerman\Connection\ConnectionInterface;
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('tcp://0.0.0.0:8686');
$worker->onConnect = function (ConnectionInterface $connection) {
echo "connection success" . PHP_EOL;
//类似于IP白名单
if ($connection->getRemoteIp() !== '127.0.0.1') {
$connection->close('bad ip');
}
};
$worker->onMessage = function (ConnectionInterface $connection, $data) {
$connection->send("hello");
};
$worker->onClose = function ($connection) {
echo "connection close" . PHP_EOL;
};
// 运行worker
Worker::runAll();
<?php
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('http://0.0.0.0:8686');
$worker->onConnect = function (TcpConnection $connection) {
// 作为客户端去链接其他服务端(代理)
// 发起异步的 TCP 连接 通过实现 onConnect、onMessage、onClose 实现业务逻辑
$connectionBaidu = new AsyncTcpConnection('ssl://www.baidu.com:443');
// 当百度返回数据后,转发给浏览器客户端
$connectionBaidu->onMessage = function ($connectionBaidu, $data) use ($connection) {
$connection->send($data);
};
// 浏览器客户端发来数据后转发给百度
$connection->onMessage = function ($connection, $data) use ($connectionBaidu) {
$connectionBaidu->send($data);
};
// 执行连接
$connectionBaidu->connect();
};
// 运行worker
Worker::runAll();
Timer类的使用
<?php
require_once './vendor/autoload.php';
use Workerman\Worker;
use Workerman\Lib\Timer;
$worker = new Worker('websocket://0.0.0.0:8686');
$worker->onConnect = function ($connection) {
// 为每个客户端创建10s的账户认证
Timer::add(10, function () use ($connection) {
if (!isset($connection->name)) {
$connection->close('auth timeout and close');
}
}, null, false);
};
$worker->onMessage = function ($connection, $data) {
if (!isset($connection->name)) {
$data = json_decode($data, true);
if (!isset($data['name']) || !isset($data['password'])) {
$connection->close('auth fail and close');
return;
}
//假设此处进行数据库验证
$connection->name = $data['name'];
broadcast($connection->name . " login");
return;
}
broadcast($connection->name . " said:" . $data);
};
function broadcast($msg)
{
global $worker;
foreach ($worker->connections as $connection) {
if (!isset($connection->name)) {
continue;
}
$connection->send($msg);
}
}
Worker::runAll();
WebServer的使用
Workerman 4.x 版本去掉了 WebServer,推荐使用 webman。
原理解析
Worker类
<?php
/**
* 单进程IO复用select
* 同时处理多个连接
* ab -n100000 -c100 -k http://127.0.0.1:1215/
*/
class Worker
{
// 连接事件回调
public $onConnect = null;
// 消息事件回调
public $onMessage = null;
// 连接关闭事件回调
public $onClose = null;
// 监听端口的socket
protected $socket = null;
// 所有socket,包括客户端socket和监听端口socket
protected $allSockets = [];
// 构造函数
public function __construct($address)
{
// 创建监听socket
$this->socket = stream_socket_server($address, $errno, $errstr);
echo "listen $address\r\n";
// 设置为非阻塞
// 阻塞:当读取一个socket时,如果对方没有发来任何数据,这个读取操作会一直卡着,直到这个socket超时
// 非阻塞:当读取一个socket时,如果对方没有发来任何数据,读取操作会立刻返回,返回空数据
// 在workerman中不想因为读取某个客户端的连接,而导致整个workerman卡住,所以设置非堵塞
stream_set_blocking($this->socket, 0);
// 将监听socket放入allSockets
$this->allSockets[(int)$this->socket] = $this->socket;
}
// 运行
public function run()
{
while (1) {
// 这里不监听socket可写事件和外带数据可读事件
$write = $except = null;
// 监听所有socket可读事件,包括客户端socket和监听端口的socket
$read = $this->allSockets;
// stream_select是个IO复用函数,可以监听一个socket集合的可读、可写。
// 整个程序阻塞在这里,等待$read里面的socket可读,这里$read是个引用参数,也就是说stream_select()返回后$read是会被重新赋值
// $read值的内容就是所有状态可读的socket集合
stream_select($read, $write, $except, 60);
// $read被重新赋值,遍历所有状态为可读的socket
foreach ($read as $index => $socket) {
// 如果是监听socket可读,说明有新连接
if ($socket === $this->socket) {
// 通过stream_socket_accept获取新连接(客户端)
$new_conn_socket = stream_socket_accept($this->socket);
if (!$new_conn_socket) continue;
// 如果有onConnect事件回调,则尝试触发
if ($this->onConnect) {
call_user_func($this->onConnect, $new_conn_socket);
}
// 将新的客户端连接socket放入allSockets,以便stream_select监听其可读事件
$this->allSockets[(int)$new_conn_socket] = $new_conn_socket;
} else {
// 如果是客户端连接可读,说明对应连接的客户端有数据发来
// 读数据
$buffer = fread($socket, 65535);
// 数据为空,代表连接已经断开
if ($buffer === '' || $buffer === false) {
// 尝试触发onClose回调
if ($this->onClose) {
call_user_func($this->onClose, $socket);
}
fclose($socket);
// 从allSockets里删除对应的连接,不再监听这个socket可读事件
unset($this->allSockets[(int)$socket]);
continue;
}
// 尝试触发onMessage回调
call_user_func($this->onMessage, $socket, $buffer);
}
}
} //end while
}
}
$server = new Worker('tcp://0.0.0.0:1215');
$server->onConnect = function ($conn) {
echo "onConnect\n";
};
$server->onMessage = function ($conn, $msg) {
fwrite($conn, "HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nServer: workerman/1.1.4\r\nContent-length:5\r\n\r\nhello");
};
$server->onClose = function ($conn) {
echo "onClose\n";
};
$server->run();
TcpConnection类
<?php
/**
* 单进程IO复用select
* 同时处理多个连接
* ab -n100000 -c100 -k http://127.0.0.1:1215/
*/
class Worker
{
// 连接事件回调
public $onConnect = null;
// 消息事件回调
public $onMessage = null;
// 连接关闭事件回调
public $onClose = null;
// 监听端口的socket
protected $socket = null;
// 所有socket,包括客户端socket和监听端口socket
protected $allSockets = [];
// 所有tcpconnection类的实例,也就是所有客户端的连接对象
public $connections = [];
// 构造函数
public function __construct($address)
{
// 创建监听socket
$this->socket = stream_socket_server($address, $errno, $errstr);
echo "listen $address\r\n";
// 设置为非阻塞
// 阻塞:当读取一个socket时,如果对方没有发来任何数据,这个读取操作会一直卡着,直到这个socket超时
// 非阻塞:当读取一个socket时,如果对方没有发来任何数据,读取操作会立刻返回,返回空数据
stream_set_blocking($this->socket, 0);
// 将监听socket放入allSockets
$this->allSockets[(int)$this->socket] = $this->socket;
}
// 运行
public function run()
{
while (1) {
// 这里不监听socket可写事件和外带数据可读事件
$write = $except = null;
// 监听所有socket可读事件,包括客户端socket和监听端口的socket
$read = $this->allSockets;
// stream_select是个IO复用函数。整个程序阻塞在这里,等待$read里面的socket可读,这里$read是个引用参数
stream_select($read, $write, $except, 60);
// $read被重新赋值,遍历所有状态为可读的socket
foreach ($read as $index => $socket) {
// 如果是监听socket可读,说明有新连接
if ($socket === $this->socket) {
// 通过stream_socket_accept获取新连接
$new_conn_socket = stream_socket_accept($this->socket);
if (!$new_conn_socket) continue;
$conn = new TcpConnection($new_conn_socket);
$this->connections[(int)$new_conn_socket] = $conn;
// 如果有onConnect事件回调,则尝试触发
if ($this->onConnect) {
call_user_func($this->onConnect, $conn);
}
// 将新的客户端连接socket放入allSockets,以便stream_select监听其可读事件
$this->allSockets[(int)$new_conn_socket] = $new_conn_socket;
} else { // 是客户端连接可读,说明对应连接的客户端有数据发来
// 读数据
$buffer = fread($socket, 65535);
// 数据为空,代表连接已经断开
if ($buffer === '' || $buffer === false) {
// 尝试触发onClose回调
if ($this->onClose) {
call_user_func($this->onClose, $this->connections[(int)$socket]);
}
fclose($socket);
// 从allSockets里删除对应的连接,不再监听这个socket可读事件
unset($this->allSockets[(int)$socket], $this->connections[(int)$socket]);
continue;
}
// 尝试触发onMessage回调
call_user_func($this->onMessage, $this->connections[(int)$socket], $buffer);
}
}
} //end while
}
}
问题:
- 上述 onConnect、onMessage、onClose 回调中需要使用 php 原始 api 来操作 socket
- TcpConnection 其实是对这些 socket 的二次封装
<?php
class TcpConnection
{
protected $_socket = null;
public function __construct($socket)
{
$this->_socket = $socket;
}
public function send($buffer)
{
//检测socket是否断开
if (feof($this->_socket)) return false;
return fwrite($this->_socket, $buffer);
}
}
<?php
require_once 'Worker.php';
require_once 'TcpConnection.php';
$server = new Worker('tcp://0.0.0.0:1215');
$server->onConnect = function ($conn) {
$conn->send('input your name: ');
};
$server->onMessage = function ($conn, $msg) {
if (!isset($conn->name)) {
$conn->name = trim($msg);
broadcast("{$conn->name} come");
return;
}
broadcast("{$conn->name} said: $msg");
};
$server->onClose = function ($conn) {
broadcast("{$conn->name} logout");
};
function broadcast($msg)
{
global $server;
foreach ($server->connections as $conn) {
$conn->send("$msg\r\n");
}
}
$server->run();
源码解析
客户端与worker进程的关系
- 连接哪个Worker进程由操作系统根据系统的负载情况自动分配
主进程与worker子进程关系
Workerman // workerman内核代码
├── Connection // socket连接相关
│ ├── ConnectionInterface.php// socket连接接口
│ ├── TcpConnection.php // Tcp连接类
│ ├── AsyncTcpConnection.php // 异步Tcp连接类
│ └── UdpConnection.php // Udp连接类
├── Events // 网络事件库
│ ├── EventInterface.php // 网络事件库接口
│ ├── Event.php // Libevent网络事件库
│ ├── Ev.php // Libev网络事件库
│ ├── Swoole.php // Swoole网络事件库
│ └── Select.php // Select网络事件库
├── Lib // 常用的类库
│ ├── Constants.php // 常量定义
│ └── Timer.php // 定时器
├── Protocols // 协议相关
│ ├── ProtocolInterface.php // 协议接口类
│ ├── Http // http协议相关
│ │ ├── Chunk.php // http chunk类
│ │ ├── Request.php // http 请求类
│ │ ├── Response.php // http响应类
│ │ ├── ServerSentEvents.php // SSE类
│ │ ├── Session
│ │ │ ├── FileSessionHandler.php // session文件存储
│ │ │ └── RedisSessionHandler.php // session redis存储
│ │ ├── Session.php // session类
│ │ └── mime.types // mime映射文件
│ ├── Http.php // http协议实现
│ ├── Text.php // Text协议实现
│ ├── Frame.php // Frame协议实现
│ └── Websocket.php // websocket协议的实现
├── Worker.php // Worker
├── WebServer.php // WebServer
└── Autoloader.php // 自动加载类
事件循环
- event
- 这是一个扩展,有效地调度I/O,时间和信号基础事件使用最好的可用于特定平台的I/O通知机制。这是将libevent移植到PHP基础设施的一个端口。
- libevent
- Libevent 是一个库,它提供了一种机制,在文件描述符上发生特定事件时或超时后执行回调函数。
- PHP8 开始在文档中被移除。。。
- select
- stream_select - 在给定的流数组上运行与 select() 系统调用等效的操作,超时由 tv_sec 和 tv_usec 指定
protected static $_availableEventLoops = array(
'event' => '\Workerman\Events\Event',
'libevent' => '\Workerman\Events\Libevent'
);
/**
* Get event loop name.
*
* @return string
*/
protected static function getEventLoopName()
{
if (static::$eventLoopClass) {
return static::$eventLoopClass;
}
if (!\class_exists('\Swoole\Event', false)) {
unset(static::$_availableEventLoops['swoole']);
}
$loop_name = '';
foreach (static::$_availableEventLoops as $name=>$class) {
if (\extension_loaded($name)) {
$loop_name = $name;
break;
}
}
if ($loop_name) {
static::$eventLoopClass = static::$_availableEventLoops[$loop_name];
} else {
static::$eventLoopClass = '\Workerman\Events\Select';
}
return static::$eventLoopClass;
}