您可以通過console工具訪問DataHub項目并運行命令。本文為您介紹如何安裝、配置和運行客戶端并提供客戶端相關使用說明信息。
前提條件
在使用console工具前,請您確認已滿足如下條件:
待安裝console的設備已安裝Java 8或以上版本
?
安裝并配置console客戶端
下載命令行工具進行datahub_console.tar.gz并解壓
解壓下載的安裝包文件,得到bin、conf、lib文件夾
進入conf文件夾,在conf目錄中的datahub.properties文件填寫ak endpoint信息,文件內容如下
datahub.accessid=
datahub.accesskey=
datahub.endpoint=
參數詳細信息如下:
參數 | 是否必填 | 描述 | 示例 |
datahub.accessid | 是 | 阿里云賬號或RAM用戶的AccessKey ID。 | 無 |
datahub.accesskey | 是 | AccessKey ID對應的AccessKey Secret。 | 無 |
datahub.endpoint | 是 | DataHub服務的連接地址。 您需要根據創建DataHub項目時選擇的地域以及網絡連接方式配置Endpoint。各地域及網絡對應的Endpoint值,請參見域名列表 | https://dh-cn-hangzhou.aliyuncs.com |
運行console客戶端
console客戶端可通過如下方式啟動,您可以任選其中一種:
方式一:在console客戶端安裝路徑下的bin文件夾中,雙擊datahubcmd.bat文件(Windows系統),即可啟動console客戶端,返回如下信息,表明已經成功啟動
?
方式二:在系統的命令行執行窗口,進入console客戶端安裝路徑下的bin目錄,執行datahubcmd命令(Windows系統)或者 sh datahubcmd.sh (Linux系統或者Mac系統),即可啟動console客戶端,如下圖所示表明已經成功連接到DataHub
獲取命令幫助
您可以通過如下方式快速獲取console客戶端的命令幫助,您可以任選其中一種:
方式一:在console客戶端查看命令幫助信息
查看全部命令的幫助
help
通過指定關鍵字查看相關命令幫助信息。
例如獲取Topic列表
DataHub=>help lt
NAME
lt - List topic
SYNOPSYS
lt [-p] string
OPTIONS
-p string
projectName
[Mandatory]
方式二:在系統的命令行執行窗口,切換到console客戶端安裝路徑下的bin目錄,執行如下命令查看全部命令的幫助信息
...\bin>datahubcmd help
使用指南
Project操作
創建Project
-p: project名稱
-c: project描述
cp -p test_project -c test_comment
刪除Project
-p: project名稱
dp -p test_project
注意:刪除Project前需要刪除Project下所有的資源(包括Topic以及Topic中的訂閱和同步任務),否則刪除報錯
獲取Project列表
lp
Topic操作
創建Topic
-p:project名稱
-t: topic名稱
-m: 表示不同的Topic類型,BLOB代表創建BLOB類型的Topic,Tuple表示創建Tuple類型的Topic
-f: Tuple類型 Topic字段格式為[(fieldName,fieldType,isNull)],多個字段以逗號隔開
-s: shard數量
-l: 數據生命周期,范圍(1-7)天
-c: topic描述
ct -p test_project -t test_topic -m TUPLE -f [(name,string,true)] -s 3 -l 3 -c test_comment
刪除Topic
-p: project名稱
-t: topic名稱
dt -p test_project -t test_topic
獲取Topic信息
-p: project名稱
-t: topic名稱
gt -p test_project -t test_topic
導出Topic schema結構為JSON文件
-f:保存文件路徑
-p: project名稱
-t: topic名稱
gts -f filepath -p test_project -t test_topic
獲取Topic列表
-p: project名稱
lt -p test_project
導入Json文件創建Topic
-s: shard數量
-l: 數據生命周期,范圍(1-7)天
-f: 文件路徑
-p: project名稱
-t: topic名稱
rtt -s 3 -l 3 -c test_comment -f filepath -p test_project -t test_topic
修改Topic生命周期
-p: project名稱
-t: topic名稱
-l: topic生命周期
-c: topic描述
utl -p test_project -t test_topic -l 3 -c test_comment
Connector操作
創建ODPS connector
-p: project名稱
-t: topic名稱
-m: 參數表示不同的同步類型,目前同步到odps的有SYSTEM_TIME、USER_DEFINE、EVENT_TIME、META_TIME四種同步類型
-e: odps endpoint,請填寫經典網絡地址
-op: odps Project名稱
-oa: 訪問odps的accessId
-ok: 訪問odps的accessKey
-tr參數表示分區的時間間隔,console工具默認為60分鐘
-tf參數分區格式,ds 表示按天分區,ds hh表示按小時分區,ds hh mm表示按分鐘分區
coc -p test_project -t test_topic -m SYSTEM_TIME -e odpsEndpoint -op odpsProject -ot odpsTable -oa odpsAccessId -ok odpsAccessKey -tr 60 -c (field1,field2) -tf ds hh mm
同步odps新增字段
-p: project名稱
-t: topic名稱
-c: connectorId,可通過數據同步頁簽查看
-f: fieldName,新增字段名稱
acf -p test_project -t test_topic -c connectorId -f fieldName
創建同步到MYSQL/RDS connector
-p: project名稱
-t: topic名稱
-h: host,請填寫經典網絡地址
-po: port
-ty參數表示同步的類型,共有兩種
SINK_MYSQL表示創建同步到MySQL的connector
SINK_ADS 表示創建同步到ads的connector
-d: database名稱
-ta: table名稱
-u: userName
-pa: password
-ht表示插入方式,共有兩種
IGNORE
OVERWRITE
-n表示同步的字段,示例:(field1,field2)
cdc -p test_project -t test_topic -h host -po 3306 -ty mysql -d mysql_database -ta msyql_table -u username -pa password -ht IGNORE -n (field1,field2)
創建 DATAHUB connector
-p: project名稱
-t: topic名稱
-sp: sinkProject,數據導入的Project
-st: sinkTopic,數據導入的Topic
-m: 表示認證類型
AK表示通過AK認證,需要填寫accessId和accessKey
STS表示通過STS認證
cdhc -p test_project -t test_topic -sp sinkProject -st sinkTopic -m AK -i accessid k accessKey
創建FC connector
-p: project名稱
-t: topic名稱
-e: fc endpoint,請填寫經典網絡地址
-s: fc Service名稱
-f: fc Function名稱
-au: 認證方式
AK表示通過AK認證,需要填寫accessId和accessKey
STS表示通過STS認證
-n表示同步的字段,例如:(field1,field2)
cfc -p test_project -t test_topic -e endpoint -s service -f function -au AK -i accessId -k accessKey -n (field1,field2)
創建HOLOGRES connector
-p: project名稱
-t: topic名稱
-e: endpoint
-cl: 同步到hologres的字段
-au表示認證方式,目前同步到holo只支持AK認證
-m表示解析類型,選擇Delimiter需要指定lineDelimiter、parseData、columnDelimiter屬性,選擇IngormaticaJson需要指定parseData屬性
Delimiter
InformaticaJson
chc -p test_project -t test_topic -e endpoint -cl (field,field2) -au AK -hp holoProject -ht holoTopic -i accessId -k accessKey -m Delimiter -l 1 -b false -n (field1,field2)
創建OTSconnector
-p: project名稱
-t: topic名稱
-it: ots Instance名稱
-m表示認證類型,默認使用STS
AK表示通過AK認證,需要填寫accessId和accessKey
STS表示通過STS認證
-t: ots Table名稱
-wm表示寫入方式,支持兩種寫入方式
PUT
UPDATE
-c表示同步的字段,例如:(field1,field2)
cotsc -p test_project -t test_topic -i accessId -k accessKey -it instanceId -m AK -t table -wm PUT -c (field1,field2)
創建 OSS connector
-p: project名稱
-t: topic名稱
-b: oss Bucket名稱
-e: oss Endpoint名稱
-pr: 同步到OSS的目錄前綴
-tf:同步時間格式,例如:%Y%m%d%H%M表示按照分鐘級別進行分區
-tr: 分區的時間間隔
-c: 同步字段
csc -p test_project -t test_topic -b bucket -e endpoint -pr ossPrefix -tf ossTimeFormat -tr timeRange -c (f1,f2)
刪除connector(可傳入多個connectorid,以空格分隔)
-p: project名稱
-t: topic名稱
-c: connectorId,可通過數據同步頁簽查看
dc -p test_project -t test_topic -c connectorId
獲取connector詳情信息
-p: project名稱
-t: topic名稱
-c: connectorId,可通過數據同步頁簽查看
gc -p test_project -t test_topic -c connectorId
獲取某個Topic下面的connector列表
-p: project名稱
-t: topic名稱
lc -p test_project -t test_topic
重啟connector
-p: project名稱
-t: topic名稱
-c: connectorId,可通過數據同步頁簽查看
rc -p test_project -t test_topic -c connectorId
更新connector ak
-p: project名稱
-t: topic名稱
-ty: 同步類型 比如:SINK_ODPS
uca -p test_project -t test_topic -ty SINK_ODPS -a accessId -k accessKey
shard操作
合并shard
-p: project名稱
-t: topic名稱
-s: 要合并的shardId
-a: 要合并的另一個shardId
ms -p test_project -t test_topic -s shardId -a adjacentShardId
分裂shard
-p: project名稱
-t: topic名稱
-s: 要分裂的shardId
ss -p test_project -t test_topic -s shardId
獲取某個topic下面的所有shard
-p: project名稱
-t: topic名稱
ls -p test_project -t topicName
獲取同步shard的狀態
-p: project名稱
-t: topic名稱
-s: shardId
-c: connectorId,可通過數據同步頁簽查看
gcs -p test_project -t test_topic -s shardId -c connectorId
獲取訂閱消費的每個shard點位
-p: project名稱
-t: topic名稱
-s: 訂閱id
-i: shardId
gso -p test_project -t test_topic -s subid -i shardId
訂閱操作
創建訂閱
-p: project名稱
-t: topic名稱
-c: 訂閱描述
css -p test_project -t test_topic -c comment
刪除訂閱
-p: project名稱
-t: topic名稱
-s: 訂閱id
dsc -p test_project -t test_topic -s subId
查詢訂閱列表
-p: project名稱
-t: topic名稱
lss -p test_project -t test_topic
上傳下載數據
上傳數據
-f: 參數表示文件路徑,注意:windows路徑下請添加轉義符,例如:D:\\test\\test.txt
-p project名稱
-t: topic名稱
-m: 參數表示文本分隔符,目前支持逗號、空格分隔符
-n: 參數表示每次上傳batchsize大小,默認為1000
uf -f filepath -p test_topic -t test_topic -m "," -n 1000
示例: CSV文件上傳
下面以CSV文件為例,說明下如何使用console工具將CSV文件上傳到DataHub數據。CSV文件的格式如下所示:
1. 0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
2. 1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
3. 2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
4. 3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
5. 4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
6. 5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
7. 6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
8. 7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
9. 8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
10. 9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
11. 10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111
上述CSV文件中每行一條Record,按照(,)區分字段。保存在本地路徑/temp/test.csv中。DataHub Topic格式如下:
字段名稱 | 字段類型 |
id | BIGINT |
name | STRING |
gender | BOOLEAN |
salary | DOUBLE |
my_time | TIMESTAMP |
使用console工具命令如下
uf -f /temp/test.csv -p test_topic -t test_topic -m "," -n 1000
下載數據
-f: 參數表示文件路徑,注意:windows路徑下請添加轉義符,例如:D:\\test\\test.txt
-p: project名稱
-t: topic名稱
-s: shardId
-d: 訂閱id
-f: 下載路徑
-ti: 參數表示讀取該時間之后的點位,格式為:yyyy-mm-dd hh:mm:ss
-l: 參數表示每次讀取的數量
-g: 參數表示是否一直讀
0表示只讀一次,即獲取當前recordsize后不再消費
1表示一直讀取
down -p test_project -t test_topic -s shardId -d subId -f filePath -ti "1970-01-01 00:00:00" -l 100 -g 0
常見問題
腳本啟動失敗:windows環境下運行腳本檢查腳本路徑是否包含括號