gaitsetpy
GaitSetPy - A Python package for gait analysis and recognition.
This package provides a comprehensive toolkit for gait data analysis with both a modern class-based architecture and legacy function-based API for backward compatibility.
Features:
- Modular architecture with singleton design pattern
- Plugin-based system for easy extension
- Comprehensive dataset loaders (Daphnet, MobiFall, Arduous, PhysioNet)
- Feature extraction and preprocessing pipelines
- Machine learning models for classification
- Exploratory data analysis tools
- Backward compatibility with legacy API
Architecture:
- Core: Base classes and singleton managers
- Dataset: Data loading and preprocessing
- Features: Feature extraction and analysis
- Preprocessing: Data cleaning and transformation
- EDA: Exploratory data analysis and visualization
- Classification: Machine learning models and evaluation
Maintainer: @aharshit123456
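A minimal usage sketch of the class-based API, mirroring the load_and_analyze_daphnet workflow shown in the package source below; the data directory ("data/daphnet") is a placeholder path, not part of the package.

    from gaitsetpy import DaphnetLoader, GaitFeatureExtractor

    # Load the Daphnet dataset (downloaded into the given directory if missing)
    loader = DaphnetLoader()
    data, names = loader.load_data("data/daphnet")  # placeholder path

    # Window the first recording and extract gait features at 64 Hz
    windows = loader.create_sliding_windows(data, names, window_size=192)
    extractor = GaitFeatureExtractor()
    features = extractor.extract_features(windows[0]['windows'], fs=64)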
1""" 2GaitSetPy - A Python package for gait analysis and recognition. 3 4This package provides a comprehensive toolkit for gait data analysis with both 5a modern class-based architecture and legacy function-based API for backward compatibility. 6 7Features: 8- Modular architecture with singleton design pattern 9- Plugin-based system for easy extension 10- Comprehensive dataset loaders (Daphnet, MobiFall, Arduous, PhysioNet) 11- Feature extraction and preprocessing pipelines 12- Machine learning models for classification 13- Exploratory data analysis tools 14- Backward compatibility with legacy API 15 16Architecture: 17- Core: Base classes and singleton managers 18- Dataset: Data loading and preprocessing 19- Features: Feature extraction and analysis 20- Preprocessing: Data cleaning and transformation 21- EDA: Exploratory data analysis and visualization 22- Classification: Machine learning models and evaluation 23 24Maintainer: @aharshit123456 25""" 26 27# Core architecture components 28from .core import ( 29 BaseDatasetLoader, 30 BaseFeatureExtractor, 31 BasePreprocessor, 32 BaseEDAAnalyzer, 33 BaseClassificationModel, 34 DatasetManager, 35 FeatureManager, 36 PreprocessingManager, 37 EDAManager, 38 ClassificationManager 39) 40 41# New class-based API 42from .dataset import ( 43 DaphnetLoader, 44 MobiFallLoader, 45 ArduousLoader, 46 PhysioNetLoader, 47 HARUPLoader, 48 get_dataset_manager, 49 get_available_datasets, 50 load_dataset 51) 52 53from .features import ( 54 GaitFeatureExtractor, 55 LBPFeatureExtractor, 56 FourierSeriesFeatureExtractor, 57 PhysioNetFeatureExtractor, 58 get_feature_manager, 59 get_available_extractors, 60 extract_features 61) 62 63from .preprocessing import ( 64 ClippingPreprocessor, 65 NoiseRemovalPreprocessor, 66 OutlierRemovalPreprocessor, 67 BaselineRemovalPreprocessor, 68 DriftRemovalPreprocessor, 69 HighFrequencyNoiseRemovalPreprocessor, 70 LowFrequencyNoiseRemovalPreprocessor, 71 ArtifactRemovalPreprocessor, 72 TrendRemovalPreprocessor, 73 DCOffsetRemovalPreprocessor, 74 get_preprocessing_manager, 75 get_available_preprocessors, 76 preprocess_data, 77 create_preprocessing_pipeline 78) 79 80from .eda import ( 81 DaphnetVisualizationAnalyzer, 82 SensorStatisticsAnalyzer, 83 get_eda_manager, 84 get_available_analyzers, 85 analyze_data, 86 visualize_data, 87 plot_daphnet_data, 88 analyze_sensor_statistics, 89 plot_sensor_features 90) 91 92from .classification import ( 93 RandomForestModel, 94 get_classification_manager, 95 get_available_models, 96 train_model, 97 predict, 98 evaluate_model_performance, 99 create_random_forest, 100 train_random_forest 101) 102 103# Legacy API for backward compatibility 104# Explicitly import all public exports from submodules instead of using wildcard imports 105# This improves code clarity and makes it easier to track what's being exported 106 107# Dataset legacy functions 108from .dataset import ( 109 load_daphnet_data, 110 create_sliding_windows, 111 load_mobifall_data, 112 load_arduous_data, 113 load_physionet_data, 114 create_physionet_windows, 115 load_harup_data, 116 create_harup_windows, 117 extract_harup_features, 118 download_dataset, 119 extract_dataset, 120 sliding_window 121) 122 123# Features legacy functions 124from .features import ( 125 calculate_mean, 126 calculate_standard_deviation, 127 calculate_variance, 128 calculate_skewness, 129 calculate_kurtosis, 130 calculate_root_mean_square, 131 calculate_range, 132 calculate_median, 133 calculate_mode, 134 calculate_mean_absolute_value, 135 
calculate_median_absolute_deviation, 136 calculate_peak_height, 137 calculate_stride_times, 138 calculate_step_time, 139 calculate_cadence, 140 calculate_freezing_index, 141 calculate_dominant_frequency, 142 calculate_peak_frequency, 143 calculate_power_spectral_entropy, 144 calculate_principal_harmonic_frequency, 145 calculate_entropy, 146 calculate_interquartile_range, 147 calculate_correlation, 148 calculate_auto_regression_coefficients, 149 get_mean_for_windows, 150 get_standard_deviation_for_windows, 151 get_variance_for_windows 152) 153 154# Preprocessing legacy functions 155from .preprocessing import ( 156 clip_sliding_windows, 157 remove_noise, 158 remove_outliers, 159 remove_baseline, 160 remove_drift, 161 remove_artifacts, 162 remove_trend, 163 remove_dc_offset, 164 remove_high_frequency_noise, 165 remove_low_frequency_noise 166) 167 168# EDA legacy functions 169from .eda import ( 170 plot_thigh_data, 171 plot_shank_data, 172 plot_trunk_data, 173 plot_all_data, 174 plot_all_thigh_data, 175 plot_all_shank_data, 176 plot_all_trunk_data, 177 plot_all_datasets, 178 plot_sensor_with_features 179) 180 181# Classification legacy functions 182from .classification import ( 183 create_random_forest_model, 184 preprocess_features, 185 evaluate_model 186) 187 188# Import version from single source of truth 189from ._version import __version__, get_version, get_version_info, get_release_info 190__author__ = "Harshit Agarwal | Alohomora Labs" 191 192# Convenient access to all managers 193def get_all_managers(): 194 """ 195 Get all singleton managers. 196 197 Returns: 198 Dictionary containing all manager instances 199 """ 200 return { 201 'dataset': DatasetManager(), 202 'feature': FeatureManager(), 203 'preprocessing': PreprocessingManager(), 204 'eda': EDAManager(), 205 'classification': ClassificationManager() 206 } 207 208# System information 209def get_system_info(): 210 """ 211 Get information about the available components in the system. 212 213 Returns: 214 Dictionary containing system information 215 """ 216 return { 217 'version': __version__, 218 'author': __author__, 219 'available_datasets': get_available_datasets(), 220 'available_extractors': get_available_extractors(), 221 'available_preprocessors': get_available_preprocessors(), 222 'available_analyzers': get_available_analyzers(), 223 'available_models': get_available_models(), 224 'architecture': 'Modular with singleton design pattern' 225 } 226 227# Shortcut functions for common workflows 228def load_and_analyze_daphnet(data_dir: str, sensor_type: str = 'all', window_size: int = 192): 229 """ 230 Complete workflow for loading and analyzing Daphnet data. 
231 232 Args: 233 data_dir: Directory containing the Daphnet dataset 234 sensor_type: Type of sensor to analyze ('all', 'thigh', 'shank', 'trunk') 235 window_size: Size of sliding windows for feature extraction 236 237 Returns: 238 Dictionary containing data, features, and analysis results 239 """ 240 # Load dataset 241 loader = DaphnetLoader() 242 data, names = loader.load_data(data_dir) 243 244 # Create sliding windows 245 windows = loader.create_sliding_windows(data, names, window_size=window_size) 246 247 # Extract features 248 extractor = GaitFeatureExtractor() 249 features = extractor.extract_features(windows[0]['windows'], fs=64) 250 251 # Analyze data 252 analyzer = DaphnetVisualizationAnalyzer() 253 analysis = analyzer.analyze(data) 254 255 return { 256 'data': data, 257 'names': names, 258 'windows': windows, 259 'features': features, 260 'analysis': analysis, 261 'loader': loader, 262 'extractor': extractor, 263 'analyzer': analyzer 264 } 265 266def load_and_analyze_physionet(data_dir: str, window_size: int = 600, step_size: int = 100): 267 """ 268 Complete workflow for loading and analyzing PhysioNet VGRF data. 269 270 Args: 271 data_dir: Directory to store/find the PhysioNet dataset 272 window_size: Size of sliding windows for feature extraction (default: 600) 273 step_size: Step size for sliding windows (default: 100) 274 275 Returns: 276 Dictionary containing data, features, and analysis results 277 """ 278 # Load dataset 279 loader = PhysioNetLoader() 280 data, names = loader.load_data(data_dir) 281 282 # Create sliding windows 283 windows = loader.create_sliding_windows(data, names, window_size=window_size, step_size=step_size) 284 285 # Extract PhysioNet-specific features 286 extractor = PhysioNetFeatureExtractor() 287 all_features = [] 288 289 for window_dict in windows: 290 if 'windows' in window_dict: 291 features = extractor.extract_features(window_dict['windows'], fs=100) 292 all_features.append({ 293 'name': window_dict['name'], 294 'features': features, 295 'metadata': window_dict.get('metadata', {}) 296 }) 297 298 return { 299 'data': data, 300 'names': names, 301 'windows': windows, 302 'features': all_features, 303 'labels': loader.get_labels(), 304 'loader': loader, 305 'extractor': extractor 306 } 307 308def train_gait_classifier(features, model_type: str = 'random_forest', **kwargs): 309 """ 310 Train a gait classification model. 311 312 Args: 313 features: List of feature dictionaries 314 model_type: Type of model to train ('random_forest', etc.) 
315 **kwargs: Additional arguments for model training 316 317 Returns: 318 Trained model instance 319 """ 320 if model_type == 'random_forest': 321 model = RandomForestModel(**kwargs) 322 model.train(features, **kwargs) 323 return model 324 else: 325 raise ValueError(f"Model type '{model_type}' not supported") 326 327__all__ = [ 328 # Core architecture 329 'BaseDatasetLoader', 330 'BaseFeatureExtractor', 331 'BasePreprocessor', 332 'BaseEDAAnalyzer', 333 'BaseClassificationModel', 334 'DatasetManager', 335 'FeatureManager', 336 'PreprocessingManager', 337 'EDAManager', 338 'ClassificationManager', 339 340 # New class-based API 341 'DaphnetLoader', 342 'MobiFallLoader', 343 'ArduousLoader', 344 'PhysioNetLoader', 345 'GaitFeatureExtractor', 346 'LBPFeatureExtractor', 347 'FourierSeriesFeatureExtractor', 348 'PhysioNetFeatureExtractor', 349 'ClippingPreprocessor', 350 'NoiseRemovalPreprocessor', 351 'OutlierRemovalPreprocessor', 352 'BaselineRemovalPreprocessor', 353 'DriftRemovalPreprocessor', 354 'HighFrequencyNoiseRemovalPreprocessor', 355 'LowFrequencyNoiseRemovalPreprocessor', 356 'ArtifactRemovalPreprocessor', 357 'TrendRemovalPreprocessor', 358 'DCOffsetRemovalPreprocessor', 359 'DaphnetVisualizationAnalyzer', 360 'SensorStatisticsAnalyzer', 361 'RandomForestModel', 362 363 # Manager access functions 364 'get_dataset_manager', 365 'get_feature_manager', 366 'get_preprocessing_manager', 367 'get_eda_manager', 368 'get_classification_manager', 369 'get_all_managers', 370 371 # Utility functions 372 'get_available_datasets', 373 'get_available_extractors', 374 'get_available_preprocessors', 375 'get_available_analyzers', 376 'get_available_models', 377 'get_system_info', 378 379 # Workflow functions 380 'load_and_analyze_daphnet', 381 'load_and_analyze_physionet', 382 'train_gait_classifier', 383 384 # Legacy dataset functions 385 'load_daphnet_data', 386 'create_sliding_windows', 387 'load_mobifall_data', 388 'load_arduous_data', 389 'load_physionet_data', 390 'create_physionet_windows', 391 'load_harup_data', 392 'create_harup_windows', 393 'extract_harup_features', 394 'download_dataset', 395 'extract_dataset', 396 'sliding_window', 397 398 # Legacy feature functions 399 'calculate_mean', 400 'calculate_standard_deviation', 401 'calculate_variance', 402 'calculate_skewness', 403 'calculate_kurtosis', 404 'calculate_root_mean_square', 405 'calculate_range', 406 'calculate_median', 407 'calculate_mode', 408 'calculate_mean_absolute_value', 409 'calculate_median_absolute_deviation', 410 'calculate_peak_height', 411 'calculate_stride_times', 412 'calculate_step_time', 413 'calculate_cadence', 414 'calculate_freezing_index', 415 'calculate_dominant_frequency', 416 'calculate_peak_frequency', 417 'calculate_power_spectral_entropy', 418 'calculate_principal_harmonic_frequency', 419 'calculate_entropy', 420 'calculate_interquartile_range', 421 'calculate_correlation', 422 'calculate_auto_regression_coefficients', 423 'get_mean_for_windows', 424 'get_standard_deviation_for_windows', 425 'get_variance_for_windows', 426 427 # Legacy preprocessing functions 428 'clip_sliding_windows', 429 'remove_noise', 430 'remove_outliers', 431 'remove_baseline', 432 'remove_drift', 433 'remove_artifacts', 434 'remove_trend', 435 'remove_dc_offset', 436 'remove_high_frequency_noise', 437 'remove_low_frequency_noise', 438 439 # Legacy EDA functions 440 'plot_thigh_data', 441 'plot_shank_data', 442 'plot_trunk_data', 443 'plot_all_data', 444 'plot_all_thigh_data', 445 'plot_all_shank_data', 446 
'plot_all_trunk_data', 447 'plot_all_datasets', 448 'plot_sensor_with_features', 449 450 # Legacy classification functions 451 'create_random_forest_model', 452 'preprocess_features', 453 'evaluate_model', 454]
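As a usage sketch of the convenience functions defined above (the data path is a placeholder, and the feature format passed to the classifier is assumed to be compatible with RandomForestModel.train):

    import gaitsetpy

    # Inspect what is registered in this installation
    info = gaitsetpy.get_system_info()
    print(info['version'], info['available_datasets'])

    # End-to-end Daphnet workflow followed by model training
    result = gaitsetpy.load_and_analyze_daphnet("data/daphnet")  # placeholder path
    model = gaitsetpy.train_gait_classifier(result['features'], model_type='random_forest')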
class BaseDatasetLoader(ABC):
    """
    Base class for all dataset loaders.

    All dataset loaders should inherit from this class and implement the required methods.
    This class provides thread-safe concurrent downloading capabilities for efficient data retrieval.
    """

    def __init__(self, name: str, description: str = "", max_workers: int = 8):
        """
        Initialize the dataset loader.

        Args:
            name: Name of the dataset
            description: Description of the dataset
            max_workers: Maximum number of concurrent download threads (default: 8)
        """
        self.name = name
        self.description = description
        self.data = None
        self.metadata = {}
        self.max_workers = max_workers
        self._download_stats = {'success': 0, 'failed': 0, 'skipped': 0}

    @abstractmethod
    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
        """
        Load dataset from the specified directory.

        Args:
            data_dir: Directory containing the dataset
            **kwargs: Additional arguments specific to the dataset

        Returns:
            Tuple of (data_list, names_list)
        """
        pass

    @abstractmethod
    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                               window_size: int = 192, step_size: int = 32) -> List[Dict]:
        """
        Create sliding windows from the loaded data.

        Args:
            data: List of DataFrames
            names: List of names corresponding to the data
            window_size: Size of each sliding window
            step_size: Step size for sliding windows

        Returns:
            List of dictionaries containing sliding windows
        """
        pass

    @abstractmethod
    def get_supported_formats(self) -> List[str]:
        """
        Get list of supported file formats.

        Returns:
            List of supported file extensions
        """
        pass

    def _download_file(self, url: str, dest_path: str,
                       chunk_size: int = 8192, timeout: int = 30) -> Tuple[bool, str]:
        """
        Download a single file from URL to destination path.

        This method is thread-safe and can be called concurrently.

        Args:
            url: URL to download from
            dest_path: Destination file path
            chunk_size: Size of chunks to download (default: 8192 bytes)
            timeout: Request timeout in seconds (default: 30)

        Returns:
            Tuple of (success: bool, message: str)
        """
        try:
            # Check if file already exists
            if os.path.exists(dest_path):
                self._download_stats['skipped'] += 1
                return True, f"File already exists: {dest_path}"

            # Make the request
            response = requests.get(url, stream=True, timeout=timeout)

            if response.status_code == 200:
                # Ensure parent directory exists
                os.makedirs(os.path.dirname(dest_path) if os.path.dirname(dest_path) else '.', exist_ok=True)

                # Write file in chunks
                with open(dest_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        if chunk:
                            f.write(chunk)

                self._download_stats['success'] += 1
                return True, f"Successfully downloaded: {dest_path}"
            else:
                self._download_stats['failed'] += 1
                return False, f"HTTP {response.status_code}: {url}"

        except requests.exceptions.Timeout:
            self._download_stats['failed'] += 1
            return False, f"Timeout downloading: {url}"
        except requests.exceptions.RequestException as e:
            self._download_stats['failed'] += 1
            return False, f"Request error for {url}: {str(e)}"
        except IOError as e:
            self._download_stats['failed'] += 1
            return False, f"IO error for {dest_path}: {str(e)}"
        except Exception as e:
            self._download_stats['failed'] += 1
            return False, f"Unexpected error for {url}: {str(e)}"

    def download_files_concurrent(self,
                                  download_tasks: List[Dict[str, str]],
                                  show_progress: bool = True,
                                  desc: str = "Downloading files") -> Dict[str, Any]:
        """
        Download multiple files concurrently using a thread pool.

        Args:
            download_tasks: List of dicts with 'url' and 'dest_path' keys
            show_progress: Whether to show progress bar (default: True)
            desc: Description for progress bar

        Returns:
            Dictionary with download statistics and results

        Example:
            tasks = [
                {'url': 'http://example.com/file1.txt', 'dest_path': '/path/to/file1.txt'},
                {'url': 'http://example.com/file2.txt', 'dest_path': '/path/to/file2.txt'}
            ]
            results = loader.download_files_concurrent(tasks)
        """
        # Reset stats
        self._download_stats = {'success': 0, 'failed': 0, 'skipped': 0}

        results = []
        failed_downloads = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all download tasks
            future_to_task = {
                executor.submit(self._download_file, task['url'], task['dest_path']): task
                for task in download_tasks
            }

            # Process completed tasks with optional progress bar
            if show_progress:
                futures = tqdm(as_completed(future_to_task),
                               total=len(download_tasks),
                               desc=desc)
            else:
                futures = as_completed(future_to_task)

            for future in futures:
                task = future_to_task[future]
                try:
                    success, message = future.result()
                    results.append({
                        'url': task['url'],
                        'dest_path': task['dest_path'],
                        'success': success,
                        'message': message
                    })

                    if not success:
                        failed_downloads.append({
                            'url': task['url'],
                            'dest_path': task['dest_path'],
                            'error': message
                        })

                except Exception as e:
                    error_msg = f"Exception during download: {str(e)}"
                    results.append({
                        'url': task['url'],
                        'dest_path': task['dest_path'],
                        'success': False,
                        'message': error_msg
                    })
                    failed_downloads.append({
                        'url': task['url'],
                        'dest_path': task['dest_path'],
                        'error': error_msg
                    })

        # Return comprehensive results
        return {
            'total': len(download_tasks),
            'success': self._download_stats['success'],
            'failed': self._download_stats['failed'],
            'skipped': self._download_stats['skipped'],
            'failed_downloads': failed_downloads,
            'all_results': results
        }

    def set_max_workers(self, max_workers: int):
        """
        Set the maximum number of concurrent download threads.

        Args:
            max_workers: Maximum number of threads (must be positive)
        """
        if max_workers < 1:
            raise ValueError("max_workers must be at least 1")
        self.max_workers = max_workers

    def get_download_stats(self) -> Dict[str, int]:
        """
        Get statistics from the last download operation.

        Returns:
            Dictionary with success, failed, and skipped counts
        """
        return self._download_stats.copy()

    def get_info(self) -> Dict[str, Any]:
        """
        Get information about the dataset.

        Returns:
            Dictionary containing dataset information
        """
        return {
            'name': self.name,
            'description': self.description,
            'metadata': self.metadata,
            'supported_formats': self.get_supported_formats(),
            'max_workers': self.max_workers
        }
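To illustrate the loader contract, here is a minimal, hypothetical subclass that reads one CSV file per recording and implements the three abstract methods; the class name, file layout, and the 'name'/'data'/'windows' keys used for the window dictionaries are choices made for this sketch, not part of the package.

    import os
    from glob import glob
    from typing import Dict, List, Tuple

    import pandas as pd

    from gaitsetpy import BaseDatasetLoader

    class CSVGaitLoader(BaseDatasetLoader):
        """Hypothetical loader that reads one CSV file per recording."""

        def __init__(self, name: str = "csv_gait", description: str = "Example CSV loader", max_workers: int = 8):
            super().__init__(name=name, description=description, max_workers=max_workers)

        def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
            data, names = [], []
            for path in sorted(glob(os.path.join(data_dir, "*.csv"))):
                data.append(pd.read_csv(path))
                names.append(os.path.basename(path))
            self.data = data
            return data, names

        def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                                   window_size: int = 192, step_size: int = 32) -> List[Dict]:
            windows = []
            for df, name in zip(data, names):
                starts = range(0, max(len(df) - window_size + 1, 0), step_size)
                window_dicts = [{'name': name, 'data': df.iloc[s:s + window_size]} for s in starts]
                windows.append({'name': name, 'windows': window_dicts})
            return windows

        def get_supported_formats(self) -> List[str]:
            return ['.csv']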
class BaseFeatureExtractor(ABC):
    """
    Base class for all feature extractors.

    All feature extractors should inherit from this class and implement the required methods.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Initialize the feature extractor.

        Args:
            name: Name of the feature extractor
            description: Description of the feature extractor
        """
        self.name = name
        self.description = description
        self.config = {}

    @abstractmethod
    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
        """
        Extract features from sliding windows.

        Args:
            windows: List of sliding window dictionaries
            fs: Sampling frequency
            **kwargs: Additional arguments for feature extraction

        Returns:
            List of feature dictionaries
        """
        pass

    @abstractmethod
    def get_feature_names(self) -> List[str]:
        """
        Get names of features extracted by this extractor.

        Returns:
            List of feature names
        """
        pass

    def configure(self, config: Dict[str, Any]):
        """
        Configure the feature extractor.

        Args:
            config: Configuration dictionary
        """
        self.config.update(config)

    def get_info(self) -> Dict[str, Any]:
        """
        Get information about the feature extractor.

        Returns:
            Dictionary containing feature extractor information
        """
        return {
            'name': self.name,
            'description': self.description,
            'config': self.config,
            'feature_names': self.get_feature_names()
        }
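A minimal, hypothetical extractor sketch implementing the two abstract methods; it assumes each window dictionary exposes its samples under a 'data' key (the actual key layout depends on the loader that produced the windows).

    import numpy as np
    from typing import Dict, List

    from gaitsetpy import BaseFeatureExtractor

    class RMSFeatureExtractor(BaseFeatureExtractor):
        """Hypothetical extractor computing a single RMS feature per window."""

        def __init__(self, name: str = "rms", description: str = "Root-mean-square of each window"):
            super().__init__(name=name, description=description)

        def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
            features = []
            for window in windows:
                values = np.asarray(window['data'], dtype=float)  # 'data' key is an assumption
                features.append({'name': window.get('name', ''),
                                 'rms': float(np.sqrt(np.mean(values ** 2)))})
            return features

        def get_feature_names(self) -> List[str]:
            return ['rms']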
class BasePreprocessor(ABC):
    """
    Base class for all preprocessors.

    All preprocessors should inherit from this class and implement the required methods.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Initialize the preprocessor.

        Args:
            name: Name of the preprocessor
            description: Description of the preprocessor
        """
        self.name = name
        self.description = description
        self.config = {}
        self.fitted = False

    @abstractmethod
    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Fit the preprocessor to the data.

        Args:
            data: Input data to fit on
            **kwargs: Additional arguments for fitting
        """
        pass

    @abstractmethod
    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Transform the data using the fitted preprocessor.

        Args:
            data: Input data to transform
            **kwargs: Additional arguments for transformation

        Returns:
            Transformed data
        """
        pass

    def fit_transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Fit the preprocessor and transform the data.

        Args:
            data: Input data to fit and transform
            **kwargs: Additional arguments

        Returns:
            Transformed data
        """
        self.fit(data, **kwargs)
        return self.transform(data, **kwargs)

    def configure(self, config: Dict[str, Any]):
        """
        Configure the preprocessor.

        Args:
            config: Configuration dictionary
        """
        self.config.update(config)

    def get_info(self) -> Dict[str, Any]:
        """
        Get information about the preprocessor.

        Returns:
            Dictionary containing preprocessor information
        """
        return {
            'name': self.name,
            'description': self.description,
            'config': self.config,
            'fitted': self.fitted
        }
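A minimal, hypothetical preprocessor sketch showing the fit/transform contract with a z-score standardizer; the class and its attributes are invented for this example.

    import numpy as np
    import pandas as pd
    from typing import Union

    from gaitsetpy import BasePreprocessor

    class ZScorePreprocessor(BasePreprocessor):
        """Hypothetical preprocessor standardizing signals to zero mean, unit variance."""

        def __init__(self, name: str = "zscore", description: str = "Z-score standardization"):
            super().__init__(name=name, description=description)
            self.mean_ = None
            self.std_ = None

        def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
            values = np.asarray(data, dtype=float)
            self.mean_ = values.mean(axis=0)
            self.std_ = values.std(axis=0) + 1e-12  # avoid division by zero
            self.fitted = True

        def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
            if not self.fitted:
                raise RuntimeError("Call fit() before transform()")
            return (np.asarray(data, dtype=float) - self.mean_) / self.std_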
class BaseEDAAnalyzer(ABC):
    """
    Base class for all EDA analyzers.

    All EDA analyzers should inherit from this class and implement the required methods.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Initialize the EDA analyzer.

        Args:
            name: Name of the EDA analyzer
            description: Description of the EDA analyzer
        """
        self.name = name
        self.description = description
        self.config = {}

    @abstractmethod
    def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
        """
        Perform analysis on the data.

        Args:
            data: Input data to analyze
            **kwargs: Additional arguments for analysis

        Returns:
            Dictionary containing analysis results
        """
        pass

    @abstractmethod
    def visualize(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs):
        """
        Create visualizations of the data.

        Args:
            data: Input data to visualize
            **kwargs: Additional arguments for visualization
        """
        pass

    def configure(self, config: Dict[str, Any]):
        """
        Configure the EDA analyzer.

        Args:
            config: Configuration dictionary
        """
        self.config.update(config)

    def get_info(self) -> Dict[str, Any]:
        """
        Get information about the EDA analyzer.

        Returns:
            Dictionary containing EDA analyzer information
        """
        return {
            'name': self.name,
            'description': self.description,
            'config': self.config
        }
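A minimal, hypothetical analyzer sketch implementing analyze() and visualize(); the class is invented for this example and its visualize() relies on matplotlib being installed (via pandas plotting).

    import pandas as pd
    from typing import Any, Dict, List, Union

    from gaitsetpy import BaseEDAAnalyzer

    class SummaryStatsAnalyzer(BaseEDAAnalyzer):
        """Hypothetical analyzer returning per-column summary statistics."""

        def __init__(self, name: str = "summary_stats", description: str = "Basic descriptive statistics"):
            super().__init__(name=name, description=description)

        def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
            frames = data if isinstance(data, list) else [data]
            return {i: df.describe().to_dict() for i, df in enumerate(frames)}

        def visualize(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs):
            frames = data if isinstance(data, list) else [data]
            for df in frames:
                df.plot(subplots=True)  # pandas plotting, requires matplotlib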
class BaseClassificationModel(ABC):
    """
    Base class for all classification models.

    All classification models should inherit from this class and implement the required methods.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Initialize the classification model.

        Args:
            name: Name of the classification model
            description: Description of the classification model
        """
        self.name = name
        self.description = description
        self.model = None
        self.config = {}
        self.trained = False

    @abstractmethod
    def train(self, features: List[Dict], **kwargs):
        """
        Train the classification model.

        Args:
            features: List of feature dictionaries
            **kwargs: Additional arguments for training
        """
        pass

    @abstractmethod
    def predict(self, features: List[Dict], **kwargs) -> np.ndarray:
        """
        Make predictions using the trained model.

        Args:
            features: List of feature dictionaries
            **kwargs: Additional arguments for prediction

        Returns:
            Array of predictions
        """
        pass

    @abstractmethod
    def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]:
        """
        Evaluate the model performance.

        Args:
            features: List of feature dictionaries
            **kwargs: Additional arguments for evaluation

        Returns:
            Dictionary containing evaluation metrics
        """
        pass

    @abstractmethod
    def save_model(self, filepath: str):
        """
        Save the trained model to a file.

        Args:
            filepath: Path to save the model
        """
        pass

    @abstractmethod
    def load_model(self, filepath: str):
        """
        Load a trained model from a file.

        Args:
            filepath: Path to the saved model
        """
        pass

    def configure(self, config: Dict[str, Any]):
        """
        Configure the classification model.

        Args:
            config: Configuration dictionary
        """
        self.config.update(config)

    def get_info(self) -> Dict[str, Any]:
        """
        Get information about the classification model.

        Returns:
            Dictionary containing model information
        """
        return {
            'name': self.name,
            'description': self.description,
            'config': self.config,
            'trained': self.trained
        }
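A hypothetical sketch of a model subclass implementing all five abstract methods with a scikit-learn random forest; it assumes each feature dictionary carries a numeric vector under 'features' and a target under 'label', which is an assumption for this example rather than the layout used by the built-in RandomForestModel.

    import joblib
    import numpy as np
    from typing import Dict, List

    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score

    from gaitsetpy import BaseClassificationModel

    class SimpleForestModel(BaseClassificationModel):
        """Hypothetical model; assumes 'features' and 'label' keys in each feature dict."""

        def __init__(self, **kwargs):
            super().__init__(name="simple_forest", description="Minimal random forest wrapper")
            self.model = RandomForestClassifier(**kwargs)

        @staticmethod
        def _to_arrays(features: List[Dict]):
            X = np.array([f['features'] for f in features], dtype=float)
            y = np.array([f['label'] for f in features])
            return X, y

        def train(self, features: List[Dict], **kwargs):
            X, y = self._to_arrays(features)
            self.model.fit(X, y)
            self.trained = True

        def predict(self, features: List[Dict], **kwargs) -> np.ndarray:
            X = np.array([f['features'] for f in features], dtype=float)
            return self.model.predict(X)

        def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]:
            X, y = self._to_arrays(features)
            return {'accuracy': float(accuracy_score(y, self.model.predict(X)))}

        def save_model(self, filepath: str):
            joblib.dump(self.model, filepath)

        def load_model(self, filepath: str):
            self.model = joblib.load(filepath)
            self.trained = True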
class DatasetManager(BaseManager):
    """
    Singleton manager for dataset loaders.
    """

    def register_dataset(self, name: str, dataset_class: Type[BaseDatasetLoader]):
        """
        Register a dataset loader.

        Args:
            name: Name to register the dataset under
            dataset_class: Dataset loader class
        """
        if not issubclass(dataset_class, BaseDatasetLoader):
            raise ValueError(f"Dataset class must inherit from BaseDatasetLoader")
        self.register(name, dataset_class)

    def load_dataset(self, name: str, data_dir: str, **kwargs) -> BaseDatasetLoader:
        """
        Load a dataset using the registered loader.

        Args:
            name: Name of the dataset loader
            data_dir: Directory containing the dataset
            **kwargs: Additional arguments for the loader

        Returns:
            Dataset loader instance with loaded data
        """
        loader = self.create_instance(name, name, f"{name} dataset loader")
        loader.load_data(data_dir, **kwargs)
        return loader
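A brief, hedged sketch of registering a loader with the manager; whether the built-in loaders are already registered at import time, and whether get_dataset_manager() hands back the same singleton instance, are assumptions here rather than facts shown in this listing.

    from gaitsetpy import DatasetManager, DaphnetLoader, get_dataset_manager

    manager = DatasetManager()
    manager.register_dataset("daphnet", DaphnetLoader)

    # Assumed to return the same singleton instance used above
    assert get_dataset_manager() is manager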
class FeatureManager(BaseManager):
    """
    Singleton manager for feature extractors.
    """

    def register_extractor(self, name: str, extractor_class: Type[BaseFeatureExtractor]):
        """
        Register a feature extractor.

        Args:
            name: Name to register the extractor under
            extractor_class: Feature extractor class
        """
        if not issubclass(extractor_class, BaseFeatureExtractor):
            raise ValueError(f"Extractor class must inherit from BaseFeatureExtractor")
        self.register(name, extractor_class)

    def extract_features(self, extractor_name: str, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
        """
        Extract features using the specified extractor.

        Args:
            extractor_name: Name of the feature extractor
            windows: List of sliding window dictionaries
            fs: Sampling frequency
            **kwargs: Additional arguments for feature extraction

        Returns:
            List of feature dictionaries
        """
        extractor = self.get_cached_instance(extractor_name, extractor_name, f"{extractor_name} feature extractor")
        return extractor.extract_features(windows, fs, **kwargs)
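A hedged usage sketch; the data path and registration name are placeholders, and it is assumed that the manager can construct GaitFeatureExtractor with the arguments it forwards when creating the cached instance.

    from gaitsetpy import DaphnetLoader, FeatureManager, GaitFeatureExtractor

    loader = DaphnetLoader()
    data, names = loader.load_data("data/daphnet")          # placeholder path
    windows = loader.create_sliding_windows(data, names)

    fm = FeatureManager()
    fm.register_extractor("gait", GaitFeatureExtractor)
    features = fm.extract_features("gait", windows[0]['windows'], fs=64)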
class PreprocessingManager(BaseManager):
    """
    Singleton manager for preprocessors.
    """

    def register_preprocessor(self, name: str, preprocessor_class: Type[BasePreprocessor]):
        """
        Register a preprocessor.

        Args:
            name: Name to register the preprocessor under
            preprocessor_class: Preprocessor class
        """
        if not issubclass(preprocessor_class, BasePreprocessor):
            raise ValueError(f"Preprocessor class must inherit from BasePreprocessor")
        self.register(name, preprocessor_class)

    def preprocess_data(self, preprocessor_name: str, data: Any, **kwargs) -> Any:
        """
        Preprocess data using the specified preprocessor.

        Args:
            preprocessor_name: Name of the preprocessor
            data: Input data to preprocess
            **kwargs: Additional arguments for preprocessing

        Returns:
            Preprocessed data
        """
        preprocessor = self.get_cached_instance(preprocessor_name, preprocessor_name, f"{preprocessor_name} preprocessor")
        return preprocessor.fit_transform(data, **kwargs)
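A hedged usage sketch; the random signal stands in for a real sensor channel, and it is assumed that NoiseRemovalPreprocessor accepts a plain NumPy array and can be constructed with the arguments the manager forwards.

    import numpy as np

    from gaitsetpy import PreprocessingManager, NoiseRemovalPreprocessor

    pm = PreprocessingManager()
    pm.register_preprocessor("noise_removal", NoiseRemovalPreprocessor)

    signal = np.random.randn(1024)                 # stand-in for a sensor channel
    cleaned = pm.preprocess_data("noise_removal", signal)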
class EDAManager(BaseManager):
    """
    Singleton manager for EDA analyzers.
    """

    def register_analyzer(self, name: str, analyzer_class: Type[BaseEDAAnalyzer]):
        """
        Register an EDA analyzer.

        Args:
            name: Name to register the analyzer under
            analyzer_class: EDA analyzer class
        """
        if not issubclass(analyzer_class, BaseEDAAnalyzer):
            raise ValueError(f"Analyzer class must inherit from BaseEDAAnalyzer")
        self.register(name, analyzer_class)

    def analyze_data(self, analyzer_name: str, data: Any, **kwargs) -> Dict[str, Any]:
        """
        Analyze data using the specified analyzer.

        Args:
            analyzer_name: Name of the EDA analyzer
            data: Input data to analyze
            **kwargs: Additional arguments for analysis

        Returns:
            Analysis results dictionary
        """
        analyzer = self.get_cached_instance(analyzer_name, analyzer_name, f"{analyzer_name} analyzer")
        return analyzer.analyze(data, **kwargs)

    def visualize_data(self, analyzer_name: str, data: Any, **kwargs):
        """
        Create visualizations using the specified analyzer.

        Args:
            analyzer_name: Name of the EDA analyzer
            data: Input data to visualize
            **kwargs: Additional arguments for visualization
        """
        analyzer = self.get_cached_instance(analyzer_name, analyzer_name, f"{analyzer_name} analyzer")
        analyzer.visualize(data, **kwargs)
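A hedged usage sketch combining the loader with the EDA manager; the data path and registration name are placeholders, and construction of the built-in analyzer by the manager is assumed to succeed with its default arguments.

    from gaitsetpy import DaphnetLoader, EDAManager, DaphnetVisualizationAnalyzer

    loader = DaphnetLoader()
    data, _ = loader.load_data("data/daphnet")     # placeholder path

    eda = EDAManager()
    eda.register_analyzer("daphnet_viz", DaphnetVisualizationAnalyzer)
    results = eda.analyze_data("daphnet_viz", data)
    eda.visualize_data("daphnet_viz", data)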
class ClassificationManager(BaseManager):
    """
    Singleton manager for classification models.
    """

    def register_model(self, name: str, model_class: Type[BaseClassificationModel]):
        """
        Register a classification model.

        Args:
            name: Name to register the model under
            model_class: Classification model class
        """
        if not issubclass(model_class, BaseClassificationModel):
            raise ValueError(f"Model class must inherit from BaseClassificationModel")
        self.register(name, model_class)

    def train_model(self, model_name: str, features: List[Dict], **kwargs) -> BaseClassificationModel:
        """
        Train a classification model.

        Args:
            model_name: Name of the classification model
            features: List of feature dictionaries
            **kwargs: Additional arguments for training

        Returns:
            Trained model instance
        """
        model = self.create_instance(model_name, model_name, f"{model_name} classification model")
        model.train(features, **kwargs)
        return model

    def predict(self, model_name: str, features: List[Dict], **kwargs) -> Any:
        """
        Make predictions using a trained model.

        Args:
            model_name: Name of the classification model
            features: List of feature dictionaries
            **kwargs: Additional arguments for prediction

        Returns:
            Predictions array
        """
        model = self.get_cached_instance(model_name, model_name, f"{model_name} classification model")
        return model.predict(features, **kwargs)

    def evaluate_model(self, model_name: str, features: List[Dict], **kwargs) -> Dict[str, float]:
        """
        Evaluate a classification model.

        Args:
            model_name: Name of the classification model
            features: List of feature dictionaries
            **kwargs: Additional arguments for evaluation

        Returns:
            Evaluation metrics dictionary
        """
        model = self.get_cached_instance(model_name, model_name, f"{model_name} classification model")
        return model.evaluate(features, **kwargs)
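Because every manager method looks a model up by name, a complete workflow is register, train, predict, evaluate. A minimal sketch follows; the registration key "rf_demo" is our own choice, `features` stands for the per-sensor feature dictionaries produced by a feature extractor, and the exact metrics returned by evaluate_model depend on the model class.

from gaitsetpy import RandomForestModel, get_classification_manager

clf = get_classification_manager()                # singleton ClassificationManager
clf.register_model("rf_demo", RandomForestModel)  # register the model class under our own key

# `features` is the output of a feature extractor (see GaitFeatureExtractor below).
model = clf.train_model("rf_demo", features)      # instantiates, trains, and returns the model
preds = clf.predict("rf_demo", features)
metrics = clf.evaluate_model("rf_demo", features)
print(metrics)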
class DaphnetLoader(BaseDatasetLoader):
    """
    Daphnet dataset loader class.

    This class handles loading and processing of the Daphnet dataset for gait analysis.
    """

    def __init__(self, max_workers: int = 8):
        """
        Initialize Daphnet loader with concurrent download support.

        Args:
            max_workers: Maximum number of concurrent download threads (default: 8)
        """
        super().__init__(
            name="daphnet",
            description="Daphnet Freezing of Gait Dataset - Contains accelerometer data from subjects with Parkinson's disease",
            max_workers=max_workers
        )
        self.metadata = {
            'sensors': ['shank', 'thigh', 'trunk'],
            'components': ['h_fd', 'v', 'h_l'],  # horizontal forward, vertical, horizontal lateral
            'sampling_frequency': 64,
            'annotations': {
                0: 'not_valid',
                1: 'no_freeze',
                2: 'freeze'
            }
        }

    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
        """
        Load Daphnet dataset from the specified directory.

        Args:
            data_dir: Directory to store/find the dataset
            **kwargs: Additional arguments (unused for Daphnet)

        Returns:
            Tuple of (data_list, names_list)
        """
        # Download and extract if needed
        download_dataset("daphnet", data_dir)
        extract_dataset("daphnet", data_dir)

        file_path = os.path.join(data_dir, "dataset_fog_release/dataset")
        daphnet_data = []
        daphnet_names = []

        # Load all subject files
        for file in sorted(glob(os.path.join(file_path, "S*.txt"))):
            # Extract filename from path
            filename = os.path.basename(file)
            daphnet_names.append(filename)

            # Load CSV with proper column names
            column_names = [
                "time", "shank_h_fd", "shank_v", "shank_h_l",
                "thigh_h_fd", "thigh_v", "thigh_h_l",
                "trunk_h_fd", "trunk_v", "trunk_h_l", "annotations"
            ]

            df = pd.read_csv(file, sep=" ", names=column_names)

            # Set time as index
            df = df.set_index("time")

            # Calculate magnitude for each sensor
            df["thigh"] = np.sqrt(df["thigh_h_l"]**2 + df["thigh_v"]**2 + df["thigh_h_fd"]**2)
            df["shank"] = np.sqrt(df["shank_h_l"]**2 + df["shank_v"]**2 + df["shank_h_fd"]**2)
            df["trunk"] = np.sqrt(df["trunk_h_l"]**2 + df["trunk_v"]**2 + df["trunk_h_fd"]**2)

            # Reorder columns for consistency
            df = df[["shank", "shank_h_fd", "shank_v", "shank_h_l",
                     "thigh", "thigh_h_fd", "thigh_v", "thigh_h_l",
                     "trunk", "trunk_h_fd", "trunk_v", "trunk_h_l", "annotations"]]

            daphnet_data.append(df)

        # Store loaded data
        self.data = daphnet_data
        self.names = daphnet_names

        return daphnet_data, daphnet_names

    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                               window_size: int = 192, step_size: int = 32) -> List[Dict]:
        """
        Create sliding windows from the Daphnet dataset.

        Args:
            data: List of DataFrames containing Daphnet data
            names: List of names corresponding to the data
            window_size: Size of the sliding window (default: 192)
            step_size: Step size for the sliding window (default: 32)

        Returns:
            List of dictionaries containing sliding windows for each DataFrame
        """
        windows_data = []

        for idx, df in enumerate(data):
            # Filter out invalid data (annotations == 0)
            df_filtered = df[df.annotations > 0]

            if df_filtered.empty:
                continue

            windows = []
            processed_columns = set()

            # Process each sensor column
            for col in df_filtered.columns:
                if col != "annotations" and col not in processed_columns:
                    window_data = sliding_window(df_filtered[col], window_size, step_size)
                    windows.append({"name": col, "data": window_data})
                    processed_columns.add(col)

            # Include annotations separately
            annotations_window = sliding_window(df_filtered["annotations"], window_size, step_size)
            windows.append({"name": "annotations", "data": annotations_window})

            windows_data.append({"name": names[idx], "windows": windows})

        return windows_data

    def get_supported_formats(self) -> List[str]:
        """
        Get list of supported file formats for Daphnet dataset.

        Returns:
            List of supported file extensions
        """
        return ['.txt']

    def get_sensor_info(self) -> Dict[str, List[str]]:
        """
        Get information about sensors in the dataset.

        Returns:
            Dictionary containing sensor information
        """
        return {
            'sensors': self.metadata['sensors'],
            'components': self.metadata['components'],
            'sampling_frequency': self.metadata['sampling_frequency']
        }

    def get_annotation_info(self) -> Dict[int, str]:
        """
        Get information about annotations in the dataset.

        Returns:
            Dictionary mapping annotation values to descriptions
        """
        return self.metadata['annotations']
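A short end-to-end sketch for the Daphnet loader. At the dataset's 64 Hz sampling rate the default window_size of 192 samples spans 3 seconds and the step_size of 32 samples gives a 0.5-second hop; the ./data directory is just an example path.

from gaitsetpy import DaphnetLoader

loader = DaphnetLoader()
data, names = loader.load_data("./data")               # downloads and extracts on first run
windows = loader.create_sliding_windows(data, names)   # 192 samples = 3 s at 64 Hz, hop 32 = 0.5 s

print(loader.get_annotation_info())                    # {0: 'not_valid', 1: 'no_freeze', 2: 'freeze'}
first = windows[0]
print(first["name"])                                   # recording name, e.g. "S01R01.txt"
for sensor in first["windows"]:
    print(sensor["name"], len(sensor["data"]))         # sensor column and its number of windows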
17class MobiFallLoader(BaseDatasetLoader): 18 """ 19 MobiFall dataset loader class. 20 21 This class handles loading and processing of the MobiFall dataset for gait analysis. 22 """ 23 24 def __init__(self, max_workers: int = 8): 25 """ 26 Initialize MobiFall loader with concurrent download support. 27 28 Args: 29 max_workers: Maximum number of concurrent download threads (default: 8) 30 """ 31 super().__init__( 32 name="mobifall", 33 description="MobiFall Dataset - Contains accelerometer and gyroscope data for fall detection", 34 max_workers=max_workers 35 ) 36 self.metadata = { 37 'sensors': ['accelerometer', 'gyroscope'], 38 'components': ['x', 'y', 'z'], 39 'sampling_frequency': 100, # Typical for MobiFall 40 'activities': ['ADL', 'FALL'] # Activities of Daily Living and Falls 41 } 42 43 def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]: 44 """ 45 Load MobiFall dataset from the specified directory. 46 47 Args: 48 data_dir: Directory to store/find the dataset 49 **kwargs: Additional arguments (unused for MobiFall) 50 51 Returns: 52 Tuple of (data_list, names_list) 53 """ 54 # TODO: Implement MobiFall data loading 55 # This is a placeholder implementation 56 print("MobiFall data loading is not yet implemented") 57 return [], [] 58 59 def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str], 60 window_size: int = 192, step_size: int = 32) -> List[Dict]: 61 """ 62 Create sliding windows from the MobiFall dataset. 63 64 Args: 65 data: List of DataFrames containing MobiFall data 66 names: List of names corresponding to the data 67 window_size: Size of the sliding window (default: 192) 68 step_size: Step size for the sliding window (default: 32) 69 70 Returns: 71 List of dictionaries containing sliding windows for each DataFrame 72 """ 73 # TODO: Implement MobiFall sliding window creation 74 # This is a placeholder implementation 75 print("MobiFall sliding window creation is not yet implemented") 76 return [] 77 78 def get_supported_formats(self) -> List[str]: 79 """ 80 Get list of supported file formats for MobiFall dataset. 81 82 Returns: 83 List of supported file extensions 84 """ 85 return ['.csv', '.txt'] 86 87 def get_sensor_info(self) -> Dict[str, List[str]]: 88 """ 89 Get information about sensors in the dataset. 90 91 Returns: 92 Dictionary containing sensor information 93 """ 94 return { 95 'sensors': self.metadata['sensors'], 96 'components': self.metadata['components'], 97 'sampling_frequency': self.metadata['sampling_frequency'] 98 } 99 100 def get_activity_info(self) -> List[str]: 101 """ 102 Get information about activities in the dataset. 103 104 Returns: 105 List of activity types 106 """ 107 return self.metadata['activities']
MobiFall dataset loader class.
This class handles loading and processing of the MobiFall dataset for gait analysis.
Initialize MobiFall loader with concurrent download support.
Args: max_workers: Maximum number of concurrent download threads (default: 8)
Load MobiFall dataset from the specified directory.
Args:
    data_dir: Directory to store/find the dataset
    **kwargs: Additional arguments (unused for MobiFall)
Returns: Tuple of (data_list, names_list)
Create sliding windows from the MobiFall dataset.
Args:
    data: List of DataFrames containing MobiFall data
    names: List of names corresponding to the data
    window_size: Size of the sliding window (default: 192)
    step_size: Step size for the sliding window (default: 32)
Returns: List of dictionaries containing sliding windows for each DataFrame
Get list of supported file formats for MobiFall dataset.
Returns: List of supported file extensions
Get information about sensors in the dataset.
Returns: Dictionary containing sensor information
Get information about activities in the dataset.
Returns: List of activity types
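The MobiFall loader is currently a stub: load_data and create_sliding_windows only print a notice and return empty results, but the metadata accessors already work, as the short sketch below shows.

from gaitsetpy import MobiFallLoader

loader = MobiFallLoader()
print(loader.get_sensor_info())     # sensors, x/y/z components, 100 Hz sampling frequency
print(loader.get_activity_info())   # ['ADL', 'FALL']

data, names = loader.load_data("./data")  # prints "not yet implemented" and returns ([], [])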
17class ArduousLoader(BaseDatasetLoader): 18 """ 19 Arduous dataset loader class. 20 21 This class handles loading and processing of the Arduous dataset for gait analysis. 22 """ 23 24 def __init__(self, max_workers: int = 8): 25 """ 26 Initialize Arduous loader with concurrent download support. 27 28 Args: 29 max_workers: Maximum number of concurrent download threads (default: 8) 30 """ 31 super().__init__( 32 name="arduous", 33 description="Arduous Dataset - Contains multi-sensor wearable data for daily activity recognition", 34 max_workers=max_workers 35 ) 36 self.metadata = { 37 'sensors': ['accelerometer', 'gyroscope', 'magnetometer'], 38 'components': ['x', 'y', 'z'], 39 'sampling_frequency': 50, # Typical for Arduous 40 'activities': ['walking', 'running', 'sitting', 'standing', 'lying'] 41 } 42 43 def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]: 44 """ 45 Load Arduous dataset from the specified directory. 46 47 Args: 48 data_dir: Directory to store/find the dataset 49 **kwargs: Additional arguments (unused for Arduous) 50 51 Returns: 52 Tuple of (data_list, names_list) 53 """ 54 # TODO: Implement Arduous data loading 55 # This is a placeholder implementation 56 print("Arduous data loading is not yet implemented") 57 return [], [] 58 59 def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str], 60 window_size: int = 192, step_size: int = 32) -> List[Dict]: 61 """ 62 Create sliding windows from the Arduous dataset. 63 64 Args: 65 data: List of DataFrames containing Arduous data 66 names: List of names corresponding to the data 67 window_size: Size of the sliding window (default: 192) 68 step_size: Step size for the sliding window (default: 32) 69 70 Returns: 71 List of dictionaries containing sliding windows for each DataFrame 72 """ 73 # TODO: Implement Arduous sliding window creation 74 # This is a placeholder implementation 75 print("Arduous sliding window creation is not yet implemented") 76 return [] 77 78 def get_supported_formats(self) -> List[str]: 79 """ 80 Get list of supported file formats for Arduous dataset. 81 82 Returns: 83 List of supported file extensions 84 """ 85 return ['.csv', '.txt'] 86 87 def get_sensor_info(self) -> Dict[str, List[str]]: 88 """ 89 Get information about sensors in the dataset. 90 91 Returns: 92 Dictionary containing sensor information 93 """ 94 return { 95 'sensors': self.metadata['sensors'], 96 'components': self.metadata['components'], 97 'sampling_frequency': self.metadata['sampling_frequency'] 98 } 99 100 def get_activity_info(self) -> List[str]: 101 """ 102 Get information about activities in the dataset. 103 104 Returns: 105 List of activity types 106 """ 107 return self.metadata['activities']
Arduous dataset loader class.
This class handles loading and processing of the Arduous dataset for gait analysis.
Initialize Arduous loader with concurrent download support.
Args: max_workers: Maximum number of concurrent download threads (default: 8)
Load Arduous dataset from the specified directory.
Args:
    data_dir: Directory to store/find the dataset
    **kwargs: Additional arguments (unused for Arduous)
Returns: Tuple of (data_list, names_list)
Create sliding windows from the Arduous dataset.
Args:
    data: List of DataFrames containing Arduous data
    names: List of names corresponding to the data
    window_size: Size of the sliding window (default: 192)
    step_size: Step size for the sliding window (default: 32)
Returns: List of dictionaries containing sliding windows for each DataFrame
Get list of supported file formats for Arduous dataset.
Returns: List of supported file extensions
Get information about sensors in the dataset.
Returns: Dictionary containing sensor information
Get information about activities in the dataset.
Returns: List of activity types
25class PhysioNetLoader(BaseDatasetLoader): 26 """ 27 PhysioNet VGRF dataset loader class. 28 29 This class handles loading and processing of the PhysioNet Gait in Parkinson's Disease dataset. 30 The dataset contains vertical ground reaction force (VGRF) data from subjects with Parkinson's 31 disease and healthy controls. 32 33 Features concurrent downloading for efficient data retrieval. 34 """ 35 36 def __init__(self, max_workers: int = 8): 37 """ 38 Initialize PhysioNet loader with concurrent download support. 39 40 Args: 41 max_workers: Maximum number of concurrent download threads (default: 8) 42 """ 43 super().__init__( 44 name="physionet", 45 description="PhysioNet Gait in Parkinson's Disease Dataset - Contains VGRF data from subjects with Parkinson's disease and healthy controls", 46 max_workers=max_workers 47 ) 48 self.metadata = { 49 'sensors': ['VGRF_L1', 'VGRF_L2', 'VGRF_L3', 'VGRF_L4', 'VGRF_L5', 'VGRF_L6', 'VGRF_L7', 'VGRF_L8', 50 'VGRF_R1', 'VGRF_R2', 'VGRF_R3', 'VGRF_R4', 'VGRF_R5', 'VGRF_R6', 'VGRF_R7', 'VGRF_R8'], 51 'sampling_frequency': 100, # 100 Hz sampling frequency 52 'subjects': { 53 'Co': 'Control subjects', 54 'Pt': 'Parkinson\'s disease patients' 55 }, 56 'window_size': 600, # 6 seconds at 100 Hz 57 'url': 'https://physionet.org/files/gaitpdb/1.0.0/' 58 } 59 self.labels = [] 60 self.subject_types = [] 61 62 def _download_physionet_data(self, data_dir: str) -> str: 63 """ 64 Download PhysioNet dataset if not already present using concurrent downloads. 65 66 This method uses multi-threaded downloading to significantly speed up the 67 download process for the 100+ files in the PhysioNet dataset. 68 69 Args: 70 data_dir: Directory to store the dataset 71 72 Returns: 73 Path to the downloaded/existing dataset directory 74 """ 75 dataset_path = os.path.join(data_dir, "physionet_gaitpdb") 76 77 if os.path.exists(dataset_path) and len(os.listdir(dataset_path)) > 0: 78 print(f"PhysioNet dataset already exists at: {dataset_path}") 79 return dataset_path 80 81 os.makedirs(dataset_path, exist_ok=True) 82 83 # Download the dataset files 84 base_url = "https://physionet.org/files/gaitpdb/1.0.0/" 85 86 # Get list of files (basic file names based on the reference) 87 file_patterns = [ 88 # Control subjects - Ga prefix 89 *[f"GaCo{i:02d}_{j:02d}.txt" for i in range(1, 18) for j in range(1, 3)], 90 "GaCo22_01.txt", "GaCo22_10.txt", 91 92 # Parkinson's patients - Ga prefix 93 *[f"GaPt{i:02d}_{j:02d}.txt" for i in range(3, 10) for j in range(1, 3)], 94 *[f"GaPt{i:02d}_{j:02d}.txt" for i in range(12, 34) for j in range(1, 3)], 95 *[f"GaPt{i:02d}_10.txt" for i in range(13, 34)], 96 97 # Control subjects - Ju prefix 98 *[f"JuCo{i:02d}_01.txt" for i in range(1, 27)], 99 100 # Parkinson's patients - Ju prefix 101 *[f"JuPt{i:02d}_{j:02d}.txt" for i in range(1, 30) for j in range(1, 8)], 102 103 # Control subjects - Si prefix 104 *[f"SiCo{i:02d}_01.txt" for i in range(1, 31)], 105 106 # Parkinson's patients - Si prefix 107 *[f"SiPt{i:02d}_01.txt" for i in range(2, 41)] 108 ] 109 110 # Prepare download tasks for concurrent execution 111 download_tasks = [ 112 { 113 'url': base_url + filename, 114 'dest_path': os.path.join(dataset_path, filename) 115 } 116 for filename in file_patterns 117 ] 118 119 print(f"Downloading PhysioNet dataset to {dataset_path} using {self.max_workers} threads") 120 121 # Use concurrent downloading from base class 122 results = self.download_files_concurrent( 123 download_tasks, 124 show_progress=True, 125 desc="Downloading PhysioNet files" 126 ) 127 128 # Print 
summary 129 print(f"\nDownload Summary:") 130 print(f" Total files: {results['total']}") 131 print(f" Successfully downloaded: {results['success']}") 132 print(f" Already existed (skipped): {results['skipped']}") 133 print(f" Failed: {results['failed']}") 134 135 if results['failed'] > 0 and len(results['failed_downloads']) > 0: 136 print(f"\nFailed downloads (showing first 10):") 137 for failed in results['failed_downloads'][:10]: 138 print(f" - {os.path.basename(failed['dest_path'])}: {failed['error']}") 139 if len(results['failed_downloads']) > 10: 140 print(f" ... and {len(results['failed_downloads']) - 10} more failures") 141 142 return dataset_path 143 144 def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]: 145 """ 146 Load PhysioNet VGRF dataset from the specified directory. 147 148 Args: 149 data_dir: Directory to store/find the dataset 150 **kwargs: Additional arguments (unused for PhysioNet) 151 152 Returns: 153 Tuple of (data_list, names_list) 154 """ 155 # Download dataset if needed 156 dataset_path = self._download_physionet_data(data_dir) 157 158 physionet_data = [] 159 physionet_names = [] 160 self.labels = [] 161 self.subject_types = [] 162 163 # Load all available files 164 for filepath in sorted(glob(os.path.join(dataset_path, "Ga*.txt"))): 165 filename = os.path.basename(filepath) 166 167 # Extract subject type from filename 168 if 'Co' in filename: 169 subject_type = 'Control' 170 label = 'Co' 171 elif 'Pt' in filename: 172 subject_type = 'Patient' 173 label = 'Pt' 174 else: 175 continue # Skip files that don't match expected pattern 176 177 try: 178 # Read the file - PhysioNet files are tab-delimited with variable columns 179 # Column 0: time, Columns 1-16: VGRF sensors, additional columns may exist 180 df = pd.read_csv(filepath, delimiter='\t', header=None) 181 182 # Handle variable number of columns 183 n_cols = min(df.shape[1], 19) # Limit to 19 columns max 184 df = df.iloc[:, :n_cols] 185 186 # Create column names 187 col_names = ['time'] 188 for i in range(1, n_cols): 189 if i <= 8: 190 col_names.append(f'VGRF_L{i}') 191 elif i <= 16: 192 col_names.append(f'VGRF_R{i-8}') 193 else: 194 col_names.append(f'sensor_{i}') 195 196 df.columns = col_names 197 198 # Set time as index 199 df = df.set_index('time') 200 201 # Add subject metadata 202 df['subject_type'] = subject_type 203 df['label'] = label 204 205 physionet_data.append(df) 206 physionet_names.append(filename) 207 self.labels.append(label) 208 self.subject_types.append(subject_type) 209 210 except Exception as e: 211 print(f"Error loading {filename}: {e}") 212 continue 213 214 # Store loaded data 215 self.data = physionet_data 216 self.names = physionet_names 217 218 print(f"Loaded {len(physionet_data)} PhysioNet files") 219 print(f"Subject distribution: {dict(zip(*np.unique(self.subject_types, return_counts=True)))}") 220 221 return physionet_data, physionet_names 222 223 def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str], 224 window_size: int = 600, step_size: int = 100) -> List[Dict]: 225 """ 226 Create sliding windows from the PhysioNet dataset. 
227 228 Args: 229 data: List of DataFrames containing PhysioNet data 230 names: List of names corresponding to the data 231 window_size: Size of the sliding window (default: 600 for 6 seconds at 100Hz) 232 step_size: Step size for the sliding window (default: 100) 233 234 Returns: 235 List of dictionaries containing sliding windows for each DataFrame 236 """ 237 windows_data = [] 238 239 for idx, df in enumerate(data): 240 # Remove metadata columns for windowing 241 sensor_columns = [col for col in df.columns if col.startswith('VGRF_') or col.startswith('sensor_')] 242 df_sensors = df[sensor_columns] 243 244 if df_sensors.empty or len(df_sensors) < window_size: 245 continue 246 247 windows = [] 248 249 # Create windows for each sensor 250 for col in sensor_columns: 251 try: 252 window_data = sliding_window(df_sensors[col].values, window_size, step_size) 253 windows.append({"name": col, "data": window_data}) 254 except Exception as e: 255 print(f"Error creating windows for {col} in {names[idx]}: {e}") 256 continue 257 258 if windows: 259 windows_data.append({ 260 "name": names[idx], 261 "windows": windows, 262 "metadata": { 263 "subject_type": df['subject_type'].iloc[0] if 'subject_type' in df.columns else 'Unknown', 264 "label": df['label'].iloc[0] if 'label' in df.columns else 'Unknown', 265 "window_size": window_size, 266 "step_size": step_size, 267 "num_windows": len(windows[0]["data"]) if windows else 0 268 } 269 }) 270 271 return windows_data 272 273 def get_supported_formats(self) -> List[str]: 274 """ 275 Get list of supported file formats for PhysioNet dataset. 276 277 Returns: 278 List of supported file extensions 279 """ 280 return ['.txt'] 281 282 def get_sensor_info(self) -> Dict[str, List[str]]: 283 """ 284 Get information about sensors in the dataset. 285 286 Returns: 287 Dictionary containing sensor information 288 """ 289 return { 290 'sensors': self.metadata['sensors'], 291 'sampling_frequency': self.metadata['sampling_frequency'], 292 'window_size': self.metadata['window_size'] 293 } 294 295 def get_subject_info(self) -> Dict[str, str]: 296 """ 297 Get information about subjects in the dataset. 298 299 Returns: 300 Dictionary containing subject information 301 """ 302 return self.metadata['subjects'] 303 304 def get_labels(self) -> List[str]: 305 """ 306 Get labels for loaded data. 307 308 Returns: 309 List of labels corresponding to loaded data 310 """ 311 return self.labels 312 313 def filter_by_subject_type(self, subject_type: str) -> Tuple[List[pd.DataFrame], List[str]]: 314 """ 315 Filter loaded data by subject type. 316 317 Args: 318 subject_type: 'Control' or 'Patient' 319 320 Returns: 321 Tuple of (filtered_data, filtered_names) 322 """ 323 if not self.data: 324 raise ValueError("No data loaded. Call load_data() first.") 325 326 filtered_data = [] 327 filtered_names = [] 328 329 for i, df in enumerate(self.data): 330 if df['subject_type'].iloc[0] == subject_type: 331 filtered_data.append(df) 332 filtered_names.append(self.names[i]) 333 334 return filtered_data, filtered_names
PhysioNet VGRF dataset loader class.
This class handles loading and processing of the PhysioNet Gait in Parkinson's Disease dataset. The dataset contains vertical ground reaction force (VGRF) data from subjects with Parkinson's disease and healthy controls.
Features concurrent downloading for efficient data retrieval.
Initialize PhysioNet loader with concurrent download support.
Args: max_workers: Maximum number of concurrent download threads (default: 8)
Load PhysioNet VGRF dataset from the specified directory.
Args:
    data_dir: Directory to store/find the dataset
    **kwargs: Additional arguments (unused for PhysioNet)
Returns: Tuple of (data_list, names_list)
Create sliding windows from the PhysioNet dataset.
Args:
    data: List of DataFrames containing PhysioNet data
    names: List of names corresponding to the data
    window_size: Size of the sliding window (default: 600 for 6 seconds at 100 Hz)
    step_size: Step size for the sliding window (default: 100)
Returns: List of dictionaries containing sliding windows for each DataFrame
Get list of supported file formats for PhysioNet dataset.
Returns: List of supported file extensions
Get information about sensors in the dataset.
Returns: Dictionary containing sensor information
Get information about subjects in the dataset.
Returns: Dictionary containing subject information
Get labels for loaded data.
Returns: List of labels corresponding to loaded data
Filter loaded data by subject type.
Args: subject_type: 'Control' or 'Patient'
Returns: Tuple of (filtered_data, filtered_names)
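Putting the PhysioNet loader together: files are downloaded concurrently on first use with max_workers threads, the subject type is inferred from the 'Co'/'Pt' token in each file name, and the default 600-sample window corresponds to 6 seconds at 100 Hz. Note that load_data currently globs only the Ga-prefixed recordings even though the Ju- and Si-prefixed files are downloaded as well. The ./data path below is an example.

from gaitsetpy import PhysioNetLoader

loader = PhysioNetLoader(max_workers=8)
data, names = loader.load_data("./data")               # concurrent download on first run
controls, control_names = loader.filter_by_subject_type("Control")
patients, patient_names = loader.filter_by_subject_type("Patient")

windows = loader.create_sliding_windows(data, names)   # 600 samples = 6 s at 100 Hz, hop 100 = 1 s
print(windows[0]["metadata"])                          # subject_type, label, window_size, num_windows, ...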
49class GaitFeatureExtractor(BaseFeatureExtractor): 50 """ 51 Comprehensive gait feature extractor class. 52 53 This class extracts various time-domain, frequency-domain, and statistical features 54 from gait data sliding windows. 55 """ 56 57 def __init__(self, verbose: bool = True): 58 super().__init__( 59 name="gait_features", 60 description="Comprehensive gait feature extractor for time-domain, frequency-domain, and statistical features" 61 ) 62 self.verbose = verbose 63 self.config = { 64 'time_domain': True, 65 'frequency_domain': True, 66 'statistical': True, 67 'ar_order': 3 # Order for auto-regression coefficients 68 } 69 70 if self.verbose: 71 print("🚀 GaitFeatureExtractor initialized successfully!") 72 print(f"📊 Default configuration: {self.config}") 73 74 def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]: 75 """ 76 Extract gait features from sliding windows. 77 78 Args: 79 windows: List of sliding window dictionaries 80 fs: Sampling frequency 81 **kwargs: Additional arguments including time_domain, frequency_domain, statistical flags 82 83 Returns: 84 List of feature dictionaries for each sensor 85 """ 86 # Update config with any passed arguments 87 time_domain = kwargs.get('time_domain', self.config['time_domain']) 88 frequency_domain = kwargs.get('frequency_domain', self.config['frequency_domain']) 89 statistical = kwargs.get('statistical', self.config['statistical']) 90 ar_order = kwargs.get('ar_order', self.config['ar_order']) 91 92 if self.verbose: 93 print("\n" + "="*60) 94 print("🔍 STARTING GAIT FEATURE EXTRACTION") 95 print("="*60) 96 print(f"📈 Total sensors/windows to process: {len(windows)}") 97 print(f"🔊 Sampling frequency: {fs} Hz") 98 print(f"⏱️ Time domain features: {'✅' if time_domain else '❌'}") 99 print(f"🌊 Frequency domain features: {'✅' if frequency_domain else '❌'}") 100 print(f"📊 Statistical features: {'✅' if statistical else '❌'}") 101 print(f"🔄 Auto-regression order: {ar_order}") 102 print("-"*60) 103 104 features = [] 105 106 # Main progress bar for processing all windows 107 main_pbar = tqdm( 108 windows, 109 desc="🔍 Processing Sensors", 110 unit="sensor", 111 disable=not self.verbose 112 ) 113 114 for i, window_dict in enumerate(main_pbar): 115 sensor_name = window_dict['name'] 116 window_data = window_dict['data'] 117 118 if self.verbose: 119 main_pbar.set_postfix({ 120 'Current': sensor_name, 121 'Windows': len(window_data) if isinstance(window_data, list) else 1 122 }) 123 124 # Skip annotation windows 125 if sensor_name == 'annotations': 126 if self.verbose: 127 logger.info(f"📝 Processing annotation data for {sensor_name}") 128 129 features.append({ 130 'name': sensor_name, 131 'features': {}, 132 'annotations': [self._extract_annotation_labels(window) for window in window_data] 133 }) 134 continue 135 136 if self.verbose: 137 logger.info(f"🎯 Processing sensor: {sensor_name}") 138 logger.info(f"📦 Number of windows: {len(window_data)}") 139 140 sensor_features = {'name': sensor_name, 'features': {}} 141 142 # Time domain features 143 if time_domain: 144 if self.verbose: 145 print(f" ⏱️ Extracting time domain features for {sensor_name}...") 146 147 time_features = self._extract_time_domain_features(window_data) 148 sensor_features['features'].update(time_features) 149 150 if self.verbose: 151 feature_count = sum(len(v) if isinstance(v, list) else 1 for v in time_features.values()) 152 print(f" ✅ Time domain: {len(time_features)} feature types, {feature_count} total features") 153 154 # Frequency domain features 155 if 
frequency_domain: 156 if self.verbose: 157 print(f" 🌊 Extracting frequency domain features for {sensor_name}...") 158 159 freq_features = self._extract_frequency_domain_features(window_data, fs) 160 sensor_features['features'].update(freq_features) 161 162 if self.verbose: 163 feature_count = sum(len(v) if isinstance(v, list) else 1 for v in freq_features.values()) 164 print(f" ✅ Frequency domain: {len(freq_features)} feature types, {feature_count} total features") 165 166 # Statistical features 167 if statistical: 168 if self.verbose: 169 print(f" 📊 Extracting statistical features for {sensor_name}...") 170 171 stat_features = self._extract_statistical_features(window_data) 172 sensor_features['features'].update(stat_features) 173 174 if self.verbose: 175 feature_count = sum(len(v) if isinstance(v, list) else 1 for v in stat_features.values()) 176 print(f" ✅ Statistical: {len(stat_features)} feature types, {feature_count} total features") 177 178 # Auto-regression coefficients 179 if self.verbose: 180 print(f" 🔄 Extracting auto-regression coefficients for {sensor_name}...") 181 182 ar_features = self._extract_ar_coefficients(window_data, ar_order) 183 sensor_features['features'].update(ar_features) 184 185 if self.verbose: 186 feature_count = sum(len(v) if isinstance(v, list) else 1 for v in ar_features.values()) 187 print(f" ✅ Auto-regression: {len(ar_features)} feature types, {feature_count} total features") 188 189 # Calculate total features for this sensor 190 total_features = sum( 191 len(v) if isinstance(v, list) else 1 192 for v in sensor_features['features'].values() 193 ) 194 195 if self.verbose: 196 print(f" 🎯 Total features extracted for {sensor_name}: {total_features}") 197 print(f" 📋 Feature types: {list(sensor_features['features'].keys())}") 198 print("-"*40) 199 200 features.append(sensor_features) 201 202 if self.verbose: 203 print("\n" + "="*60) 204 print("🎉 FEATURE EXTRACTION COMPLETED!") 205 print("="*60) 206 print(f"📊 Total sensors processed: {len(features)}") 207 208 # Calculate overall statistics 209 total_feature_count = 0 210 for feature_dict in features: 211 if 'features' in feature_dict: 212 total_feature_count += sum( 213 len(v) if isinstance(v, list) else 1 214 for v in feature_dict['features'].values() 215 ) 216 217 print(f"🔢 Total features extracted: {total_feature_count}") 218 print(f"📈 Average features per sensor: {total_feature_count / len(features):.1f}") 219 print("="*60) 220 221 return features 222 223 def _extract_time_domain_features(self, windows: List) -> Dict[str, List]: 224 """Extract time domain features from windows.""" 225 if self.verbose: 226 print(" 🔍 Computing time domain features...") 227 228 time_features = {} 229 230 # Define time domain feature functions 231 time_domain_funcs = { 232 'mean': calculate_mean, 233 'std': calculate_standard_deviation, 234 'variance': calculate_variance, 235 'rms': calculate_root_mean_square, 236 'range': calculate_range, 237 'median': calculate_median, 238 'mode': calculate_mode, 239 'mean_absolute_value': calculate_mean_absolute_value, 240 'median_absolute_deviation': calculate_median_absolute_deviation, 241 'peak_height': calculate_peak_height, 242 'zero_crossing_rate': calculate_zero_crossing_rate, 243 'energy': calculate_energy, 244 } 245 246 # Progress bar for time domain features 247 feature_pbar = tqdm( 248 time_domain_funcs.items(), 249 desc=" ⏱️ Time features", 250 unit="feature", 251 leave=False, 252 disable=not self.verbose 253 ) 254 255 for feature_name, func in feature_pbar: 256 if self.verbose: 
257 feature_pbar.set_postfix({'Computing': feature_name}) 258 259 time_features[feature_name] = [ 260 func(self._ensure_numpy_array(window)) for window in windows 261 ] 262 263 return time_features 264 265 def _ensure_numpy_array(self, signal): 266 """Convert pandas Series to numpy array if needed.""" 267 if hasattr(signal, 'values'): 268 return signal.values 269 return signal 270 271 def _extract_frequency_domain_features(self, windows: List, fs: int) -> Dict[str, List]: 272 """Extract frequency domain features from windows.""" 273 if self.verbose: 274 print(" 🔍 Computing frequency domain features...") 275 276 freq_features = {} 277 278 # Define frequency domain feature functions 279 freq_domain_funcs = { 280 'dominant_frequency': lambda w: calculate_dominant_frequency(w, fs), 281 'peak_frequency': lambda w: calculate_peak_frequency(w, fs), 282 'power_spectral_entropy': lambda w: calculate_power_spectral_entropy(w, fs), 283 'principal_harmonic_frequency': lambda w: calculate_principal_harmonic_frequency(w, fs), 284 'stride_times': lambda w: calculate_stride_times(w, fs), 285 'step_time': lambda w: calculate_step_time(w, fs), 286 'cadence': lambda w: calculate_cadence(w, fs), 287 'freezing_index': lambda w: calculate_freezing_index(w, fs), 288 } 289 290 # Progress bar for frequency domain features 291 feature_pbar = tqdm( 292 freq_domain_funcs.items(), 293 desc=" 🌊 Freq features", 294 unit="feature", 295 leave=False, 296 disable=not self.verbose 297 ) 298 299 for feature_name, func in feature_pbar: 300 if self.verbose: 301 feature_pbar.set_postfix({'Computing': feature_name}) 302 303 freq_features[feature_name] = [ 304 func(self._ensure_numpy_array(window)) for window in windows 305 ] 306 307 return freq_features 308 309 def _extract_statistical_features(self, windows: List) -> Dict[str, List]: 310 """Extract statistical features from windows.""" 311 if self.verbose: 312 print(" 🔍 Computing statistical features...") 313 314 stat_features = {} 315 316 # Define statistical feature functions 317 stat_funcs = { 318 'skewness': calculate_skewness, 319 'kurtosis': calculate_kurtosis, 320 'entropy': calculate_entropy, 321 'interquartile_range': calculate_interquartile_range, 322 } 323 324 # Progress bar for statistical features 325 feature_pbar = tqdm( 326 stat_funcs.items(), 327 desc=" 📊 Stat features", 328 unit="feature", 329 leave=False, 330 disable=not self.verbose 331 ) 332 333 for feature_name, func in feature_pbar: 334 if self.verbose: 335 feature_pbar.set_postfix({'Computing': feature_name}) 336 337 stat_features[feature_name] = [ 338 func(self._ensure_numpy_array(window)) for window in windows 339 ] 340 341 # Handle correlation separately (needs two signals) 342 if self.verbose: 343 print(" 🔗 Computing correlation features...") 344 345 stat_features['correlation'] = [ 346 calculate_correlation( 347 self._ensure_numpy_array(window)[:-1], 348 self._ensure_numpy_array(window)[1:] 349 ) if len(window) > 1 else 0 350 for window in windows 351 ] 352 353 return stat_features 354 355 def _extract_ar_coefficients(self, windows: List, order: int) -> Dict[str, List]: 356 """Extract auto-regression coefficients from windows.""" 357 if self.verbose: 358 print(f" 🔍 Computing auto-regression coefficients (order={order})...") 359 360 # Progress bar for AR coefficients 361 ar_pbar = tqdm( 362 windows, 363 desc=" 🔄 AR coeffs", 364 unit="window", 365 leave=False, 366 disable=not self.verbose 367 ) 368 369 ar_coeffs = [] 370 for window in ar_pbar: 371 coeffs = calculate_auto_regression_coefficients( 372 
self._ensure_numpy_array(window), order 373 ) 374 ar_coeffs.append(coeffs) 375 376 return {'ar_coefficients': ar_coeffs} 377 378 def _extract_annotation_labels(self, window) -> int: 379 """Extract the most common annotation label from a window.""" 380 if hasattr(window, 'mode'): 381 return window.mode().iloc[0] if len(window.mode()) > 0 else 0 382 else: 383 # For numpy arrays or other types 384 unique, counts = np.unique(window, return_counts=True) 385 return unique[np.argmax(counts)] 386 387 def get_feature_names(self) -> List[str]: 388 """ 389 Get names of all features that can be extracted. 390 391 Returns: 392 List of feature names 393 """ 394 time_domain_features = [ 395 'mean', 'std', 'variance', 'rms', 'range', 'median', 'mode', 396 'mean_absolute_value', 'median_absolute_deviation', 'peak_height', 397 'zero_crossing_rate', 'energy' 398 ] 399 400 frequency_domain_features = [ 401 'dominant_frequency', 'peak_frequency', 'power_spectral_entropy', 402 'principal_harmonic_frequency', 'stride_times', 'step_time', 403 'cadence', 'freezing_index' 404 ] 405 406 statistical_features = [ 407 'skewness', 'kurtosis', 'entropy', 'interquartile_range', 'correlation' 408 ] 409 410 other_features = ['ar_coefficients'] 411 412 return time_domain_features + frequency_domain_features + statistical_features + other_features 413 414 def print_extraction_summary(self, features: List[Dict]) -> None: 415 """ 416 Print a detailed summary of extracted features. 417 418 Args: 419 features: List of feature dictionaries returned by extract_features 420 """ 421 print("\n" + "="*80) 422 print("📊 FEATURE EXTRACTION SUMMARY") 423 print("="*80) 424 425 for i, feature_dict in enumerate(features): 426 sensor_name = feature_dict['name'] 427 print(f"\n🎯 Sensor {i+1}: {sensor_name}") 428 print("-" * 40) 429 430 if 'features' in feature_dict and feature_dict['features']: 431 for feature_type, feature_values in feature_dict['features'].items(): 432 if isinstance(feature_values, list): 433 print(f" 📈 {feature_type}: {len(feature_values)} values") 434 if feature_values: 435 sample_value = feature_values[0] 436 if isinstance(sample_value, (list, np.ndarray)): 437 print(f" └── Shape per window: {np.array(sample_value).shape}") 438 else: 439 print(f" └── Sample value: {sample_value:.4f}") 440 else: 441 print(f" 📈 {feature_type}: {feature_values}") 442 443 if 'annotations' in feature_dict: 444 print(f" 📝 Annotations: {len(feature_dict['annotations'])} windows") 445 446 print("\n" + "="*80)
Comprehensive gait feature extractor class.
This class extracts various time-domain, frequency-domain, and statistical features from gait data sliding windows.
Initialize the feature extractor.
Args: verbose: If True, print progress and summary information during feature extraction
74 def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]: 75 """ 76 Extract gait features from sliding windows. 77 78 Args: 79 windows: List of sliding window dictionaries 80 fs: Sampling frequency 81 **kwargs: Additional arguments including time_domain, frequency_domain, statistical flags 82 83 Returns: 84 List of feature dictionaries for each sensor 85 """ 86 # Update config with any passed arguments 87 time_domain = kwargs.get('time_domain', self.config['time_domain']) 88 frequency_domain = kwargs.get('frequency_domain', self.config['frequency_domain']) 89 statistical = kwargs.get('statistical', self.config['statistical']) 90 ar_order = kwargs.get('ar_order', self.config['ar_order']) 91 92 if self.verbose: 93 print("\n" + "="*60) 94 print("🔍 STARTING GAIT FEATURE EXTRACTION") 95 print("="*60) 96 print(f"📈 Total sensors/windows to process: {len(windows)}") 97 print(f"🔊 Sampling frequency: {fs} Hz") 98 print(f"⏱️ Time domain features: {'✅' if time_domain else '❌'}") 99 print(f"🌊 Frequency domain features: {'✅' if frequency_domain else '❌'}") 100 print(f"📊 Statistical features: {'✅' if statistical else '❌'}") 101 print(f"🔄 Auto-regression order: {ar_order}") 102 print("-"*60) 103 104 features = [] 105 106 # Main progress bar for processing all windows 107 main_pbar = tqdm( 108 windows, 109 desc="🔍 Processing Sensors", 110 unit="sensor", 111 disable=not self.verbose 112 ) 113 114 for i, window_dict in enumerate(main_pbar): 115 sensor_name = window_dict['name'] 116 window_data = window_dict['data'] 117 118 if self.verbose: 119 main_pbar.set_postfix({ 120 'Current': sensor_name, 121 'Windows': len(window_data) if isinstance(window_data, list) else 1 122 }) 123 124 # Skip annotation windows 125 if sensor_name == 'annotations': 126 if self.verbose: 127 logger.info(f"📝 Processing annotation data for {sensor_name}") 128 129 features.append({ 130 'name': sensor_name, 131 'features': {}, 132 'annotations': [self._extract_annotation_labels(window) for window in window_data] 133 }) 134 continue 135 136 if self.verbose: 137 logger.info(f"🎯 Processing sensor: {sensor_name}") 138 logger.info(f"📦 Number of windows: {len(window_data)}") 139 140 sensor_features = {'name': sensor_name, 'features': {}} 141 142 # Time domain features 143 if time_domain: 144 if self.verbose: 145 print(f" ⏱️ Extracting time domain features for {sensor_name}...") 146 147 time_features = self._extract_time_domain_features(window_data) 148 sensor_features['features'].update(time_features) 149 150 if self.verbose: 151 feature_count = sum(len(v) if isinstance(v, list) else 1 for v in time_features.values()) 152 print(f" ✅ Time domain: {len(time_features)} feature types, {feature_count} total features") 153 154 # Frequency domain features 155 if frequency_domain: 156 if self.verbose: 157 print(f" 🌊 Extracting frequency domain features for {sensor_name}...") 158 159 freq_features = self._extract_frequency_domain_features(window_data, fs) 160 sensor_features['features'].update(freq_features) 161 162 if self.verbose: 163 feature_count = sum(len(v) if isinstance(v, list) else 1 for v in freq_features.values()) 164 print(f" ✅ Frequency domain: {len(freq_features)} feature types, {feature_count} total features") 165 166 # Statistical features 167 if statistical: 168 if self.verbose: 169 print(f" 📊 Extracting statistical features for {sensor_name}...") 170 171 stat_features = self._extract_statistical_features(window_data) 172 sensor_features['features'].update(stat_features) 173 174 if self.verbose: 175 
feature_count = sum(len(v) if isinstance(v, list) else 1 for v in stat_features.values()) 176 print(f" ✅ Statistical: {len(stat_features)} feature types, {feature_count} total features") 177 178 # Auto-regression coefficients 179 if self.verbose: 180 print(f" 🔄 Extracting auto-regression coefficients for {sensor_name}...") 181 182 ar_features = self._extract_ar_coefficients(window_data, ar_order) 183 sensor_features['features'].update(ar_features) 184 185 if self.verbose: 186 feature_count = sum(len(v) if isinstance(v, list) else 1 for v in ar_features.values()) 187 print(f" ✅ Auto-regression: {len(ar_features)} feature types, {feature_count} total features") 188 189 # Calculate total features for this sensor 190 total_features = sum( 191 len(v) if isinstance(v, list) else 1 192 for v in sensor_features['features'].values() 193 ) 194 195 if self.verbose: 196 print(f" 🎯 Total features extracted for {sensor_name}: {total_features}") 197 print(f" 📋 Feature types: {list(sensor_features['features'].keys())}") 198 print("-"*40) 199 200 features.append(sensor_features) 201 202 if self.verbose: 203 print("\n" + "="*60) 204 print("🎉 FEATURE EXTRACTION COMPLETED!") 205 print("="*60) 206 print(f"📊 Total sensors processed: {len(features)}") 207 208 # Calculate overall statistics 209 total_feature_count = 0 210 for feature_dict in features: 211 if 'features' in feature_dict: 212 total_feature_count += sum( 213 len(v) if isinstance(v, list) else 1 214 for v in feature_dict['features'].values() 215 ) 216 217 print(f"🔢 Total features extracted: {total_feature_count}") 218 print(f"📈 Average features per sensor: {total_feature_count / len(features):.1f}") 219 print("="*60) 220 221 return features
Extract gait features from sliding windows.
Args:
- windows: List of sliding window dictionaries
- fs: Sampling frequency
- **kwargs: Additional arguments, including the time_domain, frequency_domain, and statistical flags and ar_order
Returns: List of feature dictionaries for each sensor
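The time_domain, frequency_domain, statistical, and ar_order keyword arguments listed above can be used to restrict which feature families are computed. A short, self-contained sketch (synthetic data, illustrative values only):

import numpy as np
from gaitsetpy.features import GaitFeatureExtractor

rng = np.random.default_rng(1)
windows = [{'name': 'shank_acc_x',
            'data': [rng.standard_normal(256) for _ in range(3)]}]

extractor = GaitFeatureExtractor(verbose=False)

# Time-domain features only, with a reduced auto-regression order.
features = extractor.extract_features(
    windows, fs=64,
    time_domain=True, frequency_domain=False, statistical=False,
    ar_order=2,
)

Note that in the listing above the auto-regression coefficients are always computed regardless of these flags; only their order is configurable.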
387 def get_feature_names(self) -> List[str]: 388 """ 389 Get names of all features that can be extracted. 390 391 Returns: 392 List of feature names 393 """ 394 time_domain_features = [ 395 'mean', 'std', 'variance', 'rms', 'range', 'median', 'mode', 396 'mean_absolute_value', 'median_absolute_deviation', 'peak_height', 397 'zero_crossing_rate', 'energy' 398 ] 399 400 frequency_domain_features = [ 401 'dominant_frequency', 'peak_frequency', 'power_spectral_entropy', 402 'principal_harmonic_frequency', 'stride_times', 'step_time', 403 'cadence', 'freezing_index' 404 ] 405 406 statistical_features = [ 407 'skewness', 'kurtosis', 'entropy', 'interquartile_range', 'correlation' 408 ] 409 410 other_features = ['ar_coefficients'] 411 412 return time_domain_features + frequency_domain_features + statistical_features + other_features
Get names of all features that can be extracted.
Returns: List of feature names
414 def print_extraction_summary(self, features: List[Dict]) -> None: 415 """ 416 Print a detailed summary of extracted features. 417 418 Args: 419 features: List of feature dictionaries returned by extract_features 420 """ 421 print("\n" + "="*80) 422 print("📊 FEATURE EXTRACTION SUMMARY") 423 print("="*80) 424 425 for i, feature_dict in enumerate(features): 426 sensor_name = feature_dict['name'] 427 print(f"\n🎯 Sensor {i+1}: {sensor_name}") 428 print("-" * 40) 429 430 if 'features' in feature_dict and feature_dict['features']: 431 for feature_type, feature_values in feature_dict['features'].items(): 432 if isinstance(feature_values, list): 433 print(f" 📈 {feature_type}: {len(feature_values)} values") 434 if feature_values: 435 sample_value = feature_values[0] 436 if isinstance(sample_value, (list, np.ndarray)): 437 print(f" └── Shape per window: {np.array(sample_value).shape}") 438 else: 439 print(f" └── Sample value: {sample_value:.4f}") 440 else: 441 print(f" 📈 {feature_type}: {feature_values}") 442 443 if 'annotations' in feature_dict: 444 print(f" 📝 Annotations: {len(feature_dict['annotations'])} windows") 445 446 print("\n" + "="*80)
Print a detailed summary of extracted features.
Args: features: List of feature dictionaries returned by extract_features
26class LBPFeatureExtractor(BaseFeatureExtractor): 27 """ 28 Local Binary Pattern (LBP) feature extractor for VGRF data. 29 30 This extractor converts time-series data into LBP codes and extracts 31 histogram features from the LBP representation. 32 """ 33 34 def __init__(self, verbose: bool = True): 35 super().__init__( 36 name="lbp_features", 37 description="Local Binary Pattern feature extractor for VGRF time-series data" 38 ) 39 self.verbose = verbose 40 self.config = { 41 'radius': 2, # LBP radius (number of neighbors) 42 'n_bins': 256, # Number of histogram bins 43 'normalize': True # Normalize histogram 44 } 45 46 if self.verbose: 47 print("🔍 LBP Feature Extractor initialized!") 48 49 def lbp_1d(self, data: np.ndarray, radius: int = 2) -> str: 50 """ 51 Compute 1D Local Binary Pattern for time-series data. 52 53 Args: 54 data: Input time-series data 55 radius: Radius for LBP computation 56 57 Returns: 58 LBP code as binary string 59 """ 60 n = len(data) 61 lbp_code = '' 62 63 for i in range(n): 64 pattern = '' 65 for j in range(i - radius, i + radius + 1): 66 if j < 0 or j >= n: 67 pattern += '0' 68 elif data[j] >= data[i]: 69 pattern += '1' 70 else: 71 pattern += '0' 72 lbp_code += pattern 73 74 return lbp_code 75 76 def lbp_to_histogram(self, lbp_code: str, n_bins: int = 256, normalize: bool = True) -> np.ndarray: 77 """ 78 Convert LBP code to histogram features. 79 80 Args: 81 lbp_code: Binary LBP code string 82 n_bins: Number of histogram bins 83 normalize: Whether to normalize histogram 84 85 Returns: 86 Histogram features as numpy array 87 """ 88 # Convert LBP code to integer values 89 if len(lbp_code) == 0: 90 return np.zeros(n_bins) 91 92 # Process LBP code in chunks of 8 bits (or smaller) 93 chunk_size = 8 94 lbp_values = [] 95 96 for i in range(0, len(lbp_code), chunk_size): 97 chunk = lbp_code[i:i + chunk_size] 98 if len(chunk) > 0: 99 # Convert binary string to integer 100 try: 101 value = int(chunk, 2) 102 lbp_values.append(value % n_bins) # Ensure within bin range 103 except ValueError: 104 continue 105 106 if len(lbp_values) == 0: 107 return np.zeros(n_bins) 108 109 # Create histogram 110 hist, _ = np.histogram(lbp_values, bins=n_bins, range=(0, n_bins)) 111 112 if normalize and np.sum(hist) > 0: 113 hist = hist / np.sum(hist) 114 115 return hist 116 117 def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]: 118 """ 119 Extract LBP features from sliding windows. 
120 121 Args: 122 windows: List of sliding window dictionaries 123 fs: Sampling frequency (unused for LBP) 124 **kwargs: Additional arguments 125 126 Returns: 127 List of feature dictionaries 128 """ 129 # Update config with any passed arguments 130 radius = kwargs.get('radius', self.config['radius']) 131 n_bins = kwargs.get('n_bins', self.config['n_bins']) 132 normalize = kwargs.get('normalize', self.config['normalize']) 133 134 if self.verbose: 135 print(f"\n🔍 LBP Feature Extraction") 136 print(f"📊 Radius: {radius}, Bins: {n_bins}, Normalize: {normalize}") 137 138 features = [] 139 140 for window_dict in tqdm(windows, desc="Processing LBP features", disable=not self.verbose): 141 sensor_name = window_dict['name'] 142 window_data = window_dict['data'] 143 144 # Skip annotation windows 145 if sensor_name == 'annotations': 146 continue 147 148 sensor_features = {'name': sensor_name, 'features': {}} 149 150 # Extract LBP features for each window 151 lbp_histograms = [] 152 lbp_means = [] 153 lbp_stds = [] 154 155 for window in window_data: 156 # Ensure window is numpy array 157 if hasattr(window, 'values'): 158 window = window.values 159 160 # Compute LBP 161 lbp_code = self.lbp_1d(window, radius) 162 163 # Convert to histogram 164 hist = self.lbp_to_histogram(lbp_code, n_bins, normalize) 165 lbp_histograms.append(hist) 166 167 # Extract summary statistics 168 lbp_means.append(np.mean(hist)) 169 lbp_stds.append(np.std(hist)) 170 171 # Store features 172 sensor_features['features'] = { 173 'lbp_histograms': lbp_histograms, 174 'lbp_mean': lbp_means, 175 'lbp_std': lbp_stds, 176 'lbp_energy': [np.sum(hist**2) for hist in lbp_histograms], 177 'lbp_entropy': [self._calculate_entropy(hist) for hist in lbp_histograms] 178 } 179 180 features.append(sensor_features) 181 182 return features 183 184 def _calculate_entropy(self, hist: np.ndarray) -> float: 185 """Calculate entropy of histogram.""" 186 # Avoid log(0) by adding small value 187 hist = hist + 1e-10 188 return -np.sum(hist * np.log2(hist)) 189 190 def get_feature_names(self) -> List[str]: 191 """Get names of LBP features.""" 192 return [ 193 'lbp_histograms', 'lbp_mean', 'lbp_std', 194 'lbp_energy', 'lbp_entropy' 195 ]
Local Binary Pattern (LBP) feature extractor for VGRF data.
This extractor converts time-series data into LBP codes and extracts histogram features from the LBP representation.
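A usage sketch for the extractor as a whole, assuming the same window-dictionary format as the other extractors (a 'name' key plus a 'data' list of per-window arrays); the channel name and random values are made up:

import numpy as np
from gaitsetpy.features import LBPFeatureExtractor

rng = np.random.default_rng(0)
windows = [{'name': 'vgrf_left', 'data': [rng.random(300) for _ in range(4)]}]

lbp = LBPFeatureExtractor(verbose=False)
feats = lbp.extract_features(windows, fs=100, radius=2, n_bins=64, normalize=True)

print(sorted(feats[0]['features'].keys()))
# ['lbp_energy', 'lbp_entropy', 'lbp_histograms', 'lbp_mean', 'lbp_std']
print(len(feats[0]['features']['lbp_histograms'][0]))  # 64 bins per window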
34 def __init__(self, verbose: bool = True): 35 super().__init__( 36 name="lbp_features", 37 description="Local Binary Pattern feature extractor for VGRF time-series data" 38 ) 39 self.verbose = verbose 40 self.config = { 41 'radius': 2, # LBP radius (number of neighbors) 42 'n_bins': 256, # Number of histogram bins 43 'normalize': True # Normalize histogram 44 } 45 46 if self.verbose: 47 print("🔍 LBP Feature Extractor initialized!")
Initialize the feature extractor.
Args: verbose: If True, print progress information during LBP feature extraction
49 def lbp_1d(self, data: np.ndarray, radius: int = 2) -> str: 50 """ 51 Compute 1D Local Binary Pattern for time-series data. 52 53 Args: 54 data: Input time-series data 55 radius: Radius for LBP computation 56 57 Returns: 58 LBP code as binary string 59 """ 60 n = len(data) 61 lbp_code = '' 62 63 for i in range(n): 64 pattern = '' 65 for j in range(i - radius, i + radius + 1): 66 if j < 0 or j >= n: 67 pattern += '0' 68 elif data[j] >= data[i]: 69 pattern += '1' 70 else: 71 pattern += '0' 72 lbp_code += pattern 73 74 return lbp_code
Compute 1D Local Binary Pattern for time-series data.
Args: data: Input time-series data radius: Radius for LBP computation
Returns: LBP code as binary string
76 def lbp_to_histogram(self, lbp_code: str, n_bins: int = 256, normalize: bool = True) -> np.ndarray: 77 """ 78 Convert LBP code to histogram features. 79 80 Args: 81 lbp_code: Binary LBP code string 82 n_bins: Number of histogram bins 83 normalize: Whether to normalize histogram 84 85 Returns: 86 Histogram features as numpy array 87 """ 88 # Convert LBP code to integer values 89 if len(lbp_code) == 0: 90 return np.zeros(n_bins) 91 92 # Process LBP code in chunks of 8 bits (or smaller) 93 chunk_size = 8 94 lbp_values = [] 95 96 for i in range(0, len(lbp_code), chunk_size): 97 chunk = lbp_code[i:i + chunk_size] 98 if len(chunk) > 0: 99 # Convert binary string to integer 100 try: 101 value = int(chunk, 2) 102 lbp_values.append(value % n_bins) # Ensure within bin range 103 except ValueError: 104 continue 105 106 if len(lbp_values) == 0: 107 return np.zeros(n_bins) 108 109 # Create histogram 110 hist, _ = np.histogram(lbp_values, bins=n_bins, range=(0, n_bins)) 111 112 if normalize and np.sum(hist) > 0: 113 hist = hist / np.sum(hist) 114 115 return hist
Convert LBP code to histogram features.
Args:
- lbp_code: Binary LBP code string
- n_bins: Number of histogram bins
- normalize: Whether to normalize the histogram
Returns: Histogram features as numpy array
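A small worked example of the two helpers above, using made-up sample values. With radius=2 each sample yields a 5-bit pattern (2*radius + 1 neighbours, including the sample itself), and the concatenated code is chunked into 8-bit values before histogramming:

import numpy as np
from gaitsetpy.features import LBPFeatureExtractor

lbp = LBPFeatureExtractor(verbose=False)
signal = np.array([0.1, 0.4, 0.3, 0.8, 0.5, 0.2])

code = lbp.lbp_1d(signal, radius=2)
print(len(code))  # 6 samples * 5 bits = 30 characters

hist = lbp.lbp_to_histogram(code, n_bins=32, normalize=True)
print(hist.shape)  # (32,); sums to 1 because normalize=True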
117 def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]: 118 """ 119 Extract LBP features from sliding windows. 120 121 Args: 122 windows: List of sliding window dictionaries 123 fs: Sampling frequency (unused for LBP) 124 **kwargs: Additional arguments 125 126 Returns: 127 List of feature dictionaries 128 """ 129 # Update config with any passed arguments 130 radius = kwargs.get('radius', self.config['radius']) 131 n_bins = kwargs.get('n_bins', self.config['n_bins']) 132 normalize = kwargs.get('normalize', self.config['normalize']) 133 134 if self.verbose: 135 print(f"\n🔍 LBP Feature Extraction") 136 print(f"📊 Radius: {radius}, Bins: {n_bins}, Normalize: {normalize}") 137 138 features = [] 139 140 for window_dict in tqdm(windows, desc="Processing LBP features", disable=not self.verbose): 141 sensor_name = window_dict['name'] 142 window_data = window_dict['data'] 143 144 # Skip annotation windows 145 if sensor_name == 'annotations': 146 continue 147 148 sensor_features = {'name': sensor_name, 'features': {}} 149 150 # Extract LBP features for each window 151 lbp_histograms = [] 152 lbp_means = [] 153 lbp_stds = [] 154 155 for window in window_data: 156 # Ensure window is numpy array 157 if hasattr(window, 'values'): 158 window = window.values 159 160 # Compute LBP 161 lbp_code = self.lbp_1d(window, radius) 162 163 # Convert to histogram 164 hist = self.lbp_to_histogram(lbp_code, n_bins, normalize) 165 lbp_histograms.append(hist) 166 167 # Extract summary statistics 168 lbp_means.append(np.mean(hist)) 169 lbp_stds.append(np.std(hist)) 170 171 # Store features 172 sensor_features['features'] = { 173 'lbp_histograms': lbp_histograms, 174 'lbp_mean': lbp_means, 175 'lbp_std': lbp_stds, 176 'lbp_energy': [np.sum(hist**2) for hist in lbp_histograms], 177 'lbp_entropy': [self._calculate_entropy(hist) for hist in lbp_histograms] 178 } 179 180 features.append(sensor_features) 181 182 return features
Extract LBP features from sliding windows.
Args:
- windows: List of sliding window dictionaries
- fs: Sampling frequency (unused for LBP)
- **kwargs: Additional arguments
Returns: List of feature dictionaries
190 def get_feature_names(self) -> List[str]: 191 """Get names of LBP features.""" 192 return [ 193 'lbp_histograms', 'lbp_mean', 'lbp_std', 194 'lbp_energy', 'lbp_entropy' 195 ]
Get names of LBP features.
198class FourierSeriesFeatureExtractor(BaseFeatureExtractor): 199 """ 200 Fourier Series feature extractor for VGRF data. 201 202 This extractor fits Fourier series to time-series data and extracts 203 coefficients and reconstruction features. 204 """ 205 206 def __init__(self, verbose: bool = True): 207 super().__init__( 208 name="fourier_features", 209 description="Fourier series feature extractor for VGRF time-series data" 210 ) 211 self.verbose = verbose 212 self.config = { 213 'n_terms': 10, # Number of Fourier terms 214 'period': 3.0, # Period for Fourier series 215 'extract_coefficients': True, 216 'extract_reconstruction_error': True 217 } 218 219 if self.verbose: 220 print("🌊 Fourier Series Feature Extractor initialized!") 221 222 def fit_fourier_series(self, signal: np.ndarray, time_points: np.ndarray, 223 period: float = 3.0, n_terms: int = 10) -> Dict[str, Any]: 224 """ 225 Fit Fourier series to signal. 226 227 Args: 228 signal: Input signal 229 time_points: Time points 230 period: Period of the Fourier series 231 n_terms: Number of Fourier terms 232 233 Returns: 234 Dictionary containing Fourier series parameters 235 """ 236 try: 237 # Calculate Fourier coefficients 238 L = period 239 240 # Calculate a0 (DC component) 241 a0 = 2/L * simpson(signal, time_points) 242 243 # Calculate an and bn coefficients 244 an = [] 245 bn = [] 246 247 for n in range(1, n_terms + 1): 248 # Calculate an coefficient 249 an_val = 2.0/L * simpson(signal * np.cos(2.*np.pi*n*time_points/L), time_points) 250 an.append(an_val) 251 252 # Calculate bn coefficient 253 bn_val = 2.0/L * simpson(signal * np.sin(2.*np.pi*n*time_points/L), time_points) 254 bn.append(bn_val) 255 256 # Reconstruct signal 257 reconstructed = np.full_like(time_points, a0/2) 258 for n in range(n_terms): 259 reconstructed += an[n] * np.cos(2.*np.pi*(n+1)*time_points/L) 260 reconstructed += bn[n] * np.sin(2.*np.pi*(n+1)*time_points/L) 261 262 # Calculate reconstruction error 263 reconstruction_error = np.mean((signal - reconstructed)**2) 264 265 return { 266 'a0': a0, 267 'an': an, 268 'bn': bn, 269 'reconstructed': reconstructed, 270 'reconstruction_error': reconstruction_error, 271 'fourier_energy': a0**2 + 2*np.sum(np.array(an)**2 + np.array(bn)**2) 272 } 273 274 except Exception as e: 275 if self.verbose: 276 print(f"Error in Fourier series fitting: {e}") 277 return { 278 'a0': 0, 279 'an': [0] * n_terms, 280 'bn': [0] * n_terms, 281 'reconstructed': np.zeros_like(time_points), 282 'reconstruction_error': float('inf'), 283 'fourier_energy': 0 284 } 285 286 def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]: 287 """ 288 Extract Fourier series features from sliding windows. 
289 290 Args: 291 windows: List of sliding window dictionaries 292 fs: Sampling frequency 293 **kwargs: Additional arguments 294 295 Returns: 296 List of feature dictionaries 297 """ 298 # Update config with any passed arguments 299 n_terms = kwargs.get('n_terms', self.config['n_terms']) 300 period = kwargs.get('period', self.config['period']) 301 302 if self.verbose: 303 print(f"\n🌊 Fourier Series Feature Extraction") 304 print(f"📊 Terms: {n_terms}, Period: {period}") 305 306 features = [] 307 308 for window_dict in tqdm(windows, desc="Processing Fourier features", disable=not self.verbose): 309 sensor_name = window_dict['name'] 310 window_data = window_dict['data'] 311 312 # Skip annotation windows 313 if sensor_name == 'annotations': 314 continue 315 316 sensor_features = {'name': sensor_name, 'features': {}} 317 318 # Extract Fourier features for each window 319 a0_values = [] 320 an_values = [] 321 bn_values = [] 322 reconstruction_errors = [] 323 fourier_energies = [] 324 325 for window in window_data: 326 # Ensure window is numpy array 327 if hasattr(window, 'values'): 328 window = window.values 329 330 # Create time points 331 time_points = np.linspace(0, period, len(window)) 332 333 # Fit Fourier series 334 fourier_result = self.fit_fourier_series(window, time_points, period, n_terms) 335 336 # Store results 337 a0_values.append(fourier_result['a0']) 338 an_values.append(fourier_result['an']) 339 bn_values.append(fourier_result['bn']) 340 reconstruction_errors.append(fourier_result['reconstruction_error']) 341 fourier_energies.append(fourier_result['fourier_energy']) 342 343 # Store features 344 sensor_features['features'] = { 345 'fourier_a0': a0_values, 346 'fourier_an': an_values, 347 'fourier_bn': bn_values, 348 'fourier_reconstruction_error': reconstruction_errors, 349 'fourier_energy': fourier_energies, 350 'fourier_an_mean': [np.mean(an) for an in an_values], 351 'fourier_bn_mean': [np.mean(bn) for bn in bn_values], 352 'fourier_an_std': [np.std(an) for an in an_values], 353 'fourier_bn_std': [np.std(bn) for bn in bn_values] 354 } 355 356 features.append(sensor_features) 357 358 return features 359 360 def get_feature_names(self) -> List[str]: 361 """Get names of Fourier series features.""" 362 return [ 363 'fourier_a0', 'fourier_an', 'fourier_bn', 364 'fourier_reconstruction_error', 'fourier_energy', 365 'fourier_an_mean', 'fourier_bn_mean', 366 'fourier_an_std', 'fourier_bn_std' 367 ]
Fourier Series feature extractor for VGRF data.
This extractor fits Fourier series to time-series data and extracts coefficients and reconstruction features.
206 def __init__(self, verbose: bool = True): 207 super().__init__( 208 name="fourier_features", 209 description="Fourier series feature extractor for VGRF time-series data" 210 ) 211 self.verbose = verbose 212 self.config = { 213 'n_terms': 10, # Number of Fourier terms 214 'period': 3.0, # Period for Fourier series 215 'extract_coefficients': True, 216 'extract_reconstruction_error': True 217 } 218 219 if self.verbose: 220 print("🌊 Fourier Series Feature Extractor initialized!")
Initialize the feature extractor.
Args: verbose: If True, print progress information during Fourier feature extraction
222 def fit_fourier_series(self, signal: np.ndarray, time_points: np.ndarray, 223 period: float = 3.0, n_terms: int = 10) -> Dict[str, Any]: 224 """ 225 Fit Fourier series to signal. 226 227 Args: 228 signal: Input signal 229 time_points: Time points 230 period: Period of the Fourier series 231 n_terms: Number of Fourier terms 232 233 Returns: 234 Dictionary containing Fourier series parameters 235 """ 236 try: 237 # Calculate Fourier coefficients 238 L = period 239 240 # Calculate a0 (DC component) 241 a0 = 2/L * simpson(signal, time_points) 242 243 # Calculate an and bn coefficients 244 an = [] 245 bn = [] 246 247 for n in range(1, n_terms + 1): 248 # Calculate an coefficient 249 an_val = 2.0/L * simpson(signal * np.cos(2.*np.pi*n*time_points/L), time_points) 250 an.append(an_val) 251 252 # Calculate bn coefficient 253 bn_val = 2.0/L * simpson(signal * np.sin(2.*np.pi*n*time_points/L), time_points) 254 bn.append(bn_val) 255 256 # Reconstruct signal 257 reconstructed = np.full_like(time_points, a0/2) 258 for n in range(n_terms): 259 reconstructed += an[n] * np.cos(2.*np.pi*(n+1)*time_points/L) 260 reconstructed += bn[n] * np.sin(2.*np.pi*(n+1)*time_points/L) 261 262 # Calculate reconstruction error 263 reconstruction_error = np.mean((signal - reconstructed)**2) 264 265 return { 266 'a0': a0, 267 'an': an, 268 'bn': bn, 269 'reconstructed': reconstructed, 270 'reconstruction_error': reconstruction_error, 271 'fourier_energy': a0**2 + 2*np.sum(np.array(an)**2 + np.array(bn)**2) 272 } 273 274 except Exception as e: 275 if self.verbose: 276 print(f"Error in Fourier series fitting: {e}") 277 return { 278 'a0': 0, 279 'an': [0] * n_terms, 280 'bn': [0] * n_terms, 281 'reconstructed': np.zeros_like(time_points), 282 'reconstruction_error': float('inf'), 283 'fourier_energy': 0 284 }
Fit Fourier series to signal.
Args:
- signal: Input signal
- time_points: Time points
- period: Period of the Fourier series
- n_terms: Number of Fourier terms
Returns: Dictionary containing Fourier series parameters
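An illustrative sketch of calling fit_fourier_series directly on a synthetic periodic signal; the period, number of terms, and waveform are made up, and the expected coefficient values follow from the chosen sine components:

import numpy as np
from gaitsetpy.features import FourierSeriesFeatureExtractor

fourier = FourierSeriesFeatureExtractor(verbose=False)

period = 3.0
t = np.linspace(0.0, period, 300)
# Fundamental plus a weak third harmonic over one period.
y = np.sin(2 * np.pi * t / period) + 0.1 * np.sin(6 * np.pi * t / period)

result = fourier.fit_fourier_series(y, t, period=period, n_terms=5)

print(round(result['a0'], 3))          # ~0.0 (no DC offset)
print(round(result['bn'][0], 3))       # ~1.0 (fundamental sine term)
print(result['reconstruction_error'])  # small mean squared error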
286 def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]: 287 """ 288 Extract Fourier series features from sliding windows. 289 290 Args: 291 windows: List of sliding window dictionaries 292 fs: Sampling frequency 293 **kwargs: Additional arguments 294 295 Returns: 296 List of feature dictionaries 297 """ 298 # Update config with any passed arguments 299 n_terms = kwargs.get('n_terms', self.config['n_terms']) 300 period = kwargs.get('period', self.config['period']) 301 302 if self.verbose: 303 print(f"\n🌊 Fourier Series Feature Extraction") 304 print(f"📊 Terms: {n_terms}, Period: {period}") 305 306 features = [] 307 308 for window_dict in tqdm(windows, desc="Processing Fourier features", disable=not self.verbose): 309 sensor_name = window_dict['name'] 310 window_data = window_dict['data'] 311 312 # Skip annotation windows 313 if sensor_name == 'annotations': 314 continue 315 316 sensor_features = {'name': sensor_name, 'features': {}} 317 318 # Extract Fourier features for each window 319 a0_values = [] 320 an_values = [] 321 bn_values = [] 322 reconstruction_errors = [] 323 fourier_energies = [] 324 325 for window in window_data: 326 # Ensure window is numpy array 327 if hasattr(window, 'values'): 328 window = window.values 329 330 # Create time points 331 time_points = np.linspace(0, period, len(window)) 332 333 # Fit Fourier series 334 fourier_result = self.fit_fourier_series(window, time_points, period, n_terms) 335 336 # Store results 337 a0_values.append(fourier_result['a0']) 338 an_values.append(fourier_result['an']) 339 bn_values.append(fourier_result['bn']) 340 reconstruction_errors.append(fourier_result['reconstruction_error']) 341 fourier_energies.append(fourier_result['fourier_energy']) 342 343 # Store features 344 sensor_features['features'] = { 345 'fourier_a0': a0_values, 346 'fourier_an': an_values, 347 'fourier_bn': bn_values, 348 'fourier_reconstruction_error': reconstruction_errors, 349 'fourier_energy': fourier_energies, 350 'fourier_an_mean': [np.mean(an) for an in an_values], 351 'fourier_bn_mean': [np.mean(bn) for bn in bn_values], 352 'fourier_an_std': [np.std(an) for an in an_values], 353 'fourier_bn_std': [np.std(bn) for bn in bn_values] 354 } 355 356 features.append(sensor_features) 357 358 return features
Extract Fourier series features from sliding windows.
Args:
- windows: List of sliding window dictionaries
- fs: Sampling frequency
- **kwargs: Additional arguments
Returns: List of feature dictionaries
360 def get_feature_names(self) -> List[str]: 361 """Get names of Fourier series features.""" 362 return [ 363 'fourier_a0', 'fourier_an', 'fourier_bn', 364 'fourier_reconstruction_error', 'fourier_energy', 365 'fourier_an_mean', 'fourier_bn_mean', 366 'fourier_an_std', 'fourier_bn_std' 367 ]
Get names of Fourier series features.
370class PhysioNetFeatureExtractor(BaseFeatureExtractor): 371 """ 372 Combined feature extractor for PhysioNet VGRF data. 373 374 This extractor combines LBP and Fourier series features along with 375 basic statistical features specific to VGRF data. 376 """ 377 378 def __init__(self, verbose: bool = True): 379 super().__init__( 380 name="physionet_features", 381 description="Combined feature extractor for PhysioNet VGRF data including LBP and Fourier features" 382 ) 383 self.verbose = verbose 384 self.lbp_extractor = LBPFeatureExtractor(verbose=False) 385 self.fourier_extractor = FourierSeriesFeatureExtractor(verbose=False) 386 387 if self.verbose: 388 print("🚀 PhysioNet Feature Extractor initialized!") 389 390 def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]: 391 """ 392 Extract combined features from sliding windows. 393 394 Args: 395 windows: List of sliding window dictionaries 396 fs: Sampling frequency 397 **kwargs: Additional arguments 398 399 Returns: 400 List of feature dictionaries 401 """ 402 # Extract features from each extractor 403 extract_lbp = kwargs.get('extract_lbp', True) 404 extract_fourier = kwargs.get('extract_fourier', True) 405 extract_statistical = kwargs.get('extract_statistical', True) 406 407 if self.verbose: 408 print(f"\n🔍 PhysioNet Feature Extraction") 409 print(f"📊 LBP: {extract_lbp}, Fourier: {extract_fourier}, Statistical: {extract_statistical}") 410 411 features = [] 412 413 # Extract LBP features 414 if extract_lbp: 415 lbp_features = self.lbp_extractor.extract_features(windows, fs, **kwargs) 416 else: 417 lbp_features = [] 418 419 # Extract Fourier features 420 if extract_fourier: 421 fourier_features = self.fourier_extractor.extract_features(windows, fs, **kwargs) 422 else: 423 fourier_features = [] 424 425 # Extract statistical features 426 if extract_statistical: 427 statistical_features = self._extract_statistical_features(windows) 428 else: 429 statistical_features = [] 430 431 # Combine features 432 for i, window_dict in enumerate(windows): 433 sensor_name = window_dict['name'] 434 435 # Skip annotation windows 436 if sensor_name == 'annotations': 437 continue 438 439 combined_features = {'name': sensor_name, 'features': {}} 440 441 # Add LBP features 442 if extract_lbp and i < len(lbp_features): 443 combined_features['features'].update(lbp_features[i]['features']) 444 445 # Add Fourier features 446 if extract_fourier and i < len(fourier_features): 447 combined_features['features'].update(fourier_features[i]['features']) 448 449 # Add statistical features 450 if extract_statistical and i < len(statistical_features): 451 combined_features['features'].update(statistical_features[i]['features']) 452 453 features.append(combined_features) 454 455 return features 456 457 def _extract_statistical_features(self, windows: List[Dict]) -> List[Dict]: 458 """Extract basic statistical features.""" 459 features = [] 460 461 for window_dict in windows: 462 sensor_name = window_dict['name'] 463 window_data = window_dict['data'] 464 465 # Skip annotation windows 466 if sensor_name == 'annotations': 467 continue 468 469 sensor_features = {'name': sensor_name, 'features': {}} 470 471 # Extract statistical features for each window 472 means = [] 473 stds = [] 474 maxs = [] 475 mins = [] 476 ranges = [] 477 478 for window in window_data: 479 # Ensure window is numpy array 480 if hasattr(window, 'values'): 481 window = window.values 482 483 means.append(np.mean(window)) 484 stds.append(np.std(window)) 485 maxs.append(np.max(window)) 486 
mins.append(np.min(window)) 487 ranges.append(np.max(window) - np.min(window)) 488 489 # Store features 490 sensor_features['features'] = { 491 'vgrf_mean': means, 492 'vgrf_std': stds, 493 'vgrf_max': maxs, 494 'vgrf_min': mins, 495 'vgrf_range': ranges 496 } 497 498 features.append(sensor_features) 499 500 return features 501 502 def get_feature_names(self) -> List[str]: 503 """Get names of all features.""" 504 feature_names = [] 505 feature_names.extend(self.lbp_extractor.get_feature_names()) 506 feature_names.extend(self.fourier_extractor.get_feature_names()) 507 feature_names.extend(['vgrf_mean', 'vgrf_std', 'vgrf_max', 'vgrf_min', 'vgrf_range']) 508 return feature_names
Combined feature extractor for PhysioNet VGRF data.
This extractor combines LBP and Fourier series features along with basic statistical features specific to VGRF data.
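A usage sketch for the combined extractor, assuming the usual window-dictionary input; the channel names and random values are illustrative, and the extract_* flags mirror the kwargs read in the source below:

import numpy as np
from gaitsetpy.features import PhysioNetFeatureExtractor

rng = np.random.default_rng(0)
windows = [
    {'name': 'vgrf_left',  'data': [rng.random(300) for _ in range(3)]},
    {'name': 'vgrf_right', 'data': [rng.random(300) for _ in range(3)]},
]

extractor = PhysioNetFeatureExtractor(verbose=False)
features = extractor.extract_features(
    windows, fs=100,
    extract_lbp=True, extract_fourier=False, extract_statistical=True,
)

for sensor in features:
    print(sensor['name'], sorted(sensor['features'].keys()))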
378 def __init__(self, verbose: bool = True): 379 super().__init__( 380 name="physionet_features", 381 description="Combined feature extractor for PhysioNet VGRF data including LBP and Fourier features" 382 ) 383 self.verbose = verbose 384 self.lbp_extractor = LBPFeatureExtractor(verbose=False) 385 self.fourier_extractor = FourierSeriesFeatureExtractor(verbose=False) 386 387 if self.verbose: 388 print("🚀 PhysioNet Feature Extractor initialized!")
Initialize the feature extractor.
Args: verbose: If True, print progress information during combined feature extraction
390 def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]: 391 """ 392 Extract combined features from sliding windows. 393 394 Args: 395 windows: List of sliding window dictionaries 396 fs: Sampling frequency 397 **kwargs: Additional arguments 398 399 Returns: 400 List of feature dictionaries 401 """ 402 # Extract features from each extractor 403 extract_lbp = kwargs.get('extract_lbp', True) 404 extract_fourier = kwargs.get('extract_fourier', True) 405 extract_statistical = kwargs.get('extract_statistical', True) 406 407 if self.verbose: 408 print(f"\n🔍 PhysioNet Feature Extraction") 409 print(f"📊 LBP: {extract_lbp}, Fourier: {extract_fourier}, Statistical: {extract_statistical}") 410 411 features = [] 412 413 # Extract LBP features 414 if extract_lbp: 415 lbp_features = self.lbp_extractor.extract_features(windows, fs, **kwargs) 416 else: 417 lbp_features = [] 418 419 # Extract Fourier features 420 if extract_fourier: 421 fourier_features = self.fourier_extractor.extract_features(windows, fs, **kwargs) 422 else: 423 fourier_features = [] 424 425 # Extract statistical features 426 if extract_statistical: 427 statistical_features = self._extract_statistical_features(windows) 428 else: 429 statistical_features = [] 430 431 # Combine features 432 for i, window_dict in enumerate(windows): 433 sensor_name = window_dict['name'] 434 435 # Skip annotation windows 436 if sensor_name == 'annotations': 437 continue 438 439 combined_features = {'name': sensor_name, 'features': {}} 440 441 # Add LBP features 442 if extract_lbp and i < len(lbp_features): 443 combined_features['features'].update(lbp_features[i]['features']) 444 445 # Add Fourier features 446 if extract_fourier and i < len(fourier_features): 447 combined_features['features'].update(fourier_features[i]['features']) 448 449 # Add statistical features 450 if extract_statistical and i < len(statistical_features): 451 combined_features['features'].update(statistical_features[i]['features']) 452 453 features.append(combined_features) 454 455 return features
Extract combined features from sliding windows.
Args:
- windows: List of sliding window dictionaries
- fs: Sampling frequency
- **kwargs: Additional arguments
Returns: List of feature dictionaries
502 def get_feature_names(self) -> List[str]: 503 """Get names of all features.""" 504 feature_names = [] 505 feature_names.extend(self.lbp_extractor.get_feature_names()) 506 feature_names.extend(self.fourier_extractor.get_feature_names()) 507 feature_names.extend(['vgrf_mean', 'vgrf_std', 'vgrf_max', 'vgrf_min', 'vgrf_range']) 508 return feature_names
Get names of all features.
18class ClippingPreprocessor(BasePreprocessor): 19 """ 20 Preprocessor for clipping values to a specified range. 21 """ 22 23 def __init__(self, min_val: float = -1, max_val: float = 1): 24 super().__init__( 25 name="clipping", 26 description="Clips values in the data to be within a specified range" 27 ) 28 self.config = { 29 'min_val': min_val, 30 'max_val': max_val 31 } 32 33 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 34 """ 35 Fit the preprocessor (no fitting needed for clipping). 36 37 Args: 38 data: Input data to fit on 39 **kwargs: Additional arguments 40 """ 41 # Update config with any passed arguments 42 self.config.update({k: v for k, v in kwargs.items() if k in ['min_val', 'max_val']}) 43 self.fitted = True 44 45 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 46 """ 47 Clip values in the data to be within the specified range. 48 49 Args: 50 data: Input data to transform 51 **kwargs: Additional arguments 52 53 Returns: 54 Clipped data 55 """ 56 min_val = kwargs.get('min_val', self.config['min_val']) 57 max_val = kwargs.get('max_val', self.config['max_val']) 58 59 return np.clip(data, min_val, max_val)
Preprocessor for clipping values to a specified range.
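A minimal sketch of the shared fit/transform pattern, shown here with ClippingPreprocessor (the values are arbitrary):

import numpy as np
from gaitsetpy.preprocessing import ClippingPreprocessor

clipper = ClippingPreprocessor(min_val=-1.0, max_val=1.0)
data = np.array([-3.2, -0.5, 0.0, 0.7, 2.4])

# fit() only records configuration here; clipping itself is stateless.
clipper.fit(data)
print(clipper.transform(data))  # [-1.  -0.5  0.   0.7  1. ]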
23 def __init__(self, min_val: float = -1, max_val: float = 1): 24 super().__init__( 25 name="clipping", 26 description="Clips values in the data to be within a specified range" 27 ) 28 self.config = { 29 'min_val': min_val, 30 'max_val': max_val 31 }
Initialize the preprocessor.
Args:
- min_val: Lower clipping bound
- max_val: Upper clipping bound
33 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 34 """ 35 Fit the preprocessor (no fitting needed for clipping). 36 37 Args: 38 data: Input data to fit on 39 **kwargs: Additional arguments 40 """ 41 # Update config with any passed arguments 42 self.config.update({k: v for k, v in kwargs.items() if k in ['min_val', 'max_val']}) 43 self.fitted = True
Fit the preprocessor (no fitting needed for clipping).
Args: data: Input data to fit on **kwargs: Additional arguments
45 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 46 """ 47 Clip values in the data to be within the specified range. 48 49 Args: 50 data: Input data to transform 51 **kwargs: Additional arguments 52 53 Returns: 54 Clipped data 55 """ 56 min_val = kwargs.get('min_val', self.config['min_val']) 57 max_val = kwargs.get('max_val', self.config['max_val']) 58 59 return np.clip(data, min_val, max_val)
Clip values in the data to be within the specified range.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Clipped data
62class NoiseRemovalPreprocessor(BasePreprocessor): 63 """ 64 Preprocessor for removing noise using moving average filter. 65 """ 66 67 def __init__(self, window_size: int = 5): 68 super().__init__( 69 name="noise_removal", 70 description="Applies a moving average filter to reduce noise" 71 ) 72 self.config = { 73 'window_size': window_size 74 } 75 76 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 77 """ 78 Fit the preprocessor (no fitting needed for noise removal). 79 80 Args: 81 data: Input data to fit on 82 **kwargs: Additional arguments 83 """ 84 self.config.update({k: v for k, v in kwargs.items() if k in ['window_size']}) 85 self.fitted = True 86 87 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 88 """ 89 Apply a moving average filter to reduce noise. 90 91 Args: 92 data: Input data to transform 93 **kwargs: Additional arguments 94 95 Returns: 96 Noise-reduced data 97 """ 98 window_size = kwargs.get('window_size', self.config['window_size']) 99 100 if isinstance(data, pd.DataFrame): 101 return data.rolling(window=window_size, center=True).mean().bfill().ffill() 102 elif isinstance(data, pd.Series): 103 return data.rolling(window=window_size, center=True).mean().bfill().ffill() 104 else: 105 # For numpy arrays, use uniform filter 106 from scipy.ndimage import uniform_filter1d 107 return uniform_filter1d(data, size=window_size, mode='nearest')
Preprocessor for removing noise using a moving average filter.
67 def __init__(self, window_size: int = 5): 68 super().__init__( 69 name="noise_removal", 70 description="Applies a moving average filter to reduce noise" 71 ) 72 self.config = { 73 'window_size': window_size 74 }
Initialize the preprocessor.
Args: window_size: Size of the moving average window, in samples
76 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 77 """ 78 Fit the preprocessor (no fitting needed for noise removal). 79 80 Args: 81 data: Input data to fit on 82 **kwargs: Additional arguments 83 """ 84 self.config.update({k: v for k, v in kwargs.items() if k in ['window_size']}) 85 self.fitted = True
Fit the preprocessor (no fitting needed for noise removal).
Args: data: Input data to fit on **kwargs: Additional arguments
87 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 88 """ 89 Apply a moving average filter to reduce noise. 90 91 Args: 92 data: Input data to transform 93 **kwargs: Additional arguments 94 95 Returns: 96 Noise-reduced data 97 """ 98 window_size = kwargs.get('window_size', self.config['window_size']) 99 100 if isinstance(data, pd.DataFrame): 101 return data.rolling(window=window_size, center=True).mean().bfill().ffill() 102 elif isinstance(data, pd.Series): 103 return data.rolling(window=window_size, center=True).mean().bfill().ffill() 104 else: 105 # For numpy arrays, use uniform filter 106 from scipy.ndimage import uniform_filter1d 107 return uniform_filter1d(data, size=window_size, mode='nearest')
Apply a moving average filter to reduce noise.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Noise-reduced data
110class OutlierRemovalPreprocessor(BasePreprocessor): 111 """ 112 Preprocessor for removing outliers using Z-score method. 113 """ 114 115 def __init__(self, threshold: float = 3): 116 super().__init__( 117 name="outlier_removal", 118 description="Removes outliers beyond a given threshold using the Z-score method" 119 ) 120 self.config = { 121 'threshold': threshold 122 } 123 self.mean_ = None 124 self.std_ = None 125 126 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 127 """ 128 Fit the preprocessor by computing mean and standard deviation. 129 130 Args: 131 data: Input data to fit on 132 **kwargs: Additional arguments 133 """ 134 self.config.update({k: v for k, v in kwargs.items() if k in ['threshold']}) 135 136 if isinstance(data, (pd.DataFrame, pd.Series)): 137 self.mean_ = data.mean() 138 self.std_ = data.std() 139 else: 140 self.mean_ = np.mean(data) 141 self.std_ = np.std(data) 142 143 self.fitted = True 144 145 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 146 """ 147 Remove outliers beyond the threshold using Z-score method. 148 149 Args: 150 data: Input data to transform 151 **kwargs: Additional arguments 152 153 Returns: 154 Data with outliers removed 155 """ 156 threshold = kwargs.get('threshold', self.config['threshold']) 157 158 if isinstance(data, (pd.DataFrame, pd.Series)): 159 z_scores = (data - self.mean_).abs() / self.std_ 160 return data[z_scores <= threshold] 161 else: 162 z_scores = np.abs(data - self.mean_) / self.std_ 163 return data[z_scores <= threshold]
Preprocessor for removing outliers using the Z-score method.
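A short example with a deliberately planted outlier; the threshold is chosen so that only the extreme sample exceeds it:

import numpy as np
from gaitsetpy.preprocessing import OutlierRemovalPreprocessor

remover = OutlierRemovalPreprocessor(threshold=2.0)
data = np.array([1.0, 1.1, 0.9, 1.05, 0.95, 8.0])  # 8.0 is the outlier

# fit() stores the mean and standard deviation used for the Z-scores,
# so it must be called before transform().
remover.fit(data)
cleaned = remover.transform(data)
print(cleaned, len(cleaned))  # the 8.0 sample is dropped, leaving 5 values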
115 def __init__(self, threshold: float = 3): 116 super().__init__( 117 name="outlier_removal", 118 description="Removes outliers beyond a given threshold using the Z-score method" 119 ) 120 self.config = { 121 'threshold': threshold 122 } 123 self.mean_ = None 124 self.std_ = None
Initialize the preprocessor.
Args: threshold: Z-score threshold beyond which samples are treated as outliers
126 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 127 """ 128 Fit the preprocessor by computing mean and standard deviation. 129 130 Args: 131 data: Input data to fit on 132 **kwargs: Additional arguments 133 """ 134 self.config.update({k: v for k, v in kwargs.items() if k in ['threshold']}) 135 136 if isinstance(data, (pd.DataFrame, pd.Series)): 137 self.mean_ = data.mean() 138 self.std_ = data.std() 139 else: 140 self.mean_ = np.mean(data) 141 self.std_ = np.std(data) 142 143 self.fitted = True
Fit the preprocessor by computing mean and standard deviation.
Args: data: Input data to fit on **kwargs: Additional arguments
145 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 146 """ 147 Remove outliers beyond the threshold using Z-score method. 148 149 Args: 150 data: Input data to transform 151 **kwargs: Additional arguments 152 153 Returns: 154 Data with outliers removed 155 """ 156 threshold = kwargs.get('threshold', self.config['threshold']) 157 158 if isinstance(data, (pd.DataFrame, pd.Series)): 159 z_scores = (data - self.mean_).abs() / self.std_ 160 return data[z_scores <= threshold] 161 else: 162 z_scores = np.abs(data - self.mean_) / self.std_ 163 return data[z_scores <= threshold]
Remove outliers beyond the threshold using Z-score method.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Data with outliers removed
166class BaselineRemovalPreprocessor(BasePreprocessor): 167 """ 168 Preprocessor for removing baseline by subtracting the mean. 169 """ 170 171 def __init__(self): 172 super().__init__( 173 name="baseline_removal", 174 description="Removes baseline by subtracting the mean" 175 ) 176 self.mean_ = None 177 178 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 179 """ 180 Fit the preprocessor by computing the mean. 181 182 Args: 183 data: Input data to fit on 184 **kwargs: Additional arguments 185 """ 186 if isinstance(data, (pd.DataFrame, pd.Series)): 187 self.mean_ = data.mean() 188 else: 189 self.mean_ = np.mean(data) 190 191 self.fitted = True 192 193 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 194 """ 195 Remove baseline by subtracting the mean. 196 197 Args: 198 data: Input data to transform 199 **kwargs: Additional arguments 200 201 Returns: 202 Baseline-corrected data 203 """ 204 return data - self.mean_
Preprocessor for removing the baseline by subtracting the mean.
171 def __init__(self): 172 super().__init__( 173 name="baseline_removal", 174 description="Removes baseline by subtracting the mean" 175 ) 176 self.mean_ = None
Initialize the preprocessor.
This preprocessor takes no constructor arguments; the baseline (mean) is estimated in fit().
178 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 179 """ 180 Fit the preprocessor by computing the mean. 181 182 Args: 183 data: Input data to fit on 184 **kwargs: Additional arguments 185 """ 186 if isinstance(data, (pd.DataFrame, pd.Series)): 187 self.mean_ = data.mean() 188 else: 189 self.mean_ = np.mean(data) 190 191 self.fitted = True
Fit the preprocessor by computing the mean.
Args: data: Input data to fit on **kwargs: Additional arguments
193 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 194 """ 195 Remove baseline by subtracting the mean. 196 197 Args: 198 data: Input data to transform 199 **kwargs: Additional arguments 200 201 Returns: 202 Baseline-corrected data 203 """ 204 return data - self.mean_
Remove baseline by subtracting the mean.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Baseline-corrected data
207class DriftRemovalPreprocessor(BasePreprocessor): 208 """ 209 Preprocessor for removing low-frequency drift using high-pass filter. 210 """ 211 212 def __init__(self, cutoff: float = 0.01, fs: int = 100): 213 super().__init__( 214 name="drift_removal", 215 description="Removes low-frequency drift using a high-pass filter" 216 ) 217 self.config = { 218 'cutoff': cutoff, 219 'fs': fs 220 } 221 222 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 223 """ 224 Fit the preprocessor (no fitting needed for drift removal). 225 226 Args: 227 data: Input data to fit on 228 **kwargs: Additional arguments 229 """ 230 self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']}) 231 self.fitted = True 232 233 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 234 """ 235 Remove low-frequency drift using a high-pass filter. 236 237 Args: 238 data: Input data to transform 239 **kwargs: Additional arguments 240 241 Returns: 242 Drift-corrected data 243 """ 244 cutoff = kwargs.get('cutoff', self.config['cutoff']) 245 fs = kwargs.get('fs', self.config['fs']) 246 247 b, a = butter(1, cutoff / (fs / 2), btype='highpass') 248 249 if isinstance(data, (pd.DataFrame, pd.Series)): 250 return pd.Series(filtfilt(b, a, data), index=data.index) 251 else: 252 return filtfilt(b, a, data)
Preprocessor for removing low-frequency drift using a high-pass filter.
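A sketch with a synthetic signal: a 2 Hz oscillation riding on a slow linear drift, sampled at 100 Hz. The cutoff here (0.5 Hz) is illustrative and higher than the class default of 0.01 Hz:

import numpy as np
from gaitsetpy.preprocessing import DriftRemovalPreprocessor

fs = 100
t = np.arange(0, 10, 1 / fs)
signal = np.sin(2 * np.pi * 2 * t) + 0.3 * t  # oscillation + linear drift

drift_removal = DriftRemovalPreprocessor(cutoff=0.5, fs=fs)
drift_removal.fit(signal)
detrended = drift_removal.transform(signal)

print(signal.mean(), detrended.mean())  # mean drops towards zero after filtering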
212 def __init__(self, cutoff: float = 0.01, fs: int = 100): 213 super().__init__( 214 name="drift_removal", 215 description="Removes low-frequency drift using a high-pass filter" 216 ) 217 self.config = { 218 'cutoff': cutoff, 219 'fs': fs 220 }
Initialize the preprocessor.
Args:
- cutoff: High-pass cutoff frequency in Hz
- fs: Sampling frequency in Hz
222 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 223 """ 224 Fit the preprocessor (no fitting needed for drift removal). 225 226 Args: 227 data: Input data to fit on 228 **kwargs: Additional arguments 229 """ 230 self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']}) 231 self.fitted = True
Fit the preprocessor (no fitting needed for drift removal).
Args: data: Input data to fit on **kwargs: Additional arguments
233 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 234 """ 235 Remove low-frequency drift using a high-pass filter. 236 237 Args: 238 data: Input data to transform 239 **kwargs: Additional arguments 240 241 Returns: 242 Drift-corrected data 243 """ 244 cutoff = kwargs.get('cutoff', self.config['cutoff']) 245 fs = kwargs.get('fs', self.config['fs']) 246 247 b, a = butter(1, cutoff / (fs / 2), btype='highpass') 248 249 if isinstance(data, (pd.DataFrame, pd.Series)): 250 return pd.Series(filtfilt(b, a, data), index=data.index) 251 else: 252 return filtfilt(b, a, data)
Remove low-frequency drift using a high-pass filter.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Drift-corrected data
255class HighFrequencyNoiseRemovalPreprocessor(BasePreprocessor): 256 """ 257 Preprocessor for removing high-frequency noise using low-pass filter. 258 """ 259 260 def __init__(self, cutoff: float = 10, fs: int = 100): 261 super().__init__( 262 name="high_frequency_noise_removal", 263 description="Applies a low-pass filter to remove high-frequency noise" 264 ) 265 self.config = { 266 'cutoff': cutoff, 267 'fs': fs 268 } 269 270 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 271 """ 272 Fit the preprocessor (no fitting needed for filtering). 273 274 Args: 275 data: Input data to fit on 276 **kwargs: Additional arguments 277 """ 278 self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']}) 279 self.fitted = True 280 281 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 282 """ 283 Apply a low-pass filter to remove high-frequency noise. 284 285 Args: 286 data: Input data to transform 287 **kwargs: Additional arguments 288 289 Returns: 290 Filtered data 291 """ 292 cutoff = kwargs.get('cutoff', self.config['cutoff']) 293 fs = kwargs.get('fs', self.config['fs']) 294 295 b, a = butter(1, cutoff / (fs / 2), btype='lowpass') 296 297 if isinstance(data, (pd.DataFrame, pd.Series)): 298 return pd.Series(filtfilt(b, a, data), index=data.index) 299 else: 300 return filtfilt(b, a, data)
Preprocessor for removing high-frequency noise using a low-pass filter.
260 def __init__(self, cutoff: float = 10, fs: int = 100): 261 super().__init__( 262 name="high_frequency_noise_removal", 263 description="Applies a low-pass filter to remove high-frequency noise" 264 ) 265 self.config = { 266 'cutoff': cutoff, 267 'fs': fs 268 }
Initialize the preprocessor.
Args:
- cutoff: Low-pass cutoff frequency in Hz
- fs: Sampling frequency in Hz
270 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 271 """ 272 Fit the preprocessor (no fitting needed for filtering). 273 274 Args: 275 data: Input data to fit on 276 **kwargs: Additional arguments 277 """ 278 self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']}) 279 self.fitted = True
Fit the preprocessor (no fitting needed for filtering).
Args: data: Input data to fit on **kwargs: Additional arguments
281 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 282 """ 283 Apply a low-pass filter to remove high-frequency noise. 284 285 Args: 286 data: Input data to transform 287 **kwargs: Additional arguments 288 289 Returns: 290 Filtered data 291 """ 292 cutoff = kwargs.get('cutoff', self.config['cutoff']) 293 fs = kwargs.get('fs', self.config['fs']) 294 295 b, a = butter(1, cutoff / (fs / 2), btype='lowpass') 296 297 if isinstance(data, (pd.DataFrame, pd.Series)): 298 return pd.Series(filtfilt(b, a, data), index=data.index) 299 else: 300 return filtfilt(b, a, data)
Apply a low-pass filter to remove high-frequency noise.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Filtered data
303class LowFrequencyNoiseRemovalPreprocessor(BasePreprocessor): 304 """ 305 Preprocessor for removing low-frequency noise using high-pass filter. 306 """ 307 308 def __init__(self, cutoff: float = 0.5, fs: int = 100): 309 super().__init__( 310 name="low_frequency_noise_removal", 311 description="Applies a high-pass filter to remove low-frequency noise" 312 ) 313 self.config = { 314 'cutoff': cutoff, 315 'fs': fs 316 } 317 318 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 319 """ 320 Fit the preprocessor (no fitting needed for filtering). 321 322 Args: 323 data: Input data to fit on 324 **kwargs: Additional arguments 325 """ 326 self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']}) 327 self.fitted = True 328 329 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 330 """ 331 Apply a high-pass filter to remove low-frequency noise. 332 333 Args: 334 data: Input data to transform 335 **kwargs: Additional arguments 336 337 Returns: 338 Filtered data 339 """ 340 cutoff = kwargs.get('cutoff', self.config['cutoff']) 341 fs = kwargs.get('fs', self.config['fs']) 342 343 b, a = butter(1, cutoff / (fs / 2), btype='highpass') 344 345 if isinstance(data, (pd.DataFrame, pd.Series)): 346 return pd.Series(filtfilt(b, a, data), index=data.index) 347 else: 348 return filtfilt(b, a, data)
Preprocessor for removing low-frequency noise using a high-pass filter.
308 def __init__(self, cutoff: float = 0.5, fs: int = 100): 309 super().__init__( 310 name="low_frequency_noise_removal", 311 description="Applies a high-pass filter to remove low-frequency noise" 312 ) 313 self.config = { 314 'cutoff': cutoff, 315 'fs': fs 316 }
Initialize the preprocessor.
Args:
- cutoff: High-pass cutoff frequency in Hz
- fs: Sampling frequency in Hz
318 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 319 """ 320 Fit the preprocessor (no fitting needed for filtering). 321 322 Args: 323 data: Input data to fit on 324 **kwargs: Additional arguments 325 """ 326 self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']}) 327 self.fitted = True
Fit the preprocessor (no fitting needed for filtering).
Args: data: Input data to fit on **kwargs: Additional arguments
329 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 330 """ 331 Apply a high-pass filter to remove low-frequency noise. 332 333 Args: 334 data: Input data to transform 335 **kwargs: Additional arguments 336 337 Returns: 338 Filtered data 339 """ 340 cutoff = kwargs.get('cutoff', self.config['cutoff']) 341 fs = kwargs.get('fs', self.config['fs']) 342 343 b, a = butter(1, cutoff / (fs / 2), btype='highpass') 344 345 if isinstance(data, (pd.DataFrame, pd.Series)): 346 return pd.Series(filtfilt(b, a, data), index=data.index) 347 else: 348 return filtfilt(b, a, data)
Apply a high-pass filter to remove low-frequency noise.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Filtered data
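To band-limit a drifting recording, the two filter preprocessors can be chained; a sketch under the same assumptions as above, with illustrative 0.5 Hz / 5 Hz cutoffs (the package also exports create_preprocessing_pipeline, whose signature is not shown here).

import numpy as np
import pandas as pd
from gaitsetpy import (
    LowFrequencyNoiseRemovalPreprocessor,
    HighFrequencyNoiseRemovalPreprocessor,
)

fs = 100
t = np.arange(0, 20, 1 / fs)
raw = pd.Series(np.sin(2 * np.pi * 1.2 * t) + 0.05 * t, index=t)   # gait rhythm + slow drift

steps = [
    LowFrequencyNoiseRemovalPreprocessor(cutoff=0.5, fs=fs),   # high-pass: strips the drift
    HighFrequencyNoiseRemovalPreprocessor(cutoff=5.0, fs=fs),  # low-pass: strips jitter
]
filtered = raw
for step in steps:
    step.fit(filtered)
    filtered = step.transform(filtered)   # each transform returns a Series with the same index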
351class ArtifactRemovalPreprocessor(BasePreprocessor): 352 """ 353 Preprocessor for removing artifacts by interpolating missing values. 354 """ 355 356 def __init__(self, method: str = "linear"): 357 super().__init__( 358 name="artifact_removal", 359 description="Removes artifacts by interpolating missing values" 360 ) 361 self.config = { 362 'method': method 363 } 364 365 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 366 """ 367 Fit the preprocessor (no fitting needed for interpolation). 368 369 Args: 370 data: Input data to fit on 371 **kwargs: Additional arguments 372 """ 373 self.config.update({k: v for k, v in kwargs.items() if k in ['method']}) 374 self.fitted = True 375 376 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 377 """ 378 Remove artifacts by interpolating missing values. 379 380 Args: 381 data: Input data to transform 382 **kwargs: Additional arguments 383 384 Returns: 385 Artifact-free data 386 """ 387 method = kwargs.get('method', self.config['method']) 388 389 if isinstance(data, (pd.DataFrame, pd.Series)): 390 return data.interpolate(method=method).bfill().ffill() 391 else: 392 # For numpy arrays, use linear interpolation 393 from scipy.interpolate import interp1d 394 x = np.arange(len(data)) 395 valid_mask = ~np.isnan(data) 396 if np.any(valid_mask): 397 f = interp1d(x[valid_mask], data[valid_mask], kind='linear', fill_value='extrapolate') 398 return f(x) 399 else: 400 return data
Preprocessor for removing artifacts by interpolating missing values.
356 def __init__(self, method: str = "linear"): 357 super().__init__( 358 name="artifact_removal", 359 description="Removes artifacts by interpolating missing values" 360 ) 361 self.config = { 362 'method': method 363 }
Initialize the preprocessor.
Args: method: Interpolation method passed to pandas interpolate() (default 'linear')
365 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 366 """ 367 Fit the preprocessor (no fitting needed for interpolation). 368 369 Args: 370 data: Input data to fit on 371 **kwargs: Additional arguments 372 """ 373 self.config.update({k: v for k, v in kwargs.items() if k in ['method']}) 374 self.fitted = True
Fit the preprocessor (no fitting needed for interpolation).
Args: data: Input data to fit on **kwargs: Additional arguments
376 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 377 """ 378 Remove artifacts by interpolating missing values. 379 380 Args: 381 data: Input data to transform 382 **kwargs: Additional arguments 383 384 Returns: 385 Artifact-free data 386 """ 387 method = kwargs.get('method', self.config['method']) 388 389 if isinstance(data, (pd.DataFrame, pd.Series)): 390 return data.interpolate(method=method).bfill().ffill() 391 else: 392 # For numpy arrays, use linear interpolation 393 from scipy.interpolate import interp1d 394 x = np.arange(len(data)) 395 valid_mask = ~np.isnan(data) 396 if np.any(valid_mask): 397 f = interp1d(x[valid_mask], data[valid_mask], kind='linear', fill_value='extrapolate') 398 return f(x) 399 else: 400 return data
Remove artifacts by interpolating missing values.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Artifact-free data
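A small sketch of the interpolation behaviour on a Series with dropout samples (the values are made up).

import numpy as np
import pandas as pd
from gaitsetpy import ArtifactRemovalPreprocessor

trace = pd.Series([1.0, 1.2, np.nan, np.nan, 1.8, 2.0])
pre = ArtifactRemovalPreprocessor(method="linear")
pre.fit(trace)
clean = pre.transform(trace)      # interpolate, then backfill/forward-fill the edges
print(clean.tolist())             # [1.0, 1.2, 1.4, 1.6, 1.8, 2.0] (up to float rounding)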
403class TrendRemovalPreprocessor(BasePreprocessor): 404 """ 405 Preprocessor for removing trends using polynomial fitting. 406 """ 407 408 def __init__(self, order: int = 2): 409 super().__init__( 410 name="trend_removal", 411 description="Removes trends using polynomial fitting" 412 ) 413 self.config = { 414 'order': order 415 } 416 417 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 418 """ 419 Fit the preprocessor (no fitting needed for detrending). 420 421 Args: 422 data: Input data to fit on 423 **kwargs: Additional arguments 424 """ 425 self.config.update({k: v for k, v in kwargs.items() if k in ['order']}) 426 self.fitted = True 427 428 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 429 """ 430 Remove trends using polynomial fitting. 431 432 Args: 433 data: Input data to transform 434 **kwargs: Additional arguments 435 436 Returns: 437 Detrended data 438 """ 439 order = kwargs.get('order', self.config['order']) 440 441 if isinstance(data, (pd.DataFrame, pd.Series)): 442 x = np.arange(len(data)) 443 poly_coeffs = np.polyfit(x, data, order) 444 trend = np.polyval(poly_coeffs, x) 445 return data - trend 446 else: 447 x = np.arange(len(data)) 448 poly_coeffs = np.polyfit(x, data, order) 449 trend = np.polyval(poly_coeffs, x) 450 return data - trend
Preprocessor for removing trends using polynomial fitting.
408 def __init__(self, order: int = 2): 409 super().__init__( 410 name="trend_removal", 411 description="Removes trends using polynomial fitting" 412 ) 413 self.config = { 414 'order': order 415 }
Initialize the preprocessor.
Args: order: Order of the polynomial fitted to the trend (default 2)
417 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 418 """ 419 Fit the preprocessor (no fitting needed for detrending). 420 421 Args: 422 data: Input data to fit on 423 **kwargs: Additional arguments 424 """ 425 self.config.update({k: v for k, v in kwargs.items() if k in ['order']}) 426 self.fitted = True
Fit the preprocessor (no fitting needed for detrending).
Args: data: Input data to fit on **kwargs: Additional arguments
428 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 429 """ 430 Remove trends using polynomial fitting. 431 432 Args: 433 data: Input data to transform 434 **kwargs: Additional arguments 435 436 Returns: 437 Detrended data 438 """ 439 order = kwargs.get('order', self.config['order']) 440 441 if isinstance(data, (pd.DataFrame, pd.Series)): 442 x = np.arange(len(data)) 443 poly_coeffs = np.polyfit(x, data, order) 444 trend = np.polyval(poly_coeffs, x) 445 return data - trend 446 else: 447 x = np.arange(len(data)) 448 poly_coeffs = np.polyfit(x, data, order) 449 trend = np.polyval(poly_coeffs, x) 450 return data - trend
Remove trends using polynomial fitting.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: Detrended data
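A sketch of the polynomial detrending on a synthetic array; order 2 matches the default, and the quadratic drift plus 50-sample oscillation are illustrative.

import numpy as np
from gaitsetpy import TrendRemovalPreprocessor

x = np.arange(500, dtype=float)
signal = 0.002 * x**2 + np.sin(2 * np.pi * x / 50)   # quadratic drift + oscillation

pre = TrendRemovalPreprocessor(order=2)
pre.fit(signal)
detrended = pre.transform(signal)   # the oscillation is kept, the quadratic drift is fitted out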
453class DCOffsetRemovalPreprocessor(BasePreprocessor): 454 """ 455 Preprocessor for removing DC offset by subtracting the mean. 456 """ 457 458 def __init__(self): 459 super().__init__( 460 name="dc_offset_removal", 461 description="Removes DC offset by subtracting the mean" 462 ) 463 self.mean_ = None 464 465 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 466 """ 467 Fit the preprocessor by computing the mean. 468 469 Args: 470 data: Input data to fit on 471 **kwargs: Additional arguments 472 """ 473 if isinstance(data, (pd.DataFrame, pd.Series)): 474 self.mean_ = data.mean() 475 else: 476 self.mean_ = np.mean(data) 477 478 self.fitted = True 479 480 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 481 """ 482 Remove DC offset by subtracting the mean. 483 484 Args: 485 data: Input data to transform 486 **kwargs: Additional arguments 487 488 Returns: 489 DC-corrected data 490 """ 491 return data - self.mean_
Preprocessor for removing DC offset by subtracting the mean.
458 def __init__(self): 459 super().__init__( 460 name="dc_offset_removal", 461 description="Removes DC offset by subtracting the mean" 462 ) 463 self.mean_ = None
Initialize the preprocessor. It takes no arguments; the mean used for DC correction is computed in fit().
465 def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs): 466 """ 467 Fit the preprocessor by computing the mean. 468 469 Args: 470 data: Input data to fit on 471 **kwargs: Additional arguments 472 """ 473 if isinstance(data, (pd.DataFrame, pd.Series)): 474 self.mean_ = data.mean() 475 else: 476 self.mean_ = np.mean(data) 477 478 self.fitted = True
Fit the preprocessor by computing the mean.
Args: data: Input data to fit on **kwargs: Additional arguments
480 def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]: 481 """ 482 Remove DC offset by subtracting the mean. 483 484 Args: 485 data: Input data to transform 486 **kwargs: Additional arguments 487 488 Returns: 489 DC-corrected data 490 """ 491 return data - self.mean_
Remove DC offset by subtracting the mean.
Args: data: Input data to transform **kwargs: Additional arguments
Returns: DC-corrected data
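Unlike the filter preprocessors, this one is stateful: fit() stores the mean and transform() subtracts that stored mean rather than the mean of its own input. A minimal sketch with made-up numbers:

import numpy as np
from gaitsetpy import DCOffsetRemovalPreprocessor

reference = np.array([2.0, 2.1, 1.9, 2.0])
pre = DCOffsetRemovalPreprocessor()
pre.fit(reference)                               # mean_ == 2.0
print(pre.transform(np.array([2.5, 1.5])))       # [ 0.5 -0.5]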
18class DaphnetVisualizationAnalyzer(BaseEDAAnalyzer): 19 """ 20 EDA analyzer for Daphnet dataset visualization. 21 22 This analyzer provides comprehensive visualization capabilities for Daphnet dataset 23 including thigh, shank, and trunk sensor data. 24 """ 25 26 def __init__(self): 27 super().__init__( 28 name="daphnet_visualization", 29 description="Comprehensive visualization analyzer for Daphnet dataset sensor data" 30 ) 31 self.config = { 32 'figsize': (20, 16), 33 'colors': { 34 'no_freeze': 'orange', 35 'freeze': 'purple' 36 }, 37 'alpha': 0.6 38 } 39 40 def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]: 41 """ 42 Analyze the data and return statistical summaries. 43 44 Args: 45 data: Input data to analyze 46 **kwargs: Additional arguments 47 48 Returns: 49 Dictionary containing analysis results 50 """ 51 if isinstance(data, list): 52 # Multiple datasets 53 results = {} 54 for i, df in enumerate(data): 55 results[f'dataset_{i}'] = self._analyze_single_dataset(df) 56 return results 57 else: 58 # Single dataset 59 return self._analyze_single_dataset(data) 60 61 def _analyze_single_dataset(self, df: pd.DataFrame) -> Dict[str, Any]: 62 """Analyze a single dataset.""" 63 # Basic statistics 64 stats = { 65 'shape': df.shape, 66 'columns': df.columns.tolist(), 67 'annotation_distribution': df['annotations'].value_counts().to_dict() if 'annotations' in df.columns else {}, 68 'missing_values': df.isnull().sum().to_dict(), 69 'data_range': { 70 'min': df.select_dtypes(include=[np.number]).min().to_dict(), 71 'max': df.select_dtypes(include=[np.number]).max().to_dict() 72 } 73 } 74 75 # Sensor-specific statistics 76 sensor_stats = {} 77 for sensor in ['thigh', 'shank', 'trunk']: 78 if sensor in df.columns: 79 sensor_stats[sensor] = { 80 'mean': df[sensor].mean(), 81 'std': df[sensor].std(), 82 'min': df[sensor].min(), 83 'max': df[sensor].max() 84 } 85 86 stats['sensor_statistics'] = sensor_stats 87 return stats 88 89 def visualize(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs): 90 """ 91 Create visualizations of the data. 
92 93 Args: 94 data: Input data to visualize 95 **kwargs: Additional arguments including sensor_type, dataset_index, names 96 """ 97 sensor_type = kwargs.get('sensor_type', 'all') 98 dataset_index = kwargs.get('dataset_index', 0) 99 names = kwargs.get('names', []) 100 101 if isinstance(data, list): 102 if dataset_index < len(data): 103 df = data[dataset_index] 104 dataset_name = names[dataset_index] if dataset_index < len(names) else f"Dataset {dataset_index}" 105 else: 106 print(f"Dataset index {dataset_index} out of range") 107 return 108 else: 109 df = data 110 dataset_name = names[0] if names else "Dataset" 111 112 if sensor_type == 'all': 113 self._plot_all_sensors(df, dataset_name) 114 elif sensor_type == 'thigh': 115 self._plot_thigh_data(df, dataset_name) 116 elif sensor_type == 'shank': 117 self._plot_shank_data(df, dataset_name) 118 elif sensor_type == 'trunk': 119 self._plot_trunk_data(df, dataset_name) 120 else: 121 print(f"Unknown sensor type: {sensor_type}") 122 123 def _plot_thigh_data(self, df: pd.DataFrame, dataset_name: str): 124 """Plot thigh sensor data.""" 125 print(f"Plotting thigh data for {dataset_name}") 126 127 # Filter data 128 df_filtered = df[df.annotations > 0] if 'annotations' in df.columns else df 129 130 if df_filtered.empty: 131 print("No valid data to plot") 132 return 133 134 # Create figure 135 fig, axes = plt.subplots(4, 1, sharex=True, figsize=self.config['figsize']) 136 fig.suptitle(f"Thigh Data from {dataset_name}") 137 138 # Separate freeze and no-freeze data 139 if 'annotations' in df.columns: 140 neg = df_filtered[df_filtered.annotations == 1] # No freeze 141 pos = df_filtered[df_filtered.annotations == 2] # Freeze 142 else: 143 neg = df_filtered 144 pos = pd.DataFrame() 145 146 # Plot each component 147 components = ['thigh_h_fd', 'thigh_v', 'thigh_h_l', 'thigh'] 148 labels = ['Horizontal Forward', 'Vertical', 'Horizontal Lateral', 'Overall'] 149 150 for i, (component, label) in enumerate(zip(components, labels)): 151 if component in df_filtered.columns: 152 # Plot main signal 153 axes[i].plot(df_filtered.index, df_filtered[component]) 154 axes[i].set_ylabel(f"{label} Thigh Acceleration") 155 156 # Plot annotations if available 157 if not neg.empty: 158 axes[i].scatter(neg.index, neg[component], 159 c=self.config['colors']['no_freeze'], 160 label="no freeze", alpha=self.config['alpha']) 161 if not pos.empty: 162 axes[i].scatter(pos.index, pos[component], 163 c=self.config['colors']['freeze'], 164 label="freeze", alpha=self.config['alpha']) 165 166 axes[i].legend() 167 168 plt.xlabel("Time") 169 plt.tight_layout() 170 plt.show() 171 172 def _plot_shank_data(self, df: pd.DataFrame, dataset_name: str): 173 """Plot shank sensor data.""" 174 print(f"Plotting shank data for {dataset_name}") 175 176 # Filter data 177 df_filtered = df[df.annotations > 0] if 'annotations' in df.columns else df 178 179 if df_filtered.empty: 180 print("No valid data to plot") 181 return 182 183 # Create figure 184 fig, axes = plt.subplots(4, 1, sharex=True, figsize=self.config['figsize']) 185 fig.suptitle(f"Shank Data from {dataset_name}") 186 187 # Separate freeze and no-freeze data 188 if 'annotations' in df.columns: 189 neg = df_filtered[df_filtered.annotations == 1] # No freeze 190 pos = df_filtered[df_filtered.annotations == 2] # Freeze 191 else: 192 neg = df_filtered 193 pos = pd.DataFrame() 194 195 # Plot each component 196 components = ['shank_h_fd', 'shank_v', 'shank_h_l', 'shank'] 197 labels = ['Horizontal Forward', 'Vertical', 'Horizontal Lateral', 'Overall'] 
198 199 for i, (component, label) in enumerate(zip(components, labels)): 200 if component in df_filtered.columns: 201 # Plot main signal 202 axes[i].plot(df_filtered.index, df_filtered[component]) 203 axes[i].set_ylabel(f"{label} Shank Acceleration") 204 205 # Plot annotations if available 206 if not neg.empty: 207 axes[i].scatter(neg.index, neg[component], 208 c=self.config['colors']['no_freeze'], 209 label="no freeze", alpha=self.config['alpha']) 210 if not pos.empty: 211 axes[i].scatter(pos.index, pos[component], 212 c=self.config['colors']['freeze'], 213 label="freeze", alpha=self.config['alpha']) 214 215 axes[i].legend() 216 217 plt.xlabel("Time") 218 plt.tight_layout() 219 plt.show() 220 221 def _plot_trunk_data(self, df: pd.DataFrame, dataset_name: str): 222 """Plot trunk sensor data.""" 223 print(f"Plotting trunk data for {dataset_name}") 224 225 # Filter data 226 df_filtered = df[df.annotations > 0] if 'annotations' in df.columns else df 227 228 if df_filtered.empty: 229 print("No valid data to plot") 230 return 231 232 # Create figure 233 fig, axes = plt.subplots(4, 1, sharex=True, figsize=self.config['figsize']) 234 fig.suptitle(f"Trunk Data from {dataset_name}") 235 236 # Separate freeze and no-freeze data 237 if 'annotations' in df.columns: 238 neg = df_filtered[df_filtered.annotations == 1] # No freeze 239 pos = df_filtered[df_filtered.annotations == 2] # Freeze 240 else: 241 neg = df_filtered 242 pos = pd.DataFrame() 243 244 # Plot each component 245 components = ['trunk_h_fd', 'trunk_v', 'trunk_h_l', 'trunk'] 246 labels = ['Horizontal Forward', 'Vertical', 'Horizontal Lateral', 'Overall'] 247 248 for i, (component, label) in enumerate(zip(components, labels)): 249 if component in df_filtered.columns: 250 # Plot main signal 251 axes[i].plot(df_filtered.index, df_filtered[component]) 252 axes[i].set_ylabel(f"{label} Trunk Acceleration") 253 254 # Plot annotations if available 255 if not neg.empty: 256 axes[i].scatter(neg.index, neg[component], 257 c=self.config['colors']['no_freeze'], 258 label="no freeze", alpha=self.config['alpha']) 259 if not pos.empty: 260 axes[i].scatter(pos.index, pos[component], 261 c=self.config['colors']['freeze'], 262 label="freeze", alpha=self.config['alpha']) 263 264 axes[i].legend() 265 266 plt.xlabel("Time") 267 plt.tight_layout() 268 plt.show() 269 270 def _plot_all_sensors(self, df: pd.DataFrame, dataset_name: str): 271 """Plot all sensor data in a combined view.""" 272 print(f"Plotting all sensor data for {dataset_name}") 273 274 # Create figure with subplots for each sensor 275 fig, axes = plt.subplots(3, 1, sharex=True, figsize=self.config['figsize']) 276 fig.suptitle(f"All Sensor Data from {dataset_name}") 277 278 # Filter data 279 df_filtered = df[df.annotations > 0] if 'annotations' in df.columns else df 280 281 if df_filtered.empty: 282 print("No valid data to plot") 283 return 284 285 sensors = ['thigh', 'shank', 'trunk'] 286 for i, sensor in enumerate(sensors): 287 if sensor in df_filtered.columns: 288 axes[i].plot(df_filtered.index, df_filtered[sensor]) 289 axes[i].set_ylabel(f"{sensor.capitalize()} Acceleration") 290 291 # Add annotations if available 292 if 'annotations' in df_filtered.columns: 293 neg = df_filtered[df_filtered.annotations == 1] 294 pos = df_filtered[df_filtered.annotations == 2] 295 296 if not neg.empty: 297 axes[i].scatter(neg.index, neg[sensor], 298 c=self.config['colors']['no_freeze'], 299 label="no freeze", alpha=self.config['alpha']) 300 if not pos.empty: 301 axes[i].scatter(pos.index, pos[sensor], 302 
c=self.config['colors']['freeze'], 303 label="freeze", alpha=self.config['alpha']) 304 305 axes[i].legend() 306 307 plt.xlabel("Time") 308 plt.tight_layout() 309 plt.show()
EDA analyzer for Daphnet dataset visualization.
This analyzer provides comprehensive visualization capabilities for the Daphnet dataset, including thigh, shank, and trunk sensor data.
26 def __init__(self): 27 super().__init__( 28 name="daphnet_visualization", 29 description="Comprehensive visualization analyzer for Daphnet dataset sensor data" 30 ) 31 self.config = { 32 'figsize': (20, 16), 33 'colors': { 34 'no_freeze': 'orange', 35 'freeze': 'purple' 36 }, 37 'alpha': 0.6 38 }
Initialize the analyzer with its default plotting configuration (figure size, freeze/no-freeze colors, alpha); it takes no arguments.
40 def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]: 41 """ 42 Analyze the data and return statistical summaries. 43 44 Args: 45 data: Input data to analyze 46 **kwargs: Additional arguments 47 48 Returns: 49 Dictionary containing analysis results 50 """ 51 if isinstance(data, list): 52 # Multiple datasets 53 results = {} 54 for i, df in enumerate(data): 55 results[f'dataset_{i}'] = self._analyze_single_dataset(df) 56 return results 57 else: 58 # Single dataset 59 return self._analyze_single_dataset(data)
Analyze the data and return statistical summaries.
Args: data: Input data to analyze **kwargs: Additional arguments
Returns: Dictionary containing analysis results
89 def visualize(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs): 90 """ 91 Create visualizations of the data. 92 93 Args: 94 data: Input data to visualize 95 **kwargs: Additional arguments including sensor_type, dataset_index, names 96 """ 97 sensor_type = kwargs.get('sensor_type', 'all') 98 dataset_index = kwargs.get('dataset_index', 0) 99 names = kwargs.get('names', []) 100 101 if isinstance(data, list): 102 if dataset_index < len(data): 103 df = data[dataset_index] 104 dataset_name = names[dataset_index] if dataset_index < len(names) else f"Dataset {dataset_index}" 105 else: 106 print(f"Dataset index {dataset_index} out of range") 107 return 108 else: 109 df = data 110 dataset_name = names[0] if names else "Dataset" 111 112 if sensor_type == 'all': 113 self._plot_all_sensors(df, dataset_name) 114 elif sensor_type == 'thigh': 115 self._plot_thigh_data(df, dataset_name) 116 elif sensor_type == 'shank': 117 self._plot_shank_data(df, dataset_name) 118 elif sensor_type == 'trunk': 119 self._plot_trunk_data(df, dataset_name) 120 else: 121 print(f"Unknown sensor type: {sensor_type}")
Create visualizations of the data.
Args: data: Input data to visualize **kwargs: Additional arguments including sensor_type, dataset_index, names
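A hedged end-to-end sketch: load Daphnet recordings with the package's DaphnetLoader and hand them to the analyzer; the data directory path is hypothetical.

from gaitsetpy import DaphnetLoader, DaphnetVisualizationAnalyzer

loader = DaphnetLoader()
data, names = loader.load_data("data/daphnet")          # hypothetical data directory

analyzer = DaphnetVisualizationAnalyzer()
summary = analyzer.analyze(data)                        # shapes, annotation counts, per-sensor stats
analyzer.visualize(data, sensor_type="shank", dataset_index=0, names=names)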
312class SensorStatisticsAnalyzer(BaseEDAAnalyzer): 313 """ 314 EDA analyzer for sensor data statistics and feature visualization. 315 316 This analyzer provides statistical analysis and feature visualization capabilities 317 for sensor data including sliding windows and extracted features. 318 """ 319 320 def __init__(self): 321 super().__init__( 322 name="sensor_statistics", 323 description="Statistical analysis and feature visualization for sensor data" 324 ) 325 self.config = { 326 'figsize': (20, 10), 327 'feature_markers': { 328 'mean': 'x', 329 'rms': 'o', 330 'peak_height': 'v', 331 'mode': '<', 332 'median': '^' 333 } 334 } 335 336 def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]: 337 """ 338 Analyze sensor data and return statistical summaries. 339 340 Args: 341 data: Input data to analyze 342 **kwargs: Additional arguments 343 344 Returns: 345 Dictionary containing analysis results 346 """ 347 if isinstance(data, list): 348 # Multiple datasets 349 results = {} 350 for i, df in enumerate(data): 351 results[f'dataset_{i}'] = self._compute_statistics(df) 352 return results 353 else: 354 # Single dataset 355 return self._compute_statistics(data) 356 357 def _compute_statistics(self, df: pd.DataFrame) -> Dict[str, Any]: 358 """Compute comprehensive statistics for a dataset.""" 359 stats = { 360 'basic_stats': df.describe().to_dict(), 361 'correlation_matrix': df.corr().to_dict() if len(df.select_dtypes(include=[np.number]).columns) > 1 else {}, 362 'skewness': df.skew().to_dict(), 363 'kurtosis': df.kurtosis().to_dict() 364 } 365 366 # Add sensor-specific statistics 367 sensor_stats = {} 368 for sensor in ['thigh', 'shank', 'trunk']: 369 if sensor in df.columns: 370 sensor_data = df[sensor].dropna() 371 sensor_stats[sensor] = { 372 'mean': sensor_data.mean(), 373 'std': sensor_data.std(), 374 'variance': sensor_data.var(), 375 'min': sensor_data.min(), 376 'max': sensor_data.max(), 377 'range': sensor_data.max() - sensor_data.min(), 378 'median': sensor_data.median(), 379 'q25': sensor_data.quantile(0.25), 380 'q75': sensor_data.quantile(0.75), 381 'iqr': sensor_data.quantile(0.75) - sensor_data.quantile(0.25) 382 } 383 384 stats['sensor_statistics'] = sensor_stats 385 return stats 386 387 def visualize(self, sliding_windows: List[Dict], features: List[Dict], **kwargs): 388 """ 389 Create visualizations of sensor data with overlaid features. 390 391 Args: 392 sliding_windows: List of sliding window dictionaries 393 features: List of feature dictionaries 394 **kwargs: Additional arguments including sensor_name, start_idx, end_idx, num_windows 395 """ 396 sensor_name = kwargs.get('sensor_name', 'shank') 397 start_idx = kwargs.get('start_idx', 0) 398 end_idx = kwargs.get('end_idx', 1000) 399 num_windows = kwargs.get('num_windows', 10) 400 save = kwargs.get('save', False) 401 402 self._plot_sensor_with_features(sliding_windows, features, start_idx, end_idx, 403 sensor_name, num_windows, save) 404 405 def _plot_sensor_with_features(self, sliding_windows: List[Dict], features: List[Dict], 406 start_idx: int, end_idx: int, sensor_name: str = "shank", 407 num_windows: int = 10, save: bool = False): 408 """ 409 Plot sliding windows of sensor data with overlaid statistical features. 
410 411 Args: 412 sliding_windows: List of sliding window dictionaries 413 features: List of feature dictionaries 414 start_idx: Start index of the time window 415 end_idx: End index of the time window 416 sensor_name: Name of the sensor to plot 417 num_windows: Number of sliding windows to plot 418 save: Whether to save the plot 419 """ 420 fig, axes = plt.subplots(2, 1, figsize=self.config['figsize'], 421 gridspec_kw={'height_ratios': [3, 1]}) 422 423 # Extract sensor windows 424 sensor_windows = next((sw['data'] for sw in sliding_windows if sw['name'] == sensor_name), None) 425 if sensor_windows is None: 426 print(f"Sensor '{sensor_name}' not found in sliding_windows.") 427 return 428 429 # Extract corresponding features 430 sensor_features = next((feat['features'] for feat in features if feat['name'] == sensor_name), None) 431 if sensor_features is None: 432 print(f"Sensor '{sensor_name}' not found in features.") 433 return 434 435 # Filter windows based on start_idx and end_idx 436 filtered_windows = [series for series in sensor_windows 437 if start_idx <= series.index[0] and series.index[-1] <= end_idx] 438 439 if not filtered_windows: 440 print(f"No windows found in the specified index range ({start_idx} - {end_idx}).") 441 return 442 443 # Store entropy & frequency features for separate plotting 444 entropy_values = [] 445 dominant_frequencies = [] 446 447 # Plot first num_windows windows 448 for i in range(min(num_windows, len(filtered_windows))): 449 series = filtered_windows[i] 450 451 # Extract time and signal values 452 time_values = series.index.to_numpy() 453 signal_values = series.values 454 455 # Determine actual start and end indices for this window 456 window_start, window_end = time_values[0], time_values[-1] 457 458 # Plot time series data 459 axes[0].plot(time_values, signal_values, alpha=0.6) 460 461 # Mark start and end of each window with vertical dotted lines 462 axes[0].axvline(x=window_start, color='black', linestyle='dotted', alpha=0.7) 463 axes[0].axvline(x=window_end, color='black', linestyle='dotted', alpha=0.7) 464 465 # Overlay statistical features 466 for feature_name, marker in self.config['feature_markers'].items(): 467 if feature_name in sensor_features and len(sensor_features[feature_name]) > i: 468 feature_value = sensor_features[feature_name][i] 469 if feature_value != 0: # Skip zero values 470 closest_index = np.argmin(np.abs(signal_values - feature_value)) 471 closest_time = time_values[closest_index] 472 axes[0].scatter(closest_time, feature_value, color='red', 473 marker=marker, s=100, label=feature_name if i == 0 else "") 474 475 # Store entropy & frequency features for separate plotting 476 if 'entropy' in sensor_features and len(sensor_features['entropy']) > i: 477 entropy_values.append(sensor_features['entropy'][i]) 478 if 'dominant_frequency' in sensor_features and len(sensor_features['dominant_frequency']) > i: 479 dominant_frequencies.append(sensor_features['dominant_frequency'][i]) 480 481 # Labels and title for time-series plot 482 axes[0].set_xlabel('Time') 483 axes[0].set_ylabel(f'{sensor_name} Signal') 484 axes[0].set_title(f'First {num_windows} windows of {sensor_name} in range {start_idx}-{end_idx} with Features') 485 axes[0].legend() 486 487 # Frequency-domain & entropy plot 488 if dominant_frequencies: 489 window_indices = list(range(len(dominant_frequencies))) 490 axes[1].plot(window_indices, dominant_frequencies, 491 label="Dominant Frequency", marker="o", linestyle="dashed", color="blue") 492 493 if entropy_values: 494 
axes[1].bar(window_indices, entropy_values, alpha=0.6, label="Entropy", color="green") 495 496 axes[1].set_xlabel("Window Index") 497 axes[1].set_ylabel("Feature Value") 498 axes[1].set_title("Frequency & Entropy Features") 499 axes[1].legend() 500 501 plt.tight_layout() 502 503 # Save or show plot 504 if save: 505 file_path = input("Enter the file path to save the plot (e.g., 'plot.png'): ") 506 plt.savefig(file_path, dpi=300) 507 print(f"Plot saved at {file_path}") 508 else: 509 plt.show()
EDA analyzer for sensor data statistics and feature visualization.
This analyzer provides statistical analysis and feature visualization capabilities for sensor data, including sliding windows and extracted features.
320 def __init__(self): 321 super().__init__( 322 name="sensor_statistics", 323 description="Statistical analysis and feature visualization for sensor data" 324 ) 325 self.config = { 326 'figsize': (20, 10), 327 'feature_markers': { 328 'mean': 'x', 329 'rms': 'o', 330 'peak_height': 'v', 331 'mode': '<', 332 'median': '^' 333 } 334 }
Initialize the analyzer with its default plotting configuration (figure size and per-feature marker styles); it takes no arguments.
336 def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]: 337 """ 338 Analyze sensor data and return statistical summaries. 339 340 Args: 341 data: Input data to analyze 342 **kwargs: Additional arguments 343 344 Returns: 345 Dictionary containing analysis results 346 """ 347 if isinstance(data, list): 348 # Multiple datasets 349 results = {} 350 for i, df in enumerate(data): 351 results[f'dataset_{i}'] = self._compute_statistics(df) 352 return results 353 else: 354 # Single dataset 355 return self._compute_statistics(data)
Analyze sensor data and return statistical summaries.
Args: data: Input data to analyze **kwargs: Additional arguments
Returns: Dictionary containing analysis results
387 def visualize(self, sliding_windows: List[Dict], features: List[Dict], **kwargs): 388 """ 389 Create visualizations of sensor data with overlaid features. 390 391 Args: 392 sliding_windows: List of sliding window dictionaries 393 features: List of feature dictionaries 394 **kwargs: Additional arguments including sensor_name, start_idx, end_idx, num_windows 395 """ 396 sensor_name = kwargs.get('sensor_name', 'shank') 397 start_idx = kwargs.get('start_idx', 0) 398 end_idx = kwargs.get('end_idx', 1000) 399 num_windows = kwargs.get('num_windows', 10) 400 save = kwargs.get('save', False) 401 402 self._plot_sensor_with_features(sliding_windows, features, start_idx, end_idx, 403 sensor_name, num_windows, save)
Create visualizations of sensor data with overlaid features.
Args: sliding_windows: List of sliding window dictionaries features: List of feature dictionaries **kwargs: Additional arguments including sensor_name, start_idx, end_idx, num_windows
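A sketch of the inputs visualize() expects, assuming (as the plotting code above does) that each window dictionary carries 'name'/'data' keys and each feature dictionary carries matching 'name'/'features' keys; the path, window size, and sampling rate mirror the Daphnet workflow further down.

from gaitsetpy import DaphnetLoader, GaitFeatureExtractor, SensorStatisticsAnalyzer

loader = DaphnetLoader()
data, names = loader.load_data("data/daphnet")                        # hypothetical path
windows = loader.create_sliding_windows(data, names, window_size=192)

extractor = GaitFeatureExtractor()
features = extractor.extract_features(windows[0]['windows'], fs=64)

analyzer = SensorStatisticsAnalyzer()
analyzer.visualize(windows[0]['windows'], features,
                   sensor_name='shank', start_idx=0, end_idx=5000, num_windows=10)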
21class RandomForestModel(BaseClassificationModel): 22 """ 23 Random Forest classification model. 24 25 This class provides Random Forest classification functionality with 26 comprehensive training, prediction, and evaluation capabilities. 27 """ 28 29 def __init__(self, n_estimators: int = 100, random_state: int = 42, max_depth: Optional[int] = None): 30 super().__init__( 31 name="random_forest", 32 description="Random Forest classifier for gait data classification" 33 ) 34 self.config = { 35 'n_estimators': n_estimators, 36 'random_state': random_state, 37 'max_depth': max_depth 38 } 39 self.model = RandomForestClassifier( 40 n_estimators=n_estimators, 41 random_state=random_state, 42 max_depth=max_depth 43 ) 44 self.feature_names = [] 45 self.class_names = [] 46 47 def train(self, features: List[Dict], **kwargs): 48 """ 49 Train the Random Forest model on the given features. 50 51 Args: 52 features: List of feature dictionaries 53 **kwargs: Additional arguments including test_size, validation_split 54 """ 55 # Preprocess features 56 X, y = preprocess_features(features) 57 58 # Store feature and class information 59 self.feature_names = [f"feature_{i}" for i in range(X.shape[1])] 60 self.class_names = list(set(y)) 61 62 # Split data if test_size is specified 63 test_size = kwargs.get('test_size', 0.2) 64 validation_split = kwargs.get('validation_split', True) 65 66 if validation_split: 67 X_train, X_test, y_train, y_test = train_test_split( 68 X, y, test_size=test_size, random_state=self.config['random_state'] 69 ) 70 71 # Train model 72 self.model.fit(X_train, y_train) 73 74 # Store validation data for later evaluation 75 self.X_test = X_test 76 self.y_test = y_test 77 78 # Print training accuracy 79 train_accuracy = self.model.score(X_train, y_train) 80 test_accuracy = self.model.score(X_test, y_test) 81 82 print(f"Training accuracy: {train_accuracy:.4f}") 83 print(f"Validation accuracy: {test_accuracy:.4f}") 84 else: 85 # Train on all data 86 self.model.fit(X, y) 87 train_accuracy = self.model.score(X, y) 88 print(f"Training accuracy: {train_accuracy:.4f}") 89 90 self.trained = True 91 print("Random Forest model trained successfully.") 92 93 def predict(self, features: List[Dict], **kwargs) -> Union[np.ndarray, Any]: 94 """ 95 Make predictions using the trained Random Forest model. 96 97 Args: 98 features: List of feature dictionaries 99 **kwargs: Additional arguments including return_probabilities 100 101 Returns: 102 Array of predictions or probabilities 103 """ 104 if not self.trained: 105 raise ValueError("Model must be trained before making predictions") 106 107 # Preprocess features 108 X, _ = preprocess_features(features) 109 110 # Make predictions 111 return_probabilities = kwargs.get('return_probabilities', False) 112 113 if return_probabilities: 114 return self.model.predict_proba(X) 115 else: 116 return self.model.predict(X) 117 118 def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]: 119 """ 120 Evaluate the Random Forest model performance. 
121 122 Args: 123 features: List of feature dictionaries 124 **kwargs: Additional arguments including detailed_report 125 126 Returns: 127 Dictionary containing evaluation metrics 128 """ 129 if not self.trained: 130 raise ValueError("Model must be trained before evaluation") 131 132 # Use validation data if available, otherwise use provided features 133 if hasattr(self, 'X_test') and hasattr(self, 'y_test'): 134 X_test, y_test = self.X_test, self.y_test 135 else: 136 X_test, y_test = preprocess_features(features) 137 138 # Make predictions 139 y_pred = self.model.predict(X_test) 140 141 # Calculate metrics 142 accuracy = accuracy_score(y_test, y_pred) 143 conf_matrix = confusion_matrix(y_test, y_pred) 144 145 # Basic metrics 146 metrics = { 147 'accuracy': accuracy, 148 'confusion_matrix': conf_matrix.tolist() 149 } 150 151 # Detailed report if requested 152 detailed_report = kwargs.get('detailed_report', False) 153 if detailed_report: 154 class_report = classification_report(y_test, y_pred, output_dict=True) 155 metrics['classification_report'] = class_report 156 157 # Feature importance 158 if hasattr(self.model, 'feature_importances_'): 159 feature_importance = dict(zip(self.feature_names, self.model.feature_importances_)) 160 metrics['feature_importance'] = feature_importance 161 162 return metrics 163 164 def save_model(self, filepath: str): 165 """ 166 Save the trained Random Forest model to a file. 167 168 Args: 169 filepath: Path to save the model 170 """ 171 if not self.trained: 172 raise ValueError("Model must be trained before saving") 173 174 # Save model with additional metadata 175 model_data = { 176 'model': self.model, 177 'config': self.config, 178 'feature_names': self.feature_names, 179 'class_names': self.class_names, 180 'trained': self.trained 181 } 182 183 joblib.dump(model_data, filepath) 184 print(f"Random Forest model saved to {filepath}") 185 186 def load_model(self, filepath: str): 187 """ 188 Load a trained Random Forest model from a file. 189 190 Args: 191 filepath: Path to the saved model 192 """ 193 try: 194 model_data = joblib.load(filepath) 195 196 # Handle legacy model format 197 if isinstance(model_data, dict): 198 self.model = model_data['model'] 199 self.config = model_data.get('config', self.config) 200 self.feature_names = model_data.get('feature_names', []) 201 self.class_names = model_data.get('class_names', []) 202 self.trained = model_data.get('trained', True) 203 else: 204 # Legacy format - just the model 205 self.model = model_data 206 self.trained = True 207 208 print(f"Random Forest model loaded from {filepath}") 209 except Exception as e: 210 print(f"Error loading model: {e}") 211 raise 212 213 def get_feature_importance(self) -> Dict[str, float]: 214 """ 215 Get feature importance scores. 216 217 Returns: 218 Dictionary mapping feature names to importance scores 219 """ 220 if not self.trained: 221 raise ValueError("Model must be trained to get feature importance") 222 223 if hasattr(self.model, 'feature_importances_'): 224 return dict(zip(self.feature_names, self.model.feature_importances_)) 225 else: 226 return {} 227 228 def predict_single(self, single_features: Dict) -> int: 229 """ 230 Make prediction for a single feature vector. 
231 232 Args: 233 single_features: Dictionary containing features for a single sample 234 235 Returns: 236 Predicted class 237 """ 238 if not self.trained: 239 raise ValueError("Model must be trained before making predictions") 240 241 # Convert single feature dict to format expected by preprocess_features 242 features_list = [single_features] 243 X, _ = preprocess_features(features_list) 244 245 return self.model.predict(X)[0]
Random Forest classification model.
This class provides Random Forest classification functionality with comprehensive training, prediction, and evaluation capabilities.
29 def __init__(self, n_estimators: int = 100, random_state: int = 42, max_depth: Optional[int] = None): 30 super().__init__( 31 name="random_forest", 32 description="Random Forest classifier for gait data classification" 33 ) 34 self.config = { 35 'n_estimators': n_estimators, 36 'random_state': random_state, 37 'max_depth': max_depth 38 } 39 self.model = RandomForestClassifier( 40 n_estimators=n_estimators, 41 random_state=random_state, 42 max_depth=max_depth 43 ) 44 self.feature_names = [] 45 self.class_names = []
Initialize the classification model.
Args: n_estimators: Number of trees in the forest (default 100) random_state: Random seed for reproducibility (default 42) max_depth: Maximum tree depth (default None, i.e. unlimited)
47 def train(self, features: List[Dict], **kwargs): 48 """ 49 Train the Random Forest model on the given features. 50 51 Args: 52 features: List of feature dictionaries 53 **kwargs: Additional arguments including test_size, validation_split 54 """ 55 # Preprocess features 56 X, y = preprocess_features(features) 57 58 # Store feature and class information 59 self.feature_names = [f"feature_{i}" for i in range(X.shape[1])] 60 self.class_names = list(set(y)) 61 62 # Split data if test_size is specified 63 test_size = kwargs.get('test_size', 0.2) 64 validation_split = kwargs.get('validation_split', True) 65 66 if validation_split: 67 X_train, X_test, y_train, y_test = train_test_split( 68 X, y, test_size=test_size, random_state=self.config['random_state'] 69 ) 70 71 # Train model 72 self.model.fit(X_train, y_train) 73 74 # Store validation data for later evaluation 75 self.X_test = X_test 76 self.y_test = y_test 77 78 # Print training accuracy 79 train_accuracy = self.model.score(X_train, y_train) 80 test_accuracy = self.model.score(X_test, y_test) 81 82 print(f"Training accuracy: {train_accuracy:.4f}") 83 print(f"Validation accuracy: {test_accuracy:.4f}") 84 else: 85 # Train on all data 86 self.model.fit(X, y) 87 train_accuracy = self.model.score(X, y) 88 print(f"Training accuracy: {train_accuracy:.4f}") 89 90 self.trained = True 91 print("Random Forest model trained successfully.")
Train the Random Forest model on the given features.
Args: features: List of feature dictionaries **kwargs: Additional arguments including test_size, validation_split
93 def predict(self, features: List[Dict], **kwargs) -> Union[np.ndarray, Any]: 94 """ 95 Make predictions using the trained Random Forest model. 96 97 Args: 98 features: List of feature dictionaries 99 **kwargs: Additional arguments including return_probabilities 100 101 Returns: 102 Array of predictions or probabilities 103 """ 104 if not self.trained: 105 raise ValueError("Model must be trained before making predictions") 106 107 # Preprocess features 108 X, _ = preprocess_features(features) 109 110 # Make predictions 111 return_probabilities = kwargs.get('return_probabilities', False) 112 113 if return_probabilities: 114 return self.model.predict_proba(X) 115 else: 116 return self.model.predict(X)
Make predictions using the trained Random Forest model.
Args: features: List of feature dictionaries **kwargs: Additional arguments including return_probabilities
Returns: Array of predictions or probabilities
118 def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]: 119 """ 120 Evaluate the Random Forest model performance. 121 122 Args: 123 features: List of feature dictionaries 124 **kwargs: Additional arguments including detailed_report 125 126 Returns: 127 Dictionary containing evaluation metrics 128 """ 129 if not self.trained: 130 raise ValueError("Model must be trained before evaluation") 131 132 # Use validation data if available, otherwise use provided features 133 if hasattr(self, 'X_test') and hasattr(self, 'y_test'): 134 X_test, y_test = self.X_test, self.y_test 135 else: 136 X_test, y_test = preprocess_features(features) 137 138 # Make predictions 139 y_pred = self.model.predict(X_test) 140 141 # Calculate metrics 142 accuracy = accuracy_score(y_test, y_pred) 143 conf_matrix = confusion_matrix(y_test, y_pred) 144 145 # Basic metrics 146 metrics = { 147 'accuracy': accuracy, 148 'confusion_matrix': conf_matrix.tolist() 149 } 150 151 # Detailed report if requested 152 detailed_report = kwargs.get('detailed_report', False) 153 if detailed_report: 154 class_report = classification_report(y_test, y_pred, output_dict=True) 155 metrics['classification_report'] = class_report 156 157 # Feature importance 158 if hasattr(self.model, 'feature_importances_'): 159 feature_importance = dict(zip(self.feature_names, self.model.feature_importances_)) 160 metrics['feature_importance'] = feature_importance 161 162 return metrics
Evaluate the Random Forest model performance.
Args: features: List of feature dictionaries **kwargs: Additional arguments including detailed_report
Returns: Dictionary containing evaluation metrics
164 def save_model(self, filepath: str): 165 """ 166 Save the trained Random Forest model to a file. 167 168 Args: 169 filepath: Path to save the model 170 """ 171 if not self.trained: 172 raise ValueError("Model must be trained before saving") 173 174 # Save model with additional metadata 175 model_data = { 176 'model': self.model, 177 'config': self.config, 178 'feature_names': self.feature_names, 179 'class_names': self.class_names, 180 'trained': self.trained 181 } 182 183 joblib.dump(model_data, filepath) 184 print(f"Random Forest model saved to {filepath}")
Save the trained Random Forest model to a file.
Args: filepath: Path to save the model
186 def load_model(self, filepath: str): 187 """ 188 Load a trained Random Forest model from a file. 189 190 Args: 191 filepath: Path to the saved model 192 """ 193 try: 194 model_data = joblib.load(filepath) 195 196 # Handle legacy model format 197 if isinstance(model_data, dict): 198 self.model = model_data['model'] 199 self.config = model_data.get('config', self.config) 200 self.feature_names = model_data.get('feature_names', []) 201 self.class_names = model_data.get('class_names', []) 202 self.trained = model_data.get('trained', True) 203 else: 204 # Legacy format - just the model 205 self.model = model_data 206 self.trained = True 207 208 print(f"Random Forest model loaded from {filepath}") 209 except Exception as e: 210 print(f"Error loading model: {e}") 211 raise
Load a trained Random Forest model from a file.
Args: filepath: Path to the saved model
213 def get_feature_importance(self) -> Dict[str, float]: 214 """ 215 Get feature importance scores. 216 217 Returns: 218 Dictionary mapping feature names to importance scores 219 """ 220 if not self.trained: 221 raise ValueError("Model must be trained to get feature importance") 222 223 if hasattr(self.model, 'feature_importances_'): 224 return dict(zip(self.feature_names, self.model.feature_importances_)) 225 else: 226 return {}
Get feature importance scores.
Returns: Dictionary mapping feature names to importance scores
228 def predict_single(self, single_features: Dict) -> int: 229 """ 230 Make prediction for a single feature vector. 231 232 Args: 233 single_features: Dictionary containing features for a single sample 234 235 Returns: 236 Predicted class 237 """ 238 if not self.trained: 239 raise ValueError("Model must be trained before making predictions") 240 241 # Convert single feature dict to format expected by preprocess_features 242 features_list = [single_features] 243 X, _ = preprocess_features(features_list) 244 245 return self.model.predict(X)[0]
Make prediction for a single feature vector.
Args: single_features: Dictionary containing features for a single sample
Returns: Predicted class
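A hedged training/evaluation sketch, reusing the `features` list built in the extractor sketch above and assuming that output matches what preprocess_features expects; the file path is hypothetical.

from gaitsetpy import RandomForestModel

model = RandomForestModel(n_estimators=200, max_depth=10)
model.train(features, test_size=0.2, validation_split=True)

metrics = model.evaluate(features, detailed_report=True)
print(metrics['accuracy'])

model.save_model("rf_gait.joblib")   # persists the estimator plus config and feature names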
53def get_dataset_manager(): 54 """Get the singleton DatasetManager instance.""" 55 return DatasetManager()
Get the singleton DatasetManager instance.
93def get_feature_manager(): 94 """Get the singleton FeatureManager instance.""" 95 return FeatureManager()
Get the singleton FeatureManager instance.
69def get_preprocessing_manager(): 70 """Get the singleton PreprocessingManager instance.""" 71 return PreprocessingManager()
Get the singleton PreprocessingManager instance.
Get the singleton EDAManager instance.
44def get_classification_manager(): 45 """Get the singleton ClassificationManager instance.""" 46 return ClassificationManager()
Get the singleton ClassificationManager instance.
194def get_all_managers(): 195 """ 196 Get all singleton managers. 197 198 Returns: 199 Dictionary containing all manager instances 200 """ 201 return { 202 'dataset': DatasetManager(), 203 'feature': FeatureManager(), 204 'preprocessing': PreprocessingManager(), 205 'eda': EDAManager(), 206 'classification': ClassificationManager() 207 }
Get all singleton managers.
Returns: Dictionary containing all manager instances
58def get_available_datasets(): 59 """Get list of available dataset names.""" 60 return DatasetManager().get_available_components()
Get list of available dataset names.
98def get_available_extractors(): 99 """Get list of available feature extractor names.""" 100 return FeatureManager().get_available_components()
Get list of available feature extractor names.
74def get_available_preprocessors(): 75 """Get list of available preprocessor names.""" 76 return PreprocessingManager().get_available_components()
Get list of available preprocessor names.
56def get_available_analyzers(): 57 """Get list of available EDA analyzer names.""" 58 return EDAManager().get_available_components()
Get list of available EDA analyzer names.
49def get_available_models(): 50 """Get list of available classification model names.""" 51 return ClassificationManager().get_available_components()
Get list of available classification model names.
210def get_system_info(): 211 """ 212 Get information about the available components in the system. 213 214 Returns: 215 Dictionary containing system information 216 """ 217 return { 218 'version': __version__, 219 'author': __author__, 220 'available_datasets': get_available_datasets(), 221 'available_extractors': get_available_extractors(), 222 'available_preprocessors': get_available_preprocessors(), 223 'available_analyzers': get_available_analyzers(), 224 'available_models': get_available_models(), 225 'architecture': 'Modular with singleton design pattern' 226 }
Get information about the available components in the system.
Returns: Dictionary containing system information
229def load_and_analyze_daphnet(data_dir: str, sensor_type: str = 'all', window_size: int = 192): 230 """ 231 Complete workflow for loading and analyzing Daphnet data. 232 233 Args: 234 data_dir: Directory containing the Daphnet dataset 235 sensor_type: Type of sensor to analyze ('all', 'thigh', 'shank', 'trunk') 236 window_size: Size of sliding windows for feature extraction 237 238 Returns: 239 Dictionary containing data, features, and analysis results 240 """ 241 # Load dataset 242 loader = DaphnetLoader() 243 data, names = loader.load_data(data_dir) 244 245 # Create sliding windows 246 windows = loader.create_sliding_windows(data, names, window_size=window_size) 247 248 # Extract features 249 extractor = GaitFeatureExtractor() 250 features = extractor.extract_features(windows[0]['windows'], fs=64) 251 252 # Analyze data 253 analyzer = DaphnetVisualizationAnalyzer() 254 analysis = analyzer.analyze(data) 255 256 return { 257 'data': data, 258 'names': names, 259 'windows': windows, 260 'features': features, 261 'analysis': analysis, 262 'loader': loader, 263 'extractor': extractor, 264 'analyzer': analyzer 265 }
Complete workflow for loading and analyzing Daphnet data.
Args: data_dir: Directory containing the Daphnet dataset sensor_type: Type of sensor to analyze ('all', 'thigh', 'shank', 'trunk') window_size: Size of sliding windows for feature extraction
Returns: Dictionary containing data, features, and analysis results
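Assuming the helper is exported at package level like the loaders above, a one-call usage sketch (the data directory is hypothetical):

import gaitsetpy as gsp

result = gsp.load_and_analyze_daphnet("data/daphnet", window_size=192)
print(result['analysis'])        # per-recording statistical summary
features = result['features']    # can be fed to train_gait_classifier below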
267def load_and_analyze_physionet(data_dir: str, window_size: int = 600, step_size: int = 100): 268 """ 269 Complete workflow for loading and analyzing PhysioNet VGRF data. 270 271 Args: 272 data_dir: Directory to store/find the PhysioNet dataset 273 window_size: Size of sliding windows for feature extraction (default: 600) 274 step_size: Step size for sliding windows (default: 100) 275 276 Returns: 277 Dictionary containing data, features, and analysis results 278 """ 279 # Load dataset 280 loader = PhysioNetLoader() 281 data, names = loader.load_data(data_dir) 282 283 # Create sliding windows 284 windows = loader.create_sliding_windows(data, names, window_size=window_size, step_size=step_size) 285 286 # Extract PhysioNet-specific features 287 extractor = PhysioNetFeatureExtractor() 288 all_features = [] 289 290 for window_dict in windows: 291 if 'windows' in window_dict: 292 features = extractor.extract_features(window_dict['windows'], fs=100) 293 all_features.append({ 294 'name': window_dict['name'], 295 'features': features, 296 'metadata': window_dict.get('metadata', {}) 297 }) 298 299 return { 300 'data': data, 301 'names': names, 302 'windows': windows, 303 'features': all_features, 304 'labels': loader.get_labels(), 305 'loader': loader, 306 'extractor': extractor 307 }
Complete workflow for loading and analyzing PhysioNet VGRF data.
Args: data_dir: Directory to store/find the PhysioNet dataset window_size: Size of sliding windows for feature extraction (default: 600) step_size: Step size for sliding windows (default: 100)
Returns: Dictionary containing data, features, and analysis results
309def train_gait_classifier(features, model_type: str = 'random_forest', **kwargs): 310 """ 311 Train a gait classification model. 312 313 Args: 314 features: List of feature dictionaries 315 model_type: Type of model to train ('random_forest', etc.) 316 **kwargs: Additional arguments for model training 317 318 Returns: 319 Trained model instance 320 """ 321 if model_type == 'random_forest': 322 model = RandomForestModel(**kwargs) 323 model.train(features, **kwargs) 324 return model 325 else: 326 raise ValueError(f"Model type '{model_type}' not supported")
Train a gait classification model.
Args: features: List of feature dictionaries model_type: Type of model to train ('random_forest', etc.) **kwargs: Additional arguments for model training
Returns: Trained model instance
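Note that, as written, **kwargs is forwarded both to the RandomForestModel constructor and to train(), so only pass names the constructor accepts (e.g. n_estimators); a train-only option such as test_size would reach the constructor and raise a TypeError. A short sketch continuing from the workflow above:

import gaitsetpy as gsp

# `features` is the list returned by the extractor / workflow sketches above.
model = gsp.train_gait_classifier(features, model_type='random_forest', n_estimators=150)
print(model.get_feature_importance())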
177def load_daphnet_data(data_dir: str): 178 """ 179 Legacy function for loading Daphnet data. 180 181 Args: 182 data_dir: Directory to store the dataset 183 184 Returns: 185 Tuple of (data_list, names_list) 186 """ 187 loader = DaphnetLoader() 188 return loader.load_data(data_dir)
Legacy function for loading Daphnet data.
Args: data_dir: Directory to store the dataset
Returns: Tuple of (data_list, names_list)
191def create_sliding_windows(daphnet, daphnet_names, window_size=192, step_size=32): 192 """ 193 Legacy function for creating sliding windows. 194 195 Args: 196 daphnet: List of dataframes containing Daphnet data 197 daphnet_names: List of names of the Daphnet dataframes 198 window_size: Size of the sliding window 199 step_size: Step size for the sliding window 200 201 Returns: 202 List of dictionaries containing sliding windows for each DataFrame 203 """ 204 loader = DaphnetLoader() 205 return loader.create_sliding_windows(daphnet, daphnet_names, window_size, step_size)
Legacy function for creating sliding windows.
Args: daphnet: List of dataframes containing Daphnet data daphnet_names: List of names of the Daphnet dataframes window_size: Size of the sliding window step_size: Step size for the sliding window
Returns: List of dictionaries containing sliding windows for each DataFrame
111def load_mobifall_data(): 112 """ 113 Legacy function for loading MobiFall data. 114 115 Returns: 116 Tuple of (data_list, names_list) 117 """ 118 loader = MobiFallLoader() 119 return loader.load_data("")
Legacy function for loading MobiFall data.
Returns: Tuple of (data_list, names_list)
111def load_arduous_data(): 112 """ 113 Legacy function for loading Arduous data. 114 115 Returns: 116 Tuple of (data_list, names_list) 117 """ 118 loader = ArduousLoader() 119 return loader.load_data("")
Legacy function for loading Arduous data.
Returns: Tuple of (data_list, names_list)
338def load_physionet_data(data_dir: str) -> Tuple[List[pd.DataFrame], List[str]]: 339 """ 340 Legacy function to load PhysioNet data. 341 342 Args: 343 data_dir: Directory containing the dataset 344 345 Returns: 346 Tuple of (data_list, names_list) 347 """ 348 loader = PhysioNetLoader() 349 return loader.load_data(data_dir)
Legacy function to load PhysioNet data.
Args: data_dir: Directory containing the dataset
Returns: Tuple of (data_list, names_list)
352def create_physionet_windows(data: List[pd.DataFrame], names: List[str], 353 window_size: int = 600, step_size: int = 100) -> List[Dict]: 354 """ 355 Legacy function to create sliding windows from PhysioNet data. 356 357 Args: 358 data: List of DataFrames 359 names: List of names 360 window_size: Size of sliding window 361 step_size: Step size for sliding window 362 363 Returns: 364 List of sliding window dictionaries 365 """ 366 loader = PhysioNetLoader() 367 return loader.create_sliding_windows(data, names, window_size, step_size)
Legacy function to create sliding windows from PhysioNet data.
Args: data: List of DataFrames names: List of names window_size: Size of sliding window step_size: Step size for sliding window
Returns: List of sliding window dictionaries
def load_harup_data(data_dir: str, subjects=None, activities=None, trials=None):
    """
    Legacy function for loading HAR-UP data.

    Args:
        data_dir: Directory containing the dataset
        subjects: List of subject IDs to load (default: all subjects)
        activities: List of activity IDs to load (default: all activities)
        trials: List of trial IDs to load (default: all trials)

    Returns:
        Tuple of (data_list, names_list)
    """
    loader = HARUPLoader()
    return loader.load_data(data_dir, subjects, activities, trials)

Legacy function for loading HAR-UP data.
Args:
    data_dir: Directory containing the dataset
    subjects: List of subject IDs to load (default: all subjects)
    activities: List of activity IDs to load (default: all activities)
    trials: List of trial IDs to load (default: all trials)
Returns:
    Tuple of (data_list, names_list)

def create_harup_windows(harup_data, harup_names, window_size=100, step_size=50):
    """
    Legacy function for creating sliding windows from HAR-UP data.

    Args:
        harup_data: List of dataframes containing HAR-UP data
        harup_names: List of names of the HAR-UP dataframes
        window_size: Size of the sliding window
        step_size: Step size for the sliding window

    Returns:
        List of dictionaries containing sliding windows for each DataFrame
    """
    loader = HARUPLoader()
    return loader.create_sliding_windows(harup_data, harup_names, window_size, step_size)

Legacy function for creating sliding windows from HAR-UP data.
Args:
    harup_data: List of dataframes containing HAR-UP data
    harup_names: List of names of the HAR-UP dataframes
    window_size: Size of the sliding window
    step_size: Step size for the sliding window
Returns:
    List of dictionaries containing sliding windows for each DataFrame

def extract_harup_features(windows_data, time_domain=True, freq_domain=True):
    """
    Legacy function for extracting features from HAR-UP windows.

    Args:
        windows_data: List of dictionaries containing sliding windows
        time_domain: Whether to extract time domain features
        freq_domain: Whether to extract frequency domain features

    Returns:
        List of dictionaries containing extracted features
    """
    loader = HARUPLoader()
    return loader.extract_features(windows_data, time_domain, freq_domain)

Legacy function for extracting features from HAR-UP windows.
Args:
    windows_data: List of dictionaries containing sliding windows
    time_domain: Whether to extract time domain features
    freq_domain: Whether to extract frequency domain features
Returns:
    List of dictionaries containing extracted features
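The three HAR-UP helpers compose into a load, window, and extract pipeline. A hedged sketch, where the directory path and subject IDs are placeholders:

import gaitsetpy as gsp

harup_data, harup_names = gsp.load_harup_data("./harup_data", subjects=[1, 2])  # placeholder path and IDs
harup_windows = gsp.create_harup_windows(harup_data, harup_names, window_size=100, step_size=50)
harup_features = gsp.extract_harup_features(harup_windows, time_domain=True, freq_domain=True)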
def download_dataset(dataset_name, data_dir):
    """Download the dataset."""
    if dataset_name == "daphnet":
        download_daphnet_data(data_dir)
    elif dataset_name == "mobifall":
        download_mobifall_data(data_dir)
    elif dataset_name == "arduous":
        download_arduous_data(data_dir)
    elif dataset_name == "harup":
        download_harup_data(data_dir)
    elif dataset_name == "urfall":
        download_urfall_data(data_dir)
    elif dataset_name == "physionet":
        # PhysioNet dataset is handled by the PhysioNetLoader itself
        pass
    else:
        raise ValueError(f"Dataset {dataset_name} not supported.")

Download the dataset.

def extract_dataset(dataset_name, data_dir):
    """Extract the dataset."""
    if dataset_name == "daphnet":
        extract_daphnet_data(data_dir)
    elif dataset_name == "mobifall":
        extract_mobifall_data(data_dir)
    elif dataset_name == "arduous":
        extract_arduous_data(data_dir)
    elif dataset_name == "harup":
        extract_harup_data(data_dir)
    elif dataset_name == "urfall":
        extract_urfall_data(data_dir)
    elif dataset_name == "physionet":
        # PhysioNet dataset is handled by the PhysioNetLoader itself
        pass
    else:
        raise ValueError(f"Dataset {dataset_name} not supported.")

Extract the dataset.
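Usage sketch for the two dispatchers above. The supported names are "daphnet", "mobifall", "arduous", "harup", "urfall", and "physionet" (a no-op, since the PhysioNetLoader manages its own download); anything else raises ValueError. The target directory is a placeholder:

import gaitsetpy as gsp

gsp.download_dataset("daphnet", "./data")  # "./data" is an illustrative target directory
gsp.extract_dataset("daphnet", "./data")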
def sliding_window(data, window_size, step_size):
    """
    Split a sequence into overlapping windows of length window_size, advancing by step_size.
    Returns an empty list if the parameters are non-positive or the data is shorter than window_size.
    """
    if window_size <= 0 or step_size <= 0:
        return []
    if len(data) < window_size:
        return []
    num_windows = (len(data) - window_size) // step_size + 1
    windows = []
    for i in range(num_windows):
        start = i * step_size
        end = start + window_size
        windows.append(data[start:end])
    return windows
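A worked example of the window arithmetic: a 10-element sequence with window_size=4 and step_size=2 yields (10 - 4) // 2 + 1 = 4 windows.

from gaitsetpy import sliding_window

windows = sliding_window(list(range(10)), window_size=4, step_size=2)
print(windows)
# [[0, 1, 2, 3], [2, 3, 4, 5], [4, 5, 6, 7], [6, 7, 8, 9]]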
def calculate_mean(signal):
    """Calculate the mean of the signal."""
    return np.mean(signal)

Calculate the mean of the signal.

def calculate_standard_deviation(signal):
    """
    Calculate the standard deviation of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        std_dev (float): Standard deviation.
    """
    return np.std(signal)

Calculate the standard deviation of a signal.
Args:
    signal (np.array): Input signal.
Returns:
    std_dev (float): Standard deviation.

def calculate_variance(signal):
    """
    Calculate the variance of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        variance (float): Variance.
    """
    return np.var(signal)

Calculate the variance of a signal.
Args:
    signal (np.array): Input signal.
Returns:
    variance (float): Variance.

def calculate_skewness(signal):
    """Calculate the skewness of the signal."""
    try:
        return skew(signal)
    except Exception as e:
        print(f"An error occurred in skewness: {e}")
        return 0

Calculate the skewness of the signal.

def calculate_kurtosis(signal):
    """
    Calculate the kurtosis of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        kurtosis_value (float): Kurtosis.
    """
    try:
        return kurtosis(signal, fisher=False)
    except Exception as e:
        print(f"An error occurred in feature 'kurtosis': {e}")
        return 0

Calculate the kurtosis of a signal.
Args:
    signal (np.array): Input signal.
Returns:
    kurtosis_value (float): Kurtosis.

def calculate_root_mean_square(signal):
    """Calculate the root mean square of the signal."""
    return np.sqrt(np.mean(np.square(signal)))

Calculate the root mean square of the signal.

def calculate_range(signal):
    """Calculate the range of the signal."""
    return np.max(signal) - np.min(signal)

Calculate the range of the signal.

def calculate_median(signal):
    """Calculate the median of the signal."""
    return np.median(signal)

Calculate the median of the signal.

def calculate_mode(signal):
    """Calculate the mode of the signal."""
    values, counts = np.unique(signal, return_counts=True)
    return values[np.argmax(counts)]

Calculate the mode of the signal.

def calculate_mean_absolute_value(signal):
    """Calculate the mean absolute value of the signal."""
    return np.mean(np.abs(signal))

Calculate the mean absolute value of the signal.

def calculate_median_absolute_deviation(signal):
    """Calculate the median absolute deviation of the signal."""
    return np.median(np.abs(signal - np.median(signal)))

Calculate the median absolute deviation of the signal.
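A small worked example of the statistical features above on a fixed array (assuming these legacy helpers are importable from the package root, as they are documented on this page):

import numpy as np
from gaitsetpy import (
    calculate_mean, calculate_standard_deviation,
    calculate_root_mean_square, calculate_median_absolute_deviation,
)

signal = np.array([1.0, 2.0, 2.0, 3.0, 10.0])
print(calculate_mean(signal))                       # 3.6
print(calculate_standard_deviation(signal))         # population std (np.std default), ~3.26
print(calculate_root_mean_square(signal))           # sqrt(mean(x^2)), ~4.86
print(calculate_median_absolute_deviation(signal))  # median(|x - median(x)|) = 1.0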
def calculate_peak_height(signal):
    """Calculate the peak height of the signal."""
    peaks, _ = find_peaks(signal)
    return np.max(signal[peaks]) if len(peaks) > 0 else 0

Calculate the peak height of the signal.

def calculate_stride_times(signal, fs):
    """
    Calculate stride times from a signal using peak detection.
    Args:
        signal (np.array): Input signal.
        fs (int): Sampling frequency.
    Returns:
        avg_stride_time (float): Average stride time.
    """
    peaks, _ = find_peaks(signal)
    stride_times = np.diff(peaks) / fs
    avg_stride_time = np.mean(stride_times) if len(stride_times) > 0 else 0
    return avg_stride_time

Calculate stride times from a signal using peak detection.
Args:
    signal (np.array): Input signal.
    fs (int): Sampling frequency.
Returns:
    avg_stride_time (float): Average stride time.

def calculate_step_time(signal, fs):
    """
    Calculate step times from a signal using peak detection.
    Args:
        signal (np.array): Input signal.
        fs (int): Sampling frequency.
    Returns:
        step_times (np.array): Array of step times.
    """
    peaks, _ = find_peaks(signal)
    step_times = np.diff(peaks) / fs
    return step_times

Calculate step times from a signal using peak detection.
Args:
    signal (np.array): Input signal.
    fs (int): Sampling frequency.
Returns:
    step_times (np.array): Array of step times.

def calculate_cadence(signal, fs):
    """Calculate the cadence (steps per minute) of the signal."""
    peaks, _ = find_peaks(signal)
    step_count = len(peaks)
    duration = len(signal) / fs
    return (step_count / duration) * 60

Calculate the cadence (steps per minute) of the signal.
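Because these features all rest on scipy.signal.find_peaks, a synthetic periodic signal makes the expected outputs easy to check. A sketch (exact peak counts depend on find_peaks defaults; the helpers are assumed to be importable from the package root, as documented here):

import numpy as np
from gaitsetpy import calculate_stride_times, calculate_cadence

fs = 100                                 # Hz
t = np.arange(0, 10, 1 / fs)             # 10 s of data
signal = np.sin(2 * np.pi * 2.0 * t)     # one peak every 0.5 s, about 20 peaks total

print(calculate_stride_times(signal, fs))  # ~0.5 s average spacing between detected peaks
print(calculate_cadence(signal, fs))       # ~120 "steps" per minute (20 peaks / 10 s * 60)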
50def calculate_freezing_index(signal, fs): 51 """ 52 Calculate the freezing index of a signal. 53 Args: 54 signal (np.array): Input signal. 55 fs (int): Sampling frequency. 56 Returns: 57 freezing_index (float): Freezing index. 58 """ 59 power_3_8 = calculate_power(signal, fs, (3, 8)) 60 power_0_5_3 = calculate_power(signal, fs, (0.5, 3)) 61 freezing_index = power_3_8 / power_0_5_3 if power_0_5_3 != 0 else 0 62 return freezing_index
Calculate the freezing index of a signal. Args: signal (np.array): Input signal. fs (int): Sampling frequency. Returns: freezing_index (float): Freezing index.
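The freezing index is the ratio of power in the 3-8 Hz ("freeze") band to power in the 0.5-3 Hz (locomotor) band. The calculate_power helper is not reproduced on this page, so the sketch below approximates the same ratio directly with a Welch PSD; treat the band_power helper as an assumption, not the package's implementation:

import numpy as np
from scipy.signal import welch

def band_power(signal, fs, band):
    # Approximate band power by summing the Welch PSD bins inside the band
    # (assumption: calculate_power does something equivalent).
    f, pxx = welch(signal, fs=fs, nperseg=min(len(signal), 256))
    mask = (f >= band[0]) & (f <= band[1])
    return np.sum(pxx[mask]) * (f[1] - f[0])

fs = 64
t = np.arange(0, 8, 1 / fs)
signal = np.sin(2 * np.pi * 1.0 * t) + 0.3 * np.sin(2 * np.pi * 5.0 * t)

fi = band_power(signal, fs, (3, 8)) / band_power(signal, fs, (0.5, 3))
print(fi)  # well below 1 for this locomotion-dominated signal; values near or above 1 suggest freezing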
def calculate_dominant_frequency(signal, fs):
    """Calculate the dominant frequency of the signal."""
    try:
        fft_values = np.abs(fft(signal))
        freqs = np.fft.fftfreq(len(signal), 1 / fs)
        dominant_freq = freqs[np.argmax(fft_values)]
        return dominant_freq
    except Exception as e:
        print(f"An error occurred: {e}")
        return 0

Calculate the dominant frequency of the signal.

def calculate_peak_frequency(signal, fs):
    """Calculate the peak frequency of the signal."""
    try:
        f, Pxx = welch(signal, fs=fs, nperseg=min(len(signal), 192))  # Ensure nperseg ≤ length
        return f[np.argmax(Pxx)]
    except Exception as e:
        print(f"An error occurred in feature 'peak_frequency': {e}")
        return 0

Calculate the peak frequency of the signal.

def calculate_power_spectral_entropy(signal, fs):
    """Calculate the power spectral entropy of the signal."""
    try:
        f, Pxx = welch(signal, fs=fs, nperseg=min(len(signal), 192))  # Ensure nperseg ≤ length
        Pxx_norm = Pxx / np.sum(Pxx)
        return -np.sum(Pxx_norm * np.log2(Pxx_norm + np.finfo(float).eps))
    except Exception as e:
        print(f"An error occurred in feature 'power spectral entropy': {e}")
        return 0

Calculate the power spectral entropy of the signal.

def calculate_principal_harmonic_frequency(signal, fs):
    """Calculate the principal harmonic frequency of the signal."""
    try:
        fft_values = np.abs(fft(signal))
        freqs = np.fft.fftfreq(len(signal), 1 / fs)
        return freqs[np.argmax(fft_values)]
    except Exception as e:
        print(f"An error occurred in feature 'principal_harmonic_frequency': {e}")
        return 0

Calculate the principal harmonic frequency of the signal.

def calculate_entropy(signal):
    """
    Calculate the entropy of a signal.
    Args:
        signal (np.array): Input signal.
    Returns:
        entropy_value (float): Entropy.
    """
    value, counts = np.unique(signal, return_counts=True)
    probabilities = counts / len(signal)
    return entropy(probabilities, base=2)

Calculate the entropy of a signal.
Args:
    signal (np.array): Input signal.
Returns:
    entropy_value (float): Entropy.
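A sketch of the frequency-domain features on a narrowband test signal; the printed values are approximate and assume the helpers are importable from the package root, as documented here:

import numpy as np
from gaitsetpy import (
    calculate_dominant_frequency, calculate_peak_frequency, calculate_power_spectral_entropy,
)

rng = np.random.default_rng(0)
fs = 100
t = np.arange(0, 4, 1 / fs)
signal = np.sin(2 * np.pi * 3.0 * t) + 0.1 * rng.standard_normal(len(t))

print(calculate_dominant_frequency(signal, fs))      # ~3 Hz (argmax of the FFT magnitude)
print(calculate_peak_frequency(signal, fs))          # ~3 Hz (argmax of the Welch PSD)
print(calculate_power_spectral_entropy(signal, fs))  # relatively low for a narrowband signal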
def calculate_interquartile_range(signal):
    """Calculate the interquartile range of the signal."""
    try:
        q75, q25 = np.percentile(signal, [75, 25])
        return q75 - q25
    except Exception as e:
        print(f"An error occurred in feature 'interquartile_range': {e}")
        return 0

Calculate the interquartile range of the signal.

def calculate_correlation(signal1, signal2):
    """Calculate the correlation between two signals."""
    return np.corrcoef(signal1, signal2)[0, 1]

Calculate the correlation between two signals.

def calculate_auto_regression_coefficients(signal, order=3):
    """Calculate the auto-regression coefficients of the signal."""
    try:
        model = AutoReg(signal, lags=order)
        results = model.fit()
        return results.params
    except Exception as e:
        print(f"An error occurred in feature 'auto_regression_coefficients': {e}")
        return 0

Calculate the auto-regression coefficients of the signal.
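A short sketch of the pairwise and autoregressive features (assuming top-level re-exports, as documented here; statsmodels is required for the AR fit):

import numpy as np
from gaitsetpy import calculate_correlation, calculate_auto_regression_coefficients

x = np.sin(np.linspace(0, 10, 500))

print(calculate_correlation(x, x))                         # 1.0
print(calculate_correlation(x, -x))                        # -1.0 (perfectly anti-correlated)
print(calculate_auto_regression_coefficients(x, order=3))  # intercept plus 3 lag coefficients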
def clip_sliding_windows(data, min_val=-1, max_val=1):
    """
    Clip values in the sliding windows to be within a specified range.
    """
    return np.clip(data, min_val, max_val)

Clip values in the sliding windows to be within a specified range.

def remove_noise(data, window_size=5):
    """
    Apply a moving average filter to reduce noise.
    """
    return data.rolling(window=window_size, center=True).mean().fillna(method="bfill").fillna(method="ffill")

Apply a moving average filter to reduce noise.

def remove_outliers(data, threshold=3):
    """
    Remove outliers beyond a given threshold using the Z-score method.
    """
    mean, std = data.mean(), data.std()
    return data[(data - mean).abs() <= threshold * std]

Remove outliers beyond a given threshold using the Z-score method.

def remove_baseline(data):
    """
    Remove baseline by subtracting the mean.
    """
    return data - data.mean()

Remove baseline by subtracting the mean.

def remove_drift(data, cutoff=0.01, fs=100):
    """
    Remove low-frequency drift using a high-pass filter.
    """
    b, a = butter(1, cutoff / (fs / 2), btype='highpass')
    return filtfilt(b, a, data)

Remove low-frequency drift using a high-pass filter.

def remove_artifacts(data, method="interpolate"):
    """
    Remove artifacts by interpolating missing values.
    """
    return data.interpolate(method="linear").fillna(method="bfill").fillna(method="ffill")

Remove artifacts by interpolating missing values.

def remove_trend(data, order=2):
    """
    Remove trends using polynomial fitting.
    """
    x = np.arange(len(data))
    poly_coeffs = np.polyfit(x, data, order)
    trend = np.polyval(poly_coeffs, x)
    return data - trend

Remove trends using polynomial fitting.

def remove_dc_offset(data):
    """
    Remove DC offset by subtracting the mean.
    """
    return data - data.mean()

Remove DC offset by subtracting the mean.

def remove_high_frequency_noise(data, cutoff=10, fs=100):
    """
    Apply a low-pass filter to remove high-frequency noise.
    """
    b, a = butter(1, cutoff / (fs / 2), btype='lowpass')
    return filtfilt(b, a, data)

Apply a low-pass filter to remove high-frequency noise.

def remove_low_frequency_noise(data, cutoff=0.5, fs=100):
    """
    Apply a high-pass filter to remove low-frequency noise.
    """
    b, a = butter(1, cutoff / (fs / 2), btype='highpass')
    return filtfilt(b, a, data)

Apply a high-pass filter to remove low-frequency noise.
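These legacy preprocessors can be chained; note that the filter-based helpers return NumPy arrays while remove_noise expects a pandas object with .rolling. A sketch on a synthetic drifting signal, assuming the helpers are importable from the package root as documented here:

import numpy as np
import pandas as pd
from gaitsetpy import remove_drift, remove_high_frequency_noise, remove_noise

rng = np.random.default_rng(0)
fs = 100
t = np.arange(0, 10, 1 / fs)
raw = pd.Series(np.sin(2 * np.pi * 1.0 * t) + 0.05 * t + 0.2 * rng.standard_normal(len(t)))

detrended = remove_drift(raw, cutoff=0.01, fs=fs)                    # high-pass removes the slow 0.05*t drift
smoothed = remove_high_frequency_noise(detrended, cutoff=10, fs=fs)  # low-pass removes content above 10 Hz
denoised = remove_noise(pd.Series(smoothed), window_size=5)          # moving-average smoothing on a Series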
def plot_thigh_data(daphnetThigh, daphnetNames, i):
    """
    Plot thigh acceleration data for a specific dataset.
    Args:
        daphnetThigh (list): List of DataFrames containing thigh acceleration data.
        daphnetNames (list): List of dataset names.
        i (int): Index of the dataset to plot.
    """
    print(daphnetNames[i])
    fig, axes = plt.subplots(4, 1, sharex=True, sharey=True, figsize=(20, 16))
    fig.suptitle("Thigh Data from " + daphnetNames[i])
    plt.xlabel("Time")

    df = daphnetThigh[i]
    df = df[df.annotations > 0]  # Filter out rows with no annotations
    neg = df[df.annotations == 1]  # No freeze
    pos = df[df.annotations == 2]  # Freeze

    # Plot horizontal forward thigh acceleration
    ax1 = axes[0]
    ax1.plot(df.thigh_h_fd)
    ax1.set_ylabel("Horizontal Forward Thigh Acceleration")
    ax1.scatter(neg.index, neg.thigh_h_fd, c='orange', label="no freeze")
    ax1.scatter(pos.index, pos.thigh_h_fd, c='purple', label="freeze")
    ax1.legend()

    # Plot vertical thigh acceleration
    ax2 = axes[1]
    ax2.plot(df.thigh_v)
    ax2.set_ylabel("Vertical Thigh Acceleration")
    ax2.scatter(neg.index, neg.thigh_v, c='orange', label="no freeze")
    ax2.scatter(pos.index, pos.thigh_v, c='purple', label="freeze")
    ax2.legend()

    # Plot horizontal lateral thigh acceleration
    ax3 = axes[2]
    ax3.plot(df.thigh_h_l)
    ax3.set_ylabel("Horizontal Lateral Thigh Acceleration")
    ax3.scatter(neg.index, neg.thigh_h_l, c='orange', label="no freeze")
    ax3.scatter(pos.index, pos.thigh_h_l, c='purple', label="freeze")
    ax3.legend()

    # Plot overall thigh acceleration
    ax4 = axes[3]
    ax4.plot(df.thigh)
    ax4.set_ylabel("Overall Thigh Acceleration")
    ax4.scatter(neg.index, neg.thigh, c='orange', label="no freeze")
    ax4.scatter(pos.index, pos.thigh, c='purple', label="freeze")
    ax4.legend()

    plt.tight_layout()
    plt.show()

Plot thigh acceleration data for a specific dataset.
Args:
    daphnetThigh (list): List of DataFrames containing thigh acceleration data.
    daphnetNames (list): List of dataset names.
    i (int): Index of the dataset to plot.

def plot_shank_data(daphnetShank, daphnetNames, i):
    """
    Plot shank acceleration data for a specific dataset.
    Args:
        daphnetShank (list): List of DataFrames containing shank acceleration data.
        daphnetNames (list): List of dataset names.
        i (int): Index of the dataset to plot.
    """
    print(daphnetNames[i])
    fig, axes = plt.subplots(4, 1, sharex=True, sharey=True, figsize=(20, 16))
    fig.suptitle("Shank Data from " + daphnetNames[i])
    plt.xlabel("Time")

    df = daphnetShank[i]
    df["shank"] = np.sqrt(df["shank_h_l"]**2 + df["shank_v"]**2 + df["shank_h_fd"]**2)
    df = df[df.annotations > 0]
    neg = df[df.annotations == 1]
    pos = df[df.annotations == 2]

    ax1 = axes[0]
    ax1.plot(df.shank_h_fd)
    ax1.set_ylabel("Horizontal Forward Shank Acceleration")
    ax1.scatter(neg.index, neg.shank_h_fd, c='orange', label="no freeze")
    ax1.scatter(pos.index, pos.shank_h_fd, c='purple', label="freeze")
    ax1.legend()

    ax2 = axes[1]
    ax2.plot(df.shank_v)
    ax2.set_ylabel("Vertical Shank Acceleration")
    ax2.scatter(neg.index, neg.shank_v, c='orange', label="no freeze")
    ax2.scatter(pos.index, pos.shank_v, c='purple', label="freeze")
    ax2.legend()

    ax3 = axes[2]
    ax3.plot(df.shank_h_l)
    ax3.set_ylabel("Horizontal Lateral Shank Acceleration")
    ax3.scatter(neg.index, neg.shank_h_l, c='orange', label="no freeze")
    ax3.scatter(pos.index, pos.shank_h_l, c='purple', label="freeze")
    ax3.legend()

    ax4 = axes[3]
    ax4.plot(df.shank)
    ax4.set_ylabel("Overall Shank Acceleration")
    ax4.scatter(neg.index, neg.shank, c='orange', label="no freeze")
    ax4.scatter(pos.index, pos.shank, c='purple', label="freeze")
    ax4.legend()

    plt.tight_layout()
    plt.show()

Plot shank acceleration data for a specific dataset.
Args:
    daphnetShank (list): List of DataFrames containing shank acceleration data.
    daphnetNames (list): List of dataset names.
    i (int): Index of the dataset to plot.

def plot_trunk_data(daphnetTrunk, daphnetNames, i):
    """
    Plot trunk acceleration data for a specific dataset.
    Args:
        daphnetTrunk (list): List of DataFrames containing trunk acceleration data.
        daphnetNames (list): List of dataset names.
        i (int): Index of the dataset to plot.
    """
    print(daphnetNames[i])
    fig, axes = plt.subplots(4, 1, sharex=True, sharey=True, figsize=(20, 16))
    fig.suptitle("Trunk Data from " + daphnetNames[i])
    plt.xlabel("Time")

    df = daphnetTrunk[i]
    df["trunk"] = np.sqrt(df["trunk_h_l"]**2 + df["trunk_v"]**2 + df["trunk_h_fd"]**2)
    df = df[df.annotations > 0]
    neg = df[df.annotations == 1]
    pos = df[df.annotations == 2]

    ax1 = axes[0]
    ax1.plot(df.trunk_h_fd)
    ax1.set_ylabel("Horizontal Forward Trunk Acceleration")
    ax1.scatter(neg.index, neg.trunk_h_fd, c='orange', label="no freeze")
    ax1.scatter(pos.index, pos.trunk_h_fd, c='purple', label="freeze")
    ax1.legend()

    ax2 = axes[1]
    ax2.plot(df.trunk_v)
    ax2.set_ylabel("Vertical Trunk Acceleration")
    ax2.scatter(neg.index, neg.trunk_v, c='orange', label="no freeze")
    ax2.scatter(pos.index, pos.trunk_v, c='purple', label="freeze")
    ax2.legend()

    ax3 = axes[2]
    ax3.plot(df.trunk_h_l)
    ax3.set_ylabel("Horizontal Lateral Trunk Acceleration")
    ax3.scatter(neg.index, neg.trunk_h_l, c='orange', label="no freeze")
    ax3.scatter(pos.index, pos.trunk_h_l, c='purple', label="freeze")
    ax3.legend()

    ax4 = axes[3]
    ax4.plot(df.trunk)
    ax4.set_ylabel("Overall Trunk Acceleration")
    ax4.scatter(neg.index, neg.trunk, c='orange', label="no freeze")
    ax4.scatter(pos.index, pos.trunk, c='purple', label="freeze")
    ax4.legend()

    plt.tight_layout()
    plt.show()

Plot trunk acceleration data for a specific dataset.
Args:
    daphnetTrunk (list): List of DataFrames containing trunk acceleration data.
    daphnetNames (list): List of dataset names.
    i (int): Index of the dataset to plot.

def plot_all_data(daphnetThigh, daphnetShank, daphnetTrunk, daphnetNames, i):
    """
    Plot thigh, shank, and trunk acceleration data for a specific dataset.
    Args:
        daphnetThigh (list): List of DataFrames containing thigh acceleration data.
        daphnetShank (list): List of DataFrames containing shank acceleration data.
        daphnetTrunk (list): List of DataFrames containing trunk acceleration data.
        daphnetNames (list): List of dataset names.
        i (int): Index of the dataset to plot.
    """
    plot_thigh_data(daphnetThigh, daphnetNames, i)
    plot_shank_data(daphnetShank, daphnetNames, i)
    plot_trunk_data(daphnetTrunk, daphnetNames, i)

Plot thigh, shank, and trunk acceleration data for a specific dataset.
Args:
    daphnetThigh (list): List of DataFrames containing thigh acceleration data.
    daphnetShank (list): List of DataFrames containing shank acceleration data.
    daphnetTrunk (list): List of DataFrames containing trunk acceleration data.
    daphnetNames (list): List of dataset names.
    i (int): Index of the dataset to plot.

def plot_all_thigh_data(daphnetThigh, daphnetNames):
    """Plot thigh acceleration data for all datasets."""
    for i in range(len(daphnetThigh)):
        plot_thigh_data(daphnetThigh, daphnetNames, i)

Plot thigh acceleration data for all datasets.

def plot_all_shank_data(daphnetShank, daphnetNames):
    """Plot shank acceleration data for all datasets."""
    for i in range(len(daphnetShank)):
        plot_shank_data(daphnetShank, daphnetNames, i)

Plot shank acceleration data for all datasets.

def plot_all_trunk_data(daphnetTrunk, daphnetNames):
    """Plot trunk acceleration data for all datasets."""
    for i in range(len(daphnetTrunk)):
        plot_trunk_data(daphnetTrunk, daphnetNames, i)

Plot trunk acceleration data for all datasets.

def plot_all_datasets(daphnetThigh, daphnetShank, daphnetTrunk, daphnetNames):
    """Plot thigh, shank, and trunk acceleration data for all datasets."""
    for i in range(len(daphnetThigh)):
        plot_all_data(daphnetThigh, daphnetShank, daphnetTrunk, daphnetNames, i)

Plot thigh, shank, and trunk acceleration data for all datasets.
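A minimal, self-contained sketch of the expected DataFrame layout for these plotting helpers, using a synthetic thigh recording (real DataFrames come from the Daphnet loader; the column names and annotation coding of 1 = no freeze, 2 = freeze follow the code above, and the top-level import is assumed from this page's exports):

import numpy as np
import pandas as pd
from gaitsetpy import plot_thigh_data

rng = np.random.default_rng(0)
n = 500
df = pd.DataFrame({
    "thigh_h_fd": rng.standard_normal(n),
    "thigh_v": rng.standard_normal(n),
    "thigh_h_l": rng.standard_normal(n),
    "annotations": rng.choice([1, 2], size=n),
})
df["thigh"] = np.sqrt(df.thigh_h_fd**2 + df.thigh_v**2 + df.thigh_h_l**2)  # overall magnitude

plot_thigh_data([df], ["synthetic recording"], i=0)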
def plot_sensor_with_features(sliding_windows, features, start_idx, end_idx, sensor_name="shank", num_windows=10, save=False):
    """
    @brief Plots sliding windows of a sensor's time series data with overlaid statistical features.

    This function plots the first `num_windows` sliding windows within the given `start_idx` and `end_idx`
    for a specified sensor and overlays feature values at their corresponding time indices.
    It also displays entropy and dominant frequency in a separate plot.

    @param[in] sliding_windows List of dictionaries, where each dictionary contains:
                               - 'name': sensor name (str)
                               - 'data': List of time-series windows (each as a Pandas Series)
    @param[in] features List of dictionaries, where each dictionary contains:
                        - 'name': sensor name (str)
                        - 'features': Dictionary of extracted feature lists
    @param[in] start_idx Start index of the time window to be plotted.
    @param[in] end_idx End index of the time window to be plotted.
    @param[in] sensor_name Name of the sensor to be plotted (default: "shank").
    @param[in] num_windows Number of sliding windows to plot (default: 10).
    @param[in] save If True, saves the plot to a file instead of displaying it.

    @return None
    """

    fig, axes = plt.subplots(2, 1, figsize=(20, 10), gridspec_kw={'height_ratios': [3, 1]})

    # Extract sensor windows
    sensor_windows = next((sw['data'] for sw in sliding_windows if sw['name'] == sensor_name), None)
    if sensor_windows is None:
        print(f"Sensor '{sensor_name}' not found in sliding_windows.")
        return

    # Extract corresponding features
    sensor_features = next((feat['features'] for feat in features if feat['name'] == sensor_name), None)
    if sensor_features is None:
        print(f"Sensor '{sensor_name}' not found in features.")
        return

    # Filter windows based on start_idx and end_idx
    filtered_windows = [series for series in sensor_windows if start_idx <= series.index[0] and series.index[-1] <= end_idx]

    if not filtered_windows:
        print(f"No windows found in the specified index range ({start_idx} - {end_idx}).")
        return

    # Store entropy & frequency features for separate plotting
    entropy_values = []
    dominant_frequencies = []

    # Plot first `num_windows` windows
    for i in range(min(num_windows, len(filtered_windows))):
        series = filtered_windows[i]  # Each window is a Pandas Series

        # Extract time and signal values
        time_values = series.index.to_numpy()  # Time is the index
        signal_values = series.values  # Sensor readings

        # Determine actual start and end indices for this window
        window_start, window_end = time_values[0], time_values[-1]

        # Plot time series data
        axes[0].plot(time_values, signal_values, alpha=0.6)

        # Mark start and end of each window with vertical dotted lines
        axes[0].axvline(x=window_start, color='black', linestyle='dotted', alpha=0.7)
        axes[0].axvline(x=window_end, color='black', linestyle='dotted', alpha=0.7)

        # Overlay statistical features
        for feature, marker in zip(['mean', 'rms', 'peak_height', 'mode', 'median'], ['x', 'o', 'v', '<', '^']):
            if feature in sensor_features and len(sensor_features[feature]) > i:
                feature_value = sensor_features[feature][i]
                if feature_value != 0:  # Skip zero values
                    closest_index = np.argmin(np.abs(signal_values - feature_value))
                    closest_time = time_values[closest_index]
                    axes[0].scatter(closest_time, feature_value, color='red', marker=marker, s=100)

        # Store entropy & frequency features for separate plotting
        if 'entropy' in sensor_features and len(sensor_features['entropy']) > i:
            entropy_values.append(sensor_features['entropy'][i])
        if 'dominant_frequency' in sensor_features and len(sensor_features['dominant_frequency']) > i:
            dominant_frequencies.append(sensor_features['dominant_frequency'][i])

    # Labels and title for time-series plot
    axes[0].set_xlabel('Time')
    axes[0].set_ylabel(f'{sensor_name} Signal')
    axes[0].set_title(f'First {num_windows} windows of {sensor_name} in range {start_idx}-{end_idx} with Features')

    # Frequency-domain & entropy plot (axes[1])
    if dominant_frequencies:
        window_indices = list(range(len(dominant_frequencies)))
        axes[1].plot(window_indices, dominant_frequencies, label="Dominant Frequency", marker="o", linestyle="dashed", color="blue")

    if entropy_values:
        # Use the entropy list's own indices so this branch also works when
        # no dominant-frequency values were collected.
        window_indices = list(range(len(entropy_values)))
        axes[1].bar(window_indices, entropy_values, alpha=0.6, label="Entropy", color="green")

    axes[1].set_xlabel("Window Index")
    axes[1].set_ylabel("Feature Value")
    axes[1].set_title("Frequency & Entropy Features")
    axes[1].legend()

    plt.tight_layout()

    # Save or show plot
    if save:
        file_path = input("Enter the file path to save the plot (e.g., 'plot.png'): ")
        plt.savefig(file_path, dpi=300)
        print(f"Plot saved at {file_path}")
    else:
        plt.show()

@brief Plots sliding windows of a sensor's time series data with overlaid statistical features.

This function plots the first num_windows sliding windows within the given start_idx and end_idx
for a specified sensor and overlays feature values at their corresponding time indices.
It also displays entropy and dominant frequency in a separate plot.

@param[in] sliding_windows List of dictionaries, where each dictionary contains:
                           - 'name': sensor name (str)
                           - 'data': List of time-series windows (each as a Pandas Series)
@param[in] features List of dictionaries, where each dictionary contains:
                    - 'name': sensor name (str)
                    - 'features': Dictionary of extracted feature lists
@param[in] start_idx Start index of the time window to be plotted.
@param[in] end_idx End index of the time window to be plotted.
@param[in] sensor_name Name of the sensor to be plotted (default: "shank").
@param[in] num_windows Number of sliding windows to plot (default: 10).
@param[in] save If True, saves the plot to a file instead of displaying it.
@return None
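A self-contained sketch of the two input structures this function expects, built from a synthetic sine wave; real inputs come from the dataset loaders and feature extractors, and the "entropy" and "dominant_frequency" values below are placeholders:

import numpy as np
import pandas as pd
from gaitsetpy import plot_sensor_with_features

fs, win, step = 64, 192, 64
signal = pd.Series(np.sin(2 * np.pi * 1.0 * np.arange(0, 30, 1 / fs)))
windows = [signal.iloc[s:s + win] for s in range(0, len(signal) - win + 1, step)]

sliding_windows = [{"name": "shank", "data": windows}]
features = [{"name": "shank", "features": {
    "mean": [w.mean() for w in windows],
    "entropy": [0.5] * len(windows),             # placeholder values
    "dominant_frequency": [1.0] * len(windows),  # placeholder values
}}]

plot_sensor_with_features(sliding_windows, features, start_idx=0, end_idx=1000,
                          sensor_name="shank", num_windows=5)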
def create_random_forest_model(n_estimators=100, random_state=42, max_depth=None):
    """
    Create a Random Forest model with specified parameters.

    Args:
        n_estimators: Number of trees in the forest
        random_state: Random state for reproducibility
        max_depth: Maximum depth of the tree

    Returns:
        RandomForestModel instance
    """
    return RandomForestModel(n_estimators=n_estimators, random_state=random_state, max_depth=max_depth)

Create a Random Forest model with specified parameters.
Args:
    n_estimators: Number of trees in the forest
    random_state: Random state for reproducibility
    max_depth: Maximum depth of the tree
Returns:
    RandomForestModel instance
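A minimal sketch, assuming this legacy helper is importable from the package root as documented here:

from gaitsetpy import create_random_forest_model

model = create_random_forest_model(n_estimators=200, random_state=42, max_depth=10)
# The returned RandomForestModel can then be used with the classification helpers
# exported by the package (e.g. train_model, evaluate_model_performance).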
def preprocess_features(features):
    """
    Convert the features dictionary into X (feature matrix) and y (labels),
    ensuring all feature vectors have a consistent length.
    """
    X = []
    y = []
    feature_lengths = []  # Track feature lengths to standardize across sensors

    for sensor_dict in features:
        sensor_name = sensor_dict["name"]
        sensor_features = sensor_dict["features"]
        sensor_annotations = sensor_dict["annotations"]

        num_windows = len(sensor_annotations)  # Expected number of windows
        feature_arrays = []

        for key in sensor_features:
            feature_array = sensor_features[key]  # Extract the feature list
            feature_array = np.array(feature_array, dtype=object)  # Convert to NumPy object array

            # Ensure it's a list of equal-length vectors
            if isinstance(feature_array[0], (list, np.ndarray)):
                print(f"Fixing inconsistent feature '{key}' in sensor '{sensor_name}'.")

                # Find max length for this feature across all windows
                max_length = max(len(f) if isinstance(f, (list, np.ndarray)) else 1 for f in feature_array)
                feature_lengths.append(max_length)  # Store max feature length for later

                # Pad/truncate each feature to be the same length
                feature_array = np.array([
                    np.pad(np.ravel(f), (0, max_length - len(f)), 'constant', constant_values=0)
                    if isinstance(f, (list, np.ndarray)) else np.array([f] + [0] * (max_length - 1))
                    for f in feature_array
                ])

            # Ensure consistency in number of windows
            if len(feature_array) != num_windows:
                print(f"Skipping feature '{key}' due to mismatched length: {len(feature_array)} instead of {num_windows}.")
                continue

            feature_arrays.append(feature_array)

        if not feature_arrays:
            continue

        # Concatenate features per window
        try:
            feature_matrix = np.column_stack(feature_arrays)
        except ValueError:
            print(f"Error: Features in sensor '{sensor_name}' have inconsistent shapes. Skipping sensor.")
            continue

        X.append(feature_matrix)
        y.append(np.array(sensor_annotations))

    if not X or not y:
        raise ValueError("No valid features or labels found.")

    # Standardize feature matrix sizes across sensors
    max_feature_dim = max(map(lambda x: x.shape[1], X))  # Get the max feature size
    print(f"Standardizing all feature vectors to {max_feature_dim} dimensions.")

    # Pad/truncate all feature matrices to match max_feature_dim
    X = [np.pad(x, ((0, 0), (0, max_feature_dim - x.shape[1])), 'constant', constant_values=0)
         if x.shape[1] < max_feature_dim else x[:, :max_feature_dim] for x in X]

    # Stack all feature matrices
    X = np.vstack(X).astype(np.float32)
    y = np.concatenate(y)

    # Remap labels to zero-based contiguous integers
    unique_labels = np.unique(y)
    label_map = {label: idx for idx, label in enumerate(unique_labels)}
    y_remapped = np.array([label_map[label] for label in y])

    return X, y_remapped

Convert the features dictionary into X (feature matrix) and y (labels),
ensuring all feature vectors have a consistent length.
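A sketch of the expected input structure: one dictionary per sensor with per-window feature lists and one annotation per window. The import is assumed from the package root, as the helper is documented on this page:

from gaitsetpy import preprocess_features

features = [{
    "name": "shank",
    "features": {
        "mean": [0.1, 0.2, 0.3],
        "rms": [1.1, 1.2, 1.3],
    },
    "annotations": [1, 2, 1],
}]

X, y = preprocess_features(features)
print(X.shape)  # (3, 2): 3 windows x 2 features
print(y)        # [0 1 0], labels remapped to zero-based contiguous integers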
def evaluate_model(model, features):
    """
    Evaluate the given model on the provided features and print the accuracy
    (the confusion-matrix computation is currently commented out).
    """
    X, y = preprocess_features(features)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    y_pred = model.predict(X_test)

    acc = accuracy_score(y_test, y_pred)
    # conf_matrix = confusion_matrix(y_test, y_pred)

    print(f"Accuracy: {acc:.4f}")
    # print(f"Confusion Matrix:\n{conf_matrix}")

Evaluate the given model on the provided features and print the accuracy
(the confusion-matrix computation is currently commented out).
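evaluate_model expects an already-trained model exposing a scikit-learn-style predict(). A self-contained sketch in which a plain RandomForestClassifier stands in for the package's RandomForestModel (an assumption for illustration only):

import numpy as np
from sklearn.ensemble import RandomForestClassifier
from gaitsetpy import evaluate_model, preprocess_features  # assumed top-level re-exports, as documented here

rng = np.random.default_rng(0)
features = [{
    "name": "shank",
    "features": {"mean": list(rng.standard_normal(50)), "rms": list(np.abs(rng.standard_normal(50)))},
    "annotations": list(rng.choice([1, 2], size=50)),
}]

X, y = preprocess_features(features)
clf = RandomForestClassifier(n_estimators=50, random_state=42).fit(X, y)
evaluate_model(clf, features)  # prints accuracy on an internal 80/20 split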