Kafka數(shù)據(jù)遷移MaxCompute最佳實(shí)踐
本文為您介紹如何使用DataWorks數(shù)據(jù)集成,將Kafka集群上的數(shù)據(jù)遷移至MaxCompute。
前提條件
新增MaxCompute數(shù)據(jù)源。詳情請(qǐng)參見(jiàn)創(chuàng)建MaxCompute數(shù)據(jù)源。
在DataWorks上完成創(chuàng)建業(yè)務(wù)流程,本例使用DataWorks簡(jiǎn)單模式。詳情請(qǐng)參見(jiàn)創(chuàng)建業(yè)務(wù)流程。
搭建Kafka集群
進(jìn)行數(shù)據(jù)遷移前,您需要保證自己的Kafka集群環(huán)境正常。本文使用阿里云EMR服務(wù)自動(dòng)化搭建Kafka集群,詳細(xì)過(guò)程請(qǐng)參見(jiàn)Kafka快速入門。
本文使用的EMR Kafka版本信息如下:
EMR版本:EMR-3.12.1
集群類型:Kafka
軟件信息:Ganglia 3.7.2,ZooKeeper 3.4.12,Kafka 2.11-1.0.1,Kafka-Manager 1.3.X.XX
Kafka集群使用專有網(wǎng)絡(luò),區(qū)域?yàn)槿A東1(杭州),主實(shí)例組ECS計(jì)算資源配置公網(wǎng)及內(nèi)網(wǎng)IP。
背景信息
Kafka是一款分布式發(fā)布與訂閱的消息中間件,具有高性能、高吞量的特點(diǎn)被廣泛使用,每秒能處理上百萬(wàn)的消息。Kafka適用于流式數(shù)據(jù)處理,主要應(yīng)用于用戶行為跟蹤、日志收集等場(chǎng)景。
一個(gè)典型的Kafka集群包含若干個(gè)生產(chǎn)者(Producer)、Broker、消費(fèi)者(Consumer)以及一個(gè)Zookeeper集群。Kafka集群通過(guò)Zookeeper管理自身集群的配置并進(jìn)行服務(wù)協(xié)同。
Topic是Kafka集群上最常用的消息的集合,是一個(gè)消息存儲(chǔ)邏輯概念。物理磁盤不存儲(chǔ)Topic,而是將Topic中具體的消息按分區(qū)(Partition)存儲(chǔ)在集群中各個(gè)節(jié)點(diǎn)的磁盤上。每個(gè)Topic可以有多個(gè)生產(chǎn)者向它發(fā)送消息,也可以有多個(gè)消費(fèi)者向它拉取(消費(fèi))消息。
每個(gè)消息被添加到分區(qū)時(shí),會(huì)分配一個(gè)Offset(偏移量,從0開(kāi)始編號(hào)),是消息在一個(gè)分區(qū)中的唯一編號(hào)。
步驟一:準(zhǔn)備Kafka數(shù)據(jù)
您需要在Kafka集群創(chuàng)建測(cè)試數(shù)據(jù)。為保證您可以順利登錄EMR集群Header主機(jī),以及保證MaxCompute和DataWorks可以順利和EMR集群Header主機(jī)通信,請(qǐng)您首先配置EMR集群Header主機(jī)安全組,放行TCP 22及TCP 9092端口。
登錄EMR集群Header主機(jī)地址。
進(jìn)入EMR Hadoop控制臺(tái)。
在頂部導(dǎo)航欄,單擊集群管理。
在顯示的頁(yè)面,找到您需要?jiǎng)?chuàng)建測(cè)試數(shù)據(jù)的集群,進(jìn)入集群詳情頁(yè)。
在集群詳情頁(yè)面,單擊主機(jī)列表,確認(rèn)EMR集群Header主機(jī)地址,并通過(guò)SSH鏈接遠(yuǎn)程登錄。
創(chuàng)建測(cè)試Topic。
執(zhí)行如下命令創(chuàng)建測(cè)試所使用的Topic testkafka。
kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka --create
寫(xiě)入測(cè)試數(shù)據(jù)。
執(zhí)行如下命令,可以模擬生產(chǎn)者向Topic testkafka中寫(xiě)入數(shù)據(jù)。由于Kafka用于處理流式數(shù)據(jù),您可以持續(xù)不斷地向其中寫(xiě)入數(shù)據(jù)。為保證測(cè)試結(jié)果,建議寫(xiě)入10條以上的數(shù)據(jù)。
kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka
您可以同時(shí)再打開(kāi)一個(gè)SSH窗口,執(zhí)行如下命令,模擬消費(fèi)者驗(yàn)證數(shù)據(jù)是否已成功寫(xiě)入Kafka。當(dāng)數(shù)據(jù)寫(xiě)入成功時(shí),您可以看到已寫(xiě)入的數(shù)據(jù)。
kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning
步驟二:在DataWorks上創(chuàng)建目標(biāo)表
在DataWorks上創(chuàng)建目標(biāo)表用以接收Kafka數(shù)據(jù)。
進(jìn)入數(shù)據(jù)開(kāi)發(fā)頁(yè)面。
單擊左側(cè)導(dǎo)航欄 。
在下拉框中選擇對(duì)應(yīng)工作空間后單擊進(jìn)入數(shù)據(jù)開(kāi)發(fā)。
右鍵單擊業(yè)務(wù)流程,選擇 。
在彈出的新建表對(duì)話框中,填寫(xiě)表名稱,并單擊新建。
說(shuō)明表名必須以字母開(kāi)頭,不能包含中文或特殊字符。
如果在數(shù)據(jù)開(kāi)發(fā)中綁定多個(gè)MaxCompute數(shù)據(jù)源,則按需選擇MaxCompute引擎實(shí)例。
在表的編輯頁(yè)面,單擊DDL模式。
在DDL對(duì)話框中,輸入如下建表語(yǔ)句,單擊生成表結(jié)構(gòu)。
CREATE TABLE testkafka ( key string, value string, partition1 string, timestamp1 string, offset string, t123 string, event_id string, tag string ) ;
其中的每一列,對(duì)應(yīng)于DataWorks數(shù)據(jù)集成Kafka Reader的默認(rèn)列:
__key__表示消息的key。
__value__表示消息的完整內(nèi)容 。
__partition__表示當(dāng)前消息所在分區(qū)。
__headers__表示當(dāng)前消息headers信息。
__offset__表示當(dāng)前消息的偏移量。
__timestamp__表示當(dāng)前消息的時(shí)間戳。
您還可以自主命名,詳情參見(jiàn)Kafka Reader。
單擊提交到生產(chǎn)環(huán)境并確認(rèn)。
步驟三:同步數(shù)據(jù)
新建獨(dú)享數(shù)據(jù)集成資源組。
由于當(dāng)前DataWorks的公共資源組無(wú)法完美支持Kafka插件,您需要使用獨(dú)享數(shù)據(jù)集成資源組完成數(shù)據(jù)同步。詳情請(qǐng)參見(jiàn)新增和使用獨(dú)享數(shù)據(jù)集成資源組。
新建數(shù)據(jù)集成節(jié)點(diǎn)。
進(jìn)入數(shù)據(jù)開(kāi)發(fā)頁(yè)面,右鍵單擊指定業(yè)務(wù)流程,選擇
。在新建節(jié)點(diǎn)對(duì)話框中,輸入節(jié)點(diǎn)名稱,并單擊確認(rèn)。
在頂部菜單欄上,單擊圖標(biāo)。
在腳本模式下,單擊頂部菜單欄上的圖標(biāo)。
配置腳本,示例代碼如下。
{ "type": "job", "steps": [ { "stepType": "kafka", "parameter": { "server": "47.xxx.xxx.xxx:9092", "kafkaConfig": { "group.id": "console-consumer-83505" }, "valueType": "ByteArray", "column": [ "__key__", "__value__", "__partition__", "__timestamp__", "__offset__", "'123'", "event_id", "tag.desc" ], "topic": "testkafka", "keyType": "ByteArray", "waitTime": "10", "beginOffset": "0", "endOffset": "3" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "truncate": true, "compress": false, "datasource": "odps_source",// MaxCompute數(shù)據(jù)源名稱 "column": [ "key", "value", "partition1", "timestamp1", "offset", "t123", "event_id", "tag" ], "emptyAsNull": false, "table": "testkafka" }, "name": "Writer", "category": "writer" } ], "version": "2.0", "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "" }, "speed": { "throttle": false, "concurrent": 1 } } }
您可以通過(guò)在Header主機(jī)上使用kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list命令查看group.id參數(shù),及消費(fèi)者的Group名稱。
命令示例
kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list
返回結(jié)果
_emr-client-metrics-handler-group console-consumer-69493 console-consumer-83505 console-consumer-21030 console-consumer-45322 console-consumer-14773
以console-consumer-83505為例,您可以根據(jù)該參數(shù)在Header主機(jī)上使用kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505命令確認(rèn)beginOffset及endOffset參數(shù)。
命令示例。
kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
返回結(jié)果
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID testkafka 6 0 0 0 - - - test 6 3 3 0 - - - testkafka 0 0 0 0 - - - testkafka 1 1 1 0 - - - testkafka 5 0 0 0 - - -
配置調(diào)度資源組。
在節(jié)點(diǎn)編輯頁(yè)面的右側(cè)導(dǎo)航欄,單擊調(diào)度配置。
在資源屬性區(qū)域,選擇調(diào)度資源組為您創(chuàng)建的獨(dú)享數(shù)據(jù)集成資源組。
說(shuō)明如果您需要將Kafka的數(shù)據(jù)周期性(例如每小時(shí))寫(xiě)入MaxCompute,您可以使用beginDateTime及endDateTime參數(shù),設(shè)置數(shù)據(jù)讀取的時(shí)間區(qū)間為1小時(shí),然后每小時(shí)調(diào)度一次數(shù)據(jù)集成任務(wù)。詳情請(qǐng)參見(jiàn)Kafka Reader。
單擊圖標(biāo)運(yùn)行代碼。
您可以在運(yùn)行日志查看運(yùn)行結(jié)果。
后續(xù)步驟
您可以新建一個(gè)數(shù)據(jù)開(kāi)發(fā)任務(wù)運(yùn)行SQL語(yǔ)句,查看當(dāng)前表中是否已存在從云消息隊(duì)列 Kafka 版同步過(guò)來(lái)的數(shù)據(jù)。本文以select * from testkafka
為例,具體步驟如下:
進(jìn)入數(shù)據(jù)開(kāi)發(fā)頁(yè)面。
在左側(cè)導(dǎo)航欄,單擊工作空間。進(jìn)入工作空間列表詳情界面。
在頂部切換至目標(biāo)地域,找到已創(chuàng)建的工作空間,單擊操作列的
,進(jìn)入數(shù)據(jù)開(kāi)發(fā)頁(yè)面。
單擊左側(cè)的圖標(biāo),進(jìn)入臨時(shí)查詢頁(yè)面。單擊上面的圖標(biāo)。選擇
節(jié)點(diǎn)。在新建節(jié)點(diǎn)對(duì)話框中,輸入路徑、名稱信息。
單擊確認(rèn)。
在創(chuàng)建的節(jié)點(diǎn)頁(yè)面,輸入
select * from testkafka
,單擊圖標(biāo),運(yùn)行完成后,查看運(yùn)行日志。