Import data by using the JDBC Connector
This topic describes how to use the ClickHouse JDBC connector to write data processed by different versions of Flink to ClickHouse.
Background information
Flink substantially refactored its JDBC connector in version 1.11.0:
Before the refactoring (versions 1.10.1 and earlier), the package is named flink-jdbc.
After the refactoring (versions 1.11.0 and later), the package is named flink-connector-jdbc.
The two packages support the following ways of writing to a ClickHouse sink from Flink:
API | flink-jdbc | flink-connector-jdbc
DataStream | Not supported | Supported
Table API (Legacy) | Supported | Not supported
Table API (DDL) | Not supported | Not supported
flink-connector-jdbc removed support for the legacy Table API entirely; the Table API can now be used only through DDL. However, the Table DDL path hard-codes the set of JDBC drivers it supports, and ClickHouse is not among them. This topic therefore uses Flink 1.10.1 + flink-jdbc and Flink 1.11.0 + flink-connector-jdbc as examples to show how Flink writes data to ClickHouse.
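For reference, the sketch below shows what the Flink 1.11 DDL route would look like; it is included only to illustrate the limitation, and the table name ck_sink is hypothetical. The 'jdbc' table connector resolves a SQL dialect from the URL and, in this version, only recognizes Derby, MySQL, and PostgreSQL, so a jdbc:clickhouse:// URL cannot be used.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object DdlRouteExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)
    // Registering the table succeeds, but any INSERT INTO ck_sink fails
    // because no built-in JDBC dialect matches a jdbc:clickhouse:// URL.
    tEnv.executeSql(
      """CREATE TABLE ck_sink (
        |  name STRING, grade BIGINT, rate FLOAT
        |) WITH (
        |  'connector'  = 'jdbc',
        |  'url'        = 'jdbc:clickhouse://<host>:<port>/<database>',
        |  'table-name' = 'sink_table'
        |)""".stripMargin)
  }
}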
Flink 1.10.1 + flink-jdbc
Flink 1.10.1 and earlier must use flink-jdbc together with the Table API to write data to ClickHouse. This section uses Maven and Flink 1.10.1 as an example.
Run the mvn archetype:generate command to create a project, and enter information such as the group-id and artifact-id when prompted.
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeVersion=1.10.1
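The archetype generates a project skeleton roughly like the following (the exact files may vary with the archetype version; the package path follows the group-id you entered):

example/
├── pom.xml
└── src/main/scala/org/myorg/example/
    ├── BatchJob.scala
    └── StreamingJob.scala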
Edit pom.xml and add the following dependencies to the <dependencies /> section. The flink.version and scala.binary.version properties are defined in the POM generated by the archetype.

<!-- Dependencies for the Flink Table API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- Dependencies for flink-jdbc and the ClickHouse JDBC driver -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
Create the program file that writes the data.
The example program uses CsvTableSource to read a CSV file as the table source and JDBCAppendTableSink to write the data to the ClickHouse sink.

Note: Because a single insert into ClickHouse has relatively high latency, set BatchSize to insert data in batches and improve performance. Also be aware that in the JDBCAppendTableSink implementation, if the final batch contains fewer rows than BatchSize, the remaining data is not inserted; for example, with BatchSize = 500 and 1,234 input rows, the last 234 rows would be left unwritten.
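For reference, the program below expects a source.csv shaped like the following hypothetical sample; the header line is skipped because of ignoreFirstLine():

name,age,sex,grade,rate
zhangsan,28,male,89,0.95
lisi,25,female,92,0.87
wangwu,31,male,76,0.64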
package org.myorg.example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types, ValidationException}
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.api.common.typeinfo.TypeInformation

object StreamingJob {
  def main(args: Array[String]) {
    val SourceCsvPath = "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
    val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
    val CkUsername = "<YOUR-USERNAME>"
    val CkPassword = "<YOUR-PASSWORD>"
    val BatchSize = 500 // Set your batch size.

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("name", Types.STRING)
      .field("age", Types.LONG)
      .field("sex", Types.STRING)
      .field("grade", Types.LONG)
      .field("rate", Types.FLOAT)
      .build()

    tEnv.registerTableSource("source", csvTableSource)

    val resultTable = tEnv.scan("source").select("name, grade, rate")

    val insertIntoCkSql =
      """
        | INSERT INTO sink_table (
        |   name, grade, rate
        | ) VALUES (
        |   ?, ?, ?
        | )
      """.stripMargin

    // Write the data to the ClickHouse sink.
    val sink = JDBCAppendTableSink
      .builder()
      .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
      .setDBUrl(CkJdbcUrl)
      .setUsername(CkUsername)
      .setPassword(CkPassword)
      .setQuery(insertIntoCkSql)
      .setBatchSize(BatchSize)
      .setParameterTypes(Types.STRING, Types.LONG, Types.FLOAT)
      .build()

    tEnv.registerTableSink(
      "sink",
      Array("name", "grade", "rate"),
      Array(Types.STRING, Types.LONG, Types.FLOAT),
      sink
    )

    tEnv.insertInto(resultTable, "sink")

    env.execute("Flink Table API to ClickHouse Example")
  }
}
Parameter descriptions:

SourceCsvPath: the path of the source CSV file.
CkJdbcUrl: the endpoint of the target ClickHouse cluster.
CkUsername: the username for the target ClickHouse cluster.
CkPassword: the corresponding password.
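The INSERT statement assumes that sink_table already exists in the target ClickHouse database. The sketch below creates a compatible table through the same clickhouse-jdbc driver; the column types mirror the sink's parameter types, while the MergeTree engine and ordering key are assumptions to adapt to your schema:

import java.sql.DriverManager

object CreateSinkTable {
  def main(args: Array[String]): Unit = {
    // Placeholders: replace with your cluster's connection settings.
    val conn = DriverManager.getConnection(
      "jdbc:clickhouse://<clickhouse-host>:<port>/<database>",
      "<YOUR-USERNAME>",
      "<YOUR-PASSWORD>")
    try {
      conn.createStatement().execute(
        """CREATE TABLE IF NOT EXISTS sink_table (
          |  name  String,
          |  grade Int64,
          |  rate  Float32
          |) ENGINE = MergeTree()
          |ORDER BY name""".stripMargin)
    } finally conn.close()
  }
}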
Compile and run the job.
$ mvn clean package
$ ${FLINK_HOME}/bin/flink run target/example-0.1.jar
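To spot-check the import, you can query the table back through the same driver; a minimal sketch with placeholder connection settings:

import java.sql.DriverManager

object VerifyImport {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:clickhouse://<clickhouse-host>:<port>/<database>",
      "<YOUR-USERNAME>",
      "<YOUR-PASSWORD>")
    try {
      // count() is ClickHouse's row-count aggregate.
      val rs = conn.createStatement().executeQuery("SELECT count() FROM sink_table")
      while (rs.next()) println(s"rows in sink_table: ${rs.getLong(1)}")
    } finally conn.close()
  }
}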
Flink 1.11.0 + flink-connector-jdbc
Flink 1.11.0 and later must use flink-connector-jdbc together with the DataStream API to write data to ClickHouse. This section uses Maven and Flink 1.11.0 as an example; it reuses the same CSV source and sink_table as the previous section.
Run the mvn archetype:generate command to create a project, and enter information such as the group-id and artifact-id when prompted.
$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeVersion=1.11.0
Edit pom.xml and add the following dependencies to the <dependencies /> section.

<!-- Dependencies for the Flink Table API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!-- Dependencies for the Flink JDBC connector and the ClickHouse JDBC driver -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.2.4</version>
</dependency>
Create the program file that writes the data.
The example program uses CsvTableSource to read a CSV file as the table source, converts the Table to a DataStream with StreamTableEnvironment.toAppendStream, and then uses JdbcSink to write the data to ClickHouse.

Note: Because a single insert into ClickHouse has relatively high latency, set BatchSize to insert data in batches and improve performance. In the current version of flink-connector-jdbc, calling JdbcSink from the Scala API runs into a lambda serialization problem, so the JDBC statement builder has to be supplied as an explicit implementation of the interface (class CkSinkBuilder):

class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
  def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
    ps.setString(1, v._1)
    ps.setLong(2, v._2)
    ps.setFloat(3, v._3)
  }
}
package org.myorg.example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{TableEnvironment, TableSchema, Types, ValidationException}
import org.apache.flink.connector.jdbc._
import java.sql.PreparedStatement

// Implement the interface explicitly to supply the JDBC statement builder.
class CkSinkBuilder extends JdbcStatementBuilder[(String, Long, Float)] {
  def accept(ps: PreparedStatement, v: (String, Long, Float)): Unit = {
    ps.setString(1, v._1)
    ps.setLong(2, v._2)
    ps.setFloat(3, v._3)
  }
}

object StreamingJob {
  def main(args: Array[String]) {
    val SourceCsvPath = "/<YOUR-PATH-TO-TEST-CSV>/source.csv"
    val CkJdbcUrl = "jdbc:clickhouse://<clickhouse-host>:<port>/<database>"
    val CkUsername = "<YOUR-USERNAME>"
    val CkPassword = "<YOUR-PASSWORD>"
    val BatchSize = 500 // Set your batch size.

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val csvTableSource = CsvTableSource
      .builder()
      .path(SourceCsvPath)
      .ignoreFirstLine()
      .fieldDelimiter(",")
      .field("name", Types.STRING)
      .field("age", Types.LONG)
      .field("sex", Types.STRING)
      .field("grade", Types.LONG)
      .field("rate", Types.FLOAT)
      .build()

    tEnv.registerTableSource("source", csvTableSource)

    val resultTable = tEnv.scan("source").select("name, grade, rate")

    // Convert the Table to a DataStream.
    val resultDataStream =
      tEnv.toAppendStream[(String, Long, Float)](resultTable)

    val insertIntoCkSql =
      """
        | INSERT INTO sink_table (
        |   name, grade, rate
        | ) VALUES (
        |   ?, ?, ?
        | )
      """.stripMargin

    // Write the data to the ClickHouse JDBC sink.
    resultDataStream.addSink(
      JdbcSink.sink[(String, Long, Float)](
        insertIntoCkSql,
        new CkSinkBuilder,
        new JdbcExecutionOptions.Builder().withBatchSize(BatchSize).build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
          .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
          .withUrl(CkJdbcUrl)
          .withUsername(CkUsername)
          .withPassword(CkPassword)
          .build()
      )
    )

    env.execute("Flink DataStream to ClickHouse Example")
  }
}
Parameter descriptions:

SourceCsvPath: the path of the source CSV file.
CkJdbcUrl: the endpoint of the target ClickHouse cluster.
CkUsername: the username for the target ClickHouse cluster.
CkPassword: the corresponding password.
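Besides the batch size, JdbcExecutionOptions also exposes a batch interval and a retry count. Flushing on a timer keeps a partially filled final batch from sitting in the buffer indefinitely; the interval and retry values below are illustrative:

import org.apache.flink.connector.jdbc.JdbcExecutionOptions

// Drop-in replacement for the execution options in the addSink call above.
new JdbcExecutionOptions.Builder()
  .withBatchSize(BatchSize)  // flush once this many rows are buffered
  .withBatchIntervalMs(2000) // also flush every 2 seconds (illustrative)
  .withMaxRetries(3)         // retry a failed batch up to 3 times (illustrative)
  .build()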
Compile and run the job.
$ mvn clean package
$ ${FLINK_HOME}/bin/flink run target/example-0.1.jar