MythTV  master
eventing.cpp
Go to the documentation of this file.
1 // Program Name: eventing.cpp
3 // Created : Dec. 22, 2006
4 //
5 // Purpose : uPnp Eventing Base Class Implementation
6 //
7 // Copyright (c) 2006 David Blain <dblain@mythtv.org>
8 //
9 // Licensed under the GPL v2 or later, see LICENSE for details
10 //
12 
13 #include <cmath>
14 
15 #include <QStringList>
16 #if QT_VERSION >= QT_VERSION_CHECK(6,0,0)
17 #include <QStringConverter>
18 #else
19 #include <QTextCodec>
20 #endif
21 #include <QTextStream>
22 
23 #include "upnp.h"
24 #include "eventing.h"
25 #include "upnptaskevent.h"
29 
30 #if QT_VERSION < QT_VERSION_CHECK(5,14,0)
31  #define QT_ENDL endl
32  #define QT_FLUSH flush
33 #else
34  #define QT_ENDL Qt::endl
35  #define QT_FLUSH Qt::flush
36 #endif
37 
39 //
41 
43  QTextStream &ts, TaskTime ttLastNotified) const
44 {
    // Writes a UPnP event <e:propertyset> document to ts and returns the
    // number of <e:property> elements it emitted.
    //
    // ts             - stream that receives the XML; it is flushed before
    //                  returning.
    // ttLastNotified - only entries of m_map whose m_ttLastChanged is later
    //                  than this time are written.
    //
    // The XML declaration and the opening/closing <e:propertyset> tags are
    // written even when no property qualifies, so callers must use the
    // returned count (not the stream contents) to detect "nothing changed".
45  uint nCount = 0;
46 
47  ts << "<?xml version=\"1.0\"?>" << QT_ENDL
48  << "<e:propertyset xmlns:e=\"urn:schemas-upnp-org:event-1-0\">" << QT_ENDL;
49 
50  for (auto *prop : qAsConst(m_map))
51  {
    // Skip variables that have not changed since the last notification.
52  if ( ttLastNotified < prop->m_ttLastChanged )
53  {
54  nCount++;
55 
    // Each changed variable becomes <e:property><Name>value</Name></e:property>.
    // NOTE(review): ToString() output is written verbatim; confirm it is
    // already XML-escaped.
56  ts << "<e:property>" << QT_ENDL;
57  ts << "<" << prop->m_sName << ">";
58  ts << prop->ToString();
59  ts << "</" << prop->m_sName << ">";
60  ts << "</e:property>" << QT_ENDL;
61  }
62  }
63 
64  ts << "</e:propertyset>" << QT_ENDL;
65  ts << QT_FLUSH;
66 
67  return nCount;
68 }
69 
71 //
73 
// Constructs the eventing extension.
//
// sExtensionName   - forwarded to the HttpServerExtension base constructor.
// sEventMethodName - taken by value and moved into m_sEventMethodName;
//                    ProcessRequest() compares incoming method names to it.
// sSharePath       - forwarded to the HttpServerExtension base constructor.
//
// m_nSubscriptionDuration is read from the configuration key
// "UPnP/SubscriptionDuration"; 30min is passed as the second argument
// (presumably the fallback when the key is absent - confirm against
// GetDuration()).
74 Eventing::Eventing(const QString &sExtensionName,
75  QString sEventMethodName,
76  const QString &sSharePath) :
77  HttpServerExtension(sExtensionName, sSharePath),
78  m_sEventMethodName(std::move(sEventMethodName)),
79  m_nSubscriptionDuration(
80  MythCoreContext::GetConfiguration()->GetDuration<std::chrono::seconds>("UPnP/SubscriptionDuration", 30min))
81 {
83 }
84 
86 //
88 
90 {
    // Destructor body: the SubscriberInfo objects stored in m_subscribers are
    // deleted here, then the container itself is emptied so no dangling
    // pointers remain in it.
91  for (const auto *subscriber : qAsConst(m_subscribers))
92  delete subscriber;
93  m_subscribers.clear();
94 }
95 
97 //
99 
100 inline short Eventing::HoldEvents()
101 {
102  // -=>TODO: Should use an Atomic increment...
103  // need to research available functions.
104 
105  m_mutex.lock();
106  bool err = (m_nHoldCount >= 127);
107  short nVal = (m_nHoldCount++);
108  m_mutex.unlock();
109 
110  if (err)
111  {
112  LOG(VB_GENERAL, LOG_ERR, "Exceeded maximum guarranteed range of "
113  "m_nHoldCount short [-128..127]");
114  LOG(VB_GENERAL, LOG_ERR,
115  "UPnP may not exhibit strange behavior or crash mythtv");
116  }
117 
118  return nVal;
119 }
120 
122 //
124 
126 {
    // Decrements m_nHoldCount under m_mutex and returns the value the counter
    // had *before* the decrement. Notify() is invoked (outside the lock) when
    // that pre-decrement value was 0.
    //
    // NOTE(review): HoldEvents() returns the pre-increment value, so a single
    // HoldEvents()/ReleaseEvents() pair starting from 0 yields nVal == 1 here
    // and does not call Notify(). Confirm whether the comparison was meant to
    // be against the post-decrement value.
127  // -=>TODO: Should use an Atomic decrement...
128 
129  m_mutex.lock();
130  short nVal = (m_nHoldCount--);
131  m_mutex.unlock();
132 
133  if (nVal == 0)
134  Notify();
135 
136  return nVal;
137 }
138 
140 //
142 
144 {
    // Returns the list of base paths for this extension: always the single
    // entry "/" (the root), which is also the only base URL that
    // ProcessRequest() accepts.
145  // -=>TODO: This isn't very efficient... Need to find out if we can make
146  // this something unique, other than root.
147 
148  return QStringList( "/" );
149 }
150 
152 //
154 
156 {
    // Dispatches an incoming HTTP request to the subscribe/unsubscribe
    // handlers.
    //
    // Returns false when the request is not addressed to this extension
    // (base URL is not "/" or the method name differs from
    // m_sEventMethodName). Returns true in every other case, including when
    // pRequest is null and when the request type is neither Subscribe nor
    // Unsubscribe.
157  if (pRequest)
158  {
159  if ( pRequest->m_sBaseUrl != "/" )
160  return false;
161 
162  if ( pRequest->m_sMethod != m_sEventMethodName )
163  return false;
164 
165  LOG(VB_UPNP, LOG_INFO, QString("Eventing::ProcessRequest - Method (%1)")
166  .arg(pRequest->m_sMethod ));
167 
    // Only SUBSCRIBE and UNSUBSCRIBE request types have dedicated handlers.
168  switch( pRequest->m_eType )
169  {
170  case RequestTypeSubscribe : HandleSubscribe ( pRequest ); break;
171  case RequestTypeUnsubscribe : HandleUnsubscribe ( pRequest ); break;
172  default:
174  break;
175  }
176  }
177 
178  return( true );
179 
180 }
181 
183 //
185 
187 {
    // Post-processing hook. HandleSubscribe() stores a newly created
    // subscriber in m_pInitializeSubscriber and registers this object as the
    // request's post-process handler; once this hook has run, the pointer is
    // reset to nullptr so the same subscriber is not processed twice.
188  // Use PostProcessing Hook to perform Initial Notification
189  // to make sure they receive it AFTER the subscription results
190 
191  if (m_pInitializeSubscriber != nullptr)
192  {
194 
195  m_pInitializeSubscriber = nullptr;
196  }
197 }
198 
200 //
202 
204 {
205  pRequest->m_eResponseType = ResponseTypeXML;
206  pRequest->m_nResponseStatus = 412;
207 
208  QString sCallBack = pRequest->GetRequestHeader( "CALLBACK", "" );
209  QString sNT = pRequest->GetRequestHeader( "NT" , "" );
210  QString sTimeout = pRequest->GetRequestHeader( "TIMEOUT" , "" );
211  QString sSID = pRequest->GetRequestHeader( "SID" , "" );
212 
213  SubscriberInfo *pInfo = nullptr;
214 
215  // ----------------------------------------------------------------------
216  // Validate Header Values...
217  // ----------------------------------------------------------------------
218 
219  // -=>TODO: Need to add support for more than one CallBack URL.
220 
221  if ( sCallBack.length() != 0 )
222  {
223  // ------------------------------------------------------------------
224  // New Subscription
225  // ------------------------------------------------------------------
226 
227  if ( sSID.length() != 0 )
228  {
229  pRequest->m_nResponseStatus = 400;
230  return;
231  }
232 
233  if ( sNT != "upnp:event" )
234  return;
235 
236  // ----------------------------------------------------------------------
237  // Process Subscription
238  // ----------------------------------------------------------------------
239 
240  // -=>TODO: Temp code until support for multiple callbacks are supported.
241 
242  sCallBack = sCallBack.mid( 1, sCallBack.indexOf(">") - 1);
243 
244  std::chrono::seconds nDuration = m_nSubscriptionDuration;
245  if ( sTimeout.startsWith("Second-") )
246  {
247  bool ok = false;
248  auto nValue = std::chrono::seconds(sTimeout.section("-", 1).toInt(&ok));
249  if (ok)
250  nDuration = nValue;
251  }
252 
253  pInfo = new SubscriberInfo( sCallBack, nDuration );
254 
255  Subscribers::iterator it = m_subscribers.find(pInfo->m_sUUID);
256  if (it != m_subscribers.end())
257  {
258  delete *it;
259  m_subscribers.erase(it);
260  }
261  m_subscribers[pInfo->m_sUUID] = pInfo;
262 
263  // Use PostProcess Hook to Send Initial FULL Notification...
264  // *** Must send this response first then notify.
265 
266  m_pInitializeSubscriber = pInfo;
267  pRequest->m_pPostProcess = (IPostProcess *)this;
268 
269  }
270  else
271  {
272  // ------------------------------------------------------------------
273  // Renewal
274  // ------------------------------------------------------------------
275 
276  if ( sSID.length() != 0 )
277  {
278  sSID = sSID.mid( 5 );
279  pInfo = m_subscribers[sSID];
280  }
281 
282  }
283 
284  if (pInfo != nullptr)
285  {
286  pRequest->m_mapRespHeaders[ "SID" ] = QString( "uuid:%1" )
287  .arg( pInfo->m_sUUID );
288 
289  pRequest->m_mapRespHeaders[ "TIMEOUT"] = QString( "Second-%1" )
290  .arg( pInfo->m_nDuration.count() );
291 
292  pRequest->m_nResponseStatus = 200;
293 
294  }
295 
296 }
297 
299 //
301 
303 {
    // Handles an UNSUBSCRIBE request and sets the response status on
    // pRequest:
    //   412 - default; left in place when the SID is not found.
    //   400 - a CALLBACK or NT header was supplied.
    //   200 - the subscriber was found, deleted and removed from
    //         m_subscribers.
    //
    // NOTE(review): m_subscribers is modified here without taking m_mutex,
    // whereas Notify() iterates it with the mutex held - confirm that callers
    // serialise these paths.
304  pRequest->m_eResponseType = ResponseTypeXML;
305  pRequest->m_nResponseStatus = 412;
306 
307  QString sCallBack = pRequest->GetRequestHeader( "CALLBACK", "" );
308  QString sNT = pRequest->GetRequestHeader( "NT" , "" );
309  QString sSID = pRequest->GetRequestHeader( "SID" , "" );
310 
311  if ((sCallBack.length() != 0) || (sNT.length() != 0))
312  {
313  pRequest->m_nResponseStatus = 400;
314  return;
315  }
316 
    // Drop the first five characters of the SID header (presumably the
    // "uuid:" prefix) to obtain the key used in m_subscribers.
317  sSID = sSID.mid( 5 );
318 
319  Subscribers::iterator it = m_subscribers.find(sSID);
320  if (it != m_subscribers.end())
321  {
322  delete *it;
323  m_subscribers.erase(it);
324  pRequest->m_nResponseStatus = 200;
325  }
326 }
327 
329 //
331 
333 {
    // Walks every subscriber while holding m_mutex: subscribers whose
    // m_ttExpires is still in the future are passed to NotifySubscriber();
    // expired ones are deleted and erased from m_subscribers. Null entries
    // are skipped and left in the container.
    //
    // The current time is sampled once, before the lock is taken, and used
    // for every expiry comparison in the loop.
334  auto tt = nowAsDuration<std::chrono::microseconds>();
335 
336  m_mutex.lock();
337 
338  Subscribers::iterator it = m_subscribers.begin();
339  while (it != m_subscribers.end())
340  {
341  if (!(*it))
342  { // This should never happen, but if someone inserted bad data...
343  ++it;
344  continue;
345  }
346 
347  if (tt < (*it)->m_ttExpires)
348  {
349  // Subscription not expired yet. Send event notification.
350  NotifySubscriber(*it);
351  ++it;
352  }
353  else
354  {
355  // Time to expire this subscription. Remove subscriber from list.
    // erase() returns the iterator to continue from, so it is not
    // incremented on this branch.
356  delete *it;
357  it = m_subscribers.erase(it);
358  }
359  }
360 
361  m_mutex.unlock();
362 }
363 
365 //
367 
369 {
    // Builds an HTTP NOTIFY message for one subscriber and queues it for
    // delivery. Does nothing when pInfo is null or when BuildNotifyBody()
    // reports that no variable changed since pInfo->m_ttLastNotified.
    //
    // On a successful queueing the subscriber's key is incremented and its
    // m_ttLastNotified is set to the current time.
370  if (pInfo == nullptr)
371  return;
372 
373  QByteArray aBody;
374  QTextStream tsBody( &aBody, QIODevice::WriteOnly );
375 
    // The body is encoded as UTF-8 (API differs between Qt 5 and Qt 6).
376 #if QT_VERSION < QT_VERSION_CHECK(6,0,0)
377  tsBody.setCodec(QTextCodec::codecForName("UTF-8"));
378 #else
379  tsBody.setEncoding(QStringConverter::Utf8);
380 #endif
381 
382  // ----------------------------------------------------------------------
383  // Build Body... Only send if there are changes
384  // ----------------------------------------------------------------------
385 
386  uint nCount = BuildNotifyBody(tsBody, pInfo->m_ttLastNotified);
387  if (nCount)
388  {
389 
390  // -=>TODO: Need to add support for more than one CallBack URL.
391 
392  auto *pBuffer = new QByteArray(); // UPnpEventTask will delete this pointer.
393  QTextStream tsMsg( pBuffer, QIODevice::WriteOnly );
394 
395 #if QT_VERSION < QT_VERSION_CHECK(6,0,0)
396  tsMsg.setCodec(QTextCodec::codecForName("UTF-8"));
397 #else
398  tsMsg.setEncoding(QStringConverter::Utf8);
399 #endif
400 
401  // ----------------------------------------------------------------------
402  // Build Message Header
403  // ----------------------------------------------------------------------
404 
    // Fall back to port 80 when the callback URL carries no explicit port
    // (QUrl::port() is negative in that case).
405  int nPort = (pInfo->m_qURL.port()>=0) ? pInfo->m_qURL.port() : 80;
406  QString sHost = QString( "%1:%2" ).arg( pInfo->m_qURL.host() )
407  .arg( nPort );
408 
    // NOTE(review): only the path component of the callback URL is used in
    // the request line; any query string is not included - confirm this is
    // intended.
409  tsMsg << "NOTIFY " << pInfo->m_qURL.path() << " HTTP/1.1\r\n";
410  tsMsg << "HOST: " << sHost << "\r\n";
411  tsMsg << "CONTENT-TYPE: \"text/xml\"\r\n";
    // Content-Length is the byte size of the already-encoded body.
412  tsMsg << "Content-Length: " << QString::number( aBody.size() ) << "\r\n";
413  tsMsg << "NT: upnp:event\r\n";
414  tsMsg << "NTS: upnp:propchange\r\n";
415  tsMsg << "SID: uuid:" << pInfo->m_sUUID << "\r\n";
416  tsMsg << "SEQ: " << QString::number( pInfo->m_nKey ) << "\r\n";
417  tsMsg << "\r\n";
418  tsMsg << aBody;
419  tsMsg << QT_FLUSH;
420 
421  // ------------------------------------------------------------------
422  // Add new EventTask to the TaskQueue to do the actual sending.
423  // ------------------------------------------------------------------
424 
425  LOG(VB_UPNP, LOG_INFO,
426  QString("UPnp::Eventing::NotifySubscriber( %1 ) : %2 Variables")
427  .arg( sHost ).arg(nCount));
428 
429  auto *pEventTask = new UPnpEventTask(QHostAddress(pInfo->m_qURL.host()),
430  nPort, pBuffer);
431 
    // The task is scheduled with a 250ms delay; the local reference is
    // dropped right after it has been handed to the queue.
432  TaskQueue::Instance()->AddTask( 250ms, pEventTask );
433 
434  pEventTask->DecrRef();
435 
436  // ------------------------------------------------------------------
437  // Update the subscribers Key & last Notified fields
438  // ------------------------------------------------------------------
439 
440  pInfo->IncrementKey();
441 
442  pInfo->m_ttLastNotified = nowAsDuration<std::chrono::microseconds>();
443  }
444 }
445 
HTTPRequest::GetRequestHeader
QString GetRequestHeader(const QString &sKey, const QString &sDefault)
Definition: httprequest.cpp:1109
Eventing::ExecutePostProcess
void ExecutePostProcess() override
Definition: eventing.cpp:186
HTTPRequest::m_sBaseUrl
QString m_sBaseUrl
Definition: httprequest.h:127
Eventing::ReleaseEvents
short ReleaseEvents()
Definition: eventing.cpp:125
HttpServerExtension::m_nSupportedMethods
uint m_nSupportedMethods
Definition: httpserver.h:83
HTTPRequest
Definition: httprequest.h:109
Eventing::m_subscribers
Subscribers m_subscribers
Definition: eventing.h:256
TaskTime
std::chrono::microseconds TaskTime
Definition: upnputil.h:31
Eventing::GetBasePaths
QStringList GetBasePaths() override
Definition: eventing.cpp:143
Eventing::m_sEventMethodName
QString m_sEventMethodName
Definition: eventing.h:255
HTTPRequest::m_sMethod
QString m_sMethod
Definition: httprequest.h:129
StateVariables::m_map
SVMap m_map
Definition: eventing.h:171
SubscriberInfo::m_nKey
unsigned short m_nKey
Definition: eventing.h:67
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
Eventing::ProcessRequest
bool ProcessRequest(HTTPRequest *pRequest) override
Definition: eventing.cpp:155
SubscriberInfo::m_nDuration
std::chrono::seconds m_nDuration
Definition: eventing.h:68
RequestTypeSubscribe
@ RequestTypeSubscribe
Definition: httprequest.h:58
Eventing::m_pInitializeSubscriber
SubscriberInfo * m_pInitializeSubscriber
Definition: eventing.h:262
RequestTypeUnsubscribe
@ RequestTypeUnsubscribe
Definition: httprequest.h:59
UPnp::FormatErrorResponse
static void FormatErrorResponse(HTTPRequest *pRequest, UPnPResultCode eCode, const QString &sMsg="")
Definition: upnp.cpp:241
HTTPRequest::m_mapRespHeaders
QStringMap m_mapRespHeaders
Definition: httprequest.h:153
Eventing::HandleSubscribe
void HandleSubscribe(HTTPRequest *pRequest)
Definition: eventing.cpp:203
eventing.h
upnp.h
HTTPRequest::m_nResponseStatus
long m_nResponseStatus
Definition: httprequest.h:152
mythlogging.h
Eventing::HandleUnsubscribe
void HandleUnsubscribe(HTTPRequest *pRequest)
Definition: eventing.cpp:302
upnptaskevent.h
SubscriberInfo
Definition: eventing.h:30
TaskQueue::Instance
static TaskQueue * Instance()
Definition: taskqueue.cpp:59
HTTPRequest::m_pPostProcess
IPostProcess * m_pPostProcess
Definition: httprequest.h:159
SubscriberInfo::m_sUUID
QString m_sUUID
Definition: eventing.h:65
Eventing::NotifySubscriber
void NotifySubscriber(SubscriberInfo *pInfo)
Definition: eventing.cpp:368
Eventing::Eventing
Eventing(const QString &sExtensionName, QString sEventMethodName, const QString &sSharePath)
Definition: eventing.cpp:74
SubscriberInfo::IncrementKey
unsigned long IncrementKey()
Definition: eventing.h:53
ResponseTypeXML
@ ResponseTypeXML
Definition: httprequest.h:78
uint
unsigned int uint
Definition: compat.h:79
SubscriberInfo::m_qURL
QUrl m_qURL
Definition: eventing.h:66
Eventing::Notify
void Notify() override
Definition: eventing.cpp:332
SubscriberInfo::m_ttLastNotified
TaskTime m_ttLastNotified
Definition: eventing.h:63
IPostProcess
Definition: httprequest.h:98
QT_FLUSH
#define QT_FLUSH
Definition: eventing.cpp:35
UPnPResult_InvalidAction
@ UPnPResult_InvalidAction
Definition: upnp.h:39
MythCoreContext
This class contains the runtime context for MythTV.
Definition: mythcorecontext.h:65
Eventing::m_mutex
QMutex m_mutex
Definition: eventing.h:253
mythcorecontext.h
Eventing::m_nSubscriptionDuration
std::chrono::seconds m_nSubscriptionDuration
Definition: eventing.h:258
std
Definition: mythchrono.h:23
HTTPRequest::m_eResponseType
HttpResponseType m_eResponseType
Definition: httprequest.h:149
UPnpEventTask
Definition: upnptaskevent.h:27
TaskQueue::AddTask
void AddTask(std::chrono::milliseconds msec, Task *pTask)
Add a task to run in the future.
Definition: taskqueue.cpp:172
configuration.h
StateVariables::BuildNotifyBody
uint BuildNotifyBody(QTextStream &ts, TaskTime ttLastNotified) const
Definition: eventing.cpp:42
HttpServerExtension
Definition: httpserver.h:71
Eventing::m_nHoldCount
short m_nHoldCount
Definition: eventing.h:260
QT_ENDL
#define QT_ENDL
Definition: eventing.cpp:34
HTTPRequest::m_eType
HttpRequestType m_eType
Definition: httprequest.h:120
Eventing::HoldEvents
short HoldEvents()
Definition: eventing.cpp:100
Eventing::~Eventing
~Eventing() override
Definition: eventing.cpp:89