MythTV  master
iptvstreamhandler.cpp
Go to the documentation of this file.
1 // -*- Mode: c++ -*-
2 
3 // System headers
4 #ifdef _WIN32
5 # include <ws2tcpip.h>
6 #else
7 # include <sys/types.h>
8 # include <sys/socket.h>
9 # include <netinet/in.h>
10 # include <netinet/ip.h>
11 #endif
12 
13 // Qt headers
14 #include <QUdpSocket>
15 #include <QByteArray>
16 #include <QHostInfo>
17 
18 // MythTV headers
19 #include "iptvstreamhandler.h"
20 #include "rtppacketbuffer.h"
21 #include "udppacketbuffer.h"
22 #include "rtptsdatapacket.h"
23 #include "rtpdatapacket.h"
24 #include "rtpfecpacket.h"
25 #include "rtcpdatapacket.h"
26 #include "mythlogging.h"
27 #include "cetonrtsp.h"
28 
29 #define LOC QString("IPTVSH[%1](%2): ").arg(m_inputid).arg(m_device)
30 
31 QMap<QString,IPTVStreamHandler*> IPTVStreamHandler::s_iptvhandlers;
34 
36  int inputid)
37 {
38  QMutexLocker locker(&s_iptvhandlers_lock);
39 
40  QString devkey = tuning.GetDeviceKey();
41 
42  QMap<QString,IPTVStreamHandler*>::iterator it = s_iptvhandlers.find(devkey);
43 
44  if (it == s_iptvhandlers.end())
45  {
46  IPTVStreamHandler *newhandler = new IPTVStreamHandler(tuning, inputid);
47  newhandler->Start();
48  s_iptvhandlers[devkey] = newhandler;
49  s_iptvhandlers_refcnt[devkey] = 1;
50 
51  LOG(VB_RECORD, LOG_INFO,
52  QString("IPTVSH[%1]: Creating new stream handler %2 for %3")
53  .arg(inputid).arg(devkey).arg(tuning.GetDeviceName()));
54  }
55  else
56  {
57  s_iptvhandlers_refcnt[devkey]++;
58  uint rcount = s_iptvhandlers_refcnt[devkey];
59  LOG(VB_RECORD, LOG_INFO,
60  QString("IPTVSH[%1]: Using existing stream handler %2 for %3")
61  .arg(inputid).arg(devkey).arg(tuning.GetDeviceName()) +
62  QString(" (%1 in use)").arg(rcount));
63  }
64 
65  return s_iptvhandlers[devkey];
66 }
67 
68 void IPTVStreamHandler::Return(IPTVStreamHandler * & ref, int inputid)
69 {
70  QMutexLocker locker(&s_iptvhandlers_lock);
71 
72  QString devname = ref->m_device;
73 
74  QMap<QString,uint>::iterator rit = s_iptvhandlers_refcnt.find(devname);
75  if (rit == s_iptvhandlers_refcnt.end())
76  return;
77 
78  LOG(VB_RECORD, LOG_INFO, QString("IPTVSH[%1]: Return(%2) has %3 handlers")
79  .arg(inputid).arg(devname).arg(*rit));
80 
81  if (*rit > 1)
82  {
83  ref = nullptr;
84  (*rit)--;
85  return;
86  }
87 
88  QMap<QString,IPTVStreamHandler*>::iterator it = s_iptvhandlers.find(devname);
89  if ((it != s_iptvhandlers.end()) && (*it == ref))
90  {
91  LOG(VB_RECORD, LOG_INFO, QString("IPTVSH[%1]: Closing handler for %2")
92  .arg(inputid).arg(devname));
93  ref->Stop();
94  delete *it;
95  s_iptvhandlers.erase(it);
96  }
97  else
98  {
99  LOG(VB_GENERAL, LOG_ERR,
100  QString("IPTVSH[%1] Error: Couldn't find handler for %2")
101  .arg(inputid).arg(devname));
102  }
103 
104  s_iptvhandlers_refcnt.erase(rit);
105  ref = nullptr;
106 }
107 
109  : StreamHandler(tuning.GetDeviceKey(), inputid)
110  , m_tuning(tuning)
111 {
112  memset(m_sockets, 0, sizeof(m_sockets));
113  memset(m_read_helpers, 0, sizeof(m_read_helpers));
115 }
116 
118 {
119  RunProlog();
120 
121  LOG(VB_GENERAL, LOG_INFO, LOC + "run()");
122 
123  SetRunning(true, false, false);
124 
125  // TODO Error handling..
126 
127  // Setup
128  CetonRTSP *rtsp = nullptr;
129  IPTVTuningData tuning = m_tuning;
130  if(m_tuning.IsRTSP())
131  {
132  rtsp = new CetonRTSP(m_tuning.GetURL(0));
133 
134  // Check RTSP capabilities
135  QStringList options;
136  if (!(rtsp->GetOptions(options) && options.contains("DESCRIBE") &&
137  options.contains("SETUP") && options.contains("PLAY") &&
138  options.contains("TEARDOWN")))
139  {
140  LOG(VB_RECORD, LOG_ERR, LOC +
141  "RTSP interface did not support the necessary options");
142  delete rtsp;
143  SetRunning(false, false, false);
144  RunEpilog();
145  return;
146  }
147 
148  if (!rtsp->Describe())
149  {
150  LOG(VB_RECORD, LOG_ERR, LOC +
151  "RTSP Describe command failed");
152  delete rtsp;
153  SetRunning(false, false, false);
154  RunEpilog();
155  return;
156  }
157 
158  m_use_rtp_streaming = true;
159 
160  QUrl urltuned = m_tuning.GetURL(0);
161  urltuned.setScheme("rtp");
162  urltuned.setPort(0);
163  tuning = IPTVTuningData(urltuned.toString(), 0, IPTVTuningData::kNone,
164  urltuned.toString(), 0, "", 0);
165  }
166 
167  bool error = false;
168 
169  int start_port = 0;
170  for (uint i = 0; i < IPTV_SOCKET_COUNT; i++)
171  {
172  QUrl url = tuning.GetURL(i);
173  if (url.port() < 0)
174  continue;
175 
176  LOG(VB_RECORD, LOG_DEBUG, LOC +
177  QString("setting up url[%1]:%2").arg(i).arg(url.toString()));
178 
179  // always ensure we use consecutive port numbers
180  int port = start_port ? start_port + 1 : url.port();
181  QString host = url.host();
182  QHostAddress dest_addr(host);
183 
184  if (!host.isEmpty() && dest_addr.isNull())
185  {
186  // address is a hostname; attempt to resolve it
187  QHostInfo info = QHostInfo::fromName(host);
188  QList<QHostAddress> list = info.addresses();
189 
190  if (list.isEmpty())
191  {
192  LOG(VB_RECORD, LOG_ERR, LOC +
193  QString("Can't resolve hostname:'%1'").arg(host));
194  }
195  else
196  {
197  for (int j=0; j < list.size(); j++)
198  {
199  dest_addr = list[j];
200  if (list[j].protocol() == QAbstractSocket::IPv6Protocol)
201  {
202  // We prefer first IPv4 -- NOTE(review): this break triggers on the first IPv6 address, not IPv4; confirm intent
203  break;
204  }
205  }
206  LOG(VB_RECORD, LOG_DEBUG, LOC +
207  QString("resolved %1 as %2").arg(host).arg(dest_addr.toString()));
208  }
209  }
210  bool ipv6 = dest_addr.protocol() == QAbstractSocket::IPv6Protocol;
211  bool is_multicast = ipv6 ?
212  dest_addr.isInSubnet(QHostAddress::parseSubnet("ff00::/8")) :
213  (dest_addr.toIPv4Address() & 0xf0000000) == 0xe0000000;
214 
215  m_sockets[i] = new QUdpSocket();
216  if (!is_multicast)
217  {
218  // this allows us to filter incoming traffic and make sure it's from
219  // the requested server
220  m_sender[i] = dest_addr;
221  }
223  this, m_sockets[i], i);
224 
225  // we need to open the descriptor ourselves so we
226  // can set some socket options
227  int fd = socket(ipv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0); // create IPv4 socket
228  if (fd < 0)
229  {
230  LOG(VB_GENERAL, LOG_ERR, LOC +
231  "Unable to create socket " + ENO);
232  continue;
233  }
234  int buf_size = 2 * 1024 * max(tuning.GetBitrate(i)/1000, 500U);
235  if (!tuning.GetBitrate(i))
236  buf_size = 2 * 1024 * 1024;
237  int err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
238  (char *)&buf_size, sizeof(buf_size));
239  if (err)
240  {
241  LOG(VB_GENERAL, LOG_INFO, LOC +
242  QString("Increasing buffer size to %1 failed")
243  .arg(buf_size) + ENO);
244  }
245 
246  m_sockets[i]->setSocketDescriptor(
247  fd, QAbstractSocket::UnconnectedState, QIODevice::ReadOnly);
248 
249  // we bind to the destination address if it's a multicast address, or
250  // to any local address otherwise
251  if (!m_sockets[i]->bind(is_multicast ?
252  dest_addr :
253  (ipv6 ? QHostAddress::AnyIPv6 : QHostAddress::Any),
254  port))
255  {
256  LOG(VB_GENERAL, LOG_ERR, LOC + "Binding to port failed.");
257  error = true;
258  }
259  else
260  {
261  start_port = m_sockets[i]->localPort();
262  }
263 
264  if (is_multicast)
265  {
266  m_sockets[i]->joinMulticastGroup(dest_addr);
267  LOG(VB_GENERAL, LOG_INFO, LOC + QString("Joining %1")
268  .arg(dest_addr.toString()));
269  }
270 
271  if (!is_multicast && rtsp && i == 1)
272  {
273  m_rtcp_dest = dest_addr;
274  }
275  }
276 
277  if (!error)
278  {
279  if (m_tuning.IsRTP() || m_tuning.IsRTSP())
280  m_buffer = new RTPPacketBuffer(tuning.GetBitrate(0));
281  else
282  m_buffer = new UDPPacketBuffer(tuning.GetBitrate(0));
286  }
287 
288  if (!error && rtsp)
289  {
290  // Start Streaming
291  if (!rtsp->Setup(m_sockets[0]->localPort(), m_sockets[1]->localPort(),
293  !rtsp->Play())
294  {
295  LOG(VB_RECORD, LOG_ERR, LOC +
296  "Starting recording (RTP initialization failed). Aborting.");
297  error = true;
298  }
299  if (m_rtsp_rtcp_port > 0)
300  {
303  }
304  }
305 
306  if (!error)
307  {
308  // Enter event loop
309  exec();
310  }
311 
312  // Clean up
313  for (uint i = 0; i < IPTV_SOCKET_COUNT; i++)
314  {
315  if (m_sockets[i])
316  {
317  delete m_sockets[i];
318  m_sockets[i] = nullptr;
319  delete m_read_helpers[i];
320  m_read_helpers[i] = nullptr;
321  }
322  }
323  delete m_buffer;
324  m_buffer = nullptr;
325  delete m_write_helper;
326  m_write_helper = nullptr;
327 
328  if (rtsp)
329  {
330  rtsp->Teardown();
331  delete rtsp;
332  }
333 
334  SetRunning(false, false, false);
335  RunEpilog();
336 }
337 
339  IPTVStreamHandler *p, QUdpSocket *s, uint stream) :
340  m_parent(p), m_socket(s), m_sender(p->m_sender[stream]),
341  m_stream(stream)
342 {
343  connect(m_socket, SIGNAL(readyRead()),
344  this, SLOT(ReadPending()));
345 }
346 
347 #define LOC_WH QString("IPTVSH(%1): ").arg(m_parent->m_device)
348 
350 {
351  QHostAddress sender;
352  quint16 senderPort;
353  bool sender_null = m_sender.isNull();
354 
355  if (0 == m_stream)
356  {
357  while (m_socket->hasPendingDatagrams())
358  {
360  QByteArray &data = packet.GetDataReference();
361  data.resize(m_socket->pendingDatagramSize());
362  m_socket->readDatagram(data.data(), data.size(),
363  &sender, &senderPort);
364  if (sender_null || sender == m_sender)
365  {
366  m_parent->m_buffer->PushDataPacket(packet);
367  }
368  else
369  {
370  LOG(VB_RECORD, LOG_WARNING, LOC_WH +
371  QString("Received on socket(%1) %2 bytes from non expected "
372  "sender:%3 (expected:%4) ignoring")
373  .arg(m_stream).arg(data.size())
374  .arg(sender.toString()).arg(m_sender.toString()));
375  }
376  }
377  }
378  else
379  {
380  while (m_socket->hasPendingDatagrams())
381  {
383  QByteArray &data = packet.GetDataReference();
384  data.resize(m_socket->pendingDatagramSize());
385  m_socket->readDatagram(data.data(), data.size(),
386  &sender, &senderPort);
387  if (sender_null || sender == m_sender)
388  {
389  m_parent->m_buffer->PushFECPacket(packet, m_stream - 1);
390  }
391  else
392  {
393  LOG(VB_RECORD, LOG_WARNING, LOC_WH +
394  QString("Received on socket(%1) %2 bytes from non expected "
395  "sender:%3 (expected:%4) ignoring")
396  .arg(m_stream).arg(data.size())
397  .arg(sender.toString()).arg(m_sender.toString()));
398  }
399  }
400  }
401 }
402 
404 {
405  if (m_timer)
406  {
407  killTimer(m_timer);
408  }
409  if (m_timer_rtcp)
410  {
411  killTimer(m_timer_rtcp);
412  }
413  m_timer = 0;
414  m_timer_rtcp = 0;
415  m_parent = nullptr;
416 }
417 
419 {
420  if (event->timerId() == m_timer_rtcp)
421  {
422  SendRTCPReport();
423  return;
424  }
425 
427  return;
428 
429  while (!m_parent->m_use_rtp_streaming)
430  {
432 
433  if (packet.GetDataReference().isEmpty())
434  break;
435 
436  int remainder = 0;
437  {
438  QMutexLocker locker(&m_parent->m_listener_lock);
439  QByteArray &data = packet.GetDataReference();
440  IPTVStreamHandler::StreamDataList::const_iterator sit;
441  sit = m_parent->m_stream_data_list.begin();
442  for (; sit != m_parent->m_stream_data_list.end(); ++sit)
443  {
444  remainder = sit.key()->ProcessData(
445  reinterpret_cast<const unsigned char*>(data.data()),
446  data.size());
447  }
448  }
449 
450  if (remainder != 0)
451  {
452  LOG(VB_RECORD, LOG_INFO, LOC_WH +
453  QString("data_length = %1 remainder = %2")
454  .arg(packet.GetDataReference().size()).arg(remainder));
455  }
456 
457  m_parent->m_buffer->FreePacket(packet);
458  }
459 
461  {
463 
464  if (!packet.IsValid())
465  break;
466 
467  if (packet.GetPayloadType() == RTPDataPacket::kPayLoadTypeTS)
468  {
469  RTPTSDataPacket ts_packet(packet);
470 
471  if (!ts_packet.IsValid())
472  {
473  m_parent->m_buffer->FreePacket(packet);
474  continue;
475  }
476 
477  uint exp_seq_num = m_last_sequence_number + 1;
478  uint seq_num = ts_packet.GetSequenceNumber();
480  ((exp_seq_num&0xFFFF) != (seq_num&0xFFFF)))
481  {
482  LOG(VB_RECORD, LOG_INFO, LOC_WH +
483  QString("Sequence number mismatch %1!=%2")
484  .arg(seq_num).arg(exp_seq_num));
485  if (seq_num > exp_seq_num)
486  {
487  m_lost_interval = seq_num - exp_seq_num;
489  }
490  }
491  m_last_sequence_number = seq_num;
492  m_last_timestamp = ts_packet.GetTimeStamp();
493  LOG(VB_RECORD, LOG_DEBUG,
494  QString("Processing RTP packet(seq:%1 ts:%2)")
496 
497  m_parent->m_listener_lock.lock();
498 
499  int remainder = 0;
500  IPTVStreamHandler::StreamDataList::const_iterator sit;
501  sit = m_parent->m_stream_data_list.begin();
502  for (; sit != m_parent->m_stream_data_list.end(); ++sit)
503  {
504  remainder = sit.key()->ProcessData(
505  ts_packet.GetTSData(), ts_packet.GetTSDataSize());
506  }
507 
508  m_parent->m_listener_lock.unlock();
509 
510  if (remainder != 0)
511  {
512  LOG(VB_RECORD, LOG_INFO, LOC_WH +
513  QString("data_length = %1 remainder = %2")
514  .arg(ts_packet.GetTSDataSize()).arg(remainder));
515  }
516  }
517  m_parent->m_buffer->FreePacket(packet);
518  }
519 }
520 
522 {
523  if (m_parent->m_rtcp_dest.isNull())
524  {
525  // no point sending data if we don't know where to
526  return;
527  }
529  RTCPDataPacket rtcp =
533  QByteArray buf = rtcp.GetData();
534 
535  LOG(VB_RECORD, LOG_DEBUG, LOC_WH +
536  QString("Sending RTCPReport to %1:%2")
537  .arg(m_parent->m_rtcp_dest.toString())
538  .arg(m_parent->m_rtsp_rtcp_port));
539  m_parent->m_sockets[1]->writeDatagram(buf.constData(), buf.size(),
542 }
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:215
IPTVStreamHandler(const IPTVTuningData &tuning, int inputid)
uint GetTimeStamp(void) const
UDPPacket GetEmptyPacket(void)
Gets a packet for use in PushDataPacket/PushFECPacket.
QUrl GetURL(uint i) const
IPTVTuningData m_tuning
QByteArray GetData(void) const
UDP Packet.
Definition: udppacket.h:20
QUdpSocket * m_sockets[IPTV_SOCKET_COUNT]
RTP Transport Stream Data Packet.
#define LOC_WH
static void error(const char *str,...)
Definition: vbi.c:42
virtual void PushFECPacket(const UDPPacket &, unsigned int)=0
uint GetSequenceNumber(void) const
QString m_device
static QMap< QString, IPTVStreamHandler * > s_iptvhandlers
bool Setup(ushort clientPort1, ushort clientPort2, ushort &rtpPort, ushort &rtcpPort, uint32_t &ssrc)
Definition: cetonrtsp.cpp:393
QByteArray & GetDataReference(void)
Definition: udppacket.h:41
unsigned int uint
Definition: compat.h:140
QMutex m_listener_lock
virtual void PushDataPacket(const UDPPacket &)=0
bool HasAvailablePacket(void) const
Returns true if there are ordered packets ready for processing.
IPTVStreamHandler * m_parent
friend class IPTVStreamHandlerWriteHelper
IPTVStreamHandlerReadHelper * m_read_helpers[IPTV_SOCKET_COUNT]
bool Describe(void)
Definition: cetonrtsp.cpp:322
RTP Data Packet.
Definition: rtpdatapacket.h:30
bool Play(void)
Definition: cetonrtsp.cpp:449
#define IPTV_SOCKET_COUNT
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
bool IsRTSP(void) const
RTCP Data Packet.
void Stop(void)
bool GetOptions(QStringList &options)
Definition: cetonrtsp.cpp:240
void Start(void)
static QMap< QString, uint > s_iptvhandlers_refcnt
friend class IPTVStreamHandlerReadHelper
uint GetBitrate(uint i) const
void timerEvent(QTimerEvent *) override
static IPTVStreamHandler * Get(const IPTVTuningData &tuning, int inputid)
IPTVStreamHandler * m_parent
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
#define LOC
QString GetDeviceName(void) const
bool Teardown(void)
Definition: cetonrtsp.cpp:457
QString GetDeviceKey(void) const
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
PacketBuffer * m_buffer
StreamDataList m_stream_data_list
QHostAddress m_rtcp_dest
void FreePacket(const UDPPacket &)
Frees an RTPDataPacket returned by PopDataPacket.
const unsigned char * GetTSData(void) const
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:202
IPTVStreamHandlerWriteHelper * m_write_helper
UDPPacket PopDataPacket(void)
Fetches a data packet for processing.
QHostAddress m_sender[IPTV_SOCKET_COUNT]
IPTVStreamHandlerReadHelper(IPTVStreamHandler *p, QUdpSocket *s, uint stream)
unsigned int GetTSDataSize(void) const
bool IsValid(void) const override
IsValid() must return true before any data access methods are called, other than GetDataReference() a...
Definition: rtpdatapacket.h:45
int exec(void)
Enters the qt event loop. call exit or quit to exit thread.
Definition: mthread.cpp:328
static QMutex s_iptvhandlers_lock
bool IsRTP(void) const
static void Return(IPTVStreamHandler *&ref, int inputid)
#define RTCP_TIMER
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.