MythTV  master
taskqueue.cpp
Go to the documentation of this file.
1 // Program Name: taskqueue.cpp
3 // Created : Oct. 24, 2005
4 //
5 // Purpose : Used to process delayed tasks
6 //
7 // Copyright (c) 2005 David Blain <dblain@mythtv.org>
8 //
9 // Licensed under the GPL v2 or later, see LICENSE for details
10 //
12 
13 #include <iostream>
14 
15 #include <QDateTime>
16 
18 
19 #include "taskqueue.h"
20 
22 // Define Global instance
24 
27 
30 //
31 // Task Implementation
32 //
35 
// Definition of the static counter declared in Task; it starts at zero.
36 long Task::m_nTaskCount = 0;
37 
39 //
41 
// Constructs a Task, forwarding debugName to the ReferenceCounter base.
42 Task::Task(const QString &debugName) : ReferenceCounter(debugName)
43 {
// NOTE(review): the body shown here is empty, but the listing skips a source
// line at this point and Task::m_nTaskId / Task::m_nTaskCount are not used
// anywhere else in the visible code. Presumably the task id is assigned from
// the counter here - confirm against the original source.
45 }
46 
49 //
50 // Task Queue Implementation
51 //
54 
56 //
58 
60 {
61  QMutexLocker locker(&g_pTaskQueueCreationLock);
62  return g_pTaskQueue ? g_pTaskQueue : (g_pTaskQueue = new TaskQueue());
63 }
64 
66 //
68 
70 {
71  QMutexLocker locker(&g_pTaskQueueCreationLock);
72  delete g_pTaskQueue;
73  g_pTaskQueue = nullptr;
74 }
75 
77 //
79 
80 TaskQueue::TaskQueue() : MThread("TaskQueue")
81 {
82  LOG(VB_UPNP, LOG_INFO, "Starting TaskQueue Thread...");
83 
84  start();
85 
86  LOG(VB_UPNP, LOG_INFO, "TaskQueue Thread Started.");
87 }
88 
90 //
92 
94 {
95  m_bTermRequested = true;
96 
97  wait();
98 
99  Clear();
100 }
101 
103 {
104  m_bTermRequested = true;
105 }
106 
108 //
110 
112 {
113  RunProlog();
114 
115  LOG(VB_UPNP, LOG_INFO, "TaskQueue Thread Running.");
116 
117  while ( !m_bTermRequested )
118  {
119  // ------------------------------------------------------------------
120  // Process Any Tasks that may need to be executed.
121  // ------------------------------------------------------------------
122 
123  auto ttNow = nowAsDuration<std::chrono::microseconds>();
124 
125  Task *pTask = GetNextExpiredTask( ttNow );
126  if (pTask != nullptr)
127  {
128  try
129  {
130  pTask->Execute( this );
131  pTask->DecrRef();
132  }
133  catch( ... )
134  {
135  LOG(VB_GENERAL, LOG_ERR, "Call to Execute threw an exception.");
136  }
137 
138  }
139  // Make sure to throttle our processing.
140 
141  usleep( 100ms );
142  }
143 
144  RunEpilog();
145 }
146 
148 //
150 
152 {
153  m_mutex.lock();
154 
155  for (auto & task : m_mapTasks)
156  {
157  if (task.second != nullptr)
158  task.second->DecrRef();
159  }
160 
161  m_mapTasks.clear();
162 
163  m_mutex.unlock();
164 }
165 
171 
172 void TaskQueue::AddTask( std::chrono::milliseconds msec, Task *pTask )
173 {
174  auto tt = nowAsDuration<std::chrono::microseconds>() + msec;
175 
176  AddTaskAbsolute( tt, pTask );
177 }
178 
184 
186 {
187  if (pTask != nullptr)
188  {
189  m_mutex.lock();
190  pTask->IncrRef();
191  m_mapTasks.insert( TaskMap::value_type( ttKey, pTask ));
192  m_mutex.unlock();
193  }
194 }
195 
200 
201 void TaskQueue::AddTask( Task *pTask )
202 {
203 
204  if (pTask != nullptr)
205  {
206  auto tt = nowAsDuration<std::chrono::microseconds>();
207 
208  AddTaskAbsolute( tt, pTask );
209  }
210 }
211 
213 //
215 
216 Task *TaskQueue::GetNextExpiredTask( TaskTime tt, std::chrono::milliseconds nWithinMilliSecs /*=50*/ )
217 {
218  Task *pTask = nullptr;
219 
220  tt += nWithinMilliSecs ;
221 
222  m_mutex.lock();
223 
224  auto it = m_mapTasks.begin();
225  if (it != m_mapTasks.end())
226  {
227  TaskTime ttTask = (*it).first;
228 
229  if (ttTask < tt)
230  {
231  // Do not release here... caller must call release.
232 
233  pTask = (*it).second;
234 
235  m_mapTasks.erase( it );
236  }
237  }
238  m_mutex.unlock();
239 
240  return pTask;
241 }
TaskQueue::Clear
void Clear()
Definition: taskqueue.cpp:151
MThread::start
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:283
ReferenceCounter::DecrRef
virtual int DecrRef(void)
Decrements reference count and deletes on 0.
Definition: referencecounter.cpp:125
TaskTime
std::chrono::microseconds TaskTime
Definition: upnputil.h:31
TaskQueue::m_mapTasks
TaskMap m_mapTasks
Definition: taskqueue.h:91
MThread::wait
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
TaskQueue::RequestTerminate
void RequestTerminate()
Definition: taskqueue.cpp:102
Task::Execute
virtual void Execute(TaskQueue *pQueue)=0
TaskQueue::GetNextExpiredTask
Task * GetNextExpiredTask(TaskTime tt, std::chrono::milliseconds nWithinMilliSecs=50ms)
Definition: taskqueue.cpp:216
MThread::usleep
static void usleep(std::chrono::microseconds time)
Definition: mthread.cpp:335
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
MThread::RunProlog
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
TaskQueue
Definition: taskqueue.h:82
g_pTaskQueueCreationLock
static QMutex g_pTaskQueueCreationLock
Definition: taskqueue.cpp:25
TaskQueue::m_mutex
QMutex m_mutex
Definition: taskqueue.h:92
mythlogging.h
TaskQueue::Instance
static TaskQueue * Instance()
Definition: taskqueue.cpp:59
Task
Definition: taskqueue.h:54
TaskQueue::m_bTermRequested
bool m_bTermRequested
Definition: taskqueue.h:93
MThread::RunEpilog
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
TaskQueue::run
void run() override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: taskqueue.cpp:111
TaskQueue::Shutdown
static void Shutdown()
Definition: taskqueue.cpp:69
TaskQueue::g_pTaskQueue
static TaskQueue * g_pTaskQueue
Definition: taskqueue.h:87
taskqueue.h
Task::m_nTaskCount
static long m_nTaskCount
Definition: taskqueue.h:57
TaskQueue::AddTaskAbsolute
void AddTaskAbsolute(TaskTime tt, Task *pTask)
Add a task to run at a specific time.
Definition: taskqueue.cpp:185
Task::m_nTaskId
long m_nTaskId
Definition: taskqueue.h:59
TaskQueue::~TaskQueue
~TaskQueue() override
Definition: taskqueue.cpp:93
Task::Task
Task(const QString &debugName)
Definition: taskqueue.cpp:42
TaskQueue::AddTask
void AddTask(std::chrono::milliseconds msec, Task *pTask)
Add a task to run in the future.
Definition: taskqueue.cpp:172
MThread
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:48
TaskQueue::TaskQueue
TaskQueue()
Definition: taskqueue.cpp:80
ReferenceCounter::IncrRef
virtual int IncrRef(void)
Increments reference count.
Definition: referencecounter.cpp:101
ReferenceCounter
General purpose reference counter.
Definition: referencecounter.h:26