from neuron import h
import numpy as np
def get_range(X, window, Y=None):
# X ia a matrix with 2 cols
# window is a 2 elements vector
r = X.__ge__(window[0]).__and__(X.__le__(window[1]))
if Y is None:
# Return a boolean array to be used as index for X rows.
return r
else:
return Y[r]
def X_is_running():
from subprocess import Popen, PIPE
p = Popen(["xset", "-q"], stdout=PIPE, stderr=PIPE)
p.communicate()
return p.returncode == 0
def dig_dict_save(name,d,hfg):
type_h_vector = type(h.Vector())
# Loop on the dict items
if type(d) == type({}):
for label,item in list(d.items()):
# The item is a dict create an hdf5 group and redo the loop
if label is not '__builtins__':
dig_dict_save(label,item,hfg.create_group(label))
elif type(d) == type(10.) or type(d) == type(10) or type(d) == type('') :
# Save floats and int
hfg.create_dataset(name,data=d)
elif type(d) is type([]):
# The d is a list
for element_index,element in enumerate(d):
# For each element create a group with unique label and redo the loop
dig_dict_save('%s_%g'%(name,element_index),element,hfg.create_group('%s_%g'%(name,element_index)))
elif isinstance(d,np.ndarray):
# The d is a np.array
hfg.create_dataset(name,data=d)
elif type(d) == type_h_vector:
# Save compressed vector
if name is 'Spikes':
hfg.create_dataset(name,data=np.array(d))
else:
hfg.create_dataset(
name,
data=np.
reshape(np.array(d),(np.array(d).shape[0],1)),
chunks=(min(np.array(d).shape[0],1000),1),
compression='gzip',
compression_opts=9)
def dig_dict_convert_to_savemat(d_source,d_dest):
# Loop on the dict items
for name,value in list(d_source.items()):
if type(value) == type({}):
# The value is a dict create an hdf5 group and redo the loop
d_dest[name.encode('utf-8')] = {}
dig_dict_convert_to_savemat(value,d_dest[name])
elif type(value) == type(10.) or type(value) == type(10) or type(value) == type('') :
# Save floats and int
d_dest[name.encode('utf-8')] = value
elif type(value) == type(h.Vector()) or type(value) == type([]):
# Save compressed vector
d_dest[name.encode('utf-8')] = np.array(value)
else:
print((type(value), "is not among the considered data types"))
def EPSP_peak(test_spikes,time,PSP, time_window=[0,500], slope = 'instantaneous'):
# Arguments:
# test_spikes time of spike for eliciting test EPSPs
# time array
# PSP signal to process
# tw time window to consider around each spike
import numpy as np
# time and PSP can be an HDF5 dataset
time = np.array(time)
bgs = []
peaks = []
amplitudes = []
slopes = []
# f,a = plt.subplots(1,1)
for spike in test_spikes:
tw = [spike-time_window[0],spike+time_window[1]]
tw_idx = time.__ge__(tw[0]).__and__(time.__le__(tw[1]))
EPSP = np.array(PSP[tw_idx])
bgs.append(EPSP[0])
peaks.append(np.max(EPSP))
amplitudes.append(np.max(EPSP) - bgs[-1])
# Slopes
# import ipdb; ipdb.set_trace()
begin_idx = (time[tw_idx]-tw[0]).__le__(2)
time_begin = time[tw_idx][begin_idx]-tw[0] # use only 2 ms after EPSP onset
EPSP_begin = EPSP[begin_idx]
# import ipdb;ipdb.set_trace()
# remove nan in time due to 0 time steps
if slope == 'instantaneous':
epsp_deriv = np.diff(EPSP_begin)
time_deriv = np.diff(time_begin)
# plot(time_begin,EPSP_begin,figure=a)
sl = epsp_deriv / time_deriv
slopes_not_nan = sl[~np.isnan(sl)]
# print slopes_not_nan
if slopes_not_nan.tolist():
slopes.append(max(slopes_not_nan))
if not slopes:
slopes = np.zero_like(epsp_deriv)
elif slope == 'differential':
slopes.append((EPSP_begin[-1]-EPSP_begin[0]) / (time_begin[-1]-time_begin[0]))
else:
print(('Slope can be either instantaneous or differential!!!!', slope))
# plt.show()
return bgs,peaks,amplitudes,slopes
def EPSP_trace(test_spike,time,PSP, time_window=[0,100], zero = 0):
# Arguments:
# test_spikes time of spike for eliciting test EPSPs
# time array
# PSP signal to process
# tw time window to consider around each spike
import numpy as np
# time and PSP can be an HDF5 dataset
time = np.array(time)
spike = test_spike
tw = [spike+time_window[0],spike+time_window[1]]
tw_idx = time.__ge__(tw[0]).__and__(time.__le__(tw[1]))
EPSP = np.array(PSP[tw_idx])
print((time_window, time[tw_idx][0], spike))
EPSP = np.column_stack((time[tw_idx]-zero,EPSP))
print((EPSP[0,:]))
return EPSP
def EPSP_features(run_data,section,location, time = None):
# Test EPSPs
if time is None:
tests = {'pre':{},'dur':{},'post':{}}
(tests['pre']['bgs'],tests['pre']['peaks'],tests['pre']['amplitudes'],tests['pre']['slopes']) = EPSP_peak(
run_data['test_pre_spike_times'],
run_data['%s/time/Data'%section],
run_data['%s/v_%g/Data'%(section,location)],
slope = 'instantaneous')
(tests['dur']['bgs'],tests['dur']['peaks'],tests['dur']['amplitudes'],tests['dur']['slopes']) = EPSP_peak(
run_data['test_during_spike_times'],
run_data['%s/time/Data'%section],
run_data['%s/v_%g/Data'%(section,location)],
slope = 'instantaneous')
(tests['post']['bgs'],tests['post']['peaks'],tests['post']['amplitudes'],tests['post']['slopes']) = EPSP_peak(
run_data['test_post_spike_times'],
run_data['%s/time/Data'%section],
run_data['%s/v_%g/Data'%(section,location)],
slope = 'instantaneous')
else:
tests = {'pre':{},'dur':{},'post':{}}
print((list(run_data.keys()),'%s/v_%g/Data'%(section,location)))
(tests['pre']['bgs'],tests['pre']['peaks'],tests['pre']['amplitudes'],tests['pre']['slopes']) = EPSP_peak(
run_data['test_pre_spike_times'],
time,
run_data['%s_v_%g/Data'%(section,location)],
slope = 'instantaneous')
(tests['dur']['bgs'],tests['dur']['peaks'],tests['dur']['amplitudes'],tests['dur']['slopes']) = EPSP_peak(
run_data['test_during_spike_times'],
time,
run_data['%s_v_%g/Data'%(section,location)],
slope = 'instantaneous')
(tests['post']['bgs'],tests['post']['peaks'],tests['post']['amplitudes'],tests['post']['slopes']) = EPSP_peak(
run_data['test_post_spike_times'],
time,
run_data['%s_v_%g/Data'%(section,location)],
slope = 'instantaneous')
EPSP_LTP_pre = np.column_stack((np.array(run_data['test_pre_spike_times'])[1:],
np.array(tests['pre']['amplitudes'])[1:]/np.median(tests['pre']['amplitudes'])*100))
EPSP_LTP_dur = np.column_stack((np.array(run_data['test_during_spike_times']),
np.array(tests['dur']['amplitudes'])/np.median(tests['dur']['amplitudes'])*100))
EPSP_LTP_post = np.column_stack((np.array(run_data['test_post_spike_times'])[1:],
np.array(tests['post']['amplitudes'])[1:]/np.median(tests['pre']['amplitudes'])*100))
EPSP_LTP = np.concatenate((EPSP_LTP_pre,EPSP_LTP_dur,EPSP_LTP_post), axis=0)
EPSP_LTP_sl_pre = np.column_stack((np.array(run_data['test_pre_spike_times'])[1:],
np.array(tests['pre']['slopes'])[1:]/np.median(tests['pre']['slopes'])*100))
EPSP_LTP_sl_dur = np.column_stack((np.array(run_data['test_during_spike_times']),
np.array(tests['dur']['slopes'])/np.median(tests['dur']['slopes'])*100))
print((np.array(run_data['test_post_spike_times'])[1:].shape, np.array(tests['post']['slopes'])[1:].shape))
EPSP_LTP_sl_post = np.column_stack((np.array(run_data['test_post_spike_times'])[1:-1],
np.array(tests['post']['slopes'])[1:]/np.median(tests['pre']['slopes'])*100))
EPSP_LTP_sl = np.concatenate((EPSP_LTP_pre,EPSP_LTP_dur,EPSP_LTP_post), axis=0)
return EPSP_LTP, EPSP_LTP_sl
def balance_currents(Vrest,sl, check = False):
# Arguments: $1 Vrest
h.init()
print(("Balancing all currents to ", Vrest))
h.finitialize(Vrest)
for sec in sl:
for seg in sec:
if check:
e_pas = seg.e_pas
seg.e_pas = seg.v
if h.ismembrane("na_ion"):
seg.e_pas = seg.e_pas + (seg.ina + seg.ik) / seg.g_pas
if h.ismembrane("hd"):
seg.e_pas = seg.e_pas + seg.i_hd/seg.g_pas
if h.ismembrane("ca_ion"):
seg.e_pas = seg.e_pas + seg.ica/seg.g_pas
if check:
print((e_pas, seg.e_pas))
def PPR(time,V,t1,t2,t3):
# Cal 100 * max(V([t1,t2])-Vrest) / max(V([t2,t3])-Vrest)
# time = np.array(branch.cell.records['time']['val'])
trest = t1 -10 # ms
trest_i = time.__ge__(trest).nonzero()[0][0]
Vrest = V[trest_i]
t1_i = time.__ge__(t1).nonzero()[0][0]
t2_i = time.__ge__(t2).nonzero()[0][0]
t3_i = time.__ge__(t3).nonzero()[0][0]
V1 = np.max(V[t1_i:t2_i]) - Vrest
V2 = np.max(V[t2_i:t3_i]) - Vrest
return V2/V1 * 100