skynet框架 源码分析 二

时间:2022-10-09 16:46:56

        一个游戏服务器系统的数据流向有很多种。在skynet中,我主要看到了三种,一种是从客户端流到服务器,而后服务器处理完毕之后,发送回客户端。第二种是一个harbor流向另外一个harbor,这应该就是服务进程之间通过套接字的通信了。第三种就是服务进程保存数据到数据库中,而后返回(这部分我还没看完)。

        本章主要讲解第一种数据流向,客户端到服务端中某个节点。服务端如何返回数据到客户端。
        要把这个讲明白,我觉得必须要先得从skynet_start.c文件开始看起。
作者在函数_start中,启动了多个工作线程(_worker),一个moniter线程(_moniter),一个定时器线程(_timer),一个网络线程(_socket)。我们来看看这些线程都在做什么?

        worker线程:
        从全局消息队列中取一条消息队列,而后找到该消息队列的处理器(handle),通过处理器找到对应节点的上下文,而后,更新处理器的状态值,确定其不是无穷执行的状态。接下来执行消息请求。即接口(_dispatch_message),在该接口中会判定是否是广播消息,若不是,则执行节点的回调接口_cb(callback)。处理完毕之后,查看是否需要返回数据,若需要,则将处理结果压到目标节点的消息队列中。

        moniter线程:
        我觉得则个线程,只是判定节点是否出错了,如果出错,进行修复一下。

        timer线程:
        定时器线程我就不解释了,每个服务器系统都应该有该功能。

        socket线程:
        该线程主要是从epoll_wait的结果中读取消息,若有需要处理的消息则进行相关的处理。这里面的消息目前要说明的有三个:第一个是管道,作者把管道的读端放到了epoll中进行管理,也就是说,每次向管道中写数据,都是socket线程读取并处理的;第二个是gate产生的监听端口,也是由epoll管理,并且一旦产生了数据也是由socket处理;第三个是accept客户端的socket后,客户端发送到服务端的数据,此数据也由socket线程处理。


        看完这些线程,我们开始跟踪一个用户从登入到发送消息真个流程。
        服务器启动,大概监听端口详细的步骤可在watchdog.lua中看到。通过模块service_gate.c打开监听,监听本地端口8888。假设此时,有用户尝试连接服务器。服务器端收到一个连接请求。在epoll_wait中产生了一个结果。socket线程发现有消息需要处理时,开始处理消息。

开始读代码:

[c] view plain copy print?skynet框架 源码分析 二skynet框架 源码分析 二
  1. int  
  2. socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {  
  3.     for (;;) {  
  4.         if (ss->event_index == ss->event_n) {  
  5.             ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);  
  6.             if (more) {  
  7.                 *more = 0;  
  8.             }  
  9.             ss->event_index = 0;  
  10.             if (ss->event_n <= 0) {  
  11.                 ss->event_n = 0;  
  12.                 return -1;  
  13.             }  
  14.         }  
  15.         struct event *e = &ss->ev[ss->event_index++];  
  16.         struct socket *s = e->s;  
  17.         if (s == NULL) {  
  18.             int type = ctrl_cmd(ss, result);  
  19.             if (type != -1)  
  20.                 return type;  
  21.             else  
  22.                 continue;  
  23.         }  
  24.         switch (s->type) {  
  25.         case SOCKET_TYPE_CONNECTING:  
  26.             return report_connect(ss, s, result);  
  27.         case SOCKET_TYPE_LISTEN:  
  28.             if (report_accept(ss, s, result)) {  
  29.                 return SOCKET_ACCEPT;  
  30.             }   
  31.             break;  
  32.         case SOCKET_TYPE_INVALID:  
  33.             fprintf(stderr, "socket-server: invalid socket\n");  
  34.             break;  
  35.         default:  

        socket线程:
        若,此时只有一个用户,那么event_n的值为1,直接执行到case SOCKET_TYPE_LISTEN: 因为watchdog中的gate在监听,所以走到这一句停住。执行report_accept逻辑。在report_accept中,系统保留了accept文件描述符,用来读写操作,但,现在还没有把它放到epoll中,而是做了一次记录。标志了一下哪个槽被引用了。并且,该消息由哪个服务处理(handle),接着依次返回,先由socket_server_poll接口返回一个类型值,socket线程的skynet_soket_poll捕捉到了返回类型,并在skynet_socket.c文件中的接口forward_message中标志该消息是来源于PTYPE_SOCKET的,而后把消息压到watchdog的消息队列中,socket线程处理完毕。

此时,socket的身份其实就是生产者,而worker线程就是消费者。

        worker线程:
        接下来,worker线程上场,发现有消息可以处理,其中一个线程被唤醒,开始处理消息,发现是gate节点的服务请求,调用gate的_cb(callback,每个节点都有一个消息处理函数,_cb就是gate的消息处理函数,也被称为回调)接口来处理数据。接下来我们看看service_gate.c是如何处理该消息的,gate模块发现消息是类型是PTYPE_SOCKET,于是乎直接走了接口dispatch_socket_message,在该接口中,继续执行逻辑,发现message->type为SKYNET_SOCKET_TYPE_ACCEPT,如果没有逻辑错误,而后,向现有的管道中写入一个start_socket的消息请求worker线程执行完毕。

        socket线程:
        epoll在监听到该请求后,由于连接是新建立的event中的socket(用户数据)是NULL,执行ctrl_cmd接口,在里面将新建立的socket的accept端放到epoll中,并返回一个消息类型位SOCKET_OPEN,而后返回给skynet_socket_poll接口,该接口捕捉到返回类型后,向gate节点发送了一个socket消息类型为SKYNET_SOCKET_TYPE_CONNECT的消息,socket线程处理完毕。

        workder线程:
        在gate中执行_cb接口回调,发现有消息类型为PTYPE_SOCKET的消息,同时其数据中存放的是skynet_socket_message并且其类型为SKYNET_SOCKET_TYPE_CONNECT,在gate模块中的dispatch_socket_message接口中,捕捉到该类型的消息时,用_report接口来处理消息。该接口则是向watchdog节点发送了一条文本消息,该文本消息则执行watchdog中的command:open接口,这时候,为用户载入agent节点,和client节点。同时,向gate模块发送一条文本消息告诉gate将两者连接起来。worker处理完毕。

        worker线程:
        由于此时已经不是向管道中写消息,所以,继续worker线程处理逻辑。worker线程抓取到gate的消息组,执行相关的逻辑,发现一条文本消息,中间含有forward命令,于是拆开参数,将agent和client连接起来,并且保存在该gate节点中。

        经过以上步骤,客户端与服务端建立连接完毕。等待两者互相发送消息。

        假设服务端需要向客户端发送消息:

               agent向client节点发送一个服务请求,比如例子中的
               skynet.send(client,"text","Welcome to skynet")

               agent向服务节点client一条文本消息,"Welcome to skynet",这是在脚本中执行的。首先调用lua-skynet模块的_command接口向节点client压一条服务消息,在worker线程处理到client节点时,取出消息,并调用client的_cb(callback)接口处理该消息,而后client节点向管道写入消息。在下次socket线程从管道中取出数据时,发消息类型是发送消息时会调用send_socket接口,向套接字中写入消息。消息发送完毕。

        假设客户端向服务端发送消息:

               客户端向socket中发送消息。socket线程首先捕捉到消息。取出消息,而后将该消息压到gate节点的消息组中。socket线程处理完毕。worker线程执行消息回调的时候,发现消息类型是PTYPE_SOCKET,其来源于socket。而后,检查该消息的数据部分,发现数据类型是SKYNET_SOCKET_TYPE_DATA,于是调用dispatch_message接口继续执行,当数据没有超出16M时,将数据传入_forward接口,在_forward接口中,用skynet_send接口发送消息到agent,agent在收到消息之后,发现消息类型是"client"。调用agent.lua中的dispatch接口处理。

[plain] view plain copy print?
  1. skynet.register_protocol {  
  2.     name = "client",  
  3.     id = 3,  
  4.     pack = function(...) return ... end,  
  5.     unpack = skynet.tostring,  
  6.     dispatch = function (session, address, text)  
  7.         -- It's client, there is no session  
  8.         skynet.send("LOG", "text", "client message :" .. text)  
  9.         local result = skynet.call("SIMPLEDB", "text", text)  
  10.         skynet.ret(result)  
  11.     end  
  12. }  
skynet框架 源码分析 二

         agent节点中,dispatch接口首先将文本消息发送给log节点,而后调用simpledb执行文本指令,最后返回执行结果给客户端。消息返回完毕。