MythTV  master
taskqueue.cpp
Go to the documentation of this file.
1 // Program Name: taskqueue.cpp
3 // Created : Oct. 24, 2005
4 //
5 // Purpose : Used to process delayed tasks
6 //
7 // Copyright (c) 2005 David Blain <dblain@mythtv.org>
8 //
9 // Licensed under the GPL v2 or later, see COPYING for details
10 //
12 
13 #include "taskqueue.h"
14 #include "mythlogging.h"
15 
16 #include <QDateTime>
17 
18 #include <iostream>
19 
21 // Define Global instance
23 
26 
29 //
30 // Task Implementation
31 //
34 
35 long Task::m_nTaskCount = 0;
36 
38 //
40 
41 Task::Task(const QString &debugName) : ReferenceCounter(debugName)
42 {
44 }
45 
48 //
49 // Task Queue Implementation
50 //
53 
55 //
57 
59 {
60  QMutexLocker locker(&g_pTaskQueueCreationLock);
61  return g_pTaskQueue ? g_pTaskQueue : (g_pTaskQueue = new TaskQueue());
62 }
63 
65 //
67 
69 {
70  QMutexLocker locker(&g_pTaskQueueCreationLock);
71  delete g_pTaskQueue;
72  g_pTaskQueue = nullptr;
73 }
74 
76 //
78 
79 TaskQueue::TaskQueue() : MThread("TaskQueue")
80 {
81  LOG(VB_UPNP, LOG_INFO, "Starting TaskQueue Thread...");
82 
83  start();
84 
85  LOG(VB_UPNP, LOG_INFO, "TaskQueue Thread Started.");
86 }
87 
89 //
91 
93 {
94  m_bTermRequested = true;
95 
96  wait();
97 
98  Clear();
99 }
100 
102 {
103  m_bTermRequested = true;
104 }
105 
107 //
109 
111 {
112  RunProlog();
113 
114  LOG(VB_UPNP, LOG_INFO, "TaskQueue Thread Running.");
115 
116  while ( !m_bTermRequested )
117  {
118  // ------------------------------------------------------------------
119  // Process Any Tasks that may need to be executed.
120  // ------------------------------------------------------------------
121 
122  TaskTime ttNow;
123  gettimeofday( (&ttNow), nullptr );
124 
125  Task *pTask = GetNextExpiredTask( ttNow );
126  if (pTask != nullptr)
127  {
128  try
129  {
130  pTask->Execute( this );
131  pTask->DecrRef();
132  }
133  catch( ... )
134  {
135  LOG(VB_GENERAL, LOG_ERR, "Call to Execute threw an exception.");
136  }
137 
138  }
139  // Make sure to throttle our processing.
140 
141  msleep( 100 );
142  }
143 
144  RunEpilog();
145 }
146 
148 //
150 
152 {
153  m_mutex.lock();
154 
155  for (auto & task : m_mapTasks)
156  {
157  if (task.second != nullptr)
158  task.second->DecrRef();
159  }
160 
161  m_mapTasks.clear();
162 
163  m_mutex.unlock();
164 }
165 
167 //
169 
170 void TaskQueue::AddTask( long msec, Task *pTask )
171 {
172  TaskTime tt;
173  gettimeofday( (&tt), nullptr );
174 
175  AddMicroSecToTaskTime( tt, (msec * 1000) );
176 
177  AddTask( tt, pTask );
178 }
179 
181 //
183 
184 void TaskQueue::AddTask( TaskTime ttKey, Task *pTask )
185 {
186  if (pTask != nullptr)
187  {
188  m_mutex.lock();
189  pTask->IncrRef();
190  m_mapTasks.insert( TaskMap::value_type( ttKey, pTask ));
191  m_mutex.unlock();
192  }
193 }
194 
196 //
198 
199 void TaskQueue::AddTask( Task *pTask )
200 {
201 
202  if (pTask != nullptr)
203  {
204  TaskTime tt;
205  gettimeofday( (&tt), nullptr );
206 
207  AddTask( tt, pTask );
208  }
209 }
210 
212 //
214 
215 Task *TaskQueue::GetNextExpiredTask( TaskTime tt, long nWithinMilliSecs /*=50*/ )
216 {
217  Task *pTask = nullptr;
218 
219  AddMicroSecToTaskTime( tt, nWithinMilliSecs * 1000 );
220 
221  m_mutex.lock();
222 
223  auto it = m_mapTasks.begin();
224  if (it != m_mapTasks.end())
225  {
226  TaskTime ttTask = (*it).first;
227 
228  if (ttTask < tt)
229  {
230  // Do not release here... caller must call release.
231 
232  pTask = (*it).second;
233 
234  m_mapTasks.erase( it );
235  }
236  }
237  m_mutex.unlock();
238 
239  return pTask;
240 }
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:215
long m_nTaskId
Definition: taskqueue.h:58
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:46
~TaskQueue() override
Definition: taskqueue.cpp:92
static TaskQueue * Instance()
Definition: taskqueue.cpp:58
struct timeval TaskTime
Definition: httpserver.h:45
void AddMicroSecToTaskTime(TaskTime &t, suseconds_t uSecs)
Definition: upnputil.cpp:103
General purpose reference counter.
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:311
Task(const QString &debugName)
Definition: taskqueue.cpp:41
static TaskQueue * g_pTaskQueue
Definition: taskqueue.h:86
TaskMap m_mapTasks
Definition: taskqueue.h:90
static void msleep(unsigned long time)
Definition: mthread.cpp:343
QMutex m_mutex
Definition: taskqueue.h:91
static void Shutdown()
Definition: taskqueue.cpp:68
virtual int IncrRef(void)
Increments reference count.
static long m_nTaskCount
Definition: taskqueue.h:56
void run() override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: taskqueue.cpp:110
virtual int DecrRef(void)
Decrements reference count and deletes on 0.
void AddTask(long msec, Task *pTask)
Definition: taskqueue.cpp:170
void RequestTerminate()
Definition: taskqueue.cpp:101
void Clear()
Definition: taskqueue.cpp:151
Task * GetNextExpiredTask(TaskTime tt, long nWithinMilliSecs=50)
Definition: taskqueue.cpp:215
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
static QMutex g_pTaskQueueCreationLock
Definition: taskqueue.cpp:24
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:202
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:294
bool m_bTermRequested
Definition: taskqueue.h:92
Definition: taskqueue.h:53
virtual void Execute(TaskQueue *pQueue)=0