本文為您介紹如何用Spark SQL創建外部用戶信息表ods_user_info_d_spark
以及日志信息表ods_raw_log_d_spark
訪問存儲在私有OSS中的用戶與日志數據,通過DataWorks的EMR Spark SQL節點進行加工得到目標用戶畫像數據,閱讀本文后,您可以了解如何通過Spark SQL來計算和分析已同步的數據,完成數倉簡單數據加工場景。
前提條件
開始本實驗前,請先完成同步數據中的操作。
已通過EMR Spark SQL節點創建
ods_user_info_d_spark
外部表,可成功訪問同步至私有OSS的用戶基本信息。已通過EMR Spark SQL節點創建
ods_raw_log_d_spark
外部表,可成功訪問同步至私有OSS的日志信息。
注意事項
由于EMR Serverless Spark空間不支持注冊函數,無法通過注冊新函數的方式將日志信息進行切分并轉換IP為地域。本案例通過Spark SQL自有的函數對ods_raw_log_d_spark
日志表進行切分的方式生成dwd_log_info_di_spark
,從而實現進一步的用戶畫像分析。
章節目標
本小節將對ods_user_info_d_spark
和ods_raw_log_d_spark
外部表進行加工處理,并生成基本用戶畫像表。
通過Spark SQL對
ods_raw_log_d_spark
表進行處理,生成新的明細日志表dwd_log_info_di_spark
。利用明細日志表
dwd_log_info_di_spark
和用戶表ods_user_info_d_spark
的uid字段進行關聯,生成匯總用戶日志表dws_user_info_all_di_spark
。dws_user_info_all_di_spark
表,直接應用于數據消費,表數據較多,所以將其進一步加工為ads_user_info_1d_spark
表。
步驟一:設計工作流程
在上一同步數據章節中完成了用戶表用戶畫像分析(Spark版)的數據同步流程。在數據加工階段將會新增dwd_log_info_di_spark
節點對日志表進行細分,dws_user_info_all_di_spark
節點對日志明細表和用戶表進行連接生成新表后,再通過ads_user_info_1d_spark
節點對用戶日志明細表進一步處理,實現用戶畫像表的輸出。
進入數據開發。
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,在下拉框中選擇對應工作空間后單擊進入數據開發。
在同步數據階段,已經成功用EMR Spark SQL節點創建外部表訪問私有OSS數據,接下來的流程的目標是對數據進行進一步加工,以輸出基本用戶畫像數據。
各層級節點以及工作邏輯。
在業務流程畫布中單擊新建節點,創建以下節點,以供加工數據使用。
節點分類
節點類型
節點名稱
(以最終產出表命名)
代碼邏輯
EMR
EMR Spark SQL
dwd_log_info_di_spark
將ods_raw_log_d_spark日志表進行拆分,生成新的日志表,以供后續關聯使用。
EMR
EMR Spark SQL
dws_user_info_all_di_spark
將用戶基本信息和初步加工后的日志數據進行匯總,合并為一張表。
EMR
EMR Spark SQL
ads_user_info_1d_spark
進一步加工產出基本用戶畫像。
流程DAG圖。
將節點組件拖拽至業務流程畫布,并通過拉線設置節點上下游依賴的方式,設計數據加工階段的業務流程。
步驟二:配置EMR Spark SQL節點
配置完成業務流程后,在EMR Spark SQL節點中利用Spark SQL函數對ods_raw_log_d_spark
表進行切分處理,從而通過對切分后的日志表與用戶表進行關聯成新的明細表之后進一步對數據表進行清洗、處理操作后,從而實現對不同用戶的一個用戶畫像。
配置dwd_log_info_di_spark節點
在業務流程面板,雙擊EMR Spark SQL節點dwd_log_info_di_spark
節點,進入dwd_log_info_di_spark
節點的編輯頁面,編寫處理上游ods_raw_log_d_spark
表,將日志明細數據寫入dwd_log_info_di_spark
表中。
配置代碼
雙擊
dwd_log_info_di_spark
節點,進入節點配置頁面,編寫如下語句。-- 場景:以下SQL為Spark SQL,通過Spark SQL函數將加載至Spark中的ods_raw_log_d_spark按"##@@"進行切分后生成多個字段,并寫入新表dwd_log_info_di_spark。 -- 補充: -- DataWorks提供調度參數,可實現調度場景下,將每日增量數據寫入目標表對應的業務分區內。 -- 在實際開發場景下,您可通過${變量名}格式定義代碼變量,并在調度配置頁面通過變量賦值調度參數的方式,實現調度場景下代碼動態入參。 CREATE TABLE IF NOT EXISTS dwd_log_info_di_spark ( ip STRING COMMENT 'ip地址', uid STRING COMMENT '用戶ID', tm STRING COMMENT '時間yyyymmddhh:mi:ss', status STRING COMMENT '服務器返回狀態碼', bytes STRING COMMENT '返回給客戶端的字節數', method STRING COMMENT'請求方法', url STRING COMMENT 'url', protocol STRING COMMENT '協議', referer STRING , device STRING, identity STRING ) PARTITIONED BY ( dt STRING ); ALTER TABLE dwd_log_info_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}'); INSERT OVERWRITE TABLE dwd_log_info_di_spark PARTITION (dt='${bizdate}') SELECT ip, uid, tm, status, bytes, regexp_extract(request, '(^[^ ]+) .*', 1) AS method, regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url, regexp_extract(request, '.* ([^ ]+$)', 1) AS protocol, regexp_extract(referer, '^[^/]+://([^/]+){1}', 1) AS referer, CASE WHEN lower(agent) RLIKE 'android' THEN 'android' WHEN lower(agent) RLIKE 'iphone' THEN 'iphone' WHEN lower(agent) RLIKE 'ipad' THEN 'ipad' WHEN lower(agent) RLIKE 'macintosh' THEN 'macintosh' WHEN lower(agent) RLIKE 'windows phone' THEN 'windows_phone' WHEN lower(agent) RLIKE 'windows' THEN 'windows_pc' ELSE 'unknown' END AS device, CASE WHEN lower(agent) RLIKE '(bot|spider|crawler|slurp)' THEN 'crawler' WHEN lower(agent) RLIKE 'feed' OR regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) RLIKE 'feed' THEN 'feed' WHEN lower(agent) NOT RLIKE '(bot|spider|crawler|feed|slurp)' AND agent RLIKE '^(Mozilla|Opera)' AND regexp_extract(request, '^[^ ]+ (.*) [^ ]+$', 1) NOT RLIKE 'feed' THEN 'user' ELSE 'unknown' END AS identity FROM ( SELECT SPLIT(col, '##@@')[0] AS ip, SPLIT(col, '##@@')[1] AS uid, SPLIT(col, '##@@')[2] AS tm, SPLIT(col, '##@@')[3] AS request, SPLIT(col, '##@@')[4] AS status, SPLIT(col, '##@@')[5] AS bytes, SPLIT(col, '##@@')[6] AS referer, SPLIT(col, '##@@')[7] AS agent FROM ods_raw_log_d_spark WHERE dt = '${bizdate}' ) a;
配置調度屬性
配置項
圖示
新增參數
在調度參數項中單擊新增參數,添加:
參數名:
bizdate
參數值:
$[yyyymmdd-1]
詳情可參見:配置調度參數。
調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
worksspacename.節點名
。詳情可參見:配置調度依賴。
說明時間屬性的配置,配置調度周期為日,無需單獨配置當前節點定時調度時間,當前節點每日調起時間由業務流程虛擬節點workshop_start_spark的定時調度時間控制,即每日00:30后才會調度。
您可在節點高級設置處配置Spark特有屬性參數,本實驗基于EMR Serverless Spark的Spark SQL任務的系統參數,可參考以下表格內容配置高級參數:
高級參數
配置說明
SERVERLESS_RELEASE_VERSION
變更Serverless Spark引擎版本,示例如下:
"SERVERLESS_RELEASE_VERSION": "esr-2.1 (Spark 3.3.1, Scala 2.12, Java Runtime)"
SERVERLESS_QUEUE_NAME
變更資源隊列,示例如下:
"SERVERLESS_QUEUE_NAME": "dev_queue"
SERVERLESS_SQL_COMPUTE
修改SQL Compute,示例如下:
"SERVERLESS_SQL_COMPUTE": "sc-b4356b0af6039727"
FLOW_SKIP_SQL_ANALYZE
SQL語句執行方式。取值如下:
true
:表示每次執行多條SQL語句。false
:表示每次執行一條SQL語句。
說明該參數僅支持用于數據開發環境測試運行流程。
其他
您可以直接在高級配置里追加自定義SPARK參數。例如,
spark.eventLog.enabled : false
,DataWorks會自動在最終下發到Spark工作空間的EMR集群代碼將代碼補全為Spark工作空間所支持的代碼,格式為:--conf key=value
。還支持配置全局Spark參數,詳情請參見設置全局Spark參數。
若您想查看更多Spark屬性參數設置,可參考Spark Configuration。
保存配置
本案例其他必填配置項,您可按需自行配置,配置完成后,在節點代碼編輯頁面,單擊工具欄中的按鈕,保存當前配置。
驗證日志表拆分情況
在確保上游節點以及本節點運行成功的情況下,在左側導航欄的臨時查詢中新建EMR Spark SQL臨時查詢,編寫SQL查看EMR Spark SQL節點創建的表是否正常產出。
-- 您需要將分區過濾條件更新為您當前操作的實際業務日期。例如,任務運行的日期為20230222,則業務日期為20230221,即任務運行日期的前一天。 SELECT * FROM dwd_log_info_di_spark WHERE dt ='業務日期';
說明在本教程在SQL中配置了調度參數
${bizdate}
,并將其賦值為T-1
。在離線計算場景下bizdate為業務交易發生的日期,也常被稱為業務日期(business date)。例如,今天統計前一天的營業額,此處的前一天指的是交易發生的日期,也就是業務日期。
配置dws_user_info_all_di_spark節點
基于dwd_log_info_di_spark
日志表和ods_user_info_d_spark
用戶表,通過uid對兩表進行關聯,產出新的用戶日志明細表dws_user_info_all_di_spark
。
編輯代碼
雙擊dws_user_info_all_di_spark節點,進入節點配置頁面。在節點編輯頁面,編寫如下語句。
-- 場景:以下SQL為Spark SQL,通過uid將dwd_log_info_di_spark和ods_user_info_d_spark進行關聯,并寫入對應的dt分區。 -- 補充: -- DataWorks提供調度參數,可實現調度場景下,將每日增量數據寫入目標表對應的業務分區內。 -- 在實際開發場景下,您可通過${變量名}格式定義代碼變量,并在調度配置頁面通過變量賦值調度參數的方式,實現調度場景下代碼動態入參。 CREATE TABLE IF NOT EXISTS dws_user_info_all_di_spark ( uid STRING COMMENT '用戶ID', gender STRING COMMENT '性別', age_range STRING COMMENT '年齡段', zodiac STRING COMMENT '星座', device STRING COMMENT '終端類型 ', method STRING COMMENT 'http請求類型', url STRING COMMENT 'url', `time` STRING COMMENT '時間yyyymmddhh:mi:ss' ) PARTITIONED BY (dt STRING); --添加分區 ALTER TABLE dws_user_info_all_di_spark ADD IF NOT EXISTS PARTITION (dt = '${bizdate}'); --插入user表與日志表數據 INSERT OVERWRITE TABLE dws_user_info_all_di_spark PARTITION (dt = '${bizdate}') SELECT COALESCE(a.uid, b.uid) AS uid, b.gender AS gender, b.age_range AS age_range, b.zodiac AS zodiac, a.device AS device, a.method AS method, a.url AS url, a.tm FROM ( SELECT * FROM dwd_log_info_di_spark WHERE dt='${bizdate}' ) a LEFT OUTER JOIN ( SELECT * FROM ods_user_info_d_spark WHERE dt='${bizdate}' ) b ON a.uid = b.uid;
配置調度屬性
配置項
配置內容
圖示
新增參數
在調度參數項中單擊新增參數,添加:
參數名:
bizdate
參數值:
$[yyyymmdd-1]
詳情可參見:配置調度參數。
調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
worksspacename.節點名
。詳情可參見:配置調度依賴。
說明時間屬性的配置,配置調度周期為日,無需單獨配置當前節點定時調度時間,當前節點每日調起時間由業務流程虛擬節點workshop_start_spark的定時調度時間控制,即每日00:30后才會調度。
您可在節點高級設置處配置Spark特有屬性參數,本實驗基于EMR Serverless Spark的Spark SQL任務的系統參數,可參考以下表格內容配置高級參數:
高級參數
配置說明
SERVERLESS_RELEASE_VERSION
變更Serverless Spark引擎版本,示例如下:
"SERVERLESS_RELEASE_VERSION": "esr-2.1 (Spark 3.3.1, Scala 2.12, Java Runtime)"
SERVERLESS_QUEUE_NAME
變更資源隊列,示例如下:
"SERVERLESS_QUEUE_NAME": "dev_queue"
SERVERLESS_SQL_COMPUTE
修改SQL Compute,示例如下:
"SERVERLESS_SQL_COMPUTE": "sc-b4356b0af6039727"
FLOW_SKIP_SQL_ANALYZE
SQL語句執行方式。取值如下:
true
:表示每次執行多條SQL語句。false
:表示每次執行一條SQL語句。
說明該參數僅支持用于數據開發環境測試運行流程。
其他
您可以直接在高級配置里追加自定義SPARK參數。例如,
spark.eventLog.enabled : false
,DataWorks會自動在最終下發到Spark工作空間的EMR集群代碼將代碼補全為Spark工作空間所支持的代碼,格式為:--conf key=value
。還支持配置全局Spark參數,詳情請參見設置全局Spark參數。
若您想查看更多Spark屬性參數設置,可參考Spark Configuration。
保存配置
本案例其他必填配置項,您可按需自行配置,配置完成后,在節點代碼編輯頁面,單擊工具欄中的按鈕,保存當前配置。
驗證用戶日志明細表數據情況
在確保上游節點以及本節點運行成功的情況下,在左側導航欄的臨時查詢中新建EMR Spark SQL臨時查詢,編寫SQL查看EMR Spark SQL節點創建的表是否正常產出。
-- 您需要將分區過濾條件更新為您當前操作的實際業務日期。例如,任務運行的日期為20240808,則業務日期為20240807,即任務運行日期的前一天。 SELECT * FROM dws_user_info_all_di_spark WHERE dt ='業務日期';
說明在本教程在SQL中配置了調度參數
${bizdate}
,并將其賦值為T-1
。在離線計算場景下bizdate為業務交易發生的日期,也常被稱為業務日期(business date)。例如,今天統計前一天的營業額,此處的前一天指的是交易發生的日期,也就是業務日期。
配置ads_user_info_1d_spark節點
基于dws_user_info_all_di_spark
表,進行最大值、以及計數計算,產出ads_user_info_1d_spark
表作為用戶畫像表進行消費。
編輯代碼
雙擊ads_user_info_1d_spark節點,進入節點配置頁面。在節點編輯頁面,編寫如下語句。
-- 場景:以下SQL為Spark SQL,通過Spark SQL函數將Spark中的dws_user_info_all_di_spark表進一步的加工,并寫入新表ads_user_info_1d_spark。 -- 補充: -- DataWorks提供調度參數,可實現調度場景下,將每日增量數據寫入目標表對應的業務分區內。 -- 在實際開發場景下,您可通過${變量名}格式定義代碼變量,并在調度配置頁面通過變量賦值調度參數的方式,實現調度場景下代碼動態入參。 CREATE TABLE IF NOT EXISTS ads_user_info_1d_spark ( uid STRING COMMENT '用戶ID', device STRING COMMENT '終端類型 ', pv BIGINT COMMENT 'pv', gender STRING COMMENT '性別', age_range STRING COMMENT '年齡段', zodiac STRING COMMENT '星座' ) PARTITIONED BY ( dt STRING ); ALTER TABLE ads_user_info_1d_spark ADD IF NOT EXISTS PARTITION (dt='${bizdate}'); INSERT OVERWRITE TABLE ads_user_info_1d_spark PARTITION (dt='${bizdate}') SELECT uid , MAX(device) , COUNT(0) AS pv , MAX(gender) , MAX(age_range) , MAX(zodiac) FROM dws_user_info_all_di_spark WHERE dt = '${bizdate}' GROUP BY uid;
調度配置
配置項
配置內容
圖示
新增參數
在調度參數項中單擊新增參數,添加:
參數名:
bizdate
參數值:
$[yyyymmdd-1]
詳情可參見:配置調度參數。
調度依賴
在調度依賴確認產出表已作為本節點輸出。
格式為
worksspacename.節點名
。詳情可參見:配置調度依賴。
說明時間屬性的配置,配置調度周期為日,無需單獨配置當前節點定時調度時間,當前節點每日調起時間由業務流程虛擬節點workshop_start_spark的定時調度時間控制,即每日00:30后才會調度。
您可在節點高級設置處配置Spark特有屬性參數,本實驗基于EMR Serverless Spark的Spark SQL任務的系統參數,可參考以下表格內容配置高級參數:
高級參數
配置說明
SERVERLESS_RELEASE_VERSION
變更Serverless Spark引擎版本,示例如下:
"SERVERLESS_RELEASE_VERSION": "esr-2.1 (Spark 3.3.1, Scala 2.12, Java Runtime)"
SERVERLESS_QUEUE_NAME
變更資源隊列,示例如下:
"SERVERLESS_QUEUE_NAME": "dev_queue"
SERVERLESS_SQL_COMPUTE
修改SQL Compute,示例如下:
"SERVERLESS_SQL_COMPUTE": "sc-b4356b0af6039727"
FLOW_SKIP_SQL_ANALYZE
SQL語句執行方式。取值如下:
true
:表示每次執行多條SQL語句。false
:表示每次執行一條SQL語句。
說明該參數僅支持用于數據開發環境測試運行流程。
其他
您可以直接在高級配置里追加自定義SPARK參數。例如,
spark.eventLog.enabled : false
,DataWorks會自動在最終下發到Spark工作空間的EMR集群代碼將代碼補全為Spark工作空間所支持的代碼,格式為:--conf key=value
。還支持配置全局Spark參數,詳情請參見設置全局Spark參數。
若您想查看更多Spark屬性參數設置,可參考Spark Configuration。
保存配置
本案例其他必填配置項,您可按需自行配置,配置完成后,在節點代碼編輯頁面,單擊工具欄中的按鈕,保存當前配置。
驗證用戶畫像表數據情況
在確保上游節點以及本節點運行成功的情況下,在左側導航欄的臨時查詢中新建EMR Spark SQL臨時查詢,編寫SQL查看該節點創建的表是否正常產出。
-- 您需要將分區過濾條件更新為您當前操作的實際業務日期。例如,任務運行的日期為20230222,則業務日期為20230221,即任務運行日期的前一天。 SELECT * FROM ads_user_info_1d_spark WHERE dt ='業務日期';
說明在本教程在SQL中配置了調度參數
${bizdate}
,并將其賦值為T-1
。在離線計算場景下bizdate為業務交易發生的日期,也常被稱為業務日期(business date)。例如,今天統計前一天的營業額,此處的前一天指的是交易發生的日期,也就是業務日期。
步驟四:提交業務流程
完成業務流程所有配置后,測試該流程是否能正常運行,測試成功后,需要提交流程等待發布。
在業務流程的編輯頁面,單擊,運行業務流程。
待業務流程中的所有節點后出現,單擊,提交運行成功的業務流程。
選擇提交對話框中需要提交的節點,勾選忽略輸入輸出不一致的告警。
單擊提交。
提交成功后,即可在發布頁面發布流程節點。
步驟五:在生產環境運行任務
任務發布后,次日才會生成實例運行,您可以通過補數據來對已發布流程進行補數據操作,以便查看任務在生產環境是否可以運行,詳情可參見執行補數據并查看補數據實例(新版)。
任務發布成功后,單擊右上角的運維中心。
您也可以進入業務流程的編輯頁面,單擊工具欄中的前往運維,進入運維中心頁面。
單擊左側導航欄中的 ,進入周期任務頁面,單擊workshop_start_spark虛節點。
在右側的DAG圖中,右鍵單擊workshop_start_spark節點,選擇 。
勾選需要補數據的任務,輸入業務日期,單擊確定,自動跳轉至補數據實例頁面。
單擊刷新,直至SQL任務全部運行成功即可。