MythTV  master
iptvstreamhandler.cpp
Go to the documentation of this file.
1 // -*- Mode: c++ -*-
2 
3 // System headers
4 #ifdef _WIN32
5 # include <ws2tcpip.h>
6 #else
7 # include <sys/types.h>
8 # include <sys/socket.h>
9 # include <netinet/in.h>
10 # include <netinet/ip.h>
11 #endif
12 
13 // Qt headers
14 #include <QUdpSocket>
15 #include <QByteArray>
16 #include <QHostInfo>
17 
18 // MythTV headers
19 #include "iptvstreamhandler.h"
20 #include "rtppacketbuffer.h"
21 #include "udppacketbuffer.h"
22 #include "rtptsdatapacket.h"
23 #include "rtpdatapacket.h"
24 #include "rtpfecpacket.h"
25 #include "rtcpdatapacket.h"
26 #include "mythlogging.h"
27 #include "cetonrtsp.h"
28 
29 #define LOC QString("IPTVSH[%1](%2): ").arg(m_inputid).arg(m_device)
30 
31 QMap<QString,IPTVStreamHandler*> IPTVStreamHandler::s_iptvhandlers;
34 
36  int inputid)
37 {
38  QMutexLocker locker(&s_iptvhandlers_lock);
39 
40  QString devkey = tuning.GetDeviceKey();
41 
42  QMap<QString,IPTVStreamHandler*>::iterator it = s_iptvhandlers.find(devkey);
43 
44  if (it == s_iptvhandlers.end())
45  {
46  IPTVStreamHandler *newhandler = new IPTVStreamHandler(tuning, inputid);
47  newhandler->Start();
48  s_iptvhandlers[devkey] = newhandler;
49  s_iptvhandlers_refcnt[devkey] = 1;
50 
51  LOG(VB_RECORD, LOG_INFO,
52  QString("IPTVSH[%1]: Creating new stream handler %2 for %3")
53  .arg(inputid).arg(devkey).arg(tuning.GetDeviceName()));
54  }
55  else
56  {
57  s_iptvhandlers_refcnt[devkey]++;
58  uint rcount = s_iptvhandlers_refcnt[devkey];
59  LOG(VB_RECORD, LOG_INFO,
60  QString("IPTVSH[%1]: Using existing stream handler %2 for %3")
61  .arg(inputid).arg(devkey).arg(tuning.GetDeviceName()) +
62  QString(" (%1 in use)").arg(rcount));
63  }
64 
65  return s_iptvhandlers[devkey];
66 }
67 
68 void IPTVStreamHandler::Return(IPTVStreamHandler * & ref, int inputid)
69 {
70  QMutexLocker locker(&s_iptvhandlers_lock);
71 
72  QString devname = ref->m_device;
73 
74  QMap<QString,uint>::iterator rit = s_iptvhandlers_refcnt.find(devname);
75  if (rit == s_iptvhandlers_refcnt.end())
76  return;
77 
78  LOG(VB_RECORD, LOG_INFO, QString("IPTVSH[%1]: Return(%2) has %3 handlers")
79  .arg(inputid).arg(devname).arg(*rit));
80 
81  if (*rit > 1)
82  {
83  ref = nullptr;
84  (*rit)--;
85  return;
86  }
87 
88  QMap<QString,IPTVStreamHandler*>::iterator it = s_iptvhandlers.find(devname);
89  if ((it != s_iptvhandlers.end()) && (*it == ref))
90  {
91  LOG(VB_RECORD, LOG_INFO, QString("IPTVSH[%1]: Closing handler for %2")
92  .arg(inputid).arg(devname));
93  ref->Stop();
94  delete *it;
95  s_iptvhandlers.erase(it);
96  }
97  else
98  {
99  LOG(VB_GENERAL, LOG_ERR,
100  QString("IPTVSH[%1] Error: Couldn't find handler for %2")
101  .arg(inputid).arg(devname));
102  }
103 
104  s_iptvhandlers_refcnt.erase(rit);
105  ref = nullptr;
106 }
107 
109  : StreamHandler(tuning.GetDeviceKey(), inputid)
110  , m_tuning(tuning)
111 {
113 }
114 
116 {
117  RunProlog();
118 
119  LOG(VB_GENERAL, LOG_INFO, LOC + "run()");
120 
121  SetRunning(true, false, false);
122 
123  // TODO Error handling..
124 
125  // Setup
126  CetonRTSP *rtsp = nullptr;
127  IPTVTuningData tuning = m_tuning;
128  if(m_tuning.IsRTSP())
129  {
130  rtsp = new CetonRTSP(m_tuning.GetURL(0));
131 
132  // Check RTSP capabilities
133  QStringList options;
134  if (!(rtsp->GetOptions(options) && options.contains("DESCRIBE") &&
135  options.contains("SETUP") && options.contains("PLAY") &&
136  options.contains("TEARDOWN")))
137  {
138  LOG(VB_RECORD, LOG_ERR, LOC +
139  "RTSP interface did not support the necessary options");
140  delete rtsp;
141  SetRunning(false, false, false);
142  RunEpilog();
143  return;
144  }
145 
146  if (!rtsp->Describe())
147  {
148  LOG(VB_RECORD, LOG_ERR, LOC +
149  "RTSP Describe command failed");
150  delete rtsp;
151  SetRunning(false, false, false);
152  RunEpilog();
153  return;
154  }
155 
156  m_use_rtp_streaming = true;
157 
158  QUrl urltuned = m_tuning.GetURL(0);
159  urltuned.setScheme("rtp");
160  urltuned.setPort(0);
161  tuning = IPTVTuningData(urltuned.toString(), 0, IPTVTuningData::kNone,
162  urltuned.toString(), 0, "", 0);
163  }
164 
165  bool error = false;
166 
167  int start_port = 0;
168  for (uint i = 0; i < IPTV_SOCKET_COUNT; i++)
169  {
170  QUrl url = tuning.GetURL(i);
171  if (url.port() < 0)
172  continue;
173 
174  LOG(VB_RECORD, LOG_DEBUG, LOC +
175  QString("setting up url[%1]:%2").arg(i).arg(url.toString()));
176 
177  // always ensure we use consecutive port numbers
178  int port = start_port ? start_port + 1 : url.port();
179  QString host = url.host();
180  QHostAddress dest_addr(host);
181 
182  if (!host.isEmpty() && dest_addr.isNull())
183  {
184  // address is a hostname, attempts to resolve it
185  QHostInfo info = QHostInfo::fromName(host);
186  QList<QHostAddress> list = info.addresses();
187 
188  if (list.isEmpty())
189  {
190  LOG(VB_RECORD, LOG_ERR, LOC +
191  QString("Can't resolve hostname:'%1'").arg(host));
192  }
193  else
194  {
195  for (int j=0; j < list.size(); j++)
196  {
197  dest_addr = list[j];
198  if (list[j].protocol() == QAbstractSocket::IPv6Protocol)
199  {
200  // We prefer first IPv4
201  break;
202  }
203  }
204  LOG(VB_RECORD, LOG_DEBUG, LOC +
205  QString("resolved %1 as %2").arg(host).arg(dest_addr.toString()));
206  }
207  }
208  bool ipv6 = dest_addr.protocol() == QAbstractSocket::IPv6Protocol;
209  bool is_multicast = ipv6 ?
210  dest_addr.isInSubnet(QHostAddress::parseSubnet("ff00::/8")) :
211  (dest_addr.toIPv4Address() & 0xf0000000) == 0xe0000000;
212 
213  m_sockets[i] = new QUdpSocket();
214  if (!is_multicast)
215  {
216  // this allow to filter incoming traffic, and make sure it's from
217  // the requested server
218  m_sender[i] = dest_addr;
219  }
221  this, m_sockets[i], i);
222 
223  // we need to open the descriptor ourselves so we
224  // can set some socket options
225  int fd = socket(ipv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0); // create IPv4 socket
226  if (fd < 0)
227  {
228  LOG(VB_GENERAL, LOG_ERR, LOC +
229  "Unable to create socket " + ENO);
230  continue;
231  }
232  int buf_size = 2 * 1024 * max(tuning.GetBitrate(i)/1000, 500U);
233  if (!tuning.GetBitrate(i))
234  buf_size = 2 * 1024 * 1024;
235  int err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
236  (char *)&buf_size, sizeof(buf_size));
237  if (err)
238  {
239  LOG(VB_GENERAL, LOG_INFO, LOC +
240  QString("Increasing buffer size to %1 failed")
241  .arg(buf_size) + ENO);
242  }
243 
244  m_sockets[i]->setSocketDescriptor(
245  fd, QAbstractSocket::UnconnectedState, QIODevice::ReadOnly);
246 
247  // we bind to destination address if it's a multicast address, or
248  // the local ones otherwise
249  if (!m_sockets[i]->bind(is_multicast ?
250  dest_addr :
251  (ipv6 ? QHostAddress::AnyIPv6 : QHostAddress::Any),
252  port))
253  {
254  LOG(VB_GENERAL, LOG_ERR, LOC + "Binding to port failed.");
255  error = true;
256  }
257  else
258  {
259  start_port = m_sockets[i]->localPort();
260  }
261 
262  if (is_multicast)
263  {
264  m_sockets[i]->joinMulticastGroup(dest_addr);
265  LOG(VB_GENERAL, LOG_INFO, LOC + QString("Joining %1")
266  .arg(dest_addr.toString()));
267  }
268 
269  if (!is_multicast && rtsp && i == 1)
270  {
271  m_rtcp_dest = dest_addr;
272  }
273  }
274 
275  if (!error)
276  {
277  if (m_tuning.IsRTP() || m_tuning.IsRTSP())
278  m_buffer = new RTPPacketBuffer(tuning.GetBitrate(0));
279  else
280  m_buffer = new UDPPacketBuffer(tuning.GetBitrate(0));
284  }
285 
286  if (!error && rtsp)
287  {
288  // Start Streaming
289  if (!rtsp->Setup(m_sockets[0]->localPort(), m_sockets[1]->localPort(),
291  !rtsp->Play())
292  {
293  LOG(VB_RECORD, LOG_ERR, LOC +
294  "Starting recording (RTP initialization failed). Aborting.");
295  error = true;
296  }
297  if (m_rtsp_rtcp_port > 0)
298  {
301  }
302  }
303 
304  if (!error)
305  {
306  // Enter event loop
307  exec();
308  }
309 
310  // Clean up
311  for (uint i = 0; i < IPTV_SOCKET_COUNT; i++)
312  {
313  if (m_sockets[i])
314  {
315  delete m_sockets[i];
316  m_sockets[i] = nullptr;
317  delete m_read_helpers[i];
318  m_read_helpers[i] = nullptr;
319  }
320  }
321  delete m_buffer;
322  m_buffer = nullptr;
323  delete m_write_helper;
324  m_write_helper = nullptr;
325 
326  if (rtsp)
327  {
328  rtsp->Teardown();
329  delete rtsp;
330  }
331 
332  SetRunning(false, false, false);
333  RunEpilog();
334 }
335 
337  IPTVStreamHandler *p, QUdpSocket *s, uint stream) :
338  m_parent(p), m_socket(s), m_sender(p->m_sender[stream]),
339  m_stream(stream)
340 {
341  connect(m_socket, SIGNAL(readyRead()),
342  this, SLOT(ReadPending()));
343 }
344 
345 #define LOC_WH QString("IPTVSH(%1): ").arg(m_parent->m_device)
346 
348 {
349  QHostAddress sender;
350  quint16 senderPort;
351  bool sender_null = m_sender.isNull();
352 
353  if (0 == m_stream)
354  {
355  while (m_socket->hasPendingDatagrams())
356  {
358  QByteArray &data = packet.GetDataReference();
359  data.resize(m_socket->pendingDatagramSize());
360  m_socket->readDatagram(data.data(), data.size(),
361  &sender, &senderPort);
362  if (sender_null || sender == m_sender)
363  {
364  m_parent->m_buffer->PushDataPacket(packet);
365  }
366  else
367  {
368  LOG(VB_RECORD, LOG_WARNING, LOC_WH +
369  QString("Received on socket(%1) %2 bytes from non expected "
370  "sender:%3 (expected:%4) ignoring")
371  .arg(m_stream).arg(data.size())
372  .arg(sender.toString()).arg(m_sender.toString()));
373  }
374  }
375  }
376  else
377  {
378  while (m_socket->hasPendingDatagrams())
379  {
381  QByteArray &data = packet.GetDataReference();
382  data.resize(m_socket->pendingDatagramSize());
383  m_socket->readDatagram(data.data(), data.size(),
384  &sender, &senderPort);
385  if (sender_null || sender == m_sender)
386  {
387  m_parent->m_buffer->PushFECPacket(packet, m_stream - 1);
388  }
389  else
390  {
391  LOG(VB_RECORD, LOG_WARNING, LOC_WH +
392  QString("Received on socket(%1) %2 bytes from non expected "
393  "sender:%3 (expected:%4) ignoring")
394  .arg(m_stream).arg(data.size())
395  .arg(sender.toString()).arg(m_sender.toString()));
396  }
397  }
398  }
399 }
400 
402 {
403  if (m_timer)
404  {
405  killTimer(m_timer);
406  }
407  if (m_timer_rtcp)
408  {
409  killTimer(m_timer_rtcp);
410  }
411  m_timer = 0;
412  m_timer_rtcp = 0;
413  m_parent = nullptr;
414 }
415 
417 {
418  if (event->timerId() == m_timer_rtcp)
419  {
420  SendRTCPReport();
421  return;
422  }
423 
425  return;
426 
427  while (!m_parent->m_use_rtp_streaming)
428  {
430 
431  if (packet.GetDataReference().isEmpty())
432  break;
433 
434  int remainder = 0;
435  {
436  QMutexLocker locker(&m_parent->m_listener_lock);
437  QByteArray &data = packet.GetDataReference();
438  IPTVStreamHandler::StreamDataList::const_iterator sit;
439  sit = m_parent->m_stream_data_list.begin();
440  for (; sit != m_parent->m_stream_data_list.end(); ++sit)
441  {
442  remainder = sit.key()->ProcessData(
443  reinterpret_cast<const unsigned char*>(data.data()),
444  data.size());
445  }
446  }
447 
448  if (remainder != 0)
449  {
450  LOG(VB_RECORD, LOG_INFO, LOC_WH +
451  QString("data_length = %1 remainder = %2")
452  .arg(packet.GetDataReference().size()).arg(remainder));
453  }
454 
455  m_parent->m_buffer->FreePacket(packet);
456  }
457 
459  {
461 
462  if (!packet.IsValid())
463  break;
464 
465  if (packet.GetPayloadType() == RTPDataPacket::kPayLoadTypeTS)
466  {
467  RTPTSDataPacket ts_packet(packet);
468 
469  if (!ts_packet.IsValid())
470  {
471  m_parent->m_buffer->FreePacket(packet);
472  continue;
473  }
474 
475  uint exp_seq_num = m_last_sequence_number + 1;
476  uint seq_num = ts_packet.GetSequenceNumber();
478  ((exp_seq_num&0xFFFF) != (seq_num&0xFFFF)))
479  {
480  LOG(VB_RECORD, LOG_INFO, LOC_WH +
481  QString("Sequence number mismatch %1!=%2")
482  .arg(seq_num).arg(exp_seq_num));
483  if (seq_num > exp_seq_num)
484  {
485  m_lost_interval = seq_num - exp_seq_num;
487  }
488  }
489  m_last_sequence_number = seq_num;
490  m_last_timestamp = ts_packet.GetTimeStamp();
491  LOG(VB_RECORD, LOG_DEBUG,
492  QString("Processing RTP packet(seq:%1 ts:%2)")
494 
495  m_parent->m_listener_lock.lock();
496 
497  int remainder = 0;
498  IPTVStreamHandler::StreamDataList::const_iterator sit;
499  sit = m_parent->m_stream_data_list.begin();
500  for (; sit != m_parent->m_stream_data_list.end(); ++sit)
501  {
502  remainder = sit.key()->ProcessData(
503  ts_packet.GetTSData(), ts_packet.GetTSDataSize());
504  }
505 
506  m_parent->m_listener_lock.unlock();
507 
508  if (remainder != 0)
509  {
510  LOG(VB_RECORD, LOG_INFO, LOC_WH +
511  QString("data_length = %1 remainder = %2")
512  .arg(ts_packet.GetTSDataSize()).arg(remainder));
513  }
514  }
515  m_parent->m_buffer->FreePacket(packet);
516  }
517 }
518 
520 {
521  if (m_parent->m_rtcp_dest.isNull())
522  {
523  // no point sending data if we don't know where to
524  return;
525  }
527  RTCPDataPacket rtcp =
531  QByteArray buf = rtcp.GetData();
532 
533  LOG(VB_RECORD, LOG_DEBUG, LOC_WH +
534  QString("Sending RTCPReport to %1:%2")
535  .arg(m_parent->m_rtcp_dest.toString())
536  .arg(m_parent->m_rtsp_rtcp_port));
537  m_parent->m_sockets[1]->writeDatagram(buf.constData(), buf.size(),
540 }
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:215
IPTVStreamHandler(const IPTVTuningData &tuning, int inputid)
uint GetTimeStamp(void) const
UDPPacket GetEmptyPacket(void)
Gets a packet for use in PushDataPacket/PushFECPacket.
QUrl GetURL(uint i) const
IPTVTuningData m_tuning
QByteArray GetData(void) const
UDP Packet.
Definition: udppacket.h:20
QUdpSocket * m_sockets[IPTV_SOCKET_COUNT]
RTP Transport Stream Data Packet.
#define LOC_WH
static void error(const char *str,...)
Definition: vbi.c:42
virtual void PushFECPacket(const UDPPacket &, unsigned int)=0
uint GetSequenceNumber(void) const
QString m_device
static QMap< QString, IPTVStreamHandler * > s_iptvhandlers
bool Setup(ushort clientPort1, ushort clientPort2, ushort &rtpPort, ushort &rtcpPort, uint32_t &ssrc)
Definition: cetonrtsp.cpp:393
QByteArray & GetDataReference(void)
Definition: udppacket.h:41
unsigned int uint
Definition: compat.h:140
QMutex m_listener_lock
virtual void PushDataPacket(const UDPPacket &)=0
bool HasAvailablePacket(void) const
Returns true if there are ordered packets ready for processing.
IPTVStreamHandler * m_parent
friend class IPTVStreamHandlerWriteHelper
IPTVStreamHandlerReadHelper * m_read_helpers[IPTV_SOCKET_COUNT]
bool Describe(void)
Definition: cetonrtsp.cpp:322
RTP Data Packet.
Definition: rtpdatapacket.h:30
bool Play(void)
Definition: cetonrtsp.cpp:449
#define IPTV_SOCKET_COUNT
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
bool IsRTSP(void) const
RTCP Data Packet.
void Stop(void)
bool GetOptions(QStringList &options)
Definition: cetonrtsp.cpp:240
void Start(void)
static QMap< QString, uint > s_iptvhandlers_refcnt
friend class IPTVStreamHandlerReadHelper
uint GetBitrate(uint i) const
void timerEvent(QTimerEvent *) override
static IPTVStreamHandler * Get(const IPTVTuningData &tuning, int inputid)
IPTVStreamHandler * m_parent
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
#define LOC
QString GetDeviceName(void) const
bool Teardown(void)
Definition: cetonrtsp.cpp:457
QString GetDeviceKey(void) const
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
PacketBuffer * m_buffer
StreamDataList m_stream_data_list
QHostAddress m_rtcp_dest
void FreePacket(const UDPPacket &)
Frees an RTPDataPacket returned by PopDataPacket.
const unsigned char * GetTSData(void) const
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:202
IPTVStreamHandlerWriteHelper * m_write_helper
UDPPacket PopDataPacket(void)
Fetches a data packet for processing.
QHostAddress m_sender[IPTV_SOCKET_COUNT]
IPTVStreamHandlerReadHelper(IPTVStreamHandler *p, QUdpSocket *s, uint stream)
unsigned int GetTSDataSize(void) const
bool IsValid(void) const override
IsValid() must return true before any data access methods are called, other than GetDataReference() a...
Definition: rtpdatapacket.h:45
int exec(void)
Enters the qt event loop. call exit or quit to exit thread.
Definition: mthread.cpp:328
static QMutex s_iptvhandlers_lock
bool IsRTP(void) const
static void Return(IPTVStreamHandler *&ref, int inputid)
#define RTCP_TIMER
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.