本文為您介紹如何通過DataWorks數據同步功能,遷移阿里云Elasticsearch集群上的數據至MaxCompute。
前提條件
已開通MaxCompute服務。
開通指導,詳情請參見開通MaxCompute和DataWorks。
已開通DataWorks服務。
開通指導,詳情請參見開通DataWorks服務。
新增MaxCompute數據源。詳情請參見創建MaxCompute數據源。
在DataWorks上已完成創建業務流程。
本例使用DataWorks簡單模式,詳情請參見創建業務流程。
已搭建阿里云Elasticsearch集群。
進行數據遷移前,您需要保證自己的阿里云Elasticsearch集群環境正常。搭建阿里云Elasticsearch集群的詳細過程,請參見快速入門。
本示例中阿里云Elasticsearch的具體配置如下:
地域:華東2(上海)
可用區:上海可用區B
版本:5.5.3 with Commercial Feature
背景信息
Elasticsearch是一個基于Lucene的搜索服務器,它提供了一個多用戶分布式的全文搜索引擎。Elasticsearch是遵從Apache開源條款的一款開源產品,是當前主流的企業級搜索引擎。
阿里云Elasticsearch提供Elasticsearch 5.5.3 with Commercial Feature、6.3.2 with Commercial Feature、6.7.0 with Commercial Feature及商業插件X-pack服務,致力于數據分析、數據搜索等場景服務。在開源Elasticsearch基礎上提供企業級權限管控、安全監控告警、自動報表生成等功能。
操作步驟
在Elasticsearch上創建源表。詳情請參見通過DataWorks將MaxCompute數據同步到阿里云ES。
在MaxCompute上創建目標表。
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,在下拉框中選擇對應工作空間后單擊進入數據開發。
右鍵單擊您所創建的業務流程,選擇。
在彈出的新建表對話框中,填寫名稱,并單擊新建。
說明如果在數據開發中綁定多個MaxCompute數據源,則按需選擇MaxCompute引擎實例。
單擊表編輯頁面上方的DDL模式。
在DDL對話框,輸入如下建表語句,單擊生成表結構。
create table elastic2mc_bankdata ( age string, job string, marital string, education string, default string, housing string, loan string, contact string, month string, day of week string );
單擊提交到生產環境。
同步數據。
進入數據開發頁面,右鍵單擊指定業務流程,選擇 。
在新建節點對話框中,輸入名稱,并單擊確認。
在頂部菜單欄上,單擊圖標。
在腳本模式下,單擊頂部菜單欄上的圖標。
在導入模板對話框中選擇來源類型、數據源、目標類型及數據源,并單擊確定。
配置腳本。
示例代碼如下。代碼釋義請參見Elasticsearch Reader。
{ "type": "job", "steps": [ { "stepType": "elasticsearch", "parameter": { "retryCount": 3, "column": [ "age", "job", "marital", "education", "default", "housing", "loan", "contact", "month", "day_of_week", "duration", "campaign", "pdays", "previous", "poutcome", "emp_var_rate", "cons_price_idx", "cons_conf_idx", "euribor3m", "nr_employed", "y" ], "scroll": "1m", "index": "es_index", "pageSize": 1, "sort": { "age": "asc" }, "type": "elasticsearch", "connTimeOut": 1000, "retrySleepTime": 1000, "endpoint": "http://es-cn-xxxx.xxxx.xxxx.xxxx.com:9200", "password": "xxxx", "search": { "match_all": {} }, "readTimeOut": 5000, "username": "xxxx" }, "name": "Reader", "category": "reader" }, { "stepType": "odps", "parameter": { "partition": "", "truncate": true, "compress": false, "datasource": "odps_source",// MaxCompute數據源名稱 "column": [ "age", "job", "marital", "education", "default", "housing", "loan", "contact", "month", "day_of_week", "duration", "campaign", "pdays", "previous", "poutcome", "emp_var_rate", "cons_price_idx", "cons_conf_idx", "euribor3m", "nr_employed", "y" ], "emptyAsNull": false, "table": "elastic2mc_bankdata" }, "name": "Writer", "category": "writer" } ], "version": "2.0", "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "0" }, "speed": { "throttle": false, "concurrent": 1, "dmu": 1 } } }
說明您可以在創建的阿里云Elasticsearch集群的基本信息中,查看公網地址和公網端口信息。
單擊圖標運行代碼。
您可以在運行日志查看運行結果。
查看結果。
右鍵單擊業務流程,選擇 。
在新建節點對話框中輸入節點名稱,并單擊提交。
在ODPS SQL節點編輯頁面輸入如下語句。
SELECT * FROM elastic2mc_bankdata;
單擊圖標運行代碼。
您可以在運行日志查看運行結果。