MythTV master
iptvstreamhandler.cpp
Go to the documentation of this file.
1// -*- Mode: c++ -*-
2
3#include <QtGlobal>
4#if QT_VERSION >= QT_VERSION_CHECK(6,0,0)
5#include <QtSystemDetection>
6#endif
7
8// System headers
9#ifdef Q_OS_WINDOWS
10# include <ws2tcpip.h>
11#else
12# include <sys/types.h>
13# include <sys/socket.h>
14# include <netinet/in.h>
15# include <netinet/ip.h>
16#endif
17
18// Qt headers
19#include <QUdpSocket>
20#include <QByteArray>
21#include <QHostInfo>
22
23// MythTV headers
25
26#include "cetonrtsp.h"
27#include "iptvstreamhandler.h"
28#include "rtp/rtcpdatapacket.h"
29#include "rtp/rtpdatapacket.h"
30#include "rtp/rtpfecpacket.h"
31#include "rtp/rtppacketbuffer.h"
32#include "rtp/rtptsdatapacket.h"
33#include "rtp/udppacketbuffer.h"
34
35#define LOC QString("IPTVSH[%1](%2): ").arg(m_inputId).arg(m_device)
36
37QMap<QString,IPTVStreamHandler*> IPTVStreamHandler::s_iptvhandlers;
40
42 int inputid)
43{
44 QMutexLocker locker(&s_iptvhandlers_lock);
45
46 QString devkey = tuning.GetDeviceKey();
47
48 QMap<QString,IPTVStreamHandler*>::iterator it = s_iptvhandlers.find(devkey);
49
50 if (it == s_iptvhandlers.end())
51 {
52 auto *newhandler = new IPTVStreamHandler(tuning, inputid);
53 newhandler->Start();
54 s_iptvhandlers[devkey] = newhandler;
55 s_iptvhandlers_refcnt[devkey] = 1;
56
57 LOG(VB_RECORD, LOG_INFO,
58 QString("IPTVSH[%1]: Creating new stream handler %2 for %3")
59 .arg(QString::number(inputid), devkey, tuning.GetDeviceName()));
60 }
61 else
62 {
63 s_iptvhandlers_refcnt[devkey]++;
64 uint rcount = s_iptvhandlers_refcnt[devkey];
65 LOG(VB_RECORD, LOG_INFO,
66 QString("IPTVSH[%1]: Using existing stream handler %2 for %3")
67 .arg(QString::number(inputid), devkey, tuning.GetDeviceName()) +
68 QString(" (%1 in use)").arg(rcount));
69 }
70
71 return s_iptvhandlers[devkey];
72}
73
75{
76 QMutexLocker locker(&s_iptvhandlers_lock);
77
78 QString devname = ref->m_device;
79
80 QMap<QString,uint>::iterator rit = s_iptvhandlers_refcnt.find(devname);
81 if (rit == s_iptvhandlers_refcnt.end())
82 return;
83
84 LOG(VB_RECORD, LOG_INFO, QString("IPTVSH[%1]: Return(%2) has %3 handlers")
85 .arg(QString::number(inputid), devname).arg(*rit));
86
87 if (*rit > 1)
88 {
89 ref = nullptr;
90 (*rit)--;
91 return;
92 }
93
94 QMap<QString,IPTVStreamHandler*>::iterator it = s_iptvhandlers.find(devname);
95 if ((it != s_iptvhandlers.end()) && (*it == ref))
96 {
97 LOG(VB_RECORD, LOG_INFO, QString("IPTVSH[%1]: Closing handler for %2")
98 .arg(QString::number(inputid), devname));
99 ref->Stop();
100 delete *it;
101 s_iptvhandlers.erase(it);
102 }
103 else
104 {
105 LOG(VB_GENERAL, LOG_ERR,
106 QString("IPTVSH[%1] Error: Couldn't find handler for %2")
107 .arg(QString::number(inputid), devname));
108 }
109
110 s_iptvhandlers_refcnt.erase(rit);
111 ref = nullptr;
112}
113
115 : StreamHandler(tuning.GetDeviceKey(), inputid)
116 , m_tuning(tuning)
117 , m_useRtpStreaming(m_tuning.IsRTP())
118{
119}
120
122{
123 RunProlog();
124
125 LOG(VB_GENERAL, LOG_INFO, LOC + "run()");
126
127 SetRunning(true, false, false);
128
129 // TODO Error handling..
130
131 // Setup
132 CetonRTSP *rtsp = nullptr;
133 IPTVTuningData tuning = m_tuning;
134 if(m_tuning.IsRTSP())
135 {
136 rtsp = new CetonRTSP(m_tuning.GetURL(0));
137
138 // Check RTSP capabilities
139 QStringList options;
140 if (!(rtsp->GetOptions(options) && options.contains("DESCRIBE") &&
141 options.contains("SETUP") && options.contains("PLAY") &&
142 options.contains("TEARDOWN")))
143 {
144 LOG(VB_RECORD, LOG_ERR, LOC +
145 "RTSP interface did not support the necessary options");
146 delete rtsp;
147 SetRunning(false, false, false);
148 RunEpilog();
149 return;
150 }
151
152 if (!rtsp->Describe())
153 {
154 LOG(VB_RECORD, LOG_ERR, LOC +
155 "RTSP Describe command failed");
156 delete rtsp;
157 SetRunning(false, false, false);
158 RunEpilog();
159 return;
160 }
161
162 m_useRtpStreaming = true;
163
164 QUrl urltuned = m_tuning.GetURL(0);
165 urltuned.setScheme("rtp");
166 urltuned.setPort(0);
167 tuning = IPTVTuningData(urltuned.toString(), 0, IPTVTuningData::kNone,
168 urltuned.toString(), 0, "", 0);
169 }
170
171 bool error = false;
172
173 int start_port = 0;
174 for (size_t i = 0; i < IPTV_SOCKET_COUNT; i++)
175 {
176 QUrl url = tuning.GetURL(i);
177 if (url.port() < 0)
178 continue;
179
180 LOG(VB_RECORD, LOG_DEBUG, LOC +
181 QString("setting up url[%1]:%2").arg(i).arg(url.toString()));
182
183 // always ensure we use consecutive port numbers
184 int port = start_port ? start_port + 1 : url.port();
185 QString host = url.host();
186 QHostAddress dest_addr(host);
187
188 if (!host.isEmpty() && dest_addr.isNull())
189 {
190 // address is a hostname, attempts to resolve it
191 QHostInfo info = QHostInfo::fromName(host);
192 QList<QHostAddress> list = info.addresses();
193
194 if (list.isEmpty())
195 {
196 LOG(VB_RECORD, LOG_ERR, LOC +
197 QString("Can't resolve hostname:'%1'").arg(host));
198 }
199 else
200 {
201 for (const auto & addr : std::as_const(list))
202 {
203 dest_addr = addr;
204 if (addr.protocol() == QAbstractSocket::IPv6Protocol)
205 {
206 // We prefer first IPv4
207 break;
208 }
209 }
210 LOG(VB_RECORD, LOG_DEBUG, LOC +
211 QString("resolved %1 as %2").arg(host, dest_addr.toString()));
212 }
213 }
214 bool ipv6 = dest_addr.protocol() == QAbstractSocket::IPv6Protocol;
215 bool is_multicast = ipv6 ?
216 dest_addr.isInSubnet(QHostAddress::parseSubnet("ff00::/8")) :
217 (dest_addr.toIPv4Address() & 0xf0000000) == 0xe0000000;
218
219 m_sockets[i] = new QUdpSocket();
220 if (!is_multicast)
221 {
222 // this allow to filter incoming traffic, and make sure it's from
223 // the requested server
224 m_sender[i] = dest_addr;
225 }
227
228 // we need to open the descriptor ourselves so we
229 // can set some socket options
230 int fd = socket(ipv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0); // create IPv4 socket
231 if (fd < 0)
232 {
233 LOG(VB_GENERAL, LOG_ERR, LOC +
234 "Unable to create socket " + ENO);
235 continue;
236 }
237 int buf_size = 2 * 1024 * std::max(tuning.GetBitrate(i)/1000, 500U);
238 if (!tuning.GetBitrate(i))
239 buf_size = 2 * 1024 * 1024;
240 int err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
241 (char *)&buf_size, sizeof(buf_size));
242 if (err)
243 {
244 LOG(VB_GENERAL, LOG_INFO, LOC +
245 QString("Increasing buffer size to %1 failed")
246 .arg(buf_size) + ENO);
247 }
248
249 m_sockets[i]->setSocketDescriptor(
250 fd, QAbstractSocket::UnconnectedState, QIODevice::ReadOnly);
251
252 // we bind to destination address if it's a multicast address, or
253 // the local ones otherwise
254 QHostAddress a {QHostAddress::Any};
255 if (is_multicast)
256 a = dest_addr;
257 else if (ipv6)
258 a = QHostAddress::AnyIPv6;
259 if (!m_sockets[i]->bind(a, port))
260 {
261 LOG(VB_GENERAL, LOG_ERR, LOC + "Binding to port failed.");
262 error = true;
263 }
264 else
265 {
266 start_port = m_sockets[i]->localPort();
267 }
268
269 if (is_multicast)
270 {
271 m_sockets[i]->joinMulticastGroup(dest_addr);
272 LOG(VB_GENERAL, LOG_INFO, LOC + QString("Joining %1")
273 .arg(dest_addr.toString()));
274 }
275
276 if (!is_multicast && rtsp && i == 1)
277 {
278 m_rtcpDest = dest_addr;
279 }
280 }
281
282 if (!error)
283 {
284 if (m_tuning.IsRTP() || m_tuning.IsRTSP())
285 m_buffer = new RTPPacketBuffer(tuning.GetBitrate(0));
286 else
287 m_buffer = new UDPPacketBuffer(tuning.GetBitrate(0));
290 }
291
292 if (!error && rtsp)
293 {
294 // Start Streaming
295 if (!rtsp->Setup(m_sockets[0]->localPort(), m_sockets[1]->localPort(),
297 !rtsp->Play())
298 {
299 LOG(VB_RECORD, LOG_ERR, LOC +
300 "Starting recording (RTP initialization failed). Aborting.");
301 error = true;
302 }
303 if (m_rtspRtcpPort > 0)
304 {
307 }
308 }
309
310 if (!error)
311 {
312 // Enter event loop
313 exec();
314 }
315
316 // Clean up
317 for (size_t i = 0; i < IPTV_SOCKET_COUNT; i++)
318 {
319 if (m_sockets[i])
320 {
321 delete m_sockets[i];
322 m_sockets[i] = nullptr;
323 delete m_readHelpers[i];
324 m_readHelpers[i] = nullptr;
325 }
326 }
327 delete m_buffer;
328 m_buffer = nullptr;
329 delete m_writeHelper;
330 m_writeHelper = nullptr;
331
332 if (rtsp)
333 {
334 rtsp->Teardown();
335 delete rtsp;
336 }
337
338 SetRunning(false, false, false);
339 RunEpilog();
340}
341
343 IPTVStreamHandler *p, QUdpSocket *s, uint stream) :
344 m_parent(p), m_socket(s), m_sender(p->m_sender[stream]),
345 m_stream(stream)
346{
347 connect(m_socket, &QIODevice::readyRead,
349}
350
351#define LOC_WH QString("IPTVSH(%1): ").arg(m_parent->m_device)
352
354{
355 QHostAddress sender;
356 quint16 senderPort = 0;
357 bool sender_null = m_sender.isNull();
358
359 if (0 == m_stream)
360 {
361 while (m_socket->hasPendingDatagrams())
362 {
364 QByteArray &data = packet.GetDataReference();
365 data.resize(m_socket->pendingDatagramSize());
366 m_socket->readDatagram(data.data(), data.size(),
367 &sender, &senderPort);
368 if (sender_null || sender == m_sender)
369 {
371 }
372 else
373 {
374 LOG(VB_RECORD, LOG_WARNING, LOC_WH +
375 QString("Received on socket(%1) %2 bytes from non expected "
376 "sender:%3 (expected:%4) ignoring")
377 .arg(m_stream).arg(data.size())
378 .arg(sender.toString(), m_sender.toString()));
379 }
380 }
381 }
382 else
383 {
384 while (m_socket->hasPendingDatagrams())
385 {
387 QByteArray &data = packet.GetDataReference();
388 data.resize(m_socket->pendingDatagramSize());
389 m_socket->readDatagram(data.data(), data.size(),
390 &sender, &senderPort);
391 if (sender_null || sender == m_sender)
392 {
394 }
395 else
396 {
397 LOG(VB_RECORD, LOG_WARNING, LOC_WH +
398 QString("Received on socket(%1) %2 bytes from non expected "
399 "sender:%3 (expected:%4) ignoring")
400 .arg(m_stream).arg(data.size())
401 .arg(sender.toString(), m_sender.toString()));
402 }
403 }
404 }
405}
406
408{
409 if (m_timer)
410 {
411 killTimer(m_timer);
412 }
413 if (m_timerRtcp)
414 {
415 killTimer(m_timerRtcp);
416 }
417 m_timer = 0;
418 m_timerRtcp = 0;
419 m_parent = nullptr;
420}
421
423{
424 if (event->timerId() == m_timerRtcp)
425 {
427 return;
428 }
429
431 return;
432
434 {
436
437 if (packet.GetDataReference().isEmpty())
438 break;
439
440 int remainder = 0;
441 {
442 QMutexLocker locker(&m_parent->m_listenerLock);
443 QByteArray &data = packet.GetDataReference();
444 for (auto sit = m_parent->m_streamDataList.cbegin();
445 sit != m_parent->m_streamDataList.cend(); ++sit)
446 {
447 remainder = sit.key()->ProcessData(
448 reinterpret_cast<const unsigned char*>(data.data()),
449 data.size());
450 }
451 }
452
453 if (remainder != 0)
454 {
455 LOG(VB_RECORD, LOG_INFO, LOC_WH +
456 QString("data_length = %1 remainder = %2")
457 .arg(packet.GetDataReference().size()).arg(remainder));
458 }
459
460 m_parent->m_buffer->FreePacket(packet);
461 }
462
464 {
466
467 if (!packet.IsValid())
468 break;
469
471 {
472 RTPTSDataPacket ts_packet(packet);
473
474 if (!ts_packet.IsValid())
475 {
476 m_parent->m_buffer->FreePacket(packet);
477 continue;
478 }
479
480 uint exp_seq_num = m_lastSequenceNumber + 1;
481 uint seq_num = ts_packet.GetSequenceNumber();
483 ((exp_seq_num&0xFFFF) != (seq_num&0xFFFF)))
484 {
485 LOG(VB_RECORD, LOG_INFO, LOC_WH +
486 QString("Sequence number mismatch %1!=%2")
487 .arg(seq_num).arg(exp_seq_num));
488 if (seq_num > exp_seq_num)
489 {
490 m_lostInterval = seq_num - exp_seq_num;
492 }
493 }
494 m_lastSequenceNumber = seq_num;
495 m_lastTimestamp = ts_packet.GetTimeStamp();
496 LOG(VB_RECORD, LOG_DEBUG,
497 QString("Processing RTP packet(seq:%1 ts:%2)")
499
500 m_parent->m_listenerLock.lock();
501
502 int remainder = 0;
503 for (auto sit = m_parent->m_streamDataList.cbegin();
504 sit != m_parent->m_streamDataList.cend(); ++sit)
505 {
506 remainder = sit.key()->ProcessData(
507 ts_packet.GetTSData(), ts_packet.GetTSDataSize());
508 }
509
510 m_parent->m_listenerLock.unlock();
511
512 if (remainder != 0)
513 {
514 LOG(VB_RECORD, LOG_INFO, LOC_WH +
515 QString("data_length = %1 remainder = %2")
516 .arg(ts_packet.GetTSDataSize()).arg(remainder));
517 }
518 }
519 m_parent->m_buffer->FreePacket(packet);
520 }
521}
522
524{
525 if (m_parent->m_rtcpDest.isNull())
526 {
527 // no point sending data if we don't know where to
528 return;
529 }
531 RTCPDataPacket rtcp =
535 QByteArray buf = rtcp.GetData();
536
537 LOG(VB_RECORD, LOG_DEBUG, LOC_WH +
538 QString("Sending RTCPReport to %1:%2")
539 .arg(m_parent->m_rtcpDest.toString())
540 .arg(m_parent->m_rtspRtcpPort));
541 m_parent->m_sockets[1]->writeDatagram(buf.constData(), buf.size(),
545}
bool Setup(ushort clientPort1, ushort clientPort2, ushort &rtpPort, ushort &rtcpPort, uint32_t &ssrc)
Definition: cetonrtsp.cpp:400
bool Play(void)
Definition: cetonrtsp.cpp:455
bool Teardown(void)
Definition: cetonrtsp.cpp:463
bool Describe(void)
Definition: cetonrtsp.cpp:329
bool GetOptions(QStringList &options)
Definition: cetonrtsp.cpp:244
IPTVStreamHandler * m_parent
IPTVStreamHandlerReadHelper(IPTVStreamHandler *p, QUdpSocket *s, uint stream)
void timerEvent(QTimerEvent *event) override
IPTVStreamHandler * m_parent
std::array< QHostAddress, IPTV_SOCKET_COUNT > m_sender
IPTVStreamHandler(const IPTVTuningData &tuning, int inputid)
QHostAddress m_rtcpDest
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
static QMap< QString, uint > s_iptvhandlers_refcnt
friend class IPTVStreamHandlerWriteHelper
IPTVTuningData m_tuning
std::array< QUdpSocket *, IPTV_SOCKET_COUNT > m_sockets
std::array< IPTVStreamHandlerReadHelper *, IPTV_SOCKET_COUNT > m_readHelpers
PacketBuffer * m_buffer
static IPTVStreamHandler * Get(const IPTVTuningData &tuning, int inputid)
friend class IPTVStreamHandlerReadHelper
static QMutex s_iptvhandlers_lock
static QMap< QString, IPTVStreamHandler * > s_iptvhandlers
static void Return(IPTVStreamHandler *&ref, int inputid)
IPTVStreamHandlerWriteHelper * m_writeHelper
QString GetDeviceKey(void) const
bool IsRTP(void) const
bool IsRTSP(void) const
uint GetBitrate(uint i) const
QString GetDeviceName(void) const
QUrl GetURL(uint i) const
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:194
int exec(void)
Enters the qt event loop. call exit or quit to exit thread.
Definition: mthread.cpp:323
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:207
UDPPacket PopDataPacket(void)
Fetches a data packet for processing.
virtual void PushFECPacket(const UDPPacket &, unsigned int)=0
UDPPacket GetEmptyPacket(void)
Gets a packet for use in PushDataPacket/PushFECPacket.
virtual void PushDataPacket(const UDPPacket &)=0
void FreePacket(const UDPPacket &packet)
Frees an RTPDataPacket returned by PopDataPacket.
bool HasAvailablePacket(void) const
Returns true if there are ordered packets ready for processing.
RTCP Data Packet.
QByteArray GetData(void) const
RTP Data Packet.
Definition: rtpdatapacket.h:29
bool IsValid(void) const override
IsValid() must return true before any data access methods are called, other than GetDataReference() a...
uint GetTimeStamp(void) const
Definition: rtpdatapacket.h:64
uint GetPayloadType(void) const
Definition: rtpdatapacket.h:54
uint GetSequenceNumber(void) const
Definition: rtpdatapacket.h:59
RTP Transport Stream Data Packet.
unsigned int GetTSDataSize(void) const
const unsigned char * GetTSData(void) const
StreamDataList m_streamDataList
QString m_device
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
void Stop(void)
QRecursiveMutex m_listenerLock
UDP Packet.
Definition: udppacket.h:21
QByteArray & GetDataReference(void)
Definition: udppacket.h:36
unsigned int uint
Definition: compat.h:60
#define LOC
#define LOC_WH
static constexpr std::chrono::milliseconds RTCP_TIMER
static constexpr size_t IPTV_SOCKET_COUNT
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
dictionary info
Definition: azlyrics.py:7
def error(message)
Definition: smolt.py:409