外掛負載與 API 集成(Integrate Payloads with the API)

前言大綱

2.1 版本側重於通過創建數據採集管道來簡化傳感外掛載荷與機器人的集成。本文檔概述了開發人員如何編寫必要的 API 服務來集成他們的傳感器。數據採集管道適用於任何外掛載荷,但是對於收集圖像數據的外掛載荷(例如 360 相機)和收集其他數據類型和格式的外掛載荷(例如激光雷達或氣體傳感器)採取不同的開發步驟。

為了完全集成傳感器,有效載荷需要根據將收集和存儲的數據類型實現波士頓動力 API ImageServiceDataAcquisitionPluginService。這些服務會註冊到robot上的目錄服務,數據採集服務會自動檢測新的服務並與它們通信以獲取數據採集請求。下圖顯示了預期的通信圖。

相機外掛載荷Camera Payloads

要將以已知格式(例如 raw bytes  或 JPEG)輸出圖像數據的相機外掛負載與數據採集管道集成,開發人員需要實現 ImageService

數據採集服務將自動識別任何目錄註冊的影像服務,為影像源創建 ImageAcquistionCapabilities,並能夠使用標準 ImageService RPC 為每個服務收集影像數據。對於相機外掛負載,開發人員需要實現圖像服務類。為了簡化影像服務的實現,image_service_helpers.py 中提供了一組基礎影像服務輔助函數。

對於圖像服務中可用的每個圖像源,應創建 VisualImageSource 幫助器類來創建必要的原型,以響應來自圖像服務的 gRPC 請求、請求和解碼來自特定圖像源的圖像數據,以及創建或清除 特定於圖像源的故障。構造子函數需要提供圖像源名稱(作為字符串),然後是一個從 CameraInterface 繼承並覆蓋捕獲圖像數據和解碼數據的函數的類。可選地,圖像中的像素行數和列數可以在 VisualImageSource 構造函數中作為整數值提供。 增益和曝光時間可以作為動態確定值的函數或 VisualImageSource 構造函數中的固定浮點值提供。

可以使用 CameraBaseImageServicer 類創建圖像服務,該類是一個幫助器類,將管理狀態並響應圖像服務所需的 RPC(ListImageSources RPC 和 GetImage RPC)。 圖像服務助手類的構造函數要求開發人員提供 sdk 機器人實例、服務名稱和 VisualImageSource 列表,這些列表將表示可從此圖像服務捕獲的圖像源。

相機介面

每個圖像源都應該實現一個類,該類繼承父類 CameraInterface 並覆蓋虛擬函數 blocks_captureimage_decode

Blocking_capture 接口函數應該是一個區塊函數,它使用硬件的 api 調用相機,收集然後返回圖像數據。虛函數具有以下簽名:

  • def blocking_capture(self): returns (image_data: "Any Data Format", acquisition_time_seconds: float)

返回的 image_data 可以是任何類型,但必須在捕獲函數的輸出和解碼函數的輸入之間保持一致。acquisition_time_seconds 應該是服務計算機時鐘中的浮點數,它表示獲取圖像數據的時間。在獲取 image_data 之前,blocking_capture 函數不得返回。

image_decode 介面函數應將圖像數據解碼為所需格式的字節,並使用解碼數據填充圖像原始消息。虛函數具有以下簽名:
  • def image_decode(self, image_data : "Any Data Format", image_proto: image_pb2.Image, image_format : image_pb2.Image.Format, quality_percent : int)

輸入圖像數據將始終與 blocks_capture 函數返回的圖像數據具有相同的格式(和內容)。image_proto 是一個 Image protobuf 消息,它將在 decode 函數中進行變異以包括:

  • 1) 圖像數據,
  • 2) 像素格式,
  • 3) 圖像格式,以及 
  • 4) 可選的變換快照字段。
解碼函數不需要返回任何東西,但它必須變異圖像protobuf消息。

image_format 是請求的格式(例如 jpeg、raw); 如果圖像數據無法解碼為特定格式,則應在 image_decode 函數中引發異常,提供錯誤消息(例如 raise Exception("Unable to decode to FORMAT_RAW for the web cam"))。 如果請求的圖像格式為 None,則 image_decode 函數應該選擇最好的

最後,可以提供 quality_percent 作為用於解碼圖像數據的參數。

注意:平板電腦和機器人上的數據採集服務請求JPEG格式的圖像,因此image_decode函數必須至少能夠響應 image_format=image_pb2.Image.FORMAT_JPEG 以確保可以在平板電腦上查看外掛相機。

使用後台捕獲執行緒Using Background Capture Threads

圖像服務的 GetImage RPC 根據請求的圖像源使用相機的 API(例如 OpenCV)從外掛負載中檢索圖像,並將它們轉換為服務返回的 bosdyn.api.Image proto 消息。此 RPC 旨在在與外掛負載通信並返回收集的圖像時“快速”完成。為了對較慢的捕獲啟用對 RPC 的快速響應,CameraBaseImageServicer 圖像服務構造函數具有參數 use_background_capture_thread,當設置為 True 時,將為每個圖像源創建一個後台執行緒,該執行緒將從相機外掛負載中連續獲取圖像並保持緩衝區這些圖像以減少響應 RPC 時的延遲。然後,CameraBaseImageServicer 將使用捕獲執行緒中存儲在緩衝區中的最新圖像來響應 GetImage RPC,而不是調用服務。默認情況下,圖像服務會將 use_background_capture_thread 設置為 True 並為每個圖像源創建執行緒。

use_background_capture_thread 設置為False 時,CameraBaseImageServicer 將在RPC 完成期間調用blocking_capture 函數。

圖像服務範例

有兩個 SDK 示例顯示了使用輔助函數的 ImageService 實現: USB web camera和  Ricoh Theta camera.。

創建影像服務的技巧

除了圖像數據之外,image_decode 函數還應嘗試準確填寫圖像原型的 PixelFormat 和 ImageFormat 字段。這允許最終用戶應用程序更準確地解碼圖像數據。

image_decode 函數應該處理 image_format=None 的輸入參數。 因此,如果 image_format None,則 image_decode 函數應該選擇最佳/首選格式,並以解碼為該格式的圖像數據進行響應,並將 ImageFormat 字段設置為圖像原型 image proto中選擇的格式。

要讓特定相機源運行後台執行緒,但同一服務中的其他源在 RPC 時運行 Blocking_capture 函數,請將 use_background_capture_thread 作為 False 提供給 CameraBaseImageServicer 構造函數。 另外,對於 VisualImageSource 物件,在將源列表傳遞給影像服務構造函數之前使用 create_capture_thread() 函數啟動執行緒。 以下是演示該過程的偽代碼:
visual_source1 = VisualImageSource("source1", camera_interface_object)
visual_source2_not_threaded = VisualImageSource2("source2", camera_interface_object)
visual_source1.create_capture_thread()
CameraBaseImageServicer(robot, "image-service-name", [visual_source1, visual_source2_not_threaded], use_background_capture_thread=False)
在開發影像服務時,如果服務表現不符合預期或意外失敗,可以使用影像服務測試程序(python/examples/tester_programs/image_service_tester.py)通過詳細顯示系統的故障來幫助測試和調試常見故障模式 輸出。

非圖像外掛載荷

要將輸出不同的非圖像格式數據的外掛負載與數據採集管道集成,開發人員需要實現 DataAcquisitionPluginService

目錄註冊的 DataAcquisitionPluginService 會被機器人上的數據採集服務自動識別。插件服務將收集必要的有效負載數據,以響應從數據採集服務獲取其特定數據的請求。為了簡化插件服務的實現,data_acquisition_plugin_service.py 中提供了一組基礎插件服務輔助函數。

可以使用 DataAcquisitionPluginService 類創建插件服務,該類是一個幫助器類,將管理狀態並響應插件服務所需的 RPC。插件服務幫助類(Helper Class)的構造函數要求開發者提供 
  • 1) 服務可以收集數據的數據能力列表,以及 
  • 2) 收集這些數據的函數。

數據能力列表

對於將從負載中收集的每條數據,插件服務必須指定:
  • 名稱:唯一標識要收集的數據。
  • 描述:對正在收集的數據的簡短的、人類可讀的描述。在遙控和自動行走期間配置數據收集操作時,此說明將顯示在平板電腦上。
  • 通道名稱:將與此插件服務收集並存儲在數據緩衝區中的所有數據相關聯的字符串。
建議將不同類型的捕獲放在不同的通道上。 如果要在同一個抓包動作中存儲多條數據在同一個通道上,請在DataIdentifier中設置data_name,這樣仍然可以唯一標識這些數據。

使用此信息,數據採集插件將創建一個功能列表,即 Capability(name, description, channel_name)。 例如,收集 稀疏和密集 激光掃描數據的插件將具有以下功能列表:

  • kCapabilities = [Capability(name=”sparse", description="Sparse laser scan", channel_name=”laser_scan_sparse”), Capability(name="dense", description="Dense laser scan", channel_name=”laser_scan_dense”)]

此功能列表將傳遞給 DataAcquisitionPluginService 類的構造函數,並用於響應 GetServiceInfo RPC。

數據採集功能

除了能力之外,插件服務還必須實現一個數據收集功能,它將收集所有請求的數據並將其存儲在數據獲取存儲服務中。 當插件服務收到 AcquirePluginData RPC 時,將運行此數據收集功能。 數據收集函數必須具有以下簽名:
  • def data_collect_fn(request : AcquirePluginDataRequest, store_helper :  DataAcquisitionStoreHelper)

request 參數是 AcquirePluginDataRequest,它具有acquire_requests 字段,其中包含應由插件服務收集和保存的 數據捕獲列表。 store_helperargument 是一個 DataAcquisitionStoreHelper,它將被調用以保存收集的數據。

需要注意的是,數據採集函數應該在採集有效載荷數據並將其存儲在數據採集存儲服務中的整個過程中進行區塊阻塞,並且直到所有數據被採集並存儲完畢後才返回。

數據收集函數應該為作為參數傳遞給函數的acquisition_requests 列表中請求的每個DataCapture 執行以下操作:

從獲取請求的 action_id 字段創建一個 DataIdentifier。 該數據標識符將與所有收集的數據一起存儲。 如果同一個通道用於插件的所有數據功能,則 data_name 字段應填充數據捕獲的名稱字段。

  • data_id = data_acquisition_pb2.DataIdentifier(action_id=request.action_id, channel=CHANNEL_NAME, data_name=DATA_NAME)

通過與外掛負載通信來收集數據。

注意:如果用戶已取消或超時,長時間運行的獲取應確保偶爾調用 store_helper.state.cancel_check() 以提前並乾淨地退出。

使用 store_helper 將數據存儲在數據獲取存儲服務中。數據可以存儲為 AssociatedMetadata,即 json 結構化數據,也可以存儲為 原始字節數據raw bytes data

要將數據保存為原始字節,請使用 store_data 函數。

提示:如果採集到的數據是protobuf消息,protobuf  SerializeToString()函數會將 proto 轉換為字節。

  • store_helper.store_data(BYTES_DATA, data_id)
要將數據保存為關聯的元數據metadata,請使用 store_metadata 函數。 AssociatedMetadata proto 消息用於打包 json 數據,該 proto 的 reference_id 應包含 request.action_id
message = data_acquisition_pb2.AssociatedMetadata()
message.reference_id.action_id.CopyFrom(request.action_id)
message.metadata.data.update({
    "data": “special_data”
})
store_helper.store_metadata(message, data_id)

請注意,DataAcquisitionPluginService 類將等待響應 AcquirePluginData RPC,直到收集到所有數據並完成所有存儲調用。

插件範例

有幾個 插件範例服務example plugin services 顯示數據收集、創建關聯的元數據和存儲以下類型的外掛載荷的數據:Piksi GPS、PointCloud 服務和通用 GPS 元數據。

錯誤報告

如果插件服務出現故障,可以向數據採集服務報告錯誤。如果特定數據的收集失敗,則可以返回帶有請求的特定 DataIdentifierDataError
  • store_helper.state.add_errors([make_error(data_id, “Error Message!”)])

如果插件服務遇到與特定數據無關的錯誤,數據收集功能可以引發 異常Exception。此異常將被基礎 DataAcquisitionPluginService 類捕獲,並將導致 AcquirePluginData RPC 以 STATUS_INTERNAL_ERROR 進行響應。 數據採集服務會為此獲取失敗創建一個PluginError,它會出現在發送給數據採集服務的GetStatus RPC中。

創建數據採集插件的技巧

通常,插件會在使用 store_helper 存儲任何數據之前立即獲取所有請求的數據。 這允許插件準確地將狀態設置為 STATUS_SAVING 以指示所有獲取已完成並在開始存儲其數據之前為 GetStatus RPC 提供反饋。 同樣,插件可以在保存任何數據之前檢查它是否已被取消,以防止為以後取消的請求意外存儲數據。
  • store_helper.state.set_status(data_acquisition_pb2.GetStatusResponse.STATUS_SAVING)
在開發數據採集插件服務時,如果服務不按預期運行或出現意外故障,可以使用插件測試程序(python/examples/tester_programs/plugin_tester.py)通過詳細顯示的方式幫助測試和調試常見故障模式 系統的輸出。

數據收集功能可以是它自己的類的一部分,用於管理特定於外掛負載的狀態。 此外,如果為數據收集和管理創建了一個單獨的類,它可以使用後台執行緒來收集和緩衝數據,以加快對 AcquirePluginData RPC 的響應。

如果數據獲取服務花費的時間超過 30 秒,則數據獲取服務會將獲取請求標記為 STATUS_TIMEDOUT。插件可以通過在此簽名之後提供附加功能來擴展使用的超時:

  • def acquire_response_fn(request:data_acquisition_pb2.AcquirePluginDataRequest, response:data_acquisition_pb2.AcquirePluginDataResponse): returns Boolean

獲取響應函數可以驗證請求表頭並檢查獲取請求是否有效。 如果不是,則應修改響應參數以設置狀態字段。 同樣,可以通過更新響應的 timeout_deadline 字段來延長有效函數的超時時間。 請注意,超時應以機器人的時鐘設置。 最終,獲取響應函數將響應一個布林值,指示獲取請求是否有效,如果有效,則數據收集將繼續。

如果插件的數據收集很慢,它應該定期檢查 RPC 是否被取消,以便它可以立即停止不再需要的數據收集。在數據收集功能中,檢查採集是否被取消:

  • store_helper.state.cancel_check()

在數據收集函數中使用異步函數與外掛負載進行通信時要小心。服務架構期望數據收集過程區塊阻塞,直到所有數據被完全收集和存儲。

將元數據Metadata附加到其他數據或圖像

額外的元數據,例如機器人狀態或傳感器配置信息,可以與插件收集的外部數據或來自圖像服務的圖像相關聯地存儲。 為了保存與每條數據鏈接的元數據,可以創建一個 DataAcquisitionPluginService 來收集和保存這些元數據。 該插件將列出代表每條附加元數據的捕獲操作。 該插件將從機器人上的數據獲取服務接收 AcquirePluginData 請求,其中將包含重複的 DataIdentifier 列表,元數據插件可使用這些列表來存儲與這些已識別數據相關聯的元數據。

元數據將被配置為 JSON 數據,並且可以存儲為 AssociatedMetadata proto。 這個 proto 包含一個 reference_id 字段,它將是它應該關聯的數據中的 DataIdentifier; 如果僅填寫該標識符的 action_id,則元數據與整個操作(所有重複的數據標識符)相關聯。

插件可以使用 store helperAssociatedMetadata proto 和一個新的 DataIdentifier 來存儲此關聯的元數據,該 DataIdentifier 唯一標識關聯的元數據(而不是元數據所引用的內容):

  • store_helper.store_metadata(associated_metadata_proto, data_id)
從數據採集存儲下載或檢索數據時,每個動作保存的元數據也將被檢索,並可輕鬆鏈接回存儲的數據。

目錄註冊和運行新服務

新服務,無論是數據採集插件服務還是圖像服務,都必須在目錄中運行並註冊,才能與機器人和數據採集服務進行通信。 這需要唯一的服務名稱、服務類型(“bosdyn.api.DataAcquisitionPluginService”或“bosdyn.api.ImageService”)和服務權限。

在向目錄註冊服務之前,運行新服務的外掛負載計算機應使用外掛負載的  GUID/secret  執行外掛負載身份驗證。

測試新服務

對於 DataAcquisitionPluginService 和 ImageService 服務,可以使用 測試程序 tester programs 來檢查新服務的通信和目錄註冊,以及 API 服務的特定功能。 插件測試器腳本應該在新服務也在運行的同時運行,並將關於運行測試時發生的任何錯誤或警告的輸出打印到終端。

對於其他類型的服務,tester_programs 示例中提供了通用服務檢查輔助函數,可用於檢查目錄註冊、與服務的 gRPC 通信以及服務是否存在活動服務故障。 其他調試工具包括 執行特定服務類型的不同 API 示例(例如 :  get_image example)或命令行,它可以通過運行以下命令列出完整的選項和操作集:

  • python3 -m bosdyn.client --username {USERNAME}  --password {PASSWORD} {ROBOT_IP} --help


參考資料

特色、摘要,Feature、Summary:

關鍵字、標籤,Keyword、Tag:

留言

這個網誌中的熱門文章

Ubuntu 常用指令、分類與簡介

iptables的觀念與使用

網路設定必要參數IP、netmask(遮罩)、Gateway(閘道)、DNS

了解、分析登錄檔 - log

Python 與SQLite 資料庫

Blogger文章排版範本

Pandas 模組

如何撰寫Shell Script

查詢指令或設定 -Linux 線上手冊 - man

下載網頁使用 requests 模組