外掛負載與 API 集成(Integrate Payloads with the API)
前言大綱
2.1 版本側重於通過創建數據採集管道來簡化傳感外掛載荷與機器人的集成。本文檔概述了開發人員如何編寫必要的 API 服務來集成他們的傳感器。數據採集管道適用於任何外掛載荷,但是對於收集圖像數據的外掛載荷(例如 360 相機)和收集其他數據類型和格式的外掛載荷(例如激光雷達或氣體傳感器)採取不同的開發步驟。
相機外掛載荷Camera Payloads
數據採集服務將自動識別任何目錄註冊的影像服務,為影像源創建 ImageAcquistionCapabilities,並能夠使用標準 ImageService RPC 為每個服務收集影像數據。對於相機外掛負載,開發人員需要實現圖像服務類。為了簡化影像服務的實現,image_service_helpers.py 中提供了一組基礎影像服務輔助函數。
可以使用 CameraBaseImageServicer 類創建圖像服務,該類是一個幫助器類,將管理狀態並響應圖像服務所需的 RPC(ListImageSources RPC 和 GetImage RPC)。 圖像服務助手類的構造函數要求開發人員提供 sdk 機器人實例、服務名稱和 VisualImageSource 列表,這些列表將表示可從此圖像服務捕獲的圖像源。
相機介面
Blocking_capture 接口函數應該是一個區塊函數,它使用硬件的 api 調用相機,收集然後返回圖像數據。虛函數具有以下簽名:
- def blocking_capture(self): returns (image_data: "Any Data Format", acquisition_time_seconds: float)
返回的 image_data 可以是任何類型,但必須在捕獲函數的輸出和解碼函數的輸入之間保持一致。acquisition_time_seconds 應該是服務計算機時鐘中的浮點數,它表示獲取圖像數據的時間。在獲取 image_data 之前,blocking_capture 函數不得返回。
- def image_decode(self, image_data : "Any Data Format", image_proto: image_pb2.Image, image_format : image_pb2.Image.Format, quality_percent : int)
輸入圖像數據將始終與 blocks_capture 函數返回的圖像數據具有相同的格式(和內容)。image_proto 是一個 Image protobuf 消息,它將在 decode 函數中進行變異以包括:
- 1) 圖像數據,
- 2) 像素格式,
- 3) 圖像格式,以及
- 4) 可選的變換快照字段。
最後,可以提供 quality_percent 作為用於解碼圖像數據的參數。
使用後台捕獲執行緒Using Background Capture Threads
當use_background_capture_thread 設置為False 時,CameraBaseImageServicer 將在RPC 完成期間調用blocking_capture 函數。
圖像服務範例
創建影像服務的技巧
image_decode 函數應該處理 image_format=None 的輸入參數。 因此,如果 image_format 為 None,則 image_decode 函數應該選擇最佳/首選格式,並以解碼為該格式的圖像數據進行響應,並將 ImageFormat 字段設置為圖像原型 image proto中選擇的格式。
visual_source1 = VisualImageSource("source1", camera_interface_object)
visual_source2_not_threaded = VisualImageSource2("source2", camera_interface_object)
visual_source1.create_capture_thread()
CameraBaseImageServicer(robot, "image-service-name", [visual_source1, visual_source2_not_threaded], use_background_capture_thread=False)
非圖像外掛載荷
目錄註冊的 DataAcquisitionPluginService 會被機器人上的數據採集服務自動識別。插件服務將收集必要的有效負載數據,以響應從數據採集服務獲取其特定數據的請求。為了簡化插件服務的實現,data_acquisition_plugin_service.py 中提供了一組基礎插件服務輔助函數。
- 1) 服務可以收集數據的數據能力列表,以及
- 2) 收集這些數據的函數。
數據能力列表
- 名稱:唯一標識要收集的數據。
- 描述:對正在收集的數據的簡短的、人類可讀的描述。在遙控和自動行走期間配置數據收集操作時,此說明將顯示在平板電腦上。
- 通道名稱:將與此插件服務收集並存儲在數據緩衝區中的所有數據相關聯的字符串。
使用此信息,數據採集插件將創建一個功能列表,即 Capability(name, description, channel_name)。 例如,收集 稀疏和密集 激光掃描數據的插件將具有以下功能列表:
- kCapabilities = [Capability(name=”sparse", description="Sparse laser scan", channel_name=”laser_scan_sparse”), Capability(name="dense", description="Dense laser scan", channel_name=”laser_scan_dense”)]
數據採集功能
- def data_collect_fn(request : AcquirePluginDataRequest, store_helper : DataAcquisitionStoreHelper)
request 參數是 AcquirePluginDataRequest,它具有acquire_requests 字段,其中包含應由插件服務收集和保存的 數據捕獲列表。 store_helperargument 是一個 DataAcquisitionStoreHelper,它將被調用以保存收集的數據。
需要注意的是,數據採集函數應該在採集有效載荷數據並將其存儲在數據採集存儲服務中的整個過程中進行區塊阻塞,並且直到所有數據被採集並存儲完畢後才返回。
從獲取請求的 action_id 字段創建一個 DataIdentifier。 該數據標識符將與所有收集的數據一起存儲。 如果同一個通道用於插件的所有數據功能,則 data_name 字段應填充數據捕獲的名稱字段。
- data_id = data_acquisition_pb2.DataIdentifier(action_id=request.action_id, channel=CHANNEL_NAME, data_name=DATA_NAME)
通過與外掛負載通信來收集數據。
使用 store_helper 將數據存儲在數據獲取存儲服務中。數據可以存儲為 AssociatedMetadata,即 json 結構化數據,也可以存儲為 原始字節數據raw bytes data。
提示:如果採集到的數據是protobuf消息,protobuf SerializeToString()函數會將 proto 轉換為字節。
- store_helper.store_data(BYTES_DATA, data_id)
message = data_acquisition_pb2.AssociatedMetadata()
message.reference_id.action_id.CopyFrom(request.action_id)
message.metadata.data.update({
"data": “special_data”
})
store_helper.store_metadata(message, data_id)
請注意,DataAcquisitionPluginService 類將等待響應 AcquirePluginData RPC,直到收集到所有數據並完成所有存儲調用。
插件範例
錯誤報告
- store_helper.state.add_errors([make_error(data_id, “Error Message!”)])
如果插件服務遇到與特定數據無關的錯誤,數據收集功能可以引發 異常Exception。此異常將被基礎 DataAcquisitionPluginService 類捕獲,並將導致 AcquirePluginData RPC 以 STATUS_INTERNAL_ERROR 進行響應。 數據採集服務會為此獲取失敗創建一個PluginError,它會出現在發送給數據採集服務的GetStatus RPC中。
創建數據採集插件的技巧
- store_helper.state.set_status(data_acquisition_pb2.GetStatusResponse.STATUS_SAVING)
數據收集功能可以是它自己的類的一部分,用於管理特定於外掛負載的狀態。 此外,如果為數據收集和管理創建了一個單獨的類,它可以使用後台執行緒來收集和緩衝數據,以加快對 AcquirePluginData RPC 的響應。
如果數據獲取服務花費的時間超過 30 秒,則數據獲取服務會將獲取請求標記為 STATUS_TIMEDOUT。插件可以通過在此簽名之後提供附加功能來擴展使用的超時:
- def acquire_response_fn(request:data_acquisition_pb2.AcquirePluginDataRequest, response:data_acquisition_pb2.AcquirePluginDataResponse): returns Boolean
獲取響應函數可以驗證請求表頭並檢查獲取請求是否有效。 如果不是,則應修改響應參數以設置狀態字段。 同樣,可以通過更新響應的 timeout_deadline 字段來延長有效函數的超時時間。 請注意,超時應以機器人的時鐘設置。 最終,獲取響應函數將響應一個布林值,指示獲取請求是否有效,如果有效,則數據收集將繼續。
如果插件的數據收集很慢,它應該定期檢查 RPC 是否被取消,以便它可以立即停止不再需要的數據收集。在數據收集功能中,檢查採集是否被取消:
- store_helper.state.cancel_check()
在數據收集函數中使用異步函數與外掛負載進行通信時要小心。服務架構期望數據收集過程區塊阻塞,直到所有數據被完全收集和存儲。
將元數據Metadata附加到其他數據或圖像
元數據將被配置為 JSON 數據,並且可以存儲為 AssociatedMetadata proto。 這個 proto 包含一個 reference_id 字段,它將是它應該關聯的數據中的 DataIdentifier; 如果僅填寫該標識符的 action_id,則元數據與整個操作(所有重複的數據標識符)相關聯。
插件可以使用 store helper 和 AssociatedMetadata proto 和一個新的 DataIdentifier 來存儲此關聯的元數據,該 DataIdentifier 唯一標識關聯的元數據(而不是元數據所引用的內容):
- store_helper.store_metadata(associated_metadata_proto, data_id)
目錄註冊和運行新服務
在向目錄註冊服務之前,運行新服務的外掛負載計算機應使用外掛負載的 GUID/secret 執行外掛負載身份驗證。
測試新服務
對於其他類型的服務,tester_programs 示例中提供了通用服務檢查輔助函數,可用於檢查目錄註冊、與服務的 gRPC 通信以及服務是否存在活動服務故障。 其他調試工具包括 執行特定服務類型的不同 API 示例(例如 : get_image example)或命令行,它可以通過運行以下命令列出完整的選項和操作集:
- python3 -m bosdyn.client --username {USERNAME} --password {PASSWORD} {ROBOT_IP} --help
留言
張貼留言
Aron阿龍,謝謝您的留言互動!