創(chuàng)建MaxCompute Sink Connector
本文介紹如何創(chuàng)建使用MaxCompute Sink Connector,您可以通過MaxCompute Sink Connector將數(shù)據(jù)從云消息隊列 Kafka 版實例的數(shù)據(jù)源Topic導(dǎo)出至MaxCompute的表中。
前提條件
詳細(xì)步驟,請參見創(chuàng)建前提。
注意事項
如需使用MaxCompute分區(qū)功能,創(chuàng)建表時需額外創(chuàng)建一個分區(qū)列,列名為time,類型為STRING。
步驟一:創(chuàng)建目標(biāo)資源
通過MaxCompute客戶端創(chuàng)建表。更多信息,請參見創(chuàng)建表。
本文以名稱為kafka_to_maxcompute的表為例。表中有3列數(shù)據(jù),并使用分區(qū)功能。該表的建表語句如下:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT) PARTITIONED by (time STRING);
如不使用分區(qū)功能,語句如下:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT);
執(zhí)行成功后,如下圖所示:
在表管理頁面,查看創(chuàng)建的表信息。
步驟二:創(chuàng)建MaxCompute Sink Connector并啟動
登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區(qū)域,選擇地域。
在左側(cè)導(dǎo)航欄,選擇 。
在任務(wù)列表頁面,單擊創(chuàng)建任務(wù)。
在創(chuàng)建任務(wù)面板,設(shè)置任務(wù)名稱和描述,配置以下參數(shù)。
任務(wù)創(chuàng)建
在Source(源)配置向?qū)Вx擇數(shù)據(jù)提供方為消息隊列 Kafka 版,設(shè)置以下參數(shù),然后單擊下一步。
參數(shù)
說明
示例
地域
源Kafka實例所在的地域。
華東1(杭州)
kafka實例
數(shù)據(jù)源所在的Kafka實例ID。
alikafka_post-cn-9hdsbdhd****
Topic
數(shù)據(jù)源所在的Kafka實例Topic。
guide-sink-topic
Group ID
數(shù)據(jù)源所在的Kafka實例中的Group ID。
快速創(chuàng)建:自動創(chuàng)建以GID_EVENTBRIDGE_xxx命名的Group ID。
使用已有:選擇已創(chuàng)建的Group,請選擇獨立的Group ID,不要和已有的業(yè)務(wù)混用,以免影響已有的消息收發(fā)。
使用已有
消費位點
最新位點:從最新位點開始消費。
最早位點:從最初位點開始消費。
最新位點
網(wǎng)絡(luò)配置
有跨境傳輸數(shù)據(jù)需求時選擇自建公網(wǎng),其他情況可選擇基礎(chǔ)網(wǎng)絡(luò)。
基礎(chǔ)網(wǎng)絡(luò)
數(shù)據(jù)格式
數(shù)據(jù)格式是針對支持二進(jìn)制傳遞的數(shù)據(jù)源端推出的指定內(nèi)容格式的編碼能力。支持多種數(shù)據(jù)格式編碼,如無特殊編碼訴求可將格式設(shè)置為Json。
Json(默認(rèn)Json格式編碼,二進(jìn)制數(shù)據(jù)按照utf-8 編碼為Json格式放入Payload。)
Text(文本格式編碼,二進(jìn)制數(shù)據(jù)按照utf-8編碼為字符串放入Payload。)
Binary(二進(jìn)制格式編碼,二進(jìn)制數(shù)據(jù)按照Base64編碼為字符串放入Payload。)
Json
批量推送條數(shù)
調(diào)用函數(shù)發(fā)送的最大批量消息條數(shù),當(dāng)積壓的消息數(shù)量到達(dá)設(shè)定值時才會發(fā)送請求,取值范圍為 [1,10000]。
2000
批量推送間隔(單位:秒)
調(diào)用函數(shù)的間隔時間,系統(tǒng)每到間隔時間點會將消息聚合后發(fā)給函數(shù)計算,取值范圍為[0,15],單位為秒。0秒表示無等待時間,直接投遞。
3
在Filtering(過濾)配置向?qū)?,定義數(shù)據(jù)模式過濾發(fā)送的請求。更多信息,請參見事件模式。
在Transform(轉(zhuǎn)換)配置向?qū)ВO(shè)置數(shù)據(jù)清洗,實現(xiàn)分割、映射、富化及動態(tài)路由等繁雜數(shù)據(jù)加工能力。更多信息,請參見使用函數(shù)計算實現(xiàn)消息數(shù)據(jù)清洗。
在Sink(目標(biāo))配置向?qū)?,選擇服務(wù)類型為大數(shù)據(jù)計算服務(wù) acs.maxcompute,配置以下參數(shù)。
參數(shù)
說明
示例
賬號AccessKey ID
阿里云賬號的AccessKey ID,用于訪問MaxCompute服務(wù)。
LTAI5tHPVCZywsoEVvFu****
賬號AccessKey Secret
阿里云賬號的AccessKey Secret。
4RAKUQpZtUntDgvoKu0tvrkrOM****
MaxCompute項目名稱
選擇已創(chuàng)建的MaxCompute項目。
test_compute
MaxCompute表名稱
選擇已創(chuàng)建的MaxCompute表。
kafka_to_maxcompute
MaxCompute表入?yún)?/b>
選擇MaxCompute表后,此處會展示表的列名和類型信息,配置數(shù)據(jù)提取規(guī)則即可。下面是一條消息示例,本示例中Topic列對應(yīng)的值從Topic字段提取,則定義數(shù)據(jù)提取規(guī)則為
$.topic
。{ 'data': { 'topic': 't_test', 'partition': 2, 'offset': 1, 'timestamp': 1717048990499, 'headers': { 'headers': [], 'isReadOnly': False }, 'key': 'MaxCompute-K1', 'value': 'MaxCompute-V1' }, 'id': '9b05fc19-9838-4990-bb49-ddb942307d3f-2-1', 'source': 'acs:alikafka', 'specversion': '1.0', 'type': 'alikafka:Topic:Message', 'datacontenttype': 'application/json; charset=utf-8', 'time': '2024-05-30T06:03:10.499Z', 'aliyunaccountid': '1413397765616316' }
topic:
$.data.topic
valuename:
$.data.value
valueage:
$.data.offset
是否開啟分區(qū)能力
開啟:開啟分區(qū)能力。
關(guān)閉:不開啟分區(qū)能力。
開啟
分區(qū)配置
僅當(dāng)是否開啟分區(qū)能力參數(shù)設(shè)置為開啟時需配置此參數(shù)。
支持{yyyy}、{MM}、{dd}、{HH}、{mm}時間變量參數(shù),分別對應(yīng)年、月、日、時、分。時間變量大小寫敏感。
支持填寫常量。
{yyyy}-{MM}-{dd}.{HH}:{mm}.suffix
網(wǎng)絡(luò)配置
專有網(wǎng)絡(luò):通過專有網(wǎng)絡(luò)VPC將Kafka消息投遞到MaxCompute。
公網(wǎng):通過公網(wǎng)將Kafka消息投遞到MaxCompute。
公網(wǎng)
VPC
選擇VPC ID。僅當(dāng)網(wǎng)絡(luò)配置為專有網(wǎng)絡(luò)時需要配置此參數(shù)。
vpc-bp17fapfdj0dwzjkd****
交換機(jī)
選擇vSwitch ID。僅當(dāng)網(wǎng)絡(luò)配置為專有網(wǎng)絡(luò)時需要配置此參數(shù)。
vsw-bp1gbjhj53hdjdkg****
安全組
選擇安全組。僅當(dāng)網(wǎng)絡(luò)配置為專有網(wǎng)絡(luò)時需要配置此參數(shù)。
test_group
任務(wù)屬性
配置事件推送失敗時的重試策略及錯誤發(fā)生時的處理方式。更多信息,請參見重試和死信。
完成上述配置后,單擊保存。在任務(wù)列表頁面,找到剛創(chuàng)建的MaxCompute Sink Connector任務(wù),此時狀態(tài)欄為啟動中,當(dāng)狀態(tài)變?yōu)?b data-tag="uicontrol" id="uicontrol-iam-3og-um9" class="uicontrol">運行中時,Connector創(chuàng)建成功。
步驟三:測試MaxCompute Sink Connector
在任務(wù)列表頁面,在MaxCompute Sink Connector任務(wù)的事件源列單擊源Topic。
在Topic詳情頁面,單擊體驗發(fā)送消息。
在快速體驗消息收發(fā)面板,按照下圖配置消息內(nèi)容,然后單擊確定。
進(jìn)入MaxCompute控制臺,執(zhí)行以下SQL語句查看分區(qū)信息。
show PARTITIONS kafka_to_maxcompute;
查詢結(jié)果如下所示:
根據(jù)分區(qū)信息,執(zhí)行以下語句,查看分區(qū)內(nèi)數(shù)據(jù)。
SELECT * FROM kafka_to_maxcompute WHERE time="2024-05-31.16:37.suffix";
查詢結(jié)果如下所示: