MythTV  master
mthreadpool.cpp
Go to the documentation of this file.
1 /* -*- Mode: c++ -*-
2  *
3  * Class MThreadPool
4  *
5  * Copyright (C) Daniel Kristjansson 2011
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
75 // C++ headers
76 #include <algorithm>
77 
78 // Qt headers
79 #include <QCoreApplication>
80 #include <QList>
81 #include <QMap>
82 #include <QMutex>
83 #include <QMutexLocker>
84 #include <QPair>
85 #include <QRunnable>
86 #include <QSet>
87 #include <QWaitCondition>
88 #include <utility>
89 
90 // MythTV headers
91 #include "mthreadpool.h"
92 #include "mythlogging.h"
93 #include "mythtimer.h"
94 #include "logging.h"
95 #include "mthread.h"
96 #include "mythdb.h"
97 
98 using MPoolEntry = QPair<QRunnable*,QString>;
99 using MPoolQueue = QList<MPoolEntry>;
100 using MPoolQueues = QMap<int, MPoolQueue>;
101 
102 class MPoolThread : public MThread
103 {
104  public:
105  MPoolThread(MThreadPool &pool, std::chrono::milliseconds timeout) :
106  MThread("PT"), m_pool(pool), m_expiryTimeout(timeout)
107  {
108  QMutexLocker locker(&s_lock);
109  setObjectName(QString("PT%1").arg(s_thread_num));
110  s_thread_num++;
111  }
112 
113  void run(void) override // MThread
114  {
115  RunProlog();
116 
117  MythTimer t;
118  t.start();
119  QMutexLocker locker(&m_lock);
120  while (true)
121  {
122  if (m_doRun && !m_runnable)
123  m_wait.wait(locker.mutex(), (m_expiryTimeout+1ms).count());
124 
125  if (!m_runnable)
126  {
127  m_doRun = false;
128 
129  locker.unlock();
130  m_pool.NotifyDone(this);
131  locker.relock();
132  break;
133  }
134 
135  if (!m_runnableName.isEmpty())
137 
138  bool autodelete = m_runnable->autoDelete();
139  m_runnable->run();
140  if (autodelete)
141  delete m_runnable;
142  if (m_reserved)
144  m_reserved = false;
145  m_runnable = nullptr;
146 
149 
150  GetMythDB()->GetDBManager()->PurgeIdleConnections(false);
151  qApp->processEvents();
152  qApp->sendPostedEvents(nullptr, QEvent::DeferredDelete);
153 
154  t.start();
155 
156  if (m_doRun)
157  {
158  locker.unlock();
159  m_pool.NotifyAvailable(this);
160  locker.relock();
161  }
162  else
163  {
164  locker.unlock();
165  m_pool.NotifyDone(this);
166  locker.relock();
167  break;
168  }
169  }
170 
171  RunEpilog();
172  }
173 
174  bool SetRunnable(QRunnable *runnable, QString runnableName,
175  bool reserved)
176  {
177  QMutexLocker locker(&m_lock);
178  if (m_doRun && (m_runnable == nullptr))
179  {
180  m_runnable = runnable;
181  m_runnableName = std::move(runnableName);
182  m_reserved = reserved;
183  m_wait.wakeAll();
184  return true;
185  }
186  return false;
187  }
188 
189  void Shutdown(void)
190  {
191  QMutexLocker locker(&m_lock);
192  m_doRun = false;
193  m_wait.wakeAll();
194  }
195 
196  QMutex m_lock;
197  QWaitCondition m_wait;
199  std::chrono::milliseconds m_expiryTimeout;
200  bool m_doRun {true};
201  QString m_runnableName;
202  bool m_reserved {false};
203 
204  static QMutex s_lock;
206 };
207 QMutex MPoolThread::s_lock;
209 
211 
213 {
214  public:
215  explicit MThreadPoolPrivate(QString name) :
216  m_name(std::move(name)) {}
217 
218  int GetRealMaxThread(void) const
219  {
220  return std::max(m_maxThreadCount,1) + m_reserveThread;
221  }
222 
223  mutable QMutex m_lock;
224  QString m_name;
225  QWaitCondition m_wait;
226  bool m_running {true};
227  std::chrono::milliseconds m_expiryTimeout {2min};
228  int m_maxThreadCount {QThread::idealThreadCount()};
230 
232  QSet<MPoolThread*> m_availThreads;
233  QSet<MPoolThread*> m_runningThreads;
234  QList<MPoolThread*> m_deleteThreads;
235 
236  static QMutex s_pool_lock;
238  static QList<MThreadPool*> s_all_pools;
239 };
240 
241 QMutex MThreadPoolPrivate::s_pool_lock(QMutex::Recursive);
243 QList<MThreadPool*> MThreadPoolPrivate::s_all_pools;
244 
246 
247 MThreadPool::MThreadPool(const QString &name) :
248  m_priv(new MThreadPoolPrivate(name))
249 {
250  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
251  MThreadPoolPrivate::s_all_pools.push_back(this);
252 }
253 
255 {
256  Stop();
258  {
259  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
260  MThreadPoolPrivate::s_all_pools.removeAll(this);
261  }
262  delete m_priv;
263  m_priv = nullptr;
264 }
265 
267 {
268  QMutexLocker locker(&m_priv->m_lock);
269  m_priv->m_running = false;
270  QSet<MPoolThread*>::iterator it = m_priv->m_availThreads.begin();
271  for (; it != m_priv->m_availThreads.end(); ++it)
272  (*it)->Shutdown();
273  it = m_priv->m_runningThreads.begin();
274  for (; it != m_priv->m_runningThreads.end(); ++it)
275  (*it)->Shutdown();
276  m_priv->m_wait.wakeAll();
277 }
278 
280 {
281  waitForDone();
282 
283  QMutexLocker locker(&m_priv->m_lock);
284  for (auto *thread : qAsConst(m_priv->m_availThreads))
285  {
286  m_priv->m_deleteThreads.push_front(thread);
287  }
288  m_priv->m_availThreads.clear();
289 
290  while (!m_priv->m_deleteThreads.empty())
291  {
292  MPoolThread *thread = m_priv->m_deleteThreads.back();
293  locker.unlock();
294 
295  thread->wait();
296 
297  locker.relock();
298  delete thread;
299  if (m_priv->m_deleteThreads.back() == thread)
300  m_priv->m_deleteThreads.pop_back();
301  else
302  m_priv->m_deleteThreads.removeAll(thread);
303  }
304 }
305 
307 {
308  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
310  MThreadPoolPrivate::s_pool = new MThreadPool("GlobalPool");
312 }
313 
315 {
316  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
317  QList<MThreadPool*>::iterator it;
318  for (it = MThreadPoolPrivate::s_all_pools.begin();
319  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
320  {
321  (*it)->Stop();
322  }
323 }
324 
326 {
327  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
328  QList<MThreadPool*>::iterator it;
329  for (it = MThreadPoolPrivate::s_all_pools.begin();
330  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
331  {
332  (*it)->Stop();
333  }
334  for (it = MThreadPoolPrivate::s_all_pools.begin();
335  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
336  {
337  (*it)->DeletePoolThreads();
338  }
339 }
340 
341 void MThreadPool::start(QRunnable *runnable, const QString& debugName, int priority)
342 {
343  QMutexLocker locker(&m_priv->m_lock);
344  if (TryStartInternal(runnable, debugName, false))
345  return;
346 
347  MPoolQueues::iterator it = m_priv->m_runQueues.find(priority);
348  if (it != m_priv->m_runQueues.end())
349  {
350  (*it).push_back(MPoolEntry(runnable,debugName));
351  }
352  else
353  {
354  MPoolQueue list;
355  list.push_back(MPoolEntry(runnable,debugName));
356  m_priv->m_runQueues[priority] = list;
357  }
358 }
359 
361  QRunnable *runnable, const QString& debugName,
362  std::chrono::milliseconds waitForAvailMS)
363 {
364  QMutexLocker locker(&m_priv->m_lock);
365  if (waitForAvailMS > 0ms && m_priv->m_availThreads.empty() &&
367  {
368  MythTimer t;
369  t.start();
370  auto left = waitForAvailMS - t.elapsed();
371  while (left > 0ms && m_priv->m_availThreads.empty() &&
373  {
374  m_priv->m_wait.wait(locker.mutex(), left.count());
375  left = waitForAvailMS - t.elapsed();
376  }
377  }
378  TryStartInternal(runnable, debugName, true);
379 }
380 
381 
382 bool MThreadPool::tryStart(QRunnable *runnable, const QString& debugName)
383 {
384  QMutexLocker locker(&m_priv->m_lock);
385  return TryStartInternal(runnable, debugName, false);
386 }
387 
389  QRunnable *runnable, const QString& debugName, bool reserved)
390 {
391  if (!m_priv->m_running)
392  return false;
393 
394  while (!m_priv->m_deleteThreads.empty())
395  {
396  m_priv->m_deleteThreads.back()->wait();
397  delete m_priv->m_deleteThreads.back();
398  m_priv->m_deleteThreads.pop_back();
399  }
400 
401  while (m_priv->m_availThreads.begin() != m_priv->m_availThreads.end())
402  {
403  MPoolThread *thread = *m_priv->m_availThreads.begin();
404  m_priv->m_availThreads.erase(m_priv->m_availThreads.begin());
405  m_priv->m_runningThreads.insert(thread);
406  if (reserved)
408  if (thread->SetRunnable(runnable, debugName, reserved))
409  {
410  return true;
411  }
412 
413  if (reserved)
415  thread->Shutdown();
416  m_priv->m_runningThreads.remove(thread);
417  m_priv->m_deleteThreads.push_front(thread);
418  }
419 
420  if (reserved ||
422  {
423  if (reserved)
425  auto *thread = new MPoolThread(*this, m_priv->m_expiryTimeout);
426  m_priv->m_runningThreads.insert(thread);
427  thread->SetRunnable(runnable, debugName, reserved);
428  thread->start();
429  if (thread->isRunning())
430  {
431  return true;
432  }
433 
434  // Thread failed to run, OOM?
435  // QThread will print an error, so we don't have to
436  if (reserved)
438  thread->Shutdown();
439  m_priv->m_runningThreads.remove(thread);
440  m_priv->m_deleteThreads.push_front(thread);
441  }
442 
443  return false;
444 }
445 
447 {
448  QMutexLocker locker(&m_priv->m_lock);
449 
450  if (!m_priv->m_running)
451  {
452  m_priv->m_runningThreads.remove(thread);
453  thread->Shutdown();
454  m_priv->m_deleteThreads.push_front(thread);
455  m_priv->m_wait.wakeAll();
456  return;
457  }
458 
459  MPoolQueues::iterator it = m_priv->m_runQueues.begin();
460  if (it == m_priv->m_runQueues.end())
461  {
462  m_priv->m_runningThreads.remove(thread);
463  m_priv->m_availThreads.insert(thread);
464  m_priv->m_wait.wakeAll();
465  return;
466  }
467 
468  MPoolEntry e = (*it).front();
469  if (!thread->SetRunnable(e.first, e.second, false))
470  {
471  m_priv->m_runningThreads.remove(thread);
472  m_priv->m_wait.wakeAll();
473  if (!TryStartInternal(e.first, e.second, false))
474  {
475  thread->Shutdown();
476  m_priv->m_deleteThreads.push_front(thread);
477  return;
478  }
479  thread->Shutdown();
480  m_priv->m_deleteThreads.push_front(thread);
481  }
482 
483  (*it).pop_front();
484  if ((*it).empty())
485  m_priv->m_runQueues.erase(it);
486 }
487 
489 {
490  QMutexLocker locker(&m_priv->m_lock);
491  m_priv->m_runningThreads.remove(thread);
492  m_priv->m_availThreads.remove(thread);
493  if (!m_priv->m_deleteThreads.contains(thread))
494  m_priv->m_deleteThreads.push_front(thread);
495  m_priv->m_wait.wakeAll();
496 }
497 
498 std::chrono::milliseconds MThreadPool::expiryTimeout(void) const
499 {
500  QMutexLocker locker(&m_priv->m_lock);
501  return m_priv->m_expiryTimeout;
502 }
503 
504 void MThreadPool::setExpiryTimeout(std::chrono::milliseconds expiryTimeout)
505 {
506  QMutexLocker locker(&m_priv->m_lock);
508 }
509 
511 {
512  QMutexLocker locker(&m_priv->m_lock);
513  return m_priv->m_maxThreadCount;
514 }
515 
516 void MThreadPool::setMaxThreadCount(int maxThreadCount)
517 {
518  QMutexLocker locker(&m_priv->m_lock);
520 }
521 
523 {
524  QMutexLocker locker(&m_priv->m_lock);
525  return m_priv->m_availThreads.size() + m_priv->m_runningThreads.size();
526 }
527 
528 /*
529 void MThreadPool::reserveThread(void)
530 {
531  QMutexLocker locker(&m_priv->m_lock);
532  m_priv->m_reserveThread++;
533 }
534 
535 void MThreadPool::releaseThread(void)
536 {
537  QMutexLocker locker(&m_priv->m_lock);
538  if (m_priv->m_reserveThread > 0)
539  m_priv->m_reserveThread--;
540 }
541 */
542 
544 {
545  QMutexLocker locker(&m_priv->m_lock);
546  if (m_priv->m_reserveThread > 0)
548 }
549 
#if 0
/// Debug helper (compiled out): log \p title followed by the address of
/// every thread in \p set.
static void print_set(const QString &title, const QSet<MPoolThread*> &set)
{
    LOG(VB_GENERAL, LOG_INFO, title);
    for (const auto *item : set)
    {
        LOG(VB_GENERAL, LOG_INFO, QString(" : 0x%1")
            .arg(reinterpret_cast<quint64>(item),0,16));
    }
    LOG(VB_GENERAL, LOG_INFO, "");
}
#endif
562 
564 {
565  QMutexLocker locker(&m_priv->m_lock);
566  while (true)
567  {
568  while (!m_priv->m_deleteThreads.empty())
569  {
570  m_priv->m_deleteThreads.back()->wait();
571  delete m_priv->m_deleteThreads.back();
572  m_priv->m_deleteThreads.pop_back();
573  }
574 
575  if (m_priv->m_running && !m_priv->m_runQueues.empty())
576  {
577  m_priv->m_wait.wait(locker.mutex());
578  continue;
579  }
580 
581  QSet<MPoolThread*> working = m_priv->m_runningThreads;
582  working = working.subtract(m_priv->m_availThreads);
583  if (working.empty())
584  break;
585  m_priv->m_wait.wait(locker.mutex());
586  }
587 }
588 
589 /* vim: set expandtab tabstop=4 shiftwidth=4: */
loggingDeregisterThread
void loggingDeregisterThread(void)
Deregister the current thread's name.
Definition: logging.cpp:742
e
QDomElement e
Definition: mythplugins/mytharchive/mytharchivehelper/main.cpp:1420
hardwareprofile.smolt.timeout
float timeout
Definition: smolt.py:103
MPoolThread::m_runnableName
QString m_runnableName
Definition: mthreadpool.cpp:201
MThreadPool::tryStart
bool tryStart(QRunnable *runnable, const QString &debugName)
Definition: mthreadpool.cpp:382
mythdb.h
MythTimer
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
MPoolThread::m_reserved
bool m_reserved
Definition: mthreadpool.cpp:202
MPoolThread::run
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: mthreadpool.cpp:113
title
QString title
Definition: mythplugins/mytharchive/mytharchivehelper/main.cpp:636
MThread::wait
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:303
MThreadPoolPrivate::m_expiryTimeout
std::chrono::milliseconds m_expiryTimeout
Definition: mthreadpool.cpp:227
MThreadPool::maxThreadCount
int maxThreadCount(void) const
Definition: mthreadpool.cpp:510
MPoolThread::m_wait
QWaitCondition m_wait
Definition: mthreadpool.cpp:197
MThread::setObjectName
void setObjectName(const QString &name)
Definition: mthread.cpp:241
arg
arg(title).arg(filename).arg(doDelete))
MThreadPoolPrivate::MThreadPoolPrivate
MThreadPoolPrivate(QString name)
Definition: mthreadpool.cpp:215
MThreadPoolPrivate::m_running
bool m_running
Definition: mthreadpool.cpp:226
MThreadPool::MPoolThread
friend class MPoolThread
Definition: mthreadpool.h:20
MThreadPoolPrivate::m_lock
QMutex m_lock
Definition: mthreadpool.cpp:223
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:23
MThread::RunProlog
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
MThreadPoolPrivate
Definition: mthreadpool.cpp:212
MPoolThread::SetRunnable
bool SetRunnable(QRunnable *runnable, QString runnableName, bool reserved)
Definition: mthreadpool.cpp:174
GetMythDB
MythDB * GetMythDB(void)
Definition: mythdb.cpp:45
MThreadPool::Stop
void Stop(void)
Definition: mthreadpool.cpp:266
MThreadPool::startReserved
void startReserved(QRunnable *runnable, const QString &debugName, std::chrono::milliseconds waitForAvailMS=0ms)
Definition: mthreadpool.cpp:360
MPoolThread::m_lock
QMutex m_lock
Definition: mthreadpool.cpp:196
MThreadPool::DeletePoolThreads
void DeletePoolThreads(void)
Definition: mthreadpool.cpp:279
MThreadPool::activeThreadCount
int activeThreadCount(void) const
Definition: mthreadpool.cpp:522
MThreadPool::TryStartInternal
bool TryStartInternal(QRunnable *runnable, const QString &debugName, bool reserved)
Definition: mthreadpool.cpp:388
MThreadPool::ShutdownAllPools
static void ShutdownAllPools(void)
Definition: mthreadpool.cpp:325
MThreadPool::ReleaseThread
void ReleaseThread(void)
Definition: mthreadpool.cpp:543
mythlogging.h
MThreadPoolPrivate::m_maxThreadCount
int m_maxThreadCount
Definition: mthreadpool.cpp:228
MThreadPool::StopAllPools
static void StopAllPools(void)
Definition: mthreadpool.cpp:314
MThreadPool::waitForDone
void waitForDone(void)
Definition: mthreadpool.cpp:563
loggingRegisterThread
void loggingRegisterThread(const QString &name)
Register the current thread with the given name.
Definition: logging.cpp:723
hardwareprofile.i18n.t
t
Definition: i18n.py:36
MPoolThread::Shutdown
void Shutdown(void)
Definition: mthreadpool.cpp:189
MPoolThread::s_thread_num
static uint s_thread_num
Definition: mthreadpool.cpp:205
MThreadPoolPrivate::GetRealMaxThread
int GetRealMaxThread(void) const
Definition: mthreadpool.cpp:218
MThread::RunEpilog
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
MPoolQueues
QMap< int, MPoolQueue > MPoolQueues
Definition: mthreadpool.cpp:100
MThreadPoolPrivate::m_name
QString m_name
Definition: mthreadpool.cpp:224
uint
unsigned int uint
Definition: compat.h:140
MThreadPoolPrivate::s_all_pools
static QList< MThreadPool * > s_all_pools
Definition: mthreadpool.cpp:238
MPoolQueue
QList< MPoolEntry > MPoolQueue
Definition: mthreadpool.cpp:99
MThreadPool::setExpiryTimeout
void setExpiryTimeout(std::chrono::milliseconds expiryTimeout)
Definition: mthreadpool.cpp:504
MThreadPoolPrivate::m_deleteThreads
QList< MPoolThread * > m_deleteThreads
Definition: mthreadpool.cpp:234
mthreadpool.h
MPoolThread::m_expiryTimeout
std::chrono::milliseconds m_expiryTimeout
Definition: mthreadpool.cpp:199
MThreadPool
Definition: mthreadpool.h:18
MPoolEntry
QPair< QRunnable *, QString > MPoolEntry
Definition: mthreadpool.cpp:98
MThreadPool::NotifyDone
void NotifyDone(MPoolThread *thread)
Definition: mthreadpool.cpp:488
std
Definition: mythchrono.h:23
MThreadPool::m_priv
MThreadPoolPrivate * m_priv
Definition: mthreadpool.h:57
logging.h
MThreadPool::MThreadPool
MThreadPool(const QString &name)
Definition: mthreadpool.cpp:247
MThreadPoolPrivate::s_pool
static MThreadPool * s_pool
Definition: mthreadpool.cpp:237
MThread
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:49
mthread.h
MPoolThread::m_doRun
bool m_doRun
Definition: mthreadpool.cpp:200
MThreadPoolPrivate::m_wait
QWaitCondition m_wait
Definition: mthreadpool.cpp:225
MThreadPool::expiryTimeout
std::chrono::milliseconds expiryTimeout(void) const
Definition: mthreadpool.cpp:498
MThread::objectName
QString objectName(void) const
Definition: mthread.cpp:246
mythtimer.h
MThreadPool::setMaxThreadCount
void setMaxThreadCount(int maxThreadCount)
Definition: mthreadpool.cpp:516
MThreadPool::NotifyAvailable
void NotifyAvailable(MPoolThread *thread)
Definition: mthreadpool.cpp:446
MPoolThread
Definition: mthreadpool.cpp:102
MThreadPoolPrivate::m_availThreads
QSet< MPoolThread * > m_availThreads
Definition: mthreadpool.cpp:232
MPoolThread::MPoolThread
MPoolThread(MThreadPool &pool, std::chrono::milliseconds timeout)
Definition: mthreadpool.cpp:105
MThread::m_runnable
QRunnable * m_runnable
Definition: mthread.h:137
MPoolThread::m_pool
MThreadPool & m_pool
Definition: mthreadpool.cpp:198
MThreadPool::globalInstance
static MThreadPool * globalInstance(void)
Definition: mthreadpool.cpp:306
MThreadPoolPrivate::m_runningThreads
QSet< MPoolThread * > m_runningThreads
Definition: mthreadpool.cpp:233
MThreadPoolPrivate::m_reserveThread
int m_reserveThread
Definition: mthreadpool.cpp:229
MThreadPool::start
void start(QRunnable *runnable, const QString &debugName, int priority=0)
Definition: mthreadpool.cpp:341
MPoolThread::s_lock
static QMutex s_lock
Definition: mthreadpool.cpp:204
MThreadPool::~MThreadPool
~MThreadPool()
Definition: mthreadpool.cpp:254
MThreadPoolPrivate::s_pool_lock
static QMutex s_pool_lock
Definition: mthreadpool.cpp:236
MThreadPoolPrivate::m_runQueues
MPoolQueues m_runQueues
Definition: mthreadpool.cpp:231