本文為您介紹實時計算Flink版的SQL常見問題,包括作業常見問題、開發報錯、運維報錯。
運行拓撲圖中顯示的Low Watermark、Watermark以及Task InputWatermark指標顯示的時間和當前時間有時差?
開發報錯
運維報錯
為什么使用POJO類作為UDTF返回類型時字段會出現“錯位”?
問題描述
當使用POJO類作為UDTF返回類型,并在SQL中顯式聲明了UDTF返回列的別名列表(Alias Name)時,可能會出現字段錯位(即使類型一致,但實際使用的字段可能與預期不符)問題。
例如,如果使用如下POJO類作為UDTF的返回類型,并根據自定義函數開發的要求進行打包并完成函數注冊(這里使用作業級自定義函數注冊方式)后,SQL校驗會失敗。
package com.aliyun.example; public class TestPojoWithoutConstructor { public int c; public String d; public boolean a; public String b; }
package com.aliyun.example; import org.apache.flink.table.functions.TableFunction; public class MyTableFuncPojoWithoutConstructor extends TableFunction<TestPojoWithoutConstructor> { private static final long serialVersionUID = 1L; public void eval(String str1, Integer i2) { TestPojoWithoutConstructor p = new TestPojoWithoutConstructor(); p.d = str1 + "_d"; p.c = i2 + 2; p.b = str1 + "_b"; collect(p); } }
CREATE TEMPORARY FUNCTION MyTableFuncPojoWithoutConstructor as 'com.aliyun.example.MyTableFuncPojoWithoutConstructor'; CREATE TEMPORARY TABLE src ( id STRING, cnt INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE sink ( f1 INT, f2 STRING, f3 BOOLEAN, f4 STRING ) WITH ( 'connector' = 'print' ); INSERT INTO sink SELECT T.* FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b);
SQL校驗報錯信息如下:
org.apache.flink.table.api.ValidationException: SQL validation failed. Column types of query result and sink for 'vvp.default.sink' do not match. Cause: Sink column 'f1' at position 0 is of type INT but expression in the query is of type BOOLEAN NOT NULL. Hint: You will need to rewrite or cast the expression. Query schema: [c: BOOLEAN NOT NULL, d: STRING, a: INT NOT NULL, b: STRING] Sink schema: [f1: INT, f2: STRING, f3: BOOLEAN, f4: STRING] at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)
看起來從UDTF返回的字段和POJO類中的字段可能錯位了,SQL中字段c最終是BOOLEAN,而字段a是INT類型,和POJO類的定義恰好相反。
問題原因
根據POJO類的類型規則:
如果POJO類實現了有參構造函數,推導的返回類型會按構造函數的參數列表順序。
如果POJO類缺少有參構造函數,就會按字段名的字典序重排列。
在上述示例中,由于UDTF返回類型缺少有參構造函數,因此對應的返回類型為
BOOLEAN a, VARCHAR(2147483647) b, INTEGER c, VARCHAR(2147483647) d)
。雖然這一步并沒有產生錯誤,但因為SQL中對返回字段加了重命名列表LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b)
,這導致對推導出的類型顯式進行了重命名(基于字段位置進行映射),進而引發與POJO類中的字段錯位問題,出現校驗異常或非預期的數據錯位問題。解決方案
POJO類缺少有參構造函數時,去掉對UDTF返回字段的顯式重命名,如將上述SQL的INSERT語句改為:
-- POJO類無有參構造函數時,推薦顯式選擇需要的字段名,使用 T.* 時需要明確知曉實際返回的字段順序。 SELECT T.c, T.d, T.a, T.b FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T;
POJO類實現有參構造函數,以確定返回類型的字段順序。這種情況下UDTF返回類型的字段順序就是有參構造函數的參數順序。
package com.aliyun.example; public class TestPojoWithConstructor { public int c; public String d; public boolean a; public String b; // Using specific fields order instead of alphabetical order public TestPojoWithConstructor(int c, String d, boolean a, String b) { this.c = c; this.d = d; this.a = a; this.b = b; } }
為什么數據在LocalGroupAggregate節點中長時間卡住,無輸出?
問題描述
如果作業未設置
table.exec.mini-batch.size
或設置table.exec.mini-batch.size
為負,并且作業還包含WindowAggregate和GroupAggregate,且WindowAggregate的時間列為事件時間(proctime),在這種情況下啟動作業,在作業拓撲圖中會看到包含LocalGroupAggregate節點,但缺失MiniBatchAssigner節點。作業中同時包含WindowAggregate和GroupAggregate,且WindowAggregate的時間列為事件時間(proctime)的代碼示例如下。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts as PROCTIME(), PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.b.kind'='random', 'fields.b.min'='0', 'fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a BIGINT, b BIGINT ) WITH ( 'connector'='print' ); CREATE TEMPORARY VIEW window_view AS SELECT window_start, window_end, a, sum(b) as b_sum FROM TABLE(TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '2' SECONDS)) GROUP BY window_start, window_end, a; INSERT INTO sink SELECT count(distinct a), b_sum FROM window_view GROUP BY b_sum;
問題原因
在table.exec.mini-batch.size未配置或者為負的情況下,MiniBatch處理模式會使用Managed Memory緩存數據,但是此時會錯誤地無法生成MiniBatchAssigner節點,因此計算節點無法收到MinibatchAssigner節點發送的Watermark消息后觸發計算和輸出,只能在三種條件(Managed Memory已滿、進行Checkpoint前和作業停止時)之一下觸發計算和輸出,詳情請參見table.exec.mini-batch.size。如果此時Checkpoint間隔設置過大,就會導致數據積攢在LocalGroupAggregate節點中,長時間無法輸出。
解決方案
調小Checkpoint間隔,讓LocalGroupAggregate節點在執行Checkpoint前自動觸發輸出。
通過Heap Memory來緩存數據,讓LocalGroupAggregate節點內緩存數據達到N條時自動觸發輸出。即在
頁面的部署詳情頁運行參數配置區域其他配置中,設置table.exec.mini-batch.size參數為正值N。
運行拓撲圖中顯示的Low Watermark、Watermark以及Task InputWatermark指標顯示的時間和當前時間有時差?
原因1:聲明源表Watermark時使用了TIMESTAMP_LTZ(TIMESTAMP(p) WITH LOCAL TIME ZONE)類型,導致Watermark和當前時間有時差。
下文以具體的示例為您展示使用TIMESTAMP_LTZ類型和TIMESTAMP類型對應的Watermark指標差異。
源表中Watermark聲明使用的字段是TIMESTAMP_LTZ類型。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts as CURRENT_TIMESTAMP,--使用CURRENT_TIMESTAMP內置函數生成TIMESTAMP_LTZ類型。 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE t1 ( k INT, ts_ltz timestamp_ltz(3), cnt BIGINT ) WITH ('connector' = 'print'); -- 輸出計算結果。 INSERT INTO t1 SELECT b, window_start, COUNT(*) FROM TABLE( TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND)) GROUP BY b, window_start, window_end;
說明Legacy Window對應的老語法和TVF Window(Table-Valued Function)產生的結果是一致的。以下為Legacy Window對應的老語法的示例代碼。
SELECT b, TUMBLE_END(ts, INTERVAL '5' SECOND), COUNT(*) FROM s1 GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), b;
在Flink開發控制臺將作業部署上線運行后,以北京時間為例,可以觀察到作業運行拓撲圖及監控告警上顯示的Watermark和當前時間存在8小時時差。
Watermark&Low Watermark
Task InputWatermark
源表中Watermark聲明使用的字段是TIMESTAMP(TIMESTAMP(p) WITHOUT TIME ZONE)類型。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, -- 模擬數據源中的TIMESTAMP無時區信息,從2024-01-31 01:00:00開始逐秒累加。 ts as TIMESTAMPADD(SECOND, a, TIMESTAMP '2024-01-31 01:00:00'), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.a.kind'='sequence','fields.a.start'='0','fields.a.end'='100000', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE t1 ( k INT, ts_ltz timestamp_ltz(3), cnt BIGINT ) WITH ('connector' = 'print'); -- 輸出計算結果。 INSERT INTO t1 SELECT b, window_start, COUNT(*) FROM TABLE( TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND)) GROUP BY b, window_start, window_end;
在Flink開發控制臺上將作業部署上線運行后,可以觀察到作業運行拓撲圖及監控告警上顯示的Watermark和當前時間是同步的(本示例是與模擬數據的時間同步的),不存在時差現象。
Watermark&Low Watermark
Task InputWatermark
原因2:Flink開發控制臺和Apache Flink UI的展示時間存在時區差異。
Flink開發控制臺UI界面是以UTC+0顯示時間,而Apache Flink UI是通過瀏覽器獲取本地時區并進行相應的時間轉換后的本地時間。以北京時間為例,為您展示二者顯示區別,您會觀察到在Flink開發控制臺顯示的時間比Apache Flink UI時間慢8小時。
Flink開發控制臺
Apache Flink UI
報錯:undefined
報錯詳情
報錯原因
您的JAR包較大。
解決方案
您可以在OSS管理控制臺上傳JAR包,詳情請參見如何在OSS控制臺上傳JAR包?
報錯:Object '****' not found
報錯詳情
單擊深度檢查后,出現報錯詳情如下。
報錯原因
在DDL和DML同在一個文本中提交運行時,DDL沒有聲明為CREATE TEMPORARY TABLE。
解決方案
在DDL和DML同在一個文本中提交運行時,DDL需要聲明為CREATE TEMPORARY TABLE,而不是聲明為CREATE TABLE。
報錯:Only a single 'INSERT INTO' is supported
報錯詳情
單擊深度檢查后,驗證報錯詳情如下。
報錯原因
多個DML語句沒有寫在關鍵語句
BEGIN STATEMENT SET;
和END;
之間。解決方案
將多個DML語句寫在
BEGIN STATEMENT SET;
和END;
之間。詳情請參見INSERT INTO語句。
報錯:The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
報錯詳情
Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true' at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186) at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134) ... 30 more
報錯原因
實時計算引擎vvr-3.0.7-flink-1.12及以前的版本,CDC Source只能單并發運行。但在實時計算引擎vvr-4.0.8-flink-1.13版本后增加了按PK分片進行多并發讀取數據的功能并默認打開該功能(scan.incremental.snapshot.enabled默認設置為true),在該功能下必須要配置主鍵。
解決方案
如果您使用實時計算引擎vvr-4.0.8-flink-1.13及以后的版本,則可以根據需求來選擇解決方案:
如果您需要多并發讀取MySQL CDC的數據,則在DDL中必須配置主鍵(PK)。
如果您不需要多并發讀取MySQL CDC的數據,需要將scan.incremental.snapshot.enabled設置為false,參數配置詳情請參見WITH參數。
報錯:exceeded quota: resourcequota
報錯詳情
作業啟動過程中報錯。
報錯原因
當前項目空間資源不足導致作業啟動失敗。
解決方案
您需要對項目資源進行資源變配,詳情請參見資源調整。
報錯:Exceeded checkpoint tolerable failure threshold
報錯詳情
作業運行過程中報錯。
org.apache.flink.util.FlinkRuntimeException:Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66)
報錯原因
未設置任務允許Checkpoint失敗的次數,系統默認Checkpoint失敗一次就觸發一次Failover。
解決方案
在
頁面,單擊目標作業名稱。在部署詳情頁簽,單擊運行參數配置區域右側的編輯。
在其他配置文本框,輸入如下參數。
execution.checkpointing.tolerable-failed-checkpoints: num
您需要設置num值來調整任務允許Checkpoint失敗的次數。num需要為0或正整數。如果num為0時,則表示不允許存在任何Checkpoint異常或者失敗。
報錯:Flink version null is not configured for sql
報錯詳情
StatusRuntimeException: INTERNAL: Flink version null is not configured for sql.
報錯原因
系統升級至VVR 4.0.8,導致作業的Flink計算引擎版本信息沒有了。
解決方案
在作業開發頁面右側更多配置頁簽中,配置正確的Flink計算引擎版本。
說明如果您需要使用調試功能,則還需要檢查Session集群頁面的引擎版本是否選擇正確。
INFO:org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss
報錯詳情
報錯原因
OSS每次創建新目錄時,會先檢查是否存在該目錄,如果不存在,就會報這個INFO信息,但該INFO信息不影響Flink作業運行。
解決方案
在日志模板中添加
<Logger level="ERROR" name="org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss"/>
。詳情請參見配置作業日志輸出。
報錯:DateTimeParseException: Text 'xxx' could not be parsed
報錯詳情
作業運行過程中,會出現報錯
DateTimeParseException: Text 'xxx' could not be parsed
。報錯原因
VVR 4.0.13以下版本,您在DDL中聲明的日期格式和實際數據的格式不一致,Flink系統會直接報錯。
解決方案
VVR 4.0.13及以上版本對JSON格式(Json、Canal Json、Debezium Json、Maxwell Json和Ogg Json)中TIMESTAMP類型的數據解析進行了增強,提升數據解析的能力。數據解析增強詳情如下:
支持聲明的TIMESTAMP類型解析DATE格式數據。
支持聲明的TIMESTAMP_LTZ類型解析DATE或TIMESTAMP格式數據。
Flink系統根據您設置的table.local-time-zone的時區信息來轉換TIMESTAMP數據至TIMESTAMP_LTZ。例如,在DDL中聲明如下信息。
CREATE TABLE source ( date_field TIMESTAMP, timestamp_field TIMESTAMP_LTZ(3) ) WITH ( 'format' = 'json', ... );
當解析數據 {"date_field": "2020-09-12", "timestamp_field": "2020-09-12T12:00:00"} ,且在當前時區為東八區的情況下,會得到如下結果:"+I(2020-09-12T00:00:00, 2020-09-12T04:00:00.000Z)"。
支持自動解析TIMESTAMP或TIMESTAMP_LTZ格式。
增強前,JSON Format在解析TIMESTAMP數據時,需要您正確設置timestamp-format.standard為SQL或ISO-8601,數據才能被正確解析。增強后,Flink系統會自動推導TIMESTAMP的格式并解析,如果無法正確解析,則會報告錯誤。您手動設置的timestamp-format.standard的值會作為提示供解析器使用。
報錯:DELETE command denied to user 'userName'@'*.*.*.*' for table 'table_name'
報錯詳情
Cause by:java.sql.SQLSyntaxErrorException:DELETE command denied to user 'userName'@'*.*.*.*' for table 'table_name' at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120) ...
報錯原因
MySQL的CDC流結合where條件過濾使用時,update類型的數據會發送update_before和update_after兩條數據到下游,update_before數據到下游會被識別為DELETE操作,需要用戶具有DELETE權限。
解決方案
檢查SQL邏輯是否存在retract相關操作,如果存在相關操作,給結果表的操作用戶賦予DELETE權限。
報錯:java.io.EOFException: SSL peer shut down incorrectly
報錯詳情
Caused by: java.io.EOFException: SSL peer shut down incorrectly at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:239) ~[?:1.8.0_302] at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190) ~[?:1.8.0_302] at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109) ~[?:1.8.0_302] at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1392) ~[?:1.8.0_302] at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1300) ~[?:1.8.0_302] at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:435) ~[?:1.8.0_302] at com.mysql.cj.protocol.ExportControlled.performTlsHandshake(ExportControlled.java:347) ~[?:?] at com.mysql.cj.protocol.StandardSocketFactory.performTlsHandshake(StandardSocketFactory.java:194) ~[?:?] at com.mysql.cj.protocol.a.NativeSocketConnection.performTlsHandshake(NativeSocketConnection.java:101) ~[?:?] at com.mysql.cj.protocol.a.NativeProtocol.negotiateSSLConnection(NativeProtocol.java:308) ~[?:?] at com.mysql.cj.protocol.a.NativeAuthenticationProvider.connect(NativeAuthenticationProvider.java:204) ~[?:?] at com.mysql.cj.protocol.a.NativeProtocol.connect(NativeProtocol.java:1369) ~[?:?] at com.mysql.cj.NativeSession.connect(NativeSession.java:133) ~[?:?] at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:949) ~[?:?] at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:819) ~[?:?] at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:449) ~[?:?] at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:242) ~[?:?] at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198) ~[?:?] at org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider.getOrEstablishConnection(SimpleJdbcConnectionProvider.java:128) ~[?:?] at org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.open(AbstractJdbcOutputFormat.java:54) ~[?:?] ... 14 more
報錯原因
在MySQL driver版本為8.0.27時,MySQL數據庫開啟了SSL協議,但默認訪問方式不通過SSL連接數據庫,導致報錯。
解決方案
建議WITH參數中connector設置為rds,且MySQL維表URL參數中追加
characterEncoding=utf-8&useSSL=false
,例如:'url'='jdbc:mysql://***.***.***.***:3306/test?characterEncoding=utf-8&useSSL=false'
報錯:binlog probably contains events generated with statement or mixed based replication format
報錯詳情
Caused by: io.debezium.DebeziumException: Received DML 'insert into table_name (...) values (...)' for processing, binlog probably contains events generated with statement or mixed based replication format
報錯原因
MySQL CDC Binlog不可以為mixed格式,只能為ROW格式。
解決方案
在MySQL產品側,通過
show variables like "binlog_format"
命令查看當前模式的Binlog格式。說明您可以使用
show global variables like "binlog_format"
查看全局模式的Binlog格式。在MySQL產品側,將Binlog格式設置為ROW格式。
重啟作業生效。
報錯:java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
報錯詳情
Causedby:java.lang.ClassCastException:org.codehaus.janino.CompilerFactorycannotbecasttoorg.codehaus.commons.compiler.ICompilerFactory atorg.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129) atorg.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79) atorg.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:426) ...66more
報錯原因
JAR包中引入了會發生沖突的janino依賴。
UDF JAR或連接器JAR中,誤打入Flink中的某些依賴(例如flink-table-planner和flink-table-runtime)。
解決方案
分析JAR包里面是否含有org.codehaus.janino.CompilerFactory。因為在不同機器上的Class加載順序不一樣,所以有時候出現類沖突。該問題的解決步驟如下:
在
頁面,單擊目標作業名稱。在部署詳情頁簽,單擊運行參數配置區域右側的編輯。
在其他配置文本框,輸入如下參數。
classloader.parent-first-patterns.additional: org.codehaus.janino
其中,參數的value值需要替換為沖突的類。