Source code for medusa.connectivity.amplitude_connectivity

# Built-in imports
import warnings, os

# External imports
import numpy as np
from scipy import stats as sp_stats

# Medusa imports
import medusa.components
from medusa import signal_orthogonalization as orthogonalizate
from medusa.transforms import hilbert
from medusa.utils import check_dimensions
from medusa import tensorflow_integration


if os.environ.get("MEDUSA_EXTRAS_GPU_TF") == "1":
    import tensorflow as tf
    import tensorflow_probability as tfp


def __aec_gpu(data):
    """ This method implements the amplitude envelope correlation using GPU.

    NOTE: See the orthogonalized version.

    Parameters
    ----------
    data : numpy.ndarray
        MEEG Signal. [n_epochs, n_samples, n_channels].

    Returns
    -------
    aec : numpy.ndarray
        aec-based connectivity matrix. [n_epochs, n_channels, n_channels].
    """
    # Error check
    if type(data) != np.ndarray:
        raise ValueError("Parameter data must be of type numpy.ndarray")
    # Set to correct dimensions
    data = check_dimensions(data)

    # AEC computation
    hilb = hilbert(data)
    envelope = tf.math.abs(hilb) 
    env = tf.math.log(tf.math.square(envelope))

    aec = tfp.stats.correlation(env,sample_axis=1)

    return aec.numpy()


def __aec_cpu(data):
    """ This method implements the amplitude envelope correlation using CPU.

    NOTE: See the orthogonalized version.

    Parameters
    ----------
    data : numpy.ndarray
        MEEG Signal. [n_epochs, n_samples, n_channels].

    Returns
    -------
    aec : numpy.ndarray
        aec-based connectivity matrix. [n_epochs, n_channels, n_channels].

    """
    # Error check
    if type(data) != np.ndarray:
        raise ValueError("Parameter data must be of type numpy.ndarray")
    # Set to correct dimensions
    data = check_dimensions(data)

    #  Variable initialization
    n_epo = data.shape[0]
    n_cha = data.shape[2]
    aec = np.empty((n_epo, n_cha, n_cha))
    aec[:] = np.nan

    # AEC computation
    hilb = hilbert(data)
    envelope = abs(hilb)
    env = np.log(envelope ** 2)

    # Concurrent calculation for more than one epoch
    w_threads = []
    for epoch in env:
        t = medusa.components.ThreadWithReturnValue(target= np.corrcoef,
                                                    args=(epoch,None,False,))
        w_threads.append(t)
        t.start()

    for epoch_idx, thread in enumerate(w_threads):
        aec[epoch_idx, :, :] = thread.join()

    return aec


def __aec_ort_gpu(data):
    """ This method implements the orthogonalized version of the amplitude
    envelope correlation using GPU. This orthogonalized version minimizes the
    spurious connectivity caused by common sources (zero-lag correlations).

    Parameters
    ----------
    data : numpy.ndarray
        MEEG Signal. [n_epochs, n_samples, n_channels].

    Returns
    -------
    aec : numpy.ndarray
        aec-based connectivity matrix. [n_epochs, n_channels, n_channels].

    """
    # Error check
    if type(data) != np.ndarray:
        raise ValueError("Parameter data must be of type numpy.ndarray")
    # Set to correct dimensions
    data = check_dimensions(data)

    # Variable initialization
    n_epo = data.shape[0]
    n_samp = data.shape[1]
    n_cha = data.shape[2]
    aec_ort = np.empty((n_epo,n_cha, n_cha))
    aec_ort[:] = np.nan
    
    # AEC Ort Calculation (CPU orthogonalization is much faster than GPU one)
    signal_ort = orthogonalizate.signal_orthogonalization_cpu(data, data)
    signal_ort_2 = tf.transpose(tf.reshape(tf.transpose(signal_ort,perm=[0,3,2,1]),
                                           (n_epo,n_cha*n_cha,
                                            n_samp)),perm=[0,2,1])

    hilb_1 = hilbert(signal_ort_2)
    envelope_1 = tf.math.abs(hilb_1) 
    env = tf.math.log(tf.math.square(envelope_1))

    # Concurrent calculation for more than one epoch
    w_threads = []
    for epoch in env:
        t = medusa.components.ThreadWithReturnValue(target=__aec_ort_comp_aux,
                                                    args=(epoch, n_cha, 'gpu',))
        w_threads.append(t)
        t.start()

    for epoch_idx, thread in enumerate(w_threads):
        aec_ort[epoch_idx, :, :] = thread.join()
        
    return aec_ort


def __aec_ort_cpu(data):
    """ This method implements the orthogonalized version of the amplitude
    envelope correlation using CPU. This orthogonalized version minimizes the
    spurious connectivity caused by common sources (zero-lag correlations).

    Parameters
    ----------
    data : numpy.ndarray
        MEEG Signal. [n_epochs, n_samples, n_channels].

    Returns
    -------
    aec : numpy.ndarray
        aec-based connectivity matrix. [n_epochs, n_channels, n_channels].

    """
    # Error check
    if type(data) != np.ndarray:
        raise ValueError("Parameter data must be of type numpy.ndarray")

    # Set to correct dimensions
    data = check_dimensions(data)

    # Variable initialization
    n_epo = data.shape[0]
    n_samp = data.shape[1]
    n_cha = data.shape[2]
    aec_ort = np.empty((n_epo, n_cha, n_cha))
    aec_ort[:] = np.nan

    # AEC Ort Calculation
    signal_ort = orthogonalizate.signal_orthogonalization_cpu(data, data)
    signal_ort_2 = np.transpose(np.reshape(np.transpose(signal_ort, (0, 3, 2, 1)),
                            (n_epo, n_cha * n_cha, n_samp)),(0,2,1))

    hilb_1 = hilbert(signal_ort_2)
    envelope_1 = np.abs(hilb_1)
    env = np.log(np.square(envelope_1))

    # Concurrent calculation for more than one epoch
    w_threads = []
    for epoch in env:
        t = medusa.components.ThreadWithReturnValue(target=__aec_ort_comp_aux,
                                                    args=(epoch,n_cha,'cpu',))
        w_threads.append(t)
        t.start()

    for epoch_idx, thread in enumerate(w_threads):
        aec_ort[epoch_idx,:,:] = thread.join()

    return aec_ort


def __aec_ort_comp_aux(env, n_cha, ctype='cpu'):
    """
    Auxiliary method that implements a function to compute the orthogonalized AEC.
    Parameters
    ----------
    env: numpy.ndarray
        Array with signal envelope. [n_epochs, n_samples, n_channels x n_channels].
    type: str
        Calculation type: 'cpu' or 'gpu'.
    Returns
    -------
    aec_ort: numpy.ndarray
        AEC orthogonalized connectivity matrix. [n_channels, n_channels].
    """
    # Note: Orthogonalize A regarding B is not the same as orthogonalize B regarding
    # A, so we average lower and upper triangular matrices to construct the
    # symmetric matrix required for Orthogonalized AEC

    if ctype == 'cpu':
        aec_tmp = np.corrcoef(env, rowvar=False)
        aec_tmp2 = np.transpose(
            np.reshape(
                np.transpose(aec_tmp),
                (int(aec_tmp.shape[0] * aec_tmp.shape[0] / n_cha), -1)
            )
        )
        idx = np.linspace(0, aec_tmp2.shape[1] - 1, n_cha).astype(np.int32)
        aec = aec_tmp2[:, idx]
        aec_upper = np.triu(np.squeeze(aec))
        aec_lower = np.transpose(np.tril(np.squeeze(aec)))
        aec_ort = (aec_upper + aec_lower) / 2
        aec_ort = abs(np.triu(aec_ort, 1) + np.transpose(aec_ort))
        return aec_ort

    elif ctype == 'gpu':
        aec_tmp = tfp.stats.correlation(env)
        aec_tmp2 = tf.transpose(
            tf.reshape(
                tf.transpose(aec_tmp),
                (tf.cast(aec_tmp.shape[0] * aec_tmp.shape[0] / n_cha,
                         tf.int32), -1)
            )
        )
        idx = tf.cast(tf.linspace(0, len(aec_tmp2[0]) - 1, n_cha), tf.int32)
        aec = tf.gather(aec_tmp2, idx, axis=1).numpy()
        aec_upper = tf.linalg.band_part(aec, 0, -1)
        aec_lower = tf.transpose(tf.linalg.band_part(aec, -1, 0))
        aec_ort = tf.math.divide(tf.math.add(aec_upper, aec_lower), 2)
        aux = tf.linalg.band_part(aec_ort, 0, -1) - tf.linalg.band_part(
            aec_ort, 0, 0)
        aec_ort = tf.math.abs(tf.math.add(aux, tf.transpose(aec_ort)))
        return aec_ort


[docs]def aec(data, ort=True): """ This method implements the amplitude envelope correlation (using GPU if available). Based on the "ort" param, the signals could be orthogonalized before the computation of the amplitude envelope correlation. REFERENCES: Liu, Z., Fukunaga, M., de Zwart, J. A., & Duyn, J. H. (2010). Large-scale spontaneous fluctuations and correlations in brain electrical activity observed with magnetoencephalography. Neuroimage, 51(1), 102-111. Hipp, J. F., Hawellek, D. J., Corbetta, M., Siegel, M., & Engel, A. K. (2012). Large-scale cortical correlation structure of spontaneous oscillatory activity. Nature neuroscience, 15(6), 884-890. O’Neill, G. C., Barratt, E. L., Hunt, B. A., Tewarie, P. K., & Brookes, M. J. (2015). Measuring electrophysiological connectivity by power envelope correlation: a technical review on MEG methods. Physics in Medicine & Biology, 60(21), R271. Parameters ---------- data : numpy.ndarray MEEG Signal. Allowed dimensions: [n_epochs, n_samples, n_channels] and [n_samples, n_channels]. ort : bool If True, the signals on "data" will be orthogonalized before the computation of the amplitude envelope correlation. Returns ------- aec : numpy.ndarray aec-based connectivity matrix. [n_epochs, n_channels, n_channels]. """ # Error check if not np.issubdtype(data.dtype, np.number): raise ValueError('data matrix contains non-numeric values') if not ort: if os.environ.get("MEDUSA_EXTRAS_GPU_TF") == "1" and \ tensorflow_integration.check_tf_config(autoconfig=True): aec = __aec_gpu(data) else: aec = __aec_cpu(data) else: if os.environ.get("MEDUSA_EXTRAS_GPU_TF") == "1" and \ tensorflow_integration.check_tf_config(autoconfig=True): aec = __aec_ort_gpu(data) else: aec = __aec_ort_cpu(data) return aec
def __iac_gpu(data): """ This method implements the instantaneous amplitude correlation using GPU. NOTE: See the orthogonalized version. In the original paper, the orthogonalized version was used Parameters ---------- data : numpy.ndarray MEEG Signal. [n_epochs, n_samples, n_channels]. Returns ------- iac : numpy.ndarray iac-based connectivity matrix. [n_epochs, n_channels, n_channels, n_samples]. """ # Error check if type(data) != np.ndarray: raise ValueError("Parameter data must be of type numpy.ndarray") # Set to correct dimensions data = check_dimensions(data) # Variable initialization n_epo = data.shape[0] n_samp = data.shape[1] n_cha = data.shape[2] # Z-score data = tf.math.divide( tf.math.subtract(data, tf.math.reduce_mean(data, axis=1, keepdims=True)), tf.math.reduce_std(data, axis=1, keepdims=True)) # IAC computation hilb = hilbert(data) envelope = tf.math.abs(hilb) iac = tf.multiply(tf.tile(envelope, (1, 1, n_cha)), tf.transpose(tf.reshape( tf.transpose(tf.tile(envelope, (1, n_cha,1)),perm=[0,2,1]), (n_epo, n_cha*n_cha, n_samp)), perm=[0,2,1])).numpy() iac = tf.reshape(tf.transpose(iac,perm=[0,2,1]), (n_epo, n_cha, n_cha, n_samp)).numpy() # Set diagonal to 0 diag_mask = tf.ones((n_cha, n_cha)) diag_mask = tf.linalg.set_diag(diag_mask, tf.zeros(diag_mask.shape[0:-1]), name=None) iac = tf.multiply(iac, tf.repeat(tf.repeat(diag_mask[None,:, :, None], n_samp, axis=-1), n_epo,axis=0)) return iac.numpy() def __iac_cpu(data): """ This method implements the instantaneous amplitude correlation using CPU. NOTE: See the orthogonalized version. In the original paper, the orthogonalized version was used Parameters ---------- data : numpy.ndarray MEEG Signal. [n_epochs, n_samples, n_channels]. Returns ------- iac : numpy.ndarray iac-based connectivity matrix. [n_epochs, n_channels, n_channels, n_samples]. """ # Error check if type(data) != np.ndarray: raise ValueError("Parameter data must be of type numpy.ndarray") # Set to correct dimensions data = check_dimensions(data) # Variable initialization n_epo = data.shape[0] n_samp = data.shape[1] n_cha = data.shape[2] # IAC computation data = sp_stats.zscore(data, axis=1) hilb = hilbert(data) envelope = abs(hilb) iac = np.multiply(np.reshape( np.tile(envelope, (1, n_cha, 1)), (n_epo, n_samp, n_cha*n_cha), order='F'), np.tile(envelope, (1, 1, n_cha))) iac = np.reshape(np.transpose(iac,(0,2,1)), (n_epo, n_cha, n_cha, n_samp)) # Set diagonal to 0 diag_mask = np.ones((n_cha, n_cha)) np.fill_diagonal(diag_mask, 0) iac = iac * np.repeat(np.repeat(diag_mask[None,:, :, None], n_samp, axis=-1), n_epo,axis=0) return iac def __iac_ort_gpu(data): """ This method implements the orthogonalized version of the instantaneous amplitude correlation using GPU. This orthogonalized version minimizes the spurious connectivity caused by common sources (zero-lag correlations). data : numpy.ndarray MEEG Signal. [n_epochs, n_samples, n_channels]. Returns ------- iac_ort : numpy.ndarray iac-based connectivity matrix. [n_epochs, n_channels, n_channels, n_samples]. """ # Error check if type(data) != np.ndarray: raise ValueError("Parameter data must be of type numpy.ndarray") # Set to correct dimensions data = check_dimensions(data) # Variable initialization n_epo = data.shape[0] n_samp = data.shape[1] n_cha = data.shape[2] # IAC Ort Calculation (CPU orthogonalization is much faster than GPU one) # Z-score data = tf.math.divide( tf.math.subtract(data, tf.math.reduce_mean(data, axis=1, keepdims=True)), tf.math.reduce_std(data, axis=1, keepdims=True)) signal_ort = orthogonalizate.signal_orthogonalization_cpu(data.numpy(), data.numpy()) signal_ort_2 = tf.transpose(tf.reshape(tf.transpose(signal_ort, perm= [0,3,2,1]), (n_epo, n_cha * n_cha,n_samp)), perm = [0,2,1]) hilb_1 = hilbert(signal_ort_2) envelope = tf.math.abs(hilb_1) iac = tf.multiply(tf.tile(envelope, (1, 1, n_cha**2)), tf.transpose(tf.reshape( tf.transpose(tf.tile(envelope, (1, n_cha**2, 1)), perm= [0,2,1]), (n_epo,(n_cha*n_cha)**2, n_samp)),perm=[0,2,1])) iac = tf.reshape(tf.transpose(iac,perm=[0,2,1]), (n_epo, n_cha**2, n_cha**2, n_samp)) iac_tmp2 = tf.transpose( tf.reshape( tf.transpose(iac, (0,2, 1, 3)), (n_epo,int(iac.shape[1] * iac.shape[1] / n_cha), -1, iac.shape[3]) ), (0,2, 1, 3) ) idx = tf.cast(tf.linspace(0, iac_tmp2.shape[2]-1, n_cha), dtype=tf.int32) iac = tf.gather(iac_tmp2, idx, axis=2) # Orthogonalize A regarding B is not the same as orthogonalize B regarding # A, so we average lower and upper triangular matrices to construct the # symmetric matrix required for Orthogonalized iac iac_upper = tf.linalg.band_part(tf.transpose(iac, (0,3, 1, 2)), 0, -1) iac_lower = tf.transpose(tf.linalg.band_part(tf.transpose(iac, (0,3, 1, 2)), -1, 0), (0,1, 3, 2)) iac_ort = tf.math.divide(tf.math.add(iac_upper, iac_lower), 2) aux = tf.linalg.band_part(iac_ort, 0, -1) - tf.linalg.band_part(iac_ort, 0, 0) iac_ort = tf.math.abs(tf.math.add(aux, tf.transpose(aux, (0,1, 3, 2)))) return tf.transpose(iac_ort, (0, 2, 3, 1)).numpy() def __iac_ort_cpu(data): """ This method implements the orthogonalized version of the instantaneous amplitude correlation using CPU. This orthogonalized version minimizes the spurious connectivity caused by common sources (zero-lag correlations). Parameters ---------- data : numpy.ndarray MEEG Signal. [n_epochs, n_samples, n_channels]. Returns ------- iac_ort : numpy.ndarray iac-based connectivity matrix. [n_epochs, n_channels, n_channels, n_samples]. """ # Error check if type(data) != np.ndarray: raise ValueError("Parameter data must be of type numpy.ndarray") # Set to correct dimensions data = check_dimensions(data) # Variable initialization n_epo = data.shape[0] n_samp = data.shape[1] n_cha = data.shape[2] # AEC Ort Calculation data = sp_stats.zscore(data, axis=1) signal_ort = orthogonalizate.signal_orthogonalization_cpu(data, data) signal_ort_2 = np.transpose( np.reshape(np.transpose(signal_ort, (0, 3, 2, 1)), (n_epo, n_cha * n_cha, n_samp)), (0, 2, 1)) hilb_1 = hilbert(signal_ort_2) envelope_1 = np.abs(hilb_1) iac = np.multiply(np.reshape(np.tile( envelope_1, (1, n_cha**2, 1)), (n_epo, n_samp, n_cha**2*n_cha**2), order='F'), np.tile(envelope_1, (1, 1, n_cha**2))) iac = np.reshape(np.transpose(iac,[0,2,1]), (n_epo,n_cha**2, n_cha**2, n_samp)) iac_tmp2 = np.transpose( np.reshape( np.transpose(iac, (0,2, 1, 3)), (n_epo,int(iac.shape[1] * iac.shape[1] / n_cha), -1, n_samp) ), (0,2, 1, 3) ) idx = np.linspace(0, iac_tmp2.shape[2]-1, n_cha).astype(np.int32) iac = iac_tmp2[:,:, idx, :] # Orthogonalize A regarding B is not the same as orthogonalize B regarding # A, so we average lower and upper triangular matrices to construct the # symmetric matrix required for Orthogonalized AEC iac_upper = np.triu(np.transpose(iac, (0,3, 1, 2)), k=1) iac_lower = np.transpose(np.tril(np.transpose(iac, (0,3, 1, 2)), k=-1), (0,1, 3, 2)) iac_ort = (iac_upper + iac_lower) / 2 iac_ort = abs(np.triu(iac_ort, k=1) + np.transpose(iac_ort, (0,1, 3, 2))) return np.transpose(iac_ort, (0,2, 3, 1))
[docs]def iac(data, ort=True): """ This method implements the instantaneous amplitude correlation (using GPU if available). Based on the "ort" param, the signals could be orthogonalized before the computation of the amplitude envelope correlation. REFERENCES: Tewarie, P., Liuzzi, L., O'Neill, G. C., Quinn, A. J., Griffa, A., Woolrich, M. W., ... & Brookes, M. J. (2019). Tracking dynamic brain networks using high temporal resolution MEG measures of functional connectivity. Neuroimage, 200, 38-50. O’Neill, G. C., Barratt, E. L., Hunt, B. A., Tewarie, P. K., & Brookes, M. J. (2015). Measuring electrophysiological connectivity by power envelope correlation: a technical review on MEG methods. Physics in Medicine & Biology, 60(21), R271. Parameters ---------- data : numpy.ndarray MEEG Signal. Allowed dimensions: [n_epochs, n_samples, n_channels] and [n_samples, n_channels]. ort : bool If True, the signals on "data" will be orthogonalized before the computation of the instantaneous amplitude correlation. Returns ------- iac : numpy 2D square matrix iac-based connectivity matrix. [n_epochs, n_channels, n_channels, n_samples]. """ # Error check if not np.issubdtype(data.dtype, np.number): raise ValueError('data matrix contains non-numeric values') if not ort: if os.environ.get("MEDUSA_EXTRAS_GPU_TF") == "1" and \ tensorflow_integration.check_tf_config(autoconfig=True): iac = __iac_gpu(data) else: iac = __iac_cpu(data) else: if os.environ.get("MEDUSA_EXTRAS_GPU_TF") == "1" and \ tensorflow_integration.check_tf_config(autoconfig=True): iac = __iac_ort_gpu(data) else: iac = __iac_ort_cpu(data) return iac