本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。
本文為您介紹如何在數據攝入YAML作業中,使用實時數倉Hologres連接器進行數據同步。
背景信息
實時數倉Hologres是一站式實時數據倉庫引擎,支持海量數據實時寫入、實時更新、實時分析,支持標準SQL(兼容PostgreSQL協議),支持PB級數據多維分析(OLAP)與即席分析(Ad Hoc),支持高并發低延遲的在線數據服務(Serving),與MaxCompute、Flink、DataWorks深度融合,提供離在線一體化全棧數倉解決方案。Hologres YAML連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 數據攝入目標端(Sink) |
運行模式 | 流模式和批模式 |
數據格式 | 暫不支持 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | YAML |
是否支持更新或刪除結果表數據 | 是 |
功能說明
功能 | 詳情 |
支持實時同步整庫(或者多張表)的全量和增量數據到每張對應的結果表中。 | |
在實時同步整庫數據的同時,還支持將每張源表的表結構變更(增加列、刪除列、重命名列等)實時同步到結果表中。 | |
支持使用正則表達式定義庫名,匹配數據源的多個分庫下的源表,合并后同步到下游每張對應表名的結果表中。 | |
支持將上游的一張表寫入到Hologres分區表。 | |
采用多種數據映射策略,將上游數據類型映射為更寬的Hologres數據類型。 |
語法結構
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
參數說明
參數 | 說明 | 數據類型 | 是否必填 | 默認值 | 備注 |
type | sink類型。 | String | 是 | 無 | 固定值為 |
name | sink名稱。 | String | 否 | 無 | 無。 |
dbname | 數據庫名稱。 | String | 是 | 無 | 無。 |
username | 用戶名,請填寫阿里云賬號的AccessKey ID。 | String | 是 | 無 | 詳情請參見如何查看AccessKey ID和AccessKey Secret信息? 重要 為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理。 |
password | 密碼,請填寫阿里云賬號的AccessKey Secret。 | String | 是 | 無 | 詳情請參見如何查看AccessKey ID和AccessKey Secret信息? 重要 為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理。 |
endpoint | Hologres服務地址。 | String | 是 | 無 | 詳情請參見訪問域名。 |
jdbcRetryCount | 當連接故障時,寫入和查詢的重試次數。 | Integer | 否 | 10 | 無。 |
jdbcRetrySleepInitMs | 每次重試的固定等待時間。 | Long | 否 | 1000 | 單位為毫秒。實際重試的等待時間的計算公式為 |
jdbcRetrySleepStepMs | 每次重試的累加等待時間。 | Long | 否 | 5000 | 單位為毫秒。實際重試的等待時間的計算公式為 |
jdbcConnectionMaxIdleMs | JDBC連接的空閑時間。 | Long | 否 | 60000 | 單位為毫秒。超過這個空閑時間,連接就會斷開釋放掉。 |
jdbcMetaCacheTTL | 本地緩存TableSchema信息的過期時間。 | Long | 否 | 60000 | 單位為毫秒。 |
jdbcMetaAutoRefreshFactor | 如果緩存的剩余時間小于觸發時間,則系統會自動刷新緩存。 | Integer | 否 | 4 | 緩存的剩余時間計算方法:緩存的剩余時間=緩存的過期時間 - 緩存已經存活的時間。緩存自動刷新后,則從0開始重新計算緩存的存活時間。 觸發時間計算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor兩個參數的比值。 |
mutatetype | 數據寫入模式。 | String | 否 | INSERT_OR_UPDATE | 如果Hologres物理表已設置主鍵,則Hologres Sink通過主鍵確保Exactly-once語義。當同主鍵數據出現多次時,您需要設置mutatetype參數確定更新結果表的方式,mutatetype取值如下:
|
createparttable | 當寫入分區表時,是否根據分區值自動創建不存在的分區表。 | Boolean | 否 | false | 無。 |
sink.delete-strategy | 撤回消息的處理方式。 | String | 否 | 無 | 參數取值如下:
|
jdbcWriteBatchSize | JDBC模式,Hologres Sink節點數據攢批條數(不是來一條數據處理一條,而是攢一批再處理)的最大值。 | Integer | 否 | 256 | 單位為數據行數。 說明 jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之間為或的關系。如果同時設置了這三個參數,則滿足其中一個,就進行寫入結果數據。 |
jdbcWriteBatchByteSize | JDBC模式,Hologres Sink節點數據攢批字節數(不是來一條數據處理一條,而是攢一批再處理)的最大值。 | Long | 否 | 2097152(2*1024*1024)字節,即2 MB | 說明 jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之間為或的關系。如果同時設置了這三個參數,則滿足其中一個,就進行寫入結果數據。 |
jdbcWriteFlushInterval | JDBC模式,Hologres Sink節點數據攢批寫入Hologres的最長等待時間。 | Long | 否 | 10000 | 單位為毫秒。 說明 jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之間為或的關系。如果同時設置了這三個參數,則滿足其中一個,就進行寫入結果數據。 |
ignoreNullWhenUpdate | 當mutatetype='insertOrUpdate'時,是否忽略更新寫入數據中的Null值。 | Boolean | 否 | false | 參數取值如下:
|
jdbcEnableDefaultForNotNullColumn | 如果將Null值寫入Hologres表中Not Null且無默認值的字段,是否允許連接器幫助填充一個默認值。 | Boolean | 否 | true | 參數取值如下:
|
remove-u0000-in-text.enabled | 如果寫入時字符串類型包含\u0000非法字符,是否允許連接器幫助去除。 | Boolean | 否 | false | 參數取值如下:
|
deduplication.enabled | jdbc及jdbc_fixed模式寫入攢批過程中,是否進行去重。 | Boolean | 否 | true | 參數取值如下:
|
sink.type-normalize-strategy | 數據映射策略。 | String | 否 | STANDARD | 當Hologres sink轉換上游數據到Hologres類型時的策略。
|
table_property.* | Hologres物理表屬性。 | String | 否 | 無 | 創建Hologres表時,允許在WITH參數中設置物理表屬性,合理的表屬性設置可以有助于系統高效地組織和查詢數據。 警告 table_property.distribution_key默認為主鍵值,不要輕易設置,會影響寫入數據的正確性。 |
類型映射
通過配置項sink.type-normalize-strategy
設置轉換上游數據到Hologres類型時的策略。
建議您在首次啟動YAML作業時開啟sink.type-normalize-strategy。如果啟動后再開啟sink.type-normalize-strategy,需要刪除下游表并且將作業無狀態重啟才能生效。
目前數組類型僅支持INTEGER、BIGINT、FLOAT、DOUBLE、BOOLEAN、CHAR和VARCHAR。
Hologres不支持numeric類型作為主鍵,因此如果主鍵類型被映射為numeric,會被轉化為varchar類型。
STANDARD
當sink.type-normalize-strategy為STANDARD時,類型映射如下:
Flink CDC類型 | Hologres類型 |
CHAR | bpchar |
STRING | text |
VARCHAR | text(長度大于10485760時) |
varchar(長度不大于10485760時) | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int2 |
SMALLINT | |
INTEGER | int4 |
BIGINT | int8 |
FLOAT | float4 |
DOUBLE | float8 |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | 各種類型的數組 |
MAP | 不支持 |
ROW | 不支持 |
BROADEN
當sink.type-normalize-strategy為BROADEN時,將Flink CDC類型轉換為更廣泛的Hologres類型。數據映射如下:
Flink CDC類型 | Hologres類型 |
CHAR | text |
STRING | |
VARCHAR | |
BOOLEAN | bool |
BINARY | bytea |
VARBINARY | |
DECIMAL | numeric |
TINYINT | int8 |
SMALLINT | |
INTEGER | |
BIGINT | |
FLOAT | float8 |
DOUBLE | |
DATE | date |
TIME_WITHOUT_TIME_ZONE | time |
TIMESTAMP_WITHOUT_TIME_ZONE | timestamp |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | timestamptz |
ARRAY | 各種類型的數組 |
MAP | 不支持 |
ROW | 不支持 |
ONLY_BIGINT_OR_TEXT
當sink.type-normalize-strategy
為ONLY_BIGINT_OR_TEXT時,將所有Flink CDC類型轉換為Hologres中的BIGINT或STRING類型。類型映射如下:
Flink CDC類型 | Hologres類型 |
TINYINT | int8 |
SMALLINT | |
INTEGER | |
BIGINT | |
BOOLEAN | text |
BINARY | |
VARBINARY | |
DECIMAL | |
FLOAT | |
DOUBLE | |
DATE | |
TIME_WITHOUT_TIME_ZONE | |
TIMESTAMP_WITHOUT_TIME_ZONE | |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | |
ARRAY | 各種類型的數組 |
MAP | 不支持 |
ROW | 不支持 |
分區表寫入
Hologres Sink支持分區表寫入,搭配Transform可以將上游數據寫入到Hologres分區表中。寫入時需要注意:
分區鍵(Partition Key)必須為主鍵的一部分,如果采用上游非主鍵中的一個作為分區表,可能會導致上下游主鍵不一致。數據同步時,如果上下游主鍵不一致,會導致數據不一致。
Hologres支持將TEXT、VARCHAR以及INT類型的數據作為分區鍵(Partition Key),V1.3.22及以上版本支持將DATE類型設為分區鍵。
需要設置createparttable為true, 才能自動創建分區子表,否則用戶需要手動創建分區子表。
示例請參見分區表寫入示。
表結構變更同步
CDC Yaml Pipeline作業在處理表結構變更時有不同的策略,通過pipeline級別的配置項schema.change.behavior
來設置。schema.change.behavior
取值有IGNORE、LENIENT、TRY_EVOLVE、EVOLVE 和 EXCEPTION。Hologres Sink目前不支持TRY_EVOLVE策略。其中LENIENT和EVOLVE涉及到表結構變更,接下來會說明如何處理不同表結構變更事件(Schema Change Event)。
LENIENT(默認)
LENIENT模式下支持的Schema變更策略詳情如下:
添加可空列:會自動在結果表Schema末尾添加對應的列,并自動同步新增列的數據。
刪除可空列:不會直接在結果表中刪除該列,而是將該列的數據自動填充為NULL值。
添加非空列:會自動在結果表Schema末尾添加對應的列,并自動同步新增列的數據,新增的列會默認設置為可空列,對于添加列發生之前的數據自動設置為NULL值。
重命名列:被看作為添加列和刪除列。直接在結果表中末尾添加重命名后的列,并將重命名前的列數據自動填充為NULL值。例如,如果col_a重命名為col_b,則會在結果表末尾添加col_b,并自動將col_a的數據填充為NULL值。
列類型變更:不支持。由于Hologres不支持列類型變更,需要搭配sink.type-normalize-strategy使用。
暫不支持同步以下Schema的變更:
主鍵或索引等約束的變更。
非空列的刪除。
從NOT NULL轉為NULLABLE變更。
EVOLVE
EVOLVE模式下支持的Schema變更策略詳情如下:
添加可空列:支持
刪除可空列:不支持。
添加非空列:會在結果表添加可空列。
重命名列:支持,會在結果表將原有列重命名。
列類型變更:不支持。由于Hologres不支持列類型變更,需要搭配sink.type-normalize-strategy使用。
暫不支持同步以下Schema的變更:
主鍵或索引等約束的變更。
非空列的刪除。
從NOT NULL轉為NULLABLE變更。
在EVOLVE模式下,如果在未刪除結果表的情況下無狀態重啟,有可能出現上游數據與結果表的結構不一致的情況導致作業失敗,需要用戶手動調整下游表結構。
開啟EVOLVE模式示例請參見開啟EVOLVE模式。
代碼示例
寬類型映射
通過配置項sink.type-normalize-strategy
設置寬類型映射。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
分區表寫入
將上游時間戳類型的create_time字段轉化為日期類型,作為Hologres表的分區鍵。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
createparttable: true
transform:
- source-table: test_db.test_source_table
projection: \*, DATE_FORMAT(CAST(create_time AS TIMESTAMP), 'yyyy-MM-dd') as partition_key
primary-keys: id, create_time, partition_key
partition-keys: partition_key
description: add partition key
pipeline:
name: MySQL to Hologres Pipeline
開啟EVOLVE模式
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
createparttable: true
pipeline:
name: MySQL to Hologres Pipeline
schema.change.behavior: evolve
單表同步
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.test_source_table
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
整庫同步
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
分庫分表合并
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
route:
- source-table: test_db.user\.*
sink-table: test_db.user
pipeline:
name: MySQL to Hologres Pipeline
同步到指定schema
Hologres的Schema對應MySQL的Database,可以執行結果表的Schema。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.user\.*
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
route:
- source-table: test_db.user\.*
sink-table: test_db2.user\.*r
pipeline:
name: MySQL to Hologres Pipeline
不重啟同步新增表
如果想在作業運行的過程中實時同步新增表,設置scan.binlog.newly-added-table.enable = true.
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
scan.binlog.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
重啟新增存量表
如果想要新增同步存量表,設置scan.newly-added-table.enabled = true后重啟作業。
如果作業先設置scan.binlog.newly-added-table.enabled為true捕獲新增表,不可以再通過scan.newly-added-table.enabled = true重啟捕獲存量表,否則會有數據重復發送。
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
server-id: 5401-5499
scan.newly-added-table.enabled: true
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
整庫同步時排除部分表
source:
type: mysql
name: MySQL Source
hostname: <yourHostname>
port: 3306
username: flink
password: ${secret_values.password}
tables: test_db.\.*
tables.exclude: test_db.table1
server-id: 5401-5499
sink:
type: hologres
name: Hologres Sink
endpoint: <yourEndpoint>
dbname: <yourDbname>
username: ${secret_values.ak_id}
password: ${secret_values.ak_secret}
sink.type-normalize-strategy: BROADEN
pipeline:
name: MySQL to Hologres Pipeline
相關文檔
source、sink、transform和route模塊的開發參考,詳情請參見數據攝入開發參考。
數據攝入YAML作業開發的操作步驟,詳情請參見數據攝入YAML作業開發(公測中)。