用ZLMedia实现rtmp拉流转推流(rtmp/hls)

发布时间 2023-05-31 10:42:45作者: 阿风小子
业务场景是:有一个rtmp的源,对外提供rtmp的直播节目,地址rtmp://abc.com/live/tv, 
 
现在的需求是要将此节目拉过来,生成HLS对外发布,或对外还是rtmp发布,比如rtmp://my.com/live/tv。
 
作用嘛,肯定是你懂的!
 
此时需要一个把rtmp数据拉过来,再推出去的动作,一般推给SRS流媒体服务器后,即可随意对外分发提供rtmp或hls。
 
如何实现呢?
 
一般的流媒体服务器,像SRS提供许多流媒体格式的输出,可以用于进行流媒体处理,但此时需要一个拉流的程序。SRS里面用的是ffmpeg程序,但是个人感觉并不是特别好用,比如在某种情况下,ffmpeg卡死了,但是SRS并不会让ffmpeg重新拉流。
 
这里推荐两种方法:
 
一、使用zlmedia 的json API实现
最简单的办法就是使用zlmedia的mediaproxy API接口,即可完成,具体可参考
 
MediaServer支持的HTTP API · ZLMediaKit/ZLMediaKit Wiki · GitHub
 
 "/index/api/addStreamProxy"
二、使用zlmedia进行二次开发
如果需要用程序进行自定义实现,可以参照  
 
https://github.com/xia-chu/ZLMediaKit/blob/master/tests/test_pusher.cpp
 
假定SRS运行在本机的1935端口,推给SRS后,由SRS再处理对外发布。
 
 
int main(int argc, char *argv[]) {
    return domain("rtmp://abc.com/live/hks1", "rtmp://127.0.0.1/live/tv");
}
 以下是我根据ZLMediaKit中的 test_pusher修改后的代码,可以从外界传递参数到程序中,实现自定义拉流并转推。
 
 实测此功能正常运行,稳定性还不错。
 
程序的编译方法:在编译zlmedia的时候会自动编译test目录下的所有.cpp。
 
 
#include <signal.h>
#include <iostream>
#include "Util/logger.h"
#include "Util/NoticeCenter.h"
#include "Poller/EventPoller.h"
#include "Player/PlayerProxy.h"
#include "Rtmp/RtmpPusher.h"
#include "Common/config.h"
#include "Pusher/MediaPusher.h"
 
#include "Util/CMD.h"
 
using namespace std;
using namespace toolkit;
using namespace mediakit;
 
//推流器,保持强引用
MediaPusher::Ptr pusher;
Timer::Ptr g_timer;
 
//声明函数
void rePushDelay(const EventPoller::Ptr &poller,const string &schema,const string &vhost,const string &app, const string &stream, const string &url);
 
//创建推流器并开始推流
void createPusher(const EventPoller::Ptr &poller, const string &schema,const string &vhost,const string &app, const string &stream, const string &url) {
    //创建推流器并绑定一个MediaSource
    pusher.reset(new MediaPusher(schema,vhost, app, stream,poller));
    //可以指定rtsp推流方式,支持tcp和udp方式,默认tcp
//    (*pusher)[Client::kRtpType] = Rtsp::RTP_UDP;
    //设置推流中断处理逻辑
    pusher->setOnShutdown([poller,schema,vhost, app, stream, url](const SockException &ex) {
        WarnL << "Server connection is closed:" << ex.getErrCode() << " " << ex.what();
        //重试
        rePushDelay(poller,schema,vhost,app, stream, url);
    });
    //设置发布结果处理逻辑
    pusher->setOnPublished([poller,schema,vhost, app, stream, url](const SockException &ex) {
        if (ex) {
            WarnL << "Publish fail:" << ex.getErrCode() << " " << ex.what();
            //如果发布失败,就重试
            rePushDelay(poller,schema,vhost,app, stream, url);
        } else {
            InfoL << "Publish success,Please play with player:" << url;
        }
    });
    pusher->publish(url);
}
 
//推流失败或断开延迟2秒后重试推流
void rePushDelay(const EventPoller::Ptr &poller,const string &schema,const string &vhost,const string &app, const string &stream, const string &url) {
    g_timer = std::make_shared<Timer>(2,[poller,schema,vhost,app, stream, url]() {
        InfoL << "Re-Publishing...";
        //重新推流
        createPusher(poller,schema,vhost,app, stream, url);
        //此任务不重复
        return false;
    }, poller);
}
 
//这里才是真正执行main函数,你可以把函数名(domain)改成main,然后就可以输入自定义url了
int domain(const string &playUrl, const string &pushUrl) {
 
auto poller = EventPollerPool::Instance().getPoller();
    //拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream"
    //你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请查看test_rtmpPusherMp4.cpp代码)
    MediaInfo info(pushUrl);
    PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",false,false,-1 , poller));
    //可以指定rtsp拉流方式,支持tcp和udp方式,默认tcp
//    (*player)[Client::kRtpType] = Rtsp::RTP_UDP;
    player->play(playUrl.data());
 
    //监听RtmpMediaSource注册事件,在PlayerProxy播放成功后触发
    NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastMediaChanged,
                                         [pushUrl,poller](BroadcastMediaChangedArgs) {
                                             //媒体源"app/stream"已经注册,这时方可新建一个RtmpPusher对象并绑定该媒体源
                                             if(bRegist && pushUrl.find(sender.getSchema()) == 0){
                                                 createPusher(poller,sender.getSchema(),sender.getVhost(),sender.getApp(), sender.getId(), pushUrl);
                                             }
                                         });
 
    //设置退出信号处理函数
    static semaphore sem;
    signal(SIGINT, [](int) { sem.post(); });// 设置退出信号
    sem.wait();
    pusher.reset();
    g_timer.reset();
    return 0;
}
 
 
 
int main(int argc, char *argv[]) 
{
// ./ffmpeg -loglevel info  -f flv -i rtmp://test.yunyingtx.com/live/PLTV/88888888/tv 
// -vcodec copy -acodec copy -f flv -y rtmp://127.0.0.1:1936/live/tv
string input;
string output;
try {
for (int i = 0; i < argc-1; i++)
{
if (string(argv[i]) == "-i")
input = argv[i + 1];
}
for (int i = 0; i < argc - 1; i++)
{
if (string(argv[i]) == "-y")
output = argv[i + 1];
}
 
}
catch (std::exception &ex) {
cout << ex.what() << endl;
return -1;
}
 
 
 
 
if (input.find("rtmp://") == string::npos ||
output.find("rtmp://") == string::npos)
{
return -1;
}
 
EventPollerPool::setPoolSize(1);
 
//设置日志
LogLevel logLevel = (LogLevel)LDebug;//
Logger::Instance().add(std::make_shared<ConsoleChannel>());
Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());
auto fileChannel = std::make_shared<FileChannel>("FileChannel", exeDir() + "../logs/", logLevel);
fileChannel->setMaxDay(7);
Logger::Instance().add(fileChannel);
 
 
DebugL << "program started." << endl;
DebugL << input<<endl;
DebugL << output<<endl;
 
 
    return domain(input, output);//"rtmp://127.0.0.1/live/cctv13"
}
 
 
 
 
 
 
 
zlmedia本身存在一个问题:拉流时有中断的现象,就是说rtmp源还在播,但是我们的这个程序不知道因为什么原因停止工作了,导致整个媒体处理流程中断。
 
经过长时间的跟踪和调试,发现ZLMediaKit在处理 rtmp的流程存在问题
 
https://github.com/xia-chu/ZLMediaKit/issues/455
 
解决办法是在ZLMediaKit的 src/Rtmp/RtmpPlayerImp.h 文件中
对onCheckMeta() 函数加入对 _delegate为空的判断,只有为空的时候才reset()。
 
修改前:
 
    bool onCheckMeta(const AMFValue &val) override {
        _rtmp_src = dynamic_pointer_cast<RtmpMediaSource>(_pMediaSrc);
        if (_rtmp_src) {
            _rtmp_src->setMetaData(val);
            _set_meta_data = true;
        }
        _delegate.reset(new RtmpDemuxer);
        _delegate->loadMetaData(val);
        return true;
    }
修改后 :
 
    bool onCheckMeta(const AMFValue &val) override {
        _rtmp_src = dynamic_pointer_cast<RtmpMediaSource>(_pMediaSrc);
        if (_rtmp_src) {
            _rtmp_src->setMetaData(val);
            _set_meta_data = true;
        }
if (!_delegate) {
_delegate.reset(new RtmpDemuxer);
_delegate->loadMetaData(val);
}
return true;
    }