日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用教程(寬表模型)

表格存儲支持作為實時計算Flink的源表和結果表使用,您可以將表格存儲數據表中的數據經過Flink處理后得到的結果保存到表格存儲的另一張數據表中。

背景信息

實時計算Flink能將Tunnel Service的數據通道作為流式數據的輸入,每條數據類似一個JSON格式。示例如下:

{
  "OtsRecordType": "PUT", 
  "OtsRecordTimestamp": 1506416585740836, 
  "PrimaryKey": [
    {
      "ColumnName": "pk_1", 
      "Value": 1506416585881590900
    },
    {
      "ColumnName": "pk_2", 
      "Value": "string_pk_value"
    }
  ],
  "Columns": [
    {
      "OtsColumnType": "Put", 
      "ColumnName": "attr_0",
      "Value": "hello_table_store",
    },
    {
      "OtsColumnType": "DELETE_ONE_VERSION", 
      "ColumnName": "attr_1"
    }
  ]
}

字段名

描述

OtsRecordType

數據操作類型,取值范圍如下:

  • PUT:新增數據操作。

  • UPDATE:更新數據操作。

  • DELETE:刪除數據操作。

OtsRecordTimestamp

數據操作時間,單位為微秒。全量數據時取值為0。

PrimaryKey

主鍵列信息,以JSON格式數組表示。支持配置1~4列,請以實際主鍵列為準。包括如下選項:

  • ColumnName:列名稱。

  • Value:列值。

Columns

屬性列信息,以JSON格式的數組表示。包括如下選項:

  • OtsColumnType:列操作類型。取值范圍為PUT、DELETE_ONE_VERSION、DELETE_ALL_VERSION。

  • ColumnName:列名。

  • Value:列值。

    當設置OtsColumnTypeDELETE_ONE_VERSION或者DELETE_ALL_VERSION時,不需要配置該參數。

Tablestore數據源表

存儲在Tablestore中數據的主鍵和屬性列值均可以在Flink中通過數據源表DDL以列名與相應的類型映射進行讀取。更多信息,請參見表格存儲Tablestore連接器

DDL定義

數據源表的DDL定義示例如下:

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' //是否忽略delete操作的數據.
);

除了待消費的用戶數據外,Tunnel Service返回數據中的OtsRecordType、OtsRecordTimestamp字段均支持通過屬性字段的方式讀取。字段說明請參見下表。

字段名

Flink映射名

描述

OtsRecordType

type

數據操作類型。

OtsRecordTimestamp

timestamp

數據操作時間,單位為微秒。全量數據時取值為0。

當需要讀取OtsRecordTypeOtsRecordTimestamp字段時,Flink提供了METADATA關鍵字用于獲取源表中的屬性字段,具體DDL示例如下:

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    record_type STRING METADATA FROM 'type',
    record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
    ...
);

WITH參數

參數

是否必填

描述

connector

源表類型。固定取值為ots。

endPoint

表格存儲實例的服務地址。更多信息,請參見服務地址

instanceName

表格存儲的實例名稱。

tableName

表格存儲的數據表名稱。

tunnelName

表格存儲數據表的數據通道名稱。關于創建通道的具體操作,請參見創建數據通道

accessId

阿里云賬號或者RAM用戶的AccessKey(包括AccessKey IDAccessKey Secret)。獲取AccessKey的具體操作,請參見獲取AccessKey

accessKey

ignoreDelete

是否忽略DELETE操作類型的實時數據。默認值為false,表示不忽略DELETE操作類型的實時數據。

skipInvalidData

是否忽略臟數據。默認值為false,表示不忽略臟數據。

如果不忽略臟數據,則處理臟數據時會進行報錯。如果需要忽略臟數據,請設置此參數為true。

源表字段類型映射

Tablestore字段類型

Flink字段類型

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

Tablestore數據結果表

Flink支持使用Tablestore存儲輸出結果。更多信息,請參見表格存儲Tablestore連接器

DDL定義

結果表的DDL定義示例如下:

說明

Tablestore數據結果表定義中除了主鍵列外,需要包含至少一個屬性列。

CREATE TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
     ...
);

WITH參數

參數

是否必填

描述

connector

結果表類型。固定取值為ots。

endPoint

表格存儲實例的服務地址。更多信息,請參見服務地址

instanceName

表格存儲的實例名稱。

tableName

表格存儲的數據表名稱。

tunnelName

表格存儲數據表的數據通道名稱。關于創建通道的具體操作,請參見創建數據通道

accessId

阿里云賬號或者RAM用戶的AccessKey(包括AccessKey IDAccessKey Secret)。獲取AccessKey的具體操作,請參見獲取AccessKey

accessKey

valueColumns

指定插入的字段列名。插入多個字段以半角逗號(,)分隔。例如ID,NAME

bufferSize

流入多少條數據后開始輸出。默認值為5000,表示輸入的數據達到5000條就開始輸出。

batchWriteTimeoutMs

寫入超時的時間。單位為毫秒。默認值為5000,表示如果緩存中的數據在等待5秒后,依然沒有達到輸出條件,系統會自動輸出緩存中的所有數據。

batchSize

一次批量寫入的條數。默認值為100。

retryIntervalMs

重試間隔時間,單位毫秒。默認值為1000。

maxRetryTimes

最大重試次數。默認值為100。

ignoreDelete

是否忽略DELETE操作類型的實時數據。默認值為false,表示不忽略DELETE操作類型的實時數據。

autoIncrementKey

當結果表中包含主鍵自增列時,通過該參數指定主鍵自增列的列名稱。

defaultTimestampInMillisecond

寫入結果表的數據的版本號,單位為毫秒。當不進行配置時,版本號取決于寫入的時間。

結果表字段類型映射

Flink字段類型

Tablestore字段類型

BINARY

BINARY

VARBINARY

BINARY

CHAR

STRING

VARCHAR

STRING

TINYINT

INTEGER

SMALLINT

INTEGER

INTEGER

INTEGER

BIGINT

INTEGER

FLOAT

DOUBLE

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

SQL示例

讀取數據源表的數據

批量從數據源表ots source中讀取數據,SQL示例如下:

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' //是否忽略delete操作的數據。
);
SELECT * FROM tablestore_stream LIMIT 100;

數據同步到結果表

ots sink數據會以updateRow的方式寫入結果表,SQL示例如下:

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' //是否忽略delete操作的數據。
);

CREATE TEMPORARY TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector'='ots',
    'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
    'instanceName'='flink-sink',
    'tableName'='flink_sink_table',
    'accessId'='xxxxxxxxxxx',
    'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

實時計算作業開發流程

前提條件

  • 已創建AccessKey。具體操作,請參見創建AccessKey

  • 已為表格存儲數據表(源表)創建數據通道。具體操作,請參見創建數據通道

步驟一:創建作業

  1. 登錄實時計算控制臺

  2. Flink全托管頁簽,單擊目標工作空間操作列下的控制臺

  3. 在左側導航欄,單擊SQL開發

  4. 單擊新建

  5. 新建作業草稿對話框中,單擊空白的流作業草稿

    Flink全托管也為您提供了豐富的代碼模板和數據同步,每種代碼模板都為您提供了具體的使用場景、代碼示例和使用指導。您可以直接單擊對應的模板快速地了解Flink產品功能和相關語法,實現您的業務邏輯,詳情請參見代碼模板數據同步模板

  6. 單擊下一步

  7. 填寫作業配置信息。

    作業參數

    說明

    示例

    文件名稱

    作業的名稱。

    說明

    作業名稱在當前項目中必須保持唯一。

    flink-test

    存儲位置

    指定該作業的代碼文件所屬的文件夾。

    您還可以在現有文件夾右側,單擊新建文件夾圖標,新建子文件夾。

    作業草稿

    引擎版本

    當前作業使用的Flink的引擎版本。引擎版本號含義、版本對應關系和生命周期重要時間點詳情請參見引擎版本介紹

    vvr-6.0.4-flink-1.15

  8. 單擊創建

步驟二:編寫作業代碼

  1. 創建一個源表和結果表的臨時表。

    說明

    在生產作業中,建議您盡量減少臨時表的使用,直接使用元數據管理中已經注冊的表。

    創建一個tablestore_streamots_sink臨時表代碼示例如下:

    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector'='ots',
        'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
        'instanceName' = 'flink-source',
        'tableName' ='flink_source_table',
        'tunnelName' = 'flinksourcestream',
        'accessId' ='xxxxxxxxxxx',
        'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'ignoreDelete' = 'false' //是否忽略delete操作的數據。
    );
    
    CREATE TEMPORARY TABLE ots_sink (
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED
    ) WITH (
        'connector'='ots',
        'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
        'instanceName'='flink-sink',
        'tableName'='flink_sink_table',
        'accessId'='xxxxxxxxxxx',
        'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'valueColumns'='customerid,customername'
    );
  2. 編寫作業邏輯。

    將源表數據插入到結果表的代碼示例如下:

    INSERT INTO ots_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

步驟三:進行更多配置

在作業開發頁面右側,單擊更多配置后,您可以填寫以下參數信息:

  • 引擎版本:修改您創建作業時選擇的Flink引擎版本。

    說明 VVR 3.0.3版本(對應Flink 1.12版本)開始,VVP支持同時運行多個不同引擎版本的SQL作業。如果您的作業已使用了Flink 1.12及更早版本的引擎,您需要按照以下情況進行處理:
    • Flink 1.12版本:停止后啟動作業,系統將自動將引擎升級為vvr-3.0.3-flink-1.12版本。
    • Flink 1.11Flink 1.10版本:手動將作業引擎版本升級到vvr-3.0.3-flink-1.12vvr-4.0.7-flink-1.13版本后重啟作業,否則會在啟動作業時超時報錯。
  • 附加依賴文件:作業中需要使用到的附加依賴,例如臨時函數等。

步驟四:進行深度檢查

在作業開發頁面頂部,單擊深度檢查,進行語法檢查。

步驟五:(可選)進行作業調試

在作業開發頁面頂部,單擊調試

您可以使用作業調試功能模擬作業運行、檢查輸出結果,驗證SELECTINSERT業務邏輯的正確性,提升開發效率,降低數據質量風險。具體操作,請參見作業調試

步驟六:作業部署

在作業開發頁面頂部,單擊部署,在部署新版本對話框,可根據需要填寫或選中相關內容,單擊確認部署

說明

Session集群適用于非生產環境的開發測試環境,您可以使用Session集群模式部署或調試作業,提高作業JM(Job Manager)資源利用率和提高作業啟動速度。但不推薦您將作業提交至Session集群中,因為會存在業務穩定性問題。具體操作,請參見步驟一:創建Session集群

步驟七:啟動并查看Flink計算結果

說明

如果您對作業進行了修改(例如更改SQL代碼、增刪改WITH參數、更改作業版本等),且希望修改生效,則需要先上線,然后停止再啟動。另外,如果作業無法復用State,希望作業全新啟動時,也需要停止后再啟動作業。關于作業停止的具體操作,請參見作業停止

  1. 在左側導航欄,單擊作業運維

  2. 單擊目標作業名稱操作列中的啟動

    作業啟動參數配置詳情請參見作業啟動。單擊啟動后,您可以看到作業狀態變為運行中,則代表作業運行正常。

  3. 在作業運維詳情頁面,查看Flink計算結果。

    1. 作業運維頁面,單擊目標作業名稱。

    2. 單擊作業探查

    3. 運行日志頁簽,單擊運行Task Managers頁簽下的Path, ID

    4. 單擊日志,在頁面搜索Sink相關的日志信息。

      image..png