简介:多进程池共享订阅主题,避免每次处理一个消息时都创建和关系进程,发送消息时,进程使用长连接方式,避免每次发消息都建立和关闭连接。
一、php接口类:PhpMQTT
在原有的基础上,增加了共享订阅,对SSL支持更好,支持字节格式传输,对发送结果等判断更好
<?php
header("Content-type: text/html; charset=utf-8");
include_once '/var/www/html/api/com/com_inc.php';
class PhpMQTT {
public $address; /* broker address */
public $port; /* broker port */
public $clientid; /* client id sent to brocker */
public $cafile; /* ssl file */
public $clean_session; /* clean session */
public $will; /* stores the will of the client */
private $username; /* stores username */
private $password; /* stores password */
private $socket; /* holds the socket */
private $msgid = 1; /* counter for message id */
private $topics;
private $qos;
public $keepalive = 10; /* default keepalive timmer */
public $last_ping_time; /* host unix time, used to detect disconects */
function __construct($address, $port, $clientid, $cafile = NULL) {
$this->address = $address;
$this->port = $port;
$this->clientid = $clientid;
$this->cafile = $cafile;
}
function connect($clean_session = true, $will = NULL, $username = NULL, $password = NULL) {
$this->clean_session = $clean_session;
$this->will = $will;
$this->username = $username;
$this->password = $password;
//Open Internet or Unix domain socket connection
if ($this->cafile) {
$socketContext = stream_context_create(["ssl" => [
"verify_peer_name" => false,
"cafile" => $this->cafile]]);
$this->socket = stream_socket_client("tls://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT, $socketContext);
} else {
$this->socket = stream_socket_client("tcp://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT);
}
//On success a stream resource is returned ,, FALSE on failure.
if (!$this->socket) {
Log::info("mqtt connect error : " . $errno . " " . $errstr, Log::INFO);
return false;
}
//Set timeout period on a stream, return TRUE or FALSE
stream_set_timeout($this->socket, 5);
//Set blocking/non-blocking mode on a stream, FALSE->non-blocking, TRUE->blocking, return TRUE or FALSE
stream_set_blocking($this->socket, 0);
$buf_len = 0;
$buffer = "";
/*
//V3.1.0
$buffer .= chr(0x00); $buf_len++;
$buffer .= chr(0x06); $buf_len++;
$buffer .= chr(0x4d); $buf_len++;
$buffer .= chr(0x51); $buf_len++;
$buffer .= chr(0x49); $buf_len++;
$buffer .= chr(0x73); $buf_len++;
$buffer .= chr(0x64); $buf_len++;
$buffer .= chr(0x70); $buf_len++;
$buffer .= chr(0x03); $buf_len++;
*/
//V3.1.1 Protocol Name:length(2B)+MQTT Protocol Level:4
$buffer .= chr(0x00); $buf_len++;
$buffer .= chr(0x04); $buf_len++;
$buffer .= chr(0x4d); $buf_len++;
$buffer .= chr(0x51); $buf_len++;
$buffer .= chr(0x54); $buf_len++;
$buffer .= chr(0x54); $buf_len++;
$buffer .= chr(0x04); $buf_len++;
//connect flags(reserved, clean session, will flag, will qos(2bit), will retain, password flag, username flat)
$var = 0;
if ($this->clean_session) $var += 2; //set clean session
if ($this->will) {
$var += 4; // Set will flag
$var += ($this->will['qos'] << 3); //Set will qos
if ($this->will['retain']) $var += 32; //Set will retain
}
if ($this->username) $var += 128; //Add username to header
if ($this->password) $var += 64; //Add password to header
$buffer .= chr($var); $buf_len++;
//Keep alive (maxtime interval seconds, ping)
$buffer .= chr($this->keepalive >> 8); $buf_len++;
$buffer .= chr($this->keepalive & 0xff); $buf_len++;
//clientid: length(2B) + clientid
$buffer .= $this->concat_msb_lsb_str($this->clientid, $buf_len);
//Adding will to payload
if ($this->will) {
$buffer .= $this->concat_msb_lsb_str($this->will['topic'], $buf_len);
$buffer .= $this->concat_msb_lsb_str($this->will['content'], $buf_len);
}
//username:length(2B) + username
if ($this->username) $buffer .= $this->concat_msb_lsb_str($this->username, $buf_len);
//password:length(2B) + password
if ($this->password) $buffer .= $this->concat_msb_lsb_str($this->password, $buf_len);
$head = " ";
//fixed header
$head{0} = chr(0x10);
$head{1} = chr($buf_len);
//echo date("Y-m-d H:i:s") . "\t" . "connect send head = " . FuncUtil::chrToHexstrShow($head, 0, 2) . "\n";
//echo date("Y-m-d H:i:s") . "\t" . "connect send head = " . $head . "\n";
fwrite($this->socket, $head, 2);
Log::info("mqtt connect send head: " . FuncUtil::chrToHexstrShow($head, 0, 2), Log::INFO);
//echo date("Y-m-d H:i:s") . "\t" . "connect send buffer = " . FuncUtil::chrToHexstrShow($buffer, 0, strlen($buffer)) . "\n";
//echo date("Y-m-d H:i:s") . "\t" . "connect send buffer = " . $buffer . "\n";
fwrite($this->socket, $buffer);
Log::info("mqtt connect send buffer: " . FuncUtil::chrToHexstrShow($buffer, 0, strlen($buffer)), Log::INFO);
$string = $this->read(4);
Log::info("mqtt connect ack: " . FuncUtil::chrToHexstrShow($string, 0, 4), Log::INFO);
//echo date("Y-m-d H:i:s") . "\t" . "connect recv ack = " . $string . "\n";
//echo date("Y-m-d H:i:s") . "\t" . "connect recv ack = " . FuncUtil::chrToHexstrShow($string, 0, 4) . "\n";
//fix header:0x2002
//Connect Acknowledge Flags + Connect Return code(0:success)
if (ord($string{0}) >> 4 == 2 && $string{3} == chr(0)) {
Log::info("mqtt connect success", Log::INFO);
} else {
Log::info("mqtt connect failed", Log::INFO);
return false;
}
//start timing
$this->last_ping_time = time();
return true;
}
function reconnect() {
//Closes an open file pointer: return TRUE or FALSE
fclose($this->socket);
//Closes an open file pointer
while ($this->connect($this->clean_session, $this->will, $this->username, $this->password) == false) {
sleep(10);
}
//re subscribe topics
if (count($this->topics)) {
$this->subscribe($this->topics, $this->qos);
}
}
/* subscribe: subscribes to topics */
function subscribe($topics, $qos = 0) {
$this->qos = $qos;
$this->topics = $topics;
$buf_len = 0;
$buffer = "";
//msg identifier
$id = $this->msgid;
$buffer .= chr($id >> 8); $buf_len++;
$buffer .= chr($id % 256); $buf_len++;
//multi loop:length(2B)+topic+qos
foreach ($topics as $key => $topic) {
$buffer .= $this->concat_msb_lsb_str($key, $buf_len);
$buffer .= chr($qos); $buf_len++;
}
/*
//V3.1
$cmd = 0x80;
//$qos
$cmd += ($qos << 1);
*/
//v3.1.1 fix header
$cmd = 0x82;
$head = chr($cmd);
$head .= chr($buf_len);
//echo date("Y-m-d H:i:s") . "\t" . "subscribe send head = " . FuncUtil::chrToHexstrShow($head, 0, 2) . "\n";
//echo date("Y-m-d H:i:s") . "\t" . "subscribe send head = " . $head . "\n";
fwrite($this->socket, $head, 2);
Log::info("mqtt subscribe send head: " . FuncUtil::chrToHexstrShow($head, 0, 2), Log::INFO);
//echo date("Y-m-d H:i:s") . "\t" . "subscribe send buffer = " . FuncUtil::chrToHexstrShow($buffer, 0, $i) . "\n";
//echo date("Y-m-d H:i:s") . "\t" . "subscribe send buffer = " . $buffer . "\n";
fwrite($this->socket, $buffer, $buf_len);
Log::info("mqtt subscribe send buffer: " . FuncUtil::chrToHexstrShow($buffer, 0, $buf_len), Log::INFO);
//fixheader: 0x90 + length(msgid)
$string = $this->read(2);
Log::info("mqtt subscribe send ack 0x90+length: " . FuncUtil::chrToHexstrShow($string, 0, 2), Log::INFO);
//echo date("Y-m-d H:i:s") . "\t" . "subscribe recv buffer = " . FuncUtil::chrToHexstrShow($string, 0, 2) . "\n";
//echo date("Y-m-d H:i:s") . "\t" . "subscribe recv buffer = " . $string . "\n";
//if (!(ord($string{0}) == 0x90 && ord($string{1}) == 0x03)) {
// return false;
//}
//msgid+retcode(0,1,2:success 0x80:failure)
$bytes = ord(substr($string, 1, 1));
$string = $this->read($bytes);
Log::info("mqtt subscribe send ack msgid+retcode: " . FuncUtil::chrToHexstrShow($string, 0, $bytes), Log::INFO);
//echo date("Y-m-d H:i:s") . "\t" . "subscribe recv buffer = " . FuncUtil::chrToHexstrShow($string, 0, $bytes) . "\n";
//echo date("Y-m-d H:i:s") . "\t" . "subscribe recv buffer = " . $string . "\n";
}
/* publish: publishes $content on a $topic */
function publish($topic, $content, $contentlen, $qos = 0, $retain = 0) {
$buf_len = 0;
$buffer = "";
//topic:length(2B) + topic
$buffer .= $this->concat_msb_lsb_str($topic, $buf_len);
//qos=1/2: msg identifier
if ($qos) {
$id = $this->msgid++;
$buffer .= chr($id >> 8); $buf_len++;
$buffer .= chr($id % 256); $buf_len++;
}
//publish msg
$buffer .= $content;
$buf_len += $contentlen;
//cmd(1B):0x30+dup+qos-h+qos-l+retain
$head = " ";
$cmd = 0x30;
if ($qos) $cmd += $qos << 1;
if ($retain) $cmd += 1;
//fixheader:cmd + length
$head{0} = chr($cmd);
$head .= $this->convert_len2str($buf_len);
Log::info("mqtt send head = " . FuncUtil::chrToHexstrShow($head, 0, strlen($head)), Log::INFO);
$ret = fwrite($this->socket, $head, strlen($head));
if (!$ret) {
Log::info("mqtt send head failed", Log::INFO);
return $ret;
}
Log::info("mqtt send topic = " . $topic, Log::INFO);
Log::info("mqtt send msg header = " . FuncUtil::chrToHexstrShow($content, 0, 6), Log::INFO);
Log::info("mqtt send msg body = " . FuncUtil::chrToHexstrShow($content, 6, $contentlen - 6), Log::INFO);
$ret = fwrite($this->socket, $buffer, $buf_len);
if (!$ret) {
Log::info("mqtt send buffer failed", Log::INFO);
return $ret;
}
//only QOS=1|2 puback
if($qos) {
//0x40+length+msg identifier
$string = $this->read(4);
Log::info("mqtt send ack " . FuncUtil::chrToHexstrShow($string, 0, 4), Log::INFO);
if (!strlen($string)) {
Log::info("mqtt send ack failed", Log::INFO);
$ret = false;
} else {
if (ord($string{0}) == 0x40 && ord($string{1}) == 0x02 && (ord($string{2})*256 + ord($string{3}) == $id)) {
$ret = true;
Log::info("mqtt send ack success", Log::INFO);
} else {
Log::info("mqtt send ack failed", Log::INFO);
$ret = false;
}
}
}
return $ret;
}
/* proc: the processing loop for an "allways on" client
set true when you are doing other stuff in the loop good for watching something else at the same time */
function proc( $loop = true) {
$cmd = 0;
//Use feof for checking if stream is open.
if (feof($this->socket)) {
Log::info("mqtt reconnect feof", Log::INFO);
$this->reconnect();
}
$cmd_type = $this->read(1, true);
//echo date("Y-m-d H:i:s") . "\t" . "proc recv = " . strlen($cmd_type) . "\t" . $cmd_type . "\n";
//echo date("Y-m-d H:i:s") . "\t" . "proc recv = " . FuncUtil::chrToHexstrShow($cmd_type, 0, strlen($cmd_type)) . "\n";
if (!strlen($cmd_type)) {
if ($loop) {
usleep(100000);
}
} else {
//fixheader:cmd
$cmd = (int)(ord($cmd_type)/16);
//echo date("Y-m-d H:i:s") . "\t" . "proc fixheader cmd = " . $cmd . "\n";
//echo date("Y-m-d H:i:s") . "\t" . "proc fixheader = " . FuncUtil::chrToHexstrShow($cmd_type, 0, strlen($cmd_type)) . "\n";
//fixheader:length
$multiplier = 1;
$length = 0;
do{
$digit = ord($this->read(1));
$length += ($digit & 127) * $multiplier;
$multiplier *= 128;
} while (($digit & 128) != 0);
//echo date("Y-m-d H:i:s") . "\t" . "proc recv fixheader length = " . $length . "\n";
if ($length){
$string = $this->read($length);
}
//echo date("Y-m-d H:i:s") . "\t" . "proc recv content = " . $string . "\n";
//echo date("Y-m-d H:i:s") . "\t" . "proc recv content = " . FuncUtil::chrToHexstrShow($string, 0, $length) . "\n";
if ($cmd) {
switch ($cmd) {
case 3:
$this->call_message($string);
break;
}
$this->last_ping_time = time();
}
}
if ($this->last_ping_time < (time() - $this->keepalive )) {
//Log::info("not found something so ping", Log::INFO);
$this->ping();
}
if ($this->last_ping_time < (time() - ($this->keepalive * 2))) {
Log::info("mqtt reconnect disconnect(long no ping)", Log::INFO);
$this->reconnect();
}
return true;
}
/* message: processes a recieved topic */
function call_message($msg) {
$tlen = (ord($msg{0}) << 8) + ord($msg{1});
$topic = substr($msg, 2, $tlen);
if($this->qos){
$msg_id = substr($msg, ($tlen+2), 2);
$msg = substr($msg, ($tlen+4));
$head_ack = chr(0x40);
$head_ack .= chr(0x02);
$head_ack .= $msg_id;
Log::info("mqtt recv ack = " . FuncUtil::chrToHexstrShow($head_ack, 0, strlen($head_ack)), Log::INFO);
$ret = fwrite($this->socket, $head_ack, strlen($head_ack));
}else{
$msg = substr($msg, ($tlen+2));
}
Log::info("mqtt recv: " . date("r"), Log::INFO);
Log::info("mqtt recv topic = " . $topic, Log::INFO);
Log::info("mqtt recv header = " . FuncUtil::chrToHexstrShow($msg, 0, 6), Log::INFO);
Log::info("mqtt recv body = " . FuncUtil::chrToHexstrShow($msg, 6, FuncUtil::BEChrToInt($msg, 4, 2)), Log::INFO);
$found = false;
foreach ($this->topics as $key => $top) {
$key = str_replace('$queue/', "", $key);
//Perform a regular expression match
if ( preg_match("/^" . str_replace("#", ".*",
str_replace("+", "[^\/]*",
str_replace("/", "\/",
str_replace("$", '\$', $key)))) . "$/", $topic) ) {
//Verify that the contents of a variable can be called as a function
if (is_callable($top['function'])) {
//Call the callback given by the first parameter
call_user_func($top['function'], $topic, $msg);
$found = true;
}
}
}
//if (!$found) Log::info("msg recieved but no match in subscriptions", Log::INFO);
}
/* read: reads in so many bytes */
function read($len = 8192, $nb = false) {
$string="";
$left_over = $len;
if ($nb) {
return fread($this->socket, $left_over);
}
while (!feof($this->socket) && $left_over > 0) {
$fread = fread($this->socket, $left_over);
$string .= $fread;
$left_over = $len - strlen($string);
}
return $string;
}
/* ping: sends a keep alive ping */
function ping() {
//fixheader: 0xc0(response 0xd0)
$head = " ";
$head = chr(0xc0);
$head .= chr(0x00);
fwrite($this->socket, $head, 2);
//Log::info("ping send", Log::INFO);
}
/* disconnect: sends a proper disconect cmd */
function disconnect() {
//fixheader: 0xe0
$head = " ";
$head{0} = chr(0xe0);
$head{1} = chr(0x00);
fwrite($this->socket, $head, 2);
Log::info("mqtt disconnect", Log::INFO);
}
/* close: sends a proper disconect, then closes the socket */
function close() {
Log::info("mqtt close", Log::INFO);
$this->disconnect();
stream_socket_shutdown($this->socket, STREAM_SHUT_WR);
Log::info("mqtt stream_socket_shutdown", Log::INFO);
}
/* convert_len2str: */
function convert_len2str($len) {
$string = "";
do{
$digit = $len % 128;
$len = $len >> 7;
// if there are more digits to encode, set the top bit of this digit
if ($len > 0)
$digit = ($digit | 0x80);
$string .= chr($digit);
} while ($len > 0);
return $string;
}
/* concat_msb_lsb_str: writes a string to a buffer */
function concat_msb_lsb_str($str, &$val) {
$ret = " ";
$len = strlen($str);
$msb = $len >> 8;
$lsb = $len % 256;
$ret = chr($msb);
$ret .= chr($lsb);
$ret .= $str;
$val += ($len+2);
return $ret;
}
}
二、发送消息
支持发送失败后的重连重发
<?php
header("Content-type: text/html; charset=utf-8");
include_once '/var/www/html/api/com/com_inc.php';
class MQTTPublishBinary {
private $mqtt = null;
private static $singleinstance;
public static function getIntance() {
if (!(self::$singleinstance instanceof self)) {
self::$singleinstance = new self();
}
return self::$singleinstance;
}
private function __construct() {}
private function __clone() {}
public function exec($topic, $data, $datalen) {
try {
if(!$this->mqtt){
if(!$this->connect()){
return false;
}
}
$send_ret = $this->mqtt->publish($topic, $data, $datalen, 1);
if (!$send_ret) {
Log::info("mqtt publish send failed one", Log::INFO);
$this->disconnect();
if (!$this->connect()) {
return false;
}
$send_ret = $this->mqtt->publish($topic, $data, $datalen, 1);
if (!$send_ret) {
Log::info("mqtt publish send failed two", Log::INFO);
}
}
} catch (Exception $e) {
Log::info("exec Exception: " . $e->getMessage(), Log::EXCEPT);
return false;
}
}
public function disconnect() {
try {
$this->mqtt->close();
$this->mqtt = null;
} catch (Exception $e) {
Log::info("exec Exception: " . $e->getMessage(), Log::EXCEPT);
return false;
}
}
public function connect() {
try {
//$this->mqtt = new PhpMQTT(Config::getMQTTServer(), Config::s_mqtt_port_ssl, FuncUtil::getEmqttClientID(), Config::s_mqtt_cafile);
$this->mqtt = new PhpMQTT(Config::getMQTTServer(), Config::s_mqtt_port, FuncUtil::getEmqttClientID());
if ($this->mqtt->connect(true, NULL, Config::s_mqtt_username, Config::s_mqtt_password)) {
return true;
} else {
return false;
}
} catch (Exception $e) {
Log::info("exec Exception: " . $e->getMessage(), Log::EXCEPT);
return false;
}
}
}
?>
三、消息接收
<?php
header("Content-type: text/html; charset=utf-8");
include_once '/var/www/html/api/com/com_inc.php';
$total_elp = 0;
class MQTTSubscribeBinary {
private $mqtt = null;
private static $singleinstance;
public static function getIntance() {
if (!(self::$singleinstance instanceof self)) {
self::$singleinstance = new self();
}
return self::$singleinstance;
}
private function __construct() {
set_time_limit(0);
}
private function __clone() {}
public function exec($topic) {
try {
Log::info("MQTTSubscribeBinary::exec", Log::INFO);
//$this->mqtt = new PhpMQTT(Config::getMQTTServer(), Config::s_mqtt_port_ssl, $server_cfg['client_id'], Config::s_mqtt_cafile);
$this->mqtt = new PhpMQTT(Config::getMQTTServer(), Config::s_mqtt_port, FuncUtil::getEmqttClientID());
if (!$this->mqtt->connect(true, NULL, Config::s_mqtt_username, Config::s_mqtt_password)) {
Log::info("MQTTSubscribeBinary connect failed ", Log::INFO);
exit(1);
}
$_topics[$topic] = array("function" => "proc_msg");
$this->mqtt->subscribe($_topics, 0);
while ($this->mqtt->proc()) {
}
} catch (Exception $e) {
Log::info("exec Exception: " . $e->getMessage(), Log::EXCEPT);
} finally {
$this->close();
sleep(10);
$this->exec($topic);
}
}
private function close() {
Log::info("MQTTSubscribeBinary::close", Log::INFO);
try {
if ($this->mqtt != null) {
$this->mqtt->close();
$this->mqtt = null;
}
} catch (Exception $e) {
Log::info("close Exception: " . $e->getMessage(), Log::EXCEPT);
}
}
}
function proc_msg($topic, $msg) {
try {
$start = microtime(true);
switch (FuncUtil::BEChrToInt($msg, 2, 2)) {
default:
break;
}
$elp = (microtime(true)-$start)*1000;
global $total_elp;
$total_elp = $total_elp + $elp;
//echo date("Y-m-d H:i:s") . "\t" . posix_getpid() . "\ttotal_elp = " . $total_elp . "\t elp = " . $elp . "\t" . $topic . "\n";
} catch (Exception $e) {
Log::info("procmsg Exception: " . $e->getMessage(), Log::EXCEPT);
return false;
}
return true;
}
?>
四、多进程接收消息
<?php
header("Content-type: text/html; charset=utf-8");
include_once '/var/www/html/api/com/com_inc.php';
MQTTStartupService::getIntance()->exec();
class MQTTStartupService {
private static $singleinstance;
public static function getIntance() {
if (!(self::$singleinstance instanceof self)) {
self::$singleinstance = new self();
}
return self::$singleinstance;
}
private function __construct() {}
private function __clone() {}
public function exec() {
try {
$server_cfg = array(
array("topic" => '$queue//ibc/+/send', "proc_num" => Config::getIBCProcNum()),
array("topic" => '$queue//ble/+/send', "proc_num" => Config::getBLEProcNum())
);
pcntl_signal(SIGCHLD, SIG_IGN);
$_arr_child = array();
for ($_cnt = 0; $_cnt < count($server_cfg); $_cnt++) {
$_pid = pcntl_fork();
if ($_pid < 0) {
Log::info("fork error : {$_cnt}", Log::INFO);
exit;
} elseif ($_pid) {
$_arr_child[$_cnt] = $_pid;
} else {
Log::info("//////////////////////////////// topic=" . $server_cfg[$_cnt]['topic'] . " ////////////////////////////////", Log::INFO);
Log::info("process no = " . $_cnt . ":\t process_id = " . posix_getpid(), Log::INFO);
$this->startupService($server_cfg[$_cnt]['topic'], $server_cfg[$_cnt]['proc_num']);
}
sleep(1) ;
}
for ($_idx = 0; $_idx < count($_arr_child); $_idx++) {
if ($_arr_child[$_idx]) {
pcntl_waitpid($_arr_child[$_idx], $status);
}
}
} catch (Exception $e) {
Log::info("exec Exception: " . $e->getMessage(), Log::EXCEPT);
}
}
private function startupService($topic, $proc_num) {
try {
pcntl_signal(SIGCHLD, SIG_IGN);
$_arr_child = array();
for ($_cnt = 0; $_cnt < $proc_num; $_cnt++) {
$_pid = pcntl_fork();
if ($_pid < 0) {
Log::info("fork error : {$_cnt}", Log::INFO);
exit;
} elseif ($_pid) {
$_arr_child[$_cnt] = $_pid;
} else {
Log::info("topic = " . $topic . "\tprocess no = " . $_cnt . ":\t process_id = " . posix_getpid(), Log::INFO);
echo "topic = " . $topic . "\tprocess no = " . $_cnt . ":\t process_id = " . posix_getpid() . "\n";
try {
MQTTSubscribeBinary::getIntance()->exec($topic);
} catch (Exception $e) {
} finally {
exit(0);
}
}
usleep(100000) ;
}
for ($_idx = 0; $_idx < count($_arr_child); $_idx++) {
if ($_arr_child[$_idx]) {
pcntl_waitpid($_arr_child[$_idx], $status);
}
}
} catch (Exception $e) {
Log::info("startupService Exception: " . $e->getMessage(), Log::EXCEPT);
return false;
}
return true;
}
}
?>