tp使用workerman消息推送

发布时间 2023-09-15 00:13:41作者: 成文的博客

安装

首先通过 composer 安装

composer require topthink/think-worker

SocketServer

在命令行启动服务端

php think worker:server

默认会在0.0.0.0:2345开启一个websocket服务。如果需要自定义参数,可以在config/worker_server.php中进行配置。

使用自定义类作为Worker服务入口文件类

<?php

namespace app\work;

use think\worker\Server;
use Workerman\Lib\Timer;
use Workerman\Worker;

class Push extends Server
{
    protected $socket = 'http://0.0.0.0:2346';   //端口自行修改

    protected static $heartbeat_time = 55;

    protected $uidConnections = array();

    public function onConnect($connection)
    {
        dump('connect');
    }

    public function onWorkerStart($worker)
    {
        //过期关闭连接
        Timer::add(1, function () use ($worker) {
            $time_now = time();
            foreach ($worker->connections as $connection) {
               if (empty($connection->lastMessageTime)) {
                    $connection->lastMessageTime = $time_now;
                    continue;
                }
                if ($time_now - $connection->lastMessageTime > self::$heartbeat_time) {      //连接超时
                    $connection->close();
                }
            }
        });


        // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符

        $inner_text_worker = new Worker('text://0.0.0.0:5678');

        $inner_text_worker->onMessage = function($connection, $buffer)

        {
            // $data数组格式,里面有uid,表示向那个uid的页面推送数据

            $data = json_decode($buffer, true);

            $uid = $data['uid'];

            // 通过workerman,向uid的页面推送数据

            $ret = $this->sendMessageByUid($uid,$buffer);

            // 返回推送结果

            $connection->send($ret ? 'ok' : 'fail');

        };

        // ## 执行监听 ##

        $inner_text_worker->listen();

    }

    public function onMessage($connection, $data)
    {
        $connection->lastMessageTime = time();
        // 判断当前客户端是否已经验证,既是否设置了uid
        $data = json_decode($data,true);
        if(isset($data['type'])&&$data['type']=='init')
        {
            // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
            $connection->uid = $data['uid'];
            $connection->no = mt_rand(10000,99999);
            /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
             * 实现针对特定uid推送数据
             * 实现同一 用户多终端登录同时推送
             */
            $this->uidConnections[$connection->uid][$connection->no] = $connection;
            return;
        }
        $connection->send('收到');
    }

    // 当有客户端连接断开时

    function onClose($connection)
    {
        if(isset($connection->uid) && isset($connection->no))
        {
            unset($this->uidConnections[$connection->uid][$connection->no]);
        }
    }

    // 向所有验证的用户推送数据

//    function broadcast($message)
//
//    {
//        foreach($this->uidConnections as $connection)
//
//        {
//            $connection->send($message);
//        }
//        return true;
//    }



    // 针对uid推送数据

    function sendMessageByUid($uid, $message)
    {
        if(isset($this->uidConnections[$uid]))
        {
            foreach($this->uidConnections[$uid] as $item){
                $item->send($message);
            }

            return true;
        }
        return false;

    }

}

然后在worker_server.php中增加配置参数:

return [
	'worker_class'	=>	'app\work\Push',
];

向指定用户推送消息

    public function pushClient()
    {
        // 建立socket连接到内部推送端口

        $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);

        // 推送的数据,包含uid字段,表示是给这个uid推送

        $data = array('uid' => 'uid1', 'percent' => '88%');

        // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符

        fwrite($client, json_encode($data) . "\n");

        // 读取推送结果

        echo fread($client, 8192);
    }

前端代码

<script>
    let ws = new WebSocket("ws://127.0.0.1:2346")

    ws.onopen = function() {
        var uid = 'uid1';
        ws.send(JSON.stringify({'type':"init",'uid':uid}));
        //绑定连接事件
        console.log("连接成功");
        //每30秒发送一次心跳
        setInterval(function(){
            ws.send(JSON.stringify({'type':"peng"}));
            console.log('发送心跳...');
        },3000)
    };

    ws.onmessage = function(evt) {
        //绑定收到消息事件
       /* data = JSON.parse(evt.data)*/
        console.log(evt.data);
    };

    ws.onclose = function(evt) {
        //绑定关闭或断开连接事件
        console.log("连接已关闭");
    };
</script>