表格存儲支持作為實時計算Flink的源表和結果表使用,您可以將表格存儲數據表中的數據經過Flink處理后得到的結果保存到表格存儲的另一張數據表中。
背景信息
實時計算Flink能將Tunnel Service的數據通道作為流式數據的輸入,每條數據類似一個JSON格式。示例如下:
{
"OtsRecordType": "PUT",
"OtsRecordTimestamp": 1506416585740836,
"PrimaryKey": [
{
"ColumnName": "pk_1",
"Value": 1506416585881590900
},
{
"ColumnName": "pk_2",
"Value": "string_pk_value"
}
],
"Columns": [
{
"OtsColumnType": "Put",
"ColumnName": "attr_0",
"Value": "hello_table_store",
},
{
"OtsColumnType": "DELETE_ONE_VERSION",
"ColumnName": "attr_1"
}
]
}
字段名 | 描述 |
OtsRecordType | 數據操作類型,取值范圍如下:
|
OtsRecordTimestamp | 數據操作時間,單位為微秒。全量數據時取值為0。 |
PrimaryKey | 主鍵列信息,以JSON格式數組表示。支持配置1~4列,請以實際主鍵列為準。包括如下選項:
|
Columns | 屬性列信息,以JSON格式的數組表示。包括如下選項:
|
Tablestore數據源表
存儲在Tablestore中數據的主鍵和屬性列值均可以在Flink中通過數據源表DDL以列名與相應的類型映射進行讀取。更多信息,請參見表格存儲Tablestore連接器。
DDL定義
數據源表的DDL定義示例如下:
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' //是否忽略delete操作的數據.
);
除了待消費的用戶數據外,Tunnel Service返回數據中的OtsRecordType、OtsRecordTimestamp字段均支持通過屬性字段的方式讀取。字段說明請參見下表。
字段名 | Flink映射名 | 描述 |
OtsRecordType | type | 數據操作類型。 |
OtsRecordTimestamp | timestamp | 數據操作時間,單位為微秒。全量數據時取值為0。 |
當需要讀取OtsRecordType和OtsRecordTimestamp字段時,Flink提供了METADATA關鍵字用于獲取源表中的屬性字段,具體DDL示例如下:
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
record_type STRING METADATA FROM 'type',
record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
...
);
WITH參數
參數 | 是否必填 | 描述 |
connector | 是 | 源表類型。固定取值為ots。 |
endPoint | 是 | 表格存儲實例的服務地址。更多信息,請參見服務地址。 |
instanceName | 是 | 表格存儲的實例名稱。 |
tableName | 是 | 表格存儲的數據表名稱。 |
tunnelName | 是 | 表格存儲數據表的數據通道名稱。關于創建通道的具體操作,請參見創建數據通道。 |
accessId | 是 | 阿里云賬號或者RAM用戶的AccessKey(包括AccessKey ID和AccessKey Secret)。獲取AccessKey的具體操作,請參見獲取AccessKey。 |
accessKey | 是 | |
ignoreDelete | 否 | 是否忽略DELETE操作類型的實時數據。默認值為false,表示不忽略DELETE操作類型的實時數據。 |
skipInvalidData | 否 | 是否忽略臟數據。默認值為false,表示不忽略臟數據。 如果不忽略臟數據,則處理臟數據時會進行報錯。如果需要忽略臟數據,請設置此參數為true。 |
源表字段類型映射
Tablestore字段類型 | Flink字段類型 |
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
BINARY | BINARY |
Tablestore數據結果表
Flink支持使用Tablestore存儲輸出結果。更多信息,請參見表格存儲Tablestore連接器。
DDL定義
結果表的DDL定義示例如下:
在Tablestore數據結果表定義中除了主鍵列外,需要包含至少一個屬性列。
CREATE TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
...
);
WITH參數
參數 | 是否必填 | 描述 |
connector | 是 | 結果表類型。固定取值為ots。 |
endPoint | 是 | 表格存儲實例的服務地址。更多信息,請參見服務地址。 |
instanceName | 是 | 表格存儲的實例名稱。 |
tableName | 是 | 表格存儲的數據表名稱。 |
tunnelName | 是 | 表格存儲數據表的數據通道名稱。關于創建通道的具體操作,請參見創建數據通道。 |
accessId | 是 | 阿里云賬號或者RAM用戶的AccessKey(包括AccessKey ID和AccessKey Secret)。獲取AccessKey的具體操作,請參見獲取AccessKey。 |
accessKey | 是 | |
valueColumns | 是 | 指定插入的字段列名。插入多個字段以半角逗號(,)分隔。例如 |
bufferSize | 否 | 流入多少條數據后開始輸出。默認值為5000,表示輸入的數據達到5000條就開始輸出。 |
batchWriteTimeoutMs | 否 | 寫入超時的時間。單位為毫秒。默認值為5000,表示如果緩存中的數據在等待5秒后,依然沒有達到輸出條件,系統會自動輸出緩存中的所有數據。 |
batchSize | 否 | 一次批量寫入的條數。默認值為100。 |
retryIntervalMs | 否 | 重試間隔時間,單位毫秒。默認值為1000。 |
maxRetryTimes | 否 | 最大重試次數。默認值為100。 |
ignoreDelete | 否 | 是否忽略DELETE操作類型的實時數據。默認值為false,表示不忽略DELETE操作類型的實時數據。 |
autoIncrementKey | 否 | 當結果表中包含主鍵自增列時,通過該參數指定主鍵自增列的列名稱。 |
defaultTimestampInMillisecond | 否 | 寫入結果表的數據的版本號,單位為毫秒。當不進行配置時,版本號取決于寫入的時間。 |
結果表字段類型映射
Flink字段類型 | Tablestore字段類型 |
BINARY | BINARY |
VARBINARY | BINARY |
CHAR | STRING |
VARCHAR | STRING |
TINYINT | INTEGER |
SMALLINT | INTEGER |
INTEGER | INTEGER |
BIGINT | INTEGER |
FLOAT | DOUBLE |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
SQL示例
讀取數據源表的數據
批量從數據源表ots source中讀取數據,SQL示例如下:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' //是否忽略delete操作的數據。
);
SELECT * FROM tablestore_stream LIMIT 100;
數據同步到結果表
ots sink數據會以updateRow的方式寫入結果表,SQL示例如下:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' //是否忽略delete操作的數據。
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='xxxxxxxxxxx',
'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'valueColumns'='customerid,customername'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
實時計算作業開發流程
前提條件
已創建AccessKey。具體操作,請參見創建AccessKey。
已為表格存儲數據表(源表)創建數據通道。具體操作,請參見創建數據通道。
步驟一:創建作業
登錄實時計算控制臺。
在Flink全托管頁簽,單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊SQL開發。
單擊新建。
在新建作業草稿對話框中,單擊空白的流作業草稿。
Flink全托管也為您提供了豐富的代碼模板和數據同步,每種代碼模板都為您提供了具體的使用場景、代碼示例和使用指導。您可以直接單擊對應的模板快速地了解Flink產品功能和相關語法,實現您的業務邏輯,詳情請參見代碼模板和數據同步模板。
單擊下一步。
填寫作業配置信息。
作業參數
說明
示例
文件名稱
作業的名稱。
說明作業名稱在當前項目中必須保持唯一。
flink-test
存儲位置
指定該作業的代碼文件所屬的文件夾。
您還可以在現有文件夾右側,單擊圖標,新建子文件夾。
作業草稿
引擎版本
當前作業使用的Flink的引擎版本。引擎版本號含義、版本對應關系和生命周期重要時間點詳情請參見引擎版本介紹。
vvr-6.0.4-flink-1.15
單擊創建。
步驟二:編寫作業代碼
創建一個源表和結果表的臨時表。
說明在生產作業中,建議您盡量減少臨時表的使用,直接使用元數據管理中已經注冊的表。
創建一個tablestore_stream和ots_sink臨時表代碼示例如下:
CREATE TEMPORARY TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector'='ots', 'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com', 'instanceName' = 'flink-source', 'tableName' ='flink_source_table', 'tunnelName' = 'flinksourcestream', 'accessId' ='xxxxxxxxxxx', 'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'ignoreDelete' = 'false' //是否忽略delete操作的數據。 ); CREATE TEMPORARY TABLE ots_sink ( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, PRIMARY KEY (`order`,orderid) NOT ENFORCED ) WITH ( 'connector'='ots', 'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com', 'instanceName'='flink-sink', 'tableName'='flink_sink_table', 'accessId'='xxxxxxxxxxx', 'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx', 'valueColumns'='customerid,customername' );
編寫作業邏輯。
將源表數據插入到結果表的代碼示例如下:
INSERT INTO ots_sink SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
步驟三:進行更多配置
在作業開發頁面右側,單擊更多配置后,您可以填寫以下參數信息:
引擎版本:修改您創建作業時選擇的Flink引擎版本。
說明 從VVR 3.0.3版本(對應Flink 1.12版本)開始,VVP支持同時運行多個不同引擎版本的SQL作業。如果您的作業已使用了Flink 1.12及更早版本的引擎,您需要按照以下情況進行處理:- Flink 1.12版本:停止后啟動作業,系統將自動將引擎升級為vvr-3.0.3-flink-1.12版本。
- Flink 1.11或Flink 1.10版本:手動將作業引擎版本升級到vvr-3.0.3-flink-1.12或vvr-4.0.7-flink-1.13版本后重啟作業,否則會在啟動作業時超時報錯。
附加依賴文件:作業中需要使用到的附加依賴,例如臨時函數等。
步驟四:進行深度檢查
在作業開發頁面頂部,單擊深度檢查,進行語法檢查。
步驟五:(可選)進行作業調試
在作業開發頁面頂部,單擊調試。
您可以使用作業調試功能模擬作業運行、檢查輸出結果,驗證SELECT或INSERT業務邏輯的正確性,提升開發效率,降低數據質量風險。具體操作,請參見作業調試。
步驟六:作業部署
在作業開發頁面頂部,單擊部署,在部署新版本對話框,可根據需要填寫或選中相關內容,單擊確認。
Session集群適用于非生產環境的開發測試環境,您可以使用Session集群模式部署或調試作業,提高作業JM(Job Manager)資源利用率和提高作業啟動速度。但不推薦您將作業提交至Session集群中,因為會存在業務穩定性問題。具體操作,請參見步驟一:創建Session集群。
步驟七:啟動并查看Flink計算結果
如果您對作業進行了修改(例如更改SQL代碼、增刪改WITH參數、更改作業版本等),且希望修改生效,則需要先上線,然后停止再啟動。另外,如果作業無法復用State,希望作業全新啟動時,也需要停止后再啟動作業。關于作業停止的具體操作,請參見作業停止。
在左側導航欄,單擊作業運維。
單擊目標作業名稱操作列中的啟動。
作業啟動參數配置詳情請參見作業啟動。單擊啟動后,您可以看到作業狀態變為運行中,則代表作業運行正常。
在作業運維詳情頁面,查看Flink計算結果。
在作業運維頁面,單擊目標作業名稱。
單擊作業探查。
在運行日志頁簽,單擊運行Task Managers頁簽下的Path, ID。
單擊日志,在頁面搜索Sink相關的日志信息。