日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

實時數倉Hologres數據攝入YAML連接器(公測中)

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。

本文為您介紹如何在數據攝入YAML作業中,使用實時數倉Hologres連接器進行數據同步。

背景信息

實時數倉Hologres是一站式實時數據倉庫引擎,支持海量數據實時寫入、實時更新、實時分析,支持標準SQL(兼容PostgreSQL協議),支持PB級數據多維分析(OLAP)與即席分析(Ad Hoc),支持高并發低延遲的在線數據服務(Serving),與MaxCompute、Flink、DataWorks深度融合,提供離在線一體化全棧數倉解決方案。Hologres YAML連接器支持的信息如下。

類別

詳情

支持類型

數據攝入目標端(Sink)

運行模式

流模式和批模式

數據格式

暫不支持

特有監控指標

  • numRecordsOut

  • numRecordsOutPerSecond

說明

指標含義詳情,請參見監控指標說明

API種類

YAML

是否支持更新或刪除結果表數據

功能說明

功能

詳情

整庫同步

支持實時同步整庫(或者多張表)的全量和增量數據到每張對應的結果表中。

表結構變更同步

在實時同步整庫數據的同時,還支持將每張源表的表結構變更(增加列、刪除列、重命名列等)實時同步到結果表中。

分庫分表合并

支持使用正則表達式定義庫名,匹配數據源的多個分庫下的源表,合并后同步到下游每張對應表名的結果表中。

分區表寫入

支持將上游的一張表寫入到Hologres分區表。

類型映射

采用多種數據映射策略,將上游數據類型映射為更寬的Hologres數據類型。

語法結構

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}

參數說明

參數

說明

數據類型

是否必填

默認值

備注

type

sink類型。

String

固定值為hologres。

name

sink名稱。

String

無。

dbname

數據庫名稱。

String

無。

username

用戶名,請填寫阿里云賬號的AccessKey ID。

String

詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

重要

為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理

password

密碼,請填寫阿里云賬號的AccessKey Secret。

String

詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

重要

為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理

endpoint

Hologres服務地址。

String

詳情請參見訪問域名

jdbcRetryCount

當連接故障時,寫入和查詢的重試次數。

Integer

10

無。

jdbcRetrySleepInitMs

每次重試的固定等待時間。

Long

1000

單位為毫秒。實際重試的等待時間的計算公式為jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs。

jdbcRetrySleepStepMs

每次重試的累加等待時間。

Long

5000

單位為毫秒。實際重試的等待時間的計算公式為jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs

jdbcConnectionMaxIdleMs

JDBC連接的空閑時間。

Long

60000

單位為毫秒。超過這個空閑時間,連接就會斷開釋放掉。

jdbcMetaCacheTTL

本地緩存TableSchema信息的過期時間。

Long

60000

單位為毫秒。

jdbcMetaAutoRefreshFactor

如果緩存的剩余時間小于觸發時間,則系統會自動刷新緩存。

Integer

4

緩存的剩余時間計算方法:緩存的剩余時間=緩存的過期時間 - 緩存已經存活的時間。緩存自動刷新后,則從0開始重新計算緩存的存活時間。

觸發時間計算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor兩個參數的比值。

mutatetype

數據寫入模式。

String

INSERT_OR_UPDATE

如果Hologres物理表已設置主鍵,則Hologres Sink通過主鍵確保Exactly-once語義。當同主鍵數據出現多次時,您需要設置mutatetype參數確定更新結果表的方式,mutatetype取值如下:

  • INSERT_OR_IGNORE:保留首次出現的數據,忽略后續所有數據。

  • INSERT_OR_REPLACE:后出現的數據整行替換已有數據。

  • INSERT_OR_UPDATE:更新已有數據的部分列。例如一張表有a、b、c和d四個字段,a是PK(Primary Key),寫入Hologres時只寫入a和b兩個字段,在PK重復的情況下,系統只會更新b字段,c和d保持不變。

createparttable

當寫入分區表時,是否根據分區值自動創建不存在的分區表。

Boolean

false

無。

sink.delete-strategy

撤回消息的處理方式。

String

參數取值如下:

  • IGNORE_DELETE:忽略Update Before和Delete消息。適用于僅需插入或更新數據,而無需刪除數據的場景。

  • CHANGELOG_STANDARD:Flink框架按照 Flink SQL Changelog的工作原理運行,不忽略刪除操作,并通過先刪除數據再插入的方式執行更新操作,以確保數據準確性。適用于不涉及局部更新的場景

jdbcWriteBatchSize

JDBC模式,Hologres Sink節點數據攢批條數(不是來一條數據處理一條,而是攢一批再處理)的最大值。

Integer

256

單位為數據行數。

說明

jdbcWriteBatchSize、jdbcWriteBatchByteSizejdbcWriteFlushInterval三者之間為或的關系。如果同時設置了這三個參數,則滿足其中一個,就進行寫入結果數據。

jdbcWriteBatchByteSize

JDBC模式,Hologres Sink節點數據攢批字節數(不是來一條數據處理一條,而是攢一批再處理)的最大值。

Long

2097152(2*1024*1024)字節,即2 MB

說明

jdbcWriteBatchSize、jdbcWriteBatchByteSizejdbcWriteFlushInterval三者之間為或的關系。如果同時設置了這三個參數,則滿足其中一個,就進行寫入結果數據。

jdbcWriteFlushInterval

JDBC模式,Hologres Sink節點數據攢批寫入Hologres的最長等待時間。

Long

10000

單位為毫秒。

說明

jdbcWriteBatchSizejdbcWriteBatchByteSizejdbcWriteFlushInterval三者之間為或的關系。如果同時設置了這三個參數,則滿足其中一個,就進行寫入結果數據。

ignoreNullWhenUpdate

mutatetype='insertOrUpdate'時,是否忽略更新寫入數據中的Null值。

Boolean

false

參數取值如下:

  • false(默認值):將Null值寫到Hologres結果表里。

  • true:忽略更新寫入數據中的Null值。

jdbcEnableDefaultForNotNullColumn

如果將Null值寫入Hologres表中Not Null且無默認值的字段,是否允許連接器幫助填充一個默認值。

Boolean

true

參數取值如下:

  • true(默認值):允許連接器填充默認值并寫入,規則如下。

    • 如果字段是String類型,則默認寫為空("")。

    • 如果字段是Number類型,則默認寫為0。

    • 如果是Date、timestamp或timestamptz時間類型字段,則默認寫為1970-01-01 00:00:00。

  • false:不填充默認值,寫Null到Not Null字段時,會拋出異常。

remove-u0000-in-text.enabled

如果寫入時字符串類型包含\u0000非法字符,是否允許連接器幫助去除。

Boolean

false

參數取值如下:

  • false(默認值):連接器不對數據進行操作,但碰到臟數據時寫入可能拋出如下異常,ERROR: invalid byte sequence for encoding "UTF8": 0x00

    此時需要在源表提前處理臟數據,或者在SQL中定義臟數據處理邏輯。

  • true:連接器會幫助去除字符串類型中的\u0000,防止寫入拋出異常。

deduplication.enabled

jdbc及jdbc_fixed模式寫入攢批過程中,是否進行去重。

Boolean

true

參數取值如下:

  • true(默認值):如果一批數據中有主鍵相同的數據,默認進行去重,只保留最后一條到達的數據。以兩個字段,其中第一個字段為主鍵的數據舉例:

    • INSERT (1,'a')INSERT (1,'b')兩條記錄先后到達,去重之后只保留后到達的(1,'b')寫入Hologres結果表中。

    • Hologres結果表中已經存在記錄(1,'a'),此時DELETE (1,'a')INSERT (1,'b')兩條記錄先后到達,只保留后到達的(1,'b')寫入hologres中,表現為直接更新,而不是先刪除再插入。

  • false:在攢批過程中不進行去重,如果發現新到的數據和目前攢批的數據中存在主鍵相同的情況,先將攢批數據寫入,寫入完成之后再繼續寫入新到的數據。

sink.type-normalize-strategy

數據映射策略。

String

STANDARD

當Hologres sink轉換上游數據到Hologres類型時的策略。

  • STANDARD:根據標準將Flink CDC類型轉換為PG類型。

  • BROADEN:將Flink CDC類型轉換為更廣泛的Hologres類型。

  • ONLY_BIGINT_OR_TEXT:將所有Flink CDC類型轉換為Hologres中的BIGINT或STRING類型。

table_property.*

Hologres物理表屬性。

String

創建Hologres表時,允許在WITH參數中設置物理表屬性,合理的表屬性設置可以有助于系統高效地組織和查詢數據。

警告

table_property.distribution_key默認為主鍵值,不要輕易設置,會影響寫入數據的正確性。

類型映射

通過配置項sink.type-normalize-strategy設置轉換上游數據到Hologres類型時的策略。

說明
  • 建議您在首次啟動YAML作業時開啟sink.type-normalize-strategy。如果啟動后再開啟sink.type-normalize-strategy,需要刪除下游表并且將作業無狀態重啟才能生效。

  • 目前數組類型僅支持INTEGER、BIGINT、FLOAT、DOUBLE、BOOLEAN、CHAR和VARCHAR。

  • Hologres不支持numeric類型作為主鍵,因此如果主鍵類型被映射為numeric,會被轉化為varchar類型。

STANDARD

當sink.type-normalize-strategy為STANDARD時,類型映射如下:

Flink CDC類型

Hologres類型

CHAR

bpchar

STRING

text

VARCHAR

text(長度大于10485760時)

varchar(長度不大于10485760時)

BOOLEAN

bool

BINARY

bytea

VARBINARY

DECIMAL

numeric

TINYINT

int2

SMALLINT

INTEGER

int4

BIGINT

int8

FLOAT

float4

DOUBLE

float8

DATE

date

TIME_WITHOUT_TIME_ZONE

time

TIMESTAMP_WITHOUT_TIME_ZONE

timestamp

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

ARRAY

各種類型的數組

MAP

不支持

ROW

不支持

BROADEN

當sink.type-normalize-strategy為BROADEN時,將Flink CDC類型轉換為更廣泛的Hologres類型。數據映射如下:

Flink CDC類型

Hologres類型

CHAR

text

STRING

VARCHAR

BOOLEAN

bool

BINARY

bytea

VARBINARY

DECIMAL

numeric

TINYINT

int8

SMALLINT

INTEGER

BIGINT

FLOAT

float8

DOUBLE

DATE

date

TIME_WITHOUT_TIME_ZONE

time

TIMESTAMP_WITHOUT_TIME_ZONE

timestamp

TIMESTAMP_WITH_LOCAL_TIME_ZONE

timestamptz

ARRAY

各種類型的數組

MAP

不支持

ROW

不支持

ONLY_BIGINT_OR_TEXT

sink.type-normalize-strategy為ONLY_BIGINT_OR_TEXT時,將所有Flink CDC類型轉換為Hologres中的BIGINT或STRING類型。類型映射如下:

Flink CDC類型

Hologres類型

TINYINT

int8

SMALLINT

INTEGER

BIGINT

BOOLEAN

text

BINARY

VARBINARY

DECIMAL

FLOAT

DOUBLE

DATE

TIME_WITHOUT_TIME_ZONE

TIMESTAMP_WITHOUT_TIME_ZONE

TIMESTAMP_WITH_LOCAL_TIME_ZONE

ARRAY

各種類型的數組

MAP

不支持

ROW

不支持

分區表寫入

Hologres Sink支持分區表寫入,搭配Transform可以將上游數據寫入到Hologres分區表中。寫入時需要注意:

  • 分區鍵(Partition Key)必須為主鍵的一部分,如果采用上游非主鍵中的一個作為分區表,可能會導致上下游主鍵不一致。數據同步時,如果上下游主鍵不一致,會導致數據不一致。

  • Hologres支持將TEXTVARCHAR以及INT類型的數據作為分區鍵(Partition Key),V1.3.22及以上版本支持將DATE類型設為分區鍵。

  • 需要設置createparttable為true, 才能自動創建分區子表,否則用戶需要手動創建分區子表。

示例請參見分區表寫入示

表結構變更同步

CDC Yaml Pipeline作業在處理表結構變更時有不同的策略,通過pipeline級別的配置項schema.change.behavior來設置。schema.change.behavior取值有IGNORE、LENIENT、TRY_EVOLVE、EVOLVE 和 EXCEPTION。Hologres Sink目前不支持TRY_EVOLVE策略。其中LENIENT和EVOLVE涉及到表結構變更,接下來會說明如何處理不同表結構變更事件(Schema Change Event)。

LENIENT(默認)

LENIENT模式下支持的Schema變更策略詳情如下:

  • 添加可空列:會自動在結果表Schema末尾添加對應的列,并自動同步新增列的數據。

  • 刪除可空列:不會直接在結果表中刪除該列,而是將該列的數據自動填充為NULL值。

  • 添加非空列:會自動在結果表Schema末尾添加對應的列,并自動同步新增列的數據,新增的列會默認設置為可空列,對于添加列發生之前的數據自動設置為NULL值。

  • 重命名列:被看作為添加列和刪除列。直接在結果表中末尾添加重命名后的列,并將重命名前的列數據自動填充為NULL值。例如,如果col_a重命名為col_b,則會在結果表末尾添加col_b,并自動將col_a的數據填充為NULL值。

  • 列類型變更:不支持。由于Hologres不支持列類型變更,需要搭配sink.type-normalize-strategy使用。

  • 暫不支持同步以下Schema的變更:

    • 主鍵或索引等約束的變更。

    • 非空列的刪除。

    • 從NOT NULL轉為NULLABLE變更。

EVOLVE

EVOLVE模式下支持的Schema變更策略詳情如下:

  • 添加可空列:支持

  • 刪除可空列:不支持。

  • 添加非空列:會在結果表添加可空列。

  • 重命名列:支持,會在結果表將原有列重命名。

  • 列類型變更:不支持。由于Hologres不支持列類型變更,需要搭配sink.type-normalize-strategy使用。

  • 暫不支持同步以下Schema的變更:

    • 主鍵或索引等約束的變更。

    • 非空列的刪除。

    • 從NOT NULL轉為NULLABLE變更。

警告

在EVOLVE模式下,如果在未刪除結果表的情況下無狀態重啟,有可能出現上游數據與結果表的結構不一致的情況導致作業失敗,需要用戶手動調整下游表結構。

開啟EVOLVE模式示例請參見開啟EVOLVE模式。

代碼示例

寬類型映射

通過配置項sink.type-normalize-strategy設置寬類型映射。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

分區表寫入

將上游時間戳類型的create_time字段轉化為日期類型,作為Hologres表的分區鍵。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true
 
transform:
  - source-table: test_db.test_source_table
    projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
    primary-keys: id, create_time, partition_key
    partition-keys: partition_key
    description: add partition key 

pipeline:
  name: MySQL to Hologres Pipeline

開啟EVOLVE模式

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  createparttable: true

pipeline:
  name: MySQL to Hologres Pipeline
  schema.change.behavior: evolve

單表同步

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.test_source_table
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

整庫同步

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

分庫分表合并

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db.user

pipeline:
  name: MySQL to Hologres Pipeline

同步到指定schema

Hologres的Schema對應MySQL的Database,可以執行結果表的Schema。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.user\.*
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
  
route:
  - source-table: test_db.user\.*
    sink-table: test_db2.user\.*r

pipeline:
  name: MySQL to Hologres Pipeline

不重啟同步新增表

如果想在作業運行的過程中實時同步新增表,設置scan.binlog.newly-added-table.enable = true.

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.binlog.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

重啟新增存量表

如果想要新增同步存量表,設置scan.newly-added-table.enabled = true后重啟作業。

警告

如果作業先設置scan.binlog.newly-added-table.enabled為true捕獲新增表,不可以再通過scan.newly-added-table.enabled = true重啟捕獲存量表,否則會有數據重復發送。

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  server-id: 5401-5499
  scan.newly-added-table.enabled: true

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

整庫同步時排除部分表

source:
  type: mysql
  name: MySQL Source
  hostname: <yourHostname>
  port: 3306
  username: flink
  password: ${secret_values.password}
  tables: test_db.\.*
  tables.exclude: test_db.table1
  server-id: 5401-5499

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN

pipeline:
  name: MySQL to Hologres Pipeline

相關文檔