/*
 *  communicator.cpp
 *
 *  This file is part of NEST.
 *
 *  Copyright (C) 2004 The NEST Initiative
 *
 *  NEST is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  NEST is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with NEST.  If not, see <http://www.gnu.org/licenses/>.
 *
 */

#include "config.h"

/* To avoid problems on BlueGene/L, mpi.h MUST be the 
   first included file after config.h.
*/
#ifdef HAVE_MPI
#include <mpi.h>
#endif /* #ifdef HAVE_MPI */

#include <limits>
#include <numeric>
#include <time.h>
#include <sys/time.h>  // required to fix header dependencies in OS X, HEP
#include <sys/times.h>
#include <sys/resource.h>
#include "communicator.h"
#include "communicator_impl.h"
#include "network.h"

#include "dictutils.h"
#include "nodelist.h"

nest::Network* nest::Communicator::net_ = 0;
int nest::Communicator::rank_ = 0;
int nest::Communicator::num_processes_ = 1;
int nest::Communicator::n_vps_ = 1;
int nest::Communicator::send_buffer_size_ = 1;
int nest::Communicator::recv_buffer_size_ = 1;
bool nest::Communicator::initialized_ = false;
bool nest::Communicator::use_Allgather_ = true;

#ifdef HAVE_MPI

#ifdef HAVE_MUSIC
MUSIC::Setup *nest::Communicator::music_setup=0;
MUSIC::Runtime *nest::Communicator::music_runtime=0;
#endif /* #ifdef HAVE_MUSIC */

// Variable to hold the MPI communicator to use.
#ifdef HAVE_MUSIC
MPI::Intracomm comm=0;
#else /* #ifdef HAVE_MUSIC */
MPI_Comm comm=0;
#endif /* #ifdef HAVE_MUSIC */

template<> MPI_Datatype MPI_Type<nest::int_t>::type = MPI_INT;
template<> MPI_Datatype MPI_Type<nest::double_t>::type = MPI_DOUBLE;
template<> MPI_Datatype MPI_Type<nest::long_t>::type = MPI_LONG;

MPI_Datatype MPI_OFFGRID_SPIKE = 0;

/* ------------------------------------------------------ */

unsigned int nest::Communicator::COMM_OVERFLOW_ERROR = std::numeric_limits<unsigned int>::max();

std::vector<int> nest::Communicator::comm_step_ = std::vector<int>();

/**
 * Set up MPI, establish number of processes and rank, and initialize
 * the vector of communication partners.
 */
void nest::Communicator::init (int* argc, char** argv[])
{
  /* Initialize MPI

     MPI_Init sets the working directory on all machines to the
     directory from which mpirun was called. This is usually what one
     intends.

     On some machines, e.g., Linux, executables compiled with MPI can be
     called without mpirun.  MPI_Init will then place NEST in the
     directory containing the NEST binary.  This is a user error: if
     compiled with MPI, NEST must be run using mpirun or an equivalent
     launcher.  Unfortunately, there seems to be no straightforward way
     to check whether NEST was started through mpirun.

     HEP,MD,AM 2006-08-04
  */

  int init;
  MPI_Initialized(&init);

  int provided_thread_level;
  if (init == 0)
  {
#ifdef HAVE_MUSIC
    music_setup = new MUSIC::Setup(*argc, *argv, MPI_THREAD_SERIALIZED, &provided_thread_level);
    // get a communicator from MUSIC
    comm = music_setup->communicator();
#else /* #ifdef HAVE_MUSIC */
    MPI_Init_thread(argc, argv, MPI_THREAD_SERIALIZED, &provided_thread_level);
    comm = MPI_COMM_WORLD;
#endif /* #ifdef HAVE_MUSIC */
  }

  MPI_Comm_size(comm, &num_processes_);
  MPI_Comm_rank(comm, &rank_);

  recv_buffer_size_ = send_buffer_size_ * num_processes_;
  
  // Create the derived MPI datatype used to communicate off-grid spikes.
  OffGridSpike::assert_datatype_compatibility();
  MPI_Datatype source_types[2];
  int blockcounts[2];
  MPI_Aint offsets[2];
  MPI_Aint start_address,address;
  OffGridSpike ogs(0,0.0);

  // OffGridSpike.gid_ (transmitted as MPI_DOUBLE; see assert_datatype_compatibility() above)
  offsets[0] = 0;
  source_types[0] = MPI_DOUBLE;
  blockcounts[0] = 1;

  // OffGridSpike.offset_
  MPI_Address(&(ogs.gid_),&start_address);
  MPI_Address(&(ogs.offset_),&address);
  offsets[1] = address-start_address;
  source_types[1] = MPI_DOUBLE;
  blockcounts[1] = 1;

  //generate and commit struct
  MPI_Type_struct(2, blockcounts, offsets, source_types, &MPI_OFFGRID_SPIKE);
  MPI_Type_commit(&MPI_OFFGRID_SPIKE);
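  // Note: MPI_Address and MPI_Type_struct are the MPI-1 names. On an
  // MPI-2 capable library the equivalent, non-deprecated construction
  // would be (sketch, assuming the MPI-2 interface is available):
  //
  //   MPI_Get_address(&(ogs.gid_), &start_address);
  //   MPI_Get_address(&(ogs.offset_), &address);
  //   offsets[1] = address - start_address;
  //   MPI_Type_create_struct(2, blockcounts, offsets, source_types,
  //                          &MPI_OFFGRID_SPIKE);
  //   MPI_Type_commit(&MPI_OFFGRID_SPIKE);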
  init_communication();
  initialized_ = true;
}

/**
 * Finalize MPI (and the MUSIC runtime, if enabled), or abort all processes
 * if the script terminated with an error.
 */
void nest::Communicator::finalize()
{
  MPI_Type_free (&MPI_OFFGRID_SPIKE);

  int finalized;
  MPI_Finalized(&finalized);

  int initialized;
  MPI_Initialized(&initialized);

  if (finalized == 0 && initialized == 1) 
  {
    if (! net_->quit_by_error())
#ifdef HAVE_MUSIC
    {
      if (music_runtime == 0)
      {
        // we need a Runtime object to call finalize(), so we create
        // one, if we don't have one already
        music_runtime = new MUSIC::Runtime(music_setup, 1e-3);
      }

      music_runtime->finalize();
      delete music_runtime;
    }
#else /* #ifdef HAVE_MUSIC */
      MPI_Finalize();
#endif /* #ifdef HAVE_MUSIC */
    else
    {
      net_->message(SLIInterpreter::M_INFO, "Communicator::finalize()",
                    "Calling MPI_Abort() due to errors in the script.");
      MPI_Abort(MPI_COMM_WORLD, net_->get_exitcode());
    }
  }
}

void nest::Communicator::mpi_abort(int exitcode)
{
  MPI_Abort(MPI_COMM_WORLD, exitcode);
}


std::string nest::Communicator::get_processor_name()
{
  char name[1024];
  int len;
  MPI_Get_processor_name(name, &len);
  name[len] = '\0';
  return name;
}


/**
 * Perform the CPEX algorithm to determine the communication partner
 * of the process for each communication step.
 * See: A. Tam and C. Wang: Efficient scheduling of complete exchange
 *      on clusters. In 13th International Conference on Parallel
 *      and Distributed Computing Systems (PDCS 2000), Las Vegas, 2000
 */
void nest::Communicator::init_communication ()
{

  // number of communication steps required is (num_processes_ - 1) if
  // num_processes_ is even, and (num_processes_) if odd.
  int num_comm_steps = (num_processes_ - 1) + (num_processes_ % 2);
  comm_step_.clear();
  comm_step_.resize(num_comm_steps);
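  // Illustration (derived from the loop below): for num_processes_ = 4,
  // num_comm_steps = 3 and the per-rank schedules become
  //   rank 0: { 1, 2, 3 }
  //   rank 1: { 0, 3, 2 }
  //   rank 2: { 3, 0, 1 }
  //   rank 3: { 2, 1, 0 }
  // i.e. in every step all ranks are paired exactly once. For odd
  // num_processes_, one rank per step has no partner and is assigned -1.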

  int partner;
  for (int step = 1; step <= num_comm_steps; ++step)
  {
    if (rank_ < num_comm_steps)
      partner = step - rank_;
    else if (step % 2)
      partner = (step + num_comm_steps) / 2;
    else
      partner = step / 2;
    partner = (partner + num_comm_steps) % num_comm_steps;
    if (partner == rank_)
      partner = (num_processes_ % 2)? -1 : num_comm_steps;
    comm_step_[step - 1] = partner;
  }
}

void nest::Communicator::communicate (std::vector<uint_t>& send_buffer, 
				      std::vector<uint_t>& recv_buffer, 
				      std::vector<int>& displacements)
{
  if (num_processes_ == 1)    //purely thread-based
    {
      displacements[0] = 0;
      if (static_cast<uint_t>(recv_buffer_size_) < send_buffer.size())
	{
	  recv_buffer_size_ = send_buffer_size_ = send_buffer.size();
	  recv_buffer.resize(recv_buffer_size_);
	}
      recv_buffer.swap(send_buffer);
    }
  else if ((num_processes_ > 1) && use_Allgather_)   //communicate using Allgather
    communicate_Allgather(send_buffer, recv_buffer,displacements);
  else  
    communicate_CPEX(send_buffer, recv_buffer,displacements);
}

void nest::Communicator::communicate_Allgather (std::vector<uint_t>& send_buffer, 
						std::vector<uint_t>& recv_buffer, 
						std::vector<int>& displacements)
{  
  std::vector<int> recv_counts(num_processes_,send_buffer_size_);
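  // Protocol: each rank first contributes exactly send_buffer_size_ entries.
  // A rank whose send buffer has outgrown that size instead sends a marker
  // block (COMM_OVERFLOW_ERROR followed by the required size); all ranks then
  // detect the overflow, repeat the exchange with MPI_Allgatherv and enlarge
  // the buffer sizes for subsequent calls.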

  //attempt Allgather
  if (send_buffer.size() == static_cast<uint_t>(send_buffer_size_))
    MPI_Allgather(&send_buffer[0], send_buffer_size_, MPI_UNSIGNED, 
		  &recv_buffer[0], send_buffer_size_, MPI_UNSIGNED, comm);
  else
    {
      // DEC cxx required 0U literal, HEP 2007-03-26
      std::vector<uint_t> overflow_buffer(send_buffer_size_, 0U);
      overflow_buffer[0] = COMM_OVERFLOW_ERROR;
      overflow_buffer[1] = send_buffer.size();
      MPI_Allgather(&overflow_buffer[0], send_buffer_size_, MPI_UNSIGNED, 
		    &recv_buffer[0], send_buffer_size_, MPI_UNSIGNED, comm);
    }
  //check for overflow condition
  int disp = 0;
  uint_t max_recv_count = send_buffer_size_;
  bool overflow = false;
  for (int pid = 0; pid < num_processes_; ++pid)
    {
      uint_t block_disp = pid * send_buffer_size_;
      displacements[pid] = disp;
      if (recv_buffer[block_disp] == COMM_OVERFLOW_ERROR)
	{
	  overflow = true;
	  recv_counts[pid] = recv_buffer[block_disp + 1];
	  if (static_cast<uint_t>(recv_counts[pid]) > max_recv_count)
	    max_recv_count = recv_counts[pid];
	}
      disp += recv_counts[pid]; 
    }
	
  //do Allgatherv if necessary
  if (overflow)
    {
      recv_buffer.resize(disp,0);
      MPI_Allgatherv(&send_buffer[0], send_buffer.size(), MPI_UNSIGNED, 
		     &recv_buffer[0], &recv_counts[0], &displacements[0], MPI_UNSIGNED, comm);
      send_buffer_size_ = max_recv_count;
      recv_buffer_size_ = send_buffer_size_ * num_processes_;
    }
}
  
void nest::Communicator::communicate_CPEX (std::vector<uint_t>& send_buffer, 
					   std::vector<uint_t>& recv_buffer, 
					   std::vector<int>& displacements)
{      
  MPI_Status status;
  int partner;
  int disp;
  std::vector<int> recv_counts(num_processes_,send_buffer_size_);
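  // Same overflow protocol as communicate_Allgather() above, but implemented
  // with pairwise MPI_Send/MPI_Recv exchanges following the precomputed CPEX
  // schedule in comm_step_; the rank's own block is copied locally.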
  
  if (send_buffer.size() == static_cast<uint_t>(send_buffer_size_))    //no overflow condition
    {
      for (size_t step = 0; step < comm_step_.size(); ++step)
	{
	  partner = comm_step_[step];
	  disp = partner*send_buffer_size_;
	  if (partner >= 0)
	    {
	      if (rank_ < partner)
		{
		  MPI_Send(&send_buffer[0], send_buffer_size_, MPI_UNSIGNED, partner, 1, comm);
		  MPI_Recv(&recv_buffer[disp], send_buffer_size_, MPI_UNSIGNED, partner, 1, comm, &status);
		}
	      else
		{
		  MPI_Recv(&recv_buffer[disp], send_buffer_size_, MPI_UNSIGNED, partner, 1, comm, &status);
		  MPI_Send(&send_buffer[0], send_buffer_size_, MPI_UNSIGNED, partner, 1, comm);
		}
	    }
	}
      disp = rank_*send_buffer_size_;
      std::copy(send_buffer.begin(),send_buffer.end(),recv_buffer.begin() + disp);
      
    }
  else    //overflow
    {
      // DEC cxx required 0U literal, HEP 2007-03-26
      std::vector<uint_t> overflow_buffer(send_buffer_size_, 0U);
      overflow_buffer[0] = COMM_OVERFLOW_ERROR;
      overflow_buffer[1] = send_buffer.size();
      for (size_t step = 0; step < comm_step_.size(); ++step)
	{
	  partner = comm_step_[step];
	  disp = partner*send_buffer_size_;
	  displacements[partner] = disp;
	  if (partner >= 0)
	    {
	      if (rank_ < partner)
		{
		  MPI_Send(&overflow_buffer[0], send_buffer_size_, MPI_UNSIGNED, partner, 1, comm);
		  MPI_Recv(&recv_buffer[disp], send_buffer_size_, MPI_UNSIGNED, partner, 1, comm, &status);
		}
	      else
		{
		  MPI_Recv(&recv_buffer[disp], send_buffer_size_, MPI_UNSIGNED, partner, 1, comm, &status);
		  MPI_Send(&overflow_buffer[0], send_buffer_size_, MPI_UNSIGNED, partner, 1, comm);
		}
	    }
	}
      disp = rank_*send_buffer_size_;
      std::copy(overflow_buffer.begin(),overflow_buffer.end(),recv_buffer.begin() + disp);
    }

  //check for overflow in recv_buffer
  disp = 0;
  uint_t max_recv_count = send_buffer_size_;
  bool overflow = false;
  for (int pid = 0; pid < num_processes_; ++pid)
    {
      uint_t block_disp = pid * send_buffer_size_;
      displacements[pid] = disp;
      if (recv_buffer[block_disp] == COMM_OVERFLOW_ERROR)
	{
	  overflow = true;
	  recv_counts[pid] = recv_buffer[block_disp + 1];
	  if (static_cast<uint_t>(recv_counts[pid]) > max_recv_count)
	    max_recv_count = recv_counts[pid];
	}
      disp += recv_counts[pid]; 
    }

  if (overflow)
    {
      recv_buffer.resize(disp,0);
      for (size_t step = 0; step < comm_step_.size(); ++step)
	{
	  partner = comm_step_[step];
	  disp = displacements[partner];
	  if (partner >= 0)
	    {
	      if (rank_ < partner)
		{
		  MPI_Send(&send_buffer[0], send_buffer.size(), MPI_UNSIGNED, partner, 1, comm);
		  MPI_Recv(&recv_buffer[disp], recv_counts[partner], MPI_UNSIGNED, partner, 1, comm, &status);
		}
	      else
		{
		  MPI_Recv(&recv_buffer[disp], recv_counts[partner], MPI_UNSIGNED, partner, 1, comm, &status);
		  MPI_Send(&send_buffer[0], send_buffer.size(), MPI_UNSIGNED, partner, 1, comm);
		}
	    }
	}
      std::copy(send_buffer.begin(),send_buffer.end(),recv_buffer.begin() + displacements[rank_]);
      send_buffer_size_ = max_recv_count;
      recv_buffer_size_ = send_buffer_size_ * num_processes_;
    }
}

template <typename T>
void nest::Communicator::communicate_Allgather(std::vector<T>& send_buffer, 
					       std::vector<T>& recv_buffer, 
					       std::vector<int>& displacements)
{  
  std::vector<int> recv_counts(num_processes_,send_buffer_size_);

  //attempt Allgather
  if (send_buffer.size() == static_cast<uint_t>(send_buffer_size_))
    MPI_Allgather(&send_buffer[0], send_buffer_size_, MPI_Type<T>::type, 
		  &recv_buffer[0], send_buffer_size_, MPI_Type<T>::type, comm);
  else
    {
      // DEC cxx required 0U literal, HEP 2007-03-26
      std::vector<uint_t> overflow_buffer(send_buffer_size_, 0U);
      overflow_buffer[0] = COMM_OVERFLOW_ERROR;
      overflow_buffer[1] = send_buffer.size();
      MPI_Allgather(&overflow_buffer[0], send_buffer_size_, MPI_Type<T>::type, 
		    &recv_buffer[0], send_buffer_size_, MPI_Type<T>::type, comm);
    }
  //check for overflow condition
  int disp = 0;
  uint_t max_recv_count = send_buffer_size_;
  bool overflow = false;
  for (int pid = 0; pid < num_processes_; ++pid)
    {
      uint_t block_disp = pid * send_buffer_size_;
      displacements[pid] = disp;
      if (recv_buffer[block_disp] == COMM_OVERFLOW_ERROR)
	{
	  overflow = true;
	  recv_counts[pid] = recv_buffer[block_disp + 1];
	  if (static_cast<uint_t>(recv_counts[pid]) > max_recv_count)
	    max_recv_count = recv_counts[pid];
	}
      disp += recv_counts[pid]; 
    }
	
  //do Allgatherv if necessary
  if (overflow)
    {
      recv_buffer.resize(disp,0);
      MPI_Allgatherv(&send_buffer[0], send_buffer.size(), MPI_Type<T>::type, 
		     &recv_buffer[0], &recv_counts[0], &displacements[0], MPI_Type<T>::type, comm);
      send_buffer_size_ = max_recv_count;
      recv_buffer_size_ = send_buffer_size_ * num_processes_;
    }
}

void nest::Communicator::communicate (std::vector<OffGridSpike>& send_buffer, 
				      std::vector<OffGridSpike>& recv_buffer, 
				      std::vector<int>& displacements)
{
  if (num_processes_ == 1)    //purely thread-based
    {
      displacements[0] = 0;
      if (static_cast<uint_t>(recv_buffer_size_) < send_buffer.size())
	{
	  recv_buffer_size_ = send_buffer_size_ = send_buffer.size();
	  recv_buffer.resize(recv_buffer_size_);
	}
      recv_buffer.swap(send_buffer);
    }
  else if ((num_processes_ > 1) && use_Allgather_)   //communicate using Allgather
    communicate_Allgather(send_buffer, recv_buffer,displacements);
  else  
    communicate_CPEX(send_buffer, recv_buffer,displacements);
}

void nest::Communicator::communicate_Allgather (std::vector<OffGridSpike>& send_buffer, 
						std::vector<OffGridSpike>& recv_buffer, 
						std::vector<int>& displacements)
{  
  std::vector<int> recv_counts(num_processes_,send_buffer_size_);
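  // Mirrors the uint_t overload above; the overflow marker and the required
  // size are carried in the gid field of the first two OffGridSpike entries.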
  //attempt Allgather
  if (send_buffer.size() == static_cast<uint_t>(send_buffer_size_))
    MPI_Allgather(&send_buffer[0], send_buffer_size_, MPI_OFFGRID_SPIKE, 
		  &recv_buffer[0], send_buffer_size_, MPI_OFFGRID_SPIKE, comm);
  else
    {
      std::vector<OffGridSpike> overflow_buffer(send_buffer_size_);
      overflow_buffer[0] = OffGridSpike(COMM_OVERFLOW_ERROR,0.0);
      overflow_buffer[1] = OffGridSpike(send_buffer.size(),0.0);
      MPI_Allgather(&overflow_buffer[0], send_buffer_size_, MPI_OFFGRID_SPIKE, 
		    &recv_buffer[0], send_buffer_size_, MPI_OFFGRID_SPIKE, comm);
    }

  //check for overflow condition
  int disp = 0;
  uint_t max_recv_count = send_buffer_size_;
  bool overflow = false;
  for (int pid = 0; pid < num_processes_; ++pid)
    {
      uint_t block_disp = pid * send_buffer_size_;
      displacements[pid] = disp;
      if ((recv_buffer[block_disp]).get_gid() == COMM_OVERFLOW_ERROR)
	{
	  overflow = true;
	  recv_counts[pid] = (recv_buffer[block_disp + 1]).get_gid();
	  if (static_cast<uint_t>(recv_counts[pid]) > max_recv_count)
	    max_recv_count = recv_counts[pid];
	}
      disp += recv_counts[pid]; 
    }
	
  //do Allgatherv if necessary
  if (overflow)
    {
      recv_buffer.resize(disp);
      MPI_Allgatherv(&send_buffer[0], send_buffer.size(), MPI_OFFGRID_SPIKE, 
		     &recv_buffer[0], &recv_counts[0], &displacements[0], 
		     MPI_OFFGRID_SPIKE, comm);
      send_buffer_size_ = max_recv_count;
      recv_buffer_size_ = send_buffer_size_ * num_processes_;
    }
}

void nest::Communicator::communicate_CPEX (std::vector<OffGridSpike>& send_buffer, 
					   std::vector<OffGridSpike>& recv_buffer, 
					   std::vector<int>& displacements)
{      
  MPI_Status status;
  int partner;
  int disp;
  std::vector<int> recv_counts(num_processes_,send_buffer_size_);
  
  
  if (send_buffer.size() == static_cast<uint_t>(send_buffer_size_)) //no overflow condition
    {
      for (size_t step = 0; step < comm_step_.size(); ++step)
	{
	  partner = comm_step_[step];
	  disp = partner*send_buffer_size_;
	  if (partner >= 0)
	    {
	      if (rank_ < partner)
		{
		  MPI_Send(&send_buffer[0], send_buffer_size_, MPI_OFFGRID_SPIKE, 
			   partner, 1, comm);
		  MPI_Recv(&recv_buffer[disp], send_buffer_size_, MPI_OFFGRID_SPIKE, 
			   partner, 1, comm, &status);
		}
	      else
		{
		  MPI_Recv(&recv_buffer[disp], send_buffer_size_, MPI_OFFGRID_SPIKE, 
			   partner, 1, comm, &status);
		  MPI_Send(&send_buffer[0], send_buffer_size_, MPI_OFFGRID_SPIKE,
			   partner, 1, comm);
		}
	    }
	}
      disp = rank_*send_buffer_size_;
      std::copy(send_buffer.begin(),send_buffer.end(),recv_buffer.begin() + disp);
      
    }
  else    //overflow
    {
      std::vector<OffGridSpike> overflow_buffer(send_buffer_size_);
      overflow_buffer[0] = OffGridSpike(COMM_OVERFLOW_ERROR,0.0);
      overflow_buffer[1] = OffGridSpike(send_buffer.size(),0.0);
      for (size_t step = 0; step < comm_step_.size(); ++step)
	{
	  partner = comm_step_[step];
	  disp = partner*send_buffer_size_;
	  displacements[partner] = disp;
	  if (partner >= 0)
	    {
	      if (rank_ < partner)
		{
		  MPI_Send(&overflow_buffer[0], send_buffer_size_, MPI_OFFGRID_SPIKE, 
			   partner, 1, comm);
		  MPI_Recv(&recv_buffer[disp], send_buffer_size_, MPI_OFFGRID_SPIKE, 
			   partner, 1, comm, &status);
		}
	      else
		{
		  MPI_Recv(&recv_buffer[disp], send_buffer_size_, MPI_OFFGRID_SPIKE,
			   partner, 1, comm, &status);
		  MPI_Send(&overflow_buffer[0], send_buffer_size_, MPI_OFFGRID_SPIKE, 
			   partner, 1, comm);
		}
	    }
	}
      disp = rank_*send_buffer_size_;
      std::copy(overflow_buffer.begin(),overflow_buffer.end(),recv_buffer.begin() + disp);
    }

  //check for overflow in recv_buffer
  disp = 0;
  uint_t max_recv_count = send_buffer_size_;
  bool overflow = false;
  for (int pid = 0; pid < num_processes_; ++pid)
    {
      uint_t block_disp = pid * send_buffer_size_;
      displacements[pid] = disp;
      if ((recv_buffer[block_disp]).get_gid() == COMM_OVERFLOW_ERROR)
	{
	  overflow = true;
	  recv_counts[pid] = (recv_buffer[block_disp + 1]).get_gid();
	  if (static_cast<uint_t>(recv_counts[pid]) > max_recv_count)
	    max_recv_count = recv_counts[pid];
	}
      disp += recv_counts[pid]; 
    }

  if (overflow)
    {
      recv_buffer.resize(disp);
      for (size_t step = 0; step < comm_step_.size(); ++step)
	{
	  partner = comm_step_[step];
	  disp = displacements[partner];
	  if (partner >= 0)
	    {
	      if (rank_ < partner)
		{
		  MPI_Send(&send_buffer[0], send_buffer.size(), MPI_OFFGRID_SPIKE, 
			   partner, 1, comm);
		  MPI_Recv(&recv_buffer[disp], recv_counts[partner], MPI_OFFGRID_SPIKE, 
			   partner, 1, comm, &status);
		}
	      else
		{
		  MPI_Recv(&recv_buffer[disp], recv_counts[partner], MPI_OFFGRID_SPIKE, 
			   partner, 1, comm, &status);
		  MPI_Send(&send_buffer[0], send_buffer.size(), MPI_OFFGRID_SPIKE, 
			   partner, 1, comm);
		}
	    }
	}
      std::copy(send_buffer.begin(),send_buffer.end(),recv_buffer.begin() + displacements[rank_]);
      send_buffer_size_ = max_recv_count;
      recv_buffer_size_ = send_buffer_size_ * num_processes_;
    }
}


void nest::Communicator::communicate(std::vector<double_t>& send_buffer,
                                     std::vector<double_t>& recv_buffer,
                                     std::vector<int>& displacements)
{
  //get size of buffers
  std::vector<int> n_nodes(num_processes_);
  n_nodes[Communicator::rank_] = send_buffer.size();
  communicate(n_nodes);
  // Set up displacements vector.
  displacements.resize(num_processes_,0);

  for ( int i = 1; i < num_processes_; ++i )
    displacements.at(i) = displacements.at(i-1)+n_nodes.at(i-1);

  // Calculate total number of node data items to be gathered.
  size_t n_globals =
    std::accumulate(n_nodes.begin(),n_nodes.end(), 0);

  if (n_globals != 0) {
    recv_buffer.resize(n_globals,0.0);
    communicate_Allgatherv(send_buffer, recv_buffer, displacements, n_nodes);
  } else {
    recv_buffer.clear();
  }
}
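
/*
 * A hypothetical usage sketch (names invented for illustration): gather a
 * variable number of double values from every rank,
 *
 *   std::vector<double_t> local_vals = ...;  // length may differ per rank
 *   std::vector<double_t> global_vals;
 *   std::vector<int>      displacements;
 *   nest::Communicator::communicate(local_vals, global_vals, displacements);
 *   // global_vals now holds the data of all ranks; the block contributed by
 *   // rank p starts at global_vals[displacements[p]].
 */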


void nest::Communicator::communicate(double_t send_val, std::vector<double_t>& recv_buffer)
{
  recv_buffer.resize(num_processes_);
  MPI_Allgather(&send_val, 1, MPI_DOUBLE, &recv_buffer[0], 1, MPI_DOUBLE, comm);
}


/**
 * Communicate set-up information: each rank contributes its own entry of the
 * buffer; afterwards every rank holds the entries of all ranks.
 */
void nest::Communicator::communicate(std::vector<int_t>& buffer)
{
  if ( use_Allgather_ )
    communicate_Allgather(buffer);
  else
    communicate_CPEX(buffer);
}

void nest::Communicator::communicate_Allgather(std::vector<int_t>& buffer)
{
  // avoid aliasing, see http://www.mpi-forum.org/docs/mpi-11-html/node10.html
  int_t my_val = buffer[rank_];
  MPI_Allgather(&my_val, 1, MPI_INT, &buffer[0], 1, MPI_INT, comm);
}

void nest::Communicator::communicate_CPEX(std::vector<int_t>& buffer)
{
  MPI_Status status;
  int partner;
  for (size_t step = 0; step < comm_step_.size(); ++step)
  {
    partner = comm_step_[step];
    if (partner >= 0)
    {
      if (rank_ < partner) // smaller process number sends first
      {
        MPI_Send(&(buffer[rank_]), 1, MPI_INT, partner, 1, comm);
        MPI_Recv(&(buffer[partner]), 1, MPI_INT, partner, 1, comm, &status);
      }
      else
      {
        MPI_Recv(&buffer[partner], 1, MPI_INT, partner, 1, comm, &status);
        MPI_Send(&buffer[rank_], 1, MPI_INT, partner, 1, comm);
      }
    }
  }
}

/**
 * Ensure all processes have reached the same stage of the simulation by
 * performing a barrier synchronization across all MPI processes.
 */
void nest::Communicator::synchronize()
{
  MPI_Barrier(comm);
}

void nest::Communicator::test_link(int sender, int receiver)
{
  assert(sender < num_processes_ && receiver < num_processes_);

  if (num_processes_ > 1)
  {  
    long dummy = 1;
    MPI_Status status;

    if (rank_ == sender)
      MPI_Ssend(&dummy, 1, MPI_LONG, receiver, 0, comm);
    else if (rank_ == receiver)
    {
      MPI_Recv(&dummy, 1, MPI_LONG, sender, 0, comm, &status);
      // std::cerr << "link between " << sender << " and " << receiver << " works" << std::endl;
    }
  }
} 

void nest::Communicator::test_links()
{
  for (int i = 0; i < num_processes_; ++i)
    for (int j = 0; j < num_processes_; ++j)
      if (i != j)
        test_link(i, j);
  // std::cerr << "all links are working" << std::endl;
}

// grng_synchrony: called at the beginning of each call to Simulate; returns
// true only if all processes report the same random number, i.e. the global
// RNGs are still synchronized.
bool nest::Communicator::grng_synchrony(unsigned long process_rnd_number)
{
  if (num_processes_ > 1)
  {
    std::vector<unsigned long> rnd_numbers(num_processes_);
    MPI_Allgather(&process_rnd_number, 1, MPI_UNSIGNED_LONG, &rnd_numbers[0], 1, MPI_UNSIGNED_LONG, comm);
    // compare all rnd numbers
    for(uint_t i = 1; i < rnd_numbers.size(); ++i)
    {
      if (rnd_numbers[i-1] != rnd_numbers[i])
      {
	return false;
      }
    }
  }
  return true;
}

// Returns the average time (in seconds) of an Allgather of num_bytes per
// process, averaged over the given number of samples.
nest::double_t nest::Communicator::time_communicate(int num_bytes, int samples) 
{ 
  if (num_processes_ == 1) 
    return 0.0; 
  uint_t packet_length = num_bytes/sizeof(uint_t);
  if (packet_length < 1)
    packet_length = 1; 
  std::vector<uint_t> test_send_buffer(packet_length); 
  std::vector<uint_t> test_recv_buffer(packet_length*num_processes_);
  //start time measurement here 
  struct tms foo; 
  const clock_t starttime = times(&foo); 
  for (int i = 0; i < samples; ++i) 
    MPI_Allgather(&test_send_buffer[0], packet_length, MPI_UNSIGNED, 
    &test_recv_buffer[0], packet_length, MPI_UNSIGNED, MPI_COMM_WORLD); 
  //finish time measurement here 
  const clock_t finishtime = times(&foo); 
  double_t total_duration = (double)(finishtime-starttime); 
  return total_duration/(samples * sysconf(_SC_CLK_TCK));
}
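
/*
 * A hypothetical calibration sweep (sketch, names invented for illustration):
 *
 *   for (int bytes = 1000; bytes <= 1000000; bytes *= 10)
 *     std::cerr << bytes << " bytes: "
 *               << nest::Communicator::time_communicate(bytes, 100)
 *               << " s/call\n";
 */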
 
// Returns the average time (in seconds) of an off-grid-spike Allgather of
// num_bytes per process, averaged over the given number of samples.
nest::double_t nest::Communicator::time_communicate_offgrid(int num_bytes, int samples) 
{ 
  if (num_processes_ == 1) 
    return 0.0; 
  uint_t packet_length = num_bytes/sizeof(OffGridSpike);
  if (packet_length < 1)
    packet_length = 1; 
  std::vector<OffGridSpike> test_send_buffer(packet_length); 
  std::vector<OffGridSpike> test_recv_buffer(packet_length*num_processes_);
  //start time measurement here 
  struct tms foo; 
  const clock_t starttime = times(&foo); 
  for (int i = 0; i < samples; ++i) 
    MPI_Allgather(&test_send_buffer[0], packet_length, MPI_OFFGRID_SPIKE, 
    &test_recv_buffer[0], packet_length, MPI_OFFGRID_SPIKE, MPI_COMM_WORLD); 
  //finish time measurement here 
  const clock_t finishtime = times(&foo); 
  double_t total_duration = (double)(finishtime-starttime); 
  return total_duration/(samples * sysconf(_SC_CLK_TCK));
}

void nest::Communicator::communicate_connector_properties(DictionaryDatum& dict)
{
  // Nothing to communicate unless there is more than one MPI process.
  if (num_processes_ > 1)
    {
      // Move local dictionary values to temporary storage vectors.
      std::vector<nest::long_t> targets = 
	getValue<std::vector<nest::long_t> >(dict, "targets");

      std::vector<nest::double_t> weights = 
	getValue<std::vector<nest::double_t> >(dict, "weights");

      std::vector<nest::double_t> delays = 
	getValue<std::vector<nest::double_t> >(dict, "delays");

      std::vector<nest::long_t> receptors = 
	getValue<std::vector<nest::long_t> >(dict, "receptors");
      
      // Calculate size of communication buffers (number of connections).
      std::vector<nest::int_t> num_connections(num_processes_);
      num_connections[rank_] = targets.size();
      communicate(num_connections);
      
      // Set up displacements vector.
      std::vector<int> displacements(num_processes_, 0); 

      for ( size_t i = 1; i < num_connections.size(); ++i )
	displacements.at(i) = displacements.at(i-1)+num_connections.at(i-1);
      
      // Calculate the total number of connections across all processes.
      nest::int_t num_connections_sum = 
	std::accumulate(num_connections.begin(),num_connections.end(), 0);
      
      if(num_connections_sum != 0)
	{
	  // Create global buffers.
	  std::vector<nest::long_t> 
	    targets_result(num_connections_sum, 0);
	  
	  std::vector<nest::long_t> 
	    receptors_result(num_connections_sum, 0);
	  
	  std::vector<nest::double_t> 
	    weights_result(num_connections_sum, 0);
	  
	  std::vector<nest::double_t> 
	    delays_result(num_connections_sum, 0);
	  
	  // Start communication.
	  communicate_Allgatherv<nest::long_t>(targets, targets_result, 
					       displacements, num_connections);
	  
	  communicate_Allgatherv<nest::long_t>(receptors, receptors_result, 
					       displacements, num_connections);
	  
	  communicate_Allgatherv<nest::double_t>(weights, weights_result, 
						 displacements, num_connections);
	  
	  communicate_Allgatherv<nest::double_t>(delays, delays_result, 
						 displacements, num_connections);

	  // Save global values in input dictionary.
	  (*dict)["targets"] = targets_result;
	  (*dict)["receptors"] = receptors_result;
	  (*dict)["weights"] = weights_result;
	  (*dict)["delays"] = delays_result;
	}
    }
}


#ifdef HAVE_MUSIC // functions for interaction with MUSIC library
MUSIC::Setup* nest::Communicator::get_music_setup()
{
  return music_setup;
}

MUSIC::Runtime* nest::Communicator::get_music_runtime()
{
  return music_runtime;
}

void nest::Communicator::enter_runtime(double_t h_min_delay)
{
  // MUSIC needs the step size in seconds
  //std::cout << "nest::Communicator::enter_runtime\n";
  //std::cout << "timestep = " << h_min_delay*1e-3 << std::endl;

  if (music_runtime == 0)
    music_runtime = new MUSIC::Runtime(music_setup, h_min_delay*1e-3);
}

void nest::Communicator::advance_music_time(long_t num_steps)
{
  for (int s = 0; s < num_steps; s++)
    music_runtime->tick();
}
#endif /* #ifdef HAVE_MUSIC */

#else /* #ifdef HAVE_MPI */

/**
 * communicate (on-grid) if compiled without MPI
 */
void nest::Communicator::communicate (std::vector<uint_t>& send_buffer, 
				      std::vector<uint_t>& recv_buffer, 
				      std::vector<int>& displacements)
{
  displacements[0] = 0;
  if (static_cast<size_t>(recv_buffer_size_) < send_buffer.size())
    {
      recv_buffer_size_ = send_buffer_size_ = send_buffer.size();
      recv_buffer.resize(recv_buffer_size_);
    }
  recv_buffer.swap(send_buffer);
}

/**
 * communicate (off-grid) if compiled without MPI
 */
void nest::Communicator::communicate (std::vector<OffGridSpike>& send_buffer, 
				      std::vector<OffGridSpike>& recv_buffer, 
				      std::vector<int>& displacements)
{
  displacements[0] = 0;
  if (static_cast<size_t>(recv_buffer_size_) < send_buffer.size())
    {
      recv_buffer_size_ = send_buffer_size_ = send_buffer.size();
      recv_buffer.resize(recv_buffer_size_);
    }
  recv_buffer.swap(send_buffer);
}

void nest::Communicator::communicate(std::vector<double_t>& send_buffer,
                                     std::vector<double_t>& recv_buffer,
                                     std::vector<int>& displacements)
{
  displacements.resize(1);
  displacements[0] = 0;
  recv_buffer.swap(send_buffer);
}

void nest::Communicator::communicate(double_t send_val, std::vector<double_t>& recv_buffer)
{
  recv_buffer.resize(1);
  recv_buffer[0] = send_val;
}

#endif /* #ifdef HAVE_MPI */