Nektar++
Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | Static Private Attributes | Friends | List of all members
Nektar::Thread::ThreadManagerBoost Class Reference

Implementation of ThreadManager using Boost threads. More...

#include <ThreadBoost.h>

Inheritance diagram for Nektar::Thread::ThreadManagerBoost:
[legend]

Public Member Functions

 ThreadManagerBoost (unsigned int numWorkers)
 Constructs a ThreadManagerBoost. More...
 
virtual ~ThreadManagerBoost ()
 Shuts down threading. More...
 
virtual void QueueJobs (std::vector< ThreadJob * > &joblist)
 Pass a list of tasklets to the master queue. More...
 
virtual void QueueJob (ThreadJob *job)
 Pass a single job to the master queue. More...
 
virtual unsigned int GetNumWorkers ()
 Return the number of active workers. More...
 
virtual unsigned int GetWorkerNum ()
 Returns the worker number of the executing thread. More...
 
virtual void SetNumWorkers (const unsigned int num)
 Sets the number of active workers. More...
 
virtual void SetNumWorkers ()
 Sets the number of active workers to the maximum. More...
 
virtual unsigned int GetMaxNumWorkers ()
 Gets the maximum available number of threads. More...
 
virtual void Wait ()
 Waits until all queued jobs are finished. More...
 
virtual void SetChunkSize (unsigned int chnk)
 Controls how many jobs are sent to each worker at a time. More...
 
virtual void SetSchedType (SchedType s)
 Sets the current scheduling algorithm. More...
 
virtual bool InThread ()
 Indicates whether the code is in a worker thread or not. More...
 
virtual void Hold ()
 A calling threads holds until all active threads call this method. More...
 
virtual const std::string & GetType () const
 Returns a description of the type of threading. More...
 
- Public Member Functions inherited from Nektar::Thread::ThreadManager
virtual ~ThreadManager ()
 Destructor. More...
 
virtual bool IsInitialised ()
 ThreadManager implementation. More...
 
int GetThrFromPartition (int pPartition)
 
int GetRankFromPartition (int pPartition)
 
int GetPartitionFromRankThr (int pRank, unsigned int pThr)
 

Static Public Member Functions

static ThreadManagerSharedPtr Create (unsigned int numT)
 Called by the factory method. More...
 

Private Member Functions

 ThreadManagerBoost ()
 
 ThreadManagerBoost (const ThreadManagerBoost &)
 
bool IsWorking ()
 
void SetNumWorkersImpl (const unsigned int num)
 

Private Attributes

const unsigned int m_numThreads
 
unsigned int m_numWorkers
 
std::queue< ThreadJob * > m_masterQueue
 
boost::mutex m_masterQueueMutex
 
boost::mutex m_masterActiveMutex
 
boost::condition_variable m_masterQueueCondVar
 
boost::condition_variable m_masterActiveCondVar
 
ThreadWorkerBoost ** m_threadList
 
boost::thread ** m_threadThreadList
 
boost::thread::id m_masterThreadId
 
bool * m_threadBusyList
 
bool * m_threadActiveList
 
unsigned int m_chunkSize
 
SchedType m_schedType
 
boost::barrier * m_barrier
 
std::map< boost::thread::id, unsigned int > m_threadMap
 
std::string m_type
 

Static Private Attributes

static std::string className
 

Friends

class ThreadWorkerBoost
 

Detailed Description

Implementation of ThreadManager using Boost threads.

Definition at line 58 of file ThreadBoost.h.

Constructor & Destructor Documentation

◆ ThreadManagerBoost() [1/3]

Nektar::Thread::ThreadManagerBoost::ThreadManagerBoost ( unsigned int  numT)

Constructs a ThreadManagerBoost.

Parameters
numWorkersThe number of threads to start (including master thread).
Note
Do not use, use factory instead.

Definition at line 55 of file ThreadBoost.cpp.

56  : m_numThreads(numT), m_numWorkers(numT), m_masterQueue(),
59  m_threadMap()
60 {
61  using namespace std;
62  try
63  {
65  m_threadThreadList = new boost::thread *[m_numThreads];
66  m_threadBusyList = new bool[m_numThreads];
67  m_threadActiveList = new bool[m_numThreads];
68  }
69  catch (exception &e)
70  {
71  cerr << "Exception while allocating thread storage: " << e.what()
72  << endl;
73  abort();
74  }
75  unsigned int i = 0;
76  while (i < m_numThreads)
77  {
79  try
80  {
81  tw = new ThreadWorkerBoost(this, i);
82  }
83  catch (exception &e)
84  {
85  cerr << "Exception while allocating worker threads: " << e.what()
86  << endl;
87  abort();
88  }
89 
90  m_threadList[i] = tw;
91  m_threadBusyList[i] = false;
92  m_threadActiveList[i] = true;
93 
94  try
95  {
96  m_threadThreadList[i] = new boost::thread(boost::ref(*tw));
97  boost::thread::id id = m_threadThreadList[i]->get_id();
98  m_threadMap[id] = i;
99  }
100  catch (...)
101  {
102  std::cerr << "Exception while creating worker threads" << std::endl;
103  abort();
104  }
105  i++;
106  }
107  m_masterThreadId = boost::this_thread::get_id();
108  m_barrier = new boost::barrier(m_numWorkers > 0 ? m_numWorkers : 1);
109  m_type = "Threading with Boost";
110 }
ThreadWorkerBoost ** m_threadList
Definition: ThreadBoost.h:104
boost::condition_variable m_masterQueueCondVar
Definition: ThreadBoost.h:102
boost::thread::id m_masterThreadId
Definition: ThreadBoost.h:106
const unsigned int m_numThreads
Definition: ThreadBoost.h:97
std::queue< ThreadJob * > m_masterQueue
Definition: ThreadBoost.h:99
std::map< boost::thread::id, unsigned int > m_threadMap
Definition: ThreadBoost.h:112
boost::condition_variable m_masterActiveCondVar
Definition: ThreadBoost.h:103

References m_barrier, m_masterThreadId, m_numThreads, m_numWorkers, m_threadActiveList, m_threadBusyList, m_threadList, m_threadMap, m_threadThreadList, m_type, and ThreadWorkerBoost.

◆ ~ThreadManagerBoost()

Nektar::Thread::ThreadManagerBoost::~ThreadManagerBoost ( )
virtual

Shuts down threading.

Terminates all running threads (they will finish their current job), releases resources and destructs.

Definition at line 116 of file ThreadBoost.cpp.

117 {
118  // This is an immediate teardown. We attempt to kill everything.
119  // we daren't lock anything as we may cause a deadlock
120  for (unsigned int i = 0; i < m_numThreads; i++)
121  {
122  m_threadList[i]->Stop();
123  }
124 
125  m_masterQueueCondVar.notify_all();
126  m_masterActiveCondVar.notify_all();
127  for (unsigned int i = 0; i < m_numThreads; i++)
128  {
129  m_threadThreadList[i]->join();
130  delete m_threadThreadList[i];
131  delete m_threadList[i];
132  }
133 
134  delete[] m_threadList;
135  delete[] m_threadThreadList;
136  delete[] m_threadActiveList;
137  delete[] m_threadBusyList;
138  delete m_barrier;
139 }
void Stop()
A signal to shut down.
Definition: ThreadBoost.h:153

References m_barrier, m_masterActiveCondVar, m_masterQueueCondVar, m_numThreads, m_threadActiveList, m_threadBusyList, m_threadList, m_threadThreadList, and Nektar::Thread::ThreadWorkerBoost::Stop().

◆ ThreadManagerBoost() [2/3]

Nektar::Thread::ThreadManagerBoost::ThreadManagerBoost ( )
private

Referenced by Create().

◆ ThreadManagerBoost() [3/3]

Nektar::Thread::ThreadManagerBoost::ThreadManagerBoost ( const ThreadManagerBoost )
private

Member Function Documentation

◆ Create()

static ThreadManagerSharedPtr Nektar::Thread::ThreadManagerBoost::Create ( unsigned int  numT)
inlinestatic

Called by the factory method.

Definition at line 85 of file ThreadBoost.h.

86  {
87  return std::shared_ptr<ThreadManager>(new ThreadManagerBoost(numT));
88  }

References ThreadManagerBoost().

◆ GetMaxNumWorkers()

unsigned int Nektar::Thread::ThreadManagerBoost::GetMaxNumWorkers ( )
virtual

Gets the maximum available number of threads.

Returns
The maximum number of workers.

Implements Nektar::Thread::ThreadManager.

Definition at line 285 of file ThreadBoost.cpp.

286 {
287  return m_numThreads;
288 }

References m_numThreads.

◆ GetNumWorkers()

unsigned int Nektar::Thread::ThreadManagerBoost::GetNumWorkers ( )
virtual

Return the number of active workers.

Active workers are threads that are either running jobs or are waiting for jobs to be queued.

Implements Nektar::Thread::ThreadManager.

Definition at line 226 of file ThreadBoost.cpp.

227 {
228  return m_numWorkers;
229 }

References m_numWorkers.

◆ GetType()

const std::string & Nektar::Thread::ThreadManagerBoost::GetType ( ) const
virtual

Returns a description of the type of threading.

E.g. "Threading with Boost"

Implements Nektar::Thread::ThreadManager.

Definition at line 301 of file ThreadBoost.cpp.

302 {
303  return m_type;
304 }

References m_type.

◆ GetWorkerNum()

unsigned int Nektar::Thread::ThreadManagerBoost::GetWorkerNum ( )
virtual

Returns the worker number of the executing thread.

Returns an unsigned int between 0 and N-1 where N is the number of active worker threads. Repeated calls from within this thread will always return the same value and the value will be the same as returned from ThreadJob.GetWorkerNum(). The same thread will run a job until it finishes.

Although if there are active threads then thread 0 is always one of them, it is possible that thread 0 does not run for a given set of jobs. For example, if there are 4 active threads and 3 jobs are submitted with a e_static scheduling strategy and a chunksize of 1, then it is possible that threads 1,2, and 3 pick up the jobs and thread 0 remains idle.

Returns 0 if called by non-thread.

Implements Nektar::Thread::ThreadManager.

Definition at line 234 of file ThreadBoost.cpp.

235 {
236  boost::thread::id id = boost::this_thread::get_id();
237  return m_threadMap[id];
238 }

References m_threadMap.

◆ Hold()

void Nektar::Thread::ThreadManagerBoost::Hold ( )
virtual

A calling threads holds until all active threads call this method.

When called, the calling thread will sleep until all active workers have called this method. Once all have done so all threads awake and continue execution.

Note
Behaviour is likely undefined if the number of active workers is altered after a thread has called this method. It is only safe to call SetNumWorkers() when no threads are holding.

Implements Nektar::Thread::ThreadManager.

Definition at line 293 of file ThreadBoost.cpp.

294 {
295  m_barrier->wait();
296 }

References m_barrier.

◆ InThread()

bool Nektar::Thread::ThreadManagerBoost::InThread ( )
virtual

Indicates whether the code is in a worker thread or not.

Returns
True if the caller is in a worker thread.

Implements Nektar::Thread::ThreadManager.

Definition at line 198 of file ThreadBoost.cpp.

199 {
200  boost::thread::id id = boost::this_thread::get_id();
201  return (id != m_masterThreadId);
202 }

References m_masterThreadId.

◆ IsWorking()

bool Nektar::Thread::ThreadManagerBoost::IsWorking ( )
private

Definition at line 166 of file ThreadBoost.cpp.

167 {
168  bool working = false;
169  Lock masterActiveLock(m_masterActiveMutex);
170  for (unsigned int i = 0; i < m_numWorkers; i++)
171  {
172  working = working || m_threadBusyList[i];
173  }
174  return working;
175 }
boost::unique_lock< boost::mutex > Lock
Definition: ThreadBoost.h:51

References m_masterActiveMutex, m_numWorkers, and m_threadBusyList.

Referenced by Wait().

◆ QueueJob()

void Nektar::Thread::ThreadManagerBoost::QueueJob ( ThreadJob job)
virtual

Pass a single job to the master queue.

Parameters
jobA pointer to a ThreadJob subclass.

The job may become available for running immediately. If this is an issue then suspend the workers with SetNumWorkers(0) until the jobs are queued.

Implements Nektar::Thread::ThreadManager.

Definition at line 156 of file ThreadBoost.cpp.

157 {
158  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
159  m_masterQueue.push(job);
160  m_masterQueueCondVar.notify_all(); // alert a waiting thread.
161 } // queue unlocked

References m_masterQueue, m_masterQueueCondVar, and m_masterQueueMutex.

Referenced by QueueJobs().

◆ QueueJobs()

void Nektar::Thread::ThreadManagerBoost::QueueJobs ( std::vector< ThreadJob * > &  joblist)
virtual

Pass a list of tasklets to the master queue.

Parameters
joblistVector of ThreadJob pointers.

The list of jobs is copied into the master queue. Jobs may be available for running immediately, even before the list has been fully copied. This can have consequences for the scheduling. If this is an issue then suspend the workers with SetNumWorkers(0) until the jobs are queued.

See also
SchedType

Implements Nektar::Thread::ThreadManager.

Definition at line 144 of file ThreadBoost.cpp.

145 {
146  std::vector<ThreadJob *>::iterator it;
147  for (it = joblist.begin(); it < joblist.end(); ++it)
148  {
149  QueueJob(*it);
150  }
151 }
virtual void QueueJob(ThreadJob *job)
Pass a single job to the master queue.

References QueueJob().

◆ SetChunkSize()

void Nektar::Thread::ThreadManagerBoost::SetChunkSize ( unsigned int  chnk)
virtual

Controls how many jobs are sent to each worker at a time.

The exact meaning of this parameter depends on the current scheduling algorithm.

See also
SchedType
SetSchedType()

Implements Nektar::Thread::ThreadManager.

Definition at line 180 of file ThreadBoost.cpp.

181 {
182  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
183  m_chunkSize = std::max(chnk, 1U);
184 }

References m_chunkSize, and m_masterQueueMutex.

◆ SetNumWorkers() [1/2]

void Nektar::Thread::ThreadManagerBoost::SetNumWorkers ( )
virtual

Sets the number of active workers to the maximum.

Sets the number of active workers to the maximum available.

Implements Nektar::Thread::ThreadManager.

Definition at line 277 of file ThreadBoost.cpp.

278 {
280 }
void SetNumWorkersImpl(const unsigned int num)

References m_numThreads, and SetNumWorkersImpl().

◆ SetNumWorkers() [2/2]

void Nektar::Thread::ThreadManagerBoost::SetNumWorkers ( const unsigned int  num)
virtual

Sets the number of active workers.

Parameters
numThe number of active workers.

Active workers are threads that are either running jobs or are waiting for jobs to be queued.

If num is greater than the maximum allowed number of active workers, then the maximum value will be used instead.

Implements Nektar::Thread::ThreadManager.

Definition at line 266 of file ThreadBoost.cpp.

267 {
268  unsigned int n;
269  n = std::min(num, m_numThreads);
270  n = std::max(n, static_cast<unsigned int>(0));
272 }

References m_numThreads, and SetNumWorkersImpl().

◆ SetNumWorkersImpl()

void Nektar::Thread::ThreadManagerBoost::SetNumWorkersImpl ( const unsigned int  num)
private

Definition at line 243 of file ThreadBoost.cpp.

244 {
245  Lock masterActiveLock(m_masterActiveMutex); // locks the active
246 
247  if (m_numWorkers == num)
248  {
249  return;
250  }
251 
252  delete m_barrier;
253  m_barrier = new boost::barrier(num > 0 ? num : 1);
254 
255  m_numWorkers = num;
256  for (unsigned int i = 0; i < m_numThreads; i++)
257  {
258  m_threadActiveList[i] = i < m_numWorkers ? true : false;
259  }
260  m_masterActiveCondVar.notify_all();
261 } // Lock on active released here

References m_barrier, m_masterActiveCondVar, m_masterActiveMutex, m_numThreads, m_numWorkers, and m_threadActiveList.

Referenced by SetNumWorkers().

◆ SetSchedType()

void Nektar::Thread::ThreadManagerBoost::SetSchedType ( SchedType  s)
virtual

Sets the current scheduling algorithm.

See also
SetChunkSize()

Implements Nektar::Thread::ThreadManager.

Definition at line 189 of file ThreadBoost.cpp.

190 {
191  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
192  m_schedType = s;
193 }

References m_masterQueueMutex, and m_schedType.

◆ Wait()

void Nektar::Thread::ThreadManagerBoost::Wait ( )
virtual

Waits until all queued jobs are finished.

If there are no jobs running or queued this method returns immediately. Otherwise it blocks until the queue is empty and the worker threads are idle.

Implementations must ensure that trivial deadlocks are not possible from this method, that is, that this code:

// assume ThreadManager* tm
// assume SomeJob is subclass of ThreadJob
// assume SomeJob job
tm->SetNumWorkers(0);
tm->QueueJob(job);
tm->Wait();

does not wait forever. Since the master thread is counted in the number of worker threads, implementations should increase the number of active workers by 1 on entering Wait().

Implements Nektar::Thread::ThreadManager.

Definition at line 207 of file ThreadBoost.cpp.

208 {
209  bool working;
210  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
211  working = IsWorking();
212  m_masterActiveCondVar.notify_all();
213  m_masterQueueCondVar.notify_all();
214  while (!m_masterQueue.empty() || working)
215  {
216  // while waiting, master queue is unlocked
217  m_masterQueueCondVar.wait(masterQueueLock);
218  // on exiting wait master queue is locked again
219  working = IsWorking();
220  }
221 }

References IsWorking(), m_masterActiveCondVar, m_masterQueue, m_masterQueueCondVar, and m_masterQueueMutex.

Friends And Related Function Documentation

◆ ThreadWorkerBoost

friend class ThreadWorkerBoost
friend

So the workers can access the master queue and locks.

Definition at line 63 of file ThreadBoost.h.

Referenced by ThreadManagerBoost().

Member Data Documentation

◆ className

std::string Nektar::Thread::ThreadManagerBoost::className
staticprivate
Initial value:
=
"ThreadManagerBoost", ThreadManagerBoost::Create,
"Threading using Boost.")
tKey RegisterCreatorFunction(tKey idKey, CreatorFunction classCreator, std::string pDesc="")
Register a class with the factory.
Definition: NekFactory.hpp:198
static ThreadManagerSharedPtr Create(unsigned int numT)
Called by the factory method.
Definition: ThreadBoost.h:85
ThreadManagerFactory & GetThreadManagerFactory()
Definition: Thread.cpp:49

Definition at line 113 of file ThreadBoost.h.

◆ m_barrier

boost::barrier* Nektar::Thread::ThreadManagerBoost::m_barrier
private

Definition at line 111 of file ThreadBoost.h.

Referenced by Hold(), SetNumWorkersImpl(), ThreadManagerBoost(), and ~ThreadManagerBoost().

◆ m_chunkSize

unsigned int Nektar::Thread::ThreadManagerBoost::m_chunkSize
private

Definition at line 109 of file ThreadBoost.h.

Referenced by Nektar::Thread::ThreadWorkerBoost::GetNumToLoad(), and SetChunkSize().

◆ m_masterActiveCondVar

boost::condition_variable Nektar::Thread::ThreadManagerBoost::m_masterActiveCondVar
private

◆ m_masterActiveMutex

boost::mutex Nektar::Thread::ThreadManagerBoost::m_masterActiveMutex
private

◆ m_masterQueue

std::queue<ThreadJob *> Nektar::Thread::ThreadManagerBoost::m_masterQueue
private

◆ m_masterQueueCondVar

boost::condition_variable Nektar::Thread::ThreadManagerBoost::m_masterQueueCondVar
private

◆ m_masterQueueMutex

boost::mutex Nektar::Thread::ThreadManagerBoost::m_masterQueueMutex
private

◆ m_masterThreadId

boost::thread::id Nektar::Thread::ThreadManagerBoost::m_masterThreadId
private

Definition at line 106 of file ThreadBoost.h.

Referenced by InThread(), and ThreadManagerBoost().

◆ m_numThreads

const unsigned int Nektar::Thread::ThreadManagerBoost::m_numThreads
private

◆ m_numWorkers

unsigned int Nektar::Thread::ThreadManagerBoost::m_numWorkers
private

◆ m_schedType

SchedType Nektar::Thread::ThreadManagerBoost::m_schedType
private

Definition at line 110 of file ThreadBoost.h.

Referenced by Nektar::Thread::ThreadWorkerBoost::GetNumToLoad(), and SetSchedType().

◆ m_threadActiveList

bool* Nektar::Thread::ThreadManagerBoost::m_threadActiveList
private

◆ m_threadBusyList

bool* Nektar::Thread::ThreadManagerBoost::m_threadBusyList
private

◆ m_threadList

ThreadWorkerBoost** Nektar::Thread::ThreadManagerBoost::m_threadList
private

Definition at line 104 of file ThreadBoost.h.

Referenced by ThreadManagerBoost(), and ~ThreadManagerBoost().

◆ m_threadMap

std::map<boost::thread::id, unsigned int> Nektar::Thread::ThreadManagerBoost::m_threadMap
private

Definition at line 112 of file ThreadBoost.h.

Referenced by GetWorkerNum(), and ThreadManagerBoost().

◆ m_threadThreadList

boost::thread** Nektar::Thread::ThreadManagerBoost::m_threadThreadList
private

Definition at line 105 of file ThreadBoost.h.

Referenced by ThreadManagerBoost(), and ~ThreadManagerBoost().

◆ m_type

std::string Nektar::Thread::ThreadManagerBoost::m_type
private

Definition at line 114 of file ThreadBoost.h.

Referenced by GetType(), and ThreadManagerBoost().