MAC KAFKA

# brew install kafka
#配置文件路径
# /usr/local/etc/kafka/server.properties
# /usr/local/etc/kafka/zookeeper.properties
#命令安装路径
# /usr/local/Cellar/kafka/2.3.0/
#启动Zookeeper
# cd /usr/local/Cellar/kafka/2.3.0/
# ./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
#启动Kafka
# ./bin/kafka-server-start /usr/local/etc/kafka/server.properties
#创建Topic
# cd /usr/local/Cellar/kafka/2.3.0/bin
# ./kafka-topics --create --zookeeper localhost:2181 --replication-factor 主题的副本数 --partitions 分区数量 --topic 主题名称
#查看已有主题
# ./kafka-topics --list --zookeeper localhost:2181
shell> cd /usr/local/Cellar/kafka/2.3.0/bin
#创建生产者
# ./kafka-console-producer --topic hello --broker-list localhost:9092

PHP安装RDKAFKA扩展

checking for librdkafka/rdkafka.h" in default path... not found

checking for librdkafka/rdkafka.h" in default path... not found
configure: error: Please reinstall the rdkafka distribution

原因:rdkafka 扩展是基于 librdkafka 的 Kafka 客户端,系统中缺少它依赖的 librdkafka 库。解决办法:先安装 librdkafka(例如 brew install librdkafka),再重新编译安装 rdkafka 扩展。

加载第三方扩展包 https://github.com/weiboad/kafka-php (注意:下面的 producer / consumer 示例代码使用的是 rdkafka 扩展提供的 \RdKafka 类,而不是 kafka-php 包)

producer

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;

/**
 * Artisan command that publishes a small batch of demo messages to the
 * Kafka topic "test" through the php-rdkafka extension.
 */
class Producer extends Command
{
    /**
     * The name and signature of the console command.
     *
     * The spelling is kept as-is because it is the name the command is
     * invoked by.
     *
     * @var string
     */
    protected $signature = 'producter';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $this->producter();
    }

    /**
     * Produce 20 messages to the "test" topic and wait until the local
     * outbound queue has been drained.
     *
     * Delivery reports are appended to /tmp/c_dr_cb.log and client errors to
     * /tmp/err_cb.log, one entry per line.
     *
     * @return void
     */
    public function producter()
    {
        $conf = new \RdKafka\Conf();

        // Delivery-report callback: dump every reported message to a log file.
        // A trailing newline keeps consecutive entries apart, matching the
        // error log below.
        $conf->setDrMsgCb(static function ($kafka, $message) {
            file_put_contents('/tmp/c_dr_cb.log', var_export($message, true) . PHP_EOL, FILE_APPEND);
        });
        $conf->setErrorCb(static function ($kafka, $err, $reason) {
            file_put_contents(
                '/tmp/err_cb.log',
                sprintf('Kafka error: %s (reason: %s)', rd_kafka_err2str($err), $reason) . PHP_EOL,
                FILE_APPEND
            );
        });

        $rk = new \RdKafka\Producer($conf);
        $rk->setLogLevel(LOG_DEBUG);
        $rk->addBrokers('127.0.0.1');

        $cf = new \RdKafka\TopicConf();
        // Configuration values are passed as strings so the call does not
        // rely on implicit int-to-string coercion.
        $cf->set('request.required.acks', '0');
        $topic = $rk->newTopic('test', $cf);

        // Optional fourth argument of produce(); php-rdkafka documents it as
        // the message key.
        $option = 'qkl';
        for ($i = 0; $i < 20; $i++) {
            // RD_KAFKA_PARTITION_UA lets the client pick the partition.
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);
            // Serve pending callbacks without blocking while producing.
            $rk->poll(0);
        }

        // Drain the outbound queue before returning. The queue length is
        // re-read on every iteration so the loop stops as soon as it is empty.
        while (($len = $rk->getOutQLen()) > 0) {
            var_dump($len);
            $rk->poll(50);
        }
    }
}

consumer

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;

/**
 * Artisan command that reads messages from partition 0 of the Kafka topic
 * "test" through the php-rdkafka extension and prints them.
 */
class Consumer extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'consumer';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed
     */
    public function handle()
    {
        $this->consumer();
    }

    /**
     * Consume partition 0 of the "test" topic from the beginning, printing
     * every message, until a consume error is reported.
     *
     * Client errors are appended to /tmp/err_cb.log.
     *
     * @return void
     *
     * @throws \Exception when a message carries an error code other than
     *                    "no error", "partition EOF" or "timed out"
     */
    public function consumer()
    {
        $conf = new \RdKafka\Conf();
        // No delivery-report callback and no request.required.acks here:
        // librdkafka's configuration reference lists both as producer-only
        // properties, so they do not belong on a consumer.
        $conf->setErrorCb(static function ($kafka, $err, $reason) {
            file_put_contents(
                '/tmp/err_cb.log',
                sprintf('Kafka error: %s (reason: %s)', rd_kafka_err2str($err), $reason) . PHP_EOL,
                FILE_APPEND
            );
        });

        // Consumer group this client belongs to.
        $conf->set('group.id', 'myConsumerGroup');

        $rk = new \RdKafka\Consumer($conf);
        $rk->addBrokers('127.0.0.1');

        $topicConf = new \RdKafka\TopicConf();
        // Automatic offset commits (every auto.commit.interval.ms) are left
        // disabled; the original author advises against enabling them.
        // Values are passed as strings so the calls do not rely on implicit
        // int-to-string coercion.
        $topicConf->set('auto.commit.enable', '0');
        $topicConf->set('auto.commit.interval.ms', '100');

        // Store offsets on the broker. The alternative is 'file', optionally
        // combined with 'offset.store.path'.
        $topicConf->set('offset.store.method', 'broker');

        // 'auto.offset.reset' is left at its default. Per the original notes,
        // 'smallest' (equivalent to 'earliest') starts from the oldest message
        // and 'largest' (equivalent to 'latest') from the newest.

        $topic = $rk->newTopic('test', $topicConf);

        // First argument: partition to consume (0).
        // Second argument: start offset. Per the original notes:
        //   RD_KAFKA_OFFSET_BEGINNING - start from the first message
        //   RD_KAFKA_OFFSET_STORED    - resume from the last stored offset
        //   RD_KAFKA_OFFSET_END       - start from the end of the partition
        $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

        try {
            while (true) {
                // First argument: partition (0).
                // Second argument: how long to block waiting, in milliseconds.
                $message = $topic->consume(0, 12 * 1000);
                if ($message === null) {
                    sleep(1);
                    echo "No more messages\n";
                    continue;
                }

                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        print_r($message);
                        break;
                    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                        echo "No more messages; will wait for more\n";
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        echo "Timed out\n";
                        break;
                    default:
                        throw new \Exception($message->errstr(), $message->err);
                }
            }
        } finally {
            // Stop the partition consumer when the loop is left through an
            // exception, so the consume started above is always paired with
            // a stop.
            $topic->consumeStop(0);
        }
    }
}

Leave Comment