在Knative上實現(xiàn)Kafka消息推送
Knative已支持Kafka事件源,您可將Knative與云消息隊列 Kafka 版對接,在Knative上實現(xiàn)Kafka消息推送。
前提條件
- 重要
云消息隊列 Kafka 版實例和Knative必須處于同一VPC內(nèi)。
云消息隊列 Kafka 版實例的版本必須為2.0.0或以上。
背景信息
Knative是一款基于Kubernetes的Serverless框架,其目標是制定云原生、跨平臺的Serverless編排標準。Knative主要包括:
Serving:服務系統(tǒng),用于配置應用的路由、升級策略、自動擴縮容等。
Eventing:事件系統(tǒng),用于自動完成事件的綁定和觸發(fā)。
要讓Eventing(事件系統(tǒng))正常運行,就必須在Knative集群中實現(xiàn)Channel(內(nèi)部事件存儲層),目前支持的Channel實現(xiàn)方式包括Kafka、NATS。本文以云消息隊列 Kafka 版為例介紹如何實現(xiàn)Channel。
適用場景
在線短任務處理
AI音視頻消息處理
監(jiān)控告警
數(shù)據(jù)格式轉(zhuǎn)換
操作流程
在Knative上實現(xiàn)云消息隊列 Kafka 版消息推送的操作流程如下圖所示。
部署Kafka組件
登錄容器服務控制臺。
在左側(cè)導航欄,單擊集群。
在集群列表頁面,單擊要部署Kafka組件的集群的名稱。
在左側(cè)導航欄,選擇 。
在組件管理頁簽下的add-on組件區(qū)域,找到Kafka,在其右側(cè)操作列,單擊部署。
在部署Kafka對話框,單擊確定。
部署完成后,Kafka右側(cè)的狀態(tài)顯示已部署。
創(chuàng)建event-display服務
繼續(xù)單擊服務管理頁簽。
在Knative服務管理頁面,從命名空間列表選擇default,然后在右側(cè)單擊使用模板創(chuàng)建。
在使用模板創(chuàng)建頁面:
從示例模板列表,選擇自定義。
在模板區(qū)域,輸入模板信息。
apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display spec: template: metadata: annotations: autoscaling.knative.dev/minScale: "1" spec: containers: - image: registry.cn-hangzhou.aliyuncs.com/knative-sample/eventing-sources-cmd-event_display:bf45b3eb1e7fc4cb63d6a5a6416cf696295484a7662e0cf9ccdf5c080542c21d
單擊創(chuàng)建。
單擊返回。
創(chuàng)建完成后,event-display右側(cè)的狀態(tài)顯示成功。
創(chuàng)建kafka-source服務
創(chuàng)建KafkaSource服務的配置文件kafka-source.yaml。
apiVersion: sources.eventing.knative.dev/v1alpha1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: demo-topic # Broker URL. Replace this with the URLs for your kafka cluster, # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092. bootstrapServers: alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092 topics: demo sink: apiVersion: serving.knative.dev/v1alpha1 kind: Service name: event-display
參數(shù)
說明
示例值
consumerGroup
創(chuàng)建的Group的名稱。
demo-consumer
bootstrapServers
云消息隊列 Kafka 版實例的默認接入點。
alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
topics
創(chuàng)建的Topic的名稱。
demo-topic
執(zhí)行以下命令創(chuàng)建KafkaSource服務。
kubectl apply -f kafka-source.yaml
返回示例如下:
cb kafkasource.sources.knative.dev/kafka-source created
發(fā)送消息
在概覽頁面的資源分布區(qū)域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在左側(cè)導航欄,單擊Topic 管理。
在Topic 管理頁面,單擊目標Topic名稱進入Topic 詳情頁面,然后單擊體驗發(fā)送消息。
在快速體驗消息收發(fā)面板,發(fā)送測試消息。
發(fā)送方式選擇控制臺。
在消息 Key文本框中輸入消息的Key值,例如demo。
在消息內(nèi)容文本框輸入測試的消息內(nèi)容,例如 {"key": "test"}。
設置發(fā)送到指定分區(qū),選擇是否指定分區(qū)。
單擊是,在分區(qū) ID文本框中輸入分區(qū)的ID,例如0。如果您需查詢分區(qū)的ID,請參見查看分區(qū)狀態(tài)。
單擊否,不指定分區(qū)。
根據(jù)界面提示信息,通過SDK訂閱消息,或者執(zhí)行Docker命令訂閱消息。
發(fā)送方式選擇Docker,運行Docker容器。
執(zhí)行運行 Docker 容器生產(chǎn)示例消息區(qū)域的Docker命令,發(fā)送消息。
執(zhí)行發(fā)送后如何消費消息?區(qū)域的Docker命令,訂閱消息。
發(fā)送方式選擇SDK,根據(jù)您的業(yè)務需求,選擇需要的語言或者框架的SDK以及接入方式,通過SDK體驗消息收發(fā)。
結(jié)果驗證
發(fā)送消息后,通過kubectl logs
命令查看event-display服務的日志,確認event-display服務已接收到云消息隊列 Kafka 版發(fā)送的消息。