本教程以MySQL數據源中的用戶基本信息ods_user_info_d
表和HttpFile中的網站訪問日志數據user_log.txt
文件為例,通過數據集成離線同步任務分別同步至私有OSS中,再通過Spark SQL創建外部表來訪問私有OSS數據存儲。本章節旨在完成數據同步操作。
章節目標
本章節通過數據集成將平臺提供的MySQL數據源內的用戶基本信息數據與HttpFile數據源內的用戶網站訪問日志數據同步至私有OSS對象存儲創建的數據源中。
源端數據源類型
源端待同步數據
源端表結構
目標端數據源類型
MySQL
表:ods_user_info_d
用戶基本信息數據
uid 用戶名
gender 性別
age_range 年齡分段
zodiac 星座
OSS
HttpFile
文件:user_log.txt
用戶網站訪問日志數據
一行為一條用戶訪問記錄。
$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent"$http_referer" "$http_user_agent" [unknown_content];
OSS
完成同步任務后,通過EMR Spark SQL創建外部表來訪問私有OSS數據存儲。
操作步驟
步驟一:設計業務流程
本步驟內,將數據集成節點以及EMR Spark SQL 節點相結合,形成用戶畫像分析任務流程中獲取數據部分的流程。主要是通過ods_raw_log_d_2oss_spark
節點從HttpFile數據源獲取日志數據至私有OSS數據源中,再通過ods_raw_log_d_spark
節點生成簡單的日志外部表,從私有OSS數據存儲中獲取用戶日志數據。以及通過ods_user_info_d_2oss_spark
從MySQL數據源同步用戶基本信息至私有OSS數據源中,再通過ods_user_info_d_spark
實現外部表的創建,從私有OSS數據存儲中獲取用戶基本信息數據。
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,在下拉框中選擇對應工作空間后單擊進入數據開發。
設計業務流程
新建業務流程。
數據開發需基于業務流程使用對應的開發組件進行具體開發操作。在創建節點之前,您需要先新建業務流程。具體操作方法可參見創建業務流程。
該業務流程的命名為:
用戶畫像分析_Spark
。設計業務流程。
業務流程新建完成后,將自動展開該業務流程畫布。請根據業務流程設計,在業務流程畫布中單擊新建節點,通過將節點組件拖拽至業務流程畫布,并通過拉線設置節點上下游依賴的方式,設計數據同步階段的業務流程。
在本教程中,由于虛擬節點和同步節點之間并無血緣關系,因此我們通過業務流程拉線的方式來設置節點的依賴關系。有關更多依賴關系設置方式的詳細信息,詳情請參見調度依賴配置指引。以下為各個節點的類型、命名以及作用的介紹。
節點分類
節點類型
節點命名
節點作用
通用
虛擬節點
workshop_start_spark
用于統籌管理整個用戶畫像分析業務流程,例如業務流程起調時間。當業務流程較復雜時,可使數據流轉路徑更清晰。該節點為空跑任務,無須編輯代碼。
數據集成
離線同步
ods_raw_log_d_2oss_spark
用于將HttpFile數據源存儲的用戶網站訪問記錄,通過離線同步的方式同步至私有OSS數據源中,以供后續Spark SQL獲取。
數據集成
離線同步
ods_user_info_d_2oss_spark
用于將MySQL數據源存儲的用戶基本信息數據,通過離線同步的方式同步至私有OSS數據源中,以供后續Spark SQL獲取。
EMR
EMR Spark SQL
ods_raw_log_d_spark
在EMR Spark SQL節點中,創建表
ods_raw_log_d_spark
,并通過該外部表訪問私有OSS中的用戶網站訪問記錄數據。EMR
EMR Spark SQL
ods_user_info_d_spark
在EMR Spark SQL節點中,創建表
ods_user_info_d_spark
,并通過該外部表訪問私有OSS中的用戶基本信息數據。
配置調度邏輯
本案例通過虛擬節點workshop_start_Spark
控制整個業務流程每天00:30調度執行,以下為虛擬節點關鍵調度配置,其他節點調度無須變更,實現邏輯詳情請參見:場景:如何配置業務流程定時時間。其他調度配置相關說明,請參見:任務調度屬性配置概述。
調度配置 | 圖片示例 | 說明 |
調度時間配置 | 虛擬節點配置調度時間為00:30,該虛擬節點會在每日00:30調起當前業務流程并執行。 | |
調度依賴配置 | 由于虛擬節點 |
DataWorks中的所有節點都需要依賴于上游節點,數據同步階段的所有任務都以虛擬節點workshop_start_spark
為依賴,通過workshop_start_spark
節點來觸發數據同步業務流程的執行。
步驟二:搭建同步鏈路
配置完成業務流程后,分別雙擊ods_user_info_d_2oss_spark
以及ods_raw_log_d_2oss_spark
數據集成節點,配置用戶數據同步至私有OSS和配置用戶日志同步至私有OSS,并且通過ods_raw_log_d_spark
和ods_user_info_d_spark
采用 Spark SQL代碼,實現通過Spark SQL創建的外表來訪問存儲于私有OSS的數據。
用戶數據與日志同步至OSS數據源
使用數據集成將平臺提供的用戶數據與用戶日志同步至私有OSS對象存儲的Bucket目錄下。
配置用戶日志同步至OSS
通過離線數據集成任務,實現從平臺的HttpFile數據源內的獲取用戶日志信息,同步至私有OSS數據源中。
同步HttpFile數據源的日志信息至自建的OSS。
在數據開發頁面,雙擊ods_raw_log_d_2oss_spark節點,進入節點配置頁面。
配置同步網絡鏈接。
完成以下網絡與資源配置后,單擊下一步,并根據界面提示完成連通性測試。
參數
描述
數據來源
數據來源:HttpFile
數據源名稱:user_behavior_analysis_httpfile
我的資源組
選擇已購買的Serverless資源組。
數據去向
數據去向:OSS
數據源名稱:選擇前文創建的私有OSS數據源,此處示例為test_g。
配置同步任務。
參數
描述
數據來源
文本類型:選擇text類型。
文件路徑:/user_log.txt。
列分隔符:輸入列分隔符為|。
壓縮格式:包括None、Gzip、Bzip2和Zip四種類型,此處選擇None。
是否跳過表頭:選擇No。
數據去向
文本類型:選擇text類型。
文件名(含路徑):根據您自建OSS的目錄進行輸入,示例為ods_raw_log_d/log_${bizdate}/log_${bizdate}.txt,其中ods_raw_log_d為您自建的目錄名,$bizdate表示獲取前一天的日期。
列分隔符:輸入列分隔符為|。
調度設置。
配置頁面單擊右側調度配置,可進入調度配置面板配置調度與節點信息。以下為配置的內容。
說明DataWorks提供調度參數,可實現在調度場景下,將每日數據寫入不同的OSS路徑及文件下,并以業務日期對路徑目錄與文件進行命名。
在實際場景下,您可以在數據去向的文件名(含目錄)配置中通過
${變量名}
格式自定義路徑中的變量,并通過在調度配置頁面為變量賦值調度參數的方式,實現調度場景下動態生成數據去向目錄與文件名。配置項
配置內容
圖示
調度參數
在調度參數項中單擊新增參數,添加:
參數名:
bizdate
參數值:
$[yyyymmdd-1]
詳情可參見:配置調度參數。
調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
workspacename.節點名
。詳情可參見:配置調度依賴。
配置完成后,單擊工具欄中的圖標,進行保存。
配置用戶數據同步至OSS
通過離線數據集成任務,實現從平臺的MySQL數據源內的獲取用戶數據信息,同步至私有OSS數據源中。
在數據開發頁面,雙擊ods_user_info_d_2oss_spark節點,進入節點配置頁面。
配置同步網絡鏈接。
完成以下網絡與資源配置后,單擊下一步,并根據界面提示完成連通性測試。
參數
描述
數據來源
數據來源:MySQL
數據源名稱:user_behavior_analysis_mysql
我的資源組
選擇已購買的Serverless資源組。
數據去向
數據去向:OSS
數據源名稱:選擇前文創建的私有OSS數據源,此處示例為test_g。
配置同步任務。
參數
描述
數據來源
表:選擇數據源中的ods_user_info_d。
切分鍵:建議使用主鍵或有索引的列作為切分鍵,僅支持類型為整型的字段。此處設置切分鍵為uid。
數據去向
文本類型:選擇text類型。
文件名(含路徑):根據您自建OSS的目錄進行輸入,示例為ods_user_info_d/user_${bizdate}/user_${bizdate}.txt。其中ods_user_info_d為您自建的目錄名,$bizdate表示獲取前一天的日期。
列分隔符:輸入列分隔符為|。
調度設置
配置頁面單擊右側調度配置,可進入調度配置面板配置調度與節點信息。以下為配置的內容。
說明DataWorks提供調度參數,可實現在調度場景下,將每日數據寫入不同的OSS路徑及文件下,并以業務日期對路徑目錄與文件進行命名。
在實際場景下,您可以在數據去向的文件名(含目錄)配置中通過
${變量名}
格式自定義路徑中的變量,并通過在調度配置頁面通過為變量賦值調度參數的方式,實現調度場景下動態生成數據去向目錄與文件名。配置項
配置內容
圖示
調度參數
在調度參數項中單擊新增參數,添加:
參數名:
bizdate
參數值:
$[yyyymmdd-1]
詳情可參見:配置調度參數。
調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
workspacename.節點名
。詳情可參見:配置調度依賴。
配置完成后,單擊工具欄中的圖標。
創建Spark外部表加載OSS數據
數據通過離線集成任務同步至私有OSS數據源后,基于生成的OSS文件,通過Spark SQL的create
語法創建ods_raw_log_d_spark
與ods_user_info_d_spark
表,并通過LOCATION
來獲取OSS中的用戶信息文件、用戶日志信息以供后續數據加工使用。
配置ods_raw_log_d_spark節點
基于通過EMR Spark SQL創建的外部表ods_raw_log_d_spark
,用LOCATION
來訪問離線數據集成任務寫入私有OSS對象存儲Bucket的日志信息。
代碼配置。
-- 場景:以下SQL為Spark SQL,通過EMR Spark SQL創建的外部表ods_raw_log_d_spark,用LOCATION來獲取離線數據集成任務寫入私有OSS對象存儲Bucket的日志信息,并添加對應的dt分區。 -- 補充: -- DataWorks提供調度參數,可實現調度場景下,將每日增量數據寫入目標表對應的業務分區內。 -- 在實際開發場景下,您可通過${變量名}格式定義代碼變量,并在調度配置頁面通過變量賦值調度參數的方式,實現調度場景下代碼動態入參。 CREATE EXTERNAL TABLE IF NOT EXISTS ods_raw_log_d_spark ( `col` STRING ) PARTITIONED BY ( dt STRING ) LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/'; ALTER TABLE ods_raw_log_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') LOCATION 'oss://dw-emr-demo/ods_raw_log_d/log_${bizdate}/' ;
說明上述代碼中的location為示例路徑,與之前配置離線同步任務時的數據去向相同,需要輸入您建立的文件路徑名稱,其中dw-emr-demo是您準備環境時創建的OSS Bucket域名。
配置調度配置。
為
ods_raw_log_d_spark
節點配置任務調度,通過配置的調度參數來獲取對應業務日期的私有OSS日志文件,并寫入同樣業務日期分區的Spark表內。配置項
配置內容
圖示
調度參數
在調度參數項中單擊新增參數,添加:
參數名:
bizdate
參數值:
$[yyyymmdd-1]
,詳情可參見:配置調度參數。調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
workspacename.節點名
。詳情可參見:配置調度依賴。
說明本章節在SQL中配置了調度參數
${bizdate}
,并將其賦值為T-1
。在離線計算場景下bizdate為業務交易發生的日期,也常被稱為業務日期。例如,今天統計前一天的營業額,此處的前一天指的是交易發生的日期,也就是業務日期。完成配置后,單擊保存節點。
配置ods_user_info_d_spark節點
基于通過EMR Spark SQL節點創建的外部表ods_user_info_d_spark
,用LOCATION
來訪問離線數據集成任務寫入私有OSS對象存儲Bucket的用戶信息。
代碼配置。
-- 場景:以下SQL為Spark SQL,通過EMR Spark SQL節點創建的外部表ods_user_info_d_spark,用LOCATION來獲取離線數據集成任務寫入私有OSS對象存儲Bucket的用戶信息,并寫入對應的dt分區。 -- 補充: -- DataWorks提供調度參數,可實現調度場景下,將每日增量數據寫入目標表對應的業務分區內。 -- 在實際開發場景下,您可通過${變量名}格式定義代碼變量,并在調度配置頁面通過變量賦值調度參數的方式,實現調度場景下代碼動態入參。 CREATE EXTERNAL TABLE IF NOT EXISTS ods_user_info_d_spark ( `uid` STRING COMMENT '用戶ID' ,`gender` STRING COMMENT '性別' ,`age_range` STRING COMMENT '年齡段' ,`zodiac` STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY'|' STORED AS TEXTFILE LOCATION 'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/' ; ALTER TABLE ods_user_info_d_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}') LOCATION'oss://dw-emr-demo/ods_user_info_d/user_${bizdate}/' ;
說明上述代碼中的location為示例路徑,與之前配置離線同步任務時的數據去向相同,需要輸入您建立的文件路徑名稱,其中dw-emr-demo是您準備環境時創建的OSS Bucket域名。
配置調度配置。
為
ods_user_info_d_spark
節點配置任務調度,通過配置的調度參數來獲取對應業務日期的私有OSS用戶信息文件,并寫入同樣業務日期分區的Spark表內。配置項
配置內容
圖示
調度參數
在調度參數項中單擊新增參數,添加:
參數名:
bizdate
參數值:
$[yyyymmdd-1]
,詳情可參見:配置調度參數。調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
worksspacename.節點名
詳情可參見:配置調度依賴。
完成配置后,單擊保存節點。
步驟三:驗證同步數據
在確保該章節內的所有節點運行成功的情況下,在左側導航欄的臨時查詢中新建EMR Spark SQL臨時查詢,編寫SQL查看EMR Spark SQL節點創建的外部表是否正常產出。
-- 您需要將分區過濾條件更新為您當前操作的實際業務日期。例如,任務運行的日期為20240808,則業務日期為20240807,即任務運行日期的前一天。
SELECT * FROM ods_raw_log_d_spark WHERE dt ='業務日期';--查詢ods_raw_log_d_spark表
SELECT * FROM ods_user_info_d_spark WHERE dt ='業務日期';--查詢ods_user_info_d_spark表
在驗證同步數據的SQL中,可將WHERE條件替換為"dt = ${bizdate}"
,在臨時查詢任務中單擊帶參運行,為SQL占位符${bizdate}
賦值后運行即可。
后續步驟
現在,您已經完成了同步數據,您可以繼續下一個教程。在下一個教程中,您將學習將用戶基本信息數據、用戶網站訪問日志數據在Spark中進行加工處理。詳情請參見加工數據。