Streaming Media Technology: RTSP

Date: 2021-10-07 17:04:48

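     I have recently become interested in streaming media technology. I did learn the basics back in school, but you know how university courses go: on the one hand, theory was badly disconnected from practice, and on the other, exams were pure last-minute cramming, so having studied it was the same as not having studied it. All right, rant over; back to the topic.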

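    The prerequisite for studying streaming media is understanding three protocols: RTSP, RTCP, and RTP. Definitions of all three are a dime a dozen on Baidu. In short, RTSP handles control, including operations such as setting up, playing, and pausing a stream. RTCP and RTP can be regarded as two halves of one protocol family (both are defined in RFC 3550); the biggest difference is that RTCP carries no payload (that is, no media data), whereas RTP does. RTCP mainly conveys the status of the server and the client, such as how much data has been received and what the timestamps are, while RTP's main job is to carry the media data itself.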

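    Most write-ups on RTSP include the phrase "RTSP is a text protocol". What does that mean? Put simply, if you want to tell the server your name, you first build a string like name="xxxxx", convert it to a byte[], and send it to the server over a socket; the server then knows your name. RTCP is the contrast: it specifies what each bit of each byte means. For example, in the first byte of an RTCP packet the first two bits are the version and the third bit is the padding flag, while the second byte is the packet type. Frankly, implementing RTCP is far more brain-burning than implementing RTSP.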
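To make the contrast concrete, here is a minimal sketch. The class name `TextVsBinaryDemo`, the URL, and the hand-made four-byte header are assumptions for illustration only; the bit layout follows the RTCP common header in RFC 3550.

```java
import java.nio.charset.StandardCharsets;

class TextVsBinaryDemo {

    // Text protocol: the request is an ordinary string that is turned into
    // bytes just before it goes onto the socket.
    static byte[] buildOptions(String url, int cseq) {
        String request = "OPTIONS " + url + " RTSP/1.0\r\n"
                + "CSeq: " + cseq + "\r\n"
                + "\r\n";
        return request.getBytes(StandardCharsets.US_ASCII);
    }

    // Binary protocol: fields are packed into the bits of the header bytes.
    static int version(byte[] rtcp)     { return (rtcp[0] & 0xC0) >>> 6; } // top 2 bits of byte 0
    static boolean padding(byte[] rtcp) { return (rtcp[0] & 0x20) != 0; }  // 3rd bit of byte 0
    static int count(byte[] rtcp)       { return rtcp[0] & 0x1F; }         // low 5 bits of byte 0
    static int packetType(byte[] rtcp)  { return rtcp[1] & 0xFF; }         // whole of byte 1
    static int lengthWords(byte[] rtcp) {                                  // bytes 2 and 3
        return ((rtcp[2] & 0xFF) << 8) | (rtcp[3] & 0xFF);
    }

    public static void main(String[] args) {
        byte[] request = buildOptions("rtsp://example.com/stream", 1);
        System.out.print(new String(request, StandardCharsets.US_ASCII));

        // Hand-made header: V=2, P=0, count=0, PT=200 (sender report), length=6
        byte[] header = {(byte) 0x80, (byte) 200, 0x00, 0x06};
        System.out.println("version=" + version(header)
                + " padding=" + padding(header)
                + " packetType=" + packetType(header)
                + " length=" + lengthWords(header));
    }
}
```

The text side needs nothing beyond string concatenation, while the binary side needs a mask and a shift for every field, which is where most of the extra effort goes.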

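   Back to RTSP. The protocol includes the following operations: OPTIONS, DESCRIBE, SETUP, PLAY, PAUSE, and TEARDOWN. OPTIONS asks the server which methods it supports. DESCRIBE fetches the description of the media (typically as SDP). SETUP negotiates the transport with the server, which returns a session ID that identifies the session in subsequent requests. PLAY tells the server it can start sending data, PAUSE tells it to stop sending for now, and TEARDOWN is the tearful farewell: goodbye.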
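Every one of these operations is answered with a text response, and the session ID mentioned above travels in its Session header. The following is a rough sketch of reading such a response; the class name `RtspResponseDemo` and the sample response text are made up for illustration, and a real server's headers may differ.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class RtspResponseDemo {
    final int statusCode;
    final Map<String, String> headers = new LinkedHashMap<>();

    RtspResponseDemo(String raw) {
        String[] lines = raw.split("\r\n");
        // The status line looks like: RTSP/1.0 200 OK
        statusCode = Integer.parseInt(lines[0].split(" ")[1]);
        // Header lines follow until the first empty line
        for (int i = 1; i < lines.length && !lines[i].isEmpty(); i++) {
            int colon = lines[i].indexOf(':');
            if (colon > 0) {
                String name = lines[i].substring(0, colon).trim().toLowerCase(Locale.ROOT);
                headers.put(name, lines[i].substring(colon + 1).trim());
            }
        }
    }

    // The Session header may carry parameters, e.g. "12345678;timeout=60";
    // only the part before the semicolon is the session ID.
    String sessionId() {
        String value = headers.get("session");
        if (value == null) {
            return null;
        }
        int semicolon = value.indexOf(';');
        return (semicolon >= 0 ? value.substring(0, semicolon) : value).trim();
    }

    public static void main(String[] args) {
        // Hypothetical SETUP response
        String raw = "RTSP/1.0 200 OK\r\n"
                + "CSeq: 4\r\n"
                + "Session: 12345678;timeout=60\r\n"
                + "Transport: RTP/AVP;unicast;client_port=50002-50003;server_port=6970-6971\r\n"
                + "\r\n";
        RtspResponseDemo response = new RtspResponseDemo(raw);
        System.out.println(response.statusCode + " session=" + response.sessionId());
    }
}
```

Header names are lower-cased before being stored because RTSP header names are case-insensitive, so "CSeq" and "Cseq" should be treated as the same header.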

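   If you have ever searched Baidu for the keywords "RTSP java", you will have found that someone has already implemented the RTSP protocol. If you actually used that code, congratulations, you have stepped into a pit. Most people who repost it never verified what they were reposting. That code trapped me for a long time; I only climbed out today, so I am writing it down.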

    RTSPProtocal: the RTSP protocol class, mainly responsible for building the RTSP request text

    

public class RTSPProtocal {
	
	public static byte[] encodeOption(String address, String VERSION, int seq) {  
        StringBuilder sb = new StringBuilder();  
        sb.append("OPTIONS ");  
        sb.append(address.substring(0, address.lastIndexOf("/")));  
        sb.append(VERSION);  
        sb.append("Cseq: ");  
        sb.append(seq);  
        sb.append("\r\n");  
        sb.append("\r\n");  
        System.out.println(sb.toString());  
        //send(sb.toString().getBytes());
        return sb.toString().getBytes();
    }
	
	public static byte[] encodeDescribe(String address, String VERSION, int seq) {  
        StringBuilder sb = new StringBuilder();  
        sb.append("DESCRIBE ");  
        sb.append(address);  
        sb.append(VERSION);  
        sb.append("Cseq: ");  
        sb.append(seq);  
        sb.append("\r\n");  
        sb.append("\r\n");  
        System.out.println(sb.toString());  
        //send(sb.toString().getBytes());
        return sb.toString().getBytes();  
    }
	
	public static byte[] encodeSetup(String address, String VERSION, String sessionid, 
										int portOdd, int portEven, int seq, String trackInfo) {  
        StringBuilder sb = new StringBuilder();  
        sb.append("SETUP ");  
        sb.append(address);  
        sb.append("/");  
        // trim() guards against whitespace or line breaks picked up from the SDP
        sb.append(trackInfo == null ? "" : trackInfo.trim());  
        sb.append(VERSION);  
        sb.append("Cseq: ");  
        sb.append(seq);  
        sb.append("\r\n");
        // e.g. client_port=50002-50003 (RTP port first, then RTCP port)
        sb.append("Transport: RTP/AVP;unicast;client_port="+portEven+"-"+portOdd+";mode=play\r\n");  
        sb.append("\r\n");  
        System.out.println(sb.toString());  
        //send(sb.toString().getBytes());  
        return sb.toString().getBytes();
    }
	
	public static byte[] encodePlay(String address, String VERSION, String sessionid, int seq) {  
        StringBuilder sb = new StringBuilder();  
        sb.append("PLAY ");  
        sb.append(address);  
        sb.append(VERSION);  
        sb.append("Session: ");  
        // trim() plus an explicit CRLF keeps "Session" and "Cseq" on separate lines
        // whether or not the stored session id still carries its line ending
        sb.append(sessionid.trim());  
        sb.append("\r\n");
        sb.append("Cseq: ");  
        sb.append(seq);
        sb.append("\r\n");
        sb.append("Range: npt=0.000-");
        sb.append("\r\n");  
        sb.append("\r\n");  
        System.out.println(sb.toString());  
        //send(sb.toString().getBytes());  
        return sb.toString().getBytes();
    }
	
	public static byte[] encodePause(String address, String VERSION, String sessionid, int seq) {  
        StringBuilder sb = new StringBuilder();  
        sb.append("PAUSE ");  
        sb.append(address);  
        sb.append("/");  
        sb.append(VERSION);  
        sb.append("Cseq: ");  
        sb.append(seq);  
        sb.append("\r\n");  
        sb.append("Session: ");  
        sb.append(sessionid.trim());  
        sb.append("\r\n");  
        // The empty line marks the end of the request headers
        sb.append("\r\n");  
        System.out.println(sb.toString());  
        //send(sb.toString().getBytes());
        return sb.toString().getBytes();
    }
	
	public static byte[] encodeTeardown(String address, String VERSION, String sessionid, int seq) {  
        StringBuilder sb = new StringBuilder();  
        sb.append("TEARDOWN ");  
        sb.append(address);  
        sb.append("/");  
        sb.append(VERSION);  
        sb.append("Cseq: ");  
        sb.append(seq);  
        sb.append("\r\n");
        sb.append("User-Agent: LibVLC/2.2.1 (LIVE555 Streaming Media v2014.07.25)\r\n");  
        sb.append("Session: ");  
        sb.append(sessionid.trim());  
        sb.append("\r\n");
        // The empty line marks the end of the request headers
        sb.append("\r\n");
        System.out.println(sb.toString()); 
        //send(sb.toString().getBytes());  
        return sb.toString().getBytes();
    }
}
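The six encode methods above all follow the same pattern: a request line, a CSeq header, optional extra headers, and an empty line. As a sketch of how that pattern could be factored out, here is a hypothetical helper; `RtspRequestBuilderDemo` and `build` are illustrative names that are not part of the code in this post, and the URL and session ID are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class RtspRequestBuilderDemo {

    // Builds "<METHOD> <url> RTSP/1.0", a CSeq header, any extra headers,
    // and the empty line that terminates the header section.
    static byte[] build(String method, String url, int cseq, Map<String, String> extraHeaders) {
        StringBuilder sb = new StringBuilder();
        sb.append(method).append(' ').append(url).append(" RTSP/1.0\r\n");
        sb.append("CSeq: ").append(cseq).append("\r\n");
        for (Map.Entry<String, String> header : extraHeaders.entrySet()) {
            sb.append(header.getKey()).append(": ").append(header.getValue()).append("\r\n");
        }
        sb.append("\r\n");
        return sb.toString().getBytes(StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the headers in insertion order
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Session", "12345678");
        headers.put("Range", "npt=0.000-");
        byte[] play = build("PLAY", "rtsp://example.com/stream", 5, headers);
        System.out.print(new String(play, StandardCharsets.US_ASCII));
    }
}
```

Writing the line endings in one place makes it much harder to forget the CRLF after a header or the final empty line.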
   RTSPClient: uses the static methods of RTSPProtocal to obtain the request strings, and provides the ability to send and receive data

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;

public class RTSPClient {
	
	private static final int BUFFER_SIZE = 8192;
	
	private String localIpAddress;
	private String remoteIpAddress;
	
	private int localPort;
	private int localPortOdd;
	private int localPortEven;
	private int remoteIPort;
	
	private int remotePortOdd;
	private int remotePortEven;
	
	private Map<Integer, ReceiveSocket> map = new HashMap<>();
	
	
	
	public int getRemotePortOdd() {
		return remotePortOdd;
	}
	public void setRemotePortOdd(int remotePortOdd) {
		this.remotePortOdd = remotePortOdd;
	}
	public int getRemotePortEven() {
		return remotePortEven;
	}
	public void setRemotePortEven(int remotePortEven) {
		this.remotePortEven = remotePortEven;
	}
	
	public void addSocket(Integer port, ReceiveSocket socket){
		map.put(port, socket);
	}
	
	private String rtspAddress;
	private Socket tcpSocket;
	
	private SocketChannel socketChannel;
	private Selector selector; 
	
	public String getLocalIpAddress() {
		return localIpAddress;
	}
	public void setLocalIpAddress(String localIpAddress) {
		this.localIpAddress = localIpAddress;
	}
	public int getLocalPort() {
		return localPort;
	}
	public void setLocalPort(int localPort) {
		this.localPort = localPort;
	}
	public int getLocalPortOdd() {
		return localPortOdd;
	}
	public void setLocalPortOdd(int localPortOdd) {
		this.localPortOdd = localPortOdd;
	}
	public int getLocalPortEven() {
		return localPortEven;
	}
	public void setLocalPortEven(int localPortEven) {
		this.localPortEven = localPortEven;
	}
	public String getRtspAddress() {
		return rtspAddress;
	}
	public void setRtspAddress(String rtspAddress) {
		this.rtspAddress = rtspAddress;
	}
	public Socket getTcpSocket() {
		return tcpSocket;
	}
	public void setTcpSocket(Socket tcpSocket) {
		this.tcpSocket = tcpSocket;
	}
	
	
	public String getRemoteIpAddress() {
		return remoteIpAddress;
	}
	public void setRemoteIpAddress(String remoteIpAddress) {
		this.remoteIpAddress = remoteIpAddress;
	}
	public int getRemoteIPort() {
		return remoteIPort;
	}
	public void setRemoteIPort(int remoteIPort) {
		this.remoteIPort = remoteIPort;
	}
	
	public Selector getSelector() {
		return selector;
	}
	public void setSelector(Selector selector) {
		this.selector = selector;
	}
	// Example values used while testing:
	//   remote: new InetSocketAddress(remoteIp, 554)
	//   local:  new InetSocketAddress("192.168.31.106", 0)
	//   URL:    "rtsp://218.204.223.237:554/live/1/66251FC11353191F/e7ooqwcfbqjoo80j.sdp"
	public void inital() throws IOException{
		 socketChannel = SocketChannel.open();   
         socketChannel.socket().setSoTimeout(30000);  
         socketChannel.configureBlocking(false);
         
         InetSocketAddress localAddress = new InetSocketAddress(this.localIpAddress, localPort);
         // Use the configured remote port instead of a hard-coded 554
         InetSocketAddress remoteAddress=new InetSocketAddress(this.remoteIpAddress, this.remoteIPort);
         
         socketChannel.socket().bind(localAddress);  
         // In non-blocking mode connect() usually returns false; the connection
         // is then completed later through finishConnect()
         if (socketChannel.connect(remoteAddress)) {  
             System.out.println("Connection established: " + remoteAddress);  
         } 
         
         if (selector == null) {  
             // Create a new Selector; a failure propagates to the caller
             selector = Selector.open();  
         }
         // Note: OP_WRITE is ready almost all the time, so select() returns
         // immediately for as long as it stays registered
         socketChannel.register(selector, SelectionKey.OP_CONNECT  
                 | SelectionKey.OP_READ | SelectionKey.OP_WRITE, this);  
         System.out.println("Socket opened successfully");
	}
	
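	public void write(byte[] out) throws IOException {  
		if (out == null || out.length < 1) {  
            return;  
        }
		// Print the request text; out.toString() would only print the array's identity
		System.out.println(new String(out));
		if (!isConnected()) {  
            System.out.println("Channel is null or not connected");  
            return;  
        }
		// wrap() avoids overflowing a fixed-size buffer when the request is large
		ByteBuffer sendBuf = ByteBuffer.wrap(out);
        // A non-blocking channel may accept only part of the buffer per call;
        // any IOException is passed on to the caller instead of being swallowed
        while (sendBuf.hasRemaining()) {  
            socketChannel.write(sendBuf);  
        }  
    }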
	
	public boolean isConnected() {  
        return socketChannel != null && socketChannel.isConnected();  
    }
	
	public byte[] receive() {  
        if (isConnected()) {  
            try {  
                int len = 0;  
                int readBytes = 0;
                ByteBuffer receiveBuf = ByteBuffer.allocateDirect(BUFFER_SIZE);
                // Drain whatever is currently available; read() returns 0 when nothing
                // more is pending (or the buffer is full) and -1 at end of stream
                while ((len = socketChannel.read(receiveBuf)) > 0) {  
                    readBytes += len;  
                }  
                receiveBuf.flip();  
                if (readBytes > 0) {  
                    final byte[] tmp = new byte[readBytes];  
                    receiveBuf.get(tmp);  
                    return tmp;  
                } else {  
                    System.out.println("Received no data");  
                    return null;  
                }  
            } catch (final IOException e) {  
                System.out.println("Error while receiving: " + e.getMessage());  
            }  
        } else {  
            System.out.println("Socket is not connected");  
        }  
        return null;  
    }
	/*
	 * Very important: sends one dummy UDP packet from each local receiving port
	 * to the server ports returned by SETUP
	 * */
	public void sendBeforePlay(){
		ReceiveSocket socketEven = map.get(this.localPortEven);
		ReceiveSocket socketOdd = map.get(this.localPortOdd);
		if(socketEven == null){
			socketEven = new ReceiveSocket(this.localIpAddress,this.localPortEven);
			map.put(this.localPortEven, socketEven);
		}
		if(socketOdd == null){
			// Assign to socketOdd here; assigning to socketEven left socketOdd null
			socketOdd = new ReceiveSocket(this.localIpAddress, this.localPortOdd);
			map.put(this.localPortOdd, socketOdd);
		}
		byte[] bytes = new byte[1];
		bytes[0]=0;
		try {
			socketEven.send(bytes, this.remoteIpAddress, this.remotePortEven);
			socketOdd.send(bytes, this.remoteIpAddress, this.remotePortOdd);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	public void reConnect(SelectionKey key) throws IOException {  
        if (isConnected()) {  
            return;  
        }  
        // Finish connecting the SocketChannel  
        socketChannel.finishConnect();  
        while (!socketChannel.isConnected()) {  
            try {  
                Thread.sleep(300);  
            } catch (final InterruptedException e) {  
                e.printStackTrace();  
            }  
            socketChannel.finishConnect();  
        }  
  
    }
}
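One thing receive() does not deal with is that TCP delivers a byte stream, so a response can arrive split across several reads, and a DESCRIBE response also carries an SDP body after the headers. The sketch below shows one way to decide whether a buffered response is complete. `RtspFramingDemo` and `completeMessageLength` are hypothetical names, and the sketch assumes the bytes were decoded one byte per character (for example as ISO-8859-1), so that Content-Length can be compared with string lengths.

```java
class RtspFramingDemo {

    // Returns the total length of the first complete message in the buffer,
    // or -1 if more bytes are needed.
    static int completeMessageLength(String buffered) {
        int headerEnd = buffered.indexOf("\r\n\r\n");
        if (headerEnd < 0) {
            return -1; // the header section is not finished yet
        }
        int bodyStart = headerEnd + 4;
        int contentLength = 0;
        for (String line : buffered.substring(0, headerEnd).split("\r\n")) {
            int colon = line.indexOf(':');
            if (colon > 0 && line.substring(0, colon).trim().equalsIgnoreCase("Content-Length")) {
                contentLength = Integer.parseInt(line.substring(colon + 1).trim());
            }
        }
        int total = bodyStart + contentLength;
        return buffered.length() >= total ? total : -1;
    }

    public static void main(String[] args) {
        String partial = "RTSP/1.0 200 OK\r\nCSeq: 2\r\n";
        String complete = partial + "Content-Length: 5\r\n\r\nv=0\r\n";
        System.out.println(completeMessageLength(partial));
        System.out.println(completeMessageLength(complete) == complete.length());
    }
}
```

A client could keep appending received bytes to a buffer and only call handle() once this check returns a non-negative length.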
   ReceiveSocket: receives the RTP and RTCP data sent by the server; it is just a thin wrapper around UDP

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;


public class ReceiveSocket implements Runnable{
	
	private DatagramSocket ds;
	public ReceiveSocket(String localAddress, int port){
		 try {
			// Bind to the address passed in rather than a hard-coded IP
			InetSocketAddress addr = new InetSocketAddress(localAddress, port);
			ds = new DatagramSocket(addr);// listen on the given local port
		} catch (SocketException e) {
			e.printStackTrace();
		}   
	}
	
	@Override
	public void run() {
		while(true){
			// Large enough for a typical RTP/RTCP datagram; a 20-byte buffer
			// would truncate almost every packet
			byte[] buf = new byte[2048];  
		    DatagramPacket dp = new DatagramPacket(buf,buf.length);  
		    try 
		    {
				ds.receive(dp);
				String ip = dp.getAddress().getHostAddress();   // sender address  
		        int port = dp.getPort();  
		        // The payload is binary, so print its size instead of decoding it as text
		        System.out.println(dp.getLength()+" bytes from "+ip+":"+port);
			} catch (IOException e) {
				e.printStackTrace();
			}   
		}    
	}
	
	public void send(byte[] buf, String ip, int rec_port) throws IOException {
		// rec_port is the destination port on the remote host
		DatagramPacket dp = new DatagramPacket(buf,buf.length,InetAddress.getByName(ip),rec_port);     
        ds.send(dp);  
        //ds.close();    
	}
}
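    PlayerClient: the playback class, which carries out the RTSP exchange by moving from one state to the next. One point here is critical, so pay attention to the setup state: once the session with the server has been set up, if the client simply sends a PLAY request, no RTP or RTCP data arrives on the specified ports (this problem plagued me for a whole evening). Therefore, before sending the PLAY request, the client's two receiving ports for RTP and RTCP must first send some arbitrary data over UDP to the server's RTP and RTCP ports, which the server has already returned in its SETUP response. See sendBeforePlay() for the implementation. I could not find the reason for this anywhere online and only discovered this hidden logic by capturing VLC's traffic with Wireshark. A plausible explanation is NAT: the client here sits on a private address, and the outgoing packets open a mapping that lets the server's UDP traffic back in.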
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;


public class PlayerClient {
	private RTSPClient rtspClient = new RTSPClient();
	private static final String VERSION = " RTSP/1.0\r\n";  
	private static final String RTSP_OK = "RTSP/1.0 200 OK";
	private Selector selector;
	private enum Status {  
        init, options, describe, setup, play, pause, teardown  
    }
	private Status sysStatus = Status.init;
	private String rtspAddress = "rtsp://218.204.223.237:554/live/1/66251FC11353191F/e7ooqwcfbqjoo80j.sdp";
	private String localAddress = "192.168.31.106";
	private int localPort=0;
	private String remoteAddress = "218.204.223.237";
	private int count=0;
	private String sessionid;
	private String trackInfo;
	private boolean isSended=true;
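	// Naming caveat: localPortOdd holds the even port and localPortEven the odd one;
	// the SETUP request built by encodeSetup() ends up with client_port=50002-50003
	private int localPortOdd=50002;
	private int localPortEven=50003;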
	private ReceiveSocket socket1 = new ReceiveSocket(localAddress,localPortOdd);
    private ReceiveSocket socket2 = new ReceiveSocket(localAddress,localPortEven);
	
	public void init(){
		rtspClient.setLocalIpAddress(localAddress);
		rtspClient.setLocalPort(localPort);
		rtspClient.setRemoteIpAddress(remoteAddress);
		rtspClient.setRemoteIPort(554);
		rtspClient.setRtspAddress(rtspAddress);
		rtspClient.setLocalPortEven(this.localPortEven);
		rtspClient.setLocalPortOdd(this.localPortOdd);
		rtspClient.addSocket(this.localPortOdd, socket1);
		rtspClient.addSocket(this.localPortEven, socket2);
		try 
		{
			rtspClient.inital();
		} catch (IOException e) {
			e.printStackTrace();
		}
		this.selector = rtspClient.getSelector();
		new Thread(socket1).start();
        new Thread(socket2).start();
	}
	
	public void run() throws IOException{
		int seq=2;
		while(true){
			if(rtspClient.isConnected() && isSended){
				switch (sysStatus) {  
	            case init:
	    			byte[] message = RTSPProtocal.encodeOption(this.rtspAddress, this.VERSION, seq);
	    			this.rtspClient.write(message); 
	                break;  
	            case options:
	            	seq++;
	            	message = RTSPProtocal.encodeDescribe(this.rtspAddress, this.VERSION, seq);
	    			this.rtspClient.write(message);   
	                break;  
	            case describe:  
	            	seq++;
	            	message = RTSPProtocal.encodeSetup(this.rtspAddress, VERSION, sessionid, 
	            								localPortEven, localPortOdd,seq, trackInfo);
	    			this.rtspClient.write(message);  
	                break;  
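	            case setup:  
	                // Check for null first; the original "&&" test threw a
	                // NullPointerException when sessionid was null
	                if(sessionid==null||sessionid.length()==0){  
	                    System.out.println("SETUP has not returned successfully yet");  
	                }else{  
	                	seq++;
	                	message = RTSPProtocal.encodePlay(this.rtspAddress, VERSION, sessionid, seq);
	        			this.rtspClient.write(message); 
	                }  
	                break;  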
	            case play:
	            	count++;
	            	System.out.println("count: "+count);
	                break;
	            case pause:   
	                break;  
	            default:  
	                break;  
	            }
				isSended=false;
			}
			else{
				
			}
			select();
		}
	}
	
	private void handle(byte[] msg) {  
        String tmp = new String(msg);  
        System.out.println("Response: "+tmp);  
        if (tmp.startsWith(RTSP_OK)) {  
            switch (sysStatus) {  
            case init:  
                sysStatus = Status.options;  
                System.out.println("option ok");
                isSended=true; 
                break;  
            case options:  
                sysStatus = Status.describe;  
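                // Keep only the "trackID=..." token; taking everything up to the end of
                // the SDP would drag line breaks into the SETUP URL
                java.util.regex.Matcher trackMatcher =
                        java.util.regex.Pattern.compile("trackID=[^\\s;]+").matcher(tmp);
                if (trackMatcher.find()) {
                    trackInfo = trackMatcher.group();
                }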
                System.out.println("describe ok"); 
                isSended=true; 
                break;  
            case describe:  
                sessionid = tmp.substring(tmp.indexOf("Session: ") + 9, tmp  
                        .indexOf("Date:"));  
                // Match only the digits, so that trailing parameters such as ";ssrc=..."
                // cannot end up inside the port number
                java.util.regex.Matcher portMatcher =
                        java.util.regex.Pattern.compile("server_port=(\\d+)-(\\d+)").matcher(tmp);
                if (portMatcher.find()) {
                    this.rtspClient.setRemotePortEven(Integer.parseInt(portMatcher.group(1)));
                    this.rtspClient.setRemotePortOdd(Integer.parseInt(portMatcher.group(2)));
                }
                
                if(sessionid!=null&&sessionid.length()>0){  
                    sysStatus = Status.setup;  
                    System.out.println("setup ok");  
                }
                isSended=true; 
                break;  
            case setup:  
                sysStatus = Status.play;   
                System.out.println("play ok"); 
                this.rtspClient.sendBeforePlay();
                this.rtspClient.sendBeforePlay();
                isSended=true; 
                break;  
            case play:  
                //sysStatus = Status.pause;  
                System.out.println("pause ok");  
                isSended=true; 
                break;  
            case pause:  
                sysStatus = Status.teardown;  
                System.out.println("teardown ok"); 
                isSended=true; 
                //shutdown.set(true);  
                break;  
            case teardown:  
                sysStatus = Status.init;  
                System.out.println("exit start"); 
                isSended=true; 
                break;  
            default:  
                break;  
            }   
        } else {  
            System.out.println("Error response: " + tmp);  
        }  
  
    }
	
	private void select() {  
        int n = 0;  
        try 
        {  
            if (selector == null) {  
                return;  
            }  
            n = selector.select(1000);  
        } catch (final Exception e) {  
            e.printStackTrace();  
        }  
  
        // If select() returned more than 0, handle the events  
        if (n > 0) {  
            for (final Iterator<SelectionKey> i = selector.selectedKeys()  
                    .iterator(); i.hasNext();) {  
                // Get the next key  
                final SelectionKey sk = i.next();  
                i.remove();  
                // Check that it is still valid  
                if (!sk.isValid()) {  
                    continue;  
                }  
                if (sk.isReadable()) {  
                	byte[] message = rtspClient.receive();
                	// receive() returns null when nothing was read
                	if (message != null) {
                		handle(message);
                	}
                }
                if (sk.isConnectable()) {  
                    try {
                    	rtspClient.reConnect(sk);
					} catch (IOException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}  
                }
            }  
        }  
    }
	
}
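The UDP "knock" that sendBeforePlay() performs can be tried in isolation on the loopback interface. The sketch below uses a made-up `UdpKnockDemo` class in which a second local socket plays the role of the server. It only demonstrates the mechanics of sending one dummy byte from the receiving socket itself; whether a given server or NAT actually needs this is an assumption based on the behaviour described above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

class UdpKnockDemo {

    // Sends a single dummy byte from the socket that will later receive the stream.
    static void knock(DatagramSocket local, InetAddress remoteHost, int remotePort) {
        byte[] dummy = {0};
        try {
            local.send(new DatagramPacket(dummy, dummy.length, remoteHost, remotePort));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the number of bytes the stand-in server received,
    // or -1 if the packet did not come from the client's port.
    static int demo() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket fakeServer = new DatagramSocket(0, loopback);
             DatagramSocket client = new DatagramSocket(0, loopback)) {
            fakeServer.setSoTimeout(2000); // do not block forever if the packet is lost
            knock(client, loopback, fakeServer.getLocalPort());

            DatagramPacket in = new DatagramPacket(new byte[16], 16);
            fakeServer.receive(in);
            // The source port seen by the "server" is the client's receiving port
            return in.getPort() == client.getLocalPort() ? in.getLength() : -1;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes received by the stand-in server: " + demo());
    }
}
```

The important detail is that the packet leaves from the very socket that is bound to the client_port announced in SETUP, because any NAT mapping is created for that source port.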
Test: the test class

public class Test {
	public static void main(String[] args){
		PlayerClient player = new PlayerClient();
		player.init();
		try 
		{
			player.run();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}
   Just set a breakpoint in the run method of ReceiveSocket and you will see data streaming toward you without end. Feels great, doesn't it? Hahaha.
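To make sense of the datagrams that arrive there, the 12-byte fixed RTP header can be decoded with a few masks and shifts. This is a minimal sketch following the header layout in RFC 3550; the class name `RtpHeaderDemo` and the sample bytes are invented for illustration, and header extensions and CSRC lists are ignored.

```java
class RtpHeaderDemo {
    final int version;
    final boolean marker;
    final int payloadType;
    final int sequenceNumber;
    final long timestamp;
    final long ssrc;

    RtpHeaderDemo(byte[] packet, int length) {
        if (length < 12) {
            throw new IllegalArgumentException("RTP fixed header needs 12 bytes");
        }
        version = (packet[0] & 0xC0) >>> 6;        // top 2 bits of byte 0
        marker = (packet[1] & 0x80) != 0;          // top bit of byte 1
        payloadType = packet[1] & 0x7F;            // low 7 bits of byte 1
        sequenceNumber = ((packet[2] & 0xFF) << 8) | (packet[3] & 0xFF);
        timestamp = readUInt32(packet, 4);
        ssrc = readUInt32(packet, 8);
    }

    // Reads a big-endian unsigned 32-bit value into a long
    static long readUInt32(byte[] b, int offset) {
        return ((b[offset] & 0xFFL) << 24)
                | ((b[offset + 1] & 0xFFL) << 16)
                | ((b[offset + 2] & 0xFFL) << 8)
                | (b[offset + 3] & 0xFFL);
    }

    public static void main(String[] args) {
        // Invented sample: V=2, PT=96, seq=42, timestamp=1000, SSRC=0x12345678
        byte[] sample = {(byte) 0x80, 0x60, 0x00, 0x2A,
                         0x00, 0x00, 0x03, (byte) 0xE8,
                         0x12, 0x34, 0x56, 0x78};
        RtpHeaderDemo header = new RtpHeaderDemo(sample, sample.length);
        System.out.println("v=" + header.version + " pt=" + header.payloadType
                + " seq=" + header.sequenceNumber + " ts=" + header.timestamp);
    }
}
```

In ReceiveSocket this could be applied to dp.getData() together with dp.getLength() on the RTP port; packets on the RTCP port use a different header layout.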