Two cameras can be used to compute a depth map when no RGBD sensor is available; to keep this efficient in real time, a Jetson edge-compute device is used.
The input is two video streams from IMX219 sensors, captured at 1080p, which are then handed to a GStreamer pipeline for post-processing; judging from the diagram, this is where the color-space conversion happens.
I looked VPI up: it is NVIDIA's Vision Programming Interface. In short, it carries out a series of conversion and processing steps.
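As a concrete illustration, here is a minimal sketch of how such a capture pipeline could look from Python on a Jetson. The pipeline string and the helper name gst_pipeline are my own placeholders; nvarguscamerasrc and nvvidconv are the standard Jetson GStreamer elements, and the NV12 -> BGRx -> BGR conversion is the color-space step mentioned above.

import cv2

def gst_pipeline(sensor_id=0, width=1920, height=1080, fps=30):
    # NV12 frames from the ISP -> BGRx -> BGR so OpenCV can consume them
    return (
        f"nvarguscamerasrc sensor-id={sensor_id} ! "
        f"video/x-raw(memory:NVMM), width={width}, height={height}, "
        f"framerate={fps}/1, format=NV12 ! "
        "nvvidconv ! video/x-raw, format=BGRx ! "
        "videoconvert ! video/x-raw, format=BGR ! appsink drop=1"
    )

cap = cv2.VideoCapture(gst_pipeline(0), cv2.CAP_GSTREAMER)
ok, frame = cap.read()  # one 1080p BGR frame from sensor 0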
It goes roughly like this.
Finally come the matching computation and alignment.
capture->rectify->SGBM->depth calc.
The data-flow pipeline.
The processing flow.
The IMX219, a Sony sensor.
sudo apt install libnvvpi1 python3-vpi1 vpi1-dev
These are the libraries that need to be installed.
https://repo.download.nvidia.com/jetson/
I couldn't find the source code for these, though; only the deb packages are available.
NVIDIA's repositories are impressively complete.
And of course the algorithm coverage is comprehensive too.
What the installation contains.
The video-stream backends it can use; Jetson is supported here (obviously).
The result of running with the CUDA backend.
Let's write one. Python is quick to write, but it doesn't give you manual memory management; the C SDK does. U8 means an 8-bit unsigned integer.
#include <opencv2/core/version.hpp>
#include <opencv2/imgproc/imgproc.hpp>
#if CV_MAJOR_VERSION >= 3
#  include <opencv2/imgcodecs.hpp>
#else
#  include <opencv2/highgui/highgui.hpp>
#endif

#include <vpi/OpenCVInterop.hpp>
#include <vpi/Image.h>
#include <vpi/Status.h>
#include <vpi/Stream.h>
#include <vpi/algo/BoxFilter.h>
#include <vpi/algo/ConvertImageFormat.h>

#include <iostream>

int main(int argc, char *argv[])
{
    if (argc != 2) {
        std::cerr << "Must pass an input image to be blurred" << std::endl;
        return 1;
    }

    cv::Mat cvImage = cv::imread(argv[1]);
    if (cvImage.data == NULL) {
        std::cerr << "Can't open input image" << std::endl;
        return 2;
    }

    // Stream on which the work will be submitted
    VPIStream stream;
    vpiStreamCreate(0, &stream);

    // Wrap the OpenCV image into a VPI image
    VPIImage image;
    vpiImageCreateWrapperOpenCVMat(cvImage, 0, &image);

    // Intermediate grayscale image and final blurred output, both U8
    VPIImage imageGray;
    vpiImageCreate(cvImage.cols, cvImage.rows, VPI_IMAGE_FORMAT_U8, 0, &imageGray);
    VPIImage blurred;
    vpiImageCreate(cvImage.cols, cvImage.rows, VPI_IMAGE_FORMAT_U8, 0, &blurred);

    // Convert BGR -> grayscale on the CUDA backend
    vpiSubmitConvertImageFormat(stream, VPI_BACKEND_CUDA, image, imageGray, NULL);

    // Apply a 5x5 box filter to produce the blurred output
    vpiSubmitBoxFilter(stream, VPI_BACKEND_CUDA, imageGray, blurred, 5, 5, VPI_BORDER_ZERO);

    // Wait until all submitted work finishes
    vpiStreamSync(stream);

    // Lock the output for CPU access and export it back to OpenCV
    VPIImageData outData;
    vpiImageLockData(blurred, VPI_LOCK_READ, VPI_IMAGE_BUFFER_HOST_PITCH_LINEAR, &outData);

    cv::Mat cvOut;
    vpiImageDataExportOpenCVMat(outData, &cvOut);
    imwrite("tutorial_blurred.png", cvOut);
    vpiImageUnlock(blurred);

    // Clean up
    vpiStreamDestroy(stream);
    vpiImageDestroy(image);
    vpiImageDestroy(imageGray);
    vpiImageDestroy(blurred);

    return 0;
}
This is the C++ version; it also needs to be built with CMake.
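A minimal CMakeLists.txt for building it could look like the following; the project and target names are placeholders, and the structure mirrors NVIDIA's VPI sample builds (VPI installs a CMake package exposing the vpi target):

cmake_minimum_required(VERSION 3.5)
project(vpi_blur)

# VPI and OpenCV both ship CMake config files on Jetson
find_package(vpi REQUIRED)
find_package(OpenCV REQUIRED)

add_executable(vpi_blur main.cpp)
target_link_libraries(vpi_blur vpi ${OpenCV_LIBS})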
Its main purpose is to accelerate vision algorithms. Its core concepts are listed below (a short sketch of how they fit together follows the list).
Algorithms: represent indivisible computing operations.
Backends: represent the hardware engines responsible for the actual computation.
Streams: act as asynchronous queues to which algorithms are submitted, ultimately executing in order on a given backend. Streams and events are the building blocks of a compute pipeline.
Buffers: store the input and output data.
Events: provide synchronization primitives that operate on streams and/or application threads.
Contexts: hold the state of VPI and of the objects created in it.
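Here is a minimal sketch of how these pieces fit together in the Python API. It mirrors the calls used in the full script later (asimage, convert, cpu); the box_filter call is an assumption about what the installed VPI version exposes.

import numpy as np
import vpi

# Buffer: wrap a NumPy array as a VPI image
arr = np.zeros((1080, 1920), dtype=np.uint8)
img = vpi.asimage(arr)

# Stream + backend: everything submitted inside this context is queued
# on the CUDA backend and runs asynchronously, in submission order
with vpi.Backend.CUDA:
    blurred = img.box_filter(5, border=vpi.Border.ZERO)  # an algorithm

# Mapping the result back to CPU memory synchronizes the stream
out = blurred.cpu()
print(out.shape, out.dtype)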
The hardware it can run on.
And here is the hardware deprecation roadmap.
The most recent release added Python support.
Quite new indeed.
The first thing this project needs is camera calibration.
The imported libraries.
Bring in the chessboard pattern.
wget https://www.dropbox.com/sh/j4w6tg9b0ov209g/AAA1HfT7Yy33tmGI1tWaqDKCa?dl=1 -O /tmp/calib_files.zip
unzip /tmp/calib_files.zip
This needs to be run ahead of time.
Then the processing begins.
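The obj_pts and img_pts passed to calibrateCamera below are the board's 3D reference points and the detected 2D corners. A minimal sketch of how they could be collected, assuming a 9x6 inner-corner board and PNG images in FOLDER (both assumptions, not from the original):

import glob
import cv2
import numpy as np

PATTERN = (9, 6)  # assumed inner-corner count of the printed chessboard
objp = np.zeros((PATTERN[0] * PATTERN[1], 3), np.float32)
objp[:, :2] = np.mgrid[0:PATTERN[0], 0:PATTERN[1]].T.reshape(-1, 2)

obj_pts, img_pts = [], []
for path in sorted(glob.glob(FOLDER + "*.png")):
    arr = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    found, corners = cv2.findChessboardCorners(arr, PATTERN, None)
    if found:
        # refine corner locations to sub-pixel accuracy
        corners = cv2.cornerSubPix(
            arr, corners, (11, 11), (-1, -1),
            (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001),
        )
        obj_pts.append(objp)
        img_pts.append(corners)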
ret, K_l, dist_coeff_l, rvecs, tvecs = cv2.calibrateCamera(
    obj_pts, img_pts, (arr.shape[1], arr.shape[0]), None, None)
This yields the calibration parameters.
Parameters in hand.
img = Image.open(FOLDER + "001.png")
arr = np.array(img)

arr_corr = cv2.undistort(arr, K_l, dist_coeff_l, None, K_l)

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20, 10))
ax1.imshow(arr, cmap='gray')
ax2.imshow(arr_corr, cmap='gray')
plt.show()
Since these are fisheye cameras, the calibration parameters are used here to undistort the image.
And that's it.
Since there are two cameras, remember to calibrate twice, then save the parameters:
np.save("K_l.npy", K_l)np.save("K_r.npy", K_r) np.save("dist_coeff_l.npy", dist_coeff_l)np.save("dist_coeff_r.npy", dist_coeff_r)
Open a fresh Jupyter notebook, import the libraries, generate a new image, and get the photos from both cameras ready.
Set up some lists to hold our data, iterate over the image pairs with zip, and process the corners, as sketched below.
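A minimal sketch of that pairing loop, reusing the PATTERN and objp definitions from the earlier calibration sketch; the folder names FOLDER_L and FOLDER_R are placeholders:

obj_pts, img_pts_l, img_pts_r = [], [], []
for path_l, path_r in zip(sorted(glob.glob(FOLDER_L + "*.png")),
                          sorted(glob.glob(FOLDER_R + "*.png"))):
    gray_l = cv2.imread(path_l, cv2.IMREAD_GRAYSCALE)
    gray_r = cv2.imread(path_r, cv2.IMREAD_GRAYSCALE)
    found_l, corners_l = cv2.findChessboardCorners(gray_l, PATTERN, None)
    found_r, corners_r = cv2.findChessboardCorners(gray_r, PATTERN, None)
    if found_l and found_r:
        # keep only pairs where the board is visible in both views
        obj_pts.append(objp)
        img_pts_l.append(corners_l)
        img_pts_r.append(corners_r)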
At this point it amounts to aligning the left and right chessboard views.
Aligning the cameras.
Then apply stereo rectification.
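The rotation Rot and translation Trns passed to stereoRectify below come from cv2.stereoCalibrate. A minimal sketch, assuming the per-camera intrinsics from the earlier calibration are held fixed (the flags and criteria are typical choices, not taken from the original):

criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 1e-5)
ret, K_l, dist_l, K_r, dist_r, Rot, Trns, E_mat, F_mat = cv2.stereoCalibrate(
    obj_pts, img_pts_l, img_pts_r,
    K_l, dist_coeff_l, K_r, dist_coeff_r,
    gray_l.shape[::-1],
    criteria=criteria, flags=cv2.CALIB_FIX_INTRINSIC,
)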
rect_l, rect_r, proj_mat_l, proj_mat_r, Q, roiL, roiR = cv2.stereoRectify(
    K_l, dist_l, K_r, dist_r, gray_l.shape[::-1], Rot, Trns, 1, (0, 0))
left_stereo_maps = cv2.initUndistortRectifyMap(K_l, dist_l, rect_l, proj_mat_l,
                                               gray_l.shape[::-1], cv2.CV_16SC2)
right_stereo_maps = cv2.initUndistortRectifyMap(K_r, dist_r, rect_r, proj_mat_r,
                                                gray_l.shape[::-1], cv2.CV_16SC2)
This initializes the rectification maps for each view.
arr_l_rect = cv2.remap(arr_l, left_stereo_maps[0], left_stereo_maps[1],
                       cv2.INTER_LANCZOS4, cv2.BORDER_CONSTANT, 0)
arr_r_rect = cv2.remap(arr_r, right_stereo_maps[0], right_stereo_maps[1],
                       cv2.INTER_LANCZOS4, cv2.BORDER_CONSTANT, 0)
Apply the remapping.
Now the rectification can run.
Checking one image is enough.
If it looks good, process them all.
Read in all the images.
Rectify a single image pair and compare:
fig, axs = plt.subplots(2, 2, figsize=(20, 10))

# before
axs[0][0].title.set_text('Original L')
axs[0][0].imshow(arr_l)
axs[0][1].title.set_text('Original R')
axs[0][1].imshow(arr_r)

# after
axs[1][0].title.set_text('Rectified L')
axs[1][0].imshow(arr_l_rect)
axs[1][1].title.set_text('Rectified R')
axs[1][1].imshow(arr_r_rect)
The result.
Now start combining the maps.
First, take a look at the array shapes.
The X and Y components.
Combine them, as sketched below.
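A hedged guess at this combine-and-save step: write the left/right rectification maps into the YAML file that the realtime script below reads back. The node names match get_calibration in that script; the channel layout follows from the CV_16SC2 maps created above.

fs = cv2.FileStorage("calib/rectify_map_imx219_160deg_1080p.yaml",
                     cv2.FILE_STORAGE_WRITE)
fs.write("map_l_x", left_stereo_maps[0])   # 2-channel integer x/y map
fs.write("map_l_y", left_stereo_maps[1])   # interpolation table
fs.write("map_r_x", right_stereo_maps[0])
fs.write("map_r_y", right_stereo_maps[1])
fs.release()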
The final result.
import time
import queue
import numpy as np
import cv2
from threading import Thread
from camera import Camera
from concurrent.futures import ThreadPoolExecutor

MAX_DISP = 128
WINDOW_SIZE = 10


class Depth(Thread):
    def __init__(self):
        super().__init__()
        print("Reading camera calibration...")
        self._map_l, self._map_r = get_calibration()
        self._cam_r = CameraThread(0)
        self._cam_l = CameraThread(1)
        self._disp_arr = None
        self._should_run = True
        self._dq = queue.deque(maxlen=3)
        self._executor = ThreadPoolExecutor(max_workers=4)
        self.start()
        # Wait for the deque to start filling up
        while len(self._dq) < 1:
            time.sleep(0.1)

    def stop(self):
        self._should_run = False
        self._cam_l.stop()
        self._cam_r.stop()

    def disparity(self):
        while len(self._dq) == 0:
            time.sleep(0.01)
        return self._dq.pop()

    def enqueue_async(self, vpi_image):
        arr = vpi_image.cpu().copy()
        self._dq.append(arr)
        del vpi_image

    def run(self):
        import vpi  # Don't import vpi in main thread to avoid creating a global context

        i = 0
        self._warp_l = make_vpi_warpmap(self._map_l)
        self._warp_r = make_vpi_warpmap(self._map_r)
        ts_history = []
        while self._should_run:
            i += 1
            ts = []
            ts.append(time.perf_counter())
            with vpi.Backend.CUDA:
                ts.append(time.perf_counter())
                # confidenceMap = vpi.Image(vpi_l.size, vpi.Format.U16)

                # Read images
                arr_l = self._cam_l.image
                arr_r = self._cam_r.image
                ts.append(time.perf_counter())

                # Convert to VPI images
                vpi_l = vpi.asimage(arr_l)
                vpi_r = vpi.asimage(arr_r)
                ts.append(time.perf_counter())

                # Rectify
                vpi_l = vpi_l.remap(self._warp_l)
                vpi_r = vpi_r.remap(self._warp_r)
                ts.append(time.perf_counter())

                # Resize
                vpi_l = vpi_l.rescale((480, 270), interp=vpi.Interp.LINEAR, border=vpi.Border.ZERO)
                vpi_r = vpi_r.rescale((480, 270), interp=vpi.Interp.LINEAR, border=vpi.Border.ZERO)
                ts.append(time.perf_counter())

                # Convert to 16bpp
                vpi_l_16bpp = vpi_l.convert(vpi.Format.U16, scale=1)
                vpi_r_16bpp = vpi_r.convert(vpi.Format.U16, scale=1)
                ts.append(time.perf_counter())

                # Disparity
                disparity_16bpp = vpi.stereodisp(
                    vpi_l_16bpp,
                    vpi_r_16bpp,
                    out_confmap=None,
                    backend=vpi.Backend.CUDA,
                    window=WINDOW_SIZE,
                    maxdisp=MAX_DISP,
                )
                ts.append(time.perf_counter())

                # Convert again
                disparity_8bpp = disparity_16bpp.convert(vpi.Format.U8, scale=255.0 / (32 * MAX_DISP))
                ts.append(time.perf_counter())

                # CPU mapping
                # self._disp_arr = disparity_8bpp
                self._executor.submit(self.enqueue_async, disparity_8bpp)
                # global frames
                # frames.append(self._disp_arr)
                ts.append(time.perf_counter())

                # Render
                # cv2.imshow("Disparity", disp_arr)
                # cv2.waitKey(1)
                ts.append(time.perf_counter())

            ts = np.array(ts)
            ts_deltas = np.diff(ts)
            ts_history.append(ts_deltas)
            debug_str = f"Iter {i} "
            tasks = [
                "Enter context",
                "Read images",
                "Convert to VPI image",
                "Rectify",
                "Resize 1080p->270p",
                "Convert to 16bpp",
                "Disparity",
                "Convert 16bpp->8bpp",
                ".cpu() mapping",
                "OpenCV colormap",
                "Render",
            ]
            debug_str += f"Sum: {sum(ts_deltas):0.2f} "
            task_acc_time = 1000 * np.array(ts_history).mean(axis=0)
            for task, dt in zip(tasks, task_acc_time):
                debug_str += f"{task.ljust(20)} {dt:0.2f} "
            # print(debug_str)
            if i % 10 == 0:
                vpi.clear_cache()


def disp2depth(disp_arr):
    F, B, PIXEL_SIZE = (CAM_PARAMS.F, CAM_PARAMS.B, CAM_PARAMS.PIXEL_SIZE)
    disp_arr[disp_arr < 10] = 10
    disp_arr_mm = disp_arr * PIXEL_SIZE  # convert disparities from px -> mm
    depth_arr_mm = F * B / (disp_arr_mm + 1e-10)  # calculate depth
    return depth_arr_mm


def get_calibration() -> tuple:
    fs = cv2.FileStorage("calib/rectify_map_imx219_160deg_1080p.yaml", cv2.FILE_STORAGE_READ)
    map_l = (fs.getNode("map_l_x").mat(), fs.getNode("map_l_y").mat())
    map_r = (fs.getNode("map_r_x").mat(), fs.getNode("map_r_y").mat())
    fs.release()
    return map_l, map_r


def make_vpi_warpmap(cv_maps):
    import vpi

    src_map = cv_maps[0]
    idk_what_that_is = cv_maps[1]
    map_y, map_x = src_map[:, :, 0], src_map[:, :, 1]
    warp = vpi.WarpMap(vpi.WarpGrid((1920, 1080)))
    # (H, W, C) -> (C, H, W)
    arr_warp = np.asarray(warp)
    wx = arr_warp[:, :, 0]
    wy = arr_warp[:, :, 1]
    wy[:1080, :] = map_x
    wx[:1080, :] = map_y
    return warp


class CameraThread(Thread):
    def __init__(self, sensor_id) -> None:
        super().__init__()
        self._camera = Camera(sensor_id)
        self._should_run = True
        self._image = self._camera.read()
        self.start()

    def run(self):
        while self._should_run:
            self._image = self._camera.read()

    @property
    def image(self):
        # NOTE: if we care about atomicity of reads, we can add a lock here
        return self._image

    def stop(self):
        self._should_run = False
        self._camera.stop()


def calculate_depth(disp_arr):
    F, B, PIXEL_SIZE = (CAM_PARAMS.F, CAM_PARAMS.B, CAM_PARAMS.PIXEL_SIZE)
    disp_arr_mm = disp_arr * PIXEL_SIZE  # disparities in mm
    depth_arr_mm = F * B / (disp_arr_mm + 1e-1)  # distances
    return depth_arr_mm


if __name__ == "__main__":
    DISPLAY = True
    SAVE = True
    frames_d = []
    frames_rgb = []
    depth = Depth()
    t1 = time.perf_counter()
    for i in range(100):
        disp_arr = depth.disparity()
        frames_d.append(disp_arr)
        frames_rgb.append(depth._cam_l.image)
        print(i)
        # print(disp_arr)
        # depth_arr = calculate_depth(disp_arr)
        # print(f"{i}/20: {disp_arr:0.2f}")
        # time.sleep(1)
        if DISPLAY:
            disp_arr = cv2.applyColorMap(disp_arr, cv2.COLORMAP_TURBO)
            cv2.imshow("Depth", disp_arr)
            cv2.imshow("Image", cv2.resize(depth._cam_l.image, (480, 270)))
            cv2.waitKey(1)
    depth.stop()
    t2 = time.perf_counter()
    print(f"Approx framerate: {len(frames_d)/(t2-t1)} FPS")

    # Save frames
    for i, (disp_arr, rgb_arr) in enumerate(zip(frames_d, frames_rgb)):
        print(f"{i}/{len(frames_d)}", end=" ")
        disp_arr = cv2.applyColorMap(disp_arr, cv2.COLORMAP_TURBO)
        cv2.imwrite(f"images/depth_{i}.jpg", disp_arr)
        cv2.imwrite(f"images/rgb_{i}.jpg", rgb_arr)
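To make disp2depth concrete, here is a worked example with assumed numbers: the 1.12 um pixel pitch is the IMX219's, but the focal length and baseline are hypothetical, since CAM_PARAMS isn't given above.

F, B, PIXEL_SIZE = 2.6, 60.0, 0.00112  # focal length (mm), baseline (mm), pixel pitch (mm/px)
disp_px = 50                           # a disparity of 50 px at full 1080p resolution
depth_mm = F * B / (disp_px * PIXEL_SIZE)
print(round(depth_mm))                 # ~2786 mm, i.e. the point is about 2.8 m away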
I've written plenty at this point, so I'll stop here and just show a demo.
Here are the printed parts for the camera rig.
The back side.
From far to near.