An In-Depth Look into LiGuard Pipelines

In LiGuard, a pipeline is a sequence of components that process LIDAR and/or image data. In other words, it defines an algorithm that performs a series of steps on the given data. Each component in the pipeline performs a specific task such as reading data, preprocessing, data analysis, or postprocessing. The components are connected in sequence, and data flows from one component to the next according to each component's defined priority. The pipeline is executed in real-time, and the results are visualized in the GUI and/or stored to disk.
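
The execution model can be sketched in a few lines of Python. This is an illustrative simplification, not LiGuard's actual scheduler; all names in it are hypothetical:

```python
# Illustrative sketch of priority-ordered component execution.
# NOT LiGuard's internal code; all names here are hypothetical.

def read_frame(data_dict):
    # stand-in for a data-reading component
    data_dict['points'] = [1.0, 2.0, 3.0]

def scale_points(data_dict):
    # stand-in for a LIDAR preprocessing component
    data_dict['points'] = [p * 2 for p in data_dict['points']]

# (priority, component) pairs; a lower priority value means it runs earlier
pipeline = [(2, scale_points), (1, read_frame)]

def run(pipeline, data_dict):
    for _, component in sorted(pipeline, key=lambda pc: pc[0]):
        component(data_dict)  # each component reads/writes the shared dict
    return data_dict

result = run(pipeline, {})
print(result['points'])  # [2.0, 4.0, 6.0]
```

Note that read_frame (priority 1) executes before scale_points (priority 2) even though it appears later in the list; ordering is governed by priority, not by position.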

Pipeline Directory Structure

A LiGuard pipeline is organized in a structured directory format. The pipeline directory structure is automatically managed by LiGuard, minimizing the need for manual modifications. However, understanding this structure is beneficial, particularly when sharing pipelines with others. Below is a detailed overview of the directory layout and its components.

The pipeline directory contains the following files and directories:

Pipeline Directory
--------------------
|-- base_config.yml
|-- data_handler
    |-- calib
        |-- handler_custom_dataset1.py
        |-- handler_custom_dataset2.py
        |-- ...
    |-- label
        |-- handler_custom_dataset1.py
        |-- handler_custom_dataset2.py
        |-- ...
|-- algo
    |-- pre
        |-- component1.yml
        |-- component1.py
        |-- component2.yml
        |-- component2.py
        |-- ...
    |-- lidar
        |-- component3.yml
        |-- component3.py
    |-- camera
        ...
    |-- calib
        ...
    |-- label
        ...
    |-- post
        ...
|-- outputs (default, can be changed from GUI)
|-- logs (default, can be changed from GUI)

  • base_config.yml: A YAML file that contains the basic configuration of the pipeline. It includes the input data sources and their types, output directories, built-in data processing functions and their parameters, visualizer settings, the log directory, and other general settings.

  • data_handler: A directory that contains subdirectories for different types of data handlers. Each subdirectory contains Python files for handling specific types of data. The subdirectories are:

    • calib: Calibration data handlers for different datasets.

    • label: Label data handlers for different datasets.

  • algo: A directory that contains subdirectories for different types of components. Each subdirectory contains YAML and Python files for the components. The subdirectories are:

    • pre: Preprocessing components that are executed before the main processing.

    • lidar: LIDAR data processing components.

    • camera: Camera data processing components.

    • calib: Calibration data processing components.

    • label: Label data processing components.

    • post: Postprocessing components that are executed after the main processing.

  • outputs: A directory where the output data generated by the pipeline is stored. The default output directory can be changed from the GUI.

  • logs: A directory where the log files generated during the execution of the pipeline are stored. The default log directory can be changed from the GUI.

Standard Functions Structure

Data Handlers

Data handlers are responsible for reading calibration and label data. Each data handler is defined by a Python script that implements the data reading and processing functions for a specific dataset.

Calibration Data Handler

The calibration data handler script should have a variable calib_file_extension that specifies the extension of the calibration files in the calib_subdir directory. The Handler function in the script reads the calibration file and generates a dictionary containing the calibration data in KITTI format. The function signature should be as follows:

import os
import numpy as np

calib_file_extension = '.txt' # this tells the extension of the calibration files in directory given by base_config['data']['calib_subdir']


def Handler(calib_path: str): # don't change the function signature
    """
    Process a calibration file and generate a dictionary containing the calibration data in KITTI format.

    Args:
        calib_path (str): The path to the calibration file.

    Returns:
        dict: A dictionary containing the calibration data.
    """
    calib = {}

    #################################################################################
    # basic code snippet to read the calibration file, uncomment and modify as needed
    #################################################################################
    # if not os.path.exists(calib_path): return calib
    # with open(calib_path, 'r') as f: clbs = f.readlines()
    # for line in clbs:
    #     line = line.strip()
    #     if len(line) == 0 or line.startswith('#'): continue
    #     # parse your calibs
    #     calib['P2'] = ? # 3x4 projection matrix
    #     calib['R0_rect'] = ? # 4x4 rectification matrix
    #     calib['Tr_velo_to_cam'] = ? # 4x4 transformation matrix from lidar to camera

    return calib # make sure to return the calibration data dictionary, even if it is empty
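
As a concrete illustration, here is a hedged sketch of a complete calibration Handler for a hypothetical `KEY: v1 v2 ...` text format. The file layout is an assumption; the dictionary keys and matrix shapes follow the KITTI convention described above:

```python
import os
import numpy as np

calib_file_extension = '.txt'

def Handler(calib_path: str):
    """Parse a hypothetical `KEY: v1 v2 ...` calibration file into KITTI-style matrices."""
    calib = {}
    if not os.path.exists(calib_path): return calib
    with open(calib_path, 'r') as f:
        for line in f:
            line = line.strip()
            if len(line) == 0 or line.startswith('#'): continue
            key, _, values = line.partition(':')
            nums = np.array(values.split(), dtype=np.float32)
            if key == 'P2':
                calib['P2'] = nums.reshape(3, 4)              # 3x4 projection matrix
            elif key == 'R0_rect':
                calib['R0_rect'] = nums.reshape(4, 4)         # 4x4 rectification matrix
            elif key == 'Tr_velo_to_cam':
                calib['Tr_velo_to_cam'] = nums.reshape(4, 4)  # 4x4 lidar-to-camera transform
    return calib  # empty dict if the file was missing
```

If your dataset stores these matrices differently (for example, row-major 3x4 transforms), the reshape calls and key mapping are the parts to adapt.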

Label Data Handler

The label data handler script should have a variable label_file_extension that specifies the extension of the label files in the label_subdir directory. The Handler function in the script reads the label file and generates a list of labels.

Note: Currently, only bbox_3d and bbox_2d labels are implemented. For segmentation and other label types, it is recommended to read your labels from current_label_path in data_dict and to create current_point_cloud_point_colors (a numpy array with one color per point of current_point_cloud_numpy) in data_dict inside your component functions; see Component Structure for more details.

The function signature should be as follows:

import os
import numpy as np

# colors for visualization, [R, G, B] in range [0.0, 1.0] where 0 is the darkest and 1 is the brightest
colors = {
    # 'CLASS_A': [0, 1, 0],
    # 'CLASS_B': [0, 0, 1],
    # ...
}
label_file_extension = '.txt' # this tells the extension of the label files in directory given by base_config['data']['label_subdir']

def Handler(label_path: str, calib_data: dict): # don't change the function signature
    """
    Process the label file and generate a list of labels.

    Args:
        label_path (str): Path to the label file.
        calib_data (dict): Calibration data.

    Returns:
        list: List of labels.

    """
    output = []

    ###############################################################################
    # basic code snippet to read the label file, modify as needed
    ###############################################################################
    if calib_data is None: return output
    if not os.path.exists(label_path): return output
    with open(label_path, 'r') as f: lbls = f.readlines()
    
    ################################################################################
    # basic code snippet to populate the output list, uncomment and modify as needed
    ################################################################################
    # for line in lbls:
    #     parts = line.strip().split(' ')
    #     obj_class = ?
    #     # do your processing here
    #     # project to lidar coordinates and calculate xyz_center, xyz_extent, xyz_euler_angles
    #     xyz_center = ?
    #     xyz_extent = ?
    #     xyz_euler_angles = ?
    #     rgb_color = np.array(colors[obj_class], dtype=np.float32)
        
    #     # the visualizer expects bbox_3d to be present in order to visualize the bounding boxes, so we add it here
    #     label = dict()
    #     label['bbox_3d'] = {
    #         'xyz_center': xyz_center,
    #         'xyz_extent': xyz_extent,
    #         'xyz_euler_angles': xyz_euler_angles,
    #         'rgb_color': rgb_color,
    #         'predicted': False # as it is a ground truth label
    #     }
    #     # Please note that bbox_2d is inferred from bbox_3d by the visualizer, using the calib_data dictionary.
    #     # If you need to manually add bbox_2d, you can add it as follows:
    #     # label['bbox_2d'] = {
    #     #     'xy_center': xy_center,
    #     #     'xy_extent': xy_extent,
    #     #     'rgb_color': rgb_color,
    #     #     'predicted': False # as it is a ground truth label
    #     # }

    #     output.append(label)
    
    return output # make sure to return the output list, even if it is empty
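
To make the structure concrete, here is a hedged sketch of a complete label Handler for a hypothetical space-separated `class x y z l w h yaw` format already in lidar coordinates. The file layout and class names are assumptions; the bbox_3d keys follow the structure shown above:

```python
import os
import numpy as np

colors = {'CAR': [0, 1, 0]}  # hypothetical class palette, [R, G, B] in [0.0, 1.0]
label_file_extension = '.txt'

def Handler(label_path: str, calib_data: dict):
    """Parse a hypothetical `class x y z l w h yaw` label file (lidar coordinates)."""
    output = []
    if calib_data is None: return output
    if not os.path.exists(label_path): return output
    with open(label_path, 'r') as f: lbls = f.readlines()
    for line in lbls:
        parts = line.strip().split(' ')
        if len(parts) != 8 or parts[0] not in colors: continue
        obj_class = parts[0]
        x, y, z, l, w, h, yaw = map(float, parts[1:])
        label = {'bbox_3d': {
            'xyz_center': np.array([x, y, z], dtype=np.float32),
            'xyz_extent': np.array([l, w, h], dtype=np.float32),
            'xyz_euler_angles': np.array([0, 0, yaw], dtype=np.float32),
            'rgb_color': np.array(colors[obj_class], dtype=np.float32),
            'predicted': False,  # ground-truth label
        }}
        output.append(label)
    return output
```

Because this hypothetical format is already in lidar coordinates, calib_data is only checked for presence; a camera-coordinate format would additionally need the projection step indicated in the commented snippet above.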

Algorithm Components

Each component in the pipeline is defined by two files: a YAML configuration file (.yml) and a Python script (.py); both must have the same base name (for example, test_component_a.yml <–> test_component_a.py). The YAML configuration file specifies the component’s parameters that are exposed to the GUI, while the Python script contains the implementation of the component. The component structure is as follows:

YAML Configuration File

The YAML configuration file defines the component’s settings and parameters. It has the following structure:

# configuration for a function in LiGuard is defined in yaml format
FUNCTION_NAME: # name of the function
  enabled: False # bool -- adds the function to pipeline if True -- necessary parameter, don't remove
  priority: 3 # int -- priority of process, lower is higher -- necessary parameter, don't remove
  # parameters are defined as key-value pairs: param_name: param_value
  # `param_name` should always be a string and `param_value` can be any of these types: int, float, bool, str, list, dict
  # examples:
  # threshold: 0.5
  # do_average: true
  # active_classes: ["class_1", "class_2"]
  # score_weights: # a dictionary in yaml format
  #   class_1: 0.4
  #   class_2: 0.6
  # ...
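
For instance, a hypothetical LIDAR component named remove_distant_points might be configured like this (the component name and the max_distance parameter are illustrative, not part of LiGuard):

```yaml
remove_distant_points: # hypothetical component name; must match the Python function name
  enabled: True # adds the function to the pipeline
  priority: 2 # runs before components with higher priority values
  max_distance: 50.0 # custom parameter, read in the script via get_algo_params
```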

Python Script

The Python script contains the implementation of the component. It must have a function with the same name as the component. The function should have the following structure:

#########################################################################################################################
# usually, the following imports are common for all the algorithms, so it is recommended to not remove them
import inspect
from liguard.gui.config_gui import resolve_for_application_root, resolve_for_default_workspace
from liguard.gui.logger_gui import Logger
from liguard.algo.utils import AlgoType, algo_func, get_algo_params, make_key
algo_type = ALGO_TYPE
#########################################################################################################################

@algo_func(required_data=[]) # add required keys in the list -- necessary decorator, don't remove
# following keys are standard to `LiGuard`:
# `current_point_cloud_path`, `current_point_cloud_numpy`, `current_image_path`, `current_image_numpy`, `current_calib_path`, `current_calib_data`, `current_label_path`, `current_label_list`
# one or more of the `LiGuard` standard keys can be passed in the `required_data` list, for example:
# @algo_func(required_data=['current_point_cloud_numpy', 'current_image_numpy'])
# @algo_func(required_data=['current_calib_data'])
# custom keys can also be passed in the `required_data` list if they are generated by previous algorithm(s) in the pipeline, for example:
# @algo_func(required_data=['custom_key_1', 'custom_key_2'])
def FUNCTION_NAME(data_dict: dict, cfg_dict: dict, logger: Logger):
    """
    A function to perform the algorithmic operations on the data.

    Args:
        data_dict (dict): A dictionary containing the data.
        cfg_dict (dict): A dictionary containing the configuration parameters.
        logger (gui.logger_gui.Logger): A logger object for logging messages and errors in GUI.
    """
    #########################################################################################################################
    # standard code snippet that gets the parameters from the config file and checks if required data is present in data_dict
    # usually, this snippet is common for all the algorithms, so it is recommended to not remove it
    algo_name = inspect.stack()[0].function
    params = get_algo_params(cfg_dict, algo_type, algo_name, logger)
    
    # check if required data is present in data_dict
    for key in FUNCTION_NAME.required_data:
        if key not in data_dict:
            logger.log(f'{key} not found in data_dict', Logger.ERROR)
            return
    # standard code snippet ends here
    #########################################################################################################################
    # imports
    # import numpy as np
    # ...
    
    # your code
    # def add(a, b): return a + b
    # result = add(params['a'], params['b'])

    # add results to data_dict
    # data_dict[f'{algo_name}_result'] = result

Here, FUNCTION_NAME is the name of the component, and ALGO_TYPE is the type of the algorithm; its value comes from an enum with the following entries: AlgoType.PRE, AlgoType.LIDAR, AlgoType.CAMERA, AlgoType.CALIB, AlgoType.LABEL, and AlgoType.POST. The required_data list in the @algo_func decorator contains the keys that must be present in data_dict for the algorithm to work. data_dict holds the data that is passed between the components in the pipeline, cfg_dict holds the configuration parameters defined in the YAML file, and the logger object is used for logging messages and errors in the GUI.
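
The required_data mechanism can be illustrated with a minimal stand-in decorator. This is a sketch of the pattern only, not LiGuard's actual algo_func implementation (which lives in liguard.algo.utils); the string return values are used here purely for demonstration, where the real snippet logs an error and returns:

```python
# Minimal stand-in for the `@algo_func(required_data=[...])` pattern.
# Illustrative only -- LiGuard's real decorator lives in liguard.algo.utils.

def algo_func(required_data=[]):
    def decorator(func):
        func.required_data = list(required_data)  # attach the key list to the function
        return func
    return decorator

@algo_func(required_data=['current_point_cloud_numpy'])
def my_component(data_dict, cfg_dict, logger=None):
    # the standard snippet checks the attached list before doing any work
    for key in my_component.required_data:
        if key not in data_dict:
            return 'missing: ' + key  # real code would call logger.log(...) and return
    return 'ok'

print(my_component({}, {}))                                          # missing: current_point_cloud_numpy
print(my_component({'current_point_cloud_numpy': [[0, 0, 0]]}, {}))  # ok
```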

Create Custom Data Handlers and Algorithm Components

  • Data Handlers

    • Calibration Data Handler

The easiest way to add custom data handlers to LiGuard is as follows:

  • Under data->calib in Configuration window, click Create Custom Calib Data Handler button.

  • Enter the name of the handler and click OK. This will create the necessary directory structure and files for the custom data handler. It’ll also open the Python script in your OS’s default editor.

  • Implement the data reading in the Python script by following the structure described above or in the comments in the script.

  • Save the file and click Reload button in the Configuration window to load the new data handler.

  • Under data->calib->clb_type in Configuration window, type the name of the custom data handler. Click Apply to use the custom data handler.

You can also do this manually by creating the Python script according to the structure described above. Make sure to follow the naming convention and the structure of the file to ensure that LiGuard can load and execute the data handler correctly.

    • Label Data Handler

The easiest way to add custom data handlers to LiGuard is as follows:

  • Under data->label in Configuration window, click Create Custom Label Data Handler button.

  • Enter the name of the handler and click OK. This will create the necessary directory structure and files for the custom data handler. It’ll also open the Python script in your OS’s default editor.

  • Implement the data reading in the Python script by following the structure described above or in the comments in the script.

  • Save the file and click Reload button in the Configuration window to load the new data handler.

  • Under data->label->lbl_type in Configuration window, type the name of the custom data handler. Click Apply to use the custom data handler.

You can also do this manually by creating the Python script according to the structure described above. Make sure to follow the naming convention and the structure of the file to ensure that LiGuard can load and execute the data handler correctly.

  • Algorithm Components

The easiest way to add custom components to LiGuard is as follows:

  • Under proc->lidar in Configuration window, click Create Custom Function button.

  • Enter the name of the function and click OK. This will create the necessary directory structure and files for the custom component. It’ll also open the YAML configuration file and the Python script in your OS’s default editor.

  • Modify the YAML configuration file to define the parameters for the component.

  • Implement the algorithm in the Python script by following the structure described above or in the comments in the script.

  • Save the files and click Reload button in the Configuration window to load the new component, and then click Apply to execute it as part of the pipeline.

  • From now on, you can enable/disable this custom component, adjust its parameters, or reorder it in the pipeline using the priority parameter. Please make sure to click Apply after making any changes to the component to see the results.

You can also do this manually by creating the YAML configuration file and the Python script according to the structure described above. Make sure to follow the naming convention and the structure of the files to ensure that LiGuard can load and execute the components correctly.

Sharing Pipelines

To share a pipeline with others, you can simply zip the pipeline directory and share it. The recipient can then unzip the directory and load the pipeline in LiGuard. The recipient can also modify the pipeline by adding or removing components, adjusting parameters, or adding new components. The structured directory format ensures that the pipeline remains organized and easy to understand.
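
The zipping and unzipping can be done with the Python standard library alone. The sketch below builds a tiny stand-in pipeline directory first so it is self-contained; all paths and contents are hypothetical:

```python
import os
import shutil
import tempfile

# Build a tiny stand-in pipeline directory (contents are hypothetical)
workdir = tempfile.mkdtemp()
pipeline_dir = os.path.join(workdir, 'my_pipeline')
os.makedirs(os.path.join(pipeline_dir, 'algo', 'lidar'))
with open(os.path.join(pipeline_dir, 'base_config.yml'), 'w') as f:
    f.write('data: {}\n')

# Zip the whole pipeline directory for sharing...
archive = shutil.make_archive(pipeline_dir, 'zip', root_dir=pipeline_dir)

# ...and unpack it on the recipient's side
dest = os.path.join(workdir, 'received_pipeline')
shutil.unpack_archive(archive, extract_dir=dest)
print(os.listdir(dest))
```

The recipient then points LiGuard at the extracted directory to load the pipeline.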