將云消息隊(duì)列 Kafka 版的數(shù)據(jù)遷移至MaxCompute
本文介紹如何使用DataWorks數(shù)據(jù)同步功能,將云消息隊(duì)列 Kafka 版集群上的數(shù)據(jù)遷移至阿里云大數(shù)據(jù)計(jì)算服務(wù)MaxCompute,方便您對(duì)離線數(shù)據(jù)進(jìn)行分析加工。
背景信息
大數(shù)據(jù)計(jì)算服務(wù)MaxCompute(原ODPS)是一種大數(shù)據(jù)計(jì)算服務(wù),能提供快速、完全托管免運(yùn)維的EB級(jí)云數(shù)據(jù)倉(cāng)庫(kù)解決方案。
DataWorks基于MaxCompute計(jì)算和存儲(chǔ),提供工作流可視化開(kāi)發(fā)、調(diào)度運(yùn)維托管的一站式海量數(shù)據(jù)離線加工分析平臺(tái)。在數(shù)加(一站式大數(shù)據(jù)平臺(tái))中,DataWorks控制臺(tái)即為MaxCompute控制臺(tái)。MaxCompute和DataWorks一起向用戶提供完善的數(shù)據(jù)處理和數(shù)倉(cāng)管理能力,以及SQL、MR、Graph等多種經(jīng)典的分布式計(jì)算模型,能夠更快速地解決用戶海量數(shù)據(jù)計(jì)算問(wèn)題,有效降低企業(yè)成本,保障數(shù)據(jù)安全。
本教程旨在幫助您使用DataWorks,將云消息隊(duì)列 Kafka 版中的數(shù)據(jù)導(dǎo)入至MaxCompute,來(lái)進(jìn)一步探索大數(shù)據(jù)的價(jià)值。
前提條件
云消息隊(duì)列 Kafka 版實(shí)例版本需要大于等于0.10.2小于等于2.2.x。
購(gòu)買(mǎi)并部署云消息隊(duì)列 Kafka 版。具體操作,請(qǐng)參見(jiàn)購(gòu)買(mǎi)并部署實(shí)例。 本文以部署在華東1(杭州)地域(Region)的集群為例。
創(chuàng)建Topic和Group,具體操作,請(qǐng)參見(jiàn)創(chuàng)建資源。本文以Topic名稱(chēng)為testkafka,Group名稱(chēng)為console-consumer為例。
1.準(zhǔn)備云消息隊(duì)列 Kafka 版數(shù)據(jù)
向Topic testkafka中寫(xiě)入數(shù)據(jù),以作為遷移至MaxCompute中的數(shù)據(jù)。由于云消息隊(duì)列 Kafka 版用于處理流式數(shù)據(jù),您可以持續(xù)不斷地向其中寫(xiě)入數(shù)據(jù)。為保證測(cè)試結(jié)果,建議您寫(xiě)入10條以上的數(shù)據(jù)。
在概覽頁(yè)面的資源分布區(qū)域,選擇地域。
在實(shí)例列表頁(yè)面,單擊目標(biāo)實(shí)例名稱(chēng)。
在左側(cè)導(dǎo)航欄,單擊Topic 管理。
在Topic 管理頁(yè)面,單擊目標(biāo)Topic名稱(chēng)進(jìn)入Topic 詳情頁(yè)面,然后單擊體驗(yàn)發(fā)送消息。
在快速體驗(yàn)消息收發(fā)面板,發(fā)送如下的測(cè)試消息。
在左側(cè)導(dǎo)航欄,單擊消息查詢(xún),然后在消息查詢(xún)頁(yè)面,選擇查詢(xún)方式、所屬的Topic、分區(qū)等信息,單擊查詢(xún),查看之前寫(xiě)入的Topic的數(shù)據(jù)。
關(guān)于消息查詢(xún)的更多信息,請(qǐng)參見(jiàn)消息查詢(xún)。以按時(shí)間查詢(xún)?yōu)槔樵?xún)的一部分消息如下截圖:
2.創(chuàng)建MaxCompute項(xiàng)目
2.1.開(kāi)通MaxCompute(可選)
只有開(kāi)通了MaxCompute,才可以在MaxCompute中執(zhí)行創(chuàng)建項(xiàng)目等操作。具體操作,請(qǐng)參見(jiàn)開(kāi)通MaxCompute。
2.2.創(chuàng)建MaxCompute項(xiàng)目
本文以在華東1(杭州)地域創(chuàng)建名為kafka_bigdata_doc的項(xiàng)目為例。具體操作,請(qǐng)參見(jiàn)通過(guò)MaxCompute控制臺(tái)創(chuàng)建項(xiàng)目。
3.創(chuàng)建DataWorks工作空間
3.1.開(kāi)通DataWorks(可選)
當(dāng)前所在地域首次開(kāi)通DataWorks服務(wù)時(shí),必須購(gòu)買(mǎi)DataWorks任意產(chǎn)品版本和按量付費(fèi)新版資源組,才能開(kāi)通并使用DataWorks。具體操作,請(qǐng)參見(jiàn)開(kāi)通DataWorks服務(wù)。
3.2.創(chuàng)建工作空間
本文以在華東1(杭州)地域創(chuàng)建名為kafka_workspace的工作空間為例。具體操作,請(qǐng)參見(jiàn)創(chuàng)建工作空間。
4.添加數(shù)據(jù)源
4.1.創(chuàng)建獨(dú)享數(shù)據(jù)集成資源組
創(chuàng)建一個(gè)名為kafka_dx的獨(dú)享數(shù)據(jù)集成資源組。具體操作,請(qǐng)參見(jiàn)購(gòu)買(mǎi)資源組。
綁定3.創(chuàng)建DataWorks工作空間步驟中創(chuàng)建的名為kafka_workspace的工作空間。具體操作,請(qǐng)參見(jiàn)綁定歸屬工作空間。
4.2.創(chuàng)建MaxCompute數(shù)據(jù)源
登錄DataWorks控制臺(tái),切換至目標(biāo)地域后,選擇左側(cè)導(dǎo)航欄的 ,在下拉框中選擇對(duì)應(yīng)工作空間后單擊進(jìn)入管理中心。
進(jìn)入工作空間管理中心頁(yè)面后,選擇左側(cè)導(dǎo)航欄的
,進(jìn)入數(shù)據(jù)源頁(yè)面。單擊新增數(shù)據(jù)源,選擇MaxCompute,根據(jù)界面指引創(chuàng)建數(shù)據(jù)源。如下圖所示,創(chuàng)建一個(gè)名為MaxCompute_data的數(shù)據(jù)源:
4.3.創(chuàng)建Kafka數(shù)據(jù)源
登錄DataWorks控制臺(tái),切換至目標(biāo)地域后,選擇左側(cè)導(dǎo)航欄的 ,在下拉框中選擇對(duì)應(yīng)工作空間后單擊進(jìn)入管理中心。
進(jìn)入工作空間管理中心頁(yè)面后,選擇左側(cè)導(dǎo)航欄的
,進(jìn)入數(shù)據(jù)源頁(yè)面。單擊新增數(shù)據(jù)源,選擇Kafka,根據(jù)界面指引創(chuàng)建數(shù)據(jù)源。如下圖所示,創(chuàng)建一個(gè)名為kafka_data的數(shù)據(jù)源:
說(shuō)明實(shí)例ID填寫(xiě)已部署的云消息隊(duì)列 Kafka 版的實(shí)例ID。
測(cè)試連通性時(shí),如果出現(xiàn)無(wú)法連通的情況,單擊自助排查解決,在連通性診斷工具面板中,按照指引完成測(cè)試即可。
5.創(chuàng)建DataWorks表
您需創(chuàng)建DataWorks表,以保證大數(shù)據(jù)計(jì)算服務(wù)MaxCompute可以順利接收云消息隊(duì)列 Kafka 版數(shù)據(jù)。為測(cè)試便利,本文以使用非分區(qū)表為例。
進(jìn)入數(shù)據(jù)開(kāi)發(fā)頁(yè)面。
在左側(cè)導(dǎo)航欄,單擊工作空間。
在目標(biāo)工作空間的操作列中,單擊快速進(jìn)入,選擇數(shù)據(jù)開(kāi)發(fā)。
在數(shù)據(jù)開(kāi)發(fā)頁(yè)面,右鍵單擊目標(biāo)業(yè)務(wù)名稱(chēng),選擇
。在新建表頁(yè)面,選擇引擎類(lèi)型并輸入表名為testkafka。
在DDL對(duì)話框中,輸入如下建表語(yǔ)句,單擊生成表結(jié)構(gòu)。
CREATE TABLE testkafka ( key string, value string, partition string, headers string, offset string, timestamp string ) ;
單擊提交到生產(chǎn)環(huán)境并確認(rèn)。
6.創(chuàng)建并啟動(dòng)離線同步任務(wù)
進(jìn)入數(shù)據(jù)開(kāi)發(fā)頁(yè)面。
在左側(cè)導(dǎo)航欄,單擊工作空間。
在目標(biāo)工作空間的操作列中,單擊快速進(jìn)入,選擇數(shù)據(jù)開(kāi)發(fā)。
在數(shù)據(jù)開(kāi)發(fā)頁(yè)面,右鍵單擊業(yè)務(wù)名稱(chēng),選擇
。在新建節(jié)點(diǎn)對(duì)話框,輸入節(jié)點(diǎn)名稱(chēng)(即數(shù)據(jù)同步任務(wù)名稱(chēng)),然后單擊確認(rèn)。
在創(chuàng)建的節(jié)點(diǎn)頁(yè)面,填寫(xiě)網(wǎng)絡(luò)與資源配置信息。
單擊下一步,填寫(xiě)配置任務(wù)信息,單擊圖標(biāo),運(yùn)行任務(wù)。
7.結(jié)果驗(yàn)證
7.1驗(yàn)證離線同步任務(wù)運(yùn)行結(jié)果
完成運(yùn)行后,運(yùn)行日志中顯示運(yùn)行成功。
7.2驗(yàn)證數(shù)據(jù)同步結(jié)果
進(jìn)入數(shù)據(jù)開(kāi)發(fā)頁(yè)面。
在左側(cè)導(dǎo)航欄,單擊工作空間。
在目標(biāo)工作空間的操作列中,單擊快速進(jìn)入,選擇數(shù)據(jù)開(kāi)發(fā)。
在臨時(shí)查詢(xún)面板,右鍵單擊臨時(shí)查詢(xún),選擇
。在新建節(jié)點(diǎn)對(duì)話框中,輸入名稱(chēng)。
單擊確認(rèn)。
在創(chuàng)建的節(jié)點(diǎn)頁(yè)面,輸入
select * from testkafka
,單擊圖標(biāo),運(yùn)行完成后,查看運(yùn)行日志。