启动位置
server/ -> start_main
解析命令行
daemon :守护进程
level:日志级别
config:配置文件路径
ssl:ssl密钥路径
threads: 线程锁数量
max_day :日志最多保存天数
CMD_main cmd_main;
try {
cmd_main.operator()(argc, argv);
} catch (ExitException &) {
return 0;
} catch (std::exception &ex) {
cout << ex.what() << endl;
return -1;
}
bool bDaemon = cmd_main.hasKey("daemon");
LogLevel logLevel = (LogLevel) cmd_main["level"].as<int>();
logLevel = MIN(MAX(logLevel, LTrace), LError);
g_ini_file = cmd_main["config"];
string ssl_file = cmd_main["ssl"];
int threads = cmd_main["threads"];
//.....
fileChannel->setMaxDay(cmd_main["max_day"]);
启动流媒体服务
首先会设置线程池的大小,线程池大小默认是std::thread::hardware_concurrency()
cup最大的数量*2,接着调用TcpServer或者UdpServer的start方法。
EventPollerPool::setPoolSize(threads);
//.....
try {
//rtsp服务器,端口默认554
if (rtspPort) { rtspSrv->start<RtspSession>(rtspPort); }
//rtsps服务器,端口默认322
if (rtspsPort) { rtspSSLSrv->start<RtspSessionWithSSL>(rtspsPort); }
//rtmp服务器,端口默认1935
if (rtmpPort) { rtmpSrv->start<RtmpSession>(rtmpPort); }
//rtmps服务器,端口默认19350
if (rtmpsPort) { rtmpsSrv->start<RtmpSessionWithSSL>(rtmpsPort); }
//http服务器,端口默认80
if (httpPort) { httpSrv->start<HttpSession>(httpPort); }
//https服务器,端口默认443
if (httpsPort) { httpsSrv->start<HttpsSession>(httpsPort); }
//telnet远程调试服务器
if (shellPort) { shellSrv->start<ShellSession>(shellPort); }
#if defined(ENABLE_RTPPROXY)
//创建rtp服务器
if (rtpPort) { rtpServer->start(rtpPort); }
#endif//defined(ENABLE_RTPPROXY)
#if defined(ENABLE_WEBRTC)
//webrtc udp服务器
if (rtcPort) { rtcSrv->start<WebRtcSession>(rtcPort); }
#endif//defined(ENABLE_WEBRTC)
#if defined(ENABLE_SRT)
// srt udp服务器
if(srtPort){
srtSrv->start<SRT::SrtSession>(srtPort);
}
#endif//defined(ENABLE_SRT)
} catch (std::exception &ex) {
WarnL << "端口占用或无权限:" << ex.what() << endl;
ErrorL << "程序启动失败,请修改配置文件中端口号后重试!" << endl;
sleep(1);
#if !defined(_WIN32)
if (pid != getpid() && kill_parent_if_failed) {
//杀掉守护进程
kill(pid, SIGINT);
}
#endif
return -1;
}
tcp 类型流媒体 start
1、通过模板SessionType
实现协议的多态,session
用作会话管理,用来处理每一种协议的拆包、解析、封装等功能,onCreateSocket
是一个函数std::function<Ptr(const EventPoller::Ptr &poller)>
在接收到连接请求前执行,SessionHelper
用于管理全局的所有的session
2、start_l
构建tcp的总入口
/**
* @brief 开始tcp server
* @param port 本机端口,0则随机
* @param host 监听网卡ip
* @param backlog tcp listen backlog
*/
template<typename SessionType>
void start(uint16_t port, const std::string &host = "127.0.0.1", uint32_t backlog = 1024) {
//TcpSession创建器,通过它创建不同类型的服务器
_session_alloc = [](const TcpServer::Ptr &server, const Socket::Ptr &sock) {
auto session = std::make_shared<SessionType>(sock);
session->setOnCreateSocket(server->_on_create_socket);
return std::make_shared<SessionHelper>(server, session);
};
start_l(port, host, backlog);
}
监听端口,接收ipv4、ipv6
_socket->listen包含了很多操作
1、创建socket描述符 socket(family, SOCK_STREAM, IPPROTO_TCP))
2、setReuseable
设置地址、端口复用
3、setNoBlocked
设置为非阻塞模式
4、setCloExec
设置close-on-exec状态
5、bind_sock(fd, local_ip, port, family)
绑定ip、端口号
6、::listen(fd, back_log)
监听文件描述符
if (!_socket->listen(port, host.c_str(), backlog)) {
//创建tcp监听失败,可能是由于端口占用或权限问题
string err = (StrPrinter << "listen on " << host << ":" << port << " failed:" << get_uv_errmsg(true));
throw std::runtime_error(err);
}
会话管理
常用的定时器有最小堆、时间轮、红黑树,ZLMediaKit采用std::map红黑树实现,key保存时间,value保存std::function,这一块的实现存在争议。
//新建一个定时器定时管理这些tcp会话
weak_ptr<TcpServer> weak_self = std::dynamic_pointer_cast<TcpServer>(shared_from_this());
_timer = std::make_shared<Timer>(2.0f, [weak_self]() -> bool {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
strong_self->onManagerSession();
return true;
}, _poller);
初始化子线程
EventPollerPool
,构造函数调用addPoller
1、线程是名称是std::to_string (event poller + " "+ i)
2、线程设置再多也不会大于std::thread::hardware_concurrency() (cpu*2)
3、线程池的item poller,执行runLoop
,参数一true
表示当前线程,false
表示子线程,参数二register_thread
表示把线程shared_from_this()
注入到一个static thread_local std::weak_ptr<EventPoller> s_current_poller;
当中。runLoop 中就是再执行while(!_exit_flag){}
epoll事件循环
EventPollerPool::EventPollerPool() {
auto size = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true);
InfoL << "创建EventPoller个数:" << size;
}
size_t TaskExecutorGetterImp::addPoller(const string &name, size_t size, int priority, bool register_thread) {
auto cpus = thread::hardware_concurrency();
size = size > 0 ? size : cpus;
for (size_t i = 0; i < size; ++i) {
EventPoller::Ptr poller(new EventPoller((ThreadPool::Priority) priority));
poller->runLoop(false, register_thread);
auto full_name = name + " " + to_string(i);
poller->async([i, cpus, full_name]() {
setThreadName(full_name.data());
setThreadAffinity(i % cpus);
});
_threads.emplace_back(std::move(poller));
}
return size;
}
子线程的级别
线程优先级有5个级别,addPoller
时传递的是*别PRIORITY_HIGHEST
,Priorities[]
存储枚举对应的线程优先级数值
enum Priority {
PRIORITY_LOWEST = 0,
PRIORITY_LOW,
PRIORITY_NORMAL,
PRIORITY_HIGH,
PRIORITY_HIGHEST
};
static bool setPriority(Priority priority = PRIORITY_NORMAL, std::thread::native_handle_type threadId = 0) {
// ...._WIN32
//... _linux_
//获取调度程序的最小优先级值
static int Min = sched_get_priority_min(SCHED_OTHER);
if (Min == -1) {
return false;
}
//获取调度程序的最大优先级值
static int Max = sched_get_priority_max(SCHED_OTHER);
if (Max == -1) {
return false;
}
static int Priorities[] = {Min, Min + (Max - Min) / 4, Min + (Max - Min) / 2, Min + (Max - Min) * 3 / 4, Max};
if (threadId == 0) {
threadId = pthread_self();
}
struct sched_param params;
params.sched_priority = Priorities[priority];
// 设置调度参数
return pthread_setschedparam(threadId, SCHED_OTHER, ¶ms) == 0;
}
为每一个event_loop设置epoll_ctl
EventPollerPool
调用Instance
单例模式,初始化线程池,EventPollerPool 初始化了线程池的大小 ,_threads遍历使用。
serverRef不存在就会调用std::make_shared<TcpServer>(poller)
,存在了就会拷贝一份poller
,执行为当前poller的fd设置epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
,增加两个事件监听EventPoller::Event_Read | EventPoller::Event_Error
。
这样做,每一个EventPoller的fd都会抢占式listen监听一个accept,accept
事件与已经链接的fd的读、写、关闭事件属于同级事件。
EventPollerPool::Instance().for_each([&](const TaskExecutor::Ptr &executor) {
EventPoller::Ptr poller = dynamic_pointer_cast<EventPoller>(executor);
if (poller == _poller || !poller) {
return;
}
auto &serverRef = _cloned_server[poller.get()];
if (!serverRef) {
serverRef = onCreatServer(poller);
}
if (serverRef) {
serverRef->cloneFrom(*this);
}
});