日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用JDBC Connector導(dǎo)入

更新時(shí)間:

當(dāng)您需要將Flink處理后的數(shù)據(jù)導(dǎo)入ClickHouse時(shí),本文為您提供了使用ClickHouse JDBC Connector的方法,使不同版本的Flink處理后的數(shù)據(jù)寫入到ClickHouse中。

背景信息

Flink在1.11.0版本對(duì)其JDBC Connector進(jìn)行了一次較大的重構(gòu):

  • 重構(gòu)之前(1.10.1及之前版本),包名為flink-jdbc 。

  • 重構(gòu)之后(1.11.0及之后版本),包名為flink-connector-jdbc 。

二者對(duì)Flink中以不同方式寫入ClickHouse Sink的支持情況如下:

API名稱

flink-jdbc

flink-connector-jdbc

DataStream

不支持

支持

Table API (Legacy)

支持

不支持

Table API (DDL)

不支持

不支持

flink-connector-jdbc完全移除了對(duì)Table API (Legacy) 的支持,只能通過(guò)DDL的方式調(diào)用Table API。但是,Table DDL方式硬編碼了其所支持的JDBC Driver,不支持ClickHouse。本文分別以基于Flink 1.10.1 + flink-jdbc和Flink 1.11.0 + flink-connector-jdbc 為例,介紹Flink寫入ClickHouse的方法。

Flink 1.10.1 + flink-jdbc

Flink 1.10.1及之前版本需要采用flink-jdbc+Table API的方式寫入數(shù)據(jù)到ClickHouse。本節(jié)我們使用Maven及Flink 1.10.1版本為例。

  1. mvn archetype:generate命令創(chuàng)建項(xiàng)目,生成過(guò)程中根據(jù)提示輸入group-id和artifact-id等。

    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.10.1
  2. 編輯pom.xml中的<dependencies />小節(jié)添加依賴。

            <!--//添加Flink Table API相關(guān)的依賴 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
    
            <!--//添加Flink JDBC以及Clickhouse JDBC Driver相關(guān)的依賴 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.2.4</version>
            </dependency>
                            
  3. 創(chuàng)建數(shù)據(jù)寫入程序文件。

    示例程序使用CsvTableSource讀入 CSV 文件產(chǎn)生Table Source,使用JDBCAppendTableSink將數(shù)據(jù)寫入到ClickHouse Sink中。

    說(shuō)明
    • 由于ClickHouse單次插入的延遲比較高,我們需要設(shè)置BatchSize來(lái)批量插入數(shù)據(jù),提高性能。

    • 在JDBCAppendTableSink的實(shí)現(xiàn)中,若最后一批數(shù)據(jù)的數(shù)目不足BatchSize,則不會(huì)插入剩余數(shù)據(jù)。

    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
    import org.apache.flink.api.common.typeinfo.TypeInformation
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
        val CkJdbcUrl =
          "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
        val CkUsername = "<YOUR-USERNAME>"
        val CkPassword = "<YOUR-PASSWORD>"
        val BatchSize = 500 // 設(shè)置您的batch size
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
    //將數(shù)據(jù)寫入 ClickHouse Sink 
        val sink = JDBCAppendTableSink
          .builder()
          .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
          .setDBUrl(CkJdbcUrl)
          .setUsername(CkUsername)
          .setPassword(CkPassword)
          .setQuery(insertIntoCkSql)
          .setBatchSize(BatchSize)
          .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT)
          .build()
    
        tEnv.registerTableSink(
          "sink",
          Array("name", "grade", "rate"),
          Array(Types.STRING, Types.LONG, Types.FLOAT),
          sink
        )
    
        tEnv.insertInto(resultTable, "sink")
    
        env.execute("Flink Table API to ClickHouse Example")
      }
    }

    參數(shù)說(shuō)明:

    • SourceCsvPath:源CSV文件路徑。

    • CkJdbcUrl:目標(biāo)ClickHouse集群地址。

    • CkUsername:目標(biāo)ClickHouse集群用戶名。

    • CkPassword:目標(biāo)ClickHouse集群對(duì)應(yīng)密碼。

  4. 編譯運(yùn)行。

    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar

Flink 1.11.0 + flink-connector-jdbc

Flink 1.11.0及之后版本需要采用flink-connector-jdbc+DataStream的方式寫入數(shù)據(jù)到ClickHouse。本節(jié)我們使用Maven及Flink 1.11.0版本為例。

  1. mvn archetype:generate命令創(chuàng)建項(xiàng)目,生成過(guò)程中會(huì)提示輸入group-id和artifact-id等。

    $ mvn archetype:generate \
          -DarchetypeGroupId=org.apache.flink \
          -DarchetypeArtifactId=flink-quickstart-scala \
          -DarchetypeVersion=1.11.0
  2. 編輯pom.xml中的<dependencies />小節(jié)添加依賴。

            <!--//添加Flink Table API相關(guān)的依賴 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <!--//添加Flink JDBC Connector以及Clickhouse JDBC Driver相關(guān)的依賴 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>ru.yandex.clickhouse</groupId>
                <artifactId>clickhouse-jdbc</artifactId>
                <version>0.2.4</version>
            </dependency>
  3. 創(chuàng)建數(shù)據(jù)寫入程序文件。

    示例程序使用 CsvTableSource讀入CSV文件產(chǎn)生Table Source,通過(guò)TableEnvironment.toAppendStream 將Table轉(zhuǎn)換為DataStream。使用JdbcSink將數(shù)據(jù)寫入到ClickHouse中。

    說(shuō)明
    • 由于ClickHouse單次插入的延遲比較高,我們需要設(shè)置BatchSize來(lái)批量插入數(shù)據(jù),提高性能。

    • 當(dāng)前版本的flink-connector-jdbc,使用Scala API調(diào)用JdbcSink時(shí)會(huì)出現(xiàn)lambda函數(shù)的序列化問(wèn)題。我們只能采用手動(dòng)實(shí)現(xiàn)interface的方式來(lái)傳入相關(guān)JDBC Statement build函數(shù)(class CkSinkBuilder)。

      class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
        def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
          ps.setString(1, v._1)
          ps.setLong(2, v._2)
          ps.setFloat(3, v._3)
        }
      }
    package org.myorg.example
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.sources._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
    import org.apache.flink.table.api._
    import org.apache.flink.types.Row
    import org.apache.flink.table.api.{
      TableEnvironment,
      TableSchema,
      Types,
      ValidationException
    }
    import org.apache.flink.connector.jdbc._
    import java.sql.PreparedStatement
    
    //手動(dòng)實(shí)現(xiàn)interface的方式來(lái)傳入相關(guān)JDBC Statement build函數(shù)
    class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
      def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
        ps.setString(1, v._1)
        ps.setLong(2, v._2)
        ps.setFloat(3, v._3)
      }
    }
    
    object StreamingJob {
      def main(args: Array[String]) {
        val SourceCsvPath =
          "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
        val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
        val CkUsername = "<YOUR-USERNAME>"
        val CkPassword = "<YOUR-PASSWORD>"
        val BatchSize = 500 // 設(shè)置您的 batch size
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val tEnv = StreamTableEnvironment.create(env)
    
        val csvTableSource = CsvTableSource
          .builder()
          .path(SourceCsvPath)
          .ignoreFirstLine()
          .fieldDelimiter(",")
          .field("name", Types.STRING)
          .field("age", Types.LONG)
          .field("sex", Types.STRING)
          .field("grade", Types.LONG)
          .field("rate", Types.FLOAT)
          .build()
    
        tEnv.registerTableSource("source", csvTableSource)
    
        val resultTable = tEnv.scan("source").select("name, grade, rate")
    
    //將Table轉(zhuǎn)換為DataStream
        val resultDataStream =
          tEnv.toAppendStream[(String, Long, Float)](resultTable)
    
        val insertIntoCkSql =
          """
            |  INSERT INTO sink_table (
            |    name, grade, rate
            |  ) VALUES (
            |    ?, ?, ?
            |  )
          """.stripMargin
    
    //將數(shù)據(jù)寫入ClickHouse JDBC Sink
        resultDataStream.addSink(
          JdbcSink.sink[(String, Long, Float)](
            insertIntoCkSql,
            new CkSinkBuilder,
            new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
              .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
              .withUrl(CkJdbcUrl)
              .withUsername(CkUsername)
              .withPassword(CkPassword)
              .build()
          )
        )
    
        env.execute("Flink DataStream to ClickHouse Example")
      }
    }

    參數(shù)說(shuō)明:

    • SourceCsvPath:源CSV文件路徑。

    • CkJdbcUrl:目標(biāo)ClickHouse集群地址。

    • CkUsername:目標(biāo)ClickHouse集群用戶名。

    • CkPassword:目標(biāo)ClickHouse集群對(duì)應(yīng)密碼。

  4. 編譯運(yùn)行。

    $ mvn clean package
    $ ${FLINK_HOME}/bin/flink run target/example-0.1.jar