Hologres數據源為您提供讀取和寫入Hologres雙向通道的功能,本文為您介紹DataWorks的Hologres數據同步的能力支持情況。
支持的版本
Hologres支持的版本:0.7、0.8、0.9、0.10、1.1、1.2、1.3。
使用限制
離線讀寫
Hologres數據源支持使用Serverless資源組(推薦)和獨享數據集成資源組。
Hologres Writer不支持寫入數據至Hologres的外部表。
Hologres數據源連通性獲取Hologres端點的邏輯:
當前地域的Hologres實例,Hologres端點獲取順序:
。跨地域的Hologres實例,Hologres端點獲取順序:
。
整庫實時寫
實時數據同步任務僅支持使用Serverless資源組(推薦)和獨享數據集成資源組。
實時數據同步任務暫不支持同步沒有主鍵的表。
支持的字段類型
字段類型 | 離線讀(Hologres Reader) | 離線寫(Hologres Writer) | 實時寫 |
UUID | 不支持 | 不支持 | 不支持 |
CHAR | 支持 | 支持 | 支持 |
NCHAR | 支持 | 支持 | 支持 |
VARCHAR | 支持 | 支持 | 支持 |
LONGVARCHAR | 支持 | 支持 | 支持 |
NVARCHAR | 支持 | 支持 | 支持 |
LONGNVARCHAR | 支持 | 支持 | 支持 |
CLOB | 支持 | 支持 | 支持 |
NCLOB | 支持 | 支持 | 支持 |
SMALLINT | 支持 | 支持 | 支持 |
TINYINT | 支持 | 支持 | 支持 |
INTEGER | 支持 | 支持 | 支持 |
BIGINT | 支持 | 支持 | 支持 |
NUMERIC | 支持 | 支持 | 支持 |
DECIMAL | 支持 | 支持 | 支持 |
FLOAT | 支持 | 支持 | 支持 |
REAL | 支持 | 支持 | 支持 |
DOUBLE | 支持 | 支持 | 支持 |
TIME | 支持 | 支持 | 支持 |
DATE | 支持 | 支持 | 支持 |
TIMESTAMP | 支持 | 支持 | 支持 |
BINARY | 支持 | 支持 | 支持 |
VARBINARY | 支持 | 支持 | 支持 |
BLOB | 支持 | 支持 | 支持 |
LONGVARBINARY | 支持 | 支持 | 支持 |
BOOLEAN | 支持 | 支持 | 支持 |
BIT | 支持 | 支持 | 支持 |
JSON | 支持 | 支持 | 支持 |
JSONB | 支持 | 支持 | 支持 |
實現原理
離線讀寫
Hologres Reader通過PSQL讀取Hologres表中的數據,根據表的Shard Count發起多個并發,每個Shard對應一個Select并發任務:
Hologres在創建表時,在同一個
CREATE TABLE
事務中,通過CALL set_table_property('table_name', 'shard_count', 'xx')
配置表的Shard Count。默認情況下,使用數據庫默認的Shard Count,具體數值取決于Hologres實例的配置。
Select語句通過表的內置列hg_shard_id的Shard篩選數據。
離線寫
Hologres Writer通過數據同步框架獲取Reader生成的協議數據,根據conflictMode(沖突策略)的配置決定寫入數據時的沖突解決策略。
您可以通過配置conflictMode,決定新導入的數據和已有數據的主鍵發生沖突時,如何處理新導入的數據:
conflictMode僅適用于有主鍵的表。具體寫入原理和性能,詳情請參考技術原理。
conflictMode為Replace(整行更新)模式時,新數據覆蓋舊數據,整行所有列全部覆蓋,沒有配置列映射的字段會強制寫NULL。
conflictMode為Update(更新)模式時,新數據覆蓋舊數據,只覆蓋配置有列映射的字段。
conflictMode為Ignore(忽略)模式時,忽略新數據。
創建數據源
在進行數據同步任務開發時,您需要在DataWorks上創建一個對應的數據源,操作流程請參見創建并管理數據源,詳細的配置參數解釋可在配置界面查看對應參數的文案提示。
數據同步任務開發
數據同步任務的配置入口和通用配置流程可參見下文的配置指導。
單表離線同步任務配置指導
操作流程請參見通過向導模式配置離線同步任務、通過腳本模式配置離線同步任務。
腳本模式配置的全量參數和腳本Demo請參見下文的附錄:腳本Demo與參數說明。
單表、整庫實時同步任務配置指導
操作流程請參見DataStudio側實時同步任務配置。
單表、整庫全增量實時寫任務配置指導
操作流程請參見數據集成側同步任務配置。
附錄:腳本Demo與參數說明
離線任務腳本配置方式
如果您配置離線任務時使用腳本模式的方式進行配置,您需要按照統一的腳本格式要求,在任務腳本中編寫相應的參數,詳情請參見通過腳本模式配置離線同步任務,以下為您介紹腳本模式下數據源的參數配置詳情。
Reader腳本Demo
配置非分區表
配置從Hologres非分區表讀取數據至內存,如下所示。
{ "type":"job", "version":"2.0",//版本號。 "steps":[ { "stepType":"holo",//插件名。 "parameter":{ "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port", "accessId": "***************", //訪問Hologres的accessId。 "accessKey": "*******************", //訪問Hologres的accessKey。 "database": "postgres", "table": "holo_reader_****", "column" : [ //字段。 "tag", "id", "title" ] }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"http://錯誤記錄數。 }, "speed":{ "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。 "concurrent":1,//作業并發數。 "mbps":"12"http://限流,此處1mbps = 1MB/s。 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Hologres表的DDL語句,如下所示。
begin; drop table if exists holo_reader_basic_src; create table holo_reader_basic_src( tag text not null, id int not null, title text not null, body text, primary key (tag, id)); call set_table_property('holo_reader_basic_src', 'orientation', 'column'); call set_table_property('holo_reader_basic_src', 'shard_count', '3'); commit;
配置分區表
配置從內存產生的數據同步至Hologres分區表的子表。
說明請注意partition的配置。
{ "type":"job", "version":"2.0",//版本號。 "steps":[ { "stepType":"holo",//插件名。 "parameter":{ "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port", "accessId": "***************", //訪問Hologres的accessId。 "accessKey": "*******************", //訪問Hologres的accessKey。 "database": "postgres", "table": "holo_reader_basic_****", "partition": "tag=foo", "column" : [ "*" ], "fetchSize": "100" }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"http://錯誤記錄數。 }, "speed":{ "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。 "concurrent":1,//作業并發數。 "mbps":"12"http://限流,此處1mbps = 1MB/s。 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Hologres表的DDL語句,如下所示。
begin; drop table if exists holo_reader_basic_part_src; create table holo_reader_basic_part_src( tag text not null, id int not null, title text not null, body text, primary key (tag, id)) partition by list( tag ); call set_table_property('holo_reader_basic_part_src', 'orientation', 'column'); call set_table_property('holo_reader_basic_part_src', 'shard_count', '3'); commit; create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo'); # 確保分區表子表已經創建且導入數據。 postgres=# \d+ holo_reader_basic_part_src Table "public.holo_reader_basic_part_src" Column | Type | Collation | Nullable | Default | Storage | Stats target | Description --------+---------+-----------+----------+---------+----------+--------------+------------- tag | text | | not null | | extended | | id | integer | | not null | | plain | | title | text | | not null | | extended | | body | text | | | | extended | | Partition key: LIST (tag) Indexes: "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id) Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')
Reader腳本參數
參數 | 描述 | 是否必選 | 默認值 |
endpoint | 目標交互式分析(Hologres)實例對應的endpoint,格式為 endpoint包括經典網絡、公網和VPC三種網絡類型,請根據數據集成資源組和Hologres實例所在的網絡環境選擇正確的endpoint類型,否則會出現網絡不通或者性能受限的情況:
通常建議數據集成資源組和Hologres實例配在同一個地域的同一個可用區,以保證網絡端口連通,實現最大性能。 | 是 | 無 |
accessId | 訪問Hologres的accessId。 | 是 | 無 |
accessKey | 訪問Hologres的accessKey,請確保該密鑰對目標表有寫入權限。 | 是 | 無 |
database | Hologres實例內部數據庫的名稱。 | 是 | 無 |
table | Hologres的表名稱,如果是分區表,請指定父表的名稱。 | 是 | 無 |
column | 定義導入目標表的數據列,必須包含目標表的主鍵集合。例如 | 是 | 無 |
partition | 針對分區表,表示分區Column以及對應的Value,格式為 重要
| 否 | 空,表示非分區表。 |
fetchSize | 指定使用Select語句一次性讀取數據的條數。 | 否 | 1,000 |
Writer腳本Demo
配置非分區表
配置從內存產生的數據導入至Hologres普通表,示例為通過JDBC模式導入的配置。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": "<mysql_source_name>", "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "connection": [ { "datasource": "<mysql_source_name>",//mysql數據源名 "table": [ "<mysql_table_name>" ] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "holo", "parameter": { "maxConnectionCount": 9, "datasource": "<holo_sink_name>",//Hologres數據源名稱 "truncate":true,//清理規則。 "conflictMode": "ignore", "envType": 0, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "table": "<holo_table_name>" }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "errorLimit": { "record": "" }, "speed": { "concurrent": 2,//作業并發數 "throttle": false//限流 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Hologres表的DDL語句,如下所示。
begin; drop table if exists mysql_to_holo_test; create table mysql_to_holo_test( tag text not null, id int not null, body text not null, brrth date, primary key (tag, id)); call set_table_property('mysql_to_holo_test', 'orientation', 'column'); call set_table_property('mysql_to_holo_test', 'distribution_key', 'id'); call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth'); commit;
配置分區表
說明目前Hologres僅支持LIST分區,分區Column僅支持單個Column分區,且僅支持INT4或TEXT類型。
請確認該參數和表DDL的分區配置匹配。
配置從內存產生的數據同步至Hologres分區表的子表。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": "<mysql_source_name>", "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "connection": [ { "datasource": "<mysql_source_name>", "table": [ "<mysql_table_name>" ] } ], "where": "", "splitPk": "<mysql_pk>",//mysql的pk字段 "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "holo", "parameter": { "maxConnectionCount": 9, "partition": "<partition_key>",//Hologres分區鍵 "datasource": "<holo_sink_name>",//Hologres數據源名 "conflictMode": "ignore", "envType": 0, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "table": "<holo_table_name>" }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "errorLimit": { "record": "" }, "speed": { "concurrent": 2,//作業并發數 "throttle": false//限流 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Hologres表的DDL語句,如下所示。
BEGIN; CREATE TABLE public.hologres_parent_table( a text , b int, c timestamp, d text, ds text, primary key(ds,b) ) PARTITION BY LIST(ds); CALL set_table_property('public.hologres_parent_table', 'orientation', 'column'); CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215'); CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216'); CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217'); COMMIT;
Writer腳本參數
參數 | 描述 | 是否必選 | 默認值 |
endpoint | 目標交互式分析(Hologres)實例對應的endpoint,格式為 endpoint包括公網、經典網絡和VPC三種網絡類型,請根據數據集成資源組和Hologres實例所在的網絡環境選擇正確的endpoint類型,否則會出現網絡不通或者性能受限的情況:
通常建議數據集成資源組和Hologres實例在同一個地域的同一個可用區,以確保網絡連通,實現最大性能。 | 是 | 無 |
accessId | 訪問Hologres的accessId。 | 是 | 無 |
accessKey | 訪問Hologres的accessKey,請確保該密鑰對目標表有寫入權限。 | 是 | 無 |
database | Hologres實例內部數據庫的名稱。 | 是 | 無 |
table | Hologres的表名稱,目前支持表名稱中包含Schema,例如 | 是 | 無 |
conflictMode | conflictMode包括Replace、Update和Ignore,詳情請參見實現原理。 | 是 | 無 |
column | 定義導入目標表的數據列,必須包含目標表的主鍵集合。例如 | 是 | 無 |
partition | 針對分區表,表示分區Column以及對應的Value,格式為 說明
| 否 | 空,表示非分區表 |
truncate | 寫入Holo表之前是否需要清空目標表。
| 否 | false |