MythTV  0.27pre
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Groups Pages
upnpsubscription.cpp
Go to the documentation of this file.
1 /*
2 An HttpServer Extension that manages subscriptions to UPnP services.
3 
4 An object wishing to subscribe to a service needs to register as a listener for
5 events and subscribe using a valid usn and subscription path. The subscriber
6 is responsible for requesting a renewal before the subscription expires,
7 removing any stale subscriptions, unsubsubscribing on exit and must re-implement
8 QObject::customEvent to receive event notifications for subscribed services.
9 */
10 
11 #include <QTextCodec>
12 
13 #include "mythcorecontext.h"
14 #include "mythlogging.h"
15 #include "upnpsubscription.h"
16 
17 // default requested time for subscription (actual is dictated by server)
18 #define SUBSCRIPTION_TIME 1800
19 // maximum time to wait for responses to subscription requests (UPnP spec. 30s)
20 #define MAX_WAIT 30000
21 
22 #define LOC QString("UPnPSub: ")
23 
25 {
26  public:
27  Subscription(QUrl url, QString path)
28  : m_url(url), m_path(path), m_uuid(QString()) { }
29  QUrl m_url;
30  QString m_path;
31  QString m_uuid;
32 };
33 
34 UPNPSubscription::UPNPSubscription(const QString &share_path, int port)
35  : HttpServerExtension("UPnPSubscriptionManager", share_path),
36  m_subscriptionLock(QMutex::Recursive), m_callback(QString("NOTSET"))
37 {
38  QString host;
39  if (!UPnp::g_IPAddrList.isEmpty())
40  host = UPnp::g_IPAddrList.at(0);
41 
42  // taken from MythCoreContext
43 #if !defined(QT_NO_IPV6)
44  QHostAddress addr(host);
45  if ((addr.protocol() == QAbstractSocket::IPv6Protocol) ||
46  (host.contains(":")))
47  host = "[" + host + "]";
48 #endif
49 
50  m_callback = QString("http://%1:%2/Subscriptions/event?usn=")
51  .arg(host).arg(QString::number(port));
52 }
53 
55 {
56  m_subscriptionLock.lock();
57  QList<QString> usns = m_subscriptions.keys();
58  while (!usns.isEmpty())
59  Unsubscribe(usns.takeLast());
60  m_subscriptions.clear();
61  m_subscriptionLock.unlock();
62 
63  LOG(VB_UPNP, LOG_DEBUG, LOC + "Finished");
64 }
65 
66 int UPNPSubscription::Subscribe(const QString &usn, const QUrl &url,
67  const QString &path)
68 {
69  LOG(VB_UPNP, LOG_DEBUG, LOC + QString("Subscribe %1 %2 %3")
70  .arg(usn).arg(url.toString()).arg(path));
71 
72  // N.B. this is called from the client object's thread. Hence we have to
73  // lock until the subscription request has returned, otherwise we may
74  // receive the first event notification (in the HttpServer thread)
75  // before the subscription is processed and the event will fail
76 
77  QMutexLocker lock(&m_subscriptionLock);
78  if (m_subscriptions.contains(usn))
79  {
80  if (m_subscriptions[usn]->m_url != url ||
81  m_subscriptions[usn]->m_path != path)
82  {
83  LOG(VB_GENERAL, LOG_WARNING, LOC +
84  "Re-subscribing with different url and path.");
85  m_subscriptions[usn]->m_url = url;
86  m_subscriptions[usn]->m_path = path;
87  m_subscriptions[usn]->m_uuid = QString();
88  }
89  }
90  else
91  {
92  m_subscriptions.insert(usn, new Subscription(url, path));
93  }
94 
95  return SendSubscribeRequest(m_callback, usn, url, path, QString(),
96  m_subscriptions[usn]->m_uuid);
97 }
98 
99 void UPNPSubscription::Unsubscribe(const QString &usn)
100 {
101  QUrl url;
102  QString path;
103  QString uuid = QString();
104  m_subscriptionLock.lock();
105  if (m_subscriptions.contains(usn))
106  {
107  url = m_subscriptions[usn]->m_url;
108  path = m_subscriptions[usn]->m_path;
109  uuid = m_subscriptions[usn]->m_uuid;
110  delete m_subscriptions.value(usn);
111  m_subscriptions.remove(usn);
112  }
113  m_subscriptionLock.unlock();
114 
115  if (!uuid.isEmpty())
116  SendUnsubscribeRequest(usn, url, path, uuid);
117 }
118 
119 int UPNPSubscription::Renew(const QString &usn)
120 {
121  LOG(VB_UPNP, LOG_DEBUG, LOC + QString("Renew: %1").arg(usn));
122 
123  QUrl url;
124  QString path;
125  QString sid;
126 
127  // see locking comment in Subscribe
128  QMutexLocker lock(&m_subscriptionLock);
129  if (m_subscriptions.contains(usn))
130  {
131  url = m_subscriptions[usn]->m_url;
132  path = m_subscriptions[usn]->m_path;
133  sid = m_subscriptions[usn]->m_uuid;
134  }
135  else
136  {
137  LOG(VB_UPNP, LOG_ERR, LOC + QString("Unrecognised renewal usn: %1")
138  .arg(usn));
139  return 0;
140  }
141 
142  if (!sid.isEmpty())
143  {
144  return SendSubscribeRequest(m_callback, usn, url, path, sid,
145  m_subscriptions[usn]->m_uuid);
146  }
147 
148  LOG(VB_UPNP, LOG_ERR, LOC + QString("No uuid - not renewing usn: %1")
149  .arg(usn));
150  return 0;
151 }
152 
153 void UPNPSubscription::Remove(const QString &usn)
154 {
155  // this could be handled by hooking directly into the SSDPCache updates
156  // but the subscribing object will also be doing so. Having the that
157  // object initiate the removal avoids temoporary race conditions during
158  // periods of UPnP/SSDP activity
159  m_subscriptionLock.lock();
160  if (m_subscriptions.contains(usn))
161  {
162  LOG(VB_UPNP, LOG_INFO, LOC + QString("Removing %1").arg(usn));
163  delete m_subscriptions.value(usn);
164  m_subscriptions.remove(usn);
165  }
166  m_subscriptionLock.unlock();
167 }
168 
170 {
171  if (!pRequest)
172  return false;
173 
174  if (pRequest->m_sBaseUrl != "/Subscriptions")
175  return false;
176  if (pRequest->m_sMethod != "event")
177  return false;
178 
179  LOG(VB_UPNP, LOG_DEBUG, LOC + QString("%1\n%2")
180  .arg(pRequest->m_sRawRequest).arg(pRequest->m_sPayload));
181 
182  if (pRequest->m_sPayload.isEmpty())
183  return true;
184 
185  pRequest->m_eResponseType = ResponseTypeHTML;
186 
187  QString nt = pRequest->m_mapHeaders["nt"];
188  QString nts = pRequest->m_mapHeaders["nts"];
189  bool no = pRequest->m_sRawRequest.startsWith("NOTIFY");
190 
191  if (nt.isEmpty() || nts.isEmpty() || !no)
192  {
193  pRequest->m_nResponseStatus = 400;
194  return true;
195  }
196 
197  pRequest->m_nResponseStatus = 412;
198  if (nt != "upnp:event" || nts != "upnp:propchange")
199  return true;
200 
201  QString usn = pRequest->m_mapParams["usn"];
202  QString sid = pRequest->m_mapHeaders["sid"];
203  if (usn.isEmpty() || sid.isEmpty())
204  return true;
205 
206  // N.B. Validating the usn and uuid here might mean blocking for some time
207  // while waiting for a subscription to complete. While this operates in a
208  // worker thread, worker threads are a limited resource which we could
209  // rapidly overload if a number of events arrive. Instead let the
210  // subscribing objects validate the usn - the uuid should be superfluous.
211 
212  QString seq = pRequest->m_mapHeaders["seq"];
213 
214  // mediatomb sends some extra character(s) at the end of the payload
215  // which throw Qt, so try and trim them off
216  int loc = pRequest->m_sPayload.lastIndexOf("propertyset>");
217  QString payload = (loc > -1) ? pRequest->m_sPayload.left(loc + 12) :
218  pRequest->m_sPayload;
219 
220  LOG(VB_UPNP, LOG_DEBUG, LOC + QString("Payload:\n%1").arg(payload));
221 
222  pRequest->m_nResponseStatus = 400;
223  QDomDocument body;
224  QString error;
225  int errorCol = 0;
226  int errorLine = 0;
227  if (!body.setContent(payload, true, &error, &errorLine, &errorCol))
228  {
229  LOG(VB_GENERAL, LOG_ERR, LOC +
230  QString("Failed to parse event: Line: %1 Col: %2 Error: '%3'")
231  .arg(errorLine).arg(errorCol).arg(error));
232  return true;
233  }
234 
235  LOG(VB_UPNP, LOG_DEBUG, LOC + "/n/n" + body.toString(4) + "/n/n");
236 
237  QDomNodeList properties = body.elementsByTagName("property");
238  QHash<QString,QString> results;
239 
240  // this deals with both one argument per property (compliant) and mutliple
241  // arguments per property as sent by mediatomb
242  for (int i = 0; i < properties.size(); i++)
243  {
244  QDomNodeList arguments = properties.at(i).childNodes();
245  for (int j = 0; j < arguments.size(); j++)
246  {
247  QDomElement e = arguments.at(j).toElement();
248  if (!e.isNull() && !e.text().isEmpty() && !e.tagName().isEmpty())
249  results.insert(e.tagName(), e.text());
250  }
251  }
252 
253  // using MythObservable allows multiple objects to subscribe to the same
254  // service but is less efficient from an eventing perspective, especially
255  // if multiple objects are subscribing
256  if (!results.isEmpty())
257  {
258  pRequest->m_nResponseStatus = 200;
259  results.insert("usn", usn);
260  results.insert("seq", seq);
261  MythInfoMapEvent me("UPNP_EVENT", results);
262  dispatch(me);
263  }
264 
265  return true;
266 }
267 
269  const QUrl &url,
270  const QString &path,
271  const QString &uuid)
272 {
273  bool success = false;
274  QString host = url.host();
275  int port = url.port();
276 
277  QByteArray sub;
278  QTextStream data(&sub);
279  data.setCodec(QTextCodec::codecForName("UTF-8"));
280  // N.B. Play On needs an extra space between UNSUBSCRIBE and path...
281  data << QString("UNSUBSCRIBE %1 HTTP/1.1\r\n").arg(path);
282  data << QString("HOST: %1:%2\r\n").arg(host).arg(QString::number(port));
283  data << QString("SID: uuid:%1\r\n").arg(uuid);
284  data << "\r\n";
285  data.flush();
286 
287  LOG(VB_UPNP, LOG_DEBUG, LOC + "\n\n" + sub);
288 
290  BufferedSocketDevice *sock = new BufferedSocketDevice(sockdev);
291  sockdev->setBlocking(true);
292 
293  if (sock->Connect(QHostAddress(host), port))
294  {
295  if (sock->WriteBlockDirect(sub.constData(), sub.size()) != -1)
296  {
297  QString line = sock->ReadLine(MAX_WAIT);
298  success = !line.isEmpty();
299  }
300  else
301  {
302  LOG(VB_GENERAL, LOG_ERR, LOC +
303  QString("Socket write error for %1:%2") .arg(host).arg(port));
304  }
305  sock->Close();
306  }
307  else
308  {
309  LOG(VB_GENERAL, LOG_ERR, LOC +
310  QString("Failed to open socket for %1:%2") .arg(host).arg(port));
311  }
312 
313  delete sock;
314  delete sockdev;
315  if (success)
316  LOG(VB_GENERAL, LOG_INFO, LOC + QString("Unsubscribed to %1").arg(usn));
317  else
318  LOG(VB_UPNP, LOG_WARNING, LOC + QString("Failed to unsubscribe to %1")
319  .arg(usn));
320  return success;
321 }
322 
323 int UPNPSubscription::SendSubscribeRequest(const QString &callback,
324  const QString &usn,
325  const QUrl &url,
326  const QString &path,
327  const QString &uuidin,
328  QString &uuidout)
329 {
330  QString host = url.host();
331  int port = url.port();
332 
333  QByteArray sub;
334  QTextStream data(&sub);
335  data.setCodec(QTextCodec::codecForName("UTF-8"));
336  // N.B. Play On needs an extra space between SUBSCRIBE and path...
337  data << QString("SUBSCRIBE %1 HTTP/1.1\r\n").arg(path);
338  data << QString("HOST: %1:%2\r\n").arg(host).arg(QString::number(port));
339 
340 
341  if (uuidin.isEmpty()) // new subscription
342  {
343  data << QString("CALLBACK: <%1%2>\r\n")
344  .arg(callback).arg(usn);
345  data << "NT: upnp:event\r\n";
346  }
347  else // renewal
348  data << QString("SID: uuid:%1\r\n").arg(uuidin);
349 
350  data << QString("TIMEOUT: Second-%1\r\n").arg(SUBSCRIPTION_TIME);
351  data << "\r\n";
352  data.flush();
353 
354  LOG(VB_UPNP, LOG_DEBUG, LOC + "\n\n" + sub);
355 
357  BufferedSocketDevice *sock = new BufferedSocketDevice(sockdev);
358  sockdev->setBlocking(true);
359 
360  QString uuid;
361  QString timeout;
362  uint result = 0;
363 
364  if (sock->Connect(QHostAddress(host), port))
365  {
366  if (sock->WriteBlockDirect(sub.constData(), sub.size()) != -1)
367  {
368  bool ok = false;
369  QString line = sock->ReadLine(MAX_WAIT);
370  while (!line.isEmpty())
371  {
372  LOG(VB_UPNP, LOG_DEBUG, LOC + line);
373  if (line.contains("HTTP/1.1 200 OK", Qt::CaseInsensitive))
374  ok = true;
375  if (line.startsWith("SID:", Qt::CaseInsensitive))
376  uuid = line.mid(4).trimmed().mid(5).trimmed();
377  if (line.startsWith("TIMEOUT:", Qt::CaseInsensitive))
378  timeout = line.mid(8).trimmed().mid(7).trimmed();
379  if (ok && !uuid.isEmpty() && !timeout.isEmpty())
380  break;
381  line = sock->ReadLine(MAX_WAIT);
382  }
383 
384  if (ok && !uuid.isEmpty() && !timeout.isEmpty())
385  {
386  uuidout = uuid;
387  result = timeout.toUInt();
388  }
389  else
390  {
391  LOG(VB_GENERAL, LOG_ERR, LOC +
392  QString("Failed to subscribe to %1").arg(usn));
393  }
394  }
395  else
396  {
397  LOG(VB_GENERAL, LOG_ERR, LOC +
398  QString("Socket write error for %1:%2") .arg(host).arg(port));
399  }
400  sock->Close();
401  }
402  else
403  {
404  LOG(VB_GENERAL, LOG_ERR, LOC +
405  QString("Failed to open socket for %1:%2") .arg(host).arg(port));
406  }
407 
408  delete sock;
409  delete sockdev;
410  return result;
411 }