"""Created on Monday March 15 19:27:14 2021
In this module, you will find all functions and data structures related to
EEG and MEG signals. Enjoy!
References:
https://stefanappelhoff.com/eeg_positions
https://github.com/sappelhoff/eeg_positions
https://mne.tools/dev/auto_tutorials/intro/40_sensor_locations.html
@author: Eduardo Santamaría-Vázquez
"""
# Built-in imports
import math
# External imports
import warnings
import scipy.io
import numpy as np
# Medusa imports
from medusa import components
from medusa import meeg
[docs]class MEEGChannel(components.SerializableComponent):
"""This class implements a M/EEG channel.
"""
# TODO: This class is till not being used for compatibility reasons, but it
# should be introduced in the next major update of medusa kernel to remove
# channel dictionaries and simplify the management of MEEG signals
[docs] def __init__(self, label, coordinates=None, reference=None):
"""Constructor for class MEEGChannel.
Parameters
----------
label: str
Label of the channel.
coordinates: dict, optional
Dict with the coordinates of the electrode. It is strongly
recommended to set this optional parameter in order to use advanced
features of MEDUSA (e.g., topographic plots).
reference: MEEGChannel, optional
Use only for bipolar montages to define the reference electrode.
"""
# Check errors
if reference is not None:
if not isinstance(reference, MEEGChannel):
raise ValueError(
'Parameter reference must be of type MEEGChannel or None')
if coordinates is None:
warnings.warn('Channel coordinates are used by some advanced '
'features of MEDUSA (e.g., topographic plots). '
'Please consider setting the coordinates or use '
'function MEEGChannel.from_standard_set to load '
'standard coordinates.')
# Set attributes
self.label = label
self.coordinates = coordinates
self.reference = reference
[docs] def to_serializable_obj(self):
pass
[docs] @classmethod
def from_standard_set(cls, label, standard='10-5'):
pass
[docs] @classmethod
def from_serializable_obj(cls, data):
pass
[docs]class EEGChannelSet(components.SerializableComponent):
"""Class to represent an EEG montage with ordered channels in specific
coordinates. It also provides functionality to load channels from EEG
standards 10-20, 10-10 and 10-5 directly from the labels.
"""
[docs] def __init__(self, reference_method='common', dim='2D',
coord_system='spherical'):
"""Constructor of class EEGChannelSet
Parameters
----------
reference_method: str {'common'|'average'|'bipolar'}
Reference method. Recordings with common reference are referenced
to the same channel (e.g., ear lobe, mastoid). Recordings with
average reference are referenced to the average of all or several
channels. Finally, bipolar reference is the subtraction of 2
channels.
dim: str {'2D'|'3D'}
Dimensions of the coordinates plane.
coord_system: str {'cartesian'|'spherical'}
Coordinates system. Take into account that, if dim = '2D' spherical
refers to polar coordinates
"""
# Check errors
if reference_method not in ('common', 'average', 'bipolar'):
raise ValueError('Unknown reference method %s' % reference_method)
if dim not in ('2D', '3D'):
raise ValueError('Unknown number of dimensions %s' % dim)
if coord_system not in ('cartesian', 'spherical'):
raise ValueError('Unknown coordinates system %s' % coord_system)
# Set attributes
self.reference_method = reference_method
self.dim = dim
self.coord_system = coord_system
self.channels = None
self.n_cha = None
self.l_cha = None
self.montage = None
self.ground = None
self.allow_unlocated_channels = False
[docs] def set_ground(self, ground):
"""Sets the ground of the montage
Parameters
----------
ground: dict
Dict with the ground data. The easiest way to calculate it is using
function get_standard_channel_data_from_label. Keys:
- label: channel label
- coordinates: depends on the coordinate system and dimensions:
- Dim = 2D:
- Cartesian coordinates. Ex: {'x': 0.5, 'y': 0}
- Spherical coordinates Ex: {'r': 0.5,
'theta': np.pi/2}
- Dim = 3D:
- Cartesian coordinates. Ex: {'x': 0.5,
'y': 0, 'z': 0.8}
- Spherical coordinates. Ex: {'r': 0.5,
'theta': np.pi/2, 'phi': np.pi/4}
"""
self.ground = ground
[docs] def add_channel(self, channel, reference):
"""Function to add a channel to the end of the current montage.
Take into account that the order of the channels is important!
Parameters
----------
channel: dict
Dict with the channel data. The easiest way to calculate it is using
function get_standard_channel_data_from_label. Keys:
- label: channel label
- coordinates: depends on the coordinate system and dimensions:
- Dim = 2D:
- Cartesian coordinates. Ex: {'x': 0.5, 'y': 0}
- Spherical coordinates Ex: {'r': 0.5,
'theta': np.pi/2}
- Dim = 3D:
- Cartesian coordinates. Ex: {'x': 0.5,
'y': 0, 'z': 0.8}
- Spherical coordinates. Ex: {'r': 0.5,
'theta': np.pi/2, 'phi': np.pi/4}
reference: dict, list of dicts or str, optional
For common and bipolar reference modes, reference must be a dict
with the label and coordinates of the reference. For average
reference, it can be 'all' to indicate a common average
reference, or a list of dicts (with label and coordinates) of the
averaged reference for each channel
See Also
--------
get_standard_channel_data_from_label: returns channel data given the
channel label and the standard. It can be used to get the reference
"""
# TODO: check input
channel['reference'] = reference
channels = list() if self.channels is None else self.channels
channels.append(channel)
# Check channels
self.__check_channels(channels)
# Store attributes
self.channels = channels
self.n_cha = len(self.channels)
self.l_cha = [cha['label'] for cha in self.channels]
[docs] def set_montage(self, channels, ground=None,
allow_unlocated_channels=False):
"""Sets a custom montage, overwriting the previous one. Add single
channels more easily using function add_custom_channel and
add_standard_channel.
Parameters
----------
channels : list
List of dicts, each of them representing a channel. The dict must
contain the label, coordinates according to parameters dim an
coord_system, and reference. For common reference mode, the
reference must be a single channel with label and coordinates. For
average reference mode, reference can be a list of dicts with the
channels (label and coordinates) or "all", to specify common average
reference. For bipolar reference mode, the reference must be a
single channel with label and coordinates. In all cases you can set
the reference to None, but this is not recommended. The definition
of the coordinates may be skipped depending on the value of
allow_unlocated_channels.
ground : dict
Dict containing the label and coordinates of the ground electrode
allow_unlocated_channels: bool
If False, the coordinates of all channels must be defined within
channels dict. If True, the channels may not have known coordinates,
This may be convenient if the localization of a channel is not known
or it has a non-standard label, but the behaviour in functions that
need coordinates (e.g., topographic_plots) is unpredictable.
See Also
--------
set_standard_montage: preferred choice in most cases
"""
# Check errors
self.allow_unlocated_channels = allow_unlocated_channels
self.__check_channels(channels, ground)
# Set attributes
self.channels = channels
self.ground = ground
self.n_cha = len(self.channels)
self.l_cha = [cha['label'] for cha in self.channels]
[docs] def set_standard_montage(self, l_cha=None, l_reference=None, l_ground=None,
montage='10-05', drop_landmarks=True,
allow_unlocated_channels=False, standard=None):
"""Set standard EEG channels with common reference. In 3 dimensions,
the equator is taken a Nz-T10-Iz-T9.
Parameters
----------
l_cha : list, optional
List of channels labels. The data will be returned keeping the
same order. If None, the channels will be returned in the same order
as they appear in the corresponding standard in medusa.meeg
l_reference : str, optional
Label of the reference. Usual choices are left preauricular point
(LPA) and right preauricular point (RPA). Leave to None if you do
not want to specify the reference (not recommended). Use only if
reference_method is 'common'.
l_ground : str, optional
Label of the ground. Usual choices are AFz or FPz.
montage : str {'10-20'|'10-10'|'10-05'} or dict
EEG standard. If its a string, the corresponding labels and
locations of the standard channels will be loaded using the files in
medusa.meeg. To load a different montage, use function
as meeg.read_montage_file and pass the returned dict here.
drop_landmarks : bool
Drop landmarks: nasion (NAS), left preauricular point (LPA) and
right preauricular point (RPA) or mastoids (M1, M2). These are
usually used as reference or ground points, so they are usually
removed from the channel set for data analysis. Only used if l_cha
is None.
allow_unlocated_channels: bool
If False, all the labels in parameter l_cha must be contained in the
montage, which contains the corresponding coordinates. If True, the
channels may not be in the standard, and they will be saved with no
coordinates. This allows to save labels that are not defined in the
montage, but the behaviour in functions that need locations
(e.g., topographic_plots) is unpredictable.
standard: str
DEPRECATED. Only left for compatibility reasons.
"""
# Check errors
if self.reference_method != 'common':
raise ValueError('Function set_standard_channels is available '
'only for recordings with common reference. For '
'custom montages use set_custom_channels')
assert self.dim == '2D' or self.dim == '3D', \
'Incorrect input on dim parameter'
assert self.coord_system == 'cartesian' or \
self.coord_system == 'spherical', \
'Incorrect input on coord_system parameter'
# Compatibility
montage = standard if standard is not None else montage
# Get montage
if isinstance(montage, str):
# Load standard montage
self.montage = montage
montage = meeg.get_standard_montage(standard=montage,
dim=self.dim,
coord_system=self.coord_system)
else:
# Set custom montage
montage = montage.copy()
self.montage = montage
# Reference
if l_reference is not None:
if l_reference in montage:
reference = montage[l_reference]
else:
if allow_unlocated_channels:
reference = dict()
warnings.warn('Reference not defined in montage')
else:
raise meeg.ChannelNotFound(l_reference)
reference['label'] = l_reference
else:
reference = None
# Get list of labels to get
labels = montage.keys() if l_cha is None \
else [l.upper().strip() for l in l_cha]
# Get channels
channels = list()
for label in labels:
# Drop landmarks
if l_cha is None:
if drop_landmarks:
if label in ('NAS', 'LPA', 'RPA', 'M1', 'M2'):
continue
# Append info
if label in montage:
channel_data = montage[label]
else:
if allow_unlocated_channels:
channel_data = dict()
warnings.warn('Channel %s not defined in montage' % label)
else:
raise meeg.ChannelNotFound(label)
channel_data['label'] = label
channel_data['reference'] = reference
channels.append(channel_data)
# Ground
if l_ground is not None:
if l_ground in montage:
ground = montage[l_ground]
else:
if allow_unlocated_channels:
ground = dict()
warnings.warn('Ground not defined in montage')
else:
raise meeg.ChannelNotFound(l_ground)
ground['label'] = ground
else:
ground = None
# Check channels
self.set_montage(channels, ground=ground,
allow_unlocated_channels=allow_unlocated_channels)
def __check_channels(self, channels, ground=None):
# Get mandatory and coordinates keys for each dim and coord_system
cha_keys = ['label', 'reference']
ref_keys = ['label']
gnd_keys = ['label']
if self.dim == '2D':
if self.coord_system == 'cartesian':
coord_keys = ['x', 'y']
else:
coord_keys = ['r', 'theta']
else:
if self.coord_system == 'cartesian':
coord_keys = ['x', 'y', 'z']
else:
coord_keys = ['r', 'theta', 'phi']
if not self.allow_unlocated_channels:
cha_keys += coord_keys
ref_keys += ref_keys
gnd_keys += gnd_keys
# Check keys
references = list()
for cha in channels:
if not all(k in cha for k in cha_keys):
raise ValueError('Malformed channel %s. Dict keys must be %s' %
(str(cha), str(cha_keys)))
# Check reference
reference = cha['reference']
references.append(reference)
if reference is not None:
if self.reference_method == 'common':
# Single reference
assert isinstance(reference, dict), \
'Reference must be None or dict'
if not all(k in reference for k in ref_keys):
raise ValueError('Malformed reference in channel %s. '
'Reference keys must be %s' %
(str(cha), str(ref_keys)))
# All references must be identical
if not all(ref == references[0] for ref in references):
raise ValueError('All references must be identical '
'in common reference mode')
elif self.reference_method == 'average':
# Average reference
assert isinstance(reference, list), \
'Reference must be None or list of dicts'
for r in reference:
if not all(k in r for k in ref_keys):
raise ValueError('Malformed reference in '
'channel %s. Reference keys '
'must be %s' %
(str(cha), str(ref_keys)))
elif self.reference_method == 'bipolar':
# Single reference
assert isinstance(reference, dict), \
'Reference must be of type dict'
if not all(k in reference for k in ref_keys):
raise ValueError('Malformed reference in channel %s. '
'Reference keys must be %s' %
(str(cha), str(ref_keys)))
# Check ground
if ground is not None:
if not all(k in ground for k in gnd_keys):
raise ValueError(
'Malformed ground. Dict keys must be %s' % (str(gnd_keys)))
[docs] def get_cha_idx_from_labels(self, labels):
"""Returns the position of the channels given the labels
Parameters
----------
labels : list
Labels to check. The order matters
Returns
-------
indexes : np.ndarray
Indexes of the channels in the set
"""
return [self.l_cha.index(l) for l in labels]
[docs] def check_channels_labels(self, labels, strict=False):
"""Checks the order and labels of the channels
Parameters
----------
labels : list
Labels to check. The order matters
strict : bool
If True, comparison is strict. The function will check that the
channel set contains the channels given by parameter labels and
in the same order. If false, the function checks that the
channels are contained in the channel set, but they could be in
different order and the set could contain more channels
Returns
-------
check : bool
True if the labels and order are the same. False otherwise
"""
if strict:
check = True
for i in range(len(labels)):
if self.l_cha[i] != labels[i]:
check = False
else:
check = True
for l in labels:
if l not in self.l_cha:
check = False
return check
[docs] def subset(self, cha_idx):
"""Selects the channels given the indexes, creating a subset. The
order of the channels will be updated
Parameters
----------
cha_idx : np.ndarray
Indexes of the channels to select. The order matters
"""
self.channels = [self.channels[idx] for idx in cha_idx]
self.n_cha = len(self.channels)
self.l_cha = [cha['label'] for cha in self.channels]
[docs] def sort_nearest_channels(self):
""" Sorts the nearest channels for each possible channel
Return
----------
sorted_dist_ch: dict()
Dictionary that includes, for each channel (as a key), the rest
of channels sorted by its closeness to that channel.
"""
# TODO: Añadir la opción de hacer un sort de una coordenada específica,
# o en otra función
if not self.channels:
raise Exception(
'Cannot compute the nearest channels if channel set '
'is not initialized!')
# Compute distance matrix
dist_matrix = self.compute_dist_matrix()
# Create the dictionary and format it
sorted_dist_ch = dict()
for i, label in enumerate(self.l_cha):
d_sorted = np.sort(dist_matrix[i, :])
idx_sorted = np.argsort(dist_matrix[i, :])
if d_sorted[0] != 0:
raise Exception('[meeg/EEGChannelSet/sort_nearest_channels] '
'Something ocurred, distance between the same '
'channel is not zero!')
sorted_dist_ch[label] = []
for j in range(1, len(d_sorted)):
sorted_dist_ch[label].append({
"dist": d_sorted[j],
"channel": self.channels[idx_sorted[j]]
})
return sorted_dist_ch
[docs] def compute_dist_matrix(self):
"""This function computes the distances between all channels in the
channel set and stores them into a matrix.
Returns
-------------
dist_matrix: ndarray of dimensions [ncha x ncha]
Distances between all the channels.
"""
if not self.channels:
raise Exception(
'Cannot compute the distance matrix if channel set '
'is not initialized!')
# Instantiate matrix of distances
dist_matrix = np.empty((self.n_cha, self.n_cha))
# For 2D coordinates
if self.dim == '2D':
if self.coord_system == 'cartesian':
for i, cha in enumerate(self.channels):
# Find location of channel
cha_pos = np.array([cha['x'], cha['y']])
# Find the location of the rest of the channels
for j, temp_cha in enumerate(self.channels):
temp_cha_pos = np.array(
[temp_cha['x'], temp_cha['y']]
)
d = np.sqrt(np.sum(np.power(temp_cha_pos - cha_pos, 2)))
dist_matrix[i, j] = d
elif self.coord_system == 'spherical':
for i, cha in enumerate(self.channels):
# Find location of channel
r_cha, theta_cha = cha['r'], cha['theta']
# Find the location of the rest of the channels
for j, temp_cha in enumerate(self.channels):
r_temp_cha, theta_temp_cha = \
temp_cha['r'], temp_cha['theta']
d = np.abs(np.sqrt(r_cha ** 2 + r_temp_cha ** 2 -
2 * r_cha * r_temp_cha *
np.cos(theta_temp_cha -
theta_cha)))
dist_matrix[i, j] = d
# For 3D coordinates
elif self.dim == '3D':
if self.coord_system == 'cartesian':
for i, cha in enumerate(self.channels):
# Find location of channel
cha_pos = np.array([cha['x'], cha['y'], cha['z']])
# Find the location of the rest of the channels
for j, temp_cha in enumerate(self.channels):
temp_cha_pos = np.array(
[temp_cha['x'], temp_cha['y'], temp_cha['z']]
)
d = np.sqrt(np.sum(np.power(temp_cha_pos - cha_pos, 2)))
dist_matrix[i, j] = d
elif self.coord_system == 'spherical':
for i, cha in enumerate(self.channels):
# Find location of channel
r_cha, theta_cha, phi_cha = \
cha['r'], cha['theta'], cha['phi']
# Find the location of the rest of the channels
for j, temp_cha in enumerate(self.channels):
r_temp_cha, theta_temp_cha, phi_temp_cha = \
temp_cha['r'], temp_cha['theta'], temp_cha['phi']
d = np.abs(
np.sqrt(
r_cha ** 2 +
r_temp_cha ** 2 -
2 * r_cha * r_temp_cha *
(
np.sin(theta_cha) *
np.sin(theta_temp_cha) *
np.cos(phi_cha - phi_temp_cha) +
np.cos(theta_temp_cha) *
np.cos(theta_cha))
)
)
dist_matrix[i, j] = d
return dist_matrix
[docs] def to_serializable_obj(self):
return self.__dict__
[docs] @classmethod
def from_serializable_obj(cls, dict_data):
inst = cls()
inst.__dict__.update(dict_data)
return inst
[docs]class MEGChannelSet(components.SerializableComponent):
# TODO
[docs] def __init__(self):
self.channels = None
self.n_cha = None
self.l_cha = None
[docs] def to_serializable_obj(self):
return self.__dict__
[docs] @classmethod
def from_serializable_obj(cls, dict_data):
inst = cls()
inst.__dict__.update(dict_data)
return inst
[docs]class EEG(components.BiosignalData):
"""Electroencephalography (EEG) biosignal
"""
[docs] def __init__(self, times, signal, fs, channel_set, **kwargs):
"""EEG constructor
Parameters
----------
times : list or numpy.ndarray
1D numpy array [n_samples]. Timestamps of each sample. If they are
not available, generate them
artificially. Nevertheless, all signals and events must have the
same temporal origin
signal : list or numpy.ndarray
2D numpy array [n_samples x n_channels]. EEG samples (the units
should be defined using kwargs)
fs : int or float
Sample rate of the recording.
channel_set : meeg_standards.EEGChannelSet
EEG channel set
kwargs: kwargs
Any other parameter provided will be saved in the class (e.g.,
equipment description)
"""
# To numpy arrays
times = np.array(times)
signal = np.array(signal)
# Check errors
if signal.shape[1] != channel_set.n_cha:
raise Exception("Signal with shape [samples x channels] does not "
"match with the number of channels")
if times.shape[0] != signal.shape[0]:
raise Exception("Parameters times (shape: %s) and signal (shape: "
"%s) must have the same length" % (
str(times.shape), str(signal.shape))
)
# Standard attributes
self.times = times
self.signal = signal
self.fs = fs
self.channel_set = channel_set
# Optional attributes
for key, value in kwargs.items():
setattr(self, key, value)
[docs] def change_channel_set(self, channel_set):
"""Smart change of channel set, updating the signal and all related
attributes
Parameters
----------
channel_set : meeg_standards.EEGChannelSet
EEG channel set
"""
# Get the index of the channels
cha_idx = self.channel_set.get_cha_idx_from_labels(channel_set.l_cha)
# Select and reorganize channels channels
self.channel_set.subset(cha_idx)
# Reorganize signal
self.signal = self.signal[:, cha_idx]
[docs] def to_serializable_obj(self):
rec_dict = self.__dict__
for key in rec_dict.keys():
if type(rec_dict[key]) == np.ndarray:
rec_dict[key] = rec_dict[key].tolist()
if type(rec_dict[key]) == EEGChannelSet:
rec_dict[key] = rec_dict[key].to_serializable_obj()
return rec_dict
[docs] @classmethod
def from_serializable_obj(cls, dict_data):
# Load channel set dict
dict_data['channel_set'] = EEGChannelSet.from_serializable_obj(
dict_data['channel_set']
)
return cls(**dict_data)
[docs]class MEG(components.BiosignalData):
# TODO check everything
"""Magnetoencephalography (MEG) biosignal
"""
[docs] def __init__(self, times, signal, fs, channel_set, **kwargs):
"""MEG constructor
Parameters
----------
times : list or numpy.ndarray
1D numpy array [n_samples]. Timestamps of each sample. If they are
not available, generate them
artificially. Nevertheless, all signals and events must have the
same temporal origin
signal : list or numpy.ndarray
2D numpy array [n_samples x n_channels]. MEG samples (the units
should be defined using kwargs)
fs : int or float
Sample rate of the recording.
channel_set : list or meeg_standards.MEGChannelSet
MEG channel set.
kwargs: kwargs
Any other parameter provided will be saved in the class (e.g.,
equipment description)
"""
# To numpy arrays
times = np.array(times)
signal = np.array(signal)
# Check errors
if signal.shape[1] != channel_set.n_cha:
raise Exception("Signal with shape [samples x channels] does not "
"match with the number of channels")
if times.shape[0] != signal.shape[0]:
raise Exception("Parameters times and signal must have the same "
"length")
# Standard attributes
self.times = times
self.signal = signal
self.fs = fs
self.channel_set = channel_set
# Optional attributes
for key, value in kwargs.items():
setattr(self, key, value)
[docs] def change_channel_set(self, channel_set):
"""Smart change of channel set, updating the signal and all related
attributes
Parameters
----------
channel_set : meeg_standards.MEGChannelSet
MEG channel set
"""
raise NotImplementedError
[docs] def to_serializable_obj(self):
rec_dict = self.__dict__
for key in rec_dict.keys():
if type(rec_dict[key]) == np.ndarray:
rec_dict[key] = rec_dict[key].tolist()
return rec_dict
[docs] @classmethod
def from_serializable_obj(cls, dict_data):
return cls(**dict_data)
[docs] @staticmethod
def load_meg_signal_from_spm_file(path, fs):
# TODO check everything
"""Function to load a MEG recording from a spm file
Parameters
----------
path : str
Path of the file
fs : int or float
Sample rate of the recording
"""
data = scipy.io.loadmat(path)
info = data['D'][0, 0]
datatype = info['datatype'][0]
num_chan = np.size(info['channels'])
num_samples = info['Nsamples'][0, 0]
raw = np.fromfile(path[0:len(path) - 3] + 'dat',
datatype[0:len(datatype) - 3])
signal = np.reshape(raw, [num_chan, num_samples], order='F')
times = np.linspace(0, signal[0] / fs, signal[0])
channels = None
return MEG(times, signal, fs, channels)
[docs]class Connecitivity:
# TODO: check everything
"""Customizable class with connectivity info from EEG/MEG recordings
"""
[docs] def __init__(self, data, trial_len, parameter, filt_mode, **kwargs):
"""Class constructor
Parameters
----------
data : bla bla
Bla bla
trial_len : bla bla
Bla bla
parameter : bla bla
Bla bla
filt_mode : bla bla
Bla bla
kwargs
Optional information of the EEG recording (e.g. subject, amplifier,
etc)
"""
# Params
data = np.array(data)
if not (filt_mode == 'all' or
filt_mode == 'bands' or
filt_mode == 'win'):
raise ValueError("Unknown filtering mode")
self.data = data
self.trial_len = trial_len
self.parameter = parameter
self.filt_mode = filt_mode
# Optional attributes
for key, value in kwargs.items():
setattr(self, key, value)
[docs]class UnknownStandardChannel(Exception):
[docs] def __init__(self, msg=None):
"""Class constructor
Parameters
----------
msg: string or None
Custom message
"""
if msg is None:
msg = 'Unknown standard channel'
super().__init__(msg)