Source code for medusa.signal_orthogonalization

import numpy as np
import warnings
from medusa.utils import check_dimensions


[docs]def signal_orthogonalization_cpu(signal_1, signal_2): """ This method implements the ortogonalization of each channel of signal_1 regarding all the channels in signal_2 using CPU REFERENCES: O’Neill, G. C., Barratt, E. L., Hunt, B. A., Tewarie, P. K., & Brookes, M.J. (2015). Measuring electrophysiological connectivity by power envelope correlation: a technical review on MEG methods. Physics in Medicine & Biology, 60(21), R271. Parameters ---------- signal_1 : numpy 2D matrix First MEEG Signal. Allowed dimensions: [n_epochs, n_samples, n_channels], [n_samples, n_channels] and [n_samples]. signal_2 : numpy 2D matrix Second MEEG Signal. If empty signal_2 will be set to be equal to signal_1. Allowed dimensions: [n_epochs, n_samples, n_channels], [n_samples, n_channels] and [n_samples]. Returns ------- signal_ort : numpy 3D matrix MEEG ortogonalised signals. The first dimension is epochs. The second dimension is samples. The fourth dimension is the channel of first signal, and the second dimension is regarding which channel of second signal it has been orthogonalized the channel in third dimension. [n_epochs, n_samples, n_channels, n_channels]. """ # Error check if signal_2 is None: signal_2 = signal_1 if len(signal_1) != len(signal_2): raise ValueError("Signals must have the same length") if (type(signal_1) != np.ndarray) or (type(signal_2) != np.ndarray): raise ValueError("Parameter signal_1 and signal_2 must be of type " "numpy.ndarray") # Set correct dimensions signal_1 = np.ascontiguousarray(check_dimensions(signal_1)) signal_2 = np.ascontiguousarray(check_dimensions(signal_2)) n_chan_1 = signal_1.shape[2] n_chan_2 = signal_2.shape[2] n_samp_1 = signal_1.shape[1] n_epo_1 = signal_1.shape[0] signal_1_chan = np.transpose(np.repeat(signal_1, repeats=n_chan_1, axis=2), (0,2,1)) signal_2_chan = np.transpose(np.tile(signal_2, (1, 1, n_chan_2)),(0,2,1)) # Add extra dimensions signal_1_chan = signal_1_chan[:,:,np.newaxis,:] signal_2_chan = signal_2_chan[:,:,np.newaxis,:] # Calculate pseudo inverse inv_signal_1_chan = np.repeat(np.linalg.pinv(np.transpose(signal_1,(0,2,1))[:, :, np.newaxis, :]),repeats=n_chan_1,axis=1) beta = np.matmul(signal_2_chan, inv_signal_1_chan) signal_ort = np.transpose(np.reshape(signal_2_chan - beta * signal_1_chan, (n_epo_1,n_chan_2,n_chan_1,n_samp_1),order='F'),(0,3,1,2)) signal_ort[:,:,np.arange(n_chan_2),np.arange(n_chan_1)] = signal_1 return signal_ort
[docs]def signal_orthogonalization_gpu(signal_1, signal_2): """ This function ortogonalizes each channel of signal_1 regarding all the channels in signal_2. Based in O'Neill et al. 2015 Parameters ---------- signal_1 : numpy 3D matrix First MEEG Signal. Allowed dimensions: [n_epochs, n_samples, n_channels], [n_samples, n_channels] and [n_samples]. signal_2 : numpy 3D matrix Second MEEG Signal. If empty signal_2 will be set to be equal to signal_1. Allowed dimensions: [n_epochs, n_samples, n_channels], [n_samples, n_channels] and [n_samples]. Returns ------- signal_ort : numpy 3D matrix MEEG ortogonalized signals. The first dimension is epochs. The second dimension is samples. The fourth dimension is the channel of first signal, and the second dimension is regarding which channel of second signal it has been orthogonalized the channel in third dimension. [n_epochs, n_samples, n_channels, n_channels]. """ import tensorflow as tf # Error check if signal_2 is None: signal_2 = signal_1 if len(signal_1) != len(signal_2): raise ValueError("Signals must have the same length") if (type(signal_1) != np.ndarray) or (type(signal_2) != np.ndarray): raise ValueError("Parameter signal_1 and signal_2 must be of type " "numpy.ndarray") # Set correct dimensions signal_1 = np.ascontiguousarray(check_dimensions(signal_1)) signal_2 = np.ascontiguousarray(check_dimensions(signal_2)) n_chan_1 = signal_1.shape[2] n_chan_2 = signal_2.shape[2] n_samp_1 = signal_1.shape[1] n_epo_1 = signal_1.shape[0] signal_1_chan = tf.transpose(tf.repeat(signal_1, repeats=n_chan_1, axis=2), perm=[0, 2, 1]) signal_2_chan = tf.transpose(tf.tile(signal_2,(1,1,n_chan_2)),perm=[0,2,1]) # Add extra dimensions signal_1_chan = signal_1_chan[:,:,tf.newaxis,:] signal_2_chan = signal_2_chan[:,:,tf.newaxis,:] # Calculate pseudo inverse inv_signal_1_chan = tf.repeat( tf.linalg.pinv(tf.transpose(signal_1, perm=[0, 2, 1])[:, :, np.newaxis, :]), repeats=n_chan_1, axis=1) beta = np.matmul(signal_2_chan, inv_signal_1_chan) signal_ort = tf.transpose(tf.transpose(tf.reshape(tf.transpose(signal_2_chan - beta * signal_1_chan,perm=[0,2,3,1]),(n_epo_1, n_samp_1, n_chan_2, n_chan_1)), perm =(0,3,2,1)),perm=[0,3,1,2]).numpy() signal_ort[:,:,tf.range(n_chan_2),tf.range(n_chan_1)] = signal_1 return signal_ort
[docs]def signal_orthogonalization_gpu_old(signal_1, signal_2): """ DEPRECATED: This functions performs the orthogonalization of the signal but it is not vectorized nor allow epochs as inputs. Its use is not recommended. This function ortogonalizes each channel of signal_1 regarding all the channels in signal_2. Based in O'Neill et al. 2015 Parameters ---------- signal_1 : numpy 2D matrix MEEG Signal. SamplesXChannel. signal_2 : numpy 2D matrix MEEG Signal. SamplesXChannel. Returns ------- signal_ort : numpy 3D matrix MEEG ortogonalised signals. Samples x Channel x Channel. The first dimension are the samples, the third is the base channel used to ortogonalize the other channels and the second dimension are the ortogonalized channels regarding the third dimension """ import tensorflow as tf # Error check if signal_2 is None: signal_2 = signal_1 if len(signal_1) != len(signal_2): raise ValueError("Signals must have the same length") if (type(signal_1) != np.ndarray) or (type(signal_2) != np.ndarray): raise ValueError("Parameter signal_1 and signal_2 must be of type " "numpy.ndarray") # Variable initialization signal_ort = tf.zeros((len(signal_1), len(signal_2[0]), len(signal_1[0])), dtype=tf.dtypes.float64) i1 = tf.range(0, len(signal_1)) i2 = tf.ones_like(i1) i3 = tf.ones_like(i1) # Orthogonalization process signal_1_tf = tf.convert_to_tensor(signal_1) signal_2_tf = tf.convert_to_tensor(signal_2) for chan_1 in range(0, len(signal_1[0])): for chan_2 in range(0, len(signal_2[0])): signal_1_chan = tf.slice(signal_1_tf, [0, chan_1], [len(signal_1), 1]) signal_2_chan = tf.slice(signal_2_tf, [0, chan_2], [len(signal_2), 1]) if not np.array_equal(signal_1_chan.numpy(), signal_2_chan.numpy()): beta = tf.linalg.matmul( tf.transpose(signal_2_chan), tf.transpose(tf.linalg.pinv(signal_1_chan)) ) tmp = tf.math.subtract( signal_2_chan, tf.math.multiply(beta,signal_1_chan) ) idx = tf.stack([i1, chan_2 * i2, chan_1 * i3], axis=-1) signal_ort = tf.tensor_scatter_nd_update( signal_ort, idx, tf.squeeze(tmp) ) else: idx = tf.stack([i1, chan_2 * i2, chan_1 * i3], axis=-1) signal_ort = tf.tensor_scatter_nd_update( signal_ort, idx, tf.squeeze(signal_1_chan) ) return signal_ort.numpy()