MythTV  master
eventing.cpp
Go to the documentation of this file.
1 // Program Name: eventing.cpp
3 // Created : Dec. 22, 2006
4 //
5 // Purpose : uPnp Eventing Base Class Implementation
6 //
7 // Copyright (c) 2006 David Blain <dblain@mythtv.org>
8 //
9 // Licensed under the GPL v2 or later, see COPYING for details
10 //
12 
13 #include <cmath>
14 
15 #include <QStringList>
16 #if QT_VERSION >= QT_VERSION_CHECK(6,0,0)
17 #include <QStringConverter>
18 #else
19 #include <QTextCodec>
20 #endif
21 #include <QTextStream>
22 
23 #include "upnp.h"
24 #include "eventing.h"
25 #include "upnptaskevent.h"
26 #include "mythlogging.h"
27 
28 #if QT_VERSION < QT_VERSION_CHECK(5,14,0)
29  #define QT_ENDL endl
30  #define QT_FLUSH flush
31 #else
32  #define QT_ENDL Qt::endl
33  #define QT_FLUSH Qt::flush
34 #endif
35 
37 //
39 
41  QTextStream &ts, TaskTime ttLastNotified) const
42 {
43  uint nCount = 0;
44 
45  ts << "<?xml version=\"1.0\"?>" << QT_ENDL
46  << "<e:propertyset xmlns:e=\"urn:schemas-upnp-org:event-1-0\">" << QT_ENDL;
47 
48  for (auto *prop : qAsConst(m_map))
49  {
50  if ( ttLastNotified < prop->m_ttLastChanged )
51  {
52  nCount++;
53 
54  ts << "<e:property>" << QT_ENDL;
55  ts << "<" << prop->m_sName << ">";
56  ts << prop->ToString();
57  ts << "</" << prop->m_sName << ">";
58  ts << "</e:property>" << QT_ENDL;
59  }
60  }
61 
62  ts << "</e:propertyset>" << QT_ENDL;
63  ts << QT_FLUSH;
64 
65  return nCount;
66 }
67 
69 //
71 
// Constructs the eventing extension.
//
// sExtensionName and sSharePath are forwarded to the HttpServerExtension
// base.  sEventMethodName is taken by value and moved into
// m_sEventMethodName, which ProcessRequest() compares against the method
// name of incoming requests.  m_nSubscriptionDuration is read (in seconds)
// from the "UPnP/SubscriptionDuration" configuration key; 30min is passed
// as the second argument (presumably the fallback value - confirm).
72 Eventing::Eventing(const QString &sExtensionName,
73  QString sEventMethodName,
74  const QString &sSharePath) :
75  HttpServerExtension(sExtensionName, sSharePath),
76  m_sEventMethodName(std::move(sEventMethodName)),
77  m_nSubscriptionDuration(
78  UPnp::GetConfiguration()->GetDuration<std::chrono::seconds>("UPnP/SubscriptionDuration", 30min))
79 {
// NOTE(review): one body line appears to be absent from this listing.  The
// cross-reference index at the end of the file lists
// HttpServerExtension::m_nSupportedMethods as used by this file, so the
// supported request types are presumably registered here - confirm.
81 }
82 
84 //
86 
88 {
89  for (const auto *subscriber : qAsConst(m_subscribers))
90  delete subscriber;
91  m_subscribers.clear();
92 }
93 
95 //
97 
98 inline short Eventing::HoldEvents()
99 {
100  // -=>TODO: Should use an Atomic increment...
101  // need to research available functions.
102 
103  m_mutex.lock();
104  bool err = (m_nHoldCount >= 127);
105  short nVal = (m_nHoldCount++);
106  m_mutex.unlock();
107 
108  if (err)
109  {
110  LOG(VB_GENERAL, LOG_ERR, "Exceeded maximum guarranteed range of "
111  "m_nHoldCount short [-128..127]");
112  LOG(VB_GENERAL, LOG_ERR,
113  "UPnP may not exhibit strange behavior or crash mythtv");
114  }
115 
116  return nVal;
117 }
118 
120 //
122 
124 {
125  // -=>TODO: Should use an Atomic decrement...
126 
127  m_mutex.lock();
128  short nVal = (m_nHoldCount--);
129  m_mutex.unlock();
130 
131  if (nVal == 0)
132  Notify();
133 
134  return nVal;
135 }
136 
138 //
140 
142 {
143  // -=>TODO: This isn't very efficient... Need to find out if we can make
144  // this something unique, other than root.
145 
146  return QStringList( "/" );
147 }
148 
150 //
152 
// Body of Eventing::ProcessRequest(HTTPRequest *pRequest), which the
// cross-reference index at the end of the file declares as returning bool.
//
// Returns false when the request is not addressed to this extension: its
// base URL is not "/" or its method name differs from m_sEventMethodName.
// Otherwise the request is logged, dispatched on pRequest->m_eType to
// HandleSubscribe() or HandleUnsubscribe(), and true is returned.
// A null pRequest also falls through to "return true".
154 {
155  if (pRequest)
156  {
157  if ( pRequest->m_sBaseUrl != "/" )
158  return false;
159 
160  if ( pRequest->m_sMethod != m_sEventMethodName )
161  return false;
162 
163  LOG(VB_UPNP, LOG_INFO, QString("Eventing::ProcessRequest - Method (%1)")
164  .arg(pRequest->m_sMethod ));
165 
166  switch( pRequest->m_eType )
167  {
168  case RequestTypeSubscribe : HandleSubscribe ( pRequest ); break;
169  case RequestTypeUnsubscribe : HandleUnsubscribe ( pRequest ); break;
170  default:
// NOTE(review): one statement appears to be absent from this listing here.
// The cross-reference index lists UPnp::FormatErrorResponse and
// UPnPResult_InvalidAction as used by this file, so an error response is
// presumably produced for any other request type - confirm.
172  break;
173  }
174  }
175 
176  return( true );
177 
178 }
179 
181 //
183 
// Body of Eventing::ExecutePostProcess(), the post-process hook that
// HandleSubscribe() installs on the request via m_pPostProcess.
//
// When a newly created subscriber is pending in m_pInitializeSubscriber,
// the pointer is cleared after processing so the work is done only once.
185 {
186  // Use PostProcessing Hook to perform Initial Notification
187  // to make sure they receive it AFTER the subscription results
188 
189  if (m_pInitializeSubscriber != nullptr)
190  {
// NOTE(review): one statement appears to be absent from this listing here.
// Going by the comment above, the initial notification (presumably
// NotifySubscriber( m_pInitializeSubscriber )) is sent at this point -
// confirm.
192 
193  m_pInitializeSubscriber = nullptr;
194  }
195 }
196 
198 //
200 
202 {
203  pRequest->m_eResponseType = ResponseTypeXML;
204  pRequest->m_nResponseStatus = 412;
205 
206  QString sCallBack = pRequest->GetRequestHeader( "CALLBACK", "" );
207  QString sNT = pRequest->GetRequestHeader( "NT" , "" );
208  QString sTimeout = pRequest->GetRequestHeader( "TIMEOUT" , "" );
209  QString sSID = pRequest->GetRequestHeader( "SID" , "" );
210 
211  SubscriberInfo *pInfo = nullptr;
212 
213  // ----------------------------------------------------------------------
214  // Validate Header Values...
215  // ----------------------------------------------------------------------
216 
217  // -=>TODO: Need to add support for more than one CallBack URL.
218 
219  if ( sCallBack.length() != 0 )
220  {
221  // ------------------------------------------------------------------
222  // New Subscription
223  // ------------------------------------------------------------------
224 
225  if ( sSID.length() != 0 )
226  {
227  pRequest->m_nResponseStatus = 400;
228  return;
229  }
230 
231  if ( sNT != "upnp:event" )
232  return;
233 
234  // ----------------------------------------------------------------------
235  // Process Subscription
236  // ----------------------------------------------------------------------
237 
238  // -=>TODO: Temp code until support for multiple callbacks are supported.
239 
240  sCallBack = sCallBack.mid( 1, sCallBack.indexOf(">") - 1);
241 
242  std::chrono::seconds nDuration = m_nSubscriptionDuration;
243  if ( sTimeout.startsWith("Second-") )
244  {
245  bool ok = false;
246  auto nValue = std::chrono::seconds(sTimeout.section("-", 1).toInt(&ok));
247  if (ok)
248  nDuration = nValue;
249  }
250 
251  pInfo = new SubscriberInfo( sCallBack, nDuration );
252 
253  Subscribers::iterator it = m_subscribers.find(pInfo->m_sUUID);
254  if (it != m_subscribers.end())
255  {
256  delete *it;
257  m_subscribers.erase(it);
258  }
259  m_subscribers[pInfo->m_sUUID] = pInfo;
260 
261  // Use PostProcess Hook to Send Initial FULL Notification...
262  // *** Must send this response first then notify.
263 
264  m_pInitializeSubscriber = pInfo;
265  pRequest->m_pPostProcess = (IPostProcess *)this;
266 
267  }
268  else
269  {
270  // ------------------------------------------------------------------
271  // Renewal
272  // ------------------------------------------------------------------
273 
274  if ( sSID.length() != 0 )
275  {
276  sSID = sSID.mid( 5 );
277  pInfo = m_subscribers[sSID];
278  }
279 
280  }
281 
282  if (pInfo != nullptr)
283  {
284  pRequest->m_mapRespHeaders[ "SID" ] = QString( "uuid:%1" )
285  .arg( pInfo->m_sUUID );
286 
287  pRequest->m_mapRespHeaders[ "TIMEOUT"] = QString( "Second-%1" )
288  .arg( pInfo->m_nDuration.count() );
289 
290  pRequest->m_nResponseStatus = 200;
291 
292  }
293 
294 }
295 
297 //
299 
301 {
302  pRequest->m_eResponseType = ResponseTypeXML;
303  pRequest->m_nResponseStatus = 412;
304 
305  QString sCallBack = pRequest->GetRequestHeader( "CALLBACK", "" );
306  QString sNT = pRequest->GetRequestHeader( "NT" , "" );
307  QString sSID = pRequest->GetRequestHeader( "SID" , "" );
308 
309  if ((sCallBack.length() != 0) || (sNT.length() != 0))
310  {
311  pRequest->m_nResponseStatus = 400;
312  return;
313  }
314 
315  sSID = sSID.mid( 5 );
316 
317  Subscribers::iterator it = m_subscribers.find(sSID);
318  if (it != m_subscribers.end())
319  {
320  delete *it;
321  m_subscribers.erase(it);
322  pRequest->m_nResponseStatus = 200;
323  }
324 }
325 
327 //
329 
331 {
332  auto tt = nowAsDuration<std::chrono::microseconds>();
333 
334  m_mutex.lock();
335 
336  Subscribers::iterator it = m_subscribers.begin();
337  while (it != m_subscribers.end())
338  {
339  if (!(*it))
340  { // This should never happen, but if someone inserted bad data...
341  ++it;
342  continue;
343  }
344 
345  if (tt < (*it)->m_ttExpires)
346  {
347  // Subscription not expired yet. Send event notification.
348  NotifySubscriber(*it);
349  ++it;
350  }
351  else
352  {
353  // Time to expire this subscription. Remove subscriber from list.
354  delete *it;
355  it = m_subscribers.erase(it);
356  }
357  }
358 
359  m_mutex.unlock();
360 }
361 
363 //
365 
367 {
368  if (pInfo == nullptr)
369  return;
370 
371  QByteArray aBody;
372  QTextStream tsBody( &aBody, QIODevice::WriteOnly );
373 
374 #if QT_VERSION < QT_VERSION_CHECK(6,0,0)
375  tsBody.setCodec(QTextCodec::codecForName("UTF-8"));
376 #else
377  tsBody.setEncoding(QStringConverter::Utf8);
378 #endif
379 
380  // ----------------------------------------------------------------------
381  // Build Body... Only send if there are changes
382  // ----------------------------------------------------------------------
383 
384  uint nCount = BuildNotifyBody(tsBody, pInfo->m_ttLastNotified);
385  if (nCount)
386  {
387 
388  // -=>TODO: Need to add support for more than one CallBack URL.
389 
390  auto *pBuffer = new QByteArray(); // UPnpEventTask will delete this pointer.
391  QTextStream tsMsg( pBuffer, QIODevice::WriteOnly );
392 
393 #if QT_VERSION < QT_VERSION_CHECK(6,0,0)
394  tsMsg.setCodec(QTextCodec::codecForName("UTF-8"));
395 #else
396  tsMsg.setEncoding(QStringConverter::Utf8);
397 #endif
398 
399  // ----------------------------------------------------------------------
400  // Build Message Header
401  // ----------------------------------------------------------------------
402 
403  int nPort = (pInfo->m_qURL.port()>=0) ? pInfo->m_qURL.port() : 80;
404  QString sHost = QString( "%1:%2" ).arg( pInfo->m_qURL.host() )
405  .arg( nPort );
406 
407  tsMsg << "NOTIFY " << pInfo->m_qURL.path() << " HTTP/1.1\r\n";
408  tsMsg << "HOST: " << sHost << "\r\n";
409  tsMsg << "CONTENT-TYPE: \"text/xml\"\r\n";
410  tsMsg << "Content-Length: " << QString::number( aBody.size() ) << "\r\n";
411  tsMsg << "NT: upnp:event\r\n";
412  tsMsg << "NTS: upnp:propchange\r\n";
413  tsMsg << "SID: uuid:" << pInfo->m_sUUID << "\r\n";
414  tsMsg << "SEQ: " << QString::number( pInfo->m_nKey ) << "\r\n";
415  tsMsg << "\r\n";
416  tsMsg << aBody;
417  tsMsg << QT_FLUSH;
418 
419  // ------------------------------------------------------------------
420  // Add new EventTask to the TaskQueue to do the actual sending.
421  // ------------------------------------------------------------------
422 
423  LOG(VB_UPNP, LOG_INFO,
424  QString("UPnp::Eventing::NotifySubscriber( %1 ) : %2 Variables")
425  .arg( sHost ).arg(nCount));
426 
427  auto *pEventTask = new UPnpEventTask(QHostAddress(pInfo->m_qURL.host()),
428  nPort, pBuffer);
429 
430  TaskQueue::Instance()->AddTask( 250ms, pEventTask );
431 
432  pEventTask->DecrRef();
433 
434  // ------------------------------------------------------------------
435  // Update the subscribers Key & last Notified fields
436  // ------------------------------------------------------------------
437 
438  pInfo->IncrementKey();
439 
440  pInfo->m_ttLastNotified = nowAsDuration<std::chrono::microseconds>();
441  }
442 }
443 
HTTPRequest::GetRequestHeader
QString GetRequestHeader(const QString &sKey, const QString &sDefault)
Definition: httprequest.cpp:1179
Eventing::ExecutePostProcess
void ExecutePostProcess() override
Definition: eventing.cpp:184
HTTPRequest::m_sBaseUrl
QString m_sBaseUrl
Definition: httprequest.h:126
Eventing::ReleaseEvents
short ReleaseEvents()
Definition: eventing.cpp:123
HttpServerExtension::m_nSupportedMethods
uint m_nSupportedMethods
Definition: httpserver.h:79
HTTPRequest
Definition: httprequest.h:108
Eventing::m_subscribers
Subscribers m_subscribers
Definition: eventing.h:256
TaskTime
std::chrono::microseconds TaskTime
Definition: upnputil.h:43
Eventing::GetBasePaths
QStringList GetBasePaths() override
Definition: eventing.cpp:141
Eventing::m_sEventMethodName
QString m_sEventMethodName
Definition: eventing.h:255
HTTPRequest::m_sMethod
QString m_sMethod
Definition: httprequest.h:128
StateVariables::m_map
SVMap m_map
Definition: eventing.h:171
SubscriberInfo::m_nKey
unsigned short m_nKey
Definition: eventing.h:67
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:23
Eventing::ProcessRequest
bool ProcessRequest(HTTPRequest *pRequest) override
Definition: eventing.cpp:153
SubscriberInfo::m_nDuration
std::chrono::seconds m_nDuration
Definition: eventing.h:68
RequestTypeSubscribe
@ RequestTypeSubscribe
Definition: httprequest.h:57
Eventing::m_pInitializeSubscriber
SubscriberInfo * m_pInitializeSubscriber
Definition: eventing.h:262
RequestTypeUnsubscribe
@ RequestTypeUnsubscribe
Definition: httprequest.h:58
UPnp::FormatErrorResponse
static void FormatErrorResponse(HTTPRequest *pRequest, UPnPResultCode eCode, const QString &sMsg="")
Definition: upnp.cpp:280
HTTPRequest::m_mapRespHeaders
QStringMap m_mapRespHeaders
Definition: httprequest.h:152
Eventing::HandleSubscribe
void HandleSubscribe(HTTPRequest *pRequest)
Definition: eventing.cpp:201
eventing.h
upnp.h
HTTPRequest::m_nResponseStatus
long m_nResponseStatus
Definition: httprequest.h:151
mythlogging.h
Eventing::HandleUnsubscribe
void HandleUnsubscribe(HTTPRequest *pRequest)
Definition: eventing.cpp:300
upnptaskevent.h
SubscriberInfo
Definition: eventing.h:30
TaskQueue::Instance
static TaskQueue * Instance()
Definition: taskqueue.cpp:58
HTTPRequest::m_pPostProcess
IPostProcess * m_pPostProcess
Definition: httprequest.h:158
SubscriberInfo::m_sUUID
QString m_sUUID
Definition: eventing.h:65
Eventing::NotifySubscriber
void NotifySubscriber(SubscriberInfo *pInfo)
Definition: eventing.cpp:366
Eventing::Eventing
Eventing(const QString &sExtensionName, QString sEventMethodName, const QString &sSharePath)
Definition: eventing.cpp:72
SubscriberInfo::IncrementKey
unsigned long IncrementKey()
Definition: eventing.h:53
ResponseTypeXML
@ ResponseTypeXML
Definition: httprequest.h:77
uint
unsigned int uint
Definition: compat.h:140
SubscriberInfo::m_qURL
QUrl m_qURL
Definition: eventing.h:66
Eventing::Notify
void Notify() override
Definition: eventing.cpp:330
SubscriberInfo::m_ttLastNotified
TaskTime m_ttLastNotified
Definition: eventing.h:63
IPostProcess
Definition: httprequest.h:97
QT_FLUSH
#define QT_FLUSH
Definition: eventing.cpp:33
UPnPResult_InvalidAction
@ UPnPResult_InvalidAction
Definition: upnp.h:40
UPnp
Definition: upnp.h:99
Eventing::m_mutex
QMutex m_mutex
Definition: eventing.h:253
Eventing::m_nSubscriptionDuration
std::chrono::seconds m_nSubscriptionDuration
Definition: eventing.h:258
std
Definition: mythchrono.h:23
HTTPRequest::m_eResponseType
HttpResponseType m_eResponseType
Definition: httprequest.h:148
UPnpEventTask
Definition: upnptaskevent.h:27
TaskQueue::AddTask
void AddTask(std::chrono::milliseconds msec, Task *pTask)
Add a task to run in the future.
Definition: taskqueue.cpp:171
StateVariables::BuildNotifyBody
uint BuildNotifyBody(QTextStream &ts, TaskTime ttLastNotified) const
Definition: eventing.cpp:40
HttpServerExtension
Definition: httpserver.h:69
Eventing::m_nHoldCount
short m_nHoldCount
Definition: eventing.h:260
QT_ENDL
#define QT_ENDL
Definition: eventing.cpp:32
HTTPRequest::m_eType
HttpRequestType m_eType
Definition: httprequest.h:119
Eventing::HoldEvents
short HoldEvents()
Definition: eventing.cpp:98
Eventing::~Eventing
~Eventing() override
Definition: eventing.cpp:87