數(shù)據(jù)流動流式任務最佳實踐
為了便于CPFS智算版與OSS Bucket中的單文件粒度持續(xù)性的數(shù)據(jù)流動,您可以通過創(chuàng)建流式任務實現(xiàn)。
方案概覽
實現(xiàn)某一個目錄下不同文件的導入導出,只需要4步:
創(chuàng)建數(shù)據(jù)流動:通過創(chuàng)建數(shù)據(jù)流動,建立CPFS智算版文件系統(tǒng)任意子目錄到OSS Bucket下任意prefix的映射。
創(chuàng)建流式任務:通過調(diào)用CreateDataFlowTask創(chuàng)建流式導入或?qū)С鋈蝿眨⒃炊四夸浀侥康亩说耐ǖ馈?chuàng)建成功后,流式任務的狀態(tài)一直保持為運行中,但實際不會流動數(shù)據(jù),還需要為其創(chuàng)建流式子任務。
創(chuàng)建流式任務的子任務:接著通過調(diào)用CreateDataFlowSubTask提交不同文件的導入或?qū)С鲎尤蝿铡?/p>
查詢流式子任務狀態(tài):最后通過調(diào)用DescribeDataFlowSubTask查詢已提交的子任務進度與狀態(tài)。當調(diào)用結(jié)果中Status值為COMPLETE;Progress值為10000時,則表示源數(shù)據(jù)已全部導入或?qū)С鲋聊繕四夸洝?/p>
前提條件
已創(chuàng)建CPFS智算版文件系統(tǒng)。具體操作,請參見創(chuàng)建文件系統(tǒng)。
已為目標OSS Bucket設置標簽(key: cpfs-dataflow, value: true),且在數(shù)據(jù)流動的使用過程中,不能刪除和修改該標簽,否則CPFS智算版數(shù)據(jù)流動無法訪問Bucket的數(shù)據(jù)。具體操作,請參見管理存儲空間標簽。
為了防止多個數(shù)據(jù)流動向同一個OSS Bucket導出數(shù)據(jù)時產(chǎn)生數(shù)據(jù)沖突,需要該OSS Bucket開啟版本控制。更多信息,請參見版本控制概述。
使用數(shù)據(jù)流動流式任務,CPFS智算版文件系統(tǒng)的版本號必須為2.6.0及以上版本。關于如何查看文件系統(tǒng)的版本號,請參見查詢文件系統(tǒng)版本號。
流式導入任務
本示例以將OSS Bucket(examplebucket)中/bmcpfs/test/file.xml
下的子目錄(/test/file)數(shù)據(jù)遷移至CPFS智算版文件系統(tǒng)(bmcpfs-370jz26fkr2st9****)中/oss/mnt/file.xml
下的子目錄(/mnt/file)為例,介紹如何創(chuàng)建流式導入任務和流式導入子任務,實現(xiàn)單文件粒度持續(xù)性的數(shù)據(jù)流動。
創(chuàng)建數(shù)據(jù)流動。
您可以通過調(diào)用API或控制臺為目標文件系統(tǒng)創(chuàng)建數(shù)據(jù)流動,并獲取數(shù)據(jù)流動ID(例如,df-37bae1804cc6****)。
通過調(diào)用CreateDataFlow API創(chuàng)建數(shù)據(jù)流動。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系統(tǒng)ID。 "SourceStorage": "oss://examplebucket", //源端OSS Bucket的訪問地址。 "FileSystemPath": "/oss/", //指定鏈接到OSS的CPFS智算版文件系統(tǒng)目錄,且必須是已有目錄。 "SourceStoragePath": "/bmcpfs/", //源端存儲Bucket內(nèi)的訪問路徑。 }
預期輸出:
{ "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E", "DataFlowId": "df-37bae1804cc6****" }
通過控制臺創(chuàng)建數(shù)據(jù)流動。具體操作,請參見管理數(shù)據(jù)流動。
創(chuàng)建數(shù)據(jù)流動流式導入任務。
通過調(diào)用CreateDataFlowTask API創(chuàng)建數(shù)據(jù)流動流式導入任務,并保存
TaskId
返回值。{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系統(tǒng)ID。 "DataFlowId": "df-37bae1804cc6****", //數(shù)據(jù)流動ID。 "TaskAction": "StreamImport", // 數(shù)據(jù)流動流式任務類型,導入StreamImport,導出StreamExport。 "DataType": "MetaAndData", // 數(shù)據(jù)類型,目前僅支持MetaAndData。 "Directory": "/test/", // 同步源相對目錄。此場景為OSS Bucket的Bucket Prefix。 "DstDirectory": "/mnt/", // 同步目標相對目錄。此場景為CPFS智算版文件系統(tǒng)的目錄。 "ConflictPolicy": "SKIP_THE_FILE" // 同名文件沖突策略。OVERWRITE_EXISTING:強制覆蓋同名文件;SKIP_THE_FILE:跳過同名文件;KEEP_LATEST:比較更新時間,保留最新版本。 }
預期輸出:
{ "RequestId": "2D69A58F-345C-4FDE-88E4-BF518948F518", "TaskId": "task-376a61ab2d80****" }
創(chuàng)建流式導入任務的子任務。
通過調(diào)用CreateDataFlowSubTask API提交流式任務的導入子任務。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系統(tǒng)ID。 "DataFlowId": "df-37bae1804cc****", //數(shù)據(jù)流動ID。 "DataFlowTaskId": "task-376a61ab2d80****", //流式導入任務ID。 "SrcFilePath": "/file.xml", // 流式任務中源目錄下的某個文件路徑。此場景為OSS Bucket的Bucket對象路徑。 "DstFilePath": "/mnt/file.xml" // 流式任務中目標目錄下的某個文件路徑。此場景為CPFS智算版文件系統(tǒng)的目錄。 }
預期輸出:
{ "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3", "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" }
查詢流式子任務的執(zhí)行進度和任務狀態(tài)。
通過調(diào)用DescribeDataFlowSubTasks API查詢已提交的子任務執(zhí)行進度與任務狀態(tài)。不同的Key值對應不同的Value值。更多信息,請參見DescribeDataFlowSubTasks。
本示例通過篩選DataFlowIds(數(shù)據(jù)流動ID)方式查詢子任務的信息。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系統(tǒng)ID。 "Filters": [ { "Key": "DataFlowIds", "Value": "df-37bae1804cc****" } ] }
預期輸出:
{ "RequestId": "98696EF0-1607-4E9D-B01D-F20930B6****", "DataFlowSubTask": { "DataFlowSubTask": [ { "FileSystemId": "bmcpfs-370jz26fkr2st9****", //文件系統(tǒng)ID。 "DataFlowId": "df-37bae1804cc****", //數(shù)據(jù)流動ID。 "DataFlowTaskId": "task-37b705830bcb****", //數(shù)據(jù)流動流式任務ID。 "DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****",//數(shù)據(jù)流動流式子任務ID。 "SrcFilePath": "/bmcpfs/test/file.xml",//源文件路徑。 "DstFilePath": "/oss/mnt/file.xml", //目標文件路徑。 "Status": "COMPLETE", "Progress": 10000, "CreateTime": "2024-10-23 16:28:16", "StartTime": "2024-10-23 16:28:17", "EndTime": "2024-10-23 16:29:22", "ErrorMsg": "",//未返回或者返回為空時,表示沒有錯誤信息。 "ProgressStats": { "BytesTotal": 68, "BytesDone": 68, "ActualBytes": 68, "AverageSpeed": 34 }, "FileDetail": { "ModifyTime": 1725897600000000000, "Size": 68, "Checksum": "crc64:850309505450944****"http://文件校驗碼。 } } ] } }
調(diào)用結(jié)果中的Progress和Status參數(shù)值即為子任務的執(zhí)行進度和任務狀態(tài)信息。當任務狀態(tài)Status值為COMPLETE時,表示任務已完成;當Progress值為10000時,表示數(shù)據(jù)已全部導入或?qū)С鲋聊繕四夸洝?/p>
流式導出任務
本示例以將CPFS智算版文件系統(tǒng)(bmcpfs-370jz26fkr2st9****)中/oss_test/yaml/test/file.png
數(shù)據(jù)遷移至OSS Bucket(examplebucket)/bmcpfs_test/dataflows/mnt/file.png
為例,介紹如何創(chuàng)建流式導出任務和流式導出子任務,實現(xiàn)單文件粒度持續(xù)性的數(shù)據(jù)流動。
創(chuàng)建數(shù)據(jù)流動。
您可以通過調(diào)用API或控制臺為目標文件系統(tǒng)創(chuàng)建數(shù)據(jù)流動,并獲取數(shù)據(jù)流動ID(例如,df-37bae1804cc6****)。
通過調(diào)用CreateDataFlow API創(chuàng)建數(shù)據(jù)流動。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系統(tǒng)ID。 "SourceStorage": "oss://examplebucket", //源端OSS Bucket的訪問地址。 "FileSystemPath": "/oss/", //指定鏈接到OSS的CPFS智算版文件系統(tǒng)目錄,且必須是已有目錄。 "SourceStoragePath": "/bmcpfs/", //源端存儲Bucket內(nèi)的訪問路徑。 }
預期輸出:
{ "RequestId": "473469C7-AA6F-4DC5-B3DB-A3DC0D****3E", "DataFlowId": "df-37bae1804cc6****" }
通過控制臺創(chuàng)建數(shù)據(jù)流動。具體操作,請參見管理數(shù)據(jù)流動。
創(chuàng)建數(shù)據(jù)流動流式導出任務。
通過調(diào)用CreateDataFlowTask API創(chuàng)建數(shù)據(jù)流動流式導出任務,并保存
TaskId
返回值。{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系統(tǒng)ID。 "DataFlowId": "df-37bae1804cc6****", //數(shù)據(jù)流動ID。 "TaskAction": "StreamImport", // 數(shù)據(jù)流動流式任務類型,此場景為導出StreamExport。 "DataType": "MetaAndData", // 數(shù)據(jù)類型,目前僅支持MetaAndData。 "Directory": "/yaml/", // 同步源相對目錄。流式導出場景時為CPFS智算版文件系統(tǒng)CPFS目錄的相對路徑。 "DstDirectory": "/dataflows/", // 同步目標相對目錄。流式導出場景為OSS Bucket的Bucket Prefix的相對路徑。 "ConflictPolicy": "SKIP_THE_FILE" // 同名文件沖突策略。OVERWRITE_EXISTING:強制覆蓋同名文件;SKIP_THE_FILE:跳過同名文件;KEEP_LATEST:比較更新時間,保留最新版本。 }
預期輸出:
{ "RequestId": "BC7C825C-5F65-4B56-BEF6-98C56C7C930B", "TaskId": "task-37b705830bcb****" }
創(chuàng)建流式導出任務的子任務。
通過調(diào)用CreateDataFlowSubTask API提交流式導出任務的子任務。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //智算CPFS文件系統(tǒng)ID。 "DataFlowId": "df-37bae1804cc****", //數(shù)據(jù)流動ID。 "DataFlowTaskId": "task-37b705830bcb****", //流式導出任務ID。 "SrcFilePath": "/test/file.png", // 流式任務中Directory目錄下的某個文件的相對路徑。 "DstFilePath": "/mnt/file.png" // 流式任務中DstDirectory目錄下的某個文件的相對路徑。 }
預期輸出:
{ "RequestId": "A70BEE5D-76D3-49FB-B58F-1F398211A5C3", "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****" }
查詢流式導出子任務的執(zhí)行進度和任務狀態(tài)。
通過調(diào)用DescribeDataFlowSubTasks API查詢已提交的子任務執(zhí)行進度與任務狀態(tài)。不同的Key值對應不同的Value值。更多信息,請參見DescribeDataFlowSubTasks。
本示例通過篩選DataFlowIds(數(shù)據(jù)流動ID)方式查詢子任務的信息。
{ "FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系統(tǒng)ID。 "Filters": [ { "Key": "DataFlowIds", "Value": "df-37bae1804cc****" } ] }
預期輸出:
{ "RequestId": "FCBB356-96CA-135B-84B3-02E6F262B6BD", "DataFlowSubTask": { "DataFlowSubTask": [ { "FileSystemId": "bmcpfs-370jz26fkr2st9****", //文件系統(tǒng)ID。 "DataFlowId": "df-37bae1804cc****", //數(shù)據(jù)流動ID。 "DataFlowTaskId": "task-37b705830bcb****", //數(shù)據(jù)流動流式任務ID。 "DataFlowSubTaskId": "subTaskId-370l4l3x6qsb1z1****",//數(shù)據(jù)流動流式子任務ID。 "SrcFilePath": "/oss_test/yaml/test/file.png",//源文件路徑。 "DstFilePath": "/bmcpfs_test/dataflows/mnt/file.png", //目標文件路徑。 "Status": "COMPLETE", "Progress": 10000, "CreateTime": "2024-10-23 17:18:16", "StartTime": "2024-10-23 17:18:17", "EndTime": "2024-10-23 17:19:00", "ErrorMsg": "",//未返回或者返回為空時,表示沒有錯誤信息。 "ProgressStats": { "BytesTotal": 68, "BytesDone": 68, "ActualBytes": 68, "AverageSpeed": 34 }, "FileDetail": { "ModifyTime": 1725897600000000000, "Size": 68, "Checksum": "crc64:850309505450944****"http://文件校驗碼。 } } ] } }
調(diào)用結(jié)果中的Progress和Status參數(shù)值即為子任務的執(zhí)行進度和任務狀態(tài)信息。當任務狀態(tài)Status值為COMPLETE時,表示任務已完成;當Progress值為10000時,表示數(shù)據(jù)已全部導入或?qū)С鲋聊繕四夸洝?/p>
相關操作
如果您需要取消流式子任務,可以通過調(diào)用CancelDataFlowSubTask API實現(xiàn)。僅支持取消CREATED和RUNNING狀態(tài)的流式子任務。
{
"FileSystemId": "bmcpfs-370jz26fkr2st9****", //CPFS智算版文件系統(tǒng)ID。
"DataFlowId": "df-37bae1804cc****", //數(shù)據(jù)流動ID。
"DataFlowTaskId": "task-37b705830bcb****", //流式導入或?qū)С鋈蝿誌D。
"DataFlowSubTaskId": "subTaskId-370kyfmyknxcyzw****" // 數(shù)據(jù)流動流式子任務ID。
}