Source code for medusa.graph_theory.complexity

# Built-in imports
import warnings, os

# External imports
import numpy as np
import scipy.special as sps

# Medusa imports
from medusa import tensorflow_integration

# Extras
if os.environ.get("MEDUSA_EXTRAS_GPU_TF") == "1":
    import tensorflow as tf
    import tensorflow_probability as tfp

def __divergency_cpu(W):
    """
    Calculates the graph divergency. Its the entropic distance (as euclidean
    distance) between a uniform weight distribution (random graph) and the 
    network under study using CPU
    
    Parameters
    ----------
    W : numpy 2D matrix
        Graph matrix. ChannelsXChannels.

    Returns
    -------
    D : numpy array
        Network divergency.
        
    """    
    N = W.shape[0]
    ind = np.argwhere(np.triu(np.ones((N,N)),k=1))
    W_wei = np.sum(W[ind[:,0],ind[:,1]]) # Tri Up indices
    
    if not W.all() == 0:
        p = W[ind[:,0],ind[:,1]] / W_wei # Weights normalising
    
        p_sort = np.sort(p,axis=0)[::-1] # Weight sorting (descending)
        
        p_equi_sort = 1 / len(p_sort) * np.ones((1,len(p_sort))) # Full network is the
        #equilibrium network (same weight for all the edges)
        
        # Normalised euclidean distance
        c = sps.comb(N,2) # Number of connections without autoloops
        D = np.sqrt(c / (c - 1)) * np.sqrt(np.nansum(
                np.power(np.squeeze(np.asarray(p_equi_sort))-np.squeeze(np.asarray(p_sort)),2)))
    else: # Empty graph
        D = 0 
        
    return D


def __graph_entropy_cpu(W):
    """
    Calculates the Shannon entropy of a weighted graph using CPU
    
    Parameters
    ----------
    W : numpy 2D matrix
        Graph matrix. ChannelsXChannels.

    Returns
    -------
    H : numpy array
        Network entropy.
        
    """     
    N = W.shape[0]
    ind = np.argwhere(np.triu(np.ones((N,N)),k=1))
    W_wei = np.sum(W[ind[:,0],ind[:,1]]) # Tri Up indices

    if not W.all() == 0:
        p = W[ind[:,0],ind[:,1]] / W_wei # Weights normalising

        H = -1 * np.sum(p[p > 0] * np.log2(p[p>0])) # Shannon entropy calculation

        # Normalising H to range [0 1]
        K = np.log2(p.shape[0])
        H = H/K
    else: # Empty or full graph
        H = 1

    return H         
    

def __complexity_cpu(W):
    """
    Calculates the Shannon Graph Complexity of a graph that node i belong to 
    one of the network shortest paths using CPU
    
    Note: W must be converted to a connection-length matrix. It is common to 
    obtain it via mapping from weight to length. BC is normalised to the 
    range [0 1] as BC/[(N-1)(N-2)]
        
    Parameters
    ----------
    W : numpy 2D matrix
        Graph matrix. ChannelsXChannels.

    Returns
    -------
    C : numpy array
        Network complexity.
        
    """
    tmp_1 = __divergency_cpu(W)
    tmp_2 = __graph_entropy_cpu(W)
    C = tmp_1 * tmp_2
                
    return C

def __aux_divergency(W,N,ind,W_wei):
    p = tf.math.divide(tf.gather_nd(W,tf.cast(ind,tf.int32)),W_wei) # Weights normalising

    p_sort = tf.sort(p,axis=0,direction='DESCENDING') # Weight sorting (descending)
    
    p_equi_sort = tf.math.multiply(tf.math.divide(1,p_sort.shape[0]),tf.ones((1,p_sort.shape[0]),dtype=tf.float64)) # Full network is the
    #equilibrium network (same weight for all the edges)
    
    # Normalised euclidean distance
    N = tf.cast(N,tf.float32)
    c1 = tfp.distributions.Binomial(total_count=N, probs=0.5) # Binomial distr.
    c2 = tf.math.multiply(tf.math.pow(0.5,2),tf.math.pow((1-0.5),tf.math.subtract(N,tf.constant(2,dtype=tf.float32))))
    c = tf.math.round(tf.math.divide(c1.prob(2),c2)) # N choose 2
    f1 = tf.math.sqrt(tf.math.divide(c,tf.math.subtract(c,1.0)))
    f2 = tf.math.pow(tf.math.subtract(tf.reshape(p_equi_sort,[-1]),tf.reshape(p_sort,[-1])),2)
    f2 = tf.math.sqrt(tf.experimental.numpy.nansum(f2,dtype=tf.float32))
    D = tf.math.multiply(f1,f2)
    
    return D
        
        
def __divergency_gpu(W):
    """
    Calculates the graph divergency. Its the entropic distance (as euclidean
    distance) between a uniform weight distribution (random graph) and the 
    network under study using GPU
    
    Parameters
    ----------
    W : numpy 2D matrix
        Graph matrix. ChannelsXChannels.

    Returns
    -------
    D : numpy array
        Network divergency.
        
    """    
    W = tf.convert_to_tensor(W)
    N = W.shape[0]
    
    ind = tf.where(tf.linalg.band_part(tf.ones((N,N)), 0, -1) - tf.linalg.band_part(tf.ones((N,N)), 0, 0))
    W_wei = tf.reduce_sum(tf.gather_nd(W,ind)) # Tri Up indices
    
    D = tf.cond(tf.reduce_sum(W)!=0, lambda: __aux_divergency(W,N,ind,W_wei),lambda: tf.constant(0))
        
    return D


def __aux_entropy(W,ind,W_wei):
    p = tf.math.divide(tf.gather_nd(W,tf.cast(ind,tf.int32)),W_wei) # Weights normalising
    
    H1 = tf.constant(-1,dtype=tf.float64)
    H2 = tf.gather_nd(p,tf.where(p>0))
    H3 = tf.experimental.numpy.log2(tf.transpose(tf.gather_nd(p,tf.where(p>0))))
    H4 = tf.reduce_sum(tf.math.multiply(H2,H3))

    H = tf.math.multiply(H1,H4)

    K = tf.experimental.numpy.log2(p.shape[0])
    
    H = tf.math.divide(H,K)
    return H

def __graph_entropy_gpu(W):
    """
    Calculates the Shannon entropy of a weighted graph using GPU
    
    Parameters
    ----------
    W : numpy 2D matrix
        Graph matrix. ChannelsXChannels.

    Returns
    -------
    H : numpy array
        Network entropy.
        
    """     
    W = tf.convert_to_tensor(W)
    N = W.shape[0]
    
    ind = tf.where(tf.linalg.band_part(tf.ones((N,N)), 0, -1) - tf.linalg.band_part(tf.ones((N,N)), 0, 0))
    W_wei = tf.reduce_sum(tf.gather_nd(W,ind)) # Tri Up indices
    
    H = tf.cond(tf.reduce_sum(W)!=0, lambda: __aux_entropy(W,ind,W_wei),lambda: tf.constant(1))

    return H         
    

def __complexity_gpu(W):
    """
    Calculates the Shannon Graph Complexity of a graph that node i belong to 
    one of the network shortest paths using GPU
    
    Note: W must be converted to a connection-length matrix. It is common to 
    obtain it via mapping from weight to length. BC is normalised to the 
    range [0 1] as BC/[(N-1)(N-2)]
        
    Parameters
    ----------
    W : numpy 2D matrix
        Graph matrix. ChannelsXChannels.

    Returns
    -------
    C : numpy array
        Network complexity.
        
    """
    tmp_1 = __divergency_gpu(W)
    tmp_2 = __graph_entropy_gpu(W)
    C = tf.math.multiply(tmp_1,tf.cast(tmp_2,tf.float32))
                
    return C


[docs]def complexity(W,mode): """ Calculates the graph complexity. Parameters ---------- W : numpy 2D matrix Graph matrix. ChannelsXChannels. mode : string GPU or CPU Returns ------- global_comp : numpy array Global complexity. """ if W.shape[0] is not W.shape[1]: raise ValueError('W matrix must be square') if not np.issubdtype(W.dtype, np.number): raise ValueError('W matrix contains non-numeric values') if mode == 'GPU' and os.environ.get("MEDUSA_EXTRAS_GPU_TF") == "1" and \ tensorflow_integration.check_tf_config(autoconfig=True): global_comp = __complexity_gpu(W) else: global_comp = __complexity_cpu(W) return global_comp