日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Databricks Delta Lake數據入湖最佳實踐

使用Spark Structured Streaming完成客戶日志數據寫入Delta Lake。

本章架構圖

data

步驟一:創建Kafka集群和Databricks 數據洞察集群

1. 登錄阿里云E-MapReduce控制臺

2. 創建Kafka集群,詳情參見創建集群

3. 登錄Databricks數據洞察控制臺

4. 創建集群,詳情參見創建集群

步驟二:Databricks 數據洞察集群添加外部數據源

  1. 登錄Databricks數據洞察控制臺

  2. 單擊左側集群按鈕,選擇已創建的集群。

  3. 進入集群詳情頁面,單擊上方數據源按鈕。

  4. 在數據源頁面,單擊添加按鈕,選擇Aliyun EMR KAFKA

  5. 填入描述,選擇kafka集群

步驟三:在Kafka集群上創建Topic

本示例將創建一個分區數為1、副本數為2、名稱為log-generator-topic。

  1. 登錄Kafka集群的Master節點,詳情請參見登錄集群

  2. 通過如下命令創建Topic。

/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 1 --replication-factor 2 --zookeeper emr-header-1:2181 /kafka-1.0.0 --topic log-generator-topic --create

說明

創建Topic后,請保留該登錄窗口,后續步驟仍將使用。

步驟四:執行Spark Structured Streaming作業

  1. 在Notebook中引入第三方庫詳情參見Java庫管理

%spark.conf
spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1

2. 創建數據庫。

%sql
--創建數據庫
create database log_data_warehouse;
--使用創建的庫
use log_data_warehouse;

3. 根據生成的日志結果創建分區表。

說明

Apache日志發生器,生成的日志結構。

#日志結構
40.198.97.68 - - [19/Mar/2021:18:26:12 +0800] "GET /category/office HTTP/1.1" 200 58 "/item/electronics/975" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
104.75.85.35 - - [19/Mar/2021:18:26:12 +0800] "GET /item/networking/4019 HTTP/1.1" 200 118 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"
%sql
CREATE TABLE apache_logs(
ipaddr STRING,
identity STRING,
username STRING,
accesstime STRING,
request STRING,
status STRING,
size STRING,
referrer STRING,
agent STRING,
year string,
month string,
day string
)
using delta
PARTITIONED BY(year,month ,day )

4. Notebook執行Spark Structured Streaming 作業

%spark
import org.apache.spark.sql.functions._
//定義執行Structured Streaming的方法
def getquery(checkpoint_dir:String,tableName:String,servers:String,topic:String ) {
// 加載Kafka數據配置項startingOffsets=latest;
var streamingInputDF =  
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", servers)
    .option("subscribe", topic)     
    .option("startingOffsets", "latest")  
    .option("minPartitions", "10")  
    .option("failOnDataLoss", "true")
    .load()
// 定義日志的字段類型
val resDF=streamingInputDF
    .select(col("value").cast("string"))
    .withColumn("newMessage",split(col("value"), " "))
    .filter(col("newMessage").getItem(8).isNotNull)
    .select(
        col("newMessage").getItem(0).as("ipaddr"),
        col("newMessage").getItem(1).as("identity"),
        col("newMessage").getItem(2).as("username"),
        col("newMessage").getItem(3).as("accesstime"),
        col("newMessage").getItem(4).as("request"),
        col("newMessage").getItem(5).as("status"),
        col("newMessage").getItem(6).as("size"),
        col("newMessage").getItem(7).as("referrer"),
        col("newMessage").getItem(8).as("agent")
    )
    .withColumn("year",date_format(current_date(),"yyyy"))
    .withColumn("month",date_format(current_date(),"MM"))
    .withColumn("day",date_format(current_date(),"dd"))
// 將流數據動態寫入到apache_logs表里   
val query = resDF
      .writeStream
      .outputMode("append")
      .format("delta")
      .option("checkpointLocation", checkpoint_dir)
      .table(tableName)
  query
}

5. 執行作業

%spark
val my_checkpoint_dir="your checkpiont path"
val tableName="apache_logs"
val servers= "your kafka server"
val topic= "log-generator-topic"
val flowquery =getquery(my_checkpoint_dir,tableName,servers,topic)

步驟五:使用Kafka發布消息

  1. 在Kafka集群的命令行窗口,執行如下命令運行Kafka的生產者。

/usr/lib/kafka-current/bin/kafka-console-producer.sh --topic log-generator-topic --broker-list emr-worker-1:9092

在命令行中輸入數據

40.198.97.68 - - [19/Mar/2021:18:26:12 +0800] "GET /category/office HTTP/1.1" 200 58 "/item/electronics/975" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
104.75.85.35 - - [19/Mar/2021:18:26:12 +0800] "GET /item/networking/4019 HTTP/1.1" 200 118 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"

2.向Kafka集群手動寫入三條測試數據

打他

步驟六:查看結果

1. 進入Databricks數據洞察Notebook,動態查看數據寫入情況

%spark
for( i <- 1 to 3 ) {
    Thread.sleep(5000)
    spark.sql("select count(1) from apache_logs").show()
}

查看數據寫入情況

查詢寫入表的count數成功寫入三條。data

步驟七: 使用Yarn Applications UI頁面查看作業詳情或kill job

通過Yarn UI查看Spark Structured Streaming作業的信息,詳情請參見訪問Web UI

1. 進入running,選擇需要kill的application。

data