基于MaxCompute 2.0計算引擎,MaxCompute在UDF框架中引入了新的擴展機制UDJ(User Defined Join),來實現靈活的跨表、多表自定義操作,同時減少通過MapReduce等方式對分布式系統底層細節的操作。
背景信息
MaxCompute內置了多種Join操作,包括Inner/Right Join、Outer/Left Join、Outer/Full Join、Outer/Semi/Anti-semi Join等。這些內置的Join操作功能強大,但由于其標準的Join實現,無法滿足很多跨表操作的需求場景。
通常,您可以通過UDF(User Defined Function)描述代碼框架,但現有的UDF/UDTF/UDAF接口主要是針對在單個數據表上的操作而設計。一旦涉及多表的自定義操作,經常還需要依賴于內置Join +各種UDF/UDTF,并配合比較復雜的SQL語句來完成。在多表操作的場景上,您不得不放棄SQL而使用自定義MapReduce,才能完成所需的計算。
無論是Join +各種UDF/UDTF+復雜SQL還是自定義MapReduce門檻都比較高,同時還會帶來一些問題:
使用Join +各種UDF/UDTF+復雜SQL:多個復雜的Join和散布在SQL語言各處的代碼揉合在一起,將帶來多處的邏輯黑盒,不利于生成最優的執行計劃。
使用MapReduce:不僅更大程度上限制了系統進行執行優化的可能性,而且在深度優化本地運行代碼時,由于MapReduce絕大部分代碼由Java完成,在執行效率上會遠低于MaxCompute基于LLVM的代碼生成器。
使用限制
目前版本不支持使用UDF/UDAF/UDTF讀取以下場景的表數據:
做過表結構修改(Schema Evolution)的表數據。
包含復雜數據類型的表數據。
包含JSON數據類型的表數據。
Transactional表的表數據。
UDJ的性能
通過一個真實的線上MapReduce作業進行測試驗證UDJ的性能,該MapReduce作業實現一套比較復雜的算法。將兩個表合并在一起,用UDJ對該MapReduce進行改寫,并且驗證UDJ實現結果的正確性。在并發度相同的情況下,兩者性能對比如下。
由上圖可見,UDJ接口的引入,一方面讓您能更方便地描述對多表數據進行操作的復雜邏輯,一方面大幅提升了性能。代碼只在UDJ內被調用,其上下游的邏輯(例如該示例中的整個Mapper邏輯)則完全通過MaxCompute高效的Native運行完成。在Java代碼中,由于MaxCompute UDJ運行引擎和Java接口之間的數據交互邏輯有深度的優化,通過UDJ實現的Join邏輯也比其對等的Reducer更高效。
UDJ跨表Join功能
通過如下樣例為您詳細介紹MaxCompute UDJ跨表Join的使用方法。
假設存在兩個日志表,分別是payment和user_client_log。
payment:表中保存了用戶的支付記錄,每一筆支付記錄包含用戶ID、支付時間和支付內容。樣例數據如下。
user_id
time
pay_info
2656199
2018-02-13 22:30:00
gZhvdySOQb
8881237
2018-02-13 08:30:00
pYvotuLDIT
8881237
2018-02-13 10:32:00
KBuMzRpsko
user_client_log:保存了用戶的客戶端日志,每一條日志包含了用戶ID、日志時間和日志內容。樣例數據如下。
user_id
time
content
8881237
2018-02-13 00:30:00
click MpkvilgWSmhUuPn
8881237
2018-02-13 06:14:00
click OkTYNUHMqZzlDyL
8881237
2018-02-13 10:30:00
click OkTYNUHMqZzlDyL
對于每一條客戶端日志,找出該用戶在payment表里時間最接近的一條支付記錄,將其中的支付內容和日志內容合并輸出。達到如下效果。
user_id | time | content |
8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn, pay pYvotuLDIT |
8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL, pay pYvotuLDIT |
8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL, pay KBuMzRpsko |
面對此類需求通常有如下2種解決方案:
使用內置Join。SQL偽代碼如下。
SELECT p.user_id, p.time, merge(p.pay_info, u.content) FROM payment p RIGHT OUTER JOIN user_client_log u ON p.user_id = u.user_id and abs(p.time - u.time) = min(abs(p.time - u.time))
關聯時需要知道相同user_id下的p.time與u.time差異最小的值,且聚合函數不能出現在關聯條件上。因此,這個看似簡單的需求,無法通過標準的關聯操作實現。
使用UDJ方法實現。
注冊UDJ函數。
配置新版本的SDK。
<dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-udf</artifactId> <version>0.29.10-public</version> <scope>provided</scope> </dependency>
編寫UDJ代碼,并將代碼打包為odps-udj-example.jar。
package com.aliyun.odps.udf.example.udj; import com.aliyun.odps.Column; import com.aliyun.odps.OdpsType; import com.aliyun.odps.Yieldable; import com.aliyun.odps.data.ArrayRecord; import com.aliyun.odps.data.Record; import com.aliyun.odps.udf.DataAttributes; import com.aliyun.odps.udf.ExecutionContext; import com.aliyun.odps.udf.UDJ; import com.aliyun.odps.udf.annotation.Resolve; import java.util.ArrayList; import java.util.Iterator; /** 對于右表的每個記錄,找到最近的左表記錄 * 合并兩條記錄。 */ @Resolve("->string,bigint,string") public class PayUserLogMergeJoin extends UDJ { private Record outputRecord; /** 將在數據處理階段之前調用。 用戶可以實施這個方法做初始化工作。 */ @Override public void setup(ExecutionContext executionContext, DataAttributes dataAttributes) { // outputRecord = new ArrayRecord(new Column[]{ new Column("user_id", OdpsType.STRING), new Column("time", OdpsType.BIGINT), new Column("content", OdpsType.STRING) }); } /** 重寫此方法以實現連接邏輯。 * @param key 當前連接鍵。 * @param 左表對應當前鍵的記錄組。 * @param right對應當前鍵的右表記錄組。 * @param output用于輸出UDJ的結果。 */ @Override public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) { outputRecord.setString(0, key.getString(0)); if (!right.hasNext()) { // 空右組,什么都不做。 return; } else if (!left.hasNext()) { // 空左組。 輸出右側組的所有記錄而不合并。 while (right.hasNext()) { Record logRecord = right.next(); outputRecord.setBigint(1, logRecord.getDatetime(0).getTime()); outputRecord.setString(2, logRecord.getString(1)); output.yield(outputRecord); } return; } ArrayList<Record> pays = new ArrayList<>(); // 左側記錄組將從開始到結束迭代。 // 對于右組的每個記錄,迭代器無法重置。 // 所以我們將左邊的每個記錄保存到ArrayList。 left.forEachRemaining(pay -> pays.add(pay.clone())); while (right.hasNext()) { Record log = right.next(); long logTime = log.getDatetime(0).getTime(); long minDelta = Long.MAX_VALUE; Record nearestPay = null; // 迭代左邊的所有記錄,找到有的記錄時間上的微小差異。 for (Record pay: pays) { long delta = Math.abs(logTime - pay.getDatetime(0).getTime()); if (delta < minDelta) { minDelta = delta; nearestPay = pay; } } // 將日志記錄與最近的支付記錄合并,并輸出到結果。 outputRecord.setBigint(1, log.getDatetime(0).getTime()); outputRecord.setString(2, mergeLog(nearestPay.getString(1), log.getString(1))); output.yield(outputRecord); } } String mergeLog(String payInfo, String logContent) { return logContent + ", pay " + payInfo; } @Override public void close() { } }
在MaxCompute中添加Jar包資源。
add jar odps-udj-example.jar;
在MaxCompute中注冊UDF函數pay_user_log_merge_join。
create function pay_user_log_merge_join as 'com.aliyun.odps.udf.example.udj.PayUserLogMergeJoin' using 'odps-udj-example.jar';
準備示例數據。
創建示例表payment和user_client_log。
create table payment(user_id string,time datetime,pay_info string); create table user_client_log(user_id string,time datetime,content string);
為示例表中插入數據。
--向payment表中插入數據。 INSERT OVERWRITE TABLE payment VALUES ('1335656', datetime '2018-02-13 19:54:00', 'PEqMSHyktn'), ('2656199', datetime '2018-02-13 12:21:00', 'pYvotuLDIT'), ('2656199', datetime '2018-02-13 20:50:00', 'PEqMSHyktn'), ('2656199', datetime '2018-02-13 22:30:00', 'gZhvdySOQb'), ('8881237', datetime '2018-02-13 08:30:00', 'pYvotuLDIT'), ('8881237', datetime '2018-02-13 10:32:00', 'KBuMzRpsko'), ('9890100', datetime '2018-02-13 16:01:00', 'gZhvdySOQb'), ('9890100', datetime '2018-02-13 16:26:00', 'MxONdLckwa') ; --向user_client_log表中插入數據。 INSERT OVERWRITE TABLE user_client_log VALUES ('1000235', datetime '2018-02-13 00:25:36', 'click FNOXAibRjkIaQPB'), ('1000235', datetime '2018-02-13 22:30:00', 'click GczrYaxvkiPultZ'), ('1335656', datetime '2018-02-13 18:30:00', 'click MxONdLckpAFUHRS'), ('1335656', datetime '2018-02-13 19:54:00', 'click mKRPGOciFDyzTgM'), ('2656199', datetime '2018-02-13 08:30:00', 'click CZwafHsbJOPNitL'), ('2656199', datetime '2018-02-13 09:14:00', 'click nYHJqIpjevkKToy'), ('2656199', datetime '2018-02-13 21:05:00', 'click gbAfPCwrGXvEjpI'), ('2656199', datetime '2018-02-13 21:08:00', 'click dhpZyWMuGjBOTJP'), ('2656199', datetime '2018-02-13 22:29:00', 'click bAsxnUdDhvfqaBr'), ('2656199', datetime '2018-02-13 22:30:00', 'click XIhZdLaOocQRmrY'), ('4356142', datetime '2018-02-13 18:30:00', 'click DYqShmGbIoWKier'), ('4356142', datetime '2018-02-13 19:54:00', 'click DYqShmGbIoWKier'), ('8881237', datetime '2018-02-13 00:30:00', 'click MpkvilgWSmhUuPn'), ('8881237', datetime '2018-02-13 06:14:00', 'click OkTYNUHMqZzlDyL'), ('8881237', datetime '2018-02-13 10:30:00', 'click OkTYNUHMqZzlDyL'), ('9890100', datetime '2018-02-13 16:01:00', 'click vOTQfBFjcgXisYU'), ('9890100', datetime '2018-02-13 16:20:00', 'click WxaLgOCcVEvhiFJ') ;
在SQL中使用UDJ函數。
SELECT r.user_id, from_unixtime(time/1000) as time, content FROM ( SELECT user_id, time as time, pay_info FROM payment ) p JOIN ( SELECT user_id, time as time, content FROM user_client_log ) u ON p.user_id = u.user_id USING pay_user_log_merge_join(p.time, p.pay_info, u.time, u.content) r AS (user_id, time, content);
USING子句中的參數說明:
pay_user_log_merge_join
是注冊的UDJ在SQL中的函數名。(p.time, p.pay_info, u.time, u.content)
是UDJ中用到的左右表的列。r
是UDJ結果的別名,方便其他地方引用UDJ的結果。(user_id, time, content)
是UDJ產生的結果的列名。
運行結果如下。
+---------+------------+---------+ | user_id | time | content | +---------+------------+---------+ | 1000235 | 2018-02-13 00:25:36 | click FNOXAibRjkIaQPB | | 1000235 | 2018-02-13 22:30:00 | click GczrYaxvkiPultZ | | 1335656 | 2018-02-13 18:30:00 | click MxONdLckpAFUHRS, pay PEqMSHyktn | | 1335656 | 2018-02-13 19:54:00 | click mKRPGOciFDyzTgM, pay PEqMSHyktn | | 2656199 | 2018-02-13 08:30:00 | click CZwafHsbJOPNitL, pay pYvotuLDIT | | 2656199 | 2018-02-13 09:14:00 | click nYHJqIpjevkKToy, pay pYvotuLDIT | | 2656199 | 2018-02-13 21:05:00 | click gbAfPCwrGXvEjpI, pay PEqMSHyktn | | 2656199 | 2018-02-13 21:08:00 | click dhpZyWMuGjBOTJP, pay PEqMSHyktn | | 2656199 | 2018-02-13 22:29:00 | click bAsxnUdDhvfqaBr, pay gZhvdySOQb | | 2656199 | 2018-02-13 22:30:00 | click XIhZdLaOocQRmrY, pay gZhvdySOQb | | 4356142 | 2018-02-13 18:30:00 | click DYqShmGbIoWKier | | 4356142 | 2018-02-13 19:54:00 | click DYqShmGbIoWKier | | 8881237 | 2018-02-13 00:30:00 | click MpkvilgWSmhUuPn, pay pYvotuLDIT | | 8881237 | 2018-02-13 06:14:00 | click OkTYNUHMqZzlDyL, pay pYvotuLDIT | | 8881237 | 2018-02-13 10:30:00 | click OkTYNUHMqZzlDyL, pay KBuMzRpsko | | 9890100 | 2018-02-13 16:01:00 | click vOTQfBFjcgXisYU, pay gZhvdySOQb | | 9890100 | 2018-02-13 16:20:00 | click WxaLgOCcVEvhiFJ, pay MxONdLckwa | +---------+------------+---------+
UDJ預排序功能
為了找到payment值相差最小的一條記錄,需要反復對payment表的數據進行iterator遍歷。所以事先將相同user_id的payment記錄全部加載到了ArrayList,該方法適用于同一個用戶一天之內的支付行為較少的場景。在其他場景中,有時同組內的數據量可能非常大,以至于無法在內存中存放,此時,您可以通過SORT BY預排序解決。
當某個用戶的支付數據量非常大,導致無法將payment放在內存中時,如果組內所有數據已經按照時間排序,則只需要比較兩邊iterator最頂部的數據,即可實現該功能。
該方式主要是使用SORT BY子句對UDJ的數據進行預排序。在這個過程中,最多只需要同時緩存3條記錄,就可以實現和之前算法相同的功能。Java UDJ代碼如下。
@Override
public void join(Record key, Iterator<Record> left, Iterator<Record> right, Yieldable<Record> output) {
outputRecord.setString(0, key.getString(0));
if (!right.hasNext()) {
return;
} else if (!left.hasNext()) {
while (right.hasNext()) {
Record logRecord = right.next();
outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
outputRecord.setString(2, logRecord.getString(1));
output.yield(outputRecord);
}
return;
}
long prevDelta = Long.MAX_VALUE;
Record logRecord = right.next();
Record payRecord = left.next();
Record lastPayRecord = payRecord.clone();
while (true) {
long delta = logRecord.getDatetime(0).getTime() - payRecord.getDatetime(0).getTime();
if (left.hasNext() && delta > 0) {
//兩個記錄之間的時間差正在減少,我們仍然可以操作。
//探索左側組以嘗試獲得更小的增量。
lastPayRecord = payRecord.clone();
prevDelta = delta;
payRecord = left.next();
} else {
//到達最小delta點。 檢查最后的記錄,
//輸出合并結果并準備處理下一條記錄。
//右組。
Record nearestPay = Math.abs(delta) < prevDelta ? payRecord : lastPayRecord;
outputRecord.setBigint(1, logRecord.getDatetime(0).getTime());
String mergedString = mergeLog(nearestPay.getString(1), logRecord.getString(1));
outputRecord.setString(2, mergedString);
output.yield(outputRecord);
if (right.hasNext()) {
logRecord = right.next();
prevDelta = Math.abs(
logRecord.getDatetime(0).getTime() - lastPayRecord.getDatetime(0).getTime()
);
} else {
break;
}
}
}
}