日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

MongoDB

本文為您介紹如何使用MongoDB連接器。

背景信息

MongoDB是一個面向文檔的非結構化數據庫,能夠簡化應用程序的開發及擴展。MongoDB連接器支持的信息如下:

類別

詳情

支持類型

源表、維表和結果表

運行模式

僅支持流模式

特有監控指標

  • 源表

    • numBytesIn

    • numBytesInPerSecond

    • numRecordsIn

    • numRecordsInPerSecond

    • numRecordsInErrors

    • currentFetchEventTimeLag

    • currentEmitEventTimeLag

    • watermarkLag

    • sourceIdleTime

  • 維表和結果表:無。

說明

指標含義詳情,請參見監控指標說明。

API 種類

DataStream和SQL

是否支持更新或刪除結果表數據

特色功能

  • MongoDB的CDC源表,即MongoDB的流式源表,會先讀取數據庫的歷史全量數據,并平滑切換到oplog讀取上,保證不多讀一條也不少讀一條。即使發生故障,也能保證通過Exactly Once語義處理數據。MongoDB CDC支持通過Change Stream API高效地捕獲MongoDB的數據庫和集合中的文檔變更,監控文檔的插入、修改、替換、刪除事件,并將其轉換為Flink能夠處理的Changelog數據流。作為源表,支持以下功能特性:

    • 支持利用MongoDB 3.6新增的Change Stream API,更高效地監控變化。

    • 精確一次處理:在作業任何階段失敗都能保證Exactly-once語義。

    • 支持全增量一體化監測:支持快照階段完成后自動切換為增量讀取階段。

    • 支持初始快照階段的并行讀取,需要MongoDB >= 4.0。

    • 支持多種啟動模式:

      • initial模式:在第一次啟動時對受監視的數據庫表執行初始快照,并繼續讀取最新的oplog。

      • latest-offset模式:首次啟動時,從不對受監視的數據庫表執行快照, 連接器僅從oplog 的結尾處開始讀取,這意味著連接器只能讀取在連接器啟動之后的數據更改。

      • timestamp:跳過快照階段,從指定的時間戳開始讀取oplog事件,需要MongoDB >= 4.0。

    • 支持產生Full Changelog事件流,需要MongoDB >= 6.0,詳情請參見關于MongoDB的變更前后像記錄功能。

  • 實時計算Flink VVR 8.0.6及以上版本支持通過CREATE TABLE AS(CTAS)語句CREATE DATABASE AS(CDAS)語句將MongoDB的數據和Schema變更同步到下游表。使用時需開啟MongoDB數據庫的前像后像(Pre- and Post-images)記錄功能,詳情請參見關于MongoDB的變更前后像記錄功能。

  • 實時計算Flink VVR 8.0.9及以上版本擴展維表關聯讀取能力,支持讀取內置ObjectId 類型的_id字段。

前提條件

  • CDC源表

    • CDC連接器支持通過副本集或分片集架構模式讀取阿里云云數據庫MongoDB版的數據,也支持讀取自建MongoDB數據庫的數據 。

    • 使用MongoDB CDC連接器的基礎功能時,必須開啟待監控的MongoDB數據庫的副本集(Replica Set)功能,詳情請參見Replication。

    • 如需使用Full Changelog事件流功能,則需開啟MongoDB數據庫的前像后像(Pre- and Post-images)記錄功能,詳情請參見Document Preimages關于MongoDB的變更前后像記錄功能。

    • 如果啟用了MongoDB的鑒權功能,則需要使用具有以下數據庫權限的MongoDB用戶:

      • splitVector權限

      • listDatabases權限

      • listCollections權限

      • collStats權限

      • find權限

      • changeStream權限

      • config.collections和config.chunks集合的訪問權限

  • 維表和結果表

    • 已創建MongoDB數據庫和表

    • 已設置IP白名單

使用限制

  • 僅支持讀寫3.6及以上版本的MongoDB。

  • CDC源表

    • 實時計算引擎VVR 8.0.1及以上版本支持使用MongoDB CDC連接器。

    • MongoDB 6.0及以上版本支持產生Full Changelog事件流。

    • MongoDB 4.0及以上版本支持指定時間戳的啟動模式。

    • MongoDB 4.0及以上版本支持初始快照階段并行讀取。如果您需要啟用并行模式進行初始快照,則需要將scan.incremental.snapshot.enabled配置項設置為true。

    • 由于MongoDB Change Stream流訂閱限制,不支持讀取admin、local、config數據庫及system集合中的數據,詳情請參見MongoDB文檔。

  • 結果表

    • 實時計算引擎VVR 8.0.5以下版本僅支持插入數據。

    • 實時計算引擎VVR 8.0.5及以上版本,結果表中聲明主鍵時,支持插入、更新和刪除數據,未聲明主鍵時僅支持插入數據。

  • 維表

    • 實時計算引擎VVR 8.0.5及以上版本支持使用MongoDB維表。

語法結構

CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
說明

在創建CDC源表時,您必須聲明_id STRING列,并將其作為唯一的主鍵。

WITH參數

  • 通用

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    connector

    連接器名稱。

    String

    • 作為源表:

      • 實時計算引擎VVR 8.0.4及之前版本,填寫為mongodb-cdc。

      • 實時計算引擎VVR 8.0.5及之后版本,填寫為mongodb或mongodb-cdc。

    • 作為維表或結果表時,固定值為mongodb。

    uri

    MongoDB連接uri。

    String

    說明

    參數urihosts必須指定其中之一。若指定uri,則無需指定scheme、hosts、usernamepasswordconnector.options。當兩者均指定時將使用uri進行連接。

    hosts

    MongoDB所在的主機名稱。

    String

    可以使用英文逗號(,)分隔提供多個主機名。

    scheme

    MongoDB使用的連接協議。

    String

    mongodb

    可選的取值包括:

    • mongodb:代表使用默認的MongoDB協議進行連接

    • mongodb+srv:代表使用DNS SRV記錄協議進行連接

    username

    連接到MongoDB時使用的用戶名。

    String

    開啟身份驗證功能時,必須配置該參數。

    password

    連接到MongoDB時使用的密碼。

    String

    開啟身份驗證功能時,必須配置該參數。

    重要

    為了避免您的密碼信息泄露,建議您通過密鑰管理的方式填寫密碼取值,詳情請參見變量管理。

    database

    MongoDB數據庫名稱。

    String

    • 作為源表時,數據庫名稱支持正則表達式匹配。

    • 不配置該參數代表監控全部數據庫。

    重要

    不支持監控admin、local、config數據庫中的數據。

    collection

    MongoDB集合名稱。

    String

    • 作為源表時,集合名稱支持正則表達式匹配。

      重要

      如果您要監控的集合名稱中包含正則表達式特殊字符,則必須提供完全限定的名字空間(數據庫名稱.集合名稱),否則無法捕獲對應集合的變更。

    • 不配置該參數代表監控全部集合。

    重要

    不支持監控system集合中的數據。

    connection.options

    MongoDB側的連接參數。

    String

    使用&分隔的key=value式額外連接參數。例如connectTimeoutMS=12000&socketTimeoutMS=13000。

  • 源表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    scan.startup.mode

    MongoDB CDC的啟動模式。

    String

    initial

    參數取值如下:

    • initial:從初始位點開始拉取全部數據。

    • latest-offset:從當前位點開始拉取變更數據。

    • timestamp:從指定的時間戳開始拉取變更數據。

    詳情請參見Startup Properties

    scan.startup.timestamp-millis

    指定位點消費的起始時間戳。

    Long

    取決于 scan.startup.mode的取值

    • initial:否

    • latest-offset:否

    • timestamp:是

    參數格式為自Linux Epoch時間戳以來的毫秒數。

    僅適用于timestamp啟動模式。

    initial.snapshotting.queue.size

    進行初始快照時的隊列大小限制。

    Integer

    10240

    僅在scan.startup.mode選項設置為initial 時生效。

    batch.size

    游標的批處理大小。

    Integer

    1024

    無。

    poll.max.batch.size

    同一批處理的最多變更文檔數量。

    Integer

    1024

    此參數控制流處理時一次拉取最多變更文檔的個數。取值越大,連接器內部分配的緩沖區越大。

    poll.await.time.ms

    兩次拉取數據之間的時間間隔。

    Integer

    1000

    單位為毫秒。

    heartbeat.interval.ms

    發送心跳包的時間間隔。

    Integer

    0

    單位為毫秒。

    MongoDB CDC連接器主動向數據庫發送心跳包來保證回溯狀態最新。設置為0代表永不發送心跳包。

    重要

    對于更新不頻繁的集合,強烈建議設定此選項。

    scan.incremental.snapshot.enabled

    是否啟用并行模式進行初始快照。

    Boolean

    false

    實驗性功能。

    scan.incremental.snapshot.chunk.size.mb

    并行模式讀取快照時的分片大小。

    Integer

    64

    實驗性功能。

    單位為MB。

    僅在啟用并行快照時生效。

    scan.full-changelog

    產生完整的Full Changelog事件流。

    Boolean

    false

    實驗性功能。

    說明

    MongoDB數據庫需要為6.0及以上版本,并且已開啟前像后像功能,開啟方法請參見Document Preimages。

    scan.flatten-nested-columns.enabled

    是否將以.分隔的字段名解析為嵌套BSON文檔讀取。

    Boolean

    false

    若開啟,在如下示例的BSON文檔中,col字段在schema中名稱為nested.col。

    {"nested":{"col":true}}
    說明

    僅VVR 8.0.5及以上版本支持該參數。

    scan.primitive-as-string

    是否將BSON文檔中的原始類型都解析為字符串類型。

    Boolean

    false

    說明

    僅VVR 8.0.5及以上版本支持該參數。

  • 維表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    lookup.cache

    Cache策略。

    String

    NONE

    目前支持以下兩種緩存策略:

    • None:無緩存。

    • Partial:只在外部數據庫中查找數據時緩存。

    lookup.max-retries

    查詢數據庫失敗的最大重試次數。

    Integer

    3

    無。

    lookup.retry.interval

    如果查詢數據庫失敗,重試的時間間隔。

    Duration

    1s

    無。

    lookup.partial-cache.expire-after-access

    緩存中的記錄最長保留時間。

    Duration

    支持時間單位ms、s、min、h和d。

    使用該配置時 lookup.cache 必須設置為 PARTIAL。

    lookup.partial-cache.expire-after-write

    在記錄寫入緩存后該記錄的最大保留時間。

    Duration

    使用該配置時 lookup.cache 必須設置為 PARTIAL

    lookup.partial-cache.max-rows

    緩存的最大條數。超過該值,最舊的行將過期。

    Long

    使用該配置時 lookup.cache 必須設置為 PARTIAL。

    lookup.partial-cache.cache-missing-key

    在物理表中未關聯到數據時,是否緩存空記錄。

    Boolean

    True

    使用該配置時 lookup.cache 必須設置為 PARTIAL

  • 結果表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    sink.buffer-flush.max-rows

    每次按批寫入數據時的最大記錄數。

    Integer

    1000

    無。

    sink.buffer-flush.interval

    寫入數據的刷新間隔。

    Duration

    1s

    無。

    sink.delivery-guarantee

    寫入數據時的語義保證。

    String

    at-least-once

    可選的取值包括:

    • none

    • at-least-once

    說明

    目前不支持exactly-once。

    sink.max-retries

    寫入數據庫失敗時的最大重試次數。

    Integer

    3

    無。

    sink.retry.interval

    寫入數據庫失敗時的重試時間間隔。

    Duration

    1s

    無。

    sink.parallelism

    自定義sink并行度。

    Integer

    無。

類型映射

  • CDC源表

    BSON類型

    Flink SQL類型

    Int32

    INT

    Int64

    BIGINT

    Double

    DOUBLE

    Decimal128

    DECIMAL(p, s)

    Boolean

    BOOLEAN

    Date Timestamp

    DATE

    Date Timestamp

    TIME

    DateTime

    TIMESTAMP(3)

    TIMESTAMP_LTZ(3)

    Timestamp

    TIMESTAMP(0)

    TIMESTAMP_LTZ(0)

    String

    ObjectId

    UUID

    Symbol

    MD5

    JavaScript

    Regex

    STRING

    Binary

    BYTES

    Object

    ROW

    Array

    ARRAY

    DBPointer

    ROW<$ref STRING, $id STRING>

    GeoJSON

    Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>

    Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>

  • 維表和結果表

    BSON類型

    Flink SQL類型

    Int32

    INT

    Int64

    BIGINT

    Double

    DOUBLE

    Decimal128

    DECIMAL

    Boolean

    BOOLEAN

    DateTime

    TIMESTAMP_LTZ(3)

    Timestamp

    TIMESTAMP_LTZ(0)

    String

    ObjectId

    STRING

    Binary

    BYTES

    Object

    ROW

    Array

    ARRAY

使用示例

  • CDC源表

    CREATE TEMPORARY TABLE mongo_source (
      `_id` STRING, --must be declared
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>,
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      collection_name STRING METADATA VIRTUAL,
      op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
      'username' = 'root',
      'password' = '${secret_values.password}',
      'database' = 'flinktest',
      'collection' = 'flinkcollection',
      'scan.incremental.snapshot.enabled' = 'true',
      'scan.full-changelog' = 'true'
    );
    
    CREATE TEMPORARY TABLE  productssink (
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price_amount DECIMAL,
      suppliers_name STRING,
      db_name STRING,
      collection_name STRING,
      op_ts TIMESTAMP_LTZ(3)
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO productssink  
    SELECT
      name,
      weight,
      tags,
      price.amount,
      suppliers[1].name,
      db_name,
      collection_name,
      op_ts
    FROM
      mongo_source;
  • 維表

    CREATE TEMPORARY TABLE datagen_source (
      id STRING,
      a int,
      b BIGINT,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mongo_dim (
      `_id` STRING,
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
      'username' = 'root',
      'password' = '${secret_values.password}',
      'database' = 'flinktest',
      'collection' = 'flinkcollection',
      'lookup.cache' = 'PARTIAL',
      'lookup.partial-cache.expire-after-access' = '10min',
      'lookup.partial-cache.expire-after-write' = '10min',
      'lookup.partial-cache.max-rows' = '100'
    );
    
    CREATE TEMPORARY TABLE print_sink (
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price_amount DECIMAL,
      suppliers_name STRING
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO print_sink
    SELECT
      T.id,
      T.a,
      T.b,
      H.name
    FROM
      datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;
  • 結果表

    CREATE TEMPORARY TABLE datagen_source (
      `_id` STRING,
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mongo_sink (
      `_id` STRING,
      name STRING,
      weight DECIMAL,
      tags ARRAY<STRING>,
      price ROW<amount DECIMAL, currency STRING>,
      suppliers ARRAY<ROW<name STRING, address STRING>>,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,',
      'username' = 'root',
      'password' = '${secret_values.password}',
      'database' = 'flinktest',
      'collection' = 'flinkcollection'
    );
    
    INSERT INTO mongo_sink
    SELECT * FROM datagen_source;

元數據

MongoDB CDC源表支持元數據列語法,您可以通過元數據列訪問以下元數據。

元數據key

元數據類型

描述

database_name

STRING NOT NULL

包含該文檔的數據庫名。

collection_name

STRING NOT NULL

包含該文檔的集合名。

op_ts

TIMESTAMP_LTZ(3) NOT NULL

該文檔在數據庫中的變更時間,如果該文檔來自表的存量歷史數據而不是從ChangeStream中獲取,則該值總是0。

關于MongoDB的變更前后像記錄功能

MongoDB 6.0 之前的版本默認不會提供變更前文檔及被刪除文檔的數據,在未開啟變更前后像記錄功能時,利用已有信息只能實現 Upsert 語義(即缺失了 Update Before 數據條目)。但在 Flink 中許多有用的算子操作都依賴完整的 Insert、Update Before、Update After、Delete 變更流。

為了補充缺失的變更前事件,目前 Flink SQL Planner 會自動為 Upsert 類型的數據源生成一個 ChangelogNormalize 節點,該節點會在 Flink 狀態中緩存所有文檔的當前版本快照,在遇到被更新或刪除的文檔時,查表即可得知變更前的狀態,但該算子節點需要存儲體積巨大的狀態數據。

image.png

MongoDB 6.0版本支持開啟數據庫的前像后像(Pre- and Post-images)記錄功能,詳情可參考Document Preimages。開啟該功能后,MongoDB會在每次變更發生時,在一個特殊的集合中記錄文檔變更前后的完整狀態。此時在作業中啟用scan.full-changelog配置項,MongoDB CDC會從變更文檔記錄中生成Update Before記錄,從而支持產生完整事件流,消除了對ChangelogNormalize節點的依賴。

Mongo CDC DataStream API

重要

通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink,DataStream連接器設置方法請參見DataStream連接器使用方法

創建DataStream API程序并使用MongoDBSource。代碼示例如下:

Java

MongoDBSource.builder()
  .hosts("mongo.example.com:27017")
  .username("mongouser")
  .password("mongopasswd")
  .databaseList("testdb")
  .collectionList("testcoll")
  .startupOptions(StartupOptions.initial())
  .deserializer(new JsonDebeziumDeserializationSchema())
  .build();

XML

Maven中央倉庫已經放置了VVR MongoDB連接器,以供您在作業開發時直接使用。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>${vvr.version}</version>
</dependency>
說明

在使用DataStream API時,若要啟用增量快照功能,請在構造MongoDBSource數據源時,使用com.ververica.cdc.connectors.mongodb.source包中的MongoDBSource#builder();否則,使用com.ververica.cdc.connectors.mongodb中的MongoDBSource#builder()。

在構造MongoDBSource時,可以配置以下參數:

參數

說明

hosts

需要連接的MongoDB數據庫的主機名稱。

username

MongoDB數據庫服務的用戶名。

說明

若MongoDB服務器未啟用鑒權,則無需配置此參數。

password

MongoDB數據庫服務的密碼。

說明

若MongoDB服務器未啟用鑒權,則無需配置此參數。

databaseList

需要監控的MongoDB數據庫名稱。

說明

數據庫名稱支持正則表達式以讀取多個數據庫的數據,您可以使用.*匹配所有數據庫。

collectionList

需要監控的MongoDB集合名稱。

說明

集合名稱支持正則表達式以讀取多個集合的數據,您可以使用.*匹配所有集合。

startupOptions

選擇MongoDB CDC的啟動模式。

合法的取值包括:

  • StartupOptions.initial()

    • 從初始位點開始拉取全部數據

  • StartupOptions.latest-offset()

    • 從當前位點開始拉取變更數據

  • StartupOptions.timestamp()

    • 從指定的時間戳開始拉取變更數據

詳情請參見Startup Properties。

deserializer

反序列化器,將SourceRecord類型記錄反序列化到指定類型。參數取值如下:

  • MongoDBConnectorDeserializationSchema:將Upsert模式下產生的SourceRecord轉成Flink Table API或SQL API內部數據結構RowData。

  • MongoDBConnectorFullChangelogDeserializationSchema:將Full Changelog模式下產生的SourceRecord轉成Flink Table或SQL內部數據結構RowData。

  • JsonDebeziumDeserializationSchema:將SourceRecord轉成JSON格式的String。