使用DataWorks工具將表格存儲(Tablestore)時序表中的全量數據或者增量數據同步到另一個時序表。
背景信息
表格存儲支持對時序數據進行存儲與查詢,在某些場景下,您可能希望將時序表中的時序數據遷移到另一個時序表中。
關于表格存儲時序模型的更多信息,請參見時序模型介紹。
具體數據遷移場景如下:
數據集成是穩定高效、彈性伸縮的數據同步平臺,致力于提供在復雜網絡環境下、豐富的異構數據源之間高速穩定的數據移動及同步能力。您可以在數據開發(DataStudio)界面直接創建離線同步節點用于離線(批量)數據周期性同步或者創建實時同步節點用于單表或整庫增量數據實時同步。更多信息,請參見數據集成概述。
使用DataWorks數據集成的離線同步任務,您可以將表格存儲(Tablestore)時序表中的全量數據或者增量數據同步到另一個時序表中。
前提條件
在訪問控制RAM服務側完成如下操作:
已創建RAM用戶并為RAM用戶授予管理表格存儲權限(AliyunOTSFullAccess)。具體操作,請參見創建RAM用戶和為RAM用戶授權。
已為RAM用戶創建AccessKey。具體操作,請參見創建AccessKey。
在表格存儲服務側已完成如下操作:
已在實例詳情頁面獲取實例的服務地址(Endpoint)。
在概覽頁面,單擊實例名稱后,在實例詳情頁簽的實例訪問地址區域,即可查看實例的Endpoint,請根據實際選擇。
已創建目標時序表,用于存放遷移數據。具體操作,請參見創建時序表。
在DataWorks服務側已完成如下操作:
已開通DataWorks服務并創建工作空間。具體操作,請參見開通DataWorks服務和創建工作空間。
已授予RAM用戶DataWorks工作空間的開發角色權限,用于在數據開發(DataStudio)界面創建同步任務。具體操作,請參見添加空間成員并管理成員角色權限。
全量數據同步
步驟一:新增數據源
具體操作,請參見新增表格存儲數據源。
如果源時序表和目標時序表所在實例相同,則只需要創建一個數據源即可。
如果源時序表和目標時序表所在實例不同,則需要分別創建源數據源和目標數據源。
步驟二:新建離線同步任務
進入數據開發頁面。
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,在下拉框中選擇對應工作空間后單擊進入數據開發。
在DataStudio控制臺的數據開發頁面,單擊業務流程節點下的目標業務流程。
如果需要新建業務流程,請參見創建業務流程。
在數據集成節點上右鍵選擇新建節點 > 離線同步。
在新建節點對話框,選擇路徑并填寫節點名稱。
單擊確認。
在數據集成節點下會顯示新建的離線同步節點。
步驟三:配置離線同步任務并啟動
在數據集成節點下,雙擊打開新建的離線同步任務節點。
配置同步網絡鏈接。
選擇離線同步任務的數據來源、數據去向以及用于執行同步任務的資源組,并測試連通性。
重要數據同步任務的執行必須經過資源組來實現,請選擇資源組并保證資源組與讀寫兩端的數據源能聯通訪問。
在網絡與資源配置步驟,選擇數據來源為Tablestore,并選擇數據源名稱為源數據源。
選擇資源組。
選擇資源組后,系統會顯示資源組的地域、規格等信息以及自動測試資源組與所選數據源之間連通性。
重要請與新增數據源時選擇的資源組保持一致。
選擇數據去向為Tablestore,并選擇數據源名稱為目標數據源。
系統會自動測試資源組與所選數據源之間連通性。
測試可連通后,單擊下一步。
配置任務并保存。
全量數據的同步需要使用到Tablestore(OTS) Reader與Tablestore Writer插件。腳本配置規則請參見Reader腳本Demo與參數說明和Writer腳本Demo與參數說明。
在配置任務步驟,單擊工具欄中的轉換腳本,選擇確認即可進入腳本模式。
根據配置示例編輯腳本。
配置示例如下:
重要為了便于理解,在配置示例中增加了注釋內容,實際使用腳本時請刪除所有注釋內容。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "ots", //讀數據使用的插件為Tablestore Reader。 "parameter": { "datasource": "instance01", //已增加的Tablestore源數據源。 "table": "timeseriesReadTable", //源時序表名稱。 "newVersion": "true", //使用Tablestore Reader的新版本。 "mode": "normal", //使用Tablestore Reader的普通模式。 "isTimeseriesTable": "true", //表示該表為時序表。 "envType": 1, "column": [ { "name": "_m_name" //導出時序表的度量名稱字段。 }, { "name": "_data_source" //導出時序表的數據源字段。 }, { "name": "_tags" //導出時序表的標簽字段。 }, { "name": "_time" //導出時序表的時間戳字段。 }, //以下配置請根據時序表中的具體數據為準。 本示例中從源時序表中導出列名分別為string_1(string類型),bool_1(bool類型),int_1(int整型),double_1(double類型)的四列。 { "name": "string_1", "type": "string" }, { "name": "bool_1", "type": "BOOL" }, { "name": "int_1", "type": "int" }, { "name": "double_1", "type": "double" } ] }, "name": "Reader", //表示以上配置為reader讀數據的配置。 "category": "reader" }, { "stepType": "ots", //寫數據使用的插件為Tablestore Writer。 "parameter": { "datasource": "instance02", //已增加的Tablestore目標數據源。 "table": "timeseriesWriteTable", //目標時序表名稱。 "newVersion": "true", //使用Tablestore Writer的新版本。 "mode": "normal", //使用Tablestore Writer的普通模式。 "isTimeseriesTable": "true", //表示該表為時序表。 "envType": 1, "column": [ { "name": "_m_name" //導入時序表的度量名稱字段。 }, { "name": "_data_source" //導入時序表的數據源字段。 }, { "name": "_tags" //導入時序表的標簽字段。 }, { "name": "_time" //導入時序表的時間戳字段。 }, //導入Tablestore Reader導出的四列字段,導入字段與導出字段的順序必須一一對應。 { "name": "string_1", "type": "string" }, { "name": "bool_1", "type": "BOOL" }, { "name": "int_1", "type": "int" }, { "name": "double_1", "type": "double" } ] }, "name": "Writer", //表示以上配置為writer寫數據的配置。 "category": "writer" } ], "setting": { "errorLimit": { "record": "" }, "locale": "zh", "speed": { "throttle": false, "concurrent": 2 //同步任務的并發數。 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
按【Ctrl+S】保存腳本。
說明執行后續操作時,如果未保存腳本,則系統會出現保存確認的提示,單擊確認即可。
執行同步任務。
說明全量數據一般只需要同步一次,無需配置調度屬性。
單擊圖標。
在參數對話框,選擇運行資源組的名稱。
單擊運行。
步驟四:查看任務執行結果
執行同步任務后,在DataWorks控制臺通過日志查看任務執行狀態,在表格存儲控制臺查看目標時序表數據同步結果。
在DataWorks控制臺通過日志查看同步任務運行狀態。
在同步任務的結果頁簽,單擊Detail log url對應的鏈接。
在任務的詳細運行日志頁面,查看
Current task status
對應的狀態。當
Current task status
的值為FINISH時,表示任務運行完成。
在表格存儲控制臺查看目標時序表數據同步結果。
登錄表格存儲控制臺。
在概覽頁面上方,選擇地域。
單擊實例名稱。
在實例詳情頁簽,單擊時序表列表頁簽。
在時序表列表頁簽,單擊目標時序表操作列的數據管理。
在數據管理頁簽,即可查看同步到該時序表中的數據。
增量數據同步
步驟一:新增數據源
具體操作,請參見新增表格存儲數據源。
步驟二:新建離線同步任務
進入數據開發頁面。
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,在下拉框中選擇對應工作空間后單擊進入數據開發。
在DataStudio控制臺的數據開發頁面,單擊業務流程節點下的目標業務流程。
如果需要新建業務流程,請參見創建業務流程。
在數據集成節點上右鍵選擇新建節點 > 離線同步。
在新建節點對話框,選擇路徑并填寫節點名稱。
單擊確認。
在數據集成節點下會顯示新建的離線同步節點。
步驟三:配置離線同步任務并啟動
雙擊打開新建的離線同步任務節點。
配置同步網絡鏈接。
選擇離線同步任務的數據來源、數據去向以及用于執行同步任務的資源組,并測試連通性。
重要數據同步任務的執行必須經過資源組來實現,請選擇資源組并保證資源組與讀寫兩端的數據源能聯通訪問。
在網絡與資源配置步驟,選擇數據來源為Tablestore Stream,并選擇數據源名稱為源數據源。
說明Tablestore Stream插件主要用于導出Tablestore增量數據,更多信息請參見Tablestore Stream配置同步任務。
選擇資源組。
選擇資源組后,系統會顯示資源組的地域、規格等信息以及自動測試資源組與所選數據源之間連通性。
重要請與新增數據源時選擇的資源組保持一致。
選擇數據去向為Tablestore,并選擇數據源名稱為目標數據源。
系統會自動測試資源組與所選數據源之間連通性。
測試可連通后,單擊下一步。
在提示對話框,單擊確認使用腳本模式。
重要表格存儲僅支持腳本模式。當存在不支持向導模式的數據源時,如果繼續編輯任務,將強制使用腳本模式進行編輯。
任務轉為腳本模式后,將無法轉為向導模式,
配置同步任務并保存。
增量數據的同步需要使用到Tablestore Stream Reader與Tablestore Writer插件。腳本配置規則請參見Reader腳本Demo與參數說明和Writer腳本Demo與參數說明。
在配置任務步驟,單擊工具欄中的轉換腳本,選擇確認即可進入腳本模式。
根據配置示例編輯腳本。
配置示例如下:
重要為了便于理解,在配置示例中增加了注釋內容,實際使用腳本時請刪除所有注釋內容。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "otsstream", // 讀數據使用的插件為Tablestore Stream Reader。 "parameter": { "datasource": "instance01", // 已增加的Tablestore源數據源。 "dataTable": "timeseriesReadTable", // 源時序表名稱。 "statusTable": "TableStoreStreamReaderStatusTable", // 用于記錄狀態的表的名稱。如果該表不存在,則Tablestore Stream Reader會自動創建。 "maxRetries": 30, // 最大重試次數。 "isExportSequenceInfo": false, // 不導出時序信息。 "mode": "single_version_and_update_only", // 模式為行模式(single_version_and_update_only)。 "isTimeseriesTable": "true", // 表示該表為時序表。 // 此處的${}用于注入參數,參數會在下一步配置。 "startTimeString": "${startTime}", // 增量數據的時間范圍(左閉右開)的左邊界。 "endTimeString": "${endTime}", // 增量數據的時間范圍(左閉右開)的右邊界。 "envType": 1, "column": [ { "name": "_m_name" }, { "name": "_data_source" }, { "name": "_tags" }, { "name": "_time" }, { "name": "string_1", "type": "string" }, { "name": "bool_1", "type": "BOOL" }, { "name": "int_1", "type": "int" }, { "name": "double_1", "type": "double" } ] }, "name": "Reader", "category": "reader" }, { "stepType": "ots", // 寫數據使用的插件為Tablestore Writer。 "parameter": { "datasource": "instance02", // 已增加的Tablestore目標數據源。 "table": "timeseriesWriteTable", // 目標時序表名稱。 "newVersion": "true", // 使用Tablestore Writer的新版本。 "mode": "normal", // 使用Tablestore Writer的普通模式。 "isTimeseriesTable": "true", // 表示該表為時序表。 "envType": 1, "column": [ { "name": "_m_name" // 導入時序表的度量名稱字段。 }, { "name": "_data_source" // 導入時序表的數據源字段。 }, { "name": "_tags" // 導入時序表的標簽字段。 }, { "name": "_time" // 導入時序表的時間戳字段。 }, // 導入Tablestore Steam Reader導出的四列字段。導入字段與導出字段的順序必須一一對應。 { "name": "string_1", "type": "string" }, { "name": "bool_1", "type": "BOOL" }, { "name": "int_1", "type": "int" }, { "name": "double_1", "type": "double" } ] }, "name": "Writer", // 表示以上配置為writer寫數據的配置。 "category": "writer" } ], "setting": { "errorLimit": { "record": "" }, "locale": "zh", "speed": { "throttle": false, "concurrent": 2 // 同步任務的并發度。 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
按【Ctrl+S】保存腳本。
說明執行后續操作時,如果未保存腳本,則系統會出現保存確認的提示,單擊確認即可。
配置調度屬性。
單擊任務右側的調度配置。
在調度配置面板的調度參數部分,單擊新增參數,根據下表說明新增參數。更多信息,請參見調度參數支持的格式。
參數
參數值
startTime
$[yyyymmddhh24-2/24]$[miss-10/24/60]
endTime
$[yyyymmddhh24-1/24]$[miss-10/24/60]
配置示例如下圖所示。
假如任務運行時的時間為2023年04月23日19:00:00點,則startTime為20230423175000,endTime為20230423185000。任務將會同步17:50到18:50時段內新增的數據。
在時間屬性部分,配置時間屬性。更多信息,請參見時間屬性配置說明。
此處以任務整點每小時自動運行為例介紹配置,如下圖所示。
在調度依賴部分,單擊使用工作空間根節點,系統會自動生成依賴的上游節點信息。
使用工作空間根節點表示該任務無上游的依賴任務。
配置完成后,關閉配置調度面板。
(可選)根據需要調試腳本代碼。
通過調試腳本代碼,確保同步任務能成功同步源時序表的增量數據到目標時序表中。
重要調試腳本代碼時配置的時間范圍內的數據可能會多次導入到目標時序表,相同時間線會覆蓋寫入到目標時序表。
單擊圖標。
在參數對話框,選擇運行資源組的名稱,并配置自定義參數。
自定義參數的格式為
yyyyMMddHHmmss
,例如20230423175000。單擊運行。
提交同步任務。
提交同步任務后,同步任務會按照配置的調度屬性進行運行。
單擊圖標。
在提交對話框,根據需要填寫變更描述。
單擊確認。
步驟四:查看任務執行結果
在DataWorks控制臺查看任務運行狀態。
單擊同步任務工具欄右側的運維。
在周期實例頁面的實例視角頁簽,查看實例的運行詳情。
在表格存儲控制臺查看目標時序表數據同步結果。
登錄表格存儲控制臺。
在概覽頁面上方,選擇地域。
單擊實例名稱。
在實例詳情頁簽,單擊時序表列表頁簽。
在時序表列表頁簽,單擊目標時序表操作列的數據管理。
在數據管理頁簽,即可查看同步到該時序表中的數據。