日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

對象存儲OSS

本文為您介紹如何使用對象存儲OSS連接器。

阿里云對象存儲OSS(Object Storage Service)是一款海量、安全、低成本和高可靠的云存儲服務,可提供99.9999999999%(12個9)的數據持久性,99.995%的數據可用性。多種存儲類型供選擇,全面優化存儲成本。

類別

詳情

支持類型

源表和結果表

運行模式

批模式和流模式

數據格式

Orc、Parquet、Avro、Csv、JSON和Raw

說明

僅實時計算引擎VVR 6.0.7及以上版本支持讀取Parquet格式的數據。

特有監控指標

暫無

API種類

Datastream和SQL

是否支持更新或刪除結果表數據

不支持更新和刪除結果表數據,只支持插入數據。

使用限制

  • 通用

    • 僅Flink計算引擎VVR 4.0.14及以上版本支持讀取或寫入OSS。

    • Flink計算引擎VVR 8.0.6以下版本僅支持讀取或寫入相同賬號下的OSS。

      說明

      如需讀寫其他賬號下的OSS,請使用Flink計算引擎VVR 8.0.6及以上版本且配置Bucket鑒權信息,詳情請參見配置Bucket鑒權信息。

  • 結果表獨有

    • 對于寫入OSS,目前暫不支持寫Avro、CSV、JSON和Raw此類行存的格式,具體原因請參見FLINK-30635。

    • 僅Flink計算引擎VVR6.0.6及以上版本支持寫入OSS-HDFS服務,具體請參見寫OSS-HDFS。

語法結構

CREATE TABLE OssTable (
  column_name1 INT,
  column_name2 STRING,
  ...
  datetime STRING,
  `hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = '...'
);

元信息列

您可以在源表中讀取信息列,以獲取讀取OSS數據對應的元信息。例如,如果在OSS源表中定義了元信息列file.path,則該列的值為該行數據所在的文件路徑。元信息列的使用示例如下。

CREATE TABLE MyUserTableWithFilepath (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
)

下表列出了OSS源表所支持的元信息列:

Key

數據類型

說明

file.path

STRING NOT NULL

該行數據所在的文件路徑。

file.name

STRING NOT NULL

該行數據所在的文件名,即距離文件根路徑最遠的元素。

file.size

BIGINT NOT NULL

該行數據所在的文件的字節數。

file.modification-time

TIMESTAMP_LTZ(3) NOT NULL

該行數據所在的文件的修改時間。

WITH參數

  • 通用

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    connector

    表類型。

    String

    固定值為filesystem。

    path

    文件系統路徑。

    String

    URI格式,例如oss://my_bucket/my_path

    說明

    VVR 8.0.6及以上版本配置該參數后,您還需要配置Bucket鑒權信息,才能正常讀寫指定文件系統路徑下的數據,配置方法請參見配置Bucket鑒權信息

    format

    文件格式。

    String

    參數取值如下:

    • csv

    • json

    • avro

    • parquet

    • orc

    • raw

  • 源表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    source.monitor-interval

    設置新文件的監控時間間隔,并且必須設置>0的值。

    Duration

    如果未設置此配置項,則提供的路徑僅會被掃描一次,因此源將是有界的。

    每個文件都由其路徑唯一標識,一旦發現新文件,就會處理一次。

    已處理的文件在source的整個生命周期內存儲在state中。因此,source的state在checkpoint和savepoint時進行保存。更短的時間間隔文件會被更快地發現,但也會更頻繁地遍歷文件系統或對象存儲。

  • 結果表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    partition.default-name

    在分區字段值為NULL或空字符串時,分區的名稱。

    String

    _DEFAULT_PARTITION__

    無。

    sink.rolling-policy.file-size

    滾動前,文件大小的最大值。

    MemorySize

    128 MB

    寫入目錄下的數據被分割到part文件中。每個分區對應sink的收到數據的subtask都至少會為該分區生成一個part文件。根據可配置的滾動策略,當前in-progress part文件將被關閉,生成新的part文件。該策略基于大小和指定的文件可被打開的最大timeout時長,來滾動part文件。

    說明

    對于列存格式來說,

    即使文件不滿足設置的滾動策略,但是在做checkpoint時,總是會滾動文件。

    所以只要文件滿足了設置的滾動策略或者做了checkpoint,文件總是會被滾動。

    而對于行存格式來說,只有在滿足rolling policy配置的情況下才會滾動文件。

    sink.rolling-policy.rollover-interval

    滾動前,part文件處于打開狀態的最大時長。

    Duration

    30min

    檢查頻率是由sink.rolling-policy.check-interval屬性控制。

    sink.rolling-policy.check-interval

    基于時間滾動策略的檢查間隔。

    Duration

    1min

    該屬性控制了基于sink.rolling-policy.rollover-interval屬性的檢查文件是否該被滾動了。

    auto-compaction

    在流式結果表中是否開啟自動合并功能。數據首先會被寫入臨時文件。當checkpoint完成后,該檢查點產生的臨時文件會被合并,臨時文件在合并前不可見。

    Boolean

    false

    如果啟用文件合并功能,會根據目標文件大小,將多個小文件合并成大文件。在生產環境中使用文件合并功能時,需要注意:

    • 只有checkpoint內部的文件才會被合并,會至少生成與checkpoint個數相同的文件個數。

    • 合并前文件不可見,文件的可見時間是checkpoint間隔時長+合并時長

    • 合并時間過長,將導致反壓,延長checkpoint所需時間。

    compaction.file-size

    合并目標文件大小。

    MemorySize

    128 MB

    默認值與滾動文件大小sink.rolling-policy.file-size相同。

    sink.partition-commit.trigger

    分區提交觸發器類型。

    String

    process-time

    對于寫分區表,Flink提供了兩種類型分區提交觸發器,類型如下兩種:

    • process-time:分區提交觸發器基于分區創建時間和當前系統時間,既不需要分區時間提取器,也不需要watermark生成器。一旦當前系統時間超過了分區創建系統時間和sink.partition-commit.delay之和,立即提交分區。這種觸發器更具通用性,但不是很精確。例如,數據延遲或故障將導致過早提交分區。

    • partition-time:基于提取的分區時間,需要watermark生成。這需要Job支持watermark生成,分區是根據時間來切割的,例如,按小時或按天分區。一旦watermark超過了分區創建系統時間和sink.partition-commit.delay之和立即提交分區。

    sink.partition-commit.delay

    分區被提交的最大延遲時間。表明該延遲時間之前分區不會被提交。

    Duration

    0s

    • 如果是按天分區,可以設置為1 d。

    • 如果是按小時分區,應設置為1 h

    sink.partition-commit.watermark-time-zone

    解析Long類型的watermark到TIMESTAMP類型時所采用的時區,解析得到的watermark的TIMESTAMP會跟分區時間進行比較,以判斷該分區是否需要被提交。

    String

    UTC

    僅當sink.partition-commit.trigger被設置為partition-time時有效。

    • 如果設置得不正確,例如,在TIMESTAMP_LTZ類型的列上定義了source rowtime,如果沒有設置該屬性,那么用戶可能會在若干個小時后才看到分區的提交。默認值為UTC,意味著watermark是定義在TIMESTAMP類型的列上或者沒有定義watermark。

    • 如果watermark定義在TIMESTAMP_LTZ類型的列上,watermark時區必須是會話時區。該屬性的可選值要么是完整的時區名(例如'America/Los_Angeles'),要么是自定義時區(例如'GMT-08:00')。

    partition.time-extractor.kind

    從分區字段中提取時間的時間提取器。

    String

    default

    參數取值如下:

    • default(默認):默認情況下,可以配置timestamp pattern或formatter。

    • custom:應指定提取器類。

    partition.time-extractor.class

    實現PartitionTimeExtractor接口的提取器類。

    String

    無。

    partition.time-extractor.timestamp-pattern

    允許用戶使用分區字段來獲取合法的timestamp pattern的默認construction方式。

    String

    默認支持第一個字段按yyyy-MM-dd hh:mm:ss這種模式提取。

    • 如果需要從一個分區字段'dt'提取timestamp,可以配置:$dt。

    • 如果需要從多個分區字段,比如year、month和day和hour提取timestamp,可以配置成:$year-$month-$day $hour:00:00。

    • 如果需要從兩個分區字段dt和hour提取timestamp,可以配置成:$dt $hour:00:00。

    partition.time-extractor.timestamp-formatter

    轉換分區timestamp字符串值為timestamp的formatter,分區timestamp字符串值通過partition.time-extractor.timestamp-pattern屬性表達。

    String

    yyyy-MM-dd HH:mm:ss

    例如,分區timestamp提取來自多個分區字段,比如year、month和day,可以配置partition.time-extractor.timestamp-pattern屬性為$year$month$day,并且配置partition.time-extractor.timestamp-formatter屬性為yyyyMMdd。默認的formatter是yyyy-MM-dd HH:mm:ss。這里的timestamp-formatter和Java的DateTimeFormatter是通用的。

    sink.partition-commit.policy.kind

    分區提交策略類型。

    String

    分區提交策略通知下游某個分區,該分區已經寫入完畢可以被讀取。參數取值如下:

    • success-file:在目錄中增加_success文件。

    • custom:通過指定的類來創建提交策略。支持同時指定多個提交策略。

    sink.partition-commit.policy.class

    實現PartitionCommitPolicy接口的分區提交策略類。

    String

    該類只有在custom提交策略下才能使用。

    sink.partition-commit.success-file.name

    使用success-file分區提交策略時的文件名。

    String

    _SUCCESS

    無。

    sink.parallelism

    將文件寫入外部文件系統的parallelism。

    Integer

    默認情況下,該sink parallelism與上游chained operator的parallelism一樣。當配置了跟上游的chained operator不一樣的parallelism時,寫文件的算子會使用指定的sink parallelism,如果開啟了文件合并,文件合并的算子也會使用指定的sink parallelism。

    說明

    這個值應該大于0,否則將拋出異常。

配置Bucket鑒權信息

說明

僅實時計算引擎VVR 8.0.6及以上版本支持配置Bucket鑒權信息。

指定文件系統路徑后,您還需要配置Bucket鑒權信息,才能正常讀寫您指定文件系統路徑下的數據。配置Bucket鑒權信息需要在實時計算開發控制臺部署詳情頁簽運行參數配置區域的其他配置中添加如下代碼。

fs.oss.bucket.<bucketName>.accessKeyId: xxxx
fs.oss.bucket.<bucketName>.accessKeySecret: xxxx

其中涉及到的參數解釋如下表所示:

配置項

說明

fs.oss.bucket.<bucketName>.accessKeyId

參數配置說明如下:

  • <bucketName>:需要替換為您目標文件系統路徑參數里填寫的Bucket名稱。

  • accessKeyId:填寫阿里云賬號的Access Key。獲取方法請參見查看RAM用戶的AccessKey信息

  • accessKeySecret:填寫您目標訪問Bucket的阿里云賬號的Access Secret。獲取方法請參見查看RAM用戶的AccessKey信息。

fs.oss.bucket.<bucketName>.accessKeySecret

寫OSS-HDFS

首先需要在實時計算開發控制臺部署詳情頁簽運行參數配置區域的其他配置中添加下如下配置:

fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx

其中涉及到的參數解釋如下表所示:

配置項

說明

fs.oss.jindo.buckets

寫入OSS-HDFS服務中的Bucket名稱,可配置多個,以分號分隔。當Flink寫一個OSS路徑時,如果其對應的bucket包含在fs.oss.jindo.buckets中,則會寫入OSS-HDFS服務中。

fs.oss.jindo.accessKeyId

阿里云賬號的Access Key。獲取方法請參見查看RAM用戶的AccessKey信息。

fs.oss.jindo.accessKeySecret

阿里云賬號的AccessKey Secret。獲取方法請參見查看RAM用戶的AccessKey信息

此外,還需要配置OSS-HDFS的EndPoint。目前支持兩種方式來配置OSS-HDFS的EndPoint:

  • 實時計算開發控制臺部署詳情頁簽運行參數配置區域的其他配置中添加如下配置項來配置OSS-HDFS的EndPoint

    fs.oss.jindo.endpoint: xxx
  • 在OSS的路徑中配置OSS-HDFS的EndPoint

    通過如下的路徑來進行配置

    oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>

    其中user-defined-oss-hdfs-bucket為對應的bucket的名字,oss-hdfs-endpoint為OSS-HDFS的EndPoint ;此時配置項fs.oss.jindo.buckets需要包含<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>。

    例如,假設bucket名字為jindo-test,其oss-hdfs的endpoint為

    cn-beijing.oss-dls.aliyuncs.com。則OSS路徑需為oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>,配置項fs.oss.jindo.buckets需包含jindo-test.cn-beijing.oss-dls.aliyuncs.com。

使用示例

  • 源表

    CREATE TEMPORARY TABLE fs_table_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector'='filesystem',
      'path'='oss://<bucket>/path',
      'format'='parquet'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) with (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;
  • 結果表

    • 寫分區表

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE,
        ts BIGINT, -- 以毫秒為單位的時間
        ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
        WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列上定義 watermark
      ) WITH (
        'connector' = 'datagen'
      );
      
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE,
        dt STRING,
        `hour` STRING
      ) PARTITIONED BY (dt, `hour`) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet',
        'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
        'sink.partition-commit.delay'='1 h',
        'sink.partition-commit.trigger'='partition-time',
        'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 假設用戶配置的時區為 'Asia/Shanghai'
        'sink.partition-commit.policy.kind'='success-file'
      );
      
      
      -- 流式 sql,插入文件系統表
      INSERT INTO fs_table_sink 
      SELECT 
        user_id, 
        order_amount, 
        DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
        DATE_FORMAT(ts_ltz, 'HH') 
      FROM datagen_source;
    • 寫非分區表

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector' = 'datagen'
      );
      
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet'
      );
      
      INSERT INTO fs_table_sink SELECT * FROM datagen_source;

DataStream API

重要

通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink,DataStream連接器設置方法請參見DataStream連接器使用方法。

使用 DataStream API寫OSS和OSS-HDFS的代碼示例如下:

String outputPath = "oss://<bucket>/path"
final StreamingFileSink<Row> sink =
                StreamingFileSink.forRowFormat(
                                new Path(outputPath),
                                (Encoder<Row>)
                                        (element, stream) -> {
                                            out.println(element.toString());
                                        })
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

outputStream.addSink(sink);

如果您需要寫OSS-HDFS,還需要在實時計算開發控制臺部署詳情頁簽運行參數配置區域的其他配置中配置與OSS-HDFS相關的參數,具體請參見寫OSS-HDFS

相關文檔