Flink Connector 使用
Connector 準備
參考管理自定義連接器,上傳igraph Connector的jar包。
上傳成功如圖所示
Propoerties詳解
名稱 | 類型 | 是否必須 | 描述 |
endpoint | string | 是 | igraph實例對應的endpoint |
username | string | 是 | igraph請求的username |
password | string | 是 | igraph請求的password |
graph_name | string | 是 | 要寫入的igraph圖名 |
label_name | string | 是 | 要寫入的igraph邊或者節點名 |
pk_field | string | 是 | igraph sink表的pk字段 |
sk_field | string | 否 | igraph sink表的sk字段 |
cmd_field | string | 是 | igraph sink表的cmd字段,取值為add、delete,用于表示這條記錄是新增或刪除請求 |
request_retry | int | 否,默認0 | 請求igraph的重試次數,默認為0 |
dry_run | boolean | 否,默認false | 如果dry_run為true,只做測試,不會真實寫入igraph |
async | boolean | 否,默認為 false | 打開會使用異步模式,寫入性能會有提升 |
Flink SQL demo
節點(kv)
--********************************************************************--
-- Author: jilong.yjl-75179-searchd***@searchdump.onaliyun.com
-- Created Time: 2022-11-09 10:43:36
-- Description: Write your description here
--********************************************************************--
CREATE TEMPORARY TABLE bhv_source (
cmd varchar,
content varchar,
gatewayTime bigint,
instnceId varchar,
pkey varchar
)
WITH (
'connector' = 'sls',
'endPoint' = 'cn-shanghai-intranet.log.aliyuncs.com',
'accessId' = 'xxx',
'accessKey' = 'xxx',
'startTime' = '2022-11-08 23:28:00',
'project' = 'igraph-cn-shanghai',
'logStore' = 'log',
'consumerGroup' = 'xxx',
'batchGetSize' = '1'
);
CREATE TEMPORARY TABLE igraph_sink (
cmd varchar,
content varchar,
gatewayTime bigint,
instnceId varchar,
pkey varchar
) with (
'connector' = 'igraph',
'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com',
'username' = '{igraphUserName}',
'password' = '{igraphPassword}',
'pk_field' = 'pkey',
'cmd_field' = 'cmd',
'graph_name' = '{graphName}',
'label_name' = '{nodeName}',
'dry_run' = 'false'
);
INSERT INTO igraph_sink
select * from bhv_source;
邊(kkv)
--********************************************************************--
-- Author: jilong.yjl-75179-searchd***@searchdump.onaliyun.com
-- Created Time: 2022-11-09 10:43:36
-- Description: Write your description here
--********************************************************************--
CREATE TEMPORARY TABLE bhv_source (
cmd varchar,
content varchar,
gatewayTime bigint,
instnceId varchar,
pkey varchar,
skey VARCHAR
)
WITH (
'connector' = 'sls',
'endPoint' = 'cn-shanghai-intranet.log.aliyuncs.com',
'accessId' = 'xxx',
'accessKey' = 'xxx',
'startTime' = '2022-11-08 23:28:00',
'project' = 'igraph-cn-shanghai',
'logStore' = 'log',
'consumerGroup' = 'xxx',
'batchGetSize' = '1'
);
CREATE TEMPORARY TABLE igraph_sink (
cmd varchar,
content varchar,
gatewayTime bigint,
instnceId varchar,
pkey varchar,
skey varchar
) with (
'connector' = 'igraph',
'endpoint' = 'igraph-cn-xxxx.igraph.aliyuncs.com',
'username' = '{igraphUserName}',
'password' = '{igraphPassword}',
'pk_field' = 'pkey',
'sk_field' = 'skey',
'cmd_field' = 'add',
'graph_name' = '{graphName}',
'label_name' = '{nodeName}',
'dry_run' = 'false'
);
INSERT INTO igraph_sink
select * from bhv_source;
需要注意內容:
pk_field為寫入igraph表的主鍵,必填字段
cmd_field取值為add、delete,表示是插入或刪除請求。如果是add請求,默認需要傳入表的全部字段,如果是update請求默認需要全部字段,如果需要支持部分字段的更新連續管理員配置
如果對邊進行delete操作,只填寫pkey 則默認刪除這個起點出發的全部邊,如果填寫pkey和skey則只刪除一條邊數據
graph_name 為寫入igraph的圖名
label_name 為寫入igraph圖的節點或者邊的label
request_retry,為了避免網絡抖動等原因,設置寫入igraph的重試請求次數
dry_run,為true時用作測試,會打log,不寫igraph。