通過ES-Hadoop將HDFS中的數(shù)據(jù)寫入Elasticsearch
ES-Hadoop是Elasticsearch推出的專門用于對接Hadoop生態(tài)的工具,可以讓數(shù)據(jù)在Elasticsearch和Hadoop之間雙向移動,無縫銜接Elasticsearch與Hadoop服務(wù),充分使用Elasticsearch的快速搜索及Hadoop批處理能力,實現(xiàn)交互式數(shù)據(jù)處理。對于一些較復雜的分析任務(wù),需要通過MapReduce任務(wù)讀取HDFS上的JSON文件,寫入Elasticsearch集群。本文介紹如何通過ES-Hadoop,借助MapReduce任務(wù)向Elasticsearch寫入數(shù)據(jù)。
操作流程
創(chuàng)建同一專有網(wǎng)絡(luò)下的阿里云Elasticsearch和E-MapReduce(以下簡稱EMR)實例、開啟Elasticsearch實例的自動創(chuàng)建索引功能、準備測試數(shù)據(jù)和Java環(huán)境。
下載ES-Hadoop安裝包,并上傳至EMR Master節(jié)點的HDFS目錄下。
創(chuàng)建Java Maven工程,并配置pom依賴。
編寫MapReduce寫數(shù)據(jù)到Elasticsearch的Java代碼,并打成Jar包上傳至EMR集群,最后運行代碼完成寫數(shù)據(jù)任務(wù)。
在Elasticsearch的Kibana控制臺上,查看通過MapReduce寫入的數(shù)據(jù)。
準備工作
創(chuàng)建阿里云Elasticsearch實例,并開啟自動創(chuàng)建索引功能。
具體操作步驟請參見創(chuàng)建阿里云Elasticsearch實例和配置YML參數(shù)。本文以6.7.0版本的實例為例。
重要在生產(chǎn)環(huán)境中,建議關(guān)閉自動創(chuàng)建索引功能,提前創(chuàng)建好索引和Mapping。由于本文僅用于測試,因此開啟了自動創(chuàng)建索引功能。
創(chuàng)建與Elasticsearch實例在同一專有網(wǎng)絡(luò)下的EMR實例。
實例配置如下:
產(chǎn)品版本:EMR-3.29.0
必選服務(wù):HDFS(2.8.5),其他服務(wù)保持默認
具體操作步驟請參見創(chuàng)建集群。
重要 Elasticsearch實例的私網(wǎng)訪問白名單默認為0.0.0.0/0,您可在安全配置頁面查看,如果未使用默認配置,您還需要在白名單中加入EMR集群的內(nèi)網(wǎng)IP地址:- 請參見查看集群列表與詳情,獲取EMR集群的內(nèi)網(wǎng)IP地址。
- 請參見配置實例公網(wǎng)或私網(wǎng)訪問白名單,配置Elasticsearch實例的VPC私網(wǎng)訪問白名單。
準備JSON測試數(shù)據(jù),將其寫入到map.json文件中,并上傳至HDFS的/tmp/hadoop-es目錄下。
本文使用的測試數(shù)據(jù)如下。
{"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"} {"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"} {"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
準備Java環(huán)境,要求JDK版本為1.8.0及以上。
步驟一:上傳ES-Hadoop JAR包至HDFS
- 下載ES-Hadoop安裝包,其版本需要與Elasticsearch實例保持一致。
本文使用elasticsearch-hadoop-6.7.0.zip。
- 登錄E-MapReduce控制臺,獲取Master節(jié)點的IP地址,并通過SSH登錄對應(yīng)的ECS機器。
具體操作步驟請參見登錄集群。
將已下載的elasticsearch-hadoop-6.7.0.zip上傳至Master節(jié)點,并解壓獲得elasticsearch-hadoop-6.7.0.jar。
創(chuàng)建HDFS目錄,將elasticsearch-hadoop-6.7.0.jar上傳至該目錄下。
hadoop fs -mkdir /tmp/hadoop-es hadoop fs -put elasticsearch-hadoop-6.7.0/dist/elasticsearch-hadoop-6.7.0.jar /tmp/hadoop-es
步驟二:配置pom依賴
創(chuàng)建Java Maven工程,并將如下的pom依賴添加到Java工程的pom.xml文件中。
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>WriteToEsWithMR</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop-mr</artifactId>
<version>6.7.0</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
請確保pom依賴中版本與云服務(wù)對應(yīng)版本保持一致,例如elasticsearch-hadoop-mr版本與阿里云Elasticsearch版本一致;hadoop-hdfs與HDFS版本一致。
步驟三:編寫并運行MapReduce任務(wù)
編寫示例代碼。
以下代碼會讀取HDFS上/tmp/hadoop-es目錄下的JSON文件,并將這些JSON文件中的每一行作為一個文檔寫入Elasticsearch。寫入過程由EsOutputFormat在Map階段完成。
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.elasticsearch.hadoop.mr.EsOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class WriteToEsWithMR extends Configured implements Tool { public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> { private Text doc = new Text(); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { if (value.getLength() > 0) { doc.set(value); System.out.println(value); context.write(NullWritable.get(), doc); } } } public int run(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); conf.setBoolean("mapreduce.map.speculative", false); conf.setBoolean("mapreduce.reduce.speculative", false); conf.set("es.nodes", "es-cn-4591jumei000u****.elasticsearch.aliyuncs.com"); conf.set("es.port","9200"); conf.set("es.net.http.auth.user", "elastic"); conf.set("es.net.http.auth.pass", "xxxxxx"); conf.set("es.nodes.wan.only", "true"); conf.set("es.nodes.discovery","false"); conf.set("es.input.use.sliced.partitions","false"); conf.set("es.resource", "maptest/_doc"); conf.set("es.input.json", "true"); Job job = Job.getInstance(conf); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(EsOutputFormat.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); job.setJarByClass(WriteToEsWithMR.class); job.setMapperClass(EsMapper.class); FileInputFormat.setInputPaths(job, new Path(otherArgs[0])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new WriteToEsWithMR(), args); System.exit(ret); } }
表 1. ES-Hadoop相關(guān)參數(shù)說明 參數(shù)
默認值
說明
es.nodes
localhost
指定阿里云Elasticsearch實例的訪問地址,建議使用私網(wǎng)地址,可在實例的基本信息頁面查看,詳情請參見查看實例的基本信息。
es.port
9200
Elasticsearch實例的訪問端口號。
es.net.http.auth.user
elastic
Elasticsearch實例的訪問用戶名。
說明 如果程序中指定elastic賬號訪問Elasticsearch服務(wù),后續(xù)在修改elastic賬號對應(yīng)密碼后需要一些時間來生效,在密碼生效期間會影響服務(wù)訪問,因此不建議通過elastic來訪問。建議在Kibana控制臺中創(chuàng)建一個符合預期的Role角色用戶進行訪問,詳情請參見通過Elasticsearch X-Pack角色管理實現(xiàn)用戶權(quán)限管控。es.net.http.auth.pass
/
Elasticsearch實例的訪問密碼。
es.nodes.wan.only
false
開啟Elasticsearch集群在云上使用虛擬IP進行連接,是否進行節(jié)點嗅探:
true:設(shè)置
false:不設(shè)置
es.nodes.discovery
true
是否禁用節(jié)點發(fā)現(xiàn):
true:禁用
false:不禁用
重要 使用阿里云Elasticsearch,必須將此參數(shù)設(shè)置為false。es.input.use.sliced.partitions
true
是否使用slice分區(qū):
- true:使用。設(shè)置為true,可能會導致索引在預讀階段的時間明顯變長,有時會遠遠超出查詢數(shù)據(jù)所耗費的時間。建議設(shè)置為false,以提高查詢效率。
- false:不使用。
es.index.auto.create
true
通過Hadoop組件向Elasticsearch集群寫入數(shù)據(jù),是否自動創(chuàng)建不存在的index:
true:自動創(chuàng)建
false:不會自動創(chuàng)建
es.resource
/
指定要讀寫的index和type。
es.input.json
false
輸入是否已經(jīng)是JSON格式:
true:是JSON格式
false:不是JSON格式
es.mapping.names
/
表字段與Elasticsearch的索引字段名映射。
es.read.metadata
false
操作Elasticsearch字段涉及到_id之類的內(nèi)部字段,請開啟此屬性。
更多的ES-Hadoop配置項說明,請參見官方配置說明。
將代碼打成Jar包,上傳至EMR客戶端機器(例如Gateway或EMR集群主節(jié)點)。
在EMR客戶端機器上,運行如下命令執(zhí)行MapReduce程序。
hadoop jar es-mapreduce-1.0-SNAPSHOT.jar /tmp/hadoop-es/map.json
說明es-mapreduce-1.0-SNAPSHOT.jar需要替換為您已上傳的Jar包名稱。
步驟四:驗證結(jié)果
登錄對應(yīng)阿里云Elasticsearch實例的Kibana控制臺。
具體操作步驟請參見登錄Kibana控制臺。
在左側(cè)導航欄,單擊Dev Tools。
在Console頁簽下,執(zhí)行以下命令,查看通過MapReduce任務(wù)寫入的數(shù)據(jù)。
GET maptest/_search { "query": { "match_all": {} } }
查詢成功后,返回結(jié)果如下。
總結(jié)
本文以阿里云Elasticsearch和EMR為例,介紹了如何通過ES-Hadoop,借助MapReduce任務(wù)向Elasticsearch寫入數(shù)據(jù)。相反,您也可以借助MapReduce任務(wù)查詢Elasticsearch數(shù)據(jù)。查詢配置和寫入類似,詳細說明可參見官方Reading data from Elasticsearch說明。