本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。
當您需要將MaxCompute離線表中的數據同步到阿里云Elasticsearch中時,可以通過阿里云Logstash的logstash-input-maxcompute插件和管道配置功能實現。本文介紹對應的配置方法。
前提條件
您已完成以下操作:
開通阿里云MaxCompute產品,并完成創建項目、創建表和導入數據的任務。
創建阿里云Logstash實例,并安裝logstash-input-maxcompute插件。
具體操作步驟請參見步驟二:創建阿里云Logstash實例和安裝或卸載插件。
創建目標阿里云Elasticsearch實例,并開啟實例的自動創建索引功能。
具體操作步驟請參見創建阿里云Elasticsearch實例和快速訪問與配置。本文以6.7.0版本為例。
請確保網絡能夠互通。即MaxCompute、阿里云Logstash、阿里云Elasticsearch處于同一專有網絡VPC(Virtual Private Cloud)下。
說明您也可以使用公網環境的服務,前提是需要通過配置NAT網關實現與公網的連通,詳情請參見配置NAT公網數據傳輸。
配置Logstash管道
- 進入阿里云Elasticsearch控制臺的Logstash頁面。
- 進入目標實例。
- 在頂部菜單欄處,選擇地域。
- 在Logstash實例中單擊目標實例ID。
在左側導航欄,單擊管道管理。
單擊創建管道。
在創建管道任務頁面,輸入管道ID,并進行Config配置。
本文使用的Config配置如下。
input { maxcompute { access_id => "LTAIezFX********" access_key => "SCm8xcF3bwdRbGY7AdU1sH********" endpoint => "http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api" project_name => "mXXX" table_name => "sale_detail" partition => "sale_date='201911', region='hangzhou'" thread_num => 1 dirty_data_file => "/ssd/1/<Logstash實例ID>/logstash/data/XXXXX.txt" } } output { elasticsearch { hosts => ["http://es-cn-4591f5ja9000j****.elasticsearch.aliyuncs.com:9200"] index => "odps_index" user => "elastic" password => "Adm******" } }
表 1. input參數說明 參數
類型
是否必選
說明
access_id
string
是
您阿里云賬號的AccessKey ID。
access_key
string
是
您阿里云賬號的Access Key Secret。
endpoint
string
是
MaxCompute各地域Endpoint,請參見各地域Endpoint對照表(阿里云VPC網絡連接方式)。
project_name
string
是
MaxCompute的項目名稱。
table_name
string
是
MaxCompute的表名稱。
partition
string
是
分區字段。分區表按照字段來定義,例如:
sale_date='201911'
,region='hangzhou'
。thread_num
number
是
線程數,默認為1。
dirty_data_file
string
是
指定文件路徑,用于記錄處理失敗的日志。
說明文件路徑請指定為
/ssd/1/<Logstash實例ID>/logstash/data/
。重要以上配置僅作為測試使用,在實際業務中,請按照業務需求進行合理配置。Input插件支持的其他配置選項請參見Logstash Jdbc input plugin。
logstash-input-maxcompute插件會全量同步MaxCompute中的數據到阿里云Elasticsearch中。
Config配置詳情請參見Logstash配置文件說明。
單擊下一步,配置管道參數。
參數
說明
管道工作線程
并行執行管道的Filter和Output的工作線程數量。當事件出現積壓或CPU未飽和時,請考慮增大線程數,更好地使用CPU處理能力。默認值:實例的CPU核數。
管道批大小
單個工作線程在嘗試執行Filter和Output前,可以從Input收集的最大事件數目。較大的管道批大小可能會帶來較大的內存開銷。您可以設置LS_HEAP_SIZE變量,來增大JVM堆大小,從而有效使用該值。默認值:125。
管道批延遲
創建管道事件批時,將過小的批分派給管道工作線程之前,要等候每個事件的時長,單位為毫秒。默認值:50ms。
隊列類型
用于事件緩沖的內部排隊模型??蛇x值:
MEMORY:默認值?;趦却娴膫鹘y隊列。
PERSISTED:基于磁盤的ACKed隊列(持久隊列)。
隊列最大字節數
請確保該值小于您的磁盤總容量。默認值:1024 MB。
隊列檢查點寫入數
啟用持久性隊列時,在強制執行檢查點之前已寫入事件的最大數目。設置為0,表示無限制。默認值:1024。
警告配置完成后,需要保存并部署才能生效。保存并部署操作會觸發實例重啟,請在不影響業務的前提下,繼續執行以下步驟。
單擊保存或者保存并部署。
保存:將管道信息保存在Logstash里并觸發實例變更,配置不會生效。保存后,系統會返回管道管理頁面??稍?b data-tag="uicontrol" id="uicontrol-85q-rau-utk" class="uicontrol">管道列表區域,單擊操作列下的立即部署,觸發實例重啟,使配置生效。
保存并部署:保存并且部署后,會觸發實例重啟,使配置生效。
驗證結果
登錄目標阿里云Elasticsearch實例的Kibana控制臺。
具體步驟請參見登錄Kibana控制臺。
在左側導航欄,單擊Dev Tools(開發工具)。
在Console中,執行以下命令,查看同步成功的索引數據。
GET /odps_index/_search
運行成功后,結果如下。
說明如果運行失敗,可在日志查詢頁面查看相關日志進行排查修復,詳情請參見查詢日志。
切換到Monitoring(監控)頁面,單擊Indices(索引)。
在Indices頁面,查看同步成功的索引,以及寫入文檔的數量。