MythTV  master
taskqueue.cpp
Go to the documentation of this file.
1 // Program Name: taskqueue.cpp
3 // Created : Oct. 24, 2005
4 //
5 // Purpose : Used to process delayed tasks
6 //
7 // Copyright (c) 2005 David Blain <dblain@mythtv.org>
8 //
9 // Licensed under the GPL v2 or later, see COPYING for details
10 //
12 
13 #include "taskqueue.h"
14 #include "mythlogging.h"
15 
16 #include <QDateTime>
17 
18 #include <iostream>
19 
21 // Define Global instance
23 
26 
29 //
30 // Task Implementation
31 //
34 
35 long Task::m_nTaskCount = 0;
36 
38 //
40 
41 Task::Task(const QString &debugName) : ReferenceCounter(debugName)
42 {
44 }
45 
48 //
49 // Task Queue Implementation
50 //
53 
55 //
57 
59 {
60  QMutexLocker locker(&g_pTaskQueueCreationLock);
61  return g_pTaskQueue ? g_pTaskQueue : (g_pTaskQueue = new TaskQueue());
62 }
63 
65 //
67 
69 {
70  QMutexLocker locker(&g_pTaskQueueCreationLock);
71  delete g_pTaskQueue;
72  g_pTaskQueue = nullptr;
73 }
74 
76 //
78 
79 TaskQueue::TaskQueue() : MThread("TaskQueue")
80 {
81  LOG(VB_UPNP, LOG_INFO, "Starting TaskQueue Thread...");
82 
83  start();
84 
85  LOG(VB_UPNP, LOG_INFO, "TaskQueue Thread Started.");
86 }
87 
89 //
91 
93 {
94  m_bTermRequested = true;
95 
96  wait();
97 
98  Clear();
99 }
100 
102 {
103  m_bTermRequested = true;
104 }
105 
107 //
109 
111 {
112  RunProlog();
113 
114  LOG(VB_UPNP, LOG_INFO, "TaskQueue Thread Running.");
115 
116  while ( !m_bTermRequested )
117  {
118  // ------------------------------------------------------------------
119  // Process Any Tasks that may need to be executed.
120  // ------------------------------------------------------------------
121 
122  auto ttNow = nowAsDuration<std::chrono::microseconds>();
123 
124  Task *pTask = GetNextExpiredTask( ttNow );
125  if (pTask != nullptr)
126  {
127  try
128  {
129  pTask->Execute( this );
130  pTask->DecrRef();
131  }
132  catch( ... )
133  {
134  LOG(VB_GENERAL, LOG_ERR, "Call to Execute threw an exception.");
135  }
136 
137  }
138  // Make sure to throttle our processing.
139 
140  usleep( 100ms );
141  }
142 
143  RunEpilog();
144 }
145 
147 //
149 
151 {
152  m_mutex.lock();
153 
154  for (auto & task : m_mapTasks)
155  {
156  if (task.second != nullptr)
157  task.second->DecrRef();
158  }
159 
160  m_mapTasks.clear();
161 
162  m_mutex.unlock();
163 }
164 
167 // \param msec The number of milliseconds in the future to run this task
168 // \param pTask A pointer to the task.
170 
171 void TaskQueue::AddTask( std::chrono::milliseconds msec, Task *pTask )
172 {
173  auto tt = nowAsDuration<std::chrono::microseconds>() + msec;
174 
175  AddTaskAbsolute( tt, pTask );
176 }
177 
180 // \param msec The time when this task should run
181 // \param pTask A pointer to the task.
183 
185 {
186  if (pTask != nullptr)
187  {
188  m_mutex.lock();
189  pTask->IncrRef();
190  m_mapTasks.insert( TaskMap::value_type( ttKey, pTask ));
191  m_mutex.unlock();
192  }
193 }
194 
197 // \param pTask A pointer to the task.
199 
200 void TaskQueue::AddTask( Task *pTask )
201 {
202 
203  if (pTask != nullptr)
204  {
205  auto tt = nowAsDuration<std::chrono::microseconds>();
206 
207  AddTaskAbsolute( tt, pTask );
208  }
209 }
210 
212 //
214 
215 Task *TaskQueue::GetNextExpiredTask( TaskTime tt, std::chrono::milliseconds nWithinMilliSecs /*=50*/ )
216 {
217  Task *pTask = nullptr;
218 
219  tt += nWithinMilliSecs ;
220 
221  m_mutex.lock();
222 
223  auto it = m_mapTasks.begin();
224  if (it != m_mapTasks.end())
225  {
226  TaskTime ttTask = (*it).first;
227 
228  if (ttTask < tt)
229  {
230  // Do not release here... caller must call release.
231 
232  pTask = (*it).second;
233 
234  m_mapTasks.erase( it );
235  }
236  }
237  m_mutex.unlock();
238 
239  return pTask;
240 }
TaskQueue::Clear
void Clear()
Definition: taskqueue.cpp:150
MThread::start
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:286
ReferenceCounter::DecrRef
virtual int DecrRef(void)
Decrements reference count and deletes on 0.
Definition: referencecounter.cpp:125
TaskTime
std::chrono::microseconds TaskTime
Definition: upnputil.h:43
TaskQueue::m_mapTasks
TaskMap m_mapTasks
Definition: taskqueue.h:90
MThread::wait
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:303
TaskQueue::RequestTerminate
void RequestTerminate()
Definition: taskqueue.cpp:101
Task::Execute
virtual void Execute(TaskQueue *pQueue)=0
TaskQueue::GetNextExpiredTask
Task * GetNextExpiredTask(TaskTime tt, std::chrono::milliseconds nWithinMilliSecs=50ms)
Definition: taskqueue.cpp:215
MThread::usleep
static void usleep(std::chrono::microseconds time)
Definition: mthread.cpp:330
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:23
MThread::RunProlog
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
TaskQueue
Definition: taskqueue.h:81
g_pTaskQueueCreationLock
static QMutex g_pTaskQueueCreationLock
Definition: taskqueue.cpp:24
TaskQueue::m_mutex
QMutex m_mutex
Definition: taskqueue.h:91
mythlogging.h
TaskQueue::Instance
static TaskQueue * Instance()
Definition: taskqueue.cpp:58
Task
Definition: taskqueue.h:53
TaskQueue::m_bTermRequested
bool m_bTermRequested
Definition: taskqueue.h:92
MThread::RunEpilog
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
TaskQueue::run
void run() override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: taskqueue.cpp:110
TaskQueue::Shutdown
static void Shutdown()
Definition: taskqueue.cpp:68
TaskQueue::g_pTaskQueue
static TaskQueue * g_pTaskQueue
Definition: taskqueue.h:86
taskqueue.h
Task::m_nTaskCount
static long m_nTaskCount
Definition: taskqueue.h:56
TaskQueue::AddTaskAbsolute
void AddTaskAbsolute(TaskTime tt, Task *pTask)
Add a task to run at a specific time.
Definition: taskqueue.cpp:184
Task::m_nTaskId
long m_nTaskId
Definition: taskqueue.h:58
TaskQueue::~TaskQueue
~TaskQueue() override
Definition: taskqueue.cpp:92
Task::Task
Task(const QString &debugName)
Definition: taskqueue.cpp:41
TaskQueue::AddTask
void AddTask(std::chrono::milliseconds msec, Task *pTask)
Add a task to run in the future.
Definition: taskqueue.cpp:171
MThread
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:49
TaskQueue::TaskQueue
TaskQueue()
Definition: taskqueue.cpp:79
ReferenceCounter::IncrRef
virtual int IncrRef(void)
Increments reference count.
Definition: referencecounter.cpp:101
ReferenceCounter
General purpose reference counter.
Definition: referencecounter.h:26