Nektar++
ThreadBoost.cpp
Go to the documentation of this file.
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 // File ThreadBoost.cpp
4 //
5 // For more information, please see: http://www.nektar.info
6 //
7 // The MIT License
8 //
9 // Copyright (c) 2006 Division of Applied Mathematics, Brown University (USA),
10 // Department of Aeronautics, Imperial College London (UK), and Scientific
11 // Computing and Imaging Institute, University of Utah (USA).
12 //
13 // Permission is hereby granted, free of charge, to any person obtaining a
14 // copy of this software and associated documentation files (the "Software"),
15 // to deal in the Software without restriction, including without limitation
16 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
17 // and/or sell copies of the Software, and to permit persons to whom the
18 // Software is furnished to do so, subject to the following conditions:
19 //
20 // The above copyright notice and this permission notice shall be included
21 // in all copies or substantial portions of the Software.
22 //
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
24 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
26 // THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
28 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
29 // DEALINGS IN THE SOFTWARE.
30 //
31 // Description:
32 //
33 ///////////////////////////////////////////////////////////////////////////////
34 
35 #define NOMINMAX
36 
38 #include <algorithm>
39 #include <iostream>
40 
41 namespace Nektar
42 {
43 namespace Thread
44 {
45 
48  "ThreadManagerBoost", ThreadManagerBoost::Create,
49  "Threading using Boost.");
50 
51 /**
52  * @param numT The number of threads to start (including master thread).
53  * @note Do not use, use factory instead.
54  */
56  : m_numThreads(numT), m_numWorkers(numT), m_masterQueue(),
57  m_masterQueueMutex(), m_masterActiveMutex(), m_masterQueueCondVar(),
58  m_masterActiveCondVar(), m_chunkSize(1), m_schedType(e_dynamic),
59  m_threadMap()
60 {
61  using namespace std;
62  try
63  {
65  m_threadThreadList = new boost::thread *[m_numThreads];
66  m_threadBusyList = new bool[m_numThreads];
67  m_threadActiveList = new bool[m_numThreads];
68  }
69  catch (exception &e)
70  {
71  cerr << "Exception while allocating thread storage: " << e.what()
72  << endl;
73  abort();
74  }
75  unsigned int i = 0;
76  while (i < m_numThreads)
77  {
79  try
80  {
81  tw = new ThreadWorkerBoost(this, i);
82  }
83  catch (exception &e)
84  {
85  cerr << "Exception while allocating worker threads: " << e.what()
86  << endl;
87  abort();
88  }
89 
90  m_threadList[i] = tw;
91  m_threadBusyList[i] = false;
92  m_threadActiveList[i] = true;
93 
94  try
95  {
96  m_threadThreadList[i] = new boost::thread(boost::ref(*tw));
97  boost::thread::id id = m_threadThreadList[i]->get_id();
98  m_threadMap[id] = i;
99  }
100  catch (...)
101  {
102  std::cerr << "Exception while creating worker threads" << std::endl;
103  abort();
104  }
105  i++;
106  }
107  m_masterThreadId = boost::this_thread::get_id();
108  m_barrier = new boost::barrier(m_numWorkers > 0 ? m_numWorkers : 1);
109  m_type = "Threading with Boost";
110 }
111 
112 /**
113  * Terminates all running threads (they will finish their current job),
114  * releases resources and destructs.
115  */
117 {
118  // This is an immediate teardown. We attempt to kill everything.
119  // we daren't lock anything as we may cause a deadlock
120  for (unsigned int i = 0; i < m_numThreads; i++)
121  {
122  m_threadList[i]->Stop();
123  }
124 
125  m_masterQueueCondVar.notify_all();
126  m_masterActiveCondVar.notify_all();
127  for (unsigned int i = 0; i < m_numThreads; i++)
128  {
129  m_threadThreadList[i]->join();
130  delete m_threadThreadList[i];
131  delete m_threadList[i];
132  }
133 
134  delete[] m_threadList;
135  delete[] m_threadThreadList;
136  delete[] m_threadActiveList;
137  delete[] m_threadBusyList;
138  delete m_barrier;
139 }
140 
141 /**
142  *
143  */
144 void ThreadManagerBoost::QueueJobs(std::vector<ThreadJob *> &joblist)
145 {
146  std::vector<ThreadJob *>::iterator it;
147  for (it = joblist.begin(); it < joblist.end(); ++it)
148  {
149  QueueJob(*it);
150  }
151 }
152 
153 /*
154  *
155  */
157 {
158  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
159  m_masterQueue.push(job);
160  m_masterQueueCondVar.notify_all(); // alert a waiting thread.
161 } // queue unlocked
162 
163 /**
164  *
165  */
167 {
168  bool working = false;
169  Lock masterActiveLock(m_masterActiveMutex);
170  for (unsigned int i = 0; i < m_numWorkers; i++)
171  {
172  working = working || m_threadBusyList[i];
173  }
174  return working;
175 }
176 
177 /**
178  *
179  */
180 void ThreadManagerBoost::SetChunkSize(unsigned int chnk)
181 {
182  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
183  m_chunkSize = std::max(chnk, 1U);
184 }
185 
186 /**
187  *
188  */
190 {
191  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
192  m_schedType = s;
193 }
194 
195 /**
196  *
197  */
199 {
200  boost::thread::id id = boost::this_thread::get_id();
201  return (id != m_masterThreadId);
202 }
203 
204 /**
205  *
206  */
208 {
209  bool working;
210  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
211  working = IsWorking();
212  m_masterActiveCondVar.notify_all();
213  m_masterQueueCondVar.notify_all();
214  while (!m_masterQueue.empty() || working)
215  {
216  // while waiting, master queue is unlocked
217  m_masterQueueCondVar.wait(masterQueueLock);
218  // on exiting wait master queue is locked again
219  working = IsWorking();
220  }
221 }
222 
223 /**
224  *
225  */
227 {
228  return m_numWorkers;
229 }
230 
231 /**
232  *
233  */
235 {
236  boost::thread::id id = boost::this_thread::get_id();
237  return m_threadMap[id];
238 }
239 
240 /**
241  *
242  */
243 void ThreadManagerBoost::SetNumWorkersImpl(const unsigned int num)
244 {
245  Lock masterActiveLock(m_masterActiveMutex); // locks the active
246 
247  if (m_numWorkers == num)
248  {
249  return;
250  }
251 
252  delete m_barrier;
253  m_barrier = new boost::barrier(num > 0 ? num : 1);
254 
255  m_numWorkers = num;
256  for (unsigned int i = 0; i < m_numThreads; i++)
257  {
258  m_threadActiveList[i] = i < m_numWorkers ? true : false;
259  }
260  m_masterActiveCondVar.notify_all();
261 } // Lock on active released here
262 
263 /**
264  *
265  */
266 void ThreadManagerBoost::SetNumWorkers(const unsigned int num)
267 {
268  unsigned int n;
269  n = std::min(num, m_numThreads);
270  n = std::max(n, static_cast<unsigned int>(0));
272 }
273 
274 /**
275  *
276  */
278 {
280 }
281 
282 /**
283  *
284  */
286 {
287  return m_numThreads;
288 }
289 
290 /**
291  *
292  */
294 {
295  m_barrier->wait();
296 }
297 
298 /**
299  *
300  */
301 const std::string &ThreadManagerBoost::GetType() const
302 {
303  return m_type;
304 }
305 
306 /**
307  * @param threadManager Pointer to the ThreadManagerBoost that is controlling
308  * this worker.
309  * @param workerNum Unique number from 0..(number_of_threads - 1)
310  *
311  * Called by the ThreadManagerBoost instance.
312  */
314  unsigned int workerNum)
315  : m_threadManager(tm), m_workerQueue(), m_keepgoing(true),
316  m_threadNum(workerNum)
317 {
318  // Nothing to see here
319 }
320 
321 /**
322  * Winds up this thread's execution. Jobs in its queue are lost.
323  */
325 {
326  if (m_keepgoing)
327  {
328  std::cerr << "Warning: ThreadWorker: " << m_threadNum
329  << "destroyed while running!" << std::endl;
330  }
331  // on destruction the m_workerQueue will be destructed and that
332  // will destruct any ThreadJobs still in there.
333 }
334 
335 /**
336  *
337  */
339 {
340  // Lock the master queue
341  Lock masterQueueLock(m_threadManager->m_masterQueueMutex);
344  while (m_threadManager->m_masterQueue.empty() && m_keepgoing)
345  {
346  // while waiting, master queue is unlocked
347  m_threadManager->m_masterQueueCondVar.wait(masterQueueLock);
348  // on exiting wait master queue is locked again
349  }
350  bool active;
351  {
352  Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
354  }
355  if (active && m_keepgoing)
356  {
357  unsigned int numToLoad = GetNumToLoad();
358  while (m_workerQueue.size() < numToLoad &&
359  !m_threadManager->m_masterQueue.empty())
360  {
361  ThreadJob *tj = m_threadManager->m_masterQueue.front();
362  m_workerQueue.push(tj);
364  }
365  }
367 } // lock on master queue released here
368 
369 /**
370  *
371  */
373 {
374  unsigned int numToLoad = 0;
375  switch (m_threadManager->m_schedType)
376  {
377  case e_guided:
378  numToLoad = std::max(
379  static_cast<unsigned long>(m_threadManager->m_chunkSize),
380  static_cast<unsigned long>(
382  (2 * m_threadManager->m_numWorkers + 1)));
383  break;
384 
385  case e_dynamic:
386  numToLoad = m_threadManager->m_chunkSize;
387  break;
388 
389  default:
390  NEKERROR(ErrorUtil::efatal, "Invalid value for SchedType.");
391  break;
392  }
393  return numToLoad;
394 }
395 
396 /**
397  *
398  */
400 {
401  Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
402 
404  {
405  // while waiting, master active is unlocked
406  m_threadManager->m_masterActiveCondVar.wait(masterActiveLock);
407  // on exiting wait master active is locked again
408  }
409 }
410 
411 /**
412  *
413  */
415 {
416  while (m_keepgoing)
417  {
418  WaitForActive();
419  LoadJobs();
420  RunJobs();
421  }
422 } // exiting here should terminate the thread
423 
424 /**
425  *
426  */
428 {
429  while (!m_workerQueue.empty() && m_keepgoing)
430  {
431  ThreadJob *tj;
432  try
433  {
434  tj = m_workerQueue.front();
436  tj->Run();
437  m_workerQueue.pop();
438  delete tj;
439  }
440  catch (...)
441  {
442  // something bad happened, probably time to die
443  // maybe signal ThreadManager
444  throw;
445  }
446  }
447 }
448 
449 } // namespace Thread
450 } /* namespace Nektar */
#define NEKERROR(type, msg)
Assert Level 0 – Fundamental assert which is used whether in FULLDEBUG, DEBUG or OPT compilation mode...
Definition: ErrorUtil.hpp:209
tKey RegisterCreatorFunction(tKey idKey, CreatorFunction classCreator, std::string pDesc="")
Register a class with the factory.
Definition: NekFactory.hpp:198
Base class for tasks to be sent to the ThreadManager to run.
Definition: Thread.h:97
void SetWorkerNum(unsigned int num)
Set number of worker threads.
Definition: Thread.cpp:76
virtual void Run()=0
Implementation of ThreadManager using Boost threads.
Definition: ThreadBoost.h:59
ThreadWorkerBoost ** m_threadList
Definition: ThreadBoost.h:104
virtual const std::string & GetType() const
Returns a description of the type of threading.
virtual unsigned int GetWorkerNum()
Returns the worker number of the executing thread.
virtual ~ThreadManagerBoost()
Shuts down threading.
virtual void QueueJob(ThreadJob *job)
Pass a single job to the master queue.
boost::condition_variable m_masterQueueCondVar
Definition: ThreadBoost.h:102
virtual bool InThread()
Indicates whether the code is in a worker thread or not.
boost::thread::id m_masterThreadId
Definition: ThreadBoost.h:106
const unsigned int m_numThreads
Definition: ThreadBoost.h:97
virtual void SetNumWorkers()
Sets the number of active workers to the maximum.
std::queue< ThreadJob * > m_masterQueue
Definition: ThreadBoost.h:99
static ThreadManagerSharedPtr Create(unsigned int numT)
Called by the factory method.
Definition: ThreadBoost.h:85
virtual void Hold()
A calling threads holds until all active threads call this method.
std::map< boost::thread::id, unsigned int > m_threadMap
Definition: ThreadBoost.h:112
virtual unsigned int GetNumWorkers()
Return the number of active workers.
void SetNumWorkersImpl(const unsigned int num)
virtual unsigned int GetMaxNumWorkers()
Gets the maximum available number of threads.
boost::condition_variable m_masterActiveCondVar
Definition: ThreadBoost.h:103
virtual void SetChunkSize(unsigned int chnk)
Controls how many jobs are sent to each worker at a time.
virtual void QueueJobs(std::vector< ThreadJob * > &joblist)
Pass a list of tasklets to the master queue.
virtual void SetSchedType(SchedType s)
Sets the current scheduling algorithm.
virtual void Wait()
Waits until all queued jobs are finished.
Implementation class for ThreadManagerBoost.
Definition: ThreadBoost.h:125
void Stop()
A signal to shut down.
Definition: ThreadBoost.h:153
std::queue< ThreadJob * > m_workerQueue
Definition: ThreadBoost.h:169
ThreadManagerBoost * m_threadManager
Definition: ThreadBoost.h:168
SchedType
Identifies the algorithm for scheduling.
Definition: Thread.h:66
ThreadManagerFactory & GetThreadManagerFactory()
Definition: Thread.cpp:49
boost::unique_lock< boost::mutex > Lock
Definition: ThreadBoost.h:51
The above copyright notice and this permission notice shall be included.
Definition: CoupledSolver.h:1