最近对于流媒体技术比较感兴趣,虽然读书的时候学过相关方面的基础知识,但是大学上课,你懂得,一方面理论与实际脱节很严重,另一方面考试完全就是突击。学了和没学一样。好了,吐槽结束,书归正文。
研究流媒体技术的前提是先明白三个协议,RTSP,RTCP和RTP。关于这三种协议具体的定义百度上可以说是一抓一大把。总的来说, RTSP控制负责控制,包括创建,播放和暂停等操作,RTCP和RTP可以认为是一种协议,最大的区别 是RTCP中没有负载(payload,也就是媒体数据流),RTP则包含了负载。RTCP主要负责传输server和client的状态,如已经接收了多少数据,时间戳是什么,而RTP主要作用就是传输流媒体数据。
大部分对于RTSP都提到了这一个词:“RTSP是文本协议”,这句话是什么意思?通俗点说,如果你想告诉服务器你的名字,你首先构建一个类似于name="xxxxx"的字符串,然后把这个字符串转成byte[],经过SOCKET传给服务器,服务器就能够知道你的名字了。与之形成对比的是RTCP,RTCP规定了每个比特的每一位都代表什么,例如一个RTCP包的第一个比特的前两位代表版本,第三位用来填充,而第二个比特代表这次会话的序列号。坦率的说,实现RTCP协议可比RTSP烧脑多了。
回到RTSP这个话题,RTSP协议包含以下几种操作,option,describe,setup,play,pause和teardown。option是询问服务器你能提供什么方法,describe则是获取服务器的详细信息,setup是与服务器建立连接,服务器返回一个sessionid用来之后进行鉴权,play就是通知服务器可以发数据了,pause则是通知服务器暂停发数据,teardown,挥泪告别,さようなら。
如果你在百度上搜索过如下的关键字:RTSP java。你会发现有人已经实现了RTSP协议,如果你真的使用了那份代码,恭喜你,你踩到坑啦。大部分转载的人并没有对转载的内容进行验证。我被网上的这份代码坑了号就,今天刚刚出坑,特此记录。
RTSPProtocal:RTSP协议类,主要负责创建RTSP文本
public class RTSPProtocal { public static byte[] encodeOption(String address, String VERSION, int seq) { StringBuilder sb = new StringBuilder(); sb.append("OPTIONS "); sb.append(address.substring(0, address.lastIndexOf("/"))); sb.append(VERSION); sb.append("Cseq: "); sb.append(seq); sb.append("\r\n"); sb.append("\r\n"); System.out.println(sb.toString()); //send(sb.toString().getBytes()); return sb.toString().getBytes(); } public static byte[] encodeDescribe(String address, String VERSION, int seq) { StringBuilder sb = new StringBuilder(); sb.append("DESCRIBE "); sb.append(address); sb.append(VERSION); sb.append("Cseq: "); sb.append(seq); sb.append("\r\n"); sb.append("\r\n"); System.out.println(sb.toString()); //send(sb.toString().getBytes()); return sb.toString().getBytes(); } public static byte[] encodeSetup(String address, String VERSION, String sessionid, int portOdd, int portEven, int seq, String trackInfo) { StringBuilder sb = new StringBuilder(); sb.append("SETUP "); sb.append(address); sb.append("/"); sb.append(trackInfo); sb.append(VERSION); sb.append("Cseq: "); sb.append(seq++); sb.append("\r\n"); //"50002-50003" sb.append("Transport: RTP/AVP;UNICAST;client_port="+portEven+"-"+portOdd+";mode=play\r\n"); sb.append("\r\n"); System.out.println(sb.toString()); System.out.println(sb.toString()); //send(sb.toString().getBytes()); return sb.toString().getBytes(); } public static byte[] encodePlay(String address, String VERSION, String sessionid, int seq) { StringBuilder sb = new StringBuilder(); sb.append("PLAY "); sb.append(address); sb.append(VERSION); sb.append("Session: "); sb.append(sessionid); sb.append("Cseq: "); sb.append(seq); sb.append("\r\n"); sb.append("Range: npt=0.000-"); sb.append("\r\n"); sb.append("\r\n"); System.out.println(sb.toString()); //send(sb.toString().getBytes()); return sb.toString().getBytes(); } public static byte[] encodePause(String address, String VERSION, String sessionid, int seq) { StringBuilder sb = new StringBuilder(); sb.append("PAUSE "); sb.append(address); sb.append("/"); sb.append(VERSION); sb.append("Cseq: "); sb.append(seq); sb.append("\r\n"); sb.append("Session: "); sb.append(sessionid); sb.append("\r\n"); System.out.println(sb.toString()); //send(sb.toString().getBytes()); return sb.toString().getBytes(); } public static byte[] encodeTeardown(String address, String VERSION, String sessionid, int seq) { StringBuilder sb = new StringBuilder(); sb.append("TEARDOWN "); sb.append(address); sb.append("/"); sb.append(VERSION); sb.append("Cseq: "); sb.append(seq); sb.append("\r\n"); sb.append("User-Agent: LibVLC/2.2.1 (LIVE555 Streaming Media v2014.07.25)\r\n"); sb.append("Session: "); sb.append(sessionid); sb.append("\r\n"); System.out.println(sb.toString()); return sb.toString().getBytes(); //send(sb.toString().getBytes()); // } }RTSPClient:使用RTSPProtocal中的静态方法获取字符创,拥有发送和接收数据的功能
import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Map; public class RTSPClient { private static final int BUFFER_SIZE = 8192; private String localIpAddress; private String remoteIpAddress; private int localPort; private int localPortOdd; private int localPortEven; private int remoteIPort; private int remotePortOdd; private int remotePortEven; private Map<Integer, ReceiveSocket> map = new HashMap<>(); public int getRemotePortOdd() { return remotePortOdd; } public void setRemotePortOdd(int remotePortOdd) { this.remotePortOdd = remotePortOdd; } public int getRemotePortEven() { return remotePortEven; } public void setRemotePortEven(int remotePortEven) { this.remotePortEven = remotePortEven; } public void addSocket(Integer port, ReceiveSocket socket){ map.put(port, socket); } private String rtspAddress; private Socket tcpSocket; private SocketChannel socketChannel; private Selector selector; public String getLocalIpAddress() { return localIpAddress; } public void setLocalIpAddress(String localIpAddress) { this.localIpAddress = localIpAddress; } public int getLocalPort() { return localPort; } public void setLocalPort(int localPort) { this.localPort = localPort; } public int getLocalPortOdd() { return localPortOdd; } public void setLocalPortOdd(int localPortOdd) { this.localPortOdd = localPortOdd; } public int getLocalPortEven() { return localPortEven; } public void setLocalPortEven(int localPortEven) { this.localPortEven = localPortEven; } public String getRtspAddress() { return rtspAddress; } public void setRtspAddress(String rtspAddress) { this.rtspAddress = rtspAddress; } public Socket getTcpSocket() { return tcpSocket; } public void setTcpSocket(Socket tcpSocket) { this.tcpSocket = tcpSocket; } public String getRemoteIpAddress() { return remoteIpAddress; } public void setRemoteIpAddress(String remoteIpAddress) { this.remoteIpAddress = remoteIpAddress; } public int getRemoteIPort() { return remoteIPort; } public void setRemoteIPort(int remoteIPort) { this.remoteIPort = remoteIPort; } public Selector getSelector() { return selector; } public void setSelector(Selector selector) { this.selector = selector; } //new InetSocketAddress( //remoteIp, 554), //new InetSocketAddress("192.168.31.106", 0), //"rtsp://218.204.223.237:554/live/1/66251FC11353191F/e7ooqwcfbqjoo80j.sdp" public void inital() throws IOException{ socketChannel = SocketChannel.open(); socketChannel.socket().setSoTimeout(30000); socketChannel.configureBlocking(false); InetSocketAddress localAddress = new InetSocketAddress(this.localIpAddress, localPort); InetSocketAddress remoteAddress=new InetSocketAddress(this.remoteIpAddress, 554); socketChannel.socket().bind(localAddress); if (socketChannel.connect(remoteAddress)) { System.out.println("开始建立连接:" + remoteAddress); } if (selector == null) { // 创建新的Selector try { selector = Selector.open(); } catch (final IOException e) { e.printStackTrace(); } } socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITE, this); System.out.println("端口打开成功"); } public void write(byte[] out) throws IOException { if (out == null || out.length < 1) { return; } System.out.println(out.toString()); ByteBuffer sendBuf = ByteBuffer.allocateDirect(BUFFER_SIZE); sendBuf.clear(); sendBuf.put(out); sendBuf.flip(); if (isConnected()) { try { socketChannel.write(sendBuf); } catch (final IOException e) { } } else { System.out.println("通道为空或者没有连接上"); } } public boolean isConnected() { return socketChannel != null && socketChannel.isConnected(); } public byte[] receive() { if (isConnected()) { try { int len = 0; int readBytes = 0; ByteBuffer receiveBuf = ByteBuffer.allocateDirect(BUFFER_SIZE); synchronized (receiveBuf) { receiveBuf.clear(); try { while ((len = socketChannel.read(receiveBuf)) > 0) { readBytes += len; } } finally { receiveBuf.flip(); } if (readBytes > 0) { final byte[] tmp = new byte[readBytes]; receiveBuf.get(tmp); return tmp; } else { System.out.println("接收到数据为空,重新启动连接"); return null; } } } catch (final IOException e) { System.out.println("接收消息错误:"); } } else { System.out.println("端口没有连接"); } return null; } /* * 非常重要 * */ public void sendBeforePlay(){ ReceiveSocket socketEven = map.get(this.localPortEven); ReceiveSocket socketOdd = map.get(this.localPortOdd); if(socketEven == null){ socketEven = new ReceiveSocket(this.localIpAddress,this.localPortEven); map.put(this.localPortEven, socketEven); } if(socketOdd == null){ socketEven = new ReceiveSocket(this.localIpAddress, this.localPortOdd); map.put(this.localPortOdd, socketOdd); } byte[] bytes = new byte[1]; bytes[0]=0; try { socketEven.send(bytes, this.remoteIpAddress, this.remotePortEven); socketOdd.send(bytes, this.remoteIpAddress, this.remotePortOdd); } catch (IOException e) { e.printStackTrace(); } return; } public void reConnect(SelectionKey key) throws IOException { if (isConnected()) { return; } // 完成SocketChannel的连接 socketChannel.finishConnect(); while (!socketChannel.isConnected()) { try { Thread.sleep(300); } catch (final InterruptedException e) { e.printStackTrace(); } socketChannel.finishConnect(); } } }ReceiveSocket:用来接收服务器发来的RTP和RTCP协议数据,只是简单地对UDP进行了包装而已
import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketException; import java.net.UnknownHostException; public class ReceiveSocket implements Runnable{ private DatagramSocket ds; public ReceiveSocket(String localAddress, int port){ try { InetSocketAddress addr = new InetSocketAddress("192.168.31.106", port); ds = new DatagramSocket(addr);//监听16264端口 } catch (SocketException e) { e.printStackTrace(); } } @Override public void run() { // TODO Auto-generated method stub while(true){ byte[] buf = new byte[20]; DatagramPacket dp = new DatagramPacket(buf,buf.length); try { ds.receive(dp); String ip = dp.getAddress().getHostAddress(); //数据提取 String data = new String(dp.getData(),0,dp.getLength()); int port = dp.getPort(); System.out.println(data+"."+port+".."+ip); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public void send(byte[] buf, String ip, int rec_port) throws IOException { // TODO Auto-generated method stub DatagramPacket dp = new DatagramPacket(buf,buf.length,InetAddress.getByName(ip),rec_port);//10000为定义的端口 ds.send(dp); //ds.close(); } }PlayerClient:播放类,通过不同状态之间的相互转化完成RTSP协议的交互工作。 这里有一点非常关键:请注意setup这个状态,在和服务器建立连接之后,如果直接发送PLAY请求,服务器不会向指定的端口发送RTCP数据(这个问题困扰了我一晚上)。因此在发送PLAY请求之前,client接收RTCP和RTP的两个端口必须先向服务器的RTCP和RTP端口发送任意的数据,发送方式为UDP,服务器在setup操作时已经返回RTCP和RTP的端口信息。具体的实现参考sendBeforePlay()。我在网上没有找到这么操作的原因,这还是通过wireshark对VLC进行抓包才发现这个隐藏逻辑。
import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; public class PlayerClient { private RTSPClient rtspClient = new RTSPClient(); private static final String VERSION = " RTSP/1.0\r\n"; private static final String RTSP_OK = "RTSP/1.0 200 OK"; private Selector selector; private enum Status { init, options, describe, setup, play, pause, teardown } private Status sysStatus = Status.init; private String rtspAddress = "rtsp://218.204.223.237:554/live/1/66251FC11353191F/e7ooqwcfbqjoo80j.sdp"; private String localAddress = "192.168.31.106"; private int localPort=0; private String remoteAddress = "218.204.223.237"; private int count=0; private String sessionid; private String trackInfo; private boolean isSended=true; private int localPortOdd=50002; private int localPortEven=50003; private ReceiveSocket socket1 = new ReceiveSocket(localAddress,localPortOdd); private ReceiveSocket socket2 = new ReceiveSocket(localAddress,localPortEven); public void init(){ rtspClient.setLocalIpAddress(localAddress); rtspClient.setLocalPort(localPort); rtspClient.setRemoteIpAddress(remoteAddress); rtspClient.setRemoteIPort(554); rtspClient.setRtspAddress(rtspAddress); rtspClient.setLocalPortEven(this.localPortEven); rtspClient.setLocalPortOdd(this.localPortOdd); rtspClient.addSocket(this.localPortOdd, socket1); rtspClient.addSocket(this.localPortEven, socket2); try { rtspClient.inital(); } catch (IOException e) { e.printStackTrace(); } this.selector = rtspClient.getSelector(); new Thread(socket1).start(); new Thread(socket2).start(); } public void run() throws IOException{ int seq=2; while(true){ if(rtspClient.isConnected() && isSended){ switch (sysStatus) { case init: byte[] message = RTSPProtocal.encodeOption(this.rtspAddress, this.VERSION, seq); this.rtspClient.write(message); break; case options: seq++; message = RTSPProtocal.encodeDescribe(this.rtspAddress, this.VERSION, seq); this.rtspClient.write(message); break; case describe: seq++; message = RTSPProtocal.encodeSetup(this.rtspAddress, VERSION, sessionid, localPortEven, localPortOdd,seq, trackInfo); this.rtspClient.write(message); break; case setup: if(sessionid==null&&sessionid.length()>0){ System.out.println("setup还没有正常返回"); }else{ seq++; message = RTSPProtocal.encodePlay(this.rtspAddress, VERSION, sessionid, seq); this.rtspClient.write(message); } break; case play: count++; System.out.println("count: "+count); break; case pause: break; default: break; } isSended=false; } else{ } select(); } } private void handle(byte[] msg) { String tmp = new String(msg); System.out.println("返回内容:"+tmp); if (tmp.startsWith(RTSP_OK)) { switch (sysStatus) { case init: sysStatus = Status.options; System.out.println("option ok"); isSended=true; break; case options: sysStatus = Status.describe; trackInfo=tmp.substring(tmp.indexOf("trackID")); System.out.println("describe ok"); isSended=true; break; case describe: sessionid = tmp.substring(tmp.indexOf("Session: ") + 9, tmp .indexOf("Date:")); int index = tmp.indexOf("server_port="); String serverPort1 = tmp.substring(tmp.indexOf("server_port=") + 12, tmp .indexOf("-", index)); String serverPort2 = tmp.substring(tmp.indexOf("-", index) + 1, tmp .indexOf("\r\n", index)); this.rtspClient.setRemotePortEven(Integer.valueOf(serverPort1)); this.rtspClient.setRemotePortOdd(Integer.valueOf(serverPort2)); if(sessionid!=null&&sessionid.length()>0){ sysStatus = Status.setup; System.out.println("setup ok"); } isSended=true; break; case setup: sysStatus = Status.play; System.out.println("play ok"); this.rtspClient.sendBeforePlay(); this.rtspClient.sendBeforePlay(); isSended=true; break; case play: //sysStatus = Status.pause; System.out.println("pause ok"); isSended=true; break; case pause: sysStatus = Status.teardown; System.out.println("teardown ok"); isSended=true; //shutdown.set(true); break; case teardown: sysStatus = Status.init; System.out.println("exit start"); isSended=true; break; default: break; } } else { System.out.println("返回错误:" + tmp); } } private void select() { int n = 0; try { if (selector == null) { return; } n = selector.select(1000); } catch (final Exception e) { e.printStackTrace(); } // 如果select返回大于0,处理事件 if (n > 0) { for (final Iterator<SelectionKey> i = selector.selectedKeys() .iterator(); i.hasNext();) { // 得到下一个Key final SelectionKey sk = i.next(); i.remove(); // 检查其是否还有效 if (!sk.isValid()) { continue; } if (sk.isReadable()) { byte[] message = rtspClient.receive(); handle(message); } if (sk.isConnectable()) { try { rtspClient.reConnect(sk); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } }Test :测试类
public class Test { public static void main(String[] args){ PlayerClient player = new PlayerClient(); player.init(); try { player.run(); } catch (IOException e) { e.printStackTrace(); } } }只要在ReceiveSocket的run方法中打断点,你就会发现源源不断的数据向你发来,是不是感觉很爽,哈哈哈。