Source code for opmcoils.analysis
# Author: Mainak Jas <mainakjas@gmail.com>
import numpy as np
import mne
[docs]
def add_ch_loc(raws, info_helmet, ch_mapping):
"""Add channel locations.
Parameters
----------
raws : list
List of raw objects.
info_helmet : mne.Info
Info object containing channel locations.
ch_mapping : dict
Dictionary containing channel mapping.
"""
if not isinstance(raws, list):
raws = [raws]
n_runs = len(raws)
def sanitize_helmet(ch_name):
ch_label = int(ch_name.strip('A'))
return f'MEG{ch_label:02d}'
mne.channels.rename_channels(info_helmet, sanitize_helmet)
for run in range(n_runs):
raw = raws[run]
# get inverse mapping
inv_mapping = dict()
for (k, v) in ch_mapping[f'run{run + 1}'].items():
if v in raw.ch_names:
if n_runs == 1:
inv_mapping[v] = f'MEG{k}'
else:
inv_mapping[v] = f'MEG{k}_run{run + 1}'
raw.rename_channels(inv_mapping)
raw.reorder_channels(np.sort(raw.ch_names).tolist())
for ch in raw.info['chs']:
ch_name = None
if ch['ch_name'] in info_helmet.ch_names:
ch_name = ch['ch_name']
elif ch['ch_name'].split('_')[0] in info_helmet.ch_names:
ch_name = ch['ch_name'].split('_')[0]
if ch_name is None:
continue
idx = info_helmet.ch_names.index(ch_name)
ch['loc'] = info_helmet['chs'][idx]['loc']
[docs]
def read_opm_info(fname):
"""Read info for OPM.
Parameters
----------
fname : str
Path to the info file.
"""
info = mne.io.read_info(fname)
sel = mne.pick_channels_regexp(info['ch_names'], '(?!.*_index)')
info = mne.pick_info(info, sel)
for ch in info['chs']:
if ch['coil_type'] == 3024:
ch['coil_type'] = 8101
ch['loc'][:3] /= 1000. # convert to meters
return info
[docs]
def find_events(raw, **kwargs):
"""Find events in OPM-MEG data."""
if not raw.preload:
raise ValueError('raw data must be preloaded')
if 'stim_channel' in kwargs:
picks = mne.pick_channels(raw.ch_names, kwargs['stim_channel'])
else:
picks = mne.pick_types(raw.info, stim=True, meg=False, eeg=False)
for pow, pick in enumerate(picks):
stim_data = np.zeros_like(raw.times)
st_data, times = raw[pick]
st_data[st_data < 4] = 0. # TTL pulses are 0-5 V
st_data[st_data >=4] = 2 ** pow
raw._data[pick] = st_data
return mne.find_events(raw, **kwargs)
[docs]
def get_good_chs(fnames):
"""Get good channels from txt files with residual fields.
Each time on-sensor coils are used to zero the background field,
the sensors which do not work may be different. This function will return
the intersection of the good channels.
Parameters
----------
fnames : list
List of file names.
Returns
-------
good_chs : list
The good channels.
"""
good_ch_mask = None
for fname in fnames:
X = np.loadtxt(fname,
[('ch_name', 'U5'), ('Bx', '<f4'),
('By', '<f4'), ('Bz', '<f4'), ('Cal', '<f4')])
if good_ch_mask is None:
good_ch_mask = ~np.isnan(X['Bx'])
else:
good_ch_mask = good_ch_mask & (~np.isnan(X['Bx']))
return X['ch_name'][good_ch_mask]
[docs]
def load_remnant_fields(fname, ch_names, bias=None):
"""Load remnant fields from txt file.
Parameters
----------
fname : str
Path to the txt file.
ch_names : list
List of channel names.
bias : dict
Dictionary containing bias for each channel.
"""
B = np.empty(len(ch_names,))
X = np.loadtxt(fname,
[('ch_name', 'U5'), ('Bx', '<f4'),
('By', '<f4'), ('Bz', '<f4'), ('Cal', '<f4')])
for idx, ch_name in enumerate(ch_names):
zdx = X['ch_name'].tolist().index(ch_name)
B[idx] = X['Bz'][zdx] - bias[ch_name]
return B