/*! \file
* \brief Thread pool core.
*
* This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as functions objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are subject to the
* Boost Software License, Version 1.0. (See accompanying file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/
#ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED
#define THREADPOOL_POOL_CORE_HPP_INCLUDED
#include "locking_ptr.hpp"
#include "worker_thread.hpp"
#include "../task_adaptors.hpp"
#include <boost/thread.hpp>
#include <boost/thread/exceptions.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/smart_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/static_assert.hpp>
#include <boost/type_traits.hpp>
#include <vector>
/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool { namespace detail
{
/*! \brief Thread pool.
*
* Thread pools are a mechanism for asynchronous and parallel processing
* within the same process. The pool class provides a convenient way
* for dispatching asynchronous tasks as functions objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers.
* A task must not throw an exception.
*
* A pool_impl is DefaultConstructible and NonCopyable.
*
* \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored.
* \param Scheduler A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.
*
* \remarks The pool class is thread-safe.
*
* \see Tasks: task_func, prio_task_func
* \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
*/
template <
typename Task,
template <typename> class SchedulingPolicy,
template <typename> class SizePolicy,
template <typename> class SizePolicyController,
template <typename> class ShutdownPolicy
>
class pool_core
: public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > >
, private noncopyable
{
public: // Type definitions
typedef Task task_type; //!< Indicates the task's type.
typedef SchedulingPolicy<task_type> scheduler_type; //!< Indicates the scheduler's type.
typedef pool_core<Task,
SchedulingPolicy,
SizePolicy,
SizePolicyController,
ShutdownPolicy > pool_type; //!< Indicates the thread pool's type.
typedef SizePolicy<pool_type> size_policy_type; //!< Indicates the sizer's type.
//typedef typename size_policy_type::size_controller size_controller_type;
typedef SizePolicyController<pool_type> size_controller_type;
// typedef SizePolicy<pool_type>::size_controller size_controller_type;
typedef ShutdownPolicy<pool_type> shutdown_policy_type;//!< Indicates the shutdown policy's type.
typedef worker_thread<pool_type> worker_type;
// The task is required to be a nullary function.
BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);
// The task function's result type is required to be void.
BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type >::value);
private: // Friends
friend class worker_thread<pool_type>;
#if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580) // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06
friend class SizePolicy;
friend class ShutdownPolicy;
#else
friend class SizePolicy<pool_type>;
friend class ShutdownPolicy<pool_type>;
#endif
private: // The following members may be accessed by _multiple_ threads at the same time:
volatile size_t m_worker_count;
volatile size_t m_target_worker_count;
volatile size_t m_active_worker_count;
private: // The following members are accessed only by _one_ thread at the same time:
scheduler_type m_scheduler;
scoped_ptr<size_policy_type> m_size_policy; // is never null
bool m_terminate_all_workers; // Indicates if termination of all workers was triggered.
std::vector<shared_ptr<worker_type> > m_terminated_workers; // List of workers which are terminated but not fully destructed.
private: // The following members are implemented thread-safe:
mutable recursive_mutex m_monitor;
mutable condition m_worker_idle_or_terminated_event; // A worker is idle or was terminated.
mutable condition m_task_or_terminate_workers_event; // Task is available OR total worker count should be reduced.
public:
/// Constructor.
pool_core()
: m_worker_count(0)
, m_target_worker_count(0)
, m_active_worker_count(0)
, m_terminate_all_workers(false)
{
pool_type volatile & self_ref = *this;
m_size_policy.reset(new size_policy_type(self_ref));
m_scheduler.clear();
}
/// Destructor.
~pool_core()
{
}
/*! Gets the size controller which manages the number of threads in the pool.
* \return The size controller.
* \see SizePolicy
*/
size_controller_type size_controller()
{
return size_controller_type(*m_size_policy, this->shared_from_this());
}
/*! Gets the number of threads in the pool.
* \return The number of threads.
*/
size_t size() const volatile
{
return m_worker_count;
}
// TODO is only called once
void shutdown()
{
ShutdownPolicy<pool_type>::shutdown(*this);
}
/*! Schedules a task for asynchronous execution. The task will be executed once only.
* \param task The task function object. It should not throw execeptions.
* \return true, if the task could be scheduled and false otherwise.
*/
bool schedule(task_type const & task) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
if(lockedThis->m_scheduler.push(task))
{
lockedThis->m_task_or_terminate_workers_event.notify_one();
return true;
}
else
{
return false;
}
}
/*! Returns the number of tasks which are currently executed.
* \return The number of active tasks.
*/
size_t active() const volatile
{
return m_active_worker_count;
}
/*! Returns the number of tasks which are ready for execution.
* \return The number of pending tasks.
*/
size_t pending() const volatile
{
locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
return lockedThis->m_scheduler.size();
}
/*! Removes all pending tasks from the pool's scheduler.
*/
void clear() volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
lockedThis->m_scheduler.clear();
}
/*! Indicates that there are no tasks pending.
* \return true if there are no tasks ready for execution.
* \remarks This function is more efficient that the check 'pending() == 0'.
*/
bool empty() const volatile
{
locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
return lockedThis->m_scheduler.empty();
}
/*! The current thread of execution is blocked until the sum of all active
* and pending tasks is equal or less than a given threshold.
* \param task_threshold The maximum number of tasks in pool and scheduler.
*/
void wait(size_t const task_threshold = 0) const volatile
{
const pool_type* self = const_cast<const pool_type*>(this);
recursive_mutex::scoped_lock lock(self->m_monitor);
if(0 == task_threshold)
{
while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
{
self->m_worker_idle_or_terminated_event.wait(lock);
}
}
else
{
while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
{
self->m_worker_idle_or_terminated_event.wait(lock);
}
}
}
/*! The current thread of execution is blocked until the timestamp is met
* or the sum of all active and pending tasks is equal or less
* than a given threshold.
* \param timestamp The time when function returns at the latest.
* \param task_threshold The maximum number of tasks in pool and scheduler.
* \return true if the task sum is equal or less than the threshold, false otherwise.
*/
bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
{
const pool_type* self = const_cast<const pool_type*>(this);
recursive_mutex::scoped_lock lock(self->m_monitor);
if(0 == task_threshold)
{
while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
{
if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
}
}
else
{
while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
{
if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
}
}
return true;
}
private:
void terminate_all_workers(bool const wait) volatile
{
pool_type* self = const_cast<pool_type*>(this);
recursive_mutex::scoped_lock lock(self->m_monitor);
self->m_terminate_all_workers = true;
m_target_worker_count = 0;
self->m_task_or_terminate_workers_event.notify_all();
if(wait)
{
while(m_active_worker_count > 0)
{
self->m_worker_idle_or_terminated_event.wait(lock);
}
for(typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.begin();
it != self->m_terminated_workers.end();
++it)
{
(*it)->join();
}
self->m_terminated_workers.clear();
}
}
/*! Changes the number of worker threads in the pool. The resizing
* is handled by the SizePolicy.
* \param threads The new number of worker threads.
* \return true, if pool will be resized and false if not.
*/
bool resize(size_t const worker_count) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
if(!m_terminate_all_workers)
{
m_target_worker_count = worker_count;
}
else
{
return false;
}
if(m_worker_count <= m_target_worker_count)
{ // increase worker count
while(m_worker_count < m_target_worker_count)
{
try
{
worker_thread<pool_type>::create_and_attach(lockedThis->shared_from_this());
m_worker_count++;
m_active_worker_count++;
}
catch(thread_resource_error)
{
return false;
}
}
}
else
{ // decrease worker count
lockedThis->m_task_or_terminate_workers_event.notify_all(); // TODO: Optimize number of notified workers
}
return true;
}
// worker died with unhandled exception
void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
m_worker_count--;
m_active_worker_count--;
lockedThis->m_worker_idle_or_terminated_event.notify_all();
if(m_terminate_all_workers)
{
lockedThis->m_terminated_workers.push_back(worker);
}
else
{
lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);
}
}
void worker_destructed(shared_ptr<worker_type> worker) volatile
{
locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
m_worker_count--;
m_active_worker_count--;
lockedThis->m_worker_idle_or_terminated_event.notify_all();
if(m_terminate_all_workers)
{
lockedThis->m_terminated_workers.push_back(worker);
}
}
bool execute_task() volatile
{
function0<void> task;
{ // fetch task
pool_type* lockedThis = const_cast<pool_type*>(this);
recursive_mutex::scoped_lock lock(lockedThis->m_monitor);
// decrease number of threads if necessary
if(m_worker_count > m_target_worker_count)
{
return false; // terminate worker
}
// wait for tasks
while(lockedThis->m_scheduler.empty())
{
// decrease number of workers if necessary
if(m_worker_count > m_target_worker_count)
{
return false; // terminate worker
}
else
{
m_active_worker_count--;
lockedThis->m_worker_idle_or_terminated_event.notify_all();
lockedThis->m_task_or_terminate_workers_event.wait(lock);
m_active_worker_count++;
}
}
task = lockedThis->m_scheduler.top();
lockedThis->m_scheduler.pop();
}
// call task function
if(task)
{
task();
}
//guard->disable();
return true;
}
};
} } } // namespace boost::threadpool::detail
#endif // THREADPOOL_POOL_CORE_HPP_INCLUDED