简介:
Centso上安装Kafka
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
安装环境及版本:
系统环境:CentOS Linux release 7.5.1804 (Core)
版本:librdkafka-master.zip(https://github.com/edenhill/librdkafka)
版本:php-rdkafka-master.zip(https://github.com/arnaud-lb/php-rdkafka)
一、预备
系统已经安装了PHP
二、安装
2.1 安装依赖库
后续编译和安装需要
命令:yum install gcc
命令:yum install gcc-c++
[root@localhost ~]# yum install gcc
[root@localhost ~]# yum install gcc-c++
2.2 安装:librdkafka-master.zip
解压缩:[root@localhost ~]# unzip librdkafka-master.zip
进入解压缩目录:cd librdkafka-master
[root@localhost ~]# cd librdkafka-master
[root@localhost librdkafka-master]# ll
total 464
-rw-r--r-- 1 root root 5759 Jul 2 01:59 CHANGELOG.md
-rw-r--r-- 1 root root 6261 Jul 2 01:59 CMakeLists.txt
-rw-r--r-- 1 root root 3216 Jul 2 01:59 CODE_OF_CONDUCT.md
-rw-r--r-- 1 root root 50583 Jul 2 01:59 CONFIGURATION.md
-rwxr-xr-x 1 root root 4664 Jul 2 01:59 configure
-rw-r--r-- 1 root root 9496 Jul 2 01:59 configure.self
-rw-r--r-- 1 root root 8521 Jul 2 01:59 CONTRIBUTING.md
drwxr-xr-x 3 root root 311 Jul 2 01:59 debian
-rwxr-xr-x 1 root root 3201 Jul 2 01:59 dev-conf.sh
-rw-r--r-- 1 root root 103804 Jul 2 01:59 Doxyfile
drwxr-xr-x 2 root root 4096 Jul 2 01:59 examples
-rw-r--r-- 1 root root 101654 Jul 2 01:59 INTRODUCTION.md
-rwxr-xr-x 1 root root 2373 Jul 2 01:59 lds-gen.py
-rw-r--r-- 1 root root 1350 Jul 2 01:59 LICENSE
-rw-r--r-- 1 root root 1134 Jul 2 01:59 LICENSE.crc32c
-rw-r--r-- 1 root root 695 Jul 2 01:59 LICENSE.fnv1a
-rw-r--r-- 1 root root 1343 Jul 2 01:59 LICENSE.hdrhistogram
-rw-r--r-- 1 root root 1413 Jul 2 01:59 LICENSE.lz4
-rw-r--r-- 1 root root 1334 Jul 2 01:59 LICENSE.murmur2
-rw-r--r-- 1 root root 1271 Jul 2 01:59 LICENSE.pycrc
-rw-r--r-- 1 root root 1658 Jul 2 01:59 LICENSE.queue
-rw-r--r-- 1 root root 326 Jul 2 01:59 LICENSE.regexp
-rw-r--r-- 1 root root 1918 Jul 2 01:59 LICENSE.snappy
-rw-r--r-- 1 root root 17034 Jul 2 01:59 LICENSES.txt
-rw-r--r-- 1 root root 1032 Jul 2 01:59 LICENSE.tinycthread
-rw-r--r-- 1 root root 2598 Jul 2 01:59 LICENSE.wingetopt
-rw-r--r-- 1 root root 1290 Jul 2 01:59 mainpage.doxy
-rwxr-xr-x 1 root root 2915 Jul 2 01:59 Makefile
drwxr-xr-x 3 root root 60 Jul 2 01:59 mklove
drwxr-xr-x 12 root root 184 Jul 2 01:59 packaging
-rw-r--r-- 1 root root 7987 Jul 2 01:59 README.md
-rw-r--r-- 1 root root 912 Jul 2 01:59 README.win32
drwxr-xr-x 2 root root 8192 Jul 2 01:59 src
drwxr-xr-x 2 root root 4096 Jul 2 01:59 src-cpp
-rw-r--r-- 1 root root 20481 Jul 2 01:59 STATISTICS.md
drwxr-xr-x 6 root root 8192 Jul 2 01:59 tests
drwxr-xr-x 10 root root 4096 Jul 2 01:59 win32
检查:[root@localhost librdkafka-master]# ./configure
编译和安装:make && make install
2.3 安装:php-rdkafka-master.zip
解压缩:[root@localhost ~]# unzip php-rdkafka-master.zip
进入解压缩目录:cd php-rdkafka-master
[root@localhost ~]# cd php-rdkafka-master
[root@localhost php-rdkafka-master]# ll
total 320
-rw-r--r-- 1 root root 1546 Jun 26 15:33 compat.c
-rw-r--r-- 1 root root 1683 Jun 26 15:33 compat.h
-rw-r--r-- 1 root root 25484 Jun 26 15:33 conf.c
-rw-r--r-- 1 root root 2639 Jun 26 15:33 conf.h
-rw-r--r-- 1 root root 2450 Jun 26 15:33 config.m4
-rw-r--r-- 1 root root 743 Jun 26 15:33 config.w32
-rw-r--r-- 1 root root 638 Jun 26 15:33 CONTRIBUTING.md
-rw-r--r-- 1 root root 24 Jun 26 15:33 CREDITS
drwxr-xr-x 2 root root 23 Jun 26 15:33 examples
-rw-r--r-- 1 root root 5248 Jun 26 15:33 fun.c
-rw-r--r-- 1 root root 1186 Jun 26 15:33 fun.h
-rw-r--r-- 1 root root 26396 Jun 26 15:33 kafka_consumer.c
-rw-r--r-- 1 root root 1175 Jun 26 15:33 kafka_consumer.h
-rw-r--r-- 1 root root 1082 Jun 26 15:33 LICENSE
-rw-r--r-- 1 root root 6476 Jun 26 15:33 message.c
-rw-r--r-- 1 root root 1408 Jun 26 15:33 message.h
-rw-r--r-- 1 root root 6186 Jun 26 15:33 metadata_broker.c
-rw-r--r-- 1 root root 1285 Jun 26 15:33 metadata_broker.h
-rw-r--r-- 1 root root 7900 Jun 26 15:33 metadata.c
-rw-r--r-- 1 root root 8576 Jun 26 15:33 metadata_collection.c
-rw-r--r-- 1 root root 1476 Jun 26 15:33 metadata_collection.h
-rw-r--r-- 1 root root 1262 Jun 26 15:33 metadata.h
-rw-r--r-- 1 root root 8062 Jun 26 15:33 metadata_partition.c
-rw-r--r-- 1 root root 1294 Jun 26 15:33 metadata_partition.h
-rw-r--r-- 1 root root 6776 Jun 26 15:33 metadata_topic.c
-rw-r--r-- 1 root root 1282 Jun 26 15:33 metadata_topic.h
-rw-r--r-- 1 root root 15604 Jun 26 15:33 package.xml
-rw-r--r-- 1 root root 2249 Jun 26 15:33 php_rdkafka.h
-rw-r--r-- 1 root root 8203 Jun 26 15:33 php_rdkafka_priv.h
-rw-r--r-- 1 root root 4651 Jun 26 15:33 queue.c
-rw-r--r-- 1 root root 1522 Jun 26 15:33 queue.h
-rw-r--r-- 1 root root 27523 Jun 26 15:33 rdkafka.c
-rw-r--r-- 1 root root 12747 Jun 26 15:33 README.md
drwxr-xr-x 2 root root 4096 Jun 26 15:33 tests
-rw-r--r-- 1 root root 22660 Jun 26 15:33 topic.c
-rw-r--r-- 1 root root 1699 Jun 26 15:33 topic.h
-rw-r--r-- 1 root root 11802 Jun 26 15:33 topic_partition.c
-rw-r--r-- 1 root root 1906 Jun 26 15:33 topic_partition.h
-rw-r--r-- 1 root root 1682 Jun 26 15:33 zeval.h
生成config模块:phpize
[root@localhost php-rdkafka-master]# phpize
Configuring for:
PHP Api Version: 20160303
Zend Module Api No: 20160303
Zend Extension Api No: 320160303
检测:[root@localhost php-rdkafka-master]# ./configure –with-php-config=/usr/bin/php-config
编译、安装:[root@localhost php-rdkafka-master]# make && make install
2.4 PHP引用
修改PHP配置文件,增加:
[root@localhost php-rdkafka-master]# vi /etc/php.ini
extension=/usr/lib64/php/modules/rdkafka.so
重启PHP:[root@localhost php-rdkafka-master]# systemctl restart php-fpm
三、使用
$ret = TXCKafka::getIntance()->produce($_aucOrgID, $this->rcv_devId, $_aucBDA, $_cRSSI);
class TXCKafka {
private static $singleinstance = false;
public static function getIntance() {
if (self::$singleinstance == false) {
self::$singleinstance = new self;
}
return self::$singleinstance;
}
function __construct() {}
public $result;
public $rk_producer;
public $topic;
public function connect(){
try {
Log::info("TXCKafka::connect", Log::INFO);
$ret = false;
$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) use (&$ret){
$this->result = $message->err == 0 ? true : false;
});
// 设置授权用户和密码
$conf->set('security.protocol', 'SASL_PLAINTEXT');
$conf->set('sasl.mechanism', 'PLAIN');
$conf->set('sasl.username', Config::s_kafka_username);
$conf->set('sasl.password', Config::s_kafka_password);
$this->rk_producer = new RdKafka\Producer($conf);
$this->rk_producer->setLogLevel(LOG_DEBUG);
$this->rk_producer->addBrokers(Config::s_kafka_brokers);
$topicConf = new RdKafka\TopicConf();
$topicConf->set('message.timeout.ms', 3000);
$this->topic = $this->rk_producer->newTopic('location', $topicConf);
} catch (Exception $e) {
Log::info("TXCKafka::connect Exception: " . $e->getMessage(), Log::EXCEPT);
return false;
}
}
public function produce($org_id, $gateway_id, $mac, $rssi){
try {
if (!$this->topic) $this->connect();
if ($this->send($org_id, $gateway_id, $mac, $rssi)) {
return true;
}
$this->connect();
if ($this->send($org_id, $gateway_id, $mac, $rssi)) {
return true;
}
return false;
} catch (Exception $e) {
Log::info("TXCKafka::produce Exception: " . $e->getMessage(), Log::EXCEPT);
return false;
}
}
public function send($org_id, $gateway_id, $mac, $rssi) {
try {
$msg = ['org_id' => $org_id,
'gateway_id' => $gateway_id,
'mac' => $mac,
'rssi' => $rssi,
'location_time' => date("YmdHis")];
$this->topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($msg));
while($this->rk_producer->getOutQLen() > 0){
$this->rk_producer->poll(10);
}
Log::info("TXCKafka::produce = " . json_encode($msg), Log::INFO);
if ($this->result) {
Log::info("TXCKafka::produce success", Log::INFO);
} else {
Log::info("TXCKafka::produce failed", Log::INFO);
}
return $this->result;
} catch (Exception $e) {
Log::info("TXCKafka::send Exception: " . $e->getMessage(), Log::EXCEPT);
return false;
}
}
}