日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Kafka協(xié)議上傳日志

更新時(shí)間:

您可以使用Kafka Producer SDK、Beats系列軟件、Collectd、Fluentd、Logstash、Telegraf、Vector等采集工具采集日志,并通過Kafka協(xié)議上傳到日志服務(wù)。本文介紹通過采集工具采集到日志后,利用Kafka協(xié)議上傳日志到日志服務(wù)的操作步驟。

相關(guān)限制

  • 支持的Kafka協(xié)議版本最低為2.1.0。

  • 為保證日志傳輸安全性,必須使用SASL_SSL連接協(xié)議。

權(quán)限說明

以下兩個(gè)權(quán)限規(guī)則滿足其中之一即可。

  • AliyunLogFullAccess

    本策略定義了管理日志服務(wù)(Log)的權(quán)限。授權(quán)方式請(qǐng)參見為RAM用戶授權(quán)為RAM角色授權(quán)。

  • 自定義權(quán)限策略

    1. 創(chuàng)建一個(gè)自定義權(quán)限策略,其中在腳本編輯頁(yè)簽,請(qǐng)使用以下腳本替換配置框中的原有內(nèi)容。具體操作,請(qǐng)參見創(chuàng)建自定義權(quán)限策略。

      說明

      腳本中的Project名稱請(qǐng)根據(jù)實(shí)際情況替換。

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": "log:GetProject",
                  "Resource": "acs:log:*:*:project/project名稱",
                  "Effect": "Allow"
              },
              {
                  "Action": [
                      "log:GetLogStore",
                      "log:ListShards",
                      "log:PostLogStoreLogs"
                  ],
                  "Resource": "acs:log:*:*:project/project名稱/logstore/*",
                  "Effect": "Allow"
              }
          ]
      }
    2. 為RAM用戶添加創(chuàng)建的自定義權(quán)限策略。具體操作,請(qǐng)參見為RAM用戶授權(quán)

配置方式

使用Kafka協(xié)議上傳日志時(shí),您需要配置以下參數(shù)。

配置名

配置值

說明

示例

SLS_KAFKA_ENDPOINT

初始連接的集群地址,格式為Project名稱.Endpoint:Port,請(qǐng)根據(jù)Project所在的Endpoint進(jìn)行配置。更多信息,請(qǐng)參見服務(wù)入口。

  • 私網(wǎng):端口號(hào)為10011,例如Project名稱.cn-hangzhou-intranet.log.aliyuncs.com:10011。

  • 公網(wǎng):端口號(hào)為10012,例如Project名稱.cn-hangzhou.log.aliyuncs.com:10012。

aliyun-project-test為Project名稱,cn-hangzhou-xxx.aliyuncs.comEndpoint10011、10012分別為私網(wǎng)和公網(wǎng)的端口號(hào)。

  • 私網(wǎng):aliyun-project-test.cn-hangzhou-intranet.log.aliyuncs.com:10011

  • 公網(wǎng):aliyun-project-test.cn-hangzhou.log.aliyuncs.com:10012。

SLS_PROJECT

Project名稱

日志服務(wù)對(duì)應(yīng)的Project名稱。

aliyun-project-test

SLS_LOGSTORE

Logstore名稱

日志服務(wù)對(duì)應(yīng)的Logstore名稱。Logstore名稱后綴加上.json,則代表嘗試JSON解析。

例如Logstore名稱是test-logstore

  • 配置值為test-logstore,上報(bào)日志內(nèi)容存儲(chǔ)在content字段中。

  • 配置值為test-logstore.json,上報(bào)日志內(nèi)容按JSON提取,提取用戶上報(bào)的JSON數(shù)據(jù)中的首層Key為字段名,對(duì)應(yīng)的Value為字段值。

SLS_PASSWORD

具備sls寫入權(quán)限的AccessKeySecret。

AK的概念和創(chuàng)建步驟,請(qǐng)參見創(chuàng)建AccessKey

值為AccessKey ID和AliyunKey Secret用#號(hào)拼接。

  • AccessKey ID:用于標(biāo)識(shí)用戶。

  • AccessKey Secret:是一個(gè)用于驗(yàn)證您擁有該AccessKey ID的密碼。

LTaI5xxxxxxxxxxxxindexp2#CZO8XXXXXXXXXXpKSG

說明
  • AccessKey ID:LTaI5xxxxxxxxxxxxindexp2。

  • AliyunKeySecret:CZO8XXXXXXXXXXpKSG

  • #:用于拼接AccessKey ID和AliyunKey Secret。

說明

如果您要通過Kafka消費(fèi)組實(shí)時(shí)消費(fèi)日志服務(wù)的數(shù)據(jù),請(qǐng)?zhí)峤?span props="china">工單咨詢阿里云技術(shù)支持工程師。

示例一:通過Beats系列軟件上傳日志

Beats系列軟件(MetricBeat、PacketBeat、Winlogbeat、Auditbeat、Filebeat、Heartbeat等)采集到日志后,支持通過Kafka協(xié)議將日志上傳到日志服務(wù)。更多信息,請(qǐng)參見Beats-Kafka-Output。

  • 配置示例

    示例中用到的SLS_開頭的參數(shù)配置請(qǐng)參見配置方式

    output.kafka:
      # initial brokers for reading cluster metadata
      hosts: ["SLS_KAFKA_ENDPOINT"]
      username: "SLS_PROJECT"
      password: "SLS_PASSWORD"
      ssl.certificate_authorities:
      # message topic selection + partitioning
      topic: 'SLS_LOGSTORE'
      partition.round_robin:
        reachable_only: false
    
      required_acks: 1
      compression: gzip
      max_message_bytes: 1000000

示例二:通過Collectd上傳日志

Collectd是一個(gè)守護(hù)(daemon)進(jìn)程,用于定期采集系統(tǒng)和應(yīng)用程序的性能指標(biāo),并支持通過Kafka協(xié)議上傳到日志服務(wù)。更多信息,請(qǐng)參見Write Kafka Plugin

將Collectd采集到日志上傳到日志服務(wù)時(shí),還需安裝Kafka插件以及相關(guān)依賴。例如:在linux Centos中,可以使用yum安裝Kafka插件,命令為sudo yum install collectd-write_kafka,安裝RPM請(qǐng)參見Collectd-write_kafka。

  • 配置示例

    • 示例中將日志輸出格式(Format)設(shè)置為JSON,除此之外,還支持Command、Graphite類型。更多信息,請(qǐng)參見Collectd配置文檔。

    • 示例中用到的SLS_開頭的參數(shù)配置請(qǐng)參見配置方式

    LoadPlugin write_kafka
    
    <Plugin write_kafka>
      Property "metadata.broker.list" "SLS_KAFKA_ENDPOINT"
      Property "security.protocol" "sasl_ssl"
      Property "sasl.mechanism" "PLAIN"
      Property "sasl.username" "SLS_PROJECT"
      Property "sasl.password" "SLS_PASSWORD"
      Property "broker.address.family" "v4"
      <Topic "SLS_LOGSTORE">
        Format JSON
        Key "content"
      </Topic>
    </Plugin>

示例三:使用Telegraf上傳日志

Telegraf是由Go語(yǔ)言編寫的代理程序,內(nèi)存占用小,用于收集、處理、匯總數(shù)據(jù)指標(biāo)。Telegraf具有豐富的插件及集成功能,可從其運(yùn)行的系統(tǒng)中獲取各種指標(biāo)、從第三方API中獲取指標(biāo)以及通過statsd和Kafka消費(fèi)者服務(wù)監(jiān)聽指標(biāo)。

將Telegraf采集到的日志通過Kafka協(xié)議上傳到日志服務(wù)前,您需要先修改配置文件。

  • 配置示例

    • 示例中將日志輸出格式(Format)設(shè)置為JSON,除此之外還支持Graphite、Carbon2等類型。更多信息,請(qǐng)參見Telegraf輸出格式。

      說明

      Telegraf必須配置一個(gè)合法的tls_ca路徑,使用服務(wù)器自帶的根證書的路徑即可。Linux環(huán)境中,根證書CA路徑一般為/etc/ssl/certs/ca-bundle.crt

    • 示例中用到的SLS_開頭的參數(shù)配置請(qǐng)參見配置方式。

    # Kafka output plugin configuration
    [[outputs.kafka]]
      ## URLs of kafka brokers
      brokers = ["SLS_KAFKA_ENDPOINT"]
      ## Kafka topic for producer messages
      topic = "SLS_LOGSTORE"
      routing_key = "content"
      ## CompressionCodec represents the various compression codecs recognized by
      ## Kafka in messages.
      ## 0 : No compression
      ## 1 : Gzip compression
      ## 2 : Snappy compression
      ## 3 : LZ4 compression
      compression_codec = 1
      ## Optional TLS Config tls_ca = "/etc/ssl/certs/ca-bundle.crt"
      tls_cert = "/etc/ssl/certs/ca-certificates.crt" # tls_key = "/etc/telegraf/key.pem"
      ## Use TLS but skip chain & host verification
      # insecure_skip_verify = false
      ## Optional SASL Config
      sasl_username = "SLS_PROJECT"
      sasl_password = "SLS_PASSWORD"
      ## Data format to output.
      ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
      data_format = "json"

示例四:使用Fluentd上傳日志

Fluentd是一個(gè)開源的日志收集器,是云端原生計(jì)算基金會(huì)(CNCF)的成員項(xiàng)目之一,遵循Apache 2 License協(xié)議。

Fluentd支持眾多輸入、處理、輸出插件,支持通過Kafka插件將日志上傳到日志服務(wù),您只需安裝并配置Kafka插件即可。更多信息,請(qǐng)參見fluent-plugin-kafka

  • 配置示例

    • 示例中將日志輸出格式(Format)設(shè)置為JSON,除此之外還支持?jǐn)?shù)十種Format類型。更多信息,請(qǐng)參見Fluentd Formatter。

    • 示例中用到的SLS_開頭的參數(shù)配置請(qǐng)參見配置方式。

    <match **>
      @type kafka2
      brokers     SLS_KAFKA_ENDPOINT
      default_topic SLS_LOGSTORE
      default_message_key content
      sasl_over_ssl true
      use_event_time true
      username SLS_PROJECT
      password "SLS_PASSWORD"
      ssl_ca_certs_from_system true
      # ruby-kafka producer options
      max_send_retries 1000
      required_acks 1
      compression_codec gzip
      use_event_time true
      max_send_limit_bytes 2097152
    
      <buffer hostlogs>
        flush_interval 10s
      </buffer>
      <format>
        @type json
      </format>
    </match>

示例五:使用Logstash上傳日志

Logstash是一個(gè)具備實(shí)時(shí)處理能力、開源的日志采集引擎,可以動(dòng)態(tài)采集不同來源的日志。

Logstash內(nèi)置Kafka輸出插件,您可以配置Logstash實(shí)現(xiàn)日志通過kafka協(xié)議上傳到日志服務(wù)。由于日志服務(wù)使用SASL_SSL連接協(xié)議,因此還需要配置SSL證書和jaas文件。

  • 配置示例

    • 示例中將日志輸出格式(Format)設(shè)置為JSON,除此之外還支持?jǐn)?shù)十種Format類型。更多信息,請(qǐng)參見Logstash Codec。

      說明

      本示例為連通性測(cè)試的配置,您的生產(chǎn)環(huán)境中建議刪除stdout的輸出配置。

    • 示例中用到的SLS_開頭的參數(shù)配置請(qǐng)參見配置方式。

    
    output {
      stdout { codec => rubydebug }
      kafka {
        topic_id => "SLS_LOGSTORE"
        bootstrap_servers => "SLS_KAFKA_ENDPOINT"
        security_protocol => "SASL_SSL"
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='SLS_PROJECT' password='SLS_PASSWORD';"
        sasl_mechanism => "PLAIN"
        codec => "json"
        client_id => "kafka-logstash"
      }
    }

示例六:通過Fluent-bit上傳日志

Fluent-bit是一個(gè)輕量級(jí)、高可擴(kuò)展的日志與指標(biāo)的處理器、轉(zhuǎn)發(fā)器,支持眾多輸入、處理和輸出插件,支持通過Kafka插件將日志上傳到日志服務(wù)。更多信息,請(qǐng)參見Kafka output plugin

  • 配置示例

    示例中用到的SLS_開頭的參數(shù)配置請(qǐng)參見配置方式。

    [Output]
        Name    kafka
        Match    *
        Brokers   SLS_KAFKA_ENDPOINT
        Topics    SLS_LOGSTORE
        Format    json
        rdkafka.sasl.username   SLS_PROJECT
        rdkafka.sasl.password   SLS_PASSWORD
        rdkafka.security.protocol   SASL_SSL
        rdkafka.sasl.mechanism      PLAIN

示例七 :Vector配置Kafka協(xié)議上傳

Vector是一款輕量級(jí)、高性能的日志處理軟件,它支持Kafka協(xié)議的方式上報(bào)日志。下面是Vector通過Kafka兼容模式寫入SLS的配置方法。

  • 配置示例

    示例中用到的SLS_開頭的參數(shù)配置請(qǐng)參見配置方式。

    [sinks.aliyun_sls]
      type = "kafka"
      inputs = ["test_logs"]
      bootstrap_servers = "SLS_KAFKA_ENDPOINT"
      compression = "gzip"
      healthcheck = true
      topic = "SLS_LOGSTORE"
      encoding.codec = "json"
      sasl.enabled = true
      sasl.mechanism = "PLAIN"
      sasl.username = "SLS_PROJECT"
      sasl.password = "SLS_PASSWORD"
      tls.enabled = true

示例八:通過Kafka生產(chǎn)者(produce)上傳日志

  • 代碼示例

    package org.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class KafkaProduceExample {
    
        public static void main(String[] args) {
            //配置信息。
            Properties props = new Properties();
            String project = "etl-dev";
            String logstore = "testlog";
            // 如果希望produce的內(nèi)容被json解析展開,則設(shè)置為true
            boolean parseJson = true;
            // 阿里云賬號(hào)AccessKey擁有所有API的訪問權(quán)限,風(fēng)險(xiǎn)很高。強(qiáng)烈建議您創(chuàng)建并使用RAM用戶進(jìn)行API訪問或日常運(yùn)維,請(qǐng)登錄RAM控制臺(tái)創(chuàng)建RAM用戶。
            // 此處以把AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。您可以根據(jù)業(yè)務(wù)需要,保存到配置文件里。
            // 強(qiáng)烈建議不要把 AccessKey 和 AccessKeySecret 保存到代碼里,會(huì)存在密鑰泄漏風(fēng)險(xiǎn)
            String accessKeyID = System.getenv("SLS_ACCESS_KEY_ID");
            String accessKeySecret = System.getenv("SLS_ACCESS_KEY_SECRET");
            String endpoint = "cn-huhehaote.log.aliyuncs.com"; // 根據(jù)實(shí)際project所在的endpoint配置
            String port = "10012"; // 公網(wǎng)用10012,私網(wǎng)用10011
    
            String hosts = project + "." + endpoint + ":" + port;
            String topic = logstore;
            if(parseJson) {
                topic = topic + ".json";
            }
    
            props.put("bootstrap.servers", hosts);
            props.put("security.protocol", "sasl_ssl");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" +
                            project + "\" password=\"" + accessKeyID + "#" + accessKeySecret + "\";");
            props.put("enable.idempotence", "false"); // SLS的Kafka寫入接口不支持事務(wù)
    
            //設(shè)置數(shù)據(jù)key和value的序列化處理類。
            props.put("key.serializer", StringSerializer.class);
            props.put("value.serializer", StringSerializer.class);
    
            //創(chuàng)建生產(chǎn)者實(shí)例。
            KafkaProducer<String,String> producer = new KafkaProducer<>(props);
    
            //發(fā)送記錄
            for(int i=0;i<1;i++){
                String content = "{\"msg\": \"Hello World\"}";
                ProducerRecord record = new ProducerRecord<String, String>(topic, content);
                producer.send(record);
            }
            producer.close();
        }
    }
  • pom依賴

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>

錯(cuò)誤信息

使用Kafka協(xié)議上傳日志失敗時(shí),會(huì)按照Kafka的錯(cuò)誤信息返回對(duì)應(yīng)的錯(cuò)誤信息,如下表所示,Kafka協(xié)議錯(cuò)誤信息詳情,請(qǐng)參見error list。

錯(cuò)誤信息

說明

推薦解決方式

NetworkException

出現(xiàn)網(wǎng)絡(luò)錯(cuò)誤時(shí)返回該錯(cuò)誤信息。

一般等待1秒后重試即可。

TopicAuthorizationException

鑒權(quán)失敗時(shí)返回該錯(cuò)誤信息。

一般是您提供的AccessKey錯(cuò)誤或沒有寫入對(duì)應(yīng)Project、Logstore的權(quán)限。請(qǐng)?zhí)顚懻_的且具備寫入權(quán)限的AccessKey。

UnknownTopicOrPartitionException

出現(xiàn)該錯(cuò)誤可能有兩種原因:

  • 不存在對(duì)應(yīng)的Project或Logstore。

  • Project所在地域與填入的Endpoint不一致。

請(qǐng)確保已創(chuàng)建對(duì)應(yīng)的Project和Logstore。如果已創(chuàng)建還是提示該錯(cuò)誤,請(qǐng)檢查Project所在地域是否和填入的Endpoint一致。

KafkaStorageException

服務(wù)端出現(xiàn)異常時(shí)返回該錯(cuò)誤信息。

一般等待1秒后重試即可。