Nektar++
ThreadStd.cpp
Go to the documentation of this file.
1///////////////////////////////////////////////////////////////////////////////
2//
3// File: ThreadStd.cpp
4//
5// For more information, please see: http://www.nektar.info
6//
7// The MIT License
8//
9// Copyright (c) 2006 Division of Applied Mathematics, Brown University (USA),
10// Department of Aeronautics, Imperial College London (UK), and Scientific
11// Computing and Imaging Institute, University of Utah (USA).
12//
13// Permission is hereby granted, free of charge, to any person obtaining a
14// copy of this software and associated documentation files (the "Software"),
15// to deal in the Software without restriction, including without limitation
16// the rights to use, copy, modify, merge, publish, distribute, sublicense,
17// and/or sell copies of the Software, and to permit persons to whom the
18// Software is furnished to do so, subject to the following conditions:
19//
20// The above copyright notice and this permission notice shall be included
21// in all copies or substantial portions of the Software.
22//
23// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
24// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
26// THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
28// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
29// DEALINGS IN THE SOFTWARE.
30//
31// Description:
32//
33///////////////////////////////////////////////////////////////////////////////
34
35#define NOMINMAX
36
37#include <algorithm>
38#include <iostream>
39
41
42namespace Nektar::Thread
43{
44
47 "ThreadManagerStd", ThreadManagerStd::Create,
48 "Threading using std::thread.");
49
50/**
51 * @param numWorkers The number of threads to start (including master thread).
52 * @note Do not use, use factory instead.
53 */
55 : m_numThreads(numT), m_numWorkers(numT), m_masterQueue(),
56 m_masterQueueMutex(), m_masterActiveMutex(), m_masterQueueCondVar(),
57 m_masterActiveCondVar(), m_chunkSize(1), m_schedType(e_dynamic),
58 m_threadMap()
59{
60 using namespace std;
61 try
62 {
64 m_threadThreadList = new std::thread *[m_numThreads];
67 }
68 catch (exception &e)
69 {
70 cerr << "Exception while allocating thread storage: " << e.what()
71 << endl;
72 abort();
73 }
74 unsigned int i = 0;
75 while (i < m_numThreads)
76 {
78 try
79 {
80 tw = new ThreadWorkerStd(this, i);
81 }
82 catch (exception &e)
83 {
84 cerr << "Exception while allocating worker threads: " << e.what()
85 << endl;
86 abort();
87 }
88
89 m_threadList[i] = tw;
90 m_threadBusyList[i] = false;
91 m_threadActiveList[i] = true;
92
93 try
94 {
95 m_threadThreadList[i] = new std::thread(std::ref(*tw));
96 std::thread::id id = m_threadThreadList[i]->get_id();
97 m_threadMap[id] = i;
98 }
99 catch (...)
100 {
101 std::cerr << "Exception while creating worker threads" << std::endl;
102 abort();
103 }
104 i++;
105 }
106 m_masterThreadId = std::this_thread::get_id();
108 m_type = "Threading with Std";
109}
110
111/**
112 * Terminates all running threads (they will finish their current job),
113 * releases resources and destructs.
114 */
116{
117 // This is an immediate teardown. We attempt to kill everything.
118 // we daren't lock anything as we may cause a deadlock
119 for (unsigned int i = 0; i < m_numThreads; i++)
120 {
121 m_threadList[i]->Stop();
122 }
123
124 m_masterQueueCondVar.notify_all();
125 m_masterActiveCondVar.notify_all();
126 for (unsigned int i = 0; i < m_numThreads; i++)
127 {
128 m_threadThreadList[i]->join();
129 delete m_threadThreadList[i];
130 delete m_threadList[i];
131 }
132
133 delete[] m_threadList;
134 delete[] m_threadThreadList;
135 delete[] m_threadActiveList;
136 delete[] m_threadBusyList;
137 delete m_barrier;
138}
139
140/**
141 *
142 */
143void ThreadManagerStd::v_QueueJobs(std::vector<ThreadJob *> &joblist)
144{
145 std::vector<ThreadJob *>::iterator it;
146 for (it = joblist.begin(); it < joblist.end(); ++it)
147 {
148 QueueJob(*it);
149 }
150}
151
152/*
153 *
154 */
156{
157 Lock masterQueueLock(m_masterQueueMutex); // locks the queue
158 m_masterQueue.push(job);
159 m_masterQueueCondVar.notify_all(); // alert a waiting thread.
160} // queue unlocked
161
162/**
163 *
164 */
166{
167 bool working = false;
168 Lock masterActiveLock(m_masterActiveMutex);
169 for (unsigned int i = 0; i < m_numWorkers; i++)
170 {
171 working = working || m_threadBusyList[i];
172 }
173 return working;
174}
175
176/**
177 *
178 */
179void ThreadManagerStd::v_SetChunkSize(unsigned int chnk)
180{
181 Lock masterQueueLock(m_masterQueueMutex); // locks the queue
182 m_chunkSize = std::max(chnk, 1U);
183}
184
185/**
186 *
187 */
189{
190 Lock masterQueueLock(m_masterQueueMutex); // locks the queue
191 m_schedType = s;
192}
193
194/**
195 *
196 */
198{
199 std::thread::id id = std::this_thread::get_id();
200 return (id != m_masterThreadId);
201}
202
203/**
204 *
205 */
207{
208 bool working;
209 Lock masterQueueLock(m_masterQueueMutex); // locks the queue
210 working = IsWorking();
211 m_masterActiveCondVar.notify_all();
212 m_masterQueueCondVar.notify_all();
213 while (!m_masterQueue.empty() || working)
214 {
215 // while waiting, master queue is unlocked
216 m_masterQueueCondVar.wait(masterQueueLock);
217 // on exiting wait master queue is locked again
218 working = IsWorking();
219 }
220}
221
222/**
223 *
224 */
226{
227 return m_numWorkers;
228}
229
230/**
231 *
232 */
234{
235 std::thread::id id = std::this_thread::get_id();
236 return m_threadMap[id];
237}
238
239/**
240 *
241 */
242void ThreadManagerStd::SetNumWorkersImpl(const unsigned int num)
243{
244 Lock masterActiveLock(m_masterActiveMutex); // locks the active
245
246 if (m_numWorkers == num)
247 {
248 return;
249 }
250
251 delete m_barrier;
252 m_barrier = new Barrier(num > 0 ? num : 1);
253
254 m_numWorkers = num;
255 for (unsigned int i = 0; i < m_numThreads; i++)
256 {
257 m_threadActiveList[i] = i < m_numWorkers ? true : false;
258 }
259 m_masterActiveCondVar.notify_all();
260} // Lock on active released here
261
262/**
263 *
264 */
265void ThreadManagerStd::v_SetNumWorkers(const unsigned int num)
266{
267 unsigned int n;
268 n = std::min(num, m_numThreads);
269 n = std::max(n, static_cast<unsigned int>(0));
271}
272
273/**
274 *
275 */
277{
279}
280
281/**
282 *
283 */
285{
286 return m_numThreads;
287}
288
289/**
290 *
291 */
293{
294 m_barrier->Wait();
295}
296
297/**
298 *
299 */
300const std::string &ThreadManagerStd::v_GetType() const
301{
302 return m_type;
303}
304
305/**
306 * @param threadManager Pointer to the ThreadManagerStd that is controlling
307 * this worker.
308 * @param workerNum Unique number from 0..(number_of_threads - 1)
309 *
310 * Called by the ThreadManagerStd instance.
311 */
313 : m_threadManager(tm), m_workerQueue(), m_keepgoing(true),
314 m_threadNum(workerNum)
315{
316 // Nothing to see here
317}
318
319/**
320 * Winds up this thread's execution. Jobs in its queue are lost.
321 */
323{
324 if (m_keepgoing)
325 {
326 std::cerr << "Warning: ThreadWorker: " << m_threadNum
327 << "destroyed while running!" << std::endl;
328 }
329 // on destuction the m_workerQueue will be destructed and that
330 // will destruct any ThreadJobs still in there.
331}
332
333/**
334 *
335 */
337{
338 // Lock the master queue
339 Lock masterQueueLock(m_threadManager->m_masterQueueMutex);
342 while (m_threadManager->m_masterQueue.empty() && m_keepgoing)
343 {
344 // while waiting, master queue is unlocked
345 m_threadManager->m_masterQueueCondVar.wait(masterQueueLock);
346 // on exiting wait master queue is locked again
347 }
348 bool active;
349 {
350 Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
352 }
353 if (active && m_keepgoing)
354 {
355 unsigned int numToLoad = GetNumToLoad();
356 while (m_workerQueue.size() < numToLoad &&
358 {
360 m_workerQueue.push(tj);
362 }
363 }
365} // lock on master queue released here
366
367/**
368 *
369 */
371{
372 unsigned int numToLoad = 0;
374 {
375 case e_guided:
376 numToLoad = std::max(
377 static_cast<unsigned long>(m_threadManager->m_chunkSize),
378 static_cast<unsigned long>(
380 (2 * m_threadManager->m_numWorkers + 1)));
381 break;
382
383 case e_dynamic:
384 numToLoad = m_threadManager->m_chunkSize;
385 break;
386
387 default:
388 NEKERROR(ErrorUtil::efatal, "Invalid value for SchedType.");
389 break;
390 }
391 return numToLoad;
392}
393
394/**
395 *
396 */
398{
399 Lock masterActiveLock(m_threadManager->m_masterActiveMutex);
400
402 {
403 // while waiting, master active is unlocked
404 m_threadManager->m_masterActiveCondVar.wait(masterActiveLock);
405 // on exiting wait master active is locked again
406 }
407}
408
409/**
410 *
411 */
413{
414 while (m_keepgoing)
415 {
417 LoadJobs();
418 RunJobs();
419 }
420} // exiting here should terminate the thread
421
422/**
423 *
424 */
426{
427 while (!m_workerQueue.empty() && m_keepgoing)
428 {
429 ThreadJob *tj;
430 try
431 {
432 tj = m_workerQueue.front();
434 tj->Run();
435 m_workerQueue.pop();
436 delete tj;
437 }
438 catch (...)
439 {
440 // something bad happened, probably time to die
441 // maybe signal ThreadManager
442 throw;
443 }
444 }
445}
446
447} // namespace Nektar::Thread
#define NEKERROR(type, msg)
Assert Level 0 – Fundamental assert which is used whether in FULLDEBUG, DEBUG or OPT compilation mode...
Definition: ErrorUtil.hpp:202
tKey RegisterCreatorFunction(tKey idKey, CreatorFunction classCreator, std::string pDesc="")
Register a class with the factory.
Definition: NekFactory.hpp:197
Base class for tasks to be sent to the ThreadManager to run.
Definition: Thread.h:94
void SetWorkerNum(unsigned int num)
Set number of worker threads.
Definition: Thread.cpp:72
virtual void Run()=0
void QueueJob(ThreadJob *job)
Pass a single job to the master queue.
Definition: Thread.h:188
Lightweight barrier class.
Definition: ThreadStd.h:61
Implementation of ThreadManager using std::thread.
Definition: ThreadStd.h:58
void v_SetSchedType(SchedType s) override
Definition: ThreadStd.cpp:188
~ThreadManagerStd() override
Shuts down threading.
Definition: ThreadStd.cpp:115
const std::string & v_GetType() const override
Definition: ThreadStd.cpp:300
std::map< std::thread::id, unsigned int > m_threadMap
Definition: ThreadStd.h:146
void v_QueueJob(ThreadJob *job) override
Definition: ThreadStd.cpp:155
ThreadWorkerStd ** m_threadList
Definition: ThreadStd.h:138
void v_QueueJobs(std::vector< ThreadJob * > &joblist) override
Definition: ThreadStd.cpp:143
static ThreadManagerSharedPtr Create(unsigned int numT)
Called by the factory method.
Definition: ThreadStd.h:104
std::thread ** m_threadThreadList
Definition: ThreadStd.h:139
std::thread::id m_masterThreadId
Definition: ThreadStd.h:140
unsigned int v_GetNumWorkers() override
Definition: ThreadStd.cpp:225
const unsigned int m_numThreads
Definition: ThreadStd.h:131
std::queue< ThreadJob * > m_masterQueue
Definition: ThreadStd.h:133
std::condition_variable m_masterQueueCondVar
Definition: ThreadStd.h:136
std::condition_variable m_masterActiveCondVar
Definition: ThreadStd.h:137
void SetNumWorkersImpl(const unsigned int num)
Definition: ThreadStd.cpp:242
unsigned int v_GetMaxNumWorkers() override
Definition: ThreadStd.cpp:284
void v_SetChunkSize(unsigned int chnk) override
Definition: ThreadStd.cpp:179
unsigned int v_GetWorkerNum() override
Definition: ThreadStd.cpp:233
static std::string className
Definition: ThreadStd.h:147
Implementation class for ThreadManagerStd.
Definition: ThreadStd.h:159
std::queue< ThreadJob * > m_workerQueue
Definition: ThreadStd.h:201
ThreadManagerStd * m_threadManager
Definition: ThreadStd.h:200
void Stop()
A signal to shut down.
Definition: ThreadStd.h:185
std::unique_lock< std::mutex > Lock
Definition: ThreadStd.h:50
SchedType
Identifies the algorithm for scheduling.
Definition: Thread.h:63
ThreadManagerFactory & GetThreadManagerFactory()
Definition: Thread.cpp:45