Source code for medusa.local_activation.nonlinear_parameters

# Built-in imports
import math
import ctypes
import os

# External imports
import numpy as np
from scipy.spatial.distance import pdist
from scipy.signal import decimate

# Medusa imports
from medusa.components import ThreadWithReturnValue
from medusa.utils import check_dimensions


[docs]def central_tendency_measure(signal, r): """ This method implements the central tendency measure (CTM). This parameter is useful to quantify the variability of a signal. It is based on calculating the second-order differences diagram of the time series and then counting the points within a radius "r". CTM assigns higher values to less variable signals References ---------- Cohen, M. E., Hudson, D. L., & Deedwania, P. C. (1996). Applying continuous chaotic modeling to cardiac signal analysis. IEEE Engineering in Medicine and Biology Magazine, 15(5), 97-102. Parameters ---------- signal : numpy.ndarray Signal with shape [n_epochs, n_samples, n_channels]. r : double Radius used to compute the CTM. Returns ------- ctm : numpy.ndarray CTM values for each channel in "signal". [n_epochs, n_channels]. """ # Error check if not np.issubdtype(signal.dtype, np.number): raise ValueError('data matrix contains non-numeric values') # Check dimensions signal = check_dimensions(signal) # Signal dimensions n_epo = signal.shape[0] n_samp = signal.shape[1] n_cha = signal.shape[2] # Values within a range (mean +- 3 std) upper_bound = np.mean(signal, axis=1) + 3 * np.std(signal, axis=1) lower_bound = np.mean(signal, axis=1) - 3 * np.std(signal, axis=1) idx_within_range = np.logical_and((signal < upper_bound[:, None, :]), (signal > lower_bound[:, None, :])) idx_out_upper = (signal > upper_bound[:, None, :]) idx_out_lower = (signal < lower_bound[:, None, :]) # Maximum value in the above defined range max_value = np.empty((n_epo, n_cha)) for ep in range(n_epo): for ch in range(n_cha): max_value[ep, ch] = np.max( abs(signal[ep, idx_within_range[ep, :, ch], ch]), axis=0) # Normalize the values within the range by its maximum.Values above that # range will be 1, and below the range will be - 1 data_norm = np.zeros_like(signal) data_norm[idx_within_range] = np.divide( signal[idx_within_range], np.tile(max_value, (1, n_samp, 1)).flatten()[ idx_within_range.flatten()]) data_norm[idx_out_upper] = 1 data_norm[idx_out_lower] = -1 # Difference time series y = data_norm[:, 3:n_samp, :] - data_norm[:, 2:n_samp - 1, :] x = data_norm[:, 2:n_samp - 1, :] - data_norm[:, 1:n_samp - 2, :] # CTM - Values below the radius 'r' ctm = np.sum(np.sqrt(np.square(x) + np.square(y)) < r, axis=1) / ( n_samp - 2) return ctm
[docs]def sample_entropy(signal, m, r, dist_type='chebyshev'): """ This method implements the sample entropy (SampEn). SampEn is an irregularity measure that assigns higher values to more irregular time sequences. It has two tuning parameters: the sequence length (m) and the tolerance (r) Notes: IF A = 0 or B = 0, SamEn would return an infinite value. However, the lowest non-zero conditional probability that SampEn should report is A/B = 2/[(N-m-1)*(N-m)]. SampEn has the following limits: - Lower bound: 0 - Upper bound : log(N-m) + log(N-m-1) - log(2) References ---------- Richman, J. S., & Moorman, J. R. (2000). Physiological time-series analysis using approximate entropy and sample entropy. American Journal of Physiology-Heart and Circulatory Physiology. Parameters ---------- signal : numpy.ndarray Signal with shape [n_epochs, n_samples, n_channels]. m : int Sequence length r : float Tolerance dist_type : string Distance metric Returns ------- sampen : numpy.ndarray SampEn value. [n_epochs, n_channels] """ # Check dimensions signal = check_dimensions(signal) # Check Errors if m > signal.shape[1]: raise ValueError('Embedding dimension must be smaller than the signal ' 'length (m<N).') if not isinstance(dist_type, str): raise ValueError('Distance type must be a string.') if dist_type not in ['braycurtis', 'canberra', 'chebyshev', 'cityblock', 'correlation', 'cosine', 'dice', 'euclidean', 'hamming', 'jaccard', 'jensenshannon', 'kulsinski', 'mahalanobis', 'matching', 'minkowski', 'rogerstanimoto', 'russellrao', 'seuclidean', 'sokalmichener', 'sokalsneath', 'sqeuclidean', 'yule']: raise ValueError( 'Distance type unknown. Please, check allowed distances' 'in pdist function from scipy.spatial.distance module.') # Useful parameters n_epo = signal.shape[0] N = signal.shape[1] n_channels = signal.shape[2] sigma = np.std(signal, axis=1) templates_m = [] templates_m_plus_one = [] B, A, value = np.empty((n_epo, n_channels)), np.empty((n_epo, n_channels)),\ np.empty((n_epo, n_channels)) # Calculate B values for i in range(N - m + 1): templates_m.append(signal[:, i:i + m, :]) templates_m = np.array(templates_m) for e_idx in range(n_epo): w_threads = [] for ch_idx in range(n_channels): t = ThreadWithReturnValue( target=pdist, args=(templates_m[:, e_idx, :, ch_idx], dist_type,)) w_threads.append(t) t.start() for th_idx, thread in enumerate(w_threads): B[e_idx, th_idx] = np.sum(thread.join() <= sigma[0, th_idx] * r) # Check if there is any B = 0 zeros_idx = np.where(B == 0) value[zeros_idx] = math.inf # Calculate A values m += 1 for i in range(N - m + 1): templates_m_plus_one.append(signal[:, i:i + m, :]) templates_m_plus_one = np.array(templates_m_plus_one) for e_idx in range(n_epo): w_threads = [] for ch_idx in range(n_channels): t = ThreadWithReturnValue( target=pdist, args=(templates_m_plus_one[:, e_idx, :, ch_idx], dist_type,)) w_threads.append(t) t.start() for th_idx, thread in enumerate(w_threads): A[e_idx, th_idx] = np.sum(thread.join() <= sigma[0, th_idx] * r) # Check if there is any A = 0 zeros_idx = np.where(A == 0) value[zeros_idx] = math.inf non_inf = np.where(value != math.inf) value[non_inf] = -np.log((A[non_inf] / B[non_inf])*((N - m + 1) / (N - m - 1))) # If there is infinity values inf_indx = np.where(value == math.inf) value[inf_indx] = -np.log(2 / ((N - m - 1) * (N - m))) return value
[docs]def multiscale_entropy(signal, max_scale, m, r): """ This method implements the Multiscale Entropy (MSE). MSE is a measurement of complexity which measures the irregularity of a signal over multiple time-scales. This is accomplished through estimation of the Sample Entropy (SampEn) on coarse-grained versions of the original signal. As a result of the these calculations, MSE curves are obtained and can be used to compare the complexity of time-series. The MSE curve whose entropy values are higher for the most of time-scales is considered more complex. References ---------- Costa, M., Goldberger, A. L., & Peng, C. K. (2005). Multiscale entropy analysis of biological signals, Physical review E, 71(2), 021906. Parameters ---------- signal : numpy.ndarray MEEG Signal. [n_epochs, n_samples, n_channels]. max_scale : int Maximum scale value m : int Sequence length r : float Tolerance Returns ------- mse_result : numpy.ndarray SampEn values for each scale for each channel in "signal" . [n_epochs, max_scale, n_channels]. """ # Check dimensions signal = check_dimensions(signal) # Signal dimensions n_epo = signal.shape[0] n_channels = signal.shape[2] mse_result = np.empty((n_epo, max_scale, n_channels)) w_threads = list() scales = list() for i in range(1, max_scale + 1): if i == 1: t = ThreadWithReturnValue( target=sample_entropy, args=(signal, m, r, 'chebyshev')) else: t = ThreadWithReturnValue( target=sample_entropy, args=(__coarse_grain(signal, i), m, r, 'chebyshev')) w_threads.append(t) scales.append(i) t.start() for t_idx, thread in enumerate(w_threads): mse_result[:, t_idx, :] = thread.join() return mse_result
def __coarse_grain(signal, scale, decimate_mode=True): if decimate_mode: return decimate(signal, scale, axis=1) else: # Signal dimensions n_epo = signal.shape[0] N = signal.shape[1] n_cha = signal.shape[2] # Number of coarse grains in which the signal is split tau = int(round(N / scale)) # Returned signal y = np.empty((n_epo,tau,n_cha)) for i in range(tau): y[:, i, :] = np.mean(signal[:, i * scale:(i * scale + scale), :]) return y def __lempelziv_complexity(signal): """ Calculate the signal binarisation and the Lempel-Ziv's complexity. This version allows the algorithm to be used for signals with more than one channel at the same time. Warnings: This function is deprecated and will be removed in future updates of MEDUSA Kernel. Parameters --------- signal: numpy 2D array Signal with shape [n_samples, n_channels] Returns ------- lz_channel_values: numpy 1D matrix Lempel-Ziv values for each channel with shape [n_channels]. """ if signal.size == len(signal): signal = signal[:, np.newaxis] signal = __binarisation(signal, [signal.shape[0]], signal.shape[0]) if signal.shape[1] == 1: return __lz_algorithm(signal) else: lz_channel_values = np.empty((signal.shape[1])) working_threads = list() for ch in range(signal.shape[1]): t = ThreadWithReturnValue(target=__lz_algorithm, args=(signal[:, ch],)) working_threads.append(t) t.start() for index, thread in enumerate(working_threads): lz_channel_values[index] = thread.join() return lz_channel_values
[docs]def lempelziv_complexity(signal): """Calculate the signal binarisation and the Lempel-Ziv's complexity. This function takes advantage of its implementation in C to achieve a high performance. Parameters --------- signal: numpy.ndarray MEEG Signal [n_epochs, n_samples, n_channels] Returns ------- lz_result: numpy.ndarray Lempel-Ziv values for each epoch and each channel. [n_epochs, n_channels]. """ # Check dimensions signal = check_dimensions(signal) # Signal dimensions n_epo = signal.shape[0] n_sample = signal.shape[1] n_cha = signal.shape[2] # Initialize result matrix lz_result = np.empty((n_epo, n_cha)) # Adapt the signal to the format required by the DLL signal = np.reshape(np.transpose(signal, (0, 2, 1)), (n_epo, -1)) # Get function from dll dll_file = os.path.join(os.path.dirname(__file__), 'computeLZC.dll') lib = ctypes.cdll.LoadLibrary(dll_file) lzc_func = lib.computeLempelZivCmplx # Access function for e_idx, epochs in enumerate(signal): # Create empty output vector lz_channel_values = np.zeros(int(n_cha), dtype=np.double) # Define inputs and outputs lzc_func.restype = None array_1d_double = np.ctypeslib.ndpointer(dtype=np.double, ndim=1, flags='CONTIGUOUS') lzc_func.argtypes = [array_1d_double, array_1d_double, ctypes.c_int32, ctypes.c_int32] # Call the function in the dll lzc_func(signal[e_idx, :], lz_channel_values, int(n_sample * n_cha), int(n_sample)) lz_result[e_idx, :] = lz_channel_values return lz_result
[docs]def multiscale_lempelziv_complexity(signal, W): """Calculate the multiscale signal binarisation and the Multiscale Lempel-Ziv's complexity. References ---------- Ibáñez-Molina, A. J., Iglesias-Parro, S., Soriano, M. F., & Aznarte, J. I, Multiscale Lempel-Ziv complexity for EEG measures. Clinical Neurophysiology, (2015), 126(3), 541–548. Parameters --------- signal: numpy.ndarray MEEG Signal [n_epochs, n_samples, n_channels] W: list or numpy 1D array Set of window length to consider at multiscale binarisation stage. Values must be odd. Returns ------- result: numpy.ndarray Matrix of results with shape [n_epochs, n_window_length, n_channels] """ # Check dimensions signal = check_dimensions(signal) # Signal dimensions n_epo = signal.shape[0] n_cha = signal.shape[2] # Useful parameter w_max = W[-1] + (W[1] - W[0]) # Define a matrix to store results result = np.empty((n_epo, len(W), n_cha)) # First get binarised signal for ep_idx, epoch in enumerate(signal): for w_idx, w in enumerate(W): binarised_signal = __binarisation(epoch, w, w_max, multiscale=True) # Parallelize the calculations if n_channel > 1 if binarised_signal.shape[1] > 1: threads = [] for ch in range(binarised_signal.shape[1]): t = ThreadWithReturnValue(target=__lz_algorithm, args=(binarised_signal[:, ch],)) threads.append(t) t.start() for ch_idx, t in enumerate(threads): result[ep_idx, w_idx, ch_idx] = t.join() else: result[ep_idx, w_idx, :] = __lz_algorithm(binarised_signal) return result
def __lz_algorithm(signal): """ Lempel-Ziv's complexity algorithm implemented in Python. References ---------- F. Kaspar, H. G. Schuster, "Easily-calculable measure for the complexity of spatiotemporal patterns", Physical Review A, Volume 36, Number 2 (1987). Parameters --------- signal: numpy 1D array Signal with shape of [n_samples] Returns ------- value: float Result of algorithm calculations. """ signal = signal.flatten().tolist() i, k, l = 0, 1, 1 c, k_max = 1, 1 n = len(signal) while True: if signal[i + k - 1] == signal[l + k - 1]: k = k + 1 if l + k > n: c = c + 1 break else: if k > k_max: k_max = k i = i + 1 if i == l: c = c + 1 l = l + k_max if l + 1 > n: break else: i = 0 k = 1 k_max = 1 else: k = 1 value = c * (np.log2(n) / n) return value def __binarisation(signal, w_length, w_max, multiscale=False): """ This function returns a binarised version of original signal. It can be used in both multiscale and simple binarisations. If multiscale mode is chosen, signals are shortened so that they all have the same length, taking the maximum window as a reference. Binarisation is performed by means of a median-based comparison. Parameters ---------- signal: numpy 2D array Signal with shape [n_samples x n_channels] w_length: int Window length to perform multiscale binarisation w_max: int Value of the maximum window length multiscale: bool If is True, performs the multiscale binarisation Returns ------- signal_binarised: numpy.ndarray Signal binarised with shape of [n_samples_shortened x n_channels] The n_samples_shortened parameter is calculated from w_max to ensure that all signals have the same length, regardless of the window length used. """ if multiscale: if w_length is None: raise ValueError('Width of window must be an integer value') if w_length % 2 == 0: raise ValueError('Width of window must be an odd value.') if w_max is None: raise ValueError('Maximum window width must be an integer value') # Get smoothed version from original signal smoothed = __multiscale_median_threshold(signal, w_length) # Useful parameters half_wind = int((w_length - 1) / 2) max_length = signal.shape[0] + 1 - w_max length_diff = smoothed.shape[0] - max_length # Shorten original and smoothed version smoothed_shortened = \ smoothed[int(length_diff / 2):-int(length_diff / 2), :] signal_shortened = \ signal[half_wind: signal.shape[0] - half_wind, :] signal_shortened = \ signal_shortened[int(length_diff / 2):-int(length_diff / 2), :] # Define template of binarised signal signal_binarised = \ np.zeros((signal_shortened.shape[0], signal_shortened.shape[1])) # Binarise the signal idx_one = signal_shortened >= smoothed_shortened signal_binarised[idx_one] = 1 else: signal_binarised = np.zeros((len(signal), signal.shape[1])) median = np.median(signal, axis=0) idx_one = signal >= median signal_binarised[np.squeeze(idx_one)] = 1 return signal_binarised def __multiscale_median_threshold(signal, w_length): """ Signal smoothing function. For each sample, we define a window in which the sample is in centre position. The median value of the window is calculated and assigned to this sample to obtain a smoothed version of the original signal. References ---------- Ibáñez-Molina, A. J., Iglesias-Parro, S., Soriano, M. F., & Aznarte, J. I, Multiscale Lempel-Ziv complexity for EEG measures. Clinical Neurophysiology, (2015), 126(3), 541–548. Parameters --------- signal: numpy 2D array Signal with shape of [n_samples, n_channels] w_length: int Length of window to calculate median values in smoothing process Returns ------- smoothed_signal: numpy 2D array Smoothed version with shape of [n_samples + 1 - w_length, n_channels] As a result of windowing, the final samples of the signal are lost. """ # Template of smoothed signal smoothed_signal = np.zeros(( signal.shape[0] + 1 - w_length, signal.shape[1])) half_wind = int((w_length - 1) / 2) # Index of sample to be smoothed from median window value index = 0 # We define a window with samp in central position and # get median value to smooth original signal for samp in range(half_wind, signal.shape[0] - half_wind): smoothed_signal[index, :] = np.median( signal[samp - half_wind: samp + half_wind], axis=0) index += 1 return smoothed_signal