日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Flink寫入數據到Delta Table

當前MaxCompute為您提供了新版的Flink Connector插件,新版插件支持將Flink數據寫入至MaxCompute的普通表和Delta Table類型表,提高了Flink數據寫入MaxCompute的便捷性。本文為您介紹新版Flink Connector寫入MaxCompute的能力支持情況與主要操作流程。

背景信息

  • 支持的寫入模式:

    使用新版Flink Connector將數據寫入MaxCompute時,支持通過Upsert和Insert寫入方式。其中使用Upsert時支持以下兩種數據寫入流:

    • 按照Primary Key進行分組

    • 按照分區字段進行分組

      若分區數量過多,您可以按照分區字段進行分組,但使用該流程可能導致數據傾斜。

  • Upsert模式下,通過Flink Connector進行數據寫入流程和參數配置建議,詳情請參見數據實時入倉實踐。

  • 您可在配置Flink數據寫入MaxCompute時,通過設置Flink Connector參數指定使用哪種寫入方式,全量Connector參數介紹請參見下文的附錄:新版Flink Connector全量參數。

  • Flink Upsert寫入任務的Checkpoint間隔建議設置3分鐘以上,設置太小的話,寫入效率得不到保障,并且可能引入大量小文件。

  • MaxCompute與實時計算Flink版的字段類型對照關系如下:

    Flink 數據類型

    MaxCompute 數據類型

    CHAR(p)

    CHAR(p)

    VARCHAR(p)

    VARCHAR(p)

    STRING

    STRING

    BOOLEAN

    BOOLEAN

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    INT

    INT

    BIGINT

    LONG

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DECIMAL(p, s)

    DECIMAL(p, s)

    DATE

    DATE

    TIMESTAMP(9) WITHOUT TIME ZONE、TIMESTAMP_LTZ(9)

    TIMESTAMP

    TIMESTAMP(3) WITHOUT TIME ZONE、TIMESTAMP_LTZ(3)

    DATETIME

    BYTES

    BINARY

    ARRAY<T>

    LIST<T>

    MAP<K, V>

    MAP<K, V>

    ROW

    STRUCT

    說明

    Flink的TIMESTAMP數據類型不含時區,MaxCompute TIMESTAMP數據類型含時區。此差異會導致8小時的時間差。其通過使用TIMESTAMP_LTZ(9)來對齊時間戳。

    --FlinkSQL
    CREATE TEMPORARY TABLE odps_source(
      id BIGINT NOT NULL COMMENT 'id',
      created_time TIMESTAMP NOT NULL COMMENT '創建時間',
      updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT '更新時間',
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'maxcompute',
    ...
    );

Flink數據寫入MaxCompute流程:自建開源Flink

  1. 準備工作:創建MaxCompute表。

    您需先創建好MaxCompute表,用于后續Flink數據寫入。以下以創建兩張表(Delta Table非分區表和分區表)作為示例,為您演示Flink數據寫入MaxCompute的主要流程,其中表屬性設置請參考Delta Table表參數

    --創建Delta Table非分區表
    CREATE TABLE mf_flink_tt (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, primary key (id)
    )
    tblproperties ("transactional"="true", 
                   "write.bucket.num" = "64", 
                   "acid.data.retain.hours"="12") ;
    
    --創建Delta Table分區表
    CREATE TABLE mf_flink_tt_part (
      id BIGINT not null,
      name STRING,
      age INT,
      status BOOLEAN, 
      primary key (id)
    )
      partitioned by (dd string, hh string) 
      tblproperties ("transactional"="true", 
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
    
  2. 搭建開源Flink集群。當前支持1.13、1.15、1.16和1.17版本的開源Flink,您可以選擇對應版本的Flink:

    說明
    • Flink 1.17版本可復用1.16版本。

    • 本文以Flink Connector 1.13為例,將包下載至本地環境,下載完成后進行解壓。

  3. 下載Flink Connector并添加至Flink集群包中。

    1. 將Flink Connector Jar包下載至本地環境。

    2. 將Flink Connector Jar包添加至解壓后的Flink安裝包的lib目錄中。

      mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
  4. 啟動Flink實例服務。

    cd $FLINK_HOME/bin
    ./start-cluster.sh
  5. 啟動Flink客戶端。

    cd $FLINK_HOME/bin
    ./sql-client.sh
  6. 創建Flink表,并配置Flink Connector參數。

    當前支持直接使用Flink SQL創建Flink表并配置參數,也支持使用Flink的DataStream API進行相關操作。兩種操作的核心示例如下。

    使用Flink SQL

    1. 進入Flink SQL的編輯界面,執行以下命令完成建表與參數配置。

      -- 在 Flink SQL中注冊一張對應的非分區表
      CREATE TABLE mf_flink (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        PRIMARY KEY(id) NOT ENFORCED
      ) WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt',
        'sink.operation' = 'upsert',
      	'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs****',
      	'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
      	'odps.project.name'='mf_mc_bj'
      );
      
      -- 在 Flink SQL 中注冊一張對應的分區表
      CREATE TABLE mf_flink_part (
        id BIGINT,
        name STRING,
        age INT,
        status BOOLEAN,
        dd STRING,
        hh STRING,
        PRIMARY KEY(id) NOT ENFORCED
      ) PARTITIONED BY (`dd`,`hh`)
      WITH (
        'connector' = 'maxcompute',
        'table.name' = 'mf_flink_tt_part',
        'sink.operation' = 'upsert',
      	'odps.access.id'='LTAI5tRzd4W8cTyLZKT****',
        'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******',
      	'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
      	'odps.project.name'='mf_mc_bj'
      );
    2. 向Flink表中寫入數據,并在MaxCompute表中查詢,驗證Flink數據寫入MaxCompute的結果。

      --在flink Sql客戶端中往非分區表里插入數據
      INSERT INTO mf_flink VALUES (1,'Danny',27, false);
      
      --在Maxcompute中查詢返回
      SELECT * FROM mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 27   | false  |
      +------------+------+------+--------+
      
      --在flink Sql客戶端中往非分區表里插入數據
      INSERT INTO mf_flink VALUES (1,'Danny',28, false);
      --在Maxcompute中查詢返回
      SELECT * FROM mf_flink_tt;
      +------------+------+------+--------+
      | id         | name | age  | status |
      +------------+------+------+--------+
      | 1          | Danny | 28   | false  |
      +------------+------+------+--------+
      
      --在flink Sql客戶端中往分區表里插入數據
      INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01');
      --在Maxcompute中查詢返回
      SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 27   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+
      
      --在flink Sql客戶端中分區表里插入數據
      INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01');
      --在Maxcompute中查詢返回
      SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01;
      +------------+------+------+--------+----+----+
      | id         | name | age  | status | dd | hh |
      +------------+------+------+--------+----+----+
      | 1          | Danny | 30   | false  | 01 | 01 |
      +------------+------+------+--------+----+----+

    使用DataStream API

    1. 使用DataStream接口時,需先添加以下依賴。

      <dependency>
        <groupId>com.aliyun.odps</groupId>
        <artifactId>flink-connector-maxcompute</artifactId>
                  <version>xxx</version>
                  <scope>system</scope>
                  <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath>
      </dependency>
      說明

      使用時請將“xxx”改為相應的版本號。

    2. 建表與參數配置的示例代碼如下。

      package com.aliyun.odps.flink.examples;
      
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.odps.table.OdpsOptions;
      import org.apache.flink.odps.util.OdpsConf;
      import org.apache.flink.odps.util.OdpsPipeline;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.table.data.RowData;
      
      public class Examples {
      
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(120 * 1000);
      
              StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
      
              Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table");
              DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class);
      
              Configuration config = new Configuration();
              config.set(OdpsOptions.SINK_OPERATION, "upsert");
              config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8);
              config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100);
      
              OdpsConf odpsConfig = new OdpsConf("accessid",
                      "accesskey",
                      "endpoint",
                      "project",
                      "tunnel endpoint");
      
              OdpsPipeline.Builder builder = OdpsPipeline.builder();
              builder.projectName("sql2_isolation_2a")
                      .tableName("user_ledger_portfolio")
                      .partition("")
                      .configuration(config)
                      .odpsConf(odpsConfig)
                      .sink(input, false);
              env.execute();
          }
      }

Flink數據寫入MaxCompute流程:阿里云全托管Flink

  1. 準備工作:創建MaxCompute表。

    您需先創建好MaxCompute表,用于后續Flink數據寫入。以下以創建一張Delta Table表為例。

    SET odps.sql.type.system.odps2=true;
    DROP TABLE mf_flink_upsert;
    CREATE TABLE mf_flink_upsert (
      c1 int not null, 
      c2 string, 
      gt timestamp,
      primary key (c1)
    ) 
      PARTITIONED BY (ds string)
      tblproperties ("transactional"="true",
                     "write.bucket.num" = "64", 
                     "acid.data.retain.hours"="12") ;
  2. 登錄實時計算控制臺,查看Flink Connector信息,Flink Connector已經加載到阿里云全托管Flink VVP上。

  3. 通過Flink SQL作業創建Flink表,并構造Flink實時數據,完成作業開發后進行作業部署。

    在Flink的作業開發頁面,創建并編輯Flink SQL作業,以下示例為新建一張Flink數據源表、一張Flink臨時結果表,并自動構建實時數據生成邏輯寫入源表,通過計算邏輯將源表數據寫入臨時結果表。SQL作業開發詳細操作請參見SQL作業開發

    --創建flink數據源表,
    CREATE TEMPORARY TABLE fake_src_table
    (
        c1 int,
        c2 VARCHAR,
        gt AS CURRENT_TIMESTAMP
    ) WITH (
      'connector' = 'faker',
      'fields.c2.expression' = '#{superhero.name}',
      'rows-per-second' = '100',
      'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}'
    );
    
    --flink創建臨時結果表
    CREATE TEMPORARY TABLE test_c_d_g 
    (
        c1 int,
        c2 VARCHAR,
        gt TIMESTAMP,
        ds varchar,
        PRIMARY KEY(c1) NOT ENFORCED
     ) PARTITIONED BY(ds)
     WITH (
        		'connector' = 'maxcompute',
        		'table.name' = 'mf_flink_upsert',
        		'sink.operation' = 'upsert',
        		'odps.access.id'='LTAI5tRzd4W8cTyL****',
        		'odps.access.key'='gJwKaF3hK9MDAQgb**********',
        		'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api',
        		'odps.project.name'='mf_mc_bj',
        		'upsert.write.bucket.num'='64'
    );
    
    --flink 計算邏輯
    INSERT INTO test_c_d_g
    SELECT  c1 AS c1,
            c2 AS c2,
            gt AS gt,
            date_format(gt, 'yyyyMMddHH') AS ds
    FROM    fake_src_table;

    其中:

    odps.end.point:使用對應Region的云產品互聯網絡Endpoint。

    upsert.write.bucket.num:與MaxCompute中創建的Delta Table表的write.bucket.num屬性值保持一致。

  4. 在MaxCompute中查詢數據,驗證Flink數據寫入MaxCompute的結果。

    SELECT * FROM mf_flink_upsert WHERE ds=2023061517;
    
    --返回,由于Flink中的數據為隨機生成,實際MaxCompute查詢結果與本示例不一定完全一致
    +------+----+------+----+
    | c1   | c2 | gt   | ds |
    +------+----+------+----+
    | 0    | Skaar | 2023-06-16 01:59:41.116 | 2023061517 |
    | 21   | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 |
    | 104  | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 |
    | 126  | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
    

附錄:新版Flink Connector全量參數

  • 基礎參數

    參數

    是否必填

    默認值

    說明

    connector

    無默認值

    Connector類型,需設置為MaxCompute。

    odps.project.name

    無默認值

    MaxCompute的Project名稱。

    odps.access.id

    無默認值

    您的阿里云賬號AccessKey ID。您可以在訪問憑證頁面查看對應信息。

    odps.access.key

    無默認值

    您的阿里云賬號AccessKey Secret。您可以在訪問憑證頁面查看對應信息。

    odps.end.point

    無默認值

    MaxCompute的Endpoint信息。各地域的MaxCompute Endpoint請參見Endpoint

    odps.tunnel.end.point

    Tunnel服務的公網訪問鏈接。如果您未配置Tunnel Endpoint,Tunnel會自動路由到MaxCompute服務所在網絡對應的Tunnel Endpoint。如果您配置了Tunnel Endpoint,則以配置為準,不進行自動路由。

    各地域及網絡對應的Tunnel Endpoint值,請參見Endpoint

    odps.tunnel.quota.name

    無默認值

    訪問MaxCompute使用的Tunnel Quota名稱。

    table.name

    無默認值

    MaxCompute表名稱,格式為[project.][schema.]table。

    odps.namespace.schema

    false

    是否使用三層模型。關于三層模型介紹,請參見Schema操作。

    sink.operation

    insert

    寫入類型,取值為insertupsert

    說明

    僅MaxCompute Delta Table支持Upsert寫入。

    sink.parallelism

    無默認值

    寫入的并行度,如果不設置,則默認使用上游數據并行度。

    說明

    請務必確保表屬性 write.bucket.num 是該配置值的整數倍,這樣可以獲得最佳的寫入性能,并且能夠最有效地節省 Sink 節點內存。

    sink.meta.cache.time

    400

    元數據緩存大小。

    sink.meta.cache.expire.time

    1200

    元數據緩存超時時間,單位:秒(s)。

    sink.coordinator.enable

    是否開啟Coordinator模式。

  • 分區參數

    參數

    是否必填

    默認值

    說明

    sink.partition

    無默認值

    待寫入的分區名稱。

    若您使用的是動態分區,則為動態分區的上級分區名稱。

    sink.partition.default-value

    __DEFAULT_PARTITION__

    使用動態分區時的默認分區名稱。

    sink.dynamic-partition.limit

    100

    動態分區寫入時,單個Checkpoint可同時導入的最大分區數量。

    說明

    建議不要大幅提升該參數值,因為當同時寫入的分區數量過多時,容易導致Sink節點內存溢出(OOM),且當并發寫入分區數超過閾值,寫入任務會報錯。

    sink.group-partition.enable

    false

    動態分區寫入時,是否按照分區進行分組。

    sink.partition.assigner.class

    無默認值

    PartitionAssigner實現類。

  • FileCached模式寫入參數

    當動態分區數量過多時,可以使用文件緩存模式,您可以通過以下參數配置數據寫入時緩存文件信息。

    參數

    是否必配

    默認值

    說明

    sink.file-cached.enable

    false

    是否開啟FileCached寫入。取值說明:

    • false:不開啟。

    • true:開啟。

      說明

      當動態分區數量過多時,可以使用文件緩存模式。

    sink.file-cached.tmp.dirs

    ./local

    文件緩存模式下,默認文件緩存目錄。

    sink.file-cached.writer.num

    16

    文件緩存模式下,單個Task上傳數據的并發數。

    說明

    建議不要大幅提升該參數值,因為當同時寫入的分區數量過多時,容易導致內存溢出(OOM)。

    sink.bucket.check-interval

    60000

    文件緩存模式下,檢查文件大小的周期,單位:毫秒(ms)。

    sink.file-cached.rolling.max-size

    16 M

    文件緩存模式下,單個緩存文件的最大值。

    若文件大小超過該值,會將該文件數據上傳到服務端。

    sink.file-cached.memory

    64 M

    文件緩存模式下,寫入文件使用的最大堆外內存大小。

    sink.file-cached.memory.segment-size

    128 KB

    文件緩存模式下,寫入文件的使用的buffer大小。

    sink.file-cached.flush.always

    true

    文件緩存模式下,寫入文件是否使用緩存。

    sink.file-cached.write.max-retries

    3

    文件緩存模式下,上傳數據的重試次數。

  • InsertUpsert寫入參數

    Upsert寫入參數

    參數

    是否必填

    默認值

    說明

    upsert.writer.max-retries

    3

    Upsert Writer寫入Bucket失敗后,重試次數。

    upsert.writer.buffer-size

    64 m

    單個Upsert Writer數據在Flink中的緩存大小。

    說明
    • 當所有Bucket的緩沖區大小總和達到預設閾值時,系統將自動觸發刷新操作,將數據更新到服務器端。

    • 一個upsert writer里會同時寫入多個Bucket,建議提高該值,以提升寫入效率。

    • 若寫入分區較多時,會存在引發內存OOM風險,可考慮降低該參數值。

    upsert.writer.bucket.buffer-size

    1 m

    單個Bucket數據在Flink中的緩存大小,當Flink服務器使用內存資源緊張時,可以減小該參數值。

    upsert.write.bucket.num

    寫入表的bucket數量,必須與寫入表write.bucket.num值一致。

    upsert.write.slot-num

    1

    單個Session使用Tunnel slot數量。

    upsert.commit.max-retries

    3

    Upsert Session Commit重試次數。

    upsert.commit.thread-num

    16

    Upsert Session Commit的并行度。

    不建議將此參數值調整得過大,因為當同時進行的提交并發數越多時,會導致資源消耗增加,可能導致性能問題或資源過度消耗。

    upsert.major-compact.min-commits

    100

    發起Major Compact的最小Commit次數。

    upsert.commit.timeout

    600

    Upsert Session Commit等待超時時間,單位:秒(s)。

    upsert.major-compact.enable

    false

    是否開啟Major Compact。

    upsert.flush.concurrent

    2

    限制單個分區允許同時寫入的最大Bucket數。

    說明

    每當一個bucket的數據刷新時,將會占用一個Tunnel Slot資源。

    說明

    Upsert寫入時,參數配置建議詳情,請參見Upsert寫入參數配置建議。

    Insert寫入參數

    參數

    是否必配

    默認值

    說明

    insert.commit.thread-num

    16

    Commit Session的并行度。

    insert.arrow-writer.enable

    false

    是否使用Arrow格式。

    insert.arrow-writer.batch-size

    512

    Arrow Batch的最大行數。

    insert.arrow-writer.flush-interval

    100000

    Writer Flush間隔,單位毫秒(ms)。

    insert.writer.buffer-size

    64 M

    使用Buffered Writer的緩存大小。