日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過ES-Hadoop將HDFS中的數(shù)據(jù)寫入Elasticsearch

ES-Hadoop是Elasticsearch推出的專門用于對接Hadoop生態(tài)的工具,可以讓數(shù)據(jù)在Elasticsearch和Hadoop之間雙向移動,無縫銜接Elasticsearch與Hadoop服務(wù),充分使用Elasticsearch的快速搜索及Hadoop批處理能力,實現(xiàn)交互式數(shù)據(jù)處理。對于一些較復雜的分析任務(wù),需要通過MapReduce任務(wù)讀取HDFS上的JSON文件,寫入Elasticsearch集群。本文介紹如何通過ES-Hadoop,借助MapReduce任務(wù)向Elasticsearch寫入數(shù)據(jù)。

操作流程

  1. 準備工作

    創(chuàng)建同一專有網(wǎng)絡(luò)下的阿里云Elasticsearch和E-MapReduce(以下簡稱EMR)實例、開啟Elasticsearch實例的自動創(chuàng)建索引功能、準備測試數(shù)據(jù)和Java環(huán)境。

  2. 步驟一:上傳ES-Hadoop JAR包至HDFS

    下載ES-Hadoop安裝包,并上傳至EMR Master節(jié)點的HDFS目錄下。

  3. 步驟二:配置pom依賴

    創(chuàng)建Java Maven工程,并配置pom依賴。

  4. 步驟三:編寫并運行MapReduce任務(wù)

    編寫MapReduce寫數(shù)據(jù)到Elasticsearch的Java代碼,并打成Jar包上傳至EMR集群,最后運行代碼完成寫數(shù)據(jù)任務(wù)。

  5. 步驟四:驗證結(jié)果

    在Elasticsearch的Kibana控制臺上,查看通過MapReduce寫入的數(shù)據(jù)。

準備工作

  1. 創(chuàng)建阿里云Elasticsearch實例,并開啟自動創(chuàng)建索引功能。

    具體操作步驟請參見創(chuàng)建阿里云Elasticsearch實例配置YML參數(shù)。本文以6.7.0版本的實例為例。

    重要

    在生產(chǎn)環(huán)境中,建議關(guān)閉自動創(chuàng)建索引功能,提前創(chuàng)建好索引和Mapping。由于本文僅用于測試,因此開啟了自動創(chuàng)建索引功能。

  2. 創(chuàng)建與Elasticsearch實例在同一專有網(wǎng)絡(luò)下的EMR實例。

    實例配置如下:

    • 產(chǎn)品版本:EMR-3.29.0

    • 必選服務(wù):HDFS(2.8.5),其他服務(wù)保持默認

    具體操作步驟請參見創(chuàng)建集群

    重要 Elasticsearch實例的私網(wǎng)訪問白名單默認為0.0.0.0/0,您可在安全配置頁面查看,如果未使用默認配置,您還需要在白名單中加入EMR集群的內(nèi)網(wǎng)IP地址:
  3. 準備JSON測試數(shù)據(jù),將其寫入到map.json文件中,并上傳至HDFS的/tmp/hadoop-es目錄下。

    本文使用的測試數(shù)據(jù)如下。

    {"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
    {"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
    {"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
  4. 準備Java環(huán)境,要求JDK版本為1.8.0及以上。

步驟一:上傳ES-Hadoop JAR包至HDFS

  1. 下載ES-Hadoop安裝包,其版本需要與Elasticsearch實例保持一致。
    本文使用elasticsearch-hadoop-6.7.0.zip。
  2. 登錄E-MapReduce控制臺,獲取Master節(jié)點的IP地址,并通過SSH登錄對應(yīng)的ECS機器。
    具體操作步驟請參見登錄集群
  3. 將已下載的elasticsearch-hadoop-6.7.0.zip上傳至Master節(jié)點,并解壓獲得elasticsearch-hadoop-6.7.0.jar。

  4. 創(chuàng)建HDFS目錄,將elasticsearch-hadoop-6.7.0.jar上傳至該目錄下。

    hadoop fs -mkdir /tmp/hadoop-es
    hadoop fs -put elasticsearch-hadoop-6.7.0/dist/elasticsearch-hadoop-6.7.0.jar /tmp/hadoop-es

步驟二:配置pom依賴

創(chuàng)建Java Maven工程,并將如下的pom依賴添加到Java工程的pom.xml文件中。

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>WriteToEsWithMR</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.8.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>2.8.5</version>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop-mr</artifactId>
        <version>6.7.0</version>
    </dependency>

    <dependency>
        <groupId>commons-httpclient</groupId>
        <artifactId>commons-httpclient</artifactId>
        <version>3.1</version>
    </dependency>
</dependencies>
重要

請確保pom依賴中版本與云服務(wù)對應(yīng)版本保持一致,例如elasticsearch-hadoop-mr版本與阿里云Elasticsearch版本一致;hadoop-hdfs與HDFS版本一致。

步驟三:編寫并運行MapReduce任務(wù)

  1. 編寫示例代碼。

    以下代碼會讀取HDFS上/tmp/hadoop-es目錄下的JSON文件,并將這些JSON文件中的每一行作為一個文檔寫入Elasticsearch。寫入過程由EsOutputFormat在Map階段完成。

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.elasticsearch.hadoop.mr.EsOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    public class WriteToEsWithMR extends Configured implements Tool {
    
        public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> {
            private Text doc = new Text();
    
            @Override
            protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                if (value.getLength() > 0) {
                    doc.set(value);
                    System.out.println(value);
                    context.write(NullWritable.get(), doc);
                }
            }
        }
        public int run(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    
            conf.setBoolean("mapreduce.map.speculative", false);
            conf.setBoolean("mapreduce.reduce.speculative", false);
            conf.set("es.nodes", "es-cn-4591jumei000u****.elasticsearch.aliyuncs.com");
            conf.set("es.port","9200");
            conf.set("es.net.http.auth.user", "elastic");
            conf.set("es.net.http.auth.pass", "xxxxxx");
            conf.set("es.nodes.wan.only", "true");
            conf.set("es.nodes.discovery","false");
            conf.set("es.input.use.sliced.partitions","false");
            conf.set("es.resource", "maptest/_doc");
            conf.set("es.input.json", "true");
    
            Job job = Job.getInstance(conf);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(EsOutputFormat.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setJarByClass(WriteToEsWithMR.class);
            job.setMapperClass(EsMapper.class);
    
            FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int ret = ToolRunner.run(new WriteToEsWithMR(), args);
            System.exit(ret);
        }
    }
    表 1. ES-Hadoop相關(guān)參數(shù)說明

    參數(shù)

    默認值

    說明

    es.nodes

    localhost

    指定阿里云Elasticsearch實例的訪問地址,建議使用私網(wǎng)地址,可在實例的基本信息頁面查看,詳情請參見查看實例的基本信息

    es.port

    9200

    Elasticsearch實例的訪問端口號。

    es.net.http.auth.user

    elastic

    Elasticsearch實例的訪問用戶名。

    說明 如果程序中指定elastic賬號訪問Elasticsearch服務(wù),后續(xù)在修改elastic賬號對應(yīng)密碼后需要一些時間來生效,在密碼生效期間會影響服務(wù)訪問,因此不建議通過elastic來訪問。建議在Kibana控制臺中創(chuàng)建一個符合預期的Role角色用戶進行訪問,詳情請參見通過Elasticsearch X-Pack角色管理實現(xiàn)用戶權(quán)限管控

    es.net.http.auth.pass

    /

    Elasticsearch實例的訪問密碼。

    es.nodes.wan.only

    false

    開啟Elasticsearch集群在云上使用虛擬IP進行連接,是否進行節(jié)點嗅探:

    • true:設(shè)置

    • false:不設(shè)置

    es.nodes.discovery

    true

    是否禁用節(jié)點發(fā)現(xiàn):

    • true:禁用

    • false:不禁用

    重要 使用阿里云Elasticsearch,必須將此參數(shù)設(shè)置為false。

    es.input.use.sliced.partitions

    true

    是否使用slice分區(qū):

    • true:使用。設(shè)置為true,可能會導致索引在預讀階段的時間明顯變長,有時會遠遠超出查詢數(shù)據(jù)所耗費的時間。建議設(shè)置為false,以提高查詢效率。
    • false:不使用。

    es.index.auto.create

    true

    通過Hadoop組件向Elasticsearch集群寫入數(shù)據(jù),是否自動創(chuàng)建不存在的index:

    • true:自動創(chuàng)建

    • false:不會自動創(chuàng)建

    es.resource

    /

    指定要讀寫的index和type。

    es.input.json

    false

    輸入是否已經(jīng)是JSON格式:

    • true:是JSON格式

    • false:不是JSON格式

    es.mapping.names

    /

    表字段與Elasticsearch的索引字段名映射。

    es.read.metadata

    false

    操作Elasticsearch字段涉及到_id之類的內(nèi)部字段,請開啟此屬性。

    更多的ES-Hadoop配置項說明,請參見官方配置說明

  2. 將代碼打成Jar包,上傳至EMR客戶端機器(例如Gateway或EMR集群主節(jié)點)。

  3. 在EMR客戶端機器上,運行如下命令執(zhí)行MapReduce程序。

    hadoop jar es-mapreduce-1.0-SNAPSHOT.jar /tmp/hadoop-es/map.json
    說明

    es-mapreduce-1.0-SNAPSHOT.jar需要替換為您已上傳的Jar包名稱。

步驟四:驗證結(jié)果

  1. 登錄對應(yīng)阿里云Elasticsearch實例的Kibana控制臺。

    具體操作步驟請參見登錄Kibana控制臺

  2. 在左側(cè)導航欄,單擊Dev Tools

  3. Console頁簽下,執(zhí)行以下命令,查看通過MapReduce任務(wù)寫入的數(shù)據(jù)。

    GET maptest/_search
    {
      "query": {
        "match_all": {}
      }
    }

    查詢成功后,返回結(jié)果如下。返回結(jié)果

總結(jié)

本文以阿里云Elasticsearch和EMR為例,介紹了如何通過ES-Hadoop,借助MapReduce任務(wù)向Elasticsearch寫入數(shù)據(jù)。相反,您也可以借助MapReduce任務(wù)查詢Elasticsearch數(shù)據(jù)。查詢配置和寫入類似,詳細說明可參見官方Reading data from Elasticsearch說明。