分布式模式
分布式模式在分布式環(huán)境中執(zhí)行Map狀態(tài)迭代,從而提高處理效率和容量,適用于處理大規(guī)模數(shù)據(jù)或并行計(jì)算。
基本概念
分布式模式:當(dāng)Map ProcessorConfig的ExecutionMode字段被設(shè)置為Express時(shí),Map狀態(tài)的執(zhí)行會(huì)以分布式模式進(jìn)行。在此模式下,迭代(Map)狀態(tài)將每次迭代作為Express模式的子工作流執(zhí)行。
子工作流執(zhí)行:當(dāng)?shù)∕ap)狀態(tài)被設(shè)置為分布式模式時(shí),Map迭代的執(zhí)行會(huì)以子工作流執(zhí)行的形式進(jìn)行,此時(shí)子工作流的定義為Map Processor的狀態(tài)轉(zhuǎn)換定義,接收的輸入為Map迭代元素。
故障容忍策略:對(duì)于Map執(zhí)行,當(dāng)子工作流執(zhí)行失敗時(shí),Map狀態(tài)會(huì)以失敗狀態(tài)結(jié)束。對(duì)于迭代數(shù)量較多的Map執(zhí)行,可以通過(guò)配置錯(cuò)誤容忍使Map執(zhí)行在子工作流執(zhí)行失敗時(shí)繼續(xù)執(zhí)行。
分布式模式包含以下字段:
字段 | 類型 | 是否必選 | 描述 | 示例值 |
Name | string | 是 | 狀態(tài)名稱。 | my-state-name |
Description | string | 否 | 狀態(tài)描述。 | describe it here |
Type | string | 是 | 狀態(tài)類型。 | Map |
InputConstructor | map[string]any | 否 | 輸入構(gòu)造器。 | 請(qǐng)參見(jiàn)輸入和輸出 |
ItemsPath | string | 是 | 用于提取輸入數(shù)組的表達(dá)式。 | 請(qǐng)參見(jiàn)ItemsPath |
ItemBatcher | ItemBatcher | 否 | 開啟ItemBatcher可以將多個(gè)Item聚合為Item Batch作為輸入傳遞給子工作流執(zhí)行。 | 請(qǐng)參見(jiàn)ItemBatcher |
ItemReader | ItemReader | 否 | 支持從OSS讀取數(shù)據(jù)。 | 請(qǐng)參見(jiàn)ItemReader |
ItemConstructor | ItemConstructor | 否 |
| 請(qǐng)參見(jiàn)ItemConstructor |
ResultWriter | ResultWriter | 否 | 支持將子工作流執(zhí)行的信息寫入到指定的OSS目錄。 | 請(qǐng)參見(jiàn)ResultWriter |
MaxConcuccency | int | 否 | 支持配置子工作流執(zhí)行并發(fā)。 | 40 |
MaxItems | MaxItems | 否 | 支持配置Map最多處理的Item數(shù)量。 | 請(qǐng)參見(jiàn)MaxItems |
ToleratedFailurePercentage | ToleratedFailurePercentage | 否 | 支持按照百分比配置失敗容忍策略。 | 請(qǐng)參見(jiàn)ToleratedFailurePercentage |
ToleratedFailureCount | ToleratedFailureCount | 否 | 支持按照數(shù)量配置失敗容忍策略。 | 請(qǐng)參見(jiàn)ToleratedFailureCount |
Processor | Processor | 是 | 迭代處理器。 | 請(qǐng)參見(jiàn)Processor |
ProcessorConfig | ProcessorConfig | 是 | 處理器配置。 | 請(qǐng)參見(jiàn)ProcessorConfig |
OutputConstructor | map[string]any | 否 | 輸出構(gòu)造器。 | 請(qǐng)參見(jiàn)狀態(tài)輸出構(gòu)造器 |
Next | string | 否 | 當(dāng)前狀態(tài)的下一狀態(tài)。當(dāng)End取值為true時(shí),無(wú)需指定。 | my-next-state |
End | bool | 否 | 是否為當(dāng)前作用域的終結(jié)節(jié)點(diǎn)。 | true |
Retry | Retry | 否 | 用于定義錯(cuò)誤重試策略。 | 請(qǐng)參見(jiàn) 錯(cuò)誤處理 |
Catch | Catch | 否 | 用于定義錯(cuò)誤捕獲策略。 | 請(qǐng)參見(jiàn) 錯(cuò)誤處理 |
使用限制
對(duì)于分布式模式執(zhí)行存在以下配額限制,如果默認(rèn)配額不能滿足您的需求,您可以通過(guò)工單提交配額提升申請(qǐng)。
配額名稱 | 含義 | 默認(rèn)值 |
MaxOpenMapRun | 單個(gè)賬戶在單個(gè)地域最多允許同時(shí)執(zhí)行的分布式Map數(shù)量。 | 10 |
MaxConcurrency | 單個(gè)MapRun支持的最大并發(fā)。 | 100 |
MaxItems | 單個(gè)MapRun最多支持讀取的Item數(shù)量。 | 10000 |
分布式模式的關(guān)鍵字段
ItemsPath
用于提取輸入數(shù)組的表達(dá)式。該表達(dá)式執(zhí)行后返回JSON Array,則可以進(jìn)行迭代,將其中每個(gè)元素傳入ItemProcessor進(jìn)行處理;可使用表達(dá)式變量$Context和$Input,示例如下:
$Input.FieldA
Processor
迭代處理器。Processor包含的字段如下表所示。
字段 | 類型 | 是否必選 | 描述 | 示例值 |
States | array | 是 | 內(nèi)部嵌套的狀態(tài)數(shù)組。 |
|
StartAt | string | 是 | 內(nèi)部嵌套狀態(tài)數(shù)組的執(zhí)行起點(diǎn)。 | my start task |
ProcessorConfig
處理器配置。ProcessorConfig包含的字段如下表所示。
字段 | 類型 | 是否必選 | 描述 | 示例值 |
ExecutionMode | string | 是 | 執(zhí)行模式。 | Express |
ItemReader
Map分布式模式通過(guò)支持從OSS讀取數(shù)據(jù)輸入的方式,支持更大的數(shù)據(jù)輸入。ItemReader包含的字段如下表所示。
字段 | 類型 | 是否必選 | 描述 | 示例值 |
SourceType | string | 是 | 來(lái)源類型,可選值:OSS_CSV、OSS_JSON_LIST、OSS_OBJECTS、OSS_INVENTORY_FILES。 | OSS_CSV |
SourceParameters | string | 否 | 來(lái)源參數(shù)。 | 請(qǐng)參見(jiàn)SourceParameters |
ReaderConfig | ReaderConfig | 否 | 閱讀器配置。 | 請(qǐng)參見(jiàn)ReaderConfig |
SourceParameters
來(lái)源參數(shù)。SourceParameters包含的字段如下表所示。
字段 | 類型 | 是否必選 | 描述 | 示例值 |
Bucket | string | 否 | 文件所在的Bucket名稱。 | example-bucket |
ObjectName | string | 否 | 對(duì)象名稱。 | object_name_1 |
Prefix | string | 否 | 限定返回的Bucket名稱必須以prefix作為前綴。如果不設(shè)定,則不過(guò)濾前綴信息。 默認(rèn)值:無(wú) | example-prefix |
ReaderConfig
閱讀器配置。ReaderConfig包含的字段如下表所示。
字段 | 類型 | 是否必選 | 描述 | 示例值 |
CSVHeaders | []string | 否 | CSV文件中,第一行所包含的列標(biāo)題或字段名稱。 | ColA,ColB,ColC |
從OSS讀取數(shù)據(jù)的方式包括以下幾個(gè)方法:
存儲(chǔ)于OSS中的CSV文件
使用Map狀態(tài)從一個(gè)CSV文件中讀取數(shù)據(jù)。假設(shè)您有一個(gè)
example-object.csv
文件存儲(chǔ)在OSS的存儲(chǔ)桶example-bucket
中,可參考以下?tīng)顟B(tài)機(jī)定義示例。Type: StateMachine Name: MyWorkflow SpecVersion: v1 StartAt: Map States: - Type: Map Name: Map ProcessorConfig: ExecutionMode: Express Processor: StartAt: Pass States: - Type: Pass Name: Pass End: true ItemReader: SourceType: OSS_CSV SourceParameters: Bucket: example-bucket ObjectName: example-object.csv End: true
以下示例顯示了
example-object.csv
文件的內(nèi)容。ColA,ColB,ColC col_a_1,col_b_1,col_c_1
以下示例顯示了子工作流執(zhí)行接收的輸入。
{ "ColA": "col_a_1", "ColB": "col_b_1", "ColC": "col_c_1", }
存儲(chǔ)于OSS中的JSON文件
重要存儲(chǔ)于OSS中的JSON文件所包含的JSON內(nèi)容必須是一個(gè)JSON Array。
用于從指定的OSS中讀取JSON列表。假設(shè)您有一個(gè)
example-object.json
文件存儲(chǔ)在OSS存儲(chǔ)桶example-bucket
中,可參考以下?tīng)顟B(tài)機(jī)定義示例。Type: StateMachine Name: MyWorkflow SpecVersion: v1 StartAt: Map States: - Type: Map Name: Map ProcessorConfig: ExecutionMode: Express Processor: StartAt: Pass States: - Type: Pass Name: Pass End: true ItemReader: SourceType: OSS_JSON_LIST SourceParameters: Bucket: example-bucket ObjectName: example-object.json End: true
以下示例顯示了
example-object.json
文件的內(nèi)容。[ { "key_1": "value_1" } ]
以下示例顯示了子工作流執(zhí)行接收的輸入。
{ "key_1": "value_1", }
列舉OSS中的Objects
用于從指定的OSS中讀取對(duì)象。假設(shè)您已將Objects存儲(chǔ)在OSS存儲(chǔ)桶
example-bucket
中,命名的前綴為example-prefix
,可參考以下?tīng)顟B(tài)機(jī)定義示例。Type: StateMachine Name: MyWorkflow SpecVersion: v1 StartAt: Map States: - Type: Map Name: Map ProcessorConfig: ExecutionMode: Express Processor: StartAt: Pass States: - Type: Pass Name: Pass End: true ItemReader: SourceType: OSS_OBJECTS SourceParameters: Bucket: example-bucket Prefix: example-prefix End: true
以下目錄結(jié)構(gòu)示意圖表明
example-prefix/object_1
文件存儲(chǔ)在OSS存儲(chǔ)桶example-bucket
中。example-bucket ├── example-prefix/object_1
以下示例顯示了子工作流執(zhí)行接收的輸入。
{ "XMLName": { "Space": "", "Local": "Contents" }, "Key": "example-prefix/object_1", "Type": "Normal", "Size": 268435, "ETag": "\"50B06D6680D86F04138HSN612EF5DEC6\"", "Owner": { "XMLName": { "Space": "", "Local": "" }, "ID": "", "DisplayName": "" }, "LastModified": "2024-01-01T01:01:01Z", "StorageClass": "Standard", "RestoreInfo": "" }
存儲(chǔ)于OSS中的Inventory
使用Map狀態(tài)從指定的OSS存儲(chǔ)桶和文件中讀取數(shù)據(jù)。假設(shè)您已將清單文件
manifest.json
存儲(chǔ)在example-bucket
的OSS存儲(chǔ)桶中,路徑是inventory/2024-01-01T01-01Z/manifest.json
,可參考以下?tīng)顟B(tài)機(jī)定義示例。Type: StateMachine Name: MyWorkflow SpecVersion: v1 StartAt: Map States: - Type: Map Name: Map ProcessorConfig: ExecutionMode: Express Processor: StartAt: Pass States: - Type: Pass Name: Pass End: true ItemReader: SourceType: OSS_INVENTORY_FILES SourceParameters: Bucket: example-bucket ObjectName: inventory/2024-01-01T01-01Z/manifest.json ItemConstructor: Key.$: $Item.Key End: true
以下示例顯示了清單文件的內(nèi)容。
"example-bucket","object_name_1" "example-bucket","object_name_2"
以下示例顯示第一個(gè)子工作流執(zhí)行接收的輸入。
{ "Bucket": "example-bucket", "Key": "object_name_1" }
ItemBatcher
開啟ItemBatcher可以將多個(gè)Item聚合為Item Batch作為輸入傳遞給子工作流執(zhí)行。ItemBatcher包含的字段如下表所示。
字段 | 類型 | 是否必選 | 描述 | 示例值 |
MaxItemsPerBatch | int | 否 | 支持按照數(shù)量對(duì)Item進(jìn)行聚合。 | 請(qǐng)參見(jiàn)MaxItemsPerBatch |
MaxInputBytesPerBatch | int | 否 | 支持按照聚合后輸入的大小對(duì)Item進(jìn)行聚合。 | 請(qǐng)參見(jiàn)MaxInputBytesPerBatch |
BatchInput | map[string]any | 否 | 支持在聚合Items時(shí)添加額外的輸入。 | 請(qǐng)參見(jiàn)BatchInput |
MaxItemsPerBatch
MaxItemsPerBatch支持按照數(shù)量對(duì)Item進(jìn)行聚合。以下是MaxItemsPerBatch的使用示例。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxItemsPerBatch: 2
End: true
以下示例是狀態(tài)機(jī)執(zhí)行的輸入。
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"},
{"key_4":"value_4"},
{"key_5":"value_5"}
]
}
以下示例顯示了子工作流執(zhí)行接收的輸入。
# execution-1
# 第1個(gè)子工作流輸入示例
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"}
]
}
# execution-2
# 第2個(gè)子工作流輸入示例
{
"Items": [
{"key_1":"value_3"},
{"key_2":"value_4"}
]
}
# execution-3
# 第3個(gè)子工作流輸入示例
{
"Items": [
{"key_1":"value_5"},
]
}
MaxInputBytesPerBatch
MaxInputBytesPerBatch支持按照聚合后輸入的大小對(duì)Item進(jìn)行聚合,MaxInputBytesPerBatch可以確保聚合后的輸入不超過(guò)指定的大小。以下是MaxInputBytesPerBatch的使用示例。
由于ItemBatcher會(huì)添加額外的Items key,輸入的整體大小計(jì)算包含了額外附加的Key和附加的BatchInput。
MaxInputBytesPerBatch指定的數(shù)值的單位為Byte。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 50
End: true
以下示例是狀態(tài)機(jī)執(zhí)行的輸入。
{
"Items":[
{"Key":1},
{"key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
以下示例顯示了子工作流執(zhí)行接收的輸入。
# execution-1
# 第1個(gè)子工作流輸入示例
{
"Items":[
{"Key":1},
{"key":2}
]
}
# execution-2
# 第2個(gè)子工作流輸入示例
{
"Items":[
{"Key":3},
{"key":4}
]
}
# execution-3
# 第3個(gè)子工作流輸入示例
{
"Items":[
{"Key":5}
]
}
BatchInput
BatchInput支持在聚合Items時(shí)添加額外的輸入。以下是BatchInput的使用示例。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 70
BatchInput:
InputKey.$: $Input.Key
End: true
以下示例是狀態(tài)機(jī)執(zhí)行的輸入。
{
"Key":"value",
"Items":[
{"Key":1},
{"key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
以下示例顯示了子工作流執(zhí)行接收的輸入。
# execution-1
# 第1個(gè)子工作流輸入示例
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":1},
{"key":2}
]
}
# execution-2
# 第2個(gè)子工作流輸入示例
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":3},
{"key":4}
]
}
# execution-3
# 第3個(gè)子工作流輸入示例
{
"BatchInput":{
"InputKey":"value"
},
"Items":[
{"Key":5}
]
}
ItemConstructor
ItemConstructor支持對(duì)Item進(jìn)行單獨(dú)構(gòu)造,ItemConstructor支持通過(guò)$Item引用要被構(gòu)造的Item。以下是ItemConstructor的使用示例。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
ItemBatcher:
MaxInputBytesPerBatch: 200
BatchInput:
InputKey.$: $Input.Key
ItemConstructor:
ConstructedKey.$: $Item.Key
InputKey.$: $Input.Key
End: true
以下示例是狀態(tài)機(jī)執(zhí)行的輸入。
{
"Key":"value",
"Items":[
{"Key":1},
{"Key":2},
{"Key":3},
{"Key":4},
{"Key":5}
]
}
以下示例顯示了子工作流執(zhí)行接收的輸入。
# execution-1
# 第1個(gè)子工作流輸入示例
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 1
},
{
"InputKey": "value",
"ConstructedKey": 2
},
{
"InputKey": "value",
"ConstructedKey": 3
}
]
}
# execution-2
# 第2個(gè)子工作流輸入示例
{
"BatchInput": {
"InputKey": "value"
},
"Items": [
{
"InputKey": "value",
"ConstructedKey": 4
},
{
"InputKey": "value",
"ConstructedKey": 5
}
]
}
ResultWriter
ResultWriter支持將子工作流執(zhí)行的信息寫入到指定的OSS目錄。ResultWriter包含的字段如下表所示。
字段 | 類型 | 是否必選 | 描述 | 示例值 |
Parameters | string | 是 | 請(qǐng)求參數(shù)。 | 請(qǐng)參見(jiàn)Parameters |
Parameters
請(qǐng)求參數(shù)。Parameters包含的字段如下表所示。
字段 | 類型 | 是否必選 | 描述 | 示例值 |
Bucket | string | 是 | 文件所在的Bucket名稱。 | example-bucket |
Prefix | string | 是 | 限定返回的Bucket名稱必須以prefix作為前綴。如果不設(shè)定,則不過(guò)濾前綴信息。 默認(rèn)值:無(wú) | example-prefix |
以下是ResultWriter的使用示例。
工作流狀態(tài)的輸入輸出不能超過(guò)大小限制,對(duì)于迭代數(shù)量較多的Map執(zhí)行,可能會(huì)出現(xiàn)輸出內(nèi)容超過(guò)大小限制的問(wèn)題,建議通過(guò)ResultWriter配置將Map的輸出寫入OSS。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
ItemConstructor:
Key.$: $Item.Key
FailedValue.$: $Input.FailedValue
ToleratedFailurePercentage: 30
Processor:
StartAt: Choice
States:
- Type: Choice
Name: Choice
Branches:
- Condition: $Input.Key > $Input.FailedValue
Next: Fail
Default: Succeed
- Type: Succeed
Name: Succeed
End: true
- Type: Fail
Name: Fail
Code: MockError
End: true
ResultWriter:
Parameters:
Bucket: example-bucket
Prefix: example-prefix/
End: true
以下示例是狀態(tài)機(jī)執(zhí)行的輸入。
{
"FailedValue": 4,
"Items": [
{
"Key": 1
},
{
"Key": 2
},
{
"Key": 3
},
{
"Key": 4
},
{
"Key": 5
}
]
}
以下示例顯示了3個(gè)JSON文件內(nèi)容。
# 以下是manifest.json文件的內(nèi)容,存儲(chǔ)路徑為example-prefix/map-run-name/manifest.json
{
"DestinationBucket": "example-bucket",
"MapRunName": "map-run-name",
"ResultFiles": {
"FAILED": [
{
"ObjectName": "example-prefix/map-run-name/FAILED_0.json",
"Size": 262
}
],
"SUCCEED": [
{
"ObjectName": "example-prefix/map-run-name/SUCCEED_0.json",
"Size": 1057
}
]
}
}
# 以下是FAILED_0.json文件的內(nèi)容,存儲(chǔ)路徑為example-prefix/map-run-name/FAILED_0.json
[
{
"ExecutionName": "execution-name-5",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":5}",
"Output": "{\"ErrorCode\":\"MockError\"}",
"Status": "Failed",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]
# 以下是SUCCEED_0.json文件的內(nèi)容,存儲(chǔ)路徑為example-prefix/map-run-name/SUCCEED_0.json
[
{
"ExecutionName": "execution-name-1",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":1}",
"Output": "{\"FailedValue\":4,\"Key\":1}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-2",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":2}",
"Output": "{\"FailedValue\":4,\"Key\":2}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-3",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":3}",
"Output": "{\"FailedValue\":4,\"Key\":3}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
},
{
"ExecutionName": "execution-name-4",
"FlowName": "example",
"Input": "{\"FailedValue\":4,\"Key\":4}",
"Output": "{\"FailedValue\":4,\"Key\":4}",
"Status": "Succeeded",
"StartedTime": "rfc3339-format-time-string",
"StoppedTime": "rfc3339-format-time-string"
}
]
MaxItems
MaxItems支持配置Map最多處理的Item數(shù)量。例如對(duì)于包含了10000個(gè)Object的特定OSS Prefix,如果配置MaxItems為1000,則Map只會(huì)從OSS加載1000個(gè)Object。
MaxConcurrency
MaxConcurrency支持配置子工作流執(zhí)行并發(fā)。例如對(duì)于包含了10000個(gè)Item的Map執(zhí)行,當(dāng)MaxConcurrency配置為100時(shí),Map同時(shí)最多執(zhí)行100個(gè)子工作流。
ToleratedFailurePercentage
ToleratedFailurePercentage支持按照百分比配置失敗容忍策略,例如當(dāng)Item總數(shù)為10000時(shí),配置ToleratedFailurePercentage為10可以使 Map 容忍最多 1000 個(gè) Item 處理失敗。
ToleratedFailureCount
ToleratedFailureCount支持按照數(shù)量配置失敗容忍策略,例如當(dāng)Item總數(shù)為10000時(shí),配置ToleratedFailureCount為10可以使Map容忍最多10個(gè)Item處理失敗。
使用示例
如下工作流定義包含了一個(gè)分布式模式的Map狀態(tài),它從上游狀態(tài)讀取輸入,并通過(guò)$Input.Items
提取迭代元素。對(duì)于每一個(gè)迭代元素,Map會(huì)創(chuàng)建一個(gè)Express模式的子工作流執(zhí)行。
Type: StateMachine
Name: MyWorkflow
SpecVersion: v1
StartAt: Map
States:
- Type: Map
Name: Map
ProcessorConfig:
ExecutionMode: Express
ItemsPath: $Input.Items
Processor:
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
End: true
以下示例是狀態(tài)機(jī)執(zhí)行的輸入。
{
"Items": [
{"key_1":"value_1"},
{"key_2":"value_2"},
{"key_3":"value_3"}
]
}
Map會(huì)產(chǎn)生三個(gè)子工作流執(zhí)行,其中子工作流執(zhí)行的定義為:
Type: StateMachine
Name: Map
SpecVersion: v1
StartAt: Pass
States:
- Type: Pass
Name: Pass
End: true
以下示例顯示了子工作流執(zhí)行接收的輸入。
# execution-1
# 第1個(gè)Map迭代對(duì)應(yīng)的子工作流執(zhí)行接收的輸入示例
{"key_1":"value_1"}
# execution-2
# 第2個(gè)Map迭代對(duì)應(yīng)的子工作流執(zhí)行接收的輸入示例
{"key_2":"value_2"}
# execution-3
# 第3個(gè)Map迭代對(duì)應(yīng)的子工作流執(zhí)行接收的輸入示例
{"key_3":"value_3"}
完成后,Map狀態(tài)的輸出是一個(gè)JSON數(shù)組,其中每個(gè)項(xiàng)目都是迭代的輸出。
{
"Items": [
{
"key_1": "value_1"
},
{
"key_2": "value_2"
},
{
"key_3": "value_3"
}
]
}