日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

全托管Flink DataStream作業

Hologres與Flink全托管高度兼容,多數情況下您可以使用Flink SQL的方式,聲明Hologres的源表、維表及結果表,進而使用SQL表達數據的處理邏輯。但對于特殊業務場景,Flink SQL方式無法滿足業務計算時,您需要使用DataStream的方式讀寫數據。本文以VVR-8.0.8-Flink-1.17版本為例,為您完整地展示如何調試和開發基于Hologres連接器的DataStream作業。

前提條件

  • 已購買Hologres實例并創建數據庫。詳情請參見創建數據庫

  • 已安裝代碼開發平臺,用于本地代碼調試,如IntelliJ IDEA。

步驟一:下載Connector依賴

通過DataStream的方式讀寫Hologres數據時,您需要下載Hologres連接器連接Flink全托管。目前已發布的連接器版本請參見Hologres DataStream連接器

  1. 您需要下載如下2個依賴JAR包:

    • ververica-connector-hologres-1.17-vvr-8.0.8.jar:用于本地調試。

    • ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar:用于本地調試和線上部署。

      說明

      從VVR-6.0-Flink-1.15版本起,商業版Connector在本地調試時,需要配合相應版本的Uber JAR使用。使用方法請參見本地運行和調試包含連接器的作業

  2. 下載后,使用如下命令將ververica-connector-hologres-1.17-vvr-8.0.8.jar安裝至本地Maven倉庫中:

    mvn install:install-file -Dfile=$path/ververica-connector-hologres-1.17-vvr-8.0.8.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.17-vvr-8.0.8 -Dpackaging=jar

    其中$path為您本地存放ververica-connector-hologres-1.17-vvr-8.0.8.jar的絕對路徑。

步驟二:本地開發及調試

您需要在本地完成項目開發,再在Flink全托管控制臺上部署并運行。以Binlog源表為例,項目代碼及pom.xml文件如下:

  1. 本地代碼編寫:

    • DataStream API Demo代碼:

      import com.alibaba.ververica.connectors.hologres.binlog.HologresBinlogConfigs;
      import com.alibaba.ververica.connectors.hologres.binlog.StartupMode;
      import com.alibaba.ververica.connectors.hologres.binlog.source.HologresBinlogSource;
      import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
      import com.alibaba.ververica.connectors.hologres.config.JDBCOptions;
      import com.alibaba.ververica.connectors.hologres.utils.JDBCUtils;
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.DataTypes;
      import org.apache.flink.table.api.TableSchema;
      
      import java.util.Collections;
      
      public class HologresBinlogSourceDemo {
      
          public static void main(String[] args) throws Exception {
              Configuration envConf = new Configuration();
              // 本地調試時,需要指定uber jar的絕對路徑;打包上傳時請注釋掉
              envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>");
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(envConf);
              // 初始化讀取的表的Schema,需要和Hologres表的字段匹配,可以只定義部分字段。
              TableSchema schema = TableSchema.builder()
              .field("<id>", DataTypes.INT().notNull())
              .primaryKey("<id>")
              .build();
      
              // Hologres的相關參數。
              Configuration config = new Configuration();
              config.setString(HologresConfigs.ENDPOINT, "<yourEndpoint>");
              config.setString(HologresConfigs.USERNAME, "<yourUserName>");
              config.setString(HologresConfigs.PASSWORD, "<yourPassword>");
              config.setString(HologresConfigs.DATABASE, "<yourDatabaseName>");
              config.setString(HologresConfigs.TABLE, "<yourTableName>");
              config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
              config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
              // 構建JDBC Options。
              JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
      
              // 構建Hologres Binlog Source。
              long startTimeMs = 0;
              HologresBinlogSource source = new HologresBinlogSource(
                  new HologresConnectionParam(config),
                  schema,
                  config,
                  jdbcOptions,
                  startTimeMs,
                  StartupMode.INITIAL,
                  "",
                  "",
                  -1,
                  Collections.emptySet());
      
              env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
              env.execute();  
          }
      }

      參數說明:

      參數

      描述

      path_to_uber_jar

      本地Uber JAR的絕對路徑。對于Windows需要加相應磁盤分區,例如file:///D:/path/to/a-uber.jar

      id

      始化讀取的表的Schema,需要和Hologres表的字段匹配,可以只定義部分字段。

      yourEndpoint

      Hologres實例的網絡域名。您可以進入Hologres管理控制臺的實例詳情頁,從網絡信息中獲取域名。

      yourUserName

      阿里云賬號的AccessKey ID。您可以單擊AccessKey 管理,獲取AccessKey ID。

      yourPassword

      對應阿里云賬號的AccessKey Secret。

      yourDatabaseName

      Hologres數據庫名稱。

      yourTableName

      待讀取的Hologres表名稱。

    • pom.xml文件:

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
      
          <groupId>com.alibaba.hologres</groupId>
          <artifactId>hologres-flink-demo</artifactId>
          <version>1.0-SNAPSHOT</version>
          <packaging>jar</packaging>
      
          <properties>
              <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
              <flink.version>1.17.2</flink.version>
              <vvr.version>1.17-vvr-8.0.8</vvr.version>
              <target.java.version>1.8</target.java.version>
              <scala.binary.version>2.12</scala.binary.version>
              <maven.compiler.source>${target.java.version}</maven.compiler.source>
              <maven.compiler.target>${target.java.version}</maven.compiler.target>
              <log4j.version>1.7.21</log4j.version>
          </properties>
      
          <dependencies>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-java</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-common</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-runtime</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-base</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-clients</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>com.alibaba.ververica</groupId>
                  <artifactId>ververica-connector-hologres</artifactId>
                  <version>${vvr.version}</version>
              </dependency>
              <!--  日志實現  log4j  依賴  -->
              <dependency>
                  <groupId>org.slf4j</groupId>
                  <artifactId>slf4j-api</artifactId>
                  <version>${log4j.version}</version>
              </dependency>
              <dependency>
                  <groupId>org.slf4j</groupId>
                  <artifactId>slf4j-log4j12</artifactId>
                  <version>${log4j.version}</version>
              </dependency>
          </dependencies>
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-shade-plugin</artifactId>
                      <version>3.1.0</version>
                      <executions>
                          <execution>
                              <phase>package</phase>
      
                              <goals>
                                  <goal>shade</goal>
                              </goals>
                              <configuration>
                                  <createDependencyReducedPom>false</createDependencyReducedPom>
                                  <shadedArtifactAttached>true</shadedArtifactAttached>
                                  <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
                                  <transformers>
                                      <transformer
                                              implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                  </transformers>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
      
      </project>
  2. 本地調試及運行。

    • 您需要配置運行所需要的ClassLoader JAR包,即ververica-classloader-1.15-vvr-6.0-SNAPSHOT.jar。具體操作請參見步驟二:配置運行所需要的ClassLoader JAR包

    • (可選)如果提示缺少一些常見的Flink類無法執行,例如org.apache.flink.configuration.Configuration,需要在“Modify options”處勾選“Add dependencies with provided scope to classpath”。

    配置完成后,您可在本地調試運行該項目,確保本地可運行成功。

本地調試步驟詳情請參見本地運行和調試包含連接器的作業

步驟三:打包運行

本地調試成功后,您可將其進行打包并與Uber JAR一同上傳至Flink。

  1. 打包前,注釋掉下述代碼:

    envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>");
  2. 編譯打包。

    使用Maven編譯并打包應用程序及其依賴項。命令如下:

    mvn clean package -DskipTests

    打包成功后,即可在本地生成名為hologres-flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar的文件。

  3. 上傳JAR包。

    在Flink控制臺的資源管理頁面上傳打包好的程序JAR包和ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar。具體操作請參見步驟二:上傳測試JAR包和數據文件image

  4. 部署JAR作業。

    在Flink控制臺的作業運維頁面部署JAR作業。具體操作及參數信息請參見步驟三:部署JAR作業image

  5. 啟動并查看Flink計算結果。

    1. 在Flink控制臺的作業運維頁面,單擊目標作業名稱操作列中的啟動

    2. 配置資源信息和基礎設置。

      作業啟動參數配置詳情請參見作業啟動

    3. 單擊啟動

      單擊啟動后,作業狀態變為運行中,則代表作業運行正常。

常見問題

  • 問題1:當您在IntelliJ IDEA中運行和調試Flink作業時,如果其包含了阿里云實時計算Flink版的商業版連接器依賴,可能會遇到無法找到連接器相關類的運行錯誤。例如Caused by: java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.hologres.binlog.source.reader.HologresBinlogRecordEmitter

  • 問題2:提示缺少一些常見的Flink類無法執行,例如Caused by: java.lang.ClassNotFoundException: org.apache.flink.configuration.Configuration

    • 問題原因:可能是缺少依賴或者沒有正常加載依賴。

    • 解決方法:

      • pom.xml文件中沒有引入相關依賴,大多數情況下可能是flink-connector-base,也可以搜索異常包路徑,查看其屬于哪個Flink依賴。

      • 可能是運行時沒有加載provided依賴。需要在IntelliJ IDEA的“Modify options”處勾選“Add dependencies with provided scope to classpath”。

  • 問題3:運行中報錯Incompatible magic value

    • 問題原因:

      • 原因一:可能是使用的Uber JAR與Connector版本不一致。

      • 原因二:可能是ClassLoader設置有誤。

    • 解決方法:

  • 問題4:運行時拋出異常Unable to load flink-decryption library java.io.FileNotFoundException: Decryption library native/windows/x86/flink-decryption.dll not found

    • 問題原因:目前Uber JAR不支持Windows系統32位的Java。

    • 解決方法:請安裝64位的Java,可以通過java -version命令查看Java安裝信息,如果不包含64-Bit字樣,表明是32位的Java。

  • 問題5:運行時拋出Caused by: java.lang.ClassFormatError

    • 問題原因:可能是由于IntelliJ IDEA配置的JDK版本問題導致。

    • 解決方法:請使用較新的JDK8或者JDK11版本。