MythTV  master
mthreadpool.cpp
Go to the documentation of this file.
1 /* -*- Mode: c++ -*-
2  *
3  * Class MThreadPool
4  *
5  * Copyright (C) Daniel Kristjansson 2011
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
75 // C++ headers
76 #include <algorithm>
77 
78 // Qt headers
79 #include <QCoreApplication>
80 #include <QList>
81 #include <QMap>
82 #include <QMutex>
83 #include <QMutexLocker>
84 #include <QPair>
85 #include <QRunnable>
86 #include <QRecursiveMutex>
87 #include <QSet>
88 #include <QWaitCondition>
89 #include <utility>
90 
91 // MythTV headers
92 #include "mthreadpool.h"
93 #include "mythlogging.h"
94 #include "mythtimer.h"
95 #include "logging.h"
96 #include "mthread.h"
97 #include "mythdb.h"
98 
// One unit of queued work: the runnable together with the debug name it was
// submitted under.
99 using MPoolEntry = QPair<QRunnable*,QString>;
// FIFO list of queued entries (entries are appended at the back and taken
// from the front).
100 using MPoolQueue = QList<MPoolEntry>;
// Pending work, one queue per integer priority value.
101 using MPoolQueues = QMap<int, MPoolQueue>;
102 
103 class MPoolThread : public MThread
104 {
105  public:
106  MPoolThread(MThreadPool &pool, std::chrono::milliseconds timeout) :
107  MThread("PT"), m_pool(pool), m_expiryTimeout(timeout)
108  {
109  QMutexLocker locker(&s_lock);
110  setObjectName(QString("PT%1").arg(s_thread_num));
111  s_thread_num++;
112  }
113 
114  void run(void) override // MThread
115  {
116  RunProlog();
117 
118  MythTimer t;
119  t.start();
120  QMutexLocker locker(&m_lock);
121  while (true)
122  {
123  if (m_doRun && !m_runnable)
124  m_wait.wait(locker.mutex(), (m_expiryTimeout+1ms).count());
125 
126  if (!m_runnable)
127  {
128  m_doRun = false;
129 
130  locker.unlock();
131  m_pool.NotifyDone(this);
132  locker.relock();
133  break;
134  }
135 
136  if (!m_runnableName.isEmpty())
138 
139  bool autodelete = m_runnable->autoDelete();
140  m_runnable->run();
141  if (autodelete)
142  delete m_runnable;
143  if (m_reserved)
145  m_reserved = false;
146  m_runnable = nullptr;
147 
150 
151  GetMythDB()->GetDBManager()->PurgeIdleConnections(false);
152  qApp->processEvents();
153  qApp->sendPostedEvents(nullptr, QEvent::DeferredDelete);
154 
155  t.start();
156 
157  if (m_doRun)
158  {
159  locker.unlock();
160  m_pool.NotifyAvailable(this);
161  locker.relock();
162  }
163  else
164  {
165  locker.unlock();
166  m_pool.NotifyDone(this);
167  locker.relock();
168  break;
169  }
170  }
171 
172  RunEpilog();
173  }
174 
175  bool SetRunnable(QRunnable *runnable, QString runnableName,
176  bool reserved)
177  {
178  QMutexLocker locker(&m_lock);
179  if (m_doRun && (m_runnable == nullptr))
180  {
181  m_runnable = runnable;
182  m_runnableName = std::move(runnableName);
183  m_reserved = reserved;
184  m_wait.wakeAll();
185  return true;
186  }
187  return false;
188  }
189 
190  void Shutdown(void)
191  {
192  QMutexLocker locker(&m_lock);
193  m_doRun = false;
194  m_wait.wakeAll();
195  }
196 
197  QMutex m_lock;
198  QWaitCondition m_wait;
200  std::chrono::milliseconds m_expiryTimeout;
201  bool m_doRun {true};
202  QString m_runnableName;
203  bool m_reserved {false};
204 
205  static QMutex s_lock;
207 };
208 QMutex MPoolThread::s_lock;
210 
212 
214 {
215  public:
216  explicit MThreadPoolPrivate(QString name) :
217  m_name(std::move(name)) {}
218 
219  int GetRealMaxThread(void) const
220  {
221  return std::max(m_maxThreadCount,1) + m_reserveThread;
222  }
223 
224  mutable QMutex m_lock;
225  QString m_name;
226  QWaitCondition m_wait;
227  bool m_running {true};
228  std::chrono::milliseconds m_expiryTimeout {2min};
229  int m_maxThreadCount {QThread::idealThreadCount()};
231 
233  QSet<MPoolThread*> m_availThreads;
234  QSet<MPoolThread*> m_runningThreads;
235  QList<MPoolThread*> m_deleteThreads;
236 
237  static QRecursiveMutex s_pool_lock;
239  static QList<MThreadPool*> s_all_pools;
240 };
241 
242 QRecursiveMutex MThreadPoolPrivate::s_pool_lock;
244 QList<MThreadPool*> MThreadPoolPrivate::s_all_pools;
245 
247 
248 MThreadPool::MThreadPool(const QString &name) :
249  m_priv(new MThreadPoolPrivate(name))
250 {
251  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
252  MThreadPoolPrivate::s_all_pools.push_back(this);
253 }
254 
256 {
257  Stop();
259  {
260  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
261  MThreadPoolPrivate::s_all_pools.removeAll(this);
262  }
263  delete m_priv;
264  m_priv = nullptr;
265 }
266 
268 {
269  QMutexLocker locker(&m_priv->m_lock);
270  m_priv->m_running = false;
271  QSet<MPoolThread*>::iterator it = m_priv->m_availThreads.begin();
272  for (; it != m_priv->m_availThreads.end(); ++it)
273  (*it)->Shutdown();
274  it = m_priv->m_runningThreads.begin();
275  for (; it != m_priv->m_runningThreads.end(); ++it)
276  (*it)->Shutdown();
277  m_priv->m_wait.wakeAll();
278 }
279 
281 {
282  waitForDone();
283 
284  QMutexLocker locker(&m_priv->m_lock);
285  for (auto *thread : std::as_const(m_priv->m_availThreads))
286  {
287  m_priv->m_deleteThreads.push_front(thread);
288  }
289  m_priv->m_availThreads.clear();
290 
291  while (!m_priv->m_deleteThreads.empty())
292  {
293  MPoolThread *thread = m_priv->m_deleteThreads.back();
294  locker.unlock();
295 
296  thread->wait();
297 
298  locker.relock();
299  delete thread;
300  if (m_priv->m_deleteThreads.back() == thread)
301  m_priv->m_deleteThreads.pop_back();
302  else
303  m_priv->m_deleteThreads.removeAll(thread);
304  }
305 }
306 
308 {
309  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
311  MThreadPoolPrivate::s_pool = new MThreadPool("GlobalPool");
313 }
314 
316 {
317  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
318  QList<MThreadPool*>::iterator it;
319  for (it = MThreadPoolPrivate::s_all_pools.begin();
320  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
321  {
322  (*it)->Stop();
323  }
324 }
325 
327 {
328  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
329  QList<MThreadPool*>::iterator it;
330  for (it = MThreadPoolPrivate::s_all_pools.begin();
331  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
332  {
333  (*it)->Stop();
334  }
335  for (it = MThreadPoolPrivate::s_all_pools.begin();
336  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
337  {
338  (*it)->DeletePoolThreads();
339  }
340 }
341 
342 void MThreadPool::start(QRunnable *runnable, const QString& debugName, int priority)
343 {
344  QMutexLocker locker(&m_priv->m_lock);
345  if (TryStartInternal(runnable, debugName, false))
346  return;
347 
348  MPoolQueues::iterator it = m_priv->m_runQueues.find(priority);
349  if (it != m_priv->m_runQueues.end())
350  {
351  (*it).push_back(MPoolEntry(runnable,debugName));
352  }
353  else
354  {
355  MPoolQueue list;
356  list.push_back(MPoolEntry(runnable,debugName));
357  m_priv->m_runQueues[priority] = list;
358  }
359 }
360 
362  QRunnable *runnable, const QString& debugName,
363  std::chrono::milliseconds waitForAvailMS)
364 {
365  QMutexLocker locker(&m_priv->m_lock);
366  if (waitForAvailMS > 0ms && m_priv->m_availThreads.empty() &&
368  {
369  MythTimer t;
370  t.start();
371  auto left = waitForAvailMS - t.elapsed();
372  while (left > 0ms && m_priv->m_availThreads.empty() &&
374  {
375  m_priv->m_wait.wait(locker.mutex(), left.count());
376  left = waitForAvailMS - t.elapsed();
377  }
378  }
379  TryStartInternal(runnable, debugName, true);
380 }
381 
382 
383 bool MThreadPool::tryStart(QRunnable *runnable, const QString& debugName)
384 {
385  QMutexLocker locker(&m_priv->m_lock);
386  return TryStartInternal(runnable, debugName, false);
387 }
388 
390  QRunnable *runnable, const QString& debugName, bool reserved)
391 {
392  if (!m_priv->m_running)
393  return false;
394 
395  while (!m_priv->m_deleteThreads.empty())
396  {
397  m_priv->m_deleteThreads.back()->wait();
398  delete m_priv->m_deleteThreads.back();
399  m_priv->m_deleteThreads.pop_back();
400  }
401 
402  for (auto iter = m_priv->m_availThreads.begin();
403  iter != m_priv->m_availThreads.end(); )
404  {
405  MPoolThread *thread = *iter;
406  iter = m_priv->m_availThreads.erase(iter);
407  m_priv->m_runningThreads.insert(thread);
408  if (reserved)
410  if (thread->SetRunnable(runnable, debugName, reserved))
411  {
412  return true;
413  }
414 
415  if (reserved)
417  thread->Shutdown();
418  m_priv->m_runningThreads.remove(thread);
419  m_priv->m_deleteThreads.push_front(thread);
420  }
421 
422  if (reserved ||
424  {
425  if (reserved)
427  auto *thread = new MPoolThread(*this, m_priv->m_expiryTimeout);
428  m_priv->m_runningThreads.insert(thread);
429  thread->SetRunnable(runnable, debugName, reserved);
430  thread->start();
431  if (thread->isRunning())
432  {
433  return true;
434  }
435 
436  // Thread failed to run, OOM?
437  // QThread will print an error, so we don't have to
438  if (reserved)
440  thread->Shutdown();
441  m_priv->m_runningThreads.remove(thread);
442  m_priv->m_deleteThreads.push_front(thread);
443  }
444 
445  return false;
446 }
447 
449 {
450  QMutexLocker locker(&m_priv->m_lock);
451 
452  if (!m_priv->m_running)
453  {
454  m_priv->m_runningThreads.remove(thread);
455  thread->Shutdown();
456  m_priv->m_deleteThreads.push_front(thread);
457  m_priv->m_wait.wakeAll();
458  return;
459  }
460 
461  MPoolQueues::iterator it = m_priv->m_runQueues.begin();
462  if (it == m_priv->m_runQueues.end())
463  {
464  m_priv->m_runningThreads.remove(thread);
465  m_priv->m_availThreads.insert(thread);
466  m_priv->m_wait.wakeAll();
467  return;
468  }
469 
470  MPoolEntry e = (*it).front();
471  if (!thread->SetRunnable(e.first, e.second, false))
472  {
473  m_priv->m_runningThreads.remove(thread);
474  m_priv->m_wait.wakeAll();
475  if (!TryStartInternal(e.first, e.second, false))
476  {
477  thread->Shutdown();
478  m_priv->m_deleteThreads.push_front(thread);
479  return;
480  }
481  thread->Shutdown();
482  m_priv->m_deleteThreads.push_front(thread);
483  }
484 
485  (*it).pop_front();
486  if ((*it).empty())
487  m_priv->m_runQueues.erase(it);
488 }
489 
491 {
492  QMutexLocker locker(&m_priv->m_lock);
493  m_priv->m_runningThreads.remove(thread);
494  m_priv->m_availThreads.remove(thread);
495  if (!m_priv->m_deleteThreads.contains(thread))
496  m_priv->m_deleteThreads.push_front(thread);
497  m_priv->m_wait.wakeAll();
498 }
499 
500 std::chrono::milliseconds MThreadPool::expiryTimeout(void) const
501 {
502  QMutexLocker locker(&m_priv->m_lock);
503  return m_priv->m_expiryTimeout;
504 }
505 
506 void MThreadPool::setExpiryTimeout(std::chrono::milliseconds expiryTimeout)
507 {
508  QMutexLocker locker(&m_priv->m_lock);
510 }
511 
513 {
514  QMutexLocker locker(&m_priv->m_lock);
515  return m_priv->m_maxThreadCount;
516 }
517 
518 void MThreadPool::setMaxThreadCount(int maxThreadCount)
519 {
520  QMutexLocker locker(&m_priv->m_lock);
522 }
523 
525 {
526  QMutexLocker locker(&m_priv->m_lock);
527  return m_priv->m_availThreads.size() + m_priv->m_runningThreads.size();
528 }
529 
530 /*
531 void MThreadPool::reserveThread(void)
532 {
533  QMutexLocker locker(&m_priv->m_lock);
534  m_priv->m_reserveThread++;
535 }
536 
537 void MThreadPool::releaseThread(void)
538 {
539  QMutexLocker locker(&m_priv->m_lock);
540  if (m_priv->m_reserveThread > 0)
541  m_priv->m_reserveThread--;
542 }
543 */
544 
546 {
547  QMutexLocker locker(&m_priv->m_lock);
548  if (m_priv->m_reserveThread > 0)
550 }
551 
552 #if 0
553 static void print_set(QString title, QSet<MPoolThread*> set)
554 {
555  LOG(VB_GENERAL, LOG_INFO, title);
556  for (auto item : std::as_const(set))
557  {
558  LOG(VB_GENERAL, LOG_INFO, QString(" : 0x%1")
559  .arg((quint64)item,0,16));
560  }
561  LOG(VB_GENERAL, LOG_INFO, "");
562 }
563 #endif
564 
566 {
567  QMutexLocker locker(&m_priv->m_lock);
568  while (true)
569  {
570  while (!m_priv->m_deleteThreads.empty())
571  {
572  m_priv->m_deleteThreads.back()->wait();
573  delete m_priv->m_deleteThreads.back();
574  m_priv->m_deleteThreads.pop_back();
575  }
576 
577  if (m_priv->m_running && !m_priv->m_runQueues.empty())
578  {
579  m_priv->m_wait.wait(locker.mutex());
580  continue;
581  }
582 
583  QSet<MPoolThread*> working = m_priv->m_runningThreads;
584  working = working.subtract(m_priv->m_availThreads);
585  if (working.empty())
586  break;
587  m_priv->m_wait.wait(locker.mutex());
588  }
589 }
590 
591 /* vim: set expandtab tabstop=4 shiftwidth=4: */
loggingDeregisterThread
void loggingDeregisterThread(void)
Deregister the current thread's name.
Definition: logging.cpp:703
hardwareprofile.smolt.timeout
float timeout
Definition: smolt.py:102
MPoolThread::m_runnableName
QString m_runnableName
Definition: mthreadpool.cpp:202
MThreadPool::tryStart
bool tryStart(QRunnable *runnable, const QString &debugName)
Definition: mthreadpool.cpp:383
mythdb.h
MythTimer
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
MPoolThread::m_reserved
bool m_reserved
Definition: mthreadpool.cpp:203
MPoolThread::run
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: mthreadpool.cpp:114
MThread::wait
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
MThreadPoolPrivate::m_expiryTimeout
std::chrono::milliseconds m_expiryTimeout
Definition: mthreadpool.cpp:228
MThreadPool::maxThreadCount
int maxThreadCount(void) const
Definition: mthreadpool.cpp:512
MPoolThread::m_wait
QWaitCondition m_wait
Definition: mthreadpool.cpp:198
MThread::setObjectName
void setObjectName(const QString &name)
Definition: mthread.cpp:238
MThreadPoolPrivate::MThreadPoolPrivate
MThreadPoolPrivate(QString name)
Definition: mthreadpool.cpp:216
MThreadPoolPrivate::m_running
bool m_running
Definition: mthreadpool.cpp:227
MThreadPool::MPoolThread
friend class MPoolThread
Definition: mthreadpool.h:20
MThreadPoolPrivate::m_lock
QMutex m_lock
Definition: mthreadpool.cpp:224
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
MThread::RunProlog
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
MThreadPoolPrivate
Definition: mthreadpool.cpp:213
MPoolThread::SetRunnable
bool SetRunnable(QRunnable *runnable, QString runnableName, bool reserved)
Definition: mthreadpool.cpp:175
GetMythDB
MythDB * GetMythDB(void)
Definition: mythdb.cpp:50
MThreadPool::Stop
void Stop(void)
Definition: mthreadpool.cpp:267
MThreadPool::startReserved
void startReserved(QRunnable *runnable, const QString &debugName, std::chrono::milliseconds waitForAvailMS=0ms)
Definition: mthreadpool.cpp:361
MPoolThread::m_lock
QMutex m_lock
Definition: mthreadpool.cpp:197
MThreadPool::DeletePoolThreads
void DeletePoolThreads(void)
Definition: mthreadpool.cpp:280
MThreadPool::activeThreadCount
int activeThreadCount(void) const
Definition: mthreadpool.cpp:524
MThreadPool::TryStartInternal
bool TryStartInternal(QRunnable *runnable, const QString &debugName, bool reserved)
Definition: mthreadpool.cpp:389
MThreadPool::ShutdownAllPools
static void ShutdownAllPools(void)
Definition: mthreadpool.cpp:326
MThreadPool::ReleaseThread
void ReleaseThread(void)
Definition: mthreadpool.cpp:545
mythlogging.h
MThreadPoolPrivate::m_maxThreadCount
int m_maxThreadCount
Definition: mthreadpool.cpp:229
MThreadPool::StopAllPools
static void StopAllPools(void)
Definition: mthreadpool.cpp:315
MThreadPool::waitForDone
void waitForDone(void)
Definition: mthreadpool.cpp:565
loggingRegisterThread
void loggingRegisterThread(const QString &name)
Register the current thread with the given name.
Definition: logging.cpp:684
hardwareprofile.i18n.t
t
Definition: i18n.py:36
MPoolThread::Shutdown
void Shutdown(void)
Definition: mthreadpool.cpp:190
MPoolThread::s_thread_num
static uint s_thread_num
Definition: mthreadpool.cpp:206
MThreadPoolPrivate::GetRealMaxThread
int GetRealMaxThread(void) const
Definition: mthreadpool.cpp:219
MThread::RunEpilog
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
MPoolQueues
QMap< int, MPoolQueue > MPoolQueues
Definition: mthreadpool.cpp:101
MThreadPoolPrivate::m_name
QString m_name
Definition: mthreadpool.cpp:225
uint
unsigned int uint
Definition: compat.h:81
MThreadPoolPrivate::s_all_pools
static QList< MThreadPool * > s_all_pools
Definition: mthreadpool.cpp:239
MPoolQueue
QList< MPoolEntry > MPoolQueue
Definition: mthreadpool.cpp:100
MThreadPool::setExpiryTimeout
void setExpiryTimeout(std::chrono::milliseconds expiryTimeout)
Definition: mthreadpool.cpp:506
MThreadPoolPrivate::s_pool_lock
static QRecursiveMutex s_pool_lock
Definition: mthreadpool.cpp:237
MThreadPoolPrivate::m_deleteThreads
QList< MPoolThread * > m_deleteThreads
Definition: mthreadpool.cpp:235
mthreadpool.h
MPoolThread::m_expiryTimeout
std::chrono::milliseconds m_expiryTimeout
Definition: mthreadpool.cpp:200
MThreadPool
Definition: mthreadpool.h:18
MPoolEntry
QPair< QRunnable *, QString > MPoolEntry
Definition: mthreadpool.cpp:99
MThreadPool::NotifyDone
void NotifyDone(MPoolThread *thread)
Definition: mthreadpool.cpp:490
MThreadPool::m_priv
MThreadPoolPrivate * m_priv
Definition: mthreadpool.h:57
logging.h
MThreadPool::MThreadPool
MThreadPool(const QString &name)
Definition: mthreadpool.cpp:248
MThreadPoolPrivate::s_pool
static MThreadPool * s_pool
Definition: mthreadpool.cpp:238
MThread
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:48
mthread.h
MPoolThread::m_doRun
bool m_doRun
Definition: mthreadpool.cpp:201
MThreadPoolPrivate::m_wait
QWaitCondition m_wait
Definition: mthreadpool.cpp:226
MThreadPool::expiryTimeout
std::chrono::milliseconds expiryTimeout(void) const
Definition: mthreadpool.cpp:500
MThread::objectName
QString objectName(void) const
Definition: mthread.cpp:243
mythtimer.h
MThreadPool::setMaxThreadCount
void setMaxThreadCount(int maxThreadCount)
Definition: mthreadpool.cpp:518
MThreadPool::NotifyAvailable
void NotifyAvailable(MPoolThread *thread)
Definition: mthreadpool.cpp:448
MPoolThread
Definition: mthreadpool.cpp:103
MThreadPoolPrivate::m_availThreads
QSet< MPoolThread * > m_availThreads
Definition: mthreadpool.cpp:233
MPoolThread::MPoolThread
MPoolThread(MThreadPool &pool, std::chrono::milliseconds timeout)
Definition: mthreadpool.cpp:106
MThread::m_runnable
QRunnable * m_runnable
Definition: mthread.h:136
MPoolThread::m_pool
MThreadPool & m_pool
Definition: mthreadpool.cpp:199
MThreadPool::globalInstance
static MThreadPool * globalInstance(void)
Definition: mthreadpool.cpp:307
MThreadPoolPrivate::m_runningThreads
QSet< MPoolThread * > m_runningThreads
Definition: mthreadpool.cpp:234
MThreadPoolPrivate::m_reserveThread
int m_reserveThread
Definition: mthreadpool.cpp:230
MThreadPool::start
void start(QRunnable *runnable, const QString &debugName, int priority=0)
Definition: mthreadpool.cpp:342
MPoolThread::s_lock
static QMutex s_lock
Definition: mthreadpool.cpp:205
MThreadPool::~MThreadPool
~MThreadPool()
Definition: mthreadpool.cpp:255
MThreadPoolPrivate::m_runQueues
MPoolQueues m_runQueues
Definition: mthreadpool.cpp:232