phpMQTT进程池字节格式

简介:多进程池共享订阅主题,避免每次处理一条消息时都创建和关闭进程;发送消息时,进程使用长连接方式,避免每次发消息都建立和关闭连接。

一、php接口类:PhpMQTT

在原有的基础上,增加了共享订阅,对SSL支持更好,支持字节格式传输,对发送结果等判断更好

<?php          
header("Content-type: text/html; charset=utf-8");
include_once '/var/www/html/api/com/com_inc.php';

/**
 * Minimal MQTT 3.1.1 client on top of a PHP stream socket (plain TCP or TLS).
 *
 * Supports CONNECT (with optional will / username / password), SUBSCRIBE,
 * PUBLISH (waiting for PUBACK when qos > 0), PINGREQ and DISCONNECT, plus a
 * polling loop (proc) that dispatches received PUBLISH packets to the
 * callbacks registered through subscribe().
 *
 * The socket is switched to non-blocking mode after connecting; read() polls
 * until the requested number of bytes has arrived.
 */
class PhpMQTT {
	public $address;			/* broker address */
	public $port;				/* broker port */
	public $clientid;			/* client id sent to the broker */
	public $cafile;				/* CA file; when set, the connection is made over tls:// */

	public $clean_session;		/* clean-session flag sent in CONNECT */
	public $will;				/* will array (topic, content, qos, retain) or NULL */
	private $username;			/* stores username */
	private $password;			/* stores password */
	private $socket;			/* stream resource, or false when not connected */

	private $msgid = 1;			/* next packet identifier, kept within 1..65535 */
	private $topics = array();	/* topic filter => array("function" => callable) */
	private $qos = 0;			/* qos requested by the last subscribe() call */

	public $keepalive = 10;		/* keepalive interval in seconds */
	public $last_ping_time;		/* unix time of the last packet received, used to detect disconnects */

	/**
	 * @param string      $address  broker host
	 * @param int         $port     broker port
	 * @param string      $clientid client identifier sent in CONNECT
	 * @param string|null $cafile   CA certificate file; non-empty enables TLS
	 */
	public function __construct($address, $port, $clientid, $cafile = NULL) {
		$this->address = $address;
		$this->port = $port;
		$this->clientid = $clientid;
		$this->cafile = $cafile;
	}

	/**
	 * Opens the socket, sends CONNECT and waits for CONNACK.
	 *
	 * @param bool        $clean_session clean-session flag
	 * @param array|null  $will          array with keys topic, content, qos, retain
	 * @param string|null $username
	 * @param string|null $password
	 * @return bool true when the broker accepted the connection (return code 0)
	 */
	public function connect($clean_session = true, $will = NULL, $username = NULL, $password = NULL) {
		$this->clean_session = $clean_session;
		$this->will = $will;
		$this->username = $username;
		$this->password = $password;

		$target = $this->address . ":" . $this->port;
		if ($this->cafile) {
			// NOTE(review): verify_peer_name is disabled, so the certificate's host name
			// is not checked against $address - confirm this is intentional.
			$socketContext = stream_context_create(["ssl" => [
				"verify_peer_name" => false,
				"cafile" => $this->cafile]]);
			$this->socket = stream_socket_client("tls://" . $target, $errno, $errstr, 60, STREAM_CLIENT_CONNECT, $socketContext);
		} else {
			$this->socket = stream_socket_client("tcp://" . $target, $errno, $errstr, 60, STREAM_CLIENT_CONNECT);
		}

		if (!$this->socket) {
			Log::info("mqtt connect error : " . $errno . " " . $errstr, Log::INFO);
			return false;
		}

		stream_set_timeout($this->socket, 5);
		// Non-blocking: proc() polls the socket instead of waiting on it.
		stream_set_blocking($this->socket, 0);

		// Variable header: protocol name (2-byte length + "MQTT") and protocol level 4 (v3.1.1).
		$buffer = chr(0x00) . chr(0x04) . "MQTT" . chr(0x04);
		$buf_len = strlen($buffer);

		// Connect flags: clean session(2), will(4), will qos(<<3), will retain(32), password(64), username(128).
		$var = 0;
		if ($this->clean_session) $var += 2;
		if ($this->will) {
			$var += 4;
			$var += ($this->will['qos'] << 3);
			if (!empty($this->will['retain'])) $var += 32;
		}
		if ($this->username) $var += 128;
		if ($this->password) $var += 64;
		$buffer .= chr($var); $buf_len++;

		// Keep alive, big-endian 16 bit.
		$buffer .= chr(($this->keepalive >> 8) & 0xff); $buf_len++;
		$buffer .= chr($this->keepalive & 0xff); $buf_len++;

		// Payload: client id, then will topic/message, then username, then password.
		$buffer .= $this->concat_msb_lsb_str($this->clientid, $buf_len);
		if ($this->will) {
			$buffer .= $this->concat_msb_lsb_str($this->will['topic'], $buf_len);
			$buffer .= $this->concat_msb_lsb_str($this->will['content'], $buf_len);
		}
		if ($this->username) $buffer .= $this->concat_msb_lsb_str($this->username, $buf_len);
		if ($this->password) $buffer .= $this->concat_msb_lsb_str($this->password, $buf_len);

		// Fixed header. The remaining length is variable-length encoded so that
		// packets longer than 127 bytes (long ids / credentials / will) stay valid.
		$head = chr(0x10) . $this->convert_len2str($buf_len);

		fwrite($this->socket, $head, strlen($head));
		Log::info("mqtt connect send head: " . FuncUtil::chrToHexstrShow($head, 0, strlen($head)), Log::INFO);

		fwrite($this->socket, $buffer);
		Log::info("mqtt connect send buffer: " . FuncUtil::chrToHexstrShow($buffer, 0, strlen($buffer)), Log::INFO);

		// CONNACK: 0x20 0x02 <ack flags> <return code>, return code 0 means accepted.
		$string = $this->read(4);
		Log::info("mqtt connect ack: " . FuncUtil::chrToHexstrShow($string, 0, 4), Log::INFO);

		$accepted = strlen($string) >= 4
			&& (ord($string[0]) >> 4) == 2
			&& $string[3] === chr(0);
		if (!$accepted) {
			Log::info("mqtt connect failed", Log::INFO);
			// Do not leak the half-open socket when the broker refused or went away.
			fclose($this->socket);
			$this->socket = false;
			return false;
		}
		Log::info("mqtt connect success", Log::INFO);

		// Start the keepalive clock.
		$this->last_ping_time = time();

		return true;
	}

	/**
	 * Closes the current socket, retries connect() every 10 seconds until it
	 * succeeds, then re-subscribes to the previously subscribed topics.
	 */
	public function reconnect() {
		if (is_resource($this->socket)) {
			fclose($this->socket);
		}
		$this->socket = false;

		while ($this->connect($this->clean_session, $this->will, $this->username, $this->password) == false) {
			sleep(10);
		}

		// $topics is empty for a client that never subscribed.
		if (!empty($this->topics)) {
			$this->subscribe($this->topics, $this->qos);
		}
	}

	/**
	 * Subscribes to every key of $topics with the given qos and reads the SUBACK.
	 *
	 * @param array $topics topic filter => array("function" => callable($topic, $msg))
	 * @param int   $qos    requested qos for all filters
	 * @return bool true when a SUBACK (0x90) arrived and no filter was rejected (0x80)
	 */
	public function subscribe($topics, $qos = 0) {
		$this->qos = $qos;
		$this->topics = $topics;

		if (!is_resource($this->socket)) {
			return false;
		}

		$buf_len = 0;
		$buffer = "";

		// Packet identifier.
		$id = $this->next_msgid();
		$buffer .= chr($id >> 8);  $buf_len++;
		$buffer .= chr($id % 256);  $buf_len++;

		// Per filter: 2-byte length + filter + requested qos.
		foreach ($topics as $key => $topic) {
			$buffer .= $this->concat_msb_lsb_str($key, $buf_len);
			$buffer .= chr($qos);  $buf_len++;
		}

		// v3.1.1 fixed header: SUBSCRIBE with reserved flags 0010, variable-length remaining length.
		$head = chr(0x82) . $this->convert_len2str($buf_len);

		fwrite($this->socket, $head, strlen($head));
		Log::info("mqtt subscribe send head: " . FuncUtil::chrToHexstrShow($head, 0, strlen($head)), Log::INFO);

		fwrite($this->socket, $buffer, $buf_len);
		Log::info("mqtt subscribe send buffer: " . FuncUtil::chrToHexstrShow($buffer, 0, $buf_len), Log::INFO);

		// SUBACK fixed header: 0x90 + remaining length.
		$string = $this->read(2);
		Log::info("mqtt subscribe send ack 0x90+length: " . FuncUtil::chrToHexstrShow($string, 0, 2), Log::INFO);
		$is_suback = strlen($string) >= 2 && ord($string[0]) == 0x90;

		// SUBACK body: packet id (2 bytes) + one return code per filter (0,1,2 success; 0x80 failure).
		$bytes = ord(substr($string, 1, 1));
		$string = $this->read($bytes);
		Log::info("mqtt subscribe send ack msgid+retcode: " . FuncUtil::chrToHexstrShow($string, 0, $bytes), Log::INFO);

		if (!$is_suback || strlen($string) < 3) {
			return false;
		}
		$codes = substr($string, 2);
		return strpos($codes, chr(0x80)) === false;
	}

	/**
	 * Publishes $content on $topic. For qos > 0 waits for the PUBACK and checks
	 * that it carries the packet id that was sent.
	 *
	 * @param string $topic
	 * @param string $content    raw payload bytes
	 * @param int    $contentlen number of payload bytes to send
	 * @param int    $qos
	 * @param int    $retain
	 * @return mixed falsy on failure; the fwrite() byte count (qos 0) or true (acknowledged qos > 0) on success
	 */
	public function publish($topic, $content, $contentlen, $qos = 0, $retain = 0) {
		if (!is_resource($this->socket)) {
			Log::info("mqtt send head failed", Log::INFO);
			return false;
		}

		$buf_len = 0;
		$buffer = "";

		// Topic: 2-byte length + topic.
		$buffer .= $this->concat_msb_lsb_str($topic, $buf_len);

		// Packet identifier is only present for qos 1/2.
		if ($qos) {
			$id = $this->next_msgid();
			$buffer .= chr($id >> 8);  $buf_len++;
			$buffer .= chr($id % 256);  $buf_len++;
		}

		$buffer .= $content;
		$buf_len += $contentlen;

		// Fixed header: 0x30 | qos << 1 | retain, followed by the remaining length.
		$cmd = 0x30;
		if ($qos) $cmd += $qos << 1;
		if ($retain) $cmd += 1;
		$head = chr($cmd) . $this->convert_len2str($buf_len);

		Log::info("mqtt send head = " . FuncUtil::chrToHexstrShow($head, 0, strlen($head)), Log::INFO);
		$ret = fwrite($this->socket, $head, strlen($head));
		if (!$ret) {
			Log::info("mqtt send head failed", Log::INFO);
			return $ret;
		}

		Log::info("mqtt send topic = " . $topic, Log::INFO);
		Log::info("mqtt send msg header = " . FuncUtil::chrToHexstrShow($content, 0, 6), Log::INFO);
		Log::info("mqtt send msg body = " . FuncUtil::chrToHexstrShow($content, 6, $contentlen - 6), Log::INFO);
		$ret = fwrite($this->socket, $buffer, $buf_len);
		if (!$ret) {
			Log::info("mqtt send buffer failed", Log::INFO);
			return $ret;
		}

		if ($qos) {
			// PUBACK: 0x40 0x02 + packet id.
			$string = $this->read(4);
			Log::info("mqtt send ack " . FuncUtil::chrToHexstrShow($string, 0, 4), Log::INFO);

			$acked = strlen($string) >= 4
				&& ord($string[0]) == 0x40
				&& ord($string[1]) == 0x02
				&& (ord($string[2]) * 256 + ord($string[3])) == $id;
			if ($acked) {
				$ret = true;
				Log::info("mqtt send ack success", Log::INFO);
			} else {
				Log::info("mqtt send ack failed", Log::INFO);
				$ret = false;
			}
		}

		return $ret;
	}

	/**
	 * One iteration of the receive loop: reconnects when the stream is closed,
	 * reads at most one packet, dispatches PUBLISH packets to call_message(),
	 * pings after $keepalive seconds of silence and reconnects after twice that.
	 *
	 * @param bool $loop when true, sleeps 100 ms if nothing was received
	 * @return bool always true
	 */
	public function proc($loop = true) {
		$cmd = 0;

		if (!is_resource($this->socket) || feof($this->socket)) {
			Log::info("mqtt reconnect feof", Log::INFO);
			$this->reconnect();
		}

		$cmd_type = $this->read(1, true);

		if (!strlen($cmd_type)) {
			if ($loop) {
				usleep(100000);
			}
		} else {
			// High nibble of the first byte is the packet type.
			$cmd = (int)(ord($cmd_type) / 16);

			// Variable-length "remaining length" field.
			$multiplier = 1;
			$length = 0;
			do {
				$digit = ord($this->read(1));
				$length += ($digit & 127) * $multiplier;
				$multiplier *= 128;
			} while (($digit & 128) != 0);

			// Packets without a body (e.g. PINGRESP) leave this empty.
			$string = "";
			if ($length) {
				$string = $this->read($length);
			}

			if ($cmd) {
				if ($cmd == 3) {
					// Bits 1-2 of the fixed header carry the qos of this PUBLISH.
					$this->call_message($string, (ord($cmd_type) >> 1) & 0x03);
				}
				$this->last_ping_time = time();
			}
		}

		if ($this->last_ping_time < (time() - $this->keepalive)) {
			$this->ping();
		}

		if ($this->last_ping_time < (time() - ($this->keepalive * 2))) {
			Log::info("mqtt reconnect disconnect(long no ping)", Log::INFO);
			$this->reconnect();
		}

		return true;
	}

	/**
	 * Handles the body of a received PUBLISH: extracts topic and payload,
	 * acknowledges it when it carries a packet id, and invokes every
	 * subscribed callback whose filter matches the topic.
	 *
	 * @param string   $msg PUBLISH variable header + payload
	 * @param int|null $qos qos of the received packet; NULL falls back to the
	 *                      qos requested in subscribe()
	 */
	public function call_message($msg, $qos = NULL) {
		if (strlen($msg) < 2) {
			return;
		}
		if ($qos === NULL) {
			$qos = $this->qos;
		}

		$tlen = (ord($msg[0]) << 8) + ord($msg[1]);
		$topic = substr($msg, 2, $tlen);
		if ($qos) {
			// A packet id is only present when the packet itself is qos > 0.
			$msg_id = substr($msg, ($tlen + 2), 2);
			$msg = substr($msg, ($tlen + 4));

			// NOTE(review): a PUBACK is sent for any qos > 0; a qos 2 delivery
			// would need a PUBREC/PUBREL/PUBCOMP exchange instead.
			$head_ack = chr(0x40) . chr(0x02) . $msg_id;

			Log::info("mqtt recv ack = " . FuncUtil::chrToHexstrShow($head_ack, 0, strlen($head_ack)), Log::INFO);
			if (is_resource($this->socket)) {
				fwrite($this->socket, $head_ack, strlen($head_ack));
			}
		} else {
			$msg = substr($msg, ($tlen + 2));
		}

		Log::info("mqtt recv: " . date("r"), Log::INFO);
		Log::info("mqtt recv topic = " . $topic, Log::INFO);
		Log::info("mqtt recv header = " . FuncUtil::chrToHexstrShow($msg, 0, 6), Log::INFO);
		Log::info("mqtt recv body = " . FuncUtil::chrToHexstrShow($msg, 6, FuncUtil::BEChrToInt($msg, 4, 2)), Log::INFO);

		if (empty($this->topics)) {
			return;
		}
		foreach ($this->topics as $key => $top) {
			// The '$queue/' prefix is stripped before matching, so the rest of the
			// filter is compared against the topic.
			$filter = str_replace('$queue/', "", $key);
			if (preg_match($this->topic_filter_to_regex($filter), $topic)) {
				if (isset($top['function']) && is_callable($top['function'])) {
					call_user_func($top['function'], $topic, $msg);
				}
			}
		}
	}

	/**
	 * Converts an MQTT topic filter into an anchored regular expression.
	 * '+' matches within one topic level, '#' matches the rest of the topic;
	 * every other character is matched literally.
	 *
	 * @param string $filter
	 * @return string PCRE pattern delimited by '/'
	 */
	public function topic_filter_to_regex($filter) {
		$quoted = preg_quote($filter, '/');
		// preg_quote escapes '+' always and '#' on newer PHP versions; handle both spellings.
		$pattern = str_replace(
			array('\\+', '\\#', '#'),
			array('[^\\/]*', '.*', '.*'),
			$quoted
		);
		return '/^' . $pattern . '$/';
	}

	/**
	 * Reads bytes from the socket.
	 *
	 * @param int  $len number of bytes wanted
	 * @param bool $nb  true: a single fread() that may return fewer bytes (or none);
	 *                  false: keep reading until $len bytes arrived or the stream ended
	 * @return string the bytes read, possibly shorter than $len
	 */
	public function read($len = 8192, $nb = false) {
		if (!is_resource($this->socket)) {
			return "";
		}

		if ($nb) {
			$data = fread($this->socket, $len);
			return $data === false ? "" : $data;
		}

		$string = "";
		$left_over = $len;
		while ($left_over > 0 && !feof($this->socket)) {
			$chunk = fread($this->socket, $left_over);
			if ($chunk === false) {
				break;
			}
			if ($chunk === "") {
				// Socket is non-blocking: yield briefly instead of spinning at full CPU.
				usleep(1000);
				continue;
			}
			$string .= $chunk;
			$left_over = $len - strlen($string);
		}

		return $string;
	}

	/** Sends a PINGREQ (0xc0 0x00); the broker answers with PINGRESP (0xd0). */
	public function ping() {
		if (!is_resource($this->socket)) {
			return;
		}
		$head = chr(0xc0) . chr(0x00);
		fwrite($this->socket, $head, 2);
	}

	/** Sends a DISCONNECT packet (0xe0 0x00). */
	public function disconnect() {
		if (is_resource($this->socket)) {
			$head = chr(0xe0) . chr(0x00);
			fwrite($this->socket, $head, 2);
		}
		Log::info("mqtt disconnect", Log::INFO);
	}

	/** Sends DISCONNECT, shuts down the write side and releases the socket. */
	public function close() {
		Log::info("mqtt close", Log::INFO);
		$this->disconnect();
		if (is_resource($this->socket)) {
			stream_socket_shutdown($this->socket, STREAM_SHUT_WR);
			fclose($this->socket);
		}
		$this->socket = false;
		Log::info("mqtt stream_socket_shutdown", Log::INFO);
	}

	/**
	 * Encodes a length with the MQTT variable-length scheme: 7 bits per byte,
	 * least significant group first, top bit set while more bytes follow.
	 *
	 * @param int $len
	 * @return string 1 or more encoded bytes
	 */
	public function convert_len2str($len) {
		$string = "";
		do {
			$digit = $len % 128;
			$len = $len >> 7;
			if ($len > 0) {
				$digit = ($digit | 0x80);
			}
			$string .= chr($digit);
		} while ($len > 0);
		return $string;
	}

	/**
	 * Returns $str prefixed with its length as a big-endian 16-bit integer and
	 * adds the number of bytes produced (strlen + 2) to $val.
	 *
	 * @param string $str
	 * @param int    $val running byte counter, updated by reference
	 * @return string
	 */
	public function concat_msb_lsb_str($str, &$val) {
		$len = strlen($str);
		$val += ($len + 2);
		return chr($len >> 8) . chr($len % 256) . $str;
	}

	/** Returns the next packet identifier and advances the counter within 1..65535. */
	private function next_msgid() {
		$id = $this->msgid;
		$this->msgid = ($this->msgid >= 0xffff) ? 1 : $this->msgid + 1;
		return $id;
	}
}

二、发送消息

支持发送失败后的重连重发

<?php
header("Content-type: text/html; charset=utf-8");
include_once '/var/www/html/api/com/com_inc.php';

/**
 * Singleton publisher that keeps one PhpMQTT connection open across calls and
 * retries once (after reconnecting) when a publish fails.
 */
class MQTTPublishBinary {
	/* Connected PhpMQTT client, or null when there is no usable connection. */
	private $mqtt = null;

	private static $singleinstance;

	/** Returns the shared instance, creating it on first use. */
	public static function getIntance() {
		if (!(self::$singleinstance instanceof self)) {
			self::$singleinstance = new self();
		}
		return self::$singleinstance;
	}
	private function __construct() {}
	private function __clone() {}

	/**
	 * Publishes $data on $topic with qos 1. On failure the connection is
	 * re-established and the publish is attempted one more time.
	 *
	 * @param string $topic
	 * @param string $data    raw payload bytes
	 * @param int    $datalen number of payload bytes
	 * @return bool true when the message was acknowledged, false otherwise
	 */
	public function exec($topic, $data, $datalen) {
		try {
			if (!$this->mqtt && !$this->connect()) {
				return false;
			}

			$send_ret = $this->mqtt->publish($topic, $data, $datalen, 1);
			if (!$send_ret) {
				Log::info("mqtt publish send failed one", Log::INFO);
				$this->disconnect();
				if (!$this->connect()) {
					return false;
				}
				$send_ret = $this->mqtt->publish($topic, $data, $datalen, 1);
				if (!$send_ret) {
					Log::info("mqtt publish send failed two", Log::INFO);
				}
			}

			// Report the outcome explicitly instead of falling off the end with null.
			return (bool)$send_ret;
		} catch (Exception $e) {
			Log::info("exec Exception: " . $e->getMessage(), Log::EXCEPT);
			return false;
		}
	}

	/**
	 * Closes the current connection, if any, and forgets the client.
	 *
	 * @return bool false when closing raised an exception, true otherwise
	 */
	public function disconnect() {
		try {
			if ($this->mqtt !== null) {
				$this->mqtt->close();
			}
			$this->mqtt = null;
			return true;
		} catch (Exception $e) {
			Log::info("exec Exception: " . $e->getMessage(), Log::EXCEPT);
			// Drop the client anyway so the next exec() reconnects.
			$this->mqtt = null;
			return false;
		}
	}

	/**
	 * Creates a new PhpMQTT client and connects it with the configured credentials.
	 *
	 * @return bool true when connected; on failure no client is kept
	 */
	public function connect() {
		try {
			//$this->mqtt = new PhpMQTT(Config::getMQTTServer(), Config::s_mqtt_port_ssl, FuncUtil::getEmqttClientID(), Config::s_mqtt_cafile);
			$client = new PhpMQTT(Config::getMQTTServer(), Config::s_mqtt_port, FuncUtil::getEmqttClientID());
			if ($client->connect(true, NULL, Config::s_mqtt_username, Config::s_mqtt_password)) {
				$this->mqtt = $client;
				return true;
			}
			// Keep $mqtt null so that the next exec() tries to connect again
			// instead of publishing on a client that has no socket.
			$this->mqtt = null;
			return false;
		} catch (Exception $e) {
			Log::info("exec Exception: " . $e->getMessage(), Log::EXCEPT);
			$this->mqtt = null;
			return false;
		}
	}
}
?>

三、消息接收

<?php
header("Content-type: text/html; charset=utf-8");
include_once '/var/www/html/api/com/com_inc.php';

// Running total, in milliseconds, of the time spent inside proc_msg();
// proc_msg() adds to it through `global $total_elp`.
$total_elp = 0;

/**
 * Singleton subscriber: connects to the broker, subscribes to one topic with
 * proc_msg as the callback and runs the PhpMQTT receive loop indefinitely.
 */
class MQTTSubscribeBinary {
	/* Active PhpMQTT client, or null when closed. */
	private $mqtt = null;
	private static $singleinstance;

	/** Returns the shared instance, creating it on first use. */
	public static function getIntance() {
		if (!(self::$singleinstance instanceof self)) {
			self::$singleinstance = new self();
		}
		return self::$singleinstance;
	}
	private function __construct() {
		// The receive loop is meant to run without a script time limit.
		set_time_limit(0);
	}
	private function __clone() {}

	/**
	 * Subscribes to $topic and processes messages until the process ends.
	 * If the receive loop stops or throws, the connection is closed and the
	 * whole sequence is started again after 10 seconds. The process exits with
	 * status 1 when the broker cannot be reached.
	 *
	 * The retry is a loop rather than a recursive call, so repeated failures
	 * do not deepen the call stack.
	 *
	 * @param string $topic topic filter to subscribe to
	 */
	public function exec($topic) {
		while (true) {
			try {
				Log::info("MQTTSubscribeBinary::exec", Log::INFO);
				//$this->mqtt = new PhpMQTT(Config::getMQTTServer(), Config::s_mqtt_port_ssl, $server_cfg['client_id'], Config::s_mqtt_cafile);
				$this->mqtt = new PhpMQTT(Config::getMQTTServer(), Config::s_mqtt_port, FuncUtil::getEmqttClientID());

				if (!$this->mqtt->connect(true, NULL, Config::s_mqtt_username, Config::s_mqtt_password)) {
					Log::info("MQTTSubscribeBinary connect failed ", Log::INFO);
					exit(1);
				}

				$_topics = array($topic => array("function" => "proc_msg"));
				$this->mqtt->subscribe($_topics, 0);

				while ($this->mqtt->proc()) {
				}
			} catch (Exception $e) {
				Log::info("exec Exception: " . $e->getMessage(), Log::EXCEPT);
			} catch (Throwable $e) {
				// Engine errors are retried as well, as they were when the retry
				// lived in a finally block.
				Log::info("exec Exception: " . $e->getMessage(), Log::EXCEPT);
			}

			$this->close();
			sleep(10);
		}
	}

	/** Closes the client if one is open; exceptions are logged, not propagated. */
	private function close() {
		Log::info("MQTTSubscribeBinary::close", Log::INFO);
		try {
			if ($this->mqtt != null) {
				$this->mqtt->close();
				$this->mqtt = null;
			}
		} catch (Exception $e) {
			Log::info("close Exception: " . $e->getMessage(), Log::EXCEPT);
		}
	}
}

/**
 * Subscription callback invoked for every received message.
 *
 * Reads a value from $msg via FuncUtil::BEChrToInt($msg, 2, 2) and switches on
 * it (only a no-op default branch exists), then adds the elapsed handling time
 * in milliseconds to the global $total_elp.
 * NOTE(review): presumably the value is a 2-byte big-endian message type at
 * offset 2 - confirm against FuncUtil.
 *
 * @param string $topic topic the message arrived on
 * @param string $msg   raw message payload
 * @return bool false when an exception was caught, true otherwise
 */
function proc_msg($topic, $msg) {
	global $total_elp;

	try {
		$began_at = microtime(true);

		$selector = FuncUtil::BEChrToInt($msg, 2, 2);
		switch ($selector) {
			default:
				break;
		}

		$elapsed_ms = (microtime(true) - $began_at) * 1000;
		$total_elp = $total_elp + $elapsed_ms;
	} catch (Exception $e) {
		Log::info("procmsg Exception: " . $e->getMessage(), Log::EXCEPT);
		return false;
	}

	return true;
}  
?>

四、多进程接收消息

<?php
header("Content-type: text/html; charset=utf-8");
include_once '/var/www/html/api/com/com_inc.php';
                                         
// Script entry point: obtain the MQTTStartupService singleton (declared below in
// this listing) and run exec(), which forks the subscriber processes.
MQTTStartupService::getIntance()->exec(); 
/**
 * Launcher that forks one supervisor process per configured topic; each
 * supervisor forks the configured number of subscriber workers for its topic.
 */
class MQTTStartupService {
	private static $singleinstance;

	/** Returns the shared instance, creating it on first use. */
	public static function getIntance() {
		if (!(self::$singleinstance instanceof self)) {
			self::$singleinstance = new self();
		}
		return self::$singleinstance;
	}
	private function __construct() {}
	private function __clone() {}

	/**
	 * Forks one child per entry of the topic table and waits for all of them.
	 * Each child runs startupService() for its topic and then exits.
	 */
	public function exec() {
		try {
			$server_cfg = array(
				array("topic" => '$queue//ibc/+/send', "proc_num" => Config::getIBCProcNum()),
				array("topic" => '$queue//ble/+/send', "proc_num" => Config::getBLEProcNum())
			);

			pcntl_signal(SIGCHLD, SIG_IGN);

			$_arr_child = array();
			foreach ($server_cfg as $_cnt => $_cfg) {
				$_pid = pcntl_fork();
				if ($_pid < 0) {
					Log::info("fork error : {$_cnt}", Log::INFO);
					exit;
				} elseif ($_pid) {
					// Parent: remember the child.
					$_arr_child[$_cnt] = $_pid;
				} else {
					// Child: supervise the workers of this topic.
					Log::info("//////////////////////////////// topic=" . $_cfg['topic'] . " ////////////////////////////////", Log::INFO);
					Log::info("process no = " . $_cnt . ":\t process_id = " . posix_getpid(), Log::INFO);

					$this->startupService($_cfg['topic'], $_cfg['proc_num']);

					// The child must stop here. Without this it would continue the
					// parent's loop and fork supervisors for the remaining topics itself.
					exit(0);
				}
				sleep(1);
			}

			foreach ($_arr_child as $_child_pid) {
				if ($_child_pid) {
					pcntl_waitpid($_child_pid, $status);
				}
			}
		} catch (Exception $e) {
			Log::info("exec Exception: " . $e->getMessage(), Log::EXCEPT);
		}
	}

	/**
	 * Forks $proc_num workers that each run MQTTSubscribeBinary::exec($topic),
	 * then waits for them.
	 *
	 * @param string $topic    topic filter handed to every worker
	 * @param int    $proc_num number of worker processes to fork
	 * @return bool false when an exception was caught, true otherwise
	 */
	private function startupService($topic, $proc_num) {
		try {
			pcntl_signal(SIGCHLD, SIG_IGN);

			$_arr_child = array();
			for ($_cnt = 0; $_cnt < $proc_num; $_cnt++) {
				$_pid = pcntl_fork();
				if ($_pid < 0) {
					Log::info("fork error : {$_cnt}", Log::INFO);
					exit;
				} elseif ($_pid) {
					$_arr_child[$_cnt] = $_pid;
				} else {
					Log::info("topic = " . $topic . "\tprocess no = " . $_cnt . ":\t process_id = " . posix_getpid(), Log::INFO);
					echo "topic = " . $topic . "\tprocess no = " . $_cnt . ":\t process_id = " . posix_getpid() . "\n";

					try {
						MQTTSubscribeBinary::getIntance()->exec($topic);
					} catch (Exception $e) {
						// Record why the worker stopped instead of discarding the exception.
						Log::info("startupService Exception: " . $e->getMessage(), Log::EXCEPT);
					} finally {
						// A worker never returns into the fork loop.
						exit(0);
					}
				}
				usleep(100000);
			}

			foreach ($_arr_child as $_child_pid) {
				if ($_child_pid) {
					pcntl_waitpid($_child_pid, $status);
				}
			}
		} catch (Exception $e) {
			Log::info("startupService Exception: " . $e->getMessage(), Log::EXCEPT);
			return false;
		}
		return true;
	}
}


?>

发表回复