MythTV  master
mthreadpool.cpp
Go to the documentation of this file.
1 /* -*- Mode: c++ -*-
2  *
3  * Class MThreadPool
4  *
5  * Copyright (C) Daniel Kristjansson 2011
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
75 // C++ headers
76 #include <algorithm>
77 using namespace std;
78 
79 // Qt headers
80 #include <QCoreApplication>
81 #include <QList>
82 #include <QMap>
83 #include <QMutex>
84 #include <QMutexLocker>
85 #include <QPair>
86 #include <QRunnable>
87 #include <QSet>
88 #include <QWaitCondition>
89 #include <utility>
90 
91 // MythTV headers
92 #include "mthreadpool.h"
93 #include "mythlogging.h"
94 #include "mythtimer.h"
95 #include "logging.h"
96 #include "mthread.h"
97 #include "mythdb.h"
98 
99 typedef QPair<QRunnable*,QString> MPoolEntry;
100 typedef QList<MPoolEntry> MPoolQueue;
101 typedef QMap<int, MPoolQueue> MPoolQueues;
102 
103 class MPoolThread : public MThread
104 {
105  public:
107  MThread("PT"), m_pool(pool), m_expiry_timeout(timeout)
108  {
109  QMutexLocker locker(&s_lock);
110  setObjectName(QString("PT%1").arg(s_thread_num));
111  s_thread_num++;
112  }
113 
114  void run(void) override // MThread
115  {
116  RunProlog();
117 
118  MythTimer t;
119  t.start();
120  QMutexLocker locker(&m_lock);
121  while (true)
122  {
123  if (m_do_run && !m_runnable)
124  m_wait.wait(locker.mutex(), m_expiry_timeout+1);
125 
126  if (!m_runnable)
127  {
128  m_do_run = false;
129 
130  locker.unlock();
131  m_pool.NotifyDone(this);
132  locker.relock();
133  break;
134  }
135 
136  if (!m_runnable_name.isEmpty())
137  loggingRegisterThread(m_runnable_name);
138 
139  bool autodelete = m_runnable->autoDelete();
140  m_runnable->run();
141  if (autodelete)
142  delete m_runnable;
143  if (m_reserved)
144  m_pool.ReleaseThread();
145  m_reserved = false;
146  m_runnable = nullptr;
147 
149  loggingRegisterThread(objectName());
150 
151  GetMythDB()->GetDBManager()->PurgeIdleConnections(false);
152  qApp->processEvents();
153  qApp->sendPostedEvents(nullptr, QEvent::DeferredDelete);
154 
155  t.start();
156 
157  if (m_do_run)
158  {
159  locker.unlock();
160  m_pool.NotifyAvailable(this);
161  locker.relock();
162  }
163  else
164  {
165  locker.unlock();
166  m_pool.NotifyDone(this);
167  locker.relock();
168  break;
169  }
170  }
171 
172  RunEpilog();
173  }
174 
175  bool SetRunnable(QRunnable *runnable, QString runnableName,
176  bool reserved)
177  {
178  QMutexLocker locker(&m_lock);
179  if (m_do_run && (m_runnable == nullptr))
180  {
181  m_runnable = runnable;
182  m_runnable_name = std::move(runnableName);
183  m_reserved = reserved;
184  m_wait.wakeAll();
185  return true;
186  }
187  return false;
188  }
189 
190  void Shutdown(void)
191  {
192  QMutexLocker locker(&m_lock);
193  m_do_run = false;
194  m_wait.wakeAll();
195  }
196 
197  QMutex m_lock;
198  QWaitCondition m_wait;
201  bool m_do_run {true};
203  bool m_reserved {false};
204 
205  static QMutex s_lock;
207 };
208 QMutex MPoolThread::s_lock;
210 
212 
214 {
215  public:
216  explicit MThreadPoolPrivate(const QString &name) :
217  m_name(name)
218  {
219  }
220 
222  {
223  return max(m_max_thread_count,1) + m_reserve_thread;
224  }
225 
226  mutable QMutex m_lock;
227  QString m_name;
228  QWaitCondition m_wait;
229  bool m_running {true};
230  int m_expiry_timeout {120 * 1000};
231  int m_max_thread_count {QThread::idealThreadCount()};
232  int m_reserve_thread {0};
233 
235  QSet<MPoolThread*> m_avail_threads;
236  QSet<MPoolThread*> m_running_threads;
237  QList<MPoolThread*> m_delete_threads;
238 
239  static QMutex s_pool_lock;
241  static QList<MThreadPool*> s_all_pools;
242 };
243 
244 QMutex MThreadPoolPrivate::s_pool_lock(QMutex::Recursive);
246 QList<MThreadPool*> MThreadPoolPrivate::s_all_pools;
247 
249 
251  m_priv(new MThreadPoolPrivate(name))
252 {
253  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
254  MThreadPoolPrivate::s_all_pools.push_back(this);
255 }
256 
258 {
259  Stop();
261  {
262  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
263  MThreadPoolPrivate::s_all_pools.removeAll(this);
264  }
265  delete m_priv;
266  m_priv = nullptr;
267 }
268 
270 {
271  QMutexLocker locker(&m_priv->m_lock);
272  m_priv->m_running = false;
273  QSet<MPoolThread*>::iterator it = m_priv->m_avail_threads.begin();
274  for (; it != m_priv->m_avail_threads.end(); ++it)
275  (*it)->Shutdown();
276  it = m_priv->m_running_threads.begin();
277  for (; it != m_priv->m_running_threads.end(); ++it)
278  (*it)->Shutdown();
279  m_priv->m_wait.wakeAll();
280 }
281 
283 {
284  waitForDone();
285 
286  QMutexLocker locker(&m_priv->m_lock);
287  QSet<MPoolThread*>::iterator it = m_priv->m_avail_threads.begin();
288  for (; it != m_priv->m_avail_threads.end(); ++it)
289  {
290  m_priv->m_delete_threads.push_front(*it);
291  }
292  m_priv->m_avail_threads.clear();
293 
294  while (!m_priv->m_delete_threads.empty())
295  {
296  MPoolThread *thread = m_priv->m_delete_threads.back();
297  locker.unlock();
298 
299  thread->wait();
300 
301  locker.relock();
302  delete thread;
303  if (m_priv->m_delete_threads.back() == thread)
304  m_priv->m_delete_threads.pop_back();
305  else
306  m_priv->m_delete_threads.removeAll(thread);
307  }
308 }
309 
311 {
312  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
314  MThreadPoolPrivate::s_pool = new MThreadPool("GlobalPool");
316 }
317 
319 {
320  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
321  QList<MThreadPool*>::iterator it;
322  for (it = MThreadPoolPrivate::s_all_pools.begin();
323  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
324  {
325  (*it)->Stop();
326  }
327 }
328 
330 {
331  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
332  QList<MThreadPool*>::iterator it;
333  for (it = MThreadPoolPrivate::s_all_pools.begin();
334  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
335  {
336  (*it)->Stop();
337  }
338  for (it = MThreadPoolPrivate::s_all_pools.begin();
339  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
340  {
341  (*it)->DeletePoolThreads();
342  }
343 }
344 
345 void MThreadPool::start(QRunnable *runnable, const QString& debugName, int priority)
346 {
347  QMutexLocker locker(&m_priv->m_lock);
348  if (TryStartInternal(runnable, debugName, false))
349  return;
350 
351  MPoolQueues::iterator it = m_priv->m_run_queues.find(priority);
352  if (it != m_priv->m_run_queues.end())
353  {
354  (*it).push_back(MPoolEntry(runnable,debugName));
355  }
356  else
357  {
358  MPoolQueue list;
359  list.push_back(MPoolEntry(runnable,debugName));
360  m_priv->m_run_queues[priority] = list;
361  }
362 }
363 
365  QRunnable *runnable, QString debugName, int waitForAvailMS)
366 {
367  QMutexLocker locker(&m_priv->m_lock);
368  if (waitForAvailMS > 0 && m_priv->m_avail_threads.empty() &&
370  {
371  MythTimer t;
372  t.start();
373  int left = waitForAvailMS - t.elapsed();
374  while (left > 0 && m_priv->m_avail_threads.empty() &&
376  {
377  m_priv->m_wait.wait(locker.mutex(), left);
378  left = waitForAvailMS - t.elapsed();
379  }
380  }
381  TryStartInternal(runnable, std::move(debugName), true);
382 }
383 
384 
385 bool MThreadPool::tryStart(QRunnable *runnable, QString debugName)
386 {
387  QMutexLocker locker(&m_priv->m_lock);
388  return TryStartInternal(runnable, std::move(debugName), false);
389 }
390 
392  QRunnable *runnable, const QString& debugName, bool reserved)
393 {
394  if (!m_priv->m_running)
395  return false;
396 
397  while (!m_priv->m_delete_threads.empty())
398  {
399  m_priv->m_delete_threads.back()->wait();
400  delete m_priv->m_delete_threads.back();
401  m_priv->m_delete_threads.pop_back();
402  }
403 
404  while (m_priv->m_avail_threads.begin() != m_priv->m_avail_threads.end())
405  {
406  MPoolThread *thread = *m_priv->m_avail_threads.begin();
407  m_priv->m_avail_threads.erase(m_priv->m_avail_threads.begin());
408  m_priv->m_running_threads.insert(thread);
409  if (reserved)
411  if (thread->SetRunnable(runnable, debugName, reserved))
412  {
413  return true;
414  }
415 
416  if (reserved)
418  thread->Shutdown();
419  m_priv->m_running_threads.remove(thread);
420  m_priv->m_delete_threads.push_front(thread);
421  }
422 
423  if (reserved ||
425  {
426  if (reserved)
428  MPoolThread *thread = new MPoolThread(*this, m_priv->m_expiry_timeout);
429  m_priv->m_running_threads.insert(thread);
430  thread->SetRunnable(runnable, debugName, reserved);
431  thread->start();
432  if (thread->isRunning())
433  {
434  return true;
435  }
436 
437  // Thread failed to run, OOM?
438  // QThread will print an error, so we don't have to
439  if (reserved)
441  thread->Shutdown();
442  m_priv->m_running_threads.remove(thread);
443  m_priv->m_delete_threads.push_front(thread);
444  }
445 
446  return false;
447 }
448 
450 {
451  QMutexLocker locker(&m_priv->m_lock);
452 
453  if (!m_priv->m_running)
454  {
455  m_priv->m_running_threads.remove(thread);
456  thread->Shutdown();
457  m_priv->m_delete_threads.push_front(thread);
458  m_priv->m_wait.wakeAll();
459  return;
460  }
461 
462  MPoolQueues::iterator it = m_priv->m_run_queues.begin();
463  if (it == m_priv->m_run_queues.end())
464  {
465  m_priv->m_running_threads.remove(thread);
466  m_priv->m_avail_threads.insert(thread);
467  m_priv->m_wait.wakeAll();
468  return;
469  }
470 
471  MPoolEntry e = (*it).front();
472  if (!thread->SetRunnable(e.first, e.second, false))
473  {
474  m_priv->m_running_threads.remove(thread);
475  m_priv->m_wait.wakeAll();
476  if (!TryStartInternal(e.first, e.second, false))
477  {
478  thread->Shutdown();
479  m_priv->m_delete_threads.push_front(thread);
480  return;
481  }
482  thread->Shutdown();
483  m_priv->m_delete_threads.push_front(thread);
484  }
485 
486  (*it).pop_front();
487  if ((*it).empty())
488  m_priv->m_run_queues.erase(it);
489 }
490 
492 {
493  QMutexLocker locker(&m_priv->m_lock);
494  m_priv->m_running_threads.remove(thread);
495  m_priv->m_avail_threads.remove(thread);
496  if (!m_priv->m_delete_threads.contains(thread))
497  m_priv->m_delete_threads.push_front(thread);
498  m_priv->m_wait.wakeAll();
499 }
500 
502 {
503  QMutexLocker locker(&m_priv->m_lock);
504  return m_priv->m_expiry_timeout;
505 }
506 
507 void MThreadPool::setExpiryTimeout(int expiryTimeout)
508 {
509  QMutexLocker locker(&m_priv->m_lock);
511 }
512 
514 {
515  QMutexLocker locker(&m_priv->m_lock);
516  return m_priv->m_max_thread_count;
517 }
518 
519 void MThreadPool::setMaxThreadCount(int maxThreadCount)
520 {
521  QMutexLocker locker(&m_priv->m_lock);
523 }
524 
526 {
527  QMutexLocker locker(&m_priv->m_lock);
528  return m_priv->m_avail_threads.size() + m_priv->m_running_threads.size();
529 }
530 
531 /*
532 void MThreadPool::reserveThread(void)
533 {
534  QMutexLocker locker(&m_priv->m_lock);
535  m_priv->m_reserve_thread++;
536 }
537 
538 void MThreadPool::releaseThread(void)
539 {
540  QMutexLocker locker(&m_priv->m_lock);
541  if (m_priv->m_reserve_thread > 0)
542  m_priv->m_reserve_thread--;
543 }
544 */
545 
547 {
548  QMutexLocker locker(&m_priv->m_lock);
549  if (m_priv->m_reserve_thread > 0)
551 }
552 
553 #if 0
554 static void print_set(QString title, QSet<MPoolThread*> set)
555 {
556  LOG(VB_GENERAL, LOG_INFO, title);
557  QSet<MPoolThread*>::iterator it = set.begin();
558  for (; it != set.end(); ++it)
559  {
560  LOG(VB_GENERAL, LOG_INFO, QString(" : 0x%1")
561  .arg((quint64)(*it),0,16));
562  }
563  LOG(VB_GENERAL, LOG_INFO, "");
564 }
565 #endif
566 
568 {
569  QMutexLocker locker(&m_priv->m_lock);
570  while (true)
571  {
572  while (!m_priv->m_delete_threads.empty())
573  {
574  m_priv->m_delete_threads.back()->wait();
575  delete m_priv->m_delete_threads.back();
576  m_priv->m_delete_threads.pop_back();
577  }
578 
579  if (m_priv->m_running && !m_priv->m_run_queues.empty())
580  {
581  m_priv->m_wait.wait(locker.mutex());
582  continue;
583  }
584 
585  QSet<MPoolThread*> working = m_priv->m_running_threads;
586  working = working.subtract(m_priv->m_avail_threads);
587  if (working.empty())
588  break;
589  m_priv->m_wait.wait(locker.mutex());
590  }
591 }
592 
593 /* vim: set expandtab tabstop=4 shiftwidth=4: */
MThreadPoolPrivate * m_priv
Definition: mthreadpool.h:56
void setMaxThreadCount(int maxThreadCount)
int maxThreadCount(void) const
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:294
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:46
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
static QList< MThreadPool * > s_all_pools
void startReserved(QRunnable *runnable, QString debugName, int waitForAvailMS=0)
bool SetRunnable(QRunnable *runnable, QString runnableName, bool reserved)
void Stop(void)
void NotifyAvailable(MPoolThread *)
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:311
QMap< int, MPoolQueue > MPoolQueues
int GetRealMaxThread(void)
MPoolThread(MThreadPool &pool, int timeout)
QPair< QRunnable *, QString > MPoolEntry
Definition: mthreadpool.cpp:99
QList< MPoolEntry > MPoolQueue
static void ShutdownAllPools(void)
void ReleaseThread(void)
static QMutex s_lock
void NotifyDone(MPoolThread *)
unsigned int uint
Definition: compat.h:140
void loggingRegisterThread(const QString &name)
Register the current thread with the given name.
Definition: logging.cpp:768
void waitForDone(void)
int m_expiry_timeout
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
QWaitCondition m_wait
void DeletePoolThreads(void)
QString m_runnable_name
unsigned char t
Definition: ParseText.cpp:329
static QMutex s_pool_lock
bool isRunning(void) const
Definition: mthread.cpp:274
MThreadPool(const QString &name)
bool tryStart(QRunnable *runnable, QString debugName)
void loggingDeregisterThread(void)
Deregister the current thread's name.
Definition: logging.cpp:787
const char * name
Definition: ParseText.cpp:328
int activeThreadCount(void) const
bool TryStartInternal(QRunnable *, const QString &, bool)
void setExpiryTimeout(int expiryTimeout)
void Shutdown(void)
QWaitCondition m_wait
static MThreadPool * globalInstance(void)
MThreadPoolPrivate(const QString &name)
int expiryTimeout(void) const
static void StopAllPools(void)
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
void start(QRunnable *runnable, const QString &debugName, int priority=0)
static uint s_thread_num
MThreadPool & m_pool
MPoolQueues m_run_queues
static MThreadPool * s_pool
MythDB * GetMythDB(void)
Definition: mythdb.cpp:46
QSet< MPoolThread * > m_avail_threads
QList< MPoolThread * > m_delete_threads
QSet< MPoolThread * > m_running_threads
friend class MPoolThread
Definition: mthreadpool.h:19