MythTV  master
mthreadpool.cpp
Go to the documentation of this file.
1 /* -*- Mode: c++ -*-
2  *
3  * Class MThreadPool
4  *
5  * Copyright (C) Daniel Kristjansson 2011
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  */
21 
75 // C++ headers
76 #include <algorithm>
77 using namespace std;
78 
79 // Qt headers
80 #include <QCoreApplication>
81 #include <QList>
82 #include <QMap>
83 #include <QMutex>
84 #include <QMutexLocker>
85 #include <QPair>
86 #include <QRunnable>
87 #include <QSet>
88 #include <QWaitCondition>
89 #include <utility>
90 
91 // MythTV headers
92 #include "mthreadpool.h"
93 #include "mythlogging.h"
94 #include "mythtimer.h"
95 #include "logging.h"
96 #include "mthread.h"
97 #include "mythdb.h"
98 
// One queued job: the runnable plus the QString that start() receives as
// debugName.
99 using MPoolEntry = QPair<QRunnable*,QString>;
// A list of queued jobs; start() appends at the back and NotifyAvailable()
// takes from the front.
100 using MPoolQueue = QList<MPoolEntry>;
// Job lists keyed by int; start() uses its priority argument as the key.
101 using MPoolQueues = QMap<int, MPoolQueue>;
102 
// A worker thread belonging to an MThreadPool.  The pool hands it one
// QRunnable at a time through SetRunnable(); run() executes it and then
// reports back to the pool with NotifyAvailable() or NotifyDone().
103 class MPoolThread : public MThread
104 {
105  public:
 // NOTE(review): the constructor's declarator line (listing line 106) is
 // missing from this listing; the member index at the end of the file gives
 // it as MPoolThread(MThreadPool &pool, int timeout).
 // The body names the thread "PT<n>" from the shared counter s_thread_num,
 // which is read and incremented under s_lock.
107  MThread("PT"), m_pool(pool), m_expiryTimeout(timeout)
108  {
109  QMutexLocker locker(&s_lock);
110  setObjectName(QString("PT%1").arg(s_thread_num));
111  s_thread_num++;
112  }
113 
 // Thread body: wait for a runnable, execute it, notify the pool, repeat.
 // The loop ends when no runnable is set after the wait (timeout or
 // Shutdown()), or when m_doRun has been cleared by the time a runnable
 // finishes.
114  void run(void) override // MThread
115  {
116  RunProlog();
117 
 // t is (re)started here and after every runnable; its elapsed value is
 // not read anywhere in the visible code.
118  MythTimer t;
119  t.start();
 // m_lock is held for the whole loop except around the pool callbacks
 // below, which includes the time spent inside m_runnable->run().
120  QMutexLocker locker(&m_lock);
121  while (true)
122  {
 // Sleep until SetRunnable()/Shutdown() wakes m_wait or the timeout
 // (m_expiryTimeout + 1) passes.
123  if (m_doRun && !m_runnable)
124  m_wait.wait(locker.mutex(), m_expiryTimeout+1);
125 
 // Nothing to run: retire this thread.
126  if (!m_runnable)
127  {
128  m_doRun = false;
129 
 // m_lock is released around the pool callback; the pool methods
 // lock the pool's own mutex and may call back into this thread.
130  locker.unlock();
131  m_pool.NotifyDone(this);
132  locker.relock();
133  break;
134  }
135 
 // Log under the runnable's name while it executes, if one was given.
136  if (!m_runnableName.isEmpty())
137  loggingRegisterThread(m_runnableName);
138 
 // autoDelete() is sampled before run() is called.
139  bool autodelete = m_runnable->autoDelete();
140  m_runnable->run();
141  if (autodelete)
142  delete m_runnable;
 // A reserved job gives its reservation back to the pool when done.
143  if (m_reserved)
144  m_pool.ReleaseThread();
145  m_reserved = false;
146  m_runnable = nullptr;
147 
 // NOTE(review): listing line 148 is missing here; the member index
 // mentions loggingDeregisterThread(), which may be the dropped call —
 // confirm against the real file.
 // Go back to logging under the thread's own object name.
149  loggingRegisterThread(objectName());
150 
 // Housekeeping between jobs: purge idle DB connections, process pending
 // events and deliver posted DeferredDelete events.
151  GetMythDB()->GetDBManager()->PurgeIdleConnections(false);
152  qApp->processEvents();
153  qApp->sendPostedEvents(nullptr, QEvent::DeferredDelete);
154 
155  t.start();
156 
 // Still allowed to run: offer this thread back to the pool.  The lock is
 // dropped first because NotifyAvailable() calls SetRunnable() on this
 // thread, and SetRunnable() takes m_lock.
157  if (m_doRun)
158  {
159  locker.unlock();
160  m_pool.NotifyAvailable(this);
161  locker.relock();
162  }
 // Shutdown() was requested while the job ran: retire.
163  else
164  {
165  locker.unlock();
166  m_pool.NotifyDone(this);
167  locker.relock();
168  break;
169  }
170  }
171 
172  RunEpilog();
173  }
174 
 // Hands a job to this thread and wakes it.
 // Returns true if the job was accepted; false if the thread is shutting
 // down (m_doRun is false) or already holds a runnable.
175  bool SetRunnable(QRunnable *runnable, QString runnableName,
176  bool reserved)
177  {
178  QMutexLocker locker(&m_lock);
179  if (m_doRun && (m_runnable == nullptr))
180  {
181  m_runnable = runnable;
182  m_runnableName = std::move(runnableName);
183  m_reserved = reserved;
184  m_wait.wakeAll();
185  return true;
186  }
187  return false;
188  }
189 
 // Clears m_doRun and wakes the thread so run() can notice and exit.
190  void Shutdown(void)
191  {
192  QMutexLocker locker(&m_lock);
193  m_doRun = false;
194  m_wait.wakeAll();
195  }
196 
 // m_lock guards the per-thread state below; m_wait is signalled by
 // SetRunnable() and Shutdown().
197  QMutex m_lock;
198  QWaitCondition m_wait;
 // NOTE(review): listing lines 198-200 and 206 are missing.  The code above
 // uses m_pool, m_expiryTimeout, m_runnable and s_thread_num, none of which
 // is declared in the visible lines.
 // false once the thread has been told to stop or has retired itself.
201  bool m_doRun {true};
 // Name passed with the current runnable; used for log registration.
202  QString m_runnableName;
 // true while the current runnable was started as a reserved job.
203  bool m_reserved {false};
204 
 // Guards the thread-name counter used by the constructor.
205  static QMutex s_lock;
207 };
208 QMutex MPoolThread::s_lock;
210 
212 
// NOTE(review): the class header (listing line 213) is missing from this
// listing; the constructor below shows that this is the body of
// MThreadPoolPrivate, the state object MThreadPool reaches through m_priv.
214 {
215  public:
216  explicit MThreadPoolPrivate(QString name) :
217  m_name(std::move(name)) {}
218 
 // NOTE(review): the signature of this member function (listing line 219)
 // is missing; the member index lists int GetRealMaxThread(void).
 // Returns the thread limit: at least one thread, plus the current number
 // of reservations.
220  {
221  return max(m_maxThreadCount,1) + m_reserveThread;
222  }
223 
 // m_lock guards every member below; the MThreadPool methods take it
 // before touching them.
224  mutable QMutex m_lock;
225  QString m_name;
 // Woken by Stop(), NotifyAvailable() and NotifyDone(); waited on by
 // startReserved() and waitForDone().
226  QWaitCondition m_wait;
 // Cleared by Stop(); TryStartInternal() refuses new work once it is false.
227  bool m_running {true};
 // Passed to each new MPoolThread, which uses it (+1) as the timeout of its
 // idle wait.
228  int m_expiryTimeout {120 * 1000};
229  int m_maxThreadCount {QThread::idealThreadCount()};
 // Number of outstanding reservations; added on top of the limit above.
230  int m_reserveThread {0};
231 
 // NOTE(review): listing line 232 is missing; the member index lists
 // MPoolQueues m_runQueues, which the pool methods use.
 // Threads idle and ready to take a job.
233  QSet<MPoolThread*> m_availThreads;
 // Threads that have been handed a job.
234  QSet<MPoolThread*> m_runningThreads;
 // Threads that are finished and waiting to be joined and deleted.
235  QList<MPoolThread*> m_deleteThreads;
236 
 // Guards s_all_pools; MThreadPool's constructor adds each pool to that list.
237  static QMutex s_pool_lock;
 // NOTE(review): listing line 238 is missing; the member index lists
 // static MThreadPool *s_pool.
239  static QList<MThreadPool*> s_all_pools;
240 };
241 
242 QMutex MThreadPoolPrivate::s_pool_lock(QMutex::Recursive);
244 QList<MThreadPool*> MThreadPoolPrivate::s_all_pools;
245 
247 
248 MThreadPool::MThreadPool(const QString &name) :
249  m_priv(new MThreadPoolPrivate(name))
250 {
251  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
252  MThreadPoolPrivate::s_all_pools.push_back(this);
253 }
254 
// NOTE(review): the signature line (listing line 255) is missing; the
// `delete m_priv` below suggests this is MThreadPool's destructor — confirm.
// Stops the pool, removes it from the static pool list and frees the private
// state.
256 {
257  Stop();
 // NOTE(review): listing line 258 is missing between Stop() and this scope.
 // The inner scope limits how long s_pool_lock is held.
259  {
260  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
261  MThreadPoolPrivate::s_all_pools.removeAll(this);
262  }
263  delete m_priv;
264  m_priv = nullptr;
265 }
266 
// NOTE(review): the signature line (listing line 267) is missing; the member
// index lists void Stop(void), which appears to be this function — confirm.
// Marks the pool as not running, asks every available and running thread to
// shut down, and wakes anyone waiting on the pool's condition.
268 {
269  QMutexLocker locker(&m_priv->m_lock);
270  m_priv->m_running = false;
271  QSet<MPoolThread*>::iterator it = m_priv->m_availThreads.begin();
272  for (; it != m_priv->m_availThreads.end(); ++it)
273  (*it)->Shutdown();
274  it = m_priv->m_runningThreads.begin();
275  for (; it != m_priv->m_runningThreads.end(); ++it)
276  (*it)->Shutdown();
277  m_priv->m_wait.wakeAll();
278 }
279 
// NOTE(review): the signature line (listing line 280) is missing; the member
// index lists void DeletePoolThreads(void), which appears to be this
// function — confirm.
// Waits for outstanding work, then joins and deletes every thread that is
// available or already queued for deletion.
281 {
282  waitForDone();
283 
284  QMutexLocker locker(&m_priv->m_lock);
 // Move all idle threads onto the delete list.
285  foreach (auto thread, m_priv->m_availThreads)
286  {
287  m_priv->m_deleteThreads.push_front(thread);
288  }
289  m_priv->m_availThreads.clear();
290 
291  while (!m_priv->m_deleteThreads.empty())
292  {
293  MPoolThread *thread = m_priv->m_deleteThreads.back();
 // The pool lock is released while joining the thread.
294  locker.unlock();
295 
296  thread->wait();
297 
298  locker.relock();
299  delete thread;
 // The list may have changed while the lock was released, so check
 // whether this thread is still the last entry before removing it.
300  if (m_priv->m_deleteThreads.back() == thread)
301  m_priv->m_deleteThreads.pop_back();
302  else
303  m_priv->m_deleteThreads.removeAll(thread);
304  }
305 }
306 
// NOTE(review): the signature (listing line 307) and listing lines 310 and
// 312 are missing.  The member index lists
// static MThreadPool *globalInstance(void), which appears to be this
// function — confirm.
// The visible lines take s_pool_lock and assign a new pool named
// "GlobalPool" to s_pool; any guard around the assignment and the return
// statement are not visible here.
308 {
309  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
311  MThreadPoolPrivate::s_pool = new MThreadPool("GlobalPool");
313 }
314 
// NOTE(review): the signature line (listing line 315) is missing; the member
// index lists static void StopAllPools(void), which appears to be this
// function — confirm.
// Calls Stop() on every pool in s_all_pools while holding s_pool_lock.
316 {
317  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
318  QList<MThreadPool*>::iterator it;
319  for (it = MThreadPoolPrivate::s_all_pools.begin();
320  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
321  {
322  (*it)->Stop();
323  }
324 }
325 
// NOTE(review): the signature line (listing line 326) is missing; the member
// index lists static void ShutdownAllPools(void), which appears to be this
// function — confirm.
// Two passes over s_all_pools under s_pool_lock: first Stop() every pool,
// then DeletePoolThreads() on every pool.
327 {
328  QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
329  QList<MThreadPool*>::iterator it;
 // Pass 1: stop every pool before any of them is waited on.
330  for (it = MThreadPoolPrivate::s_all_pools.begin();
331  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
332  {
333  (*it)->Stop();
334  }
 // Pass 2: join and delete each pool's threads.
335  for (it = MThreadPoolPrivate::s_all_pools.begin();
336  it != MThreadPoolPrivate::s_all_pools.end(); ++it)
337  {
338  (*it)->DeletePoolThreads();
339  }
340 }
341 
342 void MThreadPool::start(QRunnable *runnable, const QString& debugName, int priority)
343 {
344  QMutexLocker locker(&m_priv->m_lock);
345  if (TryStartInternal(runnable, debugName, false))
346  return;
347 
348  MPoolQueues::iterator it = m_priv->m_runQueues.find(priority);
349  if (it != m_priv->m_runQueues.end())
350  {
351  (*it).push_back(MPoolEntry(runnable,debugName));
352  }
353  else
354  {
355  MPoolQueue list;
356  list.push_back(MPoolEntry(runnable,debugName));
357  m_priv->m_runQueues[priority] = list;
358  }
359 }
360 
// NOTE(review): the first signature line (listing line 361) is missing; the
// member index lists
// void startReserved(QRunnable*, const QString&, int waitForAvailMS=0).
// Optionally waits up to waitForAvailMS for an available thread, then starts
// the runnable as a reserved job; the result of TryStartInternal() is not
// checked.
362  QRunnable *runnable, const QString& debugName, int waitForAvailMS)
363 {
364  QMutexLocker locker(&m_priv->m_lock);
 // NOTE(review): the last operand of this condition (listing line 366) is
 // missing.
365  if (waitForAvailMS > 0 && m_priv->m_availThreads.empty() &&
367  {
368  MythTimer t;
369  t.start();
370  int left = waitForAvailMS - t.elapsed();
 // NOTE(review): the last operand of this loop condition (listing line 372)
 // is missing.
 // Each wake-up recomputes the remaining time from the timer.
371  while (left > 0 && m_priv->m_availThreads.empty() &&
373  {
374  m_priv->m_wait.wait(locker.mutex(), left);
375  left = waitForAvailMS - t.elapsed();
376  }
377  }
378  TryStartInternal(runnable, debugName, true);
379 }
380 
381 
382 bool MThreadPool::tryStart(QRunnable *runnable, const QString& debugName)
383 {
384  QMutexLocker locker(&m_priv->m_lock);
385  return TryStartInternal(runnable, debugName, false);
386 }
387 
// NOTE(review): the first signature line (listing line 388) is missing; the
// member index lists
// bool TryStartInternal(QRunnable*, const QString&, bool reserved).
// Tries to hand the runnable to an available thread, or to a newly created
// one.  Returns true once a thread has accepted it, false otherwise.
// The visible callers (start, tryStart, startReserved, NotifyAvailable) all
// hold m_priv->m_lock when calling this.
389  QRunnable *runnable, const QString& debugName, bool reserved)
390 {
 // A stopped pool accepts no new work.
391  if (!m_priv->m_running)
392  return false;
393 
 // Join and delete threads that have already retired.
394  while (!m_priv->m_deleteThreads.empty())
395  {
396  m_priv->m_deleteThreads.back()->wait();
397  delete m_priv->m_deleteThreads.back();
398  m_priv->m_deleteThreads.pop_back();
399  }
400 
 // Offer the job to available threads one at a time.
401  while (m_priv->m_availThreads.begin() != m_priv->m_availThreads.end())
402  {
403  MPoolThread *thread = *m_priv->m_availThreads.begin();
404  m_priv->m_availThreads.erase(m_priv->m_availThreads.begin());
405  m_priv->m_runningThreads.insert(thread);
 // NOTE(review): the statement controlled by this `if` (listing line 407)
 // is missing from this listing.
406  if (reserved)
408  if (thread->SetRunnable(runnable, debugName, reserved))
409  {
410  return true;
411  }
412 
 // The thread refused the job: shut it down and queue it for deletion.
 // NOTE(review): the statement controlled by this `if` (listing line 414)
 // is missing from this listing.
413  if (reserved)
415  thread->Shutdown();
416  m_priv->m_runningThreads.remove(thread);
417  m_priv->m_deleteThreads.push_front(thread);
418  }
419 
 // No available thread took the job; consider creating a new thread.
 // NOTE(review): the second operand of this condition (listing line 421)
 // is missing from this listing.
420  if (reserved ||
422  {
 // NOTE(review): the statement controlled by this `if` (listing line 424)
 // is missing from this listing.
423  if (reserved)
425  auto *thread = new MPoolThread(*this, m_priv->m_expiryTimeout);
426  m_priv->m_runningThreads.insert(thread);
 // The runnable is set before start(), so run() finds it on its first
 // pass.
427  thread->SetRunnable(runnable, debugName, reserved);
428  thread->start();
429  if (thread->isRunning())
430  {
431  return true;
432  }
433 
434  // Thread failed to run, OOM?
435  // QThread will print an error, so we don't have to
 // NOTE(review): the statement controlled by this `if` (listing line 437)
 // is missing from this listing.
436  if (reserved)
438  thread->Shutdown();
439  m_priv->m_runningThreads.remove(thread);
440  m_priv->m_deleteThreads.push_front(thread);
441  }
442 
443  return false;
444 }
445 
// NOTE(review): the signature line (listing line 446) is missing; the member
// index lists void NotifyAvailable(MPoolThread *thread), which appears to be
// this function — confirm.
// Called by a worker thread after it finishes a job.  Gives the thread the
// next queued job if there is one; otherwise marks it available, or retires
// it if the pool has been stopped.
447 {
448  QMutexLocker locker(&m_priv->m_lock);
449 
 // Pool stopped: retire the thread.
450  if (!m_priv->m_running)
451  {
452  m_priv->m_runningThreads.remove(thread);
453  thread->Shutdown();
454  m_priv->m_deleteThreads.push_front(thread);
455  m_priv->m_wait.wakeAll();
456  return;
457  }
458 
 // Nothing queued: move the thread from running to available and wake
 // anyone waiting on the pool.
459  MPoolQueues::iterator it = m_priv->m_runQueues.begin();
460  if (it == m_priv->m_runQueues.end())
461  {
462  m_priv->m_runningThreads.remove(thread);
463  m_priv->m_availThreads.insert(thread);
464  m_priv->m_wait.wakeAll();
465  return;
466  }
467 
 // Take the oldest entry of the first queue in the map (QMap iterates in
 // ascending key order, so this is the smallest priority value).
468  MPoolEntry e = (*it).front();
469  if (!thread->SetRunnable(e.first, e.second, false))
470  {
 // This thread would not take the job: drop it from the running set and
 // try to place the job elsewhere.  The thread is shut down and queued
 // for deletion in both outcomes.
471  m_priv->m_runningThreads.remove(thread);
472  m_priv->m_wait.wakeAll();
473  if (!TryStartInternal(e.first, e.second, false))
474  {
 // The job could not be started anywhere; it stays in the queue.
475  thread->Shutdown();
476  m_priv->m_deleteThreads.push_front(thread);
477  return;
478  }
479  thread->Shutdown();
480  m_priv->m_deleteThreads.push_front(thread);
481  }
482 
 // The job was handed off: remove it, and drop the queue once it is empty.
483  (*it).pop_front();
484  if ((*it).empty())
485  m_priv->m_runQueues.erase(it);
486 }
487 
// NOTE(review): the signature line (listing line 488) is missing; the member
// index lists void NotifyDone(MPoolThread *thread), which appears to be this
// function — confirm.
// Called by a worker thread that is about to exit.  Removes it from the
// running and available sets, queues it for deletion once, and wakes anyone
// waiting on the pool.
489 {
490  QMutexLocker locker(&m_priv->m_lock);
491  m_priv->m_runningThreads.remove(thread);
492  m_priv->m_availThreads.remove(thread);
 // The thread may already be on the delete list; avoid adding it twice.
493  if (!m_priv->m_deleteThreads.contains(thread))
494  m_priv->m_deleteThreads.push_front(thread);
495  m_priv->m_wait.wakeAll();
496 }
497 
// NOTE(review): the signature line (listing line 498) is missing; the member
// index lists int expiryTimeout(void) const, which appears to be this
// function — confirm.
// Returns the pool's expiry timeout, read under the pool lock.
499 {
500  QMutexLocker locker(&m_priv->m_lock);
501  return m_priv->m_expiryTimeout;
502 }
503 
// Setter for the pool's expiry timeout.
// NOTE(review): the statement after the lock (listing line 507) is missing
// from this listing, so only the locking is visible here.
504 void MThreadPool::setExpiryTimeout(int expiryTimeout)
505 {
506  QMutexLocker locker(&m_priv->m_lock);
508 }
509 
// NOTE(review): the signature line (listing line 510) is missing; the member
// index lists int maxThreadCount(void) const, which appears to be this
// function — confirm.
// Returns the configured maximum thread count, read under the pool lock.
511 {
512  QMutexLocker locker(&m_priv->m_lock);
513  return m_priv->m_maxThreadCount;
514 }
515 
// Setter for the pool's maximum thread count.
// NOTE(review): the statement after the lock (listing line 519) is missing
// from this listing, so only the locking is visible here.
516 void MThreadPool::setMaxThreadCount(int maxThreadCount)
517 {
518  QMutexLocker locker(&m_priv->m_lock);
520 }
521 
// NOTE(review): the signature line (listing line 522) is missing; the member
// index lists int activeThreadCount(void) const, which appears to be this
// function — confirm.
// Returns the number of available threads plus the number of running
// threads; threads queued for deletion are not counted.
523 {
524  QMutexLocker locker(&m_priv->m_lock);
525  return m_priv->m_availThreads.size() + m_priv->m_runningThreads.size();
526 }
527 
528 /*
529 void MThreadPool::reserveThread(void)
530 {
531  QMutexLocker locker(&m_priv->m_lock);
532  m_priv->m_reserveThread++;
533 }
534 
535 void MThreadPool::releaseThread(void)
536 {
537  QMutexLocker locker(&m_priv->m_lock);
538  if (m_priv->m_reserveThread > 0)
539  m_priv->m_reserveThread--;
540 }
541 */
542 
// NOTE(review): the signature line (listing line 543) is missing; the member
// index lists void ReleaseThread(void), which MPoolThread::run() calls after
// a reserved job — this appears to be that function, confirm.
544 {
545  QMutexLocker locker(&m_priv->m_lock);
 // NOTE(review): the statement controlled by this `if` (listing line 547) is
 // missing from this listing.
546  if (m_priv->m_reserveThread > 0)
548 }
549 
550 #if 0
551 static void print_set(QString title, QSet<MPoolThread*> set)
552 {
553  LOG(VB_GENERAL, LOG_INFO, title);
554  foreach (auto item, set)
555  {
556  LOG(VB_GENERAL, LOG_INFO, QString(" : 0x%1")
557  .arg((quint64)item,0,16));
558  }
559  LOG(VB_GENERAL, LOG_INFO, "");
560 }
561 #endif
562 
// NOTE(review): the signature line (listing line 563) is missing; the member
// index lists void waitForDone(void), which appears to be this function —
// confirm.
// Blocks until no queued work is left to wait for and no thread is still
// working, joining and deleting retired threads along the way.
564 {
565  QMutexLocker locker(&m_priv->m_lock);
566  while (true)
567  {
 // Join and delete threads that have already retired.
568  while (!m_priv->m_deleteThreads.empty())
569  {
570  m_priv->m_deleteThreads.back()->wait();
571  delete m_priv->m_deleteThreads.back();
572  m_priv->m_deleteThreads.pop_back();
573  }
574 
 // Jobs are still queued and the pool is running: sleep until a thread
 // signals the pool, then re-check from the top.
575  if (m_priv->m_running && !m_priv->m_runQueues.empty())
576  {
577  m_priv->m_wait.wait(locker.mutex());
578  continue;
579  }
580 
 // Threads in the running set that are not also in the available set are
 // still busy; finish only when there are none.
581  QSet<MPoolThread*> working = m_priv->m_runningThreads;
582  working = working.subtract(m_priv->m_availThreads);
583  if (working.empty())
584  break;
585  m_priv->m_wait.wait(locker.mutex());
586  }
587 }
588 
589 /* vim: set expandtab tabstop=4 shiftwidth=4: */
MThreadPoolPrivate * m_priv
Definition: mthreadpool.h:56
void setMaxThreadCount(int maxThreadCount)
int maxThreadCount(void) const
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:46
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
static QList< MThreadPool * > s_all_pools
bool SetRunnable(QRunnable *runnable, QString runnableName, bool reserved)
void Stop(void)
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:311
bool TryStartInternal(QRunnable *runnable, const QString &debugName, bool reserved)
int GetRealMaxThread(void)
MPoolThread(MThreadPool &pool, int timeout)
static void ShutdownAllPools(void)
void ReleaseThread(void)
static QMutex s_lock
void loggingRegisterThread(const QString &name)
Register the current thread with the given name.
Definition: logging.cpp:765
MThreadPoolPrivate(QString name)
void waitForDone(void)
QString m_runnableName
QSet< MPoolThread * > m_availThreads
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
QList< MPoolEntry > MPoolQueue
QWaitCondition m_wait
void DeletePoolThreads(void)
QSet< MPoolThread * > m_runningThreads
static QMutex s_pool_lock
QList< MPoolThread * > m_deleteThreads
MThreadPool(const QString &name)
void NotifyDone(MPoolThread *thread)
unsigned int uint
Definition: compat.h:140
void loggingDeregisterThread(void)
Deregister the current thread's name.
Definition: logging.cpp:784
int activeThreadCount(void) const
void setExpiryTimeout(int expiryTimeout)
void Shutdown(void)
const char * m_name
Definition: ParseText.cpp:328
QPair< QRunnable *, QString > MPoolEntry
Definition: mthreadpool.cpp:99
QWaitCondition m_wait
static MThreadPool * globalInstance(void)
int expiryTimeout(void) const
static void StopAllPools(void)
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
void NotifyAvailable(MPoolThread *thread)
void start(QRunnable *runnable, const QString &debugName, int priority=0)
static uint s_thread_num
QMap< int, MPoolQueue > MPoolQueues
MThreadPool & m_pool
void startReserved(QRunnable *runnable, const QString &debugName, int waitForAvailMS=0)
bool tryStart(QRunnable *runnable, const QString &debugName)
static MThreadPool * s_pool
MythDB * GetMythDB(void)
Definition: mythdb.cpp:46
MPoolQueues m_runQueues
friend class MPoolThread
Definition: mthreadpool.h:19