日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Flink消費

Flink Log Connector是日志服務提供的用于對接Flink的工具,支持對接開源Flink和實時計算Flink版。本文介紹如何對接Flink消費日志數據。

前提條件

  • 已開通日志服務。更多信息,請參見開通日志服務。

  • 已創建RAM用戶并完成授權。具體操作,請參見創建RAM用戶并完成授權

  • 已配置環境變量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具體操作,請參見在Linux、macOS和Windows系統配置環境變量。

    重要
    • 阿里云賬號的AccessKey擁有所有API的訪問權限,建議您使用RAM用戶的AccessKey進行API訪問或日常運維。

    • 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。

背景信息

Flink Log Connector包括兩部分,消費者(Flink Log Consumer)和生產者(Flink Log Producer),兩者用途區別如下:

  • 消費者用于從日志服務中讀取數據,支持exactly once語義,支持Shard負載均衡。

  • 生產者用于將數據寫入日志服務。

使用Flink Log Connector時,需要在項目中添加Maven依賴,示例如下:

<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>flink-log-connector</artifactId>
    <version>0.1.38</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>

除此之外的代碼編寫,請您參考GitHub上源碼進行編寫。更多信息,請參見aliyun-log-flink-connector

Flink Log Consumer

在Flink Log Connector中,Flink Log Consumer提供了訂閱日志服務中某一個Logstore的能力,實現了exactly once語義,在使用時您無需關心Logstore中Shard數量的變化,Flink Log Consumer會自動感知。

Flink中每一個子任務負責消費Logstore中的部分Shard,如果Logstore中Shard發生分裂或合并,子任務消費的Shard也會隨之改變。

Flink Log Consumer用到的日志服務API接口如下:

  • GetCursorOrData

    用于從Shard中獲取數據,注意頻繁的調用該接口可能會導致數據超過日志服務的Shard限額,可以通過ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLISConfigConstants.LOG_MAX_NUMBER_PER_FETCH控制接口調用的時間間隔和每次調用獲取的日志數量。Shard的限額請參見分區(Shard)。

    示例如下:

    configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
    configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
  • ListShards

    用于獲取Logstore中所有的Shard列表及Shard狀態等。如果您的Shard經常發生分裂合并,可以通過調整接口的調用周期來及時發現Shard的變化。示例如下:

    // 設置每30s調用一次ListShards接口。
    configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");
  • CreateConsumerGroup

    當設置消費進度監控時調用該接口創建ConsumerGroup,用于同步Checkpoint。

  • UpdateCheckPoint

    該接口將Flink的snapshot同步到日志服務的ConsumerGroup中。

  1. 配置啟動參數。

    以下是一個簡單的消費示例,使用java.util.Properties作為配置工具,所有Flink Log Consumer的配置均在ConfigConstants中。

    Properties configProps = new Properties();
    // 設置訪問日志服務的域名。
    configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
    // 本示例從環境變量中獲取AccessKey ID和AccessKey Secret。
    String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    configProps.put(ConfigConstants.LOG_ACCESSKEYID,accessKeyId);
    configProps.put(ConfigConstants.LOG_ACCESSKEY,accessKeySecret);
    // 設置日志服務的project。
    String project = "your-project";
    // 設置日志服務的Logstore。
    String logstore = "your-logstore";
    // 設置消費日志服務起始位置。
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
    // 設置日志服務的消息反序列化方法。
    FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<FastLogGroupList> dataStream = env.addSource(
            new FlinkLogConsumer<FastLogGroupList>(project, logstore, deserializer, configProps)
    );
    dataStream.addSink(new SinkFunction<FastLogGroupList>() {
        @Override
        public void invoke(FastLogGroupList logGroupList, Context context) throws Exception {
            for (FastLogGroup logGroup : logGroupList.getLogGroups()) {
                int logsCount = logGroup.getLogsCount();
                String topic = logGroup.getTopic();
                String source = logGroup.getSource();
                for (int i = 0; i < logsCount; ++i) {
                    FastLog row = logGroup.getLogs(i);
                    for (int j = 0; j < row.getContentsCount(); ++j) {
                        FastLogContent column = row.getContents(j);
                        // 處理日志
                        System.out.println(column.getKey());
                        System.out.println(column.getValue());
                    }
                }
            }
        }
    });
    // 或者使用RawLogGroupListDeserializer
    RawLogGroupListDeserializer rawLogGroupListDeserializer = new RawLogGroupListDeserializer();
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<RawLogGroupList> rawLogGroupListDataStream = env.addSource(
            new FlinkLogConsumer<RawLogGroupList>(project, logstore, rawLogGroupListDeserializer, configProps)
    );
    rawLogGroupListDataStream.addSink(new SinkFunction<RawLogGroupList>() {
        @Override
        public void invoke(RawLogGroupList logGroupList, Context context) throws Exception {
            for (RawLogGroup logGroup : logGroupList.getRawLogGroups()) {
                String topic = logGroup.getTopic();
                String source = logGroup.getSource();
                for (RawLog row : logGroup.getLogs()) {
                    // 處理日志
                }
            }
        }
    });
    說明

    Flink的子任務數量和日志服務Logstore中的Shard數量是獨立的,如果Shard數量多于子任務數量,每個子任務不重復的消費Shard,如果少于子任務數量,那么部分子任務就會空閑,直到新的Shard產生。

  2. 設置消費起始位置。

    Flink Log Consumer支持設置Shard的消費起始位置,通過設置屬性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制從Shard的頭、尾或者某個特定時間開始消費。另外,Flink Log Connector也支持從某個具體的消費組中恢復消費。屬性的具體取值如下:

    • Consts.LOG_BEGIN_CURSOR:表示從Shard的頭開始消費,也就是從Shard中最舊的數據開始消費。

    • Consts.LOG_END_CURSOR:表示從Shard的尾部開始,也就是從Shard中最新的數據開始消費。

    • Consts.LOG_FROM_CHECKPOINT:表示從某個特定的消費組中保存的Checkpoint開始消費,通過ConfigConstants.LOG_CONSUMERGROUP指定具體的消費組。

    • UnixTimestamp:一個整型數值的字符串,用1970-01-01到現在的秒數表示,含義是消費Shard中這個時間點之后的數據。

    示例如下:

    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
    說明

    如果在啟動Flink任務時,設置了從Flink自身的StateBackend中恢復,那么Flink Log Connector會忽略上面的配置,使用StateBackend中保存的Checkpoint。

  3. 可選:設置消費進度監控。

    Flink Log Consumer支持設置消費進度監控,獲取每一個Shard的實時消費位置,使用時間戳表示。更多信息,請參見步驟二:查看消費組狀態消費組監控與告警。

    示例如下:

    configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name");
    說明

    該項為可選配置項,設置后Flink Log Consumer會首先創建消費組,如果消費組已經存在,則不執行任何操作,Flink Log Consumer中的snapshot會自動同步到日志服務的消費組中,您可以通過日志服務的控制臺查看Flink Log Consumer的消費進度。

  4. 設置容災和exactly once語義支持。

    當打開Flink的Checkpointing功能時,Flink Log Consumer會周期性地將每個Shard的消費進度保存,當任務失敗時,Flink會恢復消費任務,并從保存的最新的Checkpoint開始消費。

    Checkpoint的周期定義了當任務失敗時,最多多少的數據會被回溯,即重新消費,使用代碼如下:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 開啟Flink exactly once語義。
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // 每5s保存一次Checkpoint。
    env.enableCheckpointing(5000);

    更多Flink Checkpoint的信息請參見Checkpoints

Flink Log Producer

Flink Log Producer用于將數據寫入日志服務。

說明

Flink Log Producer只支持Flink at least once語義,在任務失敗時,寫入日志服務中的數據有可能會重復,但不會丟失。

Flink Log Producer用到的日志服務API接口如下:

  • PutLogs

  • ListShards

  1. 初始化Flink Log Producer。

    1. 初始化配置參數Properties。

      Flink Log Producer初始化步驟與Flink Log Consumer類似。Flink Log Producer初始化配置包含以下參數,一般情況下使用默認值即可,如有需要可以自定義配置。示例如下:

      // 用于發送數據的I/O線程的數量,默認為核心數。
      ConfigConstants.IO_THREAD_NUM
      // 日志發送前被緩存的最大允許時間,默認為2000毫秒。
      ConfigConstants.FLUSH_INTERVAL_MS
      // 任務可以使用的內存總的大小,默認為100 MB。
      ConfigConstants.TOTAL_SIZE_IN_BYTES
      // 內存達到上限時,發送日志的最大阻塞時間,單位為毫秒,默認為 60s。
      ConfigConstants.MAX_BLOCK_TIME_MS
      // 最大重試次數,默認為 10 次。
      ConfigConstants.MAX_RETRIES
    2. 重載LogSerializationSchema,定義將數據序列化成RawLogGroup的方法。

      RawLogGroup是日志的集合,各字段含義請參見日志(Log)。

      如果您需要指定數據寫到某一個Shard中,可以使用LogPartitioner產生數據的HashKey,LogPartitioner為可選項,如果您沒有配置,數據會隨機寫入某一個Shard。

      示例如下:

      FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
      logProducer.setCustomPartitioner(new LogPartitioner<String>() {
            // 生成32位Hash值。
            public String getHashKey(String element) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md.update(element.getBytes());
                    String hash = new BigInteger(1, md.digest()).toString(16);
                    while(hash.length() < 32) hash = "0" + hash;
                    return hash;
                } catch (NoSuchAlgorithmException e) {
                }
                return  "0000000000000000000000000000000000000000000000000000000000000000";
            }
        });
  2. 將模擬產生的字符串寫入日志服務,示例如下:

    // 將數據序列化成日志服務的數據格式。
    class SimpleLogSerializer implements LogSerializationSchema<String> {
        public RawLogGroup serialize(String element) {
            RawLogGroup rlg = new RawLogGroup();
            RawLog rl = new RawLog();
            rl.setTime((int)(System.currentTimeMillis() / 1000));
            rl.addContent("message", element);
            rlg.addLog(rl);
            return rlg;
        }
    }
    public class ProducerSample {
        public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        //本示例從環境變量中獲取AccessKey ID和AccessKey Secret。
        public static String sAccessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        public static String sAccessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        public static String sProject = "ali-cn-hangzhou-sls-admin";
        public static String sLogstore = "test-flink-producer";
        private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);
        public static void main(String[] args) throws Exception {
            final ParameterTool params = ParameterTool.fromArgs(args);
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setGlobalJobParameters(params);
            env.setParallelism(3);
            DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());
            Properties configProps = new Properties();
            // 設置訪問日志服務的域名。
            configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
            // 設置用戶AK。
            configProps.put(ConfigConstants.LOG_ACCESSKEYID, sAccessKeyId);
            configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
            // 設置日志寫入的日志服務project。
            configProps.put(ConfigConstants.LOG_PROJECT, sProject);
            // 設置日志寫入的日志服務Logstore。
            configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);
            FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
            simpleStringStream.addSink(logProducer);
            env.execute("flink log producer");
        }
        // 模擬產生日志。
        public static class EventsGenerator implements SourceFunction<String> {
            private boolean running = true;
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                long seq = 0;
                while (running) {
                    Thread.sleep(10);
                    ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
                }
            }
            @Override
            public void cancel() {
                running = false;
            }
        }
    }

消費示例

本示例中,Flink Log Consumer將讀取到的數據以FastLogGroupList形式存儲到數據流中,接著使用flatMap函數將FastLogGroupList轉換為JSON字符串并輸出到命令行或寫入文本文件。

package com.aliyun.openservices.log.flink.sample;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.flink.ConfigConstants;
import com.aliyun.openservices.log.flink.FlinkLogConsumer;
import com.aliyun.openservices.log.flink.data.FastLogGroupDeserializer;
import com.aliyun.openservices.log.flink.data.FastLogGroupList;
import com.aliyun.openservices.log.flink.model.CheckpointMode;
import com.aliyun.openservices.log.flink.util.Consts;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class FlinkConsumerSample {
    private static final String SLS_ENDPOINT = "your-endpoint";
    //本示例從環境變量中獲取AccessKey ID和AccessKey Secret。
    private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    private static final String SLS_PROJECT = "your-project";
    private static final String SLS_LOGSTORE = "your-logstore";

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        Configuration conf = new Configuration();
        // Checkpoint dir like "file:///tmp/flink"
        conf.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "your-checkpoint-dir");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("file:///tmp/flinkstate"));
        Properties configProps = new Properties();
        configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT);
        configProps.put(ConfigConstants.LOG_ACCESSKEYID, ACCESS_KEY_ID);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY_SECRET);
        configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10");
        configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
        configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your-consumer-group");
        configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());
        configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000");

        FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
        DataStream<FastLogGroupList> stream = env.addSource(
                new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps));

        stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> {
            for (FastLogGroup logGroup : value.getLogGroups()) {
                int logCount = logGroup.getLogsCount();
                for (int i = 0; i < logCount; i++) {
                    FastLog log = logGroup.getLogs(i);
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("topic", logGroup.getTopic());
                    jsonObject.put("source", logGroup.getSource());
                    for (int j = 0; j < log.getContentsCount(); j++) {
                        jsonObject.put(log.getContents(j).getKey(), log.getContents(j).getValue());
                    }
                    out.collect(jsonObject.toJSONString());
                }
            }
        }).returns(String.class);

        stream.writeAsText("log-" + System.nanoTime());
        env.execute("Flink consumer");
    }
}