Nektar++
ThreadBoost.cpp
Go to the documentation of this file.
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 // File ThreadBoost.cpp
4 //
5 // For more information, please see: http://www.nektar.info
6 //
7 // The MIT License
8 //
9 // Copyright (c) 2006 Division of Applied Mathematics, Brown University (USA),
10 // Department of Aeronautics, Imperial College London (UK), and Scientific
11 // Computing and Imaging Institute, University of Utah (USA).
12 //
13 // Permission is hereby granted, free of charge, to any person obtaining a
14 // copy of this software and associated documentation files (the "Software"),
15 // to deal in the Software without restriction, including without limitation
16 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
17 // and/or sell copies of the Software, and to permit persons to whom the
18 // Software is furnished to do so, subject to the following conditions:
19 //
20 // The above copyright notice and this permission notice shall be included
21 // in all copies or substantial portions of the Software.
22 //
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
24 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
26 // THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
28 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
29 // DEALINGS IN THE SOFTWARE.
30 //
31 // Description:
32 //
33 ///////////////////////////////////////////////////////////////////////////////
34 
35 #define NOMINMAX
36 
38 #include <iostream>
39 #include <algorithm>
40 
41 namespace Nektar
42 {
43 namespace Thread
44 {
45 
47  GetThreadManagerFactory().RegisterCreatorFunction("ThreadManagerBoost",
48  ThreadManagerBoost::Create, "Threading using Boost.");
49 
50 
51 /**
52  * @param numWorkers The number of threads to start (including master thread).
53  * @note Do not use, use factory instead.
54  */
56  m_numThreads(numT), m_numWorkers(numT), m_masterQueue(),
57  m_masterQueueMutex(), m_masterActiveMutex(), m_masterQueueCondVar(),
58  m_masterActiveCondVar(), m_chunkSize(1), m_schedType(e_dynamic),
59  m_threadMap()
60 {
61  using namespace std;
62  try
63  {
65  m_threadThreadList = new boost::thread*[m_numThreads];
66  m_threadBusyList = new bool[m_numThreads];
67  m_threadActiveList = new bool[m_numThreads];
68  }
69  catch (exception &e)
70  {
71  cerr << "Exception while allocating thread storage: "
72  << e.what() << endl;
73  abort();
74  }
75  unsigned int i = 0;
76  while (i < m_numThreads)
77  {
79  try
80  {
81  tw = new ThreadWorkerBoost(this, i);
82  }
83  catch (exception &e)
84  {
85  cerr << "Exception while allocating worker threads: "
86  << e.what() << endl;
87  abort();
88  }
89 
90  m_threadList[i] = tw;
91  m_threadBusyList[i] = false;
92  m_threadActiveList[i] = true;
93 
94  try
95  {
96  m_threadThreadList[i] = new boost::thread(boost::ref(*tw));
97  boost::thread::id id = m_threadThreadList[i]->get_id();
98  m_threadMap[id] = i;
99  }
100  catch (...)
101  {
102  std::cerr << "Exception while creating worker threads" << std::endl;
103  abort();
104  }
105  i++;
106  }
107  m_masterThreadId = boost::this_thread::get_id();
108  m_barrier = new boost::barrier(m_numWorkers > 0 ? m_numWorkers : 1);
109  m_type = "Threading with Boost";
110 }
111 
112 
113 /**
114  * Terminates all running threads (they will finish their current job),
115  * releases resources and destructs.
116  */
118 {
119  // This is an immediate teardown. We attempt to kill everything.
120  // we daren't lock anything as we may cause a deadlock
121  for (unsigned int i=0; i<m_numThreads; i++)
122  {
123  m_threadList[i]->Stop();
124  }
125 
126  m_masterQueueCondVar.notify_all();
127  m_masterActiveCondVar.notify_all();
128  for (unsigned int i=0; i<m_numThreads; i++)
129  {
130  m_threadThreadList[i]->join();
131  delete m_threadThreadList[i];
132  delete m_threadList[i];
133  }
134 
135  delete []m_threadList;
136  delete []m_threadThreadList;
137  delete []m_threadActiveList;
138  delete []m_threadBusyList;
139  delete m_barrier;
140 }
141 
142 
143 /**
144  *
145  */
146 void ThreadManagerBoost::QueueJobs(std::vector<ThreadJob*> &joblist)
147 {
148  std::vector<ThreadJob *>::iterator it;
149  for (it=joblist.begin(); it<joblist.end(); ++it)
150  {
151  QueueJob(*it);
152  }
153 }
154 
155 
156 /*
157  *
158  */
160 {
161  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
162  m_masterQueue.push(job);
163  m_masterQueueCondVar.notify_all(); // alert a waiting thread.
164 } // queue unlocked
165 
166 
167 /**
168  *
169  */
171 {
172  bool working = false;
173  Lock masterActiveLock(m_masterActiveMutex);
174  for (unsigned int i = 0; i < m_numWorkers; i++)
175  {
176  working = working || m_threadBusyList[i];
177  }
178  return working;
179 }
180 
181 
182 /**
183  *
184  */
185 void ThreadManagerBoost::SetChunkSize(unsigned int chnk)
186 {
187  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
188  m_chunkSize = std::max(chnk, 1U);
189 }
190 
191 
192 /**
193  *
194  */
196 {
197  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
198  m_schedType = s;
199 }
200 
201 
202 /**
203  *
204  */
206 {
207  boost::thread::id id = boost::this_thread::get_id();
208  return (id != m_masterThreadId);
209 }
210 
211 
212 /**
213  *
214  */
216 {
217  bool working;
218  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
219  working = IsWorking();
220  m_masterActiveCondVar.notify_all();
221  m_masterQueueCondVar.notify_all();
222  while (!m_masterQueue.empty() || working)
223  {
224  // while waiting, master queue is unlocked
225  m_masterQueueCondVar.wait(masterQueueLock);
226  // on exiting wait master queue is locked again
227  working = IsWorking();
228  }
229 }
230 
231 
232 /**
233  *
234  */
236 {
237  return m_numWorkers;
238 }
239 
240 
241 /**
242  *
243  */
245 {
246  boost::thread::id id = boost::this_thread::get_id();
247  return m_threadMap[id];
248 }
249 
250 
251 /**
252  *
253  */
254 void ThreadManagerBoost::SetNumWorkersImpl(const unsigned int num)
255 {
256  Lock masterActiveLock(m_masterActiveMutex); // locks the active
257 
258  if (m_numWorkers == num)
259  {
260  return;
261  }
262 
263  delete m_barrier;
264  m_barrier = new boost::barrier(num > 0 ? num : 1);
265 
266  m_numWorkers = num;
267  for (unsigned int i = 0; i < m_numThreads; i++)
268  {
269  m_threadActiveList[i] = i < m_numWorkers ? true : false;
270  }
271  m_masterActiveCondVar.notify_all();
272 } // Lock on active released here
273 
274 
275 /**
276  *
277  */
278 void ThreadManagerBoost::SetNumWorkers(const unsigned int num)
279 {
280  unsigned int n;
281  n = std::min(num, m_numThreads);
282  n = std::max(n, static_cast<unsigned int>(0));
284 }
285 
286 
287 /**
288  *
289  */
291 {
293 }
294 
295 
296 /**
297  *
298  */
300 {
301  return m_numThreads;
302 }
303 
304 
305 /**
306  *
307  */
309 {
310  m_barrier->wait();
311 }
312 
313 
314 /**
315  *
316  */
317 const std::string& ThreadManagerBoost::GetType() const
318 {
319  return m_type;
320 }
321 
322 
323 /**
324  * @param threadManager Pointer to the ThreadManagerBoost that is controlling
325  * this worker.
326  * @param workerNum Unique number from 0..(number_of_threads - 1)
327  *
328  * Called by the ThreadManagerBoost instance.
329  */
331  ThreadManagerBoost *tm, unsigned int workerNum) :
332  m_threadManager(tm), m_workerQueue(),
333  m_keepgoing(true), m_threadNum(workerNum)
334 {
335  // Nothing to see here
336 
337 }
338 
339 
340 /**
341  * Winds up this thread's execution. Jobs in its queue are lost.
342  */
344 {
345  if (m_keepgoing)
346  {
347  std::cerr << "Warning: ThreadWorker: " << m_threadNum
348  << "destroyed while running!" << std::endl;
349  }
350  // on destuction the m_workerQueue will be destructed and that
351  // will destruct any ThreadJobs still in there.
352 }
353 
354 
355 /**
356  *
357  */
359 {
360  // Lock the master queue
361  Lock masterQueueLock(m_threadManager->m_masterQueueMutex);
364  while (m_threadManager->m_masterQueue.empty()
365  && m_keepgoing)
366  {
367  // while waiting, master queue is unlocked
368  m_threadManager->m_masterQueueCondVar.wait(masterQueueLock);
369  // on exiting wait master queue is locked again
370  }
371  bool active;
372  {
373  Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
375  }
376  if (active && m_keepgoing)
377  {
378  unsigned int numToLoad = GetNumToLoad();
379  while (m_workerQueue.size() < numToLoad
380  && !m_threadManager->m_masterQueue.empty())
381  {
382  ThreadJob *tj = m_threadManager->m_masterQueue.front();
383  m_workerQueue.push(tj);
385  }
386  }
388 } // lock on master queue released here
389 
390 
391 /**
392  *
393  */
395 {
396  unsigned int numToLoad = 0;
397  switch (m_threadManager->m_schedType)
398  {
399  case e_guided:
400  numToLoad = std::max(
401  static_cast<unsigned long>(m_threadManager->m_chunkSize),
402  static_cast<unsigned long>(
404  / (2*m_threadManager->m_numWorkers +1)));
405  break;
406 
407  case e_dynamic:
408  numToLoad = m_threadManager->m_chunkSize;
409  break;
410 
411  default:
412  NEKERROR(ErrorUtil::efatal, "Invalid value for SchedType.");
413  break;
414  }
415  return numToLoad;
416 }
417 
418 
419 /**
420  *
421  */
423 {
424  Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
425 
427  && m_keepgoing)
428  {
429  // while waiting, master active is unlocked
430  m_threadManager->m_masterActiveCondVar.wait(masterActiveLock);
431  // on exiting wait master active is locked again
432  }
433 }
434 
435 
436 /**
437  *
438  */
440 {
441  while (m_keepgoing)
442  {
443  WaitForActive();
444  LoadJobs();
445  RunJobs();
446  }
447 } // exiting here should terminate the thread
448 
449 
450 /**
451  *
452  */
454 {
455  while (!m_workerQueue.empty() && m_keepgoing)
456  {
457  ThreadJob * tj;
458  try
459  {
460  tj = m_workerQueue.front();
462  tj->Run();
463  m_workerQueue.pop();
464  delete tj;
465  } catch(...)
466  {
467  // something bad happened, probably time to die
468  // maybe signal ThreadManager
469  throw;
470  }
471  }
472 }
473 
474 } // Thread
475 } /* namespace Nektar */
virtual bool InThread()
Indicates whether the code is in a worker thread or not.
virtual unsigned int GetMaxNumWorkers()
Gets the maximum available number of threads.
ThreadManagerBoost * m_threadManager
Definition: ThreadBoost.h:161
boost::thread::id m_masterThreadId
Definition: ThreadBoost.h:108
#define NEKERROR(type, msg)
Assert Level 0 – Fundamental assert which is used whether in FULLDEBUG, DEBUG or OPT compilation mod...
Definition: ErrorUtil.hpp:209
std::queue< ThreadJob * > m_workerQueue
Definition: ThreadBoost.h:162
const unsigned int m_numThreads
Definition: ThreadBoost.h:99
virtual void Wait()
Waits until all queued jobs are finished.
STL namespace.
void SetWorkerNum(unsigned int num)
Set number of worker threads.
Definition: Thread.cpp:79
virtual void SetSchedType(SchedType s)
Sets the current scheduling algorithm.
virtual unsigned int GetWorkerNum()
Returns the worker number of the executing thread.
boost::condition_variable m_masterQueueCondVar
Definition: ThreadBoost.h:104
std::map< boost::thread::id, unsigned int > m_threadMap
Definition: ThreadBoost.h:114
SchedType
Identifies the algorithm for scheduling.
Definition: Thread.h:65
virtual void QueueJob(ThreadJob *job)
Pass a single job to the master queue.
virtual void Hold()
A calling threads holds until all active threads call this method.
virtual void QueueJobs(std::vector< ThreadJob *> &joblist)
Pass a list of tasklets to the master queue.
virtual void SetNumWorkers()
Sets the number of active workers to the maximum.
virtual void Run()=0
virtual ~ThreadManagerBoost()
Shuts down threading.
ThreadManagerFactory & GetThreadManagerFactory()
Definition: Thread.cpp:50
void Stop()
A signal to shut down.
Definition: ThreadBoost.h:149
static ThreadManagerSharedPtr Create(unsigned int numT)
Called by the factory method.
Definition: ThreadBoost.h:86
virtual void SetChunkSize(unsigned int chnk)
Controls how many jobs are sent to each worker at a time.
Base class for tasks to be sent to the ThreadManager to run.
Definition: Thread.h:97
Implementation class for ThreadManagerBoost.
Definition: ThreadBoost.h:126
void SetNumWorkersImpl(const unsigned int num)
virtual unsigned int GetNumWorkers()
Return the number of active workers.
tKey RegisterCreatorFunction(tKey idKey, CreatorFunction classCreator, std::string pDesc="")
Register a class with the factory.
Definition: NekFactory.hpp:199
std::queue< ThreadJob * > m_masterQueue
Definition: ThreadBoost.h:101
virtual const std::string & GetType() const
Returns a description of the type of threading.
ThreadWorkerBoost ** m_threadList
Definition: ThreadBoost.h:106
Implementation of ThreadManager using Boost threads.
Definition: ThreadBoost.h:58
boost::condition_variable m_masterActiveCondVar
Definition: ThreadBoost.h:105
boost::unique_lock< boost::mutex > Lock
Definition: ThreadBoost.h:51