MythTV master
eventing.cpp
Go to the documentation of this file.
1
2// Program Name: eventing.cpp
3// Created : Dec. 22, 2006
4//
5// Purpose : uPnp Eventing Base Class Implementation
6//
7// Copyright (c) 2006 David Blain <dblain@mythtv.org>
8//
9// Licensed under the GPL v2 or later, see LICENSE for details
10//
12#include "eventing.h"
13
14#include <cmath>
15
16#include <QStringList>
17#if QT_VERSION >= QT_VERSION_CHECK(6,0,0)
18#include <QStringConverter>
19#else
20#include <QTextCodec>
21#endif
22#include <QTextStream>
23
24#include "upnp.h"
25#include "taskqueue.h"
26#include "upnptaskevent.h"
29
31//
33
35 QTextStream &ts, std::chrono::microseconds ttLastNotified) const
36{
37 uint nCount = 0;
38
39 ts << "<?xml version=\"1.0\"?>" << Qt::endl
40 << "<e:propertyset xmlns:e=\"urn:schemas-upnp-org:event-1-0\">" << Qt::endl;
41
42 for (auto *prop : std::as_const(m_map))
43 {
44 if ( ttLastNotified < prop->m_ttLastChanged )
45 {
46 nCount++;
47
48 ts << "<e:property>" << Qt::endl;
49 ts << "<" << prop->m_sName << ">";
50 ts << prop->ToString();
51 ts << "</" << prop->m_sName << ">";
52 ts << "</e:property>" << Qt::endl;
53 }
54 }
55
56 ts << "</e:propertyset>" << Qt::endl;
57 ts << Qt::flush;
58
59 return nCount;
60}
61
63//
65
66Eventing::Eventing(const QString &sExtensionName,
67 QString sEventMethodName,
68 const QString &sSharePath) :
69 HttpServerExtension(sExtensionName, sSharePath),
70 m_sEventMethodName(std::move(sEventMethodName)),
71 m_nSubscriptionDuration(
72 XmlConfiguration().GetDuration<std::chrono::seconds>("UPnP/SubscriptionDuration", 30min))
73{
75}
76
78//
80
82{
83 for (const auto *subscriber : std::as_const(m_subscribers))
84 delete subscriber;
85 m_subscribers.clear();
86}
87
89//
91
93{
94 // -=>TODO: Should use an Atomic increment...
95 // need to research available functions.
96
97 m_mutex.lock();
98 bool err = (m_nHoldCount >= 127);
99 short nVal = (m_nHoldCount++);
100 m_mutex.unlock();
101
102 if (err)
103 {
104 LOG(VB_GENERAL, LOG_ERR, "Exceeded maximum guarranteed range of "
105 "m_nHoldCount short [-128..127]");
106 LOG(VB_GENERAL, LOG_ERR,
107 "UPnP may not exhibit strange behavior or crash mythtv");
108 }
109
110 return nVal;
111}
112
114//
116
118{
119 // -=>TODO: Should use an Atomic decrement...
120
121 m_mutex.lock();
122 short nVal = (m_nHoldCount--);
123 m_mutex.unlock();
124
125 if (nVal == 0)
126 Notify();
127
128 return nVal;
129}
130
132//
134
136{
137 // -=>TODO: This isn't very efficient... Need to find out if we can make
138 // this something unique, other than root.
139
140 return QStringList( "/" );
141}
142
144//
146
148{
149 if (pRequest)
150 {
151 if ( pRequest->m_sBaseUrl != "/" )
152 return false;
153
154 if ( pRequest->m_sMethod != m_sEventMethodName )
155 return false;
156
157 LOG(VB_UPNP, LOG_INFO, QString("Eventing::ProcessRequest - Method (%1)")
158 .arg(pRequest->m_sMethod ));
159
160 switch( pRequest->m_eType )
161 {
162 case RequestTypeSubscribe : HandleSubscribe ( pRequest ); break;
163 case RequestTypeUnsubscribe : HandleUnsubscribe ( pRequest ); break;
164 default:
166 break;
167 }
168 }
169
170 return( true );
171
172}
173
175//
177
179{
180 // Use PostProcessing Hook to perform Initial Notification
181 // to make sure they receive it AFTER the subscription results
182
183 if (m_pInitializeSubscriber != nullptr)
184 {
186
187 m_pInitializeSubscriber = nullptr;
188 }
189}
190
192//
194
196{
198 pRequest->m_nResponseStatus = 412;
199
200 QString sCallBack = pRequest->GetRequestHeader( "CALLBACK", "" );
201 QString sNT = pRequest->GetRequestHeader( "NT" , "" );
202 QString sTimeout = pRequest->GetRequestHeader( "TIMEOUT" , "" );
203 QString sSID = pRequest->GetRequestHeader( "SID" , "" );
204
205 SubscriberInfo *pInfo = nullptr;
206
207 // ----------------------------------------------------------------------
208 // Validate Header Values...
209 // ----------------------------------------------------------------------
210
211 // -=>TODO: Need to add support for more than one CallBack URL.
212
213 if ( sCallBack.length() != 0 )
214 {
215 // ------------------------------------------------------------------
216 // New Subscription
217 // ------------------------------------------------------------------
218
219 if ( sSID.length() != 0 )
220 {
221 pRequest->m_nResponseStatus = 400;
222 return;
223 }
224
225 if ( sNT != "upnp:event" )
226 return;
227
228 // ----------------------------------------------------------------------
229 // Process Subscription
230 // ----------------------------------------------------------------------
231
232 // -=>TODO: Temp code until support for multiple callbacks are supported.
233
234 sCallBack = sCallBack.mid( 1, sCallBack.indexOf(">") - 1);
235
236 std::chrono::seconds nDuration = m_nSubscriptionDuration;
237 if ( sTimeout.startsWith("Second-") )
238 {
239 bool ok = false;
240 auto nValue = std::chrono::seconds(sTimeout.section("-", 1).toInt(&ok));
241 if (ok)
242 nDuration = nValue;
243 }
244
245 pInfo = new SubscriberInfo( sCallBack, nDuration );
246
247 Subscribers::iterator it = m_subscribers.find(pInfo->m_sUUID);
248 if (it != m_subscribers.end())
249 {
250 delete *it;
251 m_subscribers.erase(it);
252 }
253 m_subscribers[pInfo->m_sUUID] = pInfo;
254
255 // Use PostProcess Hook to Send Initial FULL Notification...
256 // *** Must send this response first then notify.
257
259 pRequest->m_pPostProcess = (IPostProcess *)this;
260
261 }
262 else
263 {
264 // ------------------------------------------------------------------
265 // Renewal
266 // ------------------------------------------------------------------
267
268 if ( sSID.length() != 0 )
269 {
270 sSID = sSID.mid( 5 );
271 pInfo = m_subscribers[sSID];
272 }
273
274 }
275
276 if (pInfo != nullptr)
277 {
278 pRequest->m_mapRespHeaders[ "SID" ] = QString( "uuid:%1" )
279 .arg( pInfo->m_sUUID );
280
281 pRequest->m_mapRespHeaders[ "TIMEOUT"] = QString( "Second-%1" )
282 .arg( pInfo->m_nDuration.count() );
283
284 pRequest->m_nResponseStatus = 200;
285
286 }
287
288}
289
291//
293
295{
297 pRequest->m_nResponseStatus = 412;
298
299 QString sCallBack = pRequest->GetRequestHeader( "CALLBACK", "" );
300 QString sNT = pRequest->GetRequestHeader( "NT" , "" );
301 QString sSID = pRequest->GetRequestHeader( "SID" , "" );
302
303 if ((sCallBack.length() != 0) || (sNT.length() != 0))
304 {
305 pRequest->m_nResponseStatus = 400;
306 return;
307 }
308
309 sSID = sSID.mid( 5 );
310
311 Subscribers::iterator it = m_subscribers.find(sSID);
312 if (it != m_subscribers.end())
313 {
314 delete *it;
315 m_subscribers.erase(it);
316 pRequest->m_nResponseStatus = 200;
317 }
318}
319
321//
323
325{
326 auto tt = nowAsDuration<std::chrono::microseconds>();
327
328 m_mutex.lock();
329
330 Subscribers::iterator it = m_subscribers.begin();
331 while (it != m_subscribers.end())
332 {
333 if (!(*it))
334 { // This should never happen, but if someone inserted bad data...
335 ++it;
336 continue;
337 }
338
339 if (tt < (*it)->m_ttExpires)
340 {
341 // Subscription not expired yet. Send event notification.
342 NotifySubscriber(*it);
343 ++it;
344 }
345 else
346 {
347 // Time to expire this subscription. Remove subscriber from list.
348 delete *it;
349 it = m_subscribers.erase(it);
350 }
351 }
352
353 m_mutex.unlock();
354}
355
357//
359
361{
362 if (pInfo == nullptr)
363 return;
364
365 QByteArray aBody;
366 QTextStream tsBody( &aBody, QIODevice::WriteOnly );
367
368#if QT_VERSION < QT_VERSION_CHECK(6,0,0)
369 tsBody.setCodec(QTextCodec::codecForName("UTF-8"));
370#else
371 tsBody.setEncoding(QStringConverter::Utf8);
372#endif
373
374 // ----------------------------------------------------------------------
375 // Build Body... Only send if there are changes
376 // ----------------------------------------------------------------------
377
378 uint nCount = BuildNotifyBody(tsBody, pInfo->m_ttLastNotified);
379 if (nCount)
380 {
381
382 // -=>TODO: Need to add support for more than one CallBack URL.
383
384 auto *pBuffer = new QByteArray(); // UPnpEventTask will delete this pointer.
385 QTextStream tsMsg( pBuffer, QIODevice::WriteOnly );
386
387#if QT_VERSION < QT_VERSION_CHECK(6,0,0)
388 tsMsg.setCodec(QTextCodec::codecForName("UTF-8"));
389#else
390 tsMsg.setEncoding(QStringConverter::Utf8);
391#endif
392
393 // ----------------------------------------------------------------------
394 // Build Message Header
395 // ----------------------------------------------------------------------
396
397 int nPort = (pInfo->m_qURL.port()>=0) ? pInfo->m_qURL.port() : 80;
398 QString sHost = QString( "%1:%2" ).arg( pInfo->m_qURL.host() )
399 .arg( nPort );
400
401 tsMsg << "NOTIFY " << pInfo->m_qURL.path() << " HTTP/1.1\r\n";
402 tsMsg << "HOST: " << sHost << "\r\n";
403 tsMsg << "CONTENT-TYPE: \"text/xml\"\r\n";
404 tsMsg << "Content-Length: " << QString::number( aBody.size() ) << "\r\n";
405 tsMsg << "NT: upnp:event\r\n";
406 tsMsg << "NTS: upnp:propchange\r\n";
407 tsMsg << "SID: uuid:" << pInfo->m_sUUID << "\r\n";
408 tsMsg << "SEQ: " << QString::number( pInfo->m_nKey ) << "\r\n";
409 tsMsg << "\r\n";
410 tsMsg << aBody;
411 tsMsg << Qt::flush;
412
413 // ------------------------------------------------------------------
414 // Add new EventTask to the TaskQueue to do the actual sending.
415 // ------------------------------------------------------------------
416
417 LOG(VB_UPNP, LOG_INFO,
418 QString("UPnp::Eventing::NotifySubscriber( %1 ) : %2 Variables")
419 .arg( sHost ).arg(nCount));
420
421 auto *pEventTask = new UPnpEventTask(QHostAddress(pInfo->m_qURL.host()),
422 nPort, pBuffer);
423
424 TaskQueue::Instance()->AddTask( 250ms, pEventTask );
425
426 pEventTask->DecrRef();
427
428 // ------------------------------------------------------------------
429 // Update the subscribers Key & last Notified fields
430 // ------------------------------------------------------------------
431
432 pInfo->IncrementKey();
433
434 pInfo->m_ttLastNotified = nowAsDuration<std::chrono::microseconds>();
435 }
436}
437
short m_nHoldCount
Definition: eventing.h:260
void HandleUnsubscribe(HTTPRequest *pRequest)
Definition: eventing.cpp:294
QString m_sEventMethodName
Definition: eventing.h:255
~Eventing() override
Definition: eventing.cpp:81
short ReleaseEvents()
Definition: eventing.cpp:117
short HoldEvents()
Definition: eventing.cpp:92
void HandleSubscribe(HTTPRequest *pRequest)
Definition: eventing.cpp:195
QMutex m_mutex
Definition: eventing.h:253
bool ProcessRequest(HTTPRequest *pRequest) override
Definition: eventing.cpp:147
void NotifySubscriber(SubscriberInfo *pInfo)
Definition: eventing.cpp:360
QStringList GetBasePaths() override
Definition: eventing.cpp:135
void Notify() override
Definition: eventing.cpp:324
void ExecutePostProcess() override
Definition: eventing.cpp:178
Eventing(const QString &sExtensionName, QString sEventMethodName, const QString &sSharePath)
Definition: eventing.cpp:66
SubscriberInfo * m_pInitializeSubscriber
Definition: eventing.h:262
Subscribers m_subscribers
Definition: eventing.h:256
std::chrono::seconds m_nSubscriptionDuration
Definition: eventing.h:258
HttpResponseType m_eResponseType
Definition: httprequest.h:150
long m_nResponseStatus
Definition: httprequest.h:153
QString m_sMethod
Definition: httprequest.h:130
QString GetRequestHeader(const QString &sKey, const QString &sDefault)
IPostProcess * m_pPostProcess
Definition: httprequest.h:160
HttpRequestType m_eType
Definition: httprequest.h:121
QStringMap m_mapRespHeaders
Definition: httprequest.h:154
QString m_sBaseUrl
Definition: httprequest.h:128
uint BuildNotifyBody(QTextStream &ts, std::chrono::microseconds ttLastNotified) const
Definition: eventing.cpp:34
std::chrono::microseconds m_ttLastNotified
Definition: eventing.h:64
unsigned long IncrementKey()
Definition: eventing.h:54
QString m_sUUID
Definition: eventing.h:66
unsigned short m_nKey
Definition: eventing.h:68
std::chrono::seconds m_nDuration
Definition: eventing.h:69
void AddTask(std::chrono::milliseconds msec, Task *pTask)
Add a task to run in the future.
Definition: taskqueue.cpp:168
static TaskQueue * Instance()
Definition: taskqueue.cpp:55
static void FormatErrorResponse(HTTPRequest *pRequest, UPnPResultCode eCode, const QString &sMsg="")
Definition: upnp.cpp:242
unsigned int uint
Definition: freesurround.h:24
@ ResponseTypeXML
Definition: httprequest.h:79
@ RequestTypeSubscribe
Definition: httprequest.h:59
@ RequestTypeUnsubscribe
Definition: httprequest.h:60
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
STL namespace.
@ UPnPResult_InvalidAction
Definition: upnp.h:37