本文為您介紹如何使用MongoDB連接器。
背景信息
MongoDB是一個面向文檔的非結構化數據庫,能夠簡化應用程序的開發及擴展。MongoDB連接器支持的信息如下:
類別 | 詳情 |
支持類型 | 源表、維表和結果表 |
運行模式 | 僅支持流模式 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API 種類 | DataStream和SQL |
是否支持更新或刪除結果表數據 | 是 |
特色功能
MongoDB的CDC源表,即MongoDB的流式源表,會先讀取數據庫的歷史全量數據,并平滑切換到oplog讀取上,保證不多讀一條也不少讀一條。即使發生故障,也能保證通過Exactly Once語義處理數據。MongoDB CDC支持通過Change Stream API高效地捕獲MongoDB的數據庫和集合中的文檔變更,監控文檔的插入、修改、替換、刪除事件,并將其轉換為Flink能夠處理的Changelog數據流。作為源表,支持以下功能特性:
支持利用MongoDB 3.6新增的Change Stream API,更高效地監控變化。
精確一次處理:在作業任何階段失敗都能保證Exactly-once語義。
支持全增量一體化監測:支持快照階段完成后自動切換為增量讀取階段。
支持初始快照階段的并行讀取,需要MongoDB >= 4.0。
支持多種啟動模式:
initial模式:在第一次啟動時對受監視的數據庫表執行初始快照,并繼續讀取最新的oplog。
latest-offset模式:首次啟動時,從不對受監視的數據庫表執行快照, 連接器僅從oplog 的結尾處開始讀取,這意味著連接器只能讀取在連接器啟動之后的數據更改。
timestamp:跳過快照階段,從指定的時間戳開始讀取oplog事件,需要MongoDB >= 4.0。
支持產生Full Changelog事件流,需要MongoDB >= 6.0,詳情請參見關于MongoDB的變更前后像記錄功能。
實時計算Flink VVR 8.0.6及以上版本支持通過CREATE TABLE AS(CTAS)語句或CREATE DATABASE AS(CDAS)語句將MongoDB的數據和Schema變更同步到下游表。使用時需開啟MongoDB數據庫的前像后像(Pre- and Post-images)記錄功能,詳情請參見關于MongoDB的變更前后像記錄功能。
實時計算Flink VVR 8.0.9及以上版本擴展維表關聯讀取能力,支持讀取內置ObjectId 類型的
_id
字段。
前提條件
CDC源表
CDC連接器支持通過副本集或分片集架構模式讀取阿里云云數據庫MongoDB版的數據,也支持讀取自建MongoDB數據庫的數據 。
使用MongoDB CDC連接器的基礎功能時,必須開啟待監控的MongoDB數據庫的副本集(Replica Set)功能,詳情請參見Replication。
如需使用Full Changelog事件流功能,則需開啟MongoDB數據庫的前像后像(Pre- and Post-images)記錄功能,詳情請參見Document Preimages和關于MongoDB的變更前后像記錄功能。
如果啟用了MongoDB的鑒權功能,則需要使用具有以下數據庫權限的MongoDB用戶:
splitVector權限
listDatabases權限
listCollections權限
collStats權限
find權限
changeStream權限
config.collections和config.chunks集合的訪問權限
維表和結果表
已創建MongoDB數據庫和表
已設置IP白名單
使用限制
僅支持讀寫3.6及以上版本的MongoDB。
CDC源表
實時計算引擎VVR 8.0.1及以上版本支持使用MongoDB CDC連接器。
MongoDB 6.0及以上版本支持產生Full Changelog事件流。
MongoDB 4.0及以上版本支持指定時間戳的啟動模式。
MongoDB 4.0及以上版本支持初始快照階段并行讀取。如果您需要啟用并行模式進行初始快照,則需要將
scan.incremental.snapshot.enabled
配置項設置為true。由于MongoDB Change Stream流訂閱限制,不支持讀取admin、local、config數據庫及system集合中的數據,詳情請參見MongoDB文檔。
結果表
實時計算引擎VVR 8.0.5以下版本僅支持插入數據。
實時計算引擎VVR 8.0.5及以上版本,結果表中聲明主鍵時,支持插入、更新和刪除數據,未聲明主鍵時僅支持插入數據。
維表
實時計算引擎VVR 8.0.5及以上版本支持使用MongoDB維表。
語法結構
CREATE TABLE tableName(
_id STRING,
[columnName dataType,]*
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb',
'hosts' = 'localhost:27017',
'username' = 'mongouser',
'password' = '${secret_values.password}',
'database' = 'testdb',
'collection' = 'testcoll'
)
在創建CDC源表時,您必須聲明_id STRING
列,并將其作為唯一的主鍵。
WITH參數
通用
參數
說明
數據類型
是否必填
默認值
備注
connector
連接器名稱。
String
是
無
作為源表:
實時計算引擎VVR 8.0.4及之前版本,填寫為mongodb-cdc。
實時計算引擎VVR 8.0.5及之后版本,填寫為mongodb或mongodb-cdc。
作為維表或結果表時,固定值為mongodb。
uri
MongoDB連接uri。
String
否
無
說明參數
uri
與hosts
必須指定其中之一。若指定uri
,則無需指定scheme
、hosts
、username
、password
、connector.options
。當兩者均指定時將使用uri
進行連接。hosts
MongoDB所在的主機名稱。
String
否
無
可以使用英文逗號(
,
)分隔提供多個主機名。scheme
MongoDB使用的連接協議。
String
否
mongodb
可選的取值包括:
mongodb
:代表使用默認的MongoDB協議進行連接mongodb+srv
:代表使用DNS SRV記錄協議進行連接
username
連接到MongoDB時使用的用戶名。
String
否
無
開啟身份驗證功能時,必須配置該參數。
password
連接到MongoDB時使用的密碼。
String
否
無
開啟身份驗證功能時,必須配置該參數。
重要為了避免您的密碼信息泄露,建議您通過密鑰管理的方式填寫密碼取值,詳情請參見變量管理。
database
MongoDB數據庫名稱。
String
否
無
作為源表時,數據庫名稱支持正則表達式匹配。
不配置該參數代表監控全部數據庫。
重要不支持監控admin、local、config數據庫中的數據。
collection
MongoDB集合名稱。
String
否
無
作為源表時,集合名稱支持正則表達式匹配。
重要如果您要監控的集合名稱中包含正則表達式特殊字符,則必須提供完全限定的名字空間(數據庫名稱.集合名稱),否則無法捕獲對應集合的變更。
不配置該參數代表監控全部集合。
重要不支持監控system集合中的數據。
connection.options
MongoDB側的連接參數。
String
否
無
使用
&
分隔的key=value
式額外連接參數。例如connectTimeoutMS=12000&socketTimeoutMS=13000。源表獨有
參數
說明
數據類型
是否必填
默認值
備注
scan.startup.mode
MongoDB CDC的啟動模式。
String
否
initial
參數取值如下:
initial:從初始位點開始拉取全部數據。
latest-offset:從當前位點開始拉取變更數據。
timestamp:從指定的時間戳開始拉取變更數據。
詳情請參見Startup Properties。
scan.startup.timestamp-millis
指定位點消費的起始時間戳。
Long
取決于 scan.startup.mode的取值
initial:否
latest-offset:否
timestamp:是
無
參數格式為自Linux Epoch時間戳以來的毫秒數。
僅適用于
timestamp
啟動模式。initial.snapshotting.queue.size
進行初始快照時的隊列大小限制。
Integer
否
10240
僅在
scan.startup.mode
選項設置為initial
時生效。batch.size
游標的批處理大小。
Integer
否
1024
無。
poll.max.batch.size
同一批處理的最多變更文檔數量。
Integer
否
1024
此參數控制流處理時一次拉取最多變更文檔的個數。取值越大,連接器內部分配的緩沖區越大。
poll.await.time.ms
兩次拉取數據之間的時間間隔。
Integer
否
1000
單位為毫秒。
heartbeat.interval.ms
發送心跳包的時間間隔。
Integer
否
0
單位為毫秒。
MongoDB CDC連接器主動向數據庫發送心跳包來保證回溯狀態最新。設置為0代表永不發送心跳包。
重要對于更新不頻繁的集合,強烈建議設定此選項。
scan.incremental.snapshot.enabled
是否啟用并行模式進行初始快照。
Boolean
否
false
實驗性功能。
scan.incremental.snapshot.chunk.size.mb
并行模式讀取快照時的分片大小。
Integer
否
64
實驗性功能。
單位為MB。
僅在啟用并行快照時生效。
scan.full-changelog
產生完整的Full Changelog事件流。
Boolean
否
false
實驗性功能。
說明MongoDB數據庫需要為6.0及以上版本,并且已開啟前像后像功能,開啟方法請參見Document Preimages。
scan.flatten-nested-columns.enabled
是否將以
.
分隔的字段名解析為嵌套BSON文檔讀取。Boolean
否
false
若開啟,在如下示例的BSON文檔中,
col
字段在schema中名稱為nested.col
。{"nested":{"col":true}}
說明僅VVR 8.0.5及以上版本支持該參數。
scan.primitive-as-string
是否將BSON文檔中的原始類型都解析為字符串類型。
Boolean
否
false
說明僅VVR 8.0.5及以上版本支持該參數。
維表獨有
參數
說明
數據類型
是否必填
默認值
備注
lookup.cache
Cache策略。
String
否
NONE
目前支持以下兩種緩存策略:
None:無緩存。
Partial:只在外部數據庫中查找數據時緩存。
lookup.max-retries
查詢數據庫失敗的最大重試次數。
Integer
否
3
無。
lookup.retry.interval
如果查詢數據庫失敗,重試的時間間隔。
Duration
否
1s
無。
lookup.partial-cache.expire-after-access
緩存中的記錄最長保留時間。
Duration
否
無
支持時間單位ms、s、min、h和d。
使用該配置時
lookup.cache
必須設置為PARTIAL
。lookup.partial-cache.expire-after-write
在記錄寫入緩存后該記錄的最大保留時間。
Duration
否
無
使用該配置時
lookup.cache
必須設置為PARTIAL
。lookup.partial-cache.max-rows
緩存的最大條數。超過該值,最舊的行將過期。
Long
否
無
使用該配置時
lookup.cache
必須設置為PARTIAL
。lookup.partial-cache.cache-missing-key
在物理表中未關聯到數據時,是否緩存空記錄。
Boolean
否
True
使用該配置時
lookup.cache
必須設置為PARTIAL
。結果表獨有
參數
說明
數據類型
是否必填
默認值
備注
sink.buffer-flush.max-rows
每次按批寫入數據時的最大記錄數。
Integer
否
1000
無。
sink.buffer-flush.interval
寫入數據的刷新間隔。
Duration
否
1s
無。
sink.delivery-guarantee
寫入數據時的語義保證。
String
否
at-least-once
可選的取值包括:
none
at-least-once
說明目前不支持exactly-once。
sink.max-retries
寫入數據庫失敗時的最大重試次數。
Integer
否
3
無。
sink.retry.interval
寫入數據庫失敗時的重試時間間隔。
Duration
否
1s
無。
sink.parallelism
自定義sink并行度。
Integer
否
空
無。
類型映射
CDC源表
BSON類型
Flink SQL類型
Int32
INT
Int64
BIGINT
Double
DOUBLE
Decimal128
DECIMAL(p, s)
Boolean
BOOLEAN
Date Timestamp
DATE
Date Timestamp
TIME
DateTime
TIMESTAMP(3)
TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP(0)
TIMESTAMP_LTZ(0)
String
ObjectId
UUID
Symbol
MD5
JavaScript
Regex
STRING
Binary
BYTES
Object
ROW
Array
ARRAY
DBPointer
ROW<$ref STRING, $id STRING>
GeoJSON
Point: ROW<type STRING, coordinates ARRAY<DOUBLE>>
Line: ROW<type STRING, coordinates ARRAY<ARRAY< DOUBLE>>>
維表和結果表
BSON類型
Flink SQL類型
Int32
INT
Int64
BIGINT
Double
DOUBLE
Decimal128
DECIMAL
Boolean
BOOLEAN
DateTime
TIMESTAMP_LTZ(3)
Timestamp
TIMESTAMP_LTZ(0)
String
ObjectId
STRING
Binary
BYTES
Object
ROW
Array
ARRAY
使用示例
CDC源表
CREATE TEMPORARY TABLE mongo_source ( `_id` STRING, --must be declared name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, db_name STRING METADATA FROM 'database_name' VIRTUAL, collection_name STRING METADATA VIRTUAL, op_ts TIMESTAMP_LTZ(3) METADATA VIRTUAL, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection', 'scan.incremental.snapshot.enabled' = 'true', 'scan.full-changelog' = 'true' ); CREATE TEMPORARY TABLE productssink ( name STRING, weight DECIMAL, tags ARRAY<STRING>, price_amount DECIMAL, suppliers_name STRING, db_name STRING, collection_name STRING, op_ts TIMESTAMP_LTZ(3) ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO productssink SELECT name, weight, tags, price.amount, suppliers[1].name, db_name, collection_name, op_ts FROM mongo_source;
維表
CREATE TEMPORARY TABLE datagen_source ( id STRING, a int, b BIGINT, `proctime` AS PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mongo_dim ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection', 'lookup.cache' = 'PARTIAL', 'lookup.partial-cache.expire-after-access' = '10min', 'lookup.partial-cache.expire-after-write' = '10min', 'lookup.partial-cache.max-rows' = '100' ); CREATE TEMPORARY TABLE print_sink ( name STRING, weight DECIMAL, tags ARRAY<STRING>, price_amount DECIMAL, suppliers_name STRING ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO print_sink SELECT T.id, T.a, T.b, H.name FROM datagen_source AS T JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.id = H._id;
結果表
CREATE TEMPORARY TABLE datagen_source ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>> ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE mongo_sink ( `_id` STRING, name STRING, weight DECIMAL, tags ARRAY<STRING>, price ROW<amount DECIMAL, currency STRING>, suppliers ARRAY<ROW<name STRING, address STRING>>, PRIMARY KEY(_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,', 'username' = 'root', 'password' = '${secret_values.password}', 'database' = 'flinktest', 'collection' = 'flinkcollection' ); INSERT INTO mongo_sink SELECT * FROM datagen_source;
元數據
MongoDB CDC源表支持元數據列語法,您可以通過元數據列訪問以下元數據。
元數據key | 元數據類型 | 描述 |
database_name | STRING NOT NULL | 包含該文檔的數據庫名。 |
collection_name | STRING NOT NULL | 包含該文檔的集合名。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 該文檔在數據庫中的變更時間,如果該文檔來自表的存量歷史數據而不是從ChangeStream中獲取,則該值總是0。 |
關于MongoDB的變更前后像記錄功能
MongoDB 6.0 之前的版本默認不會提供變更前文檔及被刪除文檔的數據,在未開啟變更前后像記錄功能時,利用已有信息只能實現 Upsert 語義(即缺失了 Update Before 數據條目)。但在 Flink 中許多有用的算子操作都依賴完整的 Insert、Update Before、Update After、Delete 變更流。
為了補充缺失的變更前事件,目前 Flink SQL Planner 會自動為 Upsert 類型的數據源生成一個 ChangelogNormalize 節點,該節點會在 Flink 狀態中緩存所有文檔的當前版本快照,在遇到被更新或刪除的文檔時,查表即可得知變更前的狀態,但該算子節點需要存儲體積巨大的狀態數據。
MongoDB 6.0版本支持開啟數據庫的前像后像(Pre- and Post-images)記錄功能,詳情可參考Document Preimages。開啟該功能后,MongoDB會在每次變更發生時,在一個特殊的集合中記錄文檔變更前后的完整狀態。此時在作業中啟用scan.full-changelog
配置項,MongoDB CDC會從變更文檔記錄中生成Update Before記錄,從而支持產生完整事件流,消除了對ChangelogNormalize節點的依賴。
Mongo CDC DataStream API
通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink,DataStream連接器設置方法請參見DataStream連接器使用方法。
創建DataStream API程序并使用MongoDBSource。代碼示例如下:
Java
MongoDBSource.builder()
.hosts("mongo.example.com:27017")
.username("mongouser")
.password("mongopasswd")
.databaseList("testdb")
.collectionList("testcoll")
.startupOptions(StartupOptions.initial())
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
XML
Maven中央倉庫已經放置了VVR MongoDB連接器,以供您在作業開發時直接使用。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mongodb</artifactId>
<version>${vvr.version}</version>
</dependency>
在使用DataStream API時,若要啟用增量快照功能,請在構造MongoDBSource數據源時,使用com.ververica.cdc.connectors.mongodb.source
包中的MongoDBSource#builder()
;否則,使用com.ververica.cdc.connectors.mongodb
中的MongoDBSource#builder()
。
在構造MongoDBSource時,可以配置以下參數:
參數 | 說明 |
hosts | 需要連接的MongoDB數據庫的主機名稱。 |
username | MongoDB數據庫服務的用戶名。 說明 若MongoDB服務器未啟用鑒權,則無需配置此參數。 |
password | MongoDB數據庫服務的密碼。 說明 若MongoDB服務器未啟用鑒權,則無需配置此參數。 |
databaseList | 需要監控的MongoDB數據庫名稱。 說明 數據庫名稱支持正則表達式以讀取多個數據庫的數據,您可以使用 |
collectionList | 需要監控的MongoDB集合名稱。 說明 集合名稱支持正則表達式以讀取多個集合的數據,您可以使用 |
startupOptions | 選擇MongoDB CDC的啟動模式。 合法的取值包括:
詳情請參見Startup Properties。 |
deserializer | 反序列化器,將SourceRecord類型記錄反序列化到指定類型。參數取值如下:
|