日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

創(chuàng)建MaxCompute Sink Connector

本文介紹如何創(chuàng)建使用MaxCompute Sink Connector,您可以通過MaxCompute Sink Connector將數(shù)據(jù)從云消息隊列 Kafka 版實例的數(shù)據(jù)源Topic導(dǎo)出至MaxCompute的表中。

前提條件

詳細(xì)步驟,請參見創(chuàng)建前提

注意事項

如需使用MaxCompute分區(qū)功能,創(chuàng)建表時需額外創(chuàng)建一個分區(qū)列,列名為time,類型為STRING。

步驟一:創(chuàng)建目標(biāo)資源

通過MaxCompute客戶端創(chuàng)建表。更多信息,請參見創(chuàng)建表。

本文以名稱為kafka_to_maxcompute的表為例。表中有3列數(shù)據(jù),并使用分區(qū)功能。該表的建表語句如下:

CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT) PARTITIONED by (time STRING);

如不使用分區(qū)功能,語句如下:

CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT);

執(zhí)行成功后,如下圖所示:執(zhí)行成果

表管理頁面,查看創(chuàng)建的表信息。表

步驟二:創(chuàng)建MaxCompute Sink Connector并啟動

  1. 登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區(qū)域,選擇地域。

  2. 在左側(cè)導(dǎo)航欄,選擇Connector生態(tài)集成 > 任務(wù)列表

  3. 任務(wù)列表頁面,單擊創(chuàng)建任務(wù)。

  4. 創(chuàng)建任務(wù)面板,設(shè)置任務(wù)名稱描述,配置以下參數(shù)。

    • 任務(wù)創(chuàng)建

      1. Source(源)配置向?qū)Вx擇數(shù)據(jù)提供方消息隊列 Kafka 版,設(shè)置以下參數(shù),然后單擊下一步

        參數(shù)

        說明

        示例

        地域

        源Kafka實例所在的地域。

        華東1(杭州)

        kafka實例

        數(shù)據(jù)源所在的Kafka實例ID。

        alikafka_post-cn-9hdsbdhd****

        Topic

        數(shù)據(jù)源所在的Kafka實例Topic。

        guide-sink-topic

        Group ID

        數(shù)據(jù)源所在的Kafka實例中的Group ID。

        • 快速創(chuàng)建:自動創(chuàng)建以GID_EVENTBRIDGE_xxx命名的Group ID。

        • 使用已有:選擇已創(chuàng)建的Group,請選擇獨立的Group ID,不要和已有的業(yè)務(wù)混用,以免影響已有的消息收發(fā)。

        使用已有

        消費位點

        • 最新位點:從最新位點開始消費。

        • 最早位點:從最初位點開始消費。

        最新位點

        網(wǎng)絡(luò)配置

        有跨境傳輸數(shù)據(jù)需求時選擇自建公網(wǎng),其他情況可選擇基礎(chǔ)網(wǎng)絡(luò)。

        基礎(chǔ)網(wǎng)絡(luò)

        數(shù)據(jù)格式

        數(shù)據(jù)格式是針對支持二進(jìn)制傳遞的數(shù)據(jù)源端推出的指定內(nèi)容格式的編碼能力。支持多種數(shù)據(jù)格式編碼,如無特殊編碼訴求可將格式設(shè)置為Json。

        • Json(默認(rèn)Json格式編碼,二進(jìn)制數(shù)據(jù)按照utf-8 編碼為Json格式放入Payload。)

        • Text(文本格式編碼,二進(jìn)制數(shù)據(jù)按照utf-8編碼為字符串放入Payload。)

        • Binary(二進(jìn)制格式編碼,二進(jìn)制數(shù)據(jù)按照Base64編碼為字符串放入Payload。)

        Json

        批量推送條數(shù)

        調(diào)用函數(shù)發(fā)送的最大批量消息條數(shù),當(dāng)積壓的消息數(shù)量到達(dá)設(shè)定值時才會發(fā)送請求,取值范圍為 [1,10000]。

        2000

        批量推送間隔(單位:秒)

        調(diào)用函數(shù)的間隔時間,系統(tǒng)每到間隔時間點會將消息聚合后發(fā)給函數(shù)計算,取值范圍為[0,15],單位為秒。0秒表示無等待時間,直接投遞。

        3

      2. Filtering(過濾)配置向?qū)?,定義數(shù)據(jù)模式過濾發(fā)送的請求。更多信息,請參見事件模式

      3. Transform(轉(zhuǎn)換)配置向?qū)ВO(shè)置數(shù)據(jù)清洗,實現(xiàn)分割、映射、富化及動態(tài)路由等繁雜數(shù)據(jù)加工能力。更多信息,請參見使用函數(shù)計算實現(xiàn)消息數(shù)據(jù)清洗。

      4. Sink(目標(biāo))配置向?qū)?,選擇服務(wù)類型大數(shù)據(jù)計算服務(wù) acs.maxcompute,配置以下參數(shù)。

        參數(shù)

        說明

        示例

        賬號AccessKey ID

        阿里云賬號的AccessKey ID,用于訪問MaxCompute服務(wù)。

        LTAI5tHPVCZywsoEVvFu****

        賬號AccessKey Secret

        阿里云賬號的AccessKey Secret。

        4RAKUQpZtUntDgvoKu0tvrkrOM****

        MaxCompute項目名稱

        選擇已創(chuàng)建的MaxCompute項目。

        test_compute

        MaxCompute表名稱

        選擇已創(chuàng)建的MaxCompute表。

        kafka_to_maxcompute

        MaxCompute表入?yún)?/b>

        選擇MaxCompute表后,此處會展示表的列名和類型信息,配置數(shù)據(jù)提取規(guī)則即可。下面是一條消息示例,本示例中Topic列對應(yīng)的值從Topic字段提取,則定義數(shù)據(jù)提取規(guī)則$.topic。

        {
          'data': {
            'topic': 't_test',
            'partition': 2,
            'offset': 1,
            'timestamp': 1717048990499,
            'headers': {
              'headers': [],
              'isReadOnly': False
            },
            'key': 'MaxCompute-K1',
            'value': 'MaxCompute-V1'
          },
          'id': '9b05fc19-9838-4990-bb49-ddb942307d3f-2-1',
          'source': 'acs:alikafka',
          'specversion': '1.0',
          'type': 'alikafka:Topic:Message',
          'datacontenttype': 'application/json; charset=utf-8',
          'time': '2024-05-30T06:03:10.499Z',
          'aliyunaccountid': '1413397765616316'
        }

        topic:$.data.topic

        valuename:$.data.value

        valueage:$.data.offset

        是否開啟分區(qū)能力

        • 開啟:開啟分區(qū)能力。

        • 關(guān)閉:不開啟分區(qū)能力。

        開啟

        分區(qū)配置

        僅當(dāng)是否開啟分區(qū)能力參數(shù)設(shè)置為開啟時需配置此參數(shù)。

        • 支持{yyyy}、{MM}、{dd}、{HH}、{mm}時間變量參數(shù),分別對應(yīng)年、月、日、時、分。時間變量大小寫敏感。

        • 支持填寫常量。

        {yyyy}-{MM}-{dd}.{HH}:{mm}.suffix

        網(wǎng)絡(luò)配置

        • 專有網(wǎng)絡(luò):通過專有網(wǎng)絡(luò)VPC將Kafka消息投遞到MaxCompute。

        • 公網(wǎng):通過公網(wǎng)將Kafka消息投遞到MaxCompute。

        公網(wǎng)

        VPC

        選擇VPC ID。僅當(dāng)網(wǎng)絡(luò)配置專有網(wǎng)絡(luò)時需要配置此參數(shù)。

        vpc-bp17fapfdj0dwzjkd****

        交換機(jī)

        選擇vSwitch ID。僅當(dāng)網(wǎng)絡(luò)配置專有網(wǎng)絡(luò)時需要配置此參數(shù)。

        vsw-bp1gbjhj53hdjdkg****

        安全組

        選擇安全組。僅當(dāng)網(wǎng)絡(luò)配置專有網(wǎng)絡(luò)時需要配置此參數(shù)。

        test_group

    • 任務(wù)屬性

      配置事件推送失敗時的重試策略及錯誤發(fā)生時的處理方式。更多信息,請參見重試和死信。

  5. 完成上述配置后,單擊保存。在任務(wù)列表頁面,找到剛創(chuàng)建的MaxCompute Sink Connector任務(wù),此時狀態(tài)欄為啟動中,當(dāng)狀態(tài)變?yōu)?b data-tag="uicontrol" id="uicontrol-iam-3og-um9" class="uicontrol">運行中時,Connector創(chuàng)建成功。

步驟三:測試MaxCompute Sink Connector

  1. 任務(wù)列表頁面,在MaxCompute Sink Connector任務(wù)的事件源列單擊源Topic。

  2. 在Topic詳情頁面,單擊體驗發(fā)送消息。

  3. 快速體驗消息收發(fā)面板,按照下圖配置消息內(nèi)容,然后單擊確定。

    發(fā)送消息

  4. 進(jìn)入MaxCompute控制臺,執(zhí)行以下SQL語句查看分區(qū)信息。

    show PARTITIONS kafka_to_maxcompute;

    查詢結(jié)果如下所示:分區(qū)

  5. 根據(jù)分區(qū)信息,執(zhí)行以下語句,查看分區(qū)內(nèi)數(shù)據(jù)。

    SELECT * FROM kafka_to_maxcompute WHERE time="2024-05-31.16:37.suffix";

    查詢結(jié)果如下所示:分區(qū)內(nèi)數(shù)據(jù)