1、基本操作
生成一个消息队列的key
$msg_key = ftok(__FILE__, 'a');
产生一个消息队列
$msg_queue = msg_get_queue($msg_key, 0666);
var_dump($msg_queue);
resource(116) of type (sysvmsg queue)
bool(true)
检测一个队列是否存在 ,返回boolean值
$status = msg_queue_exists($msg_key);
var_dump($status);
bool(true)
查看当前队列的一些详细信息
$message_queue_status = msg_stat_queue($msg_queue);
var_dump($message_queue_status);
array(10) {
["msg_perm.uid"]=>
int(48)
["msg_perm.gid"]=>
int(48)
["msg_perm.mode"]=>
int(438)
["msg_stime"]=>
int(0)
["msg_rtime"]=>
int(0)
["msg_ctime"]=>
int(1594350967)
["msg_qnum"]=>
int(0)
["msg_qbytes"]=>
int(65536)
["msg_lspid"]=>
int(0)
["msg_lrpid"]=>
int(0)
}
将一条消息加入消息队列
/**
* msg_send 有三个必选参数
* resource $queue ,
* int $msgtype ,
* mixed $message
*
* 第一个必须要是队列资源类型。resource(4) of type (sysvmsg queue)
* 第二个参数是消息类型,一个整形,且必须大于0.
* msg_send() sends a message of type msgtype (which MUST be greater than 0) to the message queue specified by queue.
* 第三个参数。是要发送的信息。可以是字符串,也可以是数组。默认会被serialize.
*/
$ret = msg_send($msg_queue, 1, "Hello world chp");
var_dump($ret);
bool(true)
从消息队列中读取一条消息
/**
* msg_receive 的参数比较多。必须要填的参数有5个。
* resource $queue ,
* int $desiredmsgtype ,
* int &$msgtype ,
* int $maxsize ,
* mixed &$message
*
* 其中$desiredmsgtype .经过测试和官网描述不符,暂不解释。
*
* $msgtype 。这个是msg_send 中所选定的msg_type.这是一个引用参数。
* The type of the message that was received will be stored in this parameter.
*
* $maxsize。
* The maximum size of message to be accepted is specified by the maxsize;
* if the message in the queue is larger than this size the function will fail (unless you set flags as described below).
* 这个参数声明的是一个最大的消息大小,如果超过则会报错。
*
* $message.
* 上文msg_send 发送的消息类型。
*/
msg_receive($msg_queue, 1, $message_type, 1024, $message1);
var_dump($message_type);
var_dump($message1);
int(1)
string(15) "Hello world chp"
移除消息队列
msg_remove_queue($msg_queue);
2、多进程通讯
两个进程,一个发,一个收
$proc_num = 2;
$crc_num = 10;
$intval_sleep = 1;
set_time_limit(0);
$ftok = ftok(__FILE__, 'a');
$msg_queue = msg_get_queue($ftok);
pcntl_signal(SIGCHLD, SIG_IGN);
$_arr_child = array();
for ($_cnt = 0; $_cnt < $proc_num; $_cnt++) {
$_pid = pcntl_fork();
if ($_pid < 0) {
Log::info("fork error : {$_cnt}", Log::INFO);
exit;
} elseif ($_pid) {
$_arr_child[$_cnt] = $_pid;
} else {
for ($idx = 0; $idx < $crc_num; $idx++) {
if ($_cnt == 0) {
msg_send($msg_queue,1, "hello world chp" . $idx);
} else {
msg_receive($msg_queue, 0, $msg_type, 1024, $message);
echo "msg_type = " . $msg_type . "\tcontent = " . $message . "\n";
}
}
}
sleep($intval_sleep) ;
}
for ($_idx = 0; $_idx < $proc_num; $_idx++) {
if ($_arr_child[$_idx]) {
pcntl_waitpid($_arr_child[$_idx], $status);
}
}
msg_remove_queue($msg_queue);
执行结果如下:
msg_type = 1 content = hello world chp0
msg_type = 1 content = hello world chp1
msg_type = 1 content = hello world chp2
msg_type = 1 content = hello world chp3
msg_type = 1 content = hello world chp4
msg_type = 1 content = hello world chp5
msg_type = 1 content = hello world chp6
msg_type = 1 content = hello world chp7
msg_type = 1 content = hello world chp8
msg_type = 1 content = hello world chp9