本文介紹自定義算子管理的主要功能。
1 什么是自定義算子
自定義算子是用戶自己開發的算法,例如某個預標注算法,通過打包成代碼鏡像的方式注冊到數據管理平臺上,可以在數據處理任務中使用。
此處定義的算子可在后續數據處理任務時調用運行,包括“數據標準化算子”、“數據預處理算子”和“數據預標注算子”。
數據標準化算子:對接入平臺的非標準格式數據進行標準化,支持RGB圖像、視頻、傳感器內外參、點云、激光雷達原始掃描、毫米波雷達原始掃描、毫米波雷達目標追蹤、感知輸出障礙物、靜態地圖、GPS輸出、IMU輸出、里程計輸出、融合定位輸出結果、算法輸出軌跡、地盤輸出、導航輸出、標定等數據的標準化。 平臺內置ROS2 STD、Autowave、Appllo Cyber等數據格式的標準化程序支持您自定義所需算子。
數據預處理算子:對數據進行智能預處理平臺內置目標視覺增強、視覺圖片視角偏轉、各類點云濾波、圖像分辨率調整、圖像顏色通道調整等預處理算法支持您自定義所需算子。
數據預標注算子:對數據進行智能預標注平臺內置2D圖片/3D點云BondingBox與Tracking標注算法同時支持您自定義所需算子。
2 自定義算子的開發
用戶的自定義算子需要集成平臺提供的開發框架,以便能夠在平臺中被正常調用。
開發框架為您提供以下功能:
1、并行計算能力:您可以通過調用開發框架中提供的方法,使您的自定義算子能夠支持并發計算。
2、本地聯調的能力:開發框架模擬了產品線上環境,您可以在本地聯調成功后上傳到云上。
2.1 安裝SDK
1、項目根目錄創建SDK目錄,下載SDK文件并放置在該目錄下:ali_autodrive-0.0.1.tar.gz
2、安裝模塊:pip install sdk/ali_autodrive-0.0.1.tar.gz
2.2 實現抽象類
1、項目根目錄創建模塊(例如example),在example模塊中創建數據處理類(如DataProcessor)。
2、DataProcessor實現抽象方法DataProcessTaskAbstract,重寫拆分方法和處理方法。
import json
from abc import ABC
from PIL import Image as pilImage
from ali_autodrive.parallel_compute.executor_agent.LogUtil import *
from ali_autodrive.parallel_compute.model.FileContent import FileContent
from ali_autodrive.parallel_compute.utils.tree_util import *
from ali_autodrive.parallel_compute.DataProcessTaskAbstract import DataProcessTaskAbstract
from ali_autodrive.parallel_compute.biz_enum.DataTypeEnum import DataTypeEnum
# 節點參數
NODE_DATA_TRANSFORM_PARAMS = "transformParams"
# 車輛ID
VEHICLE_ID = "vehicleId"
class DataProcessor(DataProcessTaskAbstract, ABC):
def __init__(self):
super(DataProcessor, self).__init__()
# 拆分方法
def data_partition(self, context):
self.get_logger().info("Data partition start.")
# 數據分片,每個文件拆分成一個分片的場景
file_list = get_sub_node_file_list(self.get_file_tree())
for file in file_list:
content = FileContent()
content.file_path = file
# 每次保存生成一個分片
self.save_data_partition([json.dumps(content.__dict__)])
self.get_logger().info("Data partition end.")
# 處理方法,圖片格式轉換為JPEG
def data_process(self, context):
self.get_logger().info("Data process start.")
# 獲取工作空間
workspace = self.get_user_workspace()
self.get_logger().info("Workspace is " + workspace)
# 獲取節點配置參數
params = json.loads(self.get_parameters()[NODE_DATA_TRANSFORM_PARAMS])
vehicle_id = params[VEHICLE_ID]
self.get_logger().info("VehicleId is " + vehicle_id)
# 獲取當前分片文件本地路徑列表
local_file_list = get_sub_node_local_file_list(self.get_file_tree())
# 轉換圖片格式,獲取新文件列表
file_contents = self.__convert_image_format(workspace, local_file_list)
# 存儲新文件
self.save_data_partition(file_contents)
self.get_logger().info("Data process end.")
def __convert_image_format(self, workspace, local_file_list):
# 工作空間創建臨時圖片目錄
image_path = os.path.join(workspace, "image")
if not os.path.exists(image_path):
os.makedirs(image_path)
# 逐個轉換圖片格式,臨時目錄創建新文件保存
image_file_list = []
for file in local_file_list:
new_file_name = os.path.basename(file).split(".")[-2] + ".JPEG"
new_file_path = os.path.join(image_path, new_file_name)
im = pilImage.open(file)
im.save(new_file_path, format='JPEG')
im.close()
image_file_list.append(self.__get_file_content(new_file_path, im.height, im.width))
return image_file_list
# 獲取文件描述信息,json結構, 會自動創建數據集,可以在自動駕駛平臺數據檢索和回放
@staticmethod
def __get_file_content(file_path, height, width):
# 文件打標
file_tag = {"header": {}}
file_tag["header"]["data_type"] = DataTypeEnum.camera_img.name
file_tag["height"] = height
file_tag["width"] = width
file_tag["format"] = 'JPEG'
# 待保存文件描述信息
content = FileContent()
content.file_tag = file_tag
content.file_path = file_path
return json.dumps(content.__dict__)
3 本地測試
3.1 創建配置文件
項目根目錄創建配置文件(如config.ini)
[init]
#表示工作目錄,替換為自己機器工作空間
workspace = /Users/icyore/workspace
#模擬OSS地址,替換為自己的測試文件目錄
ossInput = /Users/icyore/oss/input
#模擬OSS地址,替換為測試輸出目錄
ossOutput = /Users/icyore/oss/output
#【可空】算子初始化參數,可以在算子中獲取,同數據管理平臺標準化轉換節點的“轉換程序參數”,JSON字符串類型。
transformParams ={\"vehicleId\":\"并行計算測試車\"}
3.2 創建啟動腳本
項目根目錄創建啟動腳本(如test_start.py)
from ali_autodrive.parallel_compute.service_startup import *
# ./config.ini 配置文件路徑,可以是相對當前目錄路徑,也可以是絕對路徑
# example.DataProcessor 算子所在模塊
# DataProcessor 算子實現類
test("./config.ini", "example.DataProcessor", "DataProcessor")
3.3 運行啟動腳本
運行啟動腳本
python test_start.py
4 創建鏡像
4.1 打包模塊
項目根目錄創建setup.py文件,執行打包命令:python setup.py sdist
注意:打包生成的文件存儲在dist目錄下
# -*- coding:utf-8 -*-
from setuptools import (setup, find_packages)
setup(
# 包名
name="example",
# 版本
version="0.0.1",
# 需要包含的子包列表
packages=find_packages(),
# 添加依賴
install_requires=[
#'python-lzf==0.2.4',
]
)
4.2 創建Dockerfile
項目根目錄創建Dockerfile文件
FROM python:3.8
COPY . /app
WORKDIR /app
RUN pip install sdk/ali_autodrive-0.0.1.tar.gz
ADD sdk/ali_autodrive-0.0.1.tar.gz ali_autodrive
RUN pip install dist/example-0.0.1.tar.gz
WORKDIR /app/ali_autodrive/ali_autodrive-0.0.1/ali_autodrive/parallel_compute
EXPOSE 5000
#example.DataProcessor 算子所在模塊
#DataProcessor 算子實現類
CMD ["python","service_startup.py" ,"example.DataProcessor","DataProcessor"]
提示:為了提升鏡像打包速度,可以將上一個版本當作基礎鏡像,基于基礎鏡像打包,節省模塊安裝耗時。
4.3 上傳鏡像
制作鏡像并上傳到ACR
docker login --username=jieran.gjj@city-brain-pro auto-driver-registry.cn-hangzhou.cr.aliyuncs.com
docker build -t parallel-compute-example:0.0.1 .
docker tag parallel-compute-example:0.0.1 auto-driver-registry.cn-hangzhou.cr.aliyuncs.com/partition_compute/parallel-compute-example:0.0.1
docker push auto-driver-registry.cn-hangzhou.cr.aliyuncs.com/partition_compute/parallel-compute-example:0.0.1
其中auto-driver-registry.cn-hangzhou.cr.aliyuncs.com為ACR倉庫地址,parallel_compute為ACR命名空間,需要替換為自己的地址,并使用自己的賬號進行登錄。
其中parallel-compute-example:0.0.1為鏡像名:版本號,可以自定義名稱和版本號。