EMR-3.38.3及后續版本的EMR集群可以使用數據湖元數據DLF(Data Lake Formation)服務對集群數據進行統一管理,EMR中的Flink組件在開源Flink基礎上增加了與DLF適配的功能。本文為您介紹如何在EMR集群上通過Flink SQL創建Hive Catalog連接到DLF,并讀取Hive全量數據。
前提條件
使用限制
DataFlow集群和DataLake集群需要在同一VPC下。
創建的DataFlow集群需要為EMR-3.38.3后續版本。
操作流程
步驟一:數據準備
下載Hive作業需要的測試數據至OSS任意空目錄,數據上傳目錄將作為后續的外表地址使用。
本示例中上傳目錄為oss://<yourBucketName>/hive/userdata/,其中<yourBucketName>為您在OSS控制臺上創建的Bucket名稱。上傳文件詳細信息,請參見控制臺上傳文件。
在DLF控制臺創建元數據庫,詳情請參見創建元數據庫。
本示例中創建的元數據庫名稱為flink_dlf_hive,選擇路徑為oss://<yourBucketName>/flink_dlf_hive/db。
在DataLake集群中,查看已經創建的元數據庫。
通過SSH方式登錄DataLake集群,詳情請參見登錄集群。
執行以下命令,切換為hadoop用戶并進入Hive命令行。
su - hadoop hive
執行以下命令,查看庫信息。
desc database flink_dlf_hive;
說明命令中的
flink_dlf_hive
為上一步驟中創建的數據庫的名稱。OK flink_dlf_hive oss://aliyu****/flink_dlf_hive/db acs:ram::125046002175****:user/29915368510086**** USER Time taken: 0.069 seconds, Fetched: 1 row(s)
創建Hive的外表并驗證。
執行以下命令,創建Hive的外表。
USE flink_dlf_hive; set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat; set hive.stats.autogather=false; DROP TABLE IF EXISTS emrusers; CREATE EXTERNAL TABLE emrusers ( userid INT, movieid INT, rating INT, unixtime STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION 'oss://<yourBucketName>/<yourTableDir>/';
說明請替換命令中的
<yourBucketName>
為您實際在OSS控制臺上創建的Bucket名稱,<yourTableDir>
為您的數據實際存儲目標,本示例中的地址為oss://<yourBucketName>/hive/userdata/。在Hive命令行中,查詢數據進行驗證。
示例1
SELECT userid,movieid,rating,unix_timestamp() from emrusers limit 10;
示例2
SELECT movieid,count(userid) as usercount from emrusers group by movieid order by usercount desc limit 50;
可選:在DLF控制臺上,驗證表信息。
登錄數據湖構建控制臺。
在左側導航欄,選擇 ,單擊數據表。
在數據表頁面,通過庫名過濾,可以查看已創建的表信息。
步驟二:DataFlow集群連接DLF讀取Hive全量數據
通過SSH方式登錄DataFlow集群,詳情請參見登錄集群。
執行以下命令啟動Yarn Session。
yarn-session.sh --detached
上傳Hive配置文件到DataFlow集群的新建路徑下。
您可以執行以下命令,復制DataLake集群中的hive-site.xml文件到DataFlow集群。
scp root@<master-1-1節點內網的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /root/test/
說明命令中的
<master-1-1節點內網的IP地址>
,您可以在EMR控制臺的集群管理的節點管理頁面查看,/root/test/為DataFlow集群的路徑,您可以根據實際情況修改。加載集群中內置的Hive Connector,啟動SQL客戶端。
當您的集群是EMR-3.43.0之前版本:
在Maven倉庫官網中下載依賴的JAR包,并上傳至DataFlow集群中。本示例是上傳到/root目錄,下載的JAR包為Jackson Core、Jackson Databind和Jackson Annotations。具體版本請根據您實際情況下載,本示例下載的JAR包為jackson-core-2.12.1.jar、jackson-databind-2.12.1.jar和jackson-annotations-2.12.1.jar。
執行以下命令,啟動SQL客戶端。
sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar -j /root/jackson-core-2.12.1.jar -j /root/jackson-databind-2.12.1.jar -j /root/jackson-annotations-2.12.1.jar
當您的集群是EMR-3.43.0及之后版本:
執行以下命令,啟動SQL客戶端。
sql-client.sh -j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.*-SNAPSHOT-jar-with-dependencies.jar
在Flink SQL命令行中,創建Catalog。
CREATE CATALOG hive_catalog WITH ( 'type' = 'hive', 'default-database' = 'flink_dlf_hive', 'hive-version' = '2.3.6', 'hive-conf-dir' = '/root/test', 'hadoop-conf-dir' = '/etc/taihao-apps/hadoop-conf/' );
涉及參數如下表。
參數
描述
type
固定值為hive。
default-database
步驟一:數據準備中創建的數據庫的名稱。
本示例為flink_dlf_hive。
hive-version
固定值為2.3.6。
hive-conf-dir
前一步驟中復制的hive-site.xml所在的目錄。
本示例為/root/test。
hadoop-conf-dir
固定值為/etc/taihao-apps/hadoop-conf/。
返回信息如下,表示創建成功。
[INFO] Execute statement succeed.
在Flink SQL命令行中,查看DLF的數據庫。
執行以下命令,設置當前的Catalog為剛才創建的hive_catalog。
USE CATALOG hive_catalog;
執行以下命令,顯示當前Catalog下的數據庫。
SHOW DATABASES;
執行以下命令,設置當前的數據庫,本示例中數據庫為flink_dlf_hive。
USE flink_dlf_hive;
執行以下命令,查看當前數據庫中的表。
SHOW TABLES;
返回信息如下。
+------------+ | table name | +------------+ | emrusers | +------------+ 1 row in set
執行以下命令,查看表信息。
desc emrusers;
返回信息如下。
+----------+--------+------+-----+--------+-----------+ | name | type | null | key | extras | watermark | +----------+--------+------+-----+--------+-----------+ | userid | INT | true | | | | | movieid | INT | true | | | | | rating | INT | true | | | | | unixtime | STRING | true | | | | +----------+--------+------+-----+--------+-----------+ 4 rows in set
驗證讀取Hive全量數據。
在Flink SQL客戶端執行以下命令,創建表。
create table default_catalog.default_database.datahole(userid int, movieid int, ts timestamp) with ('connector' = 'blackhole');
執行以下命令,讀取Hive全量數據到blackhole。
insert into `default_catalog`.`default_database`.`datahole` select userid, movieid, CURRENT_TIMESTAMP as ts from `hive_catalog`.`flink_dlf_hive`.`emrusers`;
執行成功后,會返回已提交的Flink作業的Application ID與Job ID。返回如下類似信息。
通過Web UI查看作業狀態,詳情請參見通過Web UI查看作業狀態。
單擊目標作業的Application ID,可以查看作業運行的詳情,單擊Tracking URL所在行的鏈接,在左側導航欄中,選擇可以查看已完成的作業。 。