Receiving RTP with Live555 from an SDP Text Description

Date: 2021-08-02 03:58:21

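I recently needed this feature at work, so I am sharing my implementation with anyone who is interested.
My approach follows the most authoritative answer I could find, posted on the official Live555 mailing list and quoted below: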
[Live-devel] Once again, SDP support for Live555 & interaction with FFMpeg
1. First, create a “MediaSession” object, by calling
“MediaSession::createNew()”, with the SDP description (string) as
parameter.

2. Then, go through each of this object’s ‘subsessions’ (in this case,
there’ll be just one, for “video”), and call
“MediaSubsession::initiate()” on it.

3. Then, you can create an appropriate ‘sink’ object (e.g.,
encapsulating your decoder), and then call “startPlaying()” on it,
passing the subsession’s “readSource()” as parameter.

See the “openRTSP” code (specifically, “testProgs/playCommon.cpp”)
for an example of how this is done.
Note that my implementation below is not based on openRTSP; it is based on the simpler test program testRTSPClient.cpp.

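Implementation steps:
1. Start from the testRTSPClient.cpp test example.

2. Wherever a function takes the URL as a parameter, change it to take the SDP string instead.

3. Remove all of the logic that deals with the RTSP exchange.

4. For the full details, see my source code below.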
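
Step 2 depends on handing the program a well-formed SDP string. The following is a minimal sketch of how such a string could be assembled, plus a way to turn the literal characters `\r\n` typed on a command line into real CR/LF bytes. Both `buildVideoSdp` and `unescapeCrLf` are hypothetical helpers of my own for illustration; they are not part of Live555, and the listing below does not call them.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper (not a Live555 API): builds a one-stream SDP description
// with the same three lines used in this article. Every line ends with CR LF.
std::string buildVideoSdp(const std::string& sourceIp, unsigned rtpPort,
                          unsigned payloadType, const std::string& codecName) {
    std::string sdp;
    sdp += "c=IN IP4 " + sourceIp + "\r\n";                    // IP of the RTP source
    sdp += "m=video " + std::to_string(rtpPort) + " RTP/AVP "  // local RTP port
         + std::to_string(payloadType) + "\r\n";               // RTP payload type
    sdp += "a=rtpmap:" + std::to_string(payloadType) + " " + codecName + "\r\n";
    return sdp;
}

// Hypothetical helper: replaces the two-character sequences "\r" and "\n"
// (backslash + letter, as typed in a shell) with real CR and LF characters.
// All other characters, including other backslashes, are copied unchanged.
std::string unescapeCrLf(const std::string& text) {
    std::string out;
    for (std::size_t i = 0; i < text.size(); ++i) {
        const bool isEscape = text[i] == '\\' && i + 1 < text.size() &&
                              (text[i + 1] == 'r' || text[i + 1] == 'n');
        if (isEscape) {
            out += (text[i + 1] == 'r') ? '\r' : '\n';
            ++i;  // skip the letter that followed the backslash
        } else {
            out += text[i];
        }
    }
    return out;
}
```

For example, `buildVideoSdp("192.168.1.47", 5000, 105, "H264")` produces the sample SDP used in this article, and the result's `c_str()` could be passed wherever the listing expects `szSDP`.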

/**********
This library is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by the
Free Software Foundation; either version 2.1 of the License, or (at your
option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)

This library is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
**********/

// Copyright (c) 1996-2013, Live Networks, Inc.  All rights reserved
// A demo application, showing how to create and run a RTSP client (that can potentially receive multiple streams concurrently).
//
// NOTE: This code - although it builds a running application - is intended only to illustrate how to develop your own RTSP
// client application.  For a full-featured RTSP client application - with much more functionality, and many options - see
// "openRTSP": http://www.live555.com/openRTSP/

#include "liveMedia.hh"
#include "BasicUsageEnvironment.hh"

// Forward function definitions:
class RTPClient ;
// RTSP 'response handlers':
void startSetupSubSession (RTPClient * rtpClient ) ;
void rtpClientStartPlay (RTPClient * rtpClient ) ;
void initMediaSession (RTPClient * rtpClient ) ;

// Other event handler functions:
void subsessionAfterPlaying ( void * clientData ) ; // called when a stream's subsession (e.g., audio or video substream) ends
void subsessionByeHandler ( void * clientData ) ; // called when a RTCP "BYE" is received for a subsession
void streamTimerHandler ( void * clientData ) ;
// called at the end of a stream's expected duration (if the stream has not already signaled its end using a RTCP "BYE")

// The main streaming routine (for each "rtsp://" URL):
void openURL (UsageEnvironment & env, char const * progName, char const * szSDP ) ;

// Used to iterate through each stream's 'subsessions', setting up each one:
void setupNextSubsession (RTPClient * rtpClient ) ;

// Used to shut down and close a stream (including its "RTPClient" object):
void shutdownStream (RTPClient * rtpClient, int exitCode = 1 ) ;

// A function that outputs a string that identifies each stream (for debugging output).  Modify this if you wish:
//UsageEnvironment& operator<<(UsageEnvironment& env, const RTPClient& rtpClient) {
//return env << "[URL:\"" << rtpClient.url() << "\"]: ";
//}

// A function that outputs a string that identifies each subsession (for debugging output).  Modify this if you wish:
UsageEnvironment & operator << (UsageEnvironment & env, const MediaSubsession & subsession ) {
    return env << subsession. mediumName ( ) << "/" << subsession. codecName ( ) ;
}

void usage (UsageEnvironment & env, char const * progName ) {
    env << "Usage: " << progName << " <sdp-description> ...\n"
        << "Example SDP description:\n"
        << "c=IN IP4 192.168.1.47\r\nm=video 5000 RTP/AVP 105\r\na=rtpmap:105 H264\r\n" ;
}

char eventLoopWatchVariable = 0 ;

int main ( int argc, char ** argv ) {
    // Begin by setting up our usage environment:
    TaskScheduler * scheduler = BasicTaskScheduler :: createNew ( ) ;
    UsageEnvironment * env = BasicUsageEnvironment :: createNew ( *scheduler ) ;

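    // We need at least one SDP argument, like:
    //   "c=IN IP4 192.168.1.47\r\nm=video 5000 RTP/AVP 105\r\na=rtpmap:105 H264\r\n"
    // Meaning of the SDP fields: the "c=" line gives the IP address of the RTP data source; the "m=" line says that the
    // video channel is received on local RTP port 5000 with payload type 105; the "a=rtpmap" line says that the media
    // type is H264.  Every line must end with a carriage return and line feed ("\r\n").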

       
    if (argc < 2 ) {
        usage ( *env, argv [ 0 ] ) ;
        return 1 ;
    }

    // There are argc-1 SDP descriptions: argv[1] through argv[argc-1].  Open and start receiving each one:
    for ( int i = 1 ; i <= argc - 1 ; ++i ) {
        openURL ( *env, argv [ 0 ], argv [i ] ) ;
    }

    // All subsequent activity takes place within the event loop:
    env->taskScheduler().doEventLoop(&eventLoopWatchVariable);
    // This function call does not return, unless, at some point in time, "eventLoopWatchVariable" gets set to something non-zero.

    return 0 ;

    // If you choose to continue the application past this point (i.e., if you comment out the "return 0;" statement above),
    // and if you don't intend to do anything more with the "TaskScheduler" and "UsageEnvironment" objects,
    // then you can also reclaim the (small) memory used by these objects by uncommenting the following code:
    /*
    env->reclaim(); env = NULL;
    delete scheduler; scheduler = NULL;
    */

}

// Define a class to hold per-stream state that we maintain throughout each stream's lifetime:

class StreamClientState {
public :
    StreamClientState ( ) ;
    virtual ~StreamClientState ( ) ;

public :
    MediaSubsessionIterator * iter ;
    MediaSession * session ;
    MediaSubsession * subsession ;
    TaskToken streamTimerTask ;
    double duration ;
} ;



// Define a data sink (a subclass of "MediaSink") to receive the data for each subsession (i.e., each audio or video 'substream').
// In practice, this might be a class (or a chain of classes) that decodes and then renders the incoming audio or video.
// Or it might be a "FileSink", for outputting the received data into a file (as is done by the "openRTSP" application).
// In this example code, however, we define a simple 'dummy' sink that receives incoming data, but does nothing with it.

class DummySink : public MediaSink {
public :
    static DummySink * createNew (UsageEnvironment & env,
        MediaSubsession & subsession, // identifies the kind of data that's being received
        char const * streamId = NULL ) ; // identifies the stream itself (optional)

private :
    DummySink (UsageEnvironment & env, MediaSubsession & subsession, char const * streamId ) ;
    // called only by "createNew()"
    virtual ~DummySink ( ) ;

    static void afterGettingFrame ( void * clientData, unsigned frameSize,
        unsigned numTruncatedBytes,
    struct timeval presentationTime,
        unsigned durationInMicroseconds ) ;
    void afterGettingFrame ( unsigned frameSize, unsigned numTruncatedBytes,
    struct timeval presentationTime, unsigned durationInMicroseconds ) ;

private :
    // redefined virtual functions:
    virtual Boolean continuePlaying ( ) ;

private :
    u_int8_t * fReceiveBuffer ;
    MediaSubsession & fSubsession ;
    char * fStreamId ;
} ;

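// New class: takes the place of the "RTSPClient" subclass in testRTSPClient.cpp.  It simply ties a "MediaSession"
// (created from the SDP text) to the per-stream state.
class RTPClient {
public:
    RTPClient(UsageEnvironment& e, MediaSession* ms) : mediaSession(ms), env(e)
    {
        // The setup and shutdown code reaches the session through "scs.session", so it must point at our session:
        scs.session = ms;
    }

    virtual ~RTPClient()
    {
        // Nothing to do here: the "StreamClientState" destructor closes "scs.session" (the same object as "mediaSession").
    }
    UsageEnvironment& envir()
    {
        return env;
    }
public:
    MediaSession* mediaSession;
    UsageEnvironment& env;
    StreamClientState scs;

};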

#define RTSP_CLIENT_VERBOSITY_LEVEL 1 // by default, print verbose output from each "RTPClient"

static unsigned rtspClientCount = 0 ; // Counts how many streams (i.e., "RTPClient"s) are currently in use.

void openURL (UsageEnvironment & env, char const * progName, char const * szSDP ) {
    // Begin by creating a "MediaSession" object from the SDP description.  Note that there is a separate "MediaSession"
    // (and "RTPClient") object for each SDP description that we are given.
    MediaSession * mediaSession = MediaSession :: createNew (env,  szSDP ) ;
    if (mediaSession == NULL ) {
        env << "Failed to create a RTP Client for SDP \"" << szSDP << "\": " << env. getResultMsg ( ) << "\n" ;
        return ;
    }

    RTPClient *client = new RTPClient (env, mediaSession ) ;
    if (client == NULL )
    {
        return ;
    }

    ++rtspClientCount ;

    // testRTSPClient.cpp would now send a RTSP "DESCRIBE" command to fetch the SDP description.  Here the SDP description
    // was supplied directly, so we skip the RTSP exchange and go straight to setting up the session's subsessions:
    initMediaSession (client ) ;
}


// Implementation of the RTSP 'response handlers':

void initMediaSession(RTPClient* rtpClient)
{
    do {
        UsageEnvironment& env = rtpClient->envir(); // alias
        StreamClientState& scs = rtpClient->scs; // alias
        if (!scs.session->hasSubsessions())
        {
            env << "This session has no media subsessions (i.e., no \"m=\" lines)\n";
            break;
        }

        // Then, create and set up our data source objects for the session.  We do this by iterating over the session's 'subsessions'
        // and calling "MediaSubsession::initiate()" on each one.  (No RTSP "SETUP" command is sent, because there is no RTSP server.)
        // (Each 'subsession' will have its own data source.)
        scs.iter = new MediaSubsessionIterator(*scs.session);
        setupNextSubsession(rtpClient);
        return;
    } while (0);

    // An unrecoverable error occurred with this stream.
    shutdownStream(rtpClient);
}

// By default, we request that the server stream its data using RTP/UDP.
// If, instead, you want to request that the server stream via RTP-over-TCP, change the following to True:
#define REQUEST_STREAMING_OVER_TCP False

void setupNextSubsession(RTPClient* rtpClient) {
    UsageEnvironment& env = rtpClient->envir(); // alias
    StreamClientState& scs = rtpClient->scs; // alias

    scs.subsession = scs.iter->next();
    if (scs.subsession != NULL) {
        if (!scs.subsession->initiate()) {
            env << "Failed to initiate the \"" << *scs.subsession << "\" subsession: " << env.getResultMsg() << "\n";
            setupNextSubsession(rtpClient); // give up on this subsession; go to the next one
        } else {
            env << "Initiated the \"" << *scs.subsession
                << "\" subsession (client ports " << scs.subsession->clientPortNum() << "-" << scs.subsession->clientPortNum() + 1 << ")\n";

            // Attach a sink to this subsession.  The function below then calls setupNextSubsession() again,
            // so the remaining subsessions are set up recursively.
            startSetupSubSession(rtpClient);
        }
        return;
    }


    // We've finished setting up all of the subsessions.  Now, start the streaming:
    rtpClientStartPlay(rtpClient);
}

void startSetupSubSession(RTPClient* rtpClient) {
    Boolean success = True;
    do {
        UsageEnvironment& env = rtpClient->envir(); // alias
        StreamClientState& scs = rtpClient->scs; // alias

        env << "Set up the \"" << *scs.subsession
            << "\" subsession (client ports " << scs.subsession->clientPortNum() << "-" << scs.subsession->clientPortNum() + 1 << ")\n";

        // Having successfully set up the subsession, create a data sink for it, and call "startPlaying()" on it.
        // (There is no RTSP "PLAY" command here: the sink receives data as soon as RTP packets arrive on the subsession's port.)

        scs.subsession->sink = DummySink::createNew(env, *scs.subsession, "test");
        // perhaps use your own custom "MediaSink" subclass instead
        if (scs.subsession->sink == NULL) {
            env << "Failed to create a data sink for the \"" << *scs.subsession
                << "\" subsession: " << env.getResultMsg() << "\n";
            success = False;
            break;
        }

        env << "Created a data sink for the \"" << *scs.subsession << "\" subsession\n";
        scs.subsession->miscPtr = rtpClient; // a hack to let subsession handler functions get the "RTPClient" from the subsession
        scs.subsession->sink->startPlaying(*(scs.subsession->readSource()),
            subsessionAfterPlaying, scs.subsession);
        // Also set a handler to be called if a RTCP "BYE" arrives for this subsession:
        if (scs.subsession->rtcpInstance() != NULL) {
            scs.subsession->rtcpInstance()->setByeHandler(subsessionByeHandler, scs.subsession);
        }
    } while (0);
    if (!success)
    {
        shutdownStream(rtpClient);
        return;
    }

    // Set up the next subsession, if any:
    setupNextSubsession(rtpClient);
}

void rtpClientStartPlay (RTPClient * rtpClient ) {
    Boolean success = False ;

    do {
        UsageEnvironment& env = rtpClient->envir(); // alias
        StreamClientState& scs = rtpClient->scs; // alias

        // Set a timer to be handled at the end of the stream's expected duration (if the stream does not already signal its end
        // using a RTCP "BYE").  This is optional.  If, instead, you want to keep the stream active - e.g., so you can later
        // 'seek' back within it and do another RTSP "PLAY" - then you can omit this code.
        // (Alternatively, if you don't want to receive the entire stream, you could set this timer for some shorter value.)
        if (scs. duration > 0 ) {
            unsigned const delaySlop = 2 ; // number of seconds extra to delay, after the stream's expected duration.  (This is optional.)
            scs.duration += delaySlop;
            unsigned uSecsToDelay = ( unsigned ) (scs. duration * 1000000 ) ;
            scs. streamTimerTask = env. taskScheduler ( ). scheduleDelayedTask (uSecsToDelay, (TaskFunc * )streamTimerHandler, rtpClient ) ;
        }

        env << "Started playing session" ;
        if (scs. duration > 0 ) {
            env << " (for up to " << scs. duration << " seconds)" ;
        }
        env << "...\n" ;

        success = True ;
    } while ( 0 ) ;

    if ( !success ) {
        // An unrecoverable error occurred with this stream.
        shutdownStream (rtpClient ) ;
    }
}


// Implementation of the other event handlers:

void subsessionAfterPlaying ( void * clientData ) {
    MediaSubsession* subsession = (MediaSubsession*)clientData;
    RTPClient* rtpClient = (RTPClient*)(subsession->miscPtr);

    // Begin by closing this subsession's stream:
    Medium::close(subsession->sink);
    subsession->sink = NULL;

    // Next, check whether *all* subsessions' streams have now been closed:
    MediaSession& session = subsession->parentSession();
    MediaSubsessionIterator iter(session);
    while ((subsession = iter.next()) != NULL) {
        if (subsession->sink != NULL) return; // this subsession is still active
    }

    // All subsessions' streams have now been closed, so shutdown the client:
    shutdownStream (rtpClient ) ;
}

void subsessionByeHandler ( void * clientData ) {
    MediaSubsession * subsession = (MediaSubsession * )clientData ;
    RTPClient* rtpClient = (RTPClient*)subsession->miscPtr;
    UsageEnvironment& env = rtpClient->envir(); // alias

    env << "Received RTCP \"BYE\" on \"" << *subsession << "\" subsession\n" ;

    // Now act as if the subsession had closed:
    subsessionAfterPlaying (subsession ) ;
}

void streamTimerHandler ( void * clientData ) {
    RTPClient * rtpClient = (RTPClient * )clientData ;
    StreamClientState& scs = rtpClient->scs; // alias

    scs. streamTimerTask = NULL ;

    // Shut down the stream:
    shutdownStream (rtpClient ) ;
}



// The signature must match the forward declaration above (which gives "exitCode" a default value); the exit code is not used here.
void shutdownStream(RTPClient* rtpClient, int /*exitCode*/) {
    UsageEnvironment& env = rtpClient->envir(); // alias
    StreamClientState& scs = rtpClient->scs; // alias

    // First, check whether any subsessions have still to be closed:
    if (scs.session != NULL) {
        MediaSubsessionIterator iter(*scs.session);
        MediaSubsession* subsession;

        while ((subsession = iter.next()) != NULL) {
            if (subsession->sink != NULL) {
                Medium::close(subsession->sink);
                subsession->sink = NULL;

                if (subsession->rtcpInstance() != NULL) {
                    subsession->rtcpInstance()->setByeHandler(NULL, NULL); // in case a RTCP "BYE" arrives while we are shutting down
                }
            }
        }

        // testRTSPClient.cpp would send a RTSP "TEARDOWN" command at this point.  There is no RTSP server here, so nothing is sent.
    }

    env << "Closing the stream.\n";
    delete rtpClient;
    // Note that this will also cause this stream's "StreamClientState" structure to get reclaimed.
}




// Implementation of "StreamClientState":

StreamClientState :: StreamClientState ( )
: iter ( NULL ), session ( NULL ), subsession ( NULL ), streamTimerTask ( NULL ), duration ( 0.0 ) {
}

StreamClientState ::~StreamClientState ( ) {
    delete iter ;
    if (session != NULL) {
        // We also need to delete "session", and unschedule "streamTimerTask" (if set)
        UsageEnvironment& env = session->envir(); // alias

        env. taskScheduler ( ). unscheduleDelayedTask (streamTimerTask ) ;
        Medium :: close (session ) ;
    }
}


// Implementation of "DummySink":

// Even though we're not going to be doing anything with the incoming data, we still need to receive it.
// Define the size of the buffer that we'll use:
#define DUMMY_SINK_RECEIVE_BUFFER_SIZE 100000

DummySink * DummySink :: createNew (UsageEnvironment & env, MediaSubsession & subsession, char const * streamId ) {
    return new DummySink (env, subsession, streamId ) ;
}

DummySink :: DummySink (UsageEnvironment & env, MediaSubsession & subsession, char const * streamId )
: MediaSink (env ),
fSubsession (subsession ) {
    fStreamId = strDup (streamId ) ;
    fReceiveBuffer = new u_int8_t [DUMMY_SINK_RECEIVE_BUFFER_SIZE ] ;
}

DummySink ::~DummySink ( ) {
    delete [ ] fReceiveBuffer ;
    delete [ ] fStreamId ;
}

void DummySink :: afterGettingFrame ( void * clientData, unsigned frameSize, unsigned numTruncatedBytes,
struct timeval presentationTime, unsigned durationInMicroseconds ) {
    DummySink * sink = (DummySink * )clientData ;
    sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);
}

// If you don't want to see debugging output for each received frame, then comment out the following line:
#define DEBUG_PRINT_EACH_RECEIVED_FRAME 1

void DummySink :: afterGettingFrame ( unsigned frameSize, unsigned numTruncatedBytes,
struct timeval presentationTime, unsigned /*durationInMicroseconds*/ ) {
    // We've just received a frame of data.  (Optionally) print out information about it:
#ifdef DEBUG_PRINT_EACH_RECEIVED_FRAME
    if (fStreamId != NULL) envir() << "Stream \"" << fStreamId << "\"; ";
    envir ( ) << fSubsession. mediumName ( ) << "/" << fSubsession. codecName ( ) << ":\tReceived " << frameSize << " bytes" ;
    if (numTruncatedBytes > 0 ) envir ( ) << " (with " << numTruncatedBytes << " bytes truncated)" ;
    char uSecsStr [ 6 + 1 ] ; // used to output the 'microseconds' part of the presentation time
    sprintf (uSecsStr, "%06u", ( unsigned )presentationTime. tv_usec ) ;
    envir ( ) << ".\tPresentation time: " << ( int )presentationTime. tv_sec << "." << uSecsStr ;
    if (fSubsession.rtpSource() != NULL && !fSubsession.rtpSource()->hasBeenSynchronizedUsingRTCP()) {
        envir ( ) << "!" ; // mark the debugging output to indicate that this presentation time is not RTCP-synchronized
    }
#ifdef DEBUG_PRINT_NPT
    envir ( ) << "\tNPT: " << fSubsession. getNormalPlayTime (presentationTime ) ;
#endif
    envir ( ) << "\n" ;
#endif

    // Then continue, to request the next frame of data:
    continuePlaying ( ) ;
}

Boolean DummySink :: continuePlaying ( ) {
    if (fSource == NULL ) return False ; // sanity check (should not happen)

    // Request the next frame of data from our input source.  "afterGettingFrame()" will get called later, when it arrives:
    fSource->getNextFrame(fReceiveBuffer, DUMMY_SINK_RECEIVE_BUFFER_SIZE,
        afterGettingFrame, this,
        onSourceClosure, this);
    return True ;
}
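
The DummySink above only prints the size of each frame and then discards it. If you replace it with a sink that feeds a decoder or writes a playable file, note that, as far as I know, Live555 hands each H.264 NAL unit to the sink without an Annex B start code, so a common approach is to prepend `00 00 00 01` yourself. The sketch below shows only that framing step. `withAnnexBStartCode` is a hypothetical helper of mine, not a Live555 function, and whether your decoder expects Annex B framing is an assumption you should check.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper (not a Live555 API): returns a copy of one NAL unit
// prefixed with the 4-byte Annex B start code 00 00 00 01.
std::vector<std::uint8_t> withAnnexBStartCode(const std::uint8_t* nalUnit,
                                              std::size_t size) {
    static const std::uint8_t kStartCode[4] = {0x00, 0x00, 0x00, 0x01};
    std::vector<std::uint8_t> framed;
    framed.reserve(sizeof kStartCode + size);
    // Start code first, then the NAL unit bytes exactly as received.
    framed.insert(framed.end(), kStartCode, kStartCode + sizeof kStartCode);
    framed.insert(framed.end(), nalUnit, nalUnit + size);
    return framed;
}
```

In a custom sink, this could be called from the per-frame callback with the receive buffer and `frameSize` before the data is passed on.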
Reposted from: http://www.mworkbox.com/wp/work/555.html