Source code for medusa.epoching

import numpy as np
import numpy.matlib  # required for np.matlib.repmat in time_to_sample_index_events
from scipy import signal


def get_epochs(signal, epochs_length, stride=None, norm=None):
    """This function returns the signal divided into epochs following a
    sliding window approach.

    Parameters
    ----------
    signal : list or numpy.ndarray
        Array to extract epochs with shape [samples, channels]
    epochs_length : int
        Epochs length in samples
    stride : int, optional
        Separation between consecutive epochs in samples. If None, stride is
        set to epochs_length.
    norm : str, optional
        Set to 'z' for Z-score normalization or 'dc' for DC normalization.
        Statistical parameters are computed using the whole epoch.

    Returns
    -------
    numpy.ndarray
        Structured data with dimensions [n_epochs x length x n_channels]
    """
    # Check errors
    if epochs_length <= 0:
        raise ValueError('Parameter epochs_length must be greater than 0')
    if stride is None:
        stride = epochs_length
    if stride <= 0:
        raise ValueError('Parameter stride must be None or greater than 0')
    # Assure ints
    n_cha = signal.shape[1]
    epochs_length = int(epochs_length)
    stride = int(stride)
    # Get epochs
    epochs = np.lib.stride_tricks.sliding_window_view(
        signal, (epochs_length, n_cha))[::stride].squeeze()
    # Normalize
    if norm is not None:
        epochs = normalize_epochs(epochs, norm=norm)
    return epochs

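# Usage sketch (illustrative only, not part of the module): splitting a
# synthetic 1000-sample, 8-channel signal sampled at 250 Hz into 1 s windows.
# The array sizes below are assumptions chosen for the example.
#
#     >>> sig = np.random.randn(1000, 8)
#     >>> get_epochs(sig, epochs_length=250).shape           # non-overlapping
#     (4, 250, 8)
#     >>> get_epochs(sig, 250, stride=125, norm='z').shape   # 50% overlap
#     (7, 250, 8)
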
def get_epochs_of_events(timestamps, signal, onsets, fs, w_epoch_t,
                         w_baseline_t=None, norm=None):
    """This function calculates the epochs of signal given the onsets of
    events.

    Parameters
    ----------
    timestamps : list or numpy.ndarray
        Timestamps of each biosignal sample
    signal : list or numpy.ndarray
        Biosignal samples
    onsets : list or numpy.ndarray
        Events timestamps
    fs : float
        Sample rate
    w_epoch_t : list or numpy.ndarray
        Temporal window in ms of the epoch. For example, w_epoch_t = [0, 1000]
        takes the epoch from 0 ms to 1000 ms after each onset (0 ms represents
        the onset).
    w_baseline_t : list or numpy.ndarray, optional
        Temporal window in ms of the baseline. For example, w_baseline_t =
        [-500, 100] takes the baseline from -500 ms before each onset to
        100 ms after each onset (0 ms represents the onset). This chunk of
        signal is used to normalize the epoch, if applicable.
    norm : str, optional
        Set to 'z' for Z-score normalization or 'dc' for DC normalization.
        Statistical parameters are computed using the baseline window.

    Returns
    -------
    numpy.ndarray
        Structured data with dimensions [events x samples x channels]
    """
    # Error prevention
    epoch_feasibility = check_epochs_feasibility(timestamps, onsets, fs,
                                                 w_epoch_t)
    if epoch_feasibility == 'first':
        raise ValueError("Not enough EEG samples to get the first epoch")
    elif epoch_feasibility == 'last':
        raise ValueError("Not enough EEG samples to get the last epoch")
    if w_baseline_t is not None:
        baseline_feasibility = check_epochs_feasibility(timestamps, onsets,
                                                        fs, w_baseline_t)
        if baseline_feasibility == 'first':
            raise ValueError("Not enough EEG samples to get the first "
                             "baseline")
        elif baseline_feasibility == 'last':
            raise ValueError("Not enough EEG samples to get the last "
                             "baseline")
        if norm is None:
            raise ValueError('If parameter w_baseline_t is not None, please '
                             'specify the normalization type with parameter '
                             'norm')
    if norm is not None and w_baseline_t is None:
        raise ValueError('If parameter norm is not None, please specify the '
                         'baseline window with parameter w_baseline_t')
    # Useful parameters
    w_epoch_t = np.array(w_epoch_t)
    w_epoch_s = np.array(w_epoch_t * fs / 1000, dtype=int)
    l_epoch = w_epoch_s[1] - w_epoch_s[0]
    # For each onset
    n_cha = signal.shape[1]
    epochs_idx = get_nearest_idx(timestamps, onsets)
    epochs = np.lib.stride_tricks.sliding_window_view(
        signal, (l_epoch, n_cha))[epochs_idx + w_epoch_s[0]].squeeze(axis=1)
    # Baseline normalization
    if w_baseline_t is not None and norm is not None:
        # Baseline start-end (samples)
        w_baseline_t = np.array(w_baseline_t)
        w_baseline_s = np.array(w_baseline_t * fs / 1000, dtype=int)
        l_baseline = w_baseline_s[1] - w_baseline_s[0]
        # Extract baselines
        baseline_idx = get_nearest_idx(timestamps, onsets)
        baselines = np.lib.stride_tricks.sliding_window_view(
            signal, (l_baseline, n_cha))[
            baseline_idx + w_baseline_s[0]].squeeze(axis=1)
        epochs = normalize_epochs(epochs, norm_epochs=baselines, norm=norm)
    return epochs

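# Usage sketch (illustrative only): extracting 1 s epochs around three event
# onsets with z-score baseline correction. Sampling rate, signal length and
# onset times are assumptions chosen for the example.
#
#     >>> fs = 100.
#     >>> timestamps = np.arange(1000) / fs               # 10 s of data
#     >>> sig = np.random.randn(1000, 4)
#     >>> onsets = np.array([2.0, 5.0, 8.0])              # event times (s)
#     >>> epochs = get_epochs_of_events(timestamps, sig, onsets, fs,
#     ...                               w_epoch_t=[0, 1000],
#     ...                               w_baseline_t=[-200, 0], norm='z')
#     >>> epochs.shape
#     (3, 100, 4)
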
def normalize_epochs(epochs, norm_epochs=None, norm='z'):
    """Normalizes epochs.

    Parameters
    ----------
    epochs : list or numpy.ndarray
        Epochs of signal with dimensions [n_epochs x n_samples x n_channels]
    norm_epochs : list or numpy.ndarray, optional
        Epochs of signal with dimensions [n_epochs x n_samples x n_channels]
        that are used to compute the statistical parameters for
        normalization. If None, norm_epochs=epochs.
    norm : str
        Set to 'z' for Z-score normalization or 'dc' for DC normalization.
        Statistical parameters are computed along the samples axis of
        norm_epochs.

    Returns
    -------
    numpy.ndarray
        Normalized epochs with the same dimensions as epochs
    """
    # Check errors
    if norm_epochs is None:
        norm_epochs = epochs
    if norm not in ('z', 'dc'):
        raise ValueError("Parameter norm must be 'z' or 'dc'")
    # Normalization
    if norm == 'z':
        # Z-score: subtract the mean and divide by the standard deviation
        mean = np.mean(norm_epochs, axis=1, keepdims=True)
        std = np.std(norm_epochs, axis=1, keepdims=True)
        epochs = (epochs - mean) / std
    else:
        # DC normalization: subtract the mean
        mean = np.mean(norm_epochs, axis=1, keepdims=True)
        epochs = epochs - mean
    return epochs

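# Usage sketch (illustrative only): normalizing random epochs. Shapes are
# assumptions chosen for the example.
#
#     >>> eps = np.random.randn(5, 200, 16)
#     >>> z = normalize_epochs(eps)               # z-score per epoch/channel
#     >>> dc = normalize_epochs(eps, norm='dc')   # subtract the mean only
#     >>> np.allclose(dc.mean(axis=1), 0)
#     True
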
def resample_epochs(epochs, t_window, target_fs):
    """Resample epochs to the target_fs.

    IMPORTANT: No antialiasing filter is applied.

    Parameters
    ----------
    epochs : list or numpy.ndarray
        Epochs of signal with dimensions [n_epochs x samples x channels]
    t_window : list or numpy.ndarray
        Temporal window in ms of the epoch. For example, t_window = [0, 1000]
        takes the epoch from 0 ms to 1000 ms after each onset (0 ms represents
        the onset).
    target_fs : float
        Target sample rate

    Returns
    -------
    numpy.ndarray
        Final epochs with dimensions [events x target_samples x channels]
    """
    # Compute the desired window length in samples at the target rate
    l_t_window = t_window[1] - t_window[0]  # Window length (ms)
    target_samples = np.floor((target_fs * l_t_window) / 1000).astype(int)
    # Resample epochs along the samples axis
    epochs = signal.resample(epochs, target_samples, axis=1)
    return epochs

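# Usage sketch (illustrative only): downsampling 1 s epochs from 256 Hz to
# 128 Hz. Remember that no antialiasing filter is applied beforehand.
#
#     >>> eps = np.random.randn(3, 256, 8)
#     >>> resample_epochs(eps, t_window=[0, 1000], target_fs=128).shape
#     (3, 128, 8)
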
def check_epochs_feasibility(timestamps, onsets, fs, t_window):
    """Checks if the extraction of the desired window is feasible with the
    available samples. Sometimes, the first/last stimulus onset is so close
    to the beginning/end of the signal data chunk that there are not enough
    samples to compute the window. In that case, the function returns
    'first' or 'last' to identify which onset cannot be extracted with the
    current t_window. It returns 'ok' if there is no conflict.

    Parameters
    ----------
    timestamps : list or numpy.ndarray
        Timestamps of each biosignal sample
    onsets : list or numpy.ndarray
        Events timestamps
    fs : float
        Sample rate
    t_window : list or numpy.ndarray
        Temporal window in ms of the epoch. For example, t_window = [0, 1000]
        takes the epoch from 0 ms to 1000 ms after each onset (0 ms represents
        the onset).

    Returns
    -------
    feasibility : str
        'ok' if window extraction is feasible, 'first' if it is not feasible
        for the first onset, and 'last' if it is not feasible for the last
        onset.
    """
    # Ensure array operations also work with list inputs
    timestamps = np.asarray(timestamps)
    onsets = np.asarray(onsets)
    # Sample index of the first and last onsets
    first_sti_sam_onset = np.argmin(np.abs(timestamps - onsets[0]))
    last_sti_sam_onset = np.argmin(np.abs(timestamps - onsets[-1]))
    # Window limits in samples
    s_window = np.array(np.array(t_window) * fs / 1000, dtype=int)
    last_sti_sam_end = last_sti_sam_onset + s_window[1]
    first_sti_sam_beginning = first_sti_sam_onset + s_window[0]
    # Check the limits against the available samples
    if first_sti_sam_beginning < 0:
        feasibility = 'first'
    elif last_sti_sam_end >= len(timestamps):
        feasibility = 'last'
    else:
        feasibility = 'ok'
    return feasibility

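# Usage sketch (illustrative only): checking whether 1 s epochs fit inside a
# 5 s recording. Sampling rate and onsets are assumptions for the example.
#
#     >>> fs = 100.
#     >>> timestamps = np.arange(500) / fs
#     >>> check_epochs_feasibility(timestamps, [0.1, 2.0], fs, [0, 1000])
#     'ok'
#     >>> check_epochs_feasibility(timestamps, [0.1, 4.8], fs, [0, 1000])
#     'last'
#     >>> check_epochs_feasibility(timestamps, [0.1, 2.0], fs, [-500, 500])
#     'first'
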
def reject_noisy_epochs(epochs, signal_mean, signal_std, k=4, n_samp=2,
                        n_cha=1):
    """Simple thresholding method to reject noisy epochs. It discards epochs
    having at least n_samp samples whose absolute value exceeds
    abs(signal_mean) + k*signal_std in at least n_cha channels.

    Parameters
    ----------
    epochs : list or numpy.ndarray
        Epochs of signal with dimensions [n_epochs x samples x channels]
    signal_mean : numpy.ndarray
        Mean of the signal for each channel
    signal_std : numpy.ndarray
        Standard deviation of the signal for each channel
    k : float
        Standard deviation multiplier to calculate the threshold
    n_samp : int
        Minimum number of samples that have to be over the threshold in each
        epoch to discard it
    n_cha : int
        Minimum number of channels that have to have n_samp samples over the
        threshold in each epoch to discard it

    Returns
    -------
    float
        Percentage of rejected epochs
    numpy.ndarray
        Clean epochs
    numpy.ndarray
        Boolean indexes of the rejected epochs. True for discarded epochs
    """
    # Check errors
    if len(epochs.shape) != 3:
        raise Exception('Malformed epochs array. It must be of dimensions '
                        '[epochs x samples x channels]')
    if signal_std.shape[0] != epochs.shape[2]:
        raise Exception('Array signal_std does not match with epochs size. '
                        'It must have the same number of channels')
    if signal_mean.shape[0] != epochs.shape[2]:
        raise Exception('Array signal_mean does not match with epochs size. '
                        'It must have the same number of channels')
    # Samples above the per-channel threshold
    epochs_abs = np.abs(epochs)
    cmp = epochs_abs > np.abs(signal_mean) + k * signal_std
    # Epochs with at least n_samp samples over the threshold in n_cha channels
    idx = np.sum((np.sum(cmp, axis=1) >= n_samp), axis=1) >= n_cha
    pct_rejected = (np.sum(idx) / epochs.shape[0]) * 100
    return pct_rejected, epochs[~idx, :, :], idx

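# Usage sketch (illustrative only): rejecting epochs with samples beyond
# 3 standard deviations. How signal_mean and signal_std are obtained is an
# assumption here (per-channel statistics over all epochs and samples).
#
#     >>> eps = np.random.randn(20, 250, 8)
#     >>> m, s = eps.mean(axis=(0, 1)), eps.std(axis=(0, 1))
#     >>> pct, clean, rejected = reject_noisy_epochs(eps, m, s, k=3)
#     >>> clean.shape[0] == 20 - rejected.sum()
#     True
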
def time_to_sample_index_events(times, onsets):
    """Converts an array of time onsets to an array that indicates the sample
    index of each event.

    Parameters
    ----------
    times : list or numpy.ndarray
        Array of shape [n_samples]. Timestamps of the biosignal
    onsets : list or numpy.ndarray
        Array of shape [n_events]. Onsets in time of the events

    Returns
    -------
    numpy.ndarray
        Array of shape [n_events]. Index of the nearest timestamp to each
        onset.
    """
    # Check errors
    times = np.squeeze(times)
    onsets = np.squeeze(onsets)
    if len(times.shape) != 1:
        raise ValueError('Parameter times must be a 1D array')
    if len(onsets.shape) != 1:
        raise ValueError('Parameter onsets must be a 1D array')
    # Find the nearest timestamp to each onset
    rep_times = np.matlib.repmat(times, onsets.shape[0], 1).T
    rep_onsets = np.matlib.repmat(onsets, times.shape[0], 1)
    return np.argmin(np.abs(rep_times - rep_onsets), axis=0)

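# Usage sketch (illustrative only): locating the sample index of each event
# in a regularly sampled time vector (values assumed for the example).
#
#     >>> times = np.arange(0, 5, 0.01)                   # 500 timestamps
#     >>> time_to_sample_index_events(times, [0.503, 2.0, 4.99])
#     array([ 50, 200, 499])
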
def get_nearest_idx(timestamps, onsets):
    """This function returns the indexes of the timestamps that are closest
    to the onsets.
    """
    array = np.array(timestamps)
    # Get insert positions
    idxs = np.searchsorted(array, onsets, side="left")
    # Find indexes where the previous timestamp is closer
    prev_idx_is_less = ((idxs == len(array)) |
                        (np.fabs(onsets - array[np.maximum(idxs - 1, 0)]) <
                         np.fabs(onsets - array[np.minimum(idxs,
                                                           len(array) - 1)])))
    idxs[prev_idx_is_less] -= 1
    return idxs

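# Usage sketch (illustrative only): nearest-timestamp lookup with sorted
# timestamps (values assumed for the example).
#
#     >>> ts = np.array([0.0, 0.1, 0.2, 0.3, 0.4])
#     >>> get_nearest_idx(ts, np.array([0.06, 0.31, 0.9]))
#     array([1, 3, 4])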