本文為您介紹云消息隊列 RocketMQ 版連接器。
鑒于云消息隊列 RocketMQ 版 4.x標準版實例共享API調用彈性上限為每秒5000次,使用該版本的消息中間件在與實時計算Flink版對接時,若超過上限會觸發限流機制,可能會導致Flink作業運行不穩定。因此,在選擇消息中間件時,如果您正在或計劃通過標準版RocketMQ與Flink對接,請您謹慎評估。如果業務場景允許,請考慮使用Kafka、日志服務(SLS)或DataHub等其他中間件進行替代。如果您確實需要使用云消息隊列 RocketMQ 版 4.x標準版處理大規模的消息,也請同時通過提交工單與RocketMQ產品取得聯系申請提升限速上限。
背景信息
云消息隊列 RocketMQ 版是阿里云基于Apache RocketMQ構建的低延遲、高并發、高可用和高可靠的分布式消息中間件。其既可為分布式應用系統提供異步解耦和削峰填谷的能力,同時也具備互聯網應用所需的海量消息堆積、高吞吐和可靠重試等特性。
RocketMQ連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 源表和結果表 |
運行模式 | 僅支持流模式 |
數據格式 | CSV和二進制格式 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | Datastream(僅支持RocketMQ 4.x)和SQL |
是否支持更新或刪除結果表數據 | 不支持刪除結果表數據,只支持插入和更新數據。 |
特色功能
RocketMQ源表和結果表支持屬性字段,具體如下。
源表屬性字段
說明僅在VVR 3.0.1及以上版本支持獲取以下RocketMQ屬性字段。
字段名
字段類型
說明
topic
VARCHAR METADATA VIRTUAL
消息Topic。
queue-id
INT METADATA VIRTUAL
消息隊列ID。
queue-offset
BIGINT METADATA VIRTUAL
消息隊列的消費位點。
msg-id
VARCHAR METADATA VIRTUAL
消息ID。
store-timestamp
TIMESTAMP(3) METADATA VIRTUAL
消息存儲時間。
born-timestamp
TIMESTAMP(3) METADATA VIRTUAL
消息生成時間。
keys
VARCHAR METADATA VIRTUAL
消息Keys。
tags
VARCHAR METADATA VIRTUAL
消息Tags。
結果表屬性字段
說明僅實時計算引擎VVR 4.0.0及以上版本支持以下RocketMQ屬性字段。
字段名
字段類型
說明
keys
VARCHAR METADATA
消息Keys。
tags
VARCHAR METADATA
消息Tags。
前提條件
已創建了RocketMQ資源,詳情請參見創建資源。
使用限制
僅Flink實時計算引擎VVR 2.0.0及以上版本支持RocketMQ連接器。
僅Flink實時計算引擎VVR 8.0.3及以上版本支持5.x版本的RocketMQ。
在Flink實時計算引擎VVR 6.0.2以下版本,源表的并發度必須小于等于RocketMQ topic的分區數,在實時計算引擎VVR 6.0.2及以上版本解除該限制。您可以提前設置大于分區數的并發度,不需要因RocketMQ的縮容而手動調整作業并發度。
RocketMQ連接器使用Pull Consumer消費,所有的子任務分擔消費。
語法結構
CREATE TABLE mq_source(
x varchar,
y varchar,
z varchar
) WITH (
'connector' = 'mq5',
'topic' = '<yourTopicName>',
'endpoint' = '<yourEndpoint>',
'consumerGroup' = '<yourConsumerGroup>'
);
WITH參數
通用
參數
說明
數據類型
是否必填
默認值
備注
connector
connector類型。
String
是
無
RocketMQ 4.x固定值為
mq
。RocketMQ 5.x固定值為
mq5
。
endPoint
EndPoint地址
String
是
無
云消息隊列 RocketMQ 版接入地址支持以下兩種類型:
VVR 3.0.1及以上版本的作業:需要使用TCP協議客戶端接入點,詳情請參見
內網服務MQ(阿里云經典網絡/VPC)接入地址:在MQ控制臺目標實例詳情中,選擇
,獲取對應的EndPoint。公網服務MQ接入地址:在MQ控制臺目標實例詳情中,選擇
,獲取對應的EndPoint。
重要由于阿里云網絡安全策略動態變化,實時計算連接公網服務MQ時可能會出現網絡連接問題,推薦您使用內網服務MQ。
內網服務無法跨域訪問。例如,您所購買的實時計算服務的地域為華東1,但是購買的RocketMQ服務的地域為華東2(上海),則無法訪問。
通過公網方式訪問RocketMQ,需要配置NAT,詳情請參見創建和管理公網NAT網關實例。
VVR 3.0.1以下版本的作業:RocketMQ舊的接入點已不可用,您需要適配升級實時計算作業。
重要如果您已使用了VVR 3.0.1以下版本的RocketMQ連接器,則您需要將您的實時計算作業升級至VVR 3.0.1及以上版本,并將作業中EndPoint參數取值更改為新的RocketMQ接入點,舊的接入點存在穩定性風險或不可用的問題,詳情請參見實時計算Flink版產品公告
topic
topic名稱。
String
是
無
無。
accessId
4.x:阿里云賬號的AccessKey ID。
5.x:
RocketMQ實例用戶名
String
RocketMQ 4.x:是
RocketMQ 5.x:否
無
RocketMQ 4.x:詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理。
RocketMQ 5.x:如果是使用公網接入點訪問,需配置為RocketMQ控制臺實例用戶名。如果是在阿里云ECS內網訪問,無需填寫該配置。
accessKey
4.x: 阿里云賬號的AccessKey Secret。
5.x:實例密碼
String
RocketMQ 4.x:是
RocketMQ 5.x:否
無
RocketMQ 4.x:詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理。
RocketMQ 5.x:如果是使用公網接入點訪問,需配置為RocketMQ控制臺實例密碼。如果是在阿里云ECS內網訪問,無需填寫該配置。
tag
訂閱或寫入的標簽
String
否
無
RocketMQ作為源表時,只能讀取單個tag。
RocketMQ作為結果表時,支持設置多個tag,以逗號(,)進行分隔。
說明當作為結果表時,僅支持RocketMQ 4.x。RocketMQ 5.x請使用結果表屬性字段來指定寫出消息的 tag。
nameServerSubgroup
NameServer組。
String
否
無
內網服務(阿里云經典網絡或VPC):必須配置
'nameServerSubgroup' = 'nsaddr4client-internal'
。公網服務:無需配置
nameServerSubgroup
。
說明僅VVR 2.1.1-VVR 3.0.0版本支持該參數,VVR 3.0.1及以后版本不支持該參數。
encoding
編碼格式。
String
否
UTF-8
無。
instanceID
RocketMQ實例ID。
String
否
無
如果RocketMQ實例無獨立命名空間,則不可以使用instanceID參數。
如果RocketMQ實例有獨立命名空間,則instanceID參數必選。
說明僅RocketMQ 4.x支持該參數,RocketMQ 5.x不需要配置該參數。
源表獨有
參數
說明
數據類型
是否必填
默認值
備注
consumerGroup
Consumer組名。
String
是
無
無。
pullIntervalMs
上游沒有數據可供消費時,source的休眠時間。
Int
是
無
單位為毫秒。
目前沒有限流機制,無法設置讀取RocketMQ的速率。
說明僅RocketMQ 4.x支持該參數,RocketMQ 5.x不需要配置該參數。
timeZone
時區。
String
否
無
例如,Asia/Shanghai。
startTimeMs
啟動時間點。
Long
否
無
時間戳,單位為毫秒。
startMessageOffset
消息開始的偏移量。
Int
否
無
如果填寫該參數,則優先以
startMessageOffset
的位點開始加載數據。lineDelimiter
解析Block時,行分隔符。
String
否
\n
無。
fieldDelimiter
字段分隔符。
String
否
\u0001
根據MQ終端的模式,分隔符分別為:
在只讀模式下(默認模式),分隔符為
\u0001
。該模式下,分隔符不可見。在編輯模式下,分隔符為
^A
。
lengthCheck
單行字段條數檢查策略。
Int
否
NONE
取值如下:
NONE:默認值。
解析出的字段數大于定義字段數時,按從左到右的順序,取定義字段數量的數據。
解析出的字段數小于定義字段數時,跳過這行數據。
SKIP:解析出的字段數和定義字段數不同時跳過數據。
EXCEPTION:解析出的字段數和定義字段數不同時提示異常。
PAD:按從左到右順序填充。
解析出的字段數大于定義字段數時,按從左到右的順序,取定義字段數量的數據。
解析出的字段數小于定義字段數時,在行尾用Null填充缺少的字段。
說明SKIP、EXCEPTION和PAD為可選值。
columnErrorDebug
是否打開調試開關。
Boolean
否
false
如果設置為true,則打印解析異常的Log。
pullBatchSize
每次拉取消息的最大數量。
Int
否
64
僅實時計算引擎VVR 8.0.7及以上版本支持該參數。
結果表獨有
參數
說明
數據類型
是否必填
默認值
備注
producerGroup
寫入的群組。
String
是
無
無。
retryTimes
寫入的重試次數。
Int
否
10
無。
sleepTimeMs
重試間隔時間。
Long
否
5000
無。
partitionField
指定字段名,將該字段作為分區列。
String
否
無
如果
mode
為partition
,則該參數必填。說明僅實時計算引擎VVR 8.0.5及以上版本支持該參數。
類型映射
Flink字段類型 | 云消息隊列RocketMQ字段類型 |
VARCHAR | STRING |
代碼示例
源表示例
CSV格式
假設您的一條CSV格式消息記錄如下。
1,name,male 2,name,female
說明一條RocketMQ消息可以包括零條到多條數據記錄,記錄之間使用
\n
分隔。Flink作業中,聲明RocketMQ數據源表的DDL如下。
RocketMQ 5.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq5', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );
RocketMQ 4.x
CREATE TABLE mq_source( id varchar, name varchar, gender varchar, topic varchar metadata virtual ) WITH ( 'connector' = 'mq', 'topic' = 'mq-test', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '1000', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'consumerGroup' = 'mq-group', 'fieldDelimiter' = ',' );
二進制格式
RocketMQ 5.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq5', 'endpoint' = '<yourEndpoint>', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
RocketMQ 4.x
CREATE TEMPORARY TABLE source_table ( mess varbinary ) WITH ( 'connector' = 'mq', 'endpoint' = '<yourEndpoint>', 'pullIntervalMs' = '500', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'topic' = 'mq-test', 'consumerGroup' = 'mq-group' ); CREATE TEMPORARY TABLE out_table ( commodity varchar ) WITH ( 'connector' = 'print' ); INSERT INTO out_table select cast(mess as varchar) FROM source_table;
結果表示例
創建結果表
RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
說明如果您的MQ消息為二進制格式,則DDL中只能定義一個字段,且字段類型必須為VARBINARY。
創建將
keys
和tags
字段指定為RocketMQ消息的key和tag的結果表RocketMQ 5.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq5', 'endpoint'='<yourEndpoint>', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
RocketMQ 4.x
CREATE TABLE mq_sink ( id INTEGER, len BIGINT, content VARCHAR, keys VARCHAR METADATA, tags VARCHAR METADATA ) WITH ( 'connector'='mq', 'endpoint'='<yourEndpoint>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'topic'='<yourTopicName>', 'producerGroup'='<yourGroupName>' );
DataStream API
通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink全托管,DataStream連接器設置方法請參見DataStream連接器使用方法。
實時計算引擎VVR提供MetaQSource
,用于讀取RocketMQ;提供OutputFormat
的實現類MetaQOutputFormat
,用于寫入RocketMQ。讀取RocketMQ和寫入RocketMQ的示例如下:
RocketMQ 4.x
/**
* A demo that illustrates how to consume messages from RocketMQ, convert
* messages, then produce messages to RocketMQ.
*/
public class RocketMQDataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String INSTANCE_ID = "<instanceID>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
Configuration conf = new Configuration();
// 以下兩個配置僅本地調試時使用,需要在作業打包上傳到阿里云實時計算Flink版之前刪除
conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
conf.setString("classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
// Creates and adds RocketMQ source.
env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
// Converts message body to upper case.
.map(RocketMQDataStreamDemo2::convertMessages)
// Creates and adds RocketMQ sink.
.addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
.name(RocketMQDataStreamDemo2.class.getSimpleName());
// Compiles and submits job.
env.execute("RocketMQ connector end-to-end DataStream demo");
}
private static MetaQSource<MessageExt> createRocketMQSource() {
Properties mqProperties = createMQProperties();
return new MetaQSource<>(SOURCE_TOPIC,
CONSUMER_GROUP,
null, // always null
null, // tag of the messages to consumer
Long.MAX_VALUE, // stop timestamp in milliseconds
-1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
0, // Start offset.
300_000, // Partition discover interval.
mqProperties,
Boundedness.CONTINUOUS_UNBOUNDED,
new MyDeserializationSchema());
}
private static MetaQOutputFormat createRocketMQOutputFormat() {
return new MetaQOutputFormat.Builder()
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.setMqProperties(createMQProperties())
.build();
}
private static Properties createMQProperties() {
Properties properties = new Properties();
properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
properties.put(NAMESRV_ADDR, ENDPOINT);
properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
return properties;
}
private static List<MessageExt> convertMessages(MessageExt messages) {
return Collections.singletonList(messages);
}
public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
@Override
public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
for (MessageExt messageExt : list) {
collector.collect(messageExt);
}
}
@Override
public TypeInformation<MessageExt> getProducedType() {
return TypeInformation.of(MessageExt.class);
}
}
}
}
}
RocketMQ 5.x
/**
* A demo that illustrates how to consume messages from RocketMQ, convert
* messages, then produce messages to RocketMQ.
*/
public class RocketMQ5DataStreamDemo {
public static final String ENDPOINT = "<yourEndpoint>";
public static final String ACCESS_ID = "<accessID>";
public static final String ACCESS_KEY = "<accessKey>";
public static final String SOURCE_TOPIC = "<sourceTopicName>";
public static final String CONSUMER_GROUP = "<consumerGroup>";
public static final String SINK_TOPIC = "<sinkTopicName>";
public static final String PRODUCER_GROUP = "<producerGroup>";
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
Configuration conf = new Configuration();
// 以下兩個配置僅本地調試時使用,需要在作業打包上傳到阿里云實時計算Flink版之前刪除
conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
conf.setString(
"classloader.parent-first-patterns.additional",
"com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
final DataStreamSource<String> ds =
env.fromSource(
RocketMQSource.<String>builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopic(SOURCE_TOPIC)
.setConsumerGroup(CONSUMER_GROUP)
.setDeserializationSchema(new MyDeserializer())
.setStartOffset(1)
.build(),
WatermarkStrategy.noWatermarks(),
"source");
ds.map(new ToMessage())
.addSink(
new OutputFormatSinkFunction<>(
new RocketMQOutputFormat.Builder()
.setEndpoint(ENDPOINT)
.setAccessId(ACCESS_ID)
.setAccessKey(ACCESS_KEY)
.setTopicName(SINK_TOPIC)
.setProducerGroup(PRODUCER_GROUP)
.build()));
env.execute();
}
private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
@Override
public void deserialize(List<MessageExt> record, Collector<String> out) {
for (MessageExt messageExt : record) {
out.collect(new String(messageExt.getBody()));
}
}
@Override
public TypeInformation<String> getProducedType() {
return Types.STRING;
}
}
private static class ToMessage implements MapFunction<String, List<MessageExt>> {
public ToMessage() {
}
@Override
public List<MessageExt> map(String s) {
final MessageExt message = new MessageExt();
message.setBody(s.getBytes());
message.setWaitStoreMsgOK(true);
return Collections.singletonList(message);
}
}
}
XML
Maven中央庫中已經放置了MQ DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mq</artifactId>
<version>${vvr-version}</version>
</dependency>
RocketMQ接入點Endpoint配置詳情請參見關于TCP內網接入點設置的公告。