本文提供了一個自定義聚合函數(UDAF),實現將多行數據合并為一行并按照指定列進行排序,并以居民用電戶電網終端數據為例,介紹如何在實時計算控制臺使用該函數進行數據聚合和排序。
示例數據
居民用電戶電網終端數據表electric_info,包括事件標識event_id,用戶標識user_id,事件時間event_time,用戶終端狀態status。需要將用戶的終端狀態按照事件時間升序排列。
electric_info
event_id
user_id
event_time
status
1
1222
2023-06-30 11:14:00
LD
2
1333
2023-06-30 11:12:00
LD
3
1222
2023-06-30 11:11:00
TD
4
1333
2023-06-30 11:12:00
LD
5
1222
2023-06-30 11:15:00
TD
6
1333
2023-06-30 11:18:00
LD
7
1222
2023-06-30 11:19:00
TD
8
1333
2023-06-30 11:10:00
TD
9
1555
2023-06-30 11:16:00
TD
10
1555
2023-06-30 11:17:00
LD
預期結果
user_id
status
1222
TD,LD,TD,TD
1333
TD,LD,LD,LD
1555
TD,LD
步驟一:準備數據源
本文以云數據庫RDS為例。
- 說明
RDS MySQL版實例需要與Flink工作空間處于同一VPC。不在同一VPC下時請參見網絡連通性。
創建名稱為electric的數據庫,并創建高權限賬號或具有數據庫electric讀寫權限的普通賬號。
通過DMS登錄RDS MySQL,在electric數據庫中創建表electric_info和electric_info_SortListAgg,并插入數據。
CREATE TABLE `electric_info` ( event_id bigint NOT NULL PRIMARY KEY COMMENT '事件id', user_id bigint NOT NULL COMMENT '用戶標識', event_time timestamp NOT NULL COMMENT '事件時間', status varchar(10) NOT NULL COMMENT '用戶終端狀態' ); CREATE TABLE `electric_info_SortListAgg` ( user_id bigint NOT NULL PRIMARY KEY COMMENT '用戶標識', status_sort varchar(50) NULL COMMENT '用戶終端狀態按事件時間升序' ); -- 準備數據 INSERT INTO electric_info VALUES (1,1222,'2023-06-30 11:14','LD'), (2,1333,'2023-06-30 11:12','LD'), (3,1222,'2023-06-30 11:11','TD'), (4,1333,'2023-06-30 11:12','LD'), (5,1222,'2023-06-30 11:15','TD'), (6,1333,'2023-06-30 11:18','LD'), (7,1222,'2023-06-30 11:19','TD'), (8,1333,'2023-06-30 11:10','TD'), (9,1555,'2023-06-30 11:16','TD'), (10,1555,'2023-06-30 11:17','LD');
步驟二:注冊UDF
pom.xml文件已配置了Flink 1.17.1版該自定義函數需要的最小化依賴信息。關于使用自定義函數的更多信息,詳情請參見自定義函數。
本示例中ASI_UDAF實現了多行數據合并一行并按照指定列進行排序,詳情如下。后續您可以根據實際業務情況進行修改。
package ASI_UDAF; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.functions.AggregateFunction; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; public class ASI_UDAF{ /**Accumulator class*/ public static class AcList { public List<String> list; } /**Aggregate function class*/ public static class SortListAgg extends AggregateFunction<String,AcList> { public String getValue(AcList asc) { /**Sort the data in the list according to a specific rule*/ asc.list.sort(new Comparator<String>() { @Override public int compare(String o1, String o2) { return Integer.parseInt(o1.split("#")[1]) - Integer.parseInt(o2.split("#")[1]); } }); /**Traverse the sorted list, extract the required fields, and join them into a string*/ List<String> ret = new ArrayList<String>(); Iterator<String> strlist = asc.list.iterator(); while (strlist.hasNext()) { ret.add(strlist.next().split("#")[0]); } String str = StringUtils.join(ret, ','); return str; } /**Method to create an accumulator*/ public AcList createAccumulator() { AcList ac = new AcList(); List<String> list = new ArrayList<String>(); ac.list = list; return ac; } /**Accumulation method: add the input data to the accumulator*/ public void accumulate(AcList acc, String tuple1) { acc.list.add(tuple1); } /**Retraction method*/ public void retract(AcList acc, String num) { } } }
進入注冊UDF頁面。
注冊UDF方式的優點是便于后續開發進行代碼復用。對于Java類型的UDF,您也可以通過依賴文件項進行上傳,詳情請參見自定義聚合函數(UDAF)。
登錄實時計算控制臺。
單擊目標工作空間操作列下的控制臺。
單擊
。單擊左側的函數頁簽,單擊注冊UDF。
在選擇文件位置上傳步驟1中的JAR文件,單擊確定。
說明當您開通Flink工作空間時綁定了OSS Bucket,則您的UDF JAR文件會被上傳到該OSS Bucket的sql-artifacts目錄下;當您開通Flink工作空間時選擇了全托管存儲時,則您的UDF JAR文件會被存儲在資源管理頁面的資源文件中。
此外,Flink開發控制臺會解析您UDF JAR文件中是否使用了Flink UDF、UDAF和UDTF接口的類,并自動提取類名,填充到Function Name字段中。
在管理函數對話框,單擊創建函數。
在SQL編輯器頁面左側函數列表,您可以看到已注冊成功的UDF。
步驟三:創建Flink作業
在
頁面,單擊新建。單擊空白的流作業草稿。
單擊下一步。
在新建作業草稿對話框,填寫作業配置信息。
作業參數
說明
文件名稱
作業的名稱。
說明作業名稱在當前項目中必須保持唯一。
存儲位置
指定該作業的存儲位置。
您還可以在現有文件夾右側,單擊圖標,新建子文件夾。
引擎版本
當前作業使用的Flink的引擎版本。需要與pom中的version一致。
引擎版本號含義、版本對應關系和生命周期重要時間點詳情請參見引擎版本介紹。
編寫DDL和DML代碼。
--創建臨時表electric_info CREATE TEMPORARY TABLE electric_info ( event_id bigint not null, `user_id` bigint not null, event_time timestamp(6) not null, status string not null, primary key(event_id) not enforced ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'your_username', 'password' = '${secret_values.mysql_pw}', 'database-name' = 'electric', 'table-name' = 'electric_info' ); CREATE TEMPORARY TABLE electric_info_sortlistagg ( `user_id` bigint not null, status_sort varchar(50) not null, primary key(user_id) not enforced ) WITH ( 'connector' = 'mysql', 'hostname' = 'rm-bp1s1xgll21******.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'your_username', 'password' = '${secret_values.mysql_pw}', 'database-name' = 'electric', 'table-name' = 'electric_info_sortlistagg' ); --將electric_info表中的數據聚合并插入到electric_info_sortlistagg表中 --將status和event_time拼接成的字符串作為參數傳遞給已注冊的自定義函數ASI_UDAF$SortListAgg INSERT INTO electric_info_sortlistagg SELECT `user_id`, `ASI_UDAF$SortListAgg`(CONCAT(status,'#',CAST(UNIX_TIMESTAMP(event_time) as STRING))) FROM electric_info GROUP BY user_id;
參數說明如下,您可以根據實際情況進行修改。MySQL連接器更多參數詳情請參見MySQL。
參數
說明
備注
connector
連接器類型。
本示例固定值為
mysql
。hostname
MySQL數據庫的IP地址或者Hostname。
本文填寫為RDS MySQL實例的內網地址。
username
MySQL數據庫服務的用戶名。
無。
password
MySQL數據庫服務的密碼。
本示例通過使用名為mysql_pw密鑰的方式填寫密碼值,避免信息泄露,詳情請參見變量管理。
database-name
MySQL數據庫名稱。
本示例填寫為步驟一:準備數據源中創建的數據庫electric。
table-name
MySQL表名。
本示例填寫為electric或electric_info_sortlistagg。
port
MySQL數據庫服務的端口號。
無。
(可選)單擊右上方的深度檢查和調試,功能詳情請參見SQL作業開發。
單擊部署,單擊確定。
在
頁面,單擊目標作業名稱操作列下的啟動,選擇無狀態啟動。
步驟四:查詢結果
在RDS中使用如下語句查看用戶的終端狀態按照事件時間升序排列結果。
SELECT * FROM `electric_info_sortlistagg`;
結果如下: