下载tokudu-PhpMQTTClient-ba4e494.zip
下载rsmb_1.2.0.zip(windows环境下要开\windows\broker.exe 切记切记)
解压tokudu-PhpMQTTClient-ba4e494.zip
www下边新建目录androidpush
把解压文件tokudu-PhpMQTTClient-ba4e494中的SAM文件夹copy到androidpush目录下
把解压文件tokudu-PhpMQTTClient-ba4e494中的send_mqtt.php文件copy到androidpush目录下
结构如图:
send_mqtt.php主文件
<?php require('SAM/php_sam.php'); //create a new connection object //创建一个新的连接对象 $conn = new SAMConnection(); //start initialise the connection //开始初始化连接 SAM_HOST服务器host路径 ;SAM_PORT服务器port端口号 $conn->connect(SAM_MQTT, array(SAM_HOST => '127.0.0.1', SAM_PORT => 1883)); //create a new MQTT message with the output of the shell command as the body //建立一个新的MQTT shell命令的输出消息以作为主体($msgCpu是通知内容) //new SAMMessage() 参数一般写为json格式 $msgCpu = new SAMMessage("测试通知"); //send the message on the topic cpu //发送该信息的主体 //860173018344139是设备号,每一台android手机对应一个唯一的设备号 //$msgCpu通知内容 //send()推送通知 $conn->send('topic://'.'860173018344139', $msgCpu); //关闭连接 $conn->disconnect(); echo 'MQTT Message to ' . '860173018344139' . ' sent: ' . '测试通知'; ?>
SAM目录
SAM/MQTT/sam_mqtt.php
<?php /* +----------------------------------------------------------------------+ | Copyright IBM Corporation 2006, 2007. | | All Rights Reserved. | +----------------------------------------------------------------------+ | | | Licensed under the Apache License, Version 2.0 (the "License"); you | | may not use this file except in compliance with the License. You may | | obtain a copy of the License at | | http://www.apache.org/licenses/LICENSE-2.0 | | | | Unless required by applicable law or agreed to in writing, software | | distributed under the License is distributed on an "AS IS" BASIS, | | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | | implied. See the License for the specific language governing | | permissions and limitations under the License. | +----------------------------------------------------------------------+ | Author: Dave Renshaw | +----------------------------------------------------------------------+ $Id: sam_mqtt.php,v 1.1 2007/02/02 15:36:46 dsr Exp $ */ define("SAM_MQTT_CLEANSTART", "SAM_MQTT_CLEANSTART"); define("SAM_MQTT_QOS", "SAM_MQTT_QOS"); define("SAM_MQTT_SUB_SEPARATOR", "#-#"); /* --------------------------------- SAMConnection --------------------------------- */ class SAMConnection_MQTT { var $debug = false; var $errno = 0; var $error = ''; /* Info we need to keep between calls... */ var $sub_id = ''; var $port = ''; var $host = ''; var $cleanstart = false; var $virtualConnected = false; var $connected = false; /* Our current open socket... */ var $sock; /* Table of available operations using the MQTT protocol... */ var $operations = array("MQTT_CONNECT" => 1, "MQTT_CONNACK" => 2, "MQTT_PUBLISH" => 3, "MQTT_PUBACK" => 4, "MQTT_PUBREC" => 5, "MQTT_PUBREL" => 6, "MQTT_PUBCOMP" => 7, "MQTT_SUBSCRIBE" => 8, "MQTT_SUBACK" => 9, "MQTT_UNSUBSCRIBE" => 10, "MQTT_UNSUBACK" => 11, "MQTT_PINGREC" => 12, "MQTT_PINGRESP" => 13, "MQTT_DISCONNECT" => 14); /* --------------------------------- Constructor --------------------------------- */ function SAMConnection_MQTT() { if ($this->debug) e('SAMConnection_MQTT()'); if ($this->debug) x('SAMConnection_MQTT()'); } /* --------------------------------- Commit --------------------------------- */ function Commit() { if ($this->debug) e('SAMConnection_MQTT.Commit()'); $errno = 100; $error = 'Unsupported operation for MQTT protocol!'; $rc = false; if ($this->debug) x("SAMConnection_MQTT.Commit() rc=$rc"); return $rc; } /* --------------------------------- Connect --------------------------------- */ function Connect($proto, $options=array()) { if ($this->debug) e('SAMConnection_MQTT.Connect()'); /* Check our optional parameter array for the necessary bits... */ if ($options[SAM_PORT] == '') { $this->port = 1883; } else { $this->port = $options[SAM_PORT]; } if ($options[SAM_HOST] == '') { $this->host = 'localhost'; } else { $this->host = $options[SAM_HOST]; } $this->cleanstart = in_array(SAM_MQTT_CLEANSTART, $options); if ($this->debug) t("SAMConnection_MQTT.Connect() host=$this->host, port=$this->port, cleanstart=$this->cleanstart"); if ($this->checkHost($this->host, $this->port)) { $this->virtualConnected = true; } else { $this->virtualConnected = false; } if ($this->debug) x("SAMConnection_MQTT.Connect() rc=$this->virtualConnected"); return $this->virtualConnected; } /* --------------------------------- Disconnect --------------------------------- */ function Disconnect() { if ($this->debug) e('SAMConnection_MQTT.Disconnect()'); $rc = false; if ($this->virtualConnected) { if ($this->connected) { $msg = $this->fixed_header("MQTT_DISCONNECT").pack('C', 0); fwrite($this->sock, $msg); $response = fgets($this->sock, 128); if ($this->debug) t('SAMConnection_MQTT.Disconnect() response is '.strlen($response).' bytes'); if (strlen($response) == 0) { fclose($this->sock); $this->sock = NULL; } } $this->virtualConnected = false; $this->connected = false; $rc = true; } if ($this->debug) x("SAMConnection_MQTT.Disconnect() rc=$rc"); return $rc; } /* --------------------------------- IsConnected --------------------------------- */ function IsConnected() { if ($this->debug) e('SAMConnection_MQTT.IsConnected()'); $rc = false; if ($this->connected) { $rc = true; } if ($this->debug) x("SAMConnection_MQTT.IsConnected() rc=$rc"); return $rc; } /* --------------------------------- Peek --------------------------------- */ function Peek() { if ($this->debug) e('SAMConnection_MQTT.Peek()'); $errno = 100; $error = 'Unsupported operation for MQTT protocol!'; $rc = false; if ($this->debug) x("SAMConnection_MQTT.Peek() rc=$rc"); return $rc; } /* --------------------------------- PeekAll --------------------------------- */ function PeekAll() { if ($this->debug) e('SAMConnection_MQTT.PeekAll()'); $errno = 100; $error = 'Unsupported operation for MQTT protocol!'; $rc = false; if ($this->debug) x("SAMConnection_MQTT.PeekAll() rc=$rc"); return $rc; } /* --------------------------------- Receive --------------------------------- */ function Receive($sub_id, $options=array()) { if ($this->debug) e('SAMConnection_MQTT.Receive()'); $rc = false; /* strip the topic from the rear of the subscription id... */ $x = strpos($sub_id, SAM_MQTT_SUB_SEPARATOR); if (!$x) { $this->errno = 279; $this->error = 'Specified subscription id ('.$sub_id.') is not valid!'; return false; } $topic = substr($sub_id, $x + strlen(SAM_MQTT_SUB_SEPARATOR)); $si = substr($sub_id, 0, $x); /* Are we already connected? */ if (!$this->connected) { if ($this->debug) t('SAMConnection_MQTT.Receive() Not connected.'); /* No, so open up the connection... */ $this->sub_id = $si; $rc = $this->do_connect_now(); } else { /* We are already connected. Are we using the right subscriber id? */ if ($this->sub_id != $si) { if ($this->debug) t('SAMConnection_MQTT.Receive() Connected with wrong sub_id.'); /* No, We better reconnect then... */ $this->disconnect(); $this->sub_id = $si; $rc = $this->do_connect_now(); } else { if ($this->debug) t('SAMConnection_MQTT.Receive() Connected OK.'); $rc = true; } } if ($rc) { /* have we got a timeout specified? */ if ($options[SAM_WAIT] > 1) { $m = $options[SAM_WAIT] % 1000; $s = ($options[SAM_WAIT] - $m) /1000; if ($this->debug) t('SAMConnection_MQTT.Receive() timeout='.$options[SAM_WAIT]." ($s secs $m millisecs)"); stream_set_timeout($this->sock, $s, $m); if ($this->debug) t('SAMConnection_MQTT.Receive() timeout set.'); } else { if ($this->debug) t('SAMConnection_MQTT.Receive() no timeout value found!'); } $hdr = $this->read_fixed_header($this->sock); if (!$hdr) { $this->errno = 500; $this->error = 'Receive request failed, timed out with no data!'; $rc = false; } else { if ($hdr['mtype'] == $this->operations['MQTT_PUBLISH']) { $len = $this->read_remaining_length($this->sock); if ($len > 1) { /* read the topic length... */ $topic = $this->read_topic($this->sock); if (!$topic) { $this->errno = 303; $this->error = 'Receive request failed, message format invalid!'; $rc = false; } else { if ($this->debug) t('SAMConnection_MQTT.Receive() topic='.$topic); $len -= (strlen($topic) + 2); /* If QoS 1 or 2 then read the message id... */ if ($hdr['qos'] > 0) { $idb = fread($this->sock, 2); $len -= 2; $fields = unpack('na', $idb); $mid = $fields['a']; if ($this->debug) t('SAMConnection_MQTT.Receive() mid='.$mid); } $payload = fread($this->sock, $len); if ($this->debug) t('SAMConnection_MQTT.Receive() payload='.$payload); $rc = new SAMMessage(); $rc->body = $payload; $rc->header->SAM_MQTT_TOPIC = 'topic://'.$topic; $rc->header->SAM_MQTT_QOS = $hdr['qos']; $rc->header->SAM_TYPE = 'SAM_BYTES'; } } else { $this->errno = 303; $this->error = 'Receive request failed, received message too short! No topic data'; $rc = false; } } else { if ($this->debug) t('SAMConnection_MQTT.Receive() Receive failed response mtype = '.$mtype); $rc = false; } } } if ($this->debug) x("SAMConnection_MQTT.Receive() rc=$rc"); return $rc; } /* --------------------------------- Remove --------------------------------- */ function Remove() { if ($this->debug) e('SAMConnection_MQTT.Remove()'); $errno = 100; $error = 'Unsupported operation for MQTT protocol!'; $rc = false; if ($this->debug) x("SAMConnection_MQTT.Remove() rc=$rc"); return $rc; } /* --------------------------------- Rollback --------------------------------- */ function Rollback() { if ($this->debug) e('SAMConnection_MQTT.Rollback()'); $errno = 100; $error = 'Unsupported operation for MQTT protocol!'; $rc = false; if ($this->debug) x("SAMConnection_MQTT.Rollback() rc=$rc"); return $rc; } /* --------------------------------- Send --------------------------------- */ function Send($topic, $message, $options=array()) { if ($this->debug) e('SAMConnection_MQTT.Send()'); $rc = true; /* check the format of the topic... */ if (strncmp($topic, 'topic://', 8) == 0) { $t = substr($topic, 8); } else { $this->errno = 279; $this->error = 'Specified target ('.$topic.') is not a valid topic!'; return false; } if (in_array(SAM_MQTT_QOS, $options)) { $qos = $options[SAM_MQTT_QOS]; } else { $qos = 0; } /* Are we already connected? */ if (!$this->connected) { /* No, so open up the connection... */ $this->do_connect_now(); } $mid = rand(); $variable = $this->utf($t); if ($qos > 0) { $variable .= pack('n', $mid); } $payload = $message->body; // add in the remaining length field and fix it together $msg = $this->fixed_header("MQTT_PUBLISH", 0, $qos) . $this->remaining_length(strlen($variable)+strlen($payload)) . $variable . $payload; fwrite($this->sock, $msg); if ($qos > 0) { $hdr = $this->read_fixed_header($this->sock); if ($hdr) { /* is this a QoS level 1 message being sent? */ if ($qos == 1) { /* Yup, so we should get a PUBACK response message... */ if ($hdr['mtype'] == $this->operations['MQTT_PUBACK']) { $len = $this->read_remaining_length($this->sock); if ($len > 0) { $response = fread($this->sock, $len); } if ($len < 2) { if ($this->debug) t("SAMConnection_MQTT.Send() send failed, incorrect length response ($len) received!"); $this->errno = 302; $this->error = 'Send request failed!'; $rc = false; } else { $rc = true; } } else { if ($this->debug) t('SAMConnection_MQTT.Send() Send failed response mtype = '.$mtype.' Expected PUBREC!'); $rc = false; } } else { /* lets assume it's QoS level 2... */ /* We should get a PUBREC response message... */ if ($hdr['mtype'] == $this->operations['MQTT_PUBREC']) { $len = $this->read_remaining_length($this->sock); if ($len > 0) { $response = fread($this->sock, $len); } if ($len < 2) { if ($this->debug) t("SAMConnection_MQTT.Send() send failed, incorrect length response ($len) received!"); $this->errno = 302; $this->error = 'Send request failed!'; $rc = false; } else { $rc = true; /* Now we can send a PUBREL message... */ $variable = pack('n', $mid); $msg = $this->fixed_header("MQTT_PUBREL").$this->remaining_length(strlen($variable)).$variable; fwrite($this->sock, $msg); /* get a response... */ $hdr = $this->read_fixed_header($this->sock); if ($hdr['mtype'] == $this->operations['MQTT_PUBCOMP']) { $len = $this->read_remaining_length($this->sock); if ($len > 0) { $response = fread($this->sock, $len); } if ($len < 2) { if ($this->debug) t("SAMConnection_MQTT.Send() send failed, incorrect length response ($len) received!"); $this->errno = 302; $this->error = 'Send request failed!'; $rc = false; } else { $rc = true; } } else { if ($this->debug) t('SAMConnection_MQTT.Send() Send failed response mtype = '.$mtype.' Expected PUBCOMP!'); $rc = false; } } } else { if ($this->debug) t('SAMConnection_MQTT.Send() Send failed response mtype = '.$mtype); $rc = false; } } } } if ($this->debug) x("SAMConnection_MQTT.Send() rc=$rc"); return $rc; } /* --------------------------------- SetDebug --------------------------------- */ function SetDebug($option=false) { $this->debug = $option; return; } /* --------------------------------- Subscribe --------------------------------- */ function Subscribe($topic, $options=array()) { if ($this->debug) e("SAMConnection_MQTT.Subscribe($topic)"); $rc = true; /* check the format of the topic... */ if (strncmp($topic, 'topic://', 8) == 0) { $t = substr($topic, 8); } else { $this->errno = 279; $this->error = 'Specified target ('.$topic.') is not a valid topic!'; return false; } if (in_array(SAM_MQTT_QOS, $options)) { $qos = $options[SAM_MQTT_QOS]; } else { $qos = 0; } /* Are we already connected? */ if (!$this->connected) { /* No, so open up the connection... */ if (!$this->do_connect_now()) { return false; } } // variable header: message id (16 bits) $x = rand(1, 16000); $variable = pack('n', $x); // payload: client ID $payload = $this->utf($t).pack('C', $qos); // add in the remaining length field and fix it together $msg = $this->fixed_header("MQTT_SUBSCRIBE", 0, 1) . $this->remaining_length(strlen($variable)+strlen($payload)) . $variable . $payload; fwrite($this->sock, $msg); $hdr = $this->read_fixed_header($this->sock); if (!$hdr) { if ($this->debug) t("SAMConnection_MQTT.Subscribe() subscribe failed, no response from broker!"); $this->errno = 301; $this->error = 'Subscribe request failed, no response from broker!'; $rc = false; } else { if ($hdr['mtype'] == $this->operations['MQTT_SUBACK']) { $len = $this->read_remaining_length($this->sock); if ($len > 0) { $response = fread($this->sock, $len); /* Return the subscription id with the topic appended to it so we can unsubscribe easily... */ $rc = $this->sub_id.SAM_MQTT_SUB_SEPARATOR.$t; } if ($len < 3) { if ($this->debug) t("SAMConnection_MQTT.Subscribe() subscribe failed, incorrect length response ($len) received!"); $this->errno = 301; $this->error = 'Subscribe request failed, incorrect length response ($len) received!'; $rc = false; } } else { if ($this->debug) t('SAMConnection_MQTT.Subscribe() subscribe failed response mtype = '.$mtype); $rc = false; } } if ($this->debug) x("SAMConnection_MQTT.Subscribe() rc=$rc"); return $rc; } /* --------------------------------- Unsubscribe --------------------------------- */ function Unsubscribe($sub_id) { if ($this->debug) e("SAMConnection_MQTT.Unsubscribe($sub_id)"); /* Detach the topic from the rear of the subscription id... */ $x = strpos($sub_id, SAM_MQTT_SUB_SEPARATOR); if (!$x) { $this->errno = 279; $this->error = 'Specified subscription id ('.$sub_id.') is not valid!'; return false; } $topic = substr($sub_id, $x + strlen(SAM_MQTT_SUB_SEPARATOR)); $si = substr($sub_id, 0, $x); /* Are we already connected? */ if (!$this->connected) { if ($this->debug) t('SAMConnection_MQTT.Unsubscribe() Not connected.'); /* No, so open up the connection... */ $this->sub_id = $si; $rc = $this->do_connect_now(); } else { /* We are already connected. Are we using the right subscriber id? */ if ($this->sub_id != $si) { if ($this->debug) t('SAMConnection_MQTT.Unsubscribe() Connected with wrong sub_id.'); /* No, We better reconnect then... */ $this->disconnect(); $this->sub_id = $si; $rc = $this->do_connect_now(); } else { if ($this->debug) t('SAMConnection_MQTT.Unsubscribe() Connected OK.'); $rc = true; } } /* variable header: message id (16 bits) */ $x = rand(1, 16000); $variable = pack('n', $x); /* payload: client ID */ $payload = $this->utf($topic); /* add in the remaining length field and fix it together */ $msg = $this->fixed_header("MQTT_UNSUBSCRIBE", 0, 1) . $this->remaining_length(strlen($variable)+strlen($payload)) . $variable . $payload; fwrite($this->sock, $msg); $hdr = $this->read_fixed_header($this->sock); if (!$hdr) { if ($this->debug) t("SAMConnection_MQTT.Unsubscribe() unsubscribe failed, no response from broker!"); $this->errno = 302; $this->error = 'Unsubscribe request failed, no response from broker!'; $rc = false; } else { if ($hdr['mtype'] == $this->operations['MQTT_UNSUBACK']) { $len = $this->read_remaining_length($this->sock); if ($len > 0) { $response = fread($this->sock, $len); $rc = $this->sub_id; } if ($len != 2) { if ($this->debug) t("SAMConnection_MQTT.Unsubscribe() unsubscribe failed, incorrect length response ($len) received!"); $this->errno = 301; $this->error = "Unsubscribe request failed, incorrect length response ($len) received!"; $rc = false; } } else { if ($this->debug) t('SAMConnection_MQTT.Unsubscribe() unsubscribe failed response mtype = '.$hdr['mtype']); $rc = false; } } if ($this->debug) x("SAMConnection_MQTT.Unsubscribe() rc=$rc"); return $rc; } function remaining_length($l) { /* return the remaining length field bytes for an integer input parameter */ if ($this->debug) t("SAMConnection_MQTT.remaining_length() l=$l"); $rlf = ''; do { $digit = $l % 128; $l = ($l - $digit)/128; if ($this->debug) t("SAMConnection_MQTT.remaining_length() digit=$digit l=$l"); # if there are more digits to encode, set the top bit of this digit if ( $l > 0 ) { $digit += 128; } $digit = pack('C', $digit); $rlf .= $digit; if ($this->debug) t("SAMConnection_MQTT.remaining_length() rlf=$rlf"); } while ($l > 0); return $rlf; } function utf($s) { /* return the UTF-8 encoded version of the parameter */ $l = strlen($s); $b1 = pack('C', $l/256); $b2 = pack('C', $l%256); $rc = $b1.$b2.$s; return $rc; } function fixed_header($operation, $dup=0, $qos=0, $retain=0) { /* fixed header: msg type (4) dup (1) qos (2) retain (1) */ return pack('C', ($this->operations[$operation] * 16) + ($dup * 4) + ($qos * 2) + $retain); } function checkHost($hostname, $port) { if ($this->debug) e("SAMConnection_MQTT.checkHost($hostname)"); $rc = false; $fp = fsockopen($hostname, $port); if (!$fp) { $rc = false; } else { $this->sock = $fp; $rc = true; } if ($this->debug) x("SAMConnection_MQTT.checkHost(rc=$rc)"); return $rc; } function do_connect_now() { $rc = true; /* Do we have a client/subscriber id yet? */ if ($this->sub_id == '') { /* No, so create a unique one... */ $this->sub_id = uniqid('', true); if ($this->debug) t("SAMConnection_MQTT.do_connect_now() sub_id=$this->sub_id"); } else { if ($this->debug) t("SAMConnection_MQTT.do_connect_now() using existing sub_id=$this->sub_id"); } if ($this->cleanstart) { $x = "\x03"; } else { $x = "\x00"; } $variable = $this->utf('MQIsdp')."\x03$x\x00\x00"; /* payload is subscriber id */ $payload = $this->utf($this->sub_id); /* add in the remaining length field and fix it together */ $msg = $this->fixed_header("MQTT_CONNECT") . $this->remaining_length(strlen($variable)+strlen($payload)) . $variable . $payload; $errno = 0; $errstr = ''; if (!$this->virtualConnected) { $fp = fsockopen($this->host, $this->port, $errno, $errstr); if (!$fp) { if ($this->debug) t("SAMConnection_MQTT.do_connect_now() fsockopen failed! ($errno) $errstr"); $this->errno = 208; $this->error = 'Unable to open socket to broker!'; $this->sock = NULL; return false; } else { $this->virtualConnected = true; $this->sock = $fp; } } stream_set_timeout($this->sock, 10); fwrite($this->sock, $msg); $hdr = $this->read_fixed_header($this->sock); if ($hdr) { if ($hdr['mtype'] == $this->operations['MQTT_CONNACK']) { $len = $this->read_remaining_length($this->sock); if ($len < 2) { if ($this->debug) t("SAMConnection_MQTT.do_connect_now() connect failed, incorrect length response ($len) received!"); $this->errno = 218; $this->error = 'Unable to open connection to broker!'; $rc = false; } else { $response = fread($this->sock, $len); $fields = unpack('Ccomp/Cretcode', $response); if ($fields['retcode'] == 0) { $rc = $this->sock; $this->connected = true; $rc = true; if ($this->debug) t('SAMConnection_MQTT.do_connect_now() connected OK'); } else { if ($this->debug) t('SAMConnection_MQTT.do_connect_now() connect failed retcode = '.$fields['retcode']); $rc = false; if ($fields['retcode'] == 2) { $this->sub_id = ''; $this->errno = 279; $this->error = 'Invalid subscription id!'; } } } } else { if ($this->debug) t('SAMConnection_MQTT.do_connect_now() connect failed response mtype = '.$mtype); $rc = false; } } if (!$rc) { fclose($this->sock); $this->sock = NULL; $this->virtualConnected = false; } return $rc; } function read_fixed_header($conn) { $rc = false; $response = fread($conn, 1); if (strlen($response) > 0) { $fields = unpack('Cbyte1', $response); $x = $fields['byte1']; $ret = $x % 2; $x -= $ret; $qos = ($x % 8) / 2; $x -= ($qos * 2); $dup = ($x % 16) / 8; $x -= ($dup * 8); $mtype = $x / 16; if ($this->debug) t("SAMConnection_MQTT.read_fixed_header() mtype=$mtype, dup=$dup, qos=$qos, retain=$ret"); $rc = array('mtype' => $mtype, 'dup' => $dup, 'qos' => $qos, 'retain' => $ret); } return $rc; } function read_remaining_length($conn) { $rc = 0; $m = 1; while (!feof($conn)) { $byte = fgetc($conn); $fields = unpack('Ca', $byte); $x = $fields['a']; if ($this->debug) t('SAMConnection_MQTT.read_remaining_length() byte ('.strlen($byte).') = '.$x); if ($x < 128) { $rc += $x * $m; break; } else { $rc += (($x - 128) * $m); } $m *= 128; } if ($this->debug) t('SAMConnection_MQTT.read_remaining_length() remaining length = '.$rc); return $rc; } function read_topic($conn) { if ($this->debug) e('SAMConnection_MQTT.read_topic()'); $rc = false; while (!feof($conn)) { $tlen = fread($conn, 2); $fields = unpack('na', $tlen); if ($this->debug) t('SAMConnection_MQTT.read_topic() topic length='.$fields['a']); $rc = fread($conn, $fields['a']); break; } if ($this->debug) x("SAMConnection_MQTT.read_topic(rc=$rc)"); return $rc; } } ?>
SAM/php_sam.php
<?php /* +----------------------------------------------------------------------+ | Copyright IBM Corporation 2006, 2007. | | All Rights Reserved. | +----------------------------------------------------------------------+ | | | Licensed under the Apache License, Version 2.0 (the "License"); you | | may not use this file except in compliance with the License. You may | | obtain a copy of the License at | | http://www.apache.org/licenses/LICENSE-2.0 | | | | Unless required by applicable law or agreed to in writing, software | | distributed under the License is distributed on an "AS IS" BASIS, | | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | | implied. See the License for the specific language governing | | permissions and limitations under the License. | +----------------------------------------------------------------------+ | Author: Dave Renshaw | +----------------------------------------------------------------------+ $Id: php_sam.php,v 1.1 2007/02/02 15:38:53 dsr Exp $ */ /* Debugging flags and functions available to sub packages... */ $eol = "\n"; if (isset($_SERVER['REQUEST_URI'])) { $eol = '<br>'; } function e($s) {global $eol;echo '-->'.$s."$eol";} function t($s) {global $eol;echo ' '.$s."$eol";} function x($s) {global $eol;echo '<--'.$s."$eol";} define('SAM_MQTT', 'mqtt'); /* --------------------------------- SAMConnection --------------------------------- */ class SAMConnection { // var $debug = true; var $debug = false; var $errno = 0; var $error = ''; var $connection; /* --------------------------------- Create --------------------------------- */ function Create($proto) { if ($this->debug) e("SAMConnection.Create(proto=$proto)"); $rc = false; /* search the PHP config for a factory to use... */ $x = get_cfg_var('sam.factory.'.$proto); if ($this->debug) t('SAMConnection.Create() get_cfg_var() "'.$x.'"'); /* If there is no configuration (php.ini) entry for this protocol, default it. */ if (strlen($x) == 0) { /* for every protocol other than MQTT assume we will use XMS */ if ($proto != 'mqtt') { $x = 'xms'; } else { $x = 'mqtt'; } } /* Invoke the chosen factory to create a real connection object... */ $x = 'sam_factory_'.$x.'.php'; if ($this->debug) t("SAMConnection.Create() calling factory - $x"); $rc = include $x; if ($this->debug && $rc) t('SAMConnection.Create() rc = '.get_class($rc)); if ($this->debug) x('SAMConnection.Create()'); return $rc; } /* --------------------------------- Constructor --------------------------------- */ function SAMConnection() { if ($this->debug) e('SAMConnection()'); if ($this->debug) x('SAMConnection()'); } /* --------------------------------- Commit --------------------------------- */ function Commit() { if ($this->debug) e('SAMConnection.Commit()'); $rc = true; if (!$this->connection) { $errno = 106; $error = 'No active connection!'; $rc = false; } else { /* Call the method on the underlying connection object... */ $rc = $this->connection->commit($target, $options); $this->errno = $this->connection->errno; $this->error = $this->connection->error; if (!$rc) { if ($this->debug) t("SAMConnection.Commit() commit failed ($this->errno) $this->error"); $rc = false; } } if ($this->debug) x("SAMConnection.Commit() rc=$rc"); return $rc; } /* --------------------------------- Connect --------------------------------- */ function Connect($proto='', $options=array()) { if ($this->debug) e('SAMConnection.Connect()'); $rc = false; if ($proto == '') { $errno = 101; $error = 'Incorrect number of parameters on connect call!'; $rc = false; } else { $this->connection = $this->create($proto); if (!$this->connection) { $errno = 102; $error = 'Unsupported protocol!'; $rc = false; } else { if ($this->debug) t("SAMConnection.Connect() connection created for protocol $proto"); $this->connection->setdebug($this->debug); /* Call the connect method on the newly created connection object... */ $rc = $this->connection->connect($proto, $options); $this->errno = $this->connection->errno; $this->error = $this->connection->error; if (!$rc) { if ($this->debug) t("SAMConnection.Connect() connect failed ($this->errno) $this->error"); } else { $rc = true; } } } if ($this->debug) x("SAMConnection.Connect() rc=$rc"); return $rc; } /* --------------------------------- Disconnect --------------------------------- */ function Disconnect() { if ($this->debug) e('SAMConnection.Disconnect()'); $rc = true; if (!$this->connection) { $errno = 106; $error = 'No active connection!'; $rc = false; } else { /* Call the method on the underlying connection object... */ $rc = $this->connection->Disconnect(); $this->errno = $this->connection->errno; $this->error = $this->connection->error; if (!$rc) { if ($this->debug) t("SAMConnection.Disconnect() Disconnect failed ($this->errno) $this->error"); } else { $rc = true; $this->connection = false; } } if ($this->debug) x("SAMConnection.Disconnect() rc=$rc"); return $rc; } /* --------------------------------- IsConnected --------------------------------- */ function IsConnected() { if ($this->debug) e('SAMConnection.IsConnected()'); $rc = true; if (!$this->connection) { $errno = 106; $error = 'No active connection!'; $rc = false; } else { /* Call the method on the underlying connection object... */ $rc = $this->connection->isconnected(); $this->errno = $this->connection->errno; $this->error = $this->connection->error; if (!$rc) { if ($this->debug) t("SAMConnection.IsConnected() isconnected failed ($this->errno) $this->error"); $rc = false; } } if ($this->debug) x("SAMConnection.IsConnected() rc=$rc"); return $rc; } /* --------------------------------- Peek --------------------------------- */ function Peek($target, $options=array()) { if ($this->debug) e('SAMConnection.Peek()'); $rc = true; if (!$this->connection) { $errno = 106; $error = 'No active connection!'; $rc = false; } else { /* Call the method on the underlying connection object... */ $rc = $this->connection->peek($target, $options); $this->errno = $this->connection->errno; $this->error = $this->connection->error; if (!$rc) { if ($this->debug) t("SAMConnection.Peek() peek failed ($this->errno) $this->error"); $rc = false; } } if ($this->debug) x("SAMConnection.Peek() rc=$rc"); return $rc; } /* --------------------------------- PeekAll --------------------------------- */ function PeekAll($target, $options=array()) { if ($this->debug) e('SAMConnection.PeekAll()'); $rc = true; if (!$this->connection) { $errno = 106; $error = 'No active connection!'; $rc = false; } else { /* Call the method on the underlying connection object... */ $rc = $this->connection->peekall($target, $options); $this->errno = $this->connection->errno; $this->error = $this->connection->error; if (!$rc) { if ($this->debug) t("SAMConnection.PeekAll() peekall failed ($this->errno) $this->error"); $rc = false; } } if ($this->debug) x("SAMConnection.PeekAll() rc=$rc"); return $rc; } /* --------------------------------- Receive --------------------------------- */ function Receive($target, $options=array()) { if ($this->debug) e('SAMConnection.Receive()'); $rc = true; if (!$this->connection) { $errno = 106; $error = 'No active connection!'; $rc = false; } else { /* Call the receive method on the underlying connection object... */ $rc = $this->connection->receive($target, $options); $this->errno = $this->connection->errno; $this->error = $this->connection->error; if (!$rc) { if ($this->debug) t("SAMConnection.Receive() receive failed ($this->errno) $this->error"); } } if ($this->debug) x("SAMConnection.Receive() rc=$rc"); return $rc; } /* --------------------------------- Remove --------------------------------- */ function Remove($target, $options=array()) { if ($this->debug) e('SAMConnection.Remove()'); $rc = true; if (!$this->connection) { $errno = 106; $error = 'No active connection!'; $rc = false; } else { /* Call the method on the underlying connection object... */ $rc = $this->connection->remove($target, $options); $this->errno = $this->connection->errno; $this->error = $this->connection->error; if (!$rc) { if ($this->debug) t("SAMConnection.Remove() remove failed ($this->errno) $this->error"); $rc = false; } } if ($this->debug) x("SAMConnection.Remove() rc=$rc"); return $rc; } /* --------------------------------- Rollback --------------------------------- */ function Rollback() { if ($this->debug) e('SAMConnection.Rollback()'); $rc = true; if (!$this->connection) { $errno = 106; $error = 'No active connection!'; $rc = false; } else { /* Call the method on the underlying connection object... */ $rc = $this->connection->rollback($target, $options); $this->errno = $this->connection->errno; $this->error = $this->connection->error; if (!$rc) { if ($this->debug) t("SAMConnection.Rollback() rollback failed ($this->errno) $this->error"); $rc = false; } } if ($this->debug) x("SAMConnection.Rollback() rc=$rc"); return $rc; } /* --------------------------------- Send --------------------------------- */ function Send($target, $msg, $options=array()) { if ($this->debug) e('SAMConnection.Send()'); $rc = true; if (!$this->connection) { $errno = 106; $error = 'No active connection!'; $rc = false; } else { /* Call the send method on the underlying connection object... */ $rc = $this->connection->send($target, $msg, $options); $this->errno = $this->connection->errno; $this->error = $this->connection->error; if (!$rc) { if ($this->debug) t("SAMConnection.Send() send failed ($this->errno) $this->error"); $rc = false; } } if ($this->debug) x("SAMConnection.Send() rc=$rc"); return $rc; } /* --------------------------------- SetDebug --------------------------------- */ function SetDebug($option=false) { if ($this->debug) e("SAMConnection.SetDebug($option)"); $this->debug = $option; if ($this->connection) { $this->connection->setdebug($option); } if ($this->debug) x('SAMConnection.SetDebug()'); return; } /* --------------------------------- Subscribe --------------------------------- */ function Subscribe($topic, $options=array()) { if ($this->debug) e("SAMConnection.Subscribe($topic)"); $rc = true; if (!$this->connection) { $errno = 106; $error = 'No active connection!'; $rc = false; } else { /* Call the subscribe method on the underlying connection object... */ $rc = $this->connection->subscribe($topic, $options); $this->errno = $this->connection->errno; $this->error = $this->connection->error; if (!$rc) { if ($this->debug) t("SAMConnection.Subscribe() subscribe failed ($this->errno) $this->error"); $rc = false; } } if ($this->debug) x("SAMConnection.Subscribe() rc=$rc"); return $rc; } /* --------------------------------- Unsubscribe --------------------------------- */ function Unsubscribe($sub_id) { if ($this->debug) e("SAMConnection.Unsubscribe($sub_id)"); $rc = true; if (!$this->connection) { $errno = 106; $error = 'No active connection!'; $rc = false; } else { /* Call the subscribe method on the underlying connection object... */ $rc = $this->connection->unsubscribe($sub_id); $this->errno = $this->connection->errno; $this->error = $this->connection->error; if (!$rc) { if ($this->debug) t("SAMConnection.Unsubscribe() unsubscribe failed ($this->errno) $this->error"); $rc = false; } } if ($this->debug) x("SAMConnection.Unsubscribe() rc=$rc"); return $rc; } } /* --------------------------------- SAMMessage --------------------------------- */ class SAMMessage { /* --------------------------------- Constructor --------------------------------- */ function SAMMessage($body='') { if ($body != '') { $this->body = $body; } } } ?>
SAM/sam_factory_mqtt.php
<?php /* +----------------------------------------------------------------------+ | Copyright IBM Corporation 2006, 2007. | | All Rights Reserved. | +----------------------------------------------------------------------+ | | | Licensed under the Apache License, Version 2.0 (the "License"); you | | may not use this file except in compliance with the License. You may | | obtain a copy of the License at | | http://www.apache.org/licenses/LICENSE-2.0 | | | | Unless required by applicable law or agreed to in writing, software | | distributed under the License is distributed on an "AS IS" BASIS, | | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | | implied. See the License for the specific language governing | | permissions and limitations under the License. | +----------------------------------------------------------------------+ | Author: Dave Renshaw | +----------------------------------------------------------------------+ $Id: sam_factory_mqtt.php,v 1.1 2007/02/02 15:40:41 dsr Exp $ */ require_once('MQTT/sam_mqtt.php'); return new SAMConnection_MQTT(); ?>
SAM/sam_factory_xms.php
<?php /* +----------------------------------------------------------------------+ | Copyright IBM Corporation 2006, 2007. | | All Rights Reserved. | +----------------------------------------------------------------------+ | | | Licensed under the Apache License, Version 2.0 (the "License"); you | | may not use this file except in compliance with the License. You may | | obtain a copy of the License at | | http://www.apache.org/licenses/LICENSE-2.0 | | | | Unless required by applicable law or agreed to in writing, software | | distributed under the License is distributed on an "AS IS" BASIS, | | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | | implied. See the License for the specific language governing | | permissions and limitations under the License. | +----------------------------------------------------------------------+ | Author: Dave Renshaw | +----------------------------------------------------------------------+ $Id: sam_factory_xms.php,v 1.1 2007/02/02 15:40:00 dsr Exp $ */ if (!class_exists('SAMXMSConnection')) { global $eol; $l = (strstr(PHP_OS, 'WIN') ? 'php_' : '').'sam_xms.'.(strstr(PHP_OS, 'WIN') ? 'dll' : 'so'); echo $eol.'<font color="red"><b>Unable to access SAM XMS capabilities. Ensure the '.$l.' library is defined as an extension.</b></font>'.$eol; return false; } else { return new SAMXMSConnection(); } ?>