本文說明如何創建FC Sink Connector,您可以通過FC Sink Connector將數據從云消息隊列 Kafka 版實例的數據源Topic導出至函數計算的函數。
前提條件
在創建FC Sink Connector前,請確保您已完成以下操作:
云消息隊列 Kafka 版
為云消息隊列 Kafka 版實例開啟Connector。更多信息,請參見開啟Connector。
為云消息隊列 Kafka 版實例創建數據源Topic。更多信息,請參見步驟一:創建Topic。
本文以名稱為fc-test-input的Topic為例。
函數計算
在函數計算創建函數。更多信息,請參見快速創建函數。
重要函數類型必須為事件函數。
本文以服務名稱為guide-hello_world、函數名稱為hello_world、運行環境為Python的事件函數為例。該示例函數的代碼如下:
# -*- coding: utf-8 -*- import logging # To enable the initializer feature # please implement the initializer function as below: # def initializer(context): # logger = logging.getLogger() # logger.info('initializing') def handler(event, context): logger = logging.getLogger() logger.info('hello world:' + bytes.decode(event)) return 'hello world:' + bytes.decode(event)
可選:事件總線EventBridge
說明僅在您創建的Connector任務所屬實例的地域為華東1(杭州)或西南1(成都)時,需要完成該操作。
注意事項
僅支持在同地域內,將數據從云消息隊列 Kafka 版實例的數據源Topic導出至函數計算。Connector的限制說明,請參見使用限制。
如果Connector所屬實例的地域為華東1(杭州)或西南1(成都),該功能會部署至事件總線EventBridge。
事件總線EventBridge目前免費供您使用。更多信息,請參見計費說明。
創建Connector時,事件總線EventBridge會為您自動創建服務關聯角色AliyunServiceRoleForEventBridgeSourceKafka和AliyunServiceRoleForEventBridgeConnectVPC。
如果未創建服務關聯角色,事件總線EventBridge會為您自動創建對應的服務關聯角色,以便允許事件總線EventBridge使用此角色訪問云消息隊列 Kafka 版和專有網絡VPC。
如果已創建服務關聯角色,事件總線EventBridge不會重復創建。
關于服務關聯角色的更多信息,請參見服務關聯角色。
部署到事件總線EventBridge的任務暫時不支持查看任務運行日志。Connector任務執行完成后,您可以在訂閱數據源Topic的Group中,通過消費情況查看任務進度。具體操作,請參見查看消費狀態。
操作流程
使用FC Sink Connector將數據從云消息隊列 Kafka 版實例的數據源Topic導出至函數計算的函數的操作流程如下:
可選:使FC Sink Connector跨地域訪問函數計算
重要如果您不需要使FC Sink Connector跨地域訪問函數計算,您可以直接跳過該步驟。
可選:使FC Sink Connector跨賬號訪問函數計算
重要如果您不需要使FC Sink Connector跨賬號訪問函數計算,您可以直接跳過該步驟。
可選:創建FC Sink Connector依賴的Topic和Group
重要如果您不需要自定義Topic和Group的名稱,您可以直接跳過該步驟。
部分FC Sink Connector依賴的Topic的存儲引擎必須為Local存儲,大版本為0.10.2版本的云消息隊列 Kafka 版實例不支持手動創建Local存儲的Topic,只支持自動創建。
結果驗證
為FC Sink Connector開啟公網訪問
如需使FC Sink Connector跨地域訪問其他阿里云服務,您需要為FC Sink Connector開啟公網訪問。具體操作,請參見為Connector開啟公網訪問。
創建自定義權限策略
在目標賬號下創建訪問函數計算的自定義權限策略。
登錄訪問控制控制臺。
在左側導航欄,選擇 。
在權限策略頁面,單擊創建權限策略。
在新建自定義權限策略頁面,創建自定義權限策略。
在腳本編輯頁簽,策略腳本輸入框中自定義權限策略腳本,單擊下一步。
訪問函數計算的自定義權限策略腳本示例如下:
{ "Version": "1", "Statement": [ { "Action": [ "fc:InvokeFunction", "fc:GetFunction" ], "Resource": "*", "Effect": "Allow" } ] }
在基本信息區域名稱文本框,輸入KafkaConnectorFcAccess。
單擊確定。
創建RAM角色
在目標賬號下創建RAM角色。由于RAM角色不支持直接選擇云消息隊列 Kafka 版作為受信服務,您在創建RAM角色時,需要選擇任意支持的服務作為受信服務。RAM角色創建后,手動修改信任策略。
在左側導航欄,選擇 。
在角色頁面,單擊創建角色。
在創建角色面板,創建RAM角色。
選擇可信實體類型為阿里云服務,單擊下一步。
在角色類型區域,選擇普通服務角色,在角色名稱文本框,輸入AliyunKafkaConnectorRole,從選擇受信服務列表,選擇函數計算,然后單擊完成。
在角色頁面,找到AliyunKafkaConnectorRole,單擊AliyunKafkaConnectorRole。
在AliyunKafkaConnectorRole頁面,單擊信任策略管理頁簽,單擊修改信任策略。
在修改信任策略面板,將腳本中fc替換為alikafka,單擊確定。
添加權限
在目標賬號下為創建的RAM角色授予訪問函數計算的權限。
在左側導航欄,選擇 。
在角色頁面,找到AliyunKafkaConnectorRole,在其右側操作列,單擊添加權限。
在添加權限面板,添加KafkaConnectorFcAccess權限。
在選擇權限區域,選擇自定義策略。
在權限策略名稱列表,找到KafkaConnectorFcAccess,單擊KafkaConnectorFcAccess。
單擊確定。
單擊完成。
創建FC Sink Connector依賴的Topic
您可以在云消息隊列 Kafka 版控制臺手動創建FC Sink Connector依賴的5個Topic,包括:任務位點Topic、任務配置Topic、任務狀態Topic、死信隊列Topic以及異常數據Topic。每個Topic所需要滿足的分區數與存儲引擎會有差異,具體信息,請參見配置源服務參數列表。
在概覽頁面的資源分布區域,選擇地域。
重要Topic需要在應用程序所在的地域(即所部署的ECS的所在地域)進行創建。Topic不能跨地域使用。例如Topic創建在華北2(北京)這個地域,那么消息生產端和消費端也必須運行在華北2(北京)的ECS。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Topic 管理。
在Topic 管理頁面,單擊創建 Topic。
在創建 Topic面板,設置Topic屬性,然后單擊確定。
參數
說明
示例
名稱
Topic名稱。
demo
描述
Topic的簡單描述。
demo test
分區數
Topic的分區數量。
12
存儲引擎
說明當前僅專業版實例支持選擇存儲引擎類型,標準版暫不支持,默認選擇為云存儲類型。
Topic消息的存儲引擎。
云消息隊列 Kafka 版支持以下兩種存儲引擎。
云存儲:底層接入阿里云云盤,具有低時延、高性能、持久性、高可靠等特點,采用分布式3副本機制。實例的規格類型為標準版(高寫版)時,存儲引擎只能為云存儲。
Local 存儲:使用原生Kafka的ISR復制算法,采用分布式3副本機制。
云存儲
消息類型
Topic消息的類型。
普通消息:默認情況下,保證相同Key的消息分布在同一個分區中,且分區內消息按照發送順序存儲。集群中出現機器宕機時,可能會造成消息亂序。當存儲引擎選擇云存儲時,默認選擇普通消息。
分區順序消息:默認情況下,保證相同Key的消息分布在同一個分區中,且分區內消息按照發送順序存儲。集群中出現機器宕機時,仍然保證分區內按照發送順序存儲。但是會出現部分分區發送消息失敗,等到分區恢復后即可恢復正常。當存儲引擎選擇Local 存儲時,默認選擇分區順序消息。
普通消息
日志清理策略
Topic日志的清理策略。
當存儲引擎選擇Local 存儲(當前僅專業版實例支持選擇存儲引擎類型為Local存儲,標準版暫不支持)時,需要配置日志清理策略。
云消息隊列 Kafka 版支持以下兩種日志清理策略。
Delete:默認的消息清理策略。在磁盤容量充足的情況下,保留在最長保留時間范圍內的消息;在磁盤容量不足時(一般磁盤使用率超過85%視為不足),將提前刪除舊消息,以保證服務可用性。
Compact:使用Kafka Log Compaction日志清理策略。Log Compaction清理策略保證相同Key的消息,最新的value值一定會被保留。主要適用于系統宕機后恢復狀態,系統重啟后重新加載緩存等場景。例如,在使用Kafka Connect或Confluent Schema Registry時,需要使用Kafka Compact Topic存儲系統狀態信息或配置信息。
重要Compact Topic一般只用在某些生態組件中,例如Kafka Connect或Confluent Schema Registry,其他情況的消息收發請勿為Topic設置該屬性。具體信息,請參見云消息隊列 Kafka 版Demo庫。
Compact
標簽
Topic的標簽。
demo
創建完成后,在Topic 管理頁面的列表中顯示已創建的Topic。
創建FC Sink Connector依賴的Group
您可以在云消息隊列 Kafka 版控制臺手動創建FC Sink Connector數據同步任務使用的Group。該Group的名稱必須為connect-任務名稱,具體信息,請參見配置源服務參數列表。
在概覽頁面的資源分布區域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Group 管理。
在Group 管理頁面,單擊創建 Group。
在創建 Group面板的Group ID文本框輸入Group的名稱,在描述文本框簡要描述Group,并給Group添加標簽,單擊確定。
創建完成后,在Group 管理頁面的列表中顯示已創建的Group。
創建并部署FC Sink Connector
創建并部署用于將數據從云消息隊列 Kafka 版同步至函數計算的FC Sink Connector。
在概覽頁面的資源分布區域,選擇地域。
在左側導航欄,單擊Connector 任務列表。
在Connector 任務列表頁面,從選擇實例的下拉列表選擇Connector所屬的實例,然后單擊創建 Connector。
在創建 Connector配置向導頁面,完成以下操作。
在配置基本信息頁簽,按需配置以下參數,然后單擊下一步。
參數
描述
示例值
名稱
Connector的名稱。命名規則:
可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字符。
同一個云消息隊列 Kafka 版實例內保持唯一。
Connector的數據同步任務必須使用名稱為connect-任務名稱的Group。如果您未手動創建該Group,系統將為您自動創建。
kafka-fc-sink
實例
默認配置為實例的名稱與實例ID。
demo alikafka_post-cn-st21p8vj****
在配置源服務頁簽,選擇數據源為消息隊列Kafka版,并配置以下參數,然后單擊下一步。
說明如果您已創建好Topic和Group,那么請選擇手動創建資源,并填寫已創建的資源信息。否則,請選擇自動創建資源。
表 1. 配置源服務參數列表
參數
描述
示例值
數據源 Topic
需要同步數據的Topic。
fc-test-input
消費線程并發數
數據源Topic的消費線程并發數。默認值為6。取值說明如下:
1
2
3
6
12
6
消費初始位置
開始消費的位置。取值說明如下:
最早位點:從最初位點開始消費。
最近位點:從最新位點開始消費。
最早位點
VPC ID
數據同步任務所在的VPC。單擊配置運行環境顯示該參數。默認為云消息隊列 Kafka 版實例所在的VPC,您無需填寫。
vpc-bp1xpdnd3l***
vSwitch ID
數據同步任務所在的交換機。單擊配置運行環境顯示該參數。該交換機必須與云消息隊列 Kafka 版實例處于同一VPC。默認為部署云消息隊列 Kafka 版實例時填寫的交換機。
vsw-bp1d2jgg81***
失敗處理
消息發送失敗后,是否繼續訂閱出現錯誤的Topic的分區。單擊配置運行環境顯示該參數。取值說明如下。
繼續訂閱:繼續訂閱出現錯誤的Topic的分區,并打印錯誤日志。
停止訂閱:停止訂閱出現錯誤的Topic的分區,并打印錯誤日志
說明如何查看日志,請參見Connector相關操作。
如何根據錯誤碼查找解決方案,請參見錯誤碼。
繼續訂閱
創建資源方式
選擇創建Connector所依賴的Topic與Group的方式。單擊配置運行環境顯示該參數。
自動創建
手動創建
自動創建
Connector 消費組
Connector的數據同步任務使用的Group。單擊配置運行環境顯示該參數。該Group的名稱必須為connect-任務名稱。
connect-kafka-fc-sink
任務位點 Topic
用于存儲消費位點的Topic。單擊配置運行環境顯示該參數。
Topic:建議以connect-offset開頭。
分區數:Topic的分區數量必須大于1。
存儲引擎:Topic的存儲引擎必須為Local存儲。
cleanup.policy:Topic的日志清理策略必須為compact。
connect-offset-kafka-fc-sink
任務配置 Topic
用于存儲任務配置的Topic。單擊配置運行環境顯示該參數。
Topic:建議以connect-config開頭。
分區數:Topic的分區數量必須為1。
存儲引擎:Topic的存儲引擎必須為Local存儲。
cleanup.policy:Topic的日志清理策略必須為compact。
connect-config-kafka-fc-sink
任務狀態 Topic
用于存儲任務狀態的Topic。單擊配置運行環境顯示該參數。
Topic:建議以connect-status開頭。
分區數:Topic的分區數量建議為6。
存儲引擎:Topic的存儲引擎必須為Local存儲。
cleanup.policy:Topic的日志清理策略必須為compact。
connect-status-kafka-fc-sink
死信隊列 Topic
用于存儲Connect框架的異常數據的Topic。單擊配置運行環境顯示該參數。該Topic可以和異常數據Topic為同一個Topic,以節省Topic資源。
Topic:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
存儲引擎:Topic的存儲引擎可以為Local存儲或云存儲。
connect-error-kafka-fc-sink
異常數據 Topic
用于存儲Sink的異常數據的Topic。單擊配置運行環境顯示該參數。該Topic可以和死信隊列Topic為同一個Topic,以節省Topic資源。
Topic:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
存儲引擎:Topic的存儲引擎可以為Local存儲或云存儲。
connect-error-kafka-fc-sink
在配置目標服務頁簽,選擇目標服務為函數計算,并配置以下參數,然后單擊創建。
說明如果Connector所屬實例的地域為華東1(杭州)或西南1(成都),選擇目標服務為函數計算時, 會分別彈出創建服務關聯角色AliyunServiceRoleForEventBridgeSourceKafka和AliyunServiceRoleForEventBridgeConnectVPC的服務授權對話框,在彈出的服務授權對話框中單擊確認,然后再配置以下參數并單擊創建。如果服務關聯角色已創建,則不再重復創建,即不會再彈出服務授權對話框。
參數
描述
示例值
是否跨賬號/地域
FC Sink Connector是否跨賬號/地域向函數計算服務同步數據。默認為否。取值:
否:同地域同賬號模式。
是:跨地域同賬號模式、同地域跨賬號模式或跨地域跨賬號模式。
否
服務地域
函數計算服務的地域。默認為FC Sink Connector所在地域。如需跨地域,您需要為Connector開啟公網訪問,然后選擇目標地域。更多信息,請參見為FC Sink Connector開啟公網訪問。
重要是否跨賬號/地域為是時,顯示服務地域。
cn-hangzhou
服務接入點
函數計算服務的接入點。在函數計算控制臺的概覽頁的常用信息區域獲取。
內網Endpoint:低延遲,推薦。適用于云消息隊列 Kafka 版實例和函數計算處于同一地域場景。
公網Endpoint:高延遲,不推薦。適用于云消息隊列 Kafka 版實例和函數計算處于不同地域的場景。如需使用公網Endpoint,您需要為Connector開啟公網訪問。更多信息,請參見為FC Sink Connector開啟公網訪問。
重要是否跨賬號/地域general.no為是時,顯示服務接入點。
http://188***.cn-hangzhou.fc.aliyuncs.com
服務賬號
函數計算服務的阿里云賬號ID。在函數計算控制臺的概覽頁的常用信息區域獲取。
重要是否跨賬號/地域為是時,顯示服務賬號。
188***
授權角色名
云消息隊列 Kafka 版訪問函數計算服務的RAM角色。
如不需跨賬號,您需要在本賬號下創建RAM角色并為其授權,然后輸入該授權角色名。操作步驟,請參見創建自定義權限策略、創建RAM角色和添加權限。
如需跨賬號,您需要在目標賬號下創建RAM角色并為其授權,然后輸入該授權角色名。操作步驟,請參見創建自定義權限策略、創建RAM角色和添加權限。
重要是否跨賬號/地域為是時,顯示授權角色名。
AliyunKafkaConnectorRole
服務名
函數計算服務的名稱。
guide-hello_world
函數名
函數計算服務的函數名稱。
hello_world
版本或別名
函數計算服務的版本或別名。
重要是否跨賬號/地域為否時,您需選擇指定版本還是指定別名。
是否跨賬號/地域為是時,您需手動輸入函數計算服務的版本或別名。
LATEST
服務版本
函數計算的服務版本。
重要是否跨賬號/地域為否且版本或別名選擇指定版本時,顯示服務版本。
LATEST
服務別名
函數計算的服務別名。
重要是否跨賬號/地域為否且版本或別名選擇指定別名時,顯示服務別名。
jy
發送模式
消息發送模式。取值說明如下:
異步:推薦。
同步:不推薦。同步發送模式下,如果函數計算的處理時間較長,云消息隊列 Kafka 版的處理時間也會較長。當同一批次消息的處理時間超過5分鐘時,會觸發云消息隊列 Kafka 版客戶端Rebalance。
異步
發送批大小
批量發送的消息條數。默認為20。Connector根據發送批次大小和請求大小限制(同步請求大小限制為6 MB,異步請求大小限制為128 KB)將多條消息聚合后發送。例如,發送模式為異步,發送批次大小為20,如果要發送18條消息,其中有17條消息的總大小為127 KB,有1條消息的大小為200 KB,Connector會將總大小不超過128 KB的17條消息聚合后發送,將大小超過128 KB的1條消息單獨發送。
說明如果您在發送消息時將key設置為null,則請求中不包含key。如果將value設置為null,則請求中不包含value。
如果批量發送的多條消息的大小不超過請求大小限制,則請求中包含消息內容。請求示例如下:
[ { "key":"this is the message's key2", "offset":8, "overflowFlag":false, "partition":4, "timestamp":1603785325438, "topic":"Test", "value":"this is the message's value2", "valueSize":28 }, { "key":"this is the message's key9", "offset":9, "overflowFlag":false, "partition":4, "timestamp":1603785325440, "topic":"Test", "value":"this is the message's value9", "valueSize":28 }, { "key":"this is the message's key12", "offset":10, "overflowFlag":false, "partition":4, "timestamp":1603785325442, "topic":"Test", "value":"this is the message's value12", "valueSize":29 }, { "key":"this is the message's key38", "offset":11, "overflowFlag":false, "partition":4, "timestamp":1603785325464, "topic":"Test", "value":"this is the message's value38", "valueSize":29 } ]
如果發送的單條消息的大小超過請求大小限制,則請求中不包含消息內容。請求示例如下:
[ { "key":"123", "offset":4, "overflowFlag":true, "partition":0, "timestamp":1603779578478, "topic":"Test", "value":"1", "valueSize":272687 } ]
說明如需獲取消息內容,您需要根據位點主動拉取消息。
50
重試次數
消息發送失敗后的重試次數。默認為2。取值范圍為1~3。部分導致消息發送失敗的錯誤不支持重試。錯誤碼與是否支持重試的對應關系如下:
4XX:除429支持重試外,其余錯誤碼不支持重試。
5XX:支持重試。
說明Connector調用InvokeFunction向函數計算發送消息。
重試次數超出取值范圍之后,消息會進入死信隊列Topic。死信隊列Topic的消息不會再次觸發函數計算Connector任務,建議您給死信隊列Topic資源配置監控告警,實時監控資源狀態,及時發現并處理異常。
2
創建完成后,在Connector 任務列表頁面,查看創建的Connector 。
創建完成后,在Connector 任務列表頁面,找到創建的Connector ,單擊其操作列的部署。
如需配置函數計算資源,單擊其操作列的
,跳轉至函數計算控制臺完成操作。
發送測試消息
部署FC Sink Connector后,您可以向云消息隊列 Kafka 版的數據源Topic發送消息,測試數據能否被同步至函數計算。
在Connector 任務列表頁面,找到目標Connector,在其右側操作列,單擊測試。
在發送消息面板,發送測試消息。
發送方式選擇控制臺。
在消息 Key文本框中輸入消息的Key值,例如demo。
在消息內容文本框輸入測試的消息內容,例如 {"key": "test"}。
設置發送到指定分區,選擇是否指定分區。
單擊是,在分區 ID文本框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態。
單擊否,不指定分區。
發送方式選擇Docker,執行運行 Docker 容器生產示例消息區域的Docker命令,發送消息。
發送方式選擇SDK,根據您的業務需求,選擇需要的語言或者框架的SDK以及接入方式,通過SDK發送消息。
查看函數日志
向云消息隊列 Kafka 版的數據源Topic發送消息后,查看函數日志,驗證是否收到消息。更多信息,請參見配置日志。
日志中顯示發送的測試消息。