Hologres與Flink全托管高度兼容,多數情況下您可以使用Flink SQL的方式,聲明Hologres的源表、維表及結果表,進而使用SQL表達數據的處理邏輯。但對于特殊業務場景,Flink SQL方式無法滿足業務計算時,您需要使用DataStream的方式讀寫數據。本文以VVR-8.0.8-Flink-1.17版本為例,為您完整地展示如何調試和開發基于Hologres連接器的DataStream作業。
前提條件
已購買Hologres實例并創建數據庫。詳情請參見創建數據庫。
已安裝代碼開發平臺,用于本地代碼調試,如IntelliJ IDEA。
步驟一:下載Connector依賴
通過DataStream的方式讀寫Hologres數據時,您需要下載Hologres連接器連接Flink全托管。目前已發布的連接器版本請參見Hologres DataStream連接器。
您需要下載如下2個依賴JAR包:
ververica-connector-hologres-1.17-vvr-8.0.8.jar:用于本地調試。
ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar:用于本地調試和線上部署。
說明從VVR-6.0-Flink-1.15版本起,商業版Connector在本地調試時,需要配合相應版本的Uber JAR使用。使用方法請參見本地運行和調試包含連接器的作業。
下載后,使用如下命令將ververica-connector-hologres-1.17-vvr-8.0.8.jar安裝至本地Maven倉庫中:
mvn install:install-file -Dfile=$path/ververica-connector-hologres-1.17-vvr-8.0.8.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.17-vvr-8.0.8 -Dpackaging=jar
其中
$path
為您本地存放ververica-connector-hologres-1.17-vvr-8.0.8.jar的絕對路徑。
步驟二:本地開發及調試
您需要在本地完成項目開發,再在Flink全托管控制臺上部署并運行。以Binlog源表為例,項目代碼及pom.xml文件如下:
本地代碼編寫:
DataStream API Demo代碼:
import com.alibaba.ververica.connectors.hologres.binlog.HologresBinlogConfigs; import com.alibaba.ververica.connectors.hologres.binlog.StartupMode; import com.alibaba.ververica.connectors.hologres.binlog.source.HologresBinlogSource; import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam; import com.alibaba.ververica.connectors.hologres.config.JDBCOptions; import com.alibaba.ververica.connectors.hologres.utils.JDBCUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import java.util.Collections; public class HologresBinlogSourceDemo { public static void main(String[] args) throws Exception { Configuration envConf = new Configuration(); // 本地調試時,需要指定uber jar的絕對路徑;打包上傳時請注釋掉 envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>"); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(envConf); // 初始化讀取的表的Schema,需要和Hologres表的字段匹配,可以只定義部分字段。 TableSchema schema = TableSchema.builder() .field("<id>", DataTypes.INT().notNull()) .primaryKey("<id>") .build(); // Hologres的相關參數。 Configuration config = new Configuration(); config.setString(HologresConfigs.ENDPOINT, "<yourEndpoint>"); config.setString(HologresConfigs.USERNAME, "<yourUserName>"); config.setString(HologresConfigs.PASSWORD, "<yourPassword>"); config.setString(HologresConfigs.DATABASE, "<yourDatabaseName>"); config.setString(HologresConfigs.TABLE, "<yourTableName>"); config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true); config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true); // 構建JDBC Options。 JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config); // 構建Hologres Binlog Source。 long startTimeMs = 0; HologresBinlogSource source = new HologresBinlogSource( new HologresConnectionParam(config), schema, config, jdbcOptions, startTimeMs, StartupMode.INITIAL, "", "", -1, Collections.emptySet()); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print(); env.execute(); } }
參數說明:
參數
描述
path_to_uber_jar
本地Uber JAR的絕對路徑。對于Windows需要加相應磁盤分區,例如
file:///D:/path/to/a-uber.jar
。id
始化讀取的表的Schema,需要和Hologres表的字段匹配,可以只定義部分字段。
yourEndpoint
Hologres實例的網絡域名。您可以進入Hologres管理控制臺的實例詳情頁,從網絡信息中獲取域名。
yourUserName
阿里云賬號的AccessKey ID。您可以單擊AccessKey 管理,獲取AccessKey ID。
yourPassword
對應阿里云賬號的AccessKey Secret。
yourDatabaseName
Hologres數據庫名稱。
yourTableName
待讀取的Hologres表名稱。
pom.xml文件:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-flink-demo</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.17.2</flink.version> <vvr.version>1.17-vvr-8.0.8</vvr.version> <target.java.version>1.8</target.java.version> <scala.binary.version>2.12</scala.binary.version> <maven.compiler.source>${target.java.version}</maven.compiler.source> <maven.compiler.target>${target.java.version}</maven.compiler.target> <log4j.version>1.7.21</log4j.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-hologres</artifactId> <version>${vvr.version}</version> </dependency> <!-- 日志實現 log4j 依賴 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${log4j.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <shadedArtifactAttached>true</shadedArtifactAttached> <shadedClassifierName>jar-with-dependencies</shadedClassifierName> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
本地調試及運行。
您需要配置運行所需要的ClassLoader JAR包,即ververica-classloader-1.15-vvr-6.0-SNAPSHOT.jar。具體操作請參見步驟二:配置運行所需要的ClassLoader JAR包。
(可選)如果提示缺少一些常見的Flink類無法執行,例如
org.apache.flink.configuration.Configuration
,需要在“Modify options”處勾選“Add dependencies with provided scope to classpath”。
配置完成后,您可在本地調試運行該項目,確保本地可運行成功。
本地調試步驟詳情請參見本地運行和調試包含連接器的作業。
步驟三:打包運行
本地調試成功后,您可將其進行打包并與Uber JAR一同上傳至Flink。
打包前,注釋掉下述代碼:
envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>");
編譯打包。
使用Maven編譯并打包應用程序及其依賴項。命令如下:
mvn clean package -DskipTests
打包成功后,即可在本地生成名為hologres-flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar的文件。
上傳JAR包。
在Flink控制臺的資源管理頁面上傳打包好的程序JAR包和ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar。具體操作請參見步驟二:上傳測試JAR包和數據文件。
部署JAR作業。
在Flink控制臺的作業運維頁面部署JAR作業。具體操作及參數信息請參見步驟三:部署JAR作業。
啟動并查看Flink計算結果。
在Flink控制臺的作業運維頁面,單擊目標作業名稱操作列中的啟動。
配置資源信息和基礎設置。
作業啟動參數配置詳情請參見作業啟動。
單擊啟動。
單擊啟動后,作業狀態變為運行中,則代表作業運行正常。
常見問題
問題1:當您在IntelliJ IDEA中運行和調試Flink作業時,如果其包含了阿里云實時計算Flink版的商業版連接器依賴,可能會遇到無法找到連接器相關類的運行錯誤。例如
Caused by: java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.hologres.binlog.source.reader.HologresBinlogRecordEmitter
。問題原因:此類異常一般是由于本地調試沒有正確使用uber jar導致的
解決方法:請參考本文或者本地運行和調試包含連接器的作業正確使用Uber JAR進行調試。
問題2:提示缺少一些常見的Flink類無法執行,例如
Caused by: java.lang.ClassNotFoundException: org.apache.flink.configuration.Configuration
。問題原因:可能是缺少依賴或者沒有正常加載依賴。
解決方法:
pom.xml文件中沒有引入相關依賴,大多數情況下可能是flink-connector-base,也可以搜索異常包路徑,查看其屬于哪個Flink依賴。
可能是運行時沒有加載provided依賴。需要在IntelliJ IDEA的“Modify options”處勾選“Add dependencies with provided scope to classpath”。
問題3:運行中報錯
Incompatible magic value
。問題原因:
原因一:可能是使用的Uber JAR與Connector版本不一致。
原因二:可能是ClassLoader設置有誤。
解決方法:
對于原因一:可參考本文選擇對應版本的Connector和Uber JAR。
對于原因二:請參考配置運行所需要的ClassLoader JAR包重新設置。
問題4:運行時拋出異常
Unable to load flink-decryption library java.io.FileNotFoundException: Decryption library native/windows/x86/flink-decryption.dll not found
。問題原因:目前Uber JAR不支持Windows系統32位的Java。
解決方法:請安裝64位的Java,可以通過
java -version
命令查看Java安裝信息,如果不包含64-Bit
字樣,表明是32位的Java。
問題5:運行時拋出
Caused by: java.lang.ClassFormatError
。問題原因:可能是由于IntelliJ IDEA配置的JDK版本問題導致。
解決方法:請使用較新的JDK8或者JDK11版本。