[Beginner Tutorial 1.29] Hands-On with Open3D (12): Point Cloud Clustering, Segmentation, and 3D Object Detection in Practice with the KITTI Dataset (Python Code Included)
1. Abstract
This project takes a deep dive into practical point cloud analysis using the KITTI dataset.
- We first visualize the data with Open3D and downsample it using a voxel grid.
- Next, we apply the RANSAC algorithm to segment obstacles from the road surface, improving our understanding of the scene. Using the DBSCAN clustering algorithm, we then group similar obstacles to obtain more accurate spatial insight.
- To enable tracking, we create a 3D bounding box for each obstacle.
The goal of this project is to demonstrate how machine learning algorithms can be applied to point cloud data in 3D space.
Compared with stereo cameras (covered in an earlier chapter), a monocular camera struggles to perceive complex 3D environments accurately. Moreover, classifying every potential obstacle in the environment, from trees and power lines to lamp posts and other vehicles, is a major challenge: the finite datasets that obstacle detection relies on cannot cover every object that might appear.
2. Point Cloud Processing
For our analysis we will use the KITTI dataset, which contains synchronized stereo images and LiDAR data. The dataset captures two sets of images at fixed intervals through a stereo camera rig, while a Velodyne LiDAR produces the corresponding point clouds. We can therefore visualize the scene in 2D using the images and in 3D using the point clouds, as shown below:
2.1 Visualization
Several libraries and tools are available for point cloud processing; Open3D and PCL are two of the most widely used. This project uses Open3D exclusively, chosen for its ease of use and the wealth of documentation and literature available for it.
Our point cloud data has already been converted to the .ply format, so we can load it with Open3D's read_point_cloud function, as shown below:
point_cloud = open3d.io.read_point_cloud(point_cloud_path)
Once the file is loaded, there are several ways to visualize the point cloud with Open3D:
Method 1
o3d.visualization.draw_geometries([point_cloud])
Method 2
vis = open3d.visualization.Visualizer()
vis.create_window()
vis.add_geometry(point_cloud)
vis.get_render_option().background_color = np.asarray([0, 0, 0])
vis.run()
vis.destroy_window()
Method 3
visualize_reflectance_distance(point_cloud, mode='distance', save=False, show=True) #mode = reflectance or distance
We can also visualize the point cloud with distance-coded colors:
Note that distant points appear yellow, as indicated by the colorbar on the right. Now, suppose we want to visualize the point cloud with reflectance-coded colors instead:
2.2 Lane Line Detection via Thresholding
Let us now build lane line detection on top of the point cloud's reflectance data.
Looking closely at the figure above, the lane lines appear as a darker shade of red. Next, we filter the point cloud for points with high reflectance. The steps are:
- Get the reflectance values
- Create a mask that selects points whose reflectance is above a threshold
- Filter the points and reflectance values using the mask
- Create a new point cloud from the filtered points and colors
filtered_point_cloud = reflectivity_threshold(point_cloud, threshold=0.5)
Using this thresholding technique, we filter out every point whose reflectance is below 0.5.
Observe that the lane lines become much more visible, but license plates and the guardrails on both sides show up as well. Since we are only interested in the lane lines, we need to filter the rest out.
2.3 Region of Interest (ROI)
Now that we have the reflectance-filtered points, we manually define an ROI that contains the lane lines by setting its minimum and maximum bounds:
roi_point_cloud = roi_filter(point_cloud, roi_min=(0, -3, -2), roi_max=(20, 3, 0))
Once we are satisfied with the chosen ROI, we combine the reflectivity_threshold and roi_filter functions into a pipeline:
# Lane Line Detection
def lane_line_detection(point_cloud):
    # Make a copy
    point_cloud_copy = copy.deepcopy(point_cloud)
    # Thresholding
    filtered_point_cloud = reflectivity_threshold(point_cloud_copy, threshold=0.45)
    # Region of Interest
    roi_point_cloud = roi_filter(filtered_point_cloud, roi_min=(0, -3, -2), roi_max=(20, 3, 0))
    return roi_point_cloud
3. Downsampling with a Voxel Grid
When working with point clouds, we rarely need every point. In fact, downsampling the cloud removes potential noise and speeds up processing and visualization. Voxel grid downsampling works as follows:
- Define the voxel size: a smaller voxel size produces a more detailed point cloud, while a larger one downsamples more aggressively.
- Partition the space: divide the cloud's 3D space into voxels by creating a 3D grid; each point is assigned to a voxel based on its 3D coordinates.
- Subsample: keep a single point per voxel, for example the centroid of the points inside it, or a point chosen by another sampling method.
- Remove redundant points: points that fall into the same voxel are reduced to one point, effectively downsampling the cloud.
# Voxel Grid
point_cloud_downsampled = point_cloud.voxel_down_sample(voxel_size=1)
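For intuition, the steps above can be reproduced in a few lines of NumPy. This is a minimal sketch of centroid-based voxel bucketing, not Open3D's actual implementation; voxel_downsample is an illustrative helper name:

```python
import numpy as np

def voxel_downsample(points, voxel_size):
    """Centroid-based voxel grid downsampling (illustrative sketch)."""
    # Assign each point to a voxel by integer-dividing its coordinates
    voxel_idx = np.floor(points / voxel_size).astype(np.int64)
    # Group points that share a voxel index
    _, inverse, counts = np.unique(voxel_idx, axis=0,
                                   return_inverse=True, return_counts=True)
    inverse = inverse.reshape(-1)  # guard against NumPy version differences
    # Keep one point per voxel: the centroid of the points inside it
    centroids = np.zeros((counts.size, 3))
    np.add.at(centroids, inverse, points)  # sum the points in each voxel
    return centroids / counts[:, None]     # centroid = sum / count

pts = np.array([[0.1, 0.1, 0.1], [0.2, 0.2, 0.2], [1.5, 0.0, 0.0]])
down = voxel_downsample(pts, voxel_size=1.0)  # two occupied voxels remain
```

Open3D's voxel_down_sample performs the same bucketing, only far more efficiently in native code.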
The result of the downsampling process is a point cloud with fewer points that still preserves its main features.
4. Segmentation with RANSAC
RANSAC (Random Sample Consensus) was invented by Fischler and Bolles in 1981 to fit models (lines, planes, circles, etc.) to data contaminated with noise and outliers. The algorithm is widely used in computer vision, image processing, and other fields that require robust estimation of model parameters.
The RANSAC algorithm proceeds as follows:
- Randomly select s samples, the minimum number needed to fit the model.
- Fit the model to the randomly selected samples.
- Count the number of points M that fit the model within an error bound e.
- Repeat steps 1-3 a total of N times.
- Select the model with the most inliers M.
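The loop above can be sketched for a plane model in plain NumPy. This is an illustrative sketch only (ransac_plane is a hypothetical helper); the project itself uses Open3D's segment_plane, shown further below:

```python
import numpy as np

def ransac_plane(points, distance_threshold=0.33, num_iterations=100, rng=None):
    """Minimal RANSAC plane fit (sketch); returns the best inlier indices."""
    rng = np.random.default_rng(rng)
    best_inliers = np.array([], dtype=int)
    for _ in range(num_iterations):
        # 1. Randomly sample s = 3 points, the minimum to define a plane
        sample = points[rng.choice(len(points), size=3, replace=False)]
        # 2. Fit the model: plane normal from two edge vectors
        normal = np.cross(sample[1] - sample[0], sample[2] - sample[0])
        norm = np.linalg.norm(normal)
        if norm == 0:          # degenerate (collinear) sample, skip
            continue
        normal /= norm
        d = -normal @ sample[0]
        # 3. Count points M within the error bound e
        dist = np.abs(points @ normal + d)
        inliers = np.flatnonzero(dist < distance_threshold)
        # 5. Keep the model with the most inliers
        if len(inliers) > len(best_inliers):
            best_inliers = inliers
    return best_inliers

# Synthetic scene: 200 road-plane points at z = 0 plus 20 elevated outliers
rng = np.random.default_rng(0)
plane = np.c_[rng.uniform(-5, 5, (200, 2)), np.zeros(200)]
outliers = np.c_[rng.uniform(-5, 5, (20, 2)), rng.uniform(1.0, 5.0, 20)]
pts = np.vstack([plane, outliers])
idx = ransac_plane(pts, num_iterations=200, rng=1)
```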
In our scenario, we set ransac_n to 3, the number of points randomly sampled in each RANSAC iteration, and num_iterations to 2000, the number of RANSAC iterations to perform.
# Perform plane segmentation using RANSAC
plane_model, inliers = point_cloud.segment_plane(distance_threshold=distance_threshold, ransac_n=ransac_n, num_iterations=num_iterations)
# Extract inlier and outlier point clouds
inlier_cloud = point_cloud.select_by_index(inliers)
outlier_cloud = point_cloud.select_by_index(inliers, invert=True)
Observe how the plane corresponding to the road is segmented from all the vertical objects such as cars, trees, and billboards. The inliers are the road surface; the outliers are everything else.
5. Clustering with DBSCAN
DBSCAN (Density-Based Spatial Clustering of Applications with Noise) is a density-based clustering algorithm proposed by Martin Ester, Hans-Peter Kriegel, Jörg Sander, and Xiaowei Xu in 1996. It is particularly well suited to datasets with complex structure where the clusters must be detected automatically.
The DBSCAN algorithm proceeds as follows:
- Choose the parameters ε and MinPts, which define a point's neighborhood radius and the minimum number of points required to form a core point.
- Mark a point as a core point if its ε-neighborhood contains at least MinPts points.
- A point A is directly density-reachable from a core point B if A lies in B's ε-neighborhood.
- Density reachability is transitive; density-based clusters are formed by connecting mutually density-reachable points.
- Noise points are outliers that are neither core points nor density-reachable from any core point.
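The steps above condense into a small brute-force sketch (dbscan_sketch is an illustrative name; the project uses Open3D's cluster_dbscan, shown below, which implements the same idea efficiently):

```python
import numpy as np

def dbscan_sketch(points, eps, min_pts):
    """Minimal brute-force DBSCAN; returns one label per point, -1 = noise."""
    n = len(points)
    # Pairwise distances: fine for a sketch, a KD-tree scales better
    dist = np.linalg.norm(points[:, None] - points[None, :], axis=-1)
    neighbors = [np.flatnonzero(dist[i] <= eps) for i in range(n)]
    # Core points have at least min_pts points in their eps-neighborhood
    core = np.array([len(nb) >= min_pts for nb in neighbors])
    labels = np.full(n, -1)
    cluster = 0
    for i in range(n):
        if labels[i] != -1 or not core[i]:
            continue
        labels[i] = cluster           # start a new cluster at this core point
        frontier = list(neighbors[i])
        while frontier:               # expand via density reachability
            j = frontier.pop()
            if labels[j] == -1:
                labels[j] = cluster
                if core[j]:           # only core points propagate the cluster
                    frontier.extend(neighbors[j])
        cluster += 1
    return labels

# Two well-separated blobs plus one isolated noise point
rng = np.random.default_rng(0)
blob_a = rng.normal(0.0, 0.05, (30, 3))
blob_b = blob_a + np.array([10.0, 0.0, 0.0])
pts = np.vstack([blob_a, blob_b, [[5.0, 5.0, 5.0]]])
labels = dbscan_sketch(pts, eps=1.0, min_pts=5)
```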
In our scenario, we use an eps of 1.5, the maximum distance between two points for one to be considered in the neighborhood of the other, and a min_points of 50, the minimum number of points required to form a dense region (a core point).
labels = np.array(outlier_cloud.cluster_dbscan(eps=eps, min_points=min_points, print_progress=True))
Note that the road forms one cluster, while every other obstacle forms its own cluster, shown in a different color.
6. Creating 3D Bounding Boxes with PCA
Having clustered the obstacles, the next step is to enclose them in 3D bounding boxes. Since we operate in 3D space, the resulting boxes live in 3D as well. Beyond generating the boxes, however, we also need their orientation: some obstacles sit at an angle relative to the ego vehicle, and we want boxes that enclose each obstacle tightly. For this we use PCA (Principal Component Analysis):
- Compute the centroid of the point cloud subset: the centroid is the cloud's center of mass.
- Compute the covariance matrix: the covariance matrix summarizes the relationships between the dimensions.
- Apply PCA: decompose the covariance matrix into its eigenvectors and eigenvalues.
- Major axis: the eigenvector with the largest eigenvalue gives the longest axis, the direction of greatest spread.
- Minor axis: the eigenvector with the smallest eigenvalue gives the shortest axis, the direction of least spread.
- Define the OBB orientation: use these eigenvectors to define the orientation (rotation) of the oriented bounding box (OBB). The OBB is aligned with the eigenvectors, so the box reflects the cloud's principal directions.
- OBB dimensions: the OBB's size along each axis reflects the spread of the points along the corresponding eigenvector.
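These steps can be made concrete with a minimal NumPy sketch. pca_obb is an illustrative helper, not an Open3D function, and the returned axes are not guaranteed to form a right-handed frame; it only mirrors the recipe above:

```python
import numpy as np

def pca_obb(points):
    """PCA-based oriented bounding box (sketch): center, axes, extent."""
    # 1. Centroid of the cluster (its center of mass)
    centroid = points.mean(axis=0)
    centered = points - centroid
    # 2.-3. Covariance matrix and its eigen-decomposition
    eigvals, eigvecs = np.linalg.eigh(np.cov(centered.T))  # ascending order
    # 4.-5. Eigenvector columns are the box axes; put the major axis first
    axes = eigvecs[:, ::-1]
    # 6.-7. Project onto the axes to get the box extent and center
    local = centered @ axes
    mid = (local.max(axis=0) + local.min(axis=0)) / 2
    extent = local.max(axis=0) - local.min(axis=0)
    center = centroid + axes @ mid
    return center, axes, extent

# An elongated box of points, rotated 30 degrees about z and shifted
rng = np.random.default_rng(0)
local_pts = rng.uniform(-1.0, 1.0, (500, 3)) * np.array([4.0, 1.0, 0.5])
theta = np.pi / 6
R = np.array([[np.cos(theta), -np.sin(theta), 0.0],
              [np.sin(theta),  np.cos(theta), 0.0],
              [0.0,            0.0,           1.0]])
pts = local_pts @ R.T + np.array([10.0, 5.0, 0.0])
center, axes, extent = pca_obb(pts)
```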
obs = []
# Group points by cluster label
indexes = pd.Series(range(len(labels))).groupby(labels, sort=False).apply(list).tolist()
# Iterate over clusters and perform PCA
for i in range(0, len(indexes)):
    nb_points = len(outlier_cloud.select_by_index(indexes[i]).points)
    if nb_points > MIN_POINTS and nb_points < MAX_POINTS:
        sub_cloud = outlier_cloud.select_by_index(indexes[i])
        obb = sub_cloud.get_oriented_bounding_box()
Note that Open3D's get_oriented_bounding_box function performs the PCA for us behind the scenes. We use subsets of the outlier cloud because those points correspond to vehicles and other objects, while the inliers represent the road.
7. Complete Python Code
## IMPORT LIBRARIES
import numpy as np
import copy
import os
import open3d as o3d
from utils import *
import matplotlib.pyplot as plt
import pandas as pd
from open3d.visualization.draw_plotly import get_plotly_fig
import plotly.graph_objects as go

print("Open3D version:", o3d.__version__)

import torch

# Check if CUDA is available
if torch.cuda.is_available():
    # Set the device to GPU
    device = torch.device("cuda")
    print("Using GPU for computation.")
    print("Device type:", torch.cuda.get_device_name(device))
    print("Number of GPUs:", torch.cuda.device_count())
else:
    # Set the device to CPU
    device = torch.device("cpu")
    print("GPU not available. Using CPU for computation.")
### ------------ VISUALIZATION ------------------- ###
def mode_plotly(point_cloud, mode='reflectance'):
    # Extract the point cloud's coordinates as a numpy array
    points = np.asarray(point_cloud.points)
    # Calculate distances from the origin
    distances = np.linalg.norm(points, axis=1)
    color_values = distances
    # Check if the point cloud has color information
    if hasattr(point_cloud, 'colors') and len(point_cloud.colors) == len(point_cloud.points):
        colors = np.asarray(point_cloud.colors)
    else:
        colors = np.zeros((len(point_cloud.points), 3))  # Default to black color if no colors provided
    if mode == 'reflectance':
        # Get the reflectance values from the red channel of colors
        reflectivities = colors[:, 0]
        fig = go.Figure(data=[go.Scatter3d(
            x=points[:, 0],
            y=points[:, 1],
            z=points[:, 2],
            mode='markers',
            marker=dict(
                size=2,
                color=reflectivities,
                colorscale='RdPu',  # choose a colorscale (here 'RdPu')
                colorbar=dict(title="Reflectance"),  # add a colorbar title
                opacity=0.8,
                showscale=True  # Show the colorscale
            )
        )])
    elif mode == 'distance':
        fig = go.Figure(data=[go.Scatter3d(
            x=points[:, 0],
            y=points[:, 1],
            z=points[:, 2],
            mode='markers',
            marker=dict(
                size=2,
                color=color_values,  # use color_values for color
                colorscale='electric',  # choose a colorscale
                colorbar=dict(title="Distance"),  # add a colorbar title
                opacity=0.8
            )
        )])
    else:
        print("Wrong mode chosen! Choose either: 'reflectance' OR 'distance'!")
        return None
    # Update the scene layout if needed
    fig.update_layout(
        scene=dict(
            xaxis=dict(visible=False, range=[-70, 70]),
            yaxis=dict(visible=False, range=[-40, 40]),
            zaxis=dict(visible=False, range=[-5, 1]),
            aspectmode='manual', aspectratio=dict(x=2, y=1, z=0.1),
            camera=dict(
                up=dict(x=0.15, y=0, z=1),
                center=dict(x=0, y=0, z=0.1),
                eye=dict(x=-0.3, y=0, z=0.2)
            )
        ),
        # plot_bgcolor='black', # background
        # paper_bgcolor='black', # background
        scene_dragmode='orbit'
    )
    return fig
def visualize_reflectance_distance(point_cloud, mode, save=False, show=True, output_folder='output', filename='processed'):
    """
    Visualize a 3D point cloud and optionally save the plot as an image.
    Parameters:
        point_cloud (open3d.geometry.PointCloud): The 3D point cloud data.
        mode (str): Either 'reflectance' or 'distance'.
        save (bool, optional): If True, the plot will be saved as an image. Default is False.
        show (bool, optional): If True, the plot will be displayed interactively. Default is True.
        output_folder (str, optional): The folder where the image will be saved. Default is 'output'.
        filename (str, optional): The name of the saved image file. Default is 'processed'.
    Returns:
        plotly.graph_objects.Figure: The Plotly figure object.
    """
    # Build the Plotly figure for the requested mode
    fig = mode_plotly(point_cloud, mode=mode)
    if show:
        fig.show()
    # Save the plot as an image if save is True
    if save:
        image_path = f"{output_folder}/{filename}_processed.jpg"
        fig.write_image(image_path, scale=3)
        print(f"Plot saved as: {image_path}")
    return fig
def plotly_fig(point_cloud):
    # Extract the point cloud's coordinates as a numpy array
    points = np.asarray(point_cloud.points)
    # Check if the point cloud has color information
    if hasattr(point_cloud, 'colors') and len(point_cloud.colors) == len(point_cloud.points):
        colors = np.asarray(point_cloud.colors)
    else:
        colors = np.zeros((len(point_cloud.points), 3))  # Default to black color if no colors provided
    fig = go.Figure(data=[go.Scatter3d(
        x=points[:, 0],
        y=points[:, 1],
        z=points[:, 2],
        mode='markers',
        marker=dict(
            size=2,
            color=colors,
            opacity=0.8
        )
    )])
    # Update the scene layout if needed
    fig.update_layout(
        scene=dict(
            xaxis=dict(visible=False, range=[-70, 70]),
            yaxis=dict(visible=False, range=[-40, 40]),
            zaxis=dict(visible=False, range=[-5, 1]),
            aspectmode='manual', aspectratio=dict(x=2, y=1, z=0.1),
            camera=dict(
                up=dict(x=0.15, y=0, z=1),
                center=dict(x=0, y=0, z=0.1),
                eye=dict(x=-0.3, y=0, z=0.2)
            )
        ),
        # plot_bgcolor='black', # background
        # paper_bgcolor='black', # background
        scene_dragmode='orbit'
    )
    return fig
def visualize_point_clouds(*point_clouds, show=True, save=False, output_folder='output', filename='combined'):
    """
    Visualize one or more 3D point clouds and optionally save the plot as an image.
    Parameters:
        *point_clouds: Variable-length arguments, each representing a 3D point cloud.
            Can be numpy.ndarray or open3d.geometry.PointCloud.
        show (bool, optional): If True, the plot will be displayed interactively. Default is True.
        save (bool, optional): If True, the plot will be saved as an image. Default is False.
        output_folder (str, optional): The folder where the image will be saved. Default is 'output'.
        filename (str, optional): The name of the saved image file. Default is 'combined'.
    Returns:
        plotly.graph_objects.Figure: The Plotly figure object.
    """
    fig_combined = go.Figure()
    for pc in point_clouds:
        # Convert input to Open3D PointCloud if provided as a numpy array
        if isinstance(pc, np.ndarray):
            pc = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(pc))
        # Build the Plotly figure for the current point cloud
        fig = plotly_fig(pc)
        # Add the trace from the current figure to the combined figure
        fig_combined.add_trace(fig.data[0])
        # Update the layout if needed (common for all point clouds)
        fig_combined.update_layout(fig.layout)
    if show:
        fig_combined.show()
    # Save the plot as an image if save is True
    if save:
        print("\nSaving...")
        image_path = f"{output_folder}/{filename}_combined.jpg"
        fig_combined.write_image(image_path, scale=1)
        print(f"Plot saved as: {image_path}")
    return fig_combined
### ------------ LANE DETECTION ------------------- ###
def reflectivity_threshold(point_cloud, threshold=0.5):
    """
    Filters a point cloud based on a reflectivity threshold.
    Parameters:
        point_cloud (open3d.geometry.PointCloud): The input point cloud.
        threshold (float, optional): The reflectivity threshold. Points with reflectivity above this value will be kept.
            Default is 0.5.
    Returns:
        open3d.geometry.PointCloud: A new point cloud with points whose reflectivity is above the threshold.
    """
    if not hasattr(point_cloud, 'colors'):
        # If colors (reflectivity information) are not available, return the original point cloud
        return point_cloud
    # Get the reflectivity values (assuming reflectivity is stored in the red channel of the colors)
    colors = np.asarray(point_cloud.colors)
    reflectivities = colors[:, 0]
    # Create a mask of points that have reflectivity above the threshold
    mask = reflectivities > threshold
    # Filter points and reflectivities using the mask
    filtered_points = np.asarray(point_cloud.points)[mask]
    filtered_colors = colors[mask]
    # Create a new point cloud with the filtered points and colors
    filtered_point_cloud = o3d.geometry.PointCloud()
    filtered_point_cloud.points = o3d.utility.Vector3dVector(filtered_points)
    filtered_point_cloud.colors = o3d.utility.Vector3dVector(filtered_colors)
    return filtered_point_cloud
def roi_filter(pcd, roi_min=(0, -3, -2), roi_max=(20, 3, 0)):
    """
    Filters a point cloud based on a region of interest (ROI) defined by the minimum and maximum coordinates.
    Parameters:
        pcd (open3d.geometry.PointCloud): The input point cloud.
        roi_min (tuple, optional): The minimum (x, y, z) coordinates of the ROI. Default is (0, -3, -2).
        roi_max (tuple, optional): The maximum (x, y, z) coordinates of the ROI. Default is (20, 3, 0).
    Returns:
        open3d.geometry.PointCloud: A new point cloud containing points within the specified ROI.
    """
    # Convert the point cloud points to a numpy array
    points = np.asarray(pcd.points)
    # Create a mask for points that fall within the ROI
    mask_roi = np.logical_and.reduce((
        points[:, 0] >= roi_min[0],
        points[:, 0] <= roi_max[0],
        points[:, 1] >= roi_min[1],
        points[:, 1] <= roi_max[1],
        points[:, 2] >= roi_min[2],
        points[:, 2] <= roi_max[2]
    ))
    # Filter points using the mask
    roi_points = points[mask_roi]
    # Create a new point cloud with the filtered points
    roi_pcd = o3d.geometry.PointCloud()
    roi_pcd.points = o3d.utility.Vector3dVector(roi_points)
    return roi_pcd
def lane_line_detection(point_cloud):
    point_cloud_copy = copy.deepcopy(point_cloud)
    print(point_cloud_copy)
    # Thresholding
    filtered_point_cloud = reflectivity_threshold(point_cloud_copy, threshold=0.45)
    print(filtered_point_cloud)
    # Region of Interest
    roi_point_cloud = roi_filter(filtered_point_cloud, roi_min=(0, -3, -2), roi_max=(20, 3, 0))
    print(roi_point_cloud)
    return roi_point_cloud
def ransac(point_cloud, distance_threshold=0.33, ransac_n=3, num_iterations=100):
    """
    RANSAC-based plane segmentation for a point cloud.
    Parameters:
        point_cloud (open3d.geometry.PointCloud): The input point cloud.
        distance_threshold (float, optional): The maximum distance a point can be from the plane to be considered an inlier.
            Default is 0.33.
        ransac_n (int, optional): The number of points to randomly sample for each iteration of RANSAC. Default is 3.
        num_iterations (int, optional): The number of RANSAC iterations to perform. Default is 100.
    Returns:
        open3d.geometry.PointCloud, open3d.geometry.PointCloud: Two point clouds representing the outliers and inliers
        of the segmented plane, respectively.
    """
    # Perform plane segmentation using RANSAC
    plane_model, inliers = point_cloud.segment_plane(distance_threshold=distance_threshold, ransac_n=ransac_n,
                                                     num_iterations=num_iterations)
    # Extract inlier and outlier point clouds
    inlier_cloud = point_cloud.select_by_index(inliers)
    outlier_cloud = point_cloud.select_by_index(inliers, invert=True)
    # Color the outlier cloud red and the inlier cloud blue
    outlier_cloud.paint_uniform_color([0.8, 0.2, 0.2])  # Red
    inlier_cloud.paint_uniform_color([0.25, 0.5, 0.75])  # Blue
    return outlier_cloud, inlier_cloud
def dbscan(outlier_cloud, eps=1.0, min_points=10):
    """
    Perform Density-Based Spatial Clustering of Applications with Noise (DBSCAN) on the input point cloud.
    Parameters:
        outlier_cloud (open3d.geometry.PointCloud): The input point cloud to be clustered.
        eps (float, optional): The maximum distance between two points for one to be considered as in the neighborhood of the other.
            Default is 1.0.
        min_points (int, optional): The minimum number of points required to form a dense region (core points). Default is 10.
    Returns:
        open3d.geometry.PointCloud: The input point cloud with updated cluster colors.
        numpy.ndarray: Array of cluster labels assigned to each point in the point cloud.
    """
    # Perform DBSCAN clustering on the input point cloud
    labels = np.array(outlier_cloud.cluster_dbscan(eps=eps, min_points=min_points, print_progress=True))
    # Find the maximum cluster label to get the total number of clusters
    max_label = labels.max()
    print(f"\nPoint cloud has {max_label + 1} clusters")
    # Generate colors for the clusters using a colormap
    colors = plt.get_cmap("hsv")(labels / (max_label if max_label > 0 else 1))
    colors[labels < 0] = 0
    # Assign the computed colors to the point cloud
    outlier_cloud.colors = open3d.utility.Vector3dVector(colors[:, :3])
    # Return the point cloud with updated colors and the cluster labels
    return outlier_cloud, labels
def get_bounding_boxes(labels, outlier_cloud, MAX_POINTS=300):
    """
    Get axis-aligned bounding boxes for clusters in a point cloud.
    Parameters:
        labels (numpy.ndarray): The cluster labels assigned to each point in the point cloud.
        outlier_cloud (open3d.geometry.PointCloud): The point cloud containing the outlier points.
        MAX_POINTS (int, optional): Clusters with more points than this are skipped. Default is 300.
    Returns:
        list of open3d.geometry.AxisAlignedBoundingBox: A list of axis-aligned bounding boxes, one per cluster.
    """
    # Extract points for each cluster
    clusters = {}
    for i, label in enumerate(labels):
        if label >= 0:
            if label not in clusters:
                clusters[label] = []
            clusters[label].append(outlier_cloud.points[i])
    # Filter out clusters with more than MAX_POINTS points (optional)
    clusters = {label: points for label, points in clusters.items() if len(points) <= MAX_POINTS}
    # Create boxes for each cluster
    boxes = []
    for points in clusters.values():
        cluster_cloud = o3d.geometry.PointCloud()
        cluster_cloud.points = o3d.utility.Vector3dVector(points)
        aabb = cluster_cloud.get_axis_aligned_bounding_box()  # axis-aligned, no PCA involved
        boxes.append(aabb)
    return boxes
def get_trace(obb_boxes, fig):
    for obb in obb_boxes:
        # Get the eight corner points of the OBB
        corners = np.asarray(obb.get_box_points())
        # Extract x, y, and z coordinates of the corners
        x = corners[:, 0]
        y = corners[:, 1]
        z = corners[:, 2]
        # Create a Mesh3d trace for the bounding box with opacity
        obb_trace = go.Mesh3d(
            x=x,
            y=y,
            z=z,
            i=[0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 2, 6, 4, 1, 3, 7, 5],
            j=[1, 2, 3, 0, 5, 6, 7, 4, 2, 3, 7, 6, 1, 0, 4, 5, 6, 7, 3, 2, 0, 1, 5, 4],
            k=[2, 3, 0, 1, 6, 7, 4, 5, 3, 7, 6, 2, 0, 4, 5, 1, 7, 6, 2, 4, 1, 5, 0, 3],
            color='blue',
            opacity=0.2
        )
        # Add the Mesh3d trace to the figure
        fig.add_trace(obb_trace)
    return fig
def pca(labels, outlier_cloud, inlier_cloud, MAX_POINTS, MIN_POINTS):
    """
    Perform PCA (Principal Component Analysis) on the point cloud clusters.
    Parameters:
        labels (numpy.ndarray): The cluster labels assigned to each point in the point cloud.
        outlier_cloud (open3d.geometry.PointCloud): The point cloud containing the outlier points.
        inlier_cloud (open3d.geometry.PointCloud): The point cloud containing the inlier points.
        MAX_POINTS (int): The maximum number of points in a cluster to consider for PCA.
        MIN_POINTS (int): The minimum number of points in a cluster to consider for PCA.
    Returns:
        list: The outlier cloud, the oriented bounding boxes (OBB) for each cluster, and the inlier cloud.
    """
    obs = []
    # Group points by cluster label
    indexes = pd.Series(range(len(labels))).groupby(labels, sort=False).apply(list).tolist()
    # Iterate over clusters and perform PCA
    for i in range(0, len(indexes)):
        nb_points = len(outlier_cloud.select_by_index(indexes[i]).points)
        if nb_points > MIN_POINTS and nb_points < MAX_POINTS:
            sub_cloud = outlier_cloud.select_by_index(indexes[i])
            obb = sub_cloud.get_oriented_bounding_box()
            obb.color = (0.5, 0, 0.5)
            obs.append(obb)
    # Combine the outlier cloud, PCA bounding boxes, and inlier cloud in a list
    list_of_visuals = []
    list_of_visuals.append(outlier_cloud)
    list_of_visuals.extend(obs)
    list_of_visuals.append(inlier_cloud)
    return list_of_visuals
if __name__ == '__main__':
    # Get the current directory
    current_directory = os.getcwd()
    # Go back to the parent directory
    parent_directory = os.path.dirname(current_directory)
    # Set input directory
    point_cloud_folder = os.path.join(parent_directory, 'Data', 'KITTI_PCD')
    output_folder = os.path.join(parent_directory, 'Data', 'Output')
    # Choose index
    index = 357
    # Get pcd file
    point_cloud_path = os.path.join(point_cloud_folder, os.listdir(point_cloud_folder)[index])
    point_cloud = open3d.io.read_point_cloud(point_cloud_path)
    # Separate points and colors
    points = np.asarray(point_cloud.points)
    colors = np.asarray(point_cloud.colors)
    print("\n Point Cloud: ")
    print(points)
    print(points.shape)
    print("\n Colors: ")
    print(colors)
    print(colors.shape)

    ### ------------------ 1. VISUALIZATION ------------------------- ###
    # visualization_draw_geometry(point_cloud, background='black')  # Dark background
    # visualization_draw_geometry(point_cloud, background='white')  # White background
    # Visualize with Plotly
    plotly_distance(points, show=False)
    # Visualize Reflectance
    visualize_reflectance_distance(point_cloud, mode='reflectance', save=False, show=False, output_folder=output_folder, filename='test')
    # Visualize Distance
    visualize_reflectance_distance(point_cloud, mode='distance', save=False, show=False, output_folder=output_folder, filename='test')
    # Thresholding
    filtered_point_cloud = reflectivity_threshold(point_cloud, threshold=0.5)
    visualize_point_clouds(point_cloud.paint_uniform_color((0.7, 0.7, 0.7)),
                           filtered_point_cloud.paint_uniform_color((0.8, 0.2, 0.2)), show=False)
    # Region of Interest
    roi_point_cloud = roi_filter(point_cloud, roi_min=(0, -3, -2), roi_max=(20, 3, 0))
    visualize_point_clouds(point_cloud.paint_uniform_color((0.7, 0.7, 0.7)),
                           roi_point_cloud.paint_uniform_color((0.34, 0.55, 0.67)), show=False)
    # Visualize all point clouds
    visualize_point_clouds(point_cloud.paint_uniform_color((0.7, 0.7, 0.7)),
                           filtered_point_cloud.paint_uniform_color((1.0, 0.6, 0.6)),
                           roi_point_cloud.paint_uniform_color((0.68, 0.85, 0.9)),
                           show=False, save=False, output_folder=output_folder, filename='combined')
    # Read point cloud
    point_cloud = open3d.io.read_point_cloud(point_cloud_path)
    # Lane Line Detection
    roi_point_cloud = lane_line_detection(point_cloud)
    visualize_point_clouds(point_cloud.paint_uniform_color((0, 0, 0)),
                           roi_point_cloud.paint_uniform_color((0.2, 0.5, 0.3)), show=False)

    ### ----- Downsampling
    # Read point cloud
    point_cloud = open3d.io.read_point_cloud(point_cloud_path)
    # Voxel Grid
    print("Number of points BEFORE: ", len(point_cloud.points))
    point_cloud_downsampled = point_cloud.voxel_down_sample(voxel_size=0.2)
    print("Number of points AFTER: ", len(point_cloud_downsampled.points))
    visualize_point_clouds(point_cloud_downsampled.paint_uniform_color((0.5, 0.0, 0.5)), show=False)

    ### ------- SEGMENTATION
    outlier_cloud, inlier_cloud = ransac(point_cloud_downsampled, distance_threshold=0.33, ransac_n=3, num_iterations=2000)
    visualize_point_clouds(outlier_cloud, inlier_cloud, show=False)

    ### ------- CLUSTERING
    roi_outliers, labels = dbscan(outlier_cloud, eps=1.5, min_points=50)
    visualize_point_clouds(roi_outliers, inlier_cloud, show=False)

    ### ------- PCA
    # Method 1:
    boxes = get_bounding_boxes(labels, roi_outliers, MAX_POINTS=1500)
    print("\nNumber of boxes: ", len(boxes))
    fig = visualize_point_clouds(roi_outliers, inlier_cloud, show=False)
    fig = get_trace(boxes, fig)
    # fig.show()
    # Method 2:
    list_of_visuals = pca(labels, roi_outliers, inlier_cloud, MAX_POINTS=1500, MIN_POINTS=5)
    print(list_of_visuals)
    # visualization_draw_geometry_list(list_of_visuals)
    # Normals
    point_cloud_downsampled.estimate_normals(search_param=o3d.geometry.KDTreeSearchParamHybrid(radius=0.1, max_nn=30))
utils.py
import copy
import numpy as np
import open3d
import open3d as o3d  # both aliases are used below
import matplotlib.pyplot as plt
import plotly.graph_objects as go

def visualization_draw_geometry(pcd, background='black'):
    vis = open3d.visualization.Visualizer()
    vis.create_window()
    vis.add_geometry(pcd)
    if background == 'black':
        vis.get_render_option().background_color = np.asarray([0, 0, 0])
    elif background == 'white':
        vis.get_render_option().background_color = np.asarray([1, 1, 1])
    else:
        print("Wrong background chosen! Choose either 'white' OR 'black'.")
    vis.run()
    vis.destroy_window()
def plotly_distance(points, show=True):
    # Calculate distances from the origin
    distances = np.linalg.norm(points, axis=1)
    # Create the 3D scatter plot
    fig = go.Figure(data=[go.Scatter3d(
        x=points[:, 0],
        y=points[:, 1],
        z=points[:, 2],
        mode='markers',
        marker=dict(
            size=2,
            color=distances,  # use distances for color
            colorscale='Inferno',  # choose a colorscale
            colorbar=dict(title="Distance from Origin"),  # add a colorbar title
            opacity=0.8
        )
    )])
    fig.update_scenes(aspectmode='data')
    if show:
        fig.show()
def visualization_draw_geometry_list(pcd_list):
    vis = open3d.visualization.Visualizer()
    vis.create_window()
    for pcd in pcd_list:
        vis.add_geometry(pcd)
    vis.get_render_option().background_color = np.asarray([1, 1, 1])
    vis.run()
    vis.destroy_window()
def mode_plotly(point_cloud, mode='reflectance'):
    # Extract the point cloud's coordinates as a numpy array
    points = np.asarray(point_cloud.points)
    # Calculate distances from the origin
    distances = np.linalg.norm(points, axis=1)
    color_values = distances
    # Check if the point cloud has color information
    if hasattr(point_cloud, 'colors') and len(point_cloud.colors) == len(point_cloud.points):
        colors = np.asarray(point_cloud.colors)
    else:
        colors = np.zeros((len(point_cloud.points), 3))  # Default to black color if no colors provided
    if mode == 'reflectance':
        fig = go.Figure(data=[go.Scatter3d(
            x=points[:, 0],
            y=points[:, 1],
            z=points[:, 2],
            mode='markers',
            marker=dict(
                size=2,
                color=colors,
                opacity=0.8
            )
        )])
    elif mode == 'distance':
        fig = go.Figure(data=[go.Scatter3d(
            x=points[:, 0],
            y=points[:, 1],
            z=points[:, 2],
            mode='markers',
            marker=dict(
                size=2,
                color=color_values,  # use color_values for color
                colorscale='Inferno',  # choose a colorscale
                colorbar=dict(title="Color Value"),  # add a colorbar title
                opacity=0.8
            )
        )])
    else:
        print("Wrong mode chosen! Choose either: 'reflectance' OR 'distance'!")
        return None
    # Update the scene layout if needed
    fig.update_layout(
        scene=dict(
            xaxis=dict(visible=False, range=[-70, 70]),
            yaxis=dict(visible=False, range=[-40, 40]),
            zaxis=dict(visible=False, range=[-5, 1]),
            aspectmode='manual', aspectratio=dict(x=2, y=1, z=0.1),
            camera=dict(
                up=dict(x=0.15, y=0, z=1),
                center=dict(x=0, y=0, z=0.1),
                eye=dict(x=-0.3, y=0, z=0.2)
            )
        ),
        # plot_bgcolor='black', # background
        # paper_bgcolor='black', # background
        scene_dragmode='orbit'
    )
    return fig
def visualize_reflectance_distance(point_cloud, mode, save=False, show=True, output_folder='output',
                                   filename='processed'):
    """
    Visualize a 3D point cloud and optionally save the plot as an image.
    Parameters:
        point_cloud (open3d.geometry.PointCloud): The 3D point cloud data.
        mode (str): Either 'reflectance' or 'distance'.
        save (bool, optional): If True, the plot will be saved as an image. Default is False.
        show (bool, optional): If True, the plot will be displayed interactively. Default is True.
        output_folder (str, optional): The folder where the image will be saved. Default is 'output'.
        filename (str, optional): The name of the saved image file. Default is 'processed'.
    Returns:
        plotly.graph_objects.Figure: The Plotly figure object.
    """
    # Build the Plotly figure for the requested mode
    fig = mode_plotly(point_cloud, mode=mode)
    if show:
        fig.show()
    # Save the plot as an image if save is True
    if save:
        image_path = f"{output_folder}/{filename}_processed.jpg"
        fig.write_image(image_path, scale=3)
        print(f"Plot saved as: {image_path}")
    return fig
def plotly_fig(point_cloud):
    # Extract the point cloud's coordinates as a numpy array
    points = np.asarray(point_cloud.points)
    # Check if the point cloud has color information
    if hasattr(point_cloud, 'colors') and len(point_cloud.colors) == len(point_cloud.points):
        colors = np.asarray(point_cloud.colors)
    else:
        colors = np.zeros((len(point_cloud.points), 3))  # Default to black color if no colors provided
    fig = go.Figure(data=[go.Scatter3d(
        x=points[:, 0],
        y=points[:, 1],
        z=points[:, 2],
        mode='markers',
        marker=dict(
            size=2,
            color=colors,
            opacity=0.8
        )
    )])
    # Update the scene layout if needed
    fig.update_layout(
        scene=dict(
            xaxis=dict(visible=False, range=[-70, 70]),
            yaxis=dict(visible=False, range=[-40, 40]),
            zaxis=dict(visible=False, range=[-5, 1]),
            aspectmode='manual', aspectratio=dict(x=2, y=1, z=0.1),
            camera=dict(
                up=dict(x=0.15, y=0, z=1),
                center=dict(x=0, y=0, z=0.1),
                eye=dict(x=-0.3, y=0, z=0.2)
            )
        ),
        plot_bgcolor='black',  # background
        paper_bgcolor='black',  # background
        scene_dragmode='orbit'
    )
    return fig
def visualize_point_clouds(*point_clouds, show=True, save=False, output_folder='output', filename='combined'):
    """
    Visualize one or more 3D point clouds and optionally save the plot as an image.
    Parameters:
        *point_clouds: Variable-length arguments, each representing a 3D point cloud.
            Can be numpy.ndarray or open3d.geometry.PointCloud.
        show (bool, optional): If True, the plot will be displayed interactively. Default is True.
        save (bool, optional): If True, the plot will be saved as an image. Default is False.
        output_folder (str, optional): The folder where the image will be saved. Default is 'output'.
        filename (str, optional): The name of the saved image file. Default is 'combined'.
    Returns:
        plotly.graph_objects.Figure: The Plotly figure object.
    """
    fig_combined = go.Figure()
    for pc in point_clouds:
        # Convert input to Open3D PointCloud if provided as a numpy array
        if isinstance(pc, np.ndarray):
            pc = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(pc))
        # Build the Plotly figure for the current point cloud
        fig = plotly_fig(pc)
        # Add the trace from the current figure to the combined figure
        fig_combined.add_trace(fig.data[0])
        # Update the layout if needed (common for all point clouds)
        fig_combined.update_layout(fig.layout)
    if show:
        fig_combined.show()
    # Save the plot as an image if save is True
    if save:
        print("\nSaving...")
        image_path = f"{output_folder}/{filename}_combined.jpg"
        fig_combined.write_image(image_path, scale=1)
        print(f"Plot saved as: {image_path}")
    return fig_combined
### ------------ LANE DETECTION ------------------- ###
def reflectivity_threshold(point_cloud, threshold=0.5):
    """
    Filters a point cloud based on a reflectivity threshold.
    Parameters:
        point_cloud (open3d.geometry.PointCloud): The input point cloud.
        threshold (float, optional): The reflectivity threshold. Points with reflectivity above this value will be kept.
            Default is 0.5.
    Returns:
        open3d.geometry.PointCloud: A new point cloud with points whose reflectivity is above the threshold.
    """
    if not hasattr(point_cloud, 'colors'):
        # If colors (reflectivity information) are not available, return the original point cloud
        return point_cloud
    # Get the reflectivity values (assuming reflectivity is stored in the red channel of the colors)
    colors = np.asarray(point_cloud.colors)
    reflectivities = colors[:, 0]
    # Create a mask of points that have reflectivity above the threshold
    mask = reflectivities > threshold
    # Filter points and reflectivities using the mask
    filtered_points = np.asarray(point_cloud.points)[mask]
    filtered_colors = colors[mask]
    # Create a new point cloud with the filtered points and colors
    filtered_point_cloud = o3d.geometry.PointCloud()
    filtered_point_cloud.points = o3d.utility.Vector3dVector(filtered_points)
    filtered_point_cloud.colors = o3d.utility.Vector3dVector(filtered_colors)
    return filtered_point_cloud
def roi_filter(pcd, roi_min=(0, -3, -2), roi_max=(20, 3, 0)):
    """
    Filters a point cloud based on a region of interest (ROI) defined by the minimum and maximum coordinates.
    Parameters:
        pcd (open3d.geometry.PointCloud): The input point cloud.
        roi_min (tuple, optional): The minimum (x, y, z) coordinates of the ROI. Default is (0, -3, -2).
        roi_max (tuple, optional): The maximum (x, y, z) coordinates of the ROI. Default is (20, 3, 0).
    Returns:
        open3d.geometry.PointCloud: A new point cloud containing points within the specified ROI.
    """
    # Convert the point cloud points to a numpy array
    points = np.asarray(pcd.points)
    # Create a mask for points that fall within the ROI
    mask_roi = np.logical_and.reduce((
        points[:, 0] >= roi_min[0],
        points[:, 0] <= roi_max[0],
        points[:, 1] >= roi_min[1],
        points[:, 1] <= roi_max[1],
        points[:, 2] >= roi_min[2],
        points[:, 2] <= roi_max[2]
    ))
    # Filter points using the mask
    roi_points = points[mask_roi]
    # Create a new point cloud with the filtered points
    roi_pcd = o3d.geometry.PointCloud()
    roi_pcd.points = o3d.utility.Vector3dVector(roi_points)
    return roi_pcd
def lane_line_detection(point_cloud):
    point_cloud_copy = copy.deepcopy(point_cloud)
    print(point_cloud_copy)
    # Thresholding
    filtered_point_cloud = reflectivity_threshold(point_cloud_copy, threshold=0.45)
    print(filtered_point_cloud)
    # Region of Interest
    roi_point_cloud = roi_filter(filtered_point_cloud, roi_min=(0, -3, -2), roi_max=(20, 3, 0))
    print(roi_point_cloud)
    return roi_point_cloud
def ransac(point_cloud, distance_threshold=0.33, ransac_n=3, num_iterations=100):
    """
    RANSAC-based plane segmentation for a point cloud.
    Parameters:
        point_cloud (open3d.geometry.PointCloud): The input point cloud.
        distance_threshold (float, optional): The maximum distance a point can be from the plane to be considered
            an inlier. Default is 0.33.
        ransac_n (int, optional): The number of points to randomly sample for each iteration of RANSAC. Default is 3.
        num_iterations (int, optional): The number of RANSAC iterations to perform. Default is 100.
    Returns:
        open3d.geometry.PointCloud, open3d.geometry.PointCloud: Two point clouds representing the outliers and
            inliers of the segmented plane, respectively.
    """
    # Perform plane segmentation using RANSAC
    plane_model, inliers = point_cloud.segment_plane(distance_threshold=distance_threshold,
                                                     ransac_n=ransac_n,
                                                     num_iterations=num_iterations)
    # Extract inlier and outlier point clouds
    inlier_cloud = point_cloud.select_by_index(inliers)
    outlier_cloud = point_cloud.select_by_index(inliers, invert=True)
    # Color the outlier cloud red and the inlier cloud blue
    outlier_cloud.paint_uniform_color([0.8, 0.2, 0.2])  # Red
    inlier_cloud.paint_uniform_color([0.25, 0.5, 0.75])  # Blue
    return outlier_cloud, inlier_cloud
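Open3D's `segment_plane` runs the RANSAC loop internally. To make the idea concrete, here is a pure-NumPy sketch of the same algorithm, tested on a synthetic scene (a noisy ground plane plus points above it); the scene and the helper `ransac_plane` are illustrative assumptions, not Open3D's implementation:

```python
import numpy as np

def ransac_plane(points, distance_threshold=0.33, num_iterations=100, seed=1):
    """Pure-NumPy sketch of RANSAC plane fitting: returns indices of the inlier points."""
    rng = np.random.default_rng(seed)
    best_inliers = np.array([], dtype=int)
    for _ in range(num_iterations):
        # Sample 3 points and fit a candidate plane through them
        p0, p1, p2 = points[rng.choice(len(points), 3, replace=False)]
        normal = np.cross(p1 - p0, p2 - p0)
        norm = np.linalg.norm(normal)
        if norm < 1e-9:
            continue  # degenerate (collinear) sample
        normal /= norm
        # Points within distance_threshold of the candidate plane are inliers
        distances = np.abs((points - p0) @ normal)
        inliers = np.where(distances < distance_threshold)[0]
        # Keep the candidate plane supported by the most inliers
        if len(inliers) > len(best_inliers):
            best_inliers = inliers
    return best_inliers

# Synthetic scene: 200 noisy ground points near z = 0, 20 obstacle points above
rng = np.random.default_rng(0)
ground = np.column_stack([rng.uniform(-10, 10, (200, 2)), rng.normal(0, 0.02, 200)])
obstacles = np.column_stack([rng.uniform(-10, 10, (20, 2)), rng.uniform(1, 3, 20)])
points = np.vstack([ground, obstacles])

inliers = ransac_plane(points)
print(len(inliers))  # the ground points dominate the best plane
```

The winning plane is the one with the most supporting points, which is why RANSAC reliably picks the road surface: no tilted plane can collect more inliers than the ground itself.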
def dbscan(outlier_cloud, eps=1.0, min_points=10):
    """
    Clusters a point cloud with DBSCAN and colors each cluster; noise points are colored black.
    Parameters:
        outlier_cloud (open3d.geometry.PointCloud): The point cloud to cluster (typically the non-ground points).
        eps (float, optional): The maximum distance between neighboring points in a cluster. Default is 1.0.
        min_points (int, optional): The minimum number of points required to form a dense region. Default is 10.
    Returns:
        open3d.geometry.PointCloud, numpy.ndarray: The colored point cloud and the per-point cluster labels
            (-1 marks noise).
    """
    # Run DBSCAN; each point gets a cluster label, -1 means noise
    labels = np.array(outlier_cloud.cluster_dbscan(eps=eps, min_points=min_points, print_progress=True))
    max_label = labels.max()
    print(f"\npoint cloud has {max_label + 1} clusters")
    # Map cluster labels to colors; noise points (label < 0) are set to black
    colors = plt.get_cmap("inferno_r")(labels / (max_label if max_label > 0 else 1))
    colors[labels < 0] = 0
    outlier_cloud.colors = o3d.utility.Vector3dVector(colors[:, :3])
    return outlier_cloud, labels
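To see what `cluster_dbscan` computes, here is a minimal, brute-force DBSCAN written from scratch in NumPy (a teaching sketch only; Open3D's version uses spatial indexing and is far faster). Two tight synthetic blobs become two clusters, and an isolated point is labeled -1:

```python
import numpy as np

def dbscan_labels(points, eps=1.0, min_points=10):
    """Minimal DBSCAN: returns one cluster label per point, -1 for noise."""
    n = len(points)
    labels = np.full(n, -1)
    visited = np.zeros(n, dtype=bool)
    # Brute-force pairwise distances; fine for a demo, real code uses a KD-tree
    dists = np.linalg.norm(points[:, None, :] - points[None, :, :], axis=-1)
    neighbors = [np.where(dists[i] <= eps)[0] for i in range(n)]
    cluster = 0
    for i in range(n):
        if visited[i]:
            continue
        visited[i] = True
        if len(neighbors[i]) < min_points:
            continue  # not a core point; stays noise unless a cluster claims it
        # Grow a new cluster outward from this core point
        labels[i] = cluster
        queue = list(neighbors[i])
        while queue:
            j = queue.pop()
            if labels[j] == -1:
                labels[j] = cluster
            if not visited[j]:
                visited[j] = True
                if len(neighbors[j]) >= min_points:
                    queue.extend(neighbors[j])
        cluster += 1
    return labels

# Two tight blobs far apart, plus one isolated noise point
rng = np.random.default_rng(0)
points = np.vstack([
    rng.normal([0, 0, 0], 0.2, (30, 3)),
    rng.normal([10, 0, 0], 0.2, (30, 3)),
    [[50.0, 50.0, 50.0]],
])
labels = dbscan_labels(points)
print(labels.max() + 1, "clusters")  # expect 2; the isolated point is noise
```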
def get_bounding_boxes(labels, outlier_cloud, MAX_POINTS=300):
    """
    Get axis-aligned bounding boxes (AABBs) for clusters in a point cloud.
    Parameters:
        labels (numpy.ndarray): The cluster labels assigned to each point in the point cloud.
        outlier_cloud (open3d.geometry.PointCloud): The point cloud containing the outlier points.
        MAX_POINTS (int, optional): Clusters with more points than this are skipped. Default is 300.
    Returns:
        list of open3d.geometry.AxisAlignedBoundingBox: A list of axis-aligned bounding boxes, one per cluster.
    """
    # Extract points for each cluster
    clusters = {}
    for i, label in enumerate(labels):
        if label >= 0:
            if label not in clusters:
                clusters[label] = []
            clusters[label].append(outlier_cloud.points[i])
    # Keep only clusters with at most MAX_POINTS points (optional)
    clusters = {label: points for label, points in clusters.items() if len(points) <= MAX_POINTS}
    # Create an axis-aligned bounding box for each cluster
    boxes = []
    for points in clusters.values():
        cluster_cloud = o3d.geometry.PointCloud()
        cluster_cloud.points = o3d.utility.Vector3dVector(points)
        aabb = cluster_cloud.get_axis_aligned_bounding_box()
        boxes.append(aabb)
    return boxes
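An axis-aligned bounding box is nothing more than the per-axis minimum and maximum of a cluster, which is what `get_axis_aligned_bounding_box` computes. A tiny NumPy sketch with a hypothetical three-point cluster:

```python
import numpy as np

# A hypothetical cluster of three points
cluster = np.array([[1.0, 2.0,  0.0],
                    [3.0, 1.0,  0.5],
                    [2.0, 4.0, -0.5]])

# The AABB corners are the per-axis min and max of the cluster
aabb_min = cluster.min(axis=0)
aabb_max = cluster.max(axis=0)
extent = aabb_max - aabb_min
print(aabb_min, aabb_max)  # [ 1.   1.  -0.5] [3.  4.  0.5]
```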
def pca(labels, outlier_cloud, inlier_cloud, MAX_POINTS, MIN_POINTS):
    """
    Compute PCA-based oriented bounding boxes (OBBs) for the point cloud clusters.
    Parameters:
        labels (numpy.ndarray): The cluster labels assigned to each point in the outlier cloud.
        outlier_cloud (open3d.geometry.PointCloud): The point cloud containing the outlier points.
        inlier_cloud (open3d.geometry.PointCloud): The point cloud containing the inlier points.
        MAX_POINTS (int): The maximum number of points in a cluster to consider for PCA.
        MIN_POINTS (int): The minimum number of points in a cluster to consider for PCA.
    Returns:
        list: The outlier cloud, the oriented bounding boxes, and the inlier cloud, ready for visualization.
    """
    obbs = []
    # Group point indexes by cluster label
    indexes = pd.Series(range(len(labels))).groupby(labels, sort=False).apply(list).tolist()
    # Iterate over clusters and fit an oriented bounding box to each one
    for i in range(len(indexes)):
        nb_points = len(outlier_cloud.select_by_index(indexes[i]).points)
        if MIN_POINTS < nb_points < MAX_POINTS:
            sub_cloud = outlier_cloud.select_by_index(indexes[i])
            obb = sub_cloud.get_oriented_bounding_box()
            obb.color = (1, 1, 0)  # Set color of the oriented bounding box to yellow
            obbs.append(obb)
    # Combine the outlier cloud, oriented bounding boxes, and inlier cloud in a list
    list_of_visuals = []
    list_of_visuals.append(outlier_cloud)
    list_of_visuals.extend(obbs)
    list_of_visuals.append(inlier_cloud)
    return list_of_visuals
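The PCA idea behind an oriented bounding box can be sketched in NumPy: the eigenvectors of the cluster's covariance matrix give its principal axes, and the box extents are the point spread along those axes. This is a rough illustration of the concept, not Open3D's exact `get_oriented_bounding_box` algorithm. With a synthetic 4 x 1 rectangle rotated 30 degrees, PCA recovers the rectangle's true dimensions, where an axis-aligned box would be too large:

```python
import numpy as np

# Synthetic cluster: a 4 x 1 rectangle in the xy-plane, rotated 30 degrees
rng = np.random.default_rng(0)
local = np.column_stack([rng.uniform(-2, 2, 500),
                         rng.uniform(-0.5, 0.5, 500),
                         np.zeros(500)])
theta = np.radians(30)
R = np.array([[np.cos(theta), -np.sin(theta), 0.0],
              [np.sin(theta),  np.cos(theta), 0.0],
              [0.0,            0.0,           1.0]])
points = local @ R.T

def pca_box_extent(points):
    """Side lengths of a PCA-aligned bounding box around the points."""
    centered = points - points.mean(axis=0)
    # Eigenvectors of the covariance matrix give the principal axes of the cluster
    _, eigvecs = np.linalg.eigh(np.cov(centered.T))
    # Express the points in the principal-axis frame and take per-axis extents
    projected = centered @ eigvecs
    return projected.max(axis=0) - projected.min(axis=0)

extent = np.sort(pca_box_extent(points))[::-1]
print(extent)  # approximately [4, 1, 0]: the rectangle's own dimensions
```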
def get_trace(obb_boxes, fig):
    """
    Add a semi-transparent Mesh3d trace to a Plotly figure for each oriented bounding box.
    Parameters:
        obb_boxes (list of open3d.geometry.OrientedBoundingBox): The bounding boxes to draw.
        fig (plotly.graph_objects.Figure): The figure to add the traces to.
    Returns:
        plotly.graph_objects.Figure: The figure with the bounding box traces added.
    """
    for obb in obb_boxes:
        # Get the eight corner points of the OBB
        corners = np.asarray(obb.get_box_points())
        # Extract x, y, and z coordinates of the corners
        x = corners[:, 0]
        y = corners[:, 1]
        z = corners[:, 2]
        # Create a Mesh3d trace for the oriented bounding box with opacity
        obb_trace = go.Mesh3d(
            x=x,
            y=y,
            z=z,
            i=[0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 2, 6, 4, 1, 3, 7, 5],
            j=[1, 2, 3, 0, 5, 6, 7, 4, 2, 3, 7, 6, 1, 0, 4, 5, 6, 7, 3, 2, 0, 1, 5, 4],
            k=[2, 3, 0, 1, 6, 7, 4, 5, 3, 7, 6, 2, 0, 4, 5, 1, 7, 6, 2, 4, 1, 5, 0, 3],
            color='blue',
            opacity=0.3
        )
        # Add the Mesh3d trace to the figure
        fig.add_trace(obb_trace)
    return fig