Nektar++
Thread.h
Go to the documentation of this file.
1 ///////////////////////////////////////////////////////////////////////////////
2 //
3 // File: Thread.h
4 //
5 // For more information, please see: http://www.nektar.info
6 //
7 // The MIT License
8 //
9 // Copyright (c) 2006 Division of Applied Mathematics, Brown University (USA),
10 // Department of Aeronautics, Imperial College London (UK), and Scientific
11 // Computing and Imaging Institute, University of Utah (USA).
12 //
13 // Permission is hereby granted, free of charge, to any person obtaining a
14 // copy of this software and associated documentation files (the "Software"),
15 // to deal in the Software without restriction, including without limitation
16 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
17 // and/or sell copies of the Software, and to permit persons to whom the
18 // Software is furnished to do so, subject to the following conditions:
19 //
20 // The above copyright notice and this permission notice shall be included
21 // in all copies or substantial portions of the Software.
22 //
23 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
24 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
26 // THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
28 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
29 // DEALINGS IN THE SOFTWARE.
30 //
31 // Description:
32 //
33 ///////////////////////////////////////////////////////////////////////////////
34 
35 #ifndef NEKTAR_LIB_UTILITIES_THREAD_H_
36 #define NEKTAR_LIB_UTILITIES_THREAD_H_
37 
38 #include <boost/thread/condition_variable.hpp>
39 #include <boost/thread/locks.hpp>
40 #include <boost/thread/mutex.hpp>
41 #include <boost/thread/thread.hpp>
42 #include <memory>
43 #include <queue>
44 #include <vector>
45 
47 
48 namespace Nektar
49 {
50 namespace Thread
51 {
52 
53 /**
54  * @brief Identifies the algorithm for scheduling.
55  *
56  * Currently there are two strategies to determine the number of jobs.
57  * e_static always loads m_chunkSize jobs onto each worker, whereas
58  * e_guided uses a simple algorithm that loads a larger proportion of jobs
59  * onto the worker while the queue is long, but scales down to no fewer
60  * than m_chunkSize as the queue empties.
61  *
62  * @see ThreadManager::SetChunkSize()
63  *
64  */
66 {
68  e_dynamic
69 };
70 
71 class ThreadManager;
72 typedef std::shared_ptr<ThreadManager> ThreadManagerSharedPtr;
76 
77 /**
78  * @brief Base class for tasks to be sent to the ThreadManager to run.
79  *
80  * For a parallel region the overall task should be divided into tasklets, each
81  * capable of running independently of other tasklets, in any order. Each of
82  * these tasklets should be represented by an instance of a subclass of
83  * ThreadJob.
84  *
85  * Exactly how to partition the overall task will be problem dependent, but the
86  * ideal case is to have as many tasklets as there are worker threads (available
87  * through ThreadManager::GetNumWorkers() ) and for each tasklet to take the
88  * same amount of computational effort. Clearly, the tasklets should not
89  * overwrite one another's data, and data locations they are writing to should
90  * be sufficiently far apart to avoid cache ping pong.
91  *
92  * Subclasses may define as many variables and functions as are necessary.
93  * Instances of the subclass will be destructed once the ThreadManager has
94  * finished the Run() method.
95  */
96 class ThreadJob
97 {
98 public:
99  /// Base constructor
101  /// Base destructor.
103 
104  /**
105  * This method will be called when the task is loaded
106  * onto a worker thread and is ready to run. When Run
107  * has finished this instance will be destructed.
108  */
109  LIB_UTILITIES_EXPORT virtual void Run() = 0;
110 
111  /// Set the worker number (the value returned by GetWorkerNum()).
112  LIB_UTILITIES_EXPORT void SetWorkerNum(unsigned int num);
113 
114 protected:
115  /**
116  * Returns an integer identifying the worker thread the
117  * job is running on. Value will be 0...N-1, where N is
118  * the number of active worker threads.
119  */
120  unsigned int GetWorkerNum();
121 
122 private:
123  unsigned int m_workerNum;
124 };
125 
126 /**
127  * @brief The interface class for the controller for worker threads and jobs.
128  *
129  * There may be multiple instances of implementations of this class.
130  * They are instantiated by ThreadMaster::CreateInstance(ThreadManagerName,
131  * unsigned int). Use ThreadMaster::GetInstance(ThreadManagerName) to get a
132  * pointer to the instance. Each ThreadManager is associated in ThreadMaster
133  * with a ThreadManagerName.
134  *
135  * Jobs are sent to the worker threads by placing them in the
136  * queue with QueueJob() and QueueJobs(). Jobs may be run in
137  * indeterminate order. Jobs are permitted to run *immediately* after
138  * they are queued. If this is not desired, set the number of active
139  * workers to 0 until all jobs are queued.
140  *
141  * Jobs are taken from the master queue by active worker threads.
142  *
143  * We have the concept of the *master thread*. This is the thread
144  * that instantiates the ThreadManager, calls QueueJob(), Wait()
145  * and so on. This thread must be included in the number of threads
146  * started when the ThreadManager is created, so that no more than
147  * this number of threads are ever executing work at any time. The
148  * master thread is assumed to be working unless it is in Wait().
149  *
150  * Thus if the ThreadManager is created with CreateInstance(t, N) there
151  * should be N threads @em including the master thread. If the master
152  * thread calls Wait() (which causes the master thread to sleep until
153  * the job queue is empty) then the master thread should become available
154  * to run jobs from the queue.
155  *
156  * Only the master thread should call ThreadManager methods. Although
157  * it should always be possible for code to identify whether it's the
158  * master thread (through design) there is an InThread() method that returns
159  * true if the thread is a worker.
160  *
161  */
162 class ThreadManager : public std::enable_shared_from_this<ThreadManager>
163 {
164 public:
165  /// Destructor.
167  /**
168  * @brief Pass a list of tasklets to the master queue.
169  * @param joblist Vector of ThreadJob pointers.
170  *
171  * The list of jobs is copied into the master queue. Jobs may be
172  * available for running *immediately*, even before the list has been
173  * fully copied. This can have consequences for the scheduling. If
174  * this is an issue then suspend the workers with SetNumWorkers(0) until
175  * the jobs are queued.
176  *
177  * @see SchedType
178  */
179  LIB_UTILITIES_EXPORT void QueueJobs(std::vector<ThreadJob *> &joblist)
180  {
181  v_QueueJobs(joblist);
182  }
183  /**
184  * @brief Pass a single job to the master queue.
185  * @param job A pointer to a ThreadJob subclass.
186  *
187  * The job may become available for running immediately. If this is an
188  * issue then suspend the workers with SetNumWorkers(0) until the jobs
189  * are queued.
190  */
192  {
193  v_QueueJob(job);
194  }
195  /**
196  * @brief Return the number of active workers.
197  *
198  * Active workers are threads that are either running jobs
199  * or are waiting for jobs to be queued.
200  */
202  {
203  return v_GetNumWorkers();
204  }
205  /**
206  * @brief Returns the worker number of the executing thread.
207  *
208  * Returns an unsigned int between 0 and N-1 where N is the number of
209  * active worker threads. Repeated calls from within this thread will
210  * always return the same value and the value will be the same as
211  * returned from ThreadJob.GetWorkerNum(). The same thread will run a
212  * job until it finishes.
213  *
214  * Although if there are active threads then thread 0 is always one of
215  * them, it is possible that thread 0 does not run for a given set of
216  * jobs. For example, if there are 4 active threads and 3 jobs are
217  * submitted with an e_static scheduling strategy and a chunksize of 1,
218  * then it is possible that threads 1,2, and 3 pick up the jobs and
219  * thread 0 remains idle.
220  *
221  * Returns 0 if called by non-thread.
222  */
224  {
225  return v_GetWorkerNum();
226  }
227  /**
228  * @brief Sets the number of active workers.
229  * @param num The number of active workers.
230  *
231  * Active workers are threads that are either running jobs or are
232  * waiting for jobs to be queued.
233  *
234  * If num is greater than the maximum allowed number of active workers,
235  * then the maximum value will be used instead.
236  */
237  LIB_UTILITIES_EXPORT void SetNumWorkers(const unsigned int num)
238  {
239  v_SetNumWorkers(num);
240  }
241  /**
242  * @brief Sets the number of active workers to the maximum.
243  *
244  * Sets the number of active workers to the maximum available.
245  */
247  {
248  v_SetNumWorkers();
249  }
250  /**
251  * @brief Gets the maximum available number of threads.
252  * @return The maximum number of workers.
253  */
255  {
256  return v_GetMaxNumWorkers();
257  }
258  /**
259  * @brief Waits until all queued jobs are finished.
260  *
261  * If there are no jobs running or queued this method returns
262  * immediately. Otherwise it blocks until the queue is empty and the
263  * worker threads are idle.
264  *
265  * Implementations *must* ensure that trivial deadlocks are not possible
266  * from this method, that is, that this code:
267  * @code
268  * // assume ThreadManager* tm
269  * // assume SomeJob is subclass of ThreadJob
270  * // assume SomeJob job
271  *
272  * tm->SetNumWorkers(0);
273  * tm->QueueJob(job);
274  * tm->Wait();
275  * @endcode
276  * does not wait forever. Since the master thread is counted in the
277  * number of worker threads, implementations should increase the number
278  * of active workers by 1 on entering Wait().
279  */
281  {
282  v_Wait();
283  }
284  /**
285  * @brief Controls how many jobs are sent to each worker at a time.
286  *
287  * The exact meaning of this parameter depends on the current scheduling
288  * algorithm.
289  *
290  * @see SchedType
291  * @see SetSchedType()
292  */
293  LIB_UTILITIES_EXPORT void SetChunkSize(unsigned int chnk)
294  {
295  v_SetChunkSize(chnk);
296  }
297  /**
298  * @brief Sets the current scheduling algorithm.
299  * @see SetChunkSize()
300  */
302  {
303  v_SetSchedType(s);
304  }
305  /**
306  * @brief Indicates whether the code is in a worker thread or not.
307  * @return True if the caller is in a worker thread.
308  */
310  {
311  return v_InThread();
312  }
313  /**
314  * @brief The calling thread holds until all active threads have called
315  * this method.
316  *
317  * When called, the calling thread will sleep until all active workers
318  * have called this method. Once all have done so all threads awake and
319  * continue execution.
320  *
321  * @note Behaviour is likely undefined if the number of active workers
322  * is altered after a thread has called this method. It is only safe to
323  * call SetNumWorkers() when no threads are holding.
324  */
326  {
327  v_Hold();
328  }
329  /**
330  * @brief Returns a description of the type of threading.
331  *
332  * E.g. "Threading with Boost"
333  */
334  LIB_UTILITIES_EXPORT const std::string &GetType()
335  {
336  return v_GetType();
337  }
338 
339  /// Forwards to v_IsInitialised() of the concrete ThreadManager implementation.
341  {
342  return v_IsInitialised();
343  }
344 
345  inline int GetThrFromPartition(int pPartition)
346  {
347  return pPartition % GetMaxNumWorkers();
348  }
349 
350  inline int GetRankFromPartition(int pPartition)
351  {
352  return pPartition / GetMaxNumWorkers();
353  }
354 
355  inline int GetPartitionFromRankThr(int pRank, unsigned int pThr)
356  {
357  return pRank * GetMaxNumWorkers() + pThr;
358  }
359 
360 protected:
362  std::vector<ThreadJob *> &joblist) = 0;
363  LIB_UTILITIES_EXPORT virtual void v_QueueJob(ThreadJob *job) = 0;
364  LIB_UTILITIES_EXPORT virtual unsigned int v_GetNumWorkers() = 0;
365  LIB_UTILITIES_EXPORT virtual unsigned int v_GetWorkerNum() = 0;
367  const unsigned int num) = 0;
369  LIB_UTILITIES_EXPORT virtual unsigned int v_GetMaxNumWorkers() = 0;
370  LIB_UTILITIES_EXPORT virtual void v_Wait() = 0;
371  LIB_UTILITIES_EXPORT virtual void v_SetChunkSize(unsigned int chnk) = 0;
373  LIB_UTILITIES_EXPORT virtual bool v_InThread() = 0;
374  LIB_UTILITIES_EXPORT virtual void v_Hold() = 0;
375  LIB_UTILITIES_EXPORT virtual const std::string &v_GetType() const = 0;
376  LIB_UTILITIES_EXPORT virtual bool v_IsInitialised();
377 };
378 
379 typedef boost::unique_lock<boost::shared_mutex> WriteLock;
380 typedef boost::shared_lock<boost::shared_mutex> ReadLock;
381 
382 /**
383  * A class to manage multiple ThreadManagers. It also acts as a cut-out during
384  * static initialisation, where code attempts to grab a ThreadManager before any
385  * can have been initialised. When this happens the ThreadMaster creates a
386  * simple implementation of ThreadManager, ThreadStartupManager, which behaves
387  * as a single-threaded ThreadManager that fails if anything non-trivial is
388  * attempted. This means code should be cautious about caching the value of
389  * GetInstance(ThreadManagerName), as it might change once that particular
390  * threading system is initialised.
391  *
392  * Multiple ThreadManagers should now be possible. Each is identified with a
393  * ThreadManagerName and can be recovered by calling
394  * Thread::GetThreadMaster().GetInstance(ThreadManagerName).
395  *
396  * ThreadMaster is thread-safe apart from CreateInstance. Call CreateInstance
397  * in single threaded code only.
398  */
400 {
401 private:
402  std::vector<ThreadManagerSharedPtr> m_threadManagers;
403  boost::shared_mutex m_mutex;
404  std::string m_threadingType;
405 
406 public:
408  {
411  };
412  /// Constructor
414  /// Destructor
416  /// Sets what ThreadManagers will be created in CreateInstance.
417  LIB_UTILITIES_EXPORT void SetThreadingType(const std::string &p_type);
418  /// Gets the ThreadManager associated with the name t.
420  const ThreadManagerName t);
421  /// Creates an instance of a ThreadManager (which one is determined by
422  /// a previous call to SetThreadingType) and associates it with
423  /// the name t. nThr is the number of threads, including the master thread.
425  CreateInstance(const ThreadManagerName t, unsigned int nThr);
426 };
427 LIB_UTILITIES_EXPORT ThreadMaster &GetThreadMaster();
428 
429 /**
430  * @brief A default ThreadManager.
431  *
432  * This will be returned by ThreadMaster if a ThreadManager has not been
433  * initialised, such as if the code is still in static initialisation.
434  *
435  * This manager pretends to be a ThreadManager with 1 thread. It will
436  * cause an error if anything more than trivial functions are called.
437  */
439 {
440 public:
443  virtual ~ThreadStartupManager();
444 
445 protected:
446  virtual void v_QueueJobs(std::vector<ThreadJob *> &joblist) override;
447  virtual void v_QueueJob(ThreadJob *job) override;
448  virtual unsigned int v_GetNumWorkers() override;
449  virtual unsigned int v_GetWorkerNum() override;
450  virtual void v_SetNumWorkers(const unsigned int num) override;
451  virtual void v_SetNumWorkers() override;
452  virtual unsigned int v_GetMaxNumWorkers() override;
453  virtual void v_Wait() override;
454  virtual void v_SetChunkSize(unsigned int chnk) override;
455  virtual void v_SetSchedType(SchedType s) override;
456  virtual bool v_InThread() override;
457  virtual void v_Hold() override;
458  virtual bool v_IsInitialised() override;
459  virtual const std::string &v_GetType() const override;
460 
461 private:
462  // Do not allow assignment as m_type is const
464 
465  const std::string m_type;
466 };
467 
468 } // namespace Thread
469 
470 } // namespace Nektar
471 #endif /* NEKTAR_LIB_UTILITIES_THREAD_H_ */
#define LIB_UTILITIES_EXPORT
Provides a generic Factory class.
Definition: NekFactory.hpp:105
Base class for tasks to be sent to the ThreadManager to run.
Definition: Thread.h:97
virtual ~ThreadJob()
Base destructor.
Definition: Thread.cpp:66
void SetWorkerNum(unsigned int num)
Set the worker number (the value returned by GetWorkerNum()).
Definition: Thread.cpp:76
virtual void Run()=0
ThreadJob()
Base constructor.
Definition: Thread.cpp:58
unsigned int m_workerNum
Definition: Thread.h:123
unsigned int GetWorkerNum()
Definition: Thread.cpp:84
The interface class for the controller for worker threads and jobs.
Definition: Thread.h:163
virtual unsigned int v_GetWorkerNum()=0
virtual void v_QueueJobs(std::vector< ThreadJob * > &joblist)=0
unsigned int GetNumWorkers()
Return the number of active workers.
Definition: Thread.h:201
void QueueJobs(std::vector< ThreadJob * > &joblist)
Pass a list of tasklets to the master queue.
Definition: Thread.h:179
bool InThread()
Indicates whether the code is in a worker thread or not.
Definition: Thread.h:309
int GetPartitionFromRankThr(int pRank, unsigned int pThr)
Definition: Thread.h:355
int GetThrFromPartition(int pPartition)
Definition: Thread.h:345
virtual void v_SetSchedType(SchedType s)=0
virtual ~ThreadManager()
Destructor.
Definition: Thread.cpp:106
void SetNumWorkers(const unsigned int num)
Sets the number of active workers.
Definition: Thread.h:237
void Wait()
Waits until all queued jobs are finished.
Definition: Thread.h:280
virtual unsigned int v_GetNumWorkers()=0
void SetSchedType(SchedType s)
Sets the current scheduling algorithm.
Definition: Thread.h:301
void QueueJob(ThreadJob *job)
Pass a single job to the master queue.
Definition: Thread.h:191
virtual void v_QueueJob(ThreadJob *job)=0
virtual void v_SetChunkSize(unsigned int chnk)=0
virtual void v_SetNumWorkers()=0
virtual unsigned int v_GetMaxNumWorkers()=0
const std::string & GetType()
Returns a description of the type of threading.
Definition: Thread.h:334
int GetRankFromPartition(int pPartition)
Definition: Thread.h:350
unsigned int GetMaxNumWorkers()
Gets the maximum available number of threads.
Definition: Thread.h:254
virtual bool v_IsInitialised()
Definition: Thread.cpp:95
void Hold()
The calling thread holds until all active threads have called this method.
Definition: Thread.h:325
unsigned int GetWorkerNum()
Returns the worker number of the executing thread.
Definition: Thread.h:223
bool IsInitialised()
ThreadManager implementation.
Definition: Thread.h:340
void SetNumWorkers()
Sets the number of active workers to the maximum.
Definition: Thread.h:246
void SetChunkSize(unsigned int chnk)
Controls how many jobs are sent to each worker at a time.
Definition: Thread.h:293
virtual void v_SetNumWorkers(const unsigned int num)=0
virtual const std::string & v_GetType() const =0
~ThreadMaster()
Destructor.
Definition: Thread.cpp:123
void SetThreadingType(const std::string &p_type)
Sets what ThreadManagers will be created in CreateInstance.
Definition: Thread.cpp:147
ThreadManagerSharedPtr CreateInstance(const ThreadManagerName t, unsigned int nThr)
Creates an instance of a ThreadManager (which one is determined by a previous call to SetThreadingTyp...
Definition: Thread.cpp:183
ThreadManagerSharedPtr & GetInstance(const ThreadManagerName t)
Gets the ThreadManager associated with the name t.
Definition: Thread.cpp:167
ThreadMaster()
Constructor.
Definition: Thread.cpp:114
std::vector< ThreadManagerSharedPtr > m_threadManagers
Definition: Thread.h:402
boost::shared_mutex m_mutex
Definition: Thread.h:403
std::string m_threadingType
Definition: Thread.h:404
A default ThreadManager.
Definition: Thread.h:439
virtual bool v_IsInitialised() override
Definition: Thread.cpp:317
virtual unsigned int v_GetWorkerNum() override
Definition: Thread.cpp:240
ThreadStartupManager & operator=(const ThreadStartupManager &src)
ThreadStartupManager copy assignment operator.
Definition: Thread.cpp:333
virtual unsigned int v_GetMaxNumWorkers() override
Definition: Thread.cpp:265
ThreadStartupManager()
ThreadStartupManager default constructor.
Definition: Thread.cpp:196
virtual void v_SetSchedType(SchedType s) override
Definition: Thread.cpp:291
virtual void v_SetNumWorkers() override
Definition: Thread.cpp:257
virtual void v_QueueJob(ThreadJob *job) override
Definition: Thread.cpp:222
virtual bool v_InThread() override
Definition: Thread.cpp:301
ThreadStartupManager(const ThreadStartupManager &src)=default
virtual void v_QueueJobs(std::vector< ThreadJob * > &joblist) override
Definition: Thread.cpp:212
virtual unsigned int v_GetNumWorkers() override
Definition: Thread.cpp:232
virtual void v_Hold() override
Definition: Thread.cpp:309
virtual void v_SetChunkSize(unsigned int chnk) override
Definition: Thread.cpp:281
virtual void v_Wait() override
Definition: Thread.cpp:273
virtual const std::string & v_GetType() const override
Definition: Thread.cpp:325
SchedType
Identifies the algorithm for scheduling.
Definition: Thread.h:66
LibUtilities::NekFactory< std::string, ThreadManager, unsigned int > ThreadManagerFactory
Definition: Thread.h:74
ThreadManagerFactory & GetThreadManagerFactory()
Definition: Thread.cpp:49
std::shared_ptr< ThreadManager > ThreadManagerSharedPtr
Definition: Thread.h:71
boost::shared_lock< boost::shared_mutex > ReadLock
Definition: Thread.h:380
ThreadMaster & GetThreadMaster()
Definition: Thread.cpp:132
boost::unique_lock< boost::shared_mutex > WriteLock
Definition: Thread.h:379
The above copyright notice and this permission notice shall be included.
Definition: CoupledSolver.h:2