MythTV  master
eventing.cpp
Go to the documentation of this file.
1 // Program Name: eventing.cpp
3 // Created : Dec. 22, 2006
4 //
5 // Purpose : uPnp Eventing Base Class Implementation
6 //
7 // Copyright (c) 2006 David Blain <dblain@mythtv.org>
8 //
9 // Licensed under the GPL v2 or later, see LICENSE for details
10 //
12 
13 #include <cmath>
14 
15 #include <QStringList>
16 #if QT_VERSION >= QT_VERSION_CHECK(6,0,0)
17 #include <QStringConverter>
18 #else
19 #include <QTextCodec>
20 #endif
21 #include <QTextStream>
22 
23 #include "upnp.h"
24 #include "eventing.h"
25 #include "upnptaskevent.h"
28 
29 #if QT_VERSION < QT_VERSION_CHECK(5,14,0)
30  #define QT_ENDL endl
31  #define QT_FLUSH flush
32 #else
33  #define QT_ENDL Qt::endl
34  #define QT_FLUSH Qt::flush
35 #endif
36 
38 //
40 
42  QTextStream &ts, TaskTime ttLastNotified) const
43 {
44  uint nCount = 0;
45 
46  ts << "<?xml version=\"1.0\"?>" << QT_ENDL
47  << "<e:propertyset xmlns:e=\"urn:schemas-upnp-org:event-1-0\">" << QT_ENDL;
48 
49  for (auto *prop : qAsConst(m_map))
50  {
51  if ( ttLastNotified < prop->m_ttLastChanged )
52  {
53  nCount++;
54 
55  ts << "<e:property>" << QT_ENDL;
56  ts << "<" << prop->m_sName << ">";
57  ts << prop->ToString();
58  ts << "</" << prop->m_sName << ">";
59  ts << "</e:property>" << QT_ENDL;
60  }
61  }
62 
63  ts << "</e:propertyset>" << QT_ENDL;
64  ts << QT_FLUSH;
65 
66  return nCount;
67 }
68 
70 //
72 
73 Eventing::Eventing(const QString &sExtensionName,
74  QString sEventMethodName,
75  const QString &sSharePath) :
76  HttpServerExtension(sExtensionName, sSharePath),
77  m_sEventMethodName(std::move(sEventMethodName)),
78  m_nSubscriptionDuration(
79  XmlConfiguration().GetDuration<std::chrono::seconds>("UPnP/SubscriptionDuration", 30min))
80 {
82 }
83 
85 //
87 
89 {
90  for (const auto *subscriber : qAsConst(m_subscribers))
91  delete subscriber;
92  m_subscribers.clear();
93 }
94 
96 //
98 
99 inline short Eventing::HoldEvents()
100 {
101  // -=>TODO: Should use an Atomic increment...
102  // need to research available functions.
103 
104  m_mutex.lock();
105  bool err = (m_nHoldCount >= 127);
106  short nVal = (m_nHoldCount++);
107  m_mutex.unlock();
108 
109  if (err)
110  {
111  LOG(VB_GENERAL, LOG_ERR, "Exceeded maximum guarranteed range of "
112  "m_nHoldCount short [-128..127]");
113  LOG(VB_GENERAL, LOG_ERR,
114  "UPnP may not exhibit strange behavior or crash mythtv");
115  }
116 
117  return nVal;
118 }
119 
121 //
123 
125 {
126  // -=>TODO: Should use an Atomic decrement...
127 
128  m_mutex.lock();
129  short nVal = (m_nHoldCount--);
130  m_mutex.unlock();
131 
132  if (nVal == 0)
133  Notify();
134 
135  return nVal;
136 }
137 
139 //
141 
143 {
144  // -=>TODO: This isn't very efficient... Need to find out if we can make
145  // this something unique, other than root.
146 
147  return QStringList( "/" );
148 }
149 
151 //
153 
155 {
156  if (pRequest)
157  {
158  if ( pRequest->m_sBaseUrl != "/" )
159  return false;
160 
161  if ( pRequest->m_sMethod != m_sEventMethodName )
162  return false;
163 
164  LOG(VB_UPNP, LOG_INFO, QString("Eventing::ProcessRequest - Method (%1)")
165  .arg(pRequest->m_sMethod ));
166 
167  switch( pRequest->m_eType )
168  {
169  case RequestTypeSubscribe : HandleSubscribe ( pRequest ); break;
170  case RequestTypeUnsubscribe : HandleUnsubscribe ( pRequest ); break;
171  default:
173  break;
174  }
175  }
176 
177  return( true );
178 
179 }
180 
182 //
184 
186 {
187  // Use PostProcessing Hook to perform Initial Notification
188  // to make sure they receive it AFTER the subscription results
189 
190  if (m_pInitializeSubscriber != nullptr)
191  {
193 
194  m_pInitializeSubscriber = nullptr;
195  }
196 }
197 
199 //
201 
203 {
204  pRequest->m_eResponseType = ResponseTypeXML;
205  pRequest->m_nResponseStatus = 412;
206 
207  QString sCallBack = pRequest->GetRequestHeader( "CALLBACK", "" );
208  QString sNT = pRequest->GetRequestHeader( "NT" , "" );
209  QString sTimeout = pRequest->GetRequestHeader( "TIMEOUT" , "" );
210  QString sSID = pRequest->GetRequestHeader( "SID" , "" );
211 
212  SubscriberInfo *pInfo = nullptr;
213 
214  // ----------------------------------------------------------------------
215  // Validate Header Values...
216  // ----------------------------------------------------------------------
217 
218  // -=>TODO: Need to add support for more than one CallBack URL.
219 
220  if ( sCallBack.length() != 0 )
221  {
222  // ------------------------------------------------------------------
223  // New Subscription
224  // ------------------------------------------------------------------
225 
226  if ( sSID.length() != 0 )
227  {
228  pRequest->m_nResponseStatus = 400;
229  return;
230  }
231 
232  if ( sNT != "upnp:event" )
233  return;
234 
235  // ----------------------------------------------------------------------
236  // Process Subscription
237  // ----------------------------------------------------------------------
238 
239  // -=>TODO: Temp code until support for multiple callbacks are supported.
240 
241  sCallBack = sCallBack.mid( 1, sCallBack.indexOf(">") - 1);
242 
243  std::chrono::seconds nDuration = m_nSubscriptionDuration;
244  if ( sTimeout.startsWith("Second-") )
245  {
246  bool ok = false;
247  auto nValue = std::chrono::seconds(sTimeout.section("-", 1).toInt(&ok));
248  if (ok)
249  nDuration = nValue;
250  }
251 
252  pInfo = new SubscriberInfo( sCallBack, nDuration );
253 
254  Subscribers::iterator it = m_subscribers.find(pInfo->m_sUUID);
255  if (it != m_subscribers.end())
256  {
257  delete *it;
258  m_subscribers.erase(it);
259  }
260  m_subscribers[pInfo->m_sUUID] = pInfo;
261 
262  // Use PostProcess Hook to Send Initial FULL Notification...
263  // *** Must send this response first then notify.
264 
265  m_pInitializeSubscriber = pInfo;
266  pRequest->m_pPostProcess = (IPostProcess *)this;
267 
268  }
269  else
270  {
271  // ------------------------------------------------------------------
272  // Renewal
273  // ------------------------------------------------------------------
274 
275  if ( sSID.length() != 0 )
276  {
277  sSID = sSID.mid( 5 );
278  pInfo = m_subscribers[sSID];
279  }
280 
281  }
282 
283  if (pInfo != nullptr)
284  {
285  pRequest->m_mapRespHeaders[ "SID" ] = QString( "uuid:%1" )
286  .arg( pInfo->m_sUUID );
287 
288  pRequest->m_mapRespHeaders[ "TIMEOUT"] = QString( "Second-%1" )
289  .arg( pInfo->m_nDuration.count() );
290 
291  pRequest->m_nResponseStatus = 200;
292 
293  }
294 
295 }
296 
298 //
300 
302 {
303  pRequest->m_eResponseType = ResponseTypeXML;
304  pRequest->m_nResponseStatus = 412;
305 
306  QString sCallBack = pRequest->GetRequestHeader( "CALLBACK", "" );
307  QString sNT = pRequest->GetRequestHeader( "NT" , "" );
308  QString sSID = pRequest->GetRequestHeader( "SID" , "" );
309 
310  if ((sCallBack.length() != 0) || (sNT.length() != 0))
311  {
312  pRequest->m_nResponseStatus = 400;
313  return;
314  }
315 
316  sSID = sSID.mid( 5 );
317 
318  Subscribers::iterator it = m_subscribers.find(sSID);
319  if (it != m_subscribers.end())
320  {
321  delete *it;
322  m_subscribers.erase(it);
323  pRequest->m_nResponseStatus = 200;
324  }
325 }
326 
328 //
330 
332 {
333  auto tt = nowAsDuration<std::chrono::microseconds>();
334 
335  m_mutex.lock();
336 
337  Subscribers::iterator it = m_subscribers.begin();
338  while (it != m_subscribers.end())
339  {
340  if (!(*it))
341  { // This should never happen, but if someone inserted bad data...
342  ++it;
343  continue;
344  }
345 
346  if (tt < (*it)->m_ttExpires)
347  {
348  // Subscription not expired yet. Send event notification.
349  NotifySubscriber(*it);
350  ++it;
351  }
352  else
353  {
354  // Time to expire this subscription. Remove subscriber from list.
355  delete *it;
356  it = m_subscribers.erase(it);
357  }
358  }
359 
360  m_mutex.unlock();
361 }
362 
364 //
366 
368 {
369  if (pInfo == nullptr)
370  return;
371 
372  QByteArray aBody;
373  QTextStream tsBody( &aBody, QIODevice::WriteOnly );
374 
375 #if QT_VERSION < QT_VERSION_CHECK(6,0,0)
376  tsBody.setCodec(QTextCodec::codecForName("UTF-8"));
377 #else
378  tsBody.setEncoding(QStringConverter::Utf8);
379 #endif
380 
381  // ----------------------------------------------------------------------
382  // Build Body... Only send if there are changes
383  // ----------------------------------------------------------------------
384 
385  uint nCount = BuildNotifyBody(tsBody, pInfo->m_ttLastNotified);
386  if (nCount)
387  {
388 
389  // -=>TODO: Need to add support for more than one CallBack URL.
390 
391  auto *pBuffer = new QByteArray(); // UPnpEventTask will delete this pointer.
392  QTextStream tsMsg( pBuffer, QIODevice::WriteOnly );
393 
394 #if QT_VERSION < QT_VERSION_CHECK(6,0,0)
395  tsMsg.setCodec(QTextCodec::codecForName("UTF-8"));
396 #else
397  tsMsg.setEncoding(QStringConverter::Utf8);
398 #endif
399 
400  // ----------------------------------------------------------------------
401  // Build Message Header
402  // ----------------------------------------------------------------------
403 
404  int nPort = (pInfo->m_qURL.port()>=0) ? pInfo->m_qURL.port() : 80;
405  QString sHost = QString( "%1:%2" ).arg( pInfo->m_qURL.host() )
406  .arg( nPort );
407 
408  tsMsg << "NOTIFY " << pInfo->m_qURL.path() << " HTTP/1.1\r\n";
409  tsMsg << "HOST: " << sHost << "\r\n";
410  tsMsg << "CONTENT-TYPE: \"text/xml\"\r\n";
411  tsMsg << "Content-Length: " << QString::number( aBody.size() ) << "\r\n";
412  tsMsg << "NT: upnp:event\r\n";
413  tsMsg << "NTS: upnp:propchange\r\n";
414  tsMsg << "SID: uuid:" << pInfo->m_sUUID << "\r\n";
415  tsMsg << "SEQ: " << QString::number( pInfo->m_nKey ) << "\r\n";
416  tsMsg << "\r\n";
417  tsMsg << aBody;
418  tsMsg << QT_FLUSH;
419 
420  // ------------------------------------------------------------------
421  // Add new EventTask to the TaskQueue to do the actual sending.
422  // ------------------------------------------------------------------
423 
424  LOG(VB_UPNP, LOG_INFO,
425  QString("UPnp::Eventing::NotifySubscriber( %1 ) : %2 Variables")
426  .arg( sHost ).arg(nCount));
427 
428  auto *pEventTask = new UPnpEventTask(QHostAddress(pInfo->m_qURL.host()),
429  nPort, pBuffer);
430 
431  TaskQueue::Instance()->AddTask( 250ms, pEventTask );
432 
433  pEventTask->DecrRef();
434 
435  // ------------------------------------------------------------------
436  // Update the subscribers Key & last Notified fields
437  // ------------------------------------------------------------------
438 
439  pInfo->IncrementKey();
440 
441  pInfo->m_ttLastNotified = nowAsDuration<std::chrono::microseconds>();
442  }
443 }
444 
HTTPRequest::GetRequestHeader
QString GetRequestHeader(const QString &sKey, const QString &sDefault)
Definition: httprequest.cpp:1109
Eventing::ExecutePostProcess
void ExecutePostProcess() override
Definition: eventing.cpp:185
HTTPRequest::m_sBaseUrl
QString m_sBaseUrl
Definition: httprequest.h:127
Eventing::ReleaseEvents
short ReleaseEvents()
Definition: eventing.cpp:124
HttpServerExtension::m_nSupportedMethods
uint m_nSupportedMethods
Definition: httpserver.h:83
HTTPRequest
Definition: httprequest.h:109
Eventing::m_subscribers
Subscribers m_subscribers
Definition: eventing.h:256
TaskTime
std::chrono::microseconds TaskTime
Definition: upnputil.h:31
Eventing::GetBasePaths
QStringList GetBasePaths() override
Definition: eventing.cpp:142
Eventing::m_sEventMethodName
QString m_sEventMethodName
Definition: eventing.h:255
HTTPRequest::m_sMethod
QString m_sMethod
Definition: httprequest.h:129
StateVariables::m_map
SVMap m_map
Definition: eventing.h:171
SubscriberInfo::m_nKey
unsigned short m_nKey
Definition: eventing.h:67
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
Eventing::ProcessRequest
bool ProcessRequest(HTTPRequest *pRequest) override
Definition: eventing.cpp:154
SubscriberInfo::m_nDuration
std::chrono::seconds m_nDuration
Definition: eventing.h:68
RequestTypeSubscribe
@ RequestTypeSubscribe
Definition: httprequest.h:58
Eventing::m_pInitializeSubscriber
SubscriberInfo * m_pInitializeSubscriber
Definition: eventing.h:262
RequestTypeUnsubscribe
@ RequestTypeUnsubscribe
Definition: httprequest.h:59
UPnp::FormatErrorResponse
static void FormatErrorResponse(HTTPRequest *pRequest, UPnPResultCode eCode, const QString &sMsg="")
Definition: upnp.cpp:241
XmlConfiguration
Definition: configuration.h:38
HTTPRequest::m_mapRespHeaders
QStringMap m_mapRespHeaders
Definition: httprequest.h:153
Eventing::HandleSubscribe
void HandleSubscribe(HTTPRequest *pRequest)
Definition: eventing.cpp:202
eventing.h
upnp.h
HTTPRequest::m_nResponseStatus
long m_nResponseStatus
Definition: httprequest.h:152
mythlogging.h
Eventing::HandleUnsubscribe
void HandleUnsubscribe(HTTPRequest *pRequest)
Definition: eventing.cpp:301
upnptaskevent.h
SubscriberInfo
Definition: eventing.h:30
TaskQueue::Instance
static TaskQueue * Instance()
Definition: taskqueue.cpp:59
HTTPRequest::m_pPostProcess
IPostProcess * m_pPostProcess
Definition: httprequest.h:159
SubscriberInfo::m_sUUID
QString m_sUUID
Definition: eventing.h:65
Eventing::NotifySubscriber
void NotifySubscriber(SubscriberInfo *pInfo)
Definition: eventing.cpp:367
Eventing::Eventing
Eventing(const QString &sExtensionName, QString sEventMethodName, const QString &sSharePath)
Definition: eventing.cpp:73
SubscriberInfo::IncrementKey
unsigned long IncrementKey()
Definition: eventing.h:53
ResponseTypeXML
@ ResponseTypeXML
Definition: httprequest.h:78
uint
unsigned int uint
Definition: compat.h:81
SubscriberInfo::m_qURL
QUrl m_qURL
Definition: eventing.h:66
Eventing::Notify
void Notify() override
Definition: eventing.cpp:331
SubscriberInfo::m_ttLastNotified
TaskTime m_ttLastNotified
Definition: eventing.h:63
IPostProcess
Definition: httprequest.h:98
QT_FLUSH
#define QT_FLUSH
Definition: eventing.cpp:34
UPnPResult_InvalidAction
@ UPnPResult_InvalidAction
Definition: upnp.h:39
Eventing::m_mutex
QMutex m_mutex
Definition: eventing.h:253
Eventing::m_nSubscriptionDuration
std::chrono::seconds m_nSubscriptionDuration
Definition: eventing.h:258
std
Definition: mythchrono.h:23
HTTPRequest::m_eResponseType
HttpResponseType m_eResponseType
Definition: httprequest.h:149
UPnpEventTask
Definition: upnptaskevent.h:27
TaskQueue::AddTask
void AddTask(std::chrono::milliseconds msec, Task *pTask)
Add a task to run in the future.
Definition: taskqueue.cpp:172
configuration.h
StateVariables::BuildNotifyBody
uint BuildNotifyBody(QTextStream &ts, TaskTime ttLastNotified) const
Definition: eventing.cpp:41
HttpServerExtension
Definition: httpserver.h:71
Eventing::m_nHoldCount
short m_nHoldCount
Definition: eventing.h:260
QT_ENDL
#define QT_ENDL
Definition: eventing.cpp:33
HTTPRequest::m_eType
HttpRequestType m_eType
Definition: httprequest.h:120
Eventing::HoldEvents
short HoldEvents()
Definition: eventing.cpp:99
Eventing::~Eventing
~Eventing() override
Definition: eventing.cpp:88