本文為您介紹如何使用Flume同步EMR Kafka集群的數據至阿里云OSS-HDFS服務。
前提條件
已開通并授權訪問OSS-HDFS服務。具體操作,請參見開通并授權訪問OSS-HDFS服務。
已創建DataLake集群,并選擇了Flume服務。具體操作,請參見創建集群。
已創建DataFlow集群,并選擇了Kafka服務。具體操作,請參見創建集群。
操作步驟
配置Flume。
進入Flume的配置頁面。
在頂部菜單欄處,根據實際情況選擇地域和資源組。
在EMR on ECS頁面,單擊目標集群操作列的集群服務。
在集群服務頁簽,單擊FLUME服務區域的配置。
設置JVM最大可用內存(Xmx)。
Flume向OSS-HDFS寫入數據時需要占用較大的JVM內存,建議增加Flume Agent的Xmx。具體步驟如下:
單擊flume-env.sh頁簽。
本文采用了全局配置方式。如果您希望按照節點配置,可以在FLUME服務配置頁面的下拉列表中選擇獨立節點配置。
修改JAVA_OPTS的參數值。
例如,JVM最大可用內存設置為1 GB,則參數值修改為-Xmx1g。
單擊保存。
修改flume-conf.properties配置。
單擊flume-conf.properties頁簽。
本文采用了全局配置方式。如果您希望按照節點配置,可以在FLUME服務配置頁面的下拉列表中選擇獨立節點配置。
在flume-conf.properties右側,輸入以下配置項。
說明以下示例中的default-agent的值需與FLUME服務配置頁面的agent_name參數值保持一致。
default-agent.sources = source1 default-agent.sinks = k1 default-agent.channels = c1 default-agent.sources.source1.type = org.apache.flume.source.kafka.KafkaSource default-agent.sources.source1.channels = c1 default-agent.sources.source1.kafka.bootstrap.servers = <kafka-host1:port1,kafka-host2:port2...> default-agent.sources.source1.kafka.topics = flume-test default-agent.sources.source1.kafka.consumer.group.id = flume-test-group default-agent.sinks.k1.type = hdfs default-agent.sinks.k1.hdfs.path = oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir> default-agent.sinks.k1.hdfs.fileType=DataStream # Use a channel which buffers events in memory default-agent.channels.c1.type = memory default-agent.channels.c1.capacity = 100 default-agent.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel default-agent.sources.source1.channels = c1 default-agent.sinks.k1.channel = c1
參數
描述
default-agent.sources.source1.kafka.bootstrap.servers
Kafka集群Broker的Host和端口號。
default-agent.sinks.k1.hdfs.path
OSS-HDFS的路徑。填寫格式為
oss://<examplebucket>.<exampleregion>.oss-dls.aliyuncs.com/<exampledir>
。示例值為oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result。各參數說明如下:
<examplebucket>:填寫已開啟OSS-HDFS服務的Bucket名稱。
<exampleregion>:填寫Bucket所在的地域ID。
<exampledir>:填寫OSS-HDFS服務的目錄名稱。
default-agent.channels.c1.capacity
通道中存儲的最大事件數。請根據實際環境修改該參數值。
default-agent.channels.c1.transactionCapacity
每個事務通道將從源接收或提供給接收器的最大事件數。請根據實際環境修改該參數值。
單擊保存。
測試數據同步情況。
- 通過SSH方式連接DataFlow集群,詳情請參見登錄集群。
- 創建名稱為flume-test的Topic。
kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper master-1-1:2181/emr-kafka --topic flume-test --create
生成測試數據。
kafka-console-producer.sh --topic flume-test --broker-list master-1-1:9092
例如,輸入
abc
并回車。在oss://flume-test.cn-hangzhou.oss-dls.aliyuncs.com/result路徑下會以當前時間的時間戳(毫秒)為后綴生成格式為FlumeData.xxxx的文件。