【小白深度教程 1.31】手把手教你使用 Open3D(14)对激光雷达(点云)数据进行车道线分割

【小白深度教程 1.31】手把手教你使用 Open3D(14)对激光雷达(点云)数据进行车道线分割

1. 激光雷达车道线(道路标记)分割的概念

在三维点云中检测道路标记是自动驾驶汽车面临的一个关键挑战。

传统的道路标记分割方法是使用聚类算法获取地面平面,然后通过后处理从地面中提取道路标记。

这种方法的一个显著局限是地面平面的多样化地形。因此,算法可能会错误地在不可通行的表面上检测到车道标记,这对自动驾驶车辆构成严重风险。

为了解决这些挑战,我们决定将问题分解为两个不同的子问题:

  1. 可通行表面提取 :这包括提取仅供汽车行驶的沥青路面,不包括人行道和便道等元素。
  2. 道路标记分割算法 :在可通行表面上应用一种独特的分割算法,精确提取道路标记。

通过有效地解决第一个子问题并准确提取可通行表面,我们为接下来的道路标记分割步骤的最佳表现奠定了基础。

在这里插入图片描述

2. 可通行表面提取(Cylinder3D 模型)

为了实现精确的可通行表面提取,传统利用手工特征的算法往往难以在不同场景和环境中进行有效泛化。为了实现更具适应性的方法,我们决定使用深度学习模型。

优秀的深度学习模型能够出色地进行泛化,并具备适应不同驾驶场景的能力,从而增强可通行表面提取的鲁棒性。

我们选择了包含可通行表面类别的 NuScenes LiDAR 分割数据集。

我们选择圆柱形三维卷积网络模型(Cylindrical 3D Convolution Network)的原因在于它在我们实验时 NuScenes LiDAR 分割挑战排行榜上表现优异。

此外,开源代码的可用性也影响了我们的选择。

Cylinder3D 通过将点云体素化为圆柱体素,与传统的立方体体素化方法不同。

随后,它采用类似 U-Net 的架构对体素进行分割,为每个体素分配特定类别。

该论文的独特之处在于其独特的圆柱体素化方法,使其有别于传统方法。

在这里插入图片描述
圆柱体素化通过确保点在体素内更均匀的分布来提升模型性能。相比之下,立方体素化往往将点集中在少量体素中,导致大量信息丢失。

在体素化过程中,体素的类别由多数投票决定,这会导致与多数类别不同的点被丢弃。为了最大限度地减少信息损失,更均匀的点分布至关重要。圆柱体素中点的良好分布有助于减少信息损失,从而提高模型的准确性和鲁棒性。

在这里插入图片描述

下图展示了模型在 nuScenes 数据上的表现。可通行表面类别以浅蓝色显示,如图所示,模型能够很好地识别可通行表面。

在这里插入图片描述

3. 道路标记分割算法

在成功提取可通行表面后,下一步是确定如何获取道路标记。我们首先对数据进行了可视化和探索,以确定最佳方法。

下图中,仅显示了可通行表面的强度,可以看出道路标记是可见的。这一观察促使我们继续进行道路标记的提取。

在这里插入图片描述

起初,我们采用了简单的方法,将大津(Otsu)阈值分割技术应用于整个可通行表面。然而,结果未能达到我们的预期。

经过一些实验,我们发现 LiDAR 中每个激光扫描器感知强度时存在微小的差异。例如,激光器 1 可能会为白色道路标记记录 0.9 的强度值,而激光器 2 则记录 0.7 的强度。

这种差异可能是由于每个 LiDAR 激光扫描器的不同特性造成的。这种差异也出现在上图中。

我们的做法是为每个环获得一个独特的阈值,其中一个环代表来自同一 LiDAR 激光扫描器的点。我们再次利用大津方法来确定区分沥青和道路标记的最佳阈值。

需要注意的是,nuScenes 数据集的 LiDAR 包含 32 个激光扫描器,形成了由 32 个环组成的点云结构。nuScenes 数据集以简单的格式提供数据:x, y, z, 强度, ring_id。

这使得我们能够轻松获取属于同一激光扫描器的点,因为每个激光扫描器都会形成自己的环。

以下展示了我们的方法在 nuScenes 中应用于整个场景的输出:

在这里插入图片描述

4. 完整 Python 代码:

from cv2 import threshold
import numpy as np
import yaml
import open3d
from pathlib import Path
import os
import cv2
import nthresh
import open3d as o3d
import matplotlib.pyplot as plt
import nthresh
import open3d as o3d
def ring_local_thresholding(points_to_threshold,vis_bool=False, vis_save=False,bin_name=None,points_full=None):
    '''
    points_to_threshold: numpy array of shape (n,5) where n is the number of points and 5 is the x,y,z,intensity,ring
                         these points should be the drivable surface points only
    vis_bool: boolean to visualize the results
    vis_save: boolean to save the results
    bin_name: name of the bin file to save the results
    points_full: numpy array of shape (n,5) where n is the number of points and 5 is the x,y,z,intensity,ring
                    these points should be the full point cloud
    
    returns :  numpy array of the road marks points           
    '''
    list_land_marks_points = []
    np_list_land_marks_points = []
    list_return = []
    n_rings=32

    # get the threshold for each ring
    for i in range(n_rings):
        ring_points=points_to_threshold[points_to_threshold[:,4]==i]
        intensity=ring_points[:,3]
        if(intensity.shape[0]<2):
            continue
        try:
            # get the threshold for each ring using Otsu's method
            threshold = nthresh.nthresh(intensity, n_classes=2, bins=255, n_jobs=1)
        except:
            continue
        # get points of intenisty higher than theh threshold, these points are the road marking points
        binary_intensity=intensity>(threshold[0]+10)
        land_marking_points=ring_points[binary_intensity]
        if(land_marking_points.shape[0]<2):
            continue

        list_return.append(land_marking_points)
        pc_land_marks = o3d.geometry.PointCloud()
        pc_land_marks.points = o3d.utility.Vector3dVector(land_marking_points[:,:3])
        pc_land_marks.paint_uniform_color([1,0, 0])
        list_land_marks_points.append(pc_land_marks)
        np_list_land_marks_points.append(land_marking_points[:,:3])


    # the following code is to visualize the results using open3d library
    if(vis_bool or vis_save):
        visgeom = o3d.visualization.Visualizer()
        visgeom.create_window()
        mesh_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=1, origin=[0, 0, 0])  # create coordinate frame
        visgeom.add_geometry(mesh_frame)
        # for box in bounding_boxes:
        #     visgeom.add_geometry(box)
        points_full=points_full[:,:3]+np.array([0,0,-0.5])
        pc_full=o3d.geometry.PointCloud()
        pc_full.points = o3d.utility.Vector3dVector(points_full[:,:3])
        pc_full.paint_uniform_color([0.8,0.8, 0.8])
        visgeom.add_geometry(pc_full)
        for pc in list_land_marks_points:
            visgeom.add_geometry(pc)


        #camera
        ctr = visgeom.get_view_control()
        ctr.set_front([0, -3, 1])
        ctr.set_lookat([0, 0, 0])
        ctr.set_up([0, 1, 0])
        ctr.set_zoom(0.5)

        # if (vis_bool):
        #     visgeom.run()
        #     visgeom.destroy_window()

        # if (vis_save):
        save3DPath='./output_vis_folder/'
        visgeom.capture_screen_image( save3DPath + "/" + str(bin_name)+ ".jpg", do_render=True)

    return np_list_land_marks_points


def draw_grey_scale(ground_points):
    '''
    ground_points: (5*n) numpy array, the ground points only

    description: draws the
    '''

    vis_points=ground_points[:,:3]

    intensity=ground_points[:,3]
    # intensity=intensity+0.001
    color_of_points=(np.stack([intensity, intensity, intensity], axis=1)/np.max(intensity).astype(np.float32))

    background_color=np.asarray([0, 0, 0])

    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(vis_points)
    pcd.colors = o3d.utility.Vector3dVector(color_of_points)
    vis = o3d.visualization.Visualizer()
    vis.create_window()
    vis.get_render_option().background_color = np.asarray([0, 0, 0])
    vis.add_geometry(pcd)
    vis.run()
    vis.destroy_window()





def draw_grey_scale_ground_w_whole_scene(ground_points,whole_pc):
    '''
    ground_points: (5*n) numpy array, the ground points only

    whole_pc: (5*n) numpy array, the whole point cloud

    description:
        Draws the full point cloud with the background black and the points white accroding to its intensity [0-1]
    '''

    # temp=ground_points[ ground_points[:,3]>=0.1  ]

    # ground_points=ground_points[ ground_points[:,3]<0.1  ]
    vis_points=ground_points[:,:3]
    vis_points+=np.array([0,0,0.5])

    intensity=ground_points[:,3]

    print('max=',np.max(intensity))


    # color_of_points=(np.stack([intensity, intensity, intensity], axis=1)/np.max(intensity).astype(np.float32))
    color_of_points=(np.stack([intensity, intensity, intensity], axis=1))
    # color_of_points=(  (np.stack([intensity, intensity, intensity], axis=1)/(0.09)) .astype(np.float32))

    background_color=np.asarray([0, 0, 0])

    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(vis_points)
    pcd.colors = o3d.utility.Vector3dVector(color_of_points)

    pcd_whole_pc = o3d.geometry.PointCloud()
    pcd_whole_pc.points = o3d.utility.Vector3dVector(whole_pc[:,:3])

    intensity_whole_pc=whole_pc[:,3]
    intensity_whole_pc= np.stack([intensity_whole_pc, intensity_whole_pc, intensity_whole_pc], axis=1)/np.max(intensity_whole_pc)
    intensity_whole_pc=intensity_whole_pc.astype(np.float32)
    pcd_whole_pc.colors = o3d.utility.Vector3dVector(intensity_whole_pc)
    # pcd_whole_pc.paint_uniform_color([0.2,0, 0])
    # pcd_whole_pc.paint_uniform_color(np.stack([intensity_whole_pc, intensity_whole_pc, intensity_whole_pc], axis=1))



    vis = o3d.visualization.Visualizer()
    vis.create_window()
    vis.get_render_option().background_color = np.asarray([0.2, 0.2, 0.2])
    # vis.add_geometry(pcd)
    vis.add_geometry(pcd_whole_pc)
    vis.run()
    vis.destroy_window()

def extract_landmarks( pointlcoud:np.array, labels:np.array, name:str):
    '''
    pointlcoud: (5*n) numpy array, the whole point cloud

    labels: (n) numpy array, the labels of the whole point cloud

    description:
        Extracts the landmarks from the point cloud and returns them as a numpy array

    returns:
        points_np : same as pointcloud but with different order
        new_labels_np : label of point 1 -> landmark
                                       0 -> not landmark
    '''

    # 11 is the label of the drivable surface
    drivable_surface_points = pointlcoud[labels==11]


    new_labels_list = [] # list of the labels of the points
    points_list = [] # list of the points in the same order as the new labels

    drivable_surface_points = pointlcoud[labels==11]

    n_rings = 32  # 32 as nuscenes lidar have 32 laser stacked vertically

    for i in range(n_rings):
        
        # get the points of each ring
        ring_points = drivable_surface_points[drivable_surface_points[:,4]==i]
        intensity=ring_points[:,3]

        
        if(intensity.shape[0]<2):
            continue
        try:
            # get the threshold for each ring using Otsu's method
            #the threshold is the intensity value that separates the road marking points from the other points
            threshold = nthresh.nthresh(intensity, n_classes=2, bins=255, n_jobs=1)
        except:
            continue

        # intensity above the threshold are the road marking points
        ring_labels = intensity > (threshold[0]+10)
    
        ring_labels = ring_labels.astype(np.uint32)
        new_labels_list.append(ring_labels)
        points_list.append(ring_points)
    
    # get the non drivable surface points to add to the list of the new pcd
    non_drivable_surface_points = pointlcoud[labels!=11]
    non_drivable_surface_labels = np.zeros(non_drivable_surface_points.shape[0]).astype(np.uint32)

    new_labels_list.append(non_drivable_surface_labels)
    points_list.append(non_drivable_surface_points)

    new_labels_np = np.concatenate(new_labels_list).astype(np.uint32)
    points_np = np.concatenate(points_list)

    # Visualisation part using open3d library
    visualiser = o3d.visualization.Visualizer()
    visualiser.create_window()
    mesh_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=1, origin=[0, 0, 0])  # create coordinate frame
    visualiser.add_geometry(mesh_frame)

    points_landmark = points_np[new_labels_np==1]
    points_other = points_np[new_labels_np==0]

    pcd_l = o3d.geometry.PointCloud()
    pcd_l.points = o3d.utility.Vector3dVector(points_landmark[:,:3])
    pcd_l.paint_uniform_color([1,0, 0])

    pcd_o = o3d.geometry.PointCloud()
    pcd_o.points = o3d.utility.Vector3dVector(points_other[:,:3])
    pcd_o.paint_uniform_color([0.8,0.8, 0.8])

    visualiser.add_geometry(pcd_l)
    visualiser.add_geometry(pcd_o)
    ctr = visualiser.get_view_control()
    ctr.set_front([0, -3, 1])
    ctr.set_lookat([0, 0, 0])
    ctr.set_up([0, 1, 0])
    ctr.set_zoom(0.1)
    # save
    save3DPath='./output_vis_folder/'
    visualiser.capture_screen_image( save3DPath + "/" + name+ ".jpg", do_render=True)

    return points_np, new_labels_np
    

def main():

    points_dir = Path('./lidar_data/') # path to .bin data
    label_dir = Path('./lidar_data_labels_all/') # path to .label data
    save_dir = Path('./lidar_data_labels_road_marking/') # path to save the results

    with open('./config/label_mapping/nuscenes.yaml', 'r') as stream: # label_mapping configuration file
        label_mapping = yaml.safe_load(stream)
        color_dict = label_mapping['color_map']

    # rename the label names to be the same as the poincloud name
    for it_scan, it_label in zip(sorted(points_dir.iterdir()), sorted(label_dir.iterdir())):
        bin_name=str(it_scan).split('/')[-1]
        label_name=str(it_label).split('/')[-1]
        old_name=str(it_label)
        new_name=str(label_dir)+'/'+bin_name[:-4]+'.label'
        os.rename(old_name,new_name)

    for it in sorted(label_dir.iterdir()):

        label_file = it
        points_file = points_dir / (str(it.stem) + '.bin')
        labels = np.fromfile(label_file, dtype=np.uint32)

        # x, y, z, inetnsity, ring_index
        points_full = np.fromfile(points_file, dtype=np.float32).reshape((-1, 5))

        new_points, new_labels_np = extract_landmarks(points_full, labels, str(it.stem))
        # new_labels_np as int
        new_labels_np = new_labels_np.astype(np.uint32)
        # new_labels_np as float
        # new_labels_np = new_labels_np.astype(np.float32)
        # save new_points
        new_points.tofile(save_dir / (str(it.stem) + '.bin'))
        # save new_labels_np
        new_labels_np.tofile(save_dir / (str(it.stem) + '.label'))


# main
if __name__ == '__main__':
    main()