日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

云數(shù)據(jù)庫RDS MySQL版

本文為您介紹如何使用云數(shù)據(jù)庫RDS MySQL版連接器。

RDS MySQL基于阿里巴巴的MySQL源碼分支,經(jīng)過雙十一高并發(fā)、大數(shù)據(jù)量的考驗,擁有優(yōu)良的性能。RDS MySQL支持實例管理、賬號管理、數(shù)據(jù)庫管理、備份恢復、白名單、透明數(shù)據(jù)加密以及數(shù)據(jù)遷移等基本功能。RDS MySQL詳情請參見RDS MySQL云數(shù)據(jù)庫

重要

后續(xù)將計劃不再支持云數(shù)據(jù)庫RDS MySQL版連接器,建議您直接使用MySQL連接器。使用MySQL連接器請參見MySQL

RDS MySQL連接器支持的信息如下。

類別

詳情

支持類型

結(jié)果表和維表

運行模式

流模式與批模式

數(shù)據(jù)格式

暫不適用

特有監(jiān)控指標

  • 維表:無

  • 結(jié)果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

    • numRecordsOutErrors

說明

指標含義詳情,請參見監(jiān)控指標說明。

API種類

SQL

是否支持更新或刪除結(jié)果表數(shù)據(jù)

前提條件

使用限制

  • 僅Flink計算引擎VVR 2.0.0及以上版本支持RDS MySQL連接器。

  • 僅支持阿里云RDS MySQL云數(shù)據(jù)庫。

  • 語義上可以保證At-Least-Once,在結(jié)果表有主鍵的情況下,冪等可以保證數(shù)據(jù)的正確性。

  • 推薦您使用最新版本的Flink(例如6.x以上),以獲取最新的性能與穩(wěn)定性優(yōu)化。

注意事項

RDS MySQL連接器后續(xù)會逐步下線,建議您在功能滿足的前提下使用MySQL連接器。詳情請參見MySQL。

語法結(jié)構(gòu)

  • 結(jié)果表

    CREATE TABLE rds_sink(
      id INT,
      num BIGINT,
      PRIMARY KEY(id) NOT ENFORCED
    ) WITH (
      'connector'='rds',
      'tableName'='your-table-name',
      'userName'='your-user-name',
      'password'='your-password',
      'url'='your-url'
    );
    說明
    • Flink RDS連接器寫入數(shù)據(jù)庫結(jié)果表原理:針對Flink Sink輸出數(shù)據(jù),拼接成一行SQL語句,然后執(zhí)行。對于沒有主鍵的結(jié)果表,會執(zhí)行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);語句。對于包含主鍵的結(jié)果表,會執(zhí)行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;語句。請注意:如果物理表存在除主鍵外的唯一索引約束,當插入兩條主鍵不同但唯一索引相同的記錄時,下游數(shù)據(jù)會因為唯一索引沖突導致數(shù)據(jù)覆蓋引發(fā)數(shù)據(jù)丟失。

    • 如果在RDS MySQL云數(shù)據(jù)庫定義了自增主鍵,在Flink DDL中不應該聲明該自增字段。數(shù)據(jù)寫入過程中,數(shù)據(jù)庫會自動填補該自增字段。Flink RDS連接器僅支持寫入和刪除帶自增字段的數(shù)據(jù),不支持更新。

  • 維表

    CREATE TABLE rds_dim(
     id1 INT,
     id2 VARCHAR
    ) WITH (
      'connector'='rds',
      'tableName'='your-table-name',
      'userName'='your-user-name',
      'password'='your-password',
      'url'='your-url'
      'cache'='NONE'
    );

WITH參數(shù)

  • 通用

    參數(shù)

    說明

    數(shù)據(jù)類型

    是否必填

    默認值

    備注

    connector

    表類型。

    String

    無。

    固定值為rds。

    tableName

    表名。

    String

    無。

    無。

    userName

    用戶名。

    String

    無。

    無。

    password

    密碼。

    String

    無。

    無。

    url

    表地址。

    String

    無。

    RDS MySQL云數(shù)據(jù)庫專有網(wǎng)絡VPC地址,即內(nèi)網(wǎng)地址,詳情請?參見查看或修改內(nèi)外網(wǎng)地址和端口。

    URL的格式為:jdbc:mysql://<內(nèi)網(wǎng)地址>:<端口號>/<數(shù)據(jù)庫名稱>

    說明

    對于結(jié)果表,建議在URL后面加上參數(shù)?rewriteBatchedStatements=true,以提高系統(tǒng)性能。

    maxRetryTimes

    查詢維表或者寫數(shù)據(jù)到結(jié)果表失敗后,最多重試次數(shù)。

    Integer

    • 在Flink計算引擎VVR 4.0.7及以上版本,該參數(shù)默認值為10。

    • 在Flink計算引擎VVR 4.0.6及以下版本,該參數(shù)默認值為3。

    無。

  • 結(jié)果表獨有

    參數(shù)

    說明

    數(shù)據(jù)類型

    是否必填

    默認值

    備注

    batchSize

    一次批量寫入的條數(shù)。

    Integer

    • 在Flink計算引擎VVR 4.0.7及以上版本,該參數(shù)默認值為4096。

    • 在Flink計算引擎VVR 4.0.0~4.0.6版本,該參數(shù)默認值為5000。

    • 在Flink計算引擎VVR 3.x版本及以下版本,該參數(shù)默認值為100。

    無。

    bufferSize

    內(nèi)存中緩存的最大數(shù)據(jù)條數(shù)。batchSize或 bufferSize任一到達閾值都會觸發(fā)數(shù)據(jù)寫操作。

    Integer

    10000

    • 僅Flink計算引擎VVR 4.0.7及以上版本支持該參數(shù)。

    • 需指定主鍵后,該參數(shù)才生效。

    flushIntervalMs

    flush內(nèi)存緩沖區(qū)的時間間隔。表示如果緩存中的數(shù)據(jù)在等待指定時間后,依然沒有達到輸出條件(batchSize或bufferSize),系統(tǒng)會自動寫出緩存中的所有數(shù)據(jù)到結(jié)果表。

    Integer

    • 在Flink計算引擎VVR 4.0.7及以上版本,該參數(shù)默認值為2000。

    • 在Flink計算引擎VVR 4.0.0~4.0.6版本,該參數(shù)默認值為0。

    • 在Flink計算引擎VVR 3.x版本及以下版本,該參數(shù)默認值為1000。

    在默認值為0的版本中,如果不配置該參數(shù),可能導致少量數(shù)據(jù)永遠無法寫出到結(jié)果表。建議您采用更高版本的Flink。

    ignoreDelete

    是否忽略數(shù)據(jù)Delete操作。

    Boolean

    false

    Flink SQL可能會生成數(shù)據(jù)Delete操作,在多個輸出節(jié)點根據(jù)主鍵同時更新同一張結(jié)果表的不同字段的場景下,可能導致數(shù)據(jù)結(jié)果不正確。

    例如一個任務在刪除了一條數(shù)據(jù)后,另一個任務又只更新了這條數(shù)據(jù)的部分字段,其余未被更新的字段由于被刪除,其值會變成null或默認值。通過將ignoreDelete設置為true,可以避免數(shù)據(jù)刪除操作。

    connectionMaxActive

    數(shù)據(jù)庫連接池大小

    Integer

    40

    • 僅Flink計算引擎VVR 4.0.7及以上版本支持該參數(shù)。

    • 如果出現(xiàn)獲取連接超時的問題,可能是連接池不夠用,可適當增大連接池的大小。

    • 如果數(shù)據(jù)庫能支持的最大并發(fā)連接比較小,可適當減小連接池大小或者減小作業(yè)節(jié)點并行度。

  • 維表獨有

    參數(shù)

    說明

    數(shù)據(jù)類型

    是否必填

    默認值

    備注

    cache

    維表緩存策略。

    String

    • 在Flink計算引擎VVR 4.0.6之前版本,緩存策略默認值為NONE。

    • 在Flink計算引擎VVR 4.0.6及以上版本,緩存策略的默認值為ALL。

    Flink RDS MySQL連接器支持None、LRU和ALL三種緩存策略,取值含義詳情請參見背景信息。

    cacheSize

    緩存大小。

    Integer

    100000

    • 當選擇LRU緩存策略后,需要設置緩存大小。

    • 當選擇NONE或ALL緩存策略時,不必設置緩存大小。

    cacheTTLMs

    緩存超時時間。

    Long

    • 如果cache配置為NONE,則cacheTTLMs可以不配置,表示緩存不超時。

    • 如果cache配置為LRU,則cacheTTLMs為緩存超時時間。默認不過期。

    • 如果cache配置為ALL,則cacheTTLMs為緩存加載時間。默認不重新加載。

    單位為毫秒。

    maxJoinRows

    主表中每一條數(shù)據(jù)查詢維表時,匹配后最多返回的結(jié)果數(shù)。

    Integer

    1024

    進行Join時,主表輸入一條數(shù)據(jù),對應維表匹配后,會返回的結(jié)果總數(shù)受該參數(shù)限制。

    如果您可以預估一條數(shù)據(jù)對應的維表數(shù)據(jù)最多為n條,則可以設置maxJoinRows='n',以確保實時計算匹配處理效率。

類型映射

Flink字段類型

RDS MySQL字段類型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

TINYINT(1)

說明

僅維表支持該映射。

BOOLEAN

SMALLINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

INT

INT

INT

SMALLINT UNSIGNED

BIGINT

BIGINT

BIGINT

INT UNSIGNED

DECIMAL(20,0)

BIGINT UNSIGNED

FLOAT

FLOAT

DECIMAL

DECIMAL

DOUBLE

DOUBLE

DATE

DATE

TIME

TIME

TIMESTAMP

TIMESTAMP

VARCHAR

VARCHAR

VARBINARY

VARBINARY

使用示例

  • 結(jié)果表

    CREATE TEMPORARY TABLE datagen_source(
     `name` VARCHAR,
     `age` INT
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE rds_sink(
     `name` VARCHAR,
     `age` INT
    ) WITH (
      'connector'='rds',
      'password'='your-password',
      'tableName'='your-tablename',
      'url'='your-url',
      'userName'='your-username'
    );
    
    INSERT INTO rds_sink
    SELECT * FROM datagen_source;
  • 維表

    CREATE TEMPORARY TABLE datagen_source(
     a INT,
     b BIGINT,
     c STRING,
     `proctime` AS PROCTIME()
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE rds_dim(
     a INT,
     b VARCHAR,
     c VARCHAR
    ) WITH (
     'connector'='rds',
     'password'='<yourPassword>',
     'tableName'='<yourTablename>',
     'url'='jdbc:mysql://xxx',
     'userName'='<yourUsername>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
     a INT,
     b STRING
    ) WITH (
     'connector'='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b FROM datagen_source AS T JOIN rds_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a=H.a;

常見問題