最近对于流媒体技术比较感兴趣,虽然读书的时候学过相关方面的基础知识,但是大学上课,你懂的,一方面理论与实际脱节很严重,另一方面考试完全就是突击,学了和没学一样。好了,吐槽结束,书归正文。
研究流媒体技术的前提是先明白三个协议,RTSP,RTCP和RTP。关于这三种协议具体的定义百度上可以说是一抓一大把。总的来说,RTSP负责控制,包括创建,播放和暂停等操作,RTCP和RTP可以认为是一种协议,最大的区别是RTCP中没有负载(payload,也就是媒体数据流),RTP则包含了负载。RTCP主要负责传输server和client的状态,如已经接收了多少数据,时间戳是什么,而RTP主要作用就是传输流媒体数据。
大部分关于RTSP的资料都提到了这样一句话:“RTSP是文本协议”,这句话是什么意思?通俗点说,如果你想告诉服务器你的名字,你首先构建一个类似于name="xxxxx"的字符串,然后把这个字符串转成byte[],经过SOCKET传给服务器,服务器就能够知道你的名字了。与之形成对比的是RTCP,RTCP规定了每个字节的每一位都代表什么,例如一个RTCP包的第一个字节的前两位代表版本,第三位用来表示填充,而第二个字节代表包的类型(packet type)。坦率的说,实现RTCP协议可比RTSP烧脑多了。
回到RTSP这个话题,RTSP协议包含以下几种操作,option,describe,setup,play,pause和teardown。option是询问服务器你能提供什么方法,describe则是获取服务器的详细信息,setup是与服务器建立连接,服务器返回一个sessionid用来之后进行鉴权,play就是通知服务器可以发数据了,pause则是通知服务器暂停发数据,teardown,挥泪告别,さようなら。
如果你在百度上搜索过如下的关键字:RTSP java。你会发现有人已经实现了RTSP协议,如果你真的使用了那份代码,恭喜你,你踩到坑啦。大部分转载的人并没有对转载的内容进行验证。我被网上的这份代码坑了好久,今天刚刚出坑,特此记录。
RTSPProtocal:RTSP协议类,主要负责创建RTSP文本
/**
 * Builds the text of the RTSP/1.0 requests a client sends (OPTIONS, DESCRIBE, SETUP, PLAY,
 * PAUSE, TEARDOWN) and returns each one as UTF-8 bytes ready to be written to the control
 * connection.
 *
 * <p>Every {@code VERSION} argument is appended directly after the request URI, so it must carry
 * its own leading space and trailing CRLF (for example {@code " RTSP/1.0\r\n"}). Each encoder
 * also echoes the request it built to standard output.
 *
 * <p>Session ids are trimmed before use, so a value that still carries the line terminator of the
 * header it was cut from and a clean value produce the same request.
 */
public class RTSPProtocal {
    private static final String CRLF = "\r\n";

    /**
     * Encodes an OPTIONS request.
     *
     * @param address full RTSP URL of the stream; the request targets this URL with its last
     *                path segment removed (the whole URL when it contains no {@code '/'})
     * @param VERSION protocol token including leading space and trailing CRLF
     * @param seq     value of the {@code Cseq} header
     * @return the request as UTF-8 bytes
     */
    public static byte[] encodeOption(String address, String VERSION, int seq) {
        int lastSlash = address.lastIndexOf('/');
        // Without this guard an address that has no '/' made substring(0, -1) throw.
        String target = lastSlash < 0 ? address : address.substring(0, lastSlash);
        StringBuilder sb = new StringBuilder("OPTIONS ").append(target).append(VERSION);
        appendCseq(sb, seq);
        return finish(sb);
    }

    /**
     * Encodes a DESCRIBE request for {@code address}.
     *
     * @param address full RTSP URL of the stream
     * @param VERSION protocol token including leading space and trailing CRLF
     * @param seq     value of the {@code Cseq} header
     * @return the request as UTF-8 bytes
     */
    public static byte[] encodeDescribe(String address, String VERSION, int seq) {
        StringBuilder sb = new StringBuilder("DESCRIBE ").append(address).append(VERSION);
        appendCseq(sb, seq);
        return finish(sb);
    }

    /**
     * Encodes a SETUP request for one track, asking for unicast RTP/AVP delivery to a pair of
     * client UDP ports.
     *
     * @param address   full RTSP URL of the stream
     * @param VERSION   protocol token including leading space and trailing CRLF
     * @param sessionid unused; kept so existing callers keep compiling
     * @param portOdd   second port written into {@code client_port=<portEven>-<portOdd>}
     * @param portEven  first port written into {@code client_port=<portEven>-<portOdd>}
     * @param seq       value of the {@code Cseq} header
     * @param trackInfo track selector appended to the URL after a {@code '/'}
     * @return the request as UTF-8 bytes
     */
    public static byte[] encodeSetup(String address, String VERSION, String sessionid,
            int portOdd, int portEven, int seq, String trackInfo) {
        StringBuilder sb = new StringBuilder("SETUP ")
                .append(address).append('/').append(trackInfo).append(VERSION);
        appendCseq(sb, seq);
        sb.append("Transport: RTP/AVP;UNICAST;client_port=")
                .append(portEven).append('-').append(portOdd)
                .append(";mode=play").append(CRLF);
        return finish(sb);
    }

    /**
     * Encodes a PLAY request that starts playback from the beginning ({@code npt=0.000-}).
     *
     * @param address   full RTSP URL of the stream
     * @param VERSION   protocol token including leading space and trailing CRLF
     * @param sessionid session id returned by SETUP; surrounding whitespace is ignored
     * @param seq       value of the {@code Cseq} header
     * @return the request as UTF-8 bytes
     */
    public static byte[] encodePlay(String address, String VERSION, String sessionid, int seq) {
        StringBuilder sb = new StringBuilder("PLAY ").append(address).append(VERSION);
        appendSession(sb, sessionid);
        appendCseq(sb, seq);
        sb.append("Range: npt=0.000-").append(CRLF);
        return finish(sb);
    }

    /**
     * Encodes a PAUSE request; the request URI is {@code address} followed by {@code '/'}.
     *
     * @param address   full RTSP URL of the stream
     * @param VERSION   protocol token including leading space and trailing CRLF
     * @param sessionid session id returned by SETUP; surrounding whitespace is ignored
     * @param seq       value of the {@code Cseq} header
     * @return the request as UTF-8 bytes
     */
    public static byte[] encodePause(String address, String VERSION, String sessionid, int seq) {
        StringBuilder sb = new StringBuilder("PAUSE ").append(address).append('/').append(VERSION);
        appendCseq(sb, seq);
        appendSession(sb, sessionid);
        return finish(sb);
    }

    /**
     * Encodes a TEARDOWN request; the request URI is {@code address} followed by {@code '/'}.
     *
     * @param address   full RTSP URL of the stream
     * @param VERSION   protocol token including leading space and trailing CRLF
     * @param sessionid session id returned by SETUP; surrounding whitespace is ignored
     * @param seq       value of the {@code Cseq} header
     * @return the request as UTF-8 bytes
     */
    public static byte[] encodeTeardown(String address, String VERSION, String sessionid, int seq) {
        StringBuilder sb =
                new StringBuilder("TEARDOWN ").append(address).append('/').append(VERSION);
        appendCseq(sb, seq);
        sb.append("User-Agent: LibVLC/2.2.1 (LIVE555 Streaming Media v2014.07.25)").append(CRLF);
        appendSession(sb, sessionid);
        return finish(sb);
    }

    /** Appends the {@code Cseq} header line. */
    private static void appendCseq(StringBuilder sb, int seq) {
        sb.append("Cseq: ").append(seq).append(CRLF);
    }

    /**
     * Appends the {@code Session} header line. The id is trimmed so that exactly one CRLF follows
     * it; a {@code null} id is rendered as the text {@code null}, as plain appending would do.
     */
    private static void appendSession(StringBuilder sb, String sessionid) {
        sb.append("Session: ").append(String.valueOf(sessionid).trim()).append(CRLF);
    }

    /** Terminates the header section with a blank line, echoes the request and encodes it. */
    private static byte[] finish(StringBuilder sb) {
        String request = sb.append(CRLF).toString();
        System.out.println(request);
        return request.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }
}
// Next listing - RTSPClient: obtains the request text from RTSPProtocal's static methods and is
// able to send and receive data.
import java.io.IOException;ReceiveSocket:用来接收服务器发来的RTP和RTCP协议数据,只是简单地对UDP进行了包装而已
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
/**
 * Client side of the RTSP control connection: a non-blocking {@link SocketChannel} registered
 * with a {@link Selector}, plus a registry of the {@link ReceiveSocket}s bound to the local UDP
 * ports that receive the media.
 *
 * <p>Typical use: set the local/remote addresses and ports, register the UDP sockets with
 * {@link #addSocket}, call {@link #inital()}, then drive the selector returned by
 * {@link #getSelector()} and call {@link #reConnect}, {@link #write} and {@link #receive} from
 * that loop. Nothing in this class is synchronized.
 */
public class RTSPClient {
    private static final int BUFFER_SIZE = 8192;
    /** Port used when {@link #setRemoteIPort} was never called with a positive value. */
    private static final int DEFAULT_RTSP_PORT = 554;

    private String localIpAddress;
    private String remoteIpAddress;
    private int localPort;
    private int localPortOdd;
    private int localPortEven;
    private int remoteIPort;
    private int remotePortOdd;
    private int remotePortEven;
    private String rtspAddress;
    private Socket tcpSocket;
    private SocketChannel socketChannel;
    private Selector selector;
    /** UDP sockets keyed by the local port they are bound to. */
    private Map<Integer, ReceiveSocket> map = new HashMap<>();

    /** @return the server UDP port that {@link #sendBeforePlay()} contacts from the odd local port */
    public int getRemotePortOdd() {
        return remotePortOdd;
    }

    /** @param remotePortOdd server UDP port contacted from the odd local port */
    public void setRemotePortOdd(int remotePortOdd) {
        this.remotePortOdd = remotePortOdd;
    }

    /** @return the server UDP port that {@link #sendBeforePlay()} contacts from the even local port */
    public int getRemotePortEven() {
        return remotePortEven;
    }

    /** @param remotePortEven server UDP port contacted from the even local port */
    public void setRemotePortEven(int remotePortEven) {
        this.remotePortEven = remotePortEven;
    }

    /**
     * Registers an already bound UDP socket so that {@link #sendBeforePlay()} reuses it instead
     * of creating its own.
     *
     * @param port   local port the socket is bound to
     * @param socket the socket
     */
    public void addSocket(Integer port, ReceiveSocket socket) {
        map.put(port, socket);
    }

    /** @return local address the TCP channel binds to */
    public String getLocalIpAddress() {
        return localIpAddress;
    }

    /** @param localIpAddress local address the TCP channel binds to */
    public void setLocalIpAddress(String localIpAddress) {
        this.localIpAddress = localIpAddress;
    }

    /** @return local TCP port the channel binds to */
    public int getLocalPort() {
        return localPort;
    }

    /** @param localPort local TCP port the channel binds to */
    public void setLocalPort(int localPort) {
        this.localPort = localPort;
    }

    /** @return the local UDP port stored as the "odd" one */
    public int getLocalPortOdd() {
        return localPortOdd;
    }

    /** @param localPortOdd local UDP port to store as the "odd" one */
    public void setLocalPortOdd(int localPortOdd) {
        this.localPortOdd = localPortOdd;
    }

    /** @return the local UDP port stored as the "even" one */
    public int getLocalPortEven() {
        return localPortEven;
    }

    /** @param localPortEven local UDP port to store as the "even" one */
    public void setLocalPortEven(int localPortEven) {
        this.localPortEven = localPortEven;
    }

    /** @return the stored RTSP URL (not read by this class itself) */
    public String getRtspAddress() {
        return rtspAddress;
    }

    /** @param rtspAddress RTSP URL to store */
    public void setRtspAddress(String rtspAddress) {
        this.rtspAddress = rtspAddress;
    }

    /** @return the stored socket (not read by this class itself) */
    public Socket getTcpSocket() {
        return tcpSocket;
    }

    /** @param tcpSocket socket to store */
    public void setTcpSocket(Socket tcpSocket) {
        this.tcpSocket = tcpSocket;
    }

    /** @return address of the RTSP server */
    public String getRemoteIpAddress() {
        return remoteIpAddress;
    }

    /** @param remoteIpAddress address of the RTSP server */
    public void setRemoteIpAddress(String remoteIpAddress) {
        this.remoteIpAddress = remoteIpAddress;
    }

    /** @return TCP port of the RTSP server, or 0 when it was never set */
    public int getRemoteIPort() {
        return remoteIPort;
    }

    /** @param remoteIPort TCP port of the RTSP server; values below 1 select port 554 */
    public void setRemoteIPort(int remoteIPort) {
        this.remoteIPort = remoteIPort;
    }

    /** @return the selector the channel is registered with, or {@code null} before {@link #inital()} */
    public Selector getSelector() {
        return selector;
    }

    /** @param selector selector to register with; when left unset {@link #inital()} opens one */
    public void setSelector(Selector selector) {
        this.selector = selector;
    }

    /**
     * Opens the non-blocking TCP channel, binds it to the configured local address, starts
     * connecting to the server and registers the channel with the selector.
     *
     * <p>The channel is registered for connect and read events only. Write interest is not
     * requested, because a connected socket is writable almost all the time and would wake the
     * selector on every pass.
     *
     * @throws IOException if the channel or selector cannot be opened, bound or connected; the
     *                     channel is closed again in that case
     */
    public void inital() throws IOException {
        socketChannel = SocketChannel.open();
        try {
            socketChannel.socket().setSoTimeout(30000);
            socketChannel.configureBlocking(false);
            InetSocketAddress localAddress = new InetSocketAddress(localIpAddress, localPort);
            // The configured port used to be ignored in favour of a hard-coded 554.
            int serverPort = remoteIPort > 0 ? remoteIPort : DEFAULT_RTSP_PORT;
            InetSocketAddress remoteAddress = new InetSocketAddress(remoteIpAddress, serverPort);
            socketChannel.socket().bind(localAddress);
            boolean connectedAtOnce = socketChannel.connect(remoteAddress);
            if (connectedAtOnce) {
                System.out.println("开始建立连接:" + remoteAddress);
            }
            if (selector == null) {
                selector = Selector.open();
            }
            int interest = connectedAtOnce
                    ? SelectionKey.OP_READ
                    : SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
            socketChannel.register(selector, interest, this);
        } catch (IOException | RuntimeException e) {
            try {
                socketChannel.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        System.out.println("端口打开成功");
    }

    /**
     * Writes a complete request to the server. Empty input is ignored; when the channel is not
     * connected a message is printed and nothing is sent.
     *
     * @param out bytes to send; may be {@code null}
     * @throws IOException if the channel rejects the write
     */
    public void write(byte[] out) throws IOException {
        if (out == null || out.length < 1) {
            return;
        }
        if (!isConnected()) {
            System.out.println("通道为空或者没有连接上");
            return;
        }
        ByteBuffer sendBuf = ByteBuffer.wrap(out);
        // A non-blocking write may accept only part of the buffer, so repeat until it is drained.
        while (sendBuf.hasRemaining()) {
            socketChannel.write(sendBuf);
        }
    }

    /** @return {@code true} once the channel exists and its connection is established */
    public boolean isConnected() {
        return socketChannel != null && socketChannel.isConnected();
    }

    /**
     * Reads whatever the server has sent so far, up to {@value #BUFFER_SIZE} bytes.
     *
     * @return the bytes read, or {@code null} when the channel is not connected, nothing was
     *         available, or the read failed (a message is printed in each of these cases)
     */
    public byte[] receive() {
        if (!isConnected()) {
            System.out.println("端口没有连接");
            return null;
        }
        ByteBuffer receiveBuf = ByteBuffer.allocate(BUFFER_SIZE);
        try {
            int len;
            do {
                len = socketChannel.read(receiveBuf);
            } while (len > 0);
        } catch (final IOException e) {
            System.out.println("接收消息错误:" + e);
            return null;
        }
        receiveBuf.flip();
        if (!receiveBuf.hasRemaining()) {
            System.out.println("接收到数据为空,重新启动连接");
            return null;
        }
        final byte[] data = new byte[receiveBuf.remaining()];
        receiveBuf.get(data);
        return data;
    }

    /**
     * Sends one zero byte from each of the two local UDP ports to the matching server port
     * (even to even, odd to odd). The original author marked this step as very important.
     * Sockets that were not registered through {@link #addSocket} are created on demand.
     * A send failure is reported on standard error and otherwise ignored.
     */
    public void sendBeforePlay() {
        ReceiveSocket socketEven = socketFor(localPortEven);
        ReceiveSocket socketOdd = socketFor(localPortOdd);
        byte[] bytes = new byte[1];
        try {
            socketEven.send(bytes, remoteIpAddress, remotePortEven);
            socketOdd.send(bytes, remoteIpAddress, remotePortOdd);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Returns the registered socket for {@code port}, creating and registering it when absent. */
    private ReceiveSocket socketFor(int port) {
        ReceiveSocket socket = map.get(port);
        if (socket == null) {
            socket = new ReceiveSocket(localIpAddress, port);
            map.put(port, socket);
        }
        return socket;
    }

    /**
     * Completes a pending connection, polling every 300 ms until the channel is connected, and
     * then removes connect interest from {@code key} so that the selector stops reporting it.
     *
     * @param key the channel's selection key; may be {@code null}
     * @throws IOException if the connection attempt fails, the channel was never opened, or the
     *                     thread is interrupted while waiting (the interrupt flag is restored)
     */
    public void reConnect(SelectionKey key) throws IOException {
        if (!isConnected()) {
            if (socketChannel == null) {
                throw new IOException("channel has not been opened; call inital() first");
            }
            while (!socketChannel.finishConnect()) {
                try {
                    Thread.sleep(300);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new java.io.InterruptedIOException("interrupted while connecting");
                }
            }
        }
        if (key != null && key.isValid()) {
            key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
        }
    }
}
import java.io.IOException;PlayerClient:播放类,通过不同状态之间的相互转化完成RTSP协议的交互工作。 这里有一点非常关键:请注意setup这个状态,在和服务器建立连接之后,如果直接发送PLAY请求,服务器不会向指定的端口发送RTCP数据(这个问题困扰了我一晚上)。因此在发送PLAY请求之前,client接收RTCP和RTP的两个端口必须先向服务器的RTCP和RTP端口发送任意的数据,发送方式为UDP,服务器在setup操作时已经返回RTCP和RTP的端口信息。具体的实现参考sendBeforePlay()。我在网上没有找到这么操作的原因,这还是通过wireshark对VLC进行抓包才发现这个隐藏逻辑。
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
/**
 * Thin wrapper around a bound {@link DatagramSocket}. Run as a task, it prints a short preview
 * of every datagram that arrives; {@link #send} transmits from the same local port.
 */
public class ReceiveSocket implements Runnable {
    /** Bytes kept from each incoming datagram; anything beyond this is dropped by the socket. */
    private static final int RECEIVE_BUFFER_SIZE = 20;

    private final DatagramSocket ds;

    /**
     * Binds a UDP socket to the given local address and port.
     *
     * @param localAddress local address to bind to; {@code null} or empty binds to the wildcard
     *                     address
     * @param port         local port to bind to; 0 lets the system pick one
     * @throws IllegalStateException if the socket cannot be bound (the cause is attached)
     */
    public ReceiveSocket(String localAddress, int port) {
        // The address argument used to be ignored in favour of one hard-coded IP.
        InetSocketAddress addr = (localAddress == null || localAddress.isEmpty())
                ? new InetSocketAddress(port)
                : new InetSocketAddress(localAddress, port);
        try {
            ds = new DatagramSocket(addr);
        } catch (SocketException e) {
            // Failing here is clearer than a NullPointerException on the first receive or send.
            throw new IllegalStateException("cannot bind UDP socket to " + addr, e);
        }
    }

    /**
     * Receives datagrams until the socket is closed, printing for each one its payload decoded
     * as text, the sender's port and the sender's address. Receive errors on an open socket are
     * reported on standard error and the loop continues.
     */
    @Override
    public void run() {
        while (!ds.isClosed()) {
            byte[] buf = new byte[RECEIVE_BUFFER_SIZE];
            DatagramPacket dp = new DatagramPacket(buf, buf.length);
            try {
                ds.receive(dp);
                String ip = dp.getAddress().getHostAddress();
                String data = new String(dp.getData(), 0, dp.getLength());
                int port = dp.getPort();
                System.out.println(data + "." + port + ".." + ip);
            } catch (IOException e) {
                if (ds.isClosed()) {
                    break;
                }
                e.printStackTrace();
            }
        }
    }

    /**
     * Sends one datagram from this socket's local port.
     *
     * @param buf      payload
     * @param ip       destination host name or address
     * @param rec_port destination port
     * @throws IOException if the host cannot be resolved or the datagram cannot be sent
     */
    public void send(byte[] buf, String ip, int rec_port) throws IOException {
        DatagramPacket dp =
                new DatagramPacket(buf, buf.length, InetAddress.getByName(ip), rec_port);
        ds.send(dp);
    }

    /** Closes the socket, which also ends a running {@link #run()} loop. */
    public void close() {
        ds.close();
    }
}
import java.io.IOException;Test :测试类
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
/**
 * Drives one RTSP session as a small state machine: OPTIONS, DESCRIBE, SETUP, PLAY.
 *
 * <p>{@code sysStatus} names the last step the server has acknowledged. {@link #run()} sends the
 * request that follows that step whenever {@code isSended} is {@code true}, and
 * {@code handle} advances the state when a {@code 200 OK} reply arrives. PAUSE and TEARDOWN
 * states exist but no request is ever sent for them.
 */
public class PlayerClient {
    private static final String VERSION = " RTSP/1.0\r\n";
    private static final String RTSP_OK = "RTSP/1.0 200 OK";
    private static final String CRLF = "\r\n";
    /** Value of the Session header up to, but not including, any {@code ;timeout=...} part. */
    private static final java.util.regex.Pattern SESSION_HEADER =
            java.util.regex.Pattern.compile("^Session:[ \\t]*([^;\\r\\n]+)",
                    java.util.regex.Pattern.CASE_INSENSITIVE | java.util.regex.Pattern.MULTILINE);
    /** The two ports of the {@code server_port=a-b} transport parameter. */
    private static final java.util.regex.Pattern SERVER_PORTS =
            java.util.regex.Pattern.compile("server_port=(\\d{1,5})-(\\d{1,5})");
    /** First {@code trackID...} token of the DESCRIBE reply, ending at the first whitespace. */
    private static final java.util.regex.Pattern TRACK_ID =
            java.util.regex.Pattern.compile("trackID\\S*");

    private final RTSPClient rtspClient = new RTSPClient();
    private Selector selector;

    private enum Status {
        init, options, describe, setup, play, pause, teardown
    }

    private Status sysStatus = Status.init;
    private String rtspAddress = "rtsp://218.204.223.237:554/live/1/66251FC11353191F/e7ooqwcfbqjoo80j.sdp";
    private String localAddress = "192.168.31.106";
    private int localPort = 0;
    private String remoteAddress = "218.204.223.237";
    private int count = 0;
    private String sessionid;
    private String trackInfo;
    /** {@code true} when the request for the current state still has to be sent. */
    private boolean isSended = true;
    // The two values used to be stored under the opposite names, so the priming datagrams went
    // from the first client port to the second server port and vice versa.
    private int localPortEven = 50002;
    private int localPortOdd = 50003;
    private final ReceiveSocket evenPortSocket = new ReceiveSocket(localAddress, localPortEven);
    private final ReceiveSocket oddPortSocket = new ReceiveSocket(localAddress, localPortOdd);

    /**
     * Configures the RTSP client, opens the control connection and starts one receiver thread
     * per local UDP port. A failure to open the connection is reported on standard error;
     * {@link #run()} then refuses to start.
     */
    public void init() {
        rtspClient.setLocalIpAddress(localAddress);
        rtspClient.setLocalPort(localPort);
        rtspClient.setRemoteIpAddress(remoteAddress);
        rtspClient.setRemoteIPort(554);
        rtspClient.setRtspAddress(rtspAddress);
        rtspClient.setLocalPortEven(localPortEven);
        rtspClient.setLocalPortOdd(localPortOdd);
        rtspClient.addSocket(localPortEven, evenPortSocket);
        rtspClient.addSocket(localPortOdd, oddPortSocket);
        try {
            rtspClient.inital();
        } catch (IOException e) {
            e.printStackTrace();
        }
        selector = rtspClient.getSelector();
        new Thread(evenPortSocket).start();
        new Thread(oddPortSocket).start();
    }

    /**
     * Runs the session loop forever: sends the next pending request, then waits up to one second
     * for channel events and processes them.
     *
     * @throws IOException if no selector is available (init() was not called or failed) or a
     *                     request cannot be written
     */
    public void run() throws IOException {
        if (selector == null) {
            // Without a selector the loop below would spin without ever doing any work.
            throw new IOException("RTSP connection is not initialised; call init() first");
        }
        int seq = 2;
        while (true) {
            if (rtspClient.isConnected() && isSended) {
                switch (sysStatus) {
                    case init:
                        rtspClient.write(RTSPProtocal.encodeOption(rtspAddress, VERSION, seq));
                        break;
                    case options:
                        seq++;
                        rtspClient.write(RTSPProtocal.encodeDescribe(rtspAddress, VERSION, seq));
                        break;
                    case describe:
                        seq++;
                        // encodeSetup takes the odd port before the even one.
                        rtspClient.write(RTSPProtocal.encodeSetup(rtspAddress, VERSION, sessionid,
                                localPortOdd, localPortEven, seq, trackInfo));
                        break;
                    case setup:
                        if (sessionid == null || sessionid.trim().isEmpty()) {
                            System.out.println("setup还没有正常返回");
                        } else {
                            seq++;
                            // Contact the server's UDP ports from ours before asking it to play.
                            rtspClient.sendBeforePlay();
                            rtspClient.write(
                                    RTSPProtocal.encodePlay(rtspAddress, VERSION, sessionid, seq));
                        }
                        break;
                    case play:
                        count++;
                        System.out.println("count: " + count);
                        break;
                    default:
                        break;
                }
                isSended = false;
            }
            select();
        }
    }

    /**
     * Processes one reply from the server. Replies that do not start with {@code 200 OK}, or
     * that lack the fields the next request needs, are printed and leave the state unchanged.
     *
     * @param msg raw bytes read from the control connection; {@code null} or empty is ignored
     */
    private void handle(byte[] msg) {
        if (msg == null || msg.length == 0) {
            return;
        }
        String tmp = new String(msg, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("返回内容:" + tmp);
        if (!tmp.startsWith(RTSP_OK)) {
            System.out.println("返回错误:" + tmp);
            return;
        }
        switch (sysStatus) {
            case init:
                sysStatus = Status.options;
                System.out.println("option ok");
                isSended = true;
                break;
            case options: {
                java.util.regex.Matcher track = TRACK_ID.matcher(tmp);
                if (!track.find()) {
                    System.out.println("返回错误:" + tmp);
                    break;
                }
                trackInfo = track.group();
                sysStatus = Status.describe;
                System.out.println("describe ok");
                isSended = true;
                break;
            }
            case describe: {
                java.util.regex.Matcher session = SESSION_HEADER.matcher(tmp);
                java.util.regex.Matcher ports = SERVER_PORTS.matcher(tmp);
                if (!session.find() || !ports.find()) {
                    System.out.println("返回错误:" + tmp);
                    break;
                }
                // The line terminator stays attached on purpose: RTSPProtocal.encodePlay may
                // append this value without adding one of its own.
                sessionid = session.group(1).trim() + CRLF;
                rtspClient.setRemotePortEven(Integer.parseInt(ports.group(1)));
                rtspClient.setRemotePortOdd(Integer.parseInt(ports.group(2)));
                sysStatus = Status.setup;
                System.out.println("setup ok");
                isSended = true;
                break;
            }
            case setup:
                sysStatus = Status.play;
                System.out.println("play ok");
                // NOTE(review): the original sent the priming datagrams twice here, after the
                // PLAY reply, and never before PLAY. One send is kept in this place; presumably
                // the repetition guarded against UDP loss - confirm.
                rtspClient.sendBeforePlay();
                isSended = true;
                break;
            case play:
                System.out.println("pause ok");
                isSended = true;
                break;
            case pause:
                sysStatus = Status.teardown;
                System.out.println("teardown ok");
                isSended = true;
                break;
            case teardown:
                sysStatus = Status.init;
                System.out.println("exit start");
                isSended = true;
                break;
            default:
                break;
        }
    }

    /**
     * Waits up to one second for channel events, then reads and handles any reply and completes
     * the connection when the channel reports it as connectable.
     */
    private void select() {
        if (selector == null) {
            return;
        }
        int ready = 0;
        try {
            ready = selector.select(1000);
        } catch (final IOException e) {
            e.printStackTrace();
        }
        if (ready <= 0) {
            return;
        }
        final Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
        while (keys.hasNext()) {
            final SelectionKey sk = keys.next();
            keys.remove();
            if (!sk.isValid()) {
                continue;
            }
            if (sk.isReadable()) {
                handle(rtspClient.receive());
            }
            if (sk.isConnectable()) {
                try {
                    rtspClient.reConnect(sk);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
public class Test {只要在ReceiveSocket的run方法中打断点,你就会发现源源不断的数据向你发来,是不是感觉很爽,哈哈哈。
public static void main(String[] args){
PlayerClient player = new PlayerClient();
player.init();
try
{
player.run();
} catch (IOException e) {
e.printStackTrace();
}
}
}