/*! \file
* \brief Thread pool core.
*
* This file contains the threadpool's core class:
* pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy>.
*
* Thread pools are a mechanism for asynchronous and parallel processing 
* within the same process. The pool class provides a convenient way 
* for dispatching asynchronous tasks as function objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers. 
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are  subject to the
* Boost Software License, Version 1.0. (See accompanying  file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/


#ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED
#define THREADPOOL_POOL_CORE_HPP_INCLUDED




#include "locking_ptr.hpp"
#include "worker_thread.hpp"

#include "../task_adaptors.hpp"

#include <boost/thread.hpp>
#include <boost/thread/exceptions.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/static_assert.hpp>
#include <boost/type_traits.hpp>

#include <vector>


/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool { namespace detail 
{

  /*! \brief Thread pool core.
  *
  * Thread pools are a mechanism for asynchronous and parallel processing 
  * within the same process. The pool_core class provides a convenient way 
  * for dispatching asynchronous tasks as function objects. The scheduling
  * of these tasks can be easily controlled by using customized schedulers. 
  * A task must not throw an exception.
  *
  * A pool_core is DefaultConstructible and NonCopyable.
  *
  * \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task.
  * \param SchedulingPolicy A task container which determines how tasks are scheduled. It is accessed by the pool only while the pool's monitor is locked. The scheduler shall not throw exceptions.
  * \param SizePolicy Instantiated with the pool type. One instance is created in the pool's constructor from a volatile reference to the pool; its worker_died_unexpectedly(size_t) is called when a worker dies while the pool is not terminating.
  * \param SizePolicyController Instantiated with the pool type. Objects of this type are returned by size_controller().
  * \param ShutdownPolicy Instantiated with the pool type. Its static shutdown(pool&) is called by shutdown().
  *
  * \remarks The pool class is thread-safe.
  * 
  * \see Tasks: task_func, prio_task_func
  * \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
  */ 
  template <
    typename Task, 

    template <typename> class SchedulingPolicy,
    template <typename> class SizePolicy,
    template <typename> class SizePolicyController,
    template <typename> class ShutdownPolicy
  > 
  class pool_core
  : public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > > 
  , private noncopyable
  {

  public: // Type definitions
    typedef Task task_type;                                 //!< Indicates the task's type.
    typedef SchedulingPolicy<task_type> scheduler_type;     //!< Indicates the scheduler's type.
    typedef pool_core<Task, 
                      SchedulingPolicy, 
                      SizePolicy,
                      SizePolicyController,
                      ShutdownPolicy > pool_type;           //!< Indicates the thread pool's type.
    typedef SizePolicy<pool_type> size_policy_type;         //!< Indicates the sizer's type.

    typedef SizePolicyController<pool_type> size_controller_type; //!< Indicates the size controller's type.

    typedef ShutdownPolicy<pool_type> shutdown_policy_type; //!< Indicates the shutdown policy's type.  

    typedef worker_thread<pool_type> worker_type;           //!< Indicates the worker thread's type.

    // The task is required to be a nullary function.
    BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);

    // The task function's result type is required to be void.
    BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type >::value);


  private:  // Friends 
    // Workers and policies call the private members below
    // (resize, terminate_all_workers, execute_task, worker_destructed, ...).
    friend class worker_thread<pool_type>;

#if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580)  // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06
   friend class SizePolicy;
   friend class ShutdownPolicy;
#else
   friend class SizePolicy<pool_type>;
   friend class ShutdownPolicy<pool_type>;
#endif

  private: // The following members may be accessed by _multiple_ threads at the same time:
    volatile size_t m_worker_count;         // Number of workers which have been created and have not yet reported their end.
    volatile size_t m_target_worker_count;  // Number of workers requested by the last resize (0 once termination was triggered).
    volatile size_t m_active_worker_count;  // Number of workers which are not blocked waiting for a task.
      


  private: // The following members are accessed only by _one_ thread at the same time:
    scheduler_type  m_scheduler;                                // Container of pending tasks.
    scoped_ptr<size_policy_type> m_size_policy;                 // is never null
    
    bool  m_terminate_all_workers;                              // Indicates if termination of all workers was triggered.
    std::vector<shared_ptr<worker_type> > m_terminated_workers; // List of workers which are terminated but not fully destructed.
    
  private: // The following members are implemented thread-safe:
    mutable recursive_mutex  m_monitor;                     // Guards the scheduler and the bookkeeping above.
    mutable condition m_worker_idle_or_terminated_event;    // A worker is idle or was terminated.
    mutable condition m_task_or_terminate_workers_event;    // Task is available OR total worker count should be reduced.

  public:
    /// Constructor. Creates an empty pool without any worker threads.
    pool_core()
      : m_worker_count(0) 
      , m_target_worker_count(0)
      , m_active_worker_count(0)
      , m_terminate_all_workers(false)
    {
      // The size policy is handed a volatile reference to the pool.
      pool_type volatile & self_ref = *this;
      m_size_policy.reset(new size_policy_type(self_ref));

      m_scheduler.clear();
    }


    /// Destructor.
    ~pool_core()
    {
    }

    /*! Gets the size controller which manages the number of threads in the pool. 
    * \return The size controller.
    * \remarks Uses shared_from_this(), hence the pool_core has to be owned by a shared_ptr.
    * \see SizePolicy
    */
    size_controller_type size_controller()
    {
      return size_controller_type(*m_size_policy, this->shared_from_this());
    }

    /*! Gets the number of threads in the pool.
    * \return The number of threads.
    * \remarks The counter is read without locking the monitor.
    */
    size_t size() const volatile
    {
      return m_worker_count;
    }

    /*! Shuts the pool down by delegating to the ShutdownPolicy.
    */
// TODO is only called once
    void shutdown()
    {
      ShutdownPolicy<pool_type>::shutdown(*this);
    }

    /*! Schedules a task for asynchronous execution. The task will be executed once only.
    * \param task The task function object. It should not throw exceptions.
    * \return true, if the task could be scheduled and false otherwise. 
    */  
    bool schedule(task_type const & task) volatile
    {
      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor); 
      
      if(lockedThis->m_scheduler.push(task))
      {
        // Wake up one worker which is waiting for a task.
        lockedThis->m_task_or_terminate_workers_event.notify_one();
        return true;
      }
      else
      {
        return false;
      }
    }


    /*! Returns the number of tasks which are currently executed.
    * \return The number of active tasks. 
    * \remarks The counter is read without locking the monitor.
    */  
    size_t active() const volatile
    {
      return m_active_worker_count;
    }


    /*! Returns the number of tasks which are ready for execution.    
    * \return The number of pending tasks. 
    */  
    size_t pending() const volatile
    {
      locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
      return lockedThis->m_scheduler.size();
    }


    /*! Removes all pending tasks from the pool's scheduler.
    */  
    void clear() volatile
    { 
      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
      lockedThis->m_scheduler.clear();
    }    


    /*! Indicates that there are no tasks pending. 
    * \return true if there are no tasks ready for execution.
    * \remarks This function is more efficient than the check 'pending() == 0'.
    */   
    bool empty() const volatile
    {
      locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
      return lockedThis->m_scheduler.empty();
    }


    /*! The current thread of execution is blocked until the sum of all active
    *  and pending tasks is equal or less than a given threshold. 
    * \param task_threshold The maximum number of tasks in pool and scheduler.
    */     
    void wait(size_t const task_threshold = 0) const volatile
    {
      // The volatile qualifier is cast away; all accesses below happen with the monitor locked.
      const pool_type* self = const_cast<const pool_type*>(this);
      recursive_mutex::scoped_lock lock(self->m_monitor);

      if(0 == task_threshold)
      {
        // Threshold 0: only emptiness is required, so the scheduler's size is not queried.
        while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
        { 
          self->m_worker_idle_or_terminated_event.wait(lock);
        }
      }
      else
      {
        while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
        { 
          self->m_worker_idle_or_terminated_event.wait(lock);
        }
      }
    }

    /*! The current thread of execution is blocked until the timestamp is met
    * or the sum of all active and pending tasks is equal or less 
    * than a given threshold.  
    * \param timestamp The time when function returns at the latest.
    * \param task_threshold The maximum number of tasks in pool and scheduler.
    * \return true if the task sum is equal or less than the threshold, false otherwise.
    */       
    bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
    {
      // The volatile qualifier is cast away; all accesses below happen with the monitor locked.
      const pool_type* self = const_cast<const pool_type*>(this);
      recursive_mutex::scoped_lock lock(self->m_monitor);

      if(0 == task_threshold)
      {
        while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
        { 
          // timed_wait reports false once the timestamp has been reached.
          if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
        }
      }
      else
      {
        while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
        { 
          if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
        }
      }

      return true;
    }


  private:


    /*! Triggers the termination of all worker threads. Afterwards resize() is rejected.
    * \param wait If true, the call blocks until every worker has reported its end 
    *             and joins the terminated workers; if false, it returns right after 
    *             the workers have been notified.
    */
    void terminate_all_workers(bool const wait) volatile
    {
      pool_type* self = const_cast<pool_type*>(this);
      recursive_mutex::scoped_lock lock(self->m_monitor);

      self->m_terminate_all_workers = true;

      // A target of 0 makes execute_task() return false for every worker.
      m_target_worker_count = 0;
      self->m_task_or_terminate_workers_event.notify_all();

      if(wait)
      {
        // Wait on the total worker count and not on the active worker count:
        // idle workers are not counted as active (see execute_task), so the 
        // active count may already be 0 although no worker has terminated yet. 
        // m_worker_count is decremented by worker_destructed() and 
        // worker_died_unexpectedly(), which both signal this event and register 
        // the worker in m_terminated_workers once termination was triggered.
        while(m_worker_count > 0)
        {
          self->m_worker_idle_or_terminated_event.wait(lock);
        }

        for(typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.begin();
          it != self->m_terminated_workers.end();
          ++it)
        {
          (*it)->join();
        }
        self->m_terminated_workers.clear();
      }
    }


    /*! Changes the number of worker threads in the pool. The resizing 
    *  is handled by the SizePolicy.
    * \param worker_count The new number of worker threads.
    * \return true, if pool will be resized and false if not 
    *         (termination was already triggered or a thread could not be created). 
    */
    bool resize(size_t const worker_count) volatile
    {
      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor); 

      if(!m_terminate_all_workers)
      {
        m_target_worker_count = worker_count;
      }
      else
      { 
        return false;
      }


      if(m_worker_count <= m_target_worker_count)
      { // increase worker count
        while(m_worker_count < m_target_worker_count)
        {
          try
          {
            worker_thread<pool_type>::create_and_attach(lockedThis->shared_from_this());
            // A new worker is counted as active until it blocks in execute_task().
            m_worker_count++;
            m_active_worker_count++;
          }
          catch(thread_resource_error const &)
          {
            return false;
          }
        }
      }
      else
      { // decrease worker count
        // Waiting workers re-check the target count in execute_task() after being woken up.
        lockedThis->m_task_or_terminate_workers_event.notify_all();   // TODO: Optimize number of notified workers
      }

      return true;
    }


    /*! Bookkeeping for a worker which died with an unhandled exception.
    * \param worker The worker which died.
    */
    void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile
    {
      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);

      m_worker_count--;
      m_active_worker_count--;
      lockedThis->m_worker_idle_or_terminated_event.notify_all();

      if(m_terminate_all_workers)
      {
        // Keep the worker so that terminate_all_workers() is able to join it.
        lockedThis->m_terminated_workers.push_back(worker);
      }
      else
      {
        // Let the size policy react to the reduced worker count.
        lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);
      }
    }

    /*! Bookkeeping for a worker which has left its processing loop.
    * \param worker The worker which is about to end.
    */
    void worker_destructed(shared_ptr<worker_type> worker) volatile
    {
      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
      m_worker_count--;
      m_active_worker_count--;
      lockedThis->m_worker_idle_or_terminated_event.notify_all();

      if(m_terminate_all_workers)
      {
        // Keep the worker so that terminate_all_workers() is able to join it.
        lockedThis->m_terminated_workers.push_back(worker);
      }
    }


    /*! Fetches the next task from the scheduler and executes it. Blocks while 
    *  no task is pending.
    * \return false, if the calling worker shall terminate because the pool has 
    *         more workers than requested, and true after a task was fetched.
    */
    bool execute_task() volatile
    {
      function0<void> task;

      { // fetch task
        pool_type* lockedThis = const_cast<pool_type*>(this);
        recursive_mutex::scoped_lock lock(lockedThis->m_monitor);

        // decrease number of threads if necessary
        if(m_worker_count > m_target_worker_count)
        {
          return false;  // terminate worker
        }


        // wait for tasks
        while(lockedThis->m_scheduler.empty())
        {
          // decrease number of workers if necessary
          if(m_worker_count > m_target_worker_count)
          {
            return false;  // terminate worker
          }
          else
          {
            // The worker is not counted as active while it is blocked.
            m_active_worker_count--;
            lockedThis->m_worker_idle_or_terminated_event.notify_all();
            lockedThis->m_task_or_terminate_workers_event.wait(lock);
            m_active_worker_count++;
          }
        }

        task = lockedThis->m_scheduler.top();
        lockedThis->m_scheduler.pop();
      }

      // call task function (the monitor is no longer locked here)
      if(task)
      {
        task();
      }
 
      return true;
    }
  };




} } } // namespace boost::threadpool::detail

#endif // THREADPOOL_POOL_CORE_HPP_INCLUDED