日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

云消息隊列 RocketMQ 版

本文為您介紹云消息隊列 RocketMQ 版連接器。

重要

鑒于云消息隊列 RocketMQ 版 4.x標準版實例共享API調用彈性上限為每秒5000次,使用該版本的消息中間件在與實時計算Flink版對接時,若超過上限會觸發限流機制,可能會導致Flink作業運行不穩定。因此,在選擇消息中間件時,如果您正在或計劃通過標準版RocketMQ與Flink對接,請您謹慎評估。如果業務場景允許,請考慮使用Kafka、日志服務(SLS)或DataHub等其他中間件進行替代。如果您確實需要使用云消息隊列 RocketMQ 版 4.x標準版處理大規模的消息,也請同時通過提交工單與RocketMQ產品取得聯系申請提升限速上限。

背景信息

云消息隊列 RocketMQ 版是阿里云基于Apache RocketMQ構建的低延遲、高并發、高可用和高可靠的分布式消息中間件。其既可為分布式應用系統提供異步解耦和削峰填谷的能力,同時也具備互聯網應用所需的海量消息堆積、高吞吐和可靠重試等特性。

RocketMQ連接器支持的信息如下。

類別

詳情

支持類型

源表和結果表

運行模式

僅支持流模式

數據格式

CSV和二進制格式

特有監控指標

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

  • 結果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream(僅支持RocketMQ 4.x)和SQL

是否支持更新或刪除結果表數據

不支持刪除結果表數據,只支持插入和更新數據。

特色功能

RocketMQ源表和結果表支持屬性字段,具體如下。

  • 源表屬性字段

    說明

    僅在VVR 3.0.1及以上版本支持獲取以下RocketMQ屬性字段。

    字段名

    字段類型

    說明

    topic

    VARCHAR METADATA VIRTUAL

    消息Topic。

    queue-id

    INT METADATA VIRTUAL

    消息隊列ID。

    queue-offset

    BIGINT METADATA VIRTUAL

    消息隊列的消費位點。

    msg-id

    VARCHAR METADATA VIRTUAL

    消息ID。

    store-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    消息存儲時間。

    born-timestamp

    TIMESTAMP(3) METADATA VIRTUAL

    消息生成時間。

    keys

    VARCHAR METADATA VIRTUAL

    消息Keys。

    tags

    VARCHAR METADATA VIRTUAL

    消息Tags。

  • 結果表屬性字段

    說明

    僅實時計算引擎VVR 4.0.0及以上版本支持以下RocketMQ屬性字段。

    字段名

    字段類型

    說明

    keys

    VARCHAR METADATA

    消息Keys。

    tags

    VARCHAR METADATA

    消息Tags。

前提條件

已創建了RocketMQ資源,詳情請參見創建資源

使用限制

  • 僅Flink實時計算引擎VVR 2.0.0及以上版本支持RocketMQ連接器。

  • 僅Flink實時計算引擎VVR 8.0.3及以上版本支持5.x版本的RocketMQ。

  • 在Flink實時計算引擎VVR 6.0.2以下版本,源表的并發度必須小于等于RocketMQ topic的分區數,在實時計算引擎VVR 6.0.2及以上版本解除該限制。您可以提前設置大于分區數的并發度,不需要因RocketMQ的縮容而手動調整作業并發度。

  • RocketMQ連接器使用Pull Consumer消費,所有的子任務分擔消費。

語法結構

CREATE TABLE mq_source(
  x varchar,
  y varchar,
  z varchar
) WITH (
  'connector' = 'mq5',
  'topic' = '<yourTopicName>',
  'endpoint' = '<yourEndpoint>',
  'consumerGroup' = '<yourConsumerGroup>'
);

WITH參數

  • 通用

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    connector

    connector類型。

    String

    • RocketMQ 4.x固定值為mq

    • RocketMQ 5.x固定值為mq5

    endPoint

    EndPoint地址

    String

    云消息隊列 RocketMQ 版接入地址支持以下兩種類型:

    • VVR 3.0.1及以上版本的作業:需要使用TCP協議客戶端接入點,詳情請參見

      • 內網服務MQ(阿里云經典網絡/VPC)接入地址:在MQ控制臺目標實例詳情中,選擇接入點 > TCP協議客戶端接入點 > 內網訪問,獲取對應的EndPoint。

      • 公網服務MQ接入地址:在MQ控制臺目標實例詳情中,選擇接入點 > TCP協議 > 客戶端接入點 > 公網訪問,獲取對應的EndPoint。

      重要

      由于阿里云網絡安全策略動態變化,實時計算連接公網服務MQ時可能會出現網絡連接問題,推薦您使用內網服務MQ。

      • 內網服務無法跨域訪問。例如,您所購買的實時計算服務的地域為華東1,但是購買的RocketMQ服務的地域為華東2(上海),則無法訪問。

      • 通過公網方式訪問RocketMQ,需要配置NAT,詳情請參見創建和管理公網NAT網關實例

    • VVR 3.0.1以下版本的作業:RocketMQ舊的接入點已不可用,您需要適配升級實時計算作業。

      重要

      如果您已使用了VVR 3.0.1以下版本的RocketMQ連接器,則您需要將您的實時計算作業升級至VVR 3.0.1及以上版本,并將作業中EndPoint參數取值更改為新的RocketMQ接入點,舊的接入點存在穩定性風險或不可用的問題,詳情請參見實時計算Flink版產品公告

    topic

    topic名稱。

    String

    無。

    accessId

    • 4.x:阿里云賬號的AccessKey ID。

    • 5.x:

      RocketMQ實例用戶名

    String

    • RocketMQ 4.x:是

    • RocketMQ 5.x:否

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理

    • RocketMQ 5.x:如果是使用公網接入點訪問,需配置為RocketMQ控制臺實例用戶名。如果是在阿里云ECS內網訪問,無需填寫該配置。

    accessKey

    • 4.x: 阿里云賬號的AccessKey Secret。

    • 5.x:實例密碼

    String

    • RocketMQ 4.x:是

    • RocketMQ 5.x:否

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理

    • RocketMQ 5.x:如果是使用公網接入點訪問,需配置為RocketMQ控制臺實例密碼。如果是在阿里云ECS內網訪問,無需填寫該配置。

    tag

    訂閱或寫入的標簽

    String

    • RocketMQ作為源表時,只能讀取單個tag。

    • RocketMQ作為結果表時,支持設置多個tag,以逗號(,)進行分隔。

    說明

    當作為結果表時,僅支持RocketMQ 4.x。RocketMQ 5.x請使用結果表屬性字段來指定寫出消息的 tag。

    nameServerSubgroup

    NameServer組。

    String

    • 內網服務(阿里云經典網絡或VPC):必須配置'nameServerSubgroup' = 'nsaddr4client-internal'

    • 公網服務:無需配置nameServerSubgroup

    說明

    僅VVR 2.1.1-VVR 3.0.0版本支持該參數,VVR 3.0.1及以后版本不支持該參數。

    encoding

    編碼格式。

    String

    UTF-8

    無。

    instanceID

    RocketMQ實例ID。

    String

    • 如果RocketMQ實例無獨立命名空間,則不可以使用instanceID參數。

    • 如果RocketMQ實例有獨立命名空間,則instanceID參數必選。

    說明

    僅RocketMQ 4.x支持該參數,RocketMQ 5.x不需要配置該參數。

  • 源表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    consumerGroup

    Consumer組名。

    String

    無。

    pullIntervalMs

    上游沒有數據可供消費時,source的休眠時間。

    Int

    單位為毫秒。

    目前沒有限流機制,無法設置讀取RocketMQ的速率。

    說明

    僅RocketMQ 4.x支持該參數,RocketMQ 5.x不需要配置該參數。

    timeZone

    時區。

    String

    例如,Asia/Shanghai。

    startTimeMs

    啟動時間點。

    Long

    時間戳,單位為毫秒。

    startMessageOffset

    消息開始的偏移量。

    Int

    如果填寫該參數,則優先以startMessageOffset的位點開始加載數據。

    lineDelimiter

    解析Block時,行分隔符。

    String

    \n

    無。

    fieldDelimiter

    字段分隔符。

    String

    \u0001

    根據MQ終端的模式,分隔符分別為:

    • 在只讀模式下(默認模式),分隔符為\u0001。該模式下,分隔符不可見。

    • 在編輯模式下,分隔符為^A

    lengthCheck

    單行字段條數檢查策略。

    Int

    NONE

    取值如下:

    • NONE:默認值。

      • 解析出的字段數大于定義字段數時,按從左到右的順序,取定義字段數量的數據。

      • 解析出的字段數小于定義字段數時,跳過這行數據。

    • SKIP:解析出的字段數和定義字段數不同時跳過數據。

    • EXCEPTION:解析出的字段數和定義字段數不同時提示異常。

    • PAD:按從左到右順序填充。

      • 解析出的字段數大于定義字段數時,按從左到右的順序,取定義字段數量的數據。

      • 解析出的字段數小于定義字段數時,在行尾用Null填充缺少的字段。

    說明

    SKIP、EXCEPTION和PAD為可選值。

    columnErrorDebug

    是否打開調試開關。

    Boolean

    false

    如果設置為true,則打印解析異常的Log。

    pullBatchSize

    每次拉取消息的最大數量。

    Int

    64

    僅實時計算引擎VVR 8.0.7及以上版本支持該參數。

  • 結果表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    producerGroup

    寫入的群組。

    String

    無。

    retryTimes

    寫入的重試次數。

    Int

    10

    無。

    sleepTimeMs

    重試間隔時間。

    Long

    5000

    無。

    partitionField

    指定字段名,將該字段作為分區列。

    String

    如果modepartition,則該參數必填。

    說明

    僅實時計算引擎VVR 8.0.5及以上版本支持該參數。

類型映射

Flink字段類型

云消息隊列RocketMQ字段類型

VARCHAR

STRING

代碼示例

  • 源表示例

    • CSV格式

      假設您的一條CSV格式消息記錄如下。

      1,name,male 
      2,name,female
      說明

      一條RocketMQ消息可以包括零條到多條數據記錄,記錄之間使用\n分隔。

      Flink作業中,聲明RocketMQ數據源表的DDL如下。

      • RocketMQ 5.x

      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq5',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'consumerGroup' = 'mq-group',
        'fieldDelimiter' = ','
      );
      • RocketMQ 4.x

      CREATE TABLE mq_source(
        id varchar,
        name varchar,
        gender varchar,
        topic varchar metadata virtual
      ) WITH (
        'connector' = 'mq',
        'topic' = 'mq-test',
        'endpoint' = '<yourEndpoint>',
        'pullIntervalMs' = '1000',
        'accessId' = '${secret_values.ak_id}',
        'accessKey' = '${secret_values.ak_secret}',
        'consumerGroup' = 'mq-group',
        'fieldDelimiter' = ','
      );
    • 二進制格式

      • RocketMQ 5.x

        CREATE TEMPORARY TABLE source_table (
          mess varbinary
        ) WITH (
          'connector' = 'mq5',
          'endpoint' = '<yourEndpoint>',
          'topic' = 'mq-test',
          'consumerGroup' = 'mq-group'
        );
        
        CREATE TEMPORARY TABLE out_table (
          commodity varchar
        ) WITH (
          'connector' = 'print'
        );
        
        INSERT INTO out_table
        select 
          cast(mess as varchar)
        FROM source_table;

      • RocketMQ 4.x

        CREATE TEMPORARY TABLE source_table (
          mess varbinary
        ) WITH (
          'connector' = 'mq',
          'endpoint' = '<yourEndpoint>',
          'pullIntervalMs' = '500',
          'accessId' = '${secret_values.ak_id}',
          'accessKey' = '${secret_values.ak_secret}',
          'topic' = 'mq-test',
          'consumerGroup' = 'mq-group'
        );
        
        CREATE TEMPORARY TABLE out_table (
          commodity varchar
        ) WITH (
          'connector' = 'print'
        );
        
        INSERT INTO out_table
        select 
          cast(mess as varchar)
        FROM source_table;
  • 結果表示例

    • 創建結果表

      • RocketMQ 5.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR
        ) WITH (
          'connector'='mq5',
          'endpoint'='<yourEndpoint>',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
      • RocketMQ 4.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR
        ) WITH (
          'connector'='mq',
          'endpoint'='<yourEndpoint>',
          'accessId'='${secret_values.ak_id}',
          'accessKey'='${secret_values.ak_secret}',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
        說明

        如果您的MQ消息為二進制格式,則DDL中只能定義一個字段,且字段類型必須為VARBINARY。

    • 創建將keystags字段指定為RocketMQ消息的key和tag的結果表

      • RocketMQ 5.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR,
          keys VARCHAR METADATA,
          tags VARCHAR METADATA
        ) WITH (
          'connector'='mq5',
          'endpoint'='<yourEndpoint>',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );
      • RocketMQ 4.x

        CREATE TABLE mq_sink (
          id INTEGER,
          len BIGINT,
          content VARCHAR,
          keys VARCHAR METADATA,
          tags VARCHAR METADATA
        ) WITH (
          'connector'='mq',
          'endpoint'='<yourEndpoint>',
          'accessId'='${secret_values.ak_id}',
          'accessKey'='${secret_values.ak_secret}',
          'topic'='<yourTopicName>',
          'producerGroup'='<yourGroupName>'
        );

DataStream API

重要

通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink全托管,DataStream連接器設置方法請參見DataStream連接器使用方法

實時計算引擎VVR提供MetaQSource,用于讀取RocketMQ;提供OutputFormat的實現類MetaQOutputFormat,用于寫入RocketMQ。讀取RocketMQ和寫入RocketMQ的示例如下:

RocketMQ 4.x

/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQDataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String INSTANCE_ID = "<instanceID>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // 以下兩個配置僅本地調試時使用,需要在作業打包上傳到阿里云實時計算Flink版之前刪除
        conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
        conf.setString("classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.metaq.source.reader.deserializer.MetaQRecordDeserializationSchema;com.alibaba.ververica.connector.mq.shaded.");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        // Creates and adds RocketMQ source.
        env.fromSource(createRocketMQSource(), WatermarkStrategy.noWatermarks(), "source")
                // Converts message body to upper case.
                .map(RocketMQDataStreamDemo2::convertMessages)
                // Creates and adds RocketMQ sink.
                .addSink(new OutputFormatSinkFunction<>(createRocketMQOutputFormat()))
                .name(RocketMQDataStreamDemo2.class.getSimpleName());
        // Compiles and submits job.
        env.execute("RocketMQ connector end-to-end DataStream demo");
    }

    private static MetaQSource<MessageExt> createRocketMQSource() {
        Properties mqProperties = createMQProperties();

        return new MetaQSource<>(SOURCE_TOPIC,
                CONSUMER_GROUP,
                null, // always null
                null, // tag of the messages to consumer
                Long.MAX_VALUE, // stop timestamp in milliseconds
                -1, // Start timestamp in milliseconds. Set to -1 to disable starting from offset.
                0, // Start offset.
                300_000, // Partition discover interval.
                mqProperties,
                Boundedness.CONTINUOUS_UNBOUNDED,
                new MyDeserializationSchema());
    }

    private static MetaQOutputFormat createRocketMQOutputFormat() {
        return new MetaQOutputFormat.Builder()
                .setTopicName(SINK_TOPIC)
                .setProducerGroup(PRODUCER_GROUP)
                .setMqProperties(createMQProperties())
                .build();
    }

    private static Properties createMQProperties() {
        Properties properties = new Properties();
        properties.put(PROPERTY_ONS_CHANNEL, "ALIYUN");
        properties.put(NAMESRV_ADDR, ENDPOINT);
        properties.put(PROPERTY_ACCESSKEY, ACCESS_ID);
        properties.put(PROPERTY_SECRETKEY, ACCESS_KEY);
        properties.put(PROPERTY_ROCKET_AUTH_ENABLED, true);
        properties.put(PROPERTY_INSTANCE_ID, INSTANCE_ID);
        return properties;
    }

    private static List<MessageExt> convertMessages(MessageExt messages) {
        return Collections.singletonList(messages);
    }

    public static class MyDeserializationSchema implements MetaQRecordDeserializationSchema<MessageExt> {
        @Override
        public void deserialize(List<MessageExt> list, Collector<MessageExt> collector) {
            for (MessageExt messageExt : list) {
                collector.collect(messageExt);
            }
        }

        @Override
        public TypeInformation<MessageExt> getProducedType() {
            return TypeInformation.of(MessageExt.class);
        }
    }
}
    }
}

RocketMQ 5.x

/**
 * A demo that illustrates how to consume messages from RocketMQ, convert
 * messages, then produce messages to RocketMQ.
 */
public class RocketMQ5DataStreamDemo {

    public static final String ENDPOINT = "<yourEndpoint>";
    public static final String ACCESS_ID = "<accessID>";
    public static final String ACCESS_KEY = "<accessKey>";
    public static final String SOURCE_TOPIC = "<sourceTopicName>";
    public static final String CONSUMER_GROUP = "<consumerGroup>";
    public static final String SINK_TOPIC = "<sinkTopicName>";
    public static final String PRODUCER_GROUP = "<producerGroup>";

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        Configuration conf = new Configuration();

        // 以下兩個配置僅本地調試時使用,需要在作業打包上傳到阿里云實時計算Flink版之前刪除
        conf.setString("pipeline.classpaths", "file://" + "uber jar絕對路徑");
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "com.alibaba.ververica.connectors.mq5.source.reader.deserializer.RocketMQRecordDeserializationSchema;com.alibaba.ververica.connectors.mq5.shaded.");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        final DataStreamSource<String> ds =
                env.fromSource(
                        RocketMQSource.<String>builder()
                                .setEndpoint(ENDPOINT)
                                .setAccessId(ACCESS_ID)
                                .setAccessKey(ACCESS_KEY)
                                .setTopic(SOURCE_TOPIC)
                                .setConsumerGroup(CONSUMER_GROUP)
                                .setDeserializationSchema(new MyDeserializer())
                                .setStartOffset(1)
                                .build(),
                        WatermarkStrategy.noWatermarks(),
                        "source");

        ds.map(new ToMessage())
                .addSink(
                        new OutputFormatSinkFunction<>(
                                new RocketMQOutputFormat.Builder()
                                        .setEndpoint(ENDPOINT)
                                        .setAccessId(ACCESS_ID)
                                        .setAccessKey(ACCESS_KEY)
                                        .setTopicName(SINK_TOPIC)
                                        .setProducerGroup(PRODUCER_GROUP)
                                        .build()));

        env.execute();
    }

    private static class MyDeserializer implements RocketMQRecordDeserializationSchema<String> {
        @Override
        public void deserialize(List<MessageExt> record, Collector<String> out) {
            for (MessageExt messageExt : record) {
                out.collect(new String(messageExt.getBody()));
            }
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    private static class ToMessage implements MapFunction<String, List<MessageExt>> {

        public ToMessage() {
        }

        @Override
        public List<MessageExt> map(String s) {
            final MessageExt message = new MessageExt();
            message.setBody(s.getBytes());
            message.setWaitStoreMsgOK(true);
            return Collections.singletonList(message);
        }
    }
}

XML

Maven中央庫中已經放置了MQ DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mq</artifactId>
    <version>${vvr-version}</version>
</dependency>
說明

RocketMQ接入點Endpoint配置詳情請參見關于TCP內網接入點設置的公告

常見問題

RocketMQ Topic擴容時,RocketMQ如何感知Topic分區數變化?