本文匯總了DataFlow集群使用時的常見問題。
集群使用與運維:
作業問題:
報錯Multiple factories for identifier '...' that implement '...' found in the classpath,該如何處理?
報錯java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id timed out,該如何處理?
報錯java.lang.OutOfMemoryError: GC overhead limit exceeded,該如何處理?
報錯Exception in thread "main" java.lang.NoSuchFieldError: DEPLOYMENT_MODE,該如何處理?
集群中的日志在哪里?如何查看?
根據以下兩種情況查看日志:
如果Flink集群的JobManager已退出,可以在集群機器中通過命令yarn logs -applicationId application_xxxx_yy拉取到本地進行查看,也可以在YARN的Web UI中訪問結束的作業的日志鏈接在網頁端進行查看。
如果Flink集群的JobManager仍在運行。
可以通過訪問對應的Flink Web UI進行查看。
使用命令行命令,通過yarn logs -applicationId application_xxxx_yy -am ALL -logFiles jobmanager.log查看JobManager日志,通過yarn logs -applicationId application_xxxx_yy -containerId container_xxxx_yy_aa_bb -logFiles taskmanager.log來查看TaskManager日志。
作業JAR包和集群內Flink的JAR包存在沖突
該問題發生時,您一般可以在作業日志中看到類似報錯NoSuchFieldError/NoSuchMethodError/ClassNotFoundException
等。您可以通過以下步驟排查和解決:
定位引起沖突的依賴類。根據報錯中的異常類,您可以找到該類所在的依賴JAR,然后在作業JAR的pom.xml所在目錄運行mvn dependency:tree查看依賴樹,判斷該類是如何被引入的。
排除引起沖突的依賴類。
如果是在pom.xml中錯誤設置了JAR包的Scope,則可以修改Scope為Provided來將對應JAR包排除。
如果確實需要使用該異常類所在的JAR,則可通過添加Exclude來排除特定類。
如果確實需要使用該異常類,無法更換為集群內對應版本的類,可以通過Maven Shade Plugin對該類進行Shade。
此外,如果Classpath中存在多個版本的JAR包,作業實際使用的Class版本和類的加載順序有關,為了確認某個類具體是從哪個JAR加載而來,可以在flink-conf.yaml中設置JVM參數env.java.opts: -verbose:class或者通過指定動態參數-Denv.java.opts="-verbose:class"來打印加載的類及其來源。
說明對于JobManager或TaskManager來說,上述信息會打印到
jobmanager.out
或taskmanager.out
中。
DataFlow集群外的機器,如何提交作業到DataFlow集群?
您可以根據以下步驟,通過DataFlow集群外的機器,提交作業到DataFlow集群:
確保DataFlow集群和DataFlow集群外的機器網絡互通。
配置提交Flink作業的客戶端的Hadoop YARN環境。
DataFlow集群中的Hadoop YARN的軟件安裝目錄是/opt/apps/YARN/yarn-current,配置文件的目錄是/etc/taihao-apps/hadoop-conf/,您需要將yarn-current目錄及hadoop-conf目錄下載到提交Flink作業的客戶端上。
然后,在提交Flink作業的客戶端上,配置如下環境變量。
export HADOOP_HOME=/path/to/yarn-current && \ export PATH=${HADOOP_HOME}/bin/:$PATH && \ export HADOOP_CLASSPATH=$(hadoop classpath) && \ export HADOOP_CONF_DIR=/path/to/hadoop-conf
重要Hadoop的配置文件中(例如yarn-site.xml等)配置的服務地址(例如ResourceManager等),使用的是全域名(FQDN,Fully Qualified Domain Name)。例如,master-1-1.c-xxxxxxxxxx.cn-hangzhou.emr.aliyuncs.com。因此,如果您通過集群外的機器提交作業,需要能夠解析這些FQDN或者將配置文件中的FQDN修改成對應的IP地址。
完成以上配置后,您在集群外的機器上啟動Flink作業(例如,運行命令
flink run -d -t yarn-per-job -ynm flink-test $FLINK_HOME/examples/streaming/TopSpeedWindowing.jar
)后,應當能在DataFlow集群的YARN Web UI中看到相應的Flink作業。
在DataFlow集群外機器上,如何解析DataFlow集群中的hostname?
您可以通過以下方式,在DataFlow集群外的機器上,解析DataFlow集群中的hostname:
修改提交Flink作業的客戶端上的/etc/hosts文件,添加相應的hostname到IP的映射。
通過PrivateZone提供的DNS解析服務。
如果您有自己的域名解析服務,也可以通過如下方式,配置JVM的運行參數,使用自己的域名解析服務。
env.java.opts.client: "-Dsun.net.spi.nameservice.nameservers=xxx -Dsun.net.spi.nameservice.provider.1=dns,sun -Dsun.net.spi.nameservice.domain=yyy"
如何查看Flink作業的運行狀態?
通過EMR控制臺查看。
EMR支持Knox,可以通過公網方式訪問YARN、Flink等的Web UI界面,Flink的Web UI可以通過YARN進行查看,詳細信息請參見通過Web UI查看作業狀態。
通過SSH隧道的方式查看,詳情信息請參見通過SSH隧道方式訪問開源組件Web UI。
直接訪問YARN REST接口。
curl --compressed -v -H "Accept: application/json" -X GET "http://master-1-1:8088/ws/v1/cluster/apps?states=RUNNING&queue=default&user.name=***"
說明需確保安全組開放了8443和8088端口,可以訪問到YARN的REST接口或者DataFlow集群和訪問的節點處于同一內網中。
如何訪問Flink作業的日志?
對于運行中的作業,可以通過Flink Web UI,訪問Flink作業的日志。
對于已經運行結束的作業,可以通過Flink History Server查看作業的統計信息或者通過命令
yarn logs -applicationId application_xxxx_yyyy
訪問作業的日志,已經運行結束的作業的日志默認保存在HDFS集群的hdfs:///tmp/logs/$USERNAME/logs/目錄下。
如何訪問DataFlow集群中的Flink HistoryServer?
DataFlow集群會默認在master-1-1節點(即header機器組的第一臺機器)的18082端口啟動Flink HistoryServer,用于收集已運行結束的作業的統計信息 ,具體訪問方式如下:
配置安全組規則,開放master-1-1節點的18082端口的訪問權限。
直接訪問http://$master-1-1-ip:18082。
Flink HistoryServer目前不存儲已完成作業的具體日志,如需查看日志請通過YARN API或者YARN的WebUI查詢。
如何使用DataFlow集群中所支持的商業化Connector?
DataFlow集群提供了很多商業化Connector,例如Hologres、SLS、MaxCompute、DataHub、Elasticsearch和ClickHouse等,您在Flink作業中除了可以使用開源的Connector之外,還可以使用這些商業化Connector。下面以Hologres Connector為例,介紹如何在Flink作業中使用DataFlow集群所攜帶的商業化Connector。
作業開發
下載DataFlow集群所攜帶的商業化Connector的JAR包(位于DataFlow集群的/opt/apps/FLINK/flink-current/opt/connectors目錄下),并通過如下方式將商業化Connector安裝在本地Maven環境中。
mvn install:install-file -Dfile=/path/to/ververica-connector-hologres-1.13-vvr-4.0.7.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.13-vvr-4.0.7 -Dpackaging=jar
在項目的pom.xml文件中添加以下依賴。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-hologres</artifactId> <version>1.13-vvr-4.0.7</version> <scope>provided</scope> </dependency>
運行作業
方式一:
拷貝Hologres Connector到一個獨立的目錄。
hdfs mkdir hdfs:///flink-current/opt/connectors/hologres/ hdfs cp hdfs:///flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar hdfs:///flink-current/opt/connectors/hologres/ververica-connector-hologres-1.13-vvr-4.0.7.jar
提交作業時,命令中添加以下參數。
-D yarn.provided.lib.dirs=hdfs:///flink-current/opt/connectors/hologres/
方式二:
拷貝Hologres Connector到提交Flink作業的客戶端的/opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar目錄下,與DataFlow集群中的目錄結構保持一致。
提交作業時,命令中添加以下參數。
-C file:///opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar
方式三:將Hologres Connector打包到作業的JAR包中。
如何使用GeminiStateBackend?
DataFlow集群提供了企業版狀態后端(即GeminiStateBackend),性能是開源版本的3~5倍。DataFlow集群在配置文件中默認使用GeminiStateBackend,關于GeminiStateBackend的更多高級配置,詳情請參見企業級狀態后端存儲配置。
如何使用開源的StateBackend?
DataFlow集群在配置文件中默認使用企業版狀態后端(即GeminiStateBackend),您如果想針對單個作業使用開源的狀態后端(例如rocksdb),可以通過-D指定,例如:
flink run-application -t yarn-application -D state.backend=rocksdb /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar
或者如果您想讓上述修改對后續作業生效,在EMR控制臺,修改state.backend參數的值為您想使用的狀態的后端(例如rocksdb)。單擊保存,然后單擊部署客戶端配置:
客戶端日志在哪里?如何查看?
在EMR的集群環境中我們配置了FLINK_LOG_DIR環境變量來指明Flink客戶端的日志存放位置。它的默認值是/var/log/taihao-apps/flink(在3.43.0之前的版本中默認是/mnt/disk1/log/flink)。您如果需要查看客戶端的完整日志(如SQL-Client的日志)可以在該目錄下查看對應文件。
通過flink run命令運行作業時,作業的參數沒有生效
在通過命令行命令運行Flink作業時,Flink作業的參數需要放在Flink作業JAR包的后面,例如flink run -d -t yarn-per-job test.jar arg1 arg2。
報錯Multiple factories for identifier '...' that implement '...' found in the classpath,該如何處理?
報錯原因
表明在Classpath中找到了某個Connector的多個實現。原因通常為在作業JAR添加了相關Connector依賴,同時又手動在$FLINK_HOME/ib目錄下放入了相同的Connector依賴,導致了依賴沖突。
解決方案
解決思路為去除重復的依賴,詳細步驟可以參考作業JAR包和集群內Flink的JAR包存在沖突問題排查。
如何開啟Flink作業JobManager的HA?
DataFlow集群基于YARN模式部署并運行Flink作業,您可以按照社區中的Configuration開啟JobManager的HA,從而使Flink作業可以更穩定的運行。配置示例如下所示。
high-availability: zookeeper
high-availability.zookeeper.quorum: 192.168.**.**:2181,192.168.**.**:2181,192.168.**.**:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery
開啟HA后,默認情況下,JobManager在失敗后最多重啟一次。如果您想讓JobManager重啟多次,還需要設置YARN的yarn.resourcemanager.am.max-attempts參數和Flink的yarn.application-attempts參數,詳情請參見Flink官方文檔。除此之外,根據經驗,通常還需要調整yarn.application-attempt-failures-validity-interval參數的值,將其從默認的10000毫秒(10秒)調整到一個比較大的值,例如調大為300000毫秒(5分鐘),防止JobManager不停的重啟。
如何查看Flink作業的監控指標?
您可以在EMR控制臺目標集群的集群監控頁面,單擊指標監控。
在Dashboard下拉框中選擇FLINK。
選擇待查看作業對應的Application ID和Job ID,即可展示Flink作業的各項監控指標。
監控指標詳情,請參見Flink指標。
說明僅當集群中已有運行的Flink作業時,才會有可供選擇的Application ID和Job ID。
部分指標只有配置了相應上下游的Source和Sink才會有輸出信息。例如,sourceIdleTime。
如何處理上下游存儲(Connector)問題?
關于上下游存儲方面的常見問題,請參見上下游存儲。
通過DataFlow集群運行Flink作業免密讀寫OSS時報錯,該如何處理?
您需要根據具體的報錯信息,進行相應的處理:
報錯提示
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
。問題原因:DataFlow集群目前是通過內置的JindoSDK來支持免密讀寫OSS,并支持StreamingFileSink等API的,不需要再按照社區文檔進行額外的配置,否則會由于依賴沖突導致此報錯。
處理方法:檢查您集群內提交作業的機器的$FLINK_HOME/plugins目錄,查看是否放置了oss-fs-hadoop目錄。如果放置了該目錄,請刪除該目錄后重新提交作業。
報錯提示
Could not find a file system implementation for scheme 'oss'. The scheme is directly supported by Flink through the following plugin: flink-oss-fs-hadoop. ....
。問題原因:EMR-3.40及之前的版本的EMR集群中,master機器組內非master-1-1機器上可能缺少Jindo相關的JAR包。
處理方法:
EMR-3.40及之前的版本:檢查您的集群內提交作業的機器的$FLINK_HOME/lib目錄下是否有Jindo相關的JAR包,例如jindo-flink-4.0.0.jar。如果沒有Jindo相關的JAR包,您可以在集群中運行以下命令將Jindo相關的JAR包,拷貝到$FLINK_HOME/lib目錄后重新提交作業。
cp /opt/apps/extra-jars/flink/jindo-flink-*-full.jar $FLINK_HOME/lib
EMR-3.40之后版本:優化了支持的方式,即使$FLINK_HOME/lib目錄下沒有Jindo相關的JAR包,讀寫OSS的作業也是可以正常運行的。請使用Flink on YARN的相關命令來啟動作業。
報錯java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id timed out,該如何處理?
報錯原因
直接原因是TaskManager心跳超時,具體原因可以在TaskManager日志中查看報錯信息進行定位。除此之外,還存在TaskManager堆內存大小有限或者作業代碼存在內存泄露導致的內存溢出錯誤,例如報錯java.lang.OutOfMemoryError: GC overhead limit exceeded,該如何處理?。
解決方案
上述報錯遇到該類報錯時需要您調大內存或者分析作業內存使用情況來進一步定位原因。
報錯java.lang.OutOfMemoryError: GC overhead limit exceeded,該如何處理?
報錯原因
該報錯代表為作業設定的內存不夠,導致GC超時。常見原因為代碼(如UDF)發生內存泄露或者內存大小確實不能滿足業務需求。
解決方案
您可在重新運行問題作業前通過-D方式指定JVM參數,保存OutOfMemoryError發生時的現場
-D env.java.opts="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof"
。在flink-conf.yaml中添加參數
env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof
來配置OutOfMemoryError發生時進行heap dump。
待作業再次報錯之后,您可針對HeapDumpPath指定的heap dump文件進行分析(例如使用MAT工具或者jvisualvm工具),確定問題根因。
Flink UI上作業只有一個Operator,并且顯示Records Received為0,該如何處理?
這是正常現象,Flink的Records Received相關指標用于描述不同Operator之間的數據通信,當作業被優化為一個Operator時,該指標值恒為0。
Flink作業如何開啟火焰圖?
火焰圖(Flame Graph)用于可視化進程中各個方法的CPU消耗,協助用戶解決性能瓶頸。Flink 1.13版本開始支持火焰圖功能,但為了避免火焰圖對生產環境中作業的影響,默認關閉該功能。如果您需要借助火焰圖功能對作業性能進行分析,可以在EMR控制臺Flink服務配置頁簽的flink-conf.yaml中新增參數為rest.flamegraph.enabled,參數值為true的配置項。新增配置項的具體操作,請參見管理配置項。
關于火焰圖的更多介紹,請參見Flame Graphs。
報錯Exception in thread "main" java.lang.NoSuchFieldError: DEPLOYMENT_MODE,該如何處理?
報錯原因
您的作業的JAR中直接或者間接引入了與集群中Flink版本不兼容的flink-core依賴,造成了依賴沖突。
解決方案
在pom.xml中添加以下信息,將flink-core依賴的
scope
設置為provided
來修復該問題。<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <!-- Change to your own flink version --> <version>1.16.1</version> <scope>provided</scope> </dependency>
說明version
需要修改為您實際使用的Flink版本。如果您想進一步定位引入該依賴的原因,可以參見作業JAR包和集群內Flink的JAR包存在沖突。