gstreamer中的rtpjitterbuffer代码分析:推送线程

时间:2022-12-25 08:44:14

1. 简介:

    本文主要分析gstreamer中的rtpjitterbuffer中推送数据线程的代码。

2. 流程:

    推送线程主要功能就是根据jbuf上的数据,对排序的包(或者对应包序的异常事件)进行处理下发。

    主要流程如下:

    1) 当rtpjitterbuffer工作在PUSH模式下,会通过gst_pad_start_task创建一个子线程,执行推送数据主要操作gst_rtp_jitter_buffer_loop。

    2) handle_next_buffer中会对jbuf中的item进行分析处理,如果是连续包,会依序推送至下游组件。

    3) 当jbuf中没有item,或者当包不连续的时候,会从handle_next_buffer返回GST_FLOW_WAIT,在gst_rtp_jitter_buffer_loop中会进入等待状态。

    4) 当主线程收到包,或者当一个包确认被丢弃的时候,会激活推送数据线程来进行处理。回到2) 执行。

    如图:

gstreamer中的rtpjitterbuffer代码分析:推送线程

3. 代码片段:

3.1 函数主要入口gst_rtp_jitter_buffer_loop

    这个函数主要就是一个循环,如果可以处理包就一直处理,直到包序不连续或者jbuf空,这时候会等待包或者丢包事件的到来唤醒本线程。

  do {
// 一个循环中处理jbuf,直到处理到jbuf空或者出现包缺失,需要等待
// 这时候会等待条件变量来唤醒该线程。
result = handle_next_buffer (jitterbuffer);
if (G_LIKELY (result == GST_FLOW_WAIT)) {
/* now wait for the next event */
JBUF_WAIT_EVENT (priv, flushing);
result = GST_FLOW_OK;
}
} while (result == GST_FLOW_OK);
3.2 单个buffer处理函数 handle_next_buffer

    这个函数整体处理的流程比较清晰。

    1) 窥视jbuf的头部的item,如果为空则等待其余包到来。

    2) 根据item序号判断是否连续包(也可能是事件,在这个函数中不体现,请查看3.3),是连续包则推送至下层,重复包则直接删除,如果包序发生跳跃(即gap > 0),则说明可能出现丢包,需要等待主线程和定时器配合判断,此时也需要进行等待,等待对应包序的包(或者事件,如LOST事件)到来。

/* Peek a buffer and compare the seqnum to the expected seqnum.
* If all is fine, the buffer is pushed.
* If something is wrong, we wait for some event
*/
static GstFlowReturn
handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstFlowReturn result;
RTPJitterBufferItem *item;
guint seqnum;
guint32 next_seqnum;

/* only push buffers when PLAYING and active and not buffering */
// 状态检查
if (priv->blocked || !priv->active ||
rtp_jitter_buffer_is_buffering (priv->jbuf)) {
return GST_FLOW_WAIT;
}

/* peek a buffer, we're just looking at the sequence number.
* If all is fine, we'll pop and push it. If the sequence number is wrong we
* wait for a timeout or something to change.
* The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
// 窥视jbuf第一个item,如果为空则jbuf为空,本线程进入等待。
item = rtp_jitter_buffer_peek (priv->jbuf);
if (item == NULL) {
goto wait;
}

/* get the seqnum and the next expected seqnum */
seqnum = item->seqnum;
if (seqnum == -1) {
return pop_and_push_next (jitterbuffer, seqnum);
}

next_seqnum = priv->next_seqnum;

/* get the gap between this and the previous packet. If we don't know the
* previous packet seqnum assume no gap. */
if (G_UNLIKELY (next_seqnum == -1)) {
GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
/* we don't know what the next_seqnum should be, the chain function should
* have scheduled a DEADLINE timer that will increment next_seqnum when it
* fires, so wait for that */
result = GST_FLOW_WAIT;
} else {
gint gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);

if (G_LIKELY (gap == 0)) {
/* no missing packet, pop and push */
// 包是预期的包,从jbuf中删除并PUSH至下游。
result = pop_and_push_next (jitterbuffer, seqnum);
} else if (G_UNLIKELY (gap < 0)) {
/* if we have a packet that we already pushed or considered dropped, pop it
* off and get the next packet */
// 重复包,从jbuf中删除并直接忽略。
GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
seqnum, next_seqnum);
item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
free_item (item);
result = GST_FLOW_OK;
} else {
/* the chain function has scheduled timers to request retransmission or
* when to consider the packet lost, wait for that */
// 发生丢包,包序号不连续,这时候不对jbuf进行处理,返回上层进行等待,等待对应包的到来(BUFFER)或者丢失(LOST)。
GST_DEBUG_OBJECT (jitterbuffer,
"Sequence number GAP detected: expected %d instead of %d (%d missing)",
next_seqnum, seqnum, gap);
result = GST_FLOW_WAIT;
}
}

return result;

wait:
{
GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
if (priv->eos) {
return GST_FLOW_EOS;
} else {
return GST_FLOW_WAIT;
}
}
}
3.3 单个包的判断函数pop_and_push_next

    这个函数会对包的类型进行相应处理。

    对于BUFFER类型的包,主要是根据标志位判断其buffer上是否需要搭载额外标志位,如丢包设置DISCONT,时间偏移改变需要设置RESYNC,并根据参数ts_offset调整该buffer的pts和dts。

    LOST是一种更具体的EVENT,因此LOST处理上都是EVENT的fallthrough关系。出现LOST的时候,需要根据rtpjitterbuffer配置的do_lost参数判断是否需要将丢包的对应事件向下游发送。

    对于BUFFER类型,发送的是一个数据buffer,而对于LOST和EVENT,发送的是event。(LOST对应的event在do_lost_timeout函数中生成,具体可以参见do_lost_timeout函数)。

    对于QUERY类型,应该是pad的caps进行query或者accept的时候,这种时候的query一般是需要向下游进行传递查询。

/* take a buffer from the queue and push it */
static GstFlowReturn
pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstFlowReturn result = GST_FLOW_OK;
RTPJitterBufferItem *item;
GstBuffer *outbuf = NULL;
GstEvent *outevent = NULL;
GstQuery *outquery = NULL;
GstClockTime dts, pts;
gint percent = -1;
gboolean do_push = TRUE;
guint type;
GstMessage *msg;

/* when we get here we are ready to pop and push the buffer */
item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
type = item->type;

switch (type) {
case ITEM_TYPE_BUFFER:

/* we need to make writable to change the flags and timestamps */
outbuf = gst_buffer_make_writable (item->data);

// 出现了丢包,包括大范围丢包或者某个seqnum被丢弃时候会置位。
// 在即将push的buffer上打上标记,并重置标志位。
if (G_UNLIKELY (priv->discont)) {
/* set DISCONT flag when we missed a packet. We pushed the buffer writable
* into the jitterbuffer so we can modify now. */
GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
priv->discont = FALSE;
}
// 时间戳不同步,当调用Jitterbuffer的地方对ts_offset进行了设置
// 给buffer打上标记并重置标志位
if (G_UNLIKELY (priv->ts_discont)) {
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
priv->ts_discont = FALSE;
}

// 重新设置buffer的pts和dts,结合ts_offset。
// segment暂时不是很明白具体作用。
dts =
gst_segment_position_from_running_time (&priv->segment,
GST_FORMAT_TIME, item->dts);
pts =
gst_segment_position_from_running_time (&priv->segment,
GST_FORMAT_TIME, item->pts);

/* apply timestamp with offset to buffer now */
GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);

/* update the elapsed time when we need to check against the npt stop time. */
update_estimated_eos (jitterbuffer, item);

priv->last_out_time = GST_BUFFER_PTS (outbuf);
break;
case ITEM_TYPE_LOST:
// 发生丢包了,置discont标志位,对应上一个case。
priv->discont = TRUE;
// 如果没有开启do_lost选项,则不会把相应的event push至下游。
if (!priv->do_lost)
do_push = FALSE;
/* FALLTHROUGH */
// 丢包时候的item数据也是一个event,具体见do_lost_timeout中的处理
case ITEM_TYPE_EVENT:
outevent = item->data;
break;
case ITEM_TYPE_QUERY:
outquery = item->data;
break;
}

/* now we are ready to push the buffer. Save the seqnum and release the lock
* so the other end can push stuff in the queue again. */
// 更新本地推送数据的记录信息
if (seqnum != -1) {
priv->last_popped_seqnum = seqnum;
priv->next_seqnum = (seqnum + item->count) & 0xffff;
}
// Buffering是否是gstreamer内部机制的buffering?或者是和rtpbin的BUFFER_MODE有关
msg = check_buffering_percent (jitterbuffer, percent);
JBUF_UNLOCK (priv);

item->data = NULL;
free_item (item);

if (msg)
gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);

switch (type) {
case ITEM_TYPE_BUFFER:
/* push buffer */
GST_DEBUG_OBJECT (jitterbuffer,
"Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
// 推送至下游
result = gst_pad_push (priv->srcpad, outbuf);

JBUF_LOCK_CHECK (priv, out_flushing);
break;
case ITEM_TYPE_LOST:
case ITEM_TYPE_EVENT:
/* We got not enough consecutive packets with a huge gap, we can
* as well just drop them here now on EOS */
// 收到EOS后释放所有不连续的包。
if (GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS");
g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
g_queue_clear (&priv->gap_packets);
}

GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);

// 推送相应事件至下游。
if (do_push)
gst_pad_push_event (priv->srcpad, outevent);
else
gst_event_unref (outevent);

result = GST_FLOW_OK;

JBUF_LOCK_CHECK (priv, out_flushing);
break;
case ITEM_TYPE_QUERY:
// 应该是pad的caps query和accept的查询,这种查询一般都要发送至下游组件。这里不展开
{
gboolean res;

res = gst_pad_peer_query (priv->srcpad, outquery);

JBUF_LOCK_CHECK (priv, out_flushing);
result = GST_FLOW_OK;
GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
JBUF_SIGNAL_QUERY (priv, res);
break;
}
}
return result;

/* ERRORS */
out_flushing:
{
return priv->srcresult;
}
}