* scheduler.cpp
* This file is part of NEST.
* Copyright (C) 2004 The NEST Initiative
* NEST is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
* NEST is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* GNU General Public License for more details.
* You should have received a copy of the GNU General Public License
* along with NEST. If not, see <http://www.gnu.org/licenses/>.
#include <cmath>
#include <iostream>
#include <sstream>
#include <set>
#include "config.h"
#include "compose.hpp"
#undef __PURE_CNAME
#include <pthread.h>
#define __PURE_CNAME
#include <pthread.h>
#ifdef _OPENMP
#include <omp.h>
#include <climits>
#include "network.h"
#include "exceptions.h"
#include "scheduler.h"
#include "event.h"
#include "dict.h"
#include "integerdatum.h"
#include "doubledatum.h"
#include "dictutils.h"
#include "arraydatum.h"
#include "randomgen.h"
#include "random_datums.h"
#include "nest_timemodifier.h"
#include "nest_timeconverter.h"
#ifdef N_DEBUG
#undef N_DEBUG
extern int SLIsignalflag;
std::vector<nest::delay> nest::Scheduler::moduli_;
std::vector<nest::delay> nest::Scheduler::slice_moduli_;
nest::delay nest::Scheduler::max_delay_ = 1;
nest::delay nest::Scheduler::min_delay_ = 1;
const nest::delay nest::Scheduler::comm_marker_ = 0;
nest::Scheduler::Scheduler(Network &net)
: initialized_(false),
to_step_(0L), // consistent with to_do_ == 0
void nest::Scheduler::reset()
// Reset TICS_PER_MS, MS_PER_TICS and TICS_PER_STEP to the compiled in default values.
// See ticket #217 for details.
clock_.set_to_zero(); // ensures consistent state
to_do_ = 0;
slice_ = 0;
from_step_ = 0;
to_step_ = 0; // consistent with to_do_ = 0
void nest::Scheduler::clear_pending_spikes()
void nest::Scheduler::init_()
assert(initialized_ == false);
simulated_ = false;
min_delay_ = max_delay_ = 0;
update_ref_ = true;
int status = pthread_cond_init(&done_, NULL);
if(status != 0)
net_.message(SLIInterpreter::M_ERROR, "Scheduler::reset",
"Error initializing condition variable done_");
throw PthreadException(status);
status = pthread_cond_init(&ready_, NULL);
if(status != 0)
net_.message(SLIInterpreter::M_ERROR, "Scheduler::reset",
"Error initializing condition variable ready_");
throw PthreadException(status);
#ifndef _OPENMP
if (n_threads_ > 1)
net_.message(SLIInterpreter::M_ERROR, "Scheduler::reset",
"No multithreading available, using single threading");
n_threads_ = 1;
force_singlethreading_ = true;
create_rngs_(true); // flag that this is a call from the ctr
create_grng_(true); // flag that this is a call from the ctr
initialized_ = true;
void nest::Scheduler::finalize_()
int status = pthread_cond_destroy(&done_);
if(status != 0)
net_.message(SLIInterpreter::M_ERROR, "Scheduler::reset",
"Error in destruction of condition variable done_");
throw PthreadException(status);
status = pthread_cond_destroy(&ready_);
if(status != 0)
net_.message(SLIInterpreter::M_ERROR, "Scheduler::reset",
"Error in destruction of condition variable ready_");
throw PthreadException(status);
// clear the buffers
initialized_ = false;
void nest::Scheduler::init_moduli_()
assert (min_delay_ != 0);
assert (max_delay_ != 0);
* Ring buffers use modulos to determine where to store incoming events
* with given time stamps, relative to the beginning of the slice in which
* the spikes are delivered from the queue, ie, the slice after the one
* in which they were generated. The pertaining offsets are 0..max_delay-1.
moduli_.resize(min_delay_ + max_delay_);
for(delay d=0; d < min_delay_ + max_delay_; ++d)
moduli_[d]= ( clock_.get_steps()+ d ) % (min_delay_ + max_delay_);
// Slice-based ring-buffers have one bin per min_delay steps,
// up to max_delay. Time is counted as for normal ring buffers.
// The slice_moduli_ table maps time steps to these bins
const size_t nbuff = static_cast<size_t>(std::ceil(static_cast<double>(min_delay_+max_delay_) / min_delay_));
for ( delay d = 0 ; d < min_delay_+max_delay_ ; ++d )
slice_moduli_[d] = ( (clock_.get_steps() + d) / min_delay_ ) % nbuff;
* This function is called after all nodes have been updated.
* We can compute the value of (T+d) mod max_delay without explicit
* reference to the network clock, because compute_moduli_ is
* called whenever the network clock advances.
* The various modulos for all available delays are stored in
* a lookup-table and this table is rotated once per time slice.
void nest::Scheduler::compute_moduli_()
assert (min_delay_ != 0);
assert (max_delay_ != 0);
* Note that for updating the modulos, it is sufficient
* to rotate the buffer to the left.
assert(moduli_.size() == min_delay_ + max_delay_);
/* For the slice-based ring buffer, we cannot rotate the table, but
have to re-compute it, since max_delay_ may not be a multiple of
min_delay_. Reference time is the time at the beginning of the slice.
const size_t nbuff = static_cast<size_t>(std::ceil(static_cast<double>(min_delay_+max_delay_) / min_delay_));
for ( delay d = 0 ; d < min_delay_+max_delay_ ; ++d )
slice_moduli_[d] = ((clock_.get_steps() + d) / min_delay_ ) % nbuff;
void nest::Scheduler::compute_delay_extrema_(delay& min_delay, delay& max_delay) const
min_delay = net_.connection_manager_.get_min_delay().get_steps();
max_delay = net_.connection_manager_.get_max_delay().get_steps();
if (Communicator::get_num_processes() > 1)
std::vector<int_t> min_delays(Communicator::get_num_processes());
min_delays[Communicator::get_rank()] = min_delay;
min_delay = *std::min_element(min_delays.begin(), min_delays.end());
std::vector<int_t> max_delays(Communicator::get_num_processes());
max_delays[Communicator::get_rank()] = max_delay;
max_delay = *std::max_element(max_delays.begin(), max_delays.end());
void nest::Scheduler::configure_spike_buffers_()
assert(min_delay_ != 0);
// the following line does not compile with gcc <= 3.3.5
spike_register_.resize(n_threads_, std::vector<std::vector<uint_t> >(min_delay_));
for (size_t j = 0; j < spike_register_.size(); ++j)
for (size_t k = 0; k < spike_register_[j].size(); ++k)
// the following line does not compile with gcc <= 3.3.5
offgrid_spike_register_.resize(n_threads_, std::vector<std::vector<OffGridSpike> >(min_delay_));
for (size_t j = 0; j < offgrid_spike_register_.size(); ++j)
for (size_t k = 0; k < offgrid_spike_register_[j].size(); ++k)
//send_buffer must be >= 2 as the 'overflow' signal takes up 2 spaces.
int send_buffer_size = n_threads_ * min_delay_ > 2 ? n_threads_ * min_delay_ : 2;
int recv_buffer_size = send_buffer_size * Communicator::get_num_processes();
Communicator::set_buffer_sizes(send_buffer_size, recv_buffer_size);
// DEC cxx required 0U literal, HEP 2007-03-26
local_grid_spikes_.resize(send_buffer_size, 0U);
local_offgrid_spikes_.resize(send_buffer_size, OffGridSpike(0,0.0));
global_grid_spikes_.resize(recv_buffer_size, 0U);
global_offgrid_spikes_.resize(recv_buffer_size, OffGridSpike(0,0.0));
displacements_.resize(Communicator::get_num_processes(), 0);
void nest::Scheduler::clear_nodes_vec_()
for(index t = 0; t < n_threads_; ++t)
void nest::Scheduler::simulate(Time const & t)
t_real_ = 0;
t_slice_begin_ = timeval();
t_slice_end_ = timeval();
if (t == Time::ms(0.0))
if (t < Time::step(1))
net_.message(SLIInterpreter::M_ERROR, "Scheduler::simulate",
String::compose("Simulation time must be >= %1 ms (one time step).",
throw KernelException();
// Check for synchronicity of global rngs over processes
if(Communicator::get_num_processes() > 1)
if (!Communicator::grng_synchrony(grng_->ulrand(100000)))
net_.message(SLIInterpreter::M_ERROR, "Scheduler::simulate",
"Global Random Number Generators are not synchronized prior to simulation.");
throw KernelException();
// As default, we have to place the iterator at the
// leftmost node of the tree.
// If the iteration process was suspended, we leave the
// iterator where it was and continue the iteration
// process by calling "resume()"
// Note that the time argument accumulates.
// This is needed so that the following two
// calling sequences yield the same results:
// a: net.simulate(t1); // suspend is called here
// net.simulate(t2);
// assert(net.t == t1+t2);
// b: net.simulate(t1); // NO suspend called
// net.simulate(t2);
// assert(net.t == t1+t2);
// The philosophy behind this behaviour is that the user
// can in principle not know whether an element will suspend
// the cycle.
// check whether the simulation clock
// will overflow during the simulation.
if ( t.is_finite() )
Time time1 = clock_ + t;
if( !time1.is_finite() )
std::string msg = String::compose("A clock overflow will occur after %1 of %2 ms. Please reset network "
"clock first!", (Time::max()-clock_).get_ms(), t.get_ms());
net_.message(SLIInterpreter::M_ERROR, "Scheduler::simulate", msg);
throw KernelException();
std::string msg = String::compose("The requested simulation time exceeds the largest time NEST can handle "
"(T_max = %1 ms). Please use a shorter time!", Time::max().get_ms());
net_.message(SLIInterpreter::M_ERROR, "Scheduler::simulate", msg);
throw KernelException();
to_do_ += t.get_steps();
to_do_total_ = to_do_;
// find shortest and longest delay across all MPI processes
// this call sets the member variables
compute_delay_extrema_(min_delay_, max_delay_);
// Warn about possible inconsistencies, see #504.
// This test cannot come any earlier, because we first need to compute min_delay_
// above.
if ( t.get_steps() % min_delay_ != 0 )
net_.message(SLIInterpreter::M_WARNING, "Scheduler::simulate",
"The requested simulation time is not an integer multiple of the minimal delay in the network. "
"This may result in inconsistent results under the following conditions: (i) A network contains "
"more than one source of randomness, e.g., two different poisson_generators, and (ii) Simulate "
"is called repeatedly with simulation times that are not multiples of the minimal delay.");
// from_step_ is not touched here. If we are at the beginning
// of a simulation, it has been reset properly elsewhere. If
// a simulation was ended and is now continued, from_step_ will
// have the proper value. to_step_ is set as in advance_time().
ulong_t end_sim = from_step_ + to_do_;
if (min_delay_ < end_sim)
to_step_ = min_delay_; // update to end of time slice
to_step_ = end_sim; // update to end of simulation time
simulated_ = true;
// Check for synchronicity of global rngs over processes
if(Communicator::get_num_processes() > 1)
if (!Communicator::grng_synchrony(grng_->ulrand(100000)))
net_.message(SLIInterpreter::M_ERROR, "Scheduler::simulate",
"Global Random Number Generators are not synchronized after simulation.");
throw KernelException();
void nest::Scheduler::resume()
terminate_ = false;
if(to_do_ == 0)
// The following is needed on Solaris 7 and higher
// to get one processor per thread
int status = pthread_setconcurrency(n_threads_);
if(status != 0)
net_.message(SLIInterpreter::M_ERROR, "Scheduler::resume","Could not set concurrency level.");
throw PthreadException(status);
assert(threads_.size() == n_threads_);
// if at the beginning of a simulation, set up spike buffers
if ( !simulated_ )
// we have to do enter_runtime after prepre_nodes, since we use
// calibrate to map the ports of MUSIC devices, which has to be done
// before enter_runtime
if (!simulated_) // only enter the runtime mode once
double tick = Time::get_resolution().get_ms() * min_delay_;
std::string msg = String::compose("Entering MUSIC runtime with tick = %1 ms", tick);
net_.message(SLIInterpreter::M_INFO, "Scheduler::resume", msg);
simulating_ = true;
if (print_time_)
std::cout << std::endl;
if (n_threads_ == 1)
// Now we fire up the threads ...
for(index i = 0; i < threads_.size(); ++i)
threads_[i].init(i, this);
// ... and wait until all are done.
for(vector<Thread>::iterator i = threads_.begin(); i != threads_.end(); ++i)
#ifdef _OPENMP
// use openmp, if compiled with right compiler switch
// for gcc this is -fopenmp
net_.message(SLIInterpreter::M_ERROR, "Scheduler::reset",
"No multithreading available, using single threading");
simulating_ = false;
if (print_time_)
std::cout << std::endl;
net_.message(SLIInterpreter::M_ERROR, "Scheduler::resume", "Exiting on error or user signal.");
net_.message(SLIInterpreter::M_ERROR, "Scheduler::resume", "Scheduler: Use 'ResumeSimulation' to resume.");
if(SLIsignalflag != 0)
SystemSignal signal(SLIsignalflag);
throw signal;
throw SimulationError();
net_.message(SLIInterpreter::M_INFO, "Scheduler::resume", "Simulation finished.");
* the single-threaded update is just a single loop over all nodes
* per time-slice.
void nest::Scheduler::serial_update()
std::vector<Node*>::iterator i;
if (print_time_)
gettimeofday(&t_slice_begin_, NULL);
if ( from_step_ == 0 ) // deliver only at beginning of slice
// advance the time of music by one step (min_delay * h) must
// be done after deliver_events_() since it calls
// music_event_out_proxy::handle(), which hands the spikes over to
// MUSIC *before* MUSIC time is advanced
if (slice_ > 0)
net_.update_music_event_handlers_(clock_, from_step_, to_step_);
for (i = nodes_vec_[0].begin(); i != nodes_vec_[0].end(); ++i)
if ( static_cast<ulong_t>(to_step_) == min_delay_ ) // gather only at end of slice
if(SLIsignalflag != 0)
net_.message(SLIInterpreter::M_INFO, "Scheduler::serial_update", "Simulation exiting on user signal.");
if (print_time_)
gettimeofday(&t_slice_end_, NULL);
} while((to_do_ != 0) && (! terminate_));
void nest::Scheduler::threaded_update_openmp()
#ifdef _OPENMP
net_.message(SLIInterpreter::M_INFO, "Scheduler::threaded_update_openmp", "Simulating using OpenMP.");
// parallel section begins
#pragma omp parallel
int t = 0;
#ifdef _OPENMP
t = omp_get_thread_num(); // which thread am I
do {
if (print_time_)
gettimeofday(&t_slice_begin_, NULL);
if ( from_step_ == 0 ) // deliver only at beginning of slice
// advance the time of music by one step (min_delay * h) must
// be done after deliver_events_() since it calls
// music_event_out_proxy::handle(), which hands the spikes over to
// MUSIC *before* MUSIC time is advanced
if (slice_ > 0)
net_.update_music_event_handlers_(clock_, from_step_, to_step_);
for (std::vector<Node*>::iterator i = nodes_vec_[t].begin(); i != nodes_vec_[t].end(); ++i)
// parallel section ends, wait until all threads are done -> synchronize
#pragma omp barrier
// the following block is executed by a single thread
// the other threads wait at the end of the block
#pragma omp single
if ( static_cast<ulong_t>(to_step_) == min_delay_ ) // gather only at end of slice
if(SLIsignalflag != 0)
net_.message(SLIInterpreter::M_INFO, "Scheduler::serial_update", "Simulation exiting on user signal.");
terminate_ = true;
if (print_time_)
gettimeofday(&t_slice_end_, NULL);
// end of single section, all threads synchronize at this point
while((to_do_ != 0) && (! terminate_));
} // end of #pragma parallel omp
void nest::Scheduler::threaded_update(thread t)
/** @todo Should not all variables related to thread control
* (ready_mutex_, ready_, done_, etnry_counter_, exit_counter_)
* be local variables in this function instead of class variables?
* Creating the pthread variables must be fast compared to simulation
* time.
assert(t >= 0);
while ( true )
/* The default proceeding:
* - update the nodes.
* - synchronise threads.
* - advance time.
* This loop is left if and only if
* - the simulation time has elapsed and all nodes
* of the current thread are updated (test on to_do_).
* - the end of a time slice has been reached and
* a user interrupt occured (test on SLISignalflag).
//////////// Serial section beyond this line
// Wait until all threads have finished communicating and
// have advance time. The final thread arriving here has
// entry_counter_ == n_threads_, owns the ready_mutex_ and
// initiates unlocking everything as follows:
// 1. pthread_cond_signal(&ready_) wakes one arbitry waiting thread
// 2. the awoken thread tries to acquire the ready_mutex_ as part of
// waking up, but blocks on it
// 3. the signaling thread unlocks the ready_mutex_ and proceeds into
// the parallel section
// 4. the awoken thread acquires the ready_mutex_ and the process
// starts at 1.
// 5. the last thread to awake signals, but this signal has no effect
// 6. all threads are now in the parallel section
if (print_time_)
gettimeofday(&t_slice_begin_, NULL);
if(entry_counter_ == n_threads_)
exit_counter_ = 0;
// protect against spurious wake-ups by while loop
// see Butenhof, Programming with POSIX Threads, sec 3.3.2
while(entry_counter_ < n_threads_)
//////////// Parallel section beyond this line
// Deliver events and update all neurons in parallel
if ( from_step_ == 0 ) // deliver only at beginning of slice
if (t == 0) // only the first thread handles MUSIC stuff
// advance the time of music by one step (min_delay * h) must
// be done after deliver_events_() since it calls
// music_event_out_proxy::handle(), which hands the spikes over to
// MUSIC *before* MUSIC time is advanced
if (slice_ > 0)
net_.update_music_event_handlers_(clock_, from_step_, to_step_);
vector<Node*>::iterator i;
for(i = nodes_vec_[t].begin(); i != nodes_vec_[t].end(); ++i)
//////////// Serial section beyond this line
++exit_counter_; // Mark thread idle.
// We are at the end of a time slice or the end of simulation time,
// which may be inside a time slice.
// We wait for all threads to finish the parallel section above. The
// last thread arriving here fulfills exit_counter_==n_threads_ and
// performs event collection and communication with all other MPI
// processes, and advances time.
// Threads are then awoken one by one similar as described above.
// Each awoken thread retains the ready_mutex_ though, until it goes
// to sleep again by calling pthread_cond_wait(&ready_,...) above.
// The overall effect of this is that we first wait for all threads to
// finish updating, then communicate and advance time by help of the
// last thread, and then check if we need to leave the simulation loop
// because of a signal flag or because time has elapsed. In the latter
// case, each thread releases the ready_mutex_ once it has broken out
// of the while loop, so that the next thread can wake up.
if (exit_counter_ == n_threads_)
if ( static_cast<ulong_t>(to_step_) == min_delay_ ) // gather only at end of slice
entry_counter_ = 0;
// protect against spurious wake-ups by while loop
// see Butenhof, Programming with POSIX Threads, sec 3.3.2
while(exit_counter_ < n_threads_)
if ( SLIsignalflag != 0 )
std::string msg = String::compose("Thread %1 exiting on error or user signal.", t);
net_.message(SLIInterpreter::M_INFO, "Scheduler::threaded_update", msg);
if(to_do_ == 0) // is time up?
assert(to_do_ > 0); // for safety's sake
if (print_time_)
gettimeofday(&t_slice_end_, NULL);
} // while
ready_mutex_.unlock(); // unlock mutex after breaking out of while loop
void nest::Scheduler::threaded_update(thread)
net_.message(SLIInterpreter::M_ERROR, "Scheduler::threaded_update", "Multithreading mode not available");
throw KernelException();
void nest::Scheduler::prepare_nodes()
net_.message(SLIInterpreter::M_INFO, "Scheduler::prepare_nodes", "Please wait. Preparing elements.");
/* We clear the existing nodes_vec_ and then rebuild it.
prepare_node_() below inserts each node into the appropriate
nodes_vec_ after initializing its buffers and calibrating it.
We must rebuild the nodes_vec_ each time Simulate is called,
in case nodes have been added or deleted between Simulate calls.
#ifdef _OPENMP
#pragma omp parallel
size_t t = omp_get_thread_num();
for (index t = 0; t < n_threads_; ++t)
size_t num_thread_local_nodes = 0;
for (size_t idx = 0; idx < net_.size(); ++idx)
Node* node = net_.get_node(idx);
if (static_cast<index>(node->get_thread()) == t || node->num_thread_siblings_() > 0)
for(index n = 0; n < net_.size(); ++n)
if (net_.is_local_gid(n) && net_.nodes_[n] != 0)
if ((*net_.nodes_[n]).num_thread_siblings_() > 0)
Node* node = net_.get_node(n, t);
if (static_cast<uint_t>(node->get_thread()) == t)
} // end of parallel section / end of for threads
n_nodes_ = 0;
for (index t = 0; t < n_threads_; ++t)
n_nodes_ += nodes_vec_[t].size();
std::string msg = String::compose("Simulating %1 nodes.", n_nodes_);
net_.message(SLIInterpreter::M_INFO, "Scheduler::prepare_nodes", msg);
void nest::Scheduler::finalize_nodes()
for (index t = 0; t < n_threads_; ++t)
for(index n = 0; n < net_.size(); ++n)
if ( net_.is_local_gid(n) && net_.nodes_[n]!=0 )
if ((*net_.nodes_[n]).num_thread_siblings_() > 0)
Node* node = net_.get_node(n, t);
if (static_cast<uint_t>(node->get_thread()) == t)
void nest::Scheduler::set_status(DictionaryDatum const &d)
// Create an instance of time converter here to capture the current
// representation of time objects: TICS_PER_MS and TICS_PER_STEP
// will be stored in time_converter.
// This object can then be used to convert times in steps
// (e.g. Connection::delay_) or tics to the new representation.
// We pass this object to ConnectionManager::calibrate to update
// all time objects in the connection system to the new representation.
// MH 08-04-14
TimeConverter time_converter;
double_t time;
if(updateValue<double_t>(d, "time", time))
if ( time != 0.0 )
throw BadProperty("The simulation time can only be set to 0.0.");
if ( clock_ > Time(Time::step(0)) )
// reset only if time has passed
net_.message(SLIInterpreter::M_WARNING, "Scheduler::set_status",
"Simulation time reset to t=0.0. Resetting the simulation time is not"
"fully supported in NEST at present. Some spikes may be lost, and"
"stimulating devices may behave unexpectedly. PLEASE REVIEW YOUR"
clock_ = Time::step(0);
from_step_ = 0;
slice_ = 0;
configure_spike_buffers_(); // clear all old spikes
updateValue<bool>(d, "print_time", print_time_);
long n_threads;
bool n_threads_updated = updateValue<long>(d, "local_num_threads", n_threads);
if (n_threads_updated)
if ( net_.size() > 1 )
throw KernelException("Nodes exist: Thread/process number cannot be changed.");
if ( net_.connection_manager_.has_user_prototypes() )
throw KernelException("Custom synapse types exist: Thread/process number cannot be changed.");
if ( net_.connection_manager_.get_user_set_delay_extrema() )
throw KernelException("Delay extrema have been set: Thread/process number cannot be changed.");
if ( net_.get_simulated() )
throw KernelException("The network has been simulated: Thread/process number cannot be changed.");
if ( not Time::resolution_is_default() )
throw KernelException("The resolution has been set: Thread/process number cannot be changed.");
if ( n_threads > 1 && force_singlethreading_ )
net_.message(SLIInterpreter::M_WARNING, "Scheduler::set_status",
"No multithreading available, using single threading");
n_threads_ = 1;
// it is essential to call net_.reset() here to adapt memory pools and more
// to the new number of threads and VPs.
n_threads_ = n_threads;
long n_vps;
bool n_vps_updated = updateValue<long>(d, "total_num_virtual_procs", n_vps);
if (n_vps_updated)
if ( net_.size() > 1 )
throw KernelException("Nodes exist: Thread/process number cannot be changed.");
if ( net_.connection_manager_.has_user_prototypes() )
throw KernelException("Custom synapse types exist: Thread/process number cannot be changed.");
if ( net_.connection_manager_.get_user_set_delay_extrema() )
throw KernelException("Delay extrema have been set: Thread/process number cannot be changed.");
if ( net_.get_simulated() )
throw KernelException("The network has been simulated: Thread/process number cannot be changed.");
if ( not Time::resolution_is_default() )
throw KernelException("The resolution has been set: Thread/process number cannot be changed.");
if (n_vps % Communicator::get_num_processes() != 0)
throw BadProperty("Number of virtual processes (threads*processes) must be an integer "
"multiple of the number of processes. Value unchanged.");
n_threads_ = n_vps / Communicator::get_num_processes();
if ((n_threads > 1) && (force_singlethreading_))
net_.message(SLIInterpreter::M_WARNING, "Scheduler::set_status",
"No multithreading available, using single threading");
n_threads_ = 1;
// it is essential to call net_.reset() here to adapt memory pools and more
// to the new number of threads and VPs
// tics_per_ms and resolution must come after local_num_thread / total_num_threads
// because they might reset the network and the time representation
nest::double_t tics_per_ms;
bool tics_per_ms_updated = updateValue<nest::double_t>(d, "tics_per_ms", tics_per_ms);
double_t resd;
bool res_updated = updateValue<double_t>(d, "resolution", resd);
if (tics_per_ms_updated || res_updated)
if (net_.size() > 1) // root always exists
net_.message(SLIInterpreter::M_ERROR, "Scheduler::set_status",
"Cannot change time representation after nodes have been created. Please call ResetKernel first.");
throw KernelException();
else if ( net_.get_simulated() ) // someone may have simulated empty network
net_.message(SLIInterpreter::M_ERROR, "Scheduler::set_status",
"Cannot change time representation after the network has been simulated. Please call ResetKernel first.");
throw KernelException();
else if ( net_.connection_manager_.get_num_connections() != 0 )
net_.message(SLIInterpreter::M_ERROR, "Scheduler::set_status",
"Cannot change time representation after connections have been created. Please call ResetKernel first.");
throw KernelException();
else if (res_updated && tics_per_ms_updated) // only allow TICS_PER_MS to be changed together with resolution
if ( resd < 1.0 / tics_per_ms )
net_.message(SLIInterpreter::M_ERROR, "Scheduler::set_status",
"Resolution must be greater than or equal to one tic. Value unchanged.");
throw KernelException();
nest::TimeModifier::set_time_representation(tics_per_ms, resd);
clock_.calibrate(); // adjust to new resolution
net_.connection_manager_.calibrate(time_converter); // adjust delays in the connection system to new resolution
net_.message(SLIInterpreter::M_INFO, "Scheduler::set_status", "tics per ms and resolution changed.");
else if (res_updated) // only resolution changed
if ( resd < Time::get_ms_per_tic() )
net_.message(SLIInterpreter::M_ERROR, "Scheduler::set_status",
"Resolution must be greater than or equal to one tic. Value unchanged.");
throw KernelException();
clock_.calibrate(); // adjust to new resolution
net_.connection_manager_.calibrate(time_converter); // adjust delays in the connection system to new resolution
net_.message(SLIInterpreter::M_INFO, "Scheduler::set_status", "Temporal resolution changed.");
net_.message(SLIInterpreter::M_ERROR, "Scheduler::set_status",
"change of tics_per_step requires simultaneous specification of resolution.");
throw KernelException();
bool off_grid_spiking;
bool grid_spiking_updated = updateValue<bool>(d, "off_grid_spiking", off_grid_spiking);
if (grid_spiking_updated)
off_grid_spiking_ = off_grid_spiking;
bool comm_allgather;
bool commstyle_updated = updateValue<bool>(d, "communicate_allgather", comm_allgather);
if (commstyle_updated)
// set RNGs --- MUST come after n_threads_ is updated
if (d->known("rngs"))
// this array contains pre-seeded RNGs, so they can be used
// directly, no seeding required
ArrayDatum *ad =
dynamic_cast<ArrayDatum *>((*d)["rngs"].datum());
if ( ad == 0 )
throw BadProperty();
// n_threads_ is the new value after a change of the number of
// threads
if (ad->size() != (size_t)(Communicator::get_num_virtual_processes()))
net_.message(SLIInterpreter::M_ERROR, "Scheduler::set_status",
"Number of RNGs must equal number of virtual processes (threads*processes). RNGs unchanged.");
throw DimensionMismatch((size_t)(Communicator::get_num_virtual_processes()), ad->size());
// delete old generators, insert new generators this code is
// robust under change of thread number in this call to
// set_status, as long as it comes AFTER n_threads_ has been
// upated
for (index i = 0 ; i < ad->size() ; ++i)
else if (n_threads_updated && net_.size() == 0)
net_.message(SLIInterpreter::M_WARNING, "Schedulder::set_status", "Equipping threads with new default RNGs");
if ( d->known("rng_seeds") )
ArrayDatum *ad = dynamic_cast<ArrayDatum *>((*d)["rng_seeds"].datum());
if ( ad == 0 )
throw BadProperty();
if (ad->size() != (size_t)(Communicator::get_num_virtual_processes()))
net_.message(SLIInterpreter::M_ERROR, "Scheduler::set_status",
"Number of seeds must equal number of virtual processes (threads*processes). RNGs unchanged.");
throw DimensionMismatch((size_t)(Communicator::get_num_virtual_processes()), ad->size());
// check if seeds are unique
std::set<ulong_t> seedset;
for ( index i = 0 ; i < ad->size() ; ++i )
long s = (*ad)[i]; // SLI has no ulong tokens
if ( !seedset.insert(s).second )
net_.message(SLIInterpreter::M_WARNING, "Scheduler::set_status",
"Seeds are not unique across threads!");
// now apply seeds, resets generators automatically
for ( index i = 0 ; i < ad->size() ; ++i )
long s = (*ad)[i];
rng_seeds_[i] = s;
} // if rng_seeds
// set GRNG
if (d->known("grng"))
// pre-seeded grng that can be used directly, no seeding required
updateValue<librandom::RngDatum>(d, "grng", grng_);
else if (n_threads_updated && net_.size() == 0)
net_.message(SLIInterpreter::M_WARNING, "Schedulder::set_status", "Equipping threads with new default GRNG");
if ( d->known("grng_seed") )
long s = getValue<long>(d, "grng_seed");
// check if grng seed is unique with respect to rng seeds
// if grng_seed and rng_seeds given in one SetStatus call
std::set<ulong_t> seedset;
if (d->known("rng_seeds"))
ArrayDatum *ad_rngseeds =
dynamic_cast<ArrayDatum *>((*d)["rng_seeds"].datum());
if ( ad_rngseeds == 0 )
throw BadProperty();
for ( index i = 0 ; i < ad_rngseeds->size() ; ++i )
s = (*ad_rngseeds)[i]; // SLI has no ulong tokens
if ( !seedset.insert(s).second )
net_.message(SLIInterpreter::M_WARNING, "Scheduler::set_status",
"Seeds are not unique across threads!");
// now apply seed, resets generator automatically
grng_seed_ = s;
} // if grng_seed
long rng_bs;
if ( updateValue<long>(d, "rng_buffsize", rng_bs) )
if ( rng_bs < 1 )
throw BadProperty();
// now apply seeds, resets generators automatically
for ( index i = 0 ; i < rng_.size() ; ++i )
void nest::Scheduler::get_status(DictionaryDatum &d) const
def<long>(d, "local_num_threads", n_threads_);
def<long>(d, "total_num_virtual_procs", Communicator::get_num_virtual_processes());
def<long>(d, "num_processes", Communicator::get_num_processes());
def<double_t>(d, "time", get_time().get_ms());
def<long>(d, "to_do", to_do_);
def<bool>(d, "print_time", print_time_);
def<double>(d, "tics_per_ms", Time::get_tics_per_ms());
def<double>(d, "resolution", Time::get_resolution().get_ms());
delay min_delay = 0;
delay max_delay = 0;
compute_delay_extrema_(min_delay, max_delay);
Time tmp_delay = Time::step(min_delay);
def<double>(d, "min_delay", tmp_delay.get_ms());
tmp_delay = Time::step(max_delay);
def<double>(d, "max_delay", tmp_delay.get_ms());
def<double>(d, "ms_per_tic", Time::get_ms_per_tic());
def<double>(d, "tics_per_ms", Time::get_tics_per_ms());
def<long>(d, "tics_per_step", Time::get_tics_per_step());
def<double>(d, "T_min", Time::min().get_ms());
def<double>(d, "T_max", Time::max().get_ms());
// buffer size is the same for all threads,
// so we get it from the first thread
def<long>(d, "rng_buffsize", rng_[0]->get_buffsize());
(*d)["rng_seeds"] = Token(rng_seeds_);
def<long>(d, "grng_seed", grng_seed_);
def<bool>(d, "off_grid_spiking", off_grid_spiking_);
def<bool>(d, "communicate_allgather", Communicator::get_use_Allgather());
void nest::Scheduler::create_rngs_(const bool ctor_call)
// net_.message(SLIInterpreter::M_INFO, ) calls must not be called
// if create_rngs_ is called from Scheduler::Scheduler(), since net_
// is not fully constructed then
// if old generators exist, remove them; since rng_ contains
// lockPTRs, we don't have to worry about deletion
if ( rng_.size() > 0 )
if ( !ctor_call )
net_.message(SLIInterpreter::M_INFO, "Scheduler::create_rngs_", "Deleting existing random number generators");
// create new rngs
if ( !ctor_call )
net_.message(SLIInterpreter::M_INFO, "Scheduler::create_rngs_", "Creating default RNGs");
for ( index i = 0; i < static_cast<index>(Communicator::get_num_virtual_processes()); ++i )
unsigned long s = i + 1;
We have to ensure that each thread is provided with a different
stream of random numbers. The seeding method for Knuth's LFG
generator guarantees that different seeds yield non-overlapping
random number sequences.
We therefore have to seed with known numbers: using random
seeds here would run the risk of using the same seed twice.
For simplicity, we use 1 .. n_vps.
librandom::RngPtr rng = librandom::RandomGen::create_knuthlfg_rng(s);
if ( !rng )
if ( !ctor_call )
net_.message(SLIInterpreter::M_ERROR, "Scheduler::create_rngs_", "Error initializing knuthlfg");
std::cerr << "\nScheduler::create_rngs_\n" << "Error initializing knuthlfg" << std::endl;
throw KernelException();
rng_seeds_[i] = s;
void nest::Scheduler::create_grng_(const bool ctor_call)
// create new grng
if ( !ctor_call )
net_.message(SLIInterpreter::M_INFO, "Scheduler::create_grng_", "Creating new default global RNG");
// create default RNG with default seed
grng_ = librandom::RandomGen::create_knuthlfg_rng(librandom::RandomGen::DefaultSeed);
if ( !grng_ )
if ( !ctor_call )
net_.message(SLIInterpreter::M_ERROR, "Scheduler::create_grng_",
"Error initializing knuthlfg");
std::cerr << "\nScheduler::create_grng_\n"
<< "Error initializing knuthlfg"
<< std::endl;
throw KernelException();
The seed for the global rng should be different from the seeds
of the local rngs_ for each thread seeded with 1,..., n_vps.
long s = Communicator::get_num_virtual_processes() + 1;
grng_seed_ = s;
void nest::Scheduler::collocate_buffers_()
//count number of spikes in registers
int num_spikes = 0;
int num_grid_spikes = 0;
int num_offgrid_spikes = 0;
std::vector<std::vector<std::vector<uint_t> > >::iterator i;
std::vector<std::vector<uint_t> >::iterator j;
for (i = spike_register_.begin(); i != spike_register_.end(); ++i)
for (j = i->begin(); j != i->end(); ++j)
num_grid_spikes += j->size();
std::vector<std::vector<std::vector<OffGridSpike> > >::iterator it;
std::vector<std::vector<OffGridSpike> >::iterator jt;
for (it = offgrid_spike_register_.begin(); it != offgrid_spike_register_.end(); ++it)
for (jt = it->begin(); jt != it->end(); ++jt)
num_offgrid_spikes += jt->size();
num_spikes = num_grid_spikes + num_offgrid_spikes;
if (!off_grid_spiking_) //on grid spiking
// make sure buffers are correctly sized and empty
std::vector<uint_t> tmp(global_grid_spikes_.size(), 0);
if (global_grid_spikes_.size() != static_cast<uint_t>(Communicator::get_recv_buffer_size()))
global_grid_spikes_.resize(Communicator::get_recv_buffer_size(), 0);
std::vector<uint_t> tmp2(local_grid_spikes_.size(), 0);
if (num_spikes + (n_threads_ * min_delay_) > static_cast<uint_t>(Communicator::get_send_buffer_size()))
local_grid_spikes_.resize((num_spikes + (min_delay_ * n_threads_)),0);
else if (local_grid_spikes_.size() < static_cast<uint_t>(Communicator::get_send_buffer_size()))
local_grid_spikes_.resize(Communicator::get_send_buffer_size(), 0);
// collocate the entries of spike_registers into local_grid_spikes__
std::vector<uint_t>::iterator pos = local_grid_spikes_.begin();
if (num_offgrid_spikes == 0)
for (i = spike_register_.begin(); i != spike_register_.end(); ++i)
for (j = i->begin(); j != i->end(); ++j)
pos = std::copy(j->begin(), j->end(), pos);
*pos = comm_marker_;
std::vector<OffGridSpike>::iterator n;
it = offgrid_spike_register_.begin();
for (i = spike_register_.begin(); i != spike_register_.end(); ++i)
jt = it->begin();
for (j = i->begin(); j != i->end(); ++j)
pos = std::copy(j->begin(), j->end(), pos);
for (n = jt->begin() ; n != jt->end() ; ++n )
*pos = n->get_gid();
*pos = comm_marker_;
for (it = offgrid_spike_register_.begin(); it != offgrid_spike_register_.end(); ++it)
for (jt = it->begin(); jt != it->end(); ++jt)
// remove old spikes from the spike_register_
for (i = spike_register_.begin(); i != spike_register_.end(); ++i)
for (j = i->begin(); j != i->end(); ++j)
else //off_grid_spiking
// make sure buffers are correctly sized and empty
std::vector<OffGridSpike> tmp(global_offgrid_spikes_.size(), OffGridSpike(0,0.0));
if (global_offgrid_spikes_.size() != static_cast<uint_t>(Communicator::get_recv_buffer_size()))
global_offgrid_spikes_.resize(Communicator::get_recv_buffer_size(), OffGridSpike(0,0.0));
std::vector<OffGridSpike> tmp2(local_offgrid_spikes_.size(), OffGridSpike(0,0.0));
if (num_spikes + (n_threads_ * min_delay_) > static_cast<uint_t>(Communicator::get_send_buffer_size()))
local_offgrid_spikes_.resize((num_spikes + (min_delay_ * n_threads_)), OffGridSpike(0,0.0));
else if (local_offgrid_spikes_.size() < static_cast<uint_t>(Communicator::get_send_buffer_size()))
local_offgrid_spikes_.resize(Communicator::get_send_buffer_size(), OffGridSpike(0,0.0));
// collocate the entries of spike_registers into local_offgrid_spikes__
std::vector<OffGridSpike>::iterator pos = local_offgrid_spikes_.begin();
if (num_grid_spikes == 0)
for (it = offgrid_spike_register_.begin(); it != offgrid_spike_register_.end(); ++it)
for (jt = it->begin(); jt != it->end(); ++jt)
pos = std::copy(jt->begin(), jt->end(), pos);
std::vector<uint_t>::iterator n;
i = spike_register_.begin();
for (it = offgrid_spike_register_.begin(); it != offgrid_spike_register_.end(); ++it)
j = i->begin();
for (jt = it->begin(); jt != it->end(); ++jt)
pos = std::copy(jt->begin(), jt->end(), pos);
for (n = j->begin() ; n != j->end() ; ++n )
*pos = OffGridSpike(*n,0);
for (i = spike_register_.begin(); i != spike_register_.end(); ++i)
for (j = i->begin(); j != i->end(); ++j)
//empty offgrid_spike_register_
for (it = offgrid_spike_register_.begin(); it != offgrid_spike_register_.end(); ++it)
for (jt = it->begin(); jt != it->end(); ++jt)
void nest::Scheduler::deliver_events_(thread t)
// deliver only at beginning of time slice
if ( from_step_ > 0 )
size_t n_markers = 0;
SpikeEvent se;
std::vector<int> pos(displacements_);
if (!off_grid_spiking_) //on_grid_spiking
for (size_t vp = 0; vp < (size_t)Communicator::get_num_virtual_processes(); ++vp)
size_t pid = get_process_id(vp);
int lag = min_delay_ - 1;
while(n_markers < min_delay_)
index nid = global_grid_spikes_[pos[pid]];
if (nid != comm_marker_)
// tell all local nodes about spikes on remote machines.
se.set_stamp(clock_ - Time::step(lag));
net_.connection_manager_.send(t, nid, se);
n_markers = 0;
else //off grid spiking
for (size_t vp = 0; vp < (size_t)Communicator::get_num_virtual_processes(); ++vp)
size_t pid = get_process_id(vp);
int lag = min_delay_ - 1;
while(n_markers < min_delay_)
index nid = global_offgrid_spikes_[pos[pid]].get_gid();
if (nid != comm_marker_)
// tell all local nodes about spikes on remote machines.
se.set_stamp(clock_ - Time::step(lag));
net_.connection_manager_.send(t, nid, se);
n_markers = 0;
void nest::Scheduler::gather_events_()
if (off_grid_spiking_)
Communicator::communicate(local_offgrid_spikes_, global_offgrid_spikes_, displacements_);
Communicator::communicate(local_grid_spikes_, global_grid_spikes_, displacements_);
void nest::Scheduler::advance_time_()
* After each time slice, we flip the value of the
* update_ref_ flag.
* The updated flag of a Node is changed after each
* update.
* We determine whether a Node was updated in the
* current cycle by comparing its updated flag with the update_ref
* flag.
update_ref_= !update_ref_;
// time now advanced time by the duration of the previous step
to_do_ -= to_step_ - from_step_;
// advance clock, update modulos, slice counter only if slice completed
if ( (delay)to_step_ == min_delay_ )
clock_ += Time::step(min_delay_);
from_step_ = 0;
from_step_ = to_step_;
long_t end_sim = from_step_ + to_do_;
if ( min_delay_ < (delay)end_sim )
to_step_ = min_delay_; // update to end of time slice
to_step_ = end_sim; // update to end of simulation time
assert(to_step_ - from_step_ <= (long_t)min_delay_);
void nest::Scheduler::print_progress_()
double_t rt_factor = 0.0;
if (t_slice_end_.tv_sec != 0)
long t_real_s = (t_slice_end_.tv_sec - t_slice_begin_.tv_sec) * 1e6; // usec
t_real_ += t_real_s + (t_slice_end_.tv_usec - t_slice_begin_.tv_usec); // usec
double_t t_real_acc = (t_real_) / 1000.; //ms
double_t t_sim_acc = (to_do_total_ - to_do_) * Time::get_resolution().get_ms();
rt_factor = t_sim_acc / t_real_acc;
int_t percentage = (100 - int(float(to_do_) / to_do_total_ * 100));
std::cout << "\r" << std::setw(3) << std::right << percentage << " %: "
<< "network time: " << std::fixed << std::setprecision(1) << clock_.get_ms() << " ms, "
<< "realtime factor: " << std::setprecision(4) << rt_factor
<< std::resetiosflags(std::ios_base::floatfield);
void nest::Scheduler::set_num_threads(thread n_threads)
n_threads_ = n_threads;
#ifdef _OPENMP