SRS学习笔记11

时间:2021-08-10 00:47:32

文件:

src\app\srs_app_source.hpp

src\app\srs_app_source.cpp

SrsSource 代表一路直播流的源(live stream source),其类声明如下:
/**
 * The source of one live stream.
 * All sources are kept in a static pool keyed by std::string and are
 * obtained through fetch_or_create().
 */
class SrsSource : public ISrsReloadHandler
{
private:
    // all known sources, indexed by a string key.
    static std::map<std::string, SrsSource*> pool;
public:
    /**
     * get the source from the cache, or create it when the fetch failed.
     * @param r the client request.
     * @param h the event handler for source.
     * @param pps output, the matched source; never NULL on success.
     */
    static int fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps);
private:
    /**
     * get the existing source, or NULL when there is none.
     * when found, the request of the source is updated before it is returned.
     */
    static SrsSource* fetch(SrsRequest* r);
public:
    /**
     * dispose and cycle all sources.
     */
    static void dispose_all();
    static int cycle_all();
private:
    static int do_cycle_all();
public:
    /**
     * destroy the sources when the system exits,
     * so that gmc can analyze memory leaks.
     */
    static void destroy();
private:
    // the source id:
    //      for publish, it is the publish client id.
    //      for edge, it is the edge ingest id.
    // when the source id changes (for example, the edge reconnects),
    // on_source_id_changed() is invoked to let all clients know.
    int _source_id;
    // the previous source id.
    int _pre_source_id;
    // a deep copy of the client request.
    SrsRequest* _req;
    // the consumers the stream is delivered to.
    std::vector<SrsConsumer*> consumers;
    // the time jitter algorithm for the vhost.
    SrsRtmpJitterAlgorithm jitter_algorithm;
    // whether to use the interlaced/mixed algorithm to correct timestamps.
    bool mix_correct;
    SrsMixQueue* mix_queue;
    // whether the stream is monotonically increasing.
    bool is_monotonically_increase;
    int64_t last_packet_time;
    // the hls handler.
#ifdef SRS_AUTO_HLS
    SrsHls* hls;
#endif
    // the dvr handler.
#ifdef SRS_AUTO_DVR
    SrsDvr* dvr;
#endif
    // the transcoding handler.
#ifdef SRS_AUTO_TRANSCODE
    SrsEncoder* encoder;
#endif
    // the hds handler (uncommented in the original; named by analogy with the handlers above).
#ifdef SRS_AUTO_HDS
    SrsHds* hds;
#endif
    // the edge control services.
    SrsPlayEdge* play_edge;
    SrsPublishEdge* publish_edge;
    // the gop cache, for fast startup of clients.
    SrsGopCache* gop_cache;
    // the forwarders which forward the stream to other servers.
    std::vector<SrsForwarder*> forwarders;
    // the stream used for aggregate messages.
    SrsStream* aggregate_stream;
    // the event handler.
    ISrsSourceHandler* handler;
private:
    /**
     * whether the source can be published to; true when it is not streaming.
     */
    bool _can_publish;
    /**
     * whether atc is on (use absolute time and do not adjust time):
     * when true, the msg time is used directly without adjustment;
     * otherwise the msg time is adjusted to start from 0 to make flash happy.
     */
    // TODO: FIXME: to support reload atc.
    bool atc;
    // the last die time; when all consumers have quit and there is no publisher,
    // the source is removed once it dies.
    int64_t die_at;
private:
    // NOTE(review): uncommented in the original; presumably the cached
    // metadata message, by analogy with the two sequence headers below.
    SrsSharedPtrMessage* cache_metadata;
    // the cached video sequence header.
    SrsSharedPtrMessage* cache_sh_video;
    // the cached audio sequence header.
    SrsSharedPtrMessage* cache_sh_audio;
public:
    SrsSource();
    virtual ~SrsSource();
public:
    virtual void dispose();
    virtual int cycle();
    // whether the source is expired and should be removed.
    virtual bool expired();
// initialize, getters and setters.
public:
    /**
     * initialize the hls with handlers.
     */
    virtual int initialize(SrsRequest* r, ISrsSourceHandler* h);
// interface ISrsReloadHandler
public:
    virtual int on_reload_vhost_atc(std::string vhost);
    virtual int on_reload_vhost_gop_cache(std::string vhost);
    virtual int on_reload_vhost_queue_length(std::string vhost);
    virtual int on_reload_vhost_time_jitter(std::string vhost);
    virtual int on_reload_vhost_mix_correct(std::string vhost);
    virtual int on_reload_vhost_forward(std::string vhost);
    virtual int on_reload_vhost_hls(std::string vhost);
    virtual int on_reload_vhost_hds(std::string vhost);
    virtual int on_reload_vhost_dvr(std::string vhost);
    virtual int on_reload_vhost_transcode(std::string vhost);
// callbacks for the tools
public:
    // called back by the SrsForwarder to request the sequence headers.
    virtual int on_forwarder_start(SrsForwarder* forwarder);
    // called back by the SrsHls to request the sequence headers.
    virtual int on_hls_start();
    // called back by the SrsDvr to request the sequence headers.
    virtual int on_dvr_request_sh();
    // the source id changed.
    virtual int on_source_id_changed(int id);
    // get the current source id.
    virtual int source_id();
    virtual int pre_source_id();
// logic data methods
public:
    virtual bool can_publish(bool is_edge);
    virtual int on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata);
public:
    virtual int on_audio(SrsCommonMessage* audio);
private:
    virtual int on_audio_imp(SrsSharedPtrMessage* audio);
public:
    virtual int on_video(SrsCommonMessage* video);
private:
    virtual int on_video_imp(SrsSharedPtrMessage* video);
public:
    virtual int on_aggregate(SrsCommonMessage* msg);
    /**
     * publish stream event notify.
     * NOTE(review): the original comment documented a `_req` parameter
     * ("the request from client, the source will deep copy it, for when
     * reload the request of client maybe invalid"), but this method takes
     * no arguments; the deep copy is held by the _req member.
     */
    virtual int on_publish();
    virtual void on_unpublish();
// consumer methods
public:
    /**
     * create a consumer and dump the cached packets to it.
     * @param consumer output, the created consumer.
     * @param ds whether to dump the sequence header.
     * @param dm whether to dump the metadata.
     * @param dg whether to dump the gop cache.
     */
    virtual int create_consumer(
        SrsConnection* conn, SrsConsumer*& consumer,
        bool ds = true, bool dm = true, bool dg = true
    );
    virtual void on_consumer_destroy(SrsConsumer* consumer);
    virtual void set_cache(bool enabled);
    virtual SrsRtmpJitterAlgorithm jitter();
// internal
public:
    // for edge: check the state when publishing the edge stream.
    virtual int on_edge_start_publish();
    // for edge: proxy the publish.
    virtual int on_edge_proxy_publish(SrsCommonMessage* msg);
    // for edge: proxy the stop of publish.
    virtual void on_edge_proxy_unpublish();
private:
    virtual int create_forwarders();
    virtual void destroy_forwarders();
};

这个类第一个被调用的函数是静态成员函数(static member function)fetch_or_create。

在 gdb 中对它下断点(b SrsSource::fetch_or_create),命中时的调用栈如下:

#0  SrsSource::fetch_or_create (r=0x8f4ff0, h=0x8d5458, pps=0x7ffff7f84a00) at src/app/srs_app_source.cpp:746
#1  0x000000000047f158 in SrsRtmpConn::stream_service_cycle (this=0x8f4ec0) at src/app/srs_app_rtmp_conn.cpp:499
#2  0x000000000047e9d8 in SrsRtmpConn::service_cycle (this=0x8f4ec0) at src/app/srs_app_rtmp_conn.cpp:416
#3  0x000000000047d936 in SrsRtmpConn::do_cycle (this=0x8f4ec0) at src/app/srs_app_rtmp_conn.cpp:211
#4  0x000000000047bc6d in SrsConnection::cycle (this=0x8f4f48) at src/app/srs_app_conn.cpp:89
#5  0x00000000004ad6a3 in SrsOneCycleThread::cycle (this=0x8f4f90) at src/app/srs_app_thread.cpp:372
#6  0x00000000004ace3b in internal::SrsThread::thread_cycle (this=0x8f4fb0) at src/app/srs_app_thread.cpp:207
#7  0x00000000004ad049 in internal::SrsThread::thread_fun (arg=0x8f4fb0) at src/app/srs_app_thread.cpp:245
#8  0x0000000000535371 in _st_thread_main () at sched.c:327
#9  0x0000000000535ae1 in st_thread_create (start=0x7ffff7fdbc20, arg=0x1f7fdbb60, joinable=0, stk_size=5480568) at sched.c:591
/**
 * Return the source matching the request, creating and registering a new
 * one in the pool when none exists yet.
 * @param r the client request.
 * @param h the event handler handed to the new source's initialize().
 * @param pps output, set to the fetched or newly created source on success.
 * @return ERROR_SUCCESS on success; otherwise the error code returned by
 *      SrsSource::initialize(), in which case *pps is left untouched.
 */
int SrsSource::fetch_or_create(SrsRequest* r, ISrsSourceHandler* h, SrsSource** pps)
{
    int ret = ERROR_SUCCESS;

    SrsSource* source = NULL;
    // look for an already existing SrsSource in the pool; fetch() keys the
    // lookup by the stream url of the request.
    if ((source = fetch(r)) != NULL) {
        *pps = source;
        return ret;
    }

    // no source exists yet. we get here when publishing an app/stream pair
    // that nobody is playing, or when playing a stream that nobody has published.
    string stream_url = r->get_stream_url();
    string vhost = r->vhost;

    // should always not exists for create a source.
    srs_assert (pool.find(stream_url) == pool.end());

    source = new SrsSource();
    if ((ret = source->initialize(r, h)) != ERROR_SUCCESS) {
        // the half-initialized source is never registered, so free it here.
        srs_freep(source);
        return ret;
    }

    pool[stream_url] = source;
    srs_info("create new source for url=%s, vhost=%s", stream_url.c_str(), vhost.c_str());

    *pps = source;

    return ret;
}

/**
 * Look up the source for the request in the pool.
 * @param r the client request; its stream url is the pool key.
 * @return the existing source with its request auth refreshed from r,
 *      or NULL when the pool holds no source for that stream url.
 */
SrsSource* SrsSource::fetch(SrsRequest* r)
{
    // the key is the stream url, for example /hls/stream
    std::string key = r->get_stream_url();

    std::map<std::string, SrsSource*>::iterator it = pool.find(key);
    if (it == pool.end()) {
        return NULL;
    }

    SrsSource* found = it->second;

    // the request of the source is always updated: when origin auth is on,
    // the token in the stored request may be invalid, and refreshing only
    // the token of the request is the simple way to handle it.
    found->_req->update_auth(r);

    return found;
}

 

/**
 * Initialize the source: remember the handler, deep copy the request, then
 * initialize the hls/dvr handlers (when compiled in) and both edge services,
 * and load the per-vhost settings from the config.
 * @param r the client request; it is copied, not retained.
 * @param h the event handler, must not be NULL. According to the author's
 *      notes it is the `server` member of SrsRtmpConn, whose type is
 *      class SrsServer : virtual public ISrsReloadHandler,
 *          virtual public ISrsSourceHandler, virtual public IConnectionManager
 * @return ERROR_SUCCESS, or the first error code returned by a sub-initialize.
 * @remark asserts that the source has not been initialized before (_req is NULL).
 */
int SrsSource::initialize(SrsRequest* r, ISrsSourceHandler* h)
{
    int ret = ERROR_SUCCESS;

    srs_assert(h);
    srs_assert(!_req);

    handler = h;
    // the SrsRequest of this stream; it may come from a play or a publish.
    _req = r->copy();

    // normally the server delivers the rtmp stream to clients with
    // timestamps starting from 0; when atc is on, the server delivers the
    // rtmp stream to clients using absolute time.
    atc = _srs_config->get_atc(_req->vhost);

#ifdef SRS_AUTO_HLS
    if ((ret = hls->initialize(this)) != ERROR_SUCCESS) {
        return ret;
    }
#endif

#ifdef SRS_AUTO_DVR
    if ((ret = dvr->initialize(this, _req)) != ERROR_SUCCESS) {
        return ret;
    }
#endif

    // the play edge holds an ingester, which pulls the stream from origin.
    if ((ret = play_edge->initialize(this, _req)) != ERROR_SUCCESS) {
        return ret;
    }
    // the publish edge holds a forwarder, which forwards the stream to origin.
    if ((ret = publish_edge->initialize(this, _req)) != ERROR_SUCCESS) {
        return ret;
    }

    double queue_size = _srs_config->get_queue_length(_req->vhost);
    publish_edge->set_queue_size(queue_size);

    jitter_algorithm = (SrsRtmpJitterAlgorithm)_srs_config->get_time_jitter(_req->vhost);
    mix_correct = _srs_config->get_mix_correct(_req->vhost);

    return ret;
}
/**
 * Initialize the publish edge by initializing its forwarder.
 * @param source the source this edge belongs to.
 * @param req the request passed on to the forwarder.
 * @return whatever the forwarder's initialize() returns.
 */
int SrsPublishEdge::initialize(SrsSource* source, SrsRequest* req)
{
    // the forwarder is the only thing to set up, so its result is ours.
    return forwarder->initialize(source, this, req);
}

/**
 * Initialize the play edge by initializing its ingester.
 * @param source the source this edge belongs to.
 * @param req the request passed on to the ingester.
 * @return whatever the ingester's initialize() returns.
 */
int SrsPlayEdge::initialize(SrsSource* source, SrsRequest* req)
{
    // the ingester (a SrsEdgeIngester, per the author's note) is the only
    // thing to set up, so its result is ours.
    return ingester->initialize(source, this, req);
}