MythTV  master
mthreadpool.cpp
Go to the documentation of this file.
1 /* -*- Mode: c++ -*-
2  *
3  * Class MThreadPool
4  *
5  * Copyright (C) Daniel Kristjansson 2011
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
75 // C++ headers
76 #include <algorithm>
77 
78 // Qt headers
79 #include <QCoreApplication>
80 #include <QList>
81 #include <QMap>
82 #include <QMutex>
83 #include <QMutexLocker>
84 #include <QPair>
85 #include <QRunnable>
86 #if QT_VERSION >= QT_VERSION_CHECK(5,14,0)
87 #include <QRecursiveMutex>
88 #endif
89 #include <QSet>
90 #include <QWaitCondition>
91 #include <utility>
92 
93 // MythTV headers
94 #include "mthreadpool.h"
95 #include "mythlogging.h"
96 #include "mythtimer.h"
97 #include "logging.h"
98 #include "mthread.h"
99 #include "mythdb.h"
100 
101 using MPoolEntry = QPair<QRunnable*,QString>;
102 using MPoolQueue = QList<MPoolEntry>;
103 using MPoolQueues = QMap<int, MPoolQueue>;
104 
105 class MPoolThread : public MThread
106 {
107  public:
108  MPoolThread(MThreadPool &pool, std::chrono::milliseconds timeout) :
109  MThread("PT"), m_pool(pool), m_expiryTimeout(timeout)
110  {
111  QMutexLocker locker(&s_lock);
112  setObjectName(QString("PT%1").arg(s_thread_num));
113  s_thread_num++;
114  }
115 
116  void run(void) override // MThread
117  {
118  RunProlog();
119 
120  MythTimer t;
121  t.start();
122  QMutexLocker locker(&m_lock);
123  while (true)
124  {
125  if (m_doRun && !m_runnable)
126  m_wait.wait(locker.mutex(), (m_expiryTimeout+1ms).count());
127 
128  if (!m_runnable)
129  {
130  m_doRun = false;
131 
132  locker.unlock();
133  m_pool.NotifyDone(this);
134  locker.relock();
135  break;
136  }
137 
138  if (!m_runnableName.isEmpty())
140 
141  bool autodelete = m_runnable->autoDelete();
142  m_runnable->run();
143  if (autodelete)
144  delete m_runnable;
145  if (m_reserved)
147  m_reserved = false;
148  m_runnable = nullptr;
149 
152 
153  GetMythDB()->GetDBManager()->PurgeIdleConnections(false);
154  qApp->processEvents();
155  qApp->sendPostedEvents(nullptr, QEvent::DeferredDelete);
156 
157  t.start();
158 
159  if (m_doRun)
160  {
161  locker.unlock();
162  m_pool.NotifyAvailable(this);
163  locker.relock();
164  }
165  else
166  {
167  locker.unlock();
168  m_pool.NotifyDone(this);
169  locker.relock();
170  break;
171  }
172  }
173 
174  RunEpilog();
175  }
176 
177  bool SetRunnable(QRunnable *runnable, QString runnableName,
178  bool reserved)
179  {
180  QMutexLocker locker(&m_lock);
181  if (m_doRun && (m_runnable == nullptr))
182  {
183  m_runnable = runnable;
184  m_runnableName = std::move(runnableName);
185  m_reserved = reserved;
186  m_wait.wakeAll();
187  return true;
188  }
189  return false;
190  }
191 
192  void Shutdown(void)
193  {
194  QMutexLocker locker(&m_lock);
195  m_doRun = false;
196  m_wait.wakeAll();
197  }
198 
199  QMutex m_lock;
200  QWaitCondition m_wait;
202  std::chrono::milliseconds m_expiryTimeout;
203  bool m_doRun {true};
204  QString m_runnableName;
205  bool m_reserved {false};
206 
207  static QMutex s_lock;
209 };
210 QMutex MPoolThread::s_lock;
212 
214 
216 {
217  public:
218  explicit MThreadPoolPrivate(QString name) :
219  m_name(std::move(name)) {}
220 
221  int GetRealMaxThread(void) const
222  {
223  return std::max(m_maxThreadCount,1) + m_reserveThread;
224  }
225 
226  mutable QMutex m_lock;
227  QString m_name;
228  QWaitCondition m_wait;
229  bool m_running {true};
230  std::chrono::milliseconds m_expiryTimeout {2min};
231  int m_maxThreadCount {QThread::idealThreadCount()};
233 
235  QSet<MPoolThread*> m_availThreads;
236  QSet<MPoolThread*> m_runningThreads;
237  QList<MPoolThread*> m_deleteThreads;
238 
239 #if QT_VERSION < QT_VERSION_CHECK(5,14,0)
240  static QMutex s_pool_lock;
241 #else
242  static QRecursiveMutex s_pool_lock;
243 #endif
245  static QList<MThreadPool*> s_all_pools;
246 };
247 
248 #if QT_VERSION < QT_VERSION_CHECK(5,14,0)
249 QMutex MThreadPoolPrivate::s_pool_lock(QMutex::Recursive);
250 #else
251 QRecursiveMutex MThreadPoolPrivate::s_pool_lock;
252 #endif
254 QList<MThreadPool*> MThreadPoolPrivate::s_all_pools;
255 
257 
258 MThreadPool::MThreadPool(const QString &name) :
259  m_priv(new MThreadPoolPrivate(name))
260 {
261  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
262  MThreadPoolPrivate::s_all_pools.push_back(this);
263 }
264 
266 {
267  Stop();
269  {
270  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
271  MThreadPoolPrivate::s_all_pools.removeAll(this);
272  }
273  delete m_priv;
274  m_priv = nullptr;
275 }
276 
278 {
279  QMutexLocker locker(&m_priv->m_lock);
280  m_priv->m_running = false;
281  QSet<MPoolThread*>::iterator it = m_priv->m_availThreads.begin();
282  for (; it != m_priv->m_availThreads.end(); ++it)
283  (*it)->Shutdown();
284  it = m_priv->m_runningThreads.begin();
285  for (; it != m_priv->m_runningThreads.end(); ++it)
286  (*it)->Shutdown();
287  m_priv->m_wait.wakeAll();
288 }
289 
291 {
292  waitForDone();
293 
294  QMutexLocker locker(&m_priv->m_lock);
295  for (auto *thread : qAsConst(m_priv->m_availThreads))
296  {
297  m_priv->m_deleteThreads.push_front(thread);
298  }
299  m_priv->m_availThreads.clear();
300 
301  while (!m_priv->m_deleteThreads.empty())
302  {
303  MPoolThread *thread = m_priv->m_deleteThreads.back();
304  locker.unlock();
305 
306  thread->wait();
307 
308  locker.relock();
309  delete thread;
310  if (m_priv->m_deleteThreads.back() == thread)
311  m_priv->m_deleteThreads.pop_back();
312  else
313  m_priv->m_deleteThreads.removeAll(thread);
314  }
315 }
316 
318 {
319  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
321  MThreadPoolPrivate::s_pool = new MThreadPool("GlobalPool");
323 }
324 
326 {
327  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
328  QList<MThreadPool*>::iterator it;
329  for (it = MThreadPoolPrivate::s_all_pools.begin();
330  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
331  {
332  (*it)->Stop();
333  }
334 }
335 
337 {
338  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
339  QList<MThreadPool*>::iterator it;
340  for (it = MThreadPoolPrivate::s_all_pools.begin();
341  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
342  {
343  (*it)->Stop();
344  }
345  for (it = MThreadPoolPrivate::s_all_pools.begin();
346  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
347  {
348  (*it)->DeletePoolThreads();
349  }
350 }
351 
352 void MThreadPool::start(QRunnable *runnable, const QString& debugName, int priority)
353 {
354  QMutexLocker locker(&m_priv->m_lock);
355  if (TryStartInternal(runnable, debugName, false))
356  return;
357 
358  MPoolQueues::iterator it = m_priv->m_runQueues.find(priority);
359  if (it != m_priv->m_runQueues.end())
360  {
361  (*it).push_back(MPoolEntry(runnable,debugName));
362  }
363  else
364  {
365  MPoolQueue list;
366  list.push_back(MPoolEntry(runnable,debugName));
367  m_priv->m_runQueues[priority] = list;
368  }
369 }
370 
372  QRunnable *runnable, const QString& debugName,
373  std::chrono::milliseconds waitForAvailMS)
374 {
375  QMutexLocker locker(&m_priv->m_lock);
376  if (waitForAvailMS > 0ms && m_priv->m_availThreads.empty() &&
378  {
379  MythTimer t;
380  t.start();
381  auto left = waitForAvailMS - t.elapsed();
382  while (left > 0ms && m_priv->m_availThreads.empty() &&
384  {
385  m_priv->m_wait.wait(locker.mutex(), left.count());
386  left = waitForAvailMS - t.elapsed();
387  }
388  }
389  TryStartInternal(runnable, debugName, true);
390 }
391 
392 
393 bool MThreadPool::tryStart(QRunnable *runnable, const QString& debugName)
394 {
395  QMutexLocker locker(&m_priv->m_lock);
396  return TryStartInternal(runnable, debugName, false);
397 }
398 
400  QRunnable *runnable, const QString& debugName, bool reserved)
401 {
402  if (!m_priv->m_running)
403  return false;
404 
405  while (!m_priv->m_deleteThreads.empty())
406  {
407  m_priv->m_deleteThreads.back()->wait();
408  delete m_priv->m_deleteThreads.back();
409  m_priv->m_deleteThreads.pop_back();
410  }
411 
412  for (auto iter = m_priv->m_availThreads.begin();
413  iter != m_priv->m_availThreads.end(); )
414  {
415  MPoolThread *thread = *iter;
416  iter = m_priv->m_availThreads.erase(iter);
417  m_priv->m_runningThreads.insert(thread);
418  if (reserved)
420  if (thread->SetRunnable(runnable, debugName, reserved))
421  {
422  return true;
423  }
424 
425  if (reserved)
427  thread->Shutdown();
428  m_priv->m_runningThreads.remove(thread);
429  m_priv->m_deleteThreads.push_front(thread);
430  }
431 
432  if (reserved ||
434  {
435  if (reserved)
437  auto *thread = new MPoolThread(*this, m_priv->m_expiryTimeout);
438  m_priv->m_runningThreads.insert(thread);
439  thread->SetRunnable(runnable, debugName, reserved);
440  thread->start();
441  if (thread->isRunning())
442  {
443  return true;
444  }
445 
446  // Thread failed to run, OOM?
447  // QThread will print an error, so we don't have to
448  if (reserved)
450  thread->Shutdown();
451  m_priv->m_runningThreads.remove(thread);
452  m_priv->m_deleteThreads.push_front(thread);
453  }
454 
455  return false;
456 }
457 
459 {
460  QMutexLocker locker(&m_priv->m_lock);
461 
462  if (!m_priv->m_running)
463  {
464  m_priv->m_runningThreads.remove(thread);
465  thread->Shutdown();
466  m_priv->m_deleteThreads.push_front(thread);
467  m_priv->m_wait.wakeAll();
468  return;
469  }
470 
471  MPoolQueues::iterator it = m_priv->m_runQueues.begin();
472  if (it == m_priv->m_runQueues.end())
473  {
474  m_priv->m_runningThreads.remove(thread);
475  m_priv->m_availThreads.insert(thread);
476  m_priv->m_wait.wakeAll();
477  return;
478  }
479 
480  MPoolEntry e = (*it).front();
481  if (!thread->SetRunnable(e.first, e.second, false))
482  {
483  m_priv->m_runningThreads.remove(thread);
484  m_priv->m_wait.wakeAll();
485  if (!TryStartInternal(e.first, e.second, false))
486  {
487  thread->Shutdown();
488  m_priv->m_deleteThreads.push_front(thread);
489  return;
490  }
491  thread->Shutdown();
492  m_priv->m_deleteThreads.push_front(thread);
493  }
494 
495  (*it).pop_front();
496  if ((*it).empty())
497  m_priv->m_runQueues.erase(it);
498 }
499 
501 {
502  QMutexLocker locker(&m_priv->m_lock);
503  m_priv->m_runningThreads.remove(thread);
504  m_priv->m_availThreads.remove(thread);
505  if (!m_priv->m_deleteThreads.contains(thread))
506  m_priv->m_deleteThreads.push_front(thread);
507  m_priv->m_wait.wakeAll();
508 }
509 
510 std::chrono::milliseconds MThreadPool::expiryTimeout(void) const
511 {
512  QMutexLocker locker(&m_priv->m_lock);
513  return m_priv->m_expiryTimeout;
514 }
515 
516 void MThreadPool::setExpiryTimeout(std::chrono::milliseconds expiryTimeout)
517 {
518  QMutexLocker locker(&m_priv->m_lock);
520 }
521 
523 {
524  QMutexLocker locker(&m_priv->m_lock);
525  return m_priv->m_maxThreadCount;
526 }
527 
528 void MThreadPool::setMaxThreadCount(int maxThreadCount)
529 {
530  QMutexLocker locker(&m_priv->m_lock);
532 }
533 
535 {
536  QMutexLocker locker(&m_priv->m_lock);
537  return m_priv->m_availThreads.size() + m_priv->m_runningThreads.size();
538 }
539 
540 /*
541 void MThreadPool::reserveThread(void)
542 {
543  QMutexLocker locker(&m_priv->m_lock);
544  m_priv->m_reserveThread++;
545 }
546 
547 void MThreadPool::releaseThread(void)
548 {
549  QMutexLocker locker(&m_priv->m_lock);
550  if (m_priv->m_reserveThread > 0)
551  m_priv->m_reserveThread--;
552 }
553 */
554 
556 {
557  QMutexLocker locker(&m_priv->m_lock);
558  if (m_priv->m_reserveThread > 0)
560 }
561 
562 #if 0
563 static void print_set(QString title, QSet<MPoolThread*> set)
564 {
565  LOG(VB_GENERAL, LOG_INFO, title);
566  for (auto item : qAsConst(set))
567  {
568  LOG(VB_GENERAL, LOG_INFO, QString(" : 0x%1")
569  .arg((quint64)item,0,16));
570  }
571  LOG(VB_GENERAL, LOG_INFO, "");
572 }
573 #endif
574 
576 {
577  QMutexLocker locker(&m_priv->m_lock);
578  while (true)
579  {
580  while (!m_priv->m_deleteThreads.empty())
581  {
582  m_priv->m_deleteThreads.back()->wait();
583  delete m_priv->m_deleteThreads.back();
584  m_priv->m_deleteThreads.pop_back();
585  }
586 
587  if (m_priv->m_running && !m_priv->m_runQueues.empty())
588  {
589  m_priv->m_wait.wait(locker.mutex());
590  continue;
591  }
592 
593  QSet<MPoolThread*> working = m_priv->m_runningThreads;
594  working = working.subtract(m_priv->m_availThreads);
595  if (working.empty())
596  break;
597  m_priv->m_wait.wait(locker.mutex());
598  }
599 }
600 
601 /* vim: set expandtab tabstop=4 shiftwidth=4: */
loggingDeregisterThread
void loggingDeregisterThread(void)
Deregister the current thread's name.
Definition: logging.cpp:703
hardwareprofile.smolt.timeout
float timeout
Definition: smolt.py:103
MPoolThread::m_runnableName
QString m_runnableName
Definition: mthreadpool.cpp:204
MThreadPool::tryStart
bool tryStart(QRunnable *runnable, const QString &debugName)
Definition: mthreadpool.cpp:393
mythdb.h
MythTimer
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
MPoolThread::m_reserved
bool m_reserved
Definition: mthreadpool.cpp:205
MPoolThread::run
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: mthreadpool.cpp:116
MThread::wait
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
MThreadPoolPrivate::m_expiryTimeout
std::chrono::milliseconds m_expiryTimeout
Definition: mthreadpool.cpp:230
MThreadPool::maxThreadCount
int maxThreadCount(void) const
Definition: mthreadpool.cpp:522
MPoolThread::m_wait
QWaitCondition m_wait
Definition: mthreadpool.cpp:200
MThread::setObjectName
void setObjectName(const QString &name)
Definition: mthread.cpp:238
MThreadPoolPrivate::MThreadPoolPrivate
MThreadPoolPrivate(QString name)
Definition: mthreadpool.cpp:218
MThreadPoolPrivate::m_running
bool m_running
Definition: mthreadpool.cpp:229
MThreadPool::MPoolThread
friend class MPoolThread
Definition: mthreadpool.h:20
MThreadPoolPrivate::m_lock
QMutex m_lock
Definition: mthreadpool.cpp:226
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
MThread::RunProlog
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
MThreadPoolPrivate
Definition: mthreadpool.cpp:215
MPoolThread::SetRunnable
bool SetRunnable(QRunnable *runnable, QString runnableName, bool reserved)
Definition: mthreadpool.cpp:177
GetMythDB
MythDB * GetMythDB(void)
Definition: mythdb.cpp:50
MThreadPool::Stop
void Stop(void)
Definition: mthreadpool.cpp:277
MThreadPool::startReserved
void startReserved(QRunnable *runnable, const QString &debugName, std::chrono::milliseconds waitForAvailMS=0ms)
Definition: mthreadpool.cpp:371
MPoolThread::m_lock
QMutex m_lock
Definition: mthreadpool.cpp:199
MThreadPool::DeletePoolThreads
void DeletePoolThreads(void)
Definition: mthreadpool.cpp:290
MThreadPool::activeThreadCount
int activeThreadCount(void) const
Definition: mthreadpool.cpp:534
MThreadPool::TryStartInternal
bool TryStartInternal(QRunnable *runnable, const QString &debugName, bool reserved)
Definition: mthreadpool.cpp:399
MThreadPool::ShutdownAllPools
static void ShutdownAllPools(void)
Definition: mthreadpool.cpp:336
MThreadPool::ReleaseThread
void ReleaseThread(void)
Definition: mthreadpool.cpp:555
mythlogging.h
MThreadPoolPrivate::m_maxThreadCount
int m_maxThreadCount
Definition: mthreadpool.cpp:231
MThreadPool::StopAllPools
static void StopAllPools(void)
Definition: mthreadpool.cpp:325
MThreadPool::waitForDone
void waitForDone(void)
Definition: mthreadpool.cpp:575
loggingRegisterThread
void loggingRegisterThread(const QString &name)
Register the current thread with the given name.
Definition: logging.cpp:684
hardwareprofile.i18n.t
t
Definition: i18n.py:36
MPoolThread::Shutdown
void Shutdown(void)
Definition: mthreadpool.cpp:192
MPoolThread::s_thread_num
static uint s_thread_num
Definition: mthreadpool.cpp:208
MThreadPoolPrivate::GetRealMaxThread
int GetRealMaxThread(void) const
Definition: mthreadpool.cpp:221
MThread::RunEpilog
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
MPoolQueues
QMap< int, MPoolQueue > MPoolQueues
Definition: mthreadpool.cpp:103
MThreadPoolPrivate::m_name
QString m_name
Definition: mthreadpool.cpp:227
uint
unsigned int uint
Definition: compat.h:81
MThreadPoolPrivate::s_all_pools
static QList< MThreadPool * > s_all_pools
Definition: mthreadpool.cpp:245
MPoolQueue
QList< MPoolEntry > MPoolQueue
Definition: mthreadpool.cpp:102
MThreadPool::setExpiryTimeout
void setExpiryTimeout(std::chrono::milliseconds expiryTimeout)
Definition: mthreadpool.cpp:516
MThreadPoolPrivate::s_pool_lock
static QRecursiveMutex s_pool_lock
Definition: mthreadpool.cpp:242
MThreadPoolPrivate::m_deleteThreads
QList< MPoolThread * > m_deleteThreads
Definition: mthreadpool.cpp:237
mthreadpool.h
MPoolThread::m_expiryTimeout
std::chrono::milliseconds m_expiryTimeout
Definition: mthreadpool.cpp:202
MThreadPool
Definition: mthreadpool.h:18
MPoolEntry
QPair< QRunnable *, QString > MPoolEntry
Definition: mthreadpool.cpp:101
MThreadPool::NotifyDone
void NotifyDone(MPoolThread *thread)
Definition: mthreadpool.cpp:500
std
Definition: mythchrono.h:23
MThreadPool::m_priv
MThreadPoolPrivate * m_priv
Definition: mthreadpool.h:57
logging.h
MThreadPool::MThreadPool
MThreadPool(const QString &name)
Definition: mthreadpool.cpp:258
MThreadPoolPrivate::s_pool
static MThreadPool * s_pool
Definition: mthreadpool.cpp:244
MThread
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:48
mthread.h
MPoolThread::m_doRun
bool m_doRun
Definition: mthreadpool.cpp:203
MThreadPoolPrivate::m_wait
QWaitCondition m_wait
Definition: mthreadpool.cpp:228
MThreadPool::expiryTimeout
std::chrono::milliseconds expiryTimeout(void) const
Definition: mthreadpool.cpp:510
MThread::objectName
QString objectName(void) const
Definition: mthread.cpp:243
mythtimer.h
MThreadPool::setMaxThreadCount
void setMaxThreadCount(int maxThreadCount)
Definition: mthreadpool.cpp:528
MThreadPool::NotifyAvailable
void NotifyAvailable(MPoolThread *thread)
Definition: mthreadpool.cpp:458
MPoolThread
Definition: mthreadpool.cpp:105
MThreadPoolPrivate::m_availThreads
QSet< MPoolThread * > m_availThreads
Definition: mthreadpool.cpp:235
MPoolThread::MPoolThread
MPoolThread(MThreadPool &pool, std::chrono::milliseconds timeout)
Definition: mthreadpool.cpp:108
MThread::m_runnable
QRunnable * m_runnable
Definition: mthread.h:136
MPoolThread::m_pool
MThreadPool & m_pool
Definition: mthreadpool.cpp:201
MThreadPool::globalInstance
static MThreadPool * globalInstance(void)
Definition: mthreadpool.cpp:317
MThreadPoolPrivate::m_runningThreads
QSet< MPoolThread * > m_runningThreads
Definition: mthreadpool.cpp:236
MThreadPoolPrivate::m_reserveThread
int m_reserveThread
Definition: mthreadpool.cpp:232
MThreadPool::start
void start(QRunnable *runnable, const QString &debugName, int priority=0)
Definition: mthreadpool.cpp:352
MPoolThread::s_lock
static QMutex s_lock
Definition: mthreadpool.cpp:207
MThreadPool::~MThreadPool
~MThreadPool()
Definition: mthreadpool.cpp:265
MThreadPoolPrivate::m_runQueues
MPoolQueues m_runQueues
Definition: mthreadpool.cpp:234