Nektar++
ThreadBoost.cpp
Go to the documentation of this file.
1///////////////////////////////////////////////////////////////////////////////
2//
3// File: ThreadBoost.cpp
4//
5// For more information, please see: http://www.nektar.info
6//
7// The MIT License
8//
9// Copyright (c) 2006 Division of Applied Mathematics, Brown University (USA),
10// Department of Aeronautics, Imperial College London (UK), and Scientific
11// Computing and Imaging Institute, University of Utah (USA).
12//
13// Permission is hereby granted, free of charge, to any person obtaining a
14// copy of this software and associated documentation files (the "Software"),
15// to deal in the Software without restriction, including without limitation
16// the rights to use, copy, modify, merge, publish, distribute, sublicense,
17// and/or sell copies of the Software, and to permit persons to whom the
18// Software is furnished to do so, subject to the following conditions:
19//
20// The above copyright notice and this permission notice shall be included
21// in all copies or substantial portions of the Software.
22//
23// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
24// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
26// THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
28// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
29// DEALINGS IN THE SOFTWARE.
30//
31// Description:
32//
33///////////////////////////////////////////////////////////////////////////////
34
35#define NOMINMAX
36
38#include <algorithm>
39#include <iostream>
40
41namespace Nektar
42{
43namespace Thread
44{
45
48 "ThreadManagerBoost", ThreadManagerBoost::Create,
49 "Threading using Boost.");
50
51/**
52 * @param numWorkers The number of threads to start (including master thread).
53 * @note Do not use, use factory instead.
54 */
56 : m_numThreads(numT), m_numWorkers(numT), m_masterQueue(),
57 m_masterQueueMutex(), m_masterActiveMutex(), m_masterQueueCondVar(),
58 m_masterActiveCondVar(), m_chunkSize(1), m_schedType(e_dynamic),
59 m_threadMap()
60{
61 using namespace std;
62 try
63 {
65 m_threadThreadList = new boost::thread *[m_numThreads];
68 }
69 catch (exception &e)
70 {
71 cerr << "Exception while allocating thread storage: " << e.what()
72 << endl;
73 abort();
74 }
75 unsigned int i = 0;
76 while (i < m_numThreads)
77 {
79 try
80 {
81 tw = new ThreadWorkerBoost(this, i);
82 }
83 catch (exception &e)
84 {
85 cerr << "Exception while allocating worker threads: " << e.what()
86 << endl;
87 abort();
88 }
89
90 m_threadList[i] = tw;
91 m_threadBusyList[i] = false;
92 m_threadActiveList[i] = true;
93
94 try
95 {
96 m_threadThreadList[i] = new boost::thread(boost::ref(*tw));
97 boost::thread::id id = m_threadThreadList[i]->get_id();
98 m_threadMap[id] = i;
99 }
100 catch (...)
101 {
102 std::cerr << "Exception while creating worker threads" << std::endl;
103 abort();
104 }
105 i++;
106 }
107 m_masterThreadId = boost::this_thread::get_id();
108 m_barrier = new boost::barrier(m_numWorkers > 0 ? m_numWorkers : 1);
109 m_type = "Threading with Boost";
110}
111
112/**
113 * Terminates all running threads (they will finish their current job),
114 * releases resources and destructs.
115 */
117{
118 // This is an immediate teardown. We attempt to kill everything.
119 // we daren't lock anything as we may cause a deadlock
120 for (unsigned int i = 0; i < m_numThreads; i++)
121 {
122 m_threadList[i]->Stop();
123 }
124
125 m_masterQueueCondVar.notify_all();
126 m_masterActiveCondVar.notify_all();
127 for (unsigned int i = 0; i < m_numThreads; i++)
128 {
129 m_threadThreadList[i]->join();
130 delete m_threadThreadList[i];
131 delete m_threadList[i];
132 }
133
134 delete[] m_threadList;
135 delete[] m_threadThreadList;
136 delete[] m_threadActiveList;
137 delete[] m_threadBusyList;
138 delete m_barrier;
139}
140
141/**
142 *
143 */
144void ThreadManagerBoost::v_QueueJobs(std::vector<ThreadJob *> &joblist)
145{
146 std::vector<ThreadJob *>::iterator it;
147 for (it = joblist.begin(); it < joblist.end(); ++it)
148 {
149 QueueJob(*it);
150 }
151}
152
153/*
154 *
155 */
157{
158 Lock masterQueueLock(m_masterQueueMutex); // locks the queue
159 m_masterQueue.push(job);
160 m_masterQueueCondVar.notify_all(); // alert a waiting thread.
161} // queue unlocked
162
163/**
164 *
165 */
167{
168 bool working = false;
169 Lock masterActiveLock(m_masterActiveMutex);
170 for (unsigned int i = 0; i < m_numWorkers; i++)
171 {
172 working = working || m_threadBusyList[i];
173 }
174 return working;
175}
176
177/**
178 *
179 */
181{
182 Lock masterQueueLock(m_masterQueueMutex); // locks the queue
183 m_chunkSize = std::max(chnk, 1U);
184}
185
186/**
187 *
188 */
190{
191 Lock masterQueueLock(m_masterQueueMutex); // locks the queue
192 m_schedType = s;
193}
194
195/**
196 *
197 */
199{
200 boost::thread::id id = boost::this_thread::get_id();
201 return (id != m_masterThreadId);
202}
203
204/**
205 *
206 */
208{
209 bool working;
210 Lock masterQueueLock(m_masterQueueMutex); // locks the queue
211 working = IsWorking();
212 m_masterActiveCondVar.notify_all();
213 m_masterQueueCondVar.notify_all();
214 while (!m_masterQueue.empty() || working)
215 {
216 // while waiting, master queue is unlocked
217 m_masterQueueCondVar.wait(masterQueueLock);
218 // on exiting wait master queue is locked again
219 working = IsWorking();
220 }
221}
222
223/**
224 *
225 */
227{
228 return m_numWorkers;
229}
230
231/**
232 *
233 */
235{
236 boost::thread::id id = boost::this_thread::get_id();
237 return m_threadMap[id];
238}
239
240/**
241 *
242 */
243void ThreadManagerBoost::SetNumWorkersImpl(const unsigned int num)
244{
245 Lock masterActiveLock(m_masterActiveMutex); // locks the active
246
247 if (m_numWorkers == num)
248 {
249 return;
250 }
251
252 delete m_barrier;
253 m_barrier = new boost::barrier(num > 0 ? num : 1);
254
255 m_numWorkers = num;
256 for (unsigned int i = 0; i < m_numThreads; i++)
257 {
258 m_threadActiveList[i] = i < m_numWorkers ? true : false;
259 }
260 m_masterActiveCondVar.notify_all();
261} // Lock on active released here
262
263/**
264 *
265 */
266void ThreadManagerBoost::v_SetNumWorkers(const unsigned int num)
267{
268 unsigned int n;
269 n = std::min(num, m_numThreads);
270 n = std::max(n, static_cast<unsigned int>(0));
272}
273
274/**
275 *
276 */
278{
280}
281
282/**
283 *
284 */
286{
287 return m_numThreads;
288}
289
290/**
291 *
292 */
294{
295 m_barrier->wait();
296}
297
298/**
299 *
300 */
301const std::string &ThreadManagerBoost::v_GetType() const
302{
303 return m_type;
304}
305
306/**
307 * @param threadManager Pointer to the ThreadManagerBoost that is controlling
308 * this worker.
309 * @param workerNum Unique number from 0..(number_of_threads - 1)
310 *
311 * Called by the ThreadManagerBoost instance.
312 */
314 unsigned int workerNum)
315 : m_threadManager(tm), m_workerQueue(), m_keepgoing(true),
316 m_threadNum(workerNum)
317{
318 // Nothing to see here
319}
320
321/**
322 * Winds up this thread's execution. Jobs in its queue are lost.
323 */
325{
326 if (m_keepgoing)
327 {
328 std::cerr << "Warning: ThreadWorker: " << m_threadNum
329 << "destroyed while running!" << std::endl;
330 }
331 // on destuction the m_workerQueue will be destructed and that
332 // will destruct any ThreadJobs still in there.
333}
334
335/**
336 *
337 */
339{
340 // Lock the master queue
341 Lock masterQueueLock(m_threadManager->m_masterQueueMutex);
344 while (m_threadManager->m_masterQueue.empty() && m_keepgoing)
345 {
346 // while waiting, master queue is unlocked
347 m_threadManager->m_masterQueueCondVar.wait(masterQueueLock);
348 // on exiting wait master queue is locked again
349 }
350 bool active;
351 {
352 Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
354 }
355 if (active && m_keepgoing)
356 {
357 unsigned int numToLoad = GetNumToLoad();
358 while (m_workerQueue.size() < numToLoad &&
360 {
362 m_workerQueue.push(tj);
364 }
365 }
367} // lock on master queue released here
368
369/**
370 *
371 */
373{
374 unsigned int numToLoad = 0;
376 {
377 case e_guided:
378 numToLoad = std::max(
379 static_cast<unsigned long>(m_threadManager->m_chunkSize),
380 static_cast<unsigned long>(
382 (2 * m_threadManager->m_numWorkers + 1)));
383 break;
384
385 case e_dynamic:
386 numToLoad = m_threadManager->m_chunkSize;
387 break;
388
389 default:
390 NEKERROR(ErrorUtil::efatal, "Invalid value for SchedType.");
391 break;
392 }
393 return numToLoad;
394}
395
396/**
397 *
398 */
400{
401 Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
402
404 {
405 // while waiting, master active is unlocked
406 m_threadManager->m_masterActiveCondVar.wait(masterActiveLock);
407 // on exiting wait master active is locked again
408 }
409}
410
411/**
412 *
413 */
415{
416 while (m_keepgoing)
417 {
419 LoadJobs();
420 RunJobs();
421 }
422} // exiting here should terminate the thread
423
424/**
425 *
426 */
428{
429 while (!m_workerQueue.empty() && m_keepgoing)
430 {
431 ThreadJob *tj;
432 try
433 {
434 tj = m_workerQueue.front();
436 tj->Run();
437 m_workerQueue.pop();
438 delete tj;
439 }
440 catch (...)
441 {
442 // something bad happened, probably time to die
443 // maybe signal ThreadManager
444 throw;
445 }
446 }
447}
448
449} // namespace Thread
450} /* namespace Nektar */
#define NEKERROR(type, msg)
Assert Level 0 – Fundamental assert which is used whether in FULLDEBUG, DEBUG or OPT compilation mode...
Definition: ErrorUtil.hpp:209
tKey RegisterCreatorFunction(tKey idKey, CreatorFunction classCreator, std::string pDesc="")
Register a class with the factory.
Definition: NekFactory.hpp:198
Base class for tasks to be sent to the ThreadManager to run.
Definition: Thread.h:97
void SetWorkerNum(unsigned int num)
Set number of worker threads.
Definition: Thread.cpp:76
virtual void Run()=0
Implementation of ThreadManager using Boost threads.
Definition: ThreadBoost.h:59
virtual void v_QueueJob(ThreadJob *job) override
ThreadWorkerBoost ** m_threadList
Definition: ThreadBoost.h:106
virtual const std::string & v_GetType() const override
virtual ~ThreadManagerBoost()
Shuts down threading.
virtual unsigned int v_GetMaxNumWorkers() override
virtual void v_SetChunkSize(unsigned int chnk) override
virtual void v_SetSchedType(SchedType s) override
boost::condition_variable m_masterQueueCondVar
Definition: ThreadBoost.h:104
virtual bool v_InThread() override
virtual unsigned int v_GetNumWorkers() override
boost::thread::id m_masterThreadId
Definition: ThreadBoost.h:108
const unsigned int m_numThreads
Definition: ThreadBoost.h:99
std::queue< ThreadJob * > m_masterQueue
Definition: ThreadBoost.h:101
static ThreadManagerSharedPtr Create(unsigned int numT)
Called by the factory method.
Definition: ThreadBoost.h:72
virtual void v_Wait() override
std::map< boost::thread::id, unsigned int > m_threadMap
Definition: ThreadBoost.h:114
virtual unsigned int v_GetWorkerNum() override
virtual void v_Hold() override
void SetNumWorkersImpl(const unsigned int num)
virtual void v_SetNumWorkers() override
virtual void v_QueueJobs(std::vector< ThreadJob * > &joblist) override
boost::condition_variable m_masterActiveCondVar
Definition: ThreadBoost.h:105
void QueueJob(ThreadJob *job)
Pass a single job to the master queue.
Definition: Thread.h:191
Implementation class for ThreadManagerBoost.
Definition: ThreadBoost.h:127
void Stop()
A signal to shut down.
Definition: ThreadBoost.h:155
std::queue< ThreadJob * > m_workerQueue
Definition: ThreadBoost.h:171
ThreadManagerBoost * m_threadManager
Definition: ThreadBoost.h:170
SchedType
Identifies the algorithm for scheduling.
Definition: Thread.h:66
ThreadManagerFactory & GetThreadManagerFactory()
Definition: Thread.cpp:49
boost::unique_lock< boost::mutex > Lock
Definition: ThreadBoost.h:51
The above copyright notice and this permission notice shall be included.
Definition: CoupledSolver.h:2