Nektar++
ThreadBoost.cpp
Go to the documentation of this file.
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 // File: ThreadBoost.cpp
4 //
5 // For more information, please see: http://www.nektar.info
6 //
7 // The MIT License
8 //
9 // Copyright (c) 2006 Division of Applied Mathematics, Brown University (USA),
10 // Department of Aeronautics, Imperial College London (UK), and Scientific
11 // Computing and Imaging Institute, University of Utah (USA).
12 //
13 // Permission is hereby granted, free of charge, to any person obtaining a
14 // copy of this software and associated documentation files (the "Software"),
15 // to deal in the Software without restriction, including without limitation
16 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
17 // and/or sell copies of the Software, and to permit persons to whom the
18 // Software is furnished to do so, subject to the following conditions:
19 //
20 // The above copyright notice and this permission notice shall be included
21 // in all copies or substantial portions of the Software.
22 //
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
24 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
26 // THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
28 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
29 // DEALINGS IN THE SOFTWARE.
30 //
31 // Description:
32 //
33 ///////////////////////////////////////////////////////////////////////////////
34 
35 #define NOMINMAX
36 
38 #include <algorithm>
39 #include <iostream>
40 
41 namespace Nektar
42 {
43 namespace Thread
44 {
45 
48  "ThreadManagerBoost", ThreadManagerBoost::Create,
49  "Threading using Boost.");
50 
/**
 * @param numT The number of threads to start (including master thread).
 * @note Do not call directly; use the factory instead.
 */
56  : m_numThreads(numT), m_numWorkers(numT), m_masterQueue(),
57  m_masterQueueMutex(), m_masterActiveMutex(), m_masterQueueCondVar(),
58  m_masterActiveCondVar(), m_chunkSize(1), m_schedType(e_dynamic),
59  m_threadMap()
60 {
61  using namespace std;
62  try
63  {
65  m_threadThreadList = new boost::thread *[m_numThreads];
66  m_threadBusyList = new bool[m_numThreads];
67  m_threadActiveList = new bool[m_numThreads];
68  }
69  catch (exception &e)
70  {
71  cerr << "Exception while allocating thread storage: " << e.what()
72  << endl;
73  abort();
74  }
75  unsigned int i = 0;
76  while (i < m_numThreads)
77  {
79  try
80  {
81  tw = new ThreadWorkerBoost(this, i);
82  }
83  catch (exception &e)
84  {
85  cerr << "Exception while allocating worker threads: " << e.what()
86  << endl;
87  abort();
88  }
89 
90  m_threadList[i] = tw;
91  m_threadBusyList[i] = false;
92  m_threadActiveList[i] = true;
93 
94  try
95  {
96  m_threadThreadList[i] = new boost::thread(boost::ref(*tw));
97  boost::thread::id id = m_threadThreadList[i]->get_id();
98  m_threadMap[id] = i;
99  }
100  catch (...)
101  {
102  std::cerr << "Exception while creating worker threads" << std::endl;
103  abort();
104  }
105  i++;
106  }
107  m_masterThreadId = boost::this_thread::get_id();
108  m_barrier = new boost::barrier(m_numWorkers > 0 ? m_numWorkers : 1);
109  m_type = "Threading with Boost";
110 }
111 
112 /**
113  * Terminates all running threads (they will finish their current job),
114  * releases resources and destructs.
115  */
117 {
118  // This is an immediate teardown. We attempt to kill everything.
119  // we daren't lock anything as we may cause a deadlock
120  for (unsigned int i = 0; i < m_numThreads; i++)
121  {
122  m_threadList[i]->Stop();
123  }
124 
125  m_masterQueueCondVar.notify_all();
126  m_masterActiveCondVar.notify_all();
127  for (unsigned int i = 0; i < m_numThreads; i++)
128  {
129  m_threadThreadList[i]->join();
130  delete m_threadThreadList[i];
131  delete m_threadList[i];
132  }
133 
134  delete[] m_threadList;
135  delete[] m_threadThreadList;
136  delete[] m_threadActiveList;
137  delete[] m_threadBusyList;
138  delete m_barrier;
139 }
140 
141 /**
142  *
143  */
144 void ThreadManagerBoost::v_QueueJobs(std::vector<ThreadJob *> &joblist)
145 {
146  std::vector<ThreadJob *>::iterator it;
147  for (it = joblist.begin(); it < joblist.end(); ++it)
148  {
149  QueueJob(*it);
150  }
151 }
152 
153 /*
154  *
155  */
157 {
158  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
159  m_masterQueue.push(job);
160  m_masterQueueCondVar.notify_all(); // alert a waiting thread.
161 } // queue unlocked
162 
163 /**
164  *
165  */
167 {
168  bool working = false;
169  Lock masterActiveLock(m_masterActiveMutex);
170  for (unsigned int i = 0; i < m_numWorkers; i++)
171  {
172  working = working || m_threadBusyList[i];
173  }
174  return working;
175 }
176 
177 /**
178  *
179  */
180 void ThreadManagerBoost::v_SetChunkSize(unsigned int chnk)
181 {
182  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
183  m_chunkSize = std::max(chnk, 1U);
184 }
185 
186 /**
187  *
188  */
190 {
191  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
192  m_schedType = s;
193 }
194 
195 /**
196  *
197  */
199 {
200  boost::thread::id id = boost::this_thread::get_id();
201  return (id != m_masterThreadId);
202 }
203 
204 /**
205  *
206  */
208 {
209  bool working;
210  Lock masterQueueLock(m_masterQueueMutex); // locks the queue
211  working = IsWorking();
212  m_masterActiveCondVar.notify_all();
213  m_masterQueueCondVar.notify_all();
214  while (!m_masterQueue.empty() || working)
215  {
216  // while waiting, master queue is unlocked
217  m_masterQueueCondVar.wait(masterQueueLock);
218  // on exiting wait master queue is locked again
219  working = IsWorking();
220  }
221 }
222 
223 /**
224  *
225  */
227 {
228  return m_numWorkers;
229 }
230 
231 /**
232  *
233  */
235 {
236  boost::thread::id id = boost::this_thread::get_id();
237  return m_threadMap[id];
238 }
239 
240 /**
241  *
242  */
243 void ThreadManagerBoost::SetNumWorkersImpl(const unsigned int num)
244 {
245  Lock masterActiveLock(m_masterActiveMutex); // locks the active
246 
247  if (m_numWorkers == num)
248  {
249  return;
250  }
251 
252  delete m_barrier;
253  m_barrier = new boost::barrier(num > 0 ? num : 1);
254 
255  m_numWorkers = num;
256  for (unsigned int i = 0; i < m_numThreads; i++)
257  {
258  m_threadActiveList[i] = i < m_numWorkers ? true : false;
259  }
260  m_masterActiveCondVar.notify_all();
261 } // Lock on active released here
262 
263 /**
264  *
265  */
266 void ThreadManagerBoost::v_SetNumWorkers(const unsigned int num)
267 {
268  unsigned int n;
269  n = std::min(num, m_numThreads);
270  n = std::max(n, static_cast<unsigned int>(0));
272 }
273 
274 /**
275  *
276  */
278 {
280 }
281 
282 /**
283  *
284  */
286 {
287  return m_numThreads;
288 }
289 
290 /**
291  *
292  */
294 {
295  m_barrier->wait();
296 }
297 
298 /**
299  *
300  */
301 const std::string &ThreadManagerBoost::v_GetType() const
302 {
303  return m_type;
304 }
305 
306 /**
307  * @param threadManager Pointer to the ThreadManagerBoost that is controlling
308  * this worker.
309  * @param workerNum Unique number from 0..(number_of_threads - 1)
310  *
311  * Called by the ThreadManagerBoost instance.
312  */
314  unsigned int workerNum)
315  : m_threadManager(tm), m_workerQueue(), m_keepgoing(true),
316  m_threadNum(workerNum)
317 {
318  // Nothing to see here
319 }
320 
321 /**
322  * Winds up this thread's execution. Jobs in its queue are lost.
323  */
325 {
326  if (m_keepgoing)
327  {
328  std::cerr << "Warning: ThreadWorker: " << m_threadNum
329  << "destroyed while running!" << std::endl;
330  }
331  // on destuction the m_workerQueue will be destructed and that
332  // will destruct any ThreadJobs still in there.
333 }
334 
335 /**
336  *
337  */
339 {
340  // Lock the master queue
341  Lock masterQueueLock(m_threadManager->m_masterQueueMutex);
344  while (m_threadManager->m_masterQueue.empty() && m_keepgoing)
345  {
346  // while waiting, master queue is unlocked
347  m_threadManager->m_masterQueueCondVar.wait(masterQueueLock);
348  // on exiting wait master queue is locked again
349  }
350  bool active;
351  {
352  Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
354  }
355  if (active && m_keepgoing)
356  {
357  unsigned int numToLoad = GetNumToLoad();
358  while (m_workerQueue.size() < numToLoad &&
359  !m_threadManager->m_masterQueue.empty())
360  {
361  ThreadJob *tj = m_threadManager->m_masterQueue.front();
362  m_workerQueue.push(tj);
364  }
365  }
367 } // lock on master queue released here
368 
369 /**
370  *
371  */
373 {
374  unsigned int numToLoad = 0;
375  switch (m_threadManager->m_schedType)
376  {
377  case e_guided:
378  numToLoad = std::max(
379  static_cast<unsigned long>(m_threadManager->m_chunkSize),
380  static_cast<unsigned long>(
382  (2 * m_threadManager->m_numWorkers + 1)));
383  break;
384 
385  case e_dynamic:
386  numToLoad = m_threadManager->m_chunkSize;
387  break;
388 
389  default:
390  NEKERROR(ErrorUtil::efatal, "Invalid value for SchedType.");
391  break;
392  }
393  return numToLoad;
394 }
395 
396 /**
397  *
398  */
400 {
401  Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
402 
404  {
405  // while waiting, master active is unlocked
406  m_threadManager->m_masterActiveCondVar.wait(masterActiveLock);
407  // on exiting wait master active is locked again
408  }
409 }
410 
411 /**
412  *
413  */
415 {
416  while (m_keepgoing)
417  {
418  WaitForActive();
419  LoadJobs();
420  RunJobs();
421  }
422 } // exiting here should terminate the thread
423 
424 /**
425  *
426  */
428 {
429  while (!m_workerQueue.empty() && m_keepgoing)
430  {
431  ThreadJob *tj;
432  try
433  {
434  tj = m_workerQueue.front();
436  tj->Run();
437  m_workerQueue.pop();
438  delete tj;
439  }
440  catch (...)
441  {
442  // something bad happened, probably time to die
443  // maybe signal ThreadManager
444  throw;
445  }
446  }
447 }
448 
449 } // namespace Thread
450 } /* namespace Nektar */
#define NEKERROR(type, msg)
Assert Level 0 – Fundamental assert which is used whether in FULLDEBUG, DEBUG or OPT compilation mode...
Definition: ErrorUtil.hpp:209
tKey RegisterCreatorFunction(tKey idKey, CreatorFunction classCreator, std::string pDesc="")
Register a class with the factory.
Definition: NekFactory.hpp:198
Base class for tasks to be sent to the ThreadManager to run.
Definition: Thread.h:97
void SetWorkerNum(unsigned int num)
Set number of worker threads.
Definition: Thread.cpp:76
virtual void Run()=0
Implementation of ThreadManager using Boost threads.
Definition: ThreadBoost.h:59
virtual void v_QueueJob(ThreadJob *job) override
ThreadWorkerBoost ** m_threadList
Definition: ThreadBoost.h:106
virtual const std::string & v_GetType() const override
virtual ~ThreadManagerBoost()
Shuts down threading.
virtual unsigned int v_GetMaxNumWorkers() override
virtual void v_SetChunkSize(unsigned int chnk) override
virtual void v_SetSchedType(SchedType s) override
boost::condition_variable m_masterQueueCondVar
Definition: ThreadBoost.h:104
virtual bool v_InThread() override
virtual unsigned int v_GetNumWorkers() override
boost::thread::id m_masterThreadId
Definition: ThreadBoost.h:108
const unsigned int m_numThreads
Definition: ThreadBoost.h:99
std::queue< ThreadJob * > m_masterQueue
Definition: ThreadBoost.h:101
static ThreadManagerSharedPtr Create(unsigned int numT)
Called by the factory method.
Definition: ThreadBoost.h:72
virtual void v_Wait() override
std::map< boost::thread::id, unsigned int > m_threadMap
Definition: ThreadBoost.h:114
virtual unsigned int v_GetWorkerNum() override
virtual void v_Hold() override
void SetNumWorkersImpl(const unsigned int num)
virtual void v_SetNumWorkers() override
virtual void v_QueueJobs(std::vector< ThreadJob * > &joblist) override
boost::condition_variable m_masterActiveCondVar
Definition: ThreadBoost.h:105
void QueueJob(ThreadJob *job)
Pass a single job to the master queue.
Definition: Thread.h:191
Implementation class for ThreadManagerBoost.
Definition: ThreadBoost.h:127
void Stop()
A signal to shut down.
Definition: ThreadBoost.h:155
std::queue< ThreadJob * > m_workerQueue
Definition: ThreadBoost.h:171
ThreadManagerBoost * m_threadManager
Definition: ThreadBoost.h:170
SchedType
Identifies the algorithm for scheduling.
Definition: Thread.h:66
ThreadManagerFactory & GetThreadManagerFactory()
Definition: Thread.cpp:49
boost::unique_lock< boost::mutex > Lock
Definition: ThreadBoost.h:51
The above copyright notice and this permission notice shall be included.
Definition: CoupledSolver.h:2