PHP进程间通讯-消息队列

1、基本操作

生成一个消息队列的key

$msg_key = ftok(__FILE__, 'a');

产生一个消息队列

$msg_queue = msg_get_queue($msg_key, 0666);
var_dump($msg_queue);
resource(116) of type (sysvmsg queue)
bool(true)

检测一个队列是否存在 ,返回boolean值

$status = msg_queue_exists($msg_key);
var_dump($status);
bool(true)

查看当前队列的一些详细信息

$message_queue_status =  msg_stat_queue($msg_queue);
var_dump($message_queue_status);
array(10) {
  ["msg_perm.uid"]=>
  int(48)
  ["msg_perm.gid"]=>
  int(48)
  ["msg_perm.mode"]=>
  int(438)
  ["msg_stime"]=>
  int(0)
  ["msg_rtime"]=>
  int(0)
  ["msg_ctime"]=>
  int(1594350967)
  ["msg_qnum"]=>
  int(0)
  ["msg_qbytes"]=>
  int(65536)
  ["msg_lspid"]=>
  int(0)
  ["msg_lrpid"]=>
  int(0)
}

将一条消息加入消息队列

/**
 * msg_send 有三个必选参数
 * resource $queue ,
 * int $msgtype ,
 * mixed $message
 *
 * 第一个必须要是队列资源类型。resource(4) of type (sysvmsg queue)
 * 第二个参数是消息类型,一个整形,且必须大于0.
 * msg_send() sends a message of type msgtype (which MUST be greater than 0) to the message queue specified by queue.
 * 第三个参数。是要发送的信息。可以是字符串,也可以是数组。默认会被serialize.
 */
$ret = msg_send($msg_queue, 1, "Hello world chp");   
var_dump($ret);
bool(true)

从消息队列中读取一条消息

/**
 * msg_receive 的参数比较多。必须要填的参数有5个。
 * resource $queue ,
 * int $desiredmsgtype ,
 * int &$msgtype ,
 * int $maxsize ,
 * mixed &$message
 *
 * 其中$desiredmsgtype .经过测试和官网描述不符,暂不解释。
 *
 * $msgtype 。这个是msg_send 中所选定的msg_type.这是一个引用参数。
 * The type of the message that was received will be stored in this parameter.
 *
 * $maxsize。
 * The maximum size of message to be accepted is specified by the maxsize;
 * if the message in the queue is larger than this size the function will fail (unless you set flags as described below).
 * 这个参数声明的是一个最大的消息大小,如果超过则会报错。
 *
 * $message.
 * 上文msg_send 发送的消息类型。
 */
msg_receive($msg_queue, 1, $message_type, 1024, $message1);    
var_dump($message_type);         
var_dump($message1);
int(1)
string(15) "Hello world chp"

移除消息队列

msg_remove_queue($msg_queue);

2、多进程通讯

两个进程,一个发,一个收

$proc_num = 2;
$crc_num = 10;
$intval_sleep = 1;   

set_time_limit(0);
$ftok = ftok(__FILE__, 'a');
$msg_queue = msg_get_queue($ftok);


pcntl_signal(SIGCHLD, SIG_IGN);

$_arr_child = array();
for ($_cnt = 0; $_cnt < $proc_num; $_cnt++) { 
    $_pid = pcntl_fork();
    if ($_pid < 0) {
        Log::info("fork error : {$_cnt}", Log::INFO);
        exit;
    } elseif ($_pid) { 
        $_arr_child[$_cnt] = $_pid;
    } else {           
            for ($idx = 0; $idx < $crc_num; $idx++) {                                                                        
                if ($_cnt == 0) {                          
                    msg_send($msg_queue,1, "hello world chp" . $idx);
                } else {
                    msg_receive($msg_queue, 0, $msg_type, 1024, $message);
                    echo "msg_type = " . $msg_type . "\tcontent = " . $message . "\n";
                }   
            }                  
    }
    sleep($intval_sleep) ;
}

for ($_idx = 0; $_idx < $proc_num; $_idx++) {
    if ($_arr_child[$_idx]) {
        pcntl_waitpid($_arr_child[$_idx], $status);
    }
} 

msg_remove_queue($msg_queue);

执行结果如下:

msg_type = 1	content = hello world chp0
msg_type = 1	content = hello world chp1
msg_type = 1	content = hello world chp2
msg_type = 1	content = hello world chp3
msg_type = 1	content = hello world chp4
msg_type = 1	content = hello world chp5
msg_type = 1	content = hello world chp6
msg_type = 1	content = hello world chp7
msg_type = 1	content = hello world chp8
msg_type = 1	content = hello world chp9

分类:

发表回复