Nektar++
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
ThreadBoost.cpp
Go to the documentation of this file.
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 // File ThreadBoost.cpp
4 //
5 // For more information, please see: http://www.nektar.info
6 //
7 // The MIT License
8 //
9 // Copyright (c) 2006 Division of Applied Mathematics, Brown University (USA),
10 // Department of Aeronautics, Imperial College London (UK), and Scientific
11 // Computing and Imaging Institute, University of Utah (USA).
12 //
13 // License for the specific language governing rights and limitations under
14 // Permission is hereby granted, free of charge, to any person obtaining a
15 // copy of this software and associated documentation files (the "Software"),
16 // to deal in the Software without restriction, including without limitation
17 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
18 // and/or sell copies of the Software, and to permit persons to whom the
19 // Software is furnished to do so, subject to the following conditions:
20 //
21 // The above copyright notice and this permission notice shall be included
22 // in all copies or substantial portions of the Software.
23 //
24 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
25 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
26 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
27 // THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
28 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
29 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
30 // DEALINGS IN THE SOFTWARE.
31 //
32 // Description:
33 //
34 ///////////////////////////////////////////////////////////////////////////////
35 
36 #define NOMINMAX
37 
39 #include <iostream>
40 #include <algorithm>
41 
42 namespace Nektar
43 {
44 namespace Thread
45 {
46 
48  GetThreadManagerFactory().RegisterCreatorFunction("ThreadManagerBoost",
49  ThreadManagerBoost::Create, "Threading using Boost.");
50 
51 
52 /**
53  * @param numT The number of threads to start (including master thread).
54  * @note Do not use, use factory instead.
55  */
57  m_numThreads(numT), m_numWorkers(numT), m_masterQueue(),
58  m_masterQueueMutex(), m_masterActiveMutex(), m_masterQueueCondVar(),
59  m_masterActiveCondVar(), m_chunkSize(1), m_schedType(e_dynamic),
60  m_threadMap()
61 {
62  using namespace std;
63  try
64  {
66  m_threadThreadList = new boost::thread*[m_numThreads];
67  m_threadBusyList = new bool[m_numThreads];
68  m_threadActiveList = new bool[m_numThreads];
69  }
70  catch (exception &e)
71  {
72  cerr << "Exception while allocating thread storage: "
73  << e.what() << endl;
74  abort();
75  }
76  unsigned int i = 0;
77  while (i < m_numThreads)
78  {
80  try
81  {
82  tw = new ThreadWorkerBoost(this, i);
83  }
84  catch (exception &e)
85  {
86  cerr << "Exception while allocating worker threads: "
87  << e.what() << endl;
88  abort();
89  }
90 
91  m_threadList[i] = tw;
92  m_threadBusyList[i] = false;
93  m_threadActiveList[i] = true;
94 
95  try
96  {
97  m_threadThreadList[i] = new boost::thread(boost::ref(*tw));
98  boost::thread::id id = m_threadThreadList[i]->get_id();
99  m_threadMap[id] = i;
100  }
101  catch (...)
102  {
103  std::cerr << "Exception while creating worker threads" << std::endl;
104  abort();
105  }
106  i++;
107  }
108  m_masterThreadId = boost::this_thread::get_id();
109  m_barrier = new boost::barrier(m_numWorkers > 0 ? m_numWorkers : 1);
110  m_type = "Threading with Boost";
111 }
112 
113 
114 /**
115  * Terminates all running threads (they will finish their current job),
116  * releases resources and destructs.
117  */
119 {
120  // This is an immediate teardown. We attempt to kill everything.
121  // we daren't lock anything as we may cause a deadlock
122  for (unsigned int i=0; i<m_numThreads; i++)
123  {
124  m_threadList[i]->Stop();
125  }
126 
127  m_masterQueueCondVar.notify_all();
128  m_masterActiveCondVar.notify_all();
129  for (unsigned int i=0; i<m_numThreads; i++)
130  {
131  m_threadThreadList[i]->join();
132  delete m_threadThreadList[i];
133  delete m_threadList[i];
134  }
135 
136  delete []m_threadList;
137  delete []m_threadThreadList;
138  delete []m_threadActiveList;
139  delete []m_threadBusyList;
140  delete m_barrier;
141 }
142 
143 
144 /**
145  *
146  */
147 void ThreadManagerBoost::QueueJobs(std::vector<ThreadJob*> &joblist)
148 {
150  for (it=joblist.begin(); it<joblist.end(); ++it)
151  {
152  QueueJob(*it);
153  }
154 }
155 
156 
157 /*
158  *
159  */
161 {
162  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
163  m_masterQueue.push(job);
164  m_masterQueueCondVar.notify_all(); // alert a waiting thread.
165 } // queue unlocked
166 
167 
168 /**
169  *
170  */
172 {
173  bool working = false;
174  Lock masterActiveLock(m_masterActiveMutex);
175  for (unsigned int i = 0; i < m_numWorkers; i++)
176  {
177  working = working || m_threadBusyList[i];
178  }
179  return working;
180 }
181 
182 
183 /**
184  *
185  */
186 void ThreadManagerBoost::SetChunkSize(unsigned int chnk)
187 {
188  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
189  m_chunkSize = std::max(chnk, 1U);
190 }
191 
192 
193 /**
194  *
195  */
197 {
198  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
199  m_schedType = s;
200 }
201 
202 
203 /**
204  *
205  */
207 {
208  boost::thread::id id = boost::this_thread::get_id();
209  return (id != m_masterThreadId);
210 }
211 
212 
213 /**
214  *
215  */
217 {
218  bool working;
219  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
220  working = IsWorking();
221  m_masterActiveCondVar.notify_all();
222  m_masterQueueCondVar.notify_all();
223  while (!m_masterQueue.empty() || working)
224  {
225  // while waiting, master queue is unlocked
226  m_masterQueueCondVar.wait(masterQueueLock);
227  // on exiting wait master queue is locked again
228  working = IsWorking();
229  }
230 }
231 
232 
233 /**
234  *
235  */
237 {
238  return m_numWorkers;
239 }
240 
241 
242 /**
243  *
244  */
246 {
247  boost::thread::id id = boost::this_thread::get_id();
248  return m_threadMap[id];
249 }
250 
251 
252 /**
253  *
254  */
255 void ThreadManagerBoost::SetNumWorkersImpl(const unsigned int num)
256 {
257  Lock masterActiveLock(m_masterActiveMutex); // locks the active
258 
259  if (m_numWorkers == num)
260  {
261  return;
262  }
263 
264  delete m_barrier;
265  m_barrier = new boost::barrier(num > 0 ? num : 1);
266 
267  m_numWorkers = num;
268  for (unsigned int i = 0; i < m_numThreads; i++)
269  {
270  m_threadActiveList[i] = i < m_numWorkers ? true : false;
271  }
272  m_masterActiveCondVar.notify_all();
273 } // Lock on active released here
274 
275 
276 /**
277  *
278  */
279 void ThreadManagerBoost::SetNumWorkers(unsigned int num)
280 {
281  num = std::min(num, m_numThreads);
282  num = std::max(num, static_cast<unsigned int>(0));
283  SetNumWorkersImpl(num);
284 }
285 
286 
287 /**
288  *
289  */
291 {
293 }
294 
295 
296 /**
297  *
298  */
300 {
301  return m_numThreads;
302 }
303 
304 
305 /**
306  *
307  */
309 {
310  m_barrier->wait();
311 }
312 
313 
314 /**
315  *
316  */
317 const std::string& ThreadManagerBoost::GetType() const
318 {
319  return m_type;
320 }
321 
322 
323 /**
324  * @param threadManager Pointer to the ThreadManagerBoost that is controlling
325  * this worker.
326  * @param workerNum Unique number from 0..(number_of_threads - 1)
327  *
328  * Called by the ThreadManagerBoost instance.
329  */
331  ThreadManagerBoost *tm, unsigned int workerNum) :
332  m_threadManager(tm), m_workerQueue(),
333  m_keepgoing(true), m_threadNum(workerNum)
334 {
335  // Nothing to see here
336 
337 }
338 
339 
340 /**
341  * Winds up this thread's execution. Jobs in its queue are lost.
342  */
344 {
345  if (m_keepgoing)
346  {
347  std::cerr << "Warning: ThreadWorker: " << m_threadNum
348  << "destroyed while running!" << std::endl;
349  }
350  // on destruction the m_workerQueue will be destructed. NOTE(review): it
351  // holds raw ThreadJob pointers, so jobs still queued are not deleted.
352 }
353 
354 
355 /**
356  *
357  */
359 {
360  // Lock the master queue
361  Lock masterQueueLock(m_threadManager->m_masterQueueMutex);
364  while (m_threadManager->m_masterQueue.empty()
365  && m_keepgoing)
366  {
367  // while waiting, master queue is unlocked
368  m_threadManager->m_masterQueueCondVar.wait(masterQueueLock);
369  // on exiting wait master queue is locked again
370  }
371  bool active;
372  {
373  Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
375  }
376  if (active && m_keepgoing)
377  {
378  unsigned int numToLoad = GetNumToLoad();
379  while (m_workerQueue.size() < numToLoad
380  && !m_threadManager->m_masterQueue.empty())
381  {
382  ThreadJob *tj = m_threadManager->m_masterQueue.front();
383  m_workerQueue.push(tj);
385  }
386  }
388 } // lock on master queue released here
389 
390 
391 /**
392  *
393  */
395 {
396  unsigned int numToLoad;
397  switch (m_threadManager->m_schedType)
398  {
399  case e_guided:
400  numToLoad = std::max(
401  static_cast<unsigned long>(m_threadManager->m_chunkSize),
402  static_cast<unsigned long>(
404  / (2*m_threadManager->m_numWorkers +1)));
405  break;
406 
407  case e_dynamic:
408  numToLoad = m_threadManager->m_chunkSize;
409  break;
410 
411  default:
412  ASSERTL0(0, "Invalid value for SchedType.");
413  break;
414  }
415  return numToLoad;
416 }
417 
418 
419 /**
420  *
421  */
423 {
424  Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
425 
427  && m_keepgoing)
428  {
429  // while waiting, master active is unlocked
430  m_threadManager->m_masterActiveCondVar.wait(masterActiveLock);
431  // on exiting wait master active is locked again
432  }
433 }
434 
435 
436 /**
437  *
438  */
440 {
441  while (m_keepgoing)
442  {
443  WaitForActive();
444  LoadJobs();
445  RunJobs();
446  }
447 } // exiting here should terminate the thread
448 
449 
450 /**
451  *
452  */
454 {
455  while (!m_workerQueue.empty() && m_keepgoing)
456  {
457  ThreadJob * tj;
458  try
459  {
460  tj = m_workerQueue.front();
462  tj->Run();
463  m_workerQueue.pop();
464  delete tj;
465  } catch(...)
466  {
467  // something bad happened, probably time to die
468  // maybe signal ThreadManager
469  throw;
470  }
471  }
472 }
473 
474 } // Thread
475 } /* namespace Nektar */
virtual bool InThread()
Indicates whether the code is in a worker thread or not.
virtual unsigned int GetMaxNumWorkers()
Gets the maximum available number of threads.
ThreadManagerBoost * m_threadManager
Definition: ThreadBoost.h:162
boost::thread::id m_masterThreadId
Definition: ThreadBoost.h:109
#define ASSERTL0(condition, msg)
Definition: ErrorUtil.hpp:198
std::queue< ThreadJob * > m_workerQueue
Definition: ThreadBoost.h:163
virtual void Wait()
Waits until all queued jobs are finished.
STL namespace.
void SetWorkerNum(unsigned int num)
Set number of worker threads.
Definition: Thread.cpp:81
virtual const std::string & GetType() const
Returns a description of the type of threading.
virtual void SetSchedType(SchedType s)
Sets the current scheduling algorithm.
virtual unsigned int GetWorkerNum()
Returns the worker number of the executing thread.
boost::condition_variable m_masterQueueCondVar
Definition: ThreadBoost.h:105
virtual void QueueJobs(std::vector< ThreadJob * > &joblist)
Pass a list of tasklets to the master queue.
std::map< boost::thread::id, unsigned int > m_threadMap
Definition: ThreadBoost.h:115
SchedType
Identifies the algorithm for scheduling.
Definition: Thread.h:67
virtual void QueueJob(ThreadJob *job)
Pass a single job to the master queue.
virtual void Hold()
A calling threads holds until all active threads call this method.
virtual void SetNumWorkers()
Sets the number of active workers to the maximum.
virtual void Run()=0
virtual ~ThreadManagerBoost()
Shuts down threading.
ThreadManagerFactory & GetThreadManagerFactory()
Definition: Thread.cpp:49
void Stop()
A signal to shut down.
Definition: ThreadBoost.h:150
static ThreadManagerSharedPtr Create(unsigned int numT)
Called by the factory method.
Definition: ThreadBoost.h:87
virtual void SetChunkSize(unsigned int chnk)
Controls how many jobs are sent to each worker at a time.
StandardMatrixTag boost::call_traits< LhsDataType >::const_reference rhs typedef NekMatrix< LhsDataType, StandardMatrixTag >::iterator iterator
Base class for tasks to be sent to the ThreadManager to run.
Definition: Thread.h:99
Implementation class for ThreadManagerBoost.
Definition: ThreadBoost.h:127
void SetNumWorkersImpl(const unsigned int num)
virtual unsigned int GetNumWorkers()
Return the number of active workers.
std::queue< ThreadJob * > m_masterQueue
Definition: ThreadBoost.h:102
ThreadWorkerBoost ** m_threadList
Definition: ThreadBoost.h:107
Implementation of ThreadManager using Boost threads.
Definition: ThreadBoost.h:59
boost::condition_variable m_masterActiveCondVar
Definition: ThreadBoost.h:106
boost::unique_lock< boost::mutex > Lock
Definition: ThreadBoost.h:52
tKey RegisterCreatorFunction(tKey idKey, CreatorFunction classCreator, tDescription pDesc="")
Register a class with the factory.
Definition: NekFactory.hpp:215