本文為您介紹如何在自動駕駛數據管理中編寫一個用戶自定義算子構建一個簡單的自動駕駛云上數據預處理流程。
當平臺內置的算子無法解決您的數據處理需求時,您需要自己開發一個算子(如天氣識別算子)并打包成像,上傳到自定義算子管理模塊,點擊上線后,該算子將會在任務配置的算子列表中出現,即可參與任務的編排。大致可分為以下兩個步驟:
上傳自定義算子。
創建一個包含自定義算子的任務。
視頻教程
Step 1 上傳自定義算子
自定義算子的上傳分為以下三個步驟:
在您的算子代碼中引入平臺提供的開發框架
將您的算子打包成鏡像并上傳到阿里云ACR代碼倉庫
在自定義算子管理模塊上傳該鏡像
以下為您詳細介紹這三個步驟。
Step 1.1 引入平臺提供的開發框架
1) 安裝SDK
項目根目錄創建sdk目錄,下載SDK文件并放置在該目錄下:
安裝模塊:pip install sdk/ali_autodrive-0.0.1.tar.gz
2) 實現抽象類
項目根目錄創建模塊(例如example),在example模塊中創建數據處理類(如DataProcessor)
DataProcessor實現抽象方法DataProcessTaskAbstract,重寫拆分方法和處理方法。
import json
import time
import av
import os.path
from abc import ABC
from ali_autodrive.parallel_compute.model.FileContent import FileContent
from ali_autodrive.parallel_compute.utils.tree_util import *
from ali_autodrive.parallel_compute.DataProcessTaskAbstract import DataProcessTaskAbstract
MAX_RECORD_NUM = 200
class DataProcessor(DataProcessTaskAbstract, ABC):
def __init__(self):
super(DataProcessor, self).__init__()
def data_partition(self, context):
self.get_logger().info("Data processor, data partition start.")
file_tree_reader = FileTreeReader(self.get_file_tree())
while file_tree_reader.finish is False:
file_list = file_tree_reader.get_sub_node_file_list(1)
for file in file_list:
if "file_path" in file and file["file_path"].endswith('.mp4'):
self.save_data_partition([json.dumps(file)])
self.get_logger().info("Data processor, data partition end.")
def data_process(self, context):
self.get_logger().info("Data processor, data process start.")
# 視頻抽幀
local_file_list = get_sub_node_local_file_list(self.get_file_tree())
for file_path in local_file_list:
self.framing(file_path)
self.get_logger().info("Data processor, data process end.")
def framing(self, file_path):
file_name = self.__get_file_name(file_path)
container = av.open(file_path)
stream = container.streams.video[0]
stream.codec_context.skip_frame = 'NONREF'
path = self.get_user_workspace() + "/img/" + file_name
if not os.path.exists(path):
os.makedirs(path)
file_list = []
timestamp_ns = time.time_ns()
for frame in container.decode(stream):
file_path = path + "/frame-%04d.jpg" % frame.index
frame.to_image().save(file_path)
# 文件打標
file_tag = {}
file_tag["timestamp_ns"] = timestamp_ns + int(frame.time * 1000000000)
content = FileContent()
content.file_tag = file_tag
content.file_path = file_path
if len(file_list) >= MAX_RECORD_NUM:
self.save_data_partition(file_list)
file_list = []
else:
file_list.append(json.dumps(content.__dict__))
if len(file_list) > 0:
self.save_data_partition(file_list)
@staticmethod
def __get_file_name(file_path):
return file_path.split("/")[-1].split(".")[0]
Step 1.2 本地測試
1)創建配置文件
項目根目錄創建配置文件(如config.ini)
[init]
#表示工作目錄,替換為自己機器工作空間
workspace = /Users/icyore/workspace
#模擬OSS地址,替換為自己的測試文件目錄
ossInput = /Users/icyore/oss/input
#模擬OSS地址,替換為測試輸出目錄
ossOutput = /Users/icyore/oss/output
#【可空】算子初始化參數,可以在算子中獲取,同數據管理平臺標準化轉換節點的“轉換程序參數”,JSON字符串類型。
transformParams ={"vehicleId":"并行計算測試車"}
2)創建啟動腳本
項目根目錄創建啟動腳本(如test_start.py)
from ali_autodrive.parallel_compute.service_startup import *
# ./config.ini 配置文件路徑,可以是相對當前目錄路徑,也可以是絕對路徑
# example.DataProcessor 算子所在模塊
# DataProcessor 算子實現類
test("./config.ini", "example.DataProcessor", "DataProcessor")
3)運行啟動腳本
運行啟動腳本
python test_start.py
Step 1.3創建鏡像
1)打包模塊
項目根目錄創建setup.py文件,執行打包命令:python setup.py sdist
注意:打包生成的文件存儲在dist目錄下
# -*- coding:utf-8 -*-
from setuptools import (setup, find_packages)
setup(
# 包名
name="example",
# 版本
version="0.0.1",
# 需要包含的子包列表
packages=find_packages(),
# 添加依賴
install_requires=[
#'python-lzf==0.2.4',
]
)
2)創建Dockerfile
項目根目錄創建Dockerfile文件
FROM python:3.8
COPY . /app
WORKDIR /app
RUN pip install sdk/ali_autodrive-0.0.1.tar.gz
ADD sdk/ali_autodrive-0.0.1.tar.gz ali_autodrive
RUN pip install dist/example-0.0.1.tar.gz
WORKDIR /app/ali_autodrive/ali_autodrive-0.0.1/ali_autodrive/parallel_compute
EXPOSE 5000
#example.DataProcessor 算子所在模塊
#DataProcessor 算子實現類
CMD ["python","service_startup.py" ,"example.DataProcessor","DataProcessor"]
為了提升鏡像打包速度,可以將上一個版本當作基礎鏡像,基于基礎鏡像打包,節省模塊安裝耗時。
3)上傳鏡像
制作鏡像并上傳到ACR
docker login --username=jier****@city-brain-pro auto-driver-registry.cn-hangzhou.cr.aliyuncs.com
docker build -t parallel-compute-example:0.0.1 .
docker tag parallel-compute-example:0.0.1 auto-driver-registry.cn-hangzhou.cr.aliyuncs.com/partition_compute/parallel-compute-example:0.0.1
docker push auto-driver-registry.cn-hangzhou.cr.aliyuncs.com/partition_compute/parallel-compute-example:0.0.1
其中auto-driver-registry.cn-hangzhou.cr.aliyuncs.com為ACR倉庫地址,parallel_compute為ACR命名空間,需要替換為自己的地址,并使用自己的賬號進行登錄。
其中parallel-compute-example:0.0.1為鏡像名:版本號,可以自定義名稱和版本號。
Step 1.4 上傳自定義算子
打開產品左側目錄,在數據定義模塊下面,找到自定義算子管理。
點擊右上角的添加算子,選擇對應類別,按照提示上傳算子。
在算子列表中找到您剛剛上傳的算子,點擊上線。
Step 2 創建包含自定義算子的任務
以上步驟完成后,您既可在任務配置頁面的系統節點列表中,找到您剛剛上傳的算子。此時,該算子和系統內置算子一樣,可以單獨使用,也可以和內置算子一起混合編排。
具體步驟,可參考 使用內置算子建一個簡單的數據處理任務。