當您需要將Azure Event Hubs事件中心中的數據同步到阿里云Elasticsearch中時,可使用阿里云Logstash的管道配置功能實現。本文介紹具體的實現方法。
操作流程
步驟一:準備環境與實例
- 創建阿里云Elasticsearch實例,并開啟自動創建索引功能。本文使用7.10版本的實例。具體操作請參見創建阿里云Elasticsearch實例和配置YML參數。
- 創建阿里云Logstash實例并配置NAT公網數據傳輸。本文使用7.4版本的實例。具體操作請參見創建阿里云Logstash實例。由于阿里云Logstash實例部署在專有網絡VPC下,但在數據同步過程中,Logstash需要連接公網才能與Azure Event Hubs事件中心互通,因此需要通過配置NAT網關實現與公網連通,詳情請參見配置NAT公網數據傳輸。說明 對于自建的Logstash,需要購買與阿里云Elasticsearch在同一VPC下的ECS實例(已符合條件的ECS不需要重復購買,需要綁定彈性公網IP)。
- 準備Azure Event Hubs事件中心的自建環境。具體操作請參見Azure Event Hubs官方文檔。
步驟二:創建并配置Logstash管道
- 登錄阿里云Elasticsearch控制臺。
- 進入目標實例。
- 在頂部菜單欄處,選擇地域。
- 在左側導航欄,單擊Logstash實例,然后在Logstash實例中單擊目標實例ID。
- 在左側導航欄,單擊管道管理。
- 單擊創建管道。
- 在創建管道任務頁面,輸入管道ID并配置管道。本文使用的管道配置如下。
input { azure_event_hubs { event_hub_connections => ["Endpoint=sb://abc-****.****.cn/;SharedAccessKeyName=gem-****-es-consumer;SharedAccessKey=******;EntityPath=xxxxxx"] initial_position => "beginning" threads => 2 decorate_events => true consumer_group => "group-kl" storage_connection => "DefaultEndpointsProtocol=https;AccountName=xxxxx;AccountKey=*******;EndpointSuffix=core.****.cn" storage_container => "lettie_container" } } filter { } output { elasticsearch { hosts => ["es-cn-tl****5r50005adob.elasticsearch.aliyuncs.com:9200"] index => "test-log" password => "xxxxxx" user => "elastic" } }
表 1. input參數說明 參數 說明 event_hub_connections 標識要讀取的事件中心的連接字符串列表。連接字符串包括事件中心的EntityPath。更多詳細說明,請參見event_hub_connections。 說明 每一個事件中心都會定義一個event_hub_connections參數,其他參數在各事件中心之間共享。initial_position 從事件中心讀取數據的位置,可選值:beginning(默認)、end和look_back。更多詳細說明,請參見initial position 。 threads 處理事件的線程總數。更多詳細說明,請參見threads。 decorate_events 是否同步事件中心的元數據,包括事件中心名稱、consumer_group、processor_host、分區、偏移量、序列、時間戳和event_size。更多詳細說明,請參見decorate events。 consumer_group 用于讀取事件中心數據的消費者組。您需要專門為Logstash創建一個消費者組,并確保Logstash的所有節點都使用該消費者組,以便它們可以正常協同工作。更多詳細說明,請參見consumer group。 storage_connection Blob賬戶存儲的連接字符串。Blob賬戶存儲會保留重啟之間的偏移量,并確保Logstash的多個節點處理不同的分區。設置此值后,重啟將在處理中斷的地方開始。如果未設置此值,重啟將從initial_position設置的值的地方開始。更多詳細說明,請參見storage connection。 storage_container 用于持久保存偏移量并允許多個Logstash節點一起工作的存儲容器的名稱。更多詳細說明,請參見storage container。 說明 為避免覆蓋偏移量,建議使用不同的storage_container名稱。如果同一份數據分別寫入到不同的服務中,此參數需設置為不同的名稱。表 2. output參數說明 參數 說明 hosts Elasticsearch服務的訪問地址,需要設置為 http://<阿里云Elasticsearch實例ID>.elasticsearch.aliyuncs.com:9200
。index 遷移后的索引名。 user 訪問Elasticsearch服務的用戶名,默認為elastic。 password 對應用戶的密碼。對于阿里云Elasticsearch,elastic用戶的密碼在創建實例時設定,如果忘記可進行重置,重置密碼的注意事項和操作步驟請參見重置實例訪問密碼。 更多Config配置詳情請參見Logstash配置文件說明。
- 單擊下一步,配置管道參數。
參數 說明 管道工作線程 并行執行管道的Filter和Output的工作線程數量。當事件出現積壓或CPU未飽和時,請考慮增大線程數,更好地使用CPU處理能力。默認值:實例的CPU核數。 管道批大小 單個工作線程在嘗試執行Filter和Output前,可以從Input收集的最大事件數目。較大的管道批大小可能會帶來較大的內存開銷。您可以設置LS_HEAP_SIZE變量,來增大JVM堆大小,從而有效使用該值。默認值:125。 管道批延遲 創建管道事件批時,將過小的批分派給管道工作線程之前,要等候每個事件的時長,單位為毫秒。默認值:50ms。 隊列類型 用于事件緩沖的內部排隊模型。可選值: - MEMORY:默認值。基于內存的傳統隊列。
- PERSISTED:基于磁盤的ACKed隊列(持久隊列)。
隊列最大字節數 請確保該值小于您的磁盤總容量。默認值:1024 MB。 隊列檢查點寫入數 啟用持久性隊列時,在強制執行檢查點之前已寫入事件的最大數目。設置為0,表示無限制。默認值:1024。 警告 配置完成后,需要保存并部署才能生效。保存并部署操作會觸發實例重啟,請在不影響業務的前提下,繼續執行以下步驟。 - 單擊保存或者保存并部署。
- 保存:將管道信息保存在Logstash里并觸發實例變更,配置不會生效。保存后,系統會返回管道管理頁面。可在管道列表區域,單擊操作列下的立即部署,觸發實例重啟,使配置生效。
- 保存并部署:保存并且部署后,會觸發實例重啟,使配置生效。
步驟三:驗證結果
- 登錄目標阿里云Elasticsearch實例的Kibana控制臺,根據頁面提示進入Kibana主頁。登錄Kibana控制臺的具體操作,請參見登錄Kibana控制臺。說明 本文以阿里云Elasticsearch 7.10.0版本為例,其他版本操作可能略有差別,請以實際界面為準。
- 單擊右上角的Dev tools。
- 在Console中,執行如下命令,查看同步后數據。
GET test-log3/_search { "query":{ "match":{ "message":"L23" } } }
預期結果如下。