MythTV  master
mthreadpool.cpp
Go to the documentation of this file.
1 /* -*- Mode: c++ -*-
2  *
3  * Class MThreadPool
4  *
5  * Copyright (C) Daniel Kristjansson 2011
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
75 // C++ headers
76 #include <algorithm>
77 
78 // Qt headers
79 #include <QCoreApplication>
80 #include <QList>
81 #include <QMap>
82 #include <QMutex>
83 #include <QMutexLocker>
84 #include <QPair>
85 #include <QRunnable>
86 #if QT_VERSION >= QT_VERSION_CHECK(5,14,0)
87 #include <QRecursiveMutex>
88 #endif
89 #include <QSet>
90 #include <QWaitCondition>
91 #include <utility>
92 
93 // MythTV headers
94 #include "mthreadpool.h"
95 #include "mythlogging.h"
96 #include "mythtimer.h"
97 #include "logging.h"
98 #include "mthread.h"
99 #include "mythdb.h"
100 
101 using MPoolEntry = QPair<QRunnable*,QString>;
102 using MPoolQueue = QList<MPoolEntry>;
103 using MPoolQueues = QMap<int, MPoolQueue>;
104 
105 class MPoolThread : public MThread
106 {
107  public:
108  MPoolThread(MThreadPool &pool, std::chrono::milliseconds timeout) :
109  MThread("PT"), m_pool(pool), m_expiryTimeout(timeout)
110  {
111  QMutexLocker locker(&s_lock);
112  setObjectName(QString("PT%1").arg(s_thread_num));
113  s_thread_num++;
114  }
115 
116  void run(void) override // MThread
117  {
118  RunProlog();
119 
120  MythTimer t;
121  t.start();
122  QMutexLocker locker(&m_lock);
123  while (true)
124  {
125  if (m_doRun && !m_runnable)
126  m_wait.wait(locker.mutex(), (m_expiryTimeout+1ms).count());
127 
128  if (!m_runnable)
129  {
130  m_doRun = false;
131 
132  locker.unlock();
133  m_pool.NotifyDone(this);
134  locker.relock();
135  break;
136  }
137 
138  if (!m_runnableName.isEmpty())
140 
141  bool autodelete = m_runnable->autoDelete();
142  m_runnable->run();
143  if (autodelete)
144  delete m_runnable;
145  if (m_reserved)
147  m_reserved = false;
148  m_runnable = nullptr;
149 
152 
153  GetMythDB()->GetDBManager()->PurgeIdleConnections(false);
154  qApp->processEvents();
155  qApp->sendPostedEvents(nullptr, QEvent::DeferredDelete);
156 
157  t.start();
158 
159  if (m_doRun)
160  {
161  locker.unlock();
162  m_pool.NotifyAvailable(this);
163  locker.relock();
164  }
165  else
166  {
167  locker.unlock();
168  m_pool.NotifyDone(this);
169  locker.relock();
170  break;
171  }
172  }
173 
174  RunEpilog();
175  }
176 
177  bool SetRunnable(QRunnable *runnable, QString runnableName,
178  bool reserved)
179  {
180  QMutexLocker locker(&m_lock);
181  if (m_doRun && (m_runnable == nullptr))
182  {
183  m_runnable = runnable;
184  m_runnableName = std::move(runnableName);
185  m_reserved = reserved;
186  m_wait.wakeAll();
187  return true;
188  }
189  return false;
190  }
191 
192  void Shutdown(void)
193  {
194  QMutexLocker locker(&m_lock);
195  m_doRun = false;
196  m_wait.wakeAll();
197  }
198 
199  QMutex m_lock;
200  QWaitCondition m_wait;
202  std::chrono::milliseconds m_expiryTimeout;
203  bool m_doRun {true};
204  QString m_runnableName;
205  bool m_reserved {false};
206 
207  static QMutex s_lock;
209 };
210 QMutex MPoolThread::s_lock;
212 
214 
216 {
217  public:
218  explicit MThreadPoolPrivate(QString name) :
219  m_name(std::move(name)) {}
220 
221  int GetRealMaxThread(void) const
222  {
223  return std::max(m_maxThreadCount,1) + m_reserveThread;
224  }
225 
226  mutable QMutex m_lock;
227  QString m_name;
228  QWaitCondition m_wait;
229  bool m_running {true};
230  std::chrono::milliseconds m_expiryTimeout {2min};
231  int m_maxThreadCount {QThread::idealThreadCount()};
233 
235  QSet<MPoolThread*> m_availThreads;
236  QSet<MPoolThread*> m_runningThreads;
237  QList<MPoolThread*> m_deleteThreads;
238 
239 #if QT_VERSION < QT_VERSION_CHECK(5,14,0)
240  static QMutex s_pool_lock;
241 #else
242  static QRecursiveMutex s_pool_lock;
243 #endif
245  static QList<MThreadPool*> s_all_pools;
246 };
247 
248 #if QT_VERSION < QT_VERSION_CHECK(5,14,0)
249 QMutex MThreadPoolPrivate::s_pool_lock(QMutex::Recursive);
250 #else
251 QRecursiveMutex MThreadPoolPrivate::s_pool_lock;
252 #endif
254 QList<MThreadPool*> MThreadPoolPrivate::s_all_pools;
255 
257 
258 MThreadPool::MThreadPool(const QString &name) :
259  m_priv(new MThreadPoolPrivate(name))
260 {
261  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
262  MThreadPoolPrivate::s_all_pools.push_back(this);
263 }
264 
266 {
267  Stop();
269  {
270  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
271  MThreadPoolPrivate::s_all_pools.removeAll(this);
272  }
273  delete m_priv;
274  m_priv = nullptr;
275 }
276 
278 {
279  QMutexLocker locker(&m_priv->m_lock);
280  m_priv->m_running = false;
281  QSet<MPoolThread*>::iterator it = m_priv->m_availThreads.begin();
282  for (; it != m_priv->m_availThreads.end(); ++it)
283  (*it)->Shutdown();
284  it = m_priv->m_runningThreads.begin();
285  for (; it != m_priv->m_runningThreads.end(); ++it)
286  (*it)->Shutdown();
287  m_priv->m_wait.wakeAll();
288 }
289 
291 {
292  waitForDone();
293 
294  QMutexLocker locker(&m_priv->m_lock);
295  for (auto *thread : qAsConst(m_priv->m_availThreads))
296  {
297  m_priv->m_deleteThreads.push_front(thread);
298  }
299  m_priv->m_availThreads.clear();
300 
301  while (!m_priv->m_deleteThreads.empty())
302  {
303  MPoolThread *thread = m_priv->m_deleteThreads.back();
304  locker.unlock();
305 
306  thread->wait();
307 
308  locker.relock();
309  delete thread;
310  if (m_priv->m_deleteThreads.back() == thread)
311  m_priv->m_deleteThreads.pop_back();
312  else
313  m_priv->m_deleteThreads.removeAll(thread);
314  }
315 }
316 
318 {
319  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
321  MThreadPoolPrivate::s_pool = new MThreadPool("GlobalPool");
323 }
324 
326 {
327  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
328  QList<MThreadPool*>::iterator it;
329  for (it = MThreadPoolPrivate::s_all_pools.begin();
330  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
331  {
332  (*it)->Stop();
333  }
334 }
335 
337 {
338  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
339  QList<MThreadPool*>::iterator it;
340  for (it = MThreadPoolPrivate::s_all_pools.begin();
341  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
342  {
343  (*it)->Stop();
344  }
345  for (it = MThreadPoolPrivate::s_all_pools.begin();
346  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
347  {
348  (*it)->DeletePoolThreads();
349  }
350 }
351 
352 void MThreadPool::start(QRunnable *runnable, const QString& debugName, int priority)
353 {
354  QMutexLocker locker(&m_priv->m_lock);
355  if (TryStartInternal(runnable, debugName, false))
356  return;
357 
358  MPoolQueues::iterator it = m_priv->m_runQueues.find(priority);
359  if (it != m_priv->m_runQueues.end())
360  {
361  (*it).push_back(MPoolEntry(runnable,debugName));
362  }
363  else
364  {
365  MPoolQueue list;
366  list.push_back(MPoolEntry(runnable,debugName));
367  m_priv->m_runQueues[priority] = list;
368  }
369 }
370 
372  QRunnable *runnable, const QString& debugName,
373  std::chrono::milliseconds waitForAvailMS)
374 {
375  QMutexLocker locker(&m_priv->m_lock);
376  if (waitForAvailMS > 0ms && m_priv->m_availThreads.empty() &&
378  {
379  MythTimer t;
380  t.start();
381  auto left = waitForAvailMS - t.elapsed();
382  while (left > 0ms && m_priv->m_availThreads.empty() &&
384  {
385  m_priv->m_wait.wait(locker.mutex(), left.count());
386  left = waitForAvailMS - t.elapsed();
387  }
388  }
389  TryStartInternal(runnable, debugName, true);
390 }
391 
392 
393 bool MThreadPool::tryStart(QRunnable *runnable, const QString& debugName)
394 {
395  QMutexLocker locker(&m_priv->m_lock);
396  return TryStartInternal(runnable, debugName, false);
397 }
398 
400  QRunnable *runnable, const QString& debugName, bool reserved)
401 {
402  if (!m_priv->m_running)
403  return false;
404 
405  while (!m_priv->m_deleteThreads.empty())
406  {
407  m_priv->m_deleteThreads.back()->wait();
408  delete m_priv->m_deleteThreads.back();
409  m_priv->m_deleteThreads.pop_back();
410  }
411 
412  while (m_priv->m_availThreads.begin() != m_priv->m_availThreads.end())
413  {
414  MPoolThread *thread = *m_priv->m_availThreads.begin();
415  m_priv->m_availThreads.erase(m_priv->m_availThreads.begin());
416  m_priv->m_runningThreads.insert(thread);
417  if (reserved)
419  if (thread->SetRunnable(runnable, debugName, reserved))
420  {
421  return true;
422  }
423 
424  if (reserved)
426  thread->Shutdown();
427  m_priv->m_runningThreads.remove(thread);
428  m_priv->m_deleteThreads.push_front(thread);
429  }
430 
431  if (reserved ||
433  {
434  if (reserved)
436  auto *thread = new MPoolThread(*this, m_priv->m_expiryTimeout);
437  m_priv->m_runningThreads.insert(thread);
438  thread->SetRunnable(runnable, debugName, reserved);
439  thread->start();
440  if (thread->isRunning())
441  {
442  return true;
443  }
444 
445  // Thread failed to run, OOM?
446  // QThread will print an error, so we don't have to
447  if (reserved)
449  thread->Shutdown();
450  m_priv->m_runningThreads.remove(thread);
451  m_priv->m_deleteThreads.push_front(thread);
452  }
453 
454  return false;
455 }
456 
458 {
459  QMutexLocker locker(&m_priv->m_lock);
460 
461  if (!m_priv->m_running)
462  {
463  m_priv->m_runningThreads.remove(thread);
464  thread->Shutdown();
465  m_priv->m_deleteThreads.push_front(thread);
466  m_priv->m_wait.wakeAll();
467  return;
468  }
469 
470  MPoolQueues::iterator it = m_priv->m_runQueues.begin();
471  if (it == m_priv->m_runQueues.end())
472  {
473  m_priv->m_runningThreads.remove(thread);
474  m_priv->m_availThreads.insert(thread);
475  m_priv->m_wait.wakeAll();
476  return;
477  }
478 
479  MPoolEntry e = (*it).front();
480  if (!thread->SetRunnable(e.first, e.second, false))
481  {
482  m_priv->m_runningThreads.remove(thread);
483  m_priv->m_wait.wakeAll();
484  if (!TryStartInternal(e.first, e.second, false))
485  {
486  thread->Shutdown();
487  m_priv->m_deleteThreads.push_front(thread);
488  return;
489  }
490  thread->Shutdown();
491  m_priv->m_deleteThreads.push_front(thread);
492  }
493 
494  (*it).pop_front();
495  if ((*it).empty())
496  m_priv->m_runQueues.erase(it);
497 }
498 
500 {
501  QMutexLocker locker(&m_priv->m_lock);
502  m_priv->m_runningThreads.remove(thread);
503  m_priv->m_availThreads.remove(thread);
504  if (!m_priv->m_deleteThreads.contains(thread))
505  m_priv->m_deleteThreads.push_front(thread);
506  m_priv->m_wait.wakeAll();
507 }
508 
509 std::chrono::milliseconds MThreadPool::expiryTimeout(void) const
510 {
511  QMutexLocker locker(&m_priv->m_lock);
512  return m_priv->m_expiryTimeout;
513 }
514 
515 void MThreadPool::setExpiryTimeout(std::chrono::milliseconds expiryTimeout)
516 {
517  QMutexLocker locker(&m_priv->m_lock);
519 }
520 
522 {
523  QMutexLocker locker(&m_priv->m_lock);
524  return m_priv->m_maxThreadCount;
525 }
526 
527 void MThreadPool::setMaxThreadCount(int maxThreadCount)
528 {
529  QMutexLocker locker(&m_priv->m_lock);
531 }
532 
534 {
535  QMutexLocker locker(&m_priv->m_lock);
536  return m_priv->m_availThreads.size() + m_priv->m_runningThreads.size();
537 }
538 
539 /*
540 void MThreadPool::reserveThread(void)
541 {
542  QMutexLocker locker(&m_priv->m_lock);
543  m_priv->m_reserveThread++;
544 }
545 
546 void MThreadPool::releaseThread(void)
547 {
548  QMutexLocker locker(&m_priv->m_lock);
549  if (m_priv->m_reserveThread > 0)
550  m_priv->m_reserveThread--;
551 }
552 */
553 
555 {
556  QMutexLocker locker(&m_priv->m_lock);
557  if (m_priv->m_reserveThread > 0)
559 }
560 
561 #if 0
562 static void print_set(QString title, QSet<MPoolThread*> set)
563 {
564  LOG(VB_GENERAL, LOG_INFO, title);
565  for (auto item : qAsConst(set))
566  {
567  LOG(VB_GENERAL, LOG_INFO, QString(" : 0x%1")
568  .arg((quint64)item,0,16));
569  }
570  LOG(VB_GENERAL, LOG_INFO, "");
571 }
572 #endif
573 
575 {
576  QMutexLocker locker(&m_priv->m_lock);
577  while (true)
578  {
579  while (!m_priv->m_deleteThreads.empty())
580  {
581  m_priv->m_deleteThreads.back()->wait();
582  delete m_priv->m_deleteThreads.back();
583  m_priv->m_deleteThreads.pop_back();
584  }
585 
586  if (m_priv->m_running && !m_priv->m_runQueues.empty())
587  {
588  m_priv->m_wait.wait(locker.mutex());
589  continue;
590  }
591 
592  QSet<MPoolThread*> working = m_priv->m_runningThreads;
593  working = working.subtract(m_priv->m_availThreads);
594  if (working.empty())
595  break;
596  m_priv->m_wait.wait(locker.mutex());
597  }
598 }
599 
600 /* vim: set expandtab tabstop=4 shiftwidth=4: */
loggingDeregisterThread
void loggingDeregisterThread(void)
Deregister the current thread's name.
Definition: logging.cpp:704
hardwareprofile.smolt.timeout
float timeout
Definition: smolt.py:103
MPoolThread::m_runnableName
QString m_runnableName
Definition: mthreadpool.cpp:204
MThreadPool::tryStart
bool tryStart(QRunnable *runnable, const QString &debugName)
Definition: mthreadpool.cpp:393
mythdb.h
MythTimer
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
MPoolThread::m_reserved
bool m_reserved
Definition: mthreadpool.cpp:205
MPoolThread::run
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: mthreadpool.cpp:116
MThread::wait
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
MThreadPoolPrivate::m_expiryTimeout
std::chrono::milliseconds m_expiryTimeout
Definition: mthreadpool.cpp:230
MThreadPool::maxThreadCount
int maxThreadCount(void) const
Definition: mthreadpool.cpp:521
MPoolThread::m_wait
QWaitCondition m_wait
Definition: mthreadpool.cpp:200
MThread::setObjectName
void setObjectName(const QString &name)
Definition: mthread.cpp:238
MThreadPoolPrivate::MThreadPoolPrivate
MThreadPoolPrivate(QString name)
Definition: mthreadpool.cpp:218
MThreadPoolPrivate::m_running
bool m_running
Definition: mthreadpool.cpp:229
MThreadPool::MPoolThread
friend class MPoolThread
Definition: mthreadpool.h:20
MThreadPoolPrivate::m_lock
QMutex m_lock
Definition: mthreadpool.cpp:226
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
MThread::RunProlog
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
MThreadPoolPrivate
Definition: mthreadpool.cpp:215
MPoolThread::SetRunnable
bool SetRunnable(QRunnable *runnable, QString runnableName, bool reserved)
Definition: mthreadpool.cpp:177
GetMythDB
MythDB * GetMythDB(void)
Definition: mythdb.cpp:50
MThreadPool::Stop
void Stop(void)
Definition: mthreadpool.cpp:277
MThreadPool::startReserved
void startReserved(QRunnable *runnable, const QString &debugName, std::chrono::milliseconds waitForAvailMS=0ms)
Definition: mthreadpool.cpp:371
MPoolThread::m_lock
QMutex m_lock
Definition: mthreadpool.cpp:199
MThreadPool::DeletePoolThreads
void DeletePoolThreads(void)
Definition: mthreadpool.cpp:290
MThreadPool::activeThreadCount
int activeThreadCount(void) const
Definition: mthreadpool.cpp:533
MThreadPool::TryStartInternal
bool TryStartInternal(QRunnable *runnable, const QString &debugName, bool reserved)
Definition: mthreadpool.cpp:399
MThreadPool::ShutdownAllPools
static void ShutdownAllPools(void)
Definition: mthreadpool.cpp:336
MThreadPool::ReleaseThread
void ReleaseThread(void)
Definition: mthreadpool.cpp:554
mythlogging.h
MThreadPoolPrivate::m_maxThreadCount
int m_maxThreadCount
Definition: mthreadpool.cpp:231
MThreadPool::StopAllPools
static void StopAllPools(void)
Definition: mthreadpool.cpp:325
MThreadPool::waitForDone
void waitForDone(void)
Definition: mthreadpool.cpp:574
loggingRegisterThread
void loggingRegisterThread(const QString &name)
Register the current thread with the given name.
Definition: logging.cpp:685
hardwareprofile.i18n.t
t
Definition: i18n.py:36
MPoolThread::Shutdown
void Shutdown(void)
Definition: mthreadpool.cpp:192
MPoolThread::s_thread_num
static uint s_thread_num
Definition: mthreadpool.cpp:208
MThreadPoolPrivate::GetRealMaxThread
int GetRealMaxThread(void) const
Definition: mthreadpool.cpp:221
MThread::RunEpilog
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
MPoolQueues
QMap< int, MPoolQueue > MPoolQueues
Definition: mthreadpool.cpp:103
MThreadPoolPrivate::m_name
QString m_name
Definition: mthreadpool.cpp:227
uint
unsigned int uint
Definition: compat.h:81
MThreadPoolPrivate::s_all_pools
static QList< MThreadPool * > s_all_pools
Definition: mthreadpool.cpp:245
MPoolQueue
QList< MPoolEntry > MPoolQueue
Definition: mthreadpool.cpp:102
MThreadPool::setExpiryTimeout
void setExpiryTimeout(std::chrono::milliseconds expiryTimeout)
Definition: mthreadpool.cpp:515
MThreadPoolPrivate::s_pool_lock
static QRecursiveMutex s_pool_lock
Definition: mthreadpool.cpp:242
MThreadPoolPrivate::m_deleteThreads
QList< MPoolThread * > m_deleteThreads
Definition: mthreadpool.cpp:237
mthreadpool.h
MPoolThread::m_expiryTimeout
std::chrono::milliseconds m_expiryTimeout
Definition: mthreadpool.cpp:202
MThreadPool
Definition: mthreadpool.h:18
MPoolEntry
QPair< QRunnable *, QString > MPoolEntry
Definition: mthreadpool.cpp:101
MThreadPool::NotifyDone
void NotifyDone(MPoolThread *thread)
Definition: mthreadpool.cpp:499
std
Definition: mythchrono.h:23
MThreadPool::m_priv
MThreadPoolPrivate * m_priv
Definition: mthreadpool.h:57
logging.h
MThreadPool::MThreadPool
MThreadPool(const QString &name)
Definition: mthreadpool.cpp:258
MThreadPoolPrivate::s_pool
static MThreadPool * s_pool
Definition: mthreadpool.cpp:244
MThread
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:48
mthread.h
MPoolThread::m_doRun
bool m_doRun
Definition: mthreadpool.cpp:203
MThreadPoolPrivate::m_wait
QWaitCondition m_wait
Definition: mthreadpool.cpp:228
MThreadPool::expiryTimeout
std::chrono::milliseconds expiryTimeout(void) const
Definition: mthreadpool.cpp:509
MThread::objectName
QString objectName(void) const
Definition: mthread.cpp:243
mythtimer.h
MThreadPool::setMaxThreadCount
void setMaxThreadCount(int maxThreadCount)
Definition: mthreadpool.cpp:527
MThreadPool::NotifyAvailable
void NotifyAvailable(MPoolThread *thread)
Definition: mthreadpool.cpp:457
MPoolThread
Definition: mthreadpool.cpp:105
MThreadPoolPrivate::m_availThreads
QSet< MPoolThread * > m_availThreads
Definition: mthreadpool.cpp:235
MPoolThread::MPoolThread
MPoolThread(MThreadPool &pool, std::chrono::milliseconds timeout)
Definition: mthreadpool.cpp:108
MThread::m_runnable
QRunnable * m_runnable
Definition: mthread.h:136
MPoolThread::m_pool
MThreadPool & m_pool
Definition: mthreadpool.cpp:201
MThreadPool::globalInstance
static MThreadPool * globalInstance(void)
Definition: mthreadpool.cpp:317
MThreadPoolPrivate::m_runningThreads
QSet< MPoolThread * > m_runningThreads
Definition: mthreadpool.cpp:236
MThreadPoolPrivate::m_reserveThread
int m_reserveThread
Definition: mthreadpool.cpp:232
MThreadPool::start
void start(QRunnable *runnable, const QString &debugName, int priority=0)
Definition: mthreadpool.cpp:352
MPoolThread::s_lock
static QMutex s_lock
Definition: mthreadpool.cpp:207
MThreadPool::~MThreadPool
~MThreadPool()
Definition: mthreadpool.cpp:265
MThreadPoolPrivate::m_runQueues
MPoolQueues m_runQueues
Definition: mthreadpool.cpp:234