下载tokudu-PhpMQTTClient-ba4e494.zip
下载rsmb_1.2.0.zip(windows环境下要开\windows\broker.exe 切记切记)
解压tokudu-PhpMQTTClient-ba4e494.zip
www下边新建目录androidpush
把解压文件tokudu-PhpMQTTClient-ba4e494中的SAM文件夹copy到androidpush目录下
把解压文件tokudu-PhpMQTTClient-ba4e494中的send_mqtt.php文件copy到androidpush目录下
结构如图:
send_mqtt.php主文件
<?php
require('SAM/php_sam.php');
//create a new connection object
//创建一个新的连接对象
$conn = new SAMConnection();
//start initialise the connection
//开始初始化连接 SAM_HOST服务器host路径 ;SAM_PORT服务器port端口号
$conn->connect(SAM_MQTT, array(SAM_HOST => '127.0.0.1',
SAM_PORT => 1883));
//create a new MQTT message with the output of the shell command as the body
//建立一个新的MQTT shell命令的输出消息以作为主体($msgCpu是通知内容)
//new SAMMessage() 参数一般写为json格式
$msgCpu = new SAMMessage("测试通知");
//send the message on the topic cpu
//发送该信息的主体
//860173018344139是设备号,每一台android手机对应一个唯一的设备号
//$msgCpu通知内容
//send()推送通知
$conn->send('topic://'.'860173018344139', $msgCpu);
//关闭连接
$conn->disconnect();
echo 'MQTT Message to ' . '860173018344139' . ' sent: ' . '测试通知';
?>
SAM目录
SAM/MQTT/sam_mqtt.php
<?php
/*
+----------------------------------------------------------------------+
| Copyright IBM Corporation 2006, 2007. |
| All Rights Reserved. |
+----------------------------------------------------------------------+
| |
| Licensed under the Apache License, Version 2.0 (the "License"); you |
| may not use this file except in compliance with the License. You may |
| obtain a copy of the License at |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| implied. See the License for the specific language governing |
| permissions and limitations under the License. |
+----------------------------------------------------------------------+
| Author: Dave Renshaw |
+----------------------------------------------------------------------+
$Id: sam_mqtt.php,v 1.1 2007/02/02 15:36:46 dsr Exp $
*/
define("SAM_MQTT_CLEANSTART", "SAM_MQTT_CLEANSTART");
define("SAM_MQTT_QOS", "SAM_MQTT_QOS");
define("SAM_MQTT_SUB_SEPARATOR", "#-#");
/* ---------------------------------
SAMConnection
--------------------------------- */
class SAMConnection_MQTT {
var $debug = false;
var $errno = 0;
var $error = '';
/*
Info we need to keep between calls...
*/
var $sub_id = '';
var $port = '';
var $host = '';
var $cleanstart = false;
var $virtualConnected = false;
var $connected = false;
/*
Our current open socket...
*/
var $sock;
/*
Table of available operations using the MQTT protocol...
*/
var $operations = array("MQTT_CONNECT" => 1,
"MQTT_CONNACK" => 2,
"MQTT_PUBLISH" => 3,
"MQTT_PUBACK" => 4,
"MQTT_PUBREC" => 5,
"MQTT_PUBREL" => 6,
"MQTT_PUBCOMP" => 7,
"MQTT_SUBSCRIBE" => 8,
"MQTT_SUBACK" => 9,
"MQTT_UNSUBSCRIBE" => 10,
"MQTT_UNSUBACK" => 11,
"MQTT_PINGREC" => 12,
"MQTT_PINGRESP" => 13,
"MQTT_DISCONNECT" => 14);
/* ---------------------------------
Constructor
--------------------------------- */
function SAMConnection_MQTT() {
if ($this->debug) e('SAMConnection_MQTT()');
if ($this->debug) x('SAMConnection_MQTT()');
}
/* ---------------------------------
Commit
--------------------------------- */
function Commit() {
if ($this->debug) e('SAMConnection_MQTT.Commit()');
$errno = 100;
$error = 'Unsupported operation for MQTT protocol!';
$rc = false;
if ($this->debug) x("SAMConnection_MQTT.Commit() rc=$rc");
return $rc;
}
/* ---------------------------------
Connect
--------------------------------- */
function Connect($proto, $options=array()) {
if ($this->debug) e('SAMConnection_MQTT.Connect()');
/* Check our optional parameter array for the necessary bits... */
if ($options[SAM_PORT] == '') {
$this->port = 1883;
} else {
$this->port = $options[SAM_PORT];
}
if ($options[SAM_HOST] == '') {
$this->host = 'localhost';
} else {
$this->host = $options[SAM_HOST];
}
$this->cleanstart = in_array(SAM_MQTT_CLEANSTART, $options);
if ($this->debug) t("SAMConnection_MQTT.Connect() host=$this->host, port=$this->port, cleanstart=$this->cleanstart");
if ($this->checkHost($this->host, $this->port)) {
$this->virtualConnected = true;
} else {
$this->virtualConnected = false;
}
if ($this->debug) x("SAMConnection_MQTT.Connect() rc=$this->virtualConnected");
return $this->virtualConnected;
}
/* ---------------------------------
Disconnect
--------------------------------- */
function Disconnect() {
if ($this->debug) e('SAMConnection_MQTT.Disconnect()');
$rc = false;
if ($this->virtualConnected) {
if ($this->connected) {
$msg = $this->fixed_header("MQTT_DISCONNECT").pack('C', 0);
fwrite($this->sock, $msg);
$response = fgets($this->sock, 128);
if ($this->debug) t('SAMConnection_MQTT.Disconnect() response is '.strlen($response).' bytes');
if (strlen($response) == 0) {
fclose($this->sock);
$this->sock = NULL;
}
}
$this->virtualConnected = false;
$this->connected = false;
$rc = true;
}
if ($this->debug) x("SAMConnection_MQTT.Disconnect() rc=$rc");
return $rc;
}
/* ---------------------------------
IsConnected
--------------------------------- */
function IsConnected() {
if ($this->debug) e('SAMConnection_MQTT.IsConnected()');
$rc = false;
if ($this->connected) {
$rc = true;
}
if ($this->debug) x("SAMConnection_MQTT.IsConnected() rc=$rc");
return $rc;
}
/* ---------------------------------
Peek
--------------------------------- */
function Peek() {
if ($this->debug) e('SAMConnection_MQTT.Peek()');
$errno = 100;
$error = 'Unsupported operation for MQTT protocol!';
$rc = false;
if ($this->debug) x("SAMConnection_MQTT.Peek() rc=$rc");
return $rc;
}
/* ---------------------------------
PeekAll
--------------------------------- */
function PeekAll() {
if ($this->debug) e('SAMConnection_MQTT.PeekAll()');
$errno = 100;
$error = 'Unsupported operation for MQTT protocol!';
$rc = false;
if ($this->debug) x("SAMConnection_MQTT.PeekAll() rc=$rc");
return $rc;
}
/* ---------------------------------
Receive
--------------------------------- */
function Receive($sub_id, $options=array()) {
if ($this->debug) e('SAMConnection_MQTT.Receive()');
$rc = false;
/* strip the topic from the rear of the subscription id... */
$x = strpos($sub_id, SAM_MQTT_SUB_SEPARATOR);
if (!$x) {
$this->errno = 279;
$this->error = 'Specified subscription id ('.$sub_id.') is not valid!';
return false;
}
$topic = substr($sub_id, $x + strlen(SAM_MQTT_SUB_SEPARATOR));
$si = substr($sub_id, 0, $x);
/* Are we already connected? */
if (!$this->connected) {
if ($this->debug) t('SAMConnection_MQTT.Receive() Not connected.');
/* No, so open up the connection... */
$this->sub_id = $si;
$rc = $this->do_connect_now();
} else {
/* We are already connected. Are we using the right subscriber id? */
if ($this->sub_id != $si) {
if ($this->debug) t('SAMConnection_MQTT.Receive() Connected with wrong sub_id.');
/* No, We better reconnect then... */
$this->disconnect();
$this->sub_id = $si;
$rc = $this->do_connect_now();
} else {
if ($this->debug) t('SAMConnection_MQTT.Receive() Connected OK.');
$rc = true;
}
}
if ($rc) {
/* have we got a timeout specified? */
if ($options[SAM_WAIT] > 1) {
$m = $options[SAM_WAIT] % 1000;
$s = ($options[SAM_WAIT] - $m) /1000;
if ($this->debug) t('SAMConnection_MQTT.Receive() timeout='.$options[SAM_WAIT]." ($s secs $m millisecs)");
stream_set_timeout($this->sock, $s, $m);
if ($this->debug) t('SAMConnection_MQTT.Receive() timeout set.');
} else {
if ($this->debug) t('SAMConnection_MQTT.Receive() no timeout value found!');
}
$hdr = $this->read_fixed_header($this->sock);
if (!$hdr) {
$this->errno = 500;
$this->error = 'Receive request failed, timed out with no data!';
$rc = false;
} else {
if ($hdr['mtype'] == $this->operations['MQTT_PUBLISH']) {
$len = $this->read_remaining_length($this->sock);
if ($len > 1) {
/* read the topic length... */
$topic = $this->read_topic($this->sock);
if (!$topic) {
$this->errno = 303;
$this->error = 'Receive request failed, message format invalid!';
$rc = false;
} else {
if ($this->debug) t('SAMConnection_MQTT.Receive() topic='.$topic);
$len -= (strlen($topic) + 2);
/* If QoS 1 or 2 then read the message id... */
if ($hdr['qos'] > 0) {
$idb = fread($this->sock, 2);
$len -= 2;
$fields = unpack('na', $idb);
$mid = $fields['a'];
if ($this->debug) t('SAMConnection_MQTT.Receive() mid='.$mid);
}
$payload = fread($this->sock, $len);
if ($this->debug) t('SAMConnection_MQTT.Receive() payload='.$payload);
$rc = new SAMMessage();
$rc->body = $payload;
$rc->header->SAM_MQTT_TOPIC = 'topic://'.$topic;
$rc->header->SAM_MQTT_QOS = $hdr['qos'];
$rc->header->SAM_TYPE = 'SAM_BYTES';
}
} else {
$this->errno = 303;
$this->error = 'Receive request failed, received message too short! No topic data';
$rc = false;
}
} else {
if ($this->debug) t('SAMConnection_MQTT.Receive() Receive failed response mtype = '.$mtype);
$rc = false;
}
}
}
if ($this->debug) x("SAMConnection_MQTT.Receive() rc=$rc");
return $rc;
}
/* ---------------------------------
Remove
--------------------------------- */
function Remove() {
if ($this->debug) e('SAMConnection_MQTT.Remove()');
$errno = 100;
$error = 'Unsupported operation for MQTT protocol!';
$rc = false;
if ($this->debug) x("SAMConnection_MQTT.Remove() rc=$rc");
return $rc;
}
/* ---------------------------------
Rollback
--------------------------------- */
function Rollback() {
if ($this->debug) e('SAMConnection_MQTT.Rollback()');
$errno = 100;
$error = 'Unsupported operation for MQTT protocol!';
$rc = false;
if ($this->debug) x("SAMConnection_MQTT.Rollback() rc=$rc");
return $rc;
}
/* ---------------------------------
Send
--------------------------------- */
function Send($topic, $message, $options=array()) {
if ($this->debug) e('SAMConnection_MQTT.Send()');
$rc = true;
/* check the format of the topic... */
if (strncmp($topic, 'topic://', 8) == 0) {
$t = substr($topic, 8);
} else {
$this->errno = 279;
$this->error = 'Specified target ('.$topic.') is not a valid topic!';
return false;
}
if (in_array(SAM_MQTT_QOS, $options)) {
$qos = $options[SAM_MQTT_QOS];
} else {
$qos = 0;
}
/* Are we already connected? */
if (!$this->connected) {
/* No, so open up the connection... */
$this->do_connect_now();
}
$mid = rand();
$variable = $this->utf($t);
if ($qos > 0) {
$variable .= pack('n', $mid);
}
$payload = $message->body;
// add in the remaining length field and fix it together
$msg = $this->fixed_header("MQTT_PUBLISH", 0, $qos) . $this->remaining_length(strlen($variable)+strlen($payload)) . $variable . $payload;
fwrite($this->sock, $msg);
if ($qos > 0) {
$hdr = $this->read_fixed_header($this->sock);
if ($hdr) {
/* is this a QoS level 1 message being sent? */
if ($qos == 1) {
/* Yup, so we should get a PUBACK response message... */
if ($hdr['mtype'] == $this->operations['MQTT_PUBACK']) {
$len = $this->read_remaining_length($this->sock);
if ($len > 0) {
$response = fread($this->sock, $len);
}
if ($len < 2) {
if ($this->debug) t("SAMConnection_MQTT.Send() send failed, incorrect length response ($len) received!");
$this->errno = 302;
$this->error = 'Send request failed!';
$rc = false;
} else {
$rc = true;
}
} else {
if ($this->debug) t('SAMConnection_MQTT.Send() Send failed response mtype = '.$mtype.' Expected PUBREC!');
$rc = false;
}
} else {
/* lets assume it's QoS level 2... */
/* We should get a PUBREC response message... */
if ($hdr['mtype'] == $this->operations['MQTT_PUBREC']) {
$len = $this->read_remaining_length($this->sock);
if ($len > 0) {
$response = fread($this->sock, $len);
}
if ($len < 2) {
if ($this->debug) t("SAMConnection_MQTT.Send() send failed, incorrect length response ($len) received!");
$this->errno = 302;
$this->error = 'Send request failed!';
$rc = false;
} else {
$rc = true;
/* Now we can send a PUBREL message... */
$variable = pack('n', $mid);
$msg = $this->fixed_header("MQTT_PUBREL").$this->remaining_length(strlen($variable)).$variable;
fwrite($this->sock, $msg);
/* get a response... */
$hdr = $this->read_fixed_header($this->sock);
if ($hdr['mtype'] == $this->operations['MQTT_PUBCOMP']) {
$len = $this->read_remaining_length($this->sock);
if ($len > 0) {
$response = fread($this->sock, $len);
}
if ($len < 2) {
if ($this->debug) t("SAMConnection_MQTT.Send() send failed, incorrect length response ($len) received!");
$this->errno = 302;
$this->error = 'Send request failed!';
$rc = false;
} else {
$rc = true;
}
} else {
if ($this->debug) t('SAMConnection_MQTT.Send() Send failed response mtype = '.$mtype.' Expected PUBCOMP!');
$rc = false;
}
}
} else {
if ($this->debug) t('SAMConnection_MQTT.Send() Send failed response mtype = '.$mtype);
$rc = false;
}
}
}
}
if ($this->debug) x("SAMConnection_MQTT.Send() rc=$rc");
return $rc;
}
/* ---------------------------------
SetDebug
--------------------------------- */
function SetDebug($option=false) {
$this->debug = $option;
return;
}
/* ---------------------------------
Subscribe
--------------------------------- */
function Subscribe($topic, $options=array()) {
if ($this->debug) e("SAMConnection_MQTT.Subscribe($topic)");
$rc = true;
/* check the format of the topic... */
if (strncmp($topic, 'topic://', 8) == 0) {
$t = substr($topic, 8);
} else {
$this->errno = 279;
$this->error = 'Specified target ('.$topic.') is not a valid topic!';
return false;
}
if (in_array(SAM_MQTT_QOS, $options)) {
$qos = $options[SAM_MQTT_QOS];
} else {
$qos = 0;
}
/* Are we already connected? */
if (!$this->connected) {
/* No, so open up the connection... */
if (!$this->do_connect_now()) {
return false;
}
}
// variable header: message id (16 bits)
$x = rand(1, 16000);
$variable = pack('n', $x);
// payload: client ID
$payload = $this->utf($t).pack('C', $qos);
// add in the remaining length field and fix it together
$msg = $this->fixed_header("MQTT_SUBSCRIBE", 0, 1) . $this->remaining_length(strlen($variable)+strlen($payload)) . $variable . $payload;
fwrite($this->sock, $msg);
$hdr = $this->read_fixed_header($this->sock);
if (!$hdr) {
if ($this->debug) t("SAMConnection_MQTT.Subscribe() subscribe failed, no response from broker!");
$this->errno = 301;
$this->error = 'Subscribe request failed, no response from broker!';
$rc = false;
} else {
if ($hdr['mtype'] == $this->operations['MQTT_SUBACK']) {
$len = $this->read_remaining_length($this->sock);
if ($len > 0) {
$response = fread($this->sock, $len);
/* Return the subscription id with the topic appended to it so we can unsubscribe easily... */
$rc = $this->sub_id.SAM_MQTT_SUB_SEPARATOR.$t;
}
if ($len < 3) {
if ($this->debug) t("SAMConnection_MQTT.Subscribe() subscribe failed, incorrect length response ($len) received!");
$this->errno = 301;
$this->error = 'Subscribe request failed, incorrect length response ($len) received!';
$rc = false;
}
} else {
if ($this->debug) t('SAMConnection_MQTT.Subscribe() subscribe failed response mtype = '.$mtype);
$rc = false;
}
}
if ($this->debug) x("SAMConnection_MQTT.Subscribe() rc=$rc");
return $rc;
}
/* ---------------------------------
Unsubscribe
--------------------------------- */
function Unsubscribe($sub_id) {
if ($this->debug) e("SAMConnection_MQTT.Unsubscribe($sub_id)");
/* Detach the topic from the rear of the subscription id... */
$x = strpos($sub_id, SAM_MQTT_SUB_SEPARATOR);
if (!$x) {
$this->errno = 279;
$this->error = 'Specified subscription id ('.$sub_id.') is not valid!';
return false;
}
$topic = substr($sub_id, $x + strlen(SAM_MQTT_SUB_SEPARATOR));
$si = substr($sub_id, 0, $x);
/* Are we already connected? */
if (!$this->connected) {
if ($this->debug) t('SAMConnection_MQTT.Unsubscribe() Not connected.');
/* No, so open up the connection... */
$this->sub_id = $si;
$rc = $this->do_connect_now();
} else {
/* We are already connected. Are we using the right subscriber id? */
if ($this->sub_id != $si) {
if ($this->debug) t('SAMConnection_MQTT.Unsubscribe() Connected with wrong sub_id.');
/* No, We better reconnect then... */
$this->disconnect();
$this->sub_id = $si;
$rc = $this->do_connect_now();
} else {
if ($this->debug) t('SAMConnection_MQTT.Unsubscribe() Connected OK.');
$rc = true;
}
}
/* variable header: message id (16 bits) */
$x = rand(1, 16000);
$variable = pack('n', $x);
/* payload: client ID */
$payload = $this->utf($topic);
/* add in the remaining length field and fix it together */
$msg = $this->fixed_header("MQTT_UNSUBSCRIBE", 0, 1) . $this->remaining_length(strlen($variable)+strlen($payload)) . $variable . $payload;
fwrite($this->sock, $msg);
$hdr = $this->read_fixed_header($this->sock);
if (!$hdr) {
if ($this->debug) t("SAMConnection_MQTT.Unsubscribe() unsubscribe failed, no response from broker!");
$this->errno = 302;
$this->error = 'Unsubscribe request failed, no response from broker!';
$rc = false;
} else {
if ($hdr['mtype'] == $this->operations['MQTT_UNSUBACK']) {
$len = $this->read_remaining_length($this->sock);
if ($len > 0) {
$response = fread($this->sock, $len);
$rc = $this->sub_id;
}
if ($len != 2) {
if ($this->debug) t("SAMConnection_MQTT.Unsubscribe() unsubscribe failed, incorrect length response ($len) received!");
$this->errno = 301;
$this->error = "Unsubscribe request failed, incorrect length response ($len) received!";
$rc = false;
}
} else {
if ($this->debug) t('SAMConnection_MQTT.Unsubscribe() unsubscribe failed response mtype = '.$hdr['mtype']);
$rc = false;
}
}
if ($this->debug) x("SAMConnection_MQTT.Unsubscribe() rc=$rc");
return $rc;
}
function remaining_length($l) {
/* return the remaining length field bytes for an integer input parameter */
if ($this->debug) t("SAMConnection_MQTT.remaining_length() l=$l");
$rlf = '';
do {
$digit = $l % 128;
$l = ($l - $digit)/128;
if ($this->debug) t("SAMConnection_MQTT.remaining_length() digit=$digit l=$l");
# if there are more digits to encode, set the top bit of this digit
if ( $l > 0 ) {
$digit += 128;
}
$digit = pack('C', $digit);
$rlf .= $digit;
if ($this->debug) t("SAMConnection_MQTT.remaining_length() rlf=$rlf");
} while ($l > 0);
return $rlf;
}
function utf($s) {
/* return the UTF-8 encoded version of the parameter */
$l = strlen($s);
$b1 = pack('C', $l/256);
$b2 = pack('C', $l%256);
$rc = $b1.$b2.$s;
return $rc;
}
function fixed_header($operation, $dup=0, $qos=0, $retain=0) {
/* fixed header: msg type (4) dup (1) qos (2) retain (1) */
return pack('C', ($this->operations[$operation] * 16) + ($dup * 4) + ($qos * 2) + $retain);
}
function checkHost($hostname, $port) {
if ($this->debug) e("SAMConnection_MQTT.checkHost($hostname)");
$rc = false;
$fp = fsockopen($hostname, $port);
if (!$fp) {
$rc = false;
} else {
$this->sock = $fp;
$rc = true;
}
if ($this->debug) x("SAMConnection_MQTT.checkHost(rc=$rc)");
return $rc;
}
function do_connect_now() {
$rc = true;
/* Do we have a client/subscriber id yet? */
if ($this->sub_id == '') {
/* No, so create a unique one... */
$this->sub_id = uniqid('', true);
if ($this->debug) t("SAMConnection_MQTT.do_connect_now() sub_id=$this->sub_id");
} else {
if ($this->debug) t("SAMConnection_MQTT.do_connect_now() using existing sub_id=$this->sub_id");
}
if ($this->cleanstart) {
$x = "\x03";
} else {
$x = "\x00";
}
$variable = $this->utf('MQIsdp')."\x03$x\x00\x00";
/* payload is subscriber id */
$payload = $this->utf($this->sub_id);
/* add in the remaining length field and fix it together */
$msg = $this->fixed_header("MQTT_CONNECT") . $this->remaining_length(strlen($variable)+strlen($payload)) . $variable . $payload;
$errno = 0;
$errstr = '';
if (!$this->virtualConnected) {
$fp = fsockopen($this->host, $this->port, $errno, $errstr);
if (!$fp) {
if ($this->debug) t("SAMConnection_MQTT.do_connect_now() fsockopen failed! ($errno) $errstr");
$this->errno = 208;
$this->error = 'Unable to open socket to broker!';
$this->sock = NULL;
return false;
} else {
$this->virtualConnected = true;
$this->sock = $fp;
}
}
stream_set_timeout($this->sock, 10);
fwrite($this->sock, $msg);
$hdr = $this->read_fixed_header($this->sock);
if ($hdr) {
if ($hdr['mtype'] == $this->operations['MQTT_CONNACK']) {
$len = $this->read_remaining_length($this->sock);
if ($len < 2) {
if ($this->debug) t("SAMConnection_MQTT.do_connect_now() connect failed, incorrect length response ($len) received!");
$this->errno = 218;
$this->error = 'Unable to open connection to broker!';
$rc = false;
} else {
$response = fread($this->sock, $len);
$fields = unpack('Ccomp/Cretcode', $response);
if ($fields['retcode'] == 0) {
$rc = $this->sock;
$this->connected = true;
$rc = true;
if ($this->debug) t('SAMConnection_MQTT.do_connect_now() connected OK');
} else {
if ($this->debug) t('SAMConnection_MQTT.do_connect_now() connect failed retcode = '.$fields['retcode']);
$rc = false;
if ($fields['retcode'] == 2) {
$this->sub_id = '';
$this->errno = 279;
$this->error = 'Invalid subscription id!';
}
}
}
} else {
if ($this->debug) t('SAMConnection_MQTT.do_connect_now() connect failed response mtype = '.$mtype);
$rc = false;
}
}
if (!$rc) {
fclose($this->sock);
$this->sock = NULL;
$this->virtualConnected = false;
}
return $rc;
}
function read_fixed_header($conn) {
$rc = false;
$response = fread($conn, 1);
if (strlen($response) > 0) {
$fields = unpack('Cbyte1', $response);
$x = $fields['byte1'];
$ret = $x % 2;
$x -= $ret;
$qos = ($x % 8) / 2;
$x -= ($qos * 2);
$dup = ($x % 16) / 8;
$x -= ($dup * 8);
$mtype = $x / 16;
if ($this->debug) t("SAMConnection_MQTT.read_fixed_header() mtype=$mtype, dup=$dup, qos=$qos, retain=$ret");
$rc = array('mtype' => $mtype, 'dup' => $dup, 'qos' => $qos, 'retain' => $ret);
}
return $rc;
}
function read_remaining_length($conn) {
$rc = 0;
$m = 1;
while (!feof($conn)) {
$byte = fgetc($conn);
$fields = unpack('Ca', $byte);
$x = $fields['a'];
if ($this->debug) t('SAMConnection_MQTT.read_remaining_length() byte ('.strlen($byte).') = '.$x);
if ($x < 128) {
$rc += $x * $m;
break;
} else {
$rc += (($x - 128) * $m);
}
$m *= 128;
}
if ($this->debug) t('SAMConnection_MQTT.read_remaining_length() remaining length = '.$rc);
return $rc;
}
function read_topic($conn) {
if ($this->debug) e('SAMConnection_MQTT.read_topic()');
$rc = false;
while (!feof($conn)) {
$tlen = fread($conn, 2);
$fields = unpack('na', $tlen);
if ($this->debug) t('SAMConnection_MQTT.read_topic() topic length='.$fields['a']);
$rc = fread($conn, $fields['a']);
break;
}
if ($this->debug) x("SAMConnection_MQTT.read_topic(rc=$rc)");
return $rc;
}
}
?>
SAM/php_sam.php
<?php
/*
+----------------------------------------------------------------------+
| Copyright IBM Corporation 2006, 2007. |
| All Rights Reserved. |
+----------------------------------------------------------------------+
| |
| Licensed under the Apache License, Version 2.0 (the "License"); you |
| may not use this file except in compliance with the License. You may |
| obtain a copy of the License at |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| implied. See the License for the specific language governing |
| permissions and limitations under the License. |
+----------------------------------------------------------------------+
| Author: Dave Renshaw |
+----------------------------------------------------------------------+
$Id: php_sam.php,v 1.1 2007/02/02 15:38:53 dsr Exp $
*/
/* Debugging flags and functions available to sub packages... */
$eol = "\n";
if (isset($_SERVER['REQUEST_URI'])) {
$eol = '<br>';
}
function e($s) {global $eol;echo '-->'.$s."$eol";}
function t($s) {global $eol;echo ' '.$s."$eol";}
function x($s) {global $eol;echo '<--'.$s."$eol";}
define('SAM_MQTT', 'mqtt');
/* ---------------------------------
SAMConnection
--------------------------------- */
class SAMConnection {
// var $debug = true;
var $debug = false;
var $errno = 0;
var $error = '';
var $connection;
/* ---------------------------------
Create
--------------------------------- */
function Create($proto) {
if ($this->debug) e("SAMConnection.Create(proto=$proto)");
$rc = false;
/* search the PHP config for a factory to use... */
$x = get_cfg_var('sam.factory.'.$proto);
if ($this->debug) t('SAMConnection.Create() get_cfg_var() "'.$x.'"');
/* If there is no configuration (php.ini) entry for this protocol, default it. */
if (strlen($x) == 0) {
/* for every protocol other than MQTT assume we will use XMS */
if ($proto != 'mqtt') {
$x = 'xms';
} else {
$x = 'mqtt';
}
}
/* Invoke the chosen factory to create a real connection object... */
$x = 'sam_factory_'.$x.'.php';
if ($this->debug) t("SAMConnection.Create() calling factory - $x");
$rc = include $x;
if ($this->debug && $rc) t('SAMConnection.Create() rc = '.get_class($rc));
if ($this->debug) x('SAMConnection.Create()');
return $rc;
}
/* ---------------------------------
Constructor
--------------------------------- */
function SAMConnection() {
if ($this->debug) e('SAMConnection()');
if ($this->debug) x('SAMConnection()');
}
/* ---------------------------------
Commit
--------------------------------- */
function Commit() {
if ($this->debug) e('SAMConnection.Commit()');
$rc = true;
if (!$this->connection) {
$errno = 106;
$error = 'No active connection!';
$rc = false;
} else {
/* Call the method on the underlying connection object... */
$rc = $this->connection->commit($target, $options);
$this->errno = $this->connection->errno;
$this->error = $this->connection->error;
if (!$rc) {
if ($this->debug) t("SAMConnection.Commit() commit failed ($this->errno) $this->error");
$rc = false;
}
}
if ($this->debug) x("SAMConnection.Commit() rc=$rc");
return $rc;
}
/* ---------------------------------
Connect
--------------------------------- */
function Connect($proto='', $options=array()) {
if ($this->debug) e('SAMConnection.Connect()');
$rc = false;
if ($proto == '') {
$errno = 101;
$error = 'Incorrect number of parameters on connect call!';
$rc = false;
} else {
$this->connection = $this->create($proto);
if (!$this->connection) {
$errno = 102;
$error = 'Unsupported protocol!';
$rc = false;
} else {
if ($this->debug) t("SAMConnection.Connect() connection created for protocol $proto");
$this->connection->setdebug($this->debug);
/* Call the connect method on the newly created connection object... */
$rc = $this->connection->connect($proto, $options);
$this->errno = $this->connection->errno;
$this->error = $this->connection->error;
if (!$rc) {
if ($this->debug) t("SAMConnection.Connect() connect failed ($this->errno) $this->error");
} else {
$rc = true;
}
}
}
if ($this->debug) x("SAMConnection.Connect() rc=$rc");
return $rc;
}
/* ---------------------------------
Disconnect
--------------------------------- */
function Disconnect() {
if ($this->debug) e('SAMConnection.Disconnect()');
$rc = true;
if (!$this->connection) {
$errno = 106;
$error = 'No active connection!';
$rc = false;
} else {
/* Call the method on the underlying connection object... */
$rc = $this->connection->Disconnect();
$this->errno = $this->connection->errno;
$this->error = $this->connection->error;
if (!$rc) {
if ($this->debug) t("SAMConnection.Disconnect() Disconnect failed ($this->errno) $this->error");
} else {
$rc = true;
$this->connection = false;
}
}
if ($this->debug) x("SAMConnection.Disconnect() rc=$rc");
return $rc;
}
/* ---------------------------------
IsConnected
--------------------------------- */
function IsConnected() {
if ($this->debug) e('SAMConnection.IsConnected()');
$rc = true;
if (!$this->connection) {
$errno = 106;
$error = 'No active connection!';
$rc = false;
} else {
/* Call the method on the underlying connection object... */
$rc = $this->connection->isconnected();
$this->errno = $this->connection->errno;
$this->error = $this->connection->error;
if (!$rc) {
if ($this->debug) t("SAMConnection.IsConnected() isconnected failed ($this->errno) $this->error");
$rc = false;
}
}
if ($this->debug) x("SAMConnection.IsConnected() rc=$rc");
return $rc;
}
/* ---------------------------------
Peek
--------------------------------- */
function Peek($target, $options=array()) {
if ($this->debug) e('SAMConnection.Peek()');
$rc = true;
if (!$this->connection) {
$errno = 106;
$error = 'No active connection!';
$rc = false;
} else {
/* Call the method on the underlying connection object... */
$rc = $this->connection->peek($target, $options);
$this->errno = $this->connection->errno;
$this->error = $this->connection->error;
if (!$rc) {
if ($this->debug) t("SAMConnection.Peek() peek failed ($this->errno) $this->error");
$rc = false;
}
}
if ($this->debug) x("SAMConnection.Peek() rc=$rc");
return $rc;
}
/* ---------------------------------
PeekAll
--------------------------------- */
function PeekAll($target, $options=array()) {
if ($this->debug) e('SAMConnection.PeekAll()');
$rc = true;
if (!$this->connection) {
$errno = 106;
$error = 'No active connection!';
$rc = false;
} else {
/* Call the method on the underlying connection object... */
$rc = $this->connection->peekall($target, $options);
$this->errno = $this->connection->errno;
$this->error = $this->connection->error;
if (!$rc) {
if ($this->debug) t("SAMConnection.PeekAll() peekall failed ($this->errno) $this->error");
$rc = false;
}
}
if ($this->debug) x("SAMConnection.PeekAll() rc=$rc");
return $rc;
}
/* ---------------------------------
Receive
--------------------------------- */
function Receive($target, $options=array()) {
if ($this->debug) e('SAMConnection.Receive()');
$rc = true;
if (!$this->connection) {
$errno = 106;
$error = 'No active connection!';
$rc = false;
} else {
/* Call the receive method on the underlying connection object... */
$rc = $this->connection->receive($target, $options);
$this->errno = $this->connection->errno;
$this->error = $this->connection->error;
if (!$rc) {
if ($this->debug) t("SAMConnection.Receive() receive failed ($this->errno) $this->error");
}
}
if ($this->debug) x("SAMConnection.Receive() rc=$rc");
return $rc;
}
/* ---------------------------------
Remove
--------------------------------- */
function Remove($target, $options=array()) {
if ($this->debug) e('SAMConnection.Remove()');
$rc = true;
if (!$this->connection) {
$errno = 106;
$error = 'No active connection!';
$rc = false;
} else {
/* Call the method on the underlying connection object... */
$rc = $this->connection->remove($target, $options);
$this->errno = $this->connection->errno;
$this->error = $this->connection->error;
if (!$rc) {
if ($this->debug) t("SAMConnection.Remove() remove failed ($this->errno) $this->error");
$rc = false;
}
}
if ($this->debug) x("SAMConnection.Remove() rc=$rc");
return $rc;
}
/* ---------------------------------
Rollback
--------------------------------- */
function Rollback() {
if ($this->debug) e('SAMConnection.Rollback()');
$rc = true;
if (!$this->connection) {
$errno = 106;
$error = 'No active connection!';
$rc = false;
} else {
/* Call the method on the underlying connection object... */
$rc = $this->connection->rollback($target, $options);
$this->errno = $this->connection->errno;
$this->error = $this->connection->error;
if (!$rc) {
if ($this->debug) t("SAMConnection.Rollback() rollback failed ($this->errno) $this->error");
$rc = false;
}
}
if ($this->debug) x("SAMConnection.Rollback() rc=$rc");
return $rc;
}
/* ---------------------------------
Send
--------------------------------- */
function Send($target, $msg, $options=array()) {
if ($this->debug) e('SAMConnection.Send()');
$rc = true;
if (!$this->connection) {
$errno = 106;
$error = 'No active connection!';
$rc = false;
} else {
/* Call the send method on the underlying connection object... */
$rc = $this->connection->send($target, $msg, $options);
$this->errno = $this->connection->errno;
$this->error = $this->connection->error;
if (!$rc) {
if ($this->debug) t("SAMConnection.Send() send failed ($this->errno) $this->error");
$rc = false;
}
}
if ($this->debug) x("SAMConnection.Send() rc=$rc");
return $rc;
}
/* ---------------------------------
SetDebug
--------------------------------- */
function SetDebug($option=false) {
if ($this->debug) e("SAMConnection.SetDebug($option)");
$this->debug = $option;
if ($this->connection) {
$this->connection->setdebug($option);
}
if ($this->debug) x('SAMConnection.SetDebug()');
return;
}
/* ---------------------------------
Subscribe
--------------------------------- */
function Subscribe($topic, $options=array()) {
if ($this->debug) e("SAMConnection.Subscribe($topic)");
$rc = true;
if (!$this->connection) {
$errno = 106;
$error = 'No active connection!';
$rc = false;
} else {
/* Call the subscribe method on the underlying connection object... */
$rc = $this->connection->subscribe($topic, $options);
$this->errno = $this->connection->errno;
$this->error = $this->connection->error;
if (!$rc) {
if ($this->debug) t("SAMConnection.Subscribe() subscribe failed ($this->errno) $this->error");
$rc = false;
}
}
if ($this->debug) x("SAMConnection.Subscribe() rc=$rc");
return $rc;
}
/* ---------------------------------
Unsubscribe
--------------------------------- */
function Unsubscribe($sub_id) {
if ($this->debug) e("SAMConnection.Unsubscribe($sub_id)");
$rc = true;
if (!$this->connection) {
$errno = 106;
$error = 'No active connection!';
$rc = false;
} else {
/* Call the subscribe method on the underlying connection object... */
$rc = $this->connection->unsubscribe($sub_id);
$this->errno = $this->connection->errno;
$this->error = $this->connection->error;
if (!$rc) {
if ($this->debug) t("SAMConnection.Unsubscribe() unsubscribe failed ($this->errno) $this->error");
$rc = false;
}
}
if ($this->debug) x("SAMConnection.Unsubscribe() rc=$rc");
return $rc;
}
}
/* ---------------------------------
SAMMessage
--------------------------------- */
class SAMMessage {
/* ---------------------------------
Constructor
--------------------------------- */
function SAMMessage($body='') {
if ($body != '') {
$this->body = $body;
}
}
}
?>
SAM/sam_factory_mqtt.php
<?php
/*
+----------------------------------------------------------------------+
| Copyright IBM Corporation 2006, 2007. |
| All Rights Reserved. |
+----------------------------------------------------------------------+
| |
| Licensed under the Apache License, Version 2.0 (the "License"); you |
| may not use this file except in compliance with the License. You may |
| obtain a copy of the License at |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| implied. See the License for the specific language governing |
| permissions and limitations under the License. |
+----------------------------------------------------------------------+
| Author: Dave Renshaw |
+----------------------------------------------------------------------+
$Id: sam_factory_mqtt.php,v 1.1 2007/02/02 15:40:41 dsr Exp $
*/
require_once('MQTT/sam_mqtt.php');
return new SAMConnection_MQTT();
?>
SAM/sam_factory_xms.php
<?php
/*
+----------------------------------------------------------------------+
| Copyright IBM Corporation 2006, 2007. |
| All Rights Reserved. |
+----------------------------------------------------------------------+
| |
| Licensed under the Apache License, Version 2.0 (the "License"); you |
| may not use this file except in compliance with the License. You may |
| obtain a copy of the License at |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| implied. See the License for the specific language governing |
| permissions and limitations under the License. |
+----------------------------------------------------------------------+
| Author: Dave Renshaw |
+----------------------------------------------------------------------+
$Id: sam_factory_xms.php,v 1.1 2007/02/02 15:40:00 dsr Exp $
*/
if (!class_exists('SAMXMSConnection')) {
global $eol;
$l = (strstr(PHP_OS, 'WIN') ? 'php_' : '').'sam_xms.'.(strstr(PHP_OS, 'WIN') ? 'dll' : 'so');
echo $eol.'<font color="red"><b>Unable to access SAM XMS capabilities. Ensure the '.$l.' library is defined as an extension.</b></font>'.$eol;
return false;
} else {
return new SAMXMSConnection();
}
?>