ZLMediaKit Source Code Analysis (1): Startup

Date: 2025-03-24 20:09:50

Startup entry point

server/ -> start_main

Parsing the command line

daemon: run as a daemon process
level: log level
config: path of the configuration file
ssl: path of the SSL certificate/key file
threads: number of event-poller threads
max_day: maximum number of days to keep log files

CMD_main cmd_main;
try {
    cmd_main.operator()(argc, argv);
} catch (ExitException &) {
    return 0;
} catch (std::exception &ex) {
    cout << ex.what() << endl;
    return -1;
}

bool bDaemon = cmd_main.hasKey("daemon");
LogLevel logLevel = (LogLevel) cmd_main["level"].as<int>();
logLevel = MIN(MAX(logLevel, LTrace), LError);
g_ini_file = cmd_main["config"];
string ssl_file = cmd_main["ssl"];
int threads = cmd_main["threads"];

//.....
fileChannel->setMaxDay(cmd_main["max_day"]);
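
The elided part ( //..... ) is where the log channels are set up. As a rough, hedged sketch of how a console channel plus a rolling file channel with max_day could be configured with ZLToolKit's logger (the default constructors and AsyncLogWriter used here are assumptions; only setMaxDay appears in the snippet above):

#include <memory>
#include "Util/logger.h"
using namespace toolkit;

int main() {
    // console output
    Logger::Instance().add(std::make_shared<ConsoleChannel>());
    // rolling file output; keep at most 7 days of logs (this is what --max_day feeds)
    auto fileChannel = std::make_shared<FileChannel>();
    fileChannel->setMaxDay(7);
    Logger::Instance().add(fileChannel);
    // write log records asynchronously on a background thread
    Logger::Instance().setWriter(std::make_shared<AsyncLogWriter>());

    InfoL << "logger ready";
    return 0;
}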

Starting the streaming services

First the size of the event-poller thread pool is set; as shown in addPoller further below, a configured size of 0 falls back to std::thread::hardware_concurrency(), i.e. the number of CPU cores. Then the start method of each TcpServer or UdpServer is called.

EventPollerPool::setPoolSize(threads);
//.....
try {
    // RTSP server, default port 554
    if (rtspPort) { rtspSrv->start<RtspSession>(rtspPort); }
    // RTSPS server, default port 322
    if (rtspsPort) { rtspSSLSrv->start<RtspSessionWithSSL>(rtspsPort); }

    // RTMP server, default port 1935
    if (rtmpPort) { rtmpSrv->start<RtmpSession>(rtmpPort); }
    // RTMPS server, default port 19350
    if (rtmpsPort) { rtmpsSrv->start<RtmpSessionWithSSL>(rtmpsPort); }

    // HTTP server, default port 80
    if (httpPort) { httpSrv->start<HttpSession>(httpPort); }
    // HTTPS server, default port 443
    if (httpsPort) { httpsSrv->start<HttpsSession>(httpsPort); }

    // telnet remote debugging server
    if (shellPort) { shellSrv->start<ShellSession>(shellPort); }

#if defined(ENABLE_RTPPROXY)
    // RTP proxy server
    if (rtpPort) { rtpServer->start(rtpPort); }
#endif//defined(ENABLE_RTPPROXY)

#if defined(ENABLE_WEBRTC)
    // WebRTC UDP server
    if (rtcPort) { rtcSrv->start<WebRtcSession>(rtcPort); }
#endif//defined(ENABLE_WEBRTC)

#if defined(ENABLE_SRT)
    // SRT UDP server
    if (srtPort) { srtSrv->start<SRT::SrtSession>(srtPort); }
#endif//defined(ENABLE_SRT)

} catch (std::exception &ex) {
    WarnL << "端口占用或无权限:" << ex.what() << endl;
    ErrorL << "程序启动失败,请修改配置文件中端口号后重试!" << endl;
    sleep(1);
#if !defined(_WIN32)
    if (pid != getpid() && kill_parent_if_failed) {
        // kill the daemon (parent) process
        kill(pid, SIGINT);
    }
#endif
    return -1;
}
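
Reduced to a single protocol, the same startup pattern looks roughly like this (a minimal sketch; the include paths and the idea of running an RtspSession server standalone are assumptions, not taken from main.cpp):

#include <cstdio>
#include "Network/TcpServer.h"
#include "Poller/EventPoller.h"
#include "Rtsp/RtspSession.h"
using namespace toolkit;
using namespace mediakit;

int main() {
    // size the poller pool before it is first used
    EventPollerPool::setPoolSize(4);
    auto rtspSrv = std::make_shared<TcpServer>();
    try {
        // the template parameter selects the protocol handled by this server
        rtspSrv->start<RtspSession>(554);
    } catch (std::exception &ex) {
        // port in use or missing privileges (ports below 1024 need root)
        printf("start failed: %s\n", ex.what());
        return -1;
    }
    getchar(); // keep the process alive
    return 0;
}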

Starting a TCP-based streaming server

1. Protocol polymorphism is achieved through the SessionType template parameter. A session object manages one connection and does the protocol-specific work: splitting the byte stream into packets, parsing and assembling messages. onCreateSocket is a std::function<Ptr(const EventPoller::Ptr &poller)> executed before a connection request is accepted (it creates the Socket for the new connection). SessionHelper is used to track all sessions globally.
2. start_l is the common entry point that actually builds the TCP server.

/**
 * @brief Start the tcp server
 * @param port local port, 0 for a random port
 * @param host ip of the network interface to listen on
 * @param backlog tcp listen backlog
 */
template<typename SessionType>
void start(uint16_t port, const std::string &host = "127.0.0.1", uint32_t backlog = 1024) {
    // Session allocator: creates the protocol-specific session for each accepted connection
    _session_alloc = [](const TcpServer::Ptr &server, const Socket::Ptr &sock) {
        auto session = std::make_shared<SessionType>(sock);
        session->setOnCreateSocket(server->_on_create_socket);
        return std::make_shared<SessionHelper>(server, session);
    };
    start_l(port, host, backlog);
}
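
For illustration, a custom protocol can be plugged into the same mechanism by deriving a session class. A minimal sketch, assuming ZLToolKit's Session base class and header layout (EchoSession is a hypothetical name, not part of ZLMediaKit):

#include <cstdio>
#include "Network/TcpServer.h"
#include "Network/Session.h"
using namespace toolkit;

// hypothetical session that echoes whatever the client sends
class EchoSession : public Session {
public:
    EchoSession(const Socket::Ptr &sock) : Session(sock) {}
    // called whenever data arrives on this connection
    void onRecv(const Buffer::Ptr &buf) override { send(buf); }
    // called on disconnect or error
    void onError(const SockException &err) override {}
    // called periodically from the server's management timer (see the next sections)
    void onManager() override {}
};

int main() {
    auto server = std::make_shared<TcpServer>();
    server->start<EchoSession>(9000); // SessionType decides the protocol
    getchar();                        // keep the process alive
    return 0;
}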

Listening on the port, accepting IPv4 and IPv6

_socket->listen wraps several steps (a raw POSIX sketch of the same sequence follows the snippet below):
1. create the socket descriptor: socket(family, SOCK_STREAM, IPPROTO_TCP)
2. setReuseable: enable address and port reuse
3. setNoBlocked: switch the socket to non-blocking mode
4. setCloExec: set the close-on-exec flag
5. bind_sock(fd, local_ip, port, family): bind the ip and port
6. ::listen(fd, back_log): start listening on the descriptor

if (!_socket->listen(port, host.c_str(), backlog)) {
    // creating the tcp listener failed, probably because the port is in use or due to missing privileges
    string err = (StrPrinter << "listen on " << host << ":" << port << " failed:" << get_uv_errmsg(true));
    throw std::runtime_error(err);
}
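
In plain POSIX terms, the six steps above boil down to roughly the following (a simplified sketch, not ZLToolKit's code; error handling and the IPv4/IPv6 family selection are omitted):

#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

int listen_tcp(const char *ip, uint16_t port, int backlog) {
    int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);         // 1. create the descriptor
    int on = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));  // 2. address/port reuse
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);        // 3. non-blocking mode
    fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);        // 4. close-on-exec
    sockaddr_in addr{};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    inet_pton(AF_INET, ip, &addr.sin_addr);
    bind(fd, (sockaddr *)&addr, sizeof(addr));                  // 5. bind ip and port
    listen(fd, backlog);                                        // 6. listen
    return fd;
}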

Session management

Common timer implementations include the min-heap, the timing wheel and the red-black tree. ZLMediaKit uses a std::map (a red-black tree) whose key stores the trigger time and whose value stores a std::function; whether this is the best choice is debatable.

// create a timer that periodically manages these tcp sessions
weak_ptr<TcpServer> weak_self = std::dynamic_pointer_cast<TcpServer>(shared_from_this());
_timer = std::make_shared<Timer>(2.0f, [weak_self]() -> bool {
    auto strong_self = weak_self.lock();
    if (!strong_self) {
        return false;
    }
    strong_self->onManagerSession();
    return true;
}, _poller);
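
To make the map-based approach concrete, here is a stripped-down sketch of a timer queue keyed by trigger time (an illustration only, not ZLToolKit's EventPoller code):

#include <chrono>
#include <cstdint>
#include <functional>
#include <map>

// minimal timer queue: key = absolute trigger time in ms, value = task;
// a task returning a non-zero value is rescheduled after that many milliseconds
class TimerQueue {
public:
    using Task = std::function<uint64_t()>;

    void addTimer(uint64_t delay_ms, Task task) {
        _tasks.emplace(now_ms() + delay_ms, std::move(task));
    }

    // run all expired tasks; return the delay until the next one,
    // which an event loop would use as its epoll_wait timeout
    uint64_t runExpired() {
        for (auto it = _tasks.begin(); it != _tasks.end() && it->first <= now_ms();) {
            uint64_t again = it->second();
            if (again) {
                _tasks.emplace(now_ms() + again, std::move(it->second));
            }
            it = _tasks.erase(it);
        }
        if (_tasks.empty()) {
            return ~0ull;
        }
        uint64_t now = now_ms();
        uint64_t next = _tasks.begin()->first;
        return next > now ? next - now : 0;
    }

private:
    static uint64_t now_ms() {
        using namespace std::chrono;
        return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
    }
    std::multimap<uint64_t, Task> _tasks; // red-black tree sorted by trigger time
};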

Initializing the worker threads

EventPollerPool's constructor calls addPoller:
1. each thread is named name + " " + to_string(i), e.g. "event poller 0"
2. if the configured pool size is 0, it falls back to std::thread::hardware_concurrency(), i.e. the number of CPU cores
3. each item of the pool is an EventPoller on which runLoop is executed. The first argument selects the thread: true runs the loop in the current thread, false spawns a child thread. The second argument, register_thread, decides whether shared_from_this() is registered into the static thread_local std::weak_ptr<EventPoller> s_current_poller. Inside runLoop the poller runs the epoll event loop: while (!_exit_flag) { ... }. A usage sketch follows the code below.

EventPollerPool::EventPollerPool() {
    auto size = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true);
    InfoL << "创建EventPoller个数:" << size;
}

size_t TaskExecutorGetterImp::addPoller(const string &name, size_t size, int priority, bool register_thread) {
    auto cpus = thread::hardware_concurrency();
    size = size > 0 ? size : cpus;
    for (size_t i = 0; i < size; ++i) {
        EventPoller::Ptr poller(new EventPoller((ThreadPool::Priority) priority));
        poller->runLoop(false, register_thread);
        auto full_name = name + " " + to_string(i);
        poller->async([i, cpus, full_name]() {
            setThreadName(full_name.data());
            setThreadAffinity(i % cpus);
        });
        _threads.emplace_back(std::move(poller));
    }
    return size;
}
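
As a usage sketch of these pollers, assuming ZLToolKit's public EventPoller/EventPollerPool API (getPoller(), async(), doDelayTask(), getCurrentPoller()):

#include <cstdio>
#include <unistd.h>
#include "Poller/EventPoller.h" // EventPoller and EventPollerPool
using namespace toolkit;

int main() {
    // pick one poller from the pool (the pool is created lazily on first use)
    auto poller = EventPollerPool::Instance().getPoller();

    // run a task inside that poller's event-loop thread
    poller->async([]() {
        // resolved through the thread_local s_current_poller mentioned above
        auto current = EventPoller::getCurrentPoller();
        printf("running in a poller thread? %d\n", current != nullptr);
    });

    // delayed task: runs once after 1000 ms (returning 0 means "do not repeat")
    poller->doDelayTask(1000, []() -> uint64_t { return 0; });

    sleep(2);
    return 0;
}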

Worker thread priority

There are five thread priority levels; addPoller passes the highest one, PRIORITY_HIGHEST. The Priorities[] array stores the numeric scheduler priority corresponding to each enum value.

enum Priority {
    PRIORITY_LOWEST = 0,
    PRIORITY_LOW,
    PRIORITY_NORMAL,
    PRIORITY_HIGH,
    PRIORITY_HIGHEST
};

static bool setPriority(Priority priority = PRIORITY_NORMAL, std::thread::native_handle_type threadId = 0) {
    // ... (_WIN32 branch omitted; what follows is the Linux/POSIX branch)
    // minimum priority value allowed by the scheduler
    static int Min = sched_get_priority_min(SCHED_OTHER);
    if (Min == -1) {
        return false;
    }
    // maximum priority value allowed by the scheduler
    static int Max = sched_get_priority_max(SCHED_OTHER);
    if (Max == -1) {
        return false;
    }

    static int Priorities[] = {Min, Min + (Max - Min) / 4, Min + (Max - Min) / 2, Min + (Max - Min) * 3 / 4, Max};

    if (threadId == 0) {
        threadId = pthread_self();
    }
    struct sched_param params;
    params.sched_priority = Priorities[priority];

    // apply the scheduling parameters to the thread
    return pthread_setschedparam(threadId, SCHED_OTHER, &params) == 0;
}
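
A self-contained sketch to see what this table evaluates to on a given machine. Note that on Linux, SCHED_OTHER typically reports min == max == 0, so all five levels collapse to the same value; the spread only matters for real-time policies such as SCHED_FIFO:

#include <cstdio>
#include <sched.h>

int main() {
    int Min = sched_get_priority_min(SCHED_OTHER);
    int Max = sched_get_priority_max(SCHED_OTHER);
    // same interpolation as setPriority above
    int Priorities[] = {Min, Min + (Max - Min) / 4, Min + (Max - Min) / 2,
                        Min + (Max - Min) * 3 / 4, Max};
    const char *names[] = {"PRIORITY_LOWEST", "PRIORITY_LOW", "PRIORITY_NORMAL",
                           "PRIORITY_HIGH", "PRIORITY_HIGHEST"};
    for (int i = 0; i < 5; ++i) {
        printf("%-16s -> %d\n", names[i], Priorities[i]);
    }
    return 0;
}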

Registering the listening fd with every event loop (epoll_ctl)

EventPollerPool is accessed through Instance(), a singleton whose construction builds the thread pool with the size configured earlier; for_each then iterates over _threads.

If serverRef does not exist for a poller yet, onCreatServer(poller) creates a new TcpServer bound to that poller (std::make_shared<TcpServer>(poller)). Each cloned server then calls cloneFrom, which copies the listening socket from the main server and registers its fd with that poller's epoll instance via epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev), watching for EventPoller::Event_Read | EventPoller::Event_Error.

As a result, every EventPoller watches the same listening fd and competes to accept new connections; the accept event is handled on the same footing as the read, write and close events of already-connected fds.

EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) {
    EventPoller::Ptr poller = dynamic_pointer_cast<EventPoller>(executor);
    if (poller == _poller || !poller) {
        return;
    }
    auto &serverRef = _cloned_server[poller.get()];
    if (!serverRef) {
        serverRef = onCreatServer(poller);
    }
    if (serverRef) {
        serverRef->cloneFrom(*this);
    }
});
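
For reference, the registration that cloneFrom ultimately performs corresponds to a plain epoll_ctl call of roughly this shape (a simplified sketch; the real EventPoller::addEvent also wires the fd to its event callback):

#include <sys/epoll.h>

// attach a listening fd to one poller's epoll instance, watching for
// readability (incoming connections) and errors, i.e. the
// Event_Read | Event_Error combination mentioned above
static int add_listen_fd(int epoll_fd, int listen_fd) {
    struct epoll_event ev{};
    ev.events = EPOLLIN | EPOLLERR | EPOLLHUP;
    ev.data.fd = listen_fd;
    return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &ev);
}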