本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。
本文介紹如何將Kafka數據導入到日志服務,實現數據的查詢分析、加工等操作。
前提條件
已有可用的Kafka集群。
已創建Project和Logstore。具體操作,請參見創建項目Project和創建Logstore。
版本說明
目前,只支持Kafka 2.0.0及以上版本。
創建數據導入配置
登錄日志服務控制臺。
在接入數據區域的數據導入頁簽中,單擊Kafka-數據導入。
- 選擇目標Project和Logstore,單擊下一步。
設置導入配置。
在導入設置步驟中,配置如下參數。
參數
說明
任務名稱
SLS任務的唯一名稱。
顯示名稱
任務顯示名稱。
任務描述
導入任務的描述。
服務地址
Kafka bootstrap Servers地址。多個服務地址之間使用半角逗號(,)分隔。
如果是阿里云云消息隊列 Kafka 版,需輸入接入點的IP地址或域名。
如果是阿里云ECS上自建的Kafka集群,需輸入ECS實例的IP地址。
如果是其他的Kafka集群,需輸入Kafka Broker的公網IP地址或域名。
Topic列表
Kafka主題。多個主題之間使用半角逗號(,)分隔。
消費組
如果您使用的是阿里云云消息隊列 Kafka 版,且未開啟自由使用Group功能,則需要選擇對應的消費組。創建消費組的具體操作,請參見創建消費組。
起始位置
開始導入數據的位置。
最早:從現有的第一條Kafka數據開始導入。
最晚:從最新生成的Kafka數據開始導入。
數據格式
待導入數據的格式。
極簡模式:如果待導入的數據為單行格式,您可以選擇極簡模式。
JSON字符串:如果待導入的數據為JSON,您可以選擇JSON字符串。導入任務會將數據解析為鍵值對格式,只解析到第一層。
解析數組元素
打開解析數組元素開關后,對于JSON數組格式的數據,系統會按其數組元素拆分為多條數據后進行導入。
編碼格式
待導入數據的編碼格式(即字符集),目前支持UTF-8和GBK。
VPC實例ID
如果Kafka集群是VPC環境下的阿里云云消息隊列 Kafka 版或阿里云ECS上自建的Kafka集群,您可以通過設置VPC實例ID,實現日志服務通過阿里云內網讀取Kafka集群的數據。
通過阿里云內網讀取數據,具備更好的安全性和網絡穩定性。
重要Kafka集群需允許被IP網段100.104.0.0/16訪問。
時間配置
時間字段
設置為Kafka數據中代表時間的列名,用于指定數據導入日志服務時的時間。
提取時間正則
如果您選擇的數據格式為極簡模式,您需要設置正則表達式提取Kafka數據中的時間。
例如,數據內容為
message with time 2022-08-08 14:20:20
,則您可以設置提取時間正則為\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d
。時間字段格式
指定時間格式,用于解析時間字段的值。
支持Java SimpleDateFormat語法的時間格式,例如yyyy-MM-dd HH:mm:ss。時間格式的語法詳情請參見Class SimpleDateFormat。常見的時間格式請參見時間格式。
支持epoch格式,可選值為epoch、epochMillis、epochMacro或epochNano。
時間字段時區
選擇時間字段對應的時區。
當時間格式是epoch時,不需要設置時區。
默認時間來源
當沒有提供時間提取信息或者時間提取失敗時,使用您所設置的時間來源,包括系統當前時間和kafka消息時間戳。
高級配置
日志上下文
打開日志上下文開關后,支持日志服務的上下文查詢功能。您可以查看目標數據在原始Kafka partition中的前若干條(上文)或后若干條(下文)數據。
通信協議
通過公網導入時,建議通過加密連接、用戶認證的方式進行導入,即在此處定義連接Kafka集群的通信協議信息,配置示例如下所示。
protocol字段的可選值為plaintext、ssl、sasl_plaintext或sasl_ssl。建議設置為sasl_ssl,此協議需要連接加密和用戶認證。
設置protocol為sasl_plaintext或sasl_ssl時,需設置sasl節點。其中,mechanism字段可以為PLAIN、SCRAM-SHA-256或SCRAM-SHA-512,表示用戶名/密碼身份驗證機制。
{ "protocol":"sasl_plaintext", "sasl":{ "mechanism":"PLAIN", "username":"xxx", "password":"yyy" } }
私網域名解析
部署在阿里云ECS上的Kafka Broker之間采用內部域名通信時,您需要在此處指定每個Broker對應的ECS域名和IP地址。配置示例如下所示。
{ "hostname#1":"192.168.XX.XX", "hostname#2":"192.168.XX.XX", "hostname#3":"192.168.XX.XX" }
單擊預覽,預覽導入結果。
確認無誤后,單擊下一步。
創建索引和預覽數據,然后單擊下一步。日志服務默認開啟全文索引。您也可以根據采集到的日志,手動創建字段索引,或者單擊自動生成索引,日志服務將自動生成字段索引。更多信息,請參見創建索引。
重要如果需要查詢日志中的所有字段,建議使用全文索引。如果只需查詢部分字段、建議使用字段索引,減少索引流量。如果需要對字段進行分析(SELECT語句),必須創建字段索引。
單擊查詢日志,進入查詢和分析頁面,確認是否成功導入Kafka數據。
等待1分鐘左右,如果有目標Kafka數據導入,則說明導入成功。
查看導入配置
創建導入配置成功后,您可以在控制臺中查看已創建的導入配置及生成的統計報表。
單擊目標Project。
選擇目標日志庫下的 ,單擊配置名稱。
在導入配置概覽頁面,查看導入配置的基本信息和統計報表。
相關操作
在導入配置概覽頁面,您還可以進行如下操作。
修改配置
單擊修改配置,修改導入配置的相關配置。具體配置,請參見創建數據導入配置。
刪除配置
單擊刪除配置,刪除該導入配置。
警告刪除后不可恢復,請謹慎操作。
停止任務
單擊停止,停止該導入任務。
常見問題
問題 | 可能原因 | 解決方法 |
預覽時出現Kafka Broker連接錯誤(Broker transport failure)。 |
|
|
預覽時出現超時錯誤(preview request timed out)。 | 待導入的Kafka Topic中沒有數據。 | 如果待導入的Kafka Topic中沒有數據,請在寫入數據后,再重試預覽。 |
數據存在亂碼。 | 編碼格式配置不符合預期。 | 根據Kafka真實的編碼格式更新導入配置。 如果需要修復已有的亂碼數據,請創建新的Logstore和導入配置。 |
日志服務中顯示的數據時間和數據本身的時間不一致。 | 設置導入配置時,沒有指定日志時間字段或者設置時間格式、時區有誤。 | 設置指定的日志時間字段以及正確的時間格式和時區。更多信息,請參見創建數據導入配置。 |
導入數據后,無法查詢和分析數據。 |
| |
導入的數據條目數量少于預期。 | 存在大于3 MB的Kafka數據,您可以通過數據處理流量觀測儀表盤確認。 | 縮小單條Kafka消息的大小。 |
數據導入時存在明顯的延遲 |
|
|
錯誤處理機制
限制項 | 說明 |
網絡連接錯誤 | 導入任務會定期重試,即網絡連接恢復后,導入任務會自動從之前中斷的Offset位置繼續消費數據。 |
Kafka Topic不存在 | 當目標Kafka Topic不存在時,導入任務會跳過該Topic,且不影響其他正常的Topic的數據導入。 當不存在的Topic被重建后,導入任務會正常消費該Topic中的數據(存在約10分鐘的延遲)。 |