MQTT协议笔记之头部信息

时间:2022-09-02 10:14:56

转载地址:http://www.blogjava.net/yongboy/archive/2014/02/07/409587.html


前言

MQTT(Message Queue Telemetry Transport),遥测传输协议,提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:

1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
2. 对负载内容屏蔽的消息传输。
3. 使用 TCP/IP 提供网络连接。
4. 有三种消息发布服务质量:
“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
“至少一次”,确保消息到达,但消息重复可能会发生。
“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
5. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
6. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

MQTT 3.1协议在线版本: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html

官方下载地址: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/MQTT_V3.1_Protocol_Specific.pdf

PDF版本,42页,不算多。

另外,目前MQTT大家都用在了手机推送,可能还有很多的使用方式,有待进一步的探索。

协议方面,以前曾简单实现过一点HTTP协议,基于HTTP上构建若干种通信管道的socket.io协议,不过socket.io 0.9版本的协议才两三页而已。面对领域不同,自然解决的方式也不一样。

阅读完毕MQTT协议,有一个想法,其实可以基于MQTT协议,打造更加私有、精简(协议一些地方,略显多余)的传输协议,比如一个字节的传输开销。有时间,会详细说一下。

固定头部

固定头部,使用两个字节,共16位:

bit 7 6 5 4 3 2 1 0
byte 1 Message Type DUP flag QoS level RETAIN
byte 2 Remaining Length

第一个字节(byte 1)

消息类型(4-7),使用4位二进制表示,可代表16种消息类型:

Mnemonic Enumeration Description
Reserved 0 Reserved
CONNECT 1 Client request to connect to Server
CONNACK 2 Connect Acknowledgment
PUBLISH 3 Publish message
PUBACK 4 Publish Acknowledgment
PUBREC 5 Publish Received (assured delivery part 1)
PUBREL 6 Publish Release (assured delivery part 2)
PUBCOMP 7 Publish Complete (assured delivery part 3)
SUBSCRIBE 8 Client Subscribe request
SUBACK 9 Subscribe Acknowledgment
UNSUBSCRIBE 10 Client Unsubscribe request
UNSUBACK 11 Unsubscribe Acknowledgment
PINGREQ 12 PING Request
PINGRESP 13 PING Response
DISCONNECT 14 Client is Disconnecting
Reserved 15 Reserved

除去0和15位置属于保留待用,共14种消息事件类型。

DUP flag(打开标志)

保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等。只适用于客户端或服务器端尝试重发PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要满足以下条件:

 当QoS > 0
消息需要回复确认

此时,在可变头部需要包含消息ID。当值为1时,表示当前消息先前已经被传送过。

QoS(Quality of Service,服务质量)

使用两个二进制表示PUBLISH类型消息:

QoS value bit 2 bit 1 Description
0 0 0 至多一次 发完即丢弃 <=1
1 0 1 至少一次 需要确认回复 >=1
2 1 0 只有一次 需要确认回复 =1
3 1 1 待用,保留位置

RETAIN(保持)

仅针对PUBLISH消息。不同值,不同含义:

1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。

备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。

0:仅仅为当前订阅者推送此消息。

假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服务器可以删除掉对应的已被持久化的PUBLISH消息。

如何解析

因为java使用有符号(最高位为符号位)数据表示,byte范围:-128-127。该字节的最高位(左边第一位),可能为1。若直接转换为byte类型,会出现负数,这是一个雷区。DataInputStream提供了int readUnsignedByte()读取方式,请注意。下面演示了,如何从一个字节中,获取到所有定义的信息,同时绕过雷区:

public static void main(String[] args) {
byte publishFixHeader = 50;// 0 0 1 1 0 0 1 0

doGetBit(publishFixHeader);
int ori = 224;//1110000,DISCONNECT ,Message Type (14)
byte flag = (byte) ori; //有符号byte
doGetBit(flag);
doGetBit_v2(ori);
}


public static void doGetBit(byte flags) {
boolean retain = (flags & 1) > 0;
int qosLevel = (flags & 0x06) >> 1;
boolean dupFlag = (flags & 8) > 0;
int messageType = (flags >> 4) & 0x0f;

System.out.format(
"Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
messageType, dupFlag, qosLevel, retain);
}

public static void doGetBit_v2(int flags) {
boolean retain = (flags & 1) > 0;
int qosLevel = (flags & 0x06) >> 1;
boolean dupFlag = (flags & 8) > 0;
int messageType = flags >> 4;

System.out.format(
"Message type:%d, DUP flag:%s, QoS level:%d, RETAIN:%s\n",
messageType, dupFlag, qosLevel, retain);
}

处理Remaining Length(剩余长度)

在当前消息中剩余的byte(字节)数,包含可变头部和负荷(称之为内容/body,更为合适)。单个字节最大值:01111111,16进制:0x7F,10进制为127。单个字节为什么不能是11111111(0xFF)呢?因为MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。同时MQTT协议最多允许4个字节表示剩余长度。那么最大长度为:0xFF,0xFF,0xFF,0x7F,二进制表示为:11111111,11111111,11111111,01111111,十进制:268435455 byte=261120KB=256MB=0.25GB 四个字节之间值的范围:

Digits From To
1 0 (0x00) 127 (0x7F)
2 128 (0x80, 0x01) 16 383 (0xFF, 0x7F)
3 16 384 (0x80, 0x80, 0x01) 2 097 151 (0xFF, 0xFF, 0x7F)
4 2 097 152 (0x80, 0x80, 0x80, 0x01) 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

如何换算成十进制呢 ? 使用java语言表示如下:

public static void main(String[] args) throws IOException {
// 模拟客户端写入
ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(arrayOutputStream);
dataOutputStream.write(0xff);
dataOutputStream.write(0xff);
dataOutputStream.write(0xff);
dataOutputStream.write(0x7f);

InputStream arrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray());

// 模拟服务器/客户端解析
System. out.println( "result is " + bytes2Length(arrayInputStream));
}

/**
* 转化字节为 int类型长度
* @param in
* @return
* @throws IOException
*/

private static int bytes2Length(InputStream in) throws IOException {
int multiplier = 1;
int length = 0;
int digit = 0;
do {
digit = in.read(); //一个字节的有符号或者无符号,转换转换为四个字节有符号 int类型
length += (digit & 0x7f) * multiplier;
multiplier *= 128;
} while ((digit & 0x80) != 0);

return length;
}

一般最后一个字节小于127(01111111),和0x80(10000000)进行&操作,最终结果都为0,因此计算会终止。代理中间件和请求者,中间传递的是字节流Stream,自然要从流中读取,逐一解析出来。

那么如何将int类型长度解析为不确定的字节值呢?

public static void main(String[] args) throws IOException {
// 模拟服务器/客户端写入
ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(
arrayOutputStream);

// 模拟服务器/客户端解析
length2Bytes(dataOutputStream, 128);
}

/**
* int类型长度解析为1-4个字节
* @param out
* @param length
* @throws IOException
*/

private static void length2Bytes(OutputStream out, int length)
throws IOException
{
int val = length;
do {
int digit = val % 128;
val = val / 128;
if (val > 0)
digit = digit | 0x80;

out.write(digit);
} while (val > 0);
}

digit对val求模,最大值可能是127,一旦127 | 10000000 = 11111111 = 0xff = 255 请注意:剩余长度,只在固定头部中,无论是一个字节,还是四个字节,不能被算作可变头部中。

可变头部

固定头部仅定义了消息类型和一些标志位,一些消息的元数据,需要放入可变头部中。可变头部内容字节长度 + Playload/负荷字节长度 = 剩余长度,这个是需要牢记的。可变头部,包含了协议名称,版本号,连接标志,用户授权,心跳时间等内容,这部分和后面要讲到的CONNECT消息类型,有重复,暂时略过。

Playload/消息体/负荷

消息体主要是为配合固定/可变头部命令(比如CONNECT可变头部User name标记若为1则需要在消息体中附加用户名称字符串)而存在。

CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息体。PUBLISH的消息体以二进制形式对待。

请记住,MQTT协议只允许在PUBLISH类型消息体中使用自定义特性,在固定/可变头部想加入自定义私有特性,就免了吧。这也是为了协议免于流于形式,变得很分裂也为了兼顾现有客户端等。比如支持压缩等,那就可以在Playload中定义数据支持,在应用中进行读取处理。

这部分会在后面详细论述。

消息标识符/消息ID

固定头中的QoS level标志值为1或2时才会在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可变头中出现。

一个16位无符号位的short类型值(值不能为 0,0做保留作为无效的消息ID),仅仅要求在一个特定方向(服务器发往客户端为一个方向,客户端发送到服务器端为另一个方向)的通信消息中必须唯一。比如客户端发往服务器,有可能存在服务器发往客户端会同时存在重复,但不碍事。

可变头部中,需要两个字节的顺序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻译成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左边/上面,表示这是一个大端字节/网络字节序,符合人的阅读习惯,高位在最左边。

bit 7 6 5 4 3 2 1 0
  Message Identifier MSB
  Message Identifier LSB

但凡如此表示的,都可以视为一个16位无符号short类型整数,两个字节表示。在JAVA中处理比较简单:

DataInputStream.readUnsignedShort

或者

in.read() * 0xFF + in.read();

最大长度可为: 65535

UTF-8编码

有关字符串,MQTT采用的是修改版的UTF-8编码,一般形式为如下,需要牢记:

bit 7 6 5 4 3 2 1 0
byte 1 String Length MSB
byte 2 String Length LSB
bytes 3 ... Encoded Character Data

比如AVA,使用writeUTF()方法写入一串文字“OTWP”,头两个字节为一个完整的无符号数字,代表字符串字节长度,后面四个字节才是字符串真正的长度,共六个字节:

bit 7 6 5 4 3 2 1 0
byte 1 Message Length MSB (0x00)
  0 0 0 0 0 0 0 0
byte 2 Message Length LSB (0x04)
  0 0 0 0 0 1 0 0
byte 3 'O' (0x4F)
  0 1 0 0 1 1 1 1
byte 4 'T' (0x54)
  0 1 0 1 0 1 0 0
byte 5 'W' (0x57)
  0 1 0 1 0 1 1 1
byte 6 'P' (0x50)
  0 1 0 1 0 0 0 0

这点,在程序中,可不用单独处理默认,直接使用readUTF()方法,可自动省去了处理字符串长度的麻烦。当然,可以手动读取字符串:

// 模拟写入
dataOutputStream.writeUTF( "abcd");// 2 + 4 = 6 byte
......
// 模拟读取
int decodedLength = dataInputStream.readUnsignedShort();//2 byte
byte[] decodedString = new byte[decodedLength]; // 4 bytes
dataInputStream.read(decodedString);
String target = new String(decodedString, "UTF-8");

等同于:

String target = dataInputStream.readUTF();

MQTT无论是可变头部还是消息体中,只要是字符串部分,都是采用了修改版的UTF-8编码,读取和写入,借助DataInputStream/DataOutputStream的帮助,一行语句,略去了手动处理的麻烦。

小结

总之,掌握固定头部的QoS level、RETAIN标记、可变头部的Connect flags作用和意义,对总体理解MQTT作用很大。