JAR作業(yè)開發(fā)
Flink DataStream提供了更靈活的編程模型和API,可以自定義各種數(shù)據(jù)轉(zhuǎn)換、操作和算子,適用于復(fù)雜的業(yè)務(wù)邏輯和數(shù)據(jù)處理需求。本文為您介紹Flink JAR作業(yè)的開發(fā)方法。
支持開源Apache Flink
目前實時計算Flink支持的DataStream API完全兼容開源的Flink版本,詳情請參見Apache Flink介紹和Flink DataStream API開發(fā)指南。
開發(fā)環(huán)境要求
已安裝IntelliJ IDEA等開發(fā)工具。
已安裝3.6.3及以上版本的Maven。
作業(yè)開發(fā)僅支持JDK 8和JDK 11版本。
JAR作業(yè)需要您在線下完成開發(fā),再在實時計算管理控制臺上部署并運行。
開發(fā)準備
本樣例涉及關(guān)于數(shù)據(jù)源連接器如何使用,請準備好相關(guān)數(shù)據(jù)源。
實時計算Flink版默認不具備訪問公網(wǎng)的能力,所以本案例采用的數(shù)據(jù)源為阿里云消息隊列Kafka(2.6.2)和阿里云RDS MySQL(8.0)。
如您已經(jīng)購買了上述產(chǎn)品,請確保其與您購買的實時計算Flink版處于同一VPC中,如果不是,請參考如何訪問跨VPC的其他服務(wù)?
如您有自建的數(shù)據(jù)源需要使用,請確認實時計算Flink版能訪問該數(shù)據(jù)源,請參考實時計算Flink版如何訪問公網(wǎng)?和如何設(shè)置白名單?
如您還沒有云消息隊列Kafka數(shù)據(jù)源,請購買云消息隊列Kafka并部署實例,詳情請參見步驟二:購買和部署實例。部署實例時,請確認部署至與實時計算Flink同一VPC下。
如您還沒有RDS MySQL數(shù)據(jù)源,請購買RDS MySQL并部署實例,詳情請參見第一步:快捷創(chuàng)建RDS MySQL實例與配置數(shù)據(jù)庫。部署實例時,請確認部署至與實時計算Flink同一VPC下。
作業(yè)開發(fā)
Maven環(huán)境配置(可選)
配置Maven的setting.xml文件。如果您在后續(xù)的操作中,對Maven中央倉庫的訪問存在無法拉取或速率較慢的情況,可以更換為阿里云鏡像倉庫。
<mirror>
<id>aliyunmaven</id>
<mirrorOf>central</mirrorOf>
<name>Aliyun Maven</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>
配置Flink環(huán)境依賴
為了避免JAR包依賴沖突,請您注意以下幾點:
${flink.version}
為作業(yè)運行對應(yīng)的Flink版本。請使用與作業(yè)部署頁面選擇的VVR引擎所使用的Flink版本一致。例如您在部署頁面選擇的引擎為vvr-8.0.9-flink-1.17
,其對應(yīng)的Flink版本為1.17.2
,查看VVR引擎版本詳情請參見操作指導(dǎo)。Flink相關(guān)依賴,作用域請使用provided,即在依賴中添加
<scope>provided</scope>
。主要包含org.apache.flink
組下以flink-
開頭的非Connector依賴。Flink源代碼中只有明確標(biāo)注了@Public或者@PublicEvolving的才是公開供用戶調(diào)用的方法,阿里云實時計算Flink版只對這些方法的兼容性做出產(chǎn)品保證。
如果是Flink服務(wù)內(nèi)置的Connector支持的DataStream API,建議使用其內(nèi)置的依賴。
下面是Flink的一些基本相關(guān)依賴,您可能還需要補充一些日志文件相關(guān)的依賴,完整的依賴參考請參見文末的完整示例代碼。
flink相關(guān)依賴
<!-- Apache Flink 依賴項 -->
<!-- 之所以提供這些依賴項,是因為它們不應(yīng)該打包到JAR文件中。 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
連接器依賴和使用
通過DataStream的方式讀寫數(shù)據(jù),需要使用對應(yīng)的DataStream連接器連接實時計算Flink版。Maven中央倉庫已經(jīng)放置了VVR DataStream連接器,以供您在作業(yè)開發(fā)時直接使用。
請使用我們在支持的連接器中指明提供DataStream API的連接器。如果某個連接器未注明提供給DataStream API,請勿自行使用,因為未來接口和參數(shù)可能會被修改。
您可以選擇以下任意一種方式來使用連接器:
(推薦)上傳連接器Uber JAR包作為附加依賴文件引入
在作業(yè)的Maven POM文件中添加您需要的連接器作為項目依賴,其作用域為provided。完整的依賴文件請參考文末的完整示例代碼。
說明${vvr.version}
是作業(yè)運行環(huán)境引擎版本,如您的作業(yè)運行在vvr-8.0.9-flink-1.17
版本引擎上,其對應(yīng)的Flink版本為1.17.2
。建議您使用最新的引擎,具體版本詳見引擎。由于將連接器的Uber JAR包作為附加依賴文件引入,則無需將該依賴打入JAR包中,所以需要聲明作用域為
provided
。
<!-- Kafka 連接器依賴 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr.version}</version> <scope>provided</scope> </dependency> <!-- MySQL 連接器依賴 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> <scope>provided</scope> </dependency>
如果您有開發(fā)新連接器或者拓展現(xiàn)有連接器功能的需求,項目還需要依賴連接器公共包
flink-connector-base
或ververica-connector-common
。<!-- Flink 連接器公共接口基礎(chǔ)依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- 阿里云連接器公共接口基礎(chǔ)依賴 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-common</artifactId> <version>${vvr.version}</version> </dependency>
DataStream連接配置信息和代碼示例需要查看對應(yīng)的DataStream連接器文檔。
支持作為DataStream類型的連接器列表,請參見支持的連接器。
部署作業(yè)并在附加依賴文件項中添加相應(yīng)的連接器Uber JAR包,詳情請參見部署JAR作業(yè)。您可以上傳您自己開發(fā)的連接器,也可以上傳實時計算Flink版提供的連接器(下載地址請參見Connector列表)。如圖所示。
直接將連接器作為項目依賴打進作業(yè)JAR包
在作業(yè)的Maven POM文件中添加您需要的連接器作為項目依賴。例如引入Kafka連接器和MySQL連接器。
說明${vvr.version}
是作業(yè)運行環(huán)境引擎版本,如您的作業(yè)運行在vvr-8.0.9-flink-1.17
版本引擎上,其對應(yīng)的Flink版本為1.17.2
。建議您使用最新的引擎,具體版本詳見引擎。由于將連接器作為項目依賴直接打入JAR包,它們必須在默認作用域(compile)中。
<!-- Kafka 連接器依賴 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr.version}</version> </dependency> <!-- MySQL 連接器依賴 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> </dependency>
如果您有開發(fā)新連接器或者拓展現(xiàn)有連接器功能的需求,項目還需要依賴連接器公共包
flink-connector-base
或ververica-connector-common
。<!-- Flink 連接器公共接口基礎(chǔ)依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- 阿里云連接器公共接口基礎(chǔ)依賴 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-common</artifactId> <version>${vvr.version}</version> </dependency>
DataStream連接配置信息和代碼示例需要查看對應(yīng)的DataStream連接器文檔。
支持作為DataStream類型的連接器列表,請參見支持的連接器。
OSS附加依賴文件讀取
因為Flink JAR作業(yè)不支持在Main函數(shù)中讀取本地配置,您可以將配置文件上傳到Flink工作空間下的OSS Bucket,在部署JAR作業(yè)時,通過添加附加配置文件的方式進行讀取。示例如下。
創(chuàng)建配置文件config.properties,避免在代碼中出現(xiàn)明文代碼。
# Kafka bootstrapServers=host1:9092,host2:9092,host3:9092 inputTopic=topic groupId=groupId # MySQL database.url=jdbc:mysql://localhost:3306/my_database database.username=username database.password=password
在JAR作業(yè)中使用代碼讀取存儲在OSS Bucket上的配置文件config.properties。
方式一:讀取工作空間綁定的OSS Bucket
實時計算開發(fā)控制臺左側(cè)導(dǎo)航欄資源管理頁面,上傳該文件。
在作業(yè)運行時,部署作業(yè)所添加附加依賴文件將會加載到作業(yè)所運行Pod的/flink/usrlib目錄下。
讀取該配置文件代碼示例如下。
Properties properties = new Properties(); Map<String,String> configMap = new HashMap<>(); try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) { // 加載屬性文件 properties.load(input); // 獲取屬性值 configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ; configMap.put("inputTopic",properties.getProperty("inputTopic")); configMap.put("groupId",properties.getProperty("groupId")); configMap.put("url",properties.getProperty("database.url")) ; configMap.put("username",properties.getProperty("database.username")); configMap.put("password",properties.getProperty("database.password")); } catch (IOException ex) { ex.printStackTrace(); }
方式二:讀取工作空間有權(quán)限訪問的OSS Bucket
將配置文件上傳目標(biāo)OSS Bucket。
通過OSSClient直接讀取OSS上的存儲文件詳情,請參見流式傳輸和管理訪問憑據(jù)。代碼示例如下。
OSS ossClient = new OSSClientBuilder().build("Endpoint", "AccessKeyId", "AccessKeySecret"); try (OSSObject ossObject = ossClient.getObject("examplebucket", "exampledir/config.properties"); BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()))) { // read file and process ... } finally { if (ossClient != null) { ossClient.shutdown(); } }
業(yè)務(wù)代碼編寫
將外部數(shù)據(jù)源集成到Flink數(shù)據(jù)流程序。
Watermark
是Flink一種基于時間語義的計算策略,往往伴隨著時間戳一起使用,所以本示例不使用水印策略。詳情請參考水印策略。// 將外部數(shù)據(jù)源集成到flink數(shù)據(jù)流程序 // WatermarkStrategy.noWatermarks() 指沒有使用水印策略 DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
算子轉(zhuǎn)換處理。示例中將
DataStream<String>
轉(zhuǎn)換成DataStream<Student>
,更多復(fù)雜的算子轉(zhuǎn)化和處理方式請參考Flink算子。// 轉(zhuǎn)換數(shù)據(jù)結(jié)構(gòu)為student的算子 DataStream<student> source = stream .map(new MapFunction<String, student>() { @Override public student map(String s) throws Exception { // 數(shù)據(jù)由逗號分隔 String[] data = s.split(","); return new student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2])); } }).filter(student -> student.score >=60); // 篩選出分數(shù)大于60分的數(shù)據(jù)
作業(yè)打包
通過maven-shade-plugin插件打包。
如果選擇作為附加依賴文件引入使用連接器,打包作業(yè)時,確認連接器相關(guān)依賴的作用域為
provided
。如果選擇連接器作為依賴一起打包,作用域默認(compile)即可。
<build>
<plugins>
<!-- Java 編譯器-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- 我們使用maven-shade-plugin創(chuàng)建一個包含所有必須依賴的fat jar -->
<!-- 修改<mainClass>的值.如果您的程序入口點發(fā)生了改變 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<!-- 去掉一些不必要的依賴性 -->
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- 不要復(fù)制META-INF文件夾中的簽名。否則,這可能會在使用JAR文件時導(dǎo)致安全異常 -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.aliyun.FlinkDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
作業(yè)測試及部署
由于實時計算Flink版默認不具備訪問公網(wǎng)的能力,可能您的代碼無法在本地進行直接測試。建議您分開進行單元測試,詳情請參見本地運行和調(diào)試包含連接器的作業(yè)。
JAR作業(yè)部署請參見部署JAR作業(yè)。
說明部署時,如果選擇方式一使用連接器打包的作業(yè),切記需要上傳添加連接器相關(guān)的Uber JAR包。
如果需要讀取配置文件,也需要在附加依賴文件中上傳添加。
完整示例代碼
本示例代碼中,將Kafka數(shù)據(jù)源的數(shù)據(jù)進行處理后寫入MySQL。此示例僅供參考,更多的代碼風(fēng)格和質(zhì)量指南請參見代碼風(fēng)格和質(zhì)量指南。
FlinkDemo.java
package com.aliyun;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class FlinkDemo {
// 定義數(shù)據(jù)結(jié)構(gòu)
public static class Student {
public int id;
public String name;
public int score;
public Student(int id, String name, int score) {
this.id = id;
this.name = name;
this.score = score;
}
}
public static void main(String[] args) throws Exception {
// 創(chuàng)建Flink執(zhí)行環(huán)境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
Map<String,String> configMap = new HashMap<>();
try (InputStream input = new FileInputStream("/flink/usrlib/config.properties")) {
// 加載屬性文件
properties.load(input);
// 獲取屬性值
configMap.put("bootstrapServers",properties.getProperty("bootstrapServers")) ;
configMap.put("inputTopic",properties.getProperty("inputTopic"));
configMap.put("groupId",properties.getProperty("groupId"));
configMap.put("url",properties.getProperty("database.url")) ;
configMap.put("username",properties.getProperty("database.username"));
configMap.put("password",properties.getProperty("database.password"));
} catch (IOException ex) {
ex.printStackTrace();
}
// Build Kafka source
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers(configMap.get("bootstrapServers"))
.setTopics(configMap.get("inputTopic"))
.setStartingOffsets(OffsetsInitializer.latest())
.setGroupId(configMap.get("groupId"))
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
// 將外部數(shù)據(jù)源集成到flink數(shù)據(jù)流程序
// WatermarkStrategy.noWatermarks() 指沒有使用水印策略
DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source");
// 篩選出分數(shù)大于60分的數(shù)據(jù)
DataStream<Student> source = stream
.map(new MapFunction<String, Student>() {
@Override
public Student map(String s) throws Exception {
String[] data = s.split(",");
return new Student(Integer.parseInt(data[0]), data[1], Integer.parseInt(data[2]));
}
}).filter(Student -> Student.score >=60);
source.addSink(JdbcSink.sink("INSERT IGNORE INTO student (id, username, score) VALUES (?, ?, ?)",
new JdbcStatementBuilder<Student>() {
public void accept(PreparedStatement ps, Student data) {
try {
ps.setInt(1, data.id);
ps.setString(2, data.name);
ps.setInt(3, data.score);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
},
new JdbcExecutionOptions.Builder()
.withBatchSize(5) // 每次批量寫入的記錄數(shù)
.withBatchIntervalMs(2000) // 重試時的最大延遲時間(毫秒)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(configMap.get("url"))
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername(configMap.get("username"))
.withPassword(configMap.get("password"))
.build()
)).name("Sink MySQL");
env.execute("Flink Demo");
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aliyun</groupId>
<artifactId>FlinkDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<name>FlinkDemo</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.17.1</flink.version>
<vvr.version>1.17-vvr-8.0.4-1</vvr.version>
<target.java.version>1.8</target.java.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.14.1</log4j.version>
</properties>
<dependencies>
<!-- Apache Flink 依賴項 -->
<!-- 之所以提供這些依賴項,是因為它們不應(yīng)該打包到JAR文件中。 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- 在這里添加連接器依賴項。它們必須在默認作用域(compile)中。 -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-kafka</artifactId>
<version>${vvr.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-mysql</artifactId>
<version>${vvr.version}</version>
</dependency>
<!-- 添加日志框架,以便在運行時生成控制臺輸出 -->
<!-- 默認情況下,這些依賴項從應(yīng)用程序JAR中排除 -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java 編譯器-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- 我們使用maven-shade-plugin創(chuàng)建一個包含所有必須依賴的 fat jar -->
<!-- 修改<mainClass>的值.如果您的程序入口點發(fā)生了改變 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<!-- 去掉一些不必要的依賴性 -->
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- 不要復(fù)制META-INF文件夾中的簽名。否則,這可能會在使用JAR文件時導(dǎo)致安全異常 -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.aliyun.FlinkDemo</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
相關(guān)文檔
支持作為DataStream類型的連接器列表,請參見支持的連接器。
Flink JAR作業(yè)的完整開發(fā)流程示例,請參見Flink JAR作業(yè)快速入門。
實時計算Flink版還支持運行SQL和Python作業(yè),開發(fā)方法請參見SQL作業(yè)開發(fā)和Python作業(yè)開發(fā)。