"""
feedback.py -- Feedback figure showing phase-code retrieval due to single cues
Created by Joe Monaco on 2010-10-12.
Copyright (c) 2009-2011 Johns Hopkins University. All rights reserved.
This software is provided AS IS under the terms of the Open Source MIT License.
See http://www.opensource.org/licenses/mit-license.php.
"""
import os
import numpy as np
from numpy import pi
import matplotlib as mpl
import matplotlib.pylab as plt
from enthought.traits.api import Int, Float, Array
from ..core.model import ModelPostMortem
from ..core.analysis import BaseAnalysis
from ..vmo import VMOModel
from ..error import alpha, gamma
from ..tools.images import array_to_image
from ..tools.stats import smooth_pdf
from ..tools.radians import circle_diff_vec
class VMOTrackFeedback(VMOModel):
"""
Add tracking of p*M path integration signal across oscillators
"""
label = "Tracking Model"
omega = 0
dt = 0.002
local_phase_error = Array(track=True)
distal_phase_error = Array(track=True)
local_fdbk = Array(track=True)
distal_fdbk = Array(track=True)
theta = Array(track=True)
def run_timestep(self):
super(VMOTrackFeedback, self).run_timestep()
self.local_phase_error[:] = \
circle_diff_vec(
self.dtheta_local[self.active_local_cue], self.dtheta_t)
self.distal_phase_error[:] = \
circle_diff_vec(
self.dtheta_distal[self.active_distal_cue], self.dtheta_t)
self.local_fdbk[:] = self.C_local * self.local_phase_error
self.distal_fdbk[:] = self.C_distal * self.distal_phase_error
def _local_phase_error_default(self):
return np.zeros(self.N_theta, 'd')
def _distal_phase_error_default(self):
return np.zeros(self.N_theta, 'd')
def _local_fdbk_default(self):
return np.zeros(self.N_theta, 'd')
def _distal_fdbk_default(self):
return np.zeros(self.N_theta, 'd')
def _theta_default(self):
return np.zeros(self.N_theta, 'd')
class VMOToyModel(VMOTrackFeedback):
label = "Ideal Traversal"
N_theta = 1
N_outputs = 1
N_cues_local = 1
N_cues_distal = 1
N_errors = Int(5)
max_error = Float(pi/2)
cue_t0 = 16.776
local_cue_std = pi/15
cue_std_t = Float
num_trials = 5
theta_t0_test = Float(-pi/4)
vbar = Float(10.0)
def __init__(self, **traits):
super(VMOToyModel, self).__init__(**traits)
self.dx_t = lambda t: self.vbar
self.dy_t = lambda t: 0.0
self.gamma_distal = 0.0
self.T = 25
def trial_setup(self):
super(VMOToyModel, self).trial_setup()
if self.trial <= 2:
self.gamma_local = 0.0
else:
self.gamma_local = self._gamma_local_default()
def _theta_t0_default(self):
t0list = [[self.theta_t0_test]]*2
t0tests = self.theta_t0_test + self.max_error*np.linspace(-1, 1, self.N_errors)
t0list += [[error] for error in t0tests]
return np.array(t0list, 'd')
def _num_trials_default(self):
return self.N_errors + 2
def _scale_default(self):
return np.array([200], 'd')
def _phi_default(self):
return np.array([pi/3])
def _cue_std_t_default(self):
"""Convert from cue width in space to cue width in time
"""
return self.local_cue_std*(9/pi)
def _F_local_default(self):
"""Get cue profile in time, not space
"""
return lambda alpha: np.exp(-(self.t-self.cue_t0)**2/(2*self.cue_std_t**2))
class FeedbackFigure(BaseAnalysis):
"""
Run simple feedback simulations across one cue for comparison with analytic
solution. Tracking for *local_phase_error* must be set to True in
VMOTrackFeedback.
"""
label = "feedback"
def collect_data(self, cue_sizes=[10, 30], cue_offset=9*pi/8, **kwargs):
"""Run a series of feedback simulations with cues of varying sizes.
Additional keyword arguments are passed on to VMOTrackFeedback.
"""
self.results['cue_sizes'] = cue_sizes = (pi/180)*np.array(cue_sizes)
self.results['N_cues'] = N_cues = len(cue_sizes)
pdict = dict( N_outputs=1,
N_theta=16,
N_cues_local=1,
N_cues_distal=1,
cue_offset=cue_offset,
C_W=0.05,
gamma_local=0,
gamma_distal=0,
num_trials=2*N_cues+2,
monitoring=True )
pdict.update(kwargs)
self.out('Running feedback simulations...')
model = VMOTrackFeedback(**pdict)
model.T = 25.0
theta_t0 = model.theta_t0[0].copy()
theta_t0_new = model.theta_t0[1].copy()
model.theta_t0[1:N_cues+2] = theta_t0
model.theta_t0[N_cues+2:2*N_cues+2] = theta_t0_new
model.advance()
model.advance()
self.results['cue_t0'] = model.cue_t_local[0]
for repeat in 'once', 'twice':
for size in cue_sizes:
model.local_cue_std = size
model.gamma_local = model._gamma_local_default()
model.advance()
model.post_mortem().tofile(os.path.join(self.datadir, 'data'))
self.out('Running toy simulations...')
toy = VMOToyModel(cue_t0=model.cue_t_local[0], local_cue_std=cue_sizes[0],
cue_offset=cue_offset)
toy.advance_all()
self.results['toy_phase'] = toy.dtheta_local[0,0]
toy.post_mortem().tofile(os.path.join(self.datadir, 'toy'))
self.out('All done!')
def create_plots(self, time_window=8, draw_background=True, draw_lines=True):
os.chdir(self.datadir)
self.out.outfd = file('figure.log', 'w')
self.figure = {}
figsize = 14, 10
plt.rcParams['figure.figsize'] = figsize
self.figure['feedback'] = f = plt.figure(figsize=figsize)
f.suptitle(self.label.title())
N_cues = self.results['N_cues']
cue_t0 = self.results['cue_t0']
cue_sizes = self.results['cue_sizes']
data = ModelPostMortem.fromfile('data.tar.gz')
toy = ModelPostMortem.fromfile('toy.tar.gz')
target = self.results['toy_phase']
self.out('Cue sizes = %s'%repr(cue_sizes))
self.out('Cue occurs at t = %.3f seconds'%cue_t0)
self.out('Toy model phase target = %.3f'%target)
time_window = float(time_window)
self.out('Displaying a %.1f-sec window around t=%.1f-sec'%(time_window,cue_t0))
_model_dt = data.t[1]-data.t[0]
_t0 = int(cue_t0/_model_dt)
_htw = int(0.5*time_window/_model_dt)
dt = slice(_t0-_htw, _t0+_htw)
v = np.squeeze(data.vel[0])
rs_bar = np.sqrt((v**2).sum(axis=1))[dt].mean()
self.out('Average running speed across window = %.3f cm/s'%rs_bar)
def draw_cue_bg(ax, trial, ymin=None, ymax=None, bins=128, use_toy=False):
if ymin is None:
ymin = ax.get_ylim()[0]
if ymax is None:
ymax = ax.get_ylim()[1]
if use_toy:
cue_profile = toy.C_local[trial]
else:
cue_profile = data.C_local[trial]
ct = np.linspace(-time_window/2, time_window/2, bins+1)
for i in xrange(bins):
r = mpl.patches.Rectangle((ct[i], ymin), ct[i+1]-ct[i],
ymax-ymin, alpha=0.2, lw=0, fill=True, zorder=-1,
fc=mpl.cm.jet(cue_profile[
int((float(i)/bins)*(dt.stop-dt.start))+dt.start]/
cue_profile[dt].max()))
ax.add_artist(r)
time = toy.t[dt] - cue_t0
xlim = time[0], time[-1]
panels = 4
rows = 3
row = 1
inset = (-1.2, 1.4, 2.4, 1.0)
ax = plt.subplot(rows, panels, 1)
if draw_lines:
ax.plot(time, toy.theta[1][dt,0], 'm--', lw=2, zorder=5)
for trial in xrange(2, toy.ntrials):
ax.plot(time, toy.theta[trial][dt,0], 'k-', aa=True, zorder=3)
ax.set_xlim(xlim)
if draw_lines:
ax.add_artist(
mpl.patches.Rectangle(inset[:2], inset[2], inset[3],
fill=False, lw=1, ec='0.5', zorder=2))
if draw_background:
draw_cue_bg(ax, 2, use_toy=True)
ax = plt.subplot(rows, panels, 2)
if draw_lines:
ax.plot(time, toy.theta[1][dt,0], 'm--', lw=2, zorder=5)
for trial in xrange(2, toy.ntrials):
ax.plot(time, toy.theta[trial][dt,0], 'k-', aa=True, zorder=3)
ax.plot([inset[0], inset[0]+inset[2]], [target]*2, '-', c='0.5', lw=2, zorder=2)
ax.set_xlim(inset[0], inset[0]+inset[2])
ax.set_ylim(inset[1], inset[1]+inset[3])
if draw_background:
draw_cue_bg(ax, 2, bins=256, use_toy=True)
ax = plt.subplot(rows, panels, 3)
zero_trial = 1+(toy.ntrials-2)/2
if draw_lines:
for trial in xrange(2, toy.ntrials):
ax.plot(time, toy.theta[trial][dt,0]-toy.theta[-zero_trial][dt,0], 'k-')
ax.set_xlim(xlim)
if draw_background:
draw_cue_bg(ax, 2, use_toy=True)
ax = plt.subplot(rows, panels, 4)
alpha_t = alpha(time, A=2*pi*data.C_local[2].max(),
sigma=(9/pi)*cue_sizes[0], r=1, s=1)
if draw_lines:
for trial in xrange(2, toy.ntrials):
error = toy.theta[trial][dt,0]-toy.theta[-zero_trial][dt,0]
ax.plot(time, error/error[0], 'k-', aa=True)
ax.plot(time, alpha_t/alpha_t[0], 'r--', zorder=5, lw=2, aa=True)
ax.set_xlim(xlim)
ax.set_ylim(0, 1.05)
if draw_background:
draw_cue_bg(ax, 2, 0, 1.05, use_toy=True)
time = data.t[dt] - cue_t0
for i in xrange(N_cues):
trial = i + 2 + N_cues
row += 1
ax = plt.subplot(rows, panels, panels*(row-1)+1)
ax.plot(data.x[trial], data.y[trial], 'k-', lw=0.5, zorder=-1)
ax.scatter(x=data.x[trial][dt], y=data.y[trial][dt],
c=data.C_local[trial][dt], s=5, linewidths=0)
ax.axis('equal')
ax.set_axis_off()
self.out('Cue %d, max C = %.3f'%(i+1, data.C_local[trial].max()))
cuemod = data.theta[trial][dt]
for j in xrange(cuemod.shape[1]):
if cuemod[-1,j] < -pi:
cuemod[:,j] += 2*pi
elif cuemod[-1,j] > pi:
cuemod[:,j] -= 2*pi
ax = plt.subplot(rows, panels, panels*(row-1)+2)
if draw_lines:
ax.plot(time, cuemod, 'k-', aa=True)
ax.set_xlim(xlim)
if draw_background:
draw_cue_bg(ax, trial)
error = data.theta[trial][dt] - data.theta[trial-N_cues][dt]
for j in xrange(error.shape[1]):
if error[-1,j] < -pi:
error[:,j] += 2*pi
elif error[-1,j] > pi:
error[:,j] -= 2*pi
ax = plt.subplot(rows, panels, panels*(row-1)+3)
if draw_lines:
ax.plot(time, error, 'k-', aa=True)
ax.set_xlim(xlim)
if draw_background:
draw_cue_bg(ax, trial)
self.out('Cue %d, epsilon = %.3f'%(i+1, np.median(error[-1]/error[0])))
fwhm = _model_dt * (data.C_local[trial]>data.C_local[trial].max()/2).sum()
sigma_t = fwhm / (2*np.sqrt(2*np.log(2)))
alpha_t = alpha(time, A=2*pi*data.C_local[trial].max(), sigma=sigma_t, r=1, s=1)
ax = plt.subplot(rows, panels, panels*(row-1)+4)
if draw_lines:
ax.plot(time, error/error[0], 'k-', aa=True)
ax.plot(data.t[dt]-cue_t0, alpha_t/alpha_t[0], 'r--', lw=2, aa=True)
ax.set_xlim(xlim)
ax.set_ylim(0, 1.05)
if draw_background:
draw_cue_bg(ax, trial, 0, 1.05)
plt.draw()
plt.rcParams['figure.figsize'] = plt.rcParamsDefault['figure.figsize']
self.out.outfd.close()