管理Kafka自定義連接器
本文介紹如何創(chuàng)建、查看、編輯、刪除Kafka自定義連接器任務(wù)。
支持的地域
目前支持創(chuàng)建Kafka自定義連接器任務(wù)的地域有:華東1(杭州)、華東2(上海)、華北2(北京)、華北3(張家口)、華南1(深圳)、中國(guó)香港、美國(guó)(硅谷)、新加坡。
計(jì)費(fèi)說(shuō)明
Kafka自定義連接器獨(dú)立于云消息隊(duì)列 Kafka 版實(shí)例,因此不會(huì)在云消息隊(duì)列 Kafka 版側(cè)產(chǎn)生費(fèi)用。同時(shí),阿里云不承諾Kafka自定義連接器的SLA,使用時(shí)所依賴的Serverless 應(yīng)用引擎的費(fèi)用說(shuō)明,請(qǐng)參見(jiàn)按量計(jì)費(fèi)。
前提條件
開(kāi)通服務(wù)
已開(kāi)通對(duì)象存儲(chǔ)OSS服務(wù)并創(chuàng)建存儲(chǔ)空間(Bucket)。更多信息,請(qǐng)參見(jiàn)控制臺(tái)創(chuàng)建存儲(chǔ)空間。
已開(kāi)通Serverless應(yīng)用引擎服務(wù),更多信息,請(qǐng)參見(jiàn)準(zhǔn)備工作。
已創(chuàng)建專有網(wǎng)絡(luò)及交換機(jī)。更多信息,請(qǐng)參見(jiàn)快速搭建(ROS)。
已購(gòu)買(mǎi)并部署云消息隊(duì)列 Kafka 版實(shí)例。更多信息,請(qǐng)參見(jiàn)購(gòu)買(mǎi)并部署實(shí)例。
環(huán)境說(shuō)明
Kafka Connect版本為3.2.3。
Scala版本為2.13。
Java版本為java 8。
創(chuàng)建流程
創(chuàng)建步驟
- 登錄事件總線EventBridge控制臺(tái),在左側(cè)導(dǎo)航欄,單擊事件流。
- 在頂部菜單欄,選擇地域,然后單擊創(chuàng)建事件流。
在創(chuàng)建事件流面板中,完成以下配置,單擊創(chuàng)建。
在基本信息頁(yè)簽,設(shè)置事件流名稱和描述。
在連接器配置頁(yè)簽,選擇事件提供方為Apache Kafka Connect。
在Kafka Connect插件卡片,配置以下參數(shù)。
參數(shù)
說(shuō)明
示例
Kafka Connect插件
事件總線EventBridge支持將包含Kafka Connect配置信息的.zip文件通過(guò)本地上傳或者OSS引用的方式導(dǎo)入控制臺(tái)。默認(rèn)選擇OSS引用。
OSS引用
Bucket存儲(chǔ)桶
選擇包含.zip文件的存儲(chǔ)空間Bucket。更多信息,請(qǐng)參見(jiàn)控制臺(tái)創(chuàng)建存儲(chǔ)空間。
testbucket
文件
選擇Bucket中包含Kafka Connect配置信息的.zip文件。也可根據(jù)控制臺(tái)提示信息上傳.zip文件。
test-connect.zip
在Kafka資源信息卡片,配置以下參數(shù)。
參數(shù)
說(shuō)明
示例
Kafka參數(shù)配置
Source Connect:其他資源服務(wù)通過(guò)Kafka Connect投遞到Kafka。
Sink Connect:Kafka資源通過(guò)Kafka Connect投遞到其他資源服務(wù)。
Source Connect
Kafka實(shí)例
選擇與其他資源服務(wù)對(duì)接的Kafka實(shí)例。
alikafka-cn-zpr37892366****
專有網(wǎng)絡(luò)VPC
選擇VPC ID。
vpc-bq1huohcvuo****
交換機(jī)
選擇vSwitch ID。
vsw-bqu1hdguoo****
安全組
選擇實(shí)例所在的安全組。
sg-dguigreuohpnv****
在Kafka Connect配置信息卡片,選擇一種方式,配置Kafka Connect的連接參數(shù)。
方式一:從文件列表中,選擇已上傳的包含連接器配置信息的.zip包中的.properties文件。
方式二:在控制臺(tái)上配置以下連接參數(shù)。
重要如果已經(jīng)上傳了包含連接器配置信息的.zip文件,此處的配置信息將覆蓋.zip文件中的信息。
參數(shù)(必填)
說(shuō)明
示例
name
Connector的名稱。一般命名為不包含ISO控制符的字符串。
mongo-sink
connector.class
Connector類(lèi)的名稱或者別名。必須是
org.apache.kafka.connect.connector.Connector
的子類(lèi)。com.mongodb.kafka.connect.MongoSinkConnector
task.max
最大任務(wù)數(shù)量。取值范圍為[1,Kafka中Topic的最大分區(qū)數(shù)]。
1
topics
當(dāng)Kafka參數(shù)配置參數(shù)為Sink Connect時(shí),該參數(shù)指定數(shù)據(jù)源Topic,不同Topic之間以半角逗號(hào)(,)進(jìn)行分隔。
sourceA,sourceB
其他選填參數(shù),請(qǐng)參見(jiàn)Kafka Connect Configs。
在實(shí)例配置頁(yè)簽,配置以下參數(shù)。
在Worker規(guī)格卡片,配置以下參數(shù)。
參數(shù)
說(shuō)明
示例
Worker規(guī)格
Worker支持彈性擴(kuò)縮能力,可根據(jù)CPU使用率來(lái)自動(dòng)擴(kuò)縮容Worker。建議使用2 Core 4 GIB規(guī)格。
2 Core 4 GIB
最小Worker數(shù)
設(shè)置Worker擴(kuò)容的最小數(shù)量,取值不能小于1。
2
最大Worker數(shù)
設(shè)置Worker擴(kuò)容的最小數(shù)量,值不能大于50。
2
說(shuō)明最大Worker數(shù)應(yīng)該大于或等于最小Worker數(shù)。
橫向擴(kuò)縮容閾值
當(dāng)CPU使用率大于或小于該值,觸發(fā)自動(dòng)擴(kuò)容或縮容。單位:%。
50
在Kafka Connect Worker配置卡片,勾選自動(dòng)創(chuàng)建Kafka Connect Worker依賴資源。
事件總線EventBridge控制臺(tái)預(yù)設(shè)了Kafka Connect Worker配置文件,不建議修改。
在運(yùn)行配置頁(yè)簽,在日志投遞卡片設(shè)置投遞方式為投遞至SLS或者投遞至Kafka,在角色授權(quán)卡片設(shè)置Connect依賴的角色配置,單擊確定。
重要建議配置的角色包含AliyunSAEFullAccess權(quán)限,否則可能會(huì)導(dǎo)致任務(wù)運(yùn)行失敗。
其他操作
創(chuàng)建任務(wù)后,您可以在事件流頁(yè)面找到此任務(wù),在其右側(cè)的操作列執(zhí)行相應(yīng)操作。
查看詳情:?jiǎn)螕?b data-tag="uicontrol" id="uicontrol-6cz-8qz-ye2" class="uicontrol">操作列的詳情,查看該任務(wù)的基礎(chǔ)信息及指標(biāo)監(jiān)控。
編輯任務(wù):?jiǎn)螕?b data-tag="uicontrol" id="uicontrol-bpr-vxx-i4x" class="uicontrol">操作列的編輯,在編輯事件流面板修改該任務(wù)的連接器配置、實(shí)例配置及運(yùn)行配置。修改完成后單擊確認(rèn)。
啟停任務(wù):?jiǎn)螕?b data-tag="uicontrol" id="uicontrol-i1l-sec-6zd" class="uicontrol">操作列的停用或啟用,然后在提示對(duì)話框,單擊確定。
刪除任務(wù):?jiǎn)螕?b data-tag="uicontrol" id="uicontrol-9nj-7km-n41" class="uicontrol">操作列的刪除,然后在提示對(duì)話框,單擊確定。