Source code for medusa.bci.nft_paradigms
"""
In this module you will find useful functions and classes to apply on-line
Neurofeedback models. Each model is based on different features to be used
as target to train. Enjoy!
@author: Diego Marcos-Martínez
"""
# Built-in imports
from abc import ABC, abstractmethod
import concurrent
# External imports
import numpy as np
import scipy.signal
# Medusa imports
import medusa as mds
from medusa import components
from medusa import meeg
from medusa.spatial_filtering import LaplacianFilter, car
from medusa.connectivity.phase_connectivity import phase_connectivity
from medusa.connectivity.amplitude_connectivity import __aec_ort_cpu as aec
from medusa.graph_theory import degree
from medusa.artifact_removal import reject_noisy_epochs
from medusa.epoching import get_epochs_of_events
from medusa.local_activation.spectral_parameteres import absolute_band_power
[docs]class SignalPreprocessing(components.ProcessingMethod):
"""
Common preprocessing applied in Neurofeedback applications.
It is composed by a frequency IIR filter followed by a spatial
filters. Functions are adapted to filter the signal in more than one frequency
range, if necessary.
"""
[docs] def __init__(self, filter_dict=None, montage=None, target_channels=None,
laplacian=False, car=False, n_cha_lp=None):
super().__init__(prep_fit_transform=['signal'],
prep_transform=['signal'],
narrow_transform=['signal'])
# Error check
if not filter_dict:
raise ValueError('[SignalPreprocessing] Filter dict parameter '
'"filter_dict" must be a list containing all '
'necessary information to perform the filtering!. '
'The information should be: type and cutoff')
for filter in filter_dict:
if not isinstance(filter, dict):
raise ValueError('[SignalPreprocessing] Each filter must '
'be a dict()!')
if 'cutoff' not in filter or 'type' not in filter:
raise ValueError('[SignalPreprocessing] Each filter must '
'be a dict() containing the following keys: '
'"cutoff"and "type"!')
if filter['type'] != 'training' and filter['type'] != 'artifact':
raise ValueError('[SignalPreprocessing] "type" must be "training"'
'or "artifact".')
if not montage:
raise ValueError('[SignalPreprocessing] Pre-processing parameter'
'"montage" must be a dict containing all'
'labels of channels and montage standard key')
if laplacian and target_channels is None:
raise ValueError('[SignalPreprocessing] Laplacian filter needs to '
'define "target_channels" parameter.')
# Parameters
self.filter_dict = filter_dict
self.l_cha = montage.l_cha
self.target_channels = target_channels
self.montage = montage
self.perform_car = car
self.perform_laplacian = laplacian
# Variables
self.artifact_iir_filters = []
self.target_iir_filters = []
self.offset_line_removal = None
self.laplacian_filter = None
[docs] def fit(self, fs):
"""
Fits the IIR filter and Laplacian spatial filter (if selected) for signal
preprocessing stage.
Parameters
----------
fs: float
Sampling rate in Hz.
"""
# Fit Spectral Filters (Predefined to be optimal)
self.offset_line_removal = mds.IIRFilter(order=3,
cutoff=[0.5, 40],
btype='bandpass',
filt_method='sosfiltfilt')
self.offset_line_removal.fit(fs, len(self.l_cha))
# Define filters for filtering over epochs
for filter in self.filter_dict:
for f in filter['cutoff']:
iir = mds.IIRFilter(order=1,
cutoff=f,
btype='bandpass',
filt_method='sosfiltfilt')
if self.target_channels is None:
iir.fit(fs, len(self.l_cha))
else:
iir.fit(fs, len(self.target_channels))
if filter['type'] == 'artifact':
self.artifact_iir_filters.append(iir)
elif filter['type'] == 'training':
self.target_iir_filters.append(iir)
# Fit Laplacian Filter
if self.perform_laplacian:
if len(self.montage.l_cha) >= 5:
self.laplacian_filter = LaplacianFilter(self.montage, mode='auto')
self.laplacian_filter.fit_lp(l_cha_to_filter=self.target_channels)
[docs] def prep_transform(self, signal, parallel_computing=True):
"""
Transforms an EEG signal applying IIR filter. It also applies CAR and
Laplacian spatial filter sequentially if desired.
Parameters
----------
signal: list or numpy.ndarray
Signal to transform. Shape [n_samples x n_channels]
parallel_computing: bool
If true, it filters the signal concurrently
Returns
-------
signal_: numpy.ndarray
Original signal with power line and offset removed, and spatially
filtered if chosen. [n_samples, n_channels].
signal_artifacts: numpy.ndarray
If it has been chosen to reject artifact sections, this matrix
contains the filtered signal in the frequency bands associated
with these artifacts. [n_artifact_bands, n_samples, n_channels].
"""
# Initialize variable
n_samples = signal.shape[0]
if len(self.artifact_iir_filters) == 0:
signal_artifacts = None
else:
if self.target_channels is None:
signal_artifacts = np.empty(
(
len(self.artifact_iir_filters), n_samples, len(self.l_cha)))
else:
signal_artifacts = np.empty(
(len(self.artifact_iir_filters), n_samples,
len(self.target_channels)))
signal_ = self.offset_line_removal.transform(signal)
# Spatial filtering
if self.perform_car:
signal_ = car(signal_)
if self.perform_laplacian:
# Check if surface laplacian filter cannot be performed
if self.laplacian_filter is not None:
signal_ = self.laplacian_filter.apply_lp(signal_)
else:
signal_ = signal_[:,self.montage.get_cha_idx_from_labels(self.target_channels)]
signal__ = signal_.copy()
if signal_artifacts is not None:
# Filter only the target channels in Power Based models
if signal__.shape[1] != signal_artifacts.shape[2]:
signal__ = signal__[:,self.montage.get_cha_idx_from_labels(
self.target_channels)]
# Frequency filtering on artifact-related bands
if parallel_computing:
filt_threads = []
for filter in self.artifact_iir_filters:
t = components.ThreadWithReturnValue(target=
filter.transform,
args=(signal__,))
filt_threads.append(t)
t.start()
for filt_idx, thread in enumerate(filt_threads):
signal_artifacts[filt_idx, :, :] = thread.join()
else:
for filt_idx, filter in enumerate(self.artifact_iir_filters):
signal_artifacts[filt_idx, :, :] = filter.transform(
signal__[np.newaxis, :, :])
return signal_, signal_artifacts
[docs] def prep_fit_transform(self, fs, signal):
"""
Fits the IIR filter and transforms an EEG signal applying IIR
filter and spatial filters sequentially.
Parameters
----------
fs: float
Sampling rate in Hz.
n_cha_lp: int
Number of nearest channels to compute Laplacian spatial
filter (Auto mode).
signal: list or numpy.ndarray
Signal to transform. Shape [n_samples x n_channels]
Returns
-------
signal: numpy.ndarray
Original signal with power line and offset removed, and spatially
filtered if chosen. [n_samples, n_channels].
signal_artifacts: numpy.ndarray
If it has been chosen to reject artifact sections, this matrix
contains the filtered signal in the frequency bands associated
with these artifacts. [n_artifact_bands, n_samples, n_channels].
"""
self.fit(fs)
return self.prep_transform(signal)
[docs] def narrow_transform(self, signal, parallel_computing=True):
"""
Applies the IIR filter for narrow band filtering.
Parameters
----------
signal: list or numpy.ndarray
Signal to transform. Shape [n_samples x n_channels].
parallel_computing: bool
If true, it filters the signal concurrently.
Returns
-------
signal: numpy.ndarray
Signal filtered in the training band [n_samples, n_channels].
"""
f_signals = np.empty(
(len(self.target_iir_filters), signal.shape[0], signal.shape[1]))
if parallel_computing:
filt_threads = []
for filter in self.target_iir_filters:
t = components.ThreadWithReturnValue(target=
filter.transform,
args=(signal,))
filt_threads.append(t)
t.start()
for filt_idx, thread in enumerate(filt_threads):
f_signals[filt_idx, :, :] = thread.join()
else:
for filt_idx, filter in enumerate(self.target_iir_filters):
f_signals[filt_idx, :, :] = filter.transform(signal)
return np.squeeze(f_signals)
[docs]def ignore_noisy_windows(signals, thresholds, pct_tol):
"""
This function check if a specific signal segment contains noise above the
pre-established thresholds.
Parameters
----------
signals: numpy.ndarray
Array containing the signal filtered in the frequency bands associated to
artifacts to avoid. [n_artifact_bands, n_samples, n_channels].
thresholds: numpy.ndarray
Array containing the variance thresholds related to artifacts to avoid.
pct_tol: numpy.ndarray
Array containing variance increase (in percentage) tolerated.
Returns
-------
"""
# Check if power in forbidden bands is over thresholds
over_var = np.sum(np.std(signals, axis=1).mean(axis=-1) >=
(1 + pct_tol) * thresholds)
if over_var < 1:
return True
else:
return False
[docs]def make_windows(signal, fs, update_feature_window, update_rate,
n_cha=1, n_samp=2, k=4, reject=True):
"""
Parameters
----------
signal: numpy.ndarray
Signal to be converted into epochs. [n_samples, n_channels].
fs: int or float
Sampling rate.
update_feature_window: int or float
Time window taken for the calculation of the characteristic (in seconds).
update_rate: int or float
Feedback update time in online mode.
n_cha: int
Threshold number of channels meeting the rejection condition to reject
the epoch.
n_samp: int
Threshold number of samples meeting the rejection condition to reject
the epoch.
k: int
Standard deviation of the signal. Used in the definition of the
rejection criterion.
reject: bool
If true, it returns the epochs that have not been reject. Else, it
returns the whole windowed signal.
Returns
-------
good_epochs: numpy.ndarray
Array containing the signal divided into epochs that are not noisy.
[n_epochs, n_samples, n_channels].
ind: numpy.ndarray
Array containing bools. True for epochs that were rejected and False for
epochs that were not.
"""
if len(signal.shape) == 1:
signal = signal[:, np.newaxis]
# Define necessary parameters
s_duration = signal.shape[0] / fs
s_mean = np.mean(signal, axis=0)
s_std = np.std(signal, axis=0)
# Set onsets vector
onsets = np.arange(0, s_duration - update_feature_window, update_rate)
s_windowed = get_epochs_of_events(np.arange(0, s_duration, 1 / fs), signal,
onsets, fs,
[0, update_feature_window * 1000])
# Return windows without discarding
if not reject:
return s_windowed
pct_rejected, good_epochs, idx = reject_noisy_epochs(s_windowed, s_mean,
s_std, k, n_samp,
n_cha)
return good_epochs, idx
[docs]class ConnectivityExtraction(components.ProcessingMethod):
"""
Functional Connectivity-based features to extract from user's EEG.
"""
[docs] def __init__(self, l_baseline_t=5, fs=250, update_feature_window=2,
update_rate=0.25, fc_measure=None, mode=None, montage=None,
target_channels=None, pct_tol=0.9):
"""
Class constructor
l_baseline_t: int
Time employed to calculate the number of samples to obtain baseline
connectivity parameter. In seconds.
fs: int or float
Sample rate of the recording.
update_feature_window: int or float
Length in seconds of the temporal window applied to calculate
the feature.
update_rate: int or float
Feedback update time in online mode.
fc_measure: str
"WPLI" or "AECORT". Measure of Functional Connectivity to calculate.
mode: str
"Global coupling", "Strength" or "Coupling". Information extracted
from adjacency matrix.
montage: EEGChannelSet
target_channels: list or None
List containing the labels of the target channels.
pct_tol: numpy.ndarray
Array containing variance increase (in percentage) tolerated.
"""
super().__init__(ext_feature=['conn_value'])
# Check errors
if not montage:
raise ValueError('[ConnectivityExtraction] "montage parameter"'
' must be a dict containing all'
'labels of channels and montage standard key')
if fc_measure != "WPLI" and fc_measure != "AECORT":
raise ValueError('[ConnectivityExtraction] Invalid functional '
'connectivity measure. Available measures are '
'"WPLI" and "AECORT".')
if mode != "Global coupling" and mode != "Strength" and mode != \
"Coupling":
raise ValueError('[ConnectivityExtraction] Invalid mode. '
'Available modes are '
'"Global coupling", "Strength" and "Coupling".')
if target_channels is None:
if mode == "Strength":
raise UserWarning(
'[ConnectivityExtraction] Using "Strength" mode'
'without defining target channels. Average strength'
'of all channels will be returned instead.')
if mode == "Coupling":
mode = "Global coupling"
raise UserWarning(
'[ConnectivityExtraction] Using "Coupling" mode'
'without defining target channels. Global coupling of all '
'channels will be returned instead.')
self.target_channels = target_channels
else:
self.target_channels = montage.get_cha_idx_from_labels(
target_channels)
self.fc_measure = fc_measure
self.mode = mode
self.fs = fs
self.l_baseline_t = l_baseline_t
self.montage = montage
self.update_feature_window = update_feature_window
self.update_rate = update_rate
self.w_signal_samples = int(update_feature_window * self.fs)
self.w_signal_samples_calibration = int((self.l_baseline_t)* self.fs)
self.pct_tol = pct_tol
self.thresholds = None
self.baseline_value = None
[docs] def set_baseline(self, signal, signal_artifacts, filtered_signal):
"""
This functions establish the baseline value.
Parameters
----------
signal: numpy.ndarray
Original signal pre-processed (offset and power-line removed, and CAR)
[n_samples, n_channels].
signal_artifacts: numpy.ndarray
Array containing the signal filtered in each frequency band associated
to the artifacts to avoid. [n_artifact_bands, n_samples, n_channels].
filtered_signal: numpy.ndarray
Signal filtered in the narrow band for training. [n_samples, n_channels].
Returns
-------
baseline_value: float
"""
# Delete borders to avoid effect borders
dif_window = round((signal.shape[0] - self.w_signal_samples_calibration)/2)
if dif_window <= 0:
raise ValueError('The duration of the signal for the baseline '
'calculation should be slightly longer than the '
'window time taken for the baseline calculation. ')
epochs_original, index = make_windows(signal[dif_window:-dif_window, :]
, self.fs, self.update_feature_window,
self.update_rate)
filtered_epochs = make_windows(
filtered_signal[dif_window:-dif_window, :], self.fs,
self.update_feature_window, self.update_rate,
reject=False)[~index, :, :]
adj_mat = self.calculate_adj_mat(filtered_epochs)
# Parallel computing baseline values
filt_threads = []
baseline_values = []
for epoch_mat in adj_mat:
t = components.ThreadWithReturnValue(target=self.calculate_feature,
args=(epoch_mat,))
filt_threads.append(t)
t.start()
for filt_idx, thread in enumerate(filt_threads):
baseline_values.append(thread.join())
self.baseline_value = np.mean(baseline_values)
# Define artifact related thresholds
if signal_artifacts is not None:
self.thresholds = np.std(
signal_artifacts[:, -self.w_signal_samples_calibration:,
:], axis=1).mean(axis=-1)
return self.baseline_value
[docs] def ext_feature(self, signal, signal_artifacts):
"""
Function for extracting FC values in online mode.
Parameters
----------
signal: numpy.ndarray
Signal filtered in the narrow band for training. [n_samples, n_channels].
signal_artifacts: numpy.ndarray or None
Array containing the signal filtered in each frequency band associated
to the artifacts to avoid. [n_artifact_bands, n_samples, n_channels].
Returns
-------
c_value: float
"""
if self.baseline_value is None:
raise ValueError(
'[ConnectivityExtraction] Calibration not performed.')
# Delete borders
dif_window = round((signal.shape[0] - self.w_signal_samples)/2)
if dif_window <= 0:
raise ValueError('The duration of the signal for feedback '
'calculation should be slightly longer than the '
'window time taken for the feedback calculation. ')
adj_mat = self.calculate_adj_mat(signal[dif_window:-dif_window, :])
c_value = self.calculate_feature(np.squeeze(adj_mat)) \
- self.baseline_value
# Check if artifact bands are defined
if signal_artifacts is not None:
if ignore_noisy_windows(signal_artifacts[:,dif_window:-dif_window,
self.target_channels], self.thresholds,
self.pct_tol):
return c_value
else:
return None
return c_value
[docs] def calculate_adj_mat(self, signal):
"""
This function calculates the adjacency matrix depending on the FC mode.
Parameters
----------
signal: numpy.ndarray
Signal filtered in the narrow band for training.
[n_epochs, n_samples, n_channels] or [n_samples, n_channels].
Returns
-------
adj_mat: numpy.ndarray
[n_epochs, n_channels, n_channels].
"""
# Calculate adjacency matrix depending on FC measure chosen
adj_mat = None
if self.fc_measure == "WPLI":
adj_mat = phase_connectivity(signal,'wpli')
# This is under development
elif self.fc_measure == "AECORT":
adj_mat = aec(signal)
return adj_mat
[docs] def calculate_feature(self, adj_mat):
"""
Calculates Graph metric from adjacency matrix.
Parameters
----------
adj_mat: numpy.ndarray
[n_channels, n_channels].
"""
# Calculate the baseline value depending on mode chosen
if self.mode == "Global coupling":
tri_l_idx = np.tril_indices(adj_mat.shape[0], -1)
return np.nanmean(np.asarray(adj_mat)[tri_l_idx])
elif self.mode == "Strength":
if self.target_channels is None:
return np.mean(degree.degree(np.asarray(adj_mat),
'CPU'))
else:
return np.mean(degree.degree(np.asarray(adj_mat),
'CPU')[
self.target_channels])
elif self.mode == "Coupling":
return self.mean_coupling(adj_mat)
[docs] def mean_coupling(self, adj_mat):
"""
This function calculates the connectivity values between all the
target channels and average it value.
Parameters
----------
adj_mat: numpy.ndarray
[n_channels, n_channels].
"""
c = []
for ind, ch_ind_1 in enumerate(self.target_channels[:-1]):
for ch_ind_2 in self.target_channels[ind + 1:]:
c.append(np.array(adj_mat[ch_ind_1, ch_ind_2]))
return np.mean(c)
[docs]class PowerExtraction(components.ProcessingMethod):
"""
Power-based features to extract from user's EEG.
"""
[docs] def __init__(self, l_baseline_t=5, fs=250, update_feature_window=2,
update_rate=0.25, pct_tol = 0.9, f_dict=None, mode=None):
"""
Class constructor
l_baseline_t: int
Time employed to calculate the number of samples to obtain baseline
power parameter. In seconds.
fs: int or float
Sample rate of the recording.
update_feature_window = int
Length in seconds of the temporal window applied to calculate
the feature.
update_rate: int or float
Feedback update time in online mode.
f_dict: dict
Dict containing the frequency bands associated to training band
and artifacts to avoid.
pct_tol: numpy.ndarray
Array containing variance increase (in percentage) tolerated.
mode: str
"single" or "ratio"
"""
super().__init__(band_power=['band power'])
self.mode = mode
self.fs = fs
self.l_baseline_t = l_baseline_t
self.update_feature_window = update_feature_window
self.update_rate = update_rate
self.w_signal_samples = int(update_feature_window * self.fs)
self.w_signal_samples_calibration = int(self.l_baseline_t * self.fs)
self.f_dict = f_dict
self.pct_tol = pct_tol
self.baseline_power = None
self.thresholds = None
[docs] def set_baseline(self, signal, signal_artifacts):
"""
This function sets the power baseline, given the already filtered EEG
containing the calibration phase. Also, takes into account the
Neurofeedback training mode, so performs different baseline calculations
depending on the mode set.
Parameters
----------
signal: numpy.ndarray
EEG already pre-processed. [n_samples, n_channels].
signal_artifacts: numpy.ndarray
Signal filtered in the frequency bands associated to the artifacts
to be avoided. [n_artifact_bands, n_samples, n_channels].
Returns
------
baseline_power: float
"""
# Delete borders
dif_window = round(
(signal.shape[0] - self.w_signal_samples_calibration) / 2)
if dif_window <= 0:
raise ValueError('The duration of the signal for the baseline '
'calculation should be slightly longer than the '
'window time taken for the baseline calculation.')
epochs, _ = make_windows(signal[dif_window:-dif_window, :]
, self.fs, self.update_feature_window,
self.update_rate)
_, psd = scipy.signal.welch(epochs, self.fs, 'hamming',
self.w_signal_samples,
axis=1,scaling='density')
b_power = self.power(psd)
# Define artifact related thresholds
if signal_artifacts is not None:
self.thresholds = np.std(
signal_artifacts[:, -self.w_signal_samples_calibration:,
:], axis=1).mean(axis=-1)
if self.mode == 'single':
self.baseline_power = b_power[0]
elif self.mode == 'ratio':
self.baseline_power = b_power[0] / b_power[1]
return self.baseline_power
[docs] def band_power(self, signal, signal_artifacts):
"""
This function returns the band power from Power Spectral Density.
If signal noise is above the pre-established thresholds, this function
will return None.
Parameters
----------
signal: numpy.ndarray
Signal pre-processed. [n_samples, n_channels].
signal_artifacts: numpy.ndarray
Signal filtered in the frequency bands associated to the artifacts
to be avoided. [n_artifact_bands, n_samples, n_channels].
Returns
------
b_power: float or None
"""
if self.baseline_power is None:
raise ValueError('[PowerExtraction] Calibration not performed.')
dif_window = round(
(signal.shape[0] - self.w_signal_samples) / 2)
if dif_window <= 0:
raise ValueError('The duration of the signal for the feedback '
'calculation should be slightly longer than the '
'window time taken for the feedback calculation. ')
_, psd = scipy.signal.welch(signal[dif_window:-dif_window], self.fs, 'hamming',
self.w_signal_samples,
axis=0,scaling='density')
b_power_uncorrected = self.power(psd)
if self.mode == 'single':
b_power = b_power_uncorrected[0] - self.baseline_power
elif self.mode == 'ratio':
b_power = b_power_uncorrected[0] / b_power_uncorrected[
1] - self.baseline_power
# Check if artifact bands are defined
if signal_artifacts is not None:
if signal_artifacts is not None:
if ignore_noisy_windows(signal_artifacts, self.thresholds,
self.pct_tol):
return b_power
else:
return None
return b_power
[docs] def power(self, psd):
"""
This function calculates power from Power Spectral Density
Parameters
----------
psd: numpy.ndarray
[n_epochs, n_samples, n_channels].
Returns
-------
powers: numpy.ndarray
[n_training_bands].
"""
# Check if psd has epochs dimension
if len(psd.shape) == 2:
psd = psd[np.newaxis,:,:]
bands = []
# Extract training bands limits
for dict in self.f_dict:
if dict['type'] == 'training':
bands.append(dict['cutoff'])
powers = np.zeros(len(bands))
# Calculate band power relative to the whole bandwidth
for idx, band in enumerate(bands):
for b in band:
powers[idx] += np.mean(np.mean(absolute_band_power(psd, self.fs,
b),
axis=0))
return powers
[docs]class ConnectivityBasedNFTModel(components.Algorithm):
[docs] def __init__(self, fs, filter_dict, l_baseline_t, update_feature_window,
update_rate, montage, target_channels, fc_measure, mode,
apply_car, pct_tol_ocular=None,pct_tol_muscular=None):
super().__init__(calibration=['baseline_value'],
training=['feedback_value'])
"""
Pipeline for Connectivity-based Neurofeedback training. This class
inherits from components.Algorithm. Therefore, it can be used to create
standalone algorithms that can be used in compatible apps from
medusa-platform for online experiments. See components.Algorithm to know
more about this functionality.
"""
# Settings
self.fs = fs
self.filter_dict = filter_dict
self.l_baseline_t = l_baseline_t
self.update_feature_window = update_feature_window
self.montage = montage
self.target_channels = target_channels
self.fc_measure = fc_measure
self.mode = mode
self.apply_car = apply_car
# Variables
self.baseline_value = None
self.pct_tol = None
# Set percentage tolerance to noisy signal
if pct_tol_ocular is None and pct_tol_muscular is not None:
self.pct_tol = pct_tol_muscular
elif pct_tol_ocular is not None and pct_tol_muscular is None:
self.pct_tol = pct_tol_ocular
elif pct_tol_ocular is not None and pct_tol_muscular is not None:
self.pct_tol = np.array([pct_tol_ocular, pct_tol_muscular])
# Check filter dict
if not self.check_cutoff_settings():
raise Exception('The number of frequency bands selected does not '
'match the Neurofeedback mode.')
# Add Pre-processing and Feature Extraction methods
self.add_method('prep_method',
SignalPreprocessing(filter_dict=self.filter_dict,
montage=self.montage,
target_channels=None,
car=self.apply_car))
self.add_method('feat_ext_method',
ConnectivityExtraction(fs=self.fs, l_baseline_t=self.l_baseline_t,
update_feature_window=update_feature_window,
fc_measure=self.fc_measure,
mode=self.mode,montage=self.montage,
target_channels=self.target_channels,
pct_tol=self.pct_tol,update_rate=update_rate))
[docs] def calibration(self, eeg):
"""
It pre-process eeg, gets signal filtered in artifact-related bands and
filters the pre-processed eeg in training band. Then, it calculates the
baseline value.
Parameters
----------
eeg: numpy.ndarray
[n_samples, n_channels]
Returns
-------
baseline_value: float
"""
original_signal, signal_artifacts = self.get_inst('prep_method').\
prep_fit_transform(
signal=eeg,
fs=self.fs)
narrow_filtered_signal = self.get_inst('prep_method').\
narrow_transform(signal=original_signal)
self.baseline_value = self.get_inst('feat_ext_method').\
set_baseline(signal=original_signal,signal_artifacts=signal_artifacts,
filtered_signal=narrow_filtered_signal)
[docs] def training(self, eeg):
"""
It pre-process eeg, gets signal filtered in artifact-related bands and
filters the pre-processed eeg in training band. Then, it calculates the
feedback value.
Parameters
----------
eeg: numpy.ndarray
[n_samples, n_channels]
Returns
-------
feedback_value: float
"""
original_signal, signal_artifacts = self.get_inst('prep_method').prep_transform(
signal=eeg)
narrow_filtered_signal = self.get_inst('prep_method'). \
narrow_transform(signal=original_signal)
feedback_value = self.get_inst('feat_ext_method').ext_feature(
signal=narrow_filtered_signal,signal_artifacts=signal_artifacts)
return feedback_value
[docs] def check_cutoff_settings(self):
"""
Function to check the correct definition of training band dictionary.
"""
target_bands = 0
for filter in self.filter_dict:
if filter['type'] == 'training':
target_bands += 1
if target_bands == 1:
return True
else:
return False
[docs]class PowerBasedNFTModel(components.Algorithm):
[docs] def __init__(self, fs, filter_dict, l_baseline_t, update_feature_window,
update_rate,montage, target_channels, mode, apply_car,
apply_laplacian, pct_tol_ocular=None, pct_tol_muscular=None):
"""
Pipeline for Power-based Neurofeedback training. This class
inherits from components.Algorithm. Therefore, it can be used to create
standalone algorithms that can be used in compatible apps from
medusa-platform for online experiments. See components.Algorithm to know
more about this functionality.
"""
super().__init__(calibration=['baseline_parameters'],
training=['feedback_value'])
"""
Class constructor
"""
# Settings
self.fs = fs
self.filter_dict = filter_dict
self.l_baseline_t = l_baseline_t
self.update_feature_window = update_feature_window
self.montage = montage
self.target_channels = target_channels
self.mode = mode
self.apply_car = apply_car
self.apply_laplacian = apply_laplacian
# Init variables
self.baseline_value = None
self.pct_tol = None
# Set percentage tolerance to noisy signal
if pct_tol_ocular is None and pct_tol_muscular is not None:
self.pct_tol = pct_tol_muscular
elif pct_tol_ocular is not None and pct_tol_muscular is None:
self.pct_tol = pct_tol_ocular
elif pct_tol_ocular is not None and pct_tol_muscular is not None:
self.pct_tol = np.array([pct_tol_ocular, pct_tol_muscular])
# # Check correct filter dict definition
if not self.check_cutoff_settings():
raise Exception('The number of frequency bands selected does not '
'match the Neurofeedback mode.')
self.add_method('prep_method',
SignalPreprocessing(filter_dict=self.filter_dict,
montage=self.montage,
target_channels=self.target_channels,
laplacian=self.apply_laplacian,
car=self.apply_car))
self.add_method('feat_ext_method',
PowerExtraction(fs=fs, l_baseline_t=l_baseline_t,
update_feature_window=update_feature_window,
update_rate=update_rate,mode=self.mode,
pct_tol=self.pct_tol,f_dict=filter_dict))
[docs] def calibration(self, eeg, **kwargs):
"""
It pre-process eeg and gets signal filtered in artifact-related bands.
Then, it calculates the baseline value.
Parameters
----------
eeg: numpy.ndarray
[n_samples, n_channels]
Returns
-------
baseline_value: float
"""
original_signal, signal_artifacts = self.get_inst('prep_method').\
prep_fit_transform(signal=eeg, fs=self.fs)
self.baseline_value = self.get_inst('feat_ext_method').set_baseline(
signal=original_signal,signal_artifacts=signal_artifacts)
[docs] def training(self, eeg):
"""
It pre-process eeg, gets signal filtered in artifact-related bands. Then,
it calculates the feedback value.
Parameters
----------
eeg: numpy.ndarray
[n_samples, n_channels]
Returns
-------
feedback_value: float
"""
original_signal, signal_artifacts = self.get_inst('prep_method').\
prep_transform(signal=eeg)
feedback_value = self.get_inst('feat_ext_method').band_power(
signal=original_signal, signal_artifacts=signal_artifacts)
return feedback_value
[docs] def check_cutoff_settings(self):
"""
Function to check the correct definition of training band dictionary.
"""
target_bands = 0
for filter in self.filter_dict:
if filter['type'] == 'training':
target_bands += 1
if self.mode == 'single':
if target_bands == 1:
return True
else:
return False
elif self.mode == 'ratio':
if target_bands == 2:
return True
else:
return False
[docs]class NeurofeedbackData(components.ExperimentData):
"""Experiment info class for Neurofeedback training experiments. It records
the important events that take place during a Neurofeedback run,
allowing offline analysis."""
[docs] def __init__(self, run_onsets, run_durations, run_success, run_pauses,
run_restarts, medusa_nft_app_settings, nft_values, nft_times,
nft_baseline):
self.run_onsets = run_onsets
self.run_durations = run_durations
self.run_success = run_success
self.run_pauses = run_pauses
self.run_restarts = run_restarts
self.medusa_nft_app_settings = medusa_nft_app_settings
self.nft_values = nft_values
self.nft_times = nft_times
self.nft_baseline = nft_baseline
[docs] def to_serializable_obj(self):
rec_dict = self.__dict__
for key in rec_dict.keys():
if type(rec_dict[key]) == np.ndarray:
rec_dict[key] = rec_dict[key].tolist()
return rec_dict