EMR-3.11.0及其后續(xù)版本,E-MapReduce支持Druid作為單獨(dú)的一種集群類(lèi)型。

背景信息

E-MapReduce將Druid作為單獨(dú)的集群類(lèi)型,主要基于以下幾方面的考慮:
  • E-MapReduce Druid可以完全脫離Hadoop來(lái)使用。
  • 大數(shù)據(jù)量情況下,E-MapReduce Druid對(duì)內(nèi)存要求比較高,尤其是Broker和Historical節(jié)點(diǎn)。E-MapReduce Druid本身資源不受YARN管控 ,在多服務(wù)運(yùn)行時(shí)容易發(fā)生資源搶奪。
  • Hadoop作為基礎(chǔ)設(shè)施,其規(guī)模通常較大,而E-MapReduce Druid集群較小,部署在同一集群上,由于規(guī)模不一致可能造成資源浪費(fèi),所以單獨(dú)部署會(huì)更加靈活。

創(chuàng)建Druid集群

創(chuàng)建集群時(shí)選擇Druid集群類(lèi)型即可,詳情請(qǐng)參見(jiàn)創(chuàng)建集群
說(shuō)明 E-MapReduce Druid集群自帶的HDFS和YARN僅供測(cè)試使用。對(duì)于生產(chǎn)環(huán)境,建議您使用專門(mén)的Hadoop集群。

配置集群

  • 配置HDFS作為E-MapReduce Druid的Deep Storage。

    對(duì)于獨(dú)立的E-MapReduce Druid集群,如果您需要存放索引數(shù)據(jù)至一個(gè)Hadoop集群的HDFS,請(qǐng)?jiān)O(shè)置兩個(gè)集群的連通性(詳情請(qǐng)參見(jiàn)與Hadoop集群交互)。

    在E-MapReduce Druid配置頁(yè)面的common.runtime頁(yè)簽,配置如下參數(shù)。
    參數(shù)描述
    druid.storage.type設(shè)置為hdfs
    druid.storage.storageDirectoryHDFS目錄,建議填寫(xiě)完整目錄。例如hdfs://emr-header-1.cluster-xxxxxxxx:9000/druid/segments
    說(shuō)明 如果Hadoop集群為HA集群,emr-header-1.cluster-xxxxx:9000需要改成emr-cluster,或者把端口9000改成8020。
  • 配置OSS作為E-MapReduce Druid的Deep Storage。
    在E-MapReduce Druid配置頁(yè)面的common.runtime頁(yè)簽,配置如下參數(shù)。
    參數(shù)描述
    druid.storage.type設(shè)置為hdfs
    druid.storage.storageDirectoryOSS目錄。 例如oss://emr-druid-cn-hangzhou/segments
  • 配置RDS作為E-MapReduce Druid的元數(shù)據(jù)存儲(chǔ)。

    默認(rèn)情況下E-MapReduce Druid利用emr-header-1節(jié)點(diǎn)上的本地MySQL數(shù)據(jù)庫(kù)作為元數(shù)據(jù)存儲(chǔ)。您也可以配置使用阿里云RDS作為元數(shù)據(jù)存儲(chǔ)。

    1. 本示例以RDS MySQL版為例介紹。在具體配置之前,請(qǐng)先確保:
      • 已創(chuàng)建RDS MySQL實(shí)例。
      • 為E-MapReduce Druid訪問(wèn)RDS MySQL創(chuàng)建了單獨(dú)的賬戶(不推薦使用root)。例如賬戶名為druid,密碼為druidpw。
      • 為E-MapReduce Druid元數(shù)據(jù)創(chuàng)建單獨(dú)的MySQL數(shù)據(jù)庫(kù)。例如數(shù)據(jù)庫(kù)名為druiddb。
      • 確保賬戶druid有權(quán)限訪問(wèn)druiddb。
    2. 在E-MapReduce Druid配置頁(yè)面的common.runtime頁(yè)簽,單擊自定義配置,添加如下三個(gè)配置項(xiàng)。
      參數(shù)描述
      druid.metadata.storage.connector.connectURI設(shè)置為jdbc:mysql://rm-xxxxx.mysql.rds.aliyuncs.com:3306/druiddb
      druid.metadata.storage.connector.user設(shè)置為druid
      druid.metadata.storage.connector.password設(shè)置為druidpw
    3. 依次單擊右上角的保存部署客戶端配置重啟All Components,配置即可生效。
    4. 登錄RDS管理控制臺(tái),查看druiddb創(chuàng)建表的情況。
  • 配置組件內(nèi)存。
    E-MapReduce Druid組件內(nèi)存設(shè)置主要包括兩方面:
    • 堆內(nèi)存。
    • direct內(nèi)存。

訪問(wèn)Druid web頁(yè)面

E-MapReduce Druid自帶三個(gè)Web頁(yè)面:
  • Overlord:http://emr-header-1.cluster-1234:18090,用于查看Task運(yùn)行情況。
  • Coordinator:http://emr-header-1.cluster-1234:18081,用于查看Segments存儲(chǔ)情況,并設(shè)置Rule加載和丟棄Segments。
  • Router(EMR-3.23.0及后續(xù)版本):http://emr-header-1.cluster-1234:18888,也稱之為Console,是新版Druid的統(tǒng)一入口。
訪問(wèn)E-MapReduce Druid的Web頁(yè)面:

批量索引

  • 與Hadoop集群交互
    您在創(chuàng)建E-MapReduce Druid集群時(shí)如果勾選了YARN,則系統(tǒng)會(huì)自動(dòng)為您配置好HDFS和YARN的交互,您無(wú)需額外操作。下面的介紹是E-MapReduce 配置獨(dú)立Druid集群與獨(dú)立Hadoop集群之間交互。例如,E-MapReduce Druid集群cluster id為1234,Hadoop集群cluster id為5678。
    說(shuō)明 請(qǐng)嚴(yán)格按照指導(dǎo)進(jìn)行操作,如果操作不當(dāng),集群可能就不會(huì)按照預(yù)期工作。
    非安全獨(dú)立Hadoop集群,請(qǐng)按照如下操作進(jìn)行:
    1. 確保集群間能夠通信(兩個(gè)集群在一個(gè)安全組下,或兩個(gè)集群在不同安全組,但兩個(gè)安全組之間配置了訪問(wèn)規(guī)則)。
    2. 在E-MapReduce Druid集群的每個(gè)節(jié)點(diǎn)的指定路徑下,放置一份Hadoop集群中/etc/ecm/hadoop-conf路徑下的core-site.xmlhdfs-site.xmlyarn-site.xmlmapred-site.xml文件。 這些文件在E-MapReduce Druid集群節(jié)點(diǎn)上放置的路徑與E-MapReduce集群的版本有關(guān),詳情說(shuō)明如下:
      • EMR-3.23.0及后續(xù)版本:/etc/ecm/druid-conf/druid/cluster/_common
      • EMR-3.23.0之前版本:/etc/ecm/druid-conf/druid/_common
      說(shuō)明 如果創(chuàng)建集群時(shí)選了自帶Hadoop,則在上述目錄下會(huì)有幾個(gè)軟鏈接指向自帶Hadoop的配置,請(qǐng)先移除這些軟鏈接。
    3. 將Hadoop集群的hosts寫(xiě)入到E-MapReduce Druid集群的hosts列表中,注意Hadoop集群的hostname應(yīng)采用長(zhǎng)名形式,如emr-header-1.cluster-xxxxxxxx,且最好將Hadoop的hosts放在本集群hosts之后,例如:
      ...
      10.157.*.*    emr-as.cn-hangzhou.aliyuncs.com
      10.157.*.*    eas.cn-hangzhou.emr.aliyuncs.com
      192.168.*.*   emr-worker-1.cluster-1234 emr-worker-1 emr-header-2.cluster-1234 emr-header-2 iZbp1h9g7boqo9x23qb****
      192.168.*.*   emr-worker-2.cluster-1234 emr-worker-2 emr-header-3.cluster-1234 emr-header-3 iZbp1eaa5819tkjx55y****
      192.168.*.*   emr-header-1.cluster-1234 emr-header-1 iZbp1e3zwuvnmakmsje****
      --以下為hadoop集群的hosts信息
      192.168.*.*   emr-worker-1.cluster-5678 emr-header-2.cluster-5678 iZbp195rj7zvx8qar4f****
      192.168.*.*   emr-worker-2.cluster-5678 emr-header-3.cluster-5678 iZbp15vy2rsxoegki4q****
      192.168.*.*   emr-header-1.cluster-5678 iZbp10tx4egw3wfnh5o****
    安全Hadoop集群,請(qǐng)按如下操作進(jìn)行:
    1. 確保集群間能夠通信(兩個(gè)集群在一個(gè)安全組下,或兩個(gè)集群在不同安全組,但兩個(gè)安全組之間配置了訪問(wèn)規(guī)則)。
    2. 在E-MapReduce Druid集群的每個(gè)節(jié)點(diǎn)的指定路徑下,放置一份Hadoop集群/etc/ecm/hadoop-conf路徑下的core-site.xmlhdfs-site.xmlyarn-site.xmlmapred-site.xml文件,并修改core-site.xmlhadoop.security.authentication.use.hasfalse
      其中,core-site.xmlhdfs-site.xmlyarn-site.xmlmapred-site.xml文件在E-MapReduce Druid集群節(jié)點(diǎn)上放置的路徑與E-MapReduce集群的版本有關(guān),詳情說(shuō)明如下:
      • EMR-3.23.0 及以上版本:/etc/ecm/druid-conf/druid/cluster/_common
      • EMR-3.23.0 以下版本:/etc/ecm/druid-conf/druid/_common
      說(shuō)明 如果創(chuàng)建集群時(shí)選了自帶Hadoop,則在上述目錄下會(huì)有幾個(gè)軟鏈接指向自帶Hadoop的配置,請(qǐng)先移除這些軟鏈接。

      其中,hadoop.security.authentication.use.has是客戶端配置,目的是讓用戶能夠使用AccessKey進(jìn)行認(rèn)證。如果使用Kerberos認(rèn)證方式,則需要disable該配置。

    3. 將Hadoop集群的hosts寫(xiě)入到E-MapReduce Druid集群每個(gè)節(jié)點(diǎn)的hosts列表中。

      Hadoop集群的hostname應(yīng)采用長(zhǎng)名形式,例如emr-header-1.cluster-xxxxxxxx,且最好將Hadoop的hosts放在本集群hosts之后。

    4. 設(shè)置兩個(gè)集群間的Kerberos跨域互信,詳情請(qǐng)參見(jiàn)跨域互信
    5. 在Hadoop集群的所有節(jié)點(diǎn)下都創(chuàng)建一個(gè)本地druid賬戶(sudo useradd -m -g hadoop druid),或者設(shè)置druid.auth.authenticator.kerberos.authToLocal,創(chuàng)建Kerberos賬戶到本地賬戶的映射規(guī)則。
      語(yǔ)法規(guī)則請(qǐng)參見(jiàn)Druid-Kerberos
      說(shuō)明 默認(rèn)在安全Hadoop集群中,所有Hadoop命令必須運(yùn)行在一個(gè)本地的賬戶中,該本地賬戶需要與Principal的name部分同名。YARN支持將一個(gè)Principal映射至本地一個(gè)賬戶。
    6. 重啟Druid服務(wù)。
  • 使用Hadoop對(duì)批量數(shù)據(jù)創(chuàng)建索引
    E-MapReduce Druid自帶了一個(gè)名為wikiticker的例子,在${DRUID_HOME}/quickstart/tutorial目錄下(${DRUID_HOME}默認(rèn)為/usr/lib/druid-current)。wikiticker文件(wikiticker-2015-09-12-sampled.json.gz)的每一行是一條記錄,每條記錄是個(gè)JSON對(duì)象。其格式如下所示。
    {
        "time": "2015-09-12T00:46:58.771Z",
        "channel": "#en.wikipedia",
        "cityName": null,
        "comment": "added project",
        "countryIsoCode": null,
        "countryName": null,
        "isAnonymous": false,
        "isMinor": false,
        "isNew": false,
        "isRobot": false,
        "isUnpatrolled": false,
        "metroCode": null,
        "namespace": "Talk",
        "page": "Talk:Oswald Tilghman",
        "regionIsoCode": null,
        "regionName": null,
        "user": "GELongstreet",
        "delta": 36,
        "added": 36,
        "deleted": 0
    }
    使用Hadoop對(duì)批量數(shù)據(jù)創(chuàng)建索引,請(qǐng)按照如下步驟進(jìn)行操作:
    1. 解壓該壓縮文件,并放置于HDFS的目錄下(例如hdfs://emr-header-1.cluster-5678:9000/druid)。 在Hadoop集群上執(zhí)行如下命令。
      ### 如果是在獨(dú)立Hadoop集群上操作,設(shè)置兩個(gè)集群互信之后需要拷貝一個(gè)druid.keytab到Hadoop集群再kinit。
       kinit -kt /etc/ecm/druid-conf/druid.keytab druid
       ###
       hdfs dfs -mkdir hdfs://emr-header-1.cluster-5678:9000/druid
       hdfs dfs -put ${DRUID_HOME}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json hdfs://emr-header-1.cluster-5678:9000/druid
      說(shuō)明
      • 對(duì)于安全集群執(zhí)行HDFS命令前先修改/etc/ecm/hadoop-conf/core-site.xml中的hadoop.security.authentication.use.hasfalse
      • 請(qǐng)確保已經(jīng)在Hadoop集群每個(gè)節(jié)點(diǎn)上創(chuàng)建名為druid的Linux賬戶。
    2. 準(zhǔn)備數(shù)據(jù)索引任務(wù)文件${DRUID_HOME}/quickstart/tutorial/wikiticker-index.json,示例如下。
      {
           "type" : "index_hadoop",
           "spec" : {
               "ioConfig" : {
                   "type" : "hadoop",
                   "inputSpec" : {
                       "type" : "static",
                       "paths" : "hdfs://emr-header-1.cluster-5678:9000/druid/wikiticker-2015-09-12-sampled.json"
                   }
               },
               "dataSchema" : {
                   "dataSource" : "wikiticker",
                   "granularitySpec" : {
                       "type" : "uniform",
                       "segmentGranularity" : "day",
                       "queryGranularity" : "none",
                       "intervals" : ["2015-09-12/2015-09-13"]
                   },
                   "parser" : {
                       "type" : "hadoopyString",
                       "parseSpec" : {
                           "format" : "json",
                           "dimensionsSpec" : {
                               "dimensions" : [
                                   "channel",
                                   "cityName",
                                   "comment",
                                   "countryIsoCode",
                                   "countryName",
                                   "isAnonymous",
                                   "isMinor",
                                   "isNew",
                                   "isRobot",
                                   "isUnpatrolled",
                                   "metroCode",
                                   "namespace",
                                   "page",
                                   "regionIsoCode",
                                   "regionName",
                                   "user"
                               ]
                           },
                           "timestampSpec" : {
                               "format" : "auto",
                               "column" : "time"
                           }
                       }
                   },
                   "metricsSpec" : [
                       {
                           "name" : "count",
                           "type" : "count"
                       },
                       {
                           "name" : "added",
                           "type" : "longSum",
                           "fieldName" : "added"
                       },
                       {
                           "name" : "deleted",
                           "type" : "longSum",
                           "fieldName" : "deleted"
                       },
                       {
                           "name" : "delta",
                           "type" : "longSum",
                           "fieldName" : "delta"
                       },
                       {
                           "name" : "user_unique",
                           "type" : "hyperUnique",
                           "fieldName" : "user"
                       }
                   ]
               },
               "tuningConfig" : {
                   "type" : "hadoop",
                   "partitionsSpec" : {
                       "type" : "hashed",
                       "targetPartitionSize" : 5000000
                   },
                   "jobProperties" : {
                       "mapreduce.job.classloader": "true"
                   }
               }
           },
           "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.8.5"]
       }
      參數(shù)描述
      spec.ioConfig.type設(shè)置為hadoop
      spec.ioConfig.inputSpec.paths文件路徑。
      tuningConfig.type設(shè)置為hadoop
      tuningConfig.jobProperties設(shè)置MapReduce Job的Classloader。
      hadoopDependencyCoordinates制定了Hadoop Client的版本。
    3. 在E-MapReduce Druid集群上運(yùn)行批量索引命令。
      cd ${DRUID_HOME}
      curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type:application/json' -d @quickstart/tutorial/wikiticker-index.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/task

      其中- -negotiate-u-b-c等選項(xiàng)是針對(duì)安全E-MapReduce Druid集群的。Overlord的端口默認(rèn)為18090。

    4. 查看作業(yè)運(yùn)行情況。

      在瀏覽器訪問(wèn)http://emr-header-1.cluster-1234:18090/console.html,查看作業(yè)運(yùn)行情況。

    5. 根據(jù)Druid語(yǔ)法查詢數(shù)據(jù)。
      Druid有自己的查詢語(yǔ)法。請(qǐng)準(zhǔn)備一個(gè)JSON格式的查詢文件。如下所示為wikiticker數(shù)據(jù)的top N查詢文件(${DRUID_HOME}/quickstart/tutorial/wikiticker-top-pages.json)。
      {
           "queryType" : "topN",
           "dataSource" : "wikiticker",
           "intervals" : ["2015-09-12/2015-09-13"],
           "granularity" : "all",
           "dimension" : "page",
           "metric" : "edits",
           "threshold" : 25,
           "aggregations" : [
               {
                   "type" : "longSum",
                   "name" : "edits",
                   "fieldName" : "count"
               }
           ]
       }
      在命令行界面運(yùn)行下面的命令即可看到查詢結(jié)果。
      cd ${DRUID_HOME}
      curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type:application/json' -d @quickstart/tutorial/wikiticker-top-pages.json 'http://emr-header-1.cluster-1234:18082/druid/v2/?pretty'

      其中- -negotiate-u-b-c等選項(xiàng)是針對(duì)安全E-MapReduce Druid集群的。

實(shí)時(shí)索引

對(duì)于數(shù)據(jù)從Kafka集群實(shí)時(shí)到E-MapReduce Druid集群進(jìn)行索引,推薦您使用Kafka Indexing Service擴(kuò)展,提供高可靠保證和Exactly-Once語(yǔ)義。詳情請(qǐng)參見(jiàn)Kafka Indexing Service

如果您的數(shù)據(jù)實(shí)時(shí)到了阿里云日志服務(wù)(SLS),并想用E-MapReduce Druid實(shí)時(shí)索引這部分?jǐn)?shù)據(jù),您可以使用SLS Indexing Service擴(kuò)展,提供高可靠保證和Exactly-Once語(yǔ)義。使用SLS Indexing Service避免了您額外建立并維護(hù)Kafka集群。詳情請(qǐng)參見(jiàn) SLS-Indexing-Service