Nektar++
Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes | Friends | List of all members
Nektar::Thread::ThreadManagerBoost Class Reference

Implementation of ThreadManager using Boost threads. More...

#include <ThreadBoost.h>

Inheritance diagram for Nektar::Thread::ThreadManagerBoost:
[legend]

Public Member Functions

 ThreadManagerBoost (unsigned int numWorkers)
 Constructs a ThreadManagerBoost. More...
 
virtual ~ThreadManagerBoost ()
 Shuts down threading. More...
 
virtual void QueueJobs (std::vector< ThreadJob * > &joblist)
 Pass a list of tasklets to the master queue. More...
 
virtual void QueueJob (ThreadJob *job)
 Pass a single job to the master queue. More...
 
virtual unsigned int GetNumWorkers ()
 Return the number of active workers. More...
 
virtual unsigned int GetWorkerNum ()
 Returns the worker number of the executing thread. More...
 
virtual void SetNumWorkers (const unsigned int num)
 Sets the number of active workers. More...
 
virtual void SetNumWorkers ()
 Sets the number of active workers to the maximum. More...
 
virtual unsigned int GetMaxNumWorkers ()
 Gets the maximum available number of threads. More...
 
virtual void Wait ()
 Waits until all queued jobs are finished. More...
 
virtual void SetChunkSize (unsigned int chnk)
 Controls how many jobs are sent to each worker at a time. More...
 
virtual void SetSchedType (SchedType s)
 Sets the current scheduling algorithm. More...
 
virtual bool InThread ()
 Indicates whether the code is in a worker thread or not. More...
 
virtual void Hold ()
 A calling thread holds until all active threads call this method. More...
 
virtual const std::string & GetType () const
 Returns a description of the type of threading. More...
 
- Public Member Functions inherited from Nektar::Thread::ThreadManager
virtual ~ThreadManager ()
 Destructor. More...
 
virtual bool IsInitialised ()
 ThreadManager implementation. More...
 
int GetThrFromPartition (int pPartition)
 
int GetRankFromPartition (int pPartition)
 
int GetPartitionFromRankThr (int pRank, unsigned int pThr)
 

Static Public Member Functions

static ThreadManagerSharedPtr Create (unsigned int numT)
 Called by the factory method. More...
 

Private Member Functions

 ThreadManagerBoost ()
 
 ThreadManagerBoost (const ThreadManagerBoost &)
 
bool IsWorking ()
 
void SetNumWorkersImpl (const unsigned int num)
 

Private Attributes

const unsigned int m_numThreads
 
unsigned int m_numWorkers
 
std::queue< ThreadJob * > m_masterQueue
 
boost::mutex m_masterQueueMutex
 
boost::mutex m_masterActiveMutex
 
boost::condition_variable m_masterQueueCondVar
 
boost::condition_variable m_masterActiveCondVar
 
ThreadWorkerBoost ** m_threadList
 
boost::thread ** m_threadThreadList
 
boost::thread::id m_masterThreadId
 
bool * m_threadBusyList
 
bool * m_threadActiveList
 
unsigned int m_chunkSize
 
SchedType m_schedType
 
boost::barrier * m_barrier
 
std::map< boost::thread::id, unsigned int > m_threadMap
 
std::string m_type
 

Static Private Attributes

static std::string className
 

Friends

class ThreadWorkerBoost
 

Detailed Description

Implementation of ThreadManager using Boost threads.

Definition at line 58 of file ThreadBoost.h.

Constructor & Destructor Documentation

◆ ThreadManagerBoost() [1/3]

Nektar::Thread::ThreadManagerBoost::ThreadManagerBoost ( unsigned int  numT)

Constructs a ThreadManagerBoost.

Parameters
numWorkers: The number of threads to start (including master thread).
Note
Do not use, use factory instead.

Definition at line 55 of file ThreadBoost.cpp.

55  :
56  m_numThreads(numT), m_numWorkers(numT), m_masterQueue(),
59  m_threadMap()
60 {
61  using namespace std;
62  try
63  {
64  m_threadList = new ThreadWorkerBoost*[m_numThreads];
65  m_threadThreadList = new boost::thread*[m_numThreads];
66  m_threadBusyList = new bool[m_numThreads];
67  m_threadActiveList = new bool[m_numThreads];
68  }
69  catch (exception &e)
70  {
71  cerr << "Exception while allocating thread storage: "
72  << e.what() << endl;
73  abort();
74  }
75  unsigned int i = 0;
76  while (i < m_numThreads)
77  {
78  ThreadWorkerBoost *tw;
79  try
80  {
81  tw = new ThreadWorkerBoost(this, i);
82  }
83  catch (exception &e)
84  {
85  cerr << "Exception while allocating worker threads: "
86  << e.what() << endl;
87  abort();
88  }
89 
90  m_threadList[i] = tw;
91  m_threadBusyList[i] = false;
92  m_threadActiveList[i] = true;
93 
94  try
95  {
96  m_threadThreadList[i] = new boost::thread(boost::ref(*tw));
97  boost::thread::id id = m_threadThreadList[i]->get_id();
98  m_threadMap[id] = i;
99  }
100  catch (...)
101  {
102  std::cerr << "Exception while creating worker threads" << std::endl;
103  abort();
104  }
105  i++;
106  }
107  m_masterThreadId = boost::this_thread::get_id();
108  m_barrier = new boost::barrier(m_numWorkers > 0 ? m_numWorkers : 1);
109  m_type = "Threading with Boost";
110 }
ThreadWorkerBoost ** m_threadList
Definition: ThreadBoost.h:106
std::queue< ThreadJob * > m_masterQueue
Definition: ThreadBoost.h:101
boost::condition_variable m_masterQueueCondVar
Definition: ThreadBoost.h:104
boost::thread::id m_masterThreadId
Definition: ThreadBoost.h:108
const unsigned int m_numThreads
Definition: ThreadBoost.h:99
std::map< boost::thread::id, unsigned int > m_threadMap
Definition: ThreadBoost.h:114
boost::condition_variable m_masterActiveCondVar
Definition: ThreadBoost.h:105

References m_barrier, m_masterThreadId, m_numThreads, m_numWorkers, m_threadActiveList, m_threadBusyList, m_threadList, m_threadMap, m_threadThreadList, m_type, and ThreadWorkerBoost.

◆ ~ThreadManagerBoost()

Nektar::Thread::ThreadManagerBoost::~ThreadManagerBoost ( )
virtual

Shuts down threading.

Terminates all running threads (they will finish their current job), releases resources and destructs.

Definition at line 117 of file ThreadBoost.cpp.

118 {
119  // This is an immediate teardown. We attempt to kill everything.
120  // we daren't lock anything as we may cause a deadlock
121  for (unsigned int i=0; i<m_numThreads; i++)
122  {
123  m_threadList[i]->Stop();
124  }
125 
126  m_masterQueueCondVar.notify_all();
127  m_masterActiveCondVar.notify_all();
128  for (unsigned int i=0; i<m_numThreads; i++)
129  {
130  m_threadThreadList[i]->join();
131  delete m_threadThreadList[i];
132  delete m_threadList[i];
133  }
134 
135  delete []m_threadList;
136  delete []m_threadThreadList;
137  delete []m_threadActiveList;
138  delete []m_threadBusyList;
139  delete m_barrier;
140 }
void Stop()
A signal to shut down.
Definition: ThreadBoost.h:149

References m_barrier, m_masterActiveCondVar, m_masterQueueCondVar, m_numThreads, m_threadActiveList, m_threadBusyList, m_threadList, m_threadThreadList, and Nektar::Thread::ThreadWorkerBoost::Stop().

◆ ThreadManagerBoost() [2/3]

Nektar::Thread::ThreadManagerBoost::ThreadManagerBoost ( )
private

Referenced by Create().

◆ ThreadManagerBoost() [3/3]

Nektar::Thread::ThreadManagerBoost::ThreadManagerBoost ( const ThreadManagerBoost & )
private

Member Function Documentation

◆ Create()

static ThreadManagerSharedPtr Nektar::Thread::ThreadManagerBoost::Create ( unsigned int  numT)
inline static

Called by the factory method.

Definition at line 86 of file ThreadBoost.h.

87  {
88  return std::shared_ptr<ThreadManager>(
89  new ThreadManagerBoost(numT));
90  }

References ThreadManagerBoost().

◆ GetMaxNumWorkers()

unsigned int Nektar::Thread::ThreadManagerBoost::GetMaxNumWorkers ( )
virtual

Gets the maximum available number of threads.

Returns
The maximum number of workers.

Implements Nektar::Thread::ThreadManager.

Definition at line 299 of file ThreadBoost.cpp.

300 {
301  return m_numThreads;
302 }

References m_numThreads.

◆ GetNumWorkers()

unsigned int Nektar::Thread::ThreadManagerBoost::GetNumWorkers ( )
virtual

Return the number of active workers.

Active workers are threads that are either running jobs or are waiting for jobs to be queued.

Implements Nektar::Thread::ThreadManager.

Definition at line 235 of file ThreadBoost.cpp.

236 {
237  return m_numWorkers;
238 }

References m_numWorkers.

◆ GetType()

const std::string & Nektar::Thread::ThreadManagerBoost::GetType ( ) const
virtual

Returns a description of the type of threading.

E.g. "Threading with Boost"

Implements Nektar::Thread::ThreadManager.

Definition at line 317 of file ThreadBoost.cpp.

318 {
319  return m_type;
320 }

References m_type.

◆ GetWorkerNum()

unsigned int Nektar::Thread::ThreadManagerBoost::GetWorkerNum ( )
virtual

Returns the worker number of the executing thread.

Returns an unsigned int between 0 and N-1 where N is the number of active worker threads. Repeated calls from within this thread will always return the same value and the value will be the same as returned from ThreadJob.GetWorkerNum(). The same thread will run a job until it finishes.

Although if there are active threads then thread 0 is always one of them, it is possible that thread 0 does not run for a given set of jobs. For example, if there are 4 active threads and 3 jobs are submitted with an e_static scheduling strategy and a chunksize of 1, then it is possible that threads 1, 2, and 3 pick up the jobs and thread 0 remains idle.

Returns 0 if called by a non-worker thread.

Implements Nektar::Thread::ThreadManager.

Definition at line 244 of file ThreadBoost.cpp.

245 {
246  boost::thread::id id = boost::this_thread::get_id();
247  return m_threadMap[id];
248 }

References m_threadMap.

◆ Hold()

void Nektar::Thread::ThreadManagerBoost::Hold ( )
virtual

A calling thread holds until all active threads call this method.

When called, the calling thread will sleep until all active workers have called this method. Once all have done so all threads awake and continue execution.

Note
Behaviour is likely undefined if the number of active workers is altered after a thread has called this method. It is only safe to call SetNumWorkers() when no threads are holding.

Implements Nektar::Thread::ThreadManager.

Definition at line 308 of file ThreadBoost.cpp.

309 {
310  m_barrier->wait();
311 }

References m_barrier.

◆ InThread()

bool Nektar::Thread::ThreadManagerBoost::InThread ( )
virtual

Indicates whether the code is in a worker thread or not.

Returns
True if the caller is in a worker thread.

Implements Nektar::Thread::ThreadManager.

Definition at line 205 of file ThreadBoost.cpp.

206 {
207  boost::thread::id id = boost::this_thread::get_id();
208  return (id != m_masterThreadId);
209 }

References m_masterThreadId.

◆ IsWorking()

bool Nektar::Thread::ThreadManagerBoost::IsWorking ( )
private

Definition at line 170 of file ThreadBoost.cpp.

171 {
172  bool working = false;
173  Lock masterActiveLock(m_masterActiveMutex);
174  for (unsigned int i = 0; i < m_numWorkers; i++)
175  {
176  working = working || m_threadBusyList[i];
177  }
178  return working;
179 }
boost::unique_lock< boost::mutex > Lock
Definition: ThreadBoost.h:51

References m_masterActiveMutex, m_numWorkers, and m_threadBusyList.

Referenced by Wait().

◆ QueueJob()

void Nektar::Thread::ThreadManagerBoost::QueueJob ( ThreadJob *  job)
virtual

Pass a single job to the master queue.

Parameters
job: A pointer to a ThreadJob subclass.

The job may become available for running immediately. If this is an issue then suspend the workers with SetNumWorkers(0) until the jobs are queued.

Implements Nektar::Thread::ThreadManager.

Definition at line 159 of file ThreadBoost.cpp.

160 {
161  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
162  m_masterQueue.push(job);
163  m_masterQueueCondVar.notify_all(); // alert a waiting thread.
164 } // queue unlocked

References m_masterQueue, m_masterQueueCondVar, and m_masterQueueMutex.

Referenced by QueueJobs().

◆ QueueJobs()

void Nektar::Thread::ThreadManagerBoost::QueueJobs ( std::vector< ThreadJob * > &  joblist)
virtual

Pass a list of tasklets to the master queue.

Parameters
joblist: Vector of ThreadJob pointers.

The list of jobs is copied into the master queue. Jobs may be available for running immediately, even before the list has been fully copied. This can have consequences for the scheduling. If this is an issue then suspend the workers with SetNumWorkers(0) until the jobs are queued.

See also
SchedType

Implements Nektar::Thread::ThreadManager.

Definition at line 146 of file ThreadBoost.cpp.

147 {
148  std::vector<ThreadJob *>::iterator it;
149  for (it=joblist.begin(); it<joblist.end(); ++it)
150  {
151  QueueJob(*it);
152  }
153 }
virtual void QueueJob(ThreadJob *job)
Pass a single job to the master queue.

References QueueJob().

◆ SetChunkSize()

void Nektar::Thread::ThreadManagerBoost::SetChunkSize ( unsigned int  chnk)
virtual

Controls how many jobs are sent to each worker at a time.

The exact meaning of this parameter depends on the current scheduling algorithm.

See also
SchedType
SetSchedType()

Implements Nektar::Thread::ThreadManager.

Definition at line 185 of file ThreadBoost.cpp.

186 {
187  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
188  m_chunkSize = std::max(chnk, 1U);
189 }

References m_chunkSize, and m_masterQueueMutex.

◆ SetNumWorkers() [1/2]

void Nektar::Thread::ThreadManagerBoost::SetNumWorkers ( )
virtual

Sets the number of active workers to the maximum.

Sets the number of active workers to the maximum available.

Implements Nektar::Thread::ThreadManager.

Definition at line 290 of file ThreadBoost.cpp.

291 {
292  SetNumWorkersImpl(m_numThreads);
293 }
void SetNumWorkersImpl(const unsigned int num)

References m_numThreads, and SetNumWorkersImpl().

◆ SetNumWorkers() [2/2]

void Nektar::Thread::ThreadManagerBoost::SetNumWorkers ( const unsigned int  num)
virtual

Sets the number of active workers.

Parameters
num: The number of active workers.

Active workers are threads that are either running jobs or are waiting for jobs to be queued.

If num is greater than the maximum allowed number of active workers, then the maximum value will be used instead.

Implements Nektar::Thread::ThreadManager.

Definition at line 278 of file ThreadBoost.cpp.

279 {
280  unsigned int n;
281  n = std::min(num, m_numThreads);
282  n = std::max(n, static_cast<unsigned int>(0));
283  SetNumWorkersImpl(n);
284 }

References m_numThreads, and SetNumWorkersImpl().

◆ SetNumWorkersImpl()

void Nektar::Thread::ThreadManagerBoost::SetNumWorkersImpl ( const unsigned int  num)
private

Definition at line 254 of file ThreadBoost.cpp.

255 {
256  Lock masterActiveLock(m_masterActiveMutex); // locks the active
257 
258  if (m_numWorkers == num)
259  {
260  return;
261  }
262 
263  delete m_barrier;
264  m_barrier = new boost::barrier(num > 0 ? num : 1);
265 
266  m_numWorkers = num;
267  for (unsigned int i = 0; i < m_numThreads; i++)
268  {
269  m_threadActiveList[i] = i < m_numWorkers ? true : false;
270  }
271  m_masterActiveCondVar.notify_all();
272 } // Lock on active released here

References m_barrier, m_masterActiveCondVar, m_masterActiveMutex, m_numThreads, m_numWorkers, and m_threadActiveList.

Referenced by SetNumWorkers().

◆ SetSchedType()

void Nektar::Thread::ThreadManagerBoost::SetSchedType ( SchedType  s)
virtual

Sets the current scheduling algorithm.

See also
SetChunkSize()

Implements Nektar::Thread::ThreadManager.

Definition at line 195 of file ThreadBoost.cpp.

196 {
197  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
198  m_schedType = s;
199 }

References m_masterQueueMutex, and m_schedType.

◆ Wait()

void Nektar::Thread::ThreadManagerBoost::Wait ( )
virtual

Waits until all queued jobs are finished.

If there are no jobs running or queued this method returns immediately. Otherwise it blocks until the queue is empty and the worker threads are idle.

Implementations must ensure that trivial deadlocks are not possible from this method, that is, that this code:

// assume ThreadManager* tm
// assume SomeJob is subclass of ThreadJob
// assume SomeJob job
tm->SetNumWorkers(0);
tm->QueueJob(job);
tm->Wait();

does not wait forever. Since the master thread is counted in the number of worker threads, implementations should increase the number of active workers by 1 on entering Wait().

Implements Nektar::Thread::ThreadManager.

Definition at line 215 of file ThreadBoost.cpp.

216 {
217  bool working;
218  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
219  working = IsWorking();
220  m_masterActiveCondVar.notify_all();
221  m_masterQueueCondVar.notify_all();
222  while (!m_masterQueue.empty() || working)
223  {
224  // while waiting, master queue is unlocked
225  m_masterQueueCondVar.wait(masterQueueLock);
226  // on exiting wait master queue is locked again
227  working = IsWorking();
228  }
229 }

References IsWorking(), m_masterActiveCondVar, m_masterQueue, m_masterQueueCondVar, and m_masterQueueMutex.

Friends And Related Function Documentation

◆ ThreadWorkerBoost

friend class ThreadWorkerBoost
friend

So the workers can access the master queue and locks.

Definition at line 63 of file ThreadBoost.h.

Referenced by ThreadManagerBoost().

Member Data Documentation

◆ className

std::string Nektar::Thread::ThreadManagerBoost::className
static private
Initial value:
= GetThreadManagerFactory().RegisterCreatorFunction("ThreadManagerBoost",
ThreadManagerBoost::Create, "Threading using Boost.")
tKey RegisterCreatorFunction(tKey idKey, CreatorFunction classCreator, std::string pDesc="")
Register a class with the factory.
Definition: NekFactory.hpp:200
static ThreadManagerSharedPtr Create(unsigned int numT)
Called by the factory method.
Definition: ThreadBoost.h:86
ThreadManagerFactory & GetThreadManagerFactory()
Definition: Thread.cpp:50

Definition at line 115 of file ThreadBoost.h.

◆ m_barrier

boost::barrier* Nektar::Thread::ThreadManagerBoost::m_barrier
private

Definition at line 113 of file ThreadBoost.h.

Referenced by Hold(), SetNumWorkersImpl(), ThreadManagerBoost(), and ~ThreadManagerBoost().

◆ m_chunkSize

unsigned int Nektar::Thread::ThreadManagerBoost::m_chunkSize
private

Definition at line 111 of file ThreadBoost.h.

Referenced by Nektar::Thread::ThreadWorkerBoost::GetNumToLoad(), and SetChunkSize().

◆ m_masterActiveCondVar

boost::condition_variable Nektar::Thread::ThreadManagerBoost::m_masterActiveCondVar
private

◆ m_masterActiveMutex

boost::mutex Nektar::Thread::ThreadManagerBoost::m_masterActiveMutex
private

◆ m_masterQueue

std::queue<ThreadJob*> Nektar::Thread::ThreadManagerBoost::m_masterQueue
private

◆ m_masterQueueCondVar

boost::condition_variable Nektar::Thread::ThreadManagerBoost::m_masterQueueCondVar
private

◆ m_masterQueueMutex

boost::mutex Nektar::Thread::ThreadManagerBoost::m_masterQueueMutex
private

◆ m_masterThreadId

boost::thread::id Nektar::Thread::ThreadManagerBoost::m_masterThreadId
private

Definition at line 108 of file ThreadBoost.h.

Referenced by InThread(), and ThreadManagerBoost().

◆ m_numThreads

const unsigned int Nektar::Thread::ThreadManagerBoost::m_numThreads
private

◆ m_numWorkers

unsigned int Nektar::Thread::ThreadManagerBoost::m_numWorkers
private

◆ m_schedType

SchedType Nektar::Thread::ThreadManagerBoost::m_schedType
private

Definition at line 112 of file ThreadBoost.h.

Referenced by Nektar::Thread::ThreadWorkerBoost::GetNumToLoad(), and SetSchedType().

◆ m_threadActiveList

bool* Nektar::Thread::ThreadManagerBoost::m_threadActiveList
private

◆ m_threadBusyList

bool* Nektar::Thread::ThreadManagerBoost::m_threadBusyList
private

◆ m_threadList

ThreadWorkerBoost** Nektar::Thread::ThreadManagerBoost::m_threadList
private

Definition at line 106 of file ThreadBoost.h.

Referenced by ThreadManagerBoost(), and ~ThreadManagerBoost().

◆ m_threadMap

std::map<boost::thread::id, unsigned int> Nektar::Thread::ThreadManagerBoost::m_threadMap
private

Definition at line 114 of file ThreadBoost.h.

Referenced by GetWorkerNum(), and ThreadManagerBoost().

◆ m_threadThreadList

boost::thread** Nektar::Thread::ThreadManagerBoost::m_threadThreadList
private

Definition at line 107 of file ThreadBoost.h.

Referenced by ThreadManagerBoost(), and ~ThreadManagerBoost().

◆ m_type

std::string Nektar::Thread::ThreadManagerBoost::m_type
private

Definition at line 116 of file ThreadBoost.h.

Referenced by GetType(), and ThreadManagerBoost().