unit uMain;

{ Minimal MQTT 3.1.1 test client built on Indy's TIdTCPClient.

  The form offers four actions: open the TCP connection (Button1), start the
  background reader thread (Button2), send an MQTT CONNECT packet with user
  name and password (Button3) and send a PINGREQ heartbeat (Button4).
  Incoming packets are framed by TClientHandleThread and dispatched on the
  main thread by TClientHandleThread.HandleInput. }

interface

uses
  System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants, IdGlobal,
  FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs, FMX.StdCtrls,
  IdBaseComponent, IdComponent, IdTCPConnection, IdTCPClient, FMX.Layouts,
  FMX.Memo, FMX.Controls.Presentation, FMX.Edit;

const
  // Client identifier sent in CONNECT. Must be unique per client; the author
  // intends to replace it with a device id later.
  FClientID = 'tndsoft';

type
  // MQTT control packet type (upper 4 bits of the first fixed-header byte).
  TMQTTMessageType = (
    Reserved0,   //  0 Reserved
    CONNECT,     //  1 Client request to connect to Broker
    CONNACK,     //  2 Connect Acknowledgment
    PUBLISH,     //  3 Publish message
    PUBACK,      //  4 Publish Acknowledgment
    PUBREC,      //  5 Publish Received (assured delivery part 1)
    PUBREL,      //  6 Publish Release (assured delivery part 2)
    PUBCOMP,     //  7 Publish Complete (assured delivery part 3)
    SUBSCRIBE,   //  8 Client Subscribe request
    SUBACK,      //  9 Subscribe Acknowledgment
    UNSUBSCRIBE, // 10 Client Unsubscribe request
    UNSUBACK,    // 11 Unsubscribe Acknowledgment
    PINGREQ,     // 12 PING Request
    PINGRESP,    // 13 PING Response
    DISCONNECT,  // 14 Client is Disconnecting
    Reserved15   // 15 Reserved
  );

  // Encoded "Remaining Length" field: variable size, 1 to 4 bytes.
  TRemainingLength = Array of Byte;
  // Raw bytes of an encoded string (optionally with a 2-byte length prefix).
  TUTF8Text = Array of Byte;

type
  // One received packet, split into its three parts.
  TMQTTMessage = Record
    FixedHeader: Byte; // first byte: packet type + flags
    RL: TBytes;        // Remaining Length exactly as received (still encoded)
    Data: TBytes;      // variable header + payload (RemainingLength bytes)
  End;

  { Events }
  TConnAckEvent = procedure (Sender: TObject; ReturnCode: integer) of object;

type
  TfMQTTForDelphi = class(TForm)
    Memo1: TMemo;
    IdTCPClient1: TIdTCPClient;
    Button1: TButton;
    Button2: TButton;
    edtHost: TEdit;
    edtPort: TEdit;
    Button3: TButton;
    Button4: TButton;
    procedure IdTCPClient1Connected(Sender: TObject);
    procedure IdTCPClient1Disconnected(Sender: TObject);
    procedure Button1Click(Sender: TObject);
    procedure Button2Click(Sender: TObject);
    procedure Button3Click(Sender: TObject);
    procedure Button4Click(Sender: TObject);
  private
    { Private declarations }
  public
    { Public declarations }
  end;

type
  // Background reader: blocks on the socket, frames one MQTT packet at a time
  // and hands it to HandleInput on the main thread.
  TClientHandleThread = class(TThread)
  private
    { Private declarations }
    FCurrentData: TMQTTMessage;   // packet currently being dispatched
    FConnAckEvent: TConnAckEvent; // CONNACK notification (not fired yet, see HandleInput)
    procedure HandleInput;
  protected
    procedure Execute; override;
  public
    property OnConnAck: TConnAckEvent read FConnAckEvent write FConnAckEvent;
  end;

var
  fMQTTForDelphi: TfMQTTForDelphi;
  myClientHandleThread: TClientHandleThread;

function FixedHeader(MessageType: TMQTTMessageType; Dup, Qos, Retain: Word): Byte;
function VariableHeader_Connect(KeepAlive: Word): TBytes;
procedure CopyIntoArray(var DestArray: Array of Byte; SourceArray: Array of Byte; StartIndex: integer);
function StrToBytes(str: string; perpendLength: boolean): TUTF8Text;
procedure AppendArray(var Dest: TUTF8Text; Source: Array of Byte);
function RemainingLength(x: Integer): TRemainingLength;
function BuildCommand(FixedHeader: Byte; RemainL: TRemainingLength;
  VariableHead: TBytes; Payload: Array of Byte): TBytes;
procedure AppendBytes(var DestArray: TBytes; const NewBytes: TBytes);
function RemainingLengthToInt(RLBytes: TBytes): Integer;

implementation

{$R *.fmx}
{$R *.NmXhdpiPh.fmx ANDROID}
{$R *.XLgXhdpiTb.fmx ANDROID}
{$R *.SmXhdpiPh.fmx ANDROID}

{ Reader loop. Each pass reads exactly one packet:
    1 byte fixed header, 1..4 bytes Remaining Length, then that many data
    bytes, and dispatches it through Synchronize(HandleInput).
  The thread terminates itself once the TCP client reports it is no longer
  connected. }
procedure TClientHandleThread.Execute;
var
  CurrentMessage: TMQTTMessage;
  vBuffer: TIdBytes;
  Buffer: TBytes;
  RLInt: Integer;
  I: Integer;
begin
  SetLength(Buffer, 1);
  while not Terminated do
  begin
    if not fMQTTForDelphi.IdTCPClient1.Connected then
      Terminate
    else
      try
        CurrentMessage.FixedHeader := 0;
        CurrentMessage.RL := nil;
        CurrentMessage.Data := nil;

        // Fixed header byte.
        CurrentMessage.FixedHeader := fMQTTForDelphi.IdTCPClient1.IOHandler.ReadByte;

        // Remaining Length: first byte, then up to three more while the
        // continuation bit (128) of the previous byte is set.
        SetLength(CurrentMessage.RL, 1);
        CurrentMessage.RL[0] := fMQTTForDelphi.IdTCPClient1.IOHandler.ReadByte;
        for I := 1 to 3 do
        begin
          if (CurrentMessage.RL[I - 1] and 128) = 0 then
            Break;
          Buffer[0] := fMQTTForDelphi.IdTCPClient1.IOHandler.ReadByte;
          AppendBytes(CurrentMessage.RL, Buffer);
        end;

        // Decode the length and read the rest of the packet.
        RLInt := RemainingLengthToInt(CurrentMessage.RL);
        if RLInt > 0 then
        begin
          // ReadBytes is called in append mode, so the buffer must be emptied
          // first; otherwise every packet would also carry the data of all
          // packets read before it.
          SetLength(vBuffer, 0);
          fMQTTForDelphi.IdTCPClient1.IOHandler.ReadBytes(vBuffer, RLInt, True);
          CurrentMessage.Data := TBytes(vBuffer);
        end;

        FCurrentData := CurrentMessage;
        Synchronize(HandleInput);
      except
        // Best effort, as before: a failed read is ignored here and the loop
        // re-tests Connected on its next pass.
      end;
    // NOTE: the former extra ReadWord + Synchronize(HandleInput) after this
    // block was removed. It swallowed the first two bytes of the next packet
    // and dispatched the previous packet a second time.
  end;
end;

{ Dispatches the packet stored in FCurrentData. Runs on the main thread (it is
  only invoked through Synchronize), so touching Memo1 is safe here.
  Only CONNACK and PINGRESP produce output; the other branches are stubs. }
procedure TClientHandleThread.HandleInput;
var
  MessageType: Byte;
  sErrCode: string;
  ConnectReturn: Integer;
begin
  if FCurrentData.FixedHeader = 0 then
    Exit;

  MessageType := FCurrentData.FixedHeader shr 4;

  if MessageType = Ord(CONNACK) then
  begin
    // The return code is the SECOND data byte, so at least two bytes are
    // required before indexing Data[1]. Any code other than 0 is an error.
    ConnectReturn := 0;
    if (Length(FCurrentData.Data) >= 2) and (Length(FCurrentData.Data) < 4) then
    begin
      ConnectReturn := FCurrentData.Data[1];
      case ConnectReturn of
        0: sErrCode := '连接已接受';
        1: sErrCode := '连接已拒绝,不支持的协议版本';
        2: sErrCode := '连接已拒绝,不合格的客户端标识符';
        3: sErrCode := '连接已拒绝,服务端不可用';
        4: sErrCode := '连接已拒绝,无效的用户名或密码';
        5: sErrCode := '连接已拒绝,未授权';
      else
        // Codes outside 0..5 used to log an empty line.
        sErrCode := '连接已拒绝,未知返回码 ' + IntToStr(ConnectReturn);
      end;
      fMQTTForDelphi.Memo1.Lines.Add(sErrCode);
    end;
    //if Assigned(OnConnAck) then OnConnAck(Self, ConnectReturn);
  end
  else if MessageType = Ord(PUBLISH) then
  begin
    // Not implemented yet. Sketch kept from the reference implementation
    // (needs locals DataLen: Integer; Topic, Payload: string):
    //   // Read the Length Bytes
    //   DataLen := BytesToStrLength(Copy(FCurrentData.Data, 0, 2));
    //   // Get the Topic
    //   SetString(Topic, PAnsiChar(@FCurrentData.Data[2]), DataLen);
    //   // Get the Payload
    //   SetString(Payload, PAnsiChar(@FCurrentData.Data[2 + DataLen]), (Length(FCurrentData.Data) - 2 - DataLen));
    //   if Assigned(OnPublish) then OnPublish(Self, Topic, Payload);
  end
  else if MessageType = Ord(SUBACK) then
  begin
    // Not implemented yet (needs locals DataLen, QoS: Integer; ResponseVH: TBytes):
    //   // Reading the Message ID
    //   ResponseVH := Copy(FCurrentData.Data, 0, 2);
    //   DataLen := BytesToStrLength(ResponseVH);
    //   // Next Read the Granted QoS
    //   QoS := 0;
    //   if (Length(FCurrentData.Data) - 2) > 0 then
    //   begin
    //     ResponseVH := Copy(FCurrentData.Data, 2, 1);
    //     QoS := ResponseVH[0];
    //   end;
    //   if Assigned(OnSubAck) then OnSubAck(Self, DataLen, QoS);
  end
  else if MessageType = Ord(UNSUBACK) then
  begin
    // Not implemented yet (needs locals DataLen: Integer; ResponseVH: TBytes):
    //   // Read the Message ID for the event handler
    //   ResponseVH := Copy(FCurrentData.Data, 0, 2);
    //   DataLen := BytesToStrLength(ResponseVH);
    //   if Assigned(OnUnSubAck) then OnUnSubAck(Self, DataLen);
  end
  else if MessageType = Ord(PINGRESP) then
  begin
    fMQTTForDelphi.Memo1.Lines.Add('收到心跳应答包。');
    // if Assigned(OnPingResp) then OnPingResp(Self);
  end;
end;

{ Opens the TCP connection to the host/port typed into the edit boxes. }
procedure TfMQTTForDelphi.Button1Click(Sender: TObject);
begin
  IdTCPClient1.Host := edtHost.Text;
  IdTCPClient1.Port := StrToInt(edtPort.Text);
  IdTCPClient1.Connect;
end;

{ Starts the background reader thread. }
procedure TfMQTTForDelphi.Button2Click(Sender: TObject);
begin
  // Two readers on one socket would steal bytes from each other, so a reader
  // that is still running is left alone.
  if Assigned(myClientHandleThread) and not myClientHandleThread.Finished then
    Exit;
  FreeAndNil(myClientHandleThread);

  // Create suspended and Start explicitly; TThread.Resume is deprecated.
  myClientHandleThread := TClientHandleThread.Create(True);
  myClientHandleThread.Start;
end;

{ Builds and sends the MQTT CONNECT packet (client id + user name + password). }
procedure TfMQTTForDelphi.Button3Click(Sender: TObject);
var
  MqttData_FixedHead: Byte;                   // fixed header byte
  MqttData_VariableHeader: TBytes;            // variable header
  MqttData_RemainingLength: TRemainingLength; // encoded remaining length
  Payload: TUTF8Text;
  SendData: TBytes;                           // fully assembled packet
begin
  MqttData_FixedHead := FixedHeader(CONNECT, 0, 0, 0);
  // Keep-alive in seconds: the client has to send PINGREQ within this time or
  // the server drops the connection.
  MqttData_VariableHeader := VariableHeader_Connect(40);

  // Payload. Authentication is used, so it carries ID, USER and PWD.
  SetLength(Payload, 0);
  AppendArray(Payload, StrToBytes(FClientID, true)); // id
  AppendArray(Payload, StrToBytes('ade', true));     // user
  AppendArray(Payload, StrToBytes('ade', true));     // pwd

  // Remaining length = variable header + payload.
  MqttData_RemainingLength :=
    RemainingLength(Length(MqttData_VariableHeader) + Length(Payload));

  // Assemble and send.
  SendData := BuildCommand(MqttData_FixedHead, MqttData_RemainingLength,
    MqttData_VariableHeader, Payload);
  IdTCPClient1.Socket.Write(TIdBytes(SendData));
end;

{ Sends a PINGREQ heartbeat: fixed header byte followed by remaining length 0. }
procedure TfMQTTForDelphi.Button4Click(Sender: TObject);
var
  Data: TBytes;
begin
  SetLength(Data, 2);
  Data[0] := FixedHeader(PINGREQ, 0, 0, 0);
  Data[1] := 0;
  IdTCPClient1.Socket.Write(TIdBytes(Data));
end;

procedure TfMQTTForDelphi.IdTCPClient1Connected(Sender: TObject);
begin
  Memo1.Lines.Add('连接成功');
end;

procedure TfMQTTForDelphi.IdTCPClient1Disconnected(Sender: TObject);
begin
  Memo1.Lines.Add('连接断开');
end;

{ Builds the first byte of the fixed header.

    bit    | 7 6 5 4      | 3        | 2 1       | 0      |
    byte 1 | Message Type | DUP flag | QoS level | RETAIN |

  Dup and Retain are masked to one bit and Qos to two bits so that an
  out-of-range argument cannot spill into the message-type bits. }
function FixedHeader(MessageType: TMQTTMessageType; Dup, Qos, Retain: Word): Byte;
begin
  Result := Byte((Ord(MessageType) shl 4)
    or ((Dup and 1) shl 3)
    or ((Qos and 3) shl 1)
    or (Retain and 1));
end;

{ Encodes x as an MQTT "Remaining Length": 7 data bits per byte, least
  significant group first, bit 7 set on every byte except the last.
  Always returns at least one byte; 0 (and any negative x) encodes as a
  single zero byte. }
function RemainingLength(x: Integer): TRemainingLength;
var
  digit: Integer;
begin
  Result := nil;
  if x < 0 then
    x := 0;
  repeat
    digit := x mod 128;
    x := x div 128;
    if x > 0 then
      digit := digit or 128; // more bytes follow
    SetLength(Result, Length(Result) + 1);
    Result[High(Result)] := Byte(digit);
  until x = 0;
end;

{ Builds the 10-byte variable header of a CONNECT packet:
    2-byte length + 'MQTT', protocol level, connect flags, 2-byte keep-alive.
  KeepAlive is in seconds and is written big-endian (high byte first). }
function VariableHeader_Connect(KeepAlive: Word): TBytes;
const
  MQTT_PROTOCOL = 'MQTT';
  MQTT_VERSION = 4;     // protocol level for v3.1.1 is 4, no longer 3
  // 11000010b: UserName, Password and CleanSession set, everything else 0.
  // Plain constant instead of inline asm so the unit builds on non-x86 targets.
  CONNECT_FLAGS = $C2;
var
  iByteIndex: integer;
  ProtoBytes: TUTF8Text;
begin
  SetLength(Result, 10);
  iByteIndex := 0;

  ProtoBytes := StrToBytes(MQTT_PROTOCOL, true);
  CopyIntoArray(Result, ProtoBytes, iByteIndex); // protocol name
  Inc(iByteIndex, Length(ProtoBytes));

  Result[iByteIndex] := MQTT_VERSION;            // protocol level
  Inc(iByteIndex);
  Result[iByteIndex] := CONNECT_FLAGS;           // connect flags
  Inc(iByteIndex);
  Result[iByteIndex] := Hi(KeepAlive);           // keep-alive, high byte
  Inc(iByteIndex);
  Result[iByteIndex] := Lo(KeepAlive);           // keep-alive, low byte
end;

{ Copies all of SourceArray into DestArray starting at StartIndex.
  DestArray must already be large enough; an empty source is a no-op. }
procedure CopyIntoArray(var DestArray: Array of Byte; SourceArray: Array of Byte; StartIndex: integer);
begin
  Assert(StartIndex >= 0);
  if Length(SourceArray) = 0 then
    Exit; // SourceArray[0] does not exist
  Assert(StartIndex + Length(SourceArray) <= Length(DestArray));
  Move(SourceArray[0], DestArray[StartIndex], Length(SourceArray));
end;

{ Encodes str as UTF-8. With perpendLength = True the result is prefixed with
  the encoded byte count as a 2-byte big-endian value, which is the MQTT
  string format. Uses TEncoding rather than per-character indexing so the
  result does not depend on the compiler's string index base. }
function StrToBytes(str: string; perpendLength: boolean): TUTF8Text;
var
  Encoded: TBytes;
  Offset: integer;
begin
  Encoded := TEncoding.UTF8.GetBytes(str);
  if perpendLength then
  begin
    SetLength(Result, Length(Encoded) + 2);
    Result[0] := Byte((Length(Encoded) shr 8) and $FF);
    Result[1] := Byte(Length(Encoded) and $FF);
    Offset := 2;
  end
  else
  begin
    SetLength(Result, Length(Encoded));
    Offset := 0;
  end;
  if Length(Encoded) > 0 then
    Move(Encoded[0], Result[Offset], Length(Encoded));
end;

{ Appends Source to the end of Dest. }
procedure AppendArray(var Dest: TUTF8Text; Source: Array of Byte);
var
  DestLen: Integer;
begin
  if Length(Source) = 0 then
    Exit; // nothing to add, and Dest[DestLen] would not exist
  DestLen := Length(Dest);
  SetLength(Dest, DestLen + Length(Source));
  Move(Source[0], Dest[DestLen], Length(Source));
end;

{ Assembles the final packet:
  fixed header byte + remaining length + variable header + payload. }
function BuildCommand(FixedHeader: Byte; RemainL: TRemainingLength;
  VariableHead: TBytes; Payload: Array of Byte): TBytes;
var
  iNextIndex: integer;
begin
  SetLength(Result, 1 + Length(RemainL) + Length(VariableHead) + Length(Payload));

  // Fixed header (1 byte)
  Result[0] := FixedHeader;
  iNextIndex := 1;

  // Remaining length (1-4 bytes)
  CopyIntoArray(Result, RemainL, iNextIndex);
  Inc(iNextIndex, Length(RemainL));

  // Variable header
  CopyIntoArray(Result, VariableHead, iNextIndex);
  Inc(iNextIndex, Length(VariableHead));

  // Payload
  CopyIntoArray(Result, Payload, iNextIndex);
end;

{ Appends NewBytes to the end of DestArray. }
procedure AppendBytes(var DestArray: TBytes; const NewBytes: TBytes);
var
  DestLen: Integer;
begin
  if Length(NewBytes) = 0 then
    Exit;
  DestLen := Length(DestArray);
  SetLength(DestArray, DestLen + Length(NewBytes));
  // NewBytes is a dynamic array: Move must be given its first ELEMENT.
  // Passing the variable itself copies the bytes of the array reference.
  Move(NewBytes[0], DestArray[DestLen], Length(NewBytes));
end;

{ Decodes an MQTT "Remaining Length" (see RemainingLength).
  Stops at the first byte without the continuation bit, at the end of the
  array, or after 4 bytes, whichever comes first. Returns 0 for empty input. }
function RemainingLengthToInt(RLBytes: TBytes): Integer;
var
  multi: integer;
  i: integer;
  digit: Byte;
begin
  Result := 0;
  multi := 1;
  i := 0;
  while (i < Length(RLBytes)) and (i < 4) do
  begin
    digit := RLBytes[i];
    Result := Result + (digit and 127) * multi;
    if (digit and 128) = 0 then
      Break;
    multi := multi * 128;
    Inc(i);
  end;
end;

end.
{MQTT 包格式如下:
   [Fixed header] + [Variable header] + [Payload]
   [Fixed header]    : FixedHeader + RemainingLength
   [Variable header] : 报文标识符
   [Payload]         : 有效荷载

 [CONNECT]:客户端发起连接,格式如下:
   [Fixed header]    : FixedHeader + RemainingLength
   [Variable header] : 协议名 + 协议级别 + 连接标志 + 保持连接(04MQTT + $4 + Flag + Time)
   [Payload]         : ClientId + User + Pwd
}
后来测试,以上代码在 Win32 下运行正常;安卓平台需要修改一下字符串的编码函数,需要注意。
MQTT 3.1.1 的 Delphi 实现(XE7),跨平台
需要做一个用 APP 控制设备的程序,思来想去,放弃自己实现服务端,准备直接采用现成的 MQTT 服务端程序,自己只需要关心业务逻辑,传输的部分交给 MQTT。网上能找到的 Delphi 版本是老外基于一个第三方网络库写的,Win32 下面可以编译运行,但需要修改部分 AnsiString 和 WideString。测试的时候会掉线,这可能是因为当时没细读协议,在规定时间内未发送心跳包,被服务端断开。改成 Android 平台后死活没能编译通过,属于技术问题,遂放弃。准备自己读协议,重新用 TIdTCPClient 实现一遍,这样可以方便地跨平台,也不依赖第三方控件,同时还能深入理解一下 MQTT。代码基本参考老外的版本,先弄个能用的测试代码出来。服务端使用 mosquitto。客户端现已完成 MQTT 连接和 MQTT 心跳包。