使用Spark Structured Streaming完成客戶日志數據寫入Delta Lake。
本章架構圖
步驟一:創建Kafka集群和Databricks 數據洞察集群
1. 登錄阿里云E-MapReduce控制臺。
2. 創建Kafka集群,詳情參見創建集群
3. 登錄Databricks數據洞察控制臺。
4. 創建集群,詳情參見創建集群。
步驟二:Databricks 數據洞察集群添加外部數據源
單擊左側集群按鈕,選擇已創建的集群。
進入集群詳情頁面,單擊上方數據源按鈕。
在數據源頁面,單擊添加按鈕,選擇Aliyun EMR KAFKA
填入描述,選擇kafka集群。
步驟三:在Kafka集群上創建Topic
本示例將創建一個分區數為1、副本數為2、名稱為log-generator-topic。
登錄Kafka集群的Master節點,詳情請參見登錄集群。
通過如下命令創建Topic。
/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 1 --replication-factor 2 --zookeeper emr-header-1:2181 /kafka-1.0.0 --topic log-generator-topic --create
說明
創建Topic后,請保留該登錄窗口,后續步驟仍將使用。
步驟四:執行Spark Structured Streaming作業
在Notebook中引入第三方庫詳情參見Java庫管理
%spark.conf
spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1
2. 創建數據庫。
%sql
--創建數據庫
create database log_data_warehouse;
--使用創建的庫
use log_data_warehouse;
3. 根據生成的日志結果創建分區表。
說明
Apache日志發生器,生成的日志結構。
#日志結構
40.198.97.68 - - [19/Mar/2021:18:26:12 +0800] "GET /category/office HTTP/1.1" 200 58 "/item/electronics/975" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
104.75.85.35 - - [19/Mar/2021:18:26:12 +0800] "GET /item/networking/4019 HTTP/1.1" 200 118 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"
%sql
CREATE TABLE apache_logs(
ipaddr STRING,
identity STRING,
username STRING,
accesstime STRING,
request STRING,
status STRING,
size STRING,
referrer STRING,
agent STRING,
year string,
month string,
day string
)
using delta
PARTITIONED BY(year,month ,day )
4. Notebook執行Spark Structured Streaming 作業
%spark
import org.apache.spark.sql.functions._
//定義執行Structured Streaming的方法
def getquery(checkpoint_dir:String,tableName:String,servers:String,topic:String ) {
// 加載Kafka數據配置項startingOffsets=latest;
var streamingInputDF =
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.option("minPartitions", "10")
.option("failOnDataLoss", "true")
.load()
// 定義日志的字段類型
val resDF=streamingInputDF
.select(col("value").cast("string"))
.withColumn("newMessage",split(col("value"), " "))
.filter(col("newMessage").getItem(8).isNotNull)
.select(
col("newMessage").getItem(0).as("ipaddr"),
col("newMessage").getItem(1).as("identity"),
col("newMessage").getItem(2).as("username"),
col("newMessage").getItem(3).as("accesstime"),
col("newMessage").getItem(4).as("request"),
col("newMessage").getItem(5).as("status"),
col("newMessage").getItem(6).as("size"),
col("newMessage").getItem(7).as("referrer"),
col("newMessage").getItem(8).as("agent")
)
.withColumn("year",date_format(current_date(),"yyyy"))
.withColumn("month",date_format(current_date(),"MM"))
.withColumn("day",date_format(current_date(),"dd"))
// 將流數據動態寫入到apache_logs表里
val query = resDF
.writeStream
.outputMode("append")
.format("delta")
.option("checkpointLocation", checkpoint_dir)
.table(tableName)
query
}
5. 執行作業
%spark
val my_checkpoint_dir="your checkpiont path"
val tableName="apache_logs"
val servers= "your kafka server"
val topic= "log-generator-topic"
val flowquery =getquery(my_checkpoint_dir,tableName,servers,topic)
步驟五:使用Kafka發布消息
在Kafka集群的命令行窗口,執行如下命令運行Kafka的生產者。
/usr/lib/kafka-current/bin/kafka-console-producer.sh --topic log-generator-topic --broker-list emr-worker-1:9092
在命令行中輸入數據
40.198.97.68 - - [19/Mar/2021:18:26:12 +0800] "GET /category/office HTTP/1.1" 200 58 "/item/electronics/975" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
104.75.85.35 - - [19/Mar/2021:18:26:12 +0800] "GET /item/networking/4019 HTTP/1.1" 200 118 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"
2.向Kafka集群手動寫入三條測試數據
步驟六:查看結果
1. 進入Databricks數據洞察Notebook,動態查看數據寫入情況
%spark
for( i <- 1 to 3 ) {
Thread.sleep(5000)
spark.sql("select count(1) from apache_logs").show()
}
查看數據寫入情況
查詢寫入表的count數成功寫入三條。
步驟七: 使用Yarn Applications UI頁面查看作業詳情或kill job
通過Yarn UI查看Spark Structured Streaming作業的信息,詳情請參見訪問Web UI。
1. 進入running,選擇需要kill的application。
文檔內容是否對您有幫助?