Source code for calib.file_io

import os
import glob
import time
import threading

calib_dir = os.path.dirname(os.path.realpath(__file__))

supported_calib_types = [clb_handler.split('_')[1].replace('.py','') for clb_handler in os.listdir(calib_dir) if 'handler' in clb_handler]

[docs] class FileIO: """ Class for handling file input/output operations related to calibration files. """ def __init__(self, cfg: dict): """ Initializes a FileIO object. Args: cfg (dict): Configuration dictionary containing parameters for file IO. Raises: NotImplementedError: If the calibration type is not supported. """ # get params from config self.cfg = cfg self.clb_dir = os.path.join(cfg['data']['path'], cfg['data']['calib_subdir']) self.clb_type = cfg['data']['calib']['clb_type'] self.clb_count = cfg['data']['size'] # Check if the calibration type is supported if self.clb_type not in supported_calib_types: raise NotImplementedError("Calib type not supported. Supported file types: " + ', '.join(supported_calib_types) + ".") # Import the calibration handler h = __import__('calib.handler_'+self.clb_type, fromlist=['calib_file_extension', 'Handler']) self.clb_ext, self.reader = h.calib_file_extension, h.Handler # read all the calibration files files = glob.glob(os.path.join(self.clb_dir, '*' + self.clb_ext)) file_basenames = [os.path.splitext(os.path.basename(file))[0] for file in files] file_basenames.sort(key=lambda file_name: int(''.join(filter(str.isdigit, file_name)))) self.files_basenames = file_basenames[:self.clb_count] # read the calibration files in async mode self.data_lock = threading.Lock() self.data = [] self.stop = threading.Event() threading.Thread(target=self.__async_read_fn__).start()
[docs] def get_abs_path(self, idx: int): """ Get the absolute path of the calibration file at the specified index. Args: idx (int): Index of the calibration file. Returns: str: Absolute path of the calibration file. """ clb_path = os.path.join(self.clb_dir, self.files_basenames[idx] + self.clb_ext) return clb_path
def __async_read_fn__(self): """ Asynchronously read all the calibration files. """ for idx in range(len(self.files_basenames)): if self.stop.is_set(): break clb_abs_path = self.get_abs_path(idx) calib = self.reader(clb_abs_path) with self.data_lock: self.data.append((clb_abs_path, calib)) time.sleep(self.cfg['threads']['io_sleep']) def __len__(self): """ Get the number of calibration files. Returns: int: Number of calibration files. """ return len(self.files_basenames) def __getitem__(self, idx): """ Get the calibration file at the specified index. Args: idx: Index of the calibration file. Returns: tuple: Tuple containing the absolute path of the calibration file and the calibration data. """ try: with self.data_lock: return self.data[idx] except: clb_abs_path = self.get_abs_path(idx) calib = self.reader(clb_abs_path) return (clb_abs_path, calib)
[docs] def close(self): """ Stop the asynchronous read thread. """ self.stop.set()