日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

PHP SDK收發(fā)消息

更新時(shí)間:

本文介紹如何使用PHP SDK通過接入點(diǎn)接入云消息隊(duì)列 Kafka 版并收發(fā)消息。

環(huán)境準(zhǔn)備

安裝C++依賴庫

  1. 執(zhí)行以下命令切換到y(tǒng)um源配置目錄/etc/yum.repos.d/

    cd /etc/yum.repos.d/
  2. 創(chuàng)建yum源配置文件confluent.repo

    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/rpm/5.1/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
    
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/rpm/5.1
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
  3. 執(zhí)行以下命令安裝C++依賴庫。

    yum install librdkafka-devel

安裝PHP依賴庫

  1. 執(zhí)行以下命令安裝PHP依賴庫。

    pecl install rdkafka
  2. 在PHP的初始化文件php.ini中添加以下一行語句以開啟擴(kuò)展。

    extension=rdkafka.so

準(zhǔn)備配置

  1. 可選:下載SSL根證書。如果是SSL接入點(diǎn),需下載該證書。

  2. 訪問Aliware-kafka-demos,單擊download,下載Demo工程到本地并解壓。

  3. 在解壓的Demo工程找到kafka-php-demo文件夾,根據(jù)接入點(diǎn)類型打開對(duì)應(yīng)的文件夾,配置setting.php文件。

    <?php
    
    return [
        'sasl_plain_username' => 'xxx',
        'sasl_plain_password' => 'xxx',
        'bootstrap_servers' => "xxx:xx,xxx:xx",
        'topic_name' => 'xxx',
        'consumer_id' => 'xxx'
    ];

    參數(shù)

    描述

    sasl_plain_username

    SASL用戶名。如果是默認(rèn)接入點(diǎn),則無此配置項(xiàng)。

    說明
    • 如果實(shí)例未開啟ACL,您可以在云消息隊(duì)列 Kafka 版控制臺(tái)實(shí)例詳情頁面的配置信息區(qū)域獲取默認(rèn)的用戶名密碼
    • 如果實(shí)例已開啟ACL,請(qǐng)確保要使用的SASL用戶已被授予向云消息隊(duì)列 Kafka 版實(shí)例收發(fā)消息的權(quán)限。具體操作,請(qǐng)參見SASL用戶授權(quán)

    sasl_plain_password

    SASL用戶名密碼。如果是默認(rèn)接入點(diǎn),則無此配置項(xiàng)。

    bootstrap_servers

    SSL接入點(diǎn)。您可在云消息隊(duì)列 Kafka 版控制臺(tái)實(shí)例詳情頁面的接入點(diǎn)信息區(qū)域獲取。

    topic_name

    Topic名稱。您可在云消息隊(duì)列 Kafka 版控制臺(tái)Topic 管理頁面獲取。

    consumer_id

    Group名稱。您可在云消息隊(duì)列 Kafka 版控制臺(tái)Group 管理頁面獲取。

  4. 配置完成后,將配置文件所在文件夾下的全部文件(如果是SSL接入點(diǎn)實(shí)例,包含證書SSL根證書文件)上傳至服務(wù)器PHP安裝目錄下。

發(fā)送消息

執(zhí)行以下命令發(fā)送消息。

php kafka-producer.php

消息程序kafka-producer.php示例代碼如下:

說明

示例代碼為SSL接入點(diǎn)的代碼。如果是默認(rèn)接入點(diǎn),無SASL相關(guān)代碼,即刪除如下代碼中包含sasl.ssl.相關(guān)的代碼即可。

<?php

$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('ssl.endpoint.identification.algorithm', 'none');
$conf->set('message.send.max.retries', 5);
$rk = new RdKafka\Producer($conf);
# if want to debug, set log level to LOG_DEBUG
$rk->setLogLevel(LOG_INFO);
$rk->addBrokers($setting['bootstrap_servers']);
$topic = $rk->newTopic($setting['topic_name']);
$a = $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
$rk->poll(0);
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}
echo "send succ" . PHP_EOL;

代碼示例詳情,請(qǐng)參見php-rdkafka

訂閱消息

執(zhí)行如下命令訂閱消息。

php kafka-consumer.php

消息程序kafka-consumer.php示例代碼如下:

說明

示例代碼為SSL接入點(diǎn)的代碼。如果是默認(rèn)接入點(diǎn),無SASL相關(guān)代碼,即刪除如下代碼中包含sasl.ssl.相關(guān)的代碼即可。

<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');

$conf->set('session.timeout.ms', 10000);

$conf->set('request.timeout.ms', 305000);

$conf->set('group.id', $setting['consumer_id']);

$conf->set('ssl.endpoint.identification.algorithm', 'none');

$conf->set('metadata.broker.list', $setting['bootstrap_servers']);

$topicConf = new RdKafka\TopicConf();

$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

$consumer->subscribe([$setting['topic_name']]);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(30 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>

代碼示例詳情,請(qǐng)參見php-rdkafka