Installing and Using Kafka with PHP (CentOS)

Introduction:

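Installing Kafka client support for PHP on CentOS
Kafka was originally developed at LinkedIn. It is a distributed messaging system that supports partitions and multiple replicas and is coordinated through ZooKeeper. Its main strength is processing large volumes of data in real time, which suits scenarios such as Hadoop-based batch processing, low-latency real-time systems, Storm/Spark stream processing, web/nginx and access logs, and messaging services. It is written in Scala and Java. LinkedIn open-sourced it in 2011, and it later became a top-level Apache project.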

Environment and versions:

System: CentOS Linux release 7.5.1804 (Core)
librdkafka: librdkafka-master.zip (https://github.com/edenhill/librdkafka)
php-rdkafka: php-rdkafka-master.zip (https://github.com/arnaud-lb/php-rdkafka)

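1. Prerequisites

PHP is already installed on the system, including the development tools phpize and php-config that section 2.3 relies on.

2. Installation

2.1 Install build dependencies

The compile and install steps below need the gcc and gcc-c++ compilers: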

[root@localhost ~]# yum install gcc
[root@localhost ~]# yum install gcc-c++
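
Before moving on, it can help to confirm that the build tools are actually on PATH. The following is a minimal sketch, not part of the original steps: the helper name missing_tools is hypothetical, and checking for make and unzip in addition to the two compilers is an assumption based on the commands used later in this article.

```shell
#!/bin/sh
# Sketch: list which build tools are still missing before compiling.
# gcc and g++ (from gcc-c++) come from the step above; make and unzip
# are assumed because later steps call them.

# Print every given command name that cannot be found on PATH.
missing_tools() {
    for tool in "$@"; do
        command -v "$tool" >/dev/null 2>&1 || printf '%s\n' "$tool"
    done
}

missing="$(missing_tools gcc g++ make unzip)"
if [ -z "$missing" ]; then
    echo "all build tools found"
else
    # Unquoted on purpose so the names are joined on one line.
    echo "missing tools:" $missing
fi
```

Anything reported as missing can be installed with yum in the same way as the compilers.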

2.2 Install librdkafka (librdkafka-master.zip)

Unzip: [root@localhost ~]# unzip librdkafka-master.zip
Enter the extracted directory: cd librdkafka-master

[root@localhost ~]# cd librdkafka-master
[root@localhost librdkafka-master]# ll
total 464
-rw-r--r--  1 root root   5759 Jul  2 01:59 CHANGELOG.md
-rw-r--r--  1 root root   6261 Jul  2 01:59 CMakeLists.txt
-rw-r--r--  1 root root   3216 Jul  2 01:59 CODE_OF_CONDUCT.md
-rw-r--r--  1 root root  50583 Jul  2 01:59 CONFIGURATION.md
-rwxr-xr-x  1 root root   4664 Jul  2 01:59 configure
-rw-r--r--  1 root root   9496 Jul  2 01:59 configure.self
-rw-r--r--  1 root root   8521 Jul  2 01:59 CONTRIBUTING.md
drwxr-xr-x  3 root root    311 Jul  2 01:59 debian
-rwxr-xr-x  1 root root   3201 Jul  2 01:59 dev-conf.sh
-rw-r--r--  1 root root 103804 Jul  2 01:59 Doxyfile
drwxr-xr-x  2 root root   4096 Jul  2 01:59 examples
-rw-r--r--  1 root root 101654 Jul  2 01:59 INTRODUCTION.md
-rwxr-xr-x  1 root root   2373 Jul  2 01:59 lds-gen.py
-rw-r--r--  1 root root   1350 Jul  2 01:59 LICENSE
-rw-r--r--  1 root root   1134 Jul  2 01:59 LICENSE.crc32c
-rw-r--r--  1 root root    695 Jul  2 01:59 LICENSE.fnv1a
-rw-r--r--  1 root root   1343 Jul  2 01:59 LICENSE.hdrhistogram
-rw-r--r--  1 root root   1413 Jul  2 01:59 LICENSE.lz4
-rw-r--r--  1 root root   1334 Jul  2 01:59 LICENSE.murmur2
-rw-r--r--  1 root root   1271 Jul  2 01:59 LICENSE.pycrc
-rw-r--r--  1 root root   1658 Jul  2 01:59 LICENSE.queue
-rw-r--r--  1 root root    326 Jul  2 01:59 LICENSE.regexp
-rw-r--r--  1 root root   1918 Jul  2 01:59 LICENSE.snappy
-rw-r--r--  1 root root  17034 Jul  2 01:59 LICENSES.txt
-rw-r--r--  1 root root   1032 Jul  2 01:59 LICENSE.tinycthread
-rw-r--r--  1 root root   2598 Jul  2 01:59 LICENSE.wingetopt
-rw-r--r--  1 root root   1290 Jul  2 01:59 mainpage.doxy
-rwxr-xr-x  1 root root   2915 Jul  2 01:59 Makefile
drwxr-xr-x  3 root root     60 Jul  2 01:59 mklove
drwxr-xr-x 12 root root    184 Jul  2 01:59 packaging
-rw-r--r--  1 root root   7987 Jul  2 01:59 README.md
-rw-r--r--  1 root root    912 Jul  2 01:59 README.win32
drwxr-xr-x  2 root root   8192 Jul  2 01:59 src
drwxr-xr-x  2 root root   4096 Jul  2 01:59 src-cpp
-rw-r--r--  1 root root  20481 Jul  2 01:59 STATISTICS.md
drwxr-xr-x  6 root root   8192 Jul  2 01:59 tests
drwxr-xr-x 10 root root   4096 Jul  2 01:59 win32

Configure: [root@localhost librdkafka-master]# ./configure

Compile and install: [root@localhost librdkafka-master]# make && make install
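
When ./configure is run without a prefix, the library is typically installed under /usr/local/lib, which the dynamic linker on CentOS may not search by default. The sketch below checks whether the linker cache already knows about librdkafka and, if not, prints the commands commonly used to register the directory. The install directory, the helper name mentions_rdkafka, and the file name librdkafka.conf are assumptions, so adjust them to match your system.

```shell
#!/bin/sh
# Sketch: check whether the dynamic linker can see librdkafka.
# Assumption: the library was installed to /usr/local/lib.
LIB_DIR="${LIB_DIR:-/usr/local/lib}"

# Succeed if the text on stdin mentions a librdkafka shared object.
mentions_rdkafka() {
    grep -q 'librdkafka\.so'
}

if ldconfig -p 2>/dev/null | mentions_rdkafka; then
    echo "librdkafka is visible to the dynamic linker"
else
    echo "librdkafka is not in the linker cache; as root, try:"
    echo "  echo $LIB_DIR > /etc/ld.so.conf.d/librdkafka.conf"
    echo "  ldconfig"
fi
```

If the library is not visible, PHP may later fail to load rdkafka.so with a "cannot open shared object file" error.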

2.3 Install php-rdkafka (php-rdkafka-master.zip)

Unzip: [root@localhost ~]# unzip php-rdkafka-master.zip
Enter the extracted directory: cd php-rdkafka-master

[root@localhost ~]# cd php-rdkafka-master
[root@localhost php-rdkafka-master]# ll
total 320
-rw-r--r-- 1 root root  1546 Jun 26 15:33 compat.c
-rw-r--r-- 1 root root  1683 Jun 26 15:33 compat.h
-rw-r--r-- 1 root root 25484 Jun 26 15:33 conf.c
-rw-r--r-- 1 root root  2639 Jun 26 15:33 conf.h
-rw-r--r-- 1 root root  2450 Jun 26 15:33 config.m4
-rw-r--r-- 1 root root   743 Jun 26 15:33 config.w32
-rw-r--r-- 1 root root   638 Jun 26 15:33 CONTRIBUTING.md
-rw-r--r-- 1 root root    24 Jun 26 15:33 CREDITS
drwxr-xr-x 2 root root    23 Jun 26 15:33 examples
-rw-r--r-- 1 root root  5248 Jun 26 15:33 fun.c
-rw-r--r-- 1 root root  1186 Jun 26 15:33 fun.h
-rw-r--r-- 1 root root 26396 Jun 26 15:33 kafka_consumer.c
-rw-r--r-- 1 root root  1175 Jun 26 15:33 kafka_consumer.h
-rw-r--r-- 1 root root  1082 Jun 26 15:33 LICENSE
-rw-r--r-- 1 root root  6476 Jun 26 15:33 message.c
-rw-r--r-- 1 root root  1408 Jun 26 15:33 message.h
-rw-r--r-- 1 root root  6186 Jun 26 15:33 metadata_broker.c
-rw-r--r-- 1 root root  1285 Jun 26 15:33 metadata_broker.h
-rw-r--r-- 1 root root  7900 Jun 26 15:33 metadata.c
-rw-r--r-- 1 root root  8576 Jun 26 15:33 metadata_collection.c
-rw-r--r-- 1 root root  1476 Jun 26 15:33 metadata_collection.h
-rw-r--r-- 1 root root  1262 Jun 26 15:33 metadata.h
-rw-r--r-- 1 root root  8062 Jun 26 15:33 metadata_partition.c
-rw-r--r-- 1 root root  1294 Jun 26 15:33 metadata_partition.h
-rw-r--r-- 1 root root  6776 Jun 26 15:33 metadata_topic.c
-rw-r--r-- 1 root root  1282 Jun 26 15:33 metadata_topic.h
-rw-r--r-- 1 root root 15604 Jun 26 15:33 package.xml
-rw-r--r-- 1 root root  2249 Jun 26 15:33 php_rdkafka.h
-rw-r--r-- 1 root root  8203 Jun 26 15:33 php_rdkafka_priv.h
-rw-r--r-- 1 root root  4651 Jun 26 15:33 queue.c
-rw-r--r-- 1 root root  1522 Jun 26 15:33 queue.h
-rw-r--r-- 1 root root 27523 Jun 26 15:33 rdkafka.c
-rw-r--r-- 1 root root 12747 Jun 26 15:33 README.md
drwxr-xr-x 2 root root  4096 Jun 26 15:33 tests
-rw-r--r-- 1 root root 22660 Jun 26 15:33 topic.c
-rw-r--r-- 1 root root  1699 Jun 26 15:33 topic.h
-rw-r--r-- 1 root root 11802 Jun 26 15:33 topic_partition.c
-rw-r--r-- 1 root root  1906 Jun 26 15:33 topic_partition.h
-rw-r--r-- 1 root root  1682 Jun 26 15:33 zeval.h

Generate the configure script: phpize

[root@localhost php-rdkafka-master]# phpize
Configuring for:
PHP Api Version:         20160303
Zend Module Api No:      20160303
Zend Extension Api No:   320160303

Configure: [root@localhost php-rdkafka-master]# ./configure --with-php-config=/usr/bin/php-config

Compile and install: [root@localhost php-rdkafka-master]# make && make install
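
The install step copies rdkafka.so into PHP's extension directory, and that directory can differ between systems. As a rough sketch, assuming php-config is on PATH, the snippet below asks php-config for the directory and checks that the file is there. The helper name extension_file is hypothetical.

```shell
#!/bin/sh
# Sketch: confirm where `make install` placed rdkafka.so.
# Assumption: php-config is on PATH (the configure step uses /usr/bin/php-config).

# Build the expected file path of an extension inside a directory.
extension_file() {
    printf '%s/%s.so\n' "$1" "$2"
}

ext_dir="$(php-config --extension-dir 2>/dev/null || true)"
if [ -z "$ext_dir" ]; then
    echo "php-config not available; cannot determine the extension directory"
elif [ -f "$(extension_file "$ext_dir" rdkafka)" ]; then
    echo "installed: $(extension_file "$ext_dir" rdkafka)"
else
    echo "rdkafka.so not found in $ext_dir"
fi
```

The path printed here is the one to use on the extension= line in php.ini.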

2.4 Enable the extension in PHP

Open the PHP configuration file and add the line below:
[root@localhost php-rdkafka-master]# vi /etc/php.ini

extension=/usr/lib64/php/modules/rdkafka.so

Restart PHP: [root@localhost php-rdkafka-master]# systemctl restart php-fpm
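
After the restart, a quick check from the command line shows whether PHP picked up the extension. This is a minimal sketch; the helper name ext_loaded is hypothetical, and it inspects the PHP CLI, which on some setups reads a different php.ini than php-fpm.

```shell
#!/bin/sh
# Sketch: check that the PHP CLI sees the rdkafka extension.
# Note: php-fpm may use a different php.ini than the CLI on some systems.

# Succeed if the named extension appears in the output of `php -m`.
ext_loaded() {
    php -m 2>/dev/null | grep -qix "$1"
}

if ext_loaded rdkafka; then
    echo "rdkafka is loaded"
    php --ri rdkafka    # show the extension's configuration details
else
    echo "rdkafka is not loaded; recheck the extension= line in php.ini"
fi
```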

3. Usage

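// Call site (inside another class); the four arguments come from the caller.
$ret = TXCKafka::getInstance()->produce($_aucOrgID, $this->rcv_devId, $_aucBDA, $_cRSSI);

class TXCKafka {
	// Shared instance, created on first use.
	private static $singleinstance = null;
	public static function getInstance() {
		if (self::$singleinstance === null) {
			self::$singleinstance = new self;
		}
		return self::$singleinstance;
	}
	// Private so that callers must go through getInstance().
	private function __construct() {}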
    
	public $result;       // delivery outcome of the most recent message
	public $rk_producer;  // RdKafka\Producer
	public $topic;        // topic handle returned by newTopic()
	public function connect(){
		try { 
			Log::info("TXCKafka::connect", Log::INFO);
			$conf = new RdKafka\Conf();
			// Delivery report callback: invoked from poll() once the message
			// has been acknowledged by the broker or has failed.
			$conf->setDrMsgCb(function ($kafka, $message) {
				$this->result = ($message->err == RD_KAFKA_RESP_ERR_NO_ERROR);
			});

			// SASL/PLAIN user name and password
			$conf->set('security.protocol', 'SASL_PLAINTEXT');
			$conf->set('sasl.mechanism', 'PLAIN');
			$conf->set('sasl.username', Config::s_kafka_username);
			$conf->set('sasl.password', Config::s_kafka_password);

			$this->rk_producer = new RdKafka\Producer($conf);
			$this->rk_producer->setLogLevel(LOG_DEBUG);
			$this->rk_producer->addBrokers(Config::s_kafka_brokers);

			$topicConf = new RdKafka\TopicConf();
			// Give up on a message after 3 seconds; configuration values are strings.
			$topicConf->set('message.timeout.ms', '3000');
			$this->topic = $this->rk_producer->newTopic('location', $topicConf);
			return true;
		} catch (Exception $e) {
			Log::info("TXCKafka::connect Exception: " . $e->getMessage(), Log::EXCEPT);
			return false;
		}
	}
	
	public function produce($org_id, $gateway_id, $mac, $rssi){
		try { 
			// Connect lazily on first use.
			if (!$this->topic) {
				$this->connect();
			}

			if ($this->send($org_id, $gateway_id, $mac, $rssi)) {
				return true;
			}
			
			// First attempt failed: rebuild the producer and retry once.
			$this->connect();
			if ($this->send($org_id, $gateway_id, $mac, $rssi)) {
				return true;
			}
			
			return false;
		} catch (Exception $e) {
			Log::info("TXCKafka::produce Exception: " . $e->getMessage(), Log::EXCEPT);
			return false;
		}
	}
    
	public function send($org_id, $gateway_id, $mac, $rssi) {
		try { 
			// connect() may have failed and left no topic handle.
			if (!$this->topic) {
				return false;
			}

			$msg = ['org_id' => $org_id, 
					'gateway_id' => $gateway_id, 
					'mac' => $mac, 
					'rssi' => $rssi, 
					'location_time' => date("YmdHis")];
					
			// Clear the previous outcome so a stale success is never reported.
			$this->result = false;
			$this->topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($msg));
				
			// Serve delivery callbacks until the outbound queue is empty.
			while ($this->rk_producer->getOutQLen() > 0) {
				$this->rk_producer->poll(10);
			}
			
			Log::info("TXCKafka::produce = " . json_encode($msg), Log::INFO);
			if ($this->result) {
				Log::info("TXCKafka::produce success", Log::INFO);
			} else {
				Log::info("TXCKafka::produce failed", Log::INFO);
			}
			
			return $this->result;
		} catch (Exception $e) {
			Log::info("TXCKafka::send Exception: " . $e->getMessage(), Log::EXCEPT);
			return false;
		}
	}
}
