引言:
在我看来,消息和任务调度应该是skynet的核心,整个skynet框架的核心其实就是一个消息管理系统。在skynet中可以把每个功能都当做一个服务,整个skynet工程在执行过程中会创建很多个服务,每个服务相当于一个 Actor
,是互不依赖并行执行的,但同时也存在服务之间的通信和彼此的任务调用,接下来我们就来看一下skynet中服务之间进行通信的机制。
内容概述:
接下来的内容主要围绕一下几点展开:
- 1.消息的分类
- 2.消息队列的结构(全局消息队列和服务消息队列)
- 3.消息队列的操作(创建、添加和删除)
- 4.消息的分发和调度(也就是工作线程的工作)
- 5.消息从一个服务到另一个服务的过程分析(消息发送和消息接收)
- 6.实例演示
服务与消息的关联:
当初始化一个服务的时候,会生成:
- 一个
skynet_context
来作为服务的实例; - 一个唯一(即使是在集群里也是唯一)的服务
handle
,即服务的唯一id,用来识别服务; - 一个消息队列
message_queue
; - 向框架注册一个
callback
,当服务收到有发送来的消息时,通过这个方法传入。
在 skynet_handle_register
方法中生成一个服务 handle
,handle
是一个32位的整数,在生成 handle
的时候,是把该节点的 harbor id
写到了 handle
的高8位里面。所以,拿到一个服务的 handle
,就可以知道这个服务是哪个节点的。
s->handle_index = handle + 1;
rwlock_wunlock(&s->lock);
handle |= s->harbor;
return handle;
由于只有 8个字节 用于标记节点,也就是说,harbor id
最高也只有256(2的8次方)个,也就意味着 skynet集群最多只能有256个节点,而一个节点里最多也只能有24位个服务,即1.6M个。因为一个 handle
是32位的整数,高8位用来存储 harbor id
,只有低的24位用来分配给本节点的 handle
。
消息队列 message_queue
是用来存储发送给该服务的消息的。所有发送给该服务的消息,都要先压到该服务的消息队列中。
消息分类:
首先,在skynet中的消息有两种:进程内消息
和 跨进程消息
1.进程内消息传递:
也就是在skynet服务器中各个服务(Actor
)之间传递的消息类型,可以使用的格式有:
- 文本协议(C服务)
- 自定义序列化库(Lua服务)
- 内存数据结构(自定义)
2.跨进程消息传递:
由于一个skynet服务器就是一个单进程多线程的异步消息传递框架,当有多个skynet服务器一起构成分布式结构的时候,每个skynet服务器就是一个节点,每个节点就只有一个进程。所以这里所说的跨进程通信其实就是网络通信了,发生在的情况有:
-
skynet节点
<–>skynet节点
-
skynet节点
<–>客户端
所以,消息的格式也就对应于网络通信的协议格式,常用的有:
- 自定义协议
sproto
-
google proto buffers
(skynet中使用pbc
) -
json
等
这里我们主要分析进程内消息传递,因为进程间的通信设计到网络协议定制和网关的相关知识,在后续的篇章中会做详细的实现步骤介绍。
消息队列:
首先需要明确一点:在skynet中的所有消息通信都是异步的 ,这一点是由skynet的消息调度机制决定的。大致过程如下图所示:
-
ctx->queue
:每个服务的私有消息队列 -
global_queue
:全局消息队列 -
skynet_globalmq_push
:二级消息队列压入全局消息队列的接口 -
skynet_context_message_dispatch
:从消息队列中取出消息来执行
Skynet维护了两个消息队列,严格来说是嵌套的两级消息队列,首先每个服务会有自己的一个 私有的消息队列
,然后,服务被创建并已生成私有消息队列后,其私有消息队列又会被注册到 全局的消息队列
中。
严格来讲,skynet的全局消息队列中放的是 私有消息队列不为空 的服务(
Actor
)
1.服务的私有消息队列:
每个服务实体有一个私有的消息队列,队列中是一个个发送给它的消息。消息由四部分构成:
struct skynet_message {
//消息所属服务的地址(服务实例的 handle)
uint32_t source;
//用来做上下文的标识
int session;
//消息地址指针
void * data;
//消息大小(消息的请求类型定义在高8位)
size_t sz;
};
向一个服务发送一个消息,就是把这样一个消息体压入这个服务的私有消息队列中。这个结构的值复制进消息队列的,但消息内容本身不做复制。
2.全局消息队列:
Skynet 维护了一个 全局消息队列
,里面放的是若干个私有消息队列不为空的服务,这些服务的私有消息队列也就称为:队列次级消息队列
。二级队列的数据结构:
struct message_queue {
//锁
struct spinlock lock;
//消息队列所属服务的句柄(用于消息处理)
uint32_t handle;
//队列容量
int cap;
//取出标志
int head;
//存入标志
int tail;
//队列是否已被释放表示(0为未释放,1为已释放)
int release;
//是否存入全局消息队列标志
int in_global;
int overload;
int overload_threshold;
//skynet_message消息队列(其实是一个数组通过queue[序号]从队列中获取指定的消息)
struct skynet_message *queue;
//与其他消息队列的关联(非空表示在全局消息队列中)
struct message_queue *next;
};
不难看出来,全局消息队列看起来像是一个 单链表
,每个节点都带着一个指针(next)指向下一个节点,但是,全局消息队列其实是一个用固定大小的数组模拟的循环队列,此循环队列向尾部添加,从头部取出删除,分别用 head
和 tail
记录其首尾下标。
队列操作:
1.消息队列创建过程:
-
全局消息队列:
首先,是创建全局消息队列
的的源码,在skynet框架启动入口skynet-src/skynet_start.c
中的skynet_start
中调用了消息队列的初始化函数://初始化消息队列模块 skynet_mq_init();
这个初始化函数实现源码在
skynet-src/skynet_mq.c
中,在skynet_mq.c
中首先是声明了一个静态的结构指针global_queue *Q
用于缓存通过skynet_mq_init
创建出来的全局消息队列://全局消息队列结构 struct global_queue { struct message_queue *head; struct message_queue *tail; //锁(自旋锁或互斥锁) struct spinlock lock; }; //全局消息队列的静态结构指针 static struct global_queue *Q = NULL;
head
和tail
两个指针分别用来控制Q
的 取出 和 存入 消息的过程spinlock
是封装了了自旋锁
(利用源自操作_sync_lock_test_and_set
来实现)和互斥锁
(利用Linux系统自带的线程库里的pthread_mutex_xxxx
来实现)两种锁的实现方式,源码在skynet-src/spinlock.h
中,通过判断是否定义了USE_PTHREAD_LOCK
这个宏来控制使用哪种锁。
自旋锁:得到锁之前是在一个循环中空转,直到得到锁为止,那么就有三种可能:
- 1:很短时间就得到锁,由于是空转,没有sleep,也就没有由系统到用户态的消耗;
- 2:很长时间才得到锁,虽然没有状态的切换,但是由于忙等时间过长导致性能下降;
- 3:一直空转,消耗cpu时间。
互斥锁 : 企图获得锁,若是得不到锁则阻塞,放弃cpu,没有忙等的出现,当锁可得时,发生状态切换,由内核切换到用户态,虽然没有忙等但是状态切换的代价仍然很大。
选择条件:对自旋锁和互斥锁的选择是要根据得到锁的耗时来的,若果当得到锁后,需要执行大量的操作,一般选用互斥锁,若得到锁后,进行很少量的操作,一般选择自旋锁,因为执行的操作短,那么忙等的开销总体还是小于内核态和用户态切换带来的开销的。
真正创建全局消息队列的实现在下面函数中:
//初始化全局消息队列 void skynet_mq_init() { //创建全局消息队列 struct global_queue *q = skynet_malloc(sizeof(*q)); //将*q指针所占存储全部初始化为0 memset(q,0,sizeof(*q)); //初始化全局消息队列中的锁 SPIN_INIT(q); //将创建结果保存在静态指针中 Q=q; }
void * memset( void * ptr, int value, size_t num );
函数功能是:将ptr
所指的内存区域的前num
个字节的值都设置为value
,然后返回指向ptr
的指针。在
skynet_mq_init
中创建了一个全局的消息队列并存放在指针Q
中,之后使用和操作全局消息队列其实就是直接操作Q
指针。 -
服务私有消息队列:
之前我们分析过使用skynet_context_new
创建服务的过程,其中在初始化创建出来的服务实例之前,会使用skynet_mq_create
为该服务创建一个消息队列(需要传入当前服务的消息处理句柄handle
),也就是此服务的私有消息队列://创建这个实例的消息队列 struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
创建服务私有消息队列的方法
skynet_mq_create
源码在skynet-src/skynet_mq.c
中:struct message_queue * skynet_mq_create(uint32_t handle) { //创建二级消息队列 struct message_queue *q = skynet_malloc(sizeof(*q)); //设置必要的参数 q->handle = handle; //设置默认队列的大小 q->cap = DEFAULT_QUEUE_SIZE; //用于存入消息 q->head = 0; //用于取出消息 q->tail = 0; //初始化队列的锁 SPIN_INIT(q) // 创建消息队列(通常是在服务创建之后和服务初始化之前进行) , // 设置in_global可以防止消息队列被加到全局消息队列中 // 服务初始化完成后, skynet_context_new 会调用 skynet_mq_push 将当前私有消息队列压入到全局消息队列中 q->in_global = MQ_IN_GLOBAL; q->release = 0; q->overload = 0; q->overload_threshold = MQ_OVERLOAD; //动态分配消息队列的占用空间 q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap); q->next = NULL; return q; }
服务通过
skynet_module_instance_init
初始化完成后,skynet_context_new
会调用skynet_globalmq_push
将当前私有消息队列压入到全局消息队列中://将实例的消息队列加到全局的消息队列中,这样才能收到消息回调 skynet_globalmq_push(queue);
全局消息队列中的存放数据结构是
message_queue
,私有消息队列中的存放数据结构是skynet_message
。
2.全局消息队列操作:
其实最为关键的两个操作就是将 二级消息队列
(服务的私有消息队列) 从全局消息队列中 压入
和 取出
的操作,其实就是 skynet-src/skynet_mq.c
中的两个方法: skynet_globalmq_push
和 skynet_globalmq_pop
:
-
压入:
void skynet_globalmq_push(struct message_queue * queue) { struct global_queue *q= Q; //加锁 SPIN_LOCK(q) //断言方法:判断当前压入的二级队列是否与其他队列有关联性(即是否已在全局队列中) assert(queue->next == NULL); //不在全局队列中则继续执行 if(q->tail) { //建立关联关系 q->tail->next = queue; //最后一个压入二级队列指针指向当前压入的二级队列 q->tail = queue; } else { //全局队列还为空时,则此时压入的二级队列即使取出队列指针指向地址,也是最后一个压入的队列指针指向地址 q->head = q->tail = queue; } //解锁 SPIN_UNLOCK(q) }
-
取出:
struct message_queue * skynet_globalmq_pop() { struct global_queue *q = Q; //加锁 SPIN_LOCK(q) //获取取出指针对应的二级队列 struct message_queue *mq = q->head; //假如获取的二级队列不为空 if(mq) { //让取出指针指向全局队列中的下一个二级队列(以备后续继续取出,体现了轮询和公平的机制,每个二级队列轮流被取出) q->head = mq->next; //假如已经没有下一个二级队列 if(q->head == NULL) { //断言方法:判断当前取出的消息队列是否是最后压入的消息队列(true则继续执行,false则停止继续执行) assert(mq == q->tail); //假如是,则全局消息队列的压入指针置空(则表示可以继续向该队列位置压入二级队列) q->tail = NULL; } //去掉被取出的二级队列与全局队列的关联性 mq->next = NULL; } //解锁 SPIN_UNLOCK(q) //返回取出的二级消息队列 return mq; }
正是由于当前skynet服务器进程中可以存在多个工作线程,所以可能同时有多个线程同时在执行压入和取出操作,所以一般:操作前都要先加锁,操作后解锁。
q->tail
是最后被压入全局消息队列的二级消息队列的地址(指针)-
q->head
是当前可被从全局消息队列中取出的二级消息队列的地址(指针)
assert(判断逻辑语句或整数运算语句)
:
断言方法,假如括号内为0
,或者判断为false
,则程序不再往下执行,如果为true
或 大于0
则程序继续往下执行。
3.服务私有消息队列操作:
看完上述全局消息队列的操作后,我们可以轻松地通过 skynet_globalmq_push
将一个服务的消息队列加入到全局消息队列中,也可以通过 skynet_globalmq_pop
从全局消息队列中获取到某一个服务的的消息队列。但是,想要具体拿到消息队列中的一个具体的 skynet_message
消息,还需要在了解一下对于私有消息队列的一些操作接口,比如:向消息队列中添加一条消息,或从消息队列中取出一条消息:
-
添加消息:
//向二级队列中添加消息 void skynet_mq_push(struct message_queue *q, struct skynet_message *message) { //判断是否为有效消息地址 assert(message); //锁住 SPIN_LOCK(q) //将消息存入消息存入指针所指位置 q->queue[q->tail] = *message; //判断存入指针是否溢出 if (++ q->tail >= q->cap) { q->tail = 0; } //判断队列是否为空 if (q->head == q->tail) { //重新初始化此消息队列 expand_queue(q); } //判断当前二级消息队列是否还不在全局消息队列中 if (q->in_global == 0) { //将未加入全局消息队列中的二级消息队列压入全局消息队列中 q->in_global = MQ_IN_GLOBAL; skynet_globalmq_push(q); } //解锁 SPIN_UNLOCK(q) }
-
取出消息:
//从二级消息队列取出消息 int skynet_mq_pop(struct message_queue *q, struct skynet_message *message) { int ret = 1; //加锁 SPIN_LOCK(q) //判断队列是否为空 if (q->head != q->tail) { //取出队列的取出指针指向的消息,然后队列的取出指针自加1 *message = q->queue[q->head++]; ret = 0; int head = q->head; int tail = q->tail; int cap = q->cap; //头指针溢出判断 if (head >= cap) { q->head = head = 0; } //剩余消息数量统计 int length = tail - head; if (length < 0) { length += cap; } while (length > q->overload_threshold) { q->overload = length; q->overload_threshold *= 2; } } else { // 当消息队列为空时重置overload_threshold q->overload_threshold = MQ_OVERLOAD; } //将空的二级消息队列标记为不在全局消息队列中 if (ret) { q->in_global = 0; } //关闭锁 SPIN_UNLOCK(q) return ret; }
消息调度(分发):
1.调度思路:
在 Skynet 启动时,建立了若干 工作线程(数量可配置),它们不断的从主消息列队中取出一个次级消息队列来,再从次级队列中取去一条消息,调用对应的服务的 callback
函数进行处理。为了调用公平,一次仅处理一条消息,而不是耗净所有消息(虽然那样的局部效率更高,因为减少了查询服务实体的次数,以及主消息队列进出的次数),这样可以保证没有服务会被饿死(饿死
指的是得不到工作线程执行的机会)。这样,skynet就实现了把一个消息(数据包)从一个服务发送给另一个服务。
2.代码实现:
-
创建工作线程:
在skynet-src/skynet_start.c
中创建若干个工作线程:static int weight[] = { -1, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, }; struct worker_parm wp[thread]; //循环创建工作线程 for (i=0;i<thread;i++) { wp[i].m = m; wp[i].id = i; if (i < sizeof(weight)/sizeof(weight[0])) { wp[i].weight= weight[i]; } else { wp[i].weight = 0; } //创建工作线程 create_thread(&pid[i+3], thread_worker, &wp[i]); }
其中
thread
是一个整数,是config
配置文件中thread
配置项所指定数量。 -
工作线程任务:
工作线程执行的任务就是thread_worker
的内容:static void *thread_worker(void *p) { //初始化 struct worker_parm *wp = p; int id = wp->id; int weight = wp->weight; struct monitor *m = wp->m; struct skynet_monitor *sm = m->m[id]; skynet_initthread(THREAD_WORKER); struct message_queue * q = NULL; //循环调用 skynet_context_message_dispatch while (!m->quit) { q = skynet_context_message_dispatch(sm, q, weight); //消息队列无消息可执行则挂起当前工作线程 if (q == NULL) { if (pthread_mutex_lock(&m->mutex) == 0) { ++ m->sleep; // "spurious wakeup" is harmless, // because skynet_context_message_dispatch() can be call at any time. if (!m->quit) pthread_cond_wait(&m->cond, &m->mutex); -- m->sleep; if (pthread_mutex_unlock(&m->mutex)) { fprintf(stderr, "unlock mutex error"); exit(1); } } } } return NULL; }
在工作线程中,通过一个
while
循环体,不断地通过skynet_context_message_dispatch
接口从全局消息队列中取出消息来执行,直到全局消息队列中全部消息都执行完,则当前工作线程会进入wait等待。 -
skynet_context_message_dispatch
:
这个是处理服务消息的方法:struct message_queue * skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) { //判断传入的二级队列是否为空 if (q == NULL) { //假如为空则从全局消息队列中获取 q = skynet_globalmq_pop(); //假如已经拿不到二级队列则返回 if (q==NULL) return NULL; } //获取当前二级队列所属服务的handle句柄 uint32_t handle = skynet_mq_handle(q); //通过handle获取服务实例 struct skynet_context * ctx = skynet_handle_grab(handle); //判断服务实例是否为空 if (ctx == NULL) { //将句柄封装在结构体中 struct drop_t d = { handle }; //释放消息数据并向消息源发送error消息 skynet_mq_release(q, drop_message, &d); return skynet_globalmq_pop(); } int i,n=1; struct skynet_message msg; for (i=0;i<n;i++) { //从 message_queue 中 pop 一个 msg 消息出来 if (skynet_mq_pop(q,&msg)) { //假如 message_queue 为空,则返回1表示失败,释放ctx的引用次数 skynet_context_release(ctx); //返回global_queue里的下一个message_queue,以供skynet_context_message_dispatch下次调用 return skynet_globalmq_pop(); } else if (i==0 && weight >= 0) { //获取消息队列的消息数量 n = skynet_mq_length(q); n >>= weight; } //查询队列过载 int overload = skynet_mq_overload(q); //过载判断 if (overload) { skynet_error(ctx, "May overload, message queue length = %d", overload); } skynet_monitor_trigger(sm, msg.source , handle); //判断 ctx 服务的 cb 属性是否为空 if (ctx->cb == NULL) { skynet_free(msg.data); } else { //处理消息对应服务的回调函数ctx->cb dispatch_message(ctx, &msg); } skynet_monitor_trigger(sm, 0,0); } assert(q == ctx->queue); struct message_queue *nq = skynet_globalmq_pop(); if (nq) { // 假如全局消息队列不为空,将当前处理的消息队列q放回全局消息队列,并返回下一个消息队列nq // 假如全局消息队列是空的或者阻塞,不将当前处理队列q放回全局消息队列,并再次将 q 返回(提供给下一次 dispatch操作) skynet_globalmq_push(q); q = nq; } skynet_context_release(ctx); return q; }
如上述源码解析,每次调用
skynet_context_message_dispatch
便会处理一个消息:- 首先,通过
skynet_globalmq_pop
从全局消息队列中得到一个服务的私有消息队列q
; - 然后,通过
skynet_mq_pop
从q
中取到一条具体的消息来执行; - 最后,通过
dispatch_message
执行了消息后,判断当前q
队列是否已经没有其他消息:假如有则将q
通过skynet_globalmq_push
放回全局消息队列中;假如没有,则不再将q
放回。
关于通过
dispatch_message
如何调用消息所属服务对于此消息的处理函数,具体调用过程如下:static void dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) { //判断服务是否已经初始化过(有则继续执行,否则结束) assert(ctx->init); //开启服务锁住状态 CHECKCALLING_BEGIN(ctx) pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle)); //从信息的sz字段的高8位获取消息类型信息 int type = msg->sz >> MESSAGE_TYPE_SHIFT; //获得消息的有效数据大小 size_t sz = msg->sz & MESSAGE_TYPE_MASK; //输出日志到消息所属服务的日志文件中 if (ctx->logfile) { //写入到日志文件 skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz); } //服务的消息统计 ++ctx->message_count; //服务回调函数执行结果保存地址 int reserve_msg; //判断是否统计单条消息的处理时间标志profile if (ctx->profile) { //CPU开始处理此消息的时间 ctx->cpu_start = skynet_thread_time(); //开始处理回调函数cb,并将执行结果保存到reserve reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz); //CPU耗时统计 uint64_t cost_time = skynet_thread_time() - ctx->cpu_start; //CPU耗时统计数值保存 ctx->cpu_cost += cost_time; } else { //不统计耗时则直接执行回调函数 reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz); } //回调函数执行结果部位空则表示执行成功 if (!reserve_msg) { //释放消息数据 skynet_free(msg->data); } //解除服务锁住状态 CHECKCALLING_END(ctx) }
其中,
pthread_setspecific
通常与pthread_getpecific
一起使用,是实现同一个线程中不同函数间共享数据的一种很好的方式。 - 首先,通过
dispatch_message
将 msg
里面的数据和消息内容取出,然后调用 ctx->cb
来处理消息内容。
服务间的消息传递:
服务启动起来了,数据包是如何从一个服务发送给另一个服务的。要清除这一实现过程,我们只需要解析一下 skynet-src/skynet_server.c
中的 skynet_send
和 skynet_callback
这两个函数的定义就够了:
1.发送消息 skynet.send:
在skynet中向指定服务发消息通过在lua中调用 skynet.send
来实现:
--非阻塞发送消息
function skynet.send(addr, typename, ...)
--获取用来发送消息的协议(消息类型)
local p = proto[typename]
--用p.pack打包数据,然后调用C接口
return c.send(addr, p.id, 0 , p.pack(...))
end
这里 c
就是通过 local c = require "skynet.core"
引入的C模块,对应的定义在 lualib-src/lua-skynet.c
中的 luaopen_skynet_core
中通过 { "send" , lsend }
绑定的 lsend
方法:
/*
参数一:uint32 address/string address 服务地址/服务名称 即支持两种形式参数调用
参数二:integer type 发送的消息类型id
参数三:integer session 会话id
参数四:string message/lightuserdata message_ptr
参数五:integer len
*/
static int lsend(lua_State *L) {
// 通过lua中的一个userdata来获取C对象的指针
// lua_upvalueindex获取到当前运行的函数的第i个上值的伪索引,从伪索引获取到上下文
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
// 尝试从lua传入第一个参数中获取int类型数据(目标服务地址)
uint32_t dest = (uint32_t)lua_tointeger(L, 1);
// 目标服务名称
const char * dest_string = NULL;
if (dest == 0) {
//检测第一个参数是否为数字类型,如果是且为0则为无效参数
if (lua_type(L,1) == LUA_TNUMBER) {
return luaL_error(L, "Invalid service address 0");
}
// 尝试从lua传入第一个参数中获取string类型数据(目标服务名称)
dest_string = get_dest_string(L, 1);
}
// 从第二个参数中获取int类型数据表示的消息类型
int type = luaL_checkinteger(L, 2);
// 会话标识
int session = 0;
// 检测第三个参数是否为空,假如是空,则是通过skynet.call调用到此处
if (lua_isnil(L,3)) {
//skynet.call会在内部生成一个唯一session,所以这里需要设置分配会话标志
type |= PTYPE_TAG_ALLOCSESSION;
} else {
//从第三个参数获取会话(skynet.send调用时会话id为0)
session = luaL_checkinteger(L,3);
}
//获取第四个参数的类型
int mtype = lua_type(L,4);
switch (mtype) {
case LUA_TSTRING: { //字符串类型
size_t len = 0; //保存消息长度的变量
void * msg = (void *)lua_tolstring(L,4,&len); //获取消息字符串
if (len == 0) { //消息长度为0,则表示空消息
msg = NULL;
}
// 通过两种方式调用发送消息的c实现(根据传入的第一个参数的类型),这里会执行消息拷贝
// 但实际上最终都是通过调用skynet_send实现最终功能,skynet_sendname只是一个二次封装函数
if (dest_string) {
session = skynet_sendname(context, 0, dest_string, type, session , msg, len);
} else {
session = skynet_send(context, 0, dest, type, session , msg, len);
}
break;
}
case LUA_TLIGHTUSERDATA: { //轻用户类型
void * msg = lua_touserdata(L,4); //获取消息指针
int size = luaL_checkinteger(L,5); //从第五个参数获取消息长度
// 与上面的方式一致,只是这里不进行消息的拷贝
if (dest_string) {
session = skynet_sendname(context, 0, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
} else {
session = skynet_send(context, 0, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
}
break;
}
default: //其他类型
luaL_error(L, "skynet.send invalid param %s", lua_typename(L, lua_type(L,4)));
}
if (session < 0) {
// 发送到无效的地址
// 可能抛出一个错误更好
return 0;
}
//返回会话
lua_pushinteger(L,session);
return 1;
}
从上面可以看出,调用skynet.send的方式可以有两种传参方式:
- 通过传入目标服务地址调用方式
skynet_send
- 通过传入目标服务名称调用方式
skynet_sendname
这里发送消息的类型其实包括 进程内消息
和 跨进程的消息
发送的通用接口,这里我们只分析进程内的消息发送接口,其对应的源码其实就是 skynet-src/skynet_server.c
中的 skynet_send
或者 skynet_sendname
:
int skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
//检测消息格式是否正确(是否数据过大)
if ((sz & MESSAGE_TYPE_MASK) != sz) {
skynet_error(context, "The message to %x is too large", destination);
if (type & PTYPE_TAG_DONTCOPY) {
skynet_free(data);
}
return -1;
}
_filter_args(context, type, &session, (void **)&data, &sz);
//判断消息源服务的handle id是否为0
if (source == 0) {
//设置源地址为上下文的句柄,则源就是自己
source = context->handle;
}
//判断目标服务的handle id是否为0,即没有目标服务
if (destination == 0) {
//没有目的地址则不再继续往下执行,返回会话
return session;
}
//判断是否是跨节点的消息,即远程消息(skynet<-->skynet节点间通信)也就是上面提及的跨进程消息传递
if (skynet_harbor_message_isremote(destination)) {
//构建一个进程间通信的消息,使用harbor传递消息
struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
rmsg->destination.handle = destination;
rmsg->message = data;
rmsg->sz = sz;
//通过harbor将消息发送出去
skynet_harbor_send(rmsg, source, session);
} else {
//构建一个进程内通信的消息
struct skynet_message smsg;
smsg.source = source; //设置源地址
smsg.session = session; //设置会话
smsg.data = data; //设置消息数据
smsg.sz = sz; //设置消息长度
//将构建的消息压入目标服务的消息队列
if (skynet_context_push(destination, &smsg)) {
//压入失败则释放消息数据占用的内存,并返回-1错误码
skynet_free(data);
return -1;
}
}
return session; //返回会话
}
下面是一些参数的功能解析:
-
source
和destination
分别是发送方和接收方的handle
; -
type
是发送方和接收方处理数据包的协议; -
session
识别本次调用的口令,发送方发送一个消息后,保留该session,以便收到回应数据包时,能识别出是哪一次调用; -
data
/sz
是数据包的内容和长度,成对使用。
上面通过传入数字地址的方式发送消息的实现,下面是通过传入服务字符串名称的方式发送消息的实现源码:
int skynet_sendname(struct skynet_context * context, uint32_t source, const char * addr , int type, int session, void * data, size_t sz) {
//假如没有输入源地址或者地址为0
if (source == 0) {
//将源地址设置为服务的句柄,即源为服务自己
source = context->handle;
}
//目标服务地址变量
uint32_t des = 0;
//分割以':'开头的名字,这其实是一个数字地址,只不过以字符串的形式表达,例如":0100001"
if (addr[0] == ':') {
//去掉':'后将地址转为长整型
des = strtoul(addr+1, NULL, 16);
} else if (addr[0] == '.') { //以'.'开头的字符串,其实是本地服务名称
//通过目标服务名称获取目标服务地址
des = skynet_handle_findname(addr + 1);
//找不到服务地址
if (des == 0) {
//假如当前消息是不需要拷贝类型的
if (type & PTYPE_TAG_DONTCOPY) {
//消息数据是分配的指针,需要释放掉
skynet_free(data);
}
//返回错误码
return -1;
}
} else {
//远程服务消息过滤器
_filter_args(context, type, &session, (void **)&data, &sz);
//构建远程消息数据
struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
copy_name(rmsg->destination.name, addr);
rmsg->destination.handle = 0;
rmsg->message = data;
rmsg->sz = sz;
//通过harbor发送远程消息
skynet_harbor_send(rmsg, source, session);
//返回会话
return session;
}
//使用上述通过服务名称获取的数字服务地址,调用skynet_send发送消息
return skynet_send(context, source, des, type, session, data, sz);
}
2.发起请求 skynet.call:
查看 skynet.lua
中的源码:
function skynet.send(addr, typename, ...)
local p = proto[typename]
return c.send(addr, p.id, 0 , p.pack(...))
end
function skynet.call(addr, typename, ...)
local p = proto[typename]
local session = c.send(addr, p.id , nil , p.pack(...))
if session == nil then
error(call to invalid address .. skynet.address(addr))
end
return p.unpack(yield_call(addr, session))
end
不难看出 skynet.call
比 skynet.send
多出来的部分是对返回值的处理。
3.注册回调函数:
这是调度消息并执行回调函数的核心接口,我们知道 skynet 发送消息时是非阻塞的,实际只是完成了加消息压入到消息队列中,至于消息队列中消息的执行,并最终传递给目标服务,还是依赖于此步骤:
每个 skynet 服务都需要实现 skynet 的启动入口函数,即 skynet.start
,而在 skynet.lua
中查看此函数的定义:
function skynet.start(start_func)
--绑定skynet.dispatch_message为消息回调函数
c.callback(skynet.dispatch_message)
...
end
此处通过调用 skynet.core.callback
来绑定服务的消息回调函数为 skynet.dispatch_message
,而 skynet.core.callback
是lua中调用的一个C函数,可以在 lualib-src/lua-skynet.c
中查看其源码实现:
LUAMOD_API int luaopen_skynet_core(lua_State *L) {
luaL_Reg l[] = {
...
//指向lcallback
{ "callback", lcallback },
...
};
}
这里绑定了skynet中调用的 callback
方法所指向的C源码为 lcallback
,源码内容如下:
static int lcallback(lua_State *L) {
//获取服务实例
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
//判断是否将消息forward给自己
int forward = lua_toboolean(L, 2);
//检查Lua调用代码中传递的第一个参数类型是否是function,否则将引发错误
luaL_checktype(L,1,LUA_TFUNCTION);
//设置栈顶为1(如果开始的栈顶高于新的栈顶,顶部的值被丢弃)
lua_settop(L,1);
//为lua绑定一个C对象_cb
lua_rawsetp(L, LUA_REGISTRYINDEX, _cb); //注册表[_cb对应的轻量用户数据]=栈顶的LUA函数,会将值弹出栈
//把索引 LUA_REGISTRYINDEX 所指定的值 t 的 t[LUA_RIDX_MAINTHREAD] 值压入栈
lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD); //获取LUA状态机的主线程(类型可能是thread吧),压入
//把给定索引(-1)处的值转换为一个 Lua 线程(由 lua_State* 代表)。 这个值必须是一个线程;否则函数返回 NULL
lua_State *gL = lua_tothread(L,-1);
//判断是否将消息forward给自己
if (forward) {
//绑定自己的回调函数forward_cb
skynet_callback(context, gL, forward_cb);
} else {
//绑定目标回调函数_cb
skynet_callback(context, gL, _cb);
}
return 0;
}
其中,使用到了 skynet
内置的 forward
功能把消息 forward
回自己。此函数诸多操作,最终的目的就是通过调用 skynet-src/skynet_server.c
中的 skynet_callback
来实现将回调函数绑定到服务实例 context->cb
上:
//绑定回调函数
void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
//绑定回调到服务实例的cb属性
context->cb = cb;
context->cb_ud = ud;
}
回调数据结构体 skynet_cb
的定义在 skynet-src/skynet.h
中:
typedef int (*skynet_cb)(
struct skynet_context * context,
void *ud,
int type,
int session,
uint32_t source ,
const void * msg,
size_t sz
);
-
ud
是執行操作的lua线程(用lua_State
表示) -
source
和destination
分别是发送方和接收方的handle
; -
type
是发送方和接收方处理数据包的协议(可以带上 dontcopy 的 tag (PTYPE_TAG_DONTCOPY
),让框架不要复制msg/sz
指代的数据包); -
session
识别本次调用的口令,发送方发送一个消息后,保留该session
,以便收到回应数据包时,能识别出是哪一次调用; -
msg
/sz
是数据包的内容和长度,成对使用。
上面的参数中, session
在 skynet.send
和 skynet.call
中的使用略有不同:
使用
skynet_send
发送一个包时,可以在type
上设置 alloc session 的 tag (PTYPE_TAG_ALLOCSESSION
),send api 就会忽略掉传入的 session 参数,而会分配出一个当前服务从来没有使用过的 session 号,发送出去。同时约定,接收方在处理完这个消息后,把这个 session 原样发送回来。这样,编写服务的人只需要在 callback 函数里记录下所有待返回的 session 表,就可以在收到每个消息后,正确的调用对应的处理函数。
3.回调函数实现:
上面步骤2中通过 skynet.core.callback
将一个 lua 函数(skynet.dispatch_message
)设置到当前服务模块中,作为消息的 回调函数
。每个服务都必须设置,而且只能设置一个回调函数,这个回调函数每次收到一条消息时,会接收5个参数:
消息类型 type
消息指针 msg
消息长度 sz
消息会话标志 session
消息来源 source
那么,回调函数在收到消息后,对消息的处理逻辑就是我们接下来要分析的。
-
skynet.dispatch_message
在
skynet.lua
中查看skynet.dispatch_message
的实现,不难看出消息执行时传递到此函数,而消息的处理是直接交由raw_dispatch_message
来处理:--skynet 服务的消息回调函数 function skynet.dispatch_message(...) --将消息数据传递给 raw_dispatch_message local succ, err = pcall(raw_dispatch_message,...) ... end
所以,消息处理操作最终其实是交给
raw_dispatch_message
函数来实现的。 -
coroutine
机制:
在 skynet 的实际应用中,我们可以使用 RPC 语法来发送消息,向外部服务发起一个远程调用,等待对方发送了回应消息后,逻辑再继续向下执行,为了把这种回调函数的模式转换为阻塞 API 调用的形式,skynet 引入了 lua 的coroutine
特性,让一段代码运行了一半时挂起,在之后适当的时候继续运行。为了实现这一点,我们在接收到每条消息的时候,都需要创建一个
coroutine
(协程),然后再此coroutine
中执行该消息的dispatch
函数。创建的coroutine
在消息任务执行完成并返回响应后,便会处于空闲状态,假如使用一个 table 来管理所有创建的coroutine
,即此 table 就是一个 协程池 ,那么每次创建协程的操作可以改为从协程池中取出空闲的协程,这样可以提高效率,这也是co_create
这个函数的作用:从协程池中取出一个空闲的协程来执行函数,假如没有空闲的协程则创建。local function co_create(f) local co = table.remove(coroutine_pool) if co == nil then co = coroutine.create(function(...) f(...) while true do f = nil coroutine_pool[#coroutine_pool+1] = co f = coroutine_yield "EXIT" f(coroutine_yield()) end end) else coroutine_resume(co, f) end return co end
-
消息类型:
在 skynet 中消息分为多种类别,对应的也有不同的编码方式(即协议),消息类型的宏定义可以查看skynet.h
中:#define PTYPE_TEXT 0 #define PTYPE_RESPONSE 1 #define PTYPE_MULTICAST 2 #define PTYPE_CLIENT 3 #define PTYPE_SYSTEM 4 #define PTYPE_HARBOR 5 #define PTYPE_SOCKET 6 #define PTYPE_ERROR 7 #define PTYPE_RESERVED_QUEUE 8 #define PTYPE_RESERVED_DEBUG 9 #define PTYPE_RESERVED_LUA 10 #define PTYPE_RESERVED_SNAX 11 #define PTYPE_TAG_DONTCOPY 0x10000 #define PTYPE_TAG_ALLOCSESSION 0x20000
其中:
-
PTYPE_TEXT
是内部服务最常用的文本消息类型; -
PTYPE_RESPONSE
表示一个回应包,应该依据对方的规范来编码。
这里对应了接收消息的第一个传入参数
type
,这并不是传统意义的消息类型编号,而是一个当前消息包的 协议组别 ,取值范围是 0 ~ 255 ,由一个字节标识。(skynet 在实际时,将type
的数据编码到size
参数的高8位,并不会因为增加这个字段而增大消息的大小) -
-
注册监听消息类型:
当我们需要在一个服务中监听指定类型的消息,就需要在服务启动的时候先注册该类型的消息的监听,通常是在服务的入口函数skynet.start
处通过调用skynet.dispatch
来注册绑定:--服务启动入口 skynet.start(function() --消息传入函数 skynet.dispatch("lua", function(session, address, ...) dispatch(...) end)
这里只监听
"lua"
协议类型的消息,并且绑定dispatch
作为消息的消息处理函数,具体绑定过程,在skynet.lua
中查看skynet.dispatch
的源码实现:function skynet.dispatch(typename, func) --获取协议 local p = proto[typename] if func then local ret = p.dispatch p.dispatch = func return ret else return p and p.dispatch end end
-
typename
指定了接收消息的协议类型,遍历 skynet 中的使用情况,包括了lua
、client
和snax
三种类型,但最主要的还是lua
类型; -
func
是一个消息处理的 lua 函数。
-
-
消息处理函数:
上面将dispatch(...)
绑定为消息的处理函数,而它的函数内容,通常为以下格式:local command = {} function command.foobar(...) end local function dispatch(cmd, ...) command[cmd](...) end
这个关于分割
cmd
以获得消息类型,然后通过command
函数集来调用每个cmd
对应的相应函数,例如:cmd = “foobar” ,则command.foobar
会触发。当然,这只是最粗糙的回调函数实现方式,由于消息还分为两种类型:一种是单向的消息发送,调用skynet.send
实现;另一种是发送一个请求,并等待响应。为了适应这两类消息的接收,需要对消息的回调处理做一定的修改:--消息回调函数 local function dispatch(cmd, ...) --先判断command函数集中包不包含当前传入 cmd 的处理函数,假如未定义则不再往下执行 local f = assert(command[cmd]) --执行 cmd 对应的处理函数 local r = f(...) --判断是否有返回结果 if r ~= NORET then --打包数据并回传给消息发送方 skynet.ret(skynet.pack(r)) end end
通过
skynet.ret
就可以将执行结果返回给等待响应的服务,但由于它不会自动打包数据,所以数据在被回传之前先借助skynet.pack
进行打包。
消息相关常用API:
上面我们仔细分析了消息传递和调用的C实现,但通常我们不会对C源码进行修改,而具体的业务逻辑我们是使用lua来编写的,所以理解的最低限度就是要知道skynet的消息调度在lua层面的一下常用API,在服务间进行通信的接口最常用的有两个: skynet.send
和 skynet.call
,两者的最终都会调用到 skynet_send
接口:
-
skynet.send
:发送消息 -
skynet.call
:发送消息并等待返回响应(目标服务消息处理后需要通过skynet.ret
将结果返回)
API使用实例:
关于 skynet.call
的使用可以参考 examples/simpledb.lua
,这是 call
方法调用的目标服务:
local skynet = require "skynet"
require "skynet.manager" -- import skynet.register
local db = {}
local command = {}
function command.GET(key)
return db[key]
end
function command.SET(key, value)
local last = db[key]
db[key] = value
return last
end
skynet.start(function()
skynet.dispatch("lua", function(session, address, cmd, ...)
--转为大写格式
cmd = cmd:upper()
if cmd == "PING" then
assert(session == 0)
local str = (...)
if #str > 20 then
str = str:sub(1,20) .. "...(" .. #str .. ")"
end
skynet.error(string.format("%s ping %s", skynet.address(address), str))
return
end
--执行消息对应的cmd处理
local f = command[cmd]
--假如有返回值,则需要返回给请求的服务
if f then
--打包数据并返回
skynet.ret(skynet.pack(f(...)))
else
error(string.format("Unknown command %s", tostring(cmd)))
end
end)
skynet.register "SIMPLEDB"
end)
下面是使用 skynet.call
调用上面 SIMPLEDB
服务中的方法:
local r = skynet.call("SIMPLEDB", "lua", "get", self.what)
这里 simpledb.lua
中的 command.GET
会被调用,并返回操作结果给调用的地方,返回的实现在 skynet.ret
中实现。
其他:
在解读skynet源码的时候,我都是查看 这个资料网站 的步骤来进行解读的,方法因人而异吧,反正我觉得比从头“硬读”源码要好一些。
参考资料:
- skynet 消息分发及服务调度的新设计
- skynet 消息队列的新设计(接上文)
- skynet1.0阅读笔记2_skynet的消息投递skynet.call
- skynet中的各种锁
- C语言memset()函数:将内存的前n个字节设置为特定的值
- skynet源码学习 - 从全局队列中弹出/压入一个消息队列过程
- 关于C语言的assert(断言)
- 为 Lua 绑定 C/C++ 对象
- 探索skynet(三):消息队列
- skynet coroutine 运行笔记
- Skynet之消息队列 - 消息的存储与分发
- skynet 里的 coroutine
- skynet浅析
- skynet源码阅读<5>–协程调度模型
- skynet源码分析04-消息处理(协程)