import warnings
from copy import copy
import numpy as np
from numpy import linalg as nlinalg
from scipy import linalg as slinalg
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import make_axes_locatable
from medusa import components
from medusa.plots.head_plots import TopographicPlot
class LaplacianFilter(components.ProcessingMethod):
    """
    Class for fitting and applying a Laplacian filter to EEG signals.

    A channel set from the EEGChannelSet class must have been defined before
    calling the LaplacianFilter class.

    This class implements the second order Hjorth's approximation of the
    Laplacian surface for a spatial-discrete system [1].

    It counts with two different modes:

    - Auto: First, the location of the channel to be filtered is identified.
      This allows us to determine the number of surrounding electrodes to
      be taken into account when calculating the laplacian surface
      (i.e., an electrode located in the center of the assembly is
      not the same as an electrode located in a corner). Then, apply the
      Laplacian surface correction taking into account the distance at
      which each electrode is located. It should take into account that
      this mode only applies the laplacian surface to the closest
      electrodes, so for next-nearest neighbours [2] the custom mode
      should be used.
    - Custom: In this mode, a list containing the labels of the channels to
      be used to calculate the Laplacian surface of each channel to be
      filtered must be defined. This allows the design of long
      distance filters [2].

    References
    ----------
    [1] Claudio Carvalhaes, J. Acacio de Barros, The surface Laplacian
        technique in EEG: Theory and methods, International Journal of
        Psychophysiology, Volume 97, Issue 3, 2015, Pages 174-188.
    [2] Dennis J McFarland, Lynn M. McCabe, Stephen V. David, Jonathan R.
        Wolpaw, Spatial filter selection for EEG-based communication,
        Electroencephalography and clinical Neurophysiology, Volume 193,
        1997, Pages 386-394.
    """

    def __init__(self, channel_set, mode='auto'):
        """
        Constructor of class LaplacianFilter.

        Parameters
        ----------
        channel_set: EEGChannelSet
            EEGChannelSet instance with the montage already initialized.
        mode: str {'auto'|'custom'}
            Neighbourhood selection mode. See the class docstring.
        """
        super().__init__(apply_lp=['s_filtered'])
        # Check the channel set is initialized
        if not channel_set.channels:
            raise Exception('Cannot compute the nearest neighbors if '
                            'channel set is not initialized!')
        # Check if there are enough channels to perform filtering
        if len(channel_set.l_cha) < 5:
            raise Exception('There are not enough channels to perform Laplacian'
                            'surface filtering. ')
        # Parameters
        self.channel_set = channel_set
        self.l_cha = channel_set.l_cha
        self.n_cha = channel_set.n_cha
        self.mode = mode
        # Variables computed in fit_lp()
        self.dist_matrix = None        # [n_cha x n_cha] inter-channel distances
        self.lp_filter = []            # neighbour indexes per filtered channel
        self.idx_cha_to_filter = None  # indexes of the channels to filter
        self.dist_weights = []         # inverse-distance weights per channel

    def fit_lp(self, l_cha_to_filter, l_cha_laplace=None):
        """
        Fits the Laplacian filter depending on the mode chosen.

        Parameters
        ----------
        l_cha_to_filter: list of strings
            List [N x 1] containing the labels of the channels to filter.
            Used in both filtering modes.
        l_cha_laplace: list
            List of lists [N x M] containing the labels of the channels to
            compute the laplace filter for channel in position Ni of
            l_cha_to_filter. Only used in mode custom.

        Raises
        ------
        ValueError
            If the arguments are inconsistent with the selected mode, if a
            channel's neighbourhood cannot be determined, or if the mode
            is unknown.
        """
        # Reset fitted state so that consecutive calls refit instead of
        # accumulating filters from previous fits
        self.lp_filter = []
        self.dist_weights = []
        self.dist_matrix = self.channel_set.compute_dist_matrix()
        # Integer dtype so the indexes can be used directly for indexing
        self.idx_cha_to_filter = np.empty(len(l_cha_to_filter), dtype=int)
        if self.mode == 'auto':
            for i in range(len(l_cha_to_filter)):
                # Get the index of the channel to filter
                idx = self.l_cha.index(l_cha_to_filter[i])
                self.idx_cha_to_filter[i] = idx
                # The number of unique (rounded) distances among the 5
                # closest channels reveals the electrode's position in the
                # montage
                nearest_channels = np.unique(np.round(
                    np.sort(self.dist_matrix[idx, :])[:5], 2))
                # Check the num of channels that can be taken to perform
                # the Laplacian filtering
                if len(nearest_channels) == 2 or len(nearest_channels) == 3:
                    # The channel is fully surrounded by other channels
                    n_cha_lp = 4
                elif len(nearest_channels) == 4:
                    # The channel is on one side of the montage
                    n_cha_lp = 3
                elif len(nearest_channels) == 5:
                    # The channel is in a corner
                    n_cha_lp = 2
                else:
                    # Previously this case fell through and raised a
                    # NameError on n_cha_lp; fail with a clear message
                    raise ValueError('[LaplacianFilter] Could not determine '
                                     'the neighbourhood of channel %s'
                                     % l_cha_to_filter[i])
                # Get the closest n channels; position 0 of the sorted
                # distances is the channel itself, so it is skipped
                sorted_idx = np.argsort(self.dist_matrix[idx, :])
                sorted_dist = np.sort(self.dist_matrix[idx, :])
                self.lp_filter.append(sorted_idx[1:n_cha_lp + 1])
                # Weight each neighbour by the inverse of its distance
                self.dist_weights.append(1. / sorted_dist[1:n_cha_lp + 1])
            self.lp_filter = np.array(self.lp_filter, dtype=object)
            self.dist_weights = np.array(self.dist_weights, dtype=object)
        elif self.mode == 'custom':
            # Check errors
            if l_cha_to_filter is None:
                raise ValueError("[LaplacianFilter] In 'custom' mode is "
                                 "necessary to set the labels of the channels "
                                 "to filter in 'l_cha_to_filter'")
            if l_cha_laplace is None:
                # NOTE: this message previously pointed to 'l_cha_to_filter'
                raise ValueError("[LaplacianFilter] In 'custom' mode is "
                                 "necessary to set the labels of the channels "
                                 "to compute the Laplacian filter in "
                                 "'l_cha_laplace'.")
            if len(l_cha_to_filter) != len(l_cha_laplace):
                raise ValueError("[LaplacianFilter] In 'custom' mode is "
                                 "necessary to define as many lists with "
                                 "channel labels to compute the Laplacian "
                                 "filter as channels to apply such filter.")
            for i in range(len(l_cha_to_filter)):
                # Get the channel index
                idx = self.l_cha.index(l_cha_to_filter[i])
                self.idx_cha_to_filter[i] = idx
                # Indexes of the channels defining the Laplacian surface
                idx_laplace = np.array([self.l_cha.index(x)
                                        for x in l_cha_laplace[i]])
                self.lp_filter.append(idx_laplace)
                # Weight each channel by the inverse of its distance
                self.dist_weights.append(
                    1. / self.dist_matrix[idx, idx_laplace])
        else:
            raise ValueError("[LaplacianFilter] Unknown mode '%s'. Use "
                             "'auto' or 'custom'." % str(self.mode))

    def apply_lp(self, signal):
        """
        Applies the fitted Laplacian filter to an EEG signal.

        Parameters
        ----------
        signal: np.ndarray
            Array of EEG signal with shape [N_samples x N_channels].

        Returns
        -------
        s_filtered: np.ndarray
            Filtered EEG signal with shape [N_samples x len(l_cha_to_filter)].
        """
        # Check dimensions
        if signal.shape[1] != len(self.l_cha):
            raise ValueError('Dimensions of s in axis 1 must match the number '
                             'of channels')
        s_filtered = np.empty((signal.shape[0], len(self.idx_cha_to_filter)))
        for i, index in enumerate(self.idx_cha_to_filter.astype(int)):
            # Subtract the (inverse-distance) weighted average of the
            # surrounding channels from the channel to filter
            s_filtered[:, i] = signal[:, index] - np.average(
                signal[:, self.lp_filter[i].astype(int)],
                weights=self.dist_weights[i], axis=1)
        return s_filtered
def car(signal):
    """Common average reference (CAR) spatial filter.

    At every sample, subtracts the mean across all channels from each
    individual channel:

        X_car = X - (1/N) * sum(X, 2),

    where X is the EEG signal with dimensions [samples x channels] and N is
    the total number of channels. The operation is implemented as a single
    matrix product with the spatial filter matrix M = I - (1/N) * ones(N, N),
    whose diagonal entries are 1 - 1/N and off-diagonal entries are -1/N.

    Parameters
    ----------
    signal: np.ndarray
        EEG raw signal with dimensions [samples x ch]. Note that the TRIGGER
        channel must be DELETED before calling this function. In other
        words, the signal must contain only the EEG channels.

    Returns
    -------
    signal: np.array
        Filtered signal with the same shape as the input.
    """
    n_channels = signal.shape[1]
    # With a single channel there is nothing to re-reference against
    if n_channels <= 1:
        return signal
    # Build M = I - (1/N) * ones(N, N) and project the signal through it
    averaging = np.full((n_channels, n_channels), 1 / float(n_channels))
    spatial_matrix = np.eye(n_channels) - averaging
    return signal @ spatial_matrix
class CSP(components.ProcessingMethod):
    """ Common Spatial Pattern filtering.

    Attributes
    ----------
    filters : {(…, M, M) numpy.ndarray, (…, M, M) matrix}
        Mixing matrix (spatial filters are stored in columns).
    eigenvalues : (…, M) numpy.ndarray
        Eigenvalues of w.
    patterns : numpy.ndarray
        De-mixing matrix (activation patterns are stored in columns).
    """

    def __init__(self, n_filters=4, selection="extremes"):
        """
        Parameters
        ----------
        n_filters : int or None
            Number of filters to select. Use None to return all filters
            (default: 4).
        selection : basestring
            Selection method:
                - "extremes" (default): classic method that takes the filters
                  from the extremes, which belong to both classes separately.
                  This method cannot be applied in problems with more than 2
                  classes.
                - "eigenvalues": the eigenvalues are sorted and the highest
                  ones are eligible to be selected.

        Attributes
        ----------
        filters: np.ndarray (n_channels, n_channels)
            Mixing matrix, or forward model (spatial filters are stored in
            rows).
        patterns: np.ndarray (n_channels, n_channels)
            De-mixing matrix, or backward model (patterns are stored in rows).
        eigenvalues: np.ndarray (n_channels, )
            Eigenvalues associated to each filter and pattern.
        sel_idxs: np.ndarray (n_filters, )
            Selected indexes to get the desired filters and patterns.
        sel_filters: np.ndarray (n_filters, n_channels)
            Selected spatial filters (stored in rows).
        sel_patterns: np.ndarray (n_filters, n_channels)
            Selected patterns (stored in rows).
        sel_eigenvalues: np.ndarray (n_filters, )
            Selected eigenvalues.
        """
        self.n_filters = n_filters  # Number of filters to choose
        self.selection = selection  # Selection method
        if self.n_filters is None:
            # All filters are returned, so no selection step is required
            self.selection = "none"
        # Generated after fit()
        self.filters = None  # Mixing matrix (spatial filters)
        self.patterns = None  # De-mixing matrix
        self.eigenvalues = None  # Eigenvalues
        self.sel_idxs = None  # Selected indexes
        self.sel_filters = None  # Selected spatial filters
        self.sel_patterns = None  # Selected patterns
        self.sel_eigenvalues = None  # Selected eigenvalues
        self.is_fitted = False

    def fit(self, X, y):
        """ Method to train the CSP.

        This code is based on [1] for the 2-class problem, and based on [2]
        for the > 2-class problem.

        Parameters
        ----------
        X : numpy.ndarray (n_epochs, n_samples, n_channels)
            Epoched data of shape (n_epochs, n_samples, n_channels)
        y : numpy.ndarray (n_epochs, )
            Labels for epoched data of shape (n_epochs, )

        References
        ----------
        [1] Blankertz, B., Tomioka, R., Lemm, S., Kawanabe, M., & Muller,
        K. R. (2007). Optimizing spatial filters for robust EEG single-trial
        analysis. IEEE Signal processing magazine, 25(1), 41-56.
        [2] Grosse-Wentrup, Moritz, and Martin Buss. "Multiclass common
        spatial patterns and information theoretic feature extraction."
        Biomedical Engineering, IEEE Transactions on 55, no. 8 (2008):
        1991-2000.
        """
        # Error detection
        classes = np.unique(y)
        if len(classes) > 2 and self.selection == "extremes":
            raise ValueError("Cannot use 'extremes' selection if the data has "
                             "more than 2 classes (%i classes found)!"
                             % len(classes))
        # Per-class covariance matrices, dimensions [n_classes x n_cha x n_cha]
        cov = []
        for c in classes:
            cov.append(np.cov(X[y == c].reshape(-1, X.shape[-1]).T))
        cov = np.array(cov)
        # Classic implementation for 2 classes
        if len(classes) == 2:
            # Solve the generalized eigenvalue problem between the covariance
            # of class 0 and the composite covariance
            self.eigenvalues, eigenvectors = slinalg.eigh(cov[0], cov.sum(0))
            if self.selection == "eigenvalues":
                # Automatic sorting using eigenvalues: values far from 0.5
                # (i.e., close to 0 or 1) discriminate best
                self.sel_idxs = np.argsort(np.abs(self.eigenvalues - 0.5))[::-1]
                self.sel_idxs = self.sel_idxs[:self.n_filters]
            if self.selection == "extremes":
                # Automatic selection taking filters alternately from both
                # extremes of the eigenvalue spectrum (one per class)
                self.sel_idxs = list()
                ids = np.arange(len(self.eigenvalues)).tolist()
                start = False
                while len(self.sel_idxs) < self.n_filters:
                    if start:
                        self.sel_idxs.append(ids.pop(0))
                    else:
                        self.sel_idxs.append(ids.pop(len(ids) - 1))
                    start = not start
        # Implementation for more than 2 classes
        elif len(classes) > 2:
            # Approximate joint diagonalization based on jacobi angle.
            # BUG FIX: the result is bound to 'eigenvectors' so the shared
            # code below works for this branch too; it previously referenced
            # a variable only defined in the 2-class branch (NameError).
            eigenvectors, _ = self._adj_pham(x=cov)
            eigenvectors = eigenvectors.T
            # Normalization with the mean covariance
            cmean = np.average(cov, axis=0)
            for i in range(eigenvectors.shape[1]):
                temp = np.dot(np.dot(eigenvectors[:, i].T, cmean),
                              eigenvectors[:, i])
                eigenvectors[:, i] /= np.sqrt(temp)
            # Mutual-information-based pseudo-eigenvalues, weighted by the
            # probability of each class
            self.eigenvalues = []
            prob_class = [np.mean(y == c) for c in classes]
            for j in range(eigenvectors.shape[1]):
                log_term, b = 0, 0
                for i, c in enumerate(classes):
                    temp = np.dot(np.dot(eigenvectors[:, j].T, cov[i]),
                                  eigenvectors[:, j])
                    log_term += prob_class[i] * np.log(np.sqrt(temp))
                    b += prob_class[i] * (temp ** 2 - 1)
                mutual_info = - (log_term + (3.0 / 16) * (b ** 2))
                self.eigenvalues.append(mutual_info)
            if self.selection == "eigenvalues":
                self.sel_idxs = np.argsort(self.eigenvalues)[::-1]
                self.sel_idxs = self.sel_idxs[:self.n_filters]
        else:
            raise ValueError("Number of classes must be >= 2")
        # Get all the spatial filters, patterns and eigenvalues (non-sorted)
        self.filters = eigenvectors.T
        self.patterns = slinalg.pinv(eigenvectors)
        self.eigenvalues = np.array(self.eigenvalues)
        # Get the selected spatial filters, patterns and eigenvalues; if
        # sel_idxs is None (selection "none"), keep everything
        if self.sel_idxs is not None:
            self.sel_filters = self.filters[self.sel_idxs, :]
            self.sel_patterns = self.patterns[self.sel_idxs, :]
            self.sel_eigenvalues = self.eigenvalues[self.sel_idxs]
        else:
            self.sel_filters = self.filters
            self.sel_patterns = self.patterns
            self.sel_eigenvalues = self.eigenvalues
        self.is_fitted = True

    def project(self, X):
        """ Projects the input data X with the selected spatial filters.

        Parameters
        ----------
        X : numpy.ndarray (n_epochs, n_samples, n_channels)
            Epoched data of shape (n_epochs, n_samples, n_channels).

        Returns
        -------
        numpy.ndarray (n_epochs, n_samples, n_filters)
            Array with the epochs of signal projected in the CSP space.
        """
        if len(X.shape) != 3:
            raise Exception("X must be 3-dimensional (n_epochs x n_samples x "
                            "n_channels!")
        if not self.is_fitted:
            raise Exception("CSP must be fitted first")
        # Project each trial separately
        projection = [np.dot(self.sel_filters, trial.T) for trial in X]
        projection = np.transpose(np.array(projection), (0, 2, 1))
        return projection

    @staticmethod
    def _adj_pham(x, eps=1e-6, n_iter_max=15):
        """Approximate joint diagonalization based on pham's algorithm.

        This is a direct implementation of the PHAM's AJD algorithm [1].
        Extracted from pyriemann module:
        http://github.com/alexandrebarachant/pyRiemann

        Parameters
        ----------
        x : ndarray, shape (n_trials, n_channels, n_channels)
            A set of covariance matrices to diagonalize
        eps : float (default 1e-6)
            tolerance for stopping criterion.
        n_iter_max : int (default 15)
            The maximum number of iteration to reach convergence.

        Returns
        -------
        v : numpy.ndarray, [n_channels, n_channels]
            Diagonalizer
        d : numpy.ndarray, [n_trials, n_channels, n_channels]
            Set of quasi diagonal matrices

        References
        ----------
        [1] Pham, Dinh Tuan. "Joint approximate diagonalization of positive
        definite Hermitian matrices." SIAM Journal on Matrix Analysis and
        Applications 22, no. 4 (2001): 1136-1152.
        """
        n_epochs = x.shape[0]
        # Reshape input matrix: stack the covariances side by side
        a = np.concatenate(x, axis=0).T
        # Init variables
        n_times, n_m = a.shape
        v = np.eye(n_times)
        epsilon = n_times * (n_times - 1) * eps
        for it in range(n_iter_max):
            decr = 0
            # Sweep over every pair of rows (ii, jj)
            for ii in range(1, n_times):
                for jj in range(ii):
                    Ii = np.arange(ii, n_m, n_times)
                    Ij = np.arange(jj, n_m, n_times)
                    c1 = a[ii, Ii]
                    c2 = a[jj, Ij]
                    g12 = np.mean(a[ii, Ij] / c1)
                    g21 = np.mean(a[ii, Ij] / c2)
                    omega21 = np.mean(c1 / c2)
                    omega12 = np.mean(c2 / c1)
                    omega = np.sqrt(omega12 * omega21)
                    tmp = np.sqrt(omega21 / omega12)
                    tmp1 = (tmp * g12 + g21) / (omega + 1)
                    tmp2 = (tmp * g12 - g21) / max(omega - 1, 1e-9)
                    h12 = tmp1 + tmp2
                    h21 = np.conj((tmp1 - tmp2) / tmp)
                    decr += n_epochs * (g12 * np.conj(h12) + g21 * h21) / 2.0
                    tmp = 1 + 1.j * 0.5 * np.imag(h12 * h21)
                    tmp = np.real(tmp + np.sqrt(tmp ** 2 - h12 * h21))
                    tau = np.array([[1, -h12 / tmp], [-h21 / tmp, 1]])
                    a[[ii, jj], :] = np.dot(tau, a[[ii, jj], :])
                    tmp = np.c_[a[:, Ii], a[:, Ij]]
                    tmp = np.reshape(tmp, (n_times * n_epochs, 2), order='F')
                    tmp = np.dot(tmp, tau.T)
                    tmp = np.reshape(tmp, (n_times, n_epochs * 2), order='F')
                    a[:, Ii] = tmp[:, :n_epochs]
                    a[:, Ij] = tmp[:, n_epochs:]
                    v[[ii, jj], :] = np.dot(tau, v[[ii, jj], :])
            # Stop once the decrement falls below the tolerance
            if decr < epsilon:
                break
        d = np.reshape(a, (n_times, -1, n_times)).transpose(1, 0, 2)
        return v, d

    def to_dict(self):
        """Serializes the instance to a dict; ndarrays become lists."""
        dict_ = copy(self.__dict__)
        for key, value in dict_.items():
            if isinstance(value, np.ndarray):
                dict_[key] = value.tolist()
        return dict_

    @staticmethod
    def from_dict(dict_data):
        """Restores a CSP instance from a dict created by to_dict()."""
        csp = CSP()
        csp.n_filters = dict_data['n_filters']
        csp.selection = dict_data['selection']
        csp.filters = np.array(dict_data['filters'])
        csp.patterns = np.array(dict_data['patterns'])
        csp.eigenvalues = np.array(dict_data['eigenvalues'])
        csp.sel_idxs = np.array(dict_data['sel_idxs'])
        csp.sel_filters = np.array(dict_data['sel_filters'])
        csp.sel_patterns = np.array(dict_data['sel_patterns'])
        csp.sel_eigenvalues = np.array(dict_data['sel_eigenvalues'])
        # BUG FIX: restore the fitted flag; previously it stayed False and
        # project() rejected deserialized instances. Defaults to True for
        # dicts dumped before the flag was serialized.
        csp.is_fitted = dict_data.get('is_fitted', True)
        return csp

    def plot(self, channel_set, figure=None, plot_filters=False,
             plot_patterns=True, topo_settings=None, show=False,
             plot_eig=True, only_selected=True):
        """ Plots the CSP filters and/or patterns as topographic maps.

        Parameters
        ----------
        channel_set : EEGChannelSet
            Channel set; must match the number of channels used to fit.
        figure : matplotlib.figure.Figure or None
            Figure to plot into. A new one is created if None.
        plot_filters : bool
            If True, plots the spatial filters.
        plot_patterns : bool
            If True, plots the activation patterns.
        topo_settings : dict or None
            Settings forwarded to TopographicPlot.
        show : bool
            If True, calls plt.show() at the end.
        plot_eig : bool
            If True, writes the eigenvalue under each map.
        only_selected : bool
            If True, plots only the selected filters/patterns; otherwise
            plots all of them.

        Returns
        -------
        matplotlib.figure.Figure
            Figure with the requested plots.
        """
        # Error detection and initialization
        if not plot_patterns and not plot_filters:
            raise Exception("Cannot plot CSP if plot_filters and "
                            "plot_patterns are both None")
        if figure is None:
            figure = plt.figure(figsize=(7.5, 3), dpi=300)
        if len(channel_set.l_cha) != self.sel_filters.shape[1]:
            raise Exception("The number of channels (%i) must be the same as "
                            "the number of channels used to train the CSP ("
                            "%i)" % (len(channel_set.l_cha),
                                     self.sel_filters.shape[1]))
        if topo_settings is None:
            topo_settings = {
                "head_radius": 1.0,
                "head_line_width": 2,
                "interp_contour_width": 1,
                "interp_points": 500,
            }
        if only_selected:
            sel_patterns = self.sel_patterns
            sel_filters = self.sel_filters
            sel_eigenvalues = self.sel_eigenvalues
        else:
            sel_patterns = self.patterns
            sel_filters = self.filters
            sel_eigenvalues = self.eigenvalues
        # One row per requested kind of map
        n_row = 2 if plot_patterns and plot_filters else 1
        # Symmetric color limits shared by all maps of the same kind
        max_f = 0
        max_p = 0
        for i in range(sel_filters.shape[0]):
            if np.max(np.abs(sel_patterns[i, :])) > max_p:
                max_p = np.max(np.abs(sel_patterns[i, :]))
            if np.max(np.abs(sel_filters[i, :])) > max_f:
                max_f = np.max(np.abs(sel_filters[i, :]))
        # Plot filters
        j = 0
        if plot_filters:
            for j in range(sel_filters.shape[0]):
                ax = figure.add_subplot(n_row, sel_filters.shape[0], j + 1)
                topo_settings["clim"] = (-max_f, max_f)
                topo_settings["cmap"] = "RdBu"
                topo = TopographicPlot(axes=ax, channel_set=channel_set,
                                       **topo_settings)
                topo.update(values=sel_filters[j, :])
                ax.set_title("Filter %i" % j)
                if plot_eig and not plot_patterns:
                    ax.set_xlabel('Eig: %.3f' % sel_eigenvalues[j])
                # Colorbar next to the last map of the row
                if j == sel_filters.shape[0] - 1:
                    divider = make_axes_locatable(ax)
                    cax = divider.append_axes('right', size='5%', pad=0.05)
                    figure.colorbar(
                        topo.plot_handles["color-mesh"], cax=cax,
                        orientation='vertical')
            j += 1
        # Plot patterns
        if plot_patterns:
            for i in range(sel_filters.shape[0]):
                ax = figure.add_subplot(n_row, sel_filters.shape[0], j + i + 1)
                topo_settings["clim"] = (-max_p, max_p)
                topo_settings["cmap"] = "PiYG"
                topo = TopographicPlot(axes=ax, channel_set=channel_set,
                                       **topo_settings)
                topo.update(values=sel_patterns[i, :])
                ax.set_title("Pattern %i" % i)
                if plot_eig:
                    ax.set_xlabel('Eig: %.3f' % sel_eigenvalues[i])
                # Colorbar next to the last map of the row
                if i == sel_filters.shape[0] - 1:
                    divider = make_axes_locatable(ax)
                    cax = divider.append_axes('right', size='5%', pad=0.05)
                    figure.colorbar(
                        topo.plot_handles["color-mesh"], cax=cax,
                        orientation='vertical')
        plt.suptitle("CSP")
        # Show?
        if show:
            plt.show()
        return figure
class CCA(components.ProcessingMethod):
    """
    The class CCA performs a Canonical Correlation Analysis filtering. First,
    function fit() should be called to train the spatial filters. Then, spatial
    filters could be used to project testing data. After fit(), the following
    attributes are computed:

    Attributes
    ----------
    wx : {(channels, no_filters) ndarray}
        Mixing matrix for projecting the data, where spatial filters are stored
        in columns.
    wy : {(channels, no_filters) ndarray}
        Mixing matrix for projecting the reference, where spatial filters are
        stored in columns.
    r : {(channels, ) ndarray}
        Sample canonical correlations.
    """

    def __init__(self):
        """Initializes the CCA filter. Attributes stay None until fit()."""
        self.wx = None
        self.wy = None
        self.r = None

    def fit(self, x, y):
        """
        Fits the CCA spatial filters given the two input matrices data and
        reference, storing relevant parameters.

        Parameters
        ----------
        x : {(samples, channels) ndarray}
            First input matrix, usually data with concatenated epochs.
        y : {(samples, channels) ndarray}
            Second input matrix, usually the data reference. The number of
            samples must match the samples of the data matrix. Repeat the
            reference if necessary before calling this function.
        """
        [self.wx, self.wy, self.r] = self.canoncorr(x, y)

    def project(self, data, filter_idx=(0), projection='wy'):
        """
        Projects the input data matrix using the given spatial filter. Note that
        the spatial filter will have dimensions [no. channels x no. channels].

        Parameters
        ----------
        data : {(samples, channels) ndarray}
            Testing data matrix. The number of channels must be the same as the
            no. channels used to train.
        filter_idx : {int}, optional
            Indexes of the spatial filters to be used. Since filters are sorted
            by their importance in terms of correlation, the default filter (0)
            is the one that yields highest correlation.
        projection: {str}, optional
            Canonical coefficients to be used in projection. By default, the
            function uses Wy. Typically, if data is averaged, Wy must be used;
            otherwise, if data is not averaged and just concatenated, Wx must
            be used.

        Returns
        -------
        projected_data : {(samples, ) ndarray}
            Projected data along the selected spatial filter.

        Raises
        ------
        Exception
            If `projection` is neither 'wy' nor 'wx'.
        """
        if projection.lower() == 'wy':
            return np.matmul(data, self.wy[:, filter_idx])
        elif projection.lower() == 'wx':
            return np.matmul(data, self.wx[:, filter_idx])
        else:
            raise Exception('[CCA] Unknown projection %s' % str(projection))

    @staticmethod
    def canoncorr(X, Y):
        """
        Computes the canonical correlation analysis (CCA) for the data matrices
        X (dimensions N-by-P1) and Y (dimensions N-by-P2). X and Y must have the
        same number of observations (rows) but can have different numbers of
        variables (cols). The j-th columns of A and B contain the canonical
        coefficients, i.e. the linear combination of variables making up the
        j-th canonical variable for X and Y, respectively. If X or Y are less
        than full rank, canoncorr gives a warning and returns zeros in the rows
        of A or B corresponding to dependent columns of X or Y. Final dimension
        D is computed as D = min(rank_X, rank_Y).

        Notes
        ----------
        This method is adapted from the MATLAB function 'canoncorr'. Check
        that file in case of conflicts, doubts or additional information.

        Parameters
        ----------
        X : {(N, P1) ndarray}
            Input matrix with dimensions N-by-P1. Rows are observations and cols
            are variables.
        Y : {(N, P2) ndarray}
            Input matrix with dimensions N-by-P2. Rows are observations and cols
            are variables.

        Returns
        -------
        A : {(P1, D) ndarray}
            Sample canonical coefficients for the variables in X. The j-th
            column of A contains the linear combination of variables that makes
            up the j-th canonical variable for X. If X is less than full rank,
            A will have zeros in the rows corresponding to dependent cols of X.
        B : {(P2, D) ndarray}
            Sample canonical coefficients for the variables in Y. The j-th
            column of B contains the linear combination of variables that makes
            up the j-th canonical variable for Y. If Y is less than full rank,
            B will have zeros in the rows corresponding to dependent cols of Y.
        r : {(D,) ndarray}
            Sample canonical correlations. The j-th element of r is the
            correlation between the j-th columns of the canonical scores for
            the variables in X and Y.

        References
        -------
        [1] Krzanowski, W.J., Principles of Multivariate Analysis,
        Oxford University Press, Oxford, 1988.
        [2] Seber, G.A.F., Multivariate Observations, Wiley, New York, 1984.

        Example
        --------
        >>> import numpy as np
        >>> X = np.random.rand(10, 4)
        >>> Y = np.random.rand(10, 4)
        >>> A, B, r = canoncorr(X, Y)
        """
        # X dims: promote a 1-D input to a single-column matrix
        if len(X.shape) == 1:
            n = X.shape[0]
            p1 = 1
            X = X.reshape(n, 1)
        else:
            [n, p1] = X.shape
        # Check the input size
        if Y.shape[0] != n:
            raise ValueError('[canoncorr] Input size mismatch')
        elif n == 1:
            raise ValueError('[canoncorr] Not enough data')
        # Y dims: promote a 1-D input to a single-column matrix
        if len(Y.shape) == 1:
            p2 = 1
            Y = Y.reshape(n, 1)
        else:
            p2 = Y.shape[1]
        # Center the variables (column-wise zero mean)
        X = X - np.mean(X, 0)
        Y = Y - np.mean(Y, 0)
        # Factor the inputs and find a full rank set of columns if necessary.
        # Pivoted economic QR mirrors MATLAB's qr(X, 0) with column pivoting.
        [Q1, T11, perm1] = slinalg.qr(X, mode='economic', pivoting=True)
        rankX = nlinalg.matrix_rank(T11)
        if rankX == 0:
            raise ValueError('[canoncorr] Rank of X is 0. Invalid Data')
        elif rankX < p1:
            warnings.warn('[canoncorr] Data X is not full rank')
            # Keep only the independent columns found by the pivoting
            Q1 = Q1[:, 0:rankX]
            T11 = T11[0:rankX, 0:rankX]
        [Q2, T22, perm2] = slinalg.qr(Y, mode='economic', pivoting=True)
        rankY = nlinalg.matrix_rank(T22)
        if rankY == 0:
            raise ValueError('[canoncorr] Rank of Y is 0. Invalid Data')
        elif rankY < p2:
            warnings.warn('[canoncorr] Data Y is not full rank')
            # Keep only the independent columns found by the pivoting
            Q2 = Q2[:, 0:rankY]
            T22 = T22[0:rankY, 0:rankY]
        # Compute canonical coefficients and canonical correlations. For
        # rankX > rank Y, the economy-size version ignores the extra columns
        # in L and rows in D. For rankX < rankY, need to ignore extra columns
        # in M and explicitly. Normalize A and B to give U and V unit variance
        d = np.min([rankX, rankY])
        [L, D, M] = nlinalg.svd(np.matmul(Q1.T, Q2))
        # numpy returns V transposed w.r.t. MATLAB's svd
        M = M.T
        A = nlinalg.lstsq(T11, L[:, 0:d], rcond=None)[0] * np.sqrt(n - 1)
        B = nlinalg.lstsq(T22, M[:, 0:d], rcond=None)[0] * np.sqrt(n - 1)
        r = D[0:d]
        r[r < 0] = 0  # remove roundoff errors
        r[r > 1] = 1
        # Put coefficients back to their full size and their correct order
        A = np.concatenate((A, np.zeros((p1 - rankX, d))), axis=0)
        A = A[np.argsort(perm1), :]
        B = np.concatenate((B, np.zeros((p2 - rankY, d))), axis=0)
        B = B[np.argsort(perm2), :]
        return A, B, r

    def to_dict(self):
        """Serializes the instance attributes to a dict."""
        return self.__dict__

    @staticmethod
    def from_dict(dict_data):
        """Restores a CCA instance from a dict created by to_dict()."""
        cca = CCA()
        cca.wx = dict_data['wx']
        cca.wy = dict_data['wy']
        cca.r = dict_data['r']
        return cca