日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版(ADB PG)

更新時(shí)間:

本文為您介紹如何使用AnalyticDB PostgreSQL連接器。

背景信息

云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版是一種大規(guī)模并行處理(MPP)數(shù)據(jù)倉(cāng)庫(kù)服務(wù),可提供海量數(shù)據(jù)在線分析服務(wù)。

AnalyticDB PostgreSQL版支持的信息如下。

類別

詳情

支持類型

維表和結(jié)果表

運(yùn)行模式

流模式和批模式

數(shù)據(jù)格式

暫不適用

特有監(jiān)控指標(biāo)

  • 結(jié)果表:

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

  • 維表:無(wú)

說(shuō)明

指標(biāo)含義詳情,請(qǐng)參見(jiàn)監(jiān)控指標(biāo)說(shuō)明

API種類

SQL

是否支持更新或刪除結(jié)果表數(shù)據(jù)

前提條件

使用限制

  • 僅Flink實(shí)時(shí)計(jì)算引擎VVR 6.0.0及以上版本支持云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版連接器。

  • 僅Flink實(shí)時(shí)計(jì)算引擎VVR 8.0.1及以上版本支持云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版7.0版本。

  • 暫不支持自建的PostgreSQL。

語(yǔ)法結(jié)構(gòu)

CREATE TABLE adbpg_table (
 id INT,
 len INT,
 content VARCHAR,
 PRIMARY KEY(id)
) WITH (
 'connector'='adbpg',
 'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
 'tableName'='<yourDatabaseTableName>',
 'userName'='<yourDatabaseUserName>',
 'password'='<yourDatabasePassword>'
);

WITH參數(shù)

類型

參數(shù)

說(shuō)明

數(shù)據(jù)類型

是否必填

默認(rèn)值

備注

通用

connector

表類型。

String

無(wú)

固定值為adbpg。

url

JDBC連接地址。

String

無(wú)

格式為jdbc:postgresql://<Address>:<PortId>/<DatabaseName>

tableName

表名。

String

無(wú)

無(wú)。

userName

用戶名。

String

無(wú)

無(wú)。

password

密碼。

String

無(wú)

無(wú)。

maxRetryTimes

寫(xiě)入數(shù)據(jù)失敗后,重試寫(xiě)入的最大次數(shù)。

Integer

3

無(wú)。

targetSchema

Schema名稱。

String

public

無(wú)。

caseSensitive

大小寫(xiě)是否敏感。

String

false

參數(shù)取值如下:

  • true:大小寫(xiě)敏感。

  • false(默認(rèn)值):大小寫(xiě)不敏感。

connectionMaxActive

連接池的最大連接數(shù)。

Integer

5

系統(tǒng)會(huì)自動(dòng)釋放與數(shù)據(jù)庫(kù)服務(wù)的空閑連接。

重要

此參數(shù)設(shè)置過(guò)大可能會(huì)導(dǎo)致服務(wù)端連接數(shù)出現(xiàn)異常。

結(jié)果表獨(dú)有

retryWaitTime

重試的時(shí)間間隔。

Integer

100

單位毫秒。

batchSize

一次批量寫(xiě)入的數(shù)據(jù)條數(shù)。

Integer

500

無(wú)。

flushIntervalMs

清空緩存的時(shí)間間隔。

Integer

無(wú)

如果緩存中的數(shù)據(jù)在等待指定時(shí)間后,依然沒(méi)有達(dá)到輸出條件,系統(tǒng)會(huì)自動(dòng)輸出緩存中的所有數(shù)據(jù)。單位毫秒。

writeMode

第一次嘗試寫(xiě)入時(shí)的寫(xiě)入方式。

String

insert

參數(shù)取值如下:

  • insert(默認(rèn)值):直接插入,沖突時(shí)參考conflictMode

  • upsert:沖突時(shí)自動(dòng)update,只能用于有主鍵的表。

conflictMode

當(dāng)Insert寫(xiě)入出現(xiàn)主鍵沖突或者唯一索引沖突時(shí)的處理策略。

String

strict

參數(shù)取值如下:

  • strict(默認(rèn)值):沖突時(shí)報(bào)錯(cuò)。

  • ignore:沖突時(shí)忽略。

  • update:沖突時(shí)自動(dòng)更新,可用于無(wú)主鍵表,執(zhí)行效率較低。

  • upsert:沖突時(shí)自動(dòng)更新,只能用于有主鍵表。

維表獨(dú)有

maxJoinRows

單行數(shù)據(jù)Join的最多行數(shù)。

Integer

1024

無(wú)。

cache

緩存策略。

String

ALL

支持以下三種緩存策略:

  • ALL(默認(rèn)值):緩存維表里的所有數(shù)據(jù)。在Job運(yùn)行前,系統(tǒng)會(huì)將維表中所有數(shù)據(jù)加載到Cache中,之后所有的維表查找數(shù)據(jù)都會(huì)通過(guò)Cache進(jìn)行。如果在Cache中無(wú)法找到數(shù)據(jù),則KEY不存在,并在Cache過(guò)期后重新加載一遍全量Cache。

  • LRU:緩存維表里的部分?jǐn)?shù)據(jù)。源表的每條數(shù)據(jù)都會(huì)觸發(fā)系統(tǒng)先在Cache中查找數(shù)據(jù)。如果沒(méi)有找到,則去物理維表中查找。

  • None:無(wú)緩存。

cacheSize

緩存大小,即緩存多少行數(shù)據(jù)。

Long

100000

僅當(dāng)選擇LRU緩存策略時(shí),cacheSize參數(shù)生效。

cacheTTLMs

緩存失效的超時(shí)時(shí)間。

Long

Long.MAX_VALUE

cacheTTLMs配置和cache配置有關(guān):

  • 如果cache配置為L(zhǎng)RU,則cacheTTLMs為緩存失效的超時(shí)時(shí)間。默認(rèn)不過(guò)期。

  • 如果cache配置為ALL,則cacheTTLMs為設(shè)置緩存重新加載的間隔時(shí)間,默認(rèn)不重新加載。

單位為毫秒。

類型映射

云原生數(shù)據(jù)倉(cāng)庫(kù)AnalyticDB PostgreSQL版字段類型

Flink字段類型

boolean

boolean

smallint

int

int

int

bigint

bigint

float

double

varchar

varchar

text

varchar

timestamp

timestamp

date

date

使用示例

  • 結(jié)果表

    CREATE TEMPORARY TABLE datagen_source (
     `name` VARCHAR,
     `age` INT
    )
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    );
    
    CREATE TABLE adbpg_sink (
     name VARCHAR,
     age INT
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    INSERT INTO adbpg_sink
    SELECT * FROM datagen_source;
  • 維表

    CREATE TEMPORARY TABLE datagen_source(
     a INT,
     b BIGINT,
     c STRING,
     `proctime` AS PROCTIME()
    ) 
    COMMENT 'datagen source table'
    WITH (
     'connector' = 'datagen'
    };
    
    CREATE TABLE adbpg_dim (
     a INT, 
     b VARCHAR, 
     c VARCHAR
    ) WITH (
     'connector'='adbpg',
     'url'='jdbc:postgresql://<Address>:<PortId>/<DatabaseName>',
     'tableName'='<yourDatabaseTableName>',
     'userName'='<yourDatabaseUserName>',
     'password'='<yourDatabasePassword>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
     a INT,
     b STRING
    )
    COMMENT 'blackhole sink table'
    WITH (
     'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT T.a,H.b
    FROM datagen_source AS T JOIN adb_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

相關(guān)文檔