基于live555的rtsp封装rtspperf的客户端源码解析

时间:2022-09-26 04:00:12

项目地址:http://sourceforge.net/projects/rtspperf/

实现了基于epoll模型的TaskScheduler,而live555默认实现使用select模型,select模型有一些已知的限制(FD最大值不能超过1024)
实现了rtsp client 框架,提供一个回调接口回调数据。rtspperf实现了多线程调用live555,做客户端的可以参考!

 

客户端 流程图如下:

基于live555的rtsp封装rtspperf的客户端源码解析

 

 

 

主程序如下(player.c):

/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 2.1 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
**********/
/* Copyright (C) 2011 Zack Xue <zackxue@163.com> */

#include "playerapi.h"

static unsigned char url[256];
typedef struct __player_param{
 int thread_id;
 int frames;
 int bytes;
 unsigned char url[256];
 FILE * f;
}player_param;

static int hexprint(char * pBuf, int size)
{
   int i = 0;
   for (i = 0; i < size; i ++){
 printf("0x%x ", pBuf[i]);
   }
   printf("\n");
   printf("\n");
   printf("\n");
   return 0;
}

int player_callback(unsigned char * p_buf, int size, void * param)
{
 player_param *p_param = (player_param *)param;
 char buf[4];
 //buf[0] = 0x00;
 //buf[1] = 0x00;
 //buf[2] = 0x00;
 //buf[3] = 0x01;
 
 p_param->frames ++;
 p_param->bytes += size;
 //hexprint(p_buf, 10);
 
 //Write data
 //fwrite(buf, 1, 4, p_param->f);
 //fwrite(p_buf, 1, size, p_param->f);
 
// printf("Recv %d Frames %d bytes %d id %d\n", size, p_param->frames, p_param->bytes, p_param->thread_id);
}

void usage()
{
    printf("./player url(rtsp://192.168.0.1/h264)  Thread num(2)\n");
}

void player_thread(void * param)
{
 void * p_handle = NULL;
 player_param * p_param = (player_param *)param;
 char name[256];
 
 memset(name, 0, 256);
 
 sprintf(name, "player-%d", p_param->thread_id);
 
 p_param->f = fopen (name, "wb");
 
 p_handle = player_create(p_param->url);
 
 
 player_set_callback(p_handle, (player_data_cb)player_callback, (void * )p_param);
 
 
 player_run(p_handle);
 
    return;
}

int main(int argc, char **argv)
{
    int thread;
 pthread_t thread_t[100];
 player_param param[100];

    int i = 0;
   
    if (argc < 3)
    {
        usage();
        return 0;
    }

    strcpy(url, argv[1]);

    thread = atoi(argv[2]);

    printf("thread num %d\n", thread);
    for (i = 0; i < thread; i ++)
    {
  memset(&(param[i]), 0, sizeof(player_param));
  param[i].thread_id = i;
 sprintf(param[i].url, "%s%d", url, i + 1);
 //sleep(2);
        if (pthread_create(&thread_t[i], NULL, (void *)player_thread, (void *)&(param[i])) != 0)
        {
            printf("pthread_create error %d\n", i);
            perror("error");
        }
    }
    while(1)
    {
        sleep(1);
    }

    return;   
}


playerapi.cpp代码如下:

/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 2.1 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
**********/
/* Copyright (C) 2011 Zack Xue <zackxue@163.com> */

#include "MediaSink.hh"
#include "playerapi.h"
#include "BasicTaskSchedulerEpoll.h"
#include "playerEnv.h"
#include "rtspClient.h"
#include "playerSink.h"

class player
{
public:
    player(unsigned char * url);
      
 int set_callback(player_data_cb cb, void * param);
 void run();

public:

    unsigned char m_url[256];
 player_data_cb m_cb;
    void * m_param;
};

player::player(unsigned char * url)
        : m_cb(NULL), m_param(NULL){
 if (url != NULL){
  strcpy((char*)m_url, (char*)url);
 }
}

int player::set_callback(player_data_cb cb, void * param){
 this->m_cb = cb;
 this->m_param = param;
 return 0;
}

void rtspDataCallback(unsigned char* data, unsigned dataSize,
           struct timeval presentationTime, void * param)
{
 player *p_player = (player *)param;
 if (p_player->m_cb && p_player->m_param){
  p_player->m_cb(data, dataSize, p_player->m_param);
 }
}

typedef struct __setParamdata{
    Medium* ourClient;
    MediaSession* session;
 UsageEnvironment* env;
 struct __setParamdata * self;
}setParamdata;

void setParamFun(void* clientData)
{
 setParamdata *p_setParamdata = (setParamdata *)clientData;
 clientSetMediaSessionParameter(p_setParamdata->ourClient,
     p_setParamdata->session);
 printf("send clientSetMediaSessionParameter %p\n", clientData);
 p_setParamdata->env->taskScheduler().scheduleDelayedTask(1000 * 25 * 1000, (TaskFunc *)setParamFun, p_setParamdata->self);
 return;

}


void player::run()
{
    static unsigned short desiredPortNumStatic = 5002;
    unsigned short desiredPortNum = desiredPortNumStatic;
    desiredPortNumStatic = desiredPortNumStatic + 8;
    Medium* ourClient = NULL;
    MediaSession* session = NULL;
    UsageEnvironment* env = NULL;
    unsigned char url[256];
 setParamdata keepalive;

   

 sprintf((char*)url, "%s", (char*)m_url);
    // Begin by setting up our usage environment:
    TaskScheduler* scheduler =  BasicTaskSchedulerEpoll::createNew();
    env = playerBasicUsageEnvironment::createNew(*scheduler);
   
    ourClient = createClient(*env, 1, "rtspc");
    if (ourClient == NULL)
    {
        printf("createClient error");
        return;
    }
   
    // Open the URL, to get a SDP description:
    char* sdpDescription
        = getSDPDescriptionFromURL(ourClient, (char *)url, NULL, NULL,
                   NULL, 0,
                   desiredPortNum);
    if (sdpDescription == NULL)
    {
        printf("getSDPDescriptionFromURL error");
        return;
    }
    printf("Start SDP %s", sdpDescription);
   
    // Create a media session object from this SDP description:
    session = MediaSession::createNew(*env, sdpDescription);
   
    delete[] sdpDescription;
    if (sdpDescription == NULL)
    {
        printf("Failed to create a MediaSession"
                "object from the SDP description");
        return;
    }
    else if (!session->hasSubsessions())
    {
        printf("This session has no media subsessions ");
        return;       
    }
   
    // Then, setup the "RTPSource"s for the session:
    MediaSubsessionIterator iter(*session);
    MediaSubsession *subsession;
    Boolean madeProgress = False;
   
    while ((subsession = iter.next()) != NULL)
    {
        if (desiredPortNum != 0)
        {
            subsession->setClientPortNum(desiredPortNum);
            desiredPortNum += 2;
        }
        if (!subsession->initiate(-1))
        {
            printf("Unable to create receiver for %s",
                    subsession->mediumName());
        }
        else
        {
            printf("Created receiver for %s"
                        "subsession (client ports %d-%d",
                    subsession->mediumName(), subsession->clientPortNum(),
                    subsession->clientPortNum()+1);
            madeProgress = True;           
        }
       
        if (subsession->rtpSource() != NULL)
        {
          // Because we're saving the incoming data, rather than playing
          // it in real time, allow an especially large time threshold
          // (1 second) for reordering misordered incoming packets:
          unsigned const thresh = 1000000; // 1 second
          //subsession->rtpSource()->setPacketReorderingThresholdTime(thresh);
        }
    }
   
    /* Setup  */
    iter.reset();
    while ((subsession = iter.next()) != NULL)
    {
        if (subsession->clientPortNum() == 0) continue; // port # was not set
  if (strstr(subsession->mediumName(), "audio") != 0){
   printf("%s----%d skip audio\n", __FILE__, __LINE__);
   continue;
  }
        if (!clientSetupSubsession(ourClient, subsession, False))
        {
            printf("Failed to setup %s Codec %s\n",
                    subsession->mediumName(), subsession->codecName());
        }
        else
        {
            printf("Setup %s %s"
                        "subsession (client ports %d-%d",
                    subsession->mediumName(), subsession->codecName(),
                    subsession->clientPortNum(),
                    subsession->clientPortNum()+1);
        }
    }
   
    /* Create iPlayer sink */
    iter.reset();
    while ((subsession = iter.next()) != NULL)
    {
        playerSink * iSink;
        iSink = playerSink::createNew(*env, (playerSinkCallback)rtspDataCallback,
                        (void *)this,
                     200000);
        subsession->sink = iSink;
       
        subsession->sink->startPlaying(*(subsession->readSource()),
      NULL,
      NULL);
    }
   
    if (!clientStartPlayingSession(ourClient, session))
    {
        printf("clientStartPlayingSession error");
        return;
    }
    else
    {
        printf("Start Playing");
    }
 keepalive.ourClient = ourClient;
 keepalive.session = session;
 keepalive.env = env;
 keepalive.self = &keepalive;
 env->taskScheduler().scheduleDelayedTask(10000, (TaskFunc *)setParamFun, &keepalive);
   
    env->taskScheduler().doEventLoop(); // does not return
}

#ifdef __cplusplus
extern "C" {
#endif

void * player_create(unsigned char * url)
{
 return (void *) (new player(url));
}

int player_set_callback(void * handle, player_data_cb cb, void * param)
{
 player * p_player = (player *) handle;
 if (p_player){
  p_player->set_callback(cb, param);
 }
}

void player_run(void * handle){
 player * p_player = (player *) handle;
 if (p_player){
  p_player->run();
 }
}

#ifdef __cplusplus
};
#endif

playerapp.h

/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 2.1 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
**********/
/* Copyright (C) 2011 Zack Xue <zackxue@163.com> */

#ifndef __PLAYER_API__
#define __PLAYER_API__

#ifdef __cplusplus
extern "C" {
#endif

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef int (* player_data_cb) (unsigned char * p_buf, int size, void * param);

void * player_create(unsigned char * url);

int player_set_callback(void * handle, player_data_cb cb, void * param);

void player_run(void * handle);

 

 

#ifdef __cplusplus
};
#endif
#endif


PlayerApi中封装了一个player类;该类向外部提供调用的api,实现了一个PlayerSink用来接收用服务器传递过来的Buff数据,在该Sink中,有一个回调函数,用于将数据回调出去;代码如下(playerSink.h):

/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 2.1 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
**********/
/* Copyright (C) 2011 Zack Xue <zackxue@163.com> */

 
#ifndef __PLAYER_SINK_H__
#define __PLAYER_SINK_H__

#ifndef _MEDIA_SINK_HH
#include "MediaSink.hh"
#endif

typedef void (* playerSinkCallback) (unsigned char* data, unsigned dataSize,
        struct timeval presentationTime, void * param);

class playerSink: public MediaSink {
public:
  static playerSink* createNew(UsageEnvironment& env, playerSinkCallback cb,
                    void * param,
        unsigned bufferSize = 20000);
  // "bufferSize" should be at least as large as the largest expected
  //   input frame.

  void addData(unsigned char* data, unsigned dataSize,
        struct timeval presentationTime);
  // (Available in case a client wants to add extra data to the output file)

protected:
  playerSink(UsageEnvironment& env, playerSinkCallback cb,
                void * param,
                unsigned bufferSize);
      // called only by createNew()
  virtual ~playerSink();

protected:
  static void afterGettingFrame(void* clientData, unsigned frameSize,
    unsigned numTruncatedBytes,
    struct timeval presentationTime,
    unsigned durationInMicroseconds);
  virtual void afterGettingFrame1(unsigned frameSize,
      struct timeval presentationTime);

  playerSinkCallback m_cb;
  unsigned char* fBufferLarge;
  unsigned char* fBuffer;
  unsigned fBufferSize;
  void * m_param;

private: // redefined virtual functions:
  virtual Boolean continuePlaying();
};

 

#endif /* __PLAYER_SINK_H__ */


playerSink.cpp 代码如下:

/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 2.1 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
**********/
/* Copyright (C) 2011 Zack Xue <zackxue@163.com> */


#include "playerapi.h"
#include "playerSink.h"

#if (defined(__WIN32__) || defined(_WIN32)) && !defined(_WIN32_WCE)
#include <io.h>
#include <fcntl.h>
#endif
#include "FileSink.hh"
#include "GroupsockHelper.hh"
#include "OutputFile.hh"

////////// FileSink //////////

playerSink::playerSink(UsageEnvironment& env, playerSinkCallback cb,
                void * param, 
                unsigned bufferSize)
  : MediaSink(env), m_param(param), m_cb(cb), fBufferSize(bufferSize) {
  fBufferLarge = new unsigned char[bufferSize + 4];
  fBuffer = fBufferLarge + 4;
  fBufferLarge[0] = 0;
  fBufferLarge[1] = 0;
  fBufferLarge[2] = 0;
  fBufferLarge[3] = 0x01;

 
}

playerSink::~playerSink() {
  delete[] fBuffer;
}

playerSink* playerSink::createNew(UsageEnvironment& env, playerSinkCallback cb,
                  void * param,
         unsigned bufferSize) {

    return new playerSink(env, cb, param, bufferSize);
}

Boolean playerSink::continuePlaying() {
  if (fSource == NULL) return False;

  fSource->getNextFrame(fBuffer, fBufferSize,
   afterGettingFrame, this,
   onSourceClosure, this);

  return True;
}

void playerSink::afterGettingFrame(void* clientData, unsigned frameSize,
     unsigned /*numTruncatedBytes*/,
     struct timeval presentationTime,
     unsigned /*durationInMicroseconds*/) {
  playerSink* sink = (playerSink*)clientData;
  sink->afterGettingFrame1(frameSize, presentationTime);
}

void playerSink::addData(unsigned char* data, unsigned dataSize,
         struct timeval presentationTime) {
    return m_cb(data, dataSize, presentationTime, m_param);  
}

void playerSink::afterGettingFrame1(unsigned frameSize,
      struct timeval presentationTime) {
  addData(fBuffer, frameSize, presentationTime);

  // Then try getting the next frame:
  continuePlaying();
}


BasicTaskSchedulerEpoll和playerBasicUsageEnvironment 实现了基于epoll模型的TaskScheduler,代替live555默认实现使用select模型;