本文為您介紹如何使用Print連接器。
背景信息
Print是用于調試的連接器,允許接收并打印一定數量的輸入記錄。如果您想觀察Flink作業的中間結果,或者觀察最終輸出結果,可以添加Print結果表,單擊運行,在TaskManager的日志中觀察打印出的結果信息。
Print可用于輔助確認輸出到其他結果表中的消息是否符合預期。
Print連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 結果表,數據攝入目標端 |
運行模式 | 批模式和流模式 |
數據格式 | 暫不適用 |
特有監控指標 | 暫無 |
API種類 | SQL,數據攝入YAML作業 |
是否支持更新或刪除結果表數據 | 是 |
前提條件
因為Print結果表數據輸出為Info日志,所以如果您需要查看Print結果表的結果數據,則需要將日志級別調至Info,否則無法查到結果數據。
Taskmanager.out日志展示數據限制為2000條。如果您有排查臟數據或特定數據等需求,建議在Where條件中指定業務場景相關條件后,進行Print操作,以避免因為數據條數限制導致無法排查。
SQL
語法結構
CREATE TABLE print_table (
a INT,
b varchar
) WITH (
'connector'='print',
'logger'='true'
);
您也可以基于現有的表模式使用LIKE
子句來創建,代碼示例如下。
CREATE TABLE print_table WITH ('connector' = 'print')
LIKE table_source (EXCLUDING ALL)
WITH參數
參數 | 說明 | 數據類型 | 是否必填 | 默認值 | 備注 |
connector | 表類型。 | String | 是 | 無 | 固定值為print。 |
logger | 控制臺是否顯示數據結果。 | Boolean | 否 | false | 取值如下:
|
print-identifier | 數據結果標識。 | String | 否 | 無 | 在日志中通過數據結果標識檢索信息。 |
sink.parallelism | 結果表并行度。 | Int | 否 | 上游并行度 | 無。 |
數據攝入
數據攝入YAML作業可以使用values連接器打印數據到out文件或日志中。
語法結構
source:
type: xxx
sink:
type: values
name: Values Sink
print.enabled: true
WITH參數
參數 | 說明 | 數據類型 | 是否必填 | 默認值 | 備注 |
type | 目標端類型。 | STRING | 是 | 無 | 固定值為values。 |
name | 目標端名稱。 | STRING | 否 | 無 | 無。 |
print.enabled | 是否作為print使用。 | BOOLEAN | 是 | 無 | 固定值為true。 |
materialized.in.memory | 是否將Binlog事件持久化到內存。 | BOOLEAN | 否 | false | 無。 |
sink.print.standard-error | 是否替換為輸出到標準錯誤流(System.error) | BOOLEAN | 否 | false | 默認輸出到標準輸出(System.out)。 |
sink.print.logger | 是否在日志打印。 | BOOLEAN | 否 | false | 無。 |
sink.print.limit | 最多打印的條數。 | LONG | 否 | 2000 | 無。 |
error.on.schema.change | Schema變更發生時是否報錯。 | BOOLEAN | 否 | false | 無。 |
使用示例
結果表
CREATE TEMPORARY TABLE table_source( name VARCHAR, score BIGINT ) WITH ( ... ); CREATE TEMPORARY TABLE print_sink( name VARCHAR, score BIGINT ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT * from table_source;
數據攝入目標端
source: type: mysql name: MySQL Source hostname: ${mysql.hostname} port: ${mysql.port} username: ${mysql.username} password: ${mysql.password} tables: ${mysql.source.table} server-id: 7601-7604 sink: type: values name: Values Sink print.enabled: true