gaitsetpy

GaitSetPy - A Python package for gait analysis and recognition.

This package provides a comprehensive toolkit for gait data analysis, with both a modern class-based architecture and a legacy function-based API for backward compatibility.

Features:

  • Modular architecture with singleton design pattern
  • Plugin-based system for easy extension
  • Comprehensive dataset loaders (Daphnet, MobiFall, Arduous, PhysioNet)
  • Feature extraction and preprocessing pipelines
  • Machine learning models for classification
  • Exploratory data analysis tools
  • Backward compatibility with legacy API

Architecture:

  • Core: Base classes and singleton managers
  • Dataset: Data loading and preprocessing
  • Features: Feature extraction and analysis
  • Preprocessing: Data cleaning and transformation
  • EDA: Exploratory data analysis and visualization
  • Classification: Machine learning models and evaluation

Maintainer: @aharshit123456
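
For orientation, here is a minimal usage sketch of the class-based API and the bundled Daphnet workflow. It assumes the package is installed; the "./data" directory is a placeholder that must point at a local copy of the Daphnet dataset.

# Minimal usage sketch (assumes gaitsetpy is installed; "./data" is a placeholder path).
import gaitsetpy as gsp

# List the registered loaders, extractors, preprocessors, analyzers, and models.
print(gsp.get_system_info())

# Bundled end-to-end workflow: load Daphnet data, window it, extract features, run EDA.
results = gsp.load_and_analyze_daphnet("./data", sensor_type="all", window_size=192)
print(results["features"][0])   # first entry of the extracted features

The full module source follows.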

"""
GaitSetPy - A Python package for gait analysis and recognition.

This package provides a comprehensive toolkit for gait data analysis with both
a modern class-based architecture and legacy function-based API for backward compatibility.

Features:
- Modular architecture with singleton design pattern
- Plugin-based system for easy extension
- Comprehensive dataset loaders (Daphnet, MobiFall, Arduous, PhysioNet)
- Feature extraction and preprocessing pipelines
- Machine learning models for classification
- Exploratory data analysis tools
- Backward compatibility with legacy API

Architecture:
- Core: Base classes and singleton managers
- Dataset: Data loading and preprocessing
- Features: Feature extraction and analysis
- Preprocessing: Data cleaning and transformation
- EDA: Exploratory data analysis and visualization
- Classification: Machine learning models and evaluation

Maintainer: @aharshit123456
"""

# Core architecture components
from .core import (
    BaseDatasetLoader,
    BaseFeatureExtractor,
    BasePreprocessor,
    BaseEDAAnalyzer,
    BaseClassificationModel,
    DatasetManager,
    FeatureManager,
    PreprocessingManager,
    EDAManager,
    ClassificationManager
)

# New class-based API
from .dataset import (
    DaphnetLoader,
    MobiFallLoader,
    ArduousLoader,
    PhysioNetLoader,
    HARUPLoader,
    get_dataset_manager,
    get_available_datasets,
    load_dataset
)

from .features import (
    GaitFeatureExtractor,
    LBPFeatureExtractor,
    FourierSeriesFeatureExtractor,
    PhysioNetFeatureExtractor,
    get_feature_manager,
    get_available_extractors,
    extract_features
)

from .preprocessing import (
    ClippingPreprocessor,
    NoiseRemovalPreprocessor,
    OutlierRemovalPreprocessor,
    BaselineRemovalPreprocessor,
    DriftRemovalPreprocessor,
    HighFrequencyNoiseRemovalPreprocessor,
    LowFrequencyNoiseRemovalPreprocessor,
    ArtifactRemovalPreprocessor,
    TrendRemovalPreprocessor,
    DCOffsetRemovalPreprocessor,
    get_preprocessing_manager,
    get_available_preprocessors,
    preprocess_data,
    create_preprocessing_pipeline
)

from .eda import (
    DaphnetVisualizationAnalyzer,
    SensorStatisticsAnalyzer,
    get_eda_manager,
    get_available_analyzers,
    analyze_data,
    visualize_data,
    plot_daphnet_data,
    analyze_sensor_statistics,
    plot_sensor_features
)

from .classification import (
    RandomForestModel,
    get_classification_manager,
    get_available_models,
    train_model,
    predict,
    evaluate_model_performance,
    create_random_forest,
    train_random_forest
)

# Legacy API for backward compatibility
# Explicitly import all public exports from submodules instead of using wildcard imports
# This improves code clarity and makes it easier to track what's being exported

# Dataset legacy functions
from .dataset import (
    load_daphnet_data,
    create_sliding_windows,
    load_mobifall_data,
    load_arduous_data,
    load_physionet_data,
    create_physionet_windows,
    load_harup_data,
    create_harup_windows,
    extract_harup_features,
    download_dataset,
    extract_dataset,
    sliding_window
)

# Features legacy functions
from .features import (
    calculate_mean,
    calculate_standard_deviation,
    calculate_variance,
    calculate_skewness,
    calculate_kurtosis,
    calculate_root_mean_square,
    calculate_range,
    calculate_median,
    calculate_mode,
    calculate_mean_absolute_value,
    calculate_median_absolute_deviation,
    calculate_peak_height,
    calculate_stride_times,
    calculate_step_time,
    calculate_cadence,
    calculate_freezing_index,
    calculate_dominant_frequency,
    calculate_peak_frequency,
    calculate_power_spectral_entropy,
    calculate_principal_harmonic_frequency,
    calculate_entropy,
    calculate_interquartile_range,
    calculate_correlation,
    calculate_auto_regression_coefficients,
    get_mean_for_windows,
    get_standard_deviation_for_windows,
    get_variance_for_windows
)

# Preprocessing legacy functions
from .preprocessing import (
    clip_sliding_windows,
    remove_noise,
    remove_outliers,
    remove_baseline,
    remove_drift,
    remove_artifacts,
    remove_trend,
    remove_dc_offset,
    remove_high_frequency_noise,
    remove_low_frequency_noise
)

# EDA legacy functions
from .eda import (
    plot_thigh_data,
    plot_shank_data,
    plot_trunk_data,
    plot_all_data,
    plot_all_thigh_data,
    plot_all_shank_data,
    plot_all_trunk_data,
    plot_all_datasets,
    plot_sensor_with_features
)

# Classification legacy functions
from .classification import (
    create_random_forest_model,
    preprocess_features,
    evaluate_model
)

# Import version from single source of truth
from ._version import __version__, get_version, get_version_info, get_release_info
__author__ = "Harshit Agarwal | Alohomora Labs"

# Convenient access to all managers
def get_all_managers():
    """
    Get all singleton managers.

    Returns:
        Dictionary containing all manager instances
    """
    return {
        'dataset': DatasetManager(),
        'feature': FeatureManager(),
        'preprocessing': PreprocessingManager(),
        'eda': EDAManager(),
        'classification': ClassificationManager()
    }

# System information
def get_system_info():
    """
    Get information about the available components in the system.

    Returns:
        Dictionary containing system information
    """
    return {
        'version': __version__,
        'author': __author__,
        'available_datasets': get_available_datasets(),
        'available_extractors': get_available_extractors(),
        'available_preprocessors': get_available_preprocessors(),
        'available_analyzers': get_available_analyzers(),
        'available_models': get_available_models(),
        'architecture': 'Modular with singleton design pattern'
    }

# Shortcut functions for common workflows
def load_and_analyze_daphnet(data_dir: str, sensor_type: str = 'all', window_size: int = 192):
    """
    Complete workflow for loading and analyzing Daphnet data.

    Args:
        data_dir: Directory containing the Daphnet dataset
        sensor_type: Type of sensor to analyze ('all', 'thigh', 'shank', 'trunk')
        window_size: Size of sliding windows for feature extraction

    Returns:
        Dictionary containing data, features, and analysis results
    """
    # Load dataset
    loader = DaphnetLoader()
    data, names = loader.load_data(data_dir)

    # Create sliding windows
    windows = loader.create_sliding_windows(data, names, window_size=window_size)

    # Extract features
    extractor = GaitFeatureExtractor()
    features = extractor.extract_features(windows[0]['windows'], fs=64)

    # Analyze data
    analyzer = DaphnetVisualizationAnalyzer()
    analysis = analyzer.analyze(data)

    return {
        'data': data,
        'names': names,
        'windows': windows,
        'features': features,
        'analysis': analysis,
        'loader': loader,
        'extractor': extractor,
        'analyzer': analyzer
    }

def load_and_analyze_physionet(data_dir: str, window_size: int = 600, step_size: int = 100):
    """
    Complete workflow for loading and analyzing PhysioNet VGRF data.

    Args:
        data_dir: Directory to store/find the PhysioNet dataset
        window_size: Size of sliding windows for feature extraction (default: 600)
        step_size: Step size for sliding windows (default: 100)

    Returns:
        Dictionary containing data, features, and analysis results
    """
    # Load dataset
    loader = PhysioNetLoader()
    data, names = loader.load_data(data_dir)

    # Create sliding windows
    windows = loader.create_sliding_windows(data, names, window_size=window_size, step_size=step_size)

    # Extract PhysioNet-specific features
    extractor = PhysioNetFeatureExtractor()
    all_features = []

    for window_dict in windows:
        if 'windows' in window_dict:
            features = extractor.extract_features(window_dict['windows'], fs=100)
            all_features.append({
                'name': window_dict['name'],
                'features': features,
                'metadata': window_dict.get('metadata', {})
            })

    return {
        'data': data,
        'names': names,
        'windows': windows,
        'features': all_features,
        'labels': loader.get_labels(),
        'loader': loader,
        'extractor': extractor
    }

def train_gait_classifier(features, model_type: str = 'random_forest', **kwargs):
    """
    Train a gait classification model.

    Args:
        features: List of feature dictionaries
        model_type: Type of model to train ('random_forest', etc.)
        **kwargs: Additional arguments for model training

    Returns:
        Trained model instance
    """
    if model_type == 'random_forest':
        model = RandomForestModel(**kwargs)
        model.train(features, **kwargs)
        return model
    else:
        raise ValueError(f"Model type '{model_type}' not supported")

__all__ = [
    # Core architecture
    'BaseDatasetLoader',
    'BaseFeatureExtractor',
    'BasePreprocessor',
    'BaseEDAAnalyzer',
    'BaseClassificationModel',
    'DatasetManager',
    'FeatureManager',
    'PreprocessingManager',
    'EDAManager',
    'ClassificationManager',

    # New class-based API
    'DaphnetLoader',
    'MobiFallLoader',
    'ArduousLoader',
    'PhysioNetLoader',
    'GaitFeatureExtractor',
    'LBPFeatureExtractor',
    'FourierSeriesFeatureExtractor',
    'PhysioNetFeatureExtractor',
    'ClippingPreprocessor',
    'NoiseRemovalPreprocessor',
    'OutlierRemovalPreprocessor',
    'BaselineRemovalPreprocessor',
    'DriftRemovalPreprocessor',
    'HighFrequencyNoiseRemovalPreprocessor',
    'LowFrequencyNoiseRemovalPreprocessor',
    'ArtifactRemovalPreprocessor',
    'TrendRemovalPreprocessor',
    'DCOffsetRemovalPreprocessor',
    'DaphnetVisualizationAnalyzer',
    'SensorStatisticsAnalyzer',
    'RandomForestModel',

    # Manager access functions
    'get_dataset_manager',
    'get_feature_manager',
    'get_preprocessing_manager',
    'get_eda_manager',
    'get_classification_manager',
    'get_all_managers',

    # Utility functions
    'get_available_datasets',
    'get_available_extractors',
    'get_available_preprocessors',
    'get_available_analyzers',
    'get_available_models',
    'get_system_info',

    # Workflow functions
    'load_and_analyze_daphnet',
    'load_and_analyze_physionet',
    'train_gait_classifier',

    # Legacy dataset functions
    'load_daphnet_data',
    'create_sliding_windows',
    'load_mobifall_data',
    'load_arduous_data',
    'load_physionet_data',
    'create_physionet_windows',
    'load_harup_data',
    'create_harup_windows',
    'extract_harup_features',
    'download_dataset',
    'extract_dataset',
    'sliding_window',

    # Legacy feature functions
    'calculate_mean',
    'calculate_standard_deviation',
    'calculate_variance',
    'calculate_skewness',
    'calculate_kurtosis',
    'calculate_root_mean_square',
    'calculate_range',
    'calculate_median',
    'calculate_mode',
    'calculate_mean_absolute_value',
    'calculate_median_absolute_deviation',
    'calculate_peak_height',
    'calculate_stride_times',
    'calculate_step_time',
    'calculate_cadence',
    'calculate_freezing_index',
    'calculate_dominant_frequency',
    'calculate_peak_frequency',
    'calculate_power_spectral_entropy',
    'calculate_principal_harmonic_frequency',
    'calculate_entropy',
    'calculate_interquartile_range',
    'calculate_correlation',
    'calculate_auto_regression_coefficients',
    'get_mean_for_windows',
    'get_standard_deviation_for_windows',
    'get_variance_for_windows',

    # Legacy preprocessing functions
    'clip_sliding_windows',
    'remove_noise',
    'remove_outliers',
    'remove_baseline',
    'remove_drift',
    'remove_artifacts',
    'remove_trend',
    'remove_dc_offset',
    'remove_high_frequency_noise',
    'remove_low_frequency_noise',

    # Legacy EDA functions
    'plot_thigh_data',
    'plot_shank_data',
    'plot_trunk_data',
    'plot_all_data',
    'plot_all_thigh_data',
    'plot_all_shank_data',
    'plot_all_trunk_data',
    'plot_all_datasets',
    'plot_sensor_with_features',

    # Legacy classification functions
    'create_random_forest_model',
    'preprocess_features',
    'evaluate_model',
]
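
The exports above can be exercised either through the class-based API or through the legacy functions. The sketch below assumes a local Daphnet directory ("./daphnet" is a placeholder) and that the legacy functions mirror the loader methods' signatures.

# Class-based API: the loader object keeps data, metadata, and download stats together.
from gaitsetpy import DaphnetLoader, GaitFeatureExtractor

loader = DaphnetLoader()
data, names = loader.load_data("./daphnet")
windows = loader.create_sliding_windows(data, names, window_size=192)
features = GaitFeatureExtractor().extract_features(windows[0]["windows"], fs=64)

# Legacy API: plain functions kept for backward compatibility (signatures assumed analogous).
from gaitsetpy import load_daphnet_data, create_sliding_windows

data, names = load_daphnet_data("./daphnet")
windows = create_sliding_windows(data, names, window_size=192)
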
class BaseDatasetLoader(ABC):
    """
    Base class for all dataset loaders.

    All dataset loaders should inherit from this class and implement the required methods.
    This class provides thread-safe concurrent downloading capabilities for efficient data retrieval.
    """

    def __init__(self, name: str, description: str = "", max_workers: int = 8):
        """
        Initialize the dataset loader.

        Args:
            name: Name of the dataset
            description: Description of the dataset
            max_workers: Maximum number of concurrent download threads (default: 8)
        """
        self.name = name
        self.description = description
        self.data = None
        self.metadata = {}
        self.max_workers = max_workers
        self._download_stats = {'success': 0, 'failed': 0, 'skipped': 0}

    @abstractmethod
    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
        """
        Load dataset from the specified directory.

        Args:
            data_dir: Directory containing the dataset
            **kwargs: Additional arguments specific to the dataset

        Returns:
            Tuple of (data_list, names_list)
        """
        pass

    @abstractmethod
    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                             window_size: int = 192, step_size: int = 32) -> List[Dict]:
        """
        Create sliding windows from the loaded data.

        Args:
            data: List of DataFrames
            names: List of names corresponding to the data
            window_size: Size of each sliding window
            step_size: Step size for sliding windows

        Returns:
            List of dictionaries containing sliding windows
        """
        pass

    @abstractmethod
    def get_supported_formats(self) -> List[str]:
        """
        Get list of supported file formats.

        Returns:
            List of supported file extensions
        """
        pass

    def _download_file(self, url: str, dest_path: str,
                      chunk_size: int = 8192, timeout: int = 30) -> Tuple[bool, str]:
        """
        Download a single file from URL to destination path.

        This method is thread-safe and can be called concurrently.

        Args:
            url: URL to download from
            dest_path: Destination file path
            chunk_size: Size of chunks to download (default: 8192 bytes)
            timeout: Request timeout in seconds (default: 30)

        Returns:
            Tuple of (success: bool, message: str)
        """
        try:
            # Check if file already exists
            if os.path.exists(dest_path):
                self._download_stats['skipped'] += 1
                return True, f"File already exists: {dest_path}"

            # Make the request
            response = requests.get(url, stream=True, timeout=timeout)

            if response.status_code == 200:
                # Ensure parent directory exists
                os.makedirs(os.path.dirname(dest_path) if os.path.dirname(dest_path) else '.', exist_ok=True)

                # Write file in chunks
                with open(dest_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=chunk_size):
                        if chunk:
                            f.write(chunk)

                self._download_stats['success'] += 1
                return True, f"Successfully downloaded: {dest_path}"
            else:
                self._download_stats['failed'] += 1
                return False, f"HTTP {response.status_code}: {url}"

        except requests.exceptions.Timeout:
            self._download_stats['failed'] += 1
            return False, f"Timeout downloading: {url}"
        except requests.exceptions.RequestException as e:
            self._download_stats['failed'] += 1
            return False, f"Request error for {url}: {str(e)}"
        except IOError as e:
            self._download_stats['failed'] += 1
            return False, f"IO error for {dest_path}: {str(e)}"
        except Exception as e:
            self._download_stats['failed'] += 1
            return False, f"Unexpected error for {url}: {str(e)}"

    def download_files_concurrent(self,
                                  download_tasks: List[Dict[str, str]],
                                  show_progress: bool = True,
                                  desc: str = "Downloading files") -> Dict[str, Any]:
        """
        Download multiple files concurrently using a thread pool.

        Args:
            download_tasks: List of dicts with 'url' and 'dest_path' keys
            show_progress: Whether to show progress bar (default: True)
            desc: Description for progress bar

        Returns:
            Dictionary with download statistics and results

        Example:
            tasks = [
                {'url': 'http://example.com/file1.txt', 'dest_path': '/path/to/file1.txt'},
                {'url': 'http://example.com/file2.txt', 'dest_path': '/path/to/file2.txt'}
            ]
            results = loader.download_files_concurrent(tasks)
        """
        # Reset stats
        self._download_stats = {'success': 0, 'failed': 0, 'skipped': 0}

        results = []
        failed_downloads = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all download tasks
            future_to_task = {
                executor.submit(self._download_file, task['url'], task['dest_path']): task
                for task in download_tasks
            }

            # Process completed tasks with optional progress bar
            if show_progress:
                futures = tqdm(as_completed(future_to_task),
                             total=len(download_tasks),
                             desc=desc)
            else:
                futures = as_completed(future_to_task)

            for future in futures:
                task = future_to_task[future]
                try:
                    success, message = future.result()
                    results.append({
                        'url': task['url'],
                        'dest_path': task['dest_path'],
                        'success': success,
                        'message': message
                    })

                    if not success:
                        failed_downloads.append({
                            'url': task['url'],
                            'dest_path': task['dest_path'],
                            'error': message
                        })

                except Exception as e:
                    error_msg = f"Exception during download: {str(e)}"
                    results.append({
                        'url': task['url'],
                        'dest_path': task['dest_path'],
                        'success': False,
                        'message': error_msg
                    })
                    failed_downloads.append({
                        'url': task['url'],
                        'dest_path': task['dest_path'],
                        'error': error_msg
                    })

        # Return comprehensive results
        return {
            'total': len(download_tasks),
            'success': self._download_stats['success'],
            'failed': self._download_stats['failed'],
            'skipped': self._download_stats['skipped'],
            'failed_downloads': failed_downloads,
            'all_results': results
        }

    def set_max_workers(self, max_workers: int):
        """
        Set the maximum number of concurrent download threads.

        Args:
            max_workers: Maximum number of threads (must be positive)
        """
        if max_workers < 1:
            raise ValueError("max_workers must be at least 1")
        self.max_workers = max_workers

    def get_download_stats(self) -> Dict[str, int]:
        """
        Get statistics from the last download operation.

        Returns:
            Dictionary with success, failed, and skipped counts
        """
        return self._download_stats.copy()

    def get_info(self) -> Dict[str, Any]:
        """
        Get information about the dataset.

        Returns:
            Dictionary containing dataset information
        """
        return {
            'name': self.name,
            'description': self.description,
            'metadata': self.metadata,
            'supported_formats': self.get_supported_formats(),
            'max_workers': self.max_workers
        }
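
A concrete loader only has to implement the three abstract methods; the downloading helpers are inherited. The sketch below is illustrative, not part of the documented API: the CSV layout, the URL, and the window dictionary keys are assumptions.

# Illustrative subclass sketch; file layout, URL, and window-dict keys are hypothetical.
import os
from typing import Dict, List, Tuple

import pandas as pd

from gaitsetpy import BaseDatasetLoader


class MyCSVLoader(BaseDatasetLoader):
    def __init__(self, name: str = "my_csv", description: str = "Example CSV loader"):
        super().__init__(name=name, description=description)

    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
        # Optionally fetch files first; download_files_concurrent() is inherited.
        tasks = [{"url": "https://example.com/subject01.csv",
                  "dest_path": os.path.join(data_dir, "subject01.csv")}]
        self.download_files_concurrent(tasks, desc="Fetching example data")
        names = sorted(f for f in os.listdir(data_dir) if f.endswith(".csv"))
        data = [pd.read_csv(os.path.join(data_dir, f)) for f in names]
        self.data = data
        return data, names

    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str],
                               window_size: int = 192, step_size: int = 32) -> List[Dict]:
        windows = []
        for df, name in zip(data, names):
            starts = range(0, max(len(df) - window_size + 1, 0), step_size)
            windows.append({"name": name,
                            "windows": [df.iloc[s:s + window_size] for s in starts]})
        return windows

    def get_supported_formats(self) -> List[str]:
        return [".csv"]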

class BaseFeatureExtractor(ABC):
    """
    Base class for all feature extractors.

    All feature extractors should inherit from this class and implement the required methods.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Initialize the feature extractor.

        Args:
            name: Name of the feature extractor
            description: Description of the feature extractor
        """
        self.name = name
        self.description = description
        self.config = {}

    @abstractmethod
    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
        """
        Extract features from sliding windows.

        Args:
            windows: List of sliding window dictionaries
            fs: Sampling frequency
            **kwargs: Additional arguments for feature extraction

        Returns:
            List of feature dictionaries
        """
        pass

    @abstractmethod
    def get_feature_names(self) -> List[str]:
        """
        Get names of features extracted by this extractor.

        Returns:
            List of feature names
        """
        pass

    def configure(self, config: Dict[str, Any]):
        """
        Configure the feature extractor.

        Args:
            config: Configuration dictionary
        """
        self.config.update(config)

    def get_info(self) -> Dict[str, Any]:
        """
        Get information about the feature extractor.

        Returns:
            Dictionary containing feature extractor information
        """
        return {
            'name': self.name,
            'description': self.description,
            'config': self.config,
            'feature_names': self.get_feature_names()
        }
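
As a sketch of this extension point, a minimal extractor might compute per-window summary statistics. The "data" key used below is an assumption about the window dictionaries, not a documented contract.

# Minimal extractor sketch; the window dictionary layout ("data" key) is assumed.
from typing import Dict, List

import numpy as np

from gaitsetpy import BaseFeatureExtractor


class MeanStdExtractor(BaseFeatureExtractor):
    def __init__(self):
        super().__init__(name="mean_std", description="Per-window mean and standard deviation")

    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
        features = []
        for window in windows:
            values = np.asarray(window["data"], dtype=float)   # assumed key
            features.append({"mean": float(values.mean()),
                             "std": float(values.std()),
                             "fs": fs})
        return features

    def get_feature_names(self) -> List[str]:
        return ["mean", "std"]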

class BasePreprocessor(ABC):
    """
    Base class for all preprocessors.

    All preprocessors should inherit from this class and implement the required methods.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Initialize the preprocessor.

        Args:
            name: Name of the preprocessor
            description: Description of the preprocessor
        """
        self.name = name
        self.description = description
        self.config = {}
        self.fitted = False

    @abstractmethod
    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        """
        Fit the preprocessor to the data.

        Args:
            data: Input data to fit on
            **kwargs: Additional arguments for fitting
        """
        pass

    @abstractmethod
    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Transform the data using the fitted preprocessor.

        Args:
            data: Input data to transform
            **kwargs: Additional arguments for transformation

        Returns:
            Transformed data
        """
        pass

    def fit_transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
        """
        Fit the preprocessor and transform the data.

        Args:
            data: Input data to fit and transform
            **kwargs: Additional arguments

        Returns:
            Transformed data
        """
        self.fit(data, **kwargs)
        return self.transform(data, **kwargs)

    def configure(self, config: Dict[str, Any]):
        """
        Configure the preprocessor.

        Args:
            config: Configuration dictionary
        """
        self.config.update(config)

    def get_info(self) -> Dict[str, Any]:
        """
        Get information about the preprocessor.

        Returns:
            Dictionary containing preprocessor information
        """
        return {
            'name': self.name,
            'description': self.description,
            'config': self.config,
            'fitted': self.fitted
        }
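
A concrete preprocessor implements fit() and transform() and inherits fit_transform(). The z-score example below is a sketch assuming numeric DataFrame or ndarray input.

# Sketch of a concrete preprocessor; assumes numeric DataFrame/ndarray input.
from typing import Union

import numpy as np
import pandas as pd

from gaitsetpy import BasePreprocessor


class ZScorePreprocessor(BasePreprocessor):
    def __init__(self):
        super().__init__(name="zscore", description="Standardize to zero mean and unit variance")
        self.mean_ = None
        self.std_ = None

    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
        values = np.asarray(data, dtype=float)
        self.mean_ = values.mean(axis=0)
        self.std_ = values.std(axis=0) + 1e-12   # guard against division by zero
        self.fitted = True

    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> np.ndarray:
        if not self.fitted:
            raise RuntimeError("Call fit() before transform()")
        return (np.asarray(data, dtype=float) - self.mean_) / self.std_


# fit_transform() comes from the base class:
# normalized = ZScorePreprocessor().fit_transform(raw_signal)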

class BaseEDAAnalyzer(ABC):
    """
    Base class for all EDA analyzers.

    All EDA analyzers should inherit from this class and implement the required methods.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Initialize the EDA analyzer.

        Args:
            name: Name of the EDA analyzer
            description: Description of the EDA analyzer
        """
        self.name = name
        self.description = description
        self.config = {}

    @abstractmethod
    def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
        """
        Perform analysis on the data.

        Args:
            data: Input data to analyze
            **kwargs: Additional arguments for analysis

        Returns:
            Dictionary containing analysis results
        """
        pass

    @abstractmethod
    def visualize(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs):
        """
        Create visualizations of the data.

        Args:
            data: Input data to visualize
            **kwargs: Additional arguments for visualization
        """
        pass

    def configure(self, config: Dict[str, Any]):
        """
        Configure the EDA analyzer.

        Args:
            config: Configuration dictionary
        """
        self.config.update(config)

    def get_info(self) -> Dict[str, Any]:
        """
        Get information about the EDA analyzer.

        Returns:
            Dictionary containing EDA analyzer information
        """
        return {
            'name': self.name,
            'description': self.description,
            'config': self.config
        }
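
The analyzer contract is just a pair of analyze()/visualize() methods. A minimal sketch, assuming per-column numeric DataFrames and matplotlib available for pandas plotting:

# Minimal analyzer sketch; assumes numeric DataFrames and matplotlib for plotting.
from typing import Any, Dict, List, Union

import pandas as pd

from gaitsetpy import BaseEDAAnalyzer


class SummaryStatsAnalyzer(BaseEDAAnalyzer):
    def __init__(self):
        super().__init__(name="summary_stats", description="Per-column descriptive statistics")

    def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
        frames = data if isinstance(data, list) else [data]
        return {i: df.describe().to_dict() for i, df in enumerate(frames)}

    def visualize(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs):
        frames = data if isinstance(data, list) else [data]
        for df in frames:
            df.plot(subplots=True)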

class BaseClassificationModel(ABC):
    """
    Base class for all classification models.

    All classification models should inherit from this class and implement the required methods.
    """

    def __init__(self, name: str, description: str = ""):
        """
        Initialize the classification model.

        Args:
            name: Name of the classification model
            description: Description of the classification model
        """
        self.name = name
        self.description = description
        self.model = None
        self.config = {}
        self.trained = False

    @abstractmethod
    def train(self, features: List[Dict], **kwargs):
        """
        Train the classification model.

        Args:
            features: List of feature dictionaries
            **kwargs: Additional arguments for training
        """
        pass

    @abstractmethod
    def predict(self, features: List[Dict], **kwargs) -> np.ndarray:
        """
        Make predictions using the trained model.

        Args:
            features: List of feature dictionaries
            **kwargs: Additional arguments for prediction

        Returns:
            Array of predictions
        """
        pass

    @abstractmethod
    def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]:
        """
        Evaluate the model performance.

        Args:
            features: List of feature dictionaries
            **kwargs: Additional arguments for evaluation

        Returns:
            Dictionary containing evaluation metrics
        """
        pass

    @abstractmethod
    def save_model(self, filepath: str):
        """
        Save the trained model to a file.

        Args:
            filepath: Path to save the model
        """
        pass

    @abstractmethod
    def load_model(self, filepath: str):
        """
        Load a trained model from a file.

        Args:
            filepath: Path to the saved model
        """
        pass

    def configure(self, config: Dict[str, Any]):
        """
        Configure the classification model.

        Args:
            config: Configuration dictionary
        """
        self.config.update(config)

    def get_info(self) -> Dict[str, Any]:
        """
        Get information about the classification model.

        Returns:
            Dictionary containing model information
        """
        return {
            'name': self.name,
            'description': self.description,
            'config': self.config,
            'trained': self.trained
        }
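
In practice these hooks are exercised through concrete models such as RandomForestModel, for example via the train_gait_classifier() helper shown earlier. The sketch below is illustrative: the file name and the assumption that features come from GaitFeatureExtractor on previously created windows are not documented requirements.

# Usage sketch of the classification contract via the bundled RandomForestModel.
from gaitsetpy import GaitFeatureExtractor, train_gait_classifier

# "windows" as produced by a loader's create_sliding_windows(); fs=64 matches Daphnet.
features = GaitFeatureExtractor().extract_features(windows[0]["windows"], fs=64)

model = train_gait_classifier(features, model_type="random_forest")
print(model.get_info())                  # name, description, config, trained flag
predictions = model.predict(features)
metrics = model.evaluate(features)
model.save_model("rf_gait_model.pkl")    # file format depends on the model's implementation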

Base class for all classification models.

All classification models should inherit from this class and implement the required methods.

BaseClassificationModel(name: str, description: str = '')
486    def __init__(self, name: str, description: str = ""):
487        """
488        Initialize the classification model.
489        
490        Args:
491            name: Name of the classification model
492            description: Description of the classification model
493        """
494        self.name = name
495        self.description = description
496        self.model = None
497        self.config = {}
498        self.trained = False

Initialize the classification model.

Args: name: Name of the classification model description: Description of the classification model

name
description
model
config
trained
@abstractmethod
def train(self, features: List[Dict], **kwargs):
500    @abstractmethod
501    def train(self, features: List[Dict], **kwargs):
502        """
503        Train the classification model.
504        
505        Args:
506            features: List of feature dictionaries
507            **kwargs: Additional arguments for training
508        """
509        pass

Train the classification model.

Args: features: List of feature dictionaries **kwargs: Additional arguments for training

@abstractmethod
def predict(self, features: List[Dict], **kwargs) -> numpy.ndarray:
511    @abstractmethod
512    def predict(self, features: List[Dict], **kwargs) -> np.ndarray:
513        """
514        Make predictions using the trained model.
515        
516        Args:
517            features: List of feature dictionaries
518            **kwargs: Additional arguments for prediction
519            
520        Returns:
521            Array of predictions
522        """
523        pass

Make predictions using the trained model.

Args: features: List of feature dictionaries **kwargs: Additional arguments for prediction

Returns: Array of predictions

@abstractmethod
def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]:
525    @abstractmethod
526    def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]:
527        """
528        Evaluate the model performance.
529        
530        Args:
531            features: List of feature dictionaries
532            **kwargs: Additional arguments for evaluation
533            
534        Returns:
535            Dictionary containing evaluation metrics
536        """
537        pass

Evaluate the model performance.

Args: features: List of feature dictionaries **kwargs: Additional arguments for evaluation

Returns: Dictionary containing evaluation metrics

@abstractmethod
def save_model(self, filepath: str):
539    @abstractmethod
540    def save_model(self, filepath: str):
541        """
542        Save the trained model to a file.
543        
544        Args:
545            filepath: Path to save the model
546        """
547        pass

Save the trained model to a file.

Args: filepath: Path to save the model

@abstractmethod
def load_model(self, filepath: str):
549    @abstractmethod
550    def load_model(self, filepath: str):
551        """
552        Load a trained model from a file.
553        
554        Args:
555            filepath: Path to the saved model
556        """
557        pass

Load a trained model from a file.

Args: filepath: Path to the saved model

def configure(self, config: Dict[str, Any]):
559    def configure(self, config: Dict[str, Any]):
560        """
561        Configure the classification model.
562        
563        Args:
564            config: Configuration dictionary
565        """
566        self.config.update(config)

Configure the classification model.

Args: config: Configuration dictionary

def get_info(self) -> Dict[str, Any]:
568    def get_info(self) -> Dict[str, Any]:
569        """
570        Get information about the classification model.
571        
572        Returns:
573            Dictionary containing model information
574        """
575        return {
576            'name': self.name,
577            'description': self.description,
578            'config': self.config,
579            'trained': self.trained
580        } 

Get information about the classification model.

Returns: Dictionary containing model information
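
To make the contract concrete, here is a minimal sketch of a subclass, assuming a scikit-learn backend and an illustrative feature layout in which each feature dictionary carries a "features" vector and a "label" entry (the extractors in this package may produce a different structure):

    import pickle
    import numpy as np
    from typing import Dict, List
    from sklearn.ensemble import RandomForestClassifier
    from gaitsetpy.core import BaseClassificationModel

    class SimpleRandomForestModel(BaseClassificationModel):
        """Hypothetical model used only to illustrate the abstract interface."""

        def __init__(self, name: str = "simple_rf",
                     description: str = "Random Forest sketch for gait classification"):
            super().__init__(name, description)
            self.model = RandomForestClassifier(n_estimators=100)

        def _to_arrays(self, features: List[Dict]):
            # Assumed layout: one flat feature vector and one label per window.
            X = np.array([f["features"] for f in features])
            y = np.array([f["label"] for f in features])
            return X, y

        def train(self, features: List[Dict], **kwargs):
            X, y = self._to_arrays(features)
            self.model.fit(X, y)
            self.trained = True

        def predict(self, features: List[Dict], **kwargs) -> np.ndarray:
            X = np.array([f["features"] for f in features])
            return self.model.predict(X)

        def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]:
            X, y = self._to_arrays(features)
            return {"accuracy": float(self.model.score(X, y))}

        def save_model(self, filepath: str):
            with open(filepath, "wb") as fh:
                pickle.dump(self.model, fh)

        def load_model(self, filepath: str):
            with open(filepath, "rb") as fh:
                self.model = pickle.load(fh)
            self.trained = True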

class DatasetManager(gaitsetpy.core.managers.BaseManager):
138 class DatasetManager(BaseManager):
139    """
140    Singleton manager for dataset loaders.
141    """
142    
143    def register_dataset(self, name: str, dataset_class: Type[BaseDatasetLoader]):
144        """
145        Register a dataset loader.
146        
147        Args:
148            name: Name to register the dataset under
149            dataset_class: Dataset loader class
150        """
151        if not issubclass(dataset_class, BaseDatasetLoader):
152            raise ValueError(f"Dataset class must inherit from BaseDatasetLoader")
153        self.register(name, dataset_class)
154    
155    def load_dataset(self, name: str, data_dir: str, **kwargs) -> BaseDatasetLoader:
156        """
157        Load a dataset using the registered loader.
158        
159        Args:
160            name: Name of the dataset loader
161            data_dir: Directory containing the dataset
162            **kwargs: Additional arguments for the loader
163            
164        Returns:
165            Dataset loader instance with loaded data
166        """
167        loader = self.create_instance(name, name, f"{name} dataset loader")
168        loader.load_data(data_dir, **kwargs)
169        return loader

Singleton manager for dataset loaders.

def register_dataset( self, name: str, dataset_class: Type[BaseDatasetLoader]):
143    def register_dataset(self, name: str, dataset_class: Type[BaseDatasetLoader]):
144        """
145        Register a dataset loader.
146        
147        Args:
148            name: Name to register the dataset under
149            dataset_class: Dataset loader class
150        """
151        if not issubclass(dataset_class, BaseDatasetLoader):
152            raise ValueError(f"Dataset class must inherit from BaseDatasetLoader")
153        self.register(name, dataset_class)

Register a dataset loader.

Args:
    name: Name to register the dataset under
    dataset_class: Dataset loader class

def load_dataset( self, name: str, data_dir: str, **kwargs) -> BaseDatasetLoader:
155    def load_dataset(self, name: str, data_dir: str, **kwargs) -> BaseDatasetLoader:
156        """
157        Load a dataset using the registered loader.
158        
159        Args:
160            name: Name of the dataset loader
161            data_dir: Directory containing the dataset
162            **kwargs: Additional arguments for the loader
163            
164        Returns:
165            Dataset loader instance with loaded data
166        """
167        loader = self.create_instance(name, name, f"{name} dataset loader")
168        loader.load_data(data_dir, **kwargs)
169        return loader

Load a dataset using the registered loader.

Args:
    name: Name of the dataset loader
    data_dir: Directory containing the dataset
    **kwargs: Additional arguments for the loader

Returns: Dataset loader instance with loaded data
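
A short usage sketch, assuming that instantiating the manager returns the shared singleton and that the Daphnet data should live in a local ./data directory (both are illustrative choices):

    from gaitsetpy.core import DatasetManager
    from gaitsetpy.dataset import DaphnetLoader

    manager = DatasetManager()  # BaseManager subclasses act as singletons
    manager.register_dataset("daphnet", DaphnetLoader)

    # load_dataset() instantiates the registered loader and calls load_data() on it
    loader = manager.load_dataset("daphnet", "./data")
    print(loader.get_sensor_info())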

class FeatureManager(gaitsetpy.core.managers.BaseManager):
172 class FeatureManager(BaseManager):
173    """
174    Singleton manager for feature extractors.
175    """
176    
177    def register_extractor(self, name: str, extractor_class: Type[BaseFeatureExtractor]):
178        """
179        Register a feature extractor.
180        
181        Args:
182            name: Name to register the extractor under
183            extractor_class: Feature extractor class
184        """
185        if not issubclass(extractor_class, BaseFeatureExtractor):
186            raise ValueError(f"Extractor class must inherit from BaseFeatureExtractor")
187        self.register(name, extractor_class)
188    
189    def extract_features(self, extractor_name: str, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
190        """
191        Extract features using the specified extractor.
192        
193        Args:
194            extractor_name: Name of the feature extractor
195            windows: List of sliding window dictionaries
196            fs: Sampling frequency
197            **kwargs: Additional arguments for feature extraction
198            
199        Returns:
200            List of feature dictionaries
201        """
202        extractor = self.get_cached_instance(extractor_name, extractor_name, f"{extractor_name} feature extractor")
203        return extractor.extract_features(windows, fs, **kwargs)

Singleton manager for feature extractors.

def register_extractor( self, name: str, extractor_class: Type[BaseFeatureExtractor]):
177    def register_extractor(self, name: str, extractor_class: Type[BaseFeatureExtractor]):
178        """
179        Register a feature extractor.
180        
181        Args:
182            name: Name to register the extractor under
183            extractor_class: Feature extractor class
184        """
185        if not issubclass(extractor_class, BaseFeatureExtractor):
186            raise ValueError(f"Extractor class must inherit from BaseFeatureExtractor")
187        self.register(name, extractor_class)

Register a feature extractor.

Args:
    name: Name to register the extractor under
    extractor_class: Feature extractor class

def extract_features( self, extractor_name: str, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
189    def extract_features(self, extractor_name: str, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
190        """
191        Extract features using the specified extractor.
192        
193        Args:
194            extractor_name: Name of the feature extractor
195            windows: List of sliding window dictionaries
196            fs: Sampling frequency
197            **kwargs: Additional arguments for feature extraction
198            
199        Returns:
200            List of feature dictionaries
201        """
202        extractor = self.get_cached_instance(extractor_name, extractor_name, f"{extractor_name} feature extractor")
203        return extractor.extract_features(windows, fs, **kwargs)

Extract features using the specified extractor.

Args:
    extractor_name: Name of the feature extractor
    windows: List of sliding window dictionaries
    fs: Sampling frequency
    **kwargs: Additional arguments for feature extraction

Returns: List of feature dictionaries
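
An illustrative end-to-end sketch, assuming the Daphnet loader documented below, a registration name of "gait_features", and that the extractor accepts the per-recording window list produced by create_sliding_windows (check the extractor documentation for the exact layout it expects):

    from gaitsetpy.core import FeatureManager
    from gaitsetpy.dataset import DaphnetLoader
    from gaitsetpy.features import GaitFeatureExtractor

    loader = DaphnetLoader()
    data, names = loader.load_data("./data")
    windows = loader.create_sliding_windows(data, names, window_size=192, step_size=32)

    fm = FeatureManager()
    fm.register_extractor("gait_features", GaitFeatureExtractor)

    # Daphnet is sampled at 64 Hz (see the loader metadata below);
    # windows[0]["windows"] is the window list for the first recording.
    features = fm.extract_features("gait_features", windows[0]["windows"], fs=64)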

class PreprocessingManager(gaitsetpy.core.managers.BaseManager):
206 class PreprocessingManager(BaseManager):
207    """
208    Singleton manager for preprocessors.
209    """
210    
211    def register_preprocessor(self, name: str, preprocessor_class: Type[BasePreprocessor]):
212        """
213        Register a preprocessor.
214        
215        Args:
216            name: Name to register the preprocessor under
217            preprocessor_class: Preprocessor class
218        """
219        if not issubclass(preprocessor_class, BasePreprocessor):
220            raise ValueError(f"Preprocessor class must inherit from BasePreprocessor")
221        self.register(name, preprocessor_class)
222    
223    def preprocess_data(self, preprocessor_name: str, data: Any, **kwargs) -> Any:
224        """
225        Preprocess data using the specified preprocessor.
226        
227        Args:
228            preprocessor_name: Name of the preprocessor
229            data: Input data to preprocess
230            **kwargs: Additional arguments for preprocessing
231            
232        Returns:
233            Preprocessed data
234        """
235        preprocessor = self.get_cached_instance(preprocessor_name, preprocessor_name, f"{preprocessor_name} preprocessor")
236        return preprocessor.fit_transform(data, **kwargs)

Singleton manager for preprocessors.

def register_preprocessor( self, name: str, preprocessor_class: Type[BasePreprocessor]):
211    def register_preprocessor(self, name: str, preprocessor_class: Type[BasePreprocessor]):
212        """
213        Register a preprocessor.
214        
215        Args:
216            name: Name to register the preprocessor under
217            preprocessor_class: Preprocessor class
218        """
219        if not issubclass(preprocessor_class, BasePreprocessor):
220            raise ValueError(f"Preprocessor class must inherit from BasePreprocessor")
221        self.register(name, preprocessor_class)

Register a preprocessor.

Args:
    name: Name to register the preprocessor under
    preprocessor_class: Preprocessor class

def preprocess_data(self, preprocessor_name: str, data: Any, **kwargs) -> Any:
223    def preprocess_data(self, preprocessor_name: str, data: Any, **kwargs) -> Any:
224        """
225        Preprocess data using the specified preprocessor.
226        
227        Args:
228            preprocessor_name: Name of the preprocessor
229            data: Input data to preprocess
230            **kwargs: Additional arguments for preprocessing
231            
232        Returns:
233            Preprocessed data
234        """
235        preprocessor = self.get_cached_instance(preprocessor_name, preprocessor_name, f"{preprocessor_name} preprocessor")
236        return preprocessor.fit_transform(data, **kwargs)

Preprocess data using the specified preprocessor.

Args:
    preprocessor_name: Name of the preprocessor
    data: Input data to preprocess
    **kwargs: Additional arguments for preprocessing

Returns: Preprocessed data
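
A minimal sketch, assuming the clipping preprocessor is registered under the name "clipping", uses its default configuration, and operates on a 1-D signal:

    import numpy as np
    from gaitsetpy.core import PreprocessingManager
    from gaitsetpy.preprocessing import ClippingPreprocessor

    pm = PreprocessingManager()
    pm.register_preprocessor("clipping", ClippingPreprocessor)

    signal = np.random.randn(1000)                      # stand-in for one sensor channel
    clipped = pm.preprocess_data("clipping", signal)    # calls fit_transform() internally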

class EDAManager(gaitsetpy.core.managers.BaseManager):
239 class EDAManager(BaseManager):
240    """
241    Singleton manager for EDA analyzers.
242    """
243    
244    def register_analyzer(self, name: str, analyzer_class: Type[BaseEDAAnalyzer]):
245        """
246        Register an EDA analyzer.
247        
248        Args:
249            name: Name to register the analyzer under
250            analyzer_class: EDA analyzer class
251        """
252        if not issubclass(analyzer_class, BaseEDAAnalyzer):
253            raise ValueError(f"Analyzer class must inherit from BaseEDAAnalyzer")
254        self.register(name, analyzer_class)
255    
256    def analyze_data(self, analyzer_name: str, data: Any, **kwargs) -> Dict[str, Any]:
257        """
258        Analyze data using the specified analyzer.
259        
260        Args:
261            analyzer_name: Name of the EDA analyzer
262            data: Input data to analyze
263            **kwargs: Additional arguments for analysis
264            
265        Returns:
266            Analysis results dictionary
267        """
268        analyzer = self.get_cached_instance(analyzer_name, analyzer_name, f"{analyzer_name} analyzer")
269        return analyzer.analyze(data, **kwargs)
270    
271    def visualize_data(self, analyzer_name: str, data: Any, **kwargs):
272        """
273        Create visualizations using the specified analyzer.
274        
275        Args:
276            analyzer_name: Name of the EDA analyzer
277            data: Input data to visualize
278            **kwargs: Additional arguments for visualization
279        """
280        analyzer = self.get_cached_instance(analyzer_name, analyzer_name, f"{analyzer_name} analyzer")
281        analyzer.visualize(data, **kwargs)

Singleton manager for EDA analyzers.

def register_analyzer( self, name: str, analyzer_class: Type[BaseEDAAnalyzer]):
244    def register_analyzer(self, name: str, analyzer_class: Type[BaseEDAAnalyzer]):
245        """
246        Register an EDA analyzer.
247        
248        Args:
249            name: Name to register the analyzer under
250            analyzer_class: EDA analyzer class
251        """
252        if not issubclass(analyzer_class, BaseEDAAnalyzer):
253            raise ValueError(f"Analyzer class must inherit from BaseEDAAnalyzer")
254        self.register(name, analyzer_class)

Register an EDA analyzer.

Args:
    name: Name to register the analyzer under
    analyzer_class: EDA analyzer class

def analyze_data(self, analyzer_name: str, data: Any, **kwargs) -> Dict[str, Any]:
256    def analyze_data(self, analyzer_name: str, data: Any, **kwargs) -> Dict[str, Any]:
257        """
258        Analyze data using the specified analyzer.
259        
260        Args:
261            analyzer_name: Name of the EDA analyzer
262            data: Input data to analyze
263            **kwargs: Additional arguments for analysis
264            
265        Returns:
266            Analysis results dictionary
267        """
268        analyzer = self.get_cached_instance(analyzer_name, analyzer_name, f"{analyzer_name} analyzer")
269        return analyzer.analyze(data, **kwargs)

Analyze data using the specified analyzer.

Args:
    analyzer_name: Name of the EDA analyzer
    data: Input data to analyze
    **kwargs: Additional arguments for analysis

Returns: Analysis results dictionary

def visualize_data(self, analyzer_name: str, data: Any, **kwargs):
271    def visualize_data(self, analyzer_name: str, data: Any, **kwargs):
272        """
273        Create visualizations using the specified analyzer.
274        
275        Args:
276            analyzer_name: Name of the EDA analyzer
277            data: Input data to visualize
278            **kwargs: Additional arguments for visualization
279        """
280        analyzer = self.get_cached_instance(analyzer_name, analyzer_name, f"{analyzer_name} analyzer")
281        analyzer.visualize(data, **kwargs)

Create visualizations using the specified analyzer.

Args:
    analyzer_name: Name of the EDA analyzer
    data: Input data to visualize
    **kwargs: Additional arguments for visualization
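
For example, the Daphnet visualization analyzer can be driven through the manager as sketched below, assuming data returned by DaphnetLoader.load_data and a registration name of "daphnet_viz"; analyzer-specific keyword arguments are omitted:

    from gaitsetpy.core import EDAManager
    from gaitsetpy.eda import DaphnetVisualizationAnalyzer
    from gaitsetpy.dataset import DaphnetLoader

    loader = DaphnetLoader()
    data, names = loader.load_data("./data")

    em = EDAManager()
    em.register_analyzer("daphnet_viz", DaphnetVisualizationAnalyzer)

    results = em.analyze_data("daphnet_viz", data)   # dictionary of analysis results
    em.visualize_data("daphnet_viz", data)           # renders the analyzer's plots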

class ClassificationManager(gaitsetpy.core.managers.BaseManager):
284 class ClassificationManager(BaseManager):
285    """
286    Singleton manager for classification models.
287    """
288    
289    def register_model(self, name: str, model_class: Type[BaseClassificationModel]):
290        """
291        Register a classification model.
292        
293        Args:
294            name: Name to register the model under
295            model_class: Classification model class
296        """
297        if not issubclass(model_class, BaseClassificationModel):
298            raise ValueError(f"Model class must inherit from BaseClassificationModel")
299        self.register(name, model_class)
300    
301    def train_model(self, model_name: str, features: List[Dict], **kwargs) -> BaseClassificationModel:
302        """
303        Train a classification model.
304        
305        Args:
306            model_name: Name of the classification model
307            features: List of feature dictionaries
308            **kwargs: Additional arguments for training
309            
310        Returns:
311            Trained model instance
312        """
313        model = self.create_instance(model_name, model_name, f"{model_name} classification model")
314        model.train(features, **kwargs)
315        return model
316    
317    def predict(self, model_name: str, features: List[Dict], **kwargs) -> Any:
318        """
319        Make predictions using a trained model.
320        
321        Args:
322            model_name: Name of the classification model
323            features: List of feature dictionaries
324            **kwargs: Additional arguments for prediction
325            
326        Returns:
327            Predictions array
328        """
329        model = self.get_cached_instance(model_name, model_name, f"{model_name} classification model")
330        return model.predict(features, **kwargs)
331    
332    def evaluate_model(self, model_name: str, features: List[Dict], **kwargs) -> Dict[str, float]:
333        """
334        Evaluate a classification model.
335        
336        Args:
337            model_name: Name of the classification model
338            features: List of feature dictionaries
339            **kwargs: Additional arguments for evaluation
340            
341        Returns:
342            Evaluation metrics dictionary
343        """
344        model = self.get_cached_instance(model_name, model_name, f"{model_name} classification model")
345        return model.evaluate(features, **kwargs) 

Singleton manager for classification models.

def register_model( self, name: str, model_class: Type[BaseClassificationModel]):
289    def register_model(self, name: str, model_class: Type[BaseClassificationModel]):
290        """
291        Register a classification model.
292        
293        Args:
294            name: Name to register the model under
295            model_class: Classification model class
296        """
297        if not issubclass(model_class, BaseClassificationModel):
298            raise ValueError(f"Model class must inherit from BaseClassificationModel")
299        self.register(name, model_class)

Register a classification model.

Args:
    name: Name to register the model under
    model_class: Classification model class

def train_model( self, model_name: str, features: List[Dict], **kwargs) -> BaseClassificationModel:
301    def train_model(self, model_name: str, features: List[Dict], **kwargs) -> BaseClassificationModel:
302        """
303        Train a classification model.
304        
305        Args:
306            model_name: Name of the classification model
307            features: List of feature dictionaries
308            **kwargs: Additional arguments for training
309            
310        Returns:
311            Trained model instance
312        """
313        model = self.create_instance(model_name, model_name, f"{model_name} classification model")
314        model.train(features, **kwargs)
315        return model

Train a classification model.

Args:
    model_name: Name of the classification model
    features: List of feature dictionaries
    **kwargs: Additional arguments for training

Returns: Trained model instance

def predict(self, model_name: str, features: List[Dict], **kwargs) -> Any:
317    def predict(self, model_name: str, features: List[Dict], **kwargs) -> Any:
318        """
319        Make predictions using a trained model.
320        
321        Args:
322            model_name: Name of the classification model
323            features: List of feature dictionaries
324            **kwargs: Additional arguments for prediction
325            
326        Returns:
327            Predictions array
328        """
329        model = self.get_cached_instance(model_name, model_name, f"{model_name} classification model")
330        return model.predict(features, **kwargs)

Make predictions using a trained model.

Args:
    model_name: Name of the classification model
    features: List of feature dictionaries
    **kwargs: Additional arguments for prediction

Returns: Predictions array

def evaluate_model( self, model_name: str, features: List[Dict], **kwargs) -> Dict[str, float]:
332    def evaluate_model(self, model_name: str, features: List[Dict], **kwargs) -> Dict[str, float]:
333        """
334        Evaluate a classification model.
335        
336        Args:
337            model_name: Name of the classification model
338            features: List of feature dictionaries
339            **kwargs: Additional arguments for evaluation
340            
341        Returns:
342            Evaluation metrics dictionary
343        """
344        model = self.get_cached_instance(model_name, model_name, f"{model_name} classification model")
345        return model.evaluate(features, **kwargs) 

Evaluate a classification model.

Args:
    model_name: Name of the classification model
    features: List of feature dictionaries
    **kwargs: Additional arguments for evaluation

Returns: Evaluation metrics dictionary
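
Continuing the SimpleRandomForestModel sketch from the BaseClassificationModel section above, the manager can register, train, and evaluate it as follows (the tiny hand-written feature dictionaries are purely illustrative):

    from gaitsetpy.core import ClassificationManager

    # Illustrative feature dictionaries using the same assumed layout as the sketch above.
    train_features = [
        {"features": [0.1, 0.2, 0.3], "label": 0},
        {"features": [0.9, 0.8, 0.7], "label": 1},
    ]

    cm = ClassificationManager()
    cm.register_model("simple_rf", SimpleRandomForestModel)

    model = cm.train_model("simple_rf", train_features)   # returns the trained instance
    print(model.predict(train_features))
    print(model.evaluate(train_features))                 # e.g. {'accuracy': 1.0}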

class DaphnetLoader(gaitsetpy.BaseDatasetLoader):
 18 class DaphnetLoader(BaseDatasetLoader):
 19    """
 20    Daphnet dataset loader class.
 21    
 22    This class handles loading and processing of the Daphnet dataset for gait analysis.
 23    """
 24    
 25    def __init__(self, max_workers: int = 8):
 26        """
 27        Initialize Daphnet loader with concurrent download support.
 28        
 29        Args:
 30            max_workers: Maximum number of concurrent download threads (default: 8)
 31        """
 32        super().__init__(
 33            name="daphnet",
 34            description="Daphnet Freezing of Gait Dataset - Contains accelerometer data from subjects with Parkinson's disease",
 35            max_workers=max_workers
 36        )
 37        self.metadata = {
 38            'sensors': ['shank', 'thigh', 'trunk'],
 39            'components': ['h_fd', 'v', 'h_l'],  # horizontal forward, vertical, horizontal lateral
 40            'sampling_frequency': 64,
 41            'annotations': {
 42                0: 'not_valid',
 43                1: 'no_freeze',
 44                2: 'freeze'
 45            }
 46        }
 47    
 48    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
 49        """
 50        Load Daphnet dataset from the specified directory.
 51        
 52        Args:
 53            data_dir: Directory to store/find the dataset
 54            **kwargs: Additional arguments (unused for Daphnet)
 55            
 56        Returns:
 57            Tuple of (data_list, names_list)
 58        """
 59        # Download and extract if needed
 60        download_dataset("daphnet", data_dir)
 61        extract_dataset("daphnet", data_dir)
 62        
 63        file_path = os.path.join(data_dir, "dataset_fog_release/dataset")
 64        daphnet_data = []
 65        daphnet_names = []
 66        
 67        # Load all subject files
 68        for file in sorted(glob(os.path.join(file_path, "S*.txt"))):
 69            # Extract filename from path
 70            filename = os.path.basename(file)
 71            daphnet_names.append(filename)
 72            
 73            # Load CSV with proper column names
 74            column_names = [
 75                "time", "shank_h_fd", "shank_v", "shank_h_l", 
 76                "thigh_h_fd", "thigh_v", "thigh_h_l", 
 77                "trunk_h_fd", "trunk_v", "trunk_h_l", "annotations"
 78            ]
 79            
 80            df = pd.read_csv(file, sep=" ", names=column_names)
 81            
 82            # Set time as index
 83            df = df.set_index("time")
 84            
 85            # Calculate magnitude for each sensor
 86            df["thigh"] = np.sqrt(df["thigh_h_l"]**2 + df["thigh_v"]**2 + df["thigh_h_fd"]**2)
 87            df["shank"] = np.sqrt(df["shank_h_l"]**2 + df["shank_v"]**2 + df["shank_h_fd"]**2)
 88            df["trunk"] = np.sqrt(df["trunk_h_l"]**2 + df["trunk_v"]**2 + df["trunk_h_fd"]**2)
 89            
 90            # Reorder columns for consistency
 91            df = df[["shank", "shank_h_fd", "shank_v", "shank_h_l", 
 92                    "thigh", "thigh_h_fd", "thigh_v", "thigh_h_l", 
 93                    "trunk", "trunk_h_fd", "trunk_v", "trunk_h_l", "annotations"]]
 94            
 95            daphnet_data.append(df)
 96        
 97        # Store loaded data
 98        self.data = daphnet_data
 99        self.names = daphnet_names
100        
101        return daphnet_data, daphnet_names
102    
103    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str], 
104                             window_size: int = 192, step_size: int = 32) -> List[Dict]:
105        """
106        Create sliding windows from the Daphnet dataset.
107        
108        Args:
109            data: List of DataFrames containing Daphnet data
110            names: List of names corresponding to the data
111            window_size: Size of the sliding window (default: 192)
112            step_size: Step size for the sliding window (default: 32)
113            
114        Returns:
115            List of dictionaries containing sliding windows for each DataFrame
116        """
117        windows_data = []
118        
119        for idx, df in enumerate(data):
120            # Filter out invalid data (annotations == 0)
121            df_filtered = df[df.annotations > 0]
122            
123            if df_filtered.empty:
124                continue
125                
126            windows = []
127            processed_columns = set()
128            
129            # Process each sensor column
130            for col in df_filtered.columns:
131                if col != "annotations" and col not in processed_columns:
132                    window_data = sliding_window(df_filtered[col], window_size, step_size)
133                    windows.append({"name": col, "data": window_data})
134                    processed_columns.add(col)
135            
136            # Include annotations separately
137            annotations_window = sliding_window(df_filtered["annotations"], window_size, step_size)
138            windows.append({"name": "annotations", "data": annotations_window})
139            
140            windows_data.append({"name": names[idx], "windows": windows})
141        
142        return windows_data
143    
144    def get_supported_formats(self) -> List[str]:
145        """
146        Get list of supported file formats for Daphnet dataset.
147        
148        Returns:
149            List of supported file extensions
150        """
151        return ['.txt']
152    
153    def get_sensor_info(self) -> Dict[str, List[str]]:
154        """
155        Get information about sensors in the dataset.
156        
157        Returns:
158            Dictionary containing sensor information
159        """
160        return {
161            'sensors': self.metadata['sensors'],
162            'components': self.metadata['components'],
163            'sampling_frequency': self.metadata['sampling_frequency']
164        }
165    
166    def get_annotation_info(self) -> Dict[int, str]:
167        """
168        Get information about annotations in the dataset.
169        
170        Returns:
171            Dictionary mapping annotation values to descriptions
172        """
173        return self.metadata['annotations']

Daphnet dataset loader class.

This class handles loading and processing of the Daphnet dataset for gait analysis.

DaphnetLoader(max_workers: int = 8)
25    def __init__(self, max_workers: int = 8):
26        """
27        Initialize Daphnet loader with concurrent download support.
28        
29        Args:
30            max_workers: Maximum number of concurrent download threads (default: 8)
31        """
32        super().__init__(
33            name="daphnet",
34            description="Daphnet Freezing of Gait Dataset - Contains accelerometer data from subjects with Parkinson's disease",
35            max_workers=max_workers
36        )
37        self.metadata = {
38            'sensors': ['shank', 'thigh', 'trunk'],
39            'components': ['h_fd', 'v', 'h_l'],  # horizontal forward, vertical, horizontal lateral
40            'sampling_frequency': 64,
41            'annotations': {
42                0: 'not_valid',
43                1: 'no_freeze',
44                2: 'freeze'
45            }
46        }

Initialize Daphnet loader with concurrent download support.

Args: max_workers: Maximum number of concurrent download threads (default: 8)

Instance variables: metadata
def load_data( self, data_dir: str, **kwargs) -> Tuple[List[pandas.core.frame.DataFrame], List[str]]:
 48    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
 49        """
 50        Load Daphnet dataset from the specified directory.
 51        
 52        Args:
 53            data_dir: Directory to store/find the dataset
 54            **kwargs: Additional arguments (unused for Daphnet)
 55            
 56        Returns:
 57            Tuple of (data_list, names_list)
 58        """
 59        # Download and extract if needed
 60        download_dataset("daphnet", data_dir)
 61        extract_dataset("daphnet", data_dir)
 62        
 63        file_path = os.path.join(data_dir, "dataset_fog_release/dataset")
 64        daphnet_data = []
 65        daphnet_names = []
 66        
 67        # Load all subject files
 68        for file in sorted(glob(os.path.join(file_path, "S*.txt"))):
 69            # Extract filename from path
 70            filename = os.path.basename(file)
 71            daphnet_names.append(filename)
 72            
 73            # Load CSV with proper column names
 74            column_names = [
 75                "time", "shank_h_fd", "shank_v", "shank_h_l", 
 76                "thigh_h_fd", "thigh_v", "thigh_h_l", 
 77                "trunk_h_fd", "trunk_v", "trunk_h_l", "annotations"
 78            ]
 79            
 80            df = pd.read_csv(file, sep=" ", names=column_names)
 81            
 82            # Set time as index
 83            df = df.set_index("time")
 84            
 85            # Calculate magnitude for each sensor
 86            df["thigh"] = np.sqrt(df["thigh_h_l"]**2 + df["thigh_v"]**2 + df["thigh_h_fd"]**2)
 87            df["shank"] = np.sqrt(df["shank_h_l"]**2 + df["shank_v"]**2 + df["shank_h_fd"]**2)
 88            df["trunk"] = np.sqrt(df["trunk_h_l"]**2 + df["trunk_v"]**2 + df["trunk_h_fd"]**2)
 89            
 90            # Reorder columns for consistency
 91            df = df[["shank", "shank_h_fd", "shank_v", "shank_h_l", 
 92                    "thigh", "thigh_h_fd", "thigh_v", "thigh_h_l", 
 93                    "trunk", "trunk_h_fd", "trunk_v", "trunk_h_l", "annotations"]]
 94            
 95            daphnet_data.append(df)
 96        
 97        # Store loaded data
 98        self.data = daphnet_data
 99        self.names = daphnet_names
100        
101        return daphnet_data, daphnet_names

Load Daphnet dataset from the specified directory.

Args:
    data_dir: Directory to store/find the dataset
    **kwargs: Additional arguments (unused for Daphnet)

Returns: Tuple of (data_list, names_list)

def create_sliding_windows( self, data: List[pandas.core.frame.DataFrame], names: List[str], window_size: int = 192, step_size: int = 32) -> List[Dict]:
103    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str], 
104                             window_size: int = 192, step_size: int = 32) -> List[Dict]:
105        """
106        Create sliding windows from the Daphnet dataset.
107        
108        Args:
109            data: List of DataFrames containing Daphnet data
110            names: List of names corresponding to the data
111            window_size: Size of the sliding window (default: 192)
112            step_size: Step size for the sliding window (default: 32)
113            
114        Returns:
115            List of dictionaries containing sliding windows for each DataFrame
116        """
117        windows_data = []
118        
119        for idx, df in enumerate(data):
120            # Filter out invalid data (annotations == 0)
121            df_filtered = df[df.annotations > 0]
122            
123            if df_filtered.empty:
124                continue
125                
126            windows = []
127            processed_columns = set()
128            
129            # Process each sensor column
130            for col in df_filtered.columns:
131                if col != "annotations" and col not in processed_columns:
132                    window_data = sliding_window(df_filtered[col], window_size, step_size)
133                    windows.append({"name": col, "data": window_data})
134                    processed_columns.add(col)
135            
136            # Include annotations separately
137            annotations_window = sliding_window(df_filtered["annotations"], window_size, step_size)
138            windows.append({"name": "annotations", "data": annotations_window})
139            
140            windows_data.append({"name": names[idx], "windows": windows})
141        
142        return windows_data

Create sliding windows from the Daphnet dataset.

Args:
    data: List of DataFrames containing Daphnet data
    names: List of names corresponding to the data
    window_size: Size of the sliding window (default: 192)
    step_size: Step size for the sliding window (default: 32)

Returns: List of dictionaries containing sliding windows for each DataFrame

def get_supported_formats(self) -> List[str]:
144    def get_supported_formats(self) -> List[str]:
145        """
146        Get list of supported file formats for Daphnet dataset.
147        
148        Returns:
149            List of supported file extensions
150        """
151        return ['.txt']

Get list of supported file formats for Daphnet dataset.

Returns: List of supported file extensions

def get_sensor_info(self) -> Dict[str, List[str]]:
153    def get_sensor_info(self) -> Dict[str, List[str]]:
154        """
155        Get information about sensors in the dataset.
156        
157        Returns:
158            Dictionary containing sensor information
159        """
160        return {
161            'sensors': self.metadata['sensors'],
162            'components': self.metadata['components'],
163            'sampling_frequency': self.metadata['sampling_frequency']
164        }

Get information about sensors in the dataset.

Returns: Dictionary containing sensor information

def get_annotation_info(self) -> Dict[int, str]:
166    def get_annotation_info(self) -> Dict[int, str]:
167        """
168        Get information about annotations in the dataset.
169        
170        Returns:
171            Dictionary mapping annotation values to descriptions
172        """
173        return self.metadata['annotations']

Get information about annotations in the dataset.

Returns: Dictionary mapping annotation values to descriptions
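
Putting the loader together, a typical session looks like the sketch below (the ./daphnet_data directory is an arbitrary choice; the dataset is downloaded and extracted there on first use):

    from gaitsetpy.dataset import DaphnetLoader

    loader = DaphnetLoader()
    data, names = loader.load_data("./daphnet_data")   # downloads and extracts if needed

    print(names[0])                    # e.g. "S01R01.txt"
    print(data[0].columns.tolist())    # magnitudes, per-axis components, annotations

    windows = loader.create_sliding_windows(data, names, window_size=192, step_size=32)
    print(len(windows), "recordings windowed")
    print(loader.get_annotation_info())   # {0: 'not_valid', 1: 'no_freeze', 2: 'freeze'}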

class MobiFallLoader(gaitsetpy.BaseDatasetLoader):
 17 class MobiFallLoader(BaseDatasetLoader):
 18    """
 19    MobiFall dataset loader class.
 20    
 21    This class handles loading and processing of the MobiFall dataset for gait analysis.
 22    """
 23    
 24    def __init__(self, max_workers: int = 8):
 25        """
 26        Initialize MobiFall loader with concurrent download support.
 27        
 28        Args:
 29            max_workers: Maximum number of concurrent download threads (default: 8)
 30        """
 31        super().__init__(
 32            name="mobifall",
 33            description="MobiFall Dataset - Contains accelerometer and gyroscope data for fall detection",
 34            max_workers=max_workers
 35        )
 36        self.metadata = {
 37            'sensors': ['accelerometer', 'gyroscope'],
 38            'components': ['x', 'y', 'z'],
 39            'sampling_frequency': 100,  # Typical for MobiFall
 40            'activities': ['ADL', 'FALL']  # Activities of Daily Living and Falls
 41        }
 42    
 43    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
 44        """
 45        Load MobiFall dataset from the specified directory.
 46        
 47        Args:
 48            data_dir: Directory to store/find the dataset
 49            **kwargs: Additional arguments (unused for MobiFall)
 50            
 51        Returns:
 52            Tuple of (data_list, names_list)
 53        """
 54        # TODO: Implement MobiFall data loading
 55        # This is a placeholder implementation
 56        print("MobiFall data loading is not yet implemented")
 57        return [], []
 58    
 59    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str], 
 60                             window_size: int = 192, step_size: int = 32) -> List[Dict]:
 61        """
 62        Create sliding windows from the MobiFall dataset.
 63        
 64        Args:
 65            data: List of DataFrames containing MobiFall data
 66            names: List of names corresponding to the data
 67            window_size: Size of the sliding window (default: 192)
 68            step_size: Step size for the sliding window (default: 32)
 69            
 70        Returns:
 71            List of dictionaries containing sliding windows for each DataFrame
 72        """
 73        # TODO: Implement MobiFall sliding window creation
 74        # This is a placeholder implementation
 75        print("MobiFall sliding window creation is not yet implemented")
 76        return []
 77    
 78    def get_supported_formats(self) -> List[str]:
 79        """
 80        Get list of supported file formats for MobiFall dataset.
 81        
 82        Returns:
 83            List of supported file extensions
 84        """
 85        return ['.csv', '.txt']
 86    
 87    def get_sensor_info(self) -> Dict[str, List[str]]:
 88        """
 89        Get information about sensors in the dataset.
 90        
 91        Returns:
 92            Dictionary containing sensor information
 93        """
 94        return {
 95            'sensors': self.metadata['sensors'],
 96            'components': self.metadata['components'],
 97            'sampling_frequency': self.metadata['sampling_frequency']
 98        }
 99    
100    def get_activity_info(self) -> List[str]:
101        """
102        Get information about activities in the dataset.
103        
104        Returns:
105            List of activity types
106        """
107        return self.metadata['activities']

MobiFall dataset loader class.

This class handles loading and processing of the MobiFall dataset for gait analysis.

MobiFallLoader(max_workers: int = 8)
24    def __init__(self, max_workers: int = 8):
25        """
26        Initialize MobiFall loader with concurrent download support.
27        
28        Args:
29            max_workers: Maximum number of concurrent download threads (default: 8)
30        """
31        super().__init__(
32            name="mobifall",
33            description="MobiFall Dataset - Contains accelerometer and gyroscope data for fall detection",
34            max_workers=max_workers
35        )
36        self.metadata = {
37            'sensors': ['accelerometer', 'gyroscope'],
38            'components': ['x', 'y', 'z'],
39            'sampling_frequency': 100,  # Typical for MobiFall
40            'activities': ['ADL', 'FALL']  # Activities of Daily Living and Falls
41        }

Initialize MobiFall loader with concurrent download support.

Args: max_workers: Maximum number of concurrent download threads (default: 8)

Instance variables: metadata
def load_data( self, data_dir: str, **kwargs) -> Tuple[List[pandas.core.frame.DataFrame], List[str]]:
43    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
44        """
45        Load MobiFall dataset from the specified directory.
46        
47        Args:
48            data_dir: Directory to store/find the dataset
49            **kwargs: Additional arguments (unused for MobiFall)
50            
51        Returns:
52            Tuple of (data_list, names_list)
53        """
54        # TODO: Implement MobiFall data loading
55        # This is a placeholder implementation
56        print("MobiFall data loading is not yet implemented")
57        return [], []

Load MobiFall dataset from the specified directory.

Args:
    data_dir: Directory to store/find the dataset
    **kwargs: Additional arguments (unused for MobiFall)

Returns: Tuple of (data_list, names_list)

def create_sliding_windows( self, data: List[pandas.core.frame.DataFrame], names: List[str], window_size: int = 192, step_size: int = 32) -> List[Dict]:
59    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str], 
60                             window_size: int = 192, step_size: int = 32) -> List[Dict]:
61        """
62        Create sliding windows from the MobiFall dataset.
63        
64        Args:
65            data: List of DataFrames containing MobiFall data
66            names: List of names corresponding to the data
67            window_size: Size of the sliding window (default: 192)
68            step_size: Step size for the sliding window (default: 32)
69            
70        Returns:
71            List of dictionaries containing sliding windows for each DataFrame
72        """
73        # TODO: Implement MobiFall sliding window creation
74        # This is a placeholder implementation
75        print("MobiFall sliding window creation is not yet implemented")
76        return []

Create sliding windows from the MobiFall dataset.

Args:
    data: List of DataFrames containing MobiFall data
    names: List of names corresponding to the data
    window_size: Size of the sliding window (default: 192)
    step_size: Step size for the sliding window (default: 32)

Returns: List of dictionaries containing sliding windows for each DataFrame

def get_supported_formats(self) -> List[str]:
78    def get_supported_formats(self) -> List[str]:
79        """
80        Get list of supported file formats for MobiFall dataset.
81        
82        Returns:
83            List of supported file extensions
84        """
85        return ['.csv', '.txt']

Get list of supported file formats for MobiFall dataset.

Returns: List of supported file extensions

def get_sensor_info(self) -> Dict[str, List[str]]:
87    def get_sensor_info(self) -> Dict[str, List[str]]:
88        """
89        Get information about sensors in the dataset.
90        
91        Returns:
92            Dictionary containing sensor information
93        """
94        return {
95            'sensors': self.metadata['sensors'],
96            'components': self.metadata['components'],
97            'sampling_frequency': self.metadata['sampling_frequency']
98        }

Get information about sensors in the dataset.

Returns: Dictionary containing sensor information

def get_activity_info(self) -> List[str]:
100    def get_activity_info(self) -> List[str]:
101        """
102        Get information about activities in the dataset.
103        
104        Returns:
105            List of activity types
106        """
107        return self.metadata['activities']

Get information about activities in the dataset.

Returns: List of activity types

class ArduousLoader(gaitsetpy.BaseDatasetLoader):
 17 class ArduousLoader(BaseDatasetLoader):
 18    """
 19    Arduous dataset loader class.
 20    
 21    This class handles loading and processing of the Arduous dataset for gait analysis.
 22    """
 23    
 24    def __init__(self, max_workers: int = 8):
 25        """
 26        Initialize Arduous loader with concurrent download support.
 27        
 28        Args:
 29            max_workers: Maximum number of concurrent download threads (default: 8)
 30        """
 31        super().__init__(
 32            name="arduous",
 33            description="Arduous Dataset - Contains multi-sensor wearable data for daily activity recognition",
 34            max_workers=max_workers
 35        )
 36        self.metadata = {
 37            'sensors': ['accelerometer', 'gyroscope', 'magnetometer'],
 38            'components': ['x', 'y', 'z'],
 39            'sampling_frequency': 50,  # Typical for Arduous
 40            'activities': ['walking', 'running', 'sitting', 'standing', 'lying']
 41        }
 42    
 43    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
 44        """
 45        Load Arduous dataset from the specified directory.
 46        
 47        Args:
 48            data_dir: Directory to store/find the dataset
 49            **kwargs: Additional arguments (unused for Arduous)
 50            
 51        Returns:
 52            Tuple of (data_list, names_list)
 53        """
 54        # TODO: Implement Arduous data loading
 55        # This is a placeholder implementation
 56        print("Arduous data loading is not yet implemented")
 57        return [], []
 58    
 59    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str], 
 60                             window_size: int = 192, step_size: int = 32) -> List[Dict]:
 61        """
 62        Create sliding windows from the Arduous dataset.
 63        
 64        Args:
 65            data: List of DataFrames containing Arduous data
 66            names: List of names corresponding to the data
 67            window_size: Size of the sliding window (default: 192)
 68            step_size: Step size for the sliding window (default: 32)
 69            
 70        Returns:
 71            List of dictionaries containing sliding windows for each DataFrame
 72        """
 73        # TODO: Implement Arduous sliding window creation
 74        # This is a placeholder implementation
 75        print("Arduous sliding window creation is not yet implemented")
 76        return []
 77    
 78    def get_supported_formats(self) -> List[str]:
 79        """
 80        Get list of supported file formats for Arduous dataset.
 81        
 82        Returns:
 83            List of supported file extensions
 84        """
 85        return ['.csv', '.txt']
 86    
 87    def get_sensor_info(self) -> Dict[str, List[str]]:
 88        """
 89        Get information about sensors in the dataset.
 90        
 91        Returns:
 92            Dictionary containing sensor information
 93        """
 94        return {
 95            'sensors': self.metadata['sensors'],
 96            'components': self.metadata['components'],
 97            'sampling_frequency': self.metadata['sampling_frequency']
 98        }
 99    
100    def get_activity_info(self) -> List[str]:
101        """
102        Get information about activities in the dataset.
103        
104        Returns:
105            List of activity types
106        """
107        return self.metadata['activities']

Arduous dataset loader class.

This class handles loading and processing of the Arduous dataset for gait analysis.

ArduousLoader(max_workers: int = 8)
24    def __init__(self, max_workers: int = 8):
25        """
26        Initialize Arduous loader with concurrent download support.
27        
28        Args:
29            max_workers: Maximum number of concurrent download threads (default: 8)
30        """
31        super().__init__(
32            name="arduous",
33            description="Arduous Dataset - Contains multi-sensor wearable data for daily activity recognition",
34            max_workers=max_workers
35        )
36        self.metadata = {
37            'sensors': ['accelerometer', 'gyroscope', 'magnetometer'],
38            'components': ['x', 'y', 'z'],
39            'sampling_frequency': 50,  # Typical for Arduous
40            'activities': ['walking', 'running', 'sitting', 'standing', 'lying']
41        }

Initialize Arduous loader with concurrent download support.

Args: max_workers: Maximum number of concurrent download threads (default: 8)

Instance variables: metadata
def load_data( self, data_dir: str, **kwargs) -> Tuple[List[pandas.core.frame.DataFrame], List[str]]:
43    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
44        """
45        Load Arduous dataset from the specified directory.
46        
47        Args:
48            data_dir: Directory to store/find the dataset
49            **kwargs: Additional arguments (unused for Arduous)
50            
51        Returns:
52            Tuple of (data_list, names_list)
53        """
54        # TODO: Implement Arduous data loading
55        # This is a placeholder implementation
56        print("Arduous data loading is not yet implemented")
57        return [], []

Load Arduous dataset from the specified directory.

Args:
    data_dir: Directory to store/find the dataset
    **kwargs: Additional arguments (unused for Arduous)

Returns: Tuple of (data_list, names_list)

def create_sliding_windows( self, data: List[pandas.core.frame.DataFrame], names: List[str], window_size: int = 192, step_size: int = 32) -> List[Dict]:
59    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str], 
60                             window_size: int = 192, step_size: int = 32) -> List[Dict]:
61        """
62        Create sliding windows from the Arduous dataset.
63        
64        Args:
65            data: List of DataFrames containing Arduous data
66            names: List of names corresponding to the data
67            window_size: Size of the sliding window (default: 192)
68            step_size: Step size for the sliding window (default: 32)
69            
70        Returns:
71            List of dictionaries containing sliding windows for each DataFrame
72        """
73        # TODO: Implement Arduous sliding window creation
74        # This is a placeholder implementation
75        print("Arduous sliding window creation is not yet implemented")
76        return []

Create sliding windows from the Arduous dataset.

Args:
    data: List of DataFrames containing Arduous data
    names: List of names corresponding to the data
    window_size: Size of the sliding window (default: 192)
    step_size: Step size for the sliding window (default: 32)

Returns: List of dictionaries containing sliding windows for each DataFrame

def get_supported_formats(self) -> List[str]:
78    def get_supported_formats(self) -> List[str]:
79        """
80        Get list of supported file formats for Arduous dataset.
81        
82        Returns:
83            List of supported file extensions
84        """
85        return ['.csv', '.txt']

Get list of supported file formats for Arduous dataset.

Returns: List of supported file extensions

def get_sensor_info(self) -> Dict[str, List[str]]:
87    def get_sensor_info(self) -> Dict[str, List[str]]:
88        """
89        Get information about sensors in the dataset.
90        
91        Returns:
92            Dictionary containing sensor information
93        """
94        return {
95            'sensors': self.metadata['sensors'],
96            'components': self.metadata['components'],
97            'sampling_frequency': self.metadata['sampling_frequency']
98        }

Get information about sensors in the dataset.

Returns: Dictionary containing sensor information

def get_activity_info(self) -> List[str]:
100    def get_activity_info(self) -> List[str]:
101        """
102        Get information about activities in the dataset.
103        
104        Returns:
105            List of activity types
106        """
107        return self.metadata['activities']

Get information about activities in the dataset.

Returns: List of activity types

class PhysioNetLoader(gaitsetpy.BaseDatasetLoader):
 25 class PhysioNetLoader(BaseDatasetLoader):
 26    """
 27    PhysioNet VGRF dataset loader class.
 28    
 29    This class handles loading and processing of the PhysioNet Gait in Parkinson's Disease dataset.
 30    The dataset contains vertical ground reaction force (VGRF) data from subjects with Parkinson's 
 31    disease and healthy controls.
 32    
 33    Features concurrent downloading for efficient data retrieval.
 34    """
 35    
 36    def __init__(self, max_workers: int = 8):
 37        """
 38        Initialize PhysioNet loader with concurrent download support.
 39        
 40        Args:
 41            max_workers: Maximum number of concurrent download threads (default: 8)
 42        """
 43        super().__init__(
 44            name="physionet",
 45            description="PhysioNet Gait in Parkinson's Disease Dataset - Contains VGRF data from subjects with Parkinson's disease and healthy controls",
 46            max_workers=max_workers
 47        )
 48        self.metadata = {
 49            'sensors': ['VGRF_L1', 'VGRF_L2', 'VGRF_L3', 'VGRF_L4', 'VGRF_L5', 'VGRF_L6', 'VGRF_L7', 'VGRF_L8',
 50                       'VGRF_R1', 'VGRF_R2', 'VGRF_R3', 'VGRF_R4', 'VGRF_R5', 'VGRF_R6', 'VGRF_R7', 'VGRF_R8'],
 51            'sampling_frequency': 100,  # 100 Hz sampling frequency
 52            'subjects': {
 53                'Co': 'Control subjects',
 54                'Pt': 'Parkinson\'s disease patients'
 55            },
 56            'window_size': 600,  # 6 seconds at 100 Hz
 57            'url': 'https://physionet.org/files/gaitpdb/1.0.0/'
 58        }
 59        self.labels = []
 60        self.subject_types = []
 61    
 62    def _download_physionet_data(self, data_dir: str) -> str:
 63        """
 64        Download PhysioNet dataset if not already present using concurrent downloads.
 65        
 66        This method uses multi-threaded downloading to significantly speed up the
 67        download process for the 100+ files in the PhysioNet dataset.
 68        
 69        Args:
 70            data_dir: Directory to store the dataset
 71            
 72        Returns:
 73            Path to the downloaded/existing dataset directory
 74        """
 75        dataset_path = os.path.join(data_dir, "physionet_gaitpdb")
 76        
 77        if os.path.exists(dataset_path) and len(os.listdir(dataset_path)) > 0:
 78            print(f"PhysioNet dataset already exists at: {dataset_path}")
 79            return dataset_path
 80        
 81        os.makedirs(dataset_path, exist_ok=True)
 82        
 83        # Download the dataset files
 84        base_url = "https://physionet.org/files/gaitpdb/1.0.0/"
 85        
 86        # Get list of files (basic file names based on the reference)
 87        file_patterns = [
 88            # Control subjects - Ga prefix
 89            *[f"GaCo{i:02d}_{j:02d}.txt" for i in range(1, 18) for j in range(1, 3)],
 90            "GaCo22_01.txt", "GaCo22_10.txt",
 91            
 92            # Parkinson's patients - Ga prefix
 93            *[f"GaPt{i:02d}_{j:02d}.txt" for i in range(3, 10) for j in range(1, 3)],
 94            *[f"GaPt{i:02d}_{j:02d}.txt" for i in range(12, 34) for j in range(1, 3)],
 95            *[f"GaPt{i:02d}_10.txt" for i in range(13, 34)],
 96            
 97            # Control subjects - Ju prefix
 98            *[f"JuCo{i:02d}_01.txt" for i in range(1, 27)],
 99            
100            # Parkinson's patients - Ju prefix
101            *[f"JuPt{i:02d}_{j:02d}.txt" for i in range(1, 30) for j in range(1, 8)],
102            
103            # Control subjects - Si prefix
104            *[f"SiCo{i:02d}_01.txt" for i in range(1, 31)],
105            
106            # Parkinson's patients - Si prefix
107            *[f"SiPt{i:02d}_01.txt" for i in range(2, 41)]
108        ]
109        
110        # Prepare download tasks for concurrent execution
111        download_tasks = [
112            {
113                'url': base_url + filename,
114                'dest_path': os.path.join(dataset_path, filename)
115            }
116            for filename in file_patterns
117        ]
118        
119        print(f"Downloading PhysioNet dataset to {dataset_path} using {self.max_workers} threads")
120        
121        # Use concurrent downloading from base class
122        results = self.download_files_concurrent(
123            download_tasks, 
124            show_progress=True, 
125            desc="Downloading PhysioNet files"
126        )
127        
128        # Print summary
129        print(f"\nDownload Summary:")
130        print(f"  Total files: {results['total']}")
131        print(f"  Successfully downloaded: {results['success']}")
132        print(f"  Already existed (skipped): {results['skipped']}")
133        print(f"  Failed: {results['failed']}")
134        
135        if results['failed'] > 0 and len(results['failed_downloads']) > 0:
136            print(f"\nFailed downloads (showing first 10):")
137            for failed in results['failed_downloads'][:10]:
138                print(f"  - {os.path.basename(failed['dest_path'])}: {failed['error']}")
139            if len(results['failed_downloads']) > 10:
140                print(f"  ... and {len(results['failed_downloads']) - 10} more failures")
141        
142        return dataset_path
143    
144    def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pd.DataFrame], List[str]]:
145        """
146        Load PhysioNet VGRF dataset from the specified directory.
147        
148        Args:
149            data_dir: Directory to store/find the dataset
150            **kwargs: Additional arguments (unused for PhysioNet)
151            
152        Returns:
153            Tuple of (data_list, names_list)
154        """
155        # Download dataset if needed
156        dataset_path = self._download_physionet_data(data_dir)
157        
158        physionet_data = []
159        physionet_names = []
160        self.labels = []
161        self.subject_types = []
162        
163        # Load all available files
164        for filepath in sorted(glob(os.path.join(dataset_path, "Ga*.txt"))):
165            filename = os.path.basename(filepath)
166            
167            # Extract subject type from filename
168            if 'Co' in filename:
169                subject_type = 'Control'
170                label = 'Co'
171            elif 'Pt' in filename:
172                subject_type = 'Patient'
173                label = 'Pt'
174            else:
175                continue  # Skip files that don't match expected pattern
176            
177            try:
178                # Read the file - PhysioNet files are tab-delimited with variable columns
179                # Column 0: time, Columns 1-16: VGRF sensors; the last two columns, when present, are the total force under the left and right foot
180                df = pd.read_csv(filepath, delimiter='\t', header=None)
181                
182                # Handle variable number of columns
183                n_cols = min(df.shape[1], 19)  # Limit to 19 columns max
184                df = df.iloc[:, :n_cols]
185                
186                # Create column names
187                col_names = ['time']
188                for i in range(1, n_cols):
189                    if i <= 8:
190                        col_names.append(f'VGRF_L{i}')
191                    elif i <= 16:
192                        col_names.append(f'VGRF_R{i-8}')
193                    else:
194                        col_names.append(f'sensor_{i}')
195                
196                df.columns = col_names
197                
198                # Set time as index
199                df = df.set_index('time')
200                
201                # Add subject metadata
202                df['subject_type'] = subject_type
203                df['label'] = label
204                
205                physionet_data.append(df)
206                physionet_names.append(filename)
207                self.labels.append(label)
208                self.subject_types.append(subject_type)
209                
210            except Exception as e:
211                print(f"Error loading {filename}: {e}")
212                continue
213        
214        # Store loaded data
215        self.data = physionet_data
216        self.names = physionet_names
217        
218        print(f"Loaded {len(physionet_data)} PhysioNet files")
219        print(f"Subject distribution: {dict(zip(*np.unique(self.subject_types, return_counts=True)))}")
220        
221        return physionet_data, physionet_names
222    
223    def create_sliding_windows(self, data: List[pd.DataFrame], names: List[str], 
224                             window_size: int = 600, step_size: int = 100) -> List[Dict]:
225        """
226        Create sliding windows from the PhysioNet dataset.
227        
228        Args:
229            data: List of DataFrames containing PhysioNet data
230            names: List of names corresponding to the data
231            window_size: Size of the sliding window (default: 600 for 6 seconds at 100Hz)
232            step_size: Step size for the sliding window (default: 100)
233            
234        Returns:
235            List of dictionaries containing sliding windows for each DataFrame
236        """
237        windows_data = []
238        
239        for idx, df in enumerate(data):
240            # Remove metadata columns for windowing
241            sensor_columns = [col for col in df.columns if col.startswith('VGRF_') or col.startswith('sensor_')]
242            df_sensors = df[sensor_columns]
243            
244            if df_sensors.empty or len(df_sensors) < window_size:
245                continue
246                
247            windows = []
248            
249            # Create windows for each sensor
250            for col in sensor_columns:
251                try:
252                    window_data = sliding_window(df_sensors[col].values, window_size, step_size)
253                    windows.append({"name": col, "data": window_data})
254                except Exception as e:
255                    print(f"Error creating windows for {col} in {names[idx]}: {e}")
256                    continue
257            
258            if windows:
259                windows_data.append({
260                    "name": names[idx],
261                    "windows": windows,
262                    "metadata": {
263                        "subject_type": df['subject_type'].iloc[0] if 'subject_type' in df.columns else 'Unknown',
264                        "label": df['label'].iloc[0] if 'label' in df.columns else 'Unknown',
265                        "window_size": window_size,
266                        "step_size": step_size,
267                        "num_windows": len(windows[0]["data"]) if windows else 0
268                    }
269                })
270        
271        return windows_data
272    
273    def get_supported_formats(self) -> List[str]:
274        """
275        Get list of supported file formats for PhysioNet dataset.
276        
277        Returns:
278            List of supported file extensions
279        """
280        return ['.txt']
281    
282    def get_sensor_info(self) -> Dict[str, List[str]]:
283        """
284        Get information about sensors in the dataset.
285        
286        Returns:
287            Dictionary containing sensor information
288        """
289        return {
290            'sensors': self.metadata['sensors'],
291            'sampling_frequency': self.metadata['sampling_frequency'],
292            'window_size': self.metadata['window_size']
293        }
294    
295    def get_subject_info(self) -> Dict[str, str]:
296        """
297        Get information about subjects in the dataset.
298        
299        Returns:
300            Dictionary containing subject information
301        """
302        return self.metadata['subjects']
303    
304    def get_labels(self) -> List[str]:
305        """
306        Get labels for loaded data.
307        
308        Returns:
309            List of labels corresponding to loaded data
310        """
311        return self.labels
312    
313    def filter_by_subject_type(self, subject_type: str) -> Tuple[List[pd.DataFrame], List[str]]:
314        """
315        Filter loaded data by subject type.
316        
317        Args:
318            subject_type: 'Control' or 'Patient'
319            
320        Returns:
321            Tuple of (filtered_data, filtered_names)
322        """
323        if not self.data:
324            raise ValueError("No data loaded. Call load_data() first.")
325        
326        filtered_data = []
327        filtered_names = []
328        
329        for i, df in enumerate(self.data):
330            if df['subject_type'].iloc[0] == subject_type:
331                filtered_data.append(df)
332                filtered_names.append(self.names[i])
333        
334        return filtered_data, filtered_names

PhysioNet VGRF dataset loader class.

This class handles loading and processing of the PhysioNet Gait in Parkinson's Disease dataset. The dataset contains vertical ground reaction force (VGRF) data from subjects with Parkinson's disease and healthy controls.

Features concurrent downloading for efficient data retrieval.

PhysioNetLoader(max_workers: int = 8)
36    def __init__(self, max_workers: int = 8):
37        """
38        Initialize PhysioNet loader with concurrent download support.
39        
40        Args:
41            max_workers: Maximum number of concurrent download threads (default: 8)
42        """
43        super().__init__(
44            name="physionet",
45            description="PhysioNet Gait in Parkinson's Disease Dataset - Contains VGRF data from subjects with Parkinson's disease and healthy controls",
46            max_workers=max_workers
47        )
48        self.metadata = {
49            'sensors': ['VGRF_L1', 'VGRF_L2', 'VGRF_L3', 'VGRF_L4', 'VGRF_L5', 'VGRF_L6', 'VGRF_L7', 'VGRF_L8',
50                       'VGRF_R1', 'VGRF_R2', 'VGRF_R3', 'VGRF_R4', 'VGRF_R5', 'VGRF_R6', 'VGRF_R7', 'VGRF_R8'],
51            'sampling_frequency': 100,  # 100 Hz sampling frequency
52            'subjects': {
53                'Co': 'Control subjects',
54                'Pt': 'Parkinson\'s disease patients'
55            },
56            'window_size': 600,  # 6 seconds at 100 Hz
57            'url': 'https://physionet.org/files/gaitpdb/1.0.0/'
58        }
59        self.labels = []
60        self.subject_types = []

Initialize PhysioNet loader with concurrent download support.

Args:
    max_workers: Maximum number of concurrent download threads (default: 8)
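For illustration, a minimal sketch of constructing the loader and inspecting its metadata; the worker count below is arbitrary:

    from gaitsetpy.dataset import PhysioNetLoader

    # Fewer worker threads is gentler on the PhysioNet server; 8 is the default.
    loader = PhysioNetLoader(max_workers=4)
    info = loader.get_sensor_info()
    print(info['sampling_frequency'])   # 100 (Hz)
    print(len(info['sensors']))         # 16 VGRF channels (8 left, 8 right)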

metadata: dataset metadata (sensor names, sampling frequency, subject codes, window size, source URL)
labels: per-file labels ('Co' or 'Pt'), populated by load_data()
subject_types: per-file subject types ('Control' or 'Patient'), populated by load_data()
def load_data(self, data_dir: str, **kwargs) -> Tuple[List[pandas.core.frame.DataFrame], List[str]]:

Load PhysioNet VGRF dataset from the specified directory.

Args:
    data_dir: Directory to store/find the dataset
    **kwargs: Additional arguments (unused for PhysioNet)

Returns: Tuple of (data_list, names_list)
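A usage sketch, assuming network access for the first call (which downloads the files) and using a placeholder data directory:

    from gaitsetpy.dataset import PhysioNetLoader

    loader = PhysioNetLoader()
    data, names = loader.load_data("./physionet_data")   # placeholder path

    print(len(data), "recordings loaded")
    print(names[0])              # a Ga-prefixed filename, e.g. GaCo01_01.txt
    df = data[0]
    print(df.index.name)         # 'time'
    print([c for c in df.columns if c.startswith('VGRF_')][:3])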

def create_sliding_windows(self, data: List[pandas.core.frame.DataFrame], names: List[str], window_size: int = 600, step_size: int = 100) -> List[Dict]:

Create sliding windows from the PhysioNet dataset.

Args:
    data: List of DataFrames containing PhysioNet data
    names: List of names corresponding to the data
    window_size: Size of the sliding window (default: 600 for 6 seconds at 100Hz)
    step_size: Step size for the sliding window (default: 100)

Returns: List of dictionaries containing sliding windows for each DataFrame
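Continuing the load_data sketch above, a minimal windowing example (600 samples is 6 seconds at the 100 Hz sampling rate):

    windows_data = loader.create_sliding_windows(data, names, window_size=600, step_size=100)

    first = windows_data[0]
    print(first["name"], first["metadata"]["subject_type"])
    print(len(first["windows"]), "sensor channels windowed")
    print(first["metadata"]["num_windows"], "windows per channel")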

def get_supported_formats(self) -> List[str]:

Get list of supported file formats for PhysioNet dataset.

Returns: List of supported file extensions

def get_sensor_info(self) -> Dict[str, List[str]]:

Get information about sensors in the dataset.

Returns: Dictionary containing sensor information

def get_subject_info(self) -> Dict[str, str]:

Get information about subjects in the dataset.

Returns: Dictionary containing subject information

def get_labels(self) -> List[str]:

Get labels for loaded data.

Returns: List of labels corresponding to loaded data

def filter_by_subject_type(self, subject_type: str) -> Tuple[List[pandas.core.frame.DataFrame], List[str]]:

Filter loaded data by subject type.

Args:
    subject_type: 'Control' or 'Patient'

Returns: Tuple of (filtered_data, filtered_names)
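Continuing the same sketch, splitting the loaded recordings by group:

    controls, control_names = loader.filter_by_subject_type("Control")
    patients, patient_names = loader.filter_by_subject_type("Patient")
    print(len(controls), "control recordings;", len(patients), "patient recordings")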

class GaitFeatureExtractor(gaitsetpy.BaseFeatureExtractor):
 49class GaitFeatureExtractor(BaseFeatureExtractor):
 50    """
 51    Comprehensive gait feature extractor class.
 52    
 53    This class extracts various time-domain, frequency-domain, and statistical features
 54    from gait data sliding windows.
 55    """
 56    
 57    def __init__(self, verbose: bool = True):
 58        super().__init__(
 59            name="gait_features",
 60            description="Comprehensive gait feature extractor for time-domain, frequency-domain, and statistical features"
 61        )
 62        self.verbose = verbose
 63        self.config = {
 64            'time_domain': True,
 65            'frequency_domain': True,
 66            'statistical': True,
 67            'ar_order': 3  # Order for auto-regression coefficients
 68        }
 69        
 70        if self.verbose:
 71            print("🚀 GaitFeatureExtractor initialized successfully!")
 72            print(f"📊 Default configuration: {self.config}")
 73    
 74    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
 75        """
 76        Extract gait features from sliding windows.
 77        
 78        Args:
 79            windows: List of sliding window dictionaries
 80            fs: Sampling frequency
 81            **kwargs: Additional arguments including time_domain, frequency_domain, statistical flags
 82            
 83        Returns:
 84            List of feature dictionaries for each sensor
 85        """
 86        # Update config with any passed arguments
 87        time_domain = kwargs.get('time_domain', self.config['time_domain'])
 88        frequency_domain = kwargs.get('frequency_domain', self.config['frequency_domain'])
 89        statistical = kwargs.get('statistical', self.config['statistical'])
 90        ar_order = kwargs.get('ar_order', self.config['ar_order'])
 91        
 92        if self.verbose:
 93            print("\n" + "="*60)
 94            print("🔍 STARTING GAIT FEATURE EXTRACTION")
 95            print("="*60)
 96            print(f"📈 Total sensors/windows to process: {len(windows)}")
 97            print(f"🔊 Sampling frequency: {fs} Hz")
 98            print(f"⏱️  Time domain features: {'✅' if time_domain else '❌'}")
 99            print(f"🌊 Frequency domain features: {'✅' if frequency_domain else '❌'}")
100            print(f"📊 Statistical features: {'✅' if statistical else '❌'}")
101            print(f"🔄 Auto-regression order: {ar_order}")
102            print("-"*60)
103        
104        features = []
105        
106        # Main progress bar for processing all windows
107        main_pbar = tqdm(
108            windows, 
109            desc="🔍 Processing Sensors", 
110            unit="sensor",
111            disable=not self.verbose
112        )
113        
114        for i, window_dict in enumerate(main_pbar):
115            sensor_name = window_dict['name']
116            window_data = window_dict['data']
117            
118            if self.verbose:
119                main_pbar.set_postfix({
120                    'Current': sensor_name,
121                    'Windows': len(window_data) if isinstance(window_data, list) else 1
122                })
123            
124            # Skip annotation windows
125            if sensor_name == 'annotations':
126                if self.verbose:
127                    logger.info(f"📝 Processing annotation data for {sensor_name}")
128                
129                features.append({
130                    'name': sensor_name,
131                    'features': {},
132                    'annotations': [self._extract_annotation_labels(window) for window in window_data]
133                })
134                continue
135            
136            if self.verbose:
137                logger.info(f"🎯 Processing sensor: {sensor_name}")
138                logger.info(f"📦 Number of windows: {len(window_data)}")
139            
140            sensor_features = {'name': sensor_name, 'features': {}}
141            
142            # Time domain features
143            if time_domain:
144                if self.verbose:
145                    print(f"  ⏱️  Extracting time domain features for {sensor_name}...")
146                
147                time_features = self._extract_time_domain_features(window_data)
148                sensor_features['features'].update(time_features)
149                
150                if self.verbose:
151                    feature_count = sum(len(v) if isinstance(v, list) else 1 for v in time_features.values())
152                    print(f"  ✅ Time domain: {len(time_features)} feature types, {feature_count} total features")
153            
154            # Frequency domain features
155            if frequency_domain:
156                if self.verbose:
157                    print(f"  🌊 Extracting frequency domain features for {sensor_name}...")
158                
159                freq_features = self._extract_frequency_domain_features(window_data, fs)
160                sensor_features['features'].update(freq_features)
161                
162                if self.verbose:
163                    feature_count = sum(len(v) if isinstance(v, list) else 1 for v in freq_features.values())
164                    print(f"  ✅ Frequency domain: {len(freq_features)} feature types, {feature_count} total features")
165            
166            # Statistical features
167            if statistical:
168                if self.verbose:
169                    print(f"  📊 Extracting statistical features for {sensor_name}...")
170                
171                stat_features = self._extract_statistical_features(window_data)
172                sensor_features['features'].update(stat_features)
173                
174                if self.verbose:
175                    feature_count = sum(len(v) if isinstance(v, list) else 1 for v in stat_features.values())
176                    print(f"  ✅ Statistical: {len(stat_features)} feature types, {feature_count} total features")
177            
178            # Auto-regression coefficients
179            if self.verbose:
180                print(f"  🔄 Extracting auto-regression coefficients for {sensor_name}...")
181            
182            ar_features = self._extract_ar_coefficients(window_data, ar_order)
183            sensor_features['features'].update(ar_features)
184            
185            if self.verbose:
186                feature_count = sum(len(v) if isinstance(v, list) else 1 for v in ar_features.values())
187                print(f"  ✅ Auto-regression: {len(ar_features)} feature types, {feature_count} total features")
188            
189            # Calculate total features for this sensor
190            total_features = sum(
191                len(v) if isinstance(v, list) else 1 
192                for v in sensor_features['features'].values()
193            )
194            
195            if self.verbose:
196                print(f"  🎯 Total features extracted for {sensor_name}: {total_features}")
197                print(f"  📋 Feature types: {list(sensor_features['features'].keys())}")
198                print("-"*40)
199            
200            features.append(sensor_features)
201        
202        if self.verbose:
203            print("\n" + "="*60)
204            print("🎉 FEATURE EXTRACTION COMPLETED!")
205            print("="*60)
206            print(f"📊 Total sensors processed: {len(features)}")
207            
208            # Calculate overall statistics
209            total_feature_count = 0
210            for feature_dict in features:
211                if 'features' in feature_dict:
212                    total_feature_count += sum(
213                        len(v) if isinstance(v, list) else 1 
214                        for v in feature_dict['features'].values()
215                    )
216            
217            print(f"🔢 Total features extracted: {total_feature_count}")
218            print(f"📈 Average features per sensor: {total_feature_count / len(features):.1f}")
219            print("="*60)
220        
221        return features
222    
223    def _extract_time_domain_features(self, windows: List) -> Dict[str, List]:
224        """Extract time domain features from windows."""
225        if self.verbose:
226            print("    🔍 Computing time domain features...")
227        
228        time_features = {}
229        
230        # Define time domain feature functions
231        time_domain_funcs = {
232            'mean': calculate_mean,
233            'std': calculate_standard_deviation,
234            'variance': calculate_variance,
235            'rms': calculate_root_mean_square,
236            'range': calculate_range,
237            'median': calculate_median,
238            'mode': calculate_mode,
239            'mean_absolute_value': calculate_mean_absolute_value,
240            'median_absolute_deviation': calculate_median_absolute_deviation,
241            'peak_height': calculate_peak_height,
242            'zero_crossing_rate': calculate_zero_crossing_rate,
243            'energy': calculate_energy,
244        }
245        
246        # Progress bar for time domain features
247        feature_pbar = tqdm(
248            time_domain_funcs.items(), 
249            desc="    ⏱️  Time features", 
250            unit="feature",
251            leave=False,
252            disable=not self.verbose
253        )
254        
255        for feature_name, func in feature_pbar:
256            if self.verbose:
257                feature_pbar.set_postfix({'Computing': feature_name})
258            
259            time_features[feature_name] = [
260                func(self._ensure_numpy_array(window)) for window in windows
261            ]
262        
263        return time_features
264    
265    def _ensure_numpy_array(self, signal):
266        """Convert pandas Series to numpy array if needed."""
267        if hasattr(signal, 'values'):
268            return signal.values
269        return signal
270    
271    def _extract_frequency_domain_features(self, windows: List, fs: int) -> Dict[str, List]:
272        """Extract frequency domain features from windows."""
273        if self.verbose:
274            print("    🔍 Computing frequency domain features...")
275        
276        freq_features = {}
277        
278        # Define frequency domain feature functions
279        freq_domain_funcs = {
280            'dominant_frequency': lambda w: calculate_dominant_frequency(w, fs),
281            'peak_frequency': lambda w: calculate_peak_frequency(w, fs),
282            'power_spectral_entropy': lambda w: calculate_power_spectral_entropy(w, fs),
283            'principal_harmonic_frequency': lambda w: calculate_principal_harmonic_frequency(w, fs),
284            'stride_times': lambda w: calculate_stride_times(w, fs),
285            'step_time': lambda w: calculate_step_time(w, fs),
286            'cadence': lambda w: calculate_cadence(w, fs),
287            'freezing_index': lambda w: calculate_freezing_index(w, fs),
288        }
289        
290        # Progress bar for frequency domain features
291        feature_pbar = tqdm(
292            freq_domain_funcs.items(), 
293            desc="    🌊 Freq features", 
294            unit="feature",
295            leave=False,
296            disable=not self.verbose
297        )
298        
299        for feature_name, func in feature_pbar:
300            if self.verbose:
301                feature_pbar.set_postfix({'Computing': feature_name})
302            
303            freq_features[feature_name] = [
304                func(self._ensure_numpy_array(window)) for window in windows
305            ]
306        
307        return freq_features
308    
309    def _extract_statistical_features(self, windows: List) -> Dict[str, List]:
310        """Extract statistical features from windows."""
311        if self.verbose:
312            print("    🔍 Computing statistical features...")
313        
314        stat_features = {}
315        
316        # Define statistical feature functions
317        stat_funcs = {
318            'skewness': calculate_skewness,
319            'kurtosis': calculate_kurtosis,
320            'entropy': calculate_entropy,
321            'interquartile_range': calculate_interquartile_range,
322        }
323        
324        # Progress bar for statistical features
325        feature_pbar = tqdm(
326            stat_funcs.items(), 
327            desc="    📊 Stat features", 
328            unit="feature",
329            leave=False,
330            disable=not self.verbose
331        )
332        
333        for feature_name, func in feature_pbar:
334            if self.verbose:
335                feature_pbar.set_postfix({'Computing': feature_name})
336            
337            stat_features[feature_name] = [
338                func(self._ensure_numpy_array(window)) for window in windows
339            ]
340        
341        # Handle correlation separately (needs two signals)
342        if self.verbose:
343            print("      🔗 Computing correlation features...")
344        
345        stat_features['correlation'] = [
346            calculate_correlation(
347                self._ensure_numpy_array(window)[:-1], 
348                self._ensure_numpy_array(window)[1:]
349            ) if len(window) > 1 else 0 
350            for window in windows
351        ]
352        
353        return stat_features
354    
355    def _extract_ar_coefficients(self, windows: List, order: int) -> Dict[str, List]:
356        """Extract auto-regression coefficients from windows."""
357        if self.verbose:
358            print(f"    🔍 Computing auto-regression coefficients (order={order})...")
359        
360        # Progress bar for AR coefficients
361        ar_pbar = tqdm(
362            windows, 
363            desc="    🔄 AR coeffs", 
364            unit="window",
365            leave=False,
366            disable=not self.verbose
367        )
368        
369        ar_coeffs = []
370        for window in ar_pbar:
371            coeffs = calculate_auto_regression_coefficients(
372                self._ensure_numpy_array(window), order
373            )
374            ar_coeffs.append(coeffs)
375        
376        return {'ar_coefficients': ar_coeffs}
377    
378    def _extract_annotation_labels(self, window) -> int:
379        """Extract the most common annotation label from a window."""
380        if hasattr(window, 'mode'):
381            return window.mode().iloc[0] if len(window.mode()) > 0 else 0
382        else:
383            # For numpy arrays or other types
384            unique, counts = np.unique(window, return_counts=True)
385            return unique[np.argmax(counts)]
386    
387    def get_feature_names(self) -> List[str]:
388        """
389        Get names of all features that can be extracted.
390        
391        Returns:
392            List of feature names
393        """
394        time_domain_features = [
395            'mean', 'std', 'variance', 'rms', 'range', 'median', 'mode',
396            'mean_absolute_value', 'median_absolute_deviation', 'peak_height',
397            'zero_crossing_rate', 'energy'
398        ]
399        
400        frequency_domain_features = [
401            'dominant_frequency', 'peak_frequency', 'power_spectral_entropy',
402            'principal_harmonic_frequency', 'stride_times', 'step_time',
403            'cadence', 'freezing_index'
404        ]
405        
406        statistical_features = [
407            'skewness', 'kurtosis', 'entropy', 'interquartile_range', 'correlation'
408        ]
409        
410        other_features = ['ar_coefficients']
411        
412        return time_domain_features + frequency_domain_features + statistical_features + other_features
413
414    def print_extraction_summary(self, features: List[Dict]) -> None:
415        """
416        Print a detailed summary of extracted features.
417        
418        Args:
419            features: List of feature dictionaries returned by extract_features
420        """
421        print("\n" + "="*80)
422        print("📊 FEATURE EXTRACTION SUMMARY")
423        print("="*80)
424        
425        for i, feature_dict in enumerate(features):
426            sensor_name = feature_dict['name']
427            print(f"\n🎯 Sensor {i+1}: {sensor_name}")
428            print("-" * 40)
429            
430            if 'features' in feature_dict and feature_dict['features']:
431                for feature_type, feature_values in feature_dict['features'].items():
432                    if isinstance(feature_values, list):
433                        print(f"  📈 {feature_type}: {len(feature_values)} values")
434                        if feature_values:
435                            sample_value = feature_values[0]
436                            if isinstance(sample_value, (list, np.ndarray)):
437                                print(f"    └── Shape per window: {np.array(sample_value).shape}")
438                            else:
439                                print(f"    └── Sample value: {sample_value:.4f}")
440                    else:
441                        print(f"  📈 {feature_type}: {feature_values}")
442            
443            if 'annotations' in feature_dict:
444                print(f"  📝 Annotations: {len(feature_dict['annotations'])} windows")
445        
446        print("\n" + "="*80)

Comprehensive gait feature extractor class.

This class extracts various time-domain, frequency-domain, and statistical features from gait data sliding windows.

GaitFeatureExtractor(verbose: bool = True)

Initialize the gait feature extractor.

Args:
    verbose: Whether to print progress bars and extraction summaries (default: True)

verbose: controls progress bars and summary printing
config: default extraction settings (time_domain, frequency_domain, statistical, ar_order)
def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:

Extract gait features from sliding windows.

Args:
    windows: List of sliding window dictionaries
    fs: Sampling frequency
    **kwargs: Additional arguments including time_domain, frequency_domain, statistical flags

Returns: List of feature dictionaries for each sensor
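A sketch that feeds the per-sensor windows produced by PhysioNetLoader.create_sliding_windows (see the windowing example above) into the extractor; fs=100 matches the PhysioNet sampling frequency, and the keyword flags override the defaults stored in self.config:

    from gaitsetpy.features import GaitFeatureExtractor

    extractor = GaitFeatureExtractor(verbose=False)
    per_sensor_windows = windows_data[0]["windows"]   # list of {'name': ..., 'data': ...} dicts
    features = extractor.extract_features(
        per_sensor_windows,
        fs=100,
        time_domain=True,
        frequency_domain=True,
        statistical=False,   # skip skewness/kurtosis/entropy/IQR/correlation
    )
    print(features[0]["name"])
    print(sorted(features[0]["features"].keys())[:5])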

def get_feature_names(self) -> List[str]:

Get names of all features that can be extracted.

Returns: List of feature names

def print_extraction_summary(self, features: List[Dict]) -> None:

Print a detailed summary of extracted features.

Args:
    features: List of feature dictionaries returned by extract_features
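Continuing the extraction sketch above:

    print(extractor.get_feature_names())           # every feature the extractor can produce
    extractor.print_extraction_summary(features)   # per-sensor counts and sample values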

class LBPFeatureExtractor(gaitsetpy.BaseFeatureExtractor):
 26class LBPFeatureExtractor(BaseFeatureExtractor):
 27    """
 28    Local Binary Pattern (LBP) feature extractor for VGRF data.
 29    
 30    This extractor converts time-series data into LBP codes and extracts
 31    histogram features from the LBP representation.
 32    """
 33    
 34    def __init__(self, verbose: bool = True):
 35        super().__init__(
 36            name="lbp_features",
 37            description="Local Binary Pattern feature extractor for VGRF time-series data"
 38        )
 39        self.verbose = verbose
 40        self.config = {
 41            'radius': 2,  # LBP radius (number of neighbors)
 42            'n_bins': 256,  # Number of histogram bins
 43            'normalize': True  # Normalize histogram
 44        }
 45        
 46        if self.verbose:
 47            print("🔍 LBP Feature Extractor initialized!")
 48    
 49    def lbp_1d(self, data: np.ndarray, radius: int = 2) -> str:
 50        """
 51        Compute 1D Local Binary Pattern for time-series data.
 52        
 53        Args:
 54            data: Input time-series data
 55            radius: Radius for LBP computation
 56            
 57        Returns:
 58            LBP code as binary string
 59        """
 60        n = len(data)
 61        lbp_code = ''
 62        
 63        for i in range(n):
 64            pattern = ''
 65            for j in range(i - radius, i + radius + 1):
 66                if j < 0 or j >= n:
 67                    pattern += '0'
 68                elif data[j] >= data[i]:
 69                    pattern += '1'
 70                else:
 71                    pattern += '0'
 72            lbp_code += pattern
 73        
 74        return lbp_code
 75    
 76    def lbp_to_histogram(self, lbp_code: str, n_bins: int = 256, normalize: bool = True) -> np.ndarray:
 77        """
 78        Convert LBP code to histogram features.
 79        
 80        Args:
 81            lbp_code: Binary LBP code string
 82            n_bins: Number of histogram bins
 83            normalize: Whether to normalize histogram
 84            
 85        Returns:
 86            Histogram features as numpy array
 87        """
 88        # Convert LBP code to integer values
 89        if len(lbp_code) == 0:
 90            return np.zeros(n_bins)
 91        
 92        # Process LBP code in chunks of 8 bits (or smaller)
 93        chunk_size = 8
 94        lbp_values = []
 95        
 96        for i in range(0, len(lbp_code), chunk_size):
 97            chunk = lbp_code[i:i + chunk_size]
 98            if len(chunk) > 0:
 99                # Convert binary string to integer
100                try:
101                    value = int(chunk, 2)
102                    lbp_values.append(value % n_bins)  # Ensure within bin range
103                except ValueError:
104                    continue
105        
106        if len(lbp_values) == 0:
107            return np.zeros(n_bins)
108        
109        # Create histogram
110        hist, _ = np.histogram(lbp_values, bins=n_bins, range=(0, n_bins))
111        
112        if normalize and np.sum(hist) > 0:
113            hist = hist / np.sum(hist)
114        
115        return hist
116    
117    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
118        """
119        Extract LBP features from sliding windows.
120        
121        Args:
122            windows: List of sliding window dictionaries
123            fs: Sampling frequency (unused for LBP)
124            **kwargs: Additional arguments
125            
126        Returns:
127            List of feature dictionaries
128        """
129        # Update config with any passed arguments
130        radius = kwargs.get('radius', self.config['radius'])
131        n_bins = kwargs.get('n_bins', self.config['n_bins'])
132        normalize = kwargs.get('normalize', self.config['normalize'])
133        
134        if self.verbose:
135            print(f"\n🔍 LBP Feature Extraction")
136            print(f"📊 Radius: {radius}, Bins: {n_bins}, Normalize: {normalize}")
137        
138        features = []
139        
140        for window_dict in tqdm(windows, desc="Processing LBP features", disable=not self.verbose):
141            sensor_name = window_dict['name']
142            window_data = window_dict['data']
143            
144            # Skip annotation windows
145            if sensor_name == 'annotations':
146                continue
147            
148            sensor_features = {'name': sensor_name, 'features': {}}
149            
150            # Extract LBP features for each window
151            lbp_histograms = []
152            lbp_means = []
153            lbp_stds = []
154            
155            for window in window_data:
156                # Ensure window is numpy array
157                if hasattr(window, 'values'):
158                    window = window.values
159                
160                # Compute LBP
161                lbp_code = self.lbp_1d(window, radius)
162                
163                # Convert to histogram
164                hist = self.lbp_to_histogram(lbp_code, n_bins, normalize)
165                lbp_histograms.append(hist)
166                
167                # Extract summary statistics
168                lbp_means.append(np.mean(hist))
169                lbp_stds.append(np.std(hist))
170            
171            # Store features
172            sensor_features['features'] = {
173                'lbp_histograms': lbp_histograms,
174                'lbp_mean': lbp_means,
175                'lbp_std': lbp_stds,
176                'lbp_energy': [np.sum(hist**2) for hist in lbp_histograms],
177                'lbp_entropy': [self._calculate_entropy(hist) for hist in lbp_histograms]
178            }
179            
180            features.append(sensor_features)
181        
182        return features
183    
184    def _calculate_entropy(self, hist: np.ndarray) -> float:
185        """Calculate entropy of histogram."""
186        # Avoid log(0) by adding small value
187        hist = hist + 1e-10
188        return -np.sum(hist * np.log2(hist))
189    
190    def get_feature_names(self) -> List[str]:
191        """Get names of LBP features."""
192        return [
193            'lbp_histograms', 'lbp_mean', 'lbp_std', 
194            'lbp_energy', 'lbp_entropy'
195        ]

Local Binary Pattern (LBP) feature extractor for VGRF data.

This extractor converts time-series data into LBP codes and extracts histogram features from the LBP representation.

LBPFeatureExtractor(verbose: bool = True)

Initialize the LBP feature extractor.

Args:
    verbose: Whether to print progress output (default: True)

verbose: controls progress output
config: default LBP settings (radius, n_bins, normalize)
def lbp_1d(self, data: numpy.ndarray, radius: int = 2) -> str:

Compute 1D Local Binary Pattern for time-series data.

Args:
    data: Input time-series data
    radius: Radius for LBP computation

Returns: LBP code as binary string
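A toy sketch on an arbitrary five-sample signal, showing the shape of the returned code:

    import numpy as np
    from gaitsetpy.features import LBPFeatureExtractor

    lbp = LBPFeatureExtractor(verbose=False)
    signal = np.array([0.1, 0.5, 0.3, 0.7, 0.2])
    code = lbp.lbp_1d(signal, radius=2)
    print(len(code))   # 25: one (2*radius + 1)-bit pattern per sample
    print(code[:5])    # pattern for the first sample; out-of-range neighbours contribute '0'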

def lbp_to_histogram(self, lbp_code: str, n_bins: int = 256, normalize: bool = True) -> numpy.ndarray:

Convert LBP code to histogram features.

Args:
    lbp_code: Binary LBP code string
    n_bins: Number of histogram bins
    normalize: Whether to normalize histogram

Returns: Histogram features as numpy array
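Continuing the sketch, turning the code into histogram features (the bin count here is arbitrary):

    hist = lbp.lbp_to_histogram(code, n_bins=32, normalize=True)
    print(hist.shape)             # (32,)
    print(round(hist.sum(), 6))   # 1.0 for a normalized, non-empty code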

def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:

Extract LBP features from sliding windows.

Args:
    windows: List of sliding window dictionaries
    fs: Sampling frequency (unused for LBP)
    **kwargs: Additional arguments

Returns: List of feature dictionaries
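
The window dictionaries follow the {'name': ..., 'data': [window, ...]} layout used throughout this module, and entries named 'annotations' are skipped. A minimal sketch with synthetic windows (the sensor name and window sizes are arbitrary; fs is accepted for interface compatibility but LBP does not use it):

    import numpy as np
    from gaitsetpy import LBPFeatureExtractor

    rng = np.random.default_rng(42)
    windows = [
        {'name': 'vgrf_left', 'data': [rng.normal(size=100) for _ in range(5)]},
        {'name': 'annotations', 'data': [np.zeros(100) for _ in range(5)]},  # skipped
    ]

    extractor = LBPFeatureExtractor(verbose=False)
    features = extractor.extract_features(windows, fs=100, n_bins=64)
    print(features[0]['name'])                      # 'vgrf_left'
    print(sorted(features[0]['features'].keys()))   # lbp_energy, lbp_entropy, lbp_histograms, ...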

def get_feature_names(self) -> List[str]:
190    def get_feature_names(self) -> List[str]:
191        """Get names of LBP features."""
192        return [
193            'lbp_histograms', 'lbp_mean', 'lbp_std', 
194            'lbp_energy', 'lbp_entropy'
195        ]

Get names of LBP features.

class FourierSeriesFeatureExtractor(gaitsetpy.BaseFeatureExtractor):
198class FourierSeriesFeatureExtractor(BaseFeatureExtractor):
199    """
200    Fourier Series feature extractor for VGRF data.
201    
202    This extractor fits Fourier series to time-series data and extracts
203    coefficients and reconstruction features.
204    """
205    
206    def __init__(self, verbose: bool = True):
207        super().__init__(
208            name="fourier_features",
209            description="Fourier series feature extractor for VGRF time-series data"
210        )
211        self.verbose = verbose
212        self.config = {
213            'n_terms': 10,  # Number of Fourier terms
214            'period': 3.0,  # Period for Fourier series
215            'extract_coefficients': True,
216            'extract_reconstruction_error': True
217        }
218        
219        if self.verbose:
220            print("🌊 Fourier Series Feature Extractor initialized!")
221    
222    def fit_fourier_series(self, signal: np.ndarray, time_points: np.ndarray, 
223                          period: float = 3.0, n_terms: int = 10) -> Dict[str, Any]:
224        """
225        Fit Fourier series to signal.
226        
227        Args:
228            signal: Input signal
229            time_points: Time points
230            period: Period of the Fourier series
231            n_terms: Number of Fourier terms
232            
233        Returns:
234            Dictionary containing Fourier series parameters
235        """
236        try:
237            # Calculate Fourier coefficients
238            L = period
239            
240            # Calculate a0 (DC component)
241            a0 = 2/L * simpson(signal, time_points)
242            
243            # Calculate an and bn coefficients
244            an = []
245            bn = []
246            
247            for n in range(1, n_terms + 1):
248                # Calculate an coefficient
249                an_val = 2.0/L * simpson(signal * np.cos(2.*np.pi*n*time_points/L), time_points)
250                an.append(an_val)
251                
252                # Calculate bn coefficient
253                bn_val = 2.0/L * simpson(signal * np.sin(2.*np.pi*n*time_points/L), time_points)
254                bn.append(bn_val)
255            
256            # Reconstruct signal
257            reconstructed = np.full_like(time_points, a0/2)
258            for n in range(n_terms):
259                reconstructed += an[n] * np.cos(2.*np.pi*(n+1)*time_points/L)
260                reconstructed += bn[n] * np.sin(2.*np.pi*(n+1)*time_points/L)
261            
262            # Calculate reconstruction error
263            reconstruction_error = np.mean((signal - reconstructed)**2)
264            
265            return {
266                'a0': a0,
267                'an': an,
268                'bn': bn,
269                'reconstructed': reconstructed,
270                'reconstruction_error': reconstruction_error,
271                'fourier_energy': a0**2 + 2*np.sum(np.array(an)**2 + np.array(bn)**2)
272            }
273            
274        except Exception as e:
275            if self.verbose:
276                print(f"Error in Fourier series fitting: {e}")
277            return {
278                'a0': 0,
279                'an': [0] * n_terms,
280                'bn': [0] * n_terms,
281                'reconstructed': np.zeros_like(time_points),
282                'reconstruction_error': float('inf'),
283                'fourier_energy': 0
284            }
285    
286    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
287        """
288        Extract Fourier series features from sliding windows.
289        
290        Args:
291            windows: List of sliding window dictionaries
292            fs: Sampling frequency
293            **kwargs: Additional arguments
294            
295        Returns:
296            List of feature dictionaries
297        """
298        # Update config with any passed arguments
299        n_terms = kwargs.get('n_terms', self.config['n_terms'])
300        period = kwargs.get('period', self.config['period'])
301        
302        if self.verbose:
303            print(f"\n🌊 Fourier Series Feature Extraction")
304            print(f"📊 Terms: {n_terms}, Period: {period}")
305        
306        features = []
307        
308        for window_dict in tqdm(windows, desc="Processing Fourier features", disable=not self.verbose):
309            sensor_name = window_dict['name']
310            window_data = window_dict['data']
311            
312            # Skip annotation windows
313            if sensor_name == 'annotations':
314                continue
315            
316            sensor_features = {'name': sensor_name, 'features': {}}
317            
318            # Extract Fourier features for each window
319            a0_values = []
320            an_values = []
321            bn_values = []
322            reconstruction_errors = []
323            fourier_energies = []
324            
325            for window in window_data:
326                # Ensure window is numpy array
327                if hasattr(window, 'values'):
328                    window = window.values
329                
330                # Create time points
331                time_points = np.linspace(0, period, len(window))
332                
333                # Fit Fourier series
334                fourier_result = self.fit_fourier_series(window, time_points, period, n_terms)
335                
336                # Store results
337                a0_values.append(fourier_result['a0'])
338                an_values.append(fourier_result['an'])
339                bn_values.append(fourier_result['bn'])
340                reconstruction_errors.append(fourier_result['reconstruction_error'])
341                fourier_energies.append(fourier_result['fourier_energy'])
342            
343            # Store features
344            sensor_features['features'] = {
345                'fourier_a0': a0_values,
346                'fourier_an': an_values,
347                'fourier_bn': bn_values,
348                'fourier_reconstruction_error': reconstruction_errors,
349                'fourier_energy': fourier_energies,
350                'fourier_an_mean': [np.mean(an) for an in an_values],
351                'fourier_bn_mean': [np.mean(bn) for bn in bn_values],
352                'fourier_an_std': [np.std(an) for an in an_values],
353                'fourier_bn_std': [np.std(bn) for bn in bn_values]
354            }
355            
356            features.append(sensor_features)
357        
358        return features
359    
360    def get_feature_names(self) -> List[str]:
361        """Get names of Fourier series features."""
362        return [
363            'fourier_a0', 'fourier_an', 'fourier_bn', 
364            'fourier_reconstruction_error', 'fourier_energy',
365            'fourier_an_mean', 'fourier_bn_mean',
366            'fourier_an_std', 'fourier_bn_std'
367        ]

Fourier Series feature extractor for VGRF data.

This extractor fits Fourier series to time-series data and extracts coefficients and reconstruction features.

FourierSeriesFeatureExtractor(verbose: bool = True)
206    def __init__(self, verbose: bool = True):
207        super().__init__(
208            name="fourier_features",
209            description="Fourier series feature extractor for VGRF time-series data"
210        )
211        self.verbose = verbose
212        self.config = {
213            'n_terms': 10,  # Number of Fourier terms
214            'period': 3.0,  # Period for Fourier series
215            'extract_coefficients': True,
216            'extract_reconstruction_error': True
217        }
218        
219        if self.verbose:
220            print("🌊 Fourier Series Feature Extractor initialized!")

Initialize the feature extractor.

Args:
    name: Name of the feature extractor
    description: Description of the feature extractor

Instance attributes: verbose, config
def fit_fourier_series( self, signal: numpy.ndarray, time_points: numpy.ndarray, period: float = 3.0, n_terms: int = 10) -> Dict[str, Any]:
222    def fit_fourier_series(self, signal: np.ndarray, time_points: np.ndarray, 
223                          period: float = 3.0, n_terms: int = 10) -> Dict[str, Any]:
224        """
225        Fit Fourier series to signal.
226        
227        Args:
228            signal: Input signal
229            time_points: Time points
230            period: Period of the Fourier series
231            n_terms: Number of Fourier terms
232            
233        Returns:
234            Dictionary containing Fourier series parameters
235        """
236        try:
237            # Calculate Fourier coefficients
238            L = period
239            
240            # Calculate a0 (DC component)
241            a0 = 2/L * simpson(signal, time_points)
242            
243            # Calculate an and bn coefficients
244            an = []
245            bn = []
246            
247            for n in range(1, n_terms + 1):
248                # Calculate an coefficient
249                an_val = 2.0/L * simpson(signal * np.cos(2.*np.pi*n*time_points/L), time_points)
250                an.append(an_val)
251                
252                # Calculate bn coefficient
253                bn_val = 2.0/L * simpson(signal * np.sin(2.*np.pi*n*time_points/L), time_points)
254                bn.append(bn_val)
255            
256            # Reconstruct signal
257            reconstructed = np.full_like(time_points, a0/2)
258            for n in range(n_terms):
259                reconstructed += an[n] * np.cos(2.*np.pi*(n+1)*time_points/L)
260                reconstructed += bn[n] * np.sin(2.*np.pi*(n+1)*time_points/L)
261            
262            # Calculate reconstruction error
263            reconstruction_error = np.mean((signal - reconstructed)**2)
264            
265            return {
266                'a0': a0,
267                'an': an,
268                'bn': bn,
269                'reconstructed': reconstructed,
270                'reconstruction_error': reconstruction_error,
271                'fourier_energy': a0**2 + 2*np.sum(np.array(an)**2 + np.array(bn)**2)
272            }
273            
274        except Exception as e:
275            if self.verbose:
276                print(f"Error in Fourier series fitting: {e}")
277            return {
278                'a0': 0,
279                'an': [0] * n_terms,
280                'bn': [0] * n_terms,
281                'reconstructed': np.zeros_like(time_points),
282                'reconstruction_error': float('inf'),
283                'fourier_energy': 0
284            }

Fit Fourier series to signal.

Args:
    signal: Input signal
    time_points: Time points
    period: Period of the Fourier series
    n_terms: Number of Fourier terms

Returns: Dictionary containing Fourier series parameters
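
The coefficients follow the standard real Fourier series convention, a0 = (2/L) ∫ f(t) dt and an, bn = (2/L) ∫ f(t) cos(2πnt/L) dt (resp. sin), evaluated numerically with Simpson's rule, and the reconstruction starts from a0/2. A rough sanity check on a signal the series can represent exactly (illustrative only; the printed values are approximate and assume scipy is available, since the fit relies on scipy.integrate.simpson):

    import numpy as np
    from gaitsetpy import FourierSeriesFeatureExtractor

    extractor = FourierSeriesFeatureExtractor(verbose=False)
    period = 3.0
    t = np.linspace(0, period, 300)
    signal = 1.5 + 2.0 * np.cos(2 * np.pi * t / period)   # DC level 1.5, first cosine term 2.0

    result = extractor.fit_fourier_series(signal, t, period=period, n_terms=5)
    print(result['a0'] / 2)                 # ~1.5 (DC component)
    print(result['an'][0])                  # ~2.0 (first cosine coefficient)
    print(result['reconstruction_error'])   # ~0 for a signal the series represents exactly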

def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
286    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
287        """
288        Extract Fourier series features from sliding windows.
289        
290        Args:
291            windows: List of sliding window dictionaries
292            fs: Sampling frequency
293            **kwargs: Additional arguments
294            
295        Returns:
296            List of feature dictionaries
297        """
298        # Update config with any passed arguments
299        n_terms = kwargs.get('n_terms', self.config['n_terms'])
300        period = kwargs.get('period', self.config['period'])
301        
302        if self.verbose:
303            print(f"\n🌊 Fourier Series Feature Extraction")
304            print(f"📊 Terms: {n_terms}, Period: {period}")
305        
306        features = []
307        
308        for window_dict in tqdm(windows, desc="Processing Fourier features", disable=not self.verbose):
309            sensor_name = window_dict['name']
310            window_data = window_dict['data']
311            
312            # Skip annotation windows
313            if sensor_name == 'annotations':
314                continue
315            
316            sensor_features = {'name': sensor_name, 'features': {}}
317            
318            # Extract Fourier features for each window
319            a0_values = []
320            an_values = []
321            bn_values = []
322            reconstruction_errors = []
323            fourier_energies = []
324            
325            for window in window_data:
326                # Ensure window is numpy array
327                if hasattr(window, 'values'):
328                    window = window.values
329                
330                # Create time points
331                time_points = np.linspace(0, period, len(window))
332                
333                # Fit Fourier series
334                fourier_result = self.fit_fourier_series(window, time_points, period, n_terms)
335                
336                # Store results
337                a0_values.append(fourier_result['a0'])
338                an_values.append(fourier_result['an'])
339                bn_values.append(fourier_result['bn'])
340                reconstruction_errors.append(fourier_result['reconstruction_error'])
341                fourier_energies.append(fourier_result['fourier_energy'])
342            
343            # Store features
344            sensor_features['features'] = {
345                'fourier_a0': a0_values,
346                'fourier_an': an_values,
347                'fourier_bn': bn_values,
348                'fourier_reconstruction_error': reconstruction_errors,
349                'fourier_energy': fourier_energies,
350                'fourier_an_mean': [np.mean(an) for an in an_values],
351                'fourier_bn_mean': [np.mean(bn) for bn in bn_values],
352                'fourier_an_std': [np.std(an) for an in an_values],
353                'fourier_bn_std': [np.std(bn) for bn in bn_values]
354            }
355            
356            features.append(sensor_features)
357        
358        return features

Extract Fourier series features from sliding windows.

Args:
    windows: List of sliding window dictionaries
    fs: Sampling frequency
    **kwargs: Additional arguments

Returns: List of feature dictionaries
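
Note that each window is mapped onto time points spanning [0, period] regardless of its length, so period effectively acts as the assumed duration of one window, while fs is accepted for interface compatibility but does not enter the fit. An illustrative call on synthetic windows:

    import numpy as np
    from gaitsetpy import FourierSeriesFeatureExtractor

    rng = np.random.default_rng(0)
    windows = [{'name': 'vgrf_left', 'data': [rng.normal(size=300) for _ in range(3)]}]

    fourier = FourierSeriesFeatureExtractor(verbose=False)
    feats = fourier.extract_features(windows, fs=100, n_terms=8, period=1.0)
    print(len(feats[0]['features']['fourier_an'][0]))   # 8 cosine coefficients per window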

def get_feature_names(self) -> List[str]:
360    def get_feature_names(self) -> List[str]:
361        """Get names of Fourier series features."""
362        return [
363            'fourier_a0', 'fourier_an', 'fourier_bn', 
364            'fourier_reconstruction_error', 'fourier_energy',
365            'fourier_an_mean', 'fourier_bn_mean',
366            'fourier_an_std', 'fourier_bn_std'
367        ]

Get names of Fourier series features.

class PhysioNetFeatureExtractor(gaitsetpy.BaseFeatureExtractor):
370class PhysioNetFeatureExtractor(BaseFeatureExtractor):
371    """
372    Combined feature extractor for PhysioNet VGRF data.
373    
374    This extractor combines LBP and Fourier series features along with
375    basic statistical features specific to VGRF data.
376    """
377    
378    def __init__(self, verbose: bool = True):
379        super().__init__(
380            name="physionet_features",
381            description="Combined feature extractor for PhysioNet VGRF data including LBP and Fourier features"
382        )
383        self.verbose = verbose
384        self.lbp_extractor = LBPFeatureExtractor(verbose=False)
385        self.fourier_extractor = FourierSeriesFeatureExtractor(verbose=False)
386        
387        if self.verbose:
388            print("🚀 PhysioNet Feature Extractor initialized!")
389    
390    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
391        """
392        Extract combined features from sliding windows.
393        
394        Args:
395            windows: List of sliding window dictionaries
396            fs: Sampling frequency
397            **kwargs: Additional arguments
398            
399        Returns:
400            List of feature dictionaries
401        """
402        # Extract features from each extractor
403        extract_lbp = kwargs.get('extract_lbp', True)
404        extract_fourier = kwargs.get('extract_fourier', True)
405        extract_statistical = kwargs.get('extract_statistical', True)
406        
407        if self.verbose:
408            print(f"\n🔍 PhysioNet Feature Extraction")
409            print(f"📊 LBP: {extract_lbp}, Fourier: {extract_fourier}, Statistical: {extract_statistical}")
410        
411        features = []
412        
413        # Extract LBP features
414        if extract_lbp:
415            lbp_features = self.lbp_extractor.extract_features(windows, fs, **kwargs)
416        else:
417            lbp_features = []
418        
419        # Extract Fourier features
420        if extract_fourier:
421            fourier_features = self.fourier_extractor.extract_features(windows, fs, **kwargs)
422        else:
423            fourier_features = []
424        
425        # Extract statistical features
426        if extract_statistical:
427            statistical_features = self._extract_statistical_features(windows)
428        else:
429            statistical_features = []
430        
431        # Combine features
432        for i, window_dict in enumerate(windows):
433            sensor_name = window_dict['name']
434            
435            # Skip annotation windows
436            if sensor_name == 'annotations':
437                continue
438            
439            combined_features = {'name': sensor_name, 'features': {}}
440            
441            # Add LBP features
442            if extract_lbp and i < len(lbp_features):
443                combined_features['features'].update(lbp_features[i]['features'])
444            
445            # Add Fourier features
446            if extract_fourier and i < len(fourier_features):
447                combined_features['features'].update(fourier_features[i]['features'])
448            
449            # Add statistical features
450            if extract_statistical and i < len(statistical_features):
451                combined_features['features'].update(statistical_features[i]['features'])
452            
453            features.append(combined_features)
454        
455        return features
456    
457    def _extract_statistical_features(self, windows: List[Dict]) -> List[Dict]:
458        """Extract basic statistical features."""
459        features = []
460        
461        for window_dict in windows:
462            sensor_name = window_dict['name']
463            window_data = window_dict['data']
464            
465            # Skip annotation windows
466            if sensor_name == 'annotations':
467                continue
468            
469            sensor_features = {'name': sensor_name, 'features': {}}
470            
471            # Extract statistical features for each window
472            means = []
473            stds = []
474            maxs = []
475            mins = []
476            ranges = []
477            
478            for window in window_data:
479                # Ensure window is numpy array
480                if hasattr(window, 'values'):
481                    window = window.values
482                
483                means.append(np.mean(window))
484                stds.append(np.std(window))
485                maxs.append(np.max(window))
486                mins.append(np.min(window))
487                ranges.append(np.max(window) - np.min(window))
488            
489            # Store features
490            sensor_features['features'] = {
491                'vgrf_mean': means,
492                'vgrf_std': stds,
493                'vgrf_max': maxs,
494                'vgrf_min': mins,
495                'vgrf_range': ranges
496            }
497            
498            features.append(sensor_features)
499        
500        return features
501    
502    def get_feature_names(self) -> List[str]:
503        """Get names of all features."""
504        feature_names = []
505        feature_names.extend(self.lbp_extractor.get_feature_names())
506        feature_names.extend(self.fourier_extractor.get_feature_names())
507        feature_names.extend(['vgrf_mean', 'vgrf_std', 'vgrf_max', 'vgrf_min', 'vgrf_range'])
508        return feature_names

Combined feature extractor for PhysioNet VGRF data.

This extractor combines LBP and Fourier series features along with basic statistical features specific to VGRF data.

PhysioNetFeatureExtractor(verbose: bool = True)
378    def __init__(self, verbose: bool = True):
379        super().__init__(
380            name="physionet_features",
381            description="Combined feature extractor for PhysioNet VGRF data including LBP and Fourier features"
382        )
383        self.verbose = verbose
384        self.lbp_extractor = LBPFeatureExtractor(verbose=False)
385        self.fourier_extractor = FourierSeriesFeatureExtractor(verbose=False)
386        
387        if self.verbose:
388            print("🚀 PhysioNet Feature Extractor initialized!")

Initialize the feature extractor.

Args:
    name: Name of the feature extractor
    description: Description of the feature extractor

Instance attributes: verbose, lbp_extractor, fourier_extractor
def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
390    def extract_features(self, windows: List[Dict], fs: int, **kwargs) -> List[Dict]:
391        """
392        Extract combined features from sliding windows.
393        
394        Args:
395            windows: List of sliding window dictionaries
396            fs: Sampling frequency
397            **kwargs: Additional arguments
398            
399        Returns:
400            List of feature dictionaries
401        """
402        # Extract features from each extractor
403        extract_lbp = kwargs.get('extract_lbp', True)
404        extract_fourier = kwargs.get('extract_fourier', True)
405        extract_statistical = kwargs.get('extract_statistical', True)
406        
407        if self.verbose:
408            print(f"\n🔍 PhysioNet Feature Extraction")
409            print(f"📊 LBP: {extract_lbp}, Fourier: {extract_fourier}, Statistical: {extract_statistical}")
410        
411        features = []
412        
413        # Extract LBP features
414        if extract_lbp:
415            lbp_features = self.lbp_extractor.extract_features(windows, fs, **kwargs)
416        else:
417            lbp_features = []
418        
419        # Extract Fourier features
420        if extract_fourier:
421            fourier_features = self.fourier_extractor.extract_features(windows, fs, **kwargs)
422        else:
423            fourier_features = []
424        
425        # Extract statistical features
426        if extract_statistical:
427            statistical_features = self._extract_statistical_features(windows)
428        else:
429            statistical_features = []
430        
431        # Combine features
432        for i, window_dict in enumerate(windows):
433            sensor_name = window_dict['name']
434            
435            # Skip annotation windows
436            if sensor_name == 'annotations':
437                continue
438            
439            combined_features = {'name': sensor_name, 'features': {}}
440            
441            # Add LBP features
442            if extract_lbp and i < len(lbp_features):
443                combined_features['features'].update(lbp_features[i]['features'])
444            
445            # Add Fourier features
446            if extract_fourier and i < len(fourier_features):
447                combined_features['features'].update(fourier_features[i]['features'])
448            
449            # Add statistical features
450            if extract_statistical and i < len(statistical_features):
451                combined_features['features'].update(statistical_features[i]['features'])
452            
453            features.append(combined_features)
454        
455        return features

Extract combined features from sliding windows.

Args:
    windows: List of sliding window dictionaries
    fs: Sampling frequency
    **kwargs: Additional arguments

Returns: List of feature dictionaries
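
Each feature family can be toggled independently through keyword arguments. An illustrative sketch with synthetic windows, disabling the Fourier block:

    import numpy as np
    from gaitsetpy import PhysioNetFeatureExtractor

    rng = np.random.default_rng(1)
    windows = [{'name': 'vgrf_right', 'data': [rng.normal(size=300) for _ in range(4)]}]

    extractor = PhysioNetFeatureExtractor(verbose=False)
    feats = extractor.extract_features(
        windows, fs=100,
        extract_lbp=True, extract_fourier=False, extract_statistical=True,
    )
    print(sorted(feats[0]['features'].keys()))   # lbp_* and vgrf_* keys, no fourier_* keys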

def get_feature_names(self) -> List[str]:
502    def get_feature_names(self) -> List[str]:
503        """Get names of all features."""
504        feature_names = []
505        feature_names.extend(self.lbp_extractor.get_feature_names())
506        feature_names.extend(self.fourier_extractor.get_feature_names())
507        feature_names.extend(['vgrf_mean', 'vgrf_std', 'vgrf_max', 'vgrf_min', 'vgrf_range'])
508        return feature_names

Get names of all features.

class ClippingPreprocessor(gaitsetpy.BasePreprocessor):
18class ClippingPreprocessor(BasePreprocessor):
19    """
20    Preprocessor for clipping values to a specified range.
21    """
22    
23    def __init__(self, min_val: float = -1, max_val: float = 1):
24        super().__init__(
25            name="clipping",
26            description="Clips values in the data to be within a specified range"
27        )
28        self.config = {
29            'min_val': min_val,
30            'max_val': max_val
31        }
32    
33    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
34        """
35        Fit the preprocessor (no fitting needed for clipping).
36        
37        Args:
38            data: Input data to fit on
39            **kwargs: Additional arguments
40        """
41        # Update config with any passed arguments
42        self.config.update({k: v for k, v in kwargs.items() if k in ['min_val', 'max_val']})
43        self.fitted = True
44    
45    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
46        """
47        Clip values in the data to be within the specified range.
48        
49        Args:
50            data: Input data to transform
51            **kwargs: Additional arguments
52            
53        Returns:
54            Clipped data
55        """
56        min_val = kwargs.get('min_val', self.config['min_val'])
57        max_val = kwargs.get('max_val', self.config['max_val'])
58        
59        return np.clip(data, min_val, max_val)

Preprocessor for clipping values to a specified range.

ClippingPreprocessor(min_val: float = -1, max_val: float = 1)
23    def __init__(self, min_val: float = -1, max_val: float = 1):
24        super().__init__(
25            name="clipping",
26            description="Clips values in the data to be within a specified range"
27        )
28        self.config = {
29            'min_val': min_val,
30            'max_val': max_val
31        }

Initialize the preprocessor.

Args:
    name: Name of the preprocessor
    description: Description of the preprocessor

Instance attributes: config
def fit( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs):
33    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
34        """
35        Fit the preprocessor (no fitting needed for clipping).
36        
37        Args:
38            data: Input data to fit on
39            **kwargs: Additional arguments
40        """
41        # Update config with any passed arguments
42        self.config.update({k: v for k, v in kwargs.items() if k in ['min_val', 'max_val']})
43        self.fitted = True

Fit the preprocessor (no fitting needed for clipping).

Args:
    data: Input data to fit on
    **kwargs: Additional arguments

def transform( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs) -> Union[pandas.core.frame.DataFrame, numpy.ndarray]:
45    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
46        """
47        Clip values in the data to be within the specified range.
48        
49        Args:
50            data: Input data to transform
51            **kwargs: Additional arguments
52            
53        Returns:
54            Clipped data
55        """
56        min_val = kwargs.get('min_val', self.config['min_val'])
57        max_val = kwargs.get('max_val', self.config['max_val'])
58        
59        return np.clip(data, min_val, max_val)

Clip values in the data to be within the specified range.

Args:
    data: Input data to transform
    **kwargs: Additional arguments

Returns: Clipped data
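
All preprocessors follow the same fit/transform pattern; for clipping, fit only records kwarg overrides and marks the preprocessor as fitted. A minimal sketch:

    import numpy as np
    from gaitsetpy import ClippingPreprocessor

    clip = ClippingPreprocessor(min_val=-1.0, max_val=1.0)
    x = np.array([-3.0, -0.5, 0.2, 4.0])

    clip.fit(x)               # nothing is learned from the data
    print(clip.transform(x))  # [-1.  -0.5  0.2  1. ]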

class NoiseRemovalPreprocessor(gaitsetpy.BasePreprocessor):
 62class NoiseRemovalPreprocessor(BasePreprocessor):
 63    """
 64    Preprocessor for removing noise using moving average filter.
 65    """
 66    
 67    def __init__(self, window_size: int = 5):
 68        super().__init__(
 69            name="noise_removal",
 70            description="Applies a moving average filter to reduce noise"
 71        )
 72        self.config = {
 73            'window_size': window_size
 74        }
 75    
 76    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
 77        """
 78        Fit the preprocessor (no fitting needed for noise removal).
 79        
 80        Args:
 81            data: Input data to fit on
 82            **kwargs: Additional arguments
 83        """
 84        self.config.update({k: v for k, v in kwargs.items() if k in ['window_size']})
 85        self.fitted = True
 86    
 87    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
 88        """
 89        Apply a moving average filter to reduce noise.
 90        
 91        Args:
 92            data: Input data to transform
 93            **kwargs: Additional arguments
 94            
 95        Returns:
 96            Noise-reduced data
 97        """
 98        window_size = kwargs.get('window_size', self.config['window_size'])
 99        
100        if isinstance(data, pd.DataFrame):
101            return data.rolling(window=window_size, center=True).mean().bfill().ffill()
102        elif isinstance(data, pd.Series):
103            return data.rolling(window=window_size, center=True).mean().bfill().ffill()
104        else:
105            # For numpy arrays, use uniform filter
106            from scipy.ndimage import uniform_filter1d
107            return uniform_filter1d(data, size=window_size, mode='nearest')

Preprocessor for removing noise using moving average filter.

NoiseRemovalPreprocessor(window_size: int = 5)
67    def __init__(self, window_size: int = 5):
68        super().__init__(
69            name="noise_removal",
70            description="Applies a moving average filter to reduce noise"
71        )
72        self.config = {
73            'window_size': window_size
74        }

Initialize the preprocessor.

Args:
    name: Name of the preprocessor
    description: Description of the preprocessor

Instance attributes: config
def fit( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs):
76    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
77        """
78        Fit the preprocessor (no fitting needed for noise removal).
79        
80        Args:
81            data: Input data to fit on
82            **kwargs: Additional arguments
83        """
84        self.config.update({k: v for k, v in kwargs.items() if k in ['window_size']})
85        self.fitted = True

Fit the preprocessor (no fitting needed for noise removal).

Args:
    data: Input data to fit on
    **kwargs: Additional arguments

def transform( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs) -> Union[pandas.core.frame.DataFrame, numpy.ndarray]:
 87    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
 88        """
 89        Apply a moving average filter to reduce noise.
 90        
 91        Args:
 92            data: Input data to transform
 93            **kwargs: Additional arguments
 94            
 95        Returns:
 96            Noise-reduced data
 97        """
 98        window_size = kwargs.get('window_size', self.config['window_size'])
 99        
100        if isinstance(data, pd.DataFrame):
101            return data.rolling(window=window_size, center=True).mean().bfill().ffill()
102        elif isinstance(data, pd.Series):
103            return data.rolling(window=window_size, center=True).mean().bfill().ffill()
104        else:
105            # For numpy arrays, use uniform filter
106            from scipy.ndimage import uniform_filter1d
107            return uniform_filter1d(data, size=window_size, mode='nearest')

Apply a moving average filter to reduce noise.

Args:
    data: Input data to transform
    **kwargs: Additional arguments

Returns: Noise-reduced data
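
A short sketch on a synthetic noisy sinusoid; pandas input goes through a centred rolling mean with edge back/forward filling, while plain numpy arrays fall back to scipy's uniform_filter1d:

    import numpy as np
    import pandas as pd
    from gaitsetpy import NoiseRemovalPreprocessor

    rng = np.random.default_rng(3)
    noisy = pd.Series(np.sin(np.linspace(0, 4 * np.pi, 200)) + rng.normal(scale=0.3, size=200))

    smoother = NoiseRemovalPreprocessor(window_size=5)
    smoother.fit(noisy)
    smooth = smoother.transform(noisy)
    print(smooth.std() < noisy.std())   # smoothing typically reduces the variance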

class OutlierRemovalPreprocessor(gaitsetpy.BasePreprocessor):
110class OutlierRemovalPreprocessor(BasePreprocessor):
111    """
112    Preprocessor for removing outliers using Z-score method.
113    """
114    
115    def __init__(self, threshold: float = 3):
116        super().__init__(
117            name="outlier_removal",
118            description="Removes outliers beyond a given threshold using the Z-score method"
119        )
120        self.config = {
121            'threshold': threshold
122        }
123        self.mean_ = None
124        self.std_ = None
125    
126    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
127        """
128        Fit the preprocessor by computing mean and standard deviation.
129        
130        Args:
131            data: Input data to fit on
132            **kwargs: Additional arguments
133        """
134        self.config.update({k: v for k, v in kwargs.items() if k in ['threshold']})
135        
136        if isinstance(data, (pd.DataFrame, pd.Series)):
137            self.mean_ = data.mean()
138            self.std_ = data.std()
139        else:
140            self.mean_ = np.mean(data)
141            self.std_ = np.std(data)
142        
143        self.fitted = True
144    
145    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
146        """
147        Remove outliers beyond the threshold using Z-score method.
148        
149        Args:
150            data: Input data to transform
151            **kwargs: Additional arguments
152            
153        Returns:
154            Data with outliers removed
155        """
156        threshold = kwargs.get('threshold', self.config['threshold'])
157        
158        if isinstance(data, (pd.DataFrame, pd.Series)):
159            z_scores = (data - self.mean_).abs() / self.std_
160            return data[z_scores <= threshold]
161        else:
162            z_scores = np.abs(data - self.mean_) / self.std_
163            return data[z_scores <= threshold]

Preprocessor for removing outliers using Z-score method.

OutlierRemovalPreprocessor(threshold: float = 3)
115    def __init__(self, threshold: float = 3):
116        super().__init__(
117            name="outlier_removal",
118            description="Removes outliers beyond a given threshold using the Z-score method"
119        )
120        self.config = {
121            'threshold': threshold
122        }
123        self.mean_ = None
124        self.std_ = None

Initialize the preprocessor.

Args:
    name: Name of the preprocessor
    description: Description of the preprocessor

Instance attributes: config, mean_, std_
def fit( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs):
126    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
127        """
128        Fit the preprocessor by computing mean and standard deviation.
129        
130        Args:
131            data: Input data to fit on
132            **kwargs: Additional arguments
133        """
134        self.config.update({k: v for k, v in kwargs.items() if k in ['threshold']})
135        
136        if isinstance(data, (pd.DataFrame, pd.Series)):
137            self.mean_ = data.mean()
138            self.std_ = data.std()
139        else:
140            self.mean_ = np.mean(data)
141            self.std_ = np.std(data)
142        
143        self.fitted = True

Fit the preprocessor by computing mean and standard deviation.

Args:
    data: Input data to fit on
    **kwargs: Additional arguments

def transform( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs) -> Union[pandas.core.frame.DataFrame, numpy.ndarray]:
145    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
146        """
147        Remove outliers beyond the threshold using Z-score method.
148        
149        Args:
150            data: Input data to transform
151            **kwargs: Additional arguments
152            
153        Returns:
154            Data with outliers removed
155        """
156        threshold = kwargs.get('threshold', self.config['threshold'])
157        
158        if isinstance(data, (pd.DataFrame, pd.Series)):
159            z_scores = (data - self.mean_).abs() / self.std_
160            return data[z_scores <= threshold]
161        else:
162            z_scores = np.abs(data - self.mean_) / self.std_
163            return data[z_scores <= threshold]

Remove outliers beyond the threshold using Z-score method.

Args:
    data: Input data to transform
    **kwargs: Additional arguments

Returns: Data with outliers removed
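
Unlike the other preprocessors, this one drops samples rather than replacing them, so the output can be shorter than the input. An illustrative sketch with two injected spikes:

    import numpy as np
    from gaitsetpy import OutlierRemovalPreprocessor

    rng = np.random.default_rng(7)
    x = np.concatenate([rng.normal(size=500), [15.0, -12.0]])   # two obvious spikes

    outliers = OutlierRemovalPreprocessor(threshold=3)
    outliers.fit(x)              # learns mean_ and std_ from the data
    clean = outliers.transform(x)
    print(len(x), len(clean))    # the spikes (and any other |z| > 3 samples) are gone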

class BaselineRemovalPreprocessor(gaitsetpy.BasePreprocessor):
166class BaselineRemovalPreprocessor(BasePreprocessor):
167    """
168    Preprocessor for removing baseline by subtracting the mean.
169    """
170    
171    def __init__(self):
172        super().__init__(
173            name="baseline_removal",
174            description="Removes baseline by subtracting the mean"
175        )
176        self.mean_ = None
177    
178    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
179        """
180        Fit the preprocessor by computing the mean.
181        
182        Args:
183            data: Input data to fit on
184            **kwargs: Additional arguments
185        """
186        if isinstance(data, (pd.DataFrame, pd.Series)):
187            self.mean_ = data.mean()
188        else:
189            self.mean_ = np.mean(data)
190        
191        self.fitted = True
192    
193    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
194        """
195        Remove baseline by subtracting the mean.
196        
197        Args:
198            data: Input data to transform
199            **kwargs: Additional arguments
200            
201        Returns:
202            Baseline-corrected data
203        """
204        return data - self.mean_

Preprocessor for removing baseline by subtracting the mean.

BaselineRemovalPreprocessor()
171    def __init__(self):
172        super().__init__(
173            name="baseline_removal",
174            description="Removes baseline by subtracting the mean"
175        )
176        self.mean_ = None

Initialize the preprocessor.

Args:
    name: Name of the preprocessor
    description: Description of the preprocessor

Instance attributes: mean_
def fit( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs):
178    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
179        """
180        Fit the preprocessor by computing the mean.
181        
182        Args:
183            data: Input data to fit on
184            **kwargs: Additional arguments
185        """
186        if isinstance(data, (pd.DataFrame, pd.Series)):
187            self.mean_ = data.mean()
188        else:
189            self.mean_ = np.mean(data)
190        
191        self.fitted = True

Fit the preprocessor by computing the mean.

Args:
    data: Input data to fit on
    **kwargs: Additional arguments

def transform( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs) -> Union[pandas.core.frame.DataFrame, numpy.ndarray]:
193    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
194        """
195        Remove baseline by subtracting the mean.
196        
197        Args:
198            data: Input data to transform
199            **kwargs: Additional arguments
200            
201        Returns:
202            Baseline-corrected data
203        """
204        return data - self.mean_

Remove baseline by subtracting the mean.

Args:
    data: Input data to transform
    **kwargs: Additional arguments

Returns: Baseline-corrected data

class DriftRemovalPreprocessor(gaitsetpy.BasePreprocessor):
207class DriftRemovalPreprocessor(BasePreprocessor):
208    """
209    Preprocessor for removing low-frequency drift using high-pass filter.
210    """
211    
212    def __init__(self, cutoff: float = 0.01, fs: int = 100):
213        super().__init__(
214            name="drift_removal",
215            description="Removes low-frequency drift using a high-pass filter"
216        )
217        self.config = {
218            'cutoff': cutoff,
219            'fs': fs
220        }
221    
222    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
223        """
224        Fit the preprocessor (no fitting needed for drift removal).
225        
226        Args:
227            data: Input data to fit on
228            **kwargs: Additional arguments
229        """
230        self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']})
231        self.fitted = True
232    
233    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
234        """
235        Remove low-frequency drift using a high-pass filter.
236        
237        Args:
238            data: Input data to transform
239            **kwargs: Additional arguments
240            
241        Returns:
242            Drift-corrected data
243        """
244        cutoff = kwargs.get('cutoff', self.config['cutoff'])
245        fs = kwargs.get('fs', self.config['fs'])
246        
247        b, a = butter(1, cutoff / (fs / 2), btype='highpass')
248        
249        if isinstance(data, (pd.DataFrame, pd.Series)):
250            return pd.Series(filtfilt(b, a, data), index=data.index)
251        else:
252            return filtfilt(b, a, data)

Preprocessor for removing low-frequency drift using high-pass filter.

DriftRemovalPreprocessor(cutoff: float = 0.01, fs: int = 100)
212    def __init__(self, cutoff: float = 0.01, fs: int = 100):
213        super().__init__(
214            name="drift_removal",
215            description="Removes low-frequency drift using a high-pass filter"
216        )
217        self.config = {
218            'cutoff': cutoff,
219            'fs': fs
220        }

Initialize the preprocessor.

Args:
    name: Name of the preprocessor
    description: Description of the preprocessor

Instance attributes: config
def fit( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs):
222    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
223        """
224        Fit the preprocessor (no fitting needed for drift removal).
225        
226        Args:
227            data: Input data to fit on
228            **kwargs: Additional arguments
229        """
230        self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']})
231        self.fitted = True

Fit the preprocessor (no fitting needed for drift removal).

Args:
    data: Input data to fit on
    **kwargs: Additional arguments

def transform( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs) -> Union[pandas.core.frame.DataFrame, numpy.ndarray]:
233    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
234        """
235        Remove low-frequency drift using a high-pass filter.
236        
237        Args:
238            data: Input data to transform
239            **kwargs: Additional arguments
240            
241        Returns:
242            Drift-corrected data
243        """
244        cutoff = kwargs.get('cutoff', self.config['cutoff'])
245        fs = kwargs.get('fs', self.config['fs'])
246        
247        b, a = butter(1, cutoff / (fs / 2), btype='highpass')
248        
249        if isinstance(data, (pd.DataFrame, pd.Series)):
250            return pd.Series(filtfilt(b, a, data), index=data.index)
251        else:
252            return filtfilt(b, a, data)

Remove low-frequency drift using a high-pass filter.

Args:
    data: Input data to transform
    **kwargs: Additional arguments

Returns: Drift-corrected data
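
An illustrative sketch on a synthetic 1 Hz oscillation riding on a slow linear drift; the cutoff here (0.1 Hz) is an assumption chosen well below the gait frequency so the oscillation passes while the drift is attenuated:

    import numpy as np
    import pandas as pd
    from gaitsetpy import DriftRemovalPreprocessor

    fs = 100
    t = np.arange(0, 30, 1 / fs)
    raw = pd.Series(np.sin(2 * np.pi * 1.0 * t) + 0.05 * t)   # oscillation + linear drift

    detrend = DriftRemovalPreprocessor(cutoff=0.1, fs=fs)
    detrend.fit(raw)
    flat = detrend.transform(raw)   # zero-phase first-order Butterworth high-pass
    # The slow ramp is largely removed while the 1 Hz component is preserved.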

class HighFrequencyNoiseRemovalPreprocessor(gaitsetpy.BasePreprocessor):
255class HighFrequencyNoiseRemovalPreprocessor(BasePreprocessor):
256    """
257    Preprocessor for removing high-frequency noise using low-pass filter.
258    """
259    
260    def __init__(self, cutoff: float = 10, fs: int = 100):
261        super().__init__(
262            name="high_frequency_noise_removal",
263            description="Applies a low-pass filter to remove high-frequency noise"
264        )
265        self.config = {
266            'cutoff': cutoff,
267            'fs': fs
268        }
269    
270    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
271        """
272        Fit the preprocessor (no fitting needed for filtering).
273        
274        Args:
275            data: Input data to fit on
276            **kwargs: Additional arguments
277        """
278        self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']})
279        self.fitted = True
280    
281    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
282        """
283        Apply a low-pass filter to remove high-frequency noise.
284        
285        Args:
286            data: Input data to transform
287            **kwargs: Additional arguments
288            
289        Returns:
290            Filtered data
291        """
292        cutoff = kwargs.get('cutoff', self.config['cutoff'])
293        fs = kwargs.get('fs', self.config['fs'])
294        
295        b, a = butter(1, cutoff / (fs / 2), btype='lowpass')
296        
297        if isinstance(data, (pd.DataFrame, pd.Series)):
298            return pd.Series(filtfilt(b, a, data), index=data.index)
299        else:
300            return filtfilt(b, a, data)

Preprocessor for removing high-frequency noise using low-pass filter.

HighFrequencyNoiseRemovalPreprocessor(cutoff: float = 10, fs: int = 100)
260    def __init__(self, cutoff: float = 10, fs: int = 100):
261        super().__init__(
262            name="high_frequency_noise_removal",
263            description="Applies a low-pass filter to remove high-frequency noise"
264        )
265        self.config = {
266            'cutoff': cutoff,
267            'fs': fs
268        }

Initialize the preprocessor.

Args:
    name: Name of the preprocessor
    description: Description of the preprocessor

Instance attributes: config
def fit( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs):
270    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
271        """
272        Fit the preprocessor (no fitting needed for filtering).
273        
274        Args:
275            data: Input data to fit on
276            **kwargs: Additional arguments
277        """
278        self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']})
279        self.fitted = True

Fit the preprocessor (no fitting needed for filtering).

Args:
    data: Input data to fit on
    **kwargs: Additional arguments

def transform( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs) -> Union[pandas.core.frame.DataFrame, numpy.ndarray]:
281    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
282        """
283        Apply a low-pass filter to remove high-frequency noise.
284        
285        Args:
286            data: Input data to transform
287            **kwargs: Additional arguments
288            
289        Returns:
290            Filtered data
291        """
292        cutoff = kwargs.get('cutoff', self.config['cutoff'])
293        fs = kwargs.get('fs', self.config['fs'])
294        
295        b, a = butter(1, cutoff / (fs / 2), btype='lowpass')
296        
297        if isinstance(data, (pd.DataFrame, pd.Series)):
298            return pd.Series(filtfilt(b, a, data), index=data.index)
299        else:
300            return filtfilt(b, a, data)

Apply a low-pass filter to remove high-frequency noise.

Args:
    data: Input data to transform
    **kwargs: Additional arguments

Returns: Filtered data

class LowFrequencyNoiseRemovalPreprocessor(gaitsetpy.BasePreprocessor):
303class LowFrequencyNoiseRemovalPreprocessor(BasePreprocessor):
304    """
305    Preprocessor for removing low-frequency noise using high-pass filter.
306    """
307    
308    def __init__(self, cutoff: float = 0.5, fs: int = 100):
309        super().__init__(
310            name="low_frequency_noise_removal",
311            description="Applies a high-pass filter to remove low-frequency noise"
312        )
313        self.config = {
314            'cutoff': cutoff,
315            'fs': fs
316        }
317    
318    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
319        """
320        Fit the preprocessor (no fitting needed for filtering).
321        
322        Args:
323            data: Input data to fit on
324            **kwargs: Additional arguments
325        """
326        self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']})
327        self.fitted = True
328    
329    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
330        """
331        Apply a high-pass filter to remove low-frequency noise.
332        
333        Args:
334            data: Input data to transform
335            **kwargs: Additional arguments
336            
337        Returns:
338            Filtered data
339        """
340        cutoff = kwargs.get('cutoff', self.config['cutoff'])
341        fs = kwargs.get('fs', self.config['fs'])
342        
343        b, a = butter(1, cutoff / (fs / 2), btype='highpass')
344        
345        if isinstance(data, (pd.DataFrame, pd.Series)):
346            return pd.Series(filtfilt(b, a, data), index=data.index)
347        else:
348            return filtfilt(b, a, data)

Preprocessor for removing low-frequency noise using high-pass filter.

LowFrequencyNoiseRemovalPreprocessor(cutoff: float = 0.5, fs: int = 100)
308    def __init__(self, cutoff: float = 0.5, fs: int = 100):
309        super().__init__(
310            name="low_frequency_noise_removal",
311            description="Applies a high-pass filter to remove low-frequency noise"
312        )
313        self.config = {
314            'cutoff': cutoff,
315            'fs': fs
316        }

Initialize the preprocessor.

Args:
    name: Name of the preprocessor
    description: Description of the preprocessor

Instance attributes: config
def fit( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs):
318    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
319        """
320        Fit the preprocessor (no fitting needed for filtering).
321        
322        Args:
323            data: Input data to fit on
324            **kwargs: Additional arguments
325        """
326        self.config.update({k: v for k, v in kwargs.items() if k in ['cutoff', 'fs']})
327        self.fitted = True

Fit the preprocessor (no fitting needed for filtering).

Args:
    data: Input data to fit on
    **kwargs: Additional arguments

def transform( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs) -> Union[pandas.core.frame.DataFrame, numpy.ndarray]:
329    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
330        """
331        Apply a high-pass filter to remove low-frequency noise.
332        
333        Args:
334            data: Input data to transform
335            **kwargs: Additional arguments
336            
337        Returns:
338            Filtered data
339        """
340        cutoff = kwargs.get('cutoff', self.config['cutoff'])
341        fs = kwargs.get('fs', self.config['fs'])
342        
343        b, a = butter(1, cutoff / (fs / 2), btype='highpass')
344        
345        if isinstance(data, (pd.DataFrame, pd.Series)):
346            return pd.Series(filtfilt(b, a, data), index=data.index)
347        else:
348            return filtfilt(b, a, data)

Apply a high-pass filter to remove low-frequency noise.

Args:
    data: Input data to transform
    **kwargs: Additional arguments

Returns: Filtered data

class ArtifactRemovalPreprocessor(gaitsetpy.BasePreprocessor):
351class ArtifactRemovalPreprocessor(BasePreprocessor):
352    """
353    Preprocessor for removing artifacts by interpolating missing values.
354    """
355    
356    def __init__(self, method: str = "linear"):
357        super().__init__(
358            name="artifact_removal",
359            description="Removes artifacts by interpolating missing values"
360        )
361        self.config = {
362            'method': method
363        }
364    
365    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
366        """
367        Fit the preprocessor (no fitting needed for interpolation).
368        
369        Args:
370            data: Input data to fit on
371            **kwargs: Additional arguments
372        """
373        self.config.update({k: v for k, v in kwargs.items() if k in ['method']})
374        self.fitted = True
375    
376    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
377        """
378        Remove artifacts by interpolating missing values.
379        
380        Args:
381            data: Input data to transform
382            **kwargs: Additional arguments
383            
384        Returns:
385            Artifact-free data
386        """
387        method = kwargs.get('method', self.config['method'])
388        
389        if isinstance(data, (pd.DataFrame, pd.Series)):
390            return data.interpolate(method=method).bfill().ffill()
391        else:
392            # For numpy arrays, use linear interpolation
393            from scipy.interpolate import interp1d
394            x = np.arange(len(data))
395            valid_mask = ~np.isnan(data)
396            if np.any(valid_mask):
397                f = interp1d(x[valid_mask], data[valid_mask], kind='linear', fill_value='extrapolate')
398                return f(x)
399            else:
400                return data

Preprocessor for removing artifacts by interpolating missing values.

ArtifactRemovalPreprocessor(method: str = 'linear')
356    def __init__(self, method: str = "linear"):
357        super().__init__(
358            name="artifact_removal",
359            description="Removes artifacts by interpolating missing values"
360        )
361        self.config = {
362            'method': method
363        }

Initialize the preprocessor.

Args:
    name: Name of the preprocessor
    description: Description of the preprocessor

Attributes: config
def fit( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs):
365    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
366        """
367        Fit the preprocessor (no fitting needed for interpolation).
368        
369        Args:
370            data: Input data to fit on
371            **kwargs: Additional arguments
372        """
373        self.config.update({k: v for k, v in kwargs.items() if k in ['method']})
374        self.fitted = True

Fit the preprocessor (no fitting needed for interpolation).

Args:
    data: Input data to fit on
    **kwargs: Additional arguments

def transform( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs) -> Union[pandas.core.frame.DataFrame, numpy.ndarray]:
376    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
377        """
378        Remove artifacts by interpolating missing values.
379        
380        Args:
381            data: Input data to transform
382            **kwargs: Additional arguments
383            
384        Returns:
385            Artifact-free data
386        """
387        method = kwargs.get('method', self.config['method'])
388        
389        if isinstance(data, (pd.DataFrame, pd.Series)):
390            return data.interpolate(method=method).bfill().ffill()
391        else:
392            # For numpy arrays, use linear interpolation
393            from scipy.interpolate import interp1d
394            x = np.arange(len(data))
395            valid_mask = ~np.isnan(data)
396            if np.any(valid_mask):
397                f = interp1d(x[valid_mask], data[valid_mask], kind='linear', fill_value='extrapolate')
398                return f(x)
399            else:
400                return data

Remove artifacts by interpolating missing values.

Args:
    data: Input data to transform
    **kwargs: Additional arguments

Returns: Artifact-free data
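
A short usage sketch follows; the gap-ridden example series is synthetic and only illustrates the interpolate-then-fill behaviour shown in the source above.

    import numpy as np
    import pandas as pd
    from gaitsetpy.preprocessing import ArtifactRemovalPreprocessor

    # Example series with NaN gaps standing in for dropped sensor samples.
    signal = pd.Series([0.1, 0.2, np.nan, np.nan, 0.5, 0.6, np.nan, 0.8])

    artifact_removal = ArtifactRemovalPreprocessor(method="linear")
    artifact_removal.fit(signal)
    clean = artifact_removal.transform(signal)  # NaNs filled by linear interpolation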

class TrendRemovalPreprocessor(gaitsetpy.BasePreprocessor):
403class TrendRemovalPreprocessor(BasePreprocessor):
404    """
405    Preprocessor for removing trends using polynomial fitting.
406    """
407    
408    def __init__(self, order: int = 2):
409        super().__init__(
410            name="trend_removal",
411            description="Removes trends using polynomial fitting"
412        )
413        self.config = {
414            'order': order
415        }
416    
417    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
418        """
419        Fit the preprocessor (no fitting needed for detrending).
420        
421        Args:
422            data: Input data to fit on
423            **kwargs: Additional arguments
424        """
425        self.config.update({k: v for k, v in kwargs.items() if k in ['order']})
426        self.fitted = True
427    
428    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
429        """
430        Remove trends using polynomial fitting.
431        
432        Args:
433            data: Input data to transform
434            **kwargs: Additional arguments
435            
436        Returns:
437            Detrended data
438        """
439        order = kwargs.get('order', self.config['order'])
440        
441        if isinstance(data, (pd.DataFrame, pd.Series)):
442            x = np.arange(len(data))
443            poly_coeffs = np.polyfit(x, data, order)
444            trend = np.polyval(poly_coeffs, x)
445            return data - trend
446        else:
447            x = np.arange(len(data))
448            poly_coeffs = np.polyfit(x, data, order)
449            trend = np.polyval(poly_coeffs, x)
450            return data - trend

Preprocessor for removing trends using polynomial fitting.

TrendRemovalPreprocessor(order: int = 2)
408    def __init__(self, order: int = 2):
409        super().__init__(
410            name="trend_removal",
411            description="Removes trends using polynomial fitting"
412        )
413        self.config = {
414            'order': order
415        }

Initialize the preprocessor.

Args:
    name: Name of the preprocessor
    description: Description of the preprocessor

Attributes: config
def fit( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs):
417    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
418        """
419        Fit the preprocessor (no fitting needed for detrending).
420        
421        Args:
422            data: Input data to fit on
423            **kwargs: Additional arguments
424        """
425        self.config.update({k: v for k, v in kwargs.items() if k in ['order']})
426        self.fitted = True

Fit the preprocessor (no fitting needed for detrending).

Args:
    data: Input data to fit on
    **kwargs: Additional arguments

def transform( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs) -> Union[pandas.core.frame.DataFrame, numpy.ndarray]:
428    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
429        """
430        Remove trends using polynomial fitting.
431        
432        Args:
433            data: Input data to transform
434            **kwargs: Additional arguments
435            
436        Returns:
437            Detrended data
438        """
439        order = kwargs.get('order', self.config['order'])
440        
441        if isinstance(data, (pd.DataFrame, pd.Series)):
442            x = np.arange(len(data))
443            poly_coeffs = np.polyfit(x, data, order)
444            trend = np.polyval(poly_coeffs, x)
445            return data - trend
446        else:
447            x = np.arange(len(data))
448            poly_coeffs = np.polyfit(x, data, order)
449            trend = np.polyval(poly_coeffs, x)
450            return data - trend

Remove trends using polynomial fitting.

Args:
    data: Input data to transform
    **kwargs: Additional arguments

Returns: Detrended data
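
The sketch below illustrates the polynomial detrending on a synthetic signal with an added quadratic trend; the signal values are illustrative only.

    import numpy as np
    from gaitsetpy.preprocessing import TrendRemovalPreprocessor

    # Synthetic signal: a 1.5 Hz oscillation plus a slow quadratic trend (example values only).
    t = np.linspace(0, 10, 1000)
    signal = np.sin(2 * np.pi * 1.5 * t) + 0.05 * t ** 2

    detrender = TrendRemovalPreprocessor(order=2)
    detrender.fit(signal)
    detrended = detrender.transform(signal)  # second-order polynomial trend subtracted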

class DCOffsetRemovalPreprocessor(gaitsetpy.BasePreprocessor):
453class DCOffsetRemovalPreprocessor(BasePreprocessor):
454    """
455    Preprocessor for removing DC offset by subtracting the mean.
456    """
457    
458    def __init__(self):
459        super().__init__(
460            name="dc_offset_removal",
461            description="Removes DC offset by subtracting the mean"
462        )
463        self.mean_ = None
464    
465    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
466        """
467        Fit the preprocessor by computing the mean.
468        
469        Args:
470            data: Input data to fit on
471            **kwargs: Additional arguments
472        """
473        if isinstance(data, (pd.DataFrame, pd.Series)):
474            self.mean_ = data.mean()
475        else:
476            self.mean_ = np.mean(data)
477        
478        self.fitted = True
479    
480    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
481        """
482        Remove DC offset by subtracting the mean.
483        
484        Args:
485            data: Input data to transform
486            **kwargs: Additional arguments
487            
488        Returns:
489            DC-corrected data
490        """
491        return data - self.mean_ 

Preprocessor for removing DC offset by subtracting the mean.

DCOffsetRemovalPreprocessor()
458    def __init__(self):
459        super().__init__(
460            name="dc_offset_removal",
461            description="Removes DC offset by subtracting the mean"
462        )
463        self.mean_ = None

Initialize the preprocessor.

Args:
    name: Name of the preprocessor
    description: Description of the preprocessor

Attributes: mean_
def fit( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs):
465    def fit(self, data: Union[pd.DataFrame, np.ndarray], **kwargs):
466        """
467        Fit the preprocessor by computing the mean.
468        
469        Args:
470            data: Input data to fit on
471            **kwargs: Additional arguments
472        """
473        if isinstance(data, (pd.DataFrame, pd.Series)):
474            self.mean_ = data.mean()
475        else:
476            self.mean_ = np.mean(data)
477        
478        self.fitted = True

Fit the preprocessor by computing the mean.

Args:
    data: Input data to fit on
    **kwargs: Additional arguments

def transform( self, data: Union[pandas.core.frame.DataFrame, numpy.ndarray], **kwargs) -> Union[pandas.core.frame.DataFrame, numpy.ndarray]:
480    def transform(self, data: Union[pd.DataFrame, np.ndarray], **kwargs) -> Union[pd.DataFrame, np.ndarray]:
481        """
482        Remove DC offset by subtracting the mean.
483        
484        Args:
485            data: Input data to transform
486            **kwargs: Additional arguments
487            
488        Returns:
489            DC-corrected data
490        """
491        return data - self.mean_ 

Remove DC offset by subtracting the mean.

Args:
    data: Input data to transform
    **kwargs: Additional arguments

Returns: DC-corrected data
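
Because this preprocessor is stateful (fit stores the mean that transform later subtracts), a quick sketch of the fit-then-transform order may help; the sample values below are made up.

    import numpy as np
    from gaitsetpy.preprocessing import DCOffsetRemovalPreprocessor

    signal = np.array([1.2, 1.4, 1.1, 1.3, 1.5])  # made-up samples with a ~1.3 offset

    dc_removal = DCOffsetRemovalPreprocessor()
    dc_removal.fit(signal)                   # stores the mean in dc_removal.mean_
    centered = dc_removal.transform(signal)  # approximately zero-mean output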

class DaphnetVisualizationAnalyzer(gaitsetpy.BaseEDAAnalyzer):
 18class DaphnetVisualizationAnalyzer(BaseEDAAnalyzer):
 19    """
 20    EDA analyzer for Daphnet dataset visualization.
 21    
 22    This analyzer provides comprehensive visualization capabilities for Daphnet dataset
 23    including thigh, shank, and trunk sensor data.
 24    """
 25    
 26    def __init__(self):
 27        super().__init__(
 28            name="daphnet_visualization",
 29            description="Comprehensive visualization analyzer for Daphnet dataset sensor data"
 30        )
 31        self.config = {
 32            'figsize': (20, 16),
 33            'colors': {
 34                'no_freeze': 'orange',
 35                'freeze': 'purple'
 36            },
 37            'alpha': 0.6
 38        }
 39    
 40    def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
 41        """
 42        Analyze the data and return statistical summaries.
 43        
 44        Args:
 45            data: Input data to analyze
 46            **kwargs: Additional arguments
 47            
 48        Returns:
 49            Dictionary containing analysis results
 50        """
 51        if isinstance(data, list):
 52            # Multiple datasets
 53            results = {}
 54            for i, df in enumerate(data):
 55                results[f'dataset_{i}'] = self._analyze_single_dataset(df)
 56            return results
 57        else:
 58            # Single dataset
 59            return self._analyze_single_dataset(data)
 60    
 61    def _analyze_single_dataset(self, df: pd.DataFrame) -> Dict[str, Any]:
 62        """Analyze a single dataset."""
 63        # Basic statistics
 64        stats = {
 65            'shape': df.shape,
 66            'columns': df.columns.tolist(),
 67            'annotation_distribution': df['annotations'].value_counts().to_dict() if 'annotations' in df.columns else {},
 68            'missing_values': df.isnull().sum().to_dict(),
 69            'data_range': {
 70                'min': df.select_dtypes(include=[np.number]).min().to_dict(),
 71                'max': df.select_dtypes(include=[np.number]).max().to_dict()
 72            }
 73        }
 74        
 75        # Sensor-specific statistics
 76        sensor_stats = {}
 77        for sensor in ['thigh', 'shank', 'trunk']:
 78            if sensor in df.columns:
 79                sensor_stats[sensor] = {
 80                    'mean': df[sensor].mean(),
 81                    'std': df[sensor].std(),
 82                    'min': df[sensor].min(),
 83                    'max': df[sensor].max()
 84                }
 85        
 86        stats['sensor_statistics'] = sensor_stats
 87        return stats
 88    
 89    def visualize(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs):
 90        """
 91        Create visualizations of the data.
 92        
 93        Args:
 94            data: Input data to visualize
 95            **kwargs: Additional arguments including sensor_type, dataset_index, names
 96        """
 97        sensor_type = kwargs.get('sensor_type', 'all')
 98        dataset_index = kwargs.get('dataset_index', 0)
 99        names = kwargs.get('names', [])
100        
101        if isinstance(data, list):
102            if dataset_index < len(data):
103                df = data[dataset_index]
104                dataset_name = names[dataset_index] if dataset_index < len(names) else f"Dataset {dataset_index}"
105            else:
106                print(f"Dataset index {dataset_index} out of range")
107                return
108        else:
109            df = data
110            dataset_name = names[0] if names else "Dataset"
111        
112        if sensor_type == 'all':
113            self._plot_all_sensors(df, dataset_name)
114        elif sensor_type == 'thigh':
115            self._plot_thigh_data(df, dataset_name)
116        elif sensor_type == 'shank':
117            self._plot_shank_data(df, dataset_name)
118        elif sensor_type == 'trunk':
119            self._plot_trunk_data(df, dataset_name)
120        else:
121            print(f"Unknown sensor type: {sensor_type}")
122    
123    def _plot_thigh_data(self, df: pd.DataFrame, dataset_name: str):
124        """Plot thigh sensor data."""
125        print(f"Plotting thigh data for {dataset_name}")
126        
127        # Filter data
128        df_filtered = df[df.annotations > 0] if 'annotations' in df.columns else df
129        
130        if df_filtered.empty:
131            print("No valid data to plot")
132            return
133        
134        # Create figure
135        fig, axes = plt.subplots(4, 1, sharex=True, figsize=self.config['figsize'])
136        fig.suptitle(f"Thigh Data from {dataset_name}")
137        
138        # Separate freeze and no-freeze data
139        if 'annotations' in df.columns:
140            neg = df_filtered[df_filtered.annotations == 1]  # No freeze
141            pos = df_filtered[df_filtered.annotations == 2]  # Freeze
142        else:
143            neg = df_filtered
144            pos = pd.DataFrame()
145        
146        # Plot each component
147        components = ['thigh_h_fd', 'thigh_v', 'thigh_h_l', 'thigh']
148        labels = ['Horizontal Forward', 'Vertical', 'Horizontal Lateral', 'Overall']
149        
150        for i, (component, label) in enumerate(zip(components, labels)):
151            if component in df_filtered.columns:
152                # Plot main signal
153                axes[i].plot(df_filtered.index, df_filtered[component])
154                axes[i].set_ylabel(f"{label} Thigh Acceleration")
155                
156                # Plot annotations if available
157                if not neg.empty:
158                    axes[i].scatter(neg.index, neg[component], 
159                                  c=self.config['colors']['no_freeze'], 
160                                  label="no freeze", alpha=self.config['alpha'])
161                if not pos.empty:
162                    axes[i].scatter(pos.index, pos[component], 
163                                  c=self.config['colors']['freeze'], 
164                                  label="freeze", alpha=self.config['alpha'])
165                
166                axes[i].legend()
167        
168        plt.xlabel("Time")
169        plt.tight_layout()
170        plt.show()
171    
172    def _plot_shank_data(self, df: pd.DataFrame, dataset_name: str):
173        """Plot shank sensor data."""
174        print(f"Plotting shank data for {dataset_name}")
175        
176        # Filter data
177        df_filtered = df[df.annotations > 0] if 'annotations' in df.columns else df
178        
179        if df_filtered.empty:
180            print("No valid data to plot")
181            return
182        
183        # Create figure
184        fig, axes = plt.subplots(4, 1, sharex=True, figsize=self.config['figsize'])
185        fig.suptitle(f"Shank Data from {dataset_name}")
186        
187        # Separate freeze and no-freeze data
188        if 'annotations' in df.columns:
189            neg = df_filtered[df_filtered.annotations == 1]  # No freeze
190            pos = df_filtered[df_filtered.annotations == 2]  # Freeze
191        else:
192            neg = df_filtered
193            pos = pd.DataFrame()
194        
195        # Plot each component
196        components = ['shank_h_fd', 'shank_v', 'shank_h_l', 'shank']
197        labels = ['Horizontal Forward', 'Vertical', 'Horizontal Lateral', 'Overall']
198        
199        for i, (component, label) in enumerate(zip(components, labels)):
200            if component in df_filtered.columns:
201                # Plot main signal
202                axes[i].plot(df_filtered.index, df_filtered[component])
203                axes[i].set_ylabel(f"{label} Shank Acceleration")
204                
205                # Plot annotations if available
206                if not neg.empty:
207                    axes[i].scatter(neg.index, neg[component], 
208                                  c=self.config['colors']['no_freeze'], 
209                                  label="no freeze", alpha=self.config['alpha'])
210                if not pos.empty:
211                    axes[i].scatter(pos.index, pos[component], 
212                                  c=self.config['colors']['freeze'], 
213                                  label="freeze", alpha=self.config['alpha'])
214                
215                axes[i].legend()
216        
217        plt.xlabel("Time")
218        plt.tight_layout()
219        plt.show()
220    
221    def _plot_trunk_data(self, df: pd.DataFrame, dataset_name: str):
222        """Plot trunk sensor data."""
223        print(f"Plotting trunk data for {dataset_name}")
224        
225        # Filter data
226        df_filtered = df[df.annotations > 0] if 'annotations' in df.columns else df
227        
228        if df_filtered.empty:
229            print("No valid data to plot")
230            return
231        
232        # Create figure
233        fig, axes = plt.subplots(4, 1, sharex=True, figsize=self.config['figsize'])
234        fig.suptitle(f"Trunk Data from {dataset_name}")
235        
236        # Separate freeze and no-freeze data
237        if 'annotations' in df.columns:
238            neg = df_filtered[df_filtered.annotations == 1]  # No freeze
239            pos = df_filtered[df_filtered.annotations == 2]  # Freeze
240        else:
241            neg = df_filtered
242            pos = pd.DataFrame()
243        
244        # Plot each component
245        components = ['trunk_h_fd', 'trunk_v', 'trunk_h_l', 'trunk']
246        labels = ['Horizontal Forward', 'Vertical', 'Horizontal Lateral', 'Overall']
247        
248        for i, (component, label) in enumerate(zip(components, labels)):
249            if component in df_filtered.columns:
250                # Plot main signal
251                axes[i].plot(df_filtered.index, df_filtered[component])
252                axes[i].set_ylabel(f"{label} Trunk Acceleration")
253                
254                # Plot annotations if available
255                if not neg.empty:
256                    axes[i].scatter(neg.index, neg[component], 
257                                  c=self.config['colors']['no_freeze'], 
258                                  label="no freeze", alpha=self.config['alpha'])
259                if not pos.empty:
260                    axes[i].scatter(pos.index, pos[component], 
261                                  c=self.config['colors']['freeze'], 
262                                  label="freeze", alpha=self.config['alpha'])
263                
264                axes[i].legend()
265        
266        plt.xlabel("Time")
267        plt.tight_layout()
268        plt.show()
269    
270    def _plot_all_sensors(self, df: pd.DataFrame, dataset_name: str):
271        """Plot all sensor data in a combined view."""
272        print(f"Plotting all sensor data for {dataset_name}")
273        
274        # Create figure with subplots for each sensor
275        fig, axes = plt.subplots(3, 1, sharex=True, figsize=self.config['figsize'])
276        fig.suptitle(f"All Sensor Data from {dataset_name}")
277        
278        # Filter data
279        df_filtered = df[df.annotations > 0] if 'annotations' in df.columns else df
280        
281        if df_filtered.empty:
282            print("No valid data to plot")
283            return
284        
285        sensors = ['thigh', 'shank', 'trunk']
286        for i, sensor in enumerate(sensors):
287            if sensor in df_filtered.columns:
288                axes[i].plot(df_filtered.index, df_filtered[sensor])
289                axes[i].set_ylabel(f"{sensor.capitalize()} Acceleration")
290                
291                # Add annotations if available
292                if 'annotations' in df_filtered.columns:
293                    neg = df_filtered[df_filtered.annotations == 1]
294                    pos = df_filtered[df_filtered.annotations == 2]
295                    
296                    if not neg.empty:
297                        axes[i].scatter(neg.index, neg[sensor], 
298                                      c=self.config['colors']['no_freeze'], 
299                                      label="no freeze", alpha=self.config['alpha'])
300                    if not pos.empty:
301                        axes[i].scatter(pos.index, pos[sensor], 
302                                      c=self.config['colors']['freeze'], 
303                                      label="freeze", alpha=self.config['alpha'])
304                    
305                    axes[i].legend()
306        
307        plt.xlabel("Time")
308        plt.tight_layout()
309        plt.show()

EDA analyzer for Daphnet dataset visualization.

This analyzer provides comprehensive visualization capabilities for the Daphnet dataset, including thigh, shank, and trunk sensor data.

DaphnetVisualizationAnalyzer()
26    def __init__(self):
27        super().__init__(
28            name="daphnet_visualization",
29            description="Comprehensive visualization analyzer for Daphnet dataset sensor data"
30        )
31        self.config = {
32            'figsize': (20, 16),
33            'colors': {
34                'no_freeze': 'orange',
35                'freeze': 'purple'
36            },
37            'alpha': 0.6
38        }

Initialize the EDA analyzer.

Args:
    name: Name of the EDA analyzer
    description: Description of the EDA analyzer

Attributes: config
def analyze( self, data: Union[pandas.core.frame.DataFrame, List[pandas.core.frame.DataFrame]], **kwargs) -> Dict[str, Any]:
40    def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
41        """
42        Analyze the data and return statistical summaries.
43        
44        Args:
45            data: Input data to analyze
46            **kwargs: Additional arguments
47            
48        Returns:
49            Dictionary containing analysis results
50        """
51        if isinstance(data, list):
52            # Multiple datasets
53            results = {}
54            for i, df in enumerate(data):
55                results[f'dataset_{i}'] = self._analyze_single_dataset(df)
56            return results
57        else:
58            # Single dataset
59            return self._analyze_single_dataset(data)

Analyze the data and return statistical summaries.

Args:
    data: Input data to analyze
    **kwargs: Additional arguments

Returns: Dictionary containing analysis results

def visualize( self, data: Union[pandas.core.frame.DataFrame, List[pandas.core.frame.DataFrame]], **kwargs):
 89    def visualize(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs):
 90        """
 91        Create visualizations of the data.
 92        
 93        Args:
 94            data: Input data to visualize
 95            **kwargs: Additional arguments including sensor_type, dataset_index, names
 96        """
 97        sensor_type = kwargs.get('sensor_type', 'all')
 98        dataset_index = kwargs.get('dataset_index', 0)
 99        names = kwargs.get('names', [])
100        
101        if isinstance(data, list):
102            if dataset_index < len(data):
103                df = data[dataset_index]
104                dataset_name = names[dataset_index] if dataset_index < len(names) else f"Dataset {dataset_index}"
105            else:
106                print(f"Dataset index {dataset_index} out of range")
107                return
108        else:
109            df = data
110            dataset_name = names[0] if names else "Dataset"
111        
112        if sensor_type == 'all':
113            self._plot_all_sensors(df, dataset_name)
114        elif sensor_type == 'thigh':
115            self._plot_thigh_data(df, dataset_name)
116        elif sensor_type == 'shank':
117            self._plot_shank_data(df, dataset_name)
118        elif sensor_type == 'trunk':
119            self._plot_trunk_data(df, dataset_name)
120        else:
121            print(f"Unknown sensor type: {sensor_type}")

Create visualizations of the data.

Args:
    data: Input data to visualize
    **kwargs: Additional arguments including sensor_type, dataset_index, names
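
A usage sketch follows; the DataFrame is synthetic but uses the column names ('thigh', 'shank', 'trunk', 'annotations') that the analyzer's source above looks for, and the dataset name 'S01R01' is an arbitrary example label.

    import numpy as np
    import pandas as pd
    from gaitsetpy.eda import DaphnetVisualizationAnalyzer

    # Synthetic frame with the columns the analyzer expects; values are made up.
    n = 500
    df = pd.DataFrame({
        'thigh': np.random.randn(n),
        'shank': np.random.randn(n),
        'trunk': np.random.randn(n),
        'annotations': np.random.choice([1, 2], size=n),
    })

    analyzer = DaphnetVisualizationAnalyzer()
    stats = analyzer.analyze(df)                                 # shape, annotation counts, per-sensor stats
    analyzer.visualize(df, sensor_type='all', names=['S01R01'])  # combined plot of the three sensors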

class SensorStatisticsAnalyzer(gaitsetpy.BaseEDAAnalyzer):
312class SensorStatisticsAnalyzer(BaseEDAAnalyzer):
313    """
314    EDA analyzer for sensor data statistics and feature visualization.
315    
316    This analyzer provides statistical analysis and feature visualization capabilities
317    for sensor data including sliding windows and extracted features.
318    """
319    
320    def __init__(self):
321        super().__init__(
322            name="sensor_statistics",
323            description="Statistical analysis and feature visualization for sensor data"
324        )
325        self.config = {
326            'figsize': (20, 10),
327            'feature_markers': {
328                'mean': 'x',
329                'rms': 'o',
330                'peak_height': 'v',
331                'mode': '<',
332                'median': '^'
333            }
334        }
335    
336    def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
337        """
338        Analyze sensor data and return statistical summaries.
339        
340        Args:
341            data: Input data to analyze
342            **kwargs: Additional arguments
343            
344        Returns:
345            Dictionary containing analysis results
346        """
347        if isinstance(data, list):
348            # Multiple datasets
349            results = {}
350            for i, df in enumerate(data):
351                results[f'dataset_{i}'] = self._compute_statistics(df)
352            return results
353        else:
354            # Single dataset
355            return self._compute_statistics(data)
356    
357    def _compute_statistics(self, df: pd.DataFrame) -> Dict[str, Any]:
358        """Compute comprehensive statistics for a dataset."""
359        stats = {
360            'basic_stats': df.describe().to_dict(),
361            'correlation_matrix': df.corr().to_dict() if len(df.select_dtypes(include=[np.number]).columns) > 1 else {},
362            'skewness': df.skew().to_dict(),
363            'kurtosis': df.kurtosis().to_dict()
364        }
365        
366        # Add sensor-specific statistics
367        sensor_stats = {}
368        for sensor in ['thigh', 'shank', 'trunk']:
369            if sensor in df.columns:
370                sensor_data = df[sensor].dropna()
371                sensor_stats[sensor] = {
372                    'mean': sensor_data.mean(),
373                    'std': sensor_data.std(),
374                    'variance': sensor_data.var(),
375                    'min': sensor_data.min(),
376                    'max': sensor_data.max(),
377                    'range': sensor_data.max() - sensor_data.min(),
378                    'median': sensor_data.median(),
379                    'q25': sensor_data.quantile(0.25),
380                    'q75': sensor_data.quantile(0.75),
381                    'iqr': sensor_data.quantile(0.75) - sensor_data.quantile(0.25)
382                }
383        
384        stats['sensor_statistics'] = sensor_stats
385        return stats
386    
387    def visualize(self, sliding_windows: List[Dict], features: List[Dict], **kwargs):
388        """
389        Create visualizations of sensor data with overlaid features.
390        
391        Args:
392            sliding_windows: List of sliding window dictionaries
393            features: List of feature dictionaries
394            **kwargs: Additional arguments including sensor_name, start_idx, end_idx, num_windows
395        """
396        sensor_name = kwargs.get('sensor_name', 'shank')
397        start_idx = kwargs.get('start_idx', 0)
398        end_idx = kwargs.get('end_idx', 1000)
399        num_windows = kwargs.get('num_windows', 10)
400        save = kwargs.get('save', False)
401        
402        self._plot_sensor_with_features(sliding_windows, features, start_idx, end_idx, 
403                                      sensor_name, num_windows, save)
404    
405    def _plot_sensor_with_features(self, sliding_windows: List[Dict], features: List[Dict], 
406                                 start_idx: int, end_idx: int, sensor_name: str = "shank", 
407                                 num_windows: int = 10, save: bool = False):
408        """
409        Plot sliding windows of sensor data with overlaid statistical features.
410        
411        Args:
412            sliding_windows: List of sliding window dictionaries
413            features: List of feature dictionaries
414            start_idx: Start index of the time window
415            end_idx: End index of the time window
416            sensor_name: Name of the sensor to plot
417            num_windows: Number of sliding windows to plot
418            save: Whether to save the plot
419        """
420        fig, axes = plt.subplots(2, 1, figsize=self.config['figsize'], 
421                                gridspec_kw={'height_ratios': [3, 1]})
422        
423        # Extract sensor windows
424        sensor_windows = next((sw['data'] for sw in sliding_windows if sw['name'] == sensor_name), None)
425        if sensor_windows is None:
426            print(f"Sensor '{sensor_name}' not found in sliding_windows.")
427            return
428        
429        # Extract corresponding features
430        sensor_features = next((feat['features'] for feat in features if feat['name'] == sensor_name), None)
431        if sensor_features is None:
432            print(f"Sensor '{sensor_name}' not found in features.")
433            return
434        
435        # Filter windows based on start_idx and end_idx
436        filtered_windows = [series for series in sensor_windows 
437                           if start_idx <= series.index[0] and series.index[-1] <= end_idx]
438        
439        if not filtered_windows:
440            print(f"No windows found in the specified index range ({start_idx} - {end_idx}).")
441            return
442        
443        # Store entropy & frequency features for separate plotting
444        entropy_values = []
445        dominant_frequencies = []
446        
447        # Plot first num_windows windows
448        for i in range(min(num_windows, len(filtered_windows))):
449            series = filtered_windows[i]
450            
451            # Extract time and signal values
452            time_values = series.index.to_numpy()
453            signal_values = series.values
454            
455            # Determine actual start and end indices for this window
456            window_start, window_end = time_values[0], time_values[-1]
457            
458            # Plot time series data
459            axes[0].plot(time_values, signal_values, alpha=0.6)
460            
461            # Mark start and end of each window with vertical dotted lines
462            axes[0].axvline(x=window_start, color='black', linestyle='dotted', alpha=0.7)
463            axes[0].axvline(x=window_end, color='black', linestyle='dotted', alpha=0.7)
464            
465            # Overlay statistical features
466            for feature_name, marker in self.config['feature_markers'].items():
467                if feature_name in sensor_features and len(sensor_features[feature_name]) > i:
468                    feature_value = sensor_features[feature_name][i]
469                    if feature_value != 0:  # Skip zero values
470                        closest_index = np.argmin(np.abs(signal_values - feature_value))
471                        closest_time = time_values[closest_index]
472                        axes[0].scatter(closest_time, feature_value, color='red', 
473                                      marker=marker, s=100, label=feature_name if i == 0 else "")
474            
475            # Store entropy & frequency features for separate plotting
476            if 'entropy' in sensor_features and len(sensor_features['entropy']) > i:
477                entropy_values.append(sensor_features['entropy'][i])
478            if 'dominant_frequency' in sensor_features and len(sensor_features['dominant_frequency']) > i:
479                dominant_frequencies.append(sensor_features['dominant_frequency'][i])
480        
481        # Labels and title for time-series plot
482        axes[0].set_xlabel('Time')
483        axes[0].set_ylabel(f'{sensor_name} Signal')
484        axes[0].set_title(f'First {num_windows} windows of {sensor_name} in range {start_idx}-{end_idx} with Features')
485        axes[0].legend()
486        
487        # Frequency-domain & entropy plot
488        if dominant_frequencies:
489            window_indices = list(range(len(dominant_frequencies)))
490            axes[1].plot(window_indices, dominant_frequencies, 
491                        label="Dominant Frequency", marker="o", linestyle="dashed", color="blue")
492        
493        if entropy_values:
494            axes[1].bar(range(len(entropy_values)), entropy_values, alpha=0.6, label="Entropy", color="green")  # avoids NameError when no dominant frequencies were collected
495        
496        axes[1].set_xlabel("Window Index")
497        axes[1].set_ylabel("Feature Value")
498        axes[1].set_title("Frequency & Entropy Features")
499        axes[1].legend()
500        
501        plt.tight_layout()
502        
503        # Save or show plot
504        if save:
505            file_path = input("Enter the file path to save the plot (e.g., 'plot.png'): ")
506            plt.savefig(file_path, dpi=300)
507            print(f"Plot saved at {file_path}")
508        else:
509            plt.show() 

EDA analyzer for sensor data statistics and feature visualization.

This analyzer provides statistical analysis and feature visualization capabilities for sensor data including sliding windows and extracted features.

SensorStatisticsAnalyzer()
320    def __init__(self):
321        super().__init__(
322            name="sensor_statistics",
323            description="Statistical analysis and feature visualization for sensor data"
324        )
325        self.config = {
326            'figsize': (20, 10),
327            'feature_markers': {
328                'mean': 'x',
329                'rms': 'o',
330                'peak_height': 'v',
331                'mode': '<',
332                'median': '^'
333            }
334        }

Initialize the EDA analyzer.

Args:
    name: Name of the EDA analyzer
    description: Description of the EDA analyzer

Attributes: config
def analyze( self, data: Union[pandas.core.frame.DataFrame, List[pandas.core.frame.DataFrame]], **kwargs) -> Dict[str, Any]:
336    def analyze(self, data: Union[pd.DataFrame, List[pd.DataFrame]], **kwargs) -> Dict[str, Any]:
337        """
338        Analyze sensor data and return statistical summaries.
339        
340        Args:
341            data: Input data to analyze
342            **kwargs: Additional arguments
343            
344        Returns:
345            Dictionary containing analysis results
346        """
347        if isinstance(data, list):
348            # Multiple datasets
349            results = {}
350            for i, df in enumerate(data):
351                results[f'dataset_{i}'] = self._compute_statistics(df)
352            return results
353        else:
354            # Single dataset
355            return self._compute_statistics(data)

Analyze sensor data and return statistical summaries.

Args:
    data: Input data to analyze
    **kwargs: Additional arguments

Returns: Dictionary containing analysis results

def visualize(self, sliding_windows: List[Dict], features: List[Dict], **kwargs):
387    def visualize(self, sliding_windows: List[Dict], features: List[Dict], **kwargs):
388        """
389        Create visualizations of sensor data with overlaid features.
390        
391        Args:
392            sliding_windows: List of sliding window dictionaries
393            features: List of feature dictionaries
394            **kwargs: Additional arguments including sensor_name, start_idx, end_idx, num_windows
395        """
396        sensor_name = kwargs.get('sensor_name', 'shank')
397        start_idx = kwargs.get('start_idx', 0)
398        end_idx = kwargs.get('end_idx', 1000)
399        num_windows = kwargs.get('num_windows', 10)
400        save = kwargs.get('save', False)
401        
402        self._plot_sensor_with_features(sliding_windows, features, start_idx, end_idx, 
403                                      sensor_name, num_windows, save)

Create visualizations of sensor data with overlaid features.

Args:
    sliding_windows: List of sliding window dictionaries
    features: List of feature dictionaries
    **kwargs: Additional arguments including sensor_name, start_idx, end_idx, num_windows
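
The visualize method expects a particular shape for its two inputs, which the sketch below reconstructs from _plot_sensor_with_features above: each sliding-window entry is a dict with 'name' and 'data' (a list of indexed Series), and each feature entry pairs the same name with a 'features' mapping of per-window lists. All values here are synthetic.

    import numpy as np
    import pandas as pd
    from gaitsetpy.eda import SensorStatisticsAnalyzer

    # Five synthetic 100-sample windows of a 'shank' signal with contiguous indices.
    windows = [pd.Series(np.random.randn(100), index=range(i * 100, (i + 1) * 100)) for i in range(5)]
    sliding_windows = [{'name': 'shank', 'data': windows}]

    # Per-window feature lists keyed by feature name (entropy/frequency values are random placeholders).
    features = [{'name': 'shank', 'features': {
        'mean': [float(w.mean()) for w in windows],
        'rms': [float(np.sqrt((w ** 2).mean())) for w in windows],
        'entropy': list(np.random.rand(5)),
        'dominant_frequency': list(np.random.rand(5)),
    }}]

    analyzer = SensorStatisticsAnalyzer()
    analyzer.visualize(sliding_windows, features, sensor_name='shank',
                       start_idx=0, end_idx=500, num_windows=5)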

class RandomForestModel(gaitsetpy.BaseClassificationModel):
 21class RandomForestModel(BaseClassificationModel):
 22    """
 23    Random Forest classification model.
 24    
 25    This class provides Random Forest classification functionality with
 26    comprehensive training, prediction, and evaluation capabilities.
 27    """
 28    
 29    def __init__(self, n_estimators: int = 100, random_state: int = 42, max_depth: Optional[int] = None):
 30        super().__init__(
 31            name="random_forest",
 32            description="Random Forest classifier for gait data classification"
 33        )
 34        self.config = {
 35            'n_estimators': n_estimators,
 36            'random_state': random_state,
 37            'max_depth': max_depth
 38        }
 39        self.model = RandomForestClassifier(
 40            n_estimators=n_estimators,
 41            random_state=random_state,
 42            max_depth=max_depth
 43        )
 44        self.feature_names = []
 45        self.class_names = []
 46        
 47    def train(self, features: List[Dict], **kwargs):
 48        """
 49        Train the Random Forest model on the given features.
 50        
 51        Args:
 52            features: List of feature dictionaries
 53            **kwargs: Additional arguments including test_size, validation_split
 54        """
 55        # Preprocess features
 56        X, y = preprocess_features(features)
 57        
 58        # Store feature and class information
 59        self.feature_names = [f"feature_{i}" for i in range(X.shape[1])]
 60        self.class_names = list(set(y))
 61        
 62        # Split data if test_size is specified
 63        test_size = kwargs.get('test_size', 0.2)
 64        validation_split = kwargs.get('validation_split', True)
 65        
 66        if validation_split:
 67            X_train, X_test, y_train, y_test = train_test_split(
 68                X, y, test_size=test_size, random_state=self.config['random_state']
 69            )
 70            
 71            # Train model
 72            self.model.fit(X_train, y_train)
 73            
 74            # Store validation data for later evaluation
 75            self.X_test = X_test
 76            self.y_test = y_test
 77            
 78            # Print training accuracy
 79            train_accuracy = self.model.score(X_train, y_train)
 80            test_accuracy = self.model.score(X_test, y_test)
 81            
 82            print(f"Training accuracy: {train_accuracy:.4f}")
 83            print(f"Validation accuracy: {test_accuracy:.4f}")
 84        else:
 85            # Train on all data
 86            self.model.fit(X, y)
 87            train_accuracy = self.model.score(X, y)
 88            print(f"Training accuracy: {train_accuracy:.4f}")
 89        
 90        self.trained = True
 91        print("Random Forest model trained successfully.")
 92    
 93    def predict(self, features: List[Dict], **kwargs) -> Union[np.ndarray, Any]:
 94        """
 95        Make predictions using the trained Random Forest model.
 96        
 97        Args:
 98            features: List of feature dictionaries
 99            **kwargs: Additional arguments including return_probabilities
100            
101        Returns:
102            Array of predictions or probabilities
103        """
104        if not self.trained:
105            raise ValueError("Model must be trained before making predictions")
106        
107        # Preprocess features
108        X, _ = preprocess_features(features)
109        
110        # Make predictions
111        return_probabilities = kwargs.get('return_probabilities', False)
112        
113        if return_probabilities:
114            return self.model.predict_proba(X)
115        else:
116            return self.model.predict(X)
117    
118    def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]:
119        """
120        Evaluate the Random Forest model performance.
121        
122        Args:
123            features: List of feature dictionaries
124            **kwargs: Additional arguments including detailed_report
125            
126        Returns:
127            Dictionary containing evaluation metrics
128        """
129        if not self.trained:
130            raise ValueError("Model must be trained before evaluation")
131        
132        # Use validation data if available, otherwise use provided features
133        if hasattr(self, 'X_test') and hasattr(self, 'y_test'):
134            X_test, y_test = self.X_test, self.y_test
135        else:
136            X_test, y_test = preprocess_features(features)
137        
138        # Make predictions
139        y_pred = self.model.predict(X_test)
140        
141        # Calculate metrics
142        accuracy = accuracy_score(y_test, y_pred)
143        conf_matrix = confusion_matrix(y_test, y_pred)
144        
145        # Basic metrics
146        metrics = {
147            'accuracy': accuracy,
148            'confusion_matrix': conf_matrix.tolist()
149        }
150        
151        # Detailed report if requested
152        detailed_report = kwargs.get('detailed_report', False)
153        if detailed_report:
154            class_report = classification_report(y_test, y_pred, output_dict=True)
155            metrics['classification_report'] = class_report
156            
157            # Feature importance
158            if hasattr(self.model, 'feature_importances_'):
159                feature_importance = dict(zip(self.feature_names, self.model.feature_importances_))
160                metrics['feature_importance'] = feature_importance
161        
162        return metrics
163    
164    def save_model(self, filepath: str):
165        """
166        Save the trained Random Forest model to a file.
167        
168        Args:
169            filepath: Path to save the model
170        """
171        if not self.trained:
172            raise ValueError("Model must be trained before saving")
173        
174        # Save model with additional metadata
175        model_data = {
176            'model': self.model,
177            'config': self.config,
178            'feature_names': self.feature_names,
179            'class_names': self.class_names,
180            'trained': self.trained
181        }
182        
183        joblib.dump(model_data, filepath)
184        print(f"Random Forest model saved to {filepath}")
185    
186    def load_model(self, filepath: str):
187        """
188        Load a trained Random Forest model from a file.
189        
190        Args:
191            filepath: Path to the saved model
192        """
193        try:
194            model_data = joblib.load(filepath)
195            
196            # Handle legacy model format
197            if isinstance(model_data, dict):
198                self.model = model_data['model']
199                self.config = model_data.get('config', self.config)
200                self.feature_names = model_data.get('feature_names', [])
201                self.class_names = model_data.get('class_names', [])
202                self.trained = model_data.get('trained', True)
203            else:
204                # Legacy format - just the model
205                self.model = model_data
206                self.trained = True
207            
208            print(f"Random Forest model loaded from {filepath}")
209        except Exception as e:
210            print(f"Error loading model: {e}")
211            raise
212    
213    def get_feature_importance(self) -> Dict[str, float]:
214        """
215        Get feature importance scores.
216        
217        Returns:
218            Dictionary mapping feature names to importance scores
219        """
220        if not self.trained:
221            raise ValueError("Model must be trained to get feature importance")
222        
223        if hasattr(self.model, 'feature_importances_'):
224            return dict(zip(self.feature_names, self.model.feature_importances_))
225        else:
226            return {}
227    
228    def predict_single(self, single_features: Dict) -> int:
229        """
230        Make prediction for a single feature vector.
231        
232        Args:
233            single_features: Dictionary containing features for a single sample
234            
235        Returns:
236            Predicted class
237        """
238        if not self.trained:
239            raise ValueError("Model must be trained before making predictions")
240        
241        # Convert single feature dict to format expected by preprocess_features
242        features_list = [single_features]
243        X, _ = preprocess_features(features_list)
244        
245        return self.model.predict(X)[0]

Random Forest classification model.

This class provides Random Forest classification functionality with comprehensive training, prediction, and evaluation capabilities.

RandomForestModel( n_estimators: int = 100, random_state: int = 42, max_depth: Optional[int] = None)
29    def __init__(self, n_estimators: int = 100, random_state: int = 42, max_depth: Optional[int] = None):
30        super().__init__(
31            name="random_forest",
32            description="Random Forest classifier for gait data classification"
33        )
34        self.config = {
35            'n_estimators': n_estimators,
36            'random_state': random_state,
37            'max_depth': max_depth
38        }
39        self.model = RandomForestClassifier(
40            n_estimators=n_estimators,
41            random_state=random_state,
42            max_depth=max_depth
43        )
44        self.feature_names = []
45        self.class_names = []

Initialize the classification model.

Args:
    name: Name of the classification model
    description: Description of the classification model

Attributes: config, model, feature_names, class_names
def train(self, features: List[Dict], **kwargs):
47    def train(self, features: List[Dict], **kwargs):
48        """
49        Train the Random Forest model on the given features.
50        
51        Args:
52            features: List of feature dictionaries
53            **kwargs: Additional arguments including test_size, validation_split
54        """
55        # Preprocess features
56        X, y = preprocess_features(features)
57        
58        # Store feature and class information
59        self.feature_names = [f"feature_{i}" for i in range(X.shape[1])]
60        self.class_names = list(set(y))
61        
62        # Split data if test_size is specified
63        test_size = kwargs.get('test_size', 0.2)
64        validation_split = kwargs.get('validation_split', True)
65        
66        if validation_split:
67            X_train, X_test, y_train, y_test = train_test_split(
68                X, y, test_size=test_size, random_state=self.config['random_state']
69            )
70            
71            # Train model
72            self.model.fit(X_train, y_train)
73            
74            # Store validation data for later evaluation
75            self.X_test = X_test
76            self.y_test = y_test
77            
78            # Print training accuracy
79            train_accuracy = self.model.score(X_train, y_train)
80            test_accuracy = self.model.score(X_test, y_test)
81            
82            print(f"Training accuracy: {train_accuracy:.4f}")
83            print(f"Validation accuracy: {test_accuracy:.4f}")
84        else:
85            # Train on all data
86            self.model.fit(X, y)
87            train_accuracy = self.model.score(X, y)
88            print(f"Training accuracy: {train_accuracy:.4f}")
89        
90        self.trained = True
91        print("Random Forest model trained successfully.")

Train the Random Forest model on the given features.

Args:
    features: List of feature dictionaries
    **kwargs: Additional arguments including test_size, validation_split

def predict(self, features: List[Dict], **kwargs) -> Union[numpy.ndarray, Any]:
 93    def predict(self, features: List[Dict], **kwargs) -> Union[np.ndarray, Any]:
 94        """
 95        Make predictions using the trained Random Forest model.
 96        
 97        Args:
 98            features: List of feature dictionaries
 99            **kwargs: Additional arguments including return_probabilities
100            
101        Returns:
102            Array of predictions or probabilities
103        """
104        if not self.trained:
105            raise ValueError("Model must be trained before making predictions")
106        
107        # Preprocess features
108        X, _ = preprocess_features(features)
109        
110        # Make predictions
111        return_probabilities = kwargs.get('return_probabilities', False)
112        
113        if return_probabilities:
114            return self.model.predict_proba(X)
115        else:
116            return self.model.predict(X)

Make predictions using the trained Random Forest model.

Args:
    features: List of feature dictionaries
    **kwargs: Additional arguments including return_probabilities

Returns: Array of predictions or probabilities

def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]:
118    def evaluate(self, features: List[Dict], **kwargs) -> Dict[str, float]:
119        """
120        Evaluate the Random Forest model performance.
121        
122        Args:
123            features: List of feature dictionaries
124            **kwargs: Additional arguments including detailed_report
125            
126        Returns:
127            Dictionary containing evaluation metrics
128        """
129        if not self.trained:
130            raise ValueError("Model must be trained before evaluation")
131        
132        # Use validation data if available, otherwise use provided features
133        if hasattr(self, 'X_test') and hasattr(self, 'y_test'):
134            X_test, y_test = self.X_test, self.y_test
135        else:
136            X_test, y_test = preprocess_features(features)
137        
138        # Make predictions
139        y_pred = self.model.predict(X_test)
140        
141        # Calculate metrics
142        accuracy = accuracy_score(y_test, y_pred)
143        conf_matrix = confusion_matrix(y_test, y_pred)
144        
145        # Basic metrics
146        metrics = {
147            'accuracy': accuracy,
148            'confusion_matrix': conf_matrix.tolist()
149        }
150        
151        # Detailed report if requested
152        detailed_report = kwargs.get('detailed_report', False)
153        if detailed_report:
154            class_report = classification_report(y_test, y_pred, output_dict=True)
155            metrics['classification_report'] = class_report
156            
157            # Feature importance
158            if hasattr(self.model, 'feature_importances_'):
159                feature_importance = dict(zip(self.feature_names, self.model.feature_importances_))
160                metrics['feature_importance'] = feature_importance
161        
162        return metrics

Evaluate the Random Forest model performance.

Args:
    features: List of feature dictionaries
    **kwargs: Additional arguments including detailed_report

Returns: Dictionary containing evaluation metrics

def save_model(self, filepath: str):
164    def save_model(self, filepath: str):
165        """
166        Save the trained Random Forest model to a file.
167        
168        Args:
169            filepath: Path to save the model
170        """
171        if not self.trained:
172            raise ValueError("Model must be trained before saving")
173        
174        # Save model with additional metadata
175        model_data = {
176            'model': self.model,
177            'config': self.config,
178            'feature_names': self.feature_names,
179            'class_names': self.class_names,
180            'trained': self.trained
181        }
182        
183        joblib.dump(model_data, filepath)
184        print(f"Random Forest model saved to {filepath}")

Save the trained Random Forest model to a file.

Args: filepath: Path to save the model

def load_model(self, filepath: str):
186    def load_model(self, filepath: str):
187        """
188        Load a trained Random Forest model from a file.
189        
190        Args:
191            filepath: Path to the saved model
192        """
193        try:
194            model_data = joblib.load(filepath)
195            
196            # Handle legacy model format
197            if isinstance(model_data, dict):
198                self.model = model_data['model']
199                self.config = model_data.get('config', self.config)
200                self.feature_names = model_data.get('feature_names', [])
201                self.class_names = model_data.get('class_names', [])
202                self.trained = model_data.get('trained', True)
203            else:
204                # Legacy format - just the model
205                self.model = model_data
206                self.trained = True
207            
208            print(f"Random Forest model loaded from {filepath}")
209        except Exception as e:
210            print(f"Error loading model: {e}")
211            raise

Load a trained Random Forest model from a file.

Args: filepath: Path to the saved model

def get_feature_importance(self) -> Dict[str, float]:
213    def get_feature_importance(self) -> Dict[str, float]:
214        """
215        Get feature importance scores.
216        
217        Returns:
218            Dictionary mapping feature names to importance scores
219        """
220        if not self.trained:
221            raise ValueError("Model must be trained to get feature importance")
222        
223        if hasattr(self.model, 'feature_importances_'):
224            return dict(zip(self.feature_names, self.model.feature_importances_))
225        else:
226            return {}

Get feature importance scores.

Returns: Dictionary mapping feature names to importance scores

def predict_single(self, single_features: Dict) -> int:
228    def predict_single(self, single_features: Dict) -> int:
229        """
230        Make prediction for a single feature vector.
231        
232        Args:
233            single_features: Dictionary containing features for a single sample
234            
235        Returns:
236            Predicted class
237        """
238        if not self.trained:
239            raise ValueError("Model must be trained before making predictions")
240        
241        # Convert single feature dict to format expected by preprocess_features
242        features_list = [single_features]
243        X, _ = preprocess_features(features_list)
244        
245        return self.model.predict(X)[0]

Make prediction for a single feature vector.

Args: single_features: Dictionary containing features for a single sample

Returns: Predicted class
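
A minimal end-to-end sketch of the model API above, using a small synthetic feature set in the per-sensor dictionary format that preprocess_features() expects ('name', 'features', 'annotations'); the sensor name, feature names, and file path are illustrative, and train_gait_classifier is assumed to be exposed at the package top level:

    import numpy as np
    import gaitsetpy as gsp

    rng = np.random.default_rng(0)
    n_windows = 40
    features = [{
        'name': 'shank',                     # illustrative sensor name
        'features': {
            'mean': rng.normal(size=n_windows).tolist(),
            'rms':  rng.uniform(0.5, 1.5, size=n_windows).tolist(),
        },
        'annotations': rng.integers(1, 3, size=n_windows).tolist(),  # 1 = no freeze, 2 = freeze
    }]

    model = gsp.train_gait_classifier(features, model_type='random_forest', n_estimators=50)
    metrics = model.evaluate(features, detailed_report=True)
    print(metrics['accuracy'])

    model.save_model('rf_gait.joblib')       # illustrative path
    model.load_model('rf_gait.joblib')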

def get_dataset_manager():
53def get_dataset_manager():
54    """Get the singleton DatasetManager instance."""
55    return DatasetManager()

Get the singleton DatasetManager instance.

def get_feature_manager():
93def get_feature_manager():
94    """Get the singleton FeatureManager instance."""
95    return FeatureManager()

Get the singleton FeatureManager instance.

def get_preprocessing_manager():
69def get_preprocessing_manager():
70    """Get the singleton PreprocessingManager instance."""
71    return PreprocessingManager()

Get the singleton PreprocessingManager instance.

def get_eda_manager():
51def get_eda_manager():
52    """Get the singleton EDAManager instance."""
53    return EDAManager()

Get the singleton EDAManager instance.

def get_classification_manager():
44def get_classification_manager():
45    """Get the singleton ClassificationManager instance."""
46    return ClassificationManager()

Get the singleton ClassificationManager instance.

def get_all_managers():
194def get_all_managers():
195    """
196    Get all singleton managers.
197    
198    Returns:
199        Dictionary containing all manager instances
200    """
201    return {
202        'dataset': DatasetManager(),
203        'feature': FeatureManager(),
204        'preprocessing': PreprocessingManager(),
205        'eda': EDAManager(),
206        'classification': ClassificationManager()
207    }

Get all singleton managers.

Returns: Dictionary containing all manager instances

def get_available_datasets():
58def get_available_datasets():
59    """Get list of available dataset names."""
60    return DatasetManager().get_available_components()

Get list of available dataset names.

def get_available_extractors():
 98def get_available_extractors():
 99    """Get list of available feature extractor names."""
100    return FeatureManager().get_available_components()

Get list of available feature extractor names.

def get_available_preprocessors():
74def get_available_preprocessors():
75    """Get list of available preprocessor names."""
76    return PreprocessingManager().get_available_components()

Get list of available preprocessor names.

def get_available_analyzers():
56def get_available_analyzers():
57    """Get list of available EDA analyzer names."""
58    return EDAManager().get_available_components()

Get list of available EDA analyzer names.

def get_available_models():
49def get_available_models():
50    """Get list of available classification model names."""
51    return ClassificationManager().get_available_components()

Get list of available classification model names.

def get_system_info():
210def get_system_info():
211    """
212    Get information about the available components in the system.
213    
214    Returns:
215        Dictionary containing system information
216    """
217    return {
218        'version': __version__,
219        'author': __author__,
220        'available_datasets': get_available_datasets(),
221        'available_extractors': get_available_extractors(),
222        'available_preprocessors': get_available_preprocessors(),
223        'available_analyzers': get_available_analyzers(),
224        'available_models': get_available_models(),
225        'architecture': 'Modular with singleton design pattern'
226    }

Get information about the available components in the system.

Returns: Dictionary containing system information
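
A short usage sketch for the registry helpers above, assuming (as the package __init__ suggests) that they are exposed at the top level:

    import gaitsetpy as gsp

    info = gsp.get_system_info()
    print(info['version'], info['architecture'])

    print(gsp.get_available_datasets())      # registered dataset loaders
    print(gsp.get_available_extractors())    # registered feature extractors
    print(gsp.get_available_models())        # registered classification models

    # The manager getters return singletons, so repeated calls share one registry:
    assert gsp.get_dataset_manager() is gsp.get_dataset_manager()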

def load_and_analyze_daphnet(data_dir: str, sensor_type: str = 'all', window_size: int = 192):
229def load_and_analyze_daphnet(data_dir: str, sensor_type: str = 'all', window_size: int = 192):
230    """
231    Complete workflow for loading and analyzing Daphnet data.
232    
233    Args:
234        data_dir: Directory containing the Daphnet dataset
235        sensor_type: Type of sensor to analyze ('all', 'thigh', 'shank', 'trunk')
236        window_size: Size of sliding windows for feature extraction
237        
238    Returns:
239        Dictionary containing data, features, and analysis results
240    """
241    # Load dataset (note: the sensor_type argument is currently unused in this workflow)
242    loader = DaphnetLoader()
243    data, names = loader.load_data(data_dir)
244    
245    # Create sliding windows
246    windows = loader.create_sliding_windows(data, names, window_size=window_size)
247    
248    # Extract features
249    extractor = GaitFeatureExtractor()
250    features = extractor.extract_features(windows[0]['windows'], fs=64)
251    
252    # Analyze data
253    analyzer = DaphnetVisualizationAnalyzer()
254    analysis = analyzer.analyze(data)
255    
256    return {
257        'data': data,
258        'names': names,
259        'windows': windows,
260        'features': features,
261        'analysis': analysis,
262        'loader': loader,
263        'extractor': extractor,
264        'analyzer': analyzer
265    }

Complete workflow for loading and analyzing Daphnet data.

Args:
    data_dir: Directory containing the Daphnet dataset
    sensor_type: Type of sensor to analyze ('all', 'thigh', 'shank', 'trunk')
    window_size: Size of sliding windows for feature extraction

Returns: Dictionary containing data, features, and analysis results
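
A usage sketch for this workflow helper; './daphnet' is an illustrative data directory and the helper is assumed to be exposed at the package top level:

    import gaitsetpy as gsp

    results = gsp.load_and_analyze_daphnet('./daphnet', window_size=192)

    print(results['names'])            # names of the loaded recordings
    print(len(results['windows']))     # sliding-window dictionaries per DataFrame
    features = results['features']     # GaitFeatureExtractor output (fs=64)
    analysis = results['analysis']     # DaphnetVisualizationAnalyzer output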

def load_and_analyze_physionet(data_dir: str, window_size: int = 600, step_size: int = 100):
267def load_and_analyze_physionet(data_dir: str, window_size: int = 600, step_size: int = 100):
268    """
269    Complete workflow for loading and analyzing PhysioNet VGRF data.
270    
271    Args:
272        data_dir: Directory to store/find the PhysioNet dataset
273        window_size: Size of sliding windows for feature extraction (default: 600)
274        step_size: Step size for sliding windows (default: 100)
275        
276    Returns:
277        Dictionary containing data, features, and analysis results
278    """
279    # Load dataset
280    loader = PhysioNetLoader()
281    data, names = loader.load_data(data_dir)
282    
283    # Create sliding windows
284    windows = loader.create_sliding_windows(data, names, window_size=window_size, step_size=step_size)
285    
286    # Extract PhysioNet-specific features
287    extractor = PhysioNetFeatureExtractor()
288    all_features = []
289    
290    for window_dict in windows:
291        if 'windows' in window_dict:
292            features = extractor.extract_features(window_dict['windows'], fs=100)
293            all_features.append({
294                'name': window_dict['name'],
295                'features': features,
296                'metadata': window_dict.get('metadata', {})
297            })
298    
299    return {
300        'data': data,
301        'names': names,
302        'windows': windows,
303        'features': all_features,
304        'labels': loader.get_labels(),
305        'loader': loader,
306        'extractor': extractor
307    }

Complete workflow for loading and analyzing PhysioNet VGRF data.

Args:
    data_dir: Directory to store/find the PhysioNet dataset
    window_size: Size of sliding windows for feature extraction (default: 600)
    step_size: Step size for sliding windows (default: 100)

Returns: Dictionary containing data, features, and analysis results
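
A corresponding sketch for the PhysioNet VGRF workflow, again with an illustrative data directory:

    import gaitsetpy as gsp

    results = gsp.load_and_analyze_physionet('./physionet', window_size=600, step_size=100)

    print(results['names'])                 # record identifiers
    print(results['labels'])                # labels reported by the loader
    for entry in results['features']:       # one entry per windowed record
        print(entry['name'], entry['metadata'])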

def train_gait_classifier(features, model_type: str = 'random_forest', **kwargs):
309def train_gait_classifier(features, model_type: str = 'random_forest', **kwargs):
310    """
311    Train a gait classification model.
312    
313    Args:
314        features: List of feature dictionaries
315        model_type: Type of model to train ('random_forest', etc.)
316        **kwargs: Additional arguments for model training
317        
318    Returns:
319        Trained model instance
320    """
321    if model_type == 'random_forest':
322        model = RandomForestModel(**kwargs)
323        model.train(features, **kwargs)
324        return model
325    else:
326        raise ValueError(f"Model type '{model_type}' not supported")

Train a gait classification model.

Args:
    features: List of feature dictionaries
    model_type: Type of model to train ('random_forest', etc.)
    **kwargs: Additional arguments for model training

Returns: Trained model instance

def load_daphnet_data(data_dir: str):
177def load_daphnet_data(data_dir: str):
178    """
179    Legacy function for loading Daphnet data.
180    
181    Args:
182        data_dir: Directory to store the dataset
183        
184    Returns:
185        Tuple of (data_list, names_list)
186    """
187    loader = DaphnetLoader()
188    return loader.load_data(data_dir)

Legacy function for loading Daphnet data.

Args: data_dir: Directory to store the dataset

Returns: Tuple of (data_list, names_list)

def create_sliding_windows(daphnet, daphnet_names, window_size=192, step_size=32):
191def create_sliding_windows(daphnet, daphnet_names, window_size=192, step_size=32):
192    """
193    Legacy function for creating sliding windows.
194    
195    Args:
196        daphnet: List of dataframes containing Daphnet data
197        daphnet_names: List of names of the Daphnet dataframes
198        window_size: Size of the sliding window
199        step_size: Step size for the sliding window
200        
201    Returns:
202        List of dictionaries containing sliding windows for each DataFrame
203    """
204    loader = DaphnetLoader()
205    return loader.create_sliding_windows(daphnet, daphnet_names, window_size, step_size)

Legacy function for creating sliding windows.

Args:
    daphnet: List of dataframes containing Daphnet data
    daphnet_names: List of names of the Daphnet dataframes
    window_size: Size of the sliding window
    step_size: Step size for the sliding window

Returns: List of dictionaries containing sliding windows for each DataFrame
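
A usage sketch for the legacy Daphnet helpers ('./daphnet' is an illustrative path):

    import gaitsetpy as gsp

    daphnet, names = gsp.load_daphnet_data('./daphnet')
    windows = gsp.create_sliding_windows(daphnet, names, window_size=192, step_size=32)
    print(names[0], len(windows))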

def load_mobifall_data():
111def load_mobifall_data():
112    """
113    Legacy function for loading MobiFall data.
114    
115    Returns:
116        Tuple of (data_list, names_list)
117    """
118    loader = MobiFallLoader()
119    return loader.load_data("")

Legacy function for loading MobiFall data.

Returns: Tuple of (data_list, names_list)

def load_arduous_data():
111def load_arduous_data():
112    """
113    Legacy function for loading Arduous data.
114    
115    Returns:
116        Tuple of (data_list, names_list)
117    """
118    loader = ArduousLoader()
119    return loader.load_data("")

Legacy function for loading Arduous data.

Returns: Tuple of (data_list, names_list)

def load_physionet_data(data_dir: str) -> Tuple[List[pandas.core.frame.DataFrame], List[str]]:
338def load_physionet_data(data_dir: str) -> Tuple[List[pd.DataFrame], List[str]]:
339    """
340    Legacy function to load PhysioNet data.
341    
342    Args:
343        data_dir: Directory containing the dataset
344        
345    Returns:
346        Tuple of (data_list, names_list)
347    """
348    loader = PhysioNetLoader()
349    return loader.load_data(data_dir)

Legacy function to load PhysioNet data.

Args: data_dir: Directory containing the dataset

Returns: Tuple of (data_list, names_list)

def create_physionet_windows(data: List[pandas.core.frame.DataFrame], names: List[str], window_size: int = 600, step_size: int = 100) -> List[Dict]:
352def create_physionet_windows(data: List[pd.DataFrame], names: List[str], 
353                           window_size: int = 600, step_size: int = 100) -> List[Dict]:
354    """
355    Legacy function to create sliding windows from PhysioNet data.
356    
357    Args:
358        data: List of DataFrames
359        names: List of names
360        window_size: Size of sliding window
361        step_size: Step size for sliding window
362        
363    Returns:
364        List of sliding window dictionaries
365    """
366    loader = PhysioNetLoader()
367    return loader.create_sliding_windows(data, names, window_size, step_size) 

Legacy function to create sliding windows from PhysioNet data.

Args:
    data: List of DataFrames
    names: List of names
    window_size: Size of sliding window
    step_size: Step size for sliding window

Returns: List of sliding window dictionaries
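
A usage sketch for the legacy PhysioNet helpers (illustrative path, default window settings):

    import gaitsetpy as gsp

    data, names = gsp.load_physionet_data('./physionet')
    windows = gsp.create_physionet_windows(data, names, window_size=600, step_size=100)
    print(len(data), len(windows))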

def load_harup_data(data_dir: str, subjects=None, activities=None, trials=None):
399def load_harup_data(data_dir: str, subjects=None, activities=None, trials=None):
400    """
401    Legacy function for loading HAR-UP data.
402    
403    Args:
404        data_dir: Directory containing the dataset
405        subjects: List of subject IDs to load (default: all subjects)
406        activities: List of activity IDs to load (default: all activities)
407        trials: List of trial IDs to load (default: all trials)
408        
409    Returns:
410        Tuple of (data_list, names_list)
411    """
412    loader = HARUPLoader()
413    return loader.load_data(data_dir, subjects, activities, trials)

Legacy function for loading HAR-UP data.

Args:
    data_dir: Directory containing the dataset
    subjects: List of subject IDs to load (default: all subjects)
    activities: List of activity IDs to load (default: all activities)
    trials: List of trial IDs to load (default: all trials)

Returns: Tuple of (data_list, names_list)

def create_harup_windows(harup_data, harup_names, window_size=100, step_size=50):
416def create_harup_windows(harup_data, harup_names, window_size=100, step_size=50):
417    """
418    Legacy function for creating sliding windows from HAR-UP data.
419    
420    Args:
421        harup_data: List of dataframes containing HAR-UP data
422        harup_names: List of names of the HAR-UP dataframes
423        window_size: Size of the sliding window
424        step_size: Step size for the sliding window
425        
426    Returns:
427        List of dictionaries containing sliding windows for each DataFrame
428    """
429    loader = HARUPLoader()
430    return loader.create_sliding_windows(harup_data, harup_names, window_size, step_size)

Legacy function for creating sliding windows from HAR-UP data.

Args:
    harup_data: List of dataframes containing HAR-UP data
    harup_names: List of names of the HAR-UP dataframes
    window_size: Size of the sliding window
    step_size: Step size for the sliding window

Returns: List of dictionaries containing sliding windows for each DataFrame

def extract_harup_features(windows_data, time_domain=True, freq_domain=True):
433def extract_harup_features(windows_data, time_domain=True, freq_domain=True):
434    """
435    Legacy function for extracting features from HAR-UP windows.
436    
437    Args:
438        windows_data: List of dictionaries containing sliding windows
439        time_domain: Whether to extract time domain features
440        freq_domain: Whether to extract frequency domain features
441        
442    Returns:
443        List of dictionaries containing extracted features
444    """
445    loader = HARUPLoader()
446    return loader.extract_features(windows_data, time_domain, freq_domain)

Legacy function for extracting features from HAR-UP windows.

Args:
    windows_data: List of dictionaries containing sliding windows
    time_domain: Whether to extract time domain features
    freq_domain: Whether to extract frequency domain features

Returns: List of dictionaries containing extracted features
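
A usage sketch chaining the three legacy HAR-UP helpers; the path and subject subset are illustrative:

    import gaitsetpy as gsp

    data, names = gsp.load_harup_data('./harup', subjects=[1, 2])
    windows = gsp.create_harup_windows(data, names, window_size=100, step_size=50)
    features = gsp.extract_harup_features(windows, time_domain=True, freq_domain=True)
    print(len(features))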

def download_dataset(dataset_name, data_dir):
25def download_dataset(dataset_name, data_dir):
26    """Download the dataset."""
27    if dataset_name == "daphnet":
28        download_daphnet_data(data_dir)
29    elif dataset_name == "mobifall":
30        download_mobifall_data(data_dir)
31    elif dataset_name == "arduous":
32        download_arduous_data(data_dir)
33    elif dataset_name == "harup":
34        download_harup_data(data_dir)
35    elif dataset_name == "urfall":
36        download_urfall_data(data_dir)
37    elif dataset_name == "physionet":
38        # PhysioNet dataset is handled by the PhysioNetLoader itself
39        pass
40    else:
41        raise ValueError(f"Dataset {dataset_name} not supported.")

Download the dataset.

def extract_dataset(dataset_name, data_dir):
243def extract_dataset(dataset_name, data_dir):
244    """Extract the dataset."""
245    if dataset_name == "daphnet":
246        extract_daphnet_data(data_dir)
247    elif dataset_name == "mobifall":
248        extract_mobifall_data(data_dir)
249    elif dataset_name == "arduous":
250        extract_arduous_data(data_dir)
251    elif dataset_name == "harup":
252        extract_harup_data(data_dir)
253    elif dataset_name == "urfall":
254        extract_urfall_data(data_dir)
255    elif dataset_name == "physionet":
256        # PhysioNet dataset is handled by the PhysioNetLoader itself
257        pass
258    else:
259        raise ValueError(f"Dataset {dataset_name} not supported.")

Extract the dataset.
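
A brief usage sketch for the two dataset utilities above; the import path and target directory are assumptions (the defining module is not shown in this excerpt):

    # Assumed import path -- adjust to wherever these helpers are defined in your install.
    from gaitsetpy.dataset.utils import download_dataset, extract_dataset

    download_dataset('daphnet', './data')    # fetch the raw archive
    extract_dataset('daphnet', './data')     # unpack it in place
    # 'physionet' is a no-op for both helpers; PhysioNetLoader manages its own files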

def sliding_window(data, window_size, step_size):
320def sliding_window(data, window_size, step_size):
321    if window_size <= 0 or step_size <= 0:
322        return []
323    if len(data) < window_size:
324        return []
325    num_windows = (len(data) - window_size) // step_size + 1
326    windows = []
327    for i in range(num_windows):
328        start = i * step_size
329        end = start + window_size
330        windows.append(data[start:end])
331    return windows
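
sliding_window() splits a sequence into overlapping windows of length window_size, advanced by step_size samples, so the number of windows is (len(data) - window_size) // step_size + 1. A small worked example (the import path is an assumption):

    import numpy as np
    from gaitsetpy.dataset.utils import sliding_window  # assumed module path

    data = np.arange(1000)
    windows = sliding_window(data, window_size=192, step_size=32)

    # (1000 - 192) // 32 + 1 = 26 windows
    print(len(windows))                    # 26
    print(windows[0][0], windows[1][0])    # 0 32 -> window starts are step_size apart
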
def calculate_mean(signal):
133def calculate_mean(signal):
134    """Calculate the mean of the signal."""
135    return np.mean(signal)

Calculate the mean of the signal.

def calculate_standard_deviation(signal):
64def calculate_standard_deviation(signal):
65    """
66    Calculate the standard deviation of a signal.
67    Args:
68        signal (np.array): Input signal.
69    Returns:
70        std_dev (float): Standard deviation.
71    """
72    return np.std(signal)

Calculate the standard deviation of a signal. Args: signal (np.array): Input signal. Returns: std_dev (float): Standard deviation.

def calculate_variance(signal):
 96def calculate_variance(signal):
 97    """
 98    Calculate the variance of a signal.
 99    Args:
100        signal (np.array): Input signal.
101    Returns:
102        variance (float): Variance.
103    """
104    return np.var(signal)

Calculate the variance of a signal. Args: signal (np.array): Input signal. Returns: variance (float): Variance.

def calculate_skewness(signal):
149def calculate_skewness(signal):
150    """Calculate the skewness of the signal."""
151    try:
152        return skew(signal)
153    except Exception as e:
154        print(f"An error occurred in skewness: {e}")
155        return 0

Calculate the skewness of the signal.

def calculate_kurtosis(signal):
106def calculate_kurtosis(signal):
107    """
108    Calculate the kurtosis of a signal.
109    Args:
110        signal (np.array): Input signal.
111    Returns:
112        kurtosis_value (float): Kurtosis.
113    """
114    try:
115        return kurtosis(signal, fisher=False)
116    except Exception as e:
117        print(f"An error occurred in feature 'kurtosis': {e}")
118        return 0

Calculate the kurtosis of a signal. Args: signal (np.array): Input signal. Returns: kurtosis_value (float): Kurtosis.

def calculate_root_mean_square(signal):
157def calculate_root_mean_square(signal):
158    """Calculate the root mean square of the signal."""
159    return np.sqrt(np.mean(np.square(signal)))

Calculate the root mean square of the signal.

def calculate_range(signal):
161def calculate_range(signal):
162    """Calculate the range of the signal."""
163    return np.max(signal) - np.min(signal)

Calculate the range of the signal.

def calculate_median(signal):
145def calculate_median(signal):
146    """Calculate the median of the signal."""
147    return np.median(signal)

Calculate the median of the signal.

def calculate_mode(signal):
194def calculate_mode(signal):
195    """Calculate the mode of the signal."""
196    values, counts = np.unique(signal, return_counts=True)
197    return values[np.argmax(counts)]

Calculate the mode of the signal.

def calculate_mean_absolute_value(signal):
206def calculate_mean_absolute_value(signal):
207    """Calculate the mean absolute value of the signal."""
208    return np.mean(np.abs(signal))

Calculate the mean absolute value of the signal.

def calculate_median_absolute_deviation(signal):
210def calculate_median_absolute_deviation(signal):
211    """Calculate the median absolute deviation of the signal."""
212    return np.median(np.abs(signal - np.median(signal)))

Calculate the median absolute deviation of the signal.

def calculate_peak_height(signal):
180def calculate_peak_height(signal):
181    """Calculate the peak height of the signal."""
182    peaks, _ = find_peaks(signal)
183    return np.max(signal[peaks]) if len(peaks) > 0 else 0

Calculate the peak height of the signal.

def calculate_stride_times(signal, fs):
 9def calculate_stride_times(signal, fs):
10    """
11    Calculate stride times from a signal using peak detection.
12    Args:
13        signal (np.array): Input signal.
14        fs (int): Sampling frequency.
15    Returns:
16        avg_stride_time (float): Average stride time.
17    """
18    peaks, _ = find_peaks(signal)
19    stride_times = np.diff(peaks) / fs
20    avg_stride_time = np.mean(stride_times) if len(stride_times) > 0 else 0
21    return avg_stride_time

Calculate stride times from a signal using peak detection. Args: signal (np.array): Input signal. fs (int): Sampling frequency. Returns: avg_stride_time (float): Average stride time.

def calculate_step_time(signal, fs):
120def calculate_step_time(signal, fs):
121    """
122    Calculate step times from a signal using peak detection.
123    Args:
124        signal (np.array): Input signal.
125        fs (int): Sampling frequency.
126    Returns:
127        step_times (np.array): Array of step times.
128    """
129    peaks, _ = find_peaks(signal)
130    step_times = np.diff(peaks) / fs
131    return step_times

Calculate step times from a signal using peak detection. Args: signal (np.array): Input signal. fs (int): Sampling frequency. Returns: step_times (np.array): Array of step times.

def calculate_cadence(signal, fs):
199def calculate_cadence(signal, fs):
200    """Calculate the cadence (steps per minute) of the signal."""
201    peaks, _ = find_peaks(signal)
202    step_count = len(peaks)
203    duration = len(signal) / fs
204    return (step_count / duration) * 60

Calculate the cadence (steps per minute) of the signal.

def calculate_freezing_index(signal, fs):
50def calculate_freezing_index(signal, fs):
51    """
52    Calculate the freezing index of a signal.
53    Args:
54        signal (np.array): Input signal.
55        fs (int): Sampling frequency.
56    Returns:
57        freezing_index (float): Freezing index.
58    """
59    power_3_8 = calculate_power(signal, fs, (3, 8))
60    power_0_5_3 = calculate_power(signal, fs, (0.5, 3))
61    freezing_index = power_3_8 / power_0_5_3 if power_0_5_3 != 0 else 0
62    return freezing_index

Calculate the freezing index of a signal. Args: signal (np.array): Input signal. fs (int): Sampling frequency. Returns: freezing_index (float): Freezing index.
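
calculate_freezing_index() relies on a calculate_power() helper that is not shown in this excerpt. The sketch below reproduces the same band-power ratio with a hypothetical band_power() stand-in based on a Welch periodogram and a synthetic 6 Hz signal:

    import numpy as np
    from scipy.signal import welch

    def band_power(signal, fs, band):
        """Approximate band power from the Welch PSD (stand-in for calculate_power)."""
        f, pxx = welch(signal, fs=fs, nperseg=min(len(signal), 256))
        mask = (f >= band[0]) & (f <= band[1])
        return pxx[mask].sum()            # constant bin width cancels in the ratio

    fs = 64                               # Daphnet accelerometer sampling rate
    t = np.arange(0, 4, 1 / fs)
    signal = np.sin(2 * np.pi * 6 * t)    # energy concentrated in the 3-8 Hz "freeze" band

    freeze_index = band_power(signal, fs, (3, 8)) / band_power(signal, fs, (0.5, 3))
    print(freeze_index)                   # much greater than 1 for this synthetic signal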

def calculate_dominant_frequency(signal, fs):
169def calculate_dominant_frequency(signal, fs):
170    """Calculate the dominant frequency of the signal."""
171    try:
172        fft_values = np.abs(fft(signal))
173        freqs = np.fft.fftfreq(len(signal), 1 / fs)
174        dominant_freq = freqs[np.argmax(fft_values)]
175        return dominant_freq
176    except Exception as e:
177        print(f"An error occurred: {e}")
178        return 0

Calculate the dominant frequency of the signal.

def calculate_peak_frequency(signal, fs):
214def calculate_peak_frequency(signal, fs):
215    """Calculate the peak frequency of the signal."""
216    try:
217        f, Pxx = welch(signal, fs=fs, nperseg=min(len(signal), 192))  # Ensure nperseg ≤ length
218        return f[np.argmax(Pxx)]
219    except Exception as e:
220        print(f"An error occurred in feature 'peak_frequency': {e}")
221        return 0

Calculate the peak frequency of the signal.

def calculate_power_spectral_entropy(signal, fs):
233def calculate_power_spectral_entropy(signal, fs):
234    """Calculate the power spectral entropy of the signal."""
235    try:
236        f, Pxx = welch(signal, fs=fs, nperseg=min(len(signal), 192))  # Ensure nperseg ≤ length
237        Pxx_norm = Pxx / np.sum(Pxx)
238        return -np.sum(Pxx_norm * np.log2(Pxx_norm + np.finfo(float).eps))
239    except Exception as e:
240        print(f"An error occurred in feature 'power spectral entropy': {e}")
241        return 0

Calculate the power spectral entropy of the signal.

def calculate_principal_harmonic_frequency(signal, fs):
243def calculate_principal_harmonic_frequency(signal, fs):
244    """Calculate the principal harmonic frequency of the signal."""
245    try:
246        fft_values = np.abs(fft(signal))
247        freqs = np.fft.fftfreq(len(signal), 1 / fs)
248        return freqs[np.argmax(fft_values)]
249    except Exception as e:
250        print(f"An error occurred in feature 'principal_harmonic_frequency': {e}")
251        return 0

Calculate the principal harmonic frequency of the signal.

def calculate_entropy(signal):
74def calculate_entropy(signal):
75    """
76    Calculate the entropy of a signal.
77    Args:
78        signal (np.array): Input signal.
79    Returns:
80        entropy_value (float): Entropy.
81    """
82    value, counts = np.unique(signal, return_counts=True)
83    probabilities = counts / len(signal)
84    return entropy(probabilities, base=2)

Calculate the entropy of a signal. Args: signal (np.array): Input signal. Returns: entropy_value (float): Entropy.

def calculate_interquartile_range(signal):
185def calculate_interquartile_range(signal):
186    """Calculate the interquartile range of the signal."""
187    try:
188        q75, q25 = np.percentile(signal, [75, 25])
189        return q75 - q25
190    except Exception as e:
191        print(f"An error occurred in feature 'interquartile_range': {e}")
192        return 0

Calculate the interquartile range of the signal.

def calculate_correlation(signal1, signal2):
165def calculate_correlation(signal1, signal2):
166    """Calculate the correlation between two signals."""
167    return np.corrcoef(signal1, signal2)[0, 1]

Calculate the correlation between two signals.

def calculate_auto_regression_coefficients(signal, order=3):
253def calculate_auto_regression_coefficients(signal, order=3):
254    """Calculate the auto-regression coefficients of the signal."""
255    try:
256        model = AutoReg(signal, lags=order)
257        results = model.fit()
258        return results.params
259    except Exception as e:
260        print(f"An error occurred in feature 'auto_regression_coefficients': {e}")
261        return 0

Calculate the auto-regression coefficients of the signal.

def get_mean_for_windows(windows):
486def get_mean_for_windows(windows):
487    return [calculate_mean(window) for window in windows]

def get_standard_deviation_for_windows(windows):
489def get_standard_deviation_for_windows(windows):
490    return [calculate_standard_deviation(window) for window in windows]

def get_variance_for_windows(windows):
492def get_variance_for_windows(windows):
493    return [calculate_variance(window) for window in windows]

def clip_sliding_windows(data, min_val=-1, max_val=1):
15def clip_sliding_windows(data, min_val=-1, max_val=1):
16    """
17    Clip values in the sliding windows to be within a specified range.
18    """
19    return np.clip(data, min_val, max_val)

Clip values in the sliding windows to be within a specified range.

def remove_noise(data, window_size=5):
21def remove_noise(data, window_size=5):
22    """
23    Apply a moving average filter to reduce noise.
24    """
25    return data.rolling(window=window_size, center=True).mean().fillna(method="bfill").fillna(method="ffill")

Apply a moving average filter to reduce noise.

def remove_outliers(data, threshold=3):
27def remove_outliers(data, threshold=3):
28    """
29    Remove outliers beyond a given threshold using the Z-score method.
30    """
31    mean, std = data.mean(), data.std()
32    return data[(data - mean).abs() <= threshold * std]

Remove outliers beyond a given threshold using the Z-score method.

def remove_baseline(data):
34def remove_baseline(data):
35    """
36    Remove baseline by subtracting the mean.
37    """
38    return data - data.mean()

Remove baseline by subtracting the mean.

def remove_drift(data, cutoff=0.01, fs=100):
40def remove_drift(data, cutoff=0.01, fs=100):
41    """
42    Remove low-frequency drift using a high-pass filter.
43    """
44    b, a = butter(1, cutoff / (fs / 2), btype='highpass')
45    return filtfilt(b, a, data)

Remove low-frequency drift using a high-pass filter.

def remove_artifacts(data, method='interpolate'):
47def remove_artifacts(data, method="interpolate"):
48    """
49    Remove artifacts by interpolating missing values.
50    """
51    return data.interpolate(method="linear").fillna(method="bfill").fillna(method="ffill")

Remove artifacts by interpolating missing values.

def remove_trend(data, order=2):
53def remove_trend(data, order=2):
54    """
55    Remove trends using polynomial fitting.
56    """
57    x = np.arange(len(data))
58    poly_coeffs = np.polyfit(x, data, order)
59    trend = np.polyval(poly_coeffs, x)
60    return data - trend

Remove trends using polynomial fitting.

def remove_dc_offset(data):
62def remove_dc_offset(data):
63    """
64    Remove DC offset by subtracting the mean.
65    """
66    return data - data.mean()

Remove DC offset by subtracting the mean.

def remove_high_frequency_noise(data, cutoff=10, fs=100):
68def remove_high_frequency_noise(data, cutoff=10, fs=100):
69    """
70    Apply a low-pass filter to remove high-frequency noise.
71    """
72    b, a = butter(1, cutoff / (fs / 2), btype='lowpass')
73    return filtfilt(b, a, data)

Apply a low-pass filter to remove high-frequency noise.

def remove_low_frequency_noise(data, cutoff=0.5, fs=100):
75def remove_low_frequency_noise(data, cutoff=0.5, fs=100):
76    """
77    Apply a high-pass filter to remove low-frequency noise.
78    """
79    b, a = butter(1, cutoff / (fs / 2), btype='highpass')
80    return filtfilt(b, a, data)

Apply a high-pass filter to remove low-frequency noise.
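
A small sketch chaining several of the helpers above on a synthetic drifting, noisy sine; the import path is an assumption (the defining module is not shown here), and the filter-based helpers return NumPy arrays, hence the re-wrapping in a Series:

    import numpy as np
    import pandas as pd
    # Assumed import path -- adjust to wherever these helpers live in your install.
    from gaitsetpy.preprocessing.utils import (
        remove_dc_offset, remove_drift, remove_noise, remove_high_frequency_noise
    )

    fs = 100
    t = np.arange(0, 10, 1 / fs)
    raw = pd.Series(np.sin(2 * np.pi * 1.5 * t) + 0.05 * t + 0.1 * np.random.randn(len(t)))

    cleaned = remove_dc_offset(raw)                                   # subtract the mean
    cleaned = pd.Series(remove_drift(cleaned, cutoff=0.01, fs=fs))    # high-pass (returns ndarray)
    cleaned = remove_noise(cleaned, window_size=5)                    # moving-average smoothing
    cleaned = pd.Series(remove_high_frequency_noise(cleaned, cutoff=10, fs=fs))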

def plot_thigh_data(daphnetThigh, daphnetNames, i):
19def plot_thigh_data(daphnetThigh, daphnetNames, i):
20    """
21    Plot thigh acceleration data for a specific dataset.
22    Args:
23        daphnetThigh (list): List of DataFrames containing thigh acceleration data.
24        daphnetNames (list): List of dataset names.
25        i (int): Index of the dataset to plot.
26    """
27    print(daphnetNames[i])
28    fig, axes = plt.subplots(4, 1, sharex=True, sharey=True, figsize=(20, 16))
29    fig.suptitle("Thigh Data from " + daphnetNames[i])
30    plt.xlabel("Time")
31
32    df = daphnetThigh[i]
33    df = df[df.annotations > 0]  # Filter out rows with no annotations
34    neg = df[df.annotations == 1]  # No freeze
35    pos = df[df.annotations == 2]  # Freeze
36
37    # Plot horizontal forward thigh acceleration
38    ax1 = axes[0]
39    ax1.plot(df.thigh_h_fd)
40    ax1.set_ylabel("Horizontal Forward Thigh Acceleration")
41    ax1.scatter(neg.index, neg.thigh_h_fd, c='orange', label="no freeze")
42    ax1.scatter(pos.index, pos.thigh_h_fd, c='purple', label="freeze")
43    ax1.legend()
44
45    # Plot vertical thigh acceleration
46    ax2 = axes[1]
47    ax2.plot(df.thigh_v)
48    ax2.set_ylabel("Vertical Thigh Acceleration")
49    ax2.scatter(neg.index, neg.thigh_v, c='orange', label="no freeze")
50    ax2.scatter(pos.index, pos.thigh_v, c='purple', label="freeze")
51    ax2.legend()
52
53    # Plot horizontal lateral thigh acceleration
54    ax3 = axes[2]
55    ax3.plot(df.thigh_h_l)
56    ax3.set_ylabel("Horizontal Lateral Thigh Acceleration")
57    ax3.scatter(neg.index, neg.thigh_h_l, c='orange', label="no freeze")
58    ax3.scatter(pos.index, pos.thigh_h_l, c='purple', label="freeze")
59    ax3.legend()
60
61    # Plot overall thigh acceleration
62    ax4 = axes[3]
63    ax4.plot(df.thigh)
64    ax4.set_ylabel("Overall Thigh Acceleration")
65    ax4.scatter(neg.index, neg.thigh, c='orange', label="no freeze")
66    ax4.scatter(pos.index, pos.thigh, c='purple', label="freeze")
67    ax4.legend()
68
69    plt.tight_layout()
70    plt.show()

Plot thigh acceleration data for a specific dataset.

Args:
    daphnetThigh (list): List of DataFrames containing thigh acceleration data.
    daphnetNames (list): List of dataset names.
    i (int): Index of the dataset to plot.

def plot_shank_data(daphnetShank, daphnetNames, i):
 73def plot_shank_data(daphnetShank, daphnetNames, i):
 74    """
 75    Plot shank acceleration data for a specific dataset.
 76    Args:
 77        daphnetShank (list): List of DataFrames containing shank acceleration data.
 78        daphnetNames (list): List of dataset names.
 79        i (int): Index of the dataset to plot.
 80    """
 81    print(daphnetNames[i])
 82    fig, axes = plt.subplots(4, 1, sharex=True, sharey=True, figsize=(20, 16))
 83    fig.suptitle("Shank Data from " + daphnetNames[i])
 84    plt.xlabel("Time")
 85
 86    df = daphnetShank[i]
 87    df["shank"] = np.sqrt(df["shank_h_l"]**2 + df["shank_v"]**2 + df["shank_h_fd"]**2)
 88    df = df[df.annotations > 0]
 89    neg = df[df.annotations == 1]
 90    pos = df[df.annotations == 2]
 91
 92    ax1 = axes[0]
 93    ax1.plot(df.shank_h_fd)
 94    ax1.set_ylabel("Horizontal Forward Shank Acceleration")
 95    ax1.scatter(neg.index, neg.shank_h_fd, c='orange', label="no freeze")
 96    ax1.scatter(pos.index, pos.shank_h_fd, c='purple', label="freeze")
 97    ax1.legend()
 98
 99    ax2 = axes[1]
100    ax2.plot(df.shank_v)
101    ax2.set_ylabel("Vertical Shank Acceleration")
102    ax2.scatter(neg.index, neg.shank_v, c='orange', label="no freeze")
103    ax2.scatter(pos.index, pos.shank_v, c='purple', label="freeze")
104    ax2.legend()
105
106    ax3 = axes[2]
107    ax3.plot(df.shank_h_l)
108    ax3.set_ylabel("Horizontal Lateral Shank Acceleration")
109    ax3.scatter(neg.index, neg.shank_h_l, c='orange', label="no freeze")
110    ax3.scatter(pos.index, pos.shank_h_l, c='purple', label="freeze")
111    ax3.legend()
112
113    ax4 = axes[3]
114    ax4.plot(df.shank)
115    ax4.set_ylabel("Overall Shank Acceleration")
116    ax4.scatter(neg.index, neg.shank, c='orange', label="no freeze")
117    ax4.scatter(pos.index, pos.shank, c='purple', label="freeze")
118    ax4.legend()
119
120    plt.tight_layout()
121    plt.show()

Plot shank acceleration data for a specific dataset.

Args:
    daphnetShank (list): List of DataFrames containing shank acceleration data.
    daphnetNames (list): List of dataset names.
    i (int): Index of the dataset to plot.

def plot_trunk_data(daphnetTrunk, daphnetNames, i):
124def plot_trunk_data(daphnetTrunk, daphnetNames, i):
125    """
126    Plot trunk acceleration data for a specific dataset.
127    Args:
128        daphnetTrunk (list): List of DataFrames containing trunk acceleration data.
129        daphnetNames (list): List of dataset names.
130        i (int): Index of the dataset to plot.
131    """
132    print(daphnetNames[i])
133    fig, axes = plt.subplots(4, 1, sharex=True, sharey=True, figsize=(20, 16))
134    fig.suptitle("Trunk Data from " + daphnetNames[i])
135    plt.xlabel("Time")
136
137    df = daphnetTrunk[i]
138    df["trunk"] = np.sqrt(df["trunk_h_l"]**2 + df["trunk_v"]**2 + df["trunk_h_fd"]**2)
139    df = df[df.annotations > 0]
140    neg = df[df.annotations == 1]
141    pos = df[df.annotations == 2]
142
143    ax1 = axes[0]
144    ax1.plot(df.trunk_h_fd)
145    ax1.set_ylabel("Horizontal Forward Trunk Acceleration")
146    ax1.scatter(neg.index, neg.trunk_h_fd, c='orange', label="no freeze")
147    ax1.scatter(pos.index, pos.trunk_h_fd, c='purple', label="freeze")
148    ax1.legend()
149
150    ax2 = axes[1]
151    ax2.plot(df.trunk_v)
152    ax2.set_ylabel("Vertical Trunk Acceleration")
153    ax2.scatter(neg.index, neg.trunk_v, c='orange', label="no freeze")
154    ax2.scatter(pos.index, pos.trunk_v, c='purple', label="freeze")
155    ax2.legend()
156
157    ax3 = axes[2]
158    ax3.plot(df.trunk_h_l)
159    ax3.set_ylabel("Horizontal Lateral Trunk Acceleration")
160    ax3.scatter(neg.index, neg.trunk_h_l, c='orange', label="no freeze")
161    ax3.scatter(pos.index, pos.trunk_h_l, c='purple', label="freeze")
162    ax3.legend()
163
164    ax4 = axes[3]
165    ax4.plot(df.trunk)
166    ax4.set_ylabel("Overall Trunk Acceleration")
167    ax4.scatter(neg.index, neg.trunk, c='orange', label="no freeze")
168    ax4.scatter(pos.index, pos.trunk, c='purple', label="freeze")
169    ax4.legend()
170
171    plt.tight_layout()
172    plt.show()

Plot trunk acceleration data for a specific dataset.

Args:
    daphnetTrunk (list): List of DataFrames containing trunk acceleration data.
    daphnetNames (list): List of dataset names.
    i (int): Index of the dataset to plot.

def plot_all_data(daphnetThigh, daphnetShank, daphnetTrunk, daphnetNames, i):
191def plot_all_data(daphnetThigh, daphnetShank, daphnetTrunk, daphnetNames, i):
192    """
193    Plot thigh, shank, and trunk acceleration data for a specific dataset.
194    Args:
195        daphnetThigh (list): List of DataFrames containing thigh acceleration data.
196        daphnetShank (list): List of DataFrames containing shank acceleration data.
197        daphnetTrunk (list): List of DataFrames containing trunk acceleration data.
198        daphnetNames (list): List of dataset names.
199        i (int): Index of the dataset to plot.
200    """
201    plot_thigh_data(daphnetThigh, daphnetNames, i)
202    plot_shank_data(daphnetShank, daphnetNames, i)
203    plot_trunk_data(daphnetTrunk, daphnetNames, i)

Plot thigh, shank, and trunk acceleration data for a specific dataset.

Args:
    daphnetThigh (list): List of DataFrames containing thigh acceleration data.
    daphnetShank (list): List of DataFrames containing shank acceleration data.
    daphnetTrunk (list): List of DataFrames containing trunk acceleration data.
    daphnetNames (list): List of dataset names.
    i (int): Index of the dataset to plot.

def plot_all_thigh_data(daphnetThigh, daphnetNames):
175def plot_all_thigh_data(daphnetThigh, daphnetNames):
176    """Plot thigh acceleration data for all datasets."""
177    for i in range(len(daphnetThigh)):
178        plot_thigh_data(daphnetThigh, daphnetNames, i)

Plot thigh acceleration data for all datasets.

def plot_all_shank_data(daphnetShank, daphnetNames):
180def plot_all_shank_data(daphnetShank, daphnetNames):
181    """Plot shank acceleration data for all datasets."""
182    for i in range(len(daphnetShank)):
183        plot_shank_data(daphnetShank, daphnetNames, i)

Plot shank acceleration data for all datasets.

def plot_all_trunk_data(daphnetTrunk, daphnetNames):
185def plot_all_trunk_data(daphnetTrunk, daphnetNames):
186    """Plot trunk acceleration data for all datasets."""
187    for i in range(len(daphnetTrunk)):
188        plot_trunk_data(daphnetTrunk, daphnetNames, i)

Plot trunk acceleration data for all datasets.

def plot_all_datasets(daphnetThigh, daphnetShank, daphnetTrunk, daphnetNames):
205def plot_all_datasets(daphnetThigh, daphnetShank, daphnetTrunk, daphnetNames):
206    """Plot thigh, shank, and trunk acceleration data for all datasets."""
207    for i in range(len(daphnetThigh)):
208        plot_all_data(daphnetThigh, daphnetShank, daphnetTrunk, daphnetNames, i)

Plot thigh, shank, and trunk acceleration data for all datasets.

def plot_sensor_with_features(sliding_windows, features, start_idx, end_idx, sensor_name='shank', num_windows=10, save=False):
 11def plot_sensor_with_features(sliding_windows, features, start_idx, end_idx, sensor_name="shank", num_windows=10, save=False):
 12    """
 13    @brief Plots sliding windows of a sensor's time series data with overlaid statistical features.
 14
 15    This function plots the first `num_windows` sliding windows within the given `start_idx` and `end_idx`
 16    for a specified sensor and overlays feature values at their corresponding time indices. 
 17    It also displays entropy and dominant frequency in a separate plot.
 18
 19    @param[in] sliding_windows List of dictionaries, where each dictionary contains:
 20                   - 'name': sensor name (str)
 21                   - 'data': List of time-series windows (each as a Pandas Series)
 22    @param[in] features List of dictionaries, where each dictionary contains:
 23                   - 'name': sensor name (str)
 24                   - 'features': Dictionary of extracted feature lists
 25    @param[in] start_idx Start index of the time window to be plotted.
 26    @param[in] end_idx End index of the time window to be plotted.
 27    @param[in] sensor_name Name of the sensor to be plotted (default: "shank").
 28    @param[in] num_windows Number of sliding windows to plot (default: 10).
 29    @param[in] save If True, saves the plot to a file instead of displaying it.
 30
 31    @return None
 32    """
 33
 34    fig, axes = plt.subplots(2, 1, figsize=(20, 10), gridspec_kw={'height_ratios': [3, 1]})
 35    
 36    # Extract sensor windows
 37    sensor_windows = next((sw['data'] for sw in sliding_windows if sw['name'] == sensor_name), None)
 38    if sensor_windows is None:
 39        print(f"Sensor '{sensor_name}' not found in sliding_windows.")
 40        return
 41
 42    # Extract corresponding features
 43    sensor_features = next((feat['features'] for feat in features if feat['name'] == sensor_name), None)
 44    if sensor_features is None:
 45        print(f"Sensor '{sensor_name}' not found in features.")
 46        return
 47
 48    # Filter windows based on start_idx and end_idx
 49    filtered_windows = [series for series in sensor_windows if start_idx <= series.index[0] and series.index[-1] <= end_idx]
 50    
 51    if not filtered_windows:
 52        print(f"No windows found in the specified index range ({start_idx} - {end_idx}).")
 53        return
 54
 55    # Store entropy & frequency features for separate plotting
 56    entropy_values = []
 57    dominant_frequencies = []
 58
 59    # Plot first `num_windows` windows
 60    for i in range(min(num_windows, len(filtered_windows))):
 61        series = filtered_windows[i]  # Each window is a Pandas Series
 62
 63        # Extract time and signal values
 64        time_values = series.index.to_numpy()  # Time is the index
 65        signal_values = series.values  # Sensor readings
 66
 67        # Determine actual start and end indices for this window
 68        window_start, window_end = time_values[0], time_values[-1]
 69
 70        # Plot time series data
 71        axes[0].plot(time_values, signal_values, alpha=0.6)
 72
 73        # Mark start and end of each window with vertical dotted lines
 74        axes[0].axvline(x=window_start, color='black', linestyle='dotted', alpha=0.7)
 75        axes[0].axvline(x=window_end, color='black', linestyle='dotted', alpha=0.7)
 76
 77        # Overlay statistical features
 78        for feature, marker in zip(['mean', 'rms', 'peak_height', 'mode', 'median'], ['x', 'o', 'v', '<', '^']):
 79            if feature in sensor_features and len(sensor_features[feature]) > i:
 80                feature_value = sensor_features[feature][i]
 81                if feature_value != 0:  # Skip zero values
 82                    closest_index = np.argmin(np.abs(signal_values - feature_value))
 83                    closest_time = time_values[closest_index]
 84                    axes[0].scatter(closest_time, feature_value, color='red', marker=marker, s=100)
 85
 86        # Store entropy & frequency features for separate plotting
 87        if 'entropy' in sensor_features and len(sensor_features['entropy']) > i:
 88            entropy_values.append(sensor_features['entropy'][i])
 89        if 'dominant_frequency' in sensor_features and len(sensor_features['dominant_frequency']) > i:
 90            dominant_frequencies.append(sensor_features['dominant_frequency'][i])
 91
 92    # Labels and title for time-series plot
 93    axes[0].set_xlabel('Time')
 94    axes[0].set_ylabel(f'{sensor_name} Signal')
 95    axes[0].set_title(f'First {num_windows} windows of {sensor_name} in range {start_idx}-{end_idx} with Features')
 96
 97    # Frequency-domain & entropy plot (axes[1])
 98    if dominant_frequencies:
 99        window_indices = list(range(len(dominant_frequencies)))
100        axes[1].plot(window_indices, dominant_frequencies, label="Dominant Frequency", marker="o", linestyle="dashed", color="blue")
101
102    if entropy_values:
103        axes[1].bar(list(range(len(entropy_values))), entropy_values, alpha=0.6, label="Entropy", color="green")
104
105    axes[1].set_xlabel("Window Index")
106    axes[1].set_ylabel("Feature Value")
107    axes[1].set_title("Frequency & Entropy Features")
108    axes[1].legend()
109
110    plt.tight_layout()
111
112    # Save or show plot
113    if save:
114        file_path = input("Enter the file path to save the plot (e.g., 'plot.png'): ")
115        plt.savefig(file_path, dpi=300)
116        print(f"Plot saved at {file_path}")
117    else:
118        plt.show()

@brief Plots sliding windows of a sensor's time series data with overlaid statistical features.

This function plots the first num_windows sliding windows within the given start_idx and end_idx for a specified sensor and overlays feature values at their corresponding time indices. It also displays entropy and dominant frequency in a separate plot.

@param[in] sliding_windows List of dictionaries, where each dictionary contains:
    - 'name': sensor name (str)
    - 'data': List of time-series windows (each as a Pandas Series)
@param[in] features List of dictionaries, where each dictionary contains:
    - 'name': sensor name (str)
    - 'features': Dictionary of extracted feature lists
@param[in] start_idx Start index of the time window to be plotted.
@param[in] end_idx End index of the time window to be plotted.
@param[in] sensor_name Name of the sensor to be plotted (default: "shank").
@param[in] num_windows Number of sliding windows to plot (default: 10).
@param[in] save If True, saves the plot to a file instead of displaying it.

@return None

def create_random_forest_model(n_estimators=100, random_state=42, max_depth=None):
249def create_random_forest_model(n_estimators=100, random_state=42, max_depth=None):
250    """
251    Create a Random Forest model with specified parameters.
252    
253    Args:
254        n_estimators: Number of trees in the forest
255        random_state: Random state for reproducibility
256        max_depth: Maximum depth of the tree
257        
258    Returns:
259        RandomForestModel instance
260    """
261    return RandomForestModel(n_estimators=n_estimators, random_state=random_state, max_depth=max_depth)

Create a Random Forest model with specified parameters.

Args: n_estimators: Number of trees in the forest random_state: Random state for reproducibility max_depth: Maximum depth of the tree

Returns: RandomForestModel instance

def preprocess_features(features):
14def preprocess_features(features):
15    """
16    Convert the features dictionary into X (feature matrix) and y (labels),
17    ensuring all feature vectors have a consistent length.
18    """
19    X = []
20    y = []
21    feature_lengths = []  # Track feature lengths to standardize across sensors
22
23    for sensor_dict in features:
24        sensor_name = sensor_dict["name"]
25        sensor_features = sensor_dict["features"]
26        sensor_annotations = sensor_dict["annotations"]
27
28        num_windows = len(sensor_annotations)  # Expected number of windows
29        feature_arrays = []
30
31        for key in sensor_features:
32            feature_array = sensor_features[key]  # Extract the feature list
33            feature_array = np.array(feature_array, dtype=object)  # Convert to NumPy object array
34
35            # Ensure it's a list of equal-length vectors
36            if isinstance(feature_array[0], (list, np.ndarray)):
37                print(f"Fixing inconsistent feature '{key}' in sensor '{sensor_name}'.")
38
39                # Find max length for this feature across all windows
40                max_length = max(len(f) if isinstance(f, (list, np.ndarray)) else 1 for f in feature_array)
41                feature_lengths.append(max_length)  # Store max feature length for later
42
43                # Pad/truncate each feature to be the same length
44                feature_array = np.array([
45                    np.pad(np.ravel(f), (0, max_length - len(f)), 'constant', constant_values=0)
46                    if isinstance(f, (list, np.ndarray)) else np.array([f] + [0] * (max_length - 1))
47                    for f in feature_array
48                ])
49
50            # Ensure consistency in number of windows
51            if len(feature_array) != num_windows:
52                print(f"Skipping feature '{key}' due to mismatched length: {len(feature_array)} instead of {num_windows}.")
53                continue
54
55            feature_arrays.append(feature_array)
56
57        if not feature_arrays:
58            continue
59
60        # Concatenate features per window
61        try:
62            feature_matrix = np.column_stack(feature_arrays)
63        except ValueError:
64            print(f"Error: Features in sensor '{sensor_name}' have inconsistent shapes. Skipping sensor.")
65            continue
66
67        X.append(feature_matrix)
68        y.append(np.array(sensor_annotations))
69
70    if not X or not y:
71        raise ValueError("No valid features or labels found.")
72
73    # **Fix: Standardize feature matrix sizes across sensors**
74    max_feature_dim = max(map(lambda x: x.shape[1], X))  # Get the max feature size
75    print(f"Standardizing all feature vectors to {max_feature_dim} dimensions.")
76
77    # Pad/truncate all feature matrices to match max_feature_dim
78    X = [np.pad(x, ((0, 0), (0, max_feature_dim - x.shape[1])), 'constant', constant_values=0) if x.shape[1] < max_feature_dim else x[:, :max_feature_dim] for x in X]
79
80    # Stack all feature matrices
81    X = np.vstack(X).astype(np.float32)
82    y = np.concatenate(y)
83
84    # Remap labels to zero-based contiguous integers
85    unique_labels = np.unique(y)
86    label_map = {label: idx for idx, label in enumerate(unique_labels)}
87    y_remapped = np.array([label_map[label] for label in y])
88
89    # Note: the annotations stored in the input feature dictionaries are left
90    # unchanged here; only the returned label vector is remapped to zero-based,
91    # contiguous integers.
92
93    return X, y_remapped

Convert the features dictionary into X (feature matrix) and y (labels), ensuring all feature vectors have a consistent length.
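
A synthetic illustration of the input structure preprocess_features() expects and of what it returns; the import path, sensor name, and feature names are assumptions:

    import numpy as np
    # Assumed import path -- adjust to the module that defines preprocess_features.
    from gaitsetpy.classification.utils import preprocess_features

    features = [{
        'name': 'shank',
        'features': {
            'mean': [0.1, 0.3, 0.2],                              # one scalar per window
            'rms':  [1.0, 1.1, 0.9],
            'ar_coefficients': [[0.5, 0.1], [0.4], [0.6, 0.2]],   # ragged -> padded to length 2
        },
        'annotations': [1, 2, 1],                                 # one label per window
    }]

    X, y = preprocess_features(features)
    print(X.shape)     # (3, 4): mean + rms + two padded AR coefficients
    print(y)           # [0 1 0] -- labels remapped to zero-based contiguous integers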

def evaluate_model(model, features):
12def evaluate_model(model, features):
13    """
14    Evaluates the given model on the provided features and prints accuracy and confusion matrix.
15    """
16    X, y = preprocess_features(features)
17    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
18
19    y_pred = model.predict(X_test)
20
21    acc = accuracy_score(y_test, y_pred)
22    conf_matrix = confusion_matrix(y_test, y_pred)
23
24    print(f"Accuracy: {acc:.4f}")
25    print(f"Confusion Matrix:\n{conf_matrix}")

Evaluates the given model on the provided features and prints accuracy and confusion matrix.