本文將以LogHub數據同步至MaxCompute為例,為您介紹如何通過數據集成功能同步LogHub數據至數據集成已支持的目的端數據源(例如MaxCompute、OSS、Tablestore、RDBMS和DataHub等)。
前提條件
準備好相關的數據源,詳情請參見創建MaxCompute數據源。
準備需要同步的來源表與目標表。
背景信息
日志服務支持以下數據同步場景:
跨地域的LogHub與MaxCompute等數據源的數據同步。
不同阿里云賬號下的LogHub與MaxCompute等數據源間的數據同步。
同一阿里云賬號下的LogHub與MaxCompute等數據源間的數據同步。
公共云與金融云賬號下的LogHub與MaxCompute等數據源間的數據同步。
以B賬號進入數據集成配置同步任務,將A賬號的LogHub數據同步至B賬號的MaxCompute為例,跨阿里云賬號的特別說明如下:
使用A賬號的AccessKey ID和AccessKey Secret創建LogHub數據源。
此時B賬號可以同步A賬號下所有日志服務項目的數據。
使用A賬號下的RAM用戶A1的AccessKey ID和AccessKey Secret創建LogHub數據源。
A賬號為RAM用戶A1賦予日志服務的通用權限,即
AliyunLogFullAccess
和AliyunLogReadOnlyAccess
,詳情請參見創建RAM用戶及授權。說明為RAM賬號授予
AliyunLogFullAccess
和AliyunLogReadOnlyAccess
系統策略后,RAM賬號可以查詢主賬號下的所有日志服務。A賬號給RAM用戶A1賦予日志服務的自定義權限。
主賬號A進入
頁面,單擊創建權限策略。根據下述策略進行授權后,B賬號通過RAM用戶A1只能同步日志服務project_name1以及project_name2的數據。
{ "Version": "1", "Statement": [ { "Action": [ "log:Get*", "log:List*", "log:CreateConsumerGroup", "log:UpdateConsumerGroup", "log:DeleteConsumerGroup", "log:ListConsumerGroup", "log:ConsumerGroupUpdateCheckPoint", "log:ConsumerGroupHeartBeat", "log:GetConsumerGroupCheckPoint" ], "Resource": [ "acs:log:*:*:project/project_name1", "acs:log:*:*:project/project_name1/*", "acs:log:*:*:project/project_name2", "acs:log:*:*:project/project_name2/*" ], "Effect": "Allow" } ] }
新建LogHub數據源
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,在下拉框中選擇對應工作空間后單擊進入數據集成。
單擊左側導航欄中的數據源。
在數據源列表頁面,單擊新增數據源。
在新增數據源對話框中,選擇數據源類型為LogHub。
填寫新增LogHub數據源對話框中的配置。
參數
描述
數據源名稱
數據源名稱必須以字母、數字、下劃線組合,且不能以數字和下劃線開頭。
數據源描述
對數據源進行簡單描述,不得超過80個字符。
LogHub Endpoint
LogHub的Endpoint,格式為
http://example.com
。詳情請參見服務入口。Project
輸入項目名稱。
AccessKey ID
訪問密鑰中的AccessKey ID,您可以進入控制臺的用戶信息管理頁面進行復制。
AccessKey Secret
訪問密鑰中的AccessKey Secret,相當于登錄密碼。
單擊測試連通性。
連通性測試通過后,單擊完成。
新建離線同步節點
在數據源頁面,單擊左上角的圖標,選擇 。
在數據開發頁面,鼠標懸停至圖標,單擊業務流程。
在新建業務流程對話框中,輸入業務流程名稱和描述,單擊新建。
展開業務流程,右鍵單擊數據集成,選擇 。
在新建節點對話框中,輸入節點名稱,并選擇路徑。
單擊確認,進入離線節點編輯頁面。
通過向導模式配置同步任務
在離線節點編輯頁面,選擇數據來源和數據去向。
參數
描述
數據來源
選擇LogHub。
數據源名稱
選擇以添加的日志服務數據源名稱。
資源組
選擇獨享數據集成資源組。
數據去向
選擇MaxCompute。
數據源名稱
選擇以添加的MaxCompute數據源名稱。
測試網絡連通性,數據來源和數據去向均可連通后,點擊下一步。
配置數據來源與數據去向具體同步的表等信息。
數據來源參數說明:
參數
描述
Logstore
目標日志庫的名稱。
日志開始時間
數據消費的開始時間位點,為時間范圍(左閉右開)的左邊界,為yyyyMMddHHmmss格式的時間字符串(例如20180111013000)。該參數可以和DataWorks的調度時間參數配合使用。
日志結束時間
數據消費的結束時間位點,為時間范圍(左閉右開)的右邊界,為yyyyMMddHHmmss格式的時間字符串(例如20180111013010)。該參數可以和DataWorks的調度時間參數配合使用。
批量條數
一次讀取的數據條數,默認為256。
說明您可以進行數據預覽,此處僅選擇LogHub中的幾條數據展現在預覽框。由于您在進行同步任務時,會指定開始時間和結束時間,會導致預覽結果和實際的同步結果不一致。
選擇字段的映射關系。
在通道控制中配置同步速率和臟數據策略等參數。
單擊右側調度配置,配置重跑屬性、調度資源組以及依賴的上游節點等參數。
說明依賴的上游節點配置為使用工作空間根節點。
確認當前節點的配置無誤后,單擊左上角的。
運行離線同步節點。
您可以通過以下兩種方式運行離線同步節點:
直接運行(一次性運行)
單擊節點編輯頁面工具欄中的圖標,直接在頁面運行。
說明運行之前需要配置自定義參數的具體取值。
調度運行
單擊右側調度配置,設置時間屬性,配置調度周期。
單擊節點編輯頁面工具欄中的圖標,然后單擊圖標,提交離線同步節點至調度系統,調度系統會根據配置的屬性,從第2天開始自動定時運行。
通過腳本模式配置離線同步節點
成功創建離線同步節點后,單擊工具欄中的轉換腳本。
單擊提示對話框中的確認,即可進入腳本模式進行開發。
單擊工具欄中的導入模板。
在導入模板對話框中,選擇從來源端的LogHub數據源同步至目標端的ODPS數據源的導入模板,單擊確認。
導入模板后,根據自身需求編輯代碼,示例腳本如下。
{ "type": "job", "version": "1.0", "configuration": { "reader": { "plugin": "loghub", "parameter": { "datasource": "loghub_lzz",//數據源名,需要和您添加的數據源名一致。 "logstore": "logstore-ut2",//目標日志庫的名字,LogStore是日志服務中日志數據的采集、存儲和查詢單元。 "beginDateTime": "${startTime}",//數據消費的開始時間位點,為時間范圍(左閉右開)的左邊界。 "endDateTime": "${endTime}",//數據消費的結束時間位點,為時間范圍(左閉右開)的右邊界。 "batchSize": 256,//一次讀取的數據條數,默認為256。 "splitPk": "", "column": [ "key1", "key2", "key3" ] } }, "writer": { "plugin": "odps", "parameter": { "datasource": "odps_source",//數據源名,需要和您添加的數據源名一致。 "table": "test",//目標表名。 "truncate": true, "partition": "",//分區信息。 "column": [//目標列名。 "key1", "key2", "key3" ] } }, "setting": { "speed": { "mbps": 8,//作業速率上限,單位MB/s。 "concurrent": 7//并發數。 } } } }