日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Tablestore Stream數據源

DataWorks數據集成支持使用Tablestore Stream Reader讀取Tablestore的增量數據,本文為您介紹DataWorks的Tablestore Stream數據讀取能力。

數據同步前準備:Tablestore Stream環境準備

使用Tablestore Stream插件前,您必須確保Tablestore表上已經開啟Stream功能。時序表已默認開啟stream功能。您可以在建表時指定開啟,也可以使用SDK的UpdateTable接口開啟。開啟Stream的方法,如下所示。

SyncClient client = new SyncClient("", "", "", "");
#方法1:建表的時候開啟:
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24代表增量數據保留24小時。
client.createTable(createTableRequest);
#方法2:如果建表時未開啟,您可以通過UpdateTable開啟:
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);

使用SDK的UpdateTable接口開啟時:

  • 指定開啟Stream并設置過期時間,即開啟了Tablestore增量數據導出功能。開啟stream功能后,Tablestore服務端就會將您的操作日志額外保存起來,每個分區有一個有序的操作日志隊列,每條操作日志會在一定時間后被垃圾回收,該時間即為您指定的過期時間。

  • Tablestore的SDK提供了幾個Stream相關的API用于讀取這部分的操作日志,增量插件也是通過Tablestore SDK的接口獲取到增量數據。列模式下會將增量數據轉化為多個6元組的形式(pk、colName、version、colValue、opType和sequenceInfo),行模式則會以普通行的形式導出增量數據。

支持的同步模式與字段類型

Tablestore Stream Reader插件支持使用列模式或行模式同步Tablestore的增量數據。兩種模式下的同步過程和字段類型要求如下。

列模式

在Tablestore多版本模式下,表中的數據組織為行>列>版本三級的模式, 一行可以有任意列,列名并不是固定的,每一列可以含有多個版本,每個版本都有一個特定的時間戳(版本號)。

您可以通過Tablestore的API進行一系列讀寫操作,Tablestore通過記錄您最近對表的一系列寫操作(或數據更改操作)來實現記錄增量數據的目的,所以您也可以把增量數據看作一批操作記錄。

Tablestore支持PutRowUpdateRowDeleteRow操作:

  • PutRow:寫入一行,如果該行已存在即覆蓋該行。

  • UpdateRow:更新一行,不更改原行的其它數據。更新包括新增或覆蓋(如果對應列的對應版本已存在)一些列值、刪除某一列的全部版本、刪除某一列的某個版本。

  • DeleteRow:刪除一行。

Tablestore會根據每種操作生成對應的增量數據記錄,Reader插件會讀出這些記錄,并導出為數據集成的數據格式。

同時,由于Tablestore具有動態列、多版本的特性,所以Reader插件導出的一行不對應Tablestore中的一行,而是對應Tablestore中的一列的一個版本。即Tablestore中的一行可能會導出很多行,每行包含主鍵值、該列的列名、該列下該版本的時間戳(版本號)、該版本的值、操作類型。如果設置isExportSequenceInfo為true,還會包括時序信息。

轉換為數據集成的數據格式后,定義了以下四種操作類型:

  • U(UPDATE):寫入一列的一個版本。

  • DO(DELETE_ONE_VERSION):刪除某一列的某個版本。

  • DA(DELETE_ALL_VERSION):刪除某一列的全部版本,此時需要根據主鍵和列名,刪除對應列的全部版本。

  • DR(DELETE_ROW):刪除某一行,此時需要根據主鍵,刪除該行數據。

假設該表有兩個主鍵列,主鍵列名分別為pkName1, pkName2,示例如下。

pkName1

pkName2

columnName

timestamp

columnValue

opType

pk1_V1

pk2_V1

col_a

1441803688001

col_val1

U

pk1_V1

pk2_V1

col_a

1441803688002

col_val2

U

pk1_V1

pk2_V1

col_b

1441803688003

col_val3

U

pk1_V2

pk2_V2

col_a

1441803688000

DO

pk1_V2

pk2_V2

col_b

DA

pk1_V3

pk2_V3

DR

pk1_V3

pk2_V3

col_a

1441803688005

col_val1

U

假設導出的數據如上,共7行,對應Tablestore表內的3行,主鍵分別是(pk1_V1,pk2_V1),(pk1_V2, pk2_V2),(pk1_V3, pk2_V3):

  • 對于主鍵為(pk1_V1,pk2_V1)的一行,包括寫入col_a列的兩個版本和col_b列的一個版本等操作。

  • 對于主鍵為(pk1_V2,pk2_V2)的一行,包括刪除col_a列的一個版本和刪除col_b列的全部版本等操作。

  • 對于主鍵為(pk1_V3,pk2_V3)的一行,包括刪除整行和寫入col_a列的一個版本等操作。

行模式

  • 寬行表

    您可以通過行模式導出數據,該模式將用戶每次更新的記錄,抽取成行的形式導出,需要設置mode屬性并配置列名。

    "parameter": {
      #parameter中配置下面三項配置(例如datasource、table等其它配置項照常配置)。
      "mode": "single_version_and_update_only", # 配置導出模式。
      "column":[  #按照需求添加需要導出TableStore中的列,您可以自定義設置配置個數。
              {
                 "name": "uid"  #列名示例,可以是主鍵或屬性列。
              },
              {
                 "name": "name"  #列名示例,可以是主鍵或屬性列。
              },
      ],
      "isExportSequenceInfo": false, #single_version_and_update_only模式下只能是false。
    }
  • 時序表

    時序表在創建時會自動開啟Stream,因此不需要手動開啟Stream功能。

    Tablestore Stream Reader支持導出時序表中的增量數據,當表為時序表時,需要配置的信息如下:

    "parameter": {
      #parameter中配置下面四項配置(例如datasource、table等其它配置項照常配置)。
      "mode": "single_version_and_update_only", # 配置導出模式。
      "isTimeseriesTable":"true",  # 配置導出為時序表。
      "column":[  #按照需求添加需要導出TableStore中的列,您可以自定義設置配置個數。
              {
                "name": "_m_name"       #度量名稱字段。
              },
              {
                "name": "_data_source"  #數據源字段。
              },
              {
                "name": "_tags"         #標簽字段,將tags轉換為string類型。
              },
              {
                "name": "tag1",       #標簽內部字段鍵名稱。
                "is_timeseries_tag":"true"  #表明該字段為tags內部字段。
              },
              {
                "name": "time"          #時間戳字段。
              },
              {
                 "name": "name"         #屬性列名稱。
              },
      ],
      "isExportSequenceInfo": false, #single_version_and_update_only模式下只能是false。
    }

    行模式導出的數據更接近于原始的行,易于后續處理,但需要注意以下問題:

    • 每次導出的行是從用戶每次更新的記錄中抽取,每一行數據與用戶的寫入或更新操作一一對應。如果用戶存在單獨更新某些列的行為,則會出現有一些記錄只有被更新的部分列,其它列為空的情況。

    • 行模式不會導出數據的版本號(即每列的時間戳),也無法進行刪除操作。

數據類型轉換列表

目前Tablestore Stream Reader支持所有的Tablestore類型,其針對Tablestore類型的轉換列表,如下所示。

類型分類

Tablestore Stream數據類型

整數類

INTEGER

浮點類

DOUBLE

字符串類

STRING

布爾類

BOOLEAN

二進制類

BINARY

數據同步任務開發

數據同步任務的配置入口和通用配置流程可參見下文的配置指導。

附錄:腳本Demo與參數說明

離線任務腳本配置方式

如果您配置離線任務時使用腳本模式的方式進行配置,您需要按照統一的腳本格式要求,在任務腳本中編寫相應的參數,詳情請參見通過腳本模式配置離線同步任務,以下為您介紹腳本模式下數據源的參數配置詳情。

Reader腳本Demo

  • 列模式

    {
        "type":"job",
        "version":"2.0",//版本號。
        "steps":[
            {
                "stepType":"otsstream",//插件名。
                "parameter":{
                    "datasource":"$srcDatasource",//數據源。
                    "dataTable":"",//表名。
                    "statusTable":"TableStoreStreamReaderStatusTable",//用于記錄狀態的表的名稱。
                    "maxRetries":30,//從 TableStore 中讀增量數據時,每次請求的最大重試次數,默認為30。
                    "isExportSequenceInfo":false,//是否導出時序信息。
                    "startTimeString":"${startTime}${hh}",//增量數據的時間范圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間
                    "endTimeString":"${endTime}${hh}"http://增量數據的時間范圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間
                },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"http://錯誤記錄數。
            },
            "speed":{
                "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
                "concurrent":1,//作業并發數。
                "mbps":"12"http://限流,此處1mbps = 1MB/s。
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 行模式讀取寬表

    {
        "type":"job",
        "version":"2.0",//版本號。
        "steps":[
            {
                "stepType":"otsstream",//插件名。
                "parameter":{
                    "datasource":"$srcDatasource",//數據源。
                    "dataTable":"",//表名。
                    "statusTable":"TableStoreStreamReaderStatusTable",//用于記錄狀態的表的名稱。
                    "maxRetries":30,//從 TableStore 中讀增量數據時,每次請求的最大重試次數,默認為30。
                    "isExportSequenceInfo":false,//是否導出時序信息。
                    "startTimeString":"${startTime}${hh}",//增量數據的時間范圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間
                    "endTimeString":"${endTime}${hh}",//增量數據的時間范圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間
                    "mode": "single_version_and_update_only",
                    "column":[
                            {
                                "name":"pId"
                            },
                            {
                                "name": "uId"
                            },
                            {
                                "name":"col0"
                            },
                            {
                                "name": "col1"
                            }
                        ],
                        },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"http://錯誤記錄數。
            },
            "speed":{
                "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
                "concurrent":1,//作業并發數。
                "mbps":"12"http://限流
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }
  • 行模式讀取時序表

    {
        "type":"job",
        "version":"2.0",//版本號。
        "steps":[
            {
                "stepType":"otsstream",//插件名。
                "parameter":{
                    "datasource":"$srcDatasource",//數據源。
                    "dataTable":"",//表名。
                    "statusTable":"TableStoreStreamReaderStatusTable",//用于記錄狀態的表的名稱。
                    "maxRetries":30,//從 TableStore 中讀增量數據時,每次請求的最大重試次數,默認為30。
                    "isExportSequenceInfo":false,//是否導出時序信息。
                    "startTimeString":"${startTime}${hh}",//增量數據的時間范圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間
                    "endTimeString":"${endTime}${hh}",//增量數據的時間范圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間
                    "mode": "single_version_and_update_only",
                    "isTimeseriesTable":"true",
                    "column": [
                              {
                                "name": "_m_name"
                              },
                              {
                                "name": "_data_source",
                              },
                              {
                                "name": "_tags",
                              },
                              {
                                "name": "string_column",
                              }
                        ]
                        },
                "name":"Reader",
                "category":"reader"
            },
            {
                "stepType":"stream",
                "parameter":{},
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting":{
            "errorLimit":{
                "record":"0"http://錯誤記錄數。
            },
            "speed":{
                "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
                "concurrent":1,//作業并發數。
                "mbps":"12"http://限流,此處1mbps = 1MB/s。
    
            }
        },
        "order":{
            "hops":[
                {
                    "from":"Reader",
                    "to":"Writer"
                }
            ]
        }
    }

Reader腳本參數

參數

描述

是否必選

默認值

datasource

數據源名稱,腳本模式支持添加數據源,該配置項填寫的內容必須與添加的數據源名稱保持一致。

dataTable

導出增量數據的表的名稱。該表需要開啟Stream,可以在建表時開啟,或者使用UpdateTable接口開啟。

statusTable

Reader插件用于記錄狀態的表的名稱,這些狀態可用于減少對非目標范圍內的數據的掃描,從而加快導出速度。statusTable是Reader用于保存狀態的表,如果該表不存在,Reader會自動創建該表。一次離線導出任務完成后,您無需刪除該表,該表中記錄的狀態可用于下次導出任務中:

  • 您無需創建該表,只需要給出一個表名。Reader插件會嘗試在您的instance下創建該表,如果該表不存在即創建新表。如果該表已存在,會判斷該表的Meta是否與期望一致,如果不一致會拋出異常。

  • 在一次導出完成之后,您無需刪除該表,該表的狀態可以用于下次的導出任務。

  • 該表會開啟TTL,數據自動過期,會認為其數據量很小。

  • 針對同一個instance下的多個不同的dataTable的Reader配置,可以使用同一個statusTable,記錄的狀態信息互不影響。

您配置一個類似TableStoreStreamReaderStatusTable的名稱即可,請注意不要與業務相關的表重名。

startTimestampMillis

增量數據的時間范圍(左閉右開)的左邊界,單位為毫秒:

  • Reader插件會從statusTable中找對應startTimestampMillis的位點,從該點開始讀取開始導出數據。

  • 如果statusTable中找不到對應的位點,則從系統保留的增量數據的第一條開始讀取,并跳過寫入時間小于startTimestampMillis的數據。

endTimestampMillis

增量數據的時間范圍(左閉右開)的右邊界,單位為毫秒:

  • Reader插件從startTimestampMillis位置開始導出數據后,當遇到第一條時間戳大于等于endTimestampMillis的數據時,結束導出數據,導出完成。

  • 當讀取完當前全部的增量數據時,即使未達到endTimestampMillis,也會結束讀取。

date

日期格式為yyyyMMdd,例如20151111,表示導出該日的數據。如果沒有指定date,則需要指定startTimestampMillisendTimestampMillisstartTimeStringendTimeString,反之也成立。例如,采云間調度僅支持天級別,所以提供該配置,作用與startTimestampMillisendTimestampMillisstartTimeStringendTimeString類似。

isExportSequenceInfo

是否導出時序信息,時序信息包含了數據的寫入時間等。默認該值為false,即不導出。

false

maxRetries

從TableStore中讀增量數據時,每次請求的最大重試次數,默認為30次。重試之間有間隔,重試30次的總時間約為5分鐘,通常無需更改。

30

startTimeString

任務的開始時間,即增量數據的時間范圍(左閉右開)的左邊界,格式為yyyymmddhh24miss,單位為秒。

endTimeString

任務的結束時間,即增量數據的時間范圍(左閉右開)的右邊界,格式為yyyymmddhh24miss,單位為秒。

enableSeekIterator

Reader插件需要先確定增量位點,然后再拉取數據,如果是經常運行的任務,插件會根據之前掃描的位點來確定位置。如果之前沒運行過這個插件,將會從增量開始位置(默認增量保留7天,即7天前)開始掃描,因此當還沒有掃描到設置的開始時間之后的數據時,會存在開始一段時間沒有數據導出的情況,您可以在reader的配置參數里增加 "enableSeekIterator": true的配置,幫助您加快位點定位。

false

mode

導出模式,設置為single_version_and_update_only時為行模式,默認不設置為列模式。

isTimeseriesTable

是否為時序表,只有在行模式,即modesingle_version_and_update_only時配置生效。

false

column

column配置single_version_and_update_only模式下,所導出的數據列,配置樣例:

"column":[
    {"name":"pk1"},
	{"name":"col1"},
	{"name":"col2","dataType":"new"},
	{"name":"col2","dataType":"old"},
	{"name":"col2","dataType":"latest"}
],
  • name字段表示要導出的數據列的名稱,必須配置。

  • dataType字段表示要導出的數據類型,默認為new類型,非必須配置。dataType支持三種枚舉類型

    • new:表示本列更新后的值

    • old:表示本列更新前的值

    • latest:表中本列的當前最新值

說明

在行模式下必須配置,否則不會導出數據。

  • 行模式下:是

  • 列模式下:否