Nektar++
ThreadBoost.cpp
Go to the documentation of this file.
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 // File ThreadBoost.cpp
4 //
5 // For more information, please see: http://www.nektar.info
6 //
7 // The MIT License
8 //
9 // Copyright (c) 2006 Division of Applied Mathematics, Brown University (USA),
10 // Department of Aeronautics, Imperial College London (UK), and Scientific
11 // Computing and Imaging Institute, University of Utah (USA).
12 //
13 // Permission is hereby granted, free of charge, to any person obtaining a
14 // copy of this software and associated documentation files (the "Software"),
15 // to deal in the Software without restriction, including without limitation
16 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
17 // and/or sell copies of the Software, and to permit persons to whom the
18 // Software is furnished to do so, subject to the following conditions:
19 //
20 // The above copyright notice and this permission notice shall be included
21 // in all copies or substantial portions of the Software.
22 //
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
24 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
26 // THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
28 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
29 // DEALINGS IN THE SOFTWARE.
30 //
31 // Description:
32 //
33 ///////////////////////////////////////////////////////////////////////////////
34 
35 #define NOMINMAX
36 
38 #include <iostream>
39 #include <algorithm>
40 
41 namespace Nektar
42 {
43 namespace Thread
44 {
45 
47  GetThreadManagerFactory().RegisterCreatorFunction("ThreadManagerBoost",
48  ThreadManagerBoost::Create, "Threading using Boost.");
49 
50 
51 /**
52  * @param numT The number of threads to start (including master thread).
53  * @note Do not use, use factory instead.
54  */
56  m_numThreads(numT), m_numWorkers(numT), m_masterQueue(),
57  m_masterQueueMutex(), m_masterActiveMutex(), m_masterQueueCondVar(),
58  m_masterActiveCondVar(), m_chunkSize(1), m_schedType(e_dynamic),
59  m_threadMap()
60 {
61  using namespace std;
62  try
63  {
65  m_threadThreadList = new boost::thread*[m_numThreads];
66  m_threadBusyList = new bool[m_numThreads];
67  m_threadActiveList = new bool[m_numThreads];
68  }
69  catch (exception &e)
70  {
71  cerr << "Exception while allocating thread storage: "
72  << e.what() << endl;
73  abort();
74  }
75  unsigned int i = 0;
76  while (i < m_numThreads)
77  {
79  try
80  {
81  tw = new ThreadWorkerBoost(this, i);
82  }
83  catch (exception &e)
84  {
85  cerr << "Exception while allocating worker threads: "
86  << e.what() << endl;
87  abort();
88  }
89 
90  m_threadList[i] = tw;
91  m_threadBusyList[i] = false;
92  m_threadActiveList[i] = true;
93 
94  try
95  {
96  m_threadThreadList[i] = new boost::thread(boost::ref(*tw));
97  boost::thread::id id = m_threadThreadList[i]->get_id();
98  m_threadMap[id] = i;
99  }
100  catch (...)
101  {
102  std::cerr << "Exception while creating worker threads" << std::endl;
103  abort();
104  }
105  i++;
106  }
107  m_masterThreadId = boost::this_thread::get_id();
108  m_barrier = new boost::barrier(m_numWorkers > 0 ? m_numWorkers : 1);
109  m_type = "Threading with Boost";
110 }
111 
112 
113 /**
114  * Terminates all running threads (they will finish their current job),
115  * releases resources and destructs.
116  */
118 {
119  // This is an immediate teardown. We attempt to kill everything.
120  // we daren't lock anything as we may cause a deadlock
121  for (unsigned int i=0; i<m_numThreads; i++)
122  {
123  m_threadList[i]->Stop();
124  }
125 
126  m_masterQueueCondVar.notify_all();
127  m_masterActiveCondVar.notify_all();
128  for (unsigned int i=0; i<m_numThreads; i++)
129  {
130  m_threadThreadList[i]->join();
131  delete m_threadThreadList[i];
132  delete m_threadList[i];
133  }
134 
135  delete []m_threadList;
136  delete []m_threadThreadList;
137  delete []m_threadActiveList;
138  delete []m_threadBusyList;
139  delete m_barrier;
140 }
141 
142 
143 /**
144  *
145  */
146 void ThreadManagerBoost::QueueJobs(std::vector<ThreadJob*> &joblist)
147 {
148  std::vector<ThreadJob *>::iterator it;
149  for (it=joblist.begin(); it<joblist.end(); ++it)
150  {
151  QueueJob(*it);
152  }
153 }
154 
155 
156 /*
157  *
158  */
160 {
161  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
162  m_masterQueue.push(job);
163  m_masterQueueCondVar.notify_all(); // alert a waiting thread.
164 } // queue unlocked
165 
166 
167 /**
168  *
169  */
171 {
172  bool working = false;
173  Lock masterActiveLock(m_masterActiveMutex);
174  for (unsigned int i = 0; i < m_numWorkers; i++)
175  {
176  working = working || m_threadBusyList[i];
177  }
178  return working;
179 }
180 
181 
182 /**
183  *
184  */
185 void ThreadManagerBoost::SetChunkSize(unsigned int chnk)
186 {
187  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
188  m_chunkSize = std::max(chnk, 1U);
189 }
190 
191 
192 /**
193  *
194  */
196 {
197  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
198  m_schedType = s;
199 }
200 
201 
202 /**
203  *
204  */
206 {
207  boost::thread::id id = boost::this_thread::get_id();
208  return (id != m_masterThreadId);
209 }
210 
211 
212 /**
213  *
214  */
216 {
217  bool working;
218  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
219  working = IsWorking();
220  m_masterActiveCondVar.notify_all();
221  m_masterQueueCondVar.notify_all();
222  while (!m_masterQueue.empty() || working)
223  {
224  // while waiting, master queue is unlocked
225  m_masterQueueCondVar.wait(masterQueueLock);
226  // on exiting wait master queue is locked again
227  working = IsWorking();
228  }
229 }
230 
231 
232 /**
233  *
234  */
236 {
237  return m_numWorkers;
238 }
239 
240 
241 /**
242  *
243  */
245 {
246  boost::thread::id id = boost::this_thread::get_id();
247  return m_threadMap[id];
248 }
249 
250 
251 /**
252  *
253  */
254 void ThreadManagerBoost::SetNumWorkersImpl(const unsigned int num)
255 {
256  Lock masterActiveLock(m_masterActiveMutex); // locks the active
257 
258  if (m_numWorkers == num)
259  {
260  return;
261  }
262 
263  delete m_barrier;
264  m_barrier = new boost::barrier(num > 0 ? num : 1);
265 
266  m_numWorkers = num;
267  for (unsigned int i = 0; i < m_numThreads; i++)
268  {
269  m_threadActiveList[i] = i < m_numWorkers ? true : false;
270  }
271  m_masterActiveCondVar.notify_all();
272 } // Lock on active released here
273 
274 
275 /**
276  *
277  */
278 void ThreadManagerBoost::SetNumWorkers(const unsigned int num)
279 {
280  unsigned int n;
281  n = std::min(num, m_numThreads);
282  n = std::max(n, static_cast<unsigned int>(0));
284 }
285 
286 
287 /**
288  *
289  */
291 {
293 }
294 
295 
296 /**
297  *
298  */
300 {
301  return m_numThreads;
302 }
303 
304 
305 /**
306  *
307  */
309 {
310  m_barrier->wait();
311 }
312 
313 
314 /**
315  *
316  */
317 const std::string& ThreadManagerBoost::GetType() const
318 {
319  return m_type;
320 }
321 
322 
323 /**
324  * @param threadManager Pointer to the ThreadManagerBoost that is controlling
325  * this worker.
326  * @param workerNum Unique number from 0..(number_of_threads - 1)
327  *
328  * Called by the ThreadManagerBoost instance.
329  */
331  ThreadManagerBoost *tm, unsigned int workerNum) :
332  m_threadManager(tm), m_workerQueue(),
333  m_keepgoing(true), m_threadNum(workerNum)
334 {
335  // Nothing to see here
336 
337 }
338 
339 
340 /**
341  * Winds up this thread's execution. Jobs in its queue are lost.
342  */
344 {
345  if (m_keepgoing)
346  {
347  std::cerr << "Warning: ThreadWorker: " << m_threadNum
348  << "destroyed while running!" << std::endl;
349  }
350  // on destruction the m_workerQueue will be destructed and that
351  // will destruct any ThreadJobs still in there.
352 }
353 
354 
355 /**
356  *
357  */
359 {
360  // Lock the master queue
361  Lock masterQueueLock(m_threadManager->m_masterQueueMutex);
364  while (m_threadManager->m_masterQueue.empty()
365  && m_keepgoing)
366  {
367  // while waiting, master queue is unlocked
368  m_threadManager->m_masterQueueCondVar.wait(masterQueueLock);
369  // on exiting wait master queue is locked again
370  }
371  bool active;
372  {
373  Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
375  }
376  if (active && m_keepgoing)
377  {
378  unsigned int numToLoad = GetNumToLoad();
379  while (m_workerQueue.size() < numToLoad
380  && !m_threadManager->m_masterQueue.empty())
381  {
382  ThreadJob *tj = m_threadManager->m_masterQueue.front();
383  m_workerQueue.push(tj);
385  }
386  }
388 } // lock on master queue released here
389 
390 
391 /**
392  *
393  */
395 {
396  unsigned int numToLoad = 0;
397  switch (m_threadManager->m_schedType)
398  {
399  case e_guided:
400  numToLoad = std::max(
401  static_cast<unsigned long>(m_threadManager->m_chunkSize),
402  static_cast<unsigned long>(
404  / (2*m_threadManager->m_numWorkers +1)));
405  break;
406 
407  case e_dynamic:
408  numToLoad = m_threadManager->m_chunkSize;
409  break;
410 
411  default:
412  NEKERROR(ErrorUtil::efatal, "Invalid value for SchedType.");
413  break;
414  }
415  return numToLoad;
416 }
417 
418 
419 /**
420  *
421  */
423 {
424  Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
425 
427  && m_keepgoing)
428  {
429  // while waiting, master active is unlocked
430  m_threadManager->m_masterActiveCondVar.wait(masterActiveLock);
431  // on exiting wait master active is locked again
432  }
433 }
434 
435 
436 /**
437  *
438  */
440 {
441  while (m_keepgoing)
442  {
443  WaitForActive();
444  LoadJobs();
445  RunJobs();
446  }
447 } // exiting here should terminate the thread
448 
449 
450 /**
451  *
452  */
454 {
455  while (!m_workerQueue.empty() && m_keepgoing)
456  {
457  ThreadJob * tj;
458  try
459  {
460  tj = m_workerQueue.front();
462  tj->Run();
463  m_workerQueue.pop();
464  delete tj;
465  } catch(...)
466  {
467  // something bad happened, probably time to die
468  // maybe signal ThreadManager
469  throw;
470  }
471  }
472 }
473 
474 } // Thread
475 } /* namespace Nektar */
#define NEKERROR(type, msg)
Assert Level 0 – Fundamental assert which is used whether in FULLDEBUG, DEBUG or OPT compilation mode...
Definition: ErrorUtil.hpp:209
tKey RegisterCreatorFunction(tKey idKey, CreatorFunction classCreator, std::string pDesc="")
Register a class with the factory.
Definition: NekFactory.hpp:200
Base class for tasks to be sent to the ThreadManager to run.
Definition: Thread.h:98
void SetWorkerNum(unsigned int num)
Set number of worker threads.
Definition: Thread.cpp:79
virtual void Run()=0
Implementation of ThreadManager using Boost threads.
Definition: ThreadBoost.h:59
ThreadWorkerBoost ** m_threadList
Definition: ThreadBoost.h:106
virtual const std::string & GetType() const
Returns a description of the type of threading.
virtual unsigned int GetWorkerNum()
Returns the worker number of the executing thread.
virtual ~ThreadManagerBoost()
Shuts down threading.
virtual void QueueJob(ThreadJob *job)
Pass a single job to the master queue.
std::queue< ThreadJob * > m_masterQueue
Definition: ThreadBoost.h:101
boost::condition_variable m_masterQueueCondVar
Definition: ThreadBoost.h:104
virtual bool InThread()
Indicates whether the code is in a worker thread or not.
boost::thread::id m_masterThreadId
Definition: ThreadBoost.h:108
const unsigned int m_numThreads
Definition: ThreadBoost.h:99
virtual void SetNumWorkers()
Sets the number of active workers to the maximum.
static ThreadManagerSharedPtr Create(unsigned int numT)
Called by the factory method.
Definition: ThreadBoost.h:86
virtual void Hold()
A calling threads holds until all active threads call this method.
std::map< boost::thread::id, unsigned int > m_threadMap
Definition: ThreadBoost.h:114
virtual unsigned int GetNumWorkers()
Return the number of active workers.
void SetNumWorkersImpl(const unsigned int num)
virtual unsigned int GetMaxNumWorkers()
Gets the maximum available number of threads.
boost::condition_variable m_masterActiveCondVar
Definition: ThreadBoost.h:105
virtual void SetChunkSize(unsigned int chnk)
Controls how many jobs are sent to each worker at a time.
virtual void QueueJobs(std::vector< ThreadJob * > &joblist)
Pass a list of tasklets to the master queue.
virtual void SetSchedType(SchedType s)
Sets the current scheduling algorithm.
virtual void Wait()
Waits until all queued jobs are finished.
Implementation class for ThreadManagerBoost.
Definition: ThreadBoost.h:127
void Stop()
A signal to shut down.
Definition: ThreadBoost.h:149
std::queue< ThreadJob * > m_workerQueue
Definition: ThreadBoost.h:162
ThreadManagerBoost * m_threadManager
Definition: ThreadBoost.h:161
SchedType
Identifies the algorithm for scheduling.
Definition: Thread.h:66
ThreadManagerFactory & GetThreadManagerFactory()
Definition: Thread.cpp:50
boost::unique_lock< boost::mutex > Lock
Definition: ThreadBoost.h:51
The above copyright notice and this permission notice shall be included.
Definition: CoupledSolver.h:1