MythTV master
mthreadpool.cpp
Go to the documentation of this file.
1/* -*- Mode: c++ -*-
2 *
3 * Class MThreadPool
4 *
5 * Copyright (C) Daniel Kristjansson 2011
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 */
21
75// C++ headers
76#include <algorithm>
77
78// Qt headers
79#include <QCoreApplication>
80#include <QList>
81#include <QMap>
82#include <QMutex>
83#include <QMutexLocker>
84#include <QPair>
85#include <QRunnable>
86#include <QRecursiveMutex>
87#include <QSet>
88#include <QWaitCondition>
89#include <utility>
90
91// MythTV headers
92#include "mthreadpool.h"
93#include "mythlogging.h"
94#include "mythtimer.h"
95#include "logging.h"
96#include "mthread.h"
97#include "mythdb.h"
98
99using MPoolEntry = QPair<QRunnable*,QString>;
100using MPoolQueue = QList<MPoolEntry>;
101using MPoolQueues = QMap<int, MPoolQueue>;
102
103class MPoolThread : public MThread
104{
105 public:
106 MPoolThread(const QString& objectName, MThreadPool &pool, std::chrono::milliseconds timeout) :
108 {
109 }
110
111 void run(void) override // MThread
112 {
113 RunProlog();
114
115 QMutexLocker locker(&m_lock);
116 if (m_doRun && m_runnable == nullptr)
117 {
118 m_wait.wait(locker.mutex(), (m_expiryTimeout+1ms).count());
119 }
120 while (m_runnable != nullptr)
121 {
122 if (!m_runnableName.isEmpty())
124
125 bool autodelete = m_runnable->autoDelete();
126 locker.unlock();
127 m_runnable->run();
128 locker.relock();
129 if (autodelete)
130 delete m_runnable;
131 if (m_reserved)
132 {
133 locker.unlock();
135 locker.relock();
136 }
137 m_reserved = false;
138 m_runnable = nullptr;
139
142
143 GetMythDB()->GetDBManager()->PurgeIdleConnections(false);
144 qApp->processEvents();
145 qApp->sendPostedEvents(nullptr, QEvent::DeferredDelete);
146
147 if (m_doRun)
148 {
149 locker.unlock();
151 locker.relock();
152 }
153 if (m_doRun && m_runnable == nullptr)
154 {
155 m_wait.wait(locker.mutex(), (m_expiryTimeout+1ms).count());
156 }
157 }
158
159 m_doRun = false;
160
161 locker.unlock();
162 m_pool.NotifyDone(this);
163 locker.relock();
164
165 RunEpilog();
166 }
167
168 bool SetRunnable(QRunnable *runnable, QString runnableName,
169 bool reserved)
170 {
171 QMutexLocker locker(&m_lock);
172 if (m_doRun && (m_runnable == nullptr))
173 {
174 m_runnable = runnable;
175 m_runnableName = std::move(runnableName);
176 m_reserved = reserved;
177 m_wait.wakeAll();
178 return true;
179 }
180 return false;
181 }
182
183 void Shutdown(void)
184 {
185 QMutexLocker locker(&m_lock);
186 m_doRun = false;
187 m_wait.wakeAll();
188 }
189
190 QMutex m_lock;
191 QWaitCondition m_wait;
193 std::chrono::milliseconds m_expiryTimeout;
194 bool m_doRun {true};
196 bool m_reserved {false};
197};
198
200
202{
203 public:
204 explicit MThreadPoolPrivate(QString name) :
205 m_name(std::move(name)) {}
206
207 int GetRealMaxThread(void) const
208 {
209 return std::max(m_maxThreadCount,1) + m_reserveThread;
210 }
211
212 mutable QMutex m_lock;
213 QString m_name;
214 QWaitCondition m_wait;
215 bool m_running {true};
216 std::chrono::milliseconds m_expiryTimeout {2min};
217 int m_maxThreadCount {QThread::idealThreadCount()};
220
222 QSet<MPoolThread*> m_availThreads;
223 QSet<MPoolThread*> m_runningThreads;
224 QList<MPoolThread*> m_deleteThreads;
225
226 static QRecursiveMutex s_pool_lock;
228 static QList<MThreadPool*> s_all_pools;
229};
230
231QRecursiveMutex MThreadPoolPrivate::s_pool_lock;
233QList<MThreadPool*> MThreadPoolPrivate::s_all_pools;
234
236
237MThreadPool::MThreadPool(const QString &name) :
238 m_priv(new MThreadPoolPrivate(name))
239{
240 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
241 MThreadPoolPrivate::s_all_pools.push_back(this);
242}
243
245{
246 Stop();
248 {
249 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
250 MThreadPoolPrivate::s_all_pools.removeAll(this);
251 }
252 delete m_priv;
253 m_priv = nullptr;
254}
255
257{
258 QMutexLocker locker(&m_priv->m_lock);
259 m_priv->m_running = false;
260 QSet<MPoolThread*>::iterator it = m_priv->m_availThreads.begin();
261 for (; it != m_priv->m_availThreads.end(); ++it)
262 (*it)->Shutdown();
263 it = m_priv->m_runningThreads.begin();
264 for (; it != m_priv->m_runningThreads.end(); ++it)
265 (*it)->Shutdown();
266 m_priv->m_wait.wakeAll();
267}
268
270{
271 waitForDone();
272
273 QMutexLocker locker(&m_priv->m_lock);
274 for (auto *thread : std::as_const(m_priv->m_availThreads))
275 {
276 m_priv->m_deleteThreads.push_front(thread);
277 }
278 m_priv->m_availThreads.clear();
279
280 while (!m_priv->m_deleteThreads.empty())
281 {
282 MPoolThread *thread = m_priv->m_deleteThreads.back();
283 locker.unlock();
284
285 thread->wait();
286
287 locker.relock();
288 delete thread;
289 if (m_priv->m_deleteThreads.back() == thread)
290 m_priv->m_deleteThreads.pop_back();
291 else
292 m_priv->m_deleteThreads.removeAll(thread);
293 }
294}
295
297{
298 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
300 MThreadPoolPrivate::s_pool = new MThreadPool("GlobalPool");
302}
303
305{
306 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
307 QList<MThreadPool*>::iterator it;
308 for (it = MThreadPoolPrivate::s_all_pools.begin();
309 it != MThreadPoolPrivate::s_all_pools.end(); ++it)
310 {
311 (*it)->Stop();
312 }
313}
314
316{
317 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
318 QList<MThreadPool*>::iterator it;
319 for (it = MThreadPoolPrivate::s_all_pools.begin();
320 it != MThreadPoolPrivate::s_all_pools.end(); ++it)
321 {
322 (*it)->Stop();
323 }
324 for (it = MThreadPoolPrivate::s_all_pools.begin();
325 it != MThreadPoolPrivate::s_all_pools.end(); ++it)
326 {
327 (*it)->DeletePoolThreads();
328 }
329}
330
331void MThreadPool::start(QRunnable *runnable, const QString& debugName, int priority)
332{
333 QMutexLocker locker(&m_priv->m_lock);
334 if (TryStartInternal(runnable, debugName, false))
335 return;
336
337 MPoolQueues::iterator it = m_priv->m_runQueues.find(priority);
338 if (it != m_priv->m_runQueues.end())
339 {
340 (*it).push_back(MPoolEntry(runnable,debugName));
341 }
342 else
343 {
344 MPoolQueue list;
345 list.push_back(MPoolEntry(runnable,debugName));
346 m_priv->m_runQueues[priority] = list;
347 }
348}
349
351 QRunnable *runnable, const QString& debugName,
352 std::chrono::milliseconds waitForAvailMS)
353{
354 QMutexLocker locker(&m_priv->m_lock);
355 if (waitForAvailMS > 0ms && m_priv->m_availThreads.empty() &&
357 {
358 MythTimer t;
359 t.start();
360 auto left = waitForAvailMS - t.elapsed();
361 while (left > 0ms && m_priv->m_availThreads.empty() &&
363 {
364 m_priv->m_wait.wait(locker.mutex(), left.count());
365 left = waitForAvailMS - t.elapsed();
366 }
367 }
368 TryStartInternal(runnable, debugName, true);
369}
370
371
372bool MThreadPool::tryStart(QRunnable *runnable, const QString& debugName)
373{
374 QMutexLocker locker(&m_priv->m_lock);
375 return TryStartInternal(runnable, debugName, false);
376}
377
379 QRunnable *runnable, const QString& debugName, bool reserved)
380{
381 if (!m_priv->m_running)
382 return false;
383
384 while (!m_priv->m_deleteThreads.empty())
385 {
386 m_priv->m_deleteThreads.back()->wait();
387 delete m_priv->m_deleteThreads.back();
388 m_priv->m_deleteThreads.pop_back();
389 }
390
391 for (auto iter = m_priv->m_availThreads.begin();
392 iter != m_priv->m_availThreads.end(); )
393 {
394 MPoolThread *thread = *iter;
395 iter = m_priv->m_availThreads.erase(iter);
396 m_priv->m_runningThreads.insert(thread);
397 if (reserved)
399 if (thread->SetRunnable(runnable, debugName, reserved))
400 {
401 return true;
402 }
403
404 if (reserved)
406 thread->Shutdown();
407 m_priv->m_runningThreads.remove(thread);
408 m_priv->m_deleteThreads.push_front(thread);
409 }
410
411 if (reserved ||
413 {
414 if (reserved)
416 QString name {QString("%1%2").arg(m_priv->m_name, QString::number(m_priv->m_threadsCreated))};
418 auto *thread = new MPoolThread(name, *this, m_priv->m_expiryTimeout);
419 m_priv->m_runningThreads.insert(thread);
420 thread->SetRunnable(runnable, debugName, reserved);
421 thread->start();
422 if (thread->isRunning())
423 {
424 return true;
425 }
426
427 // Thread failed to run, OOM?
428 // QThread will print an error, so we don't have to
429 if (reserved)
431 thread->Shutdown();
432 m_priv->m_runningThreads.remove(thread);
433 m_priv->m_deleteThreads.push_front(thread);
434 }
435
436 return false;
437}
438
440{
441 QMutexLocker locker(&m_priv->m_lock);
442
443 if (!m_priv->m_running)
444 {
445 m_priv->m_runningThreads.remove(thread);
446 thread->Shutdown();
447 m_priv->m_deleteThreads.push_front(thread);
448 m_priv->m_wait.wakeAll();
449 return;
450 }
451
452 MPoolQueues::iterator it = m_priv->m_runQueues.begin();
453 if (it == m_priv->m_runQueues.end())
454 {
455 m_priv->m_runningThreads.remove(thread);
456 m_priv->m_availThreads.insert(thread);
457 m_priv->m_wait.wakeAll();
458 return;
459 }
460
461 MPoolEntry e = (*it).front();
462 if (!thread->SetRunnable(e.first, e.second, false))
463 {
464 m_priv->m_runningThreads.remove(thread);
465 m_priv->m_wait.wakeAll();
466 if (!TryStartInternal(e.first, e.second, false))
467 {
468 thread->Shutdown();
469 m_priv->m_deleteThreads.push_front(thread);
470 return;
471 }
472 thread->Shutdown();
473 m_priv->m_deleteThreads.push_front(thread);
474 }
475
476 (*it).pop_front();
477 if ((*it).empty())
478 m_priv->m_runQueues.erase(it);
479}
480
482{
483 QMutexLocker locker(&m_priv->m_lock);
484 m_priv->m_runningThreads.remove(thread);
485 m_priv->m_availThreads.remove(thread);
486 if (!m_priv->m_deleteThreads.contains(thread))
487 m_priv->m_deleteThreads.push_front(thread);
488 m_priv->m_wait.wakeAll();
489}
490
491std::chrono::milliseconds MThreadPool::expiryTimeout(void) const
492{
493 QMutexLocker locker(&m_priv->m_lock);
494 return m_priv->m_expiryTimeout;
495}
496
497void MThreadPool::setExpiryTimeout(std::chrono::milliseconds expiryTimeout)
498{
499 QMutexLocker locker(&m_priv->m_lock);
501}
502
504{
505 QMutexLocker locker(&m_priv->m_lock);
506 return m_priv->m_maxThreadCount;
507}
508
509void MThreadPool::setMaxThreadCount(int maxThreadCount)
510{
511 QMutexLocker locker(&m_priv->m_lock);
513}
514
516{
517 QMutexLocker locker(&m_priv->m_lock);
518 return m_priv->m_availThreads.size() + m_priv->m_runningThreads.size();
519}
520
521/*
522void MThreadPool::reserveThread(void)
523{
524 QMutexLocker locker(&m_priv->m_lock);
525 m_priv->m_reserveThread++;
526}
527
528void MThreadPool::releaseThread(void)
529{
530 QMutexLocker locker(&m_priv->m_lock);
531 if (m_priv->m_reserveThread > 0)
532 m_priv->m_reserveThread--;
533}
534*/
535
537{
538 QMutexLocker locker(&m_priv->m_lock);
539 if (m_priv->m_reserveThread > 0)
541}
542
543#if 0
544static void print_set(QString title, QSet<MPoolThread*> set)
545{
546 LOG(VB_GENERAL, LOG_INFO, title);
547 for (auto item : std::as_const(set))
548 {
549 LOG(VB_GENERAL, LOG_INFO, QString(" : 0x%1")
550 .arg((quint64)item,0,16));
551 }
552 LOG(VB_GENERAL, LOG_INFO, "");
553}
554#endif
555
557{
558 QMutexLocker locker(&m_priv->m_lock);
559 while (true)
560 {
561 while (!m_priv->m_deleteThreads.empty())
562 {
563 m_priv->m_deleteThreads.back()->wait();
564 delete m_priv->m_deleteThreads.back();
565 m_priv->m_deleteThreads.pop_back();
566 }
567
568 if (m_priv->m_running && !m_priv->m_runQueues.empty())
569 {
570 m_priv->m_wait.wait(locker.mutex());
571 continue;
572 }
573
574 QSet<MPoolThread*> working = m_priv->m_runningThreads;
575 working = working.subtract(m_priv->m_availThreads);
576 if (working.empty())
577 break;
578 m_priv->m_wait.wait(locker.mutex());
579 }
580}
581
582/* vim: set expandtab tabstop=4 shiftwidth=4: */
void Shutdown(void)
std::chrono::milliseconds m_expiryTimeout
QWaitCondition m_wait
MPoolThread(const QString &objectName, MThreadPool &pool, std::chrono::milliseconds timeout)
bool SetRunnable(QRunnable *runnable, QString runnableName, bool reserved)
MThreadPool & m_pool
QString m_runnableName
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
QSet< MPoolThread * > m_runningThreads
static QList< MThreadPool * > s_all_pools
static QRecursiveMutex s_pool_lock
MThreadPoolPrivate(QString name)
QWaitCondition m_wait
std::chrono::milliseconds m_expiryTimeout
QSet< MPoolThread * > m_availThreads
MPoolQueues m_runQueues
int GetRealMaxThread(void) const
QList< MPoolThread * > m_deleteThreads
static MThreadPool * s_pool
int maxThreadCount(void) const
std::chrono::milliseconds expiryTimeout(void) const
void setExpiryTimeout(std::chrono::milliseconds expiryTimeout)
void DeletePoolThreads(void)
MThreadPool(const QString &name)
void setMaxThreadCount(int maxThreadCount)
MThreadPoolPrivate * m_priv
Definition: mthreadpool.h:57
void ReleaseThread(void)
void startReserved(QRunnable *runnable, const QString &debugName, std::chrono::milliseconds waitForAvailMS=0ms)
bool tryStart(QRunnable *runnable, const QString &debugName)
void NotifyAvailable(MPoolThread *thread)
void Stop(void)
int activeThreadCount(void) const
static void StopAllPools(void)
static MThreadPool * globalInstance(void)
friend class MPoolThread
Definition: mthreadpool.h:20
void start(QRunnable *runnable, const QString &debugName, int priority=0)
static void ShutdownAllPools(void)
void waitForDone(void)
void NotifyDone(MPoolThread *thread)
bool TryStartInternal(QRunnable *runnable, const QString &debugName, bool reserved)
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:49
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
QRunnable * m_runnable
Definition: mthread.h:136
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
QString objectName(void) const
Definition: mthread.cpp:243
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:14
void loggingDeregisterThread(void)
Deregister the current thread's name.
Definition: logging.cpp:723
void loggingRegisterThread(const QString &name)
Register the current thread with the given name.
Definition: logging.cpp:704
QList< MPoolEntry > MPoolQueue
QPair< QRunnable *, QString > MPoolEntry
Definition: mthreadpool.cpp:99
QMap< int, MPoolQueue > MPoolQueues
MythDB * GetMythDB(void)
Definition: mythdb.cpp:51
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
STL namespace.