LogHub(SLS)數據源為您提供讀取和寫入LogHub(SLS)雙向通道的功能,本文為您介紹DataWorks的LogHub(SLS)數據同步的能力支持情況。
使用限制
數據集成離線寫LogHub(SLS)時,由于LogHub(SLS)無法實現冪等,FailOver重跑任務時會引起數據重復。
支持的字段類型
數據集成支持讀寫的LogHub(SLS)字段類型如下。
字段類型 | 離線讀(LogHub(SLS) Reader) | 離線寫(LogHub(SLS) Writer) | 實時讀 |
STRING | 支持 | 支持 | 支持 |
其中:
離線寫LogHub(SLS)時
會將支持同步的各類型數據均轉換成STRING類型后寫入LogHub(SLS)。LogHub(SLS) Writer針對LogHub(SLS)類型的轉換列表,如下所示。
支持的數據集成內部類型
寫入LogHub(SLS)時的數據類型
LONG
STRING
DOUBLE
STRING
STRING
STRING
DATE
STRING
BOOLEAN
STRING
BYTES
STRING
實時讀LogHub(SLS)時
會自帶以下元數據字段。
LogHub(SLS)實時同步字段
數據類型
說明
__time__
STRING
SLS保留字段:__time__寫入日志數據時指定的日志時間,unix時間戳,單位為秒。
__source__
STRING
SLS保留字段:__source__日志來源設備。
__topic__
STRING
SLS保留字段:__topic__topic名稱。
__tag__:__receive_time__
STRING
日志到達服務端的時間。開啟記錄外網IP功能后,服務端接收日志時為原始日志追加該字段。unix時間戳,單位為秒。
__tag__:__client_ip__
STRING
日志來源設備的公網IP。開啟記錄外網IP功能后,服務端接收日志時為原始日志追加該字段。
__tag__:__path__
STRING
Logtail采集的日志文件路徑,Logtail會自動為日志追加該字段。
__tag__:__hostname__
STRING
Logtail采集數據的來源機器主機名,Logtail會自動為日志追加該字段。
創建數據源
在進行數據同步任務開發時,您需要在DataWorks上創建一個對應的數據源,操作流程請參見創建并管理數據源,詳細的配置參數解釋可在配置界面查看對應參數的文案提示。
數據同步任務開發
數據同步任務的配置入口和通用配置流程可參見下文的配置指導。
LogHub數據源作為數據來源端,在進行任務配置同步時支持通過LogHub的查詢語法、SPL語句(SLS Processing Language是SLS處理日志的語法)對LogHub內的數據進行過濾,具體語法說明請參見附錄二:LogHub SPL語法過濾說明。
單表離線同步任務配置指導
操作流程請參見通過向導模式配置離線同步任務、通過腳本模式配置離線同步任務。
腳本模式配置的全量參數和腳本Demo請參見下文的附錄一:腳本Demo與參數說明。
單表實時同步任務配置指導
操作流程請參見配置單表增量數據實時同步、DataStudio側實時同步任務配置。
整庫離線、整庫(實時)全增量、整庫(實時)分庫分表等整庫級別同步配置指導
操作流程請參見數據集成側同步任務配置。
常見問題
更多其他數據集成常見問題請參見數據集成常見問題。
附錄一:腳本Demo與參數說明
離線任務腳本配置方式
如果您配置離線任務時使用腳本模式的方式進行配置,您需要按照統一的腳本格式要求,在任務腳本中編寫相應的參數,詳情請參見通過腳本模式配置離線同步任務,以下為您介紹腳本模式下數據源的參數配置詳情。
Reader腳本Demo
{
"type":"job",
"version":"2.0",//版本號。
"steps":[
{
"stepType":"LogHub",//插件名。
"parameter":{
"datasource":"",//數據源。
"column":[//字段。
"col0",
"col1",
"col2",
"col3",
"col4",
"C_Category",
"C_Source",
"C_Topic",
"C_MachineUUID", //日志主題。
"C_HostName", //主機名。
"C_Path", //路徑。
"C_LogTime" //事件時間。
],
"beginDateTime":"",//數據消費的開始時間位點。
"batchSize":"",//一次從日志服務查詢的數據條數。
"endDateTime":"",//數據消費的結束時間位點。
"fieldDelimiter":",",//列分隔符。
"logstore":""http://:目標日志庫的名字。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1 //作業并發數。
"mbps":"12",//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader腳本參數
參數 | 描述 | 是否必選 | 默認值 |
endPoint | 日志服務入口endPoint是訪問一個項目(Project)及其內部日志數據的URL。它和Project所在的阿里云地域(Region)及Project名稱相關。各地域的服務入口請參見服務入口。 | 是 | 無 |
accessId | 訪問日志服務的訪問密鑰,用于標識用戶。 | 是 | 無 |
accessKey | 訪問日志服務的訪問密鑰,用來驗證用戶的密鑰。 | 是 | 無 |
project | 目標日志服務的項目名稱,是日志服務中的資源管理單元,用于隔離和控制資源。 | 是 | 無 |
logstore | 目標日志庫的名稱,logstore是日志服務中日志數據的采集、存儲和查詢單元。 | 是 | 無 |
batchSize | 一次從日志服務查詢的數據條數。 | 否 | 128 |
column | 每條數據中的列名,此處可以配置日志服務中的元數據作為同步列。日志服務支持日志主題、采集機器唯一標識、主機名、路徑和日志時間等元數據。 說明 列名區分大小寫。元數據的寫法請參見日志服務機器組。 | 是 | 無 |
beginDateTime | 數據消費的開始時間位點,即日志數據到達LogHub(SLS)的時間。該參數為時間范圍(左閉右開)的左邊界,yyyyMMddHHmmss格式的時間字符串(例如20180111013000),可以和DataWorks的調度時間參數配合使用。 例如,您在節點編輯頁面右側的調度配置,在參數中配置 說明 beginDateTime和endDateTime需要互相組合配套使用。 | 是 | 無 |
endDateTime | 數據消費的結束時間位點,為時間范圍(左閉右開)的右邊界,yyyyMMddHHmmss格式的時間字符串(例如20180111013010),可以和DataWorks的調度時間參數配合使用。 例如,您在節點編輯頁面右側的調度配置,在參數中配置endDateTime=${yyyymmdd},則在日志結束時間處配置為${endDateTime}000000,表示獲取的日志結束時間為業務日期后一天的0點0分0秒。詳情請參見調度參數支持的格式。 說明 上一周期的endDateTime需要和下一周期的beginDateTime保持一致,或晚于下一周期的beginDateTime。否則,可能無法拉取部分區域的數據。 | 是 | 無 |
Writer腳本Demo
{
"type": "job",
"version": "2.0",//版本號。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "LogHub",//插件名。
"parameter": {
"datasource": "",//數據源。
"column": [//字段。
"col0",
"col1",
"col2",
"col3",
"col4",
"col5"
],
"topic": "",//選取topic。
"batchSize": "1024",//一次性批量提交的記錄數大小。
"logstore": ""http://目標LogService LogStore的名稱。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""http://錯誤記錄數。
},
"speed": {
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":3, //作業并發數。
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer腳本參數
LogHub(SLS) Writer通過數據集成框架獲取Reader生成的數據,然后將數據集成支持的類型通過逐一判斷轉換成STRING類型。當達到您指定的batchSize時,會使用LogService Java SDK一次性推送至LogHub(SLS)。
參數 | 描述 | 是否必選 | 默認值 |
endpoint | 日志服務入口endPoint是訪問一個項目(Project)及其內部日志數據的URL。它和Project所在的阿里云地域(Region)及Project名稱相關。各地域的服務入口請參見:服務入口。 | 是 | 無 |
accessKeyId | 訪問日志服務的AccessKeyId。 | 是 | 無 |
accessKeySecret | 訪問日志服務的AccessKeySecret。 | 是 | 無 |
project | 目標日志服務的項目名稱。 | 是 | 無 |
logstore | 目標日志庫的名稱,logstore是日志服務中日志數據的采集、存儲和查詢單元。 | 是 | 無 |
topic | 目標日志服務的topic名稱。 | 否 | 空字符串 |
batchSize | LogHub(SLS)一次同步的數據條數,默認1,024條,最大值為4,096。 說明 一次性同步至LogHub(SLS)的數據大小不要超過5M,請根據您的單條數據量大小調整一次性推送的條數。 | 否 | 1,024 |
column | 每條數據中的column名稱。 | 是 | 無 |
附錄二:LogHub SPL語法過濾說明
LogHub數據源作為數據來源端,在進行任務配置同步時支持通過LogHub的查詢語法、SPL語句(SLS Processing Language是SLS處理日志的語法)對LogHub內的數據進行過濾,具體語法說明如下:
SPL的更多詳細信息,請參見SPL概述。
場景 | SQL語句 | SPL語句 |
數據過濾 |
|
|
字段處理與篩選 | 精確選擇字段,并將其重命名:
|
|
數據規整 (調用SQL函數) | 轉換數據類型、時間解析等:
| 轉換數據類型、時間解析等:
|
字段提取 | 正則提取:
JSON提取:
|
|