MythTV master
iptvstreamhandler.cpp
Go to the documentation of this file.
1// -*- Mode: c++ -*-
2
3// System headers
4#ifdef _WIN32
5# include <ws2tcpip.h>
6#else
7# include <sys/types.h>
8# include <sys/socket.h>
9# include <netinet/in.h>
10# include <netinet/ip.h>
11#endif
12
13// Qt headers
14#include <QUdpSocket>
15#include <QByteArray>
16#include <QHostInfo>
17
18// MythTV headers
20
21#include "cetonrtsp.h"
22#include "iptvstreamhandler.h"
23#include "rtp/rtcpdatapacket.h"
24#include "rtp/rtpdatapacket.h"
25#include "rtp/rtpfecpacket.h"
26#include "rtp/rtppacketbuffer.h"
27#include "rtp/rtptsdatapacket.h"
28#include "rtp/udppacketbuffer.h"
29
30#define LOC QString("IPTVSH[%1](%2): ").arg(m_inputId).arg(m_device)
31
32QMap<QString,IPTVStreamHandler*> IPTVStreamHandler::s_iptvhandlers;
35
37 int inputid)
38{
39 QMutexLocker locker(&s_iptvhandlers_lock);
40
41 QString devkey = tuning.GetDeviceKey();
42
43 QMap<QString,IPTVStreamHandler*>::iterator it = s_iptvhandlers.find(devkey);
44
45 if (it == s_iptvhandlers.end())
46 {
47 auto *newhandler = new IPTVStreamHandler(tuning, inputid);
48 newhandler->Start();
49 s_iptvhandlers[devkey] = newhandler;
50 s_iptvhandlers_refcnt[devkey] = 1;
51
52 LOG(VB_RECORD, LOG_INFO,
53 QString("IPTVSH[%1]: Creating new stream handler %2 for %3")
54 .arg(QString::number(inputid), devkey, tuning.GetDeviceName()));
55 }
56 else
57 {
58 s_iptvhandlers_refcnt[devkey]++;
59 uint rcount = s_iptvhandlers_refcnt[devkey];
60 LOG(VB_RECORD, LOG_INFO,
61 QString("IPTVSH[%1]: Using existing stream handler %2 for %3")
62 .arg(QString::number(inputid), devkey, tuning.GetDeviceName()) +
63 QString(" (%1 in use)").arg(rcount));
64 }
65
66 return s_iptvhandlers[devkey];
67}
68
70{
71 QMutexLocker locker(&s_iptvhandlers_lock);
72
73 QString devname = ref->m_device;
74
75 QMap<QString,uint>::iterator rit = s_iptvhandlers_refcnt.find(devname);
76 if (rit == s_iptvhandlers_refcnt.end())
77 return;
78
79 LOG(VB_RECORD, LOG_INFO, QString("IPTVSH[%1]: Return(%2) has %3 handlers")
80 .arg(QString::number(inputid), devname).arg(*rit));
81
82 if (*rit > 1)
83 {
84 ref = nullptr;
85 (*rit)--;
86 return;
87 }
88
89 QMap<QString,IPTVStreamHandler*>::iterator it = s_iptvhandlers.find(devname);
90 if ((it != s_iptvhandlers.end()) && (*it == ref))
91 {
92 LOG(VB_RECORD, LOG_INFO, QString("IPTVSH[%1]: Closing handler for %2")
93 .arg(QString::number(inputid), devname));
94 ref->Stop();
95 delete *it;
96 s_iptvhandlers.erase(it);
97 }
98 else
99 {
100 LOG(VB_GENERAL, LOG_ERR,
101 QString("IPTVSH[%1] Error: Couldn't find handler for %2")
102 .arg(QString::number(inputid), devname));
103 }
104
105 s_iptvhandlers_refcnt.erase(rit);
106 ref = nullptr;
107}
108
110 : StreamHandler(tuning.GetDeviceKey(), inputid)
111 , m_tuning(tuning)
112 , m_useRtpStreaming(m_tuning.IsRTP())
113{
114}
115
117{
118 RunProlog();
119
120 LOG(VB_GENERAL, LOG_INFO, LOC + "run()");
121
122 SetRunning(true, false, false);
123
124 // TODO Error handling..
125
126 // Setup
127 CetonRTSP *rtsp = nullptr;
128 IPTVTuningData tuning = m_tuning;
129 if(m_tuning.IsRTSP())
130 {
131 rtsp = new CetonRTSP(m_tuning.GetURL(0));
132
133 // Check RTSP capabilities
134 QStringList options;
135 if (!(rtsp->GetOptions(options) && options.contains("DESCRIBE") &&
136 options.contains("SETUP") && options.contains("PLAY") &&
137 options.contains("TEARDOWN")))
138 {
139 LOG(VB_RECORD, LOG_ERR, LOC +
140 "RTSP interface did not support the necessary options");
141 delete rtsp;
142 SetRunning(false, false, false);
143 RunEpilog();
144 return;
145 }
146
147 if (!rtsp->Describe())
148 {
149 LOG(VB_RECORD, LOG_ERR, LOC +
150 "RTSP Describe command failed");
151 delete rtsp;
152 SetRunning(false, false, false);
153 RunEpilog();
154 return;
155 }
156
157 m_useRtpStreaming = true;
158
159 QUrl urltuned = m_tuning.GetURL(0);
160 urltuned.setScheme("rtp");
161 urltuned.setPort(0);
162 tuning = IPTVTuningData(urltuned.toString(), 0, IPTVTuningData::kNone,
163 urltuned.toString(), 0, "", 0);
164 }
165
166 bool error = false;
167
168 int start_port = 0;
169 for (size_t i = 0; i < IPTV_SOCKET_COUNT; i++)
170 {
171 QUrl url = tuning.GetURL(i);
172 if (url.port() < 0)
173 continue;
174
175 LOG(VB_RECORD, LOG_DEBUG, LOC +
176 QString("setting up url[%1]:%2").arg(i).arg(url.toString()));
177
178 // always ensure we use consecutive port numbers
179 int port = start_port ? start_port + 1 : url.port();
180 QString host = url.host();
181 QHostAddress dest_addr(host);
182
183 if (!host.isEmpty() && dest_addr.isNull())
184 {
185 // address is a hostname, attempts to resolve it
186 QHostInfo info = QHostInfo::fromName(host);
187 QList<QHostAddress> list = info.addresses();
188
189 if (list.isEmpty())
190 {
191 LOG(VB_RECORD, LOG_ERR, LOC +
192 QString("Can't resolve hostname:'%1'").arg(host));
193 }
194 else
195 {
196 for (const auto & addr : std::as_const(list))
197 {
198 dest_addr = addr;
199 if (addr.protocol() == QAbstractSocket::IPv6Protocol)
200 {
201 // We prefer first IPv4
202 break;
203 }
204 }
205 LOG(VB_RECORD, LOG_DEBUG, LOC +
206 QString("resolved %1 as %2").arg(host, dest_addr.toString()));
207 }
208 }
209 bool ipv6 = dest_addr.protocol() == QAbstractSocket::IPv6Protocol;
210 bool is_multicast = ipv6 ?
211 dest_addr.isInSubnet(QHostAddress::parseSubnet("ff00::/8")) :
212 (dest_addr.toIPv4Address() & 0xf0000000) == 0xe0000000;
213
214 m_sockets[i] = new QUdpSocket();
215 if (!is_multicast)
216 {
217 // this allow to filter incoming traffic, and make sure it's from
218 // the requested server
219 m_sender[i] = dest_addr;
220 }
222
223 // we need to open the descriptor ourselves so we
224 // can set some socket options
225 int fd = socket(ipv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0); // create IPv4 socket
226 if (fd < 0)
227 {
228 LOG(VB_GENERAL, LOG_ERR, LOC +
229 "Unable to create socket " + ENO);
230 continue;
231 }
232 int buf_size = 2 * 1024 * std::max(tuning.GetBitrate(i)/1000, 500U);
233 if (!tuning.GetBitrate(i))
234 buf_size = 2 * 1024 * 1024;
235 int err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
236 (char *)&buf_size, sizeof(buf_size));
237 if (err)
238 {
239 LOG(VB_GENERAL, LOG_INFO, LOC +
240 QString("Increasing buffer size to %1 failed")
241 .arg(buf_size) + ENO);
242 }
243
244 m_sockets[i]->setSocketDescriptor(
245 fd, QAbstractSocket::UnconnectedState, QIODevice::ReadOnly);
246
247 // we bind to destination address if it's a multicast address, or
248 // the local ones otherwise
249 QHostAddress a {QHostAddress::Any};
250 if (is_multicast)
251 a = dest_addr;
252 else if (ipv6)
253 a = QHostAddress::AnyIPv6;
254 if (!m_sockets[i]->bind(a, port))
255 {
256 LOG(VB_GENERAL, LOG_ERR, LOC + "Binding to port failed.");
257 error = true;
258 }
259 else
260 {
261 start_port = m_sockets[i]->localPort();
262 }
263
264 if (is_multicast)
265 {
266 m_sockets[i]->joinMulticastGroup(dest_addr);
267 LOG(VB_GENERAL, LOG_INFO, LOC + QString("Joining %1")
268 .arg(dest_addr.toString()));
269 }
270
271 if (!is_multicast && rtsp && i == 1)
272 {
273 m_rtcpDest = dest_addr;
274 }
275 }
276
277 if (!error)
278 {
279 if (m_tuning.IsRTP() || m_tuning.IsRTSP())
280 m_buffer = new RTPPacketBuffer(tuning.GetBitrate(0));
281 else
282 m_buffer = new UDPPacketBuffer(tuning.GetBitrate(0));
285 }
286
287 if (!error && rtsp)
288 {
289 // Start Streaming
290 if (!rtsp->Setup(m_sockets[0]->localPort(), m_sockets[1]->localPort(),
292 !rtsp->Play())
293 {
294 LOG(VB_RECORD, LOG_ERR, LOC +
295 "Starting recording (RTP initialization failed). Aborting.");
296 error = true;
297 }
298 if (m_rtspRtcpPort > 0)
299 {
302 }
303 }
304
305 if (!error)
306 {
307 // Enter event loop
308 exec();
309 }
310
311 // Clean up
312 for (size_t i = 0; i < IPTV_SOCKET_COUNT; i++)
313 {
314 if (m_sockets[i])
315 {
316 delete m_sockets[i];
317 m_sockets[i] = nullptr;
318 delete m_readHelpers[i];
319 m_readHelpers[i] = nullptr;
320 }
321 }
322 delete m_buffer;
323 m_buffer = nullptr;
324 delete m_writeHelper;
325 m_writeHelper = nullptr;
326
327 if (rtsp)
328 {
329 rtsp->Teardown();
330 delete rtsp;
331 }
332
333 SetRunning(false, false, false);
334 RunEpilog();
335}
336
338 IPTVStreamHandler *p, QUdpSocket *s, uint stream) :
339 m_parent(p), m_socket(s), m_sender(p->m_sender[stream]),
340 m_stream(stream)
341{
342 connect(m_socket, &QIODevice::readyRead,
344}
345
346#define LOC_WH QString("IPTVSH(%1): ").arg(m_parent->m_device)
347
349{
350 QHostAddress sender;
351 quint16 senderPort = 0;
352 bool sender_null = m_sender.isNull();
353
354 if (0 == m_stream)
355 {
356 while (m_socket->hasPendingDatagrams())
357 {
359 QByteArray &data = packet.GetDataReference();
360 data.resize(m_socket->pendingDatagramSize());
361 m_socket->readDatagram(data.data(), data.size(),
362 &sender, &senderPort);
363 if (sender_null || sender == m_sender)
364 {
366 }
367 else
368 {
369 LOG(VB_RECORD, LOG_WARNING, LOC_WH +
370 QString("Received on socket(%1) %2 bytes from non expected "
371 "sender:%3 (expected:%4) ignoring")
372 .arg(m_stream).arg(data.size())
373 .arg(sender.toString(), m_sender.toString()));
374 }
375 }
376 }
377 else
378 {
379 while (m_socket->hasPendingDatagrams())
380 {
382 QByteArray &data = packet.GetDataReference();
383 data.resize(m_socket->pendingDatagramSize());
384 m_socket->readDatagram(data.data(), data.size(),
385 &sender, &senderPort);
386 if (sender_null || sender == m_sender)
387 {
389 }
390 else
391 {
392 LOG(VB_RECORD, LOG_WARNING, LOC_WH +
393 QString("Received on socket(%1) %2 bytes from non expected "
394 "sender:%3 (expected:%4) ignoring")
395 .arg(m_stream).arg(data.size())
396 .arg(sender.toString(), m_sender.toString()));
397 }
398 }
399 }
400}
401
403{
404 if (m_timer)
405 {
406 killTimer(m_timer);
407 }
408 if (m_timerRtcp)
409 {
410 killTimer(m_timerRtcp);
411 }
412 m_timer = 0;
413 m_timerRtcp = 0;
414 m_parent = nullptr;
415}
416
418{
419 if (event->timerId() == m_timerRtcp)
420 {
422 return;
423 }
424
426 return;
427
429 {
431
432 if (packet.GetDataReference().isEmpty())
433 break;
434
435 int remainder = 0;
436 {
437 QMutexLocker locker(&m_parent->m_listenerLock);
438 QByteArray &data = packet.GetDataReference();
439 for (auto sit = m_parent->m_streamDataList.cbegin();
440 sit != m_parent->m_streamDataList.cend(); ++sit)
441 {
442 remainder = sit.key()->ProcessData(
443 reinterpret_cast<const unsigned char*>(data.data()),
444 data.size());
445 }
446 }
447
448 if (remainder != 0)
449 {
450 LOG(VB_RECORD, LOG_INFO, LOC_WH +
451 QString("data_length = %1 remainder = %2")
452 .arg(packet.GetDataReference().size()).arg(remainder));
453 }
454
455 m_parent->m_buffer->FreePacket(packet);
456 }
457
459 {
461
462 if (!packet.IsValid())
463 break;
464
466 {
467 RTPTSDataPacket ts_packet(packet);
468
469 if (!ts_packet.IsValid())
470 {
471 m_parent->m_buffer->FreePacket(packet);
472 continue;
473 }
474
475 uint exp_seq_num = m_lastSequenceNumber + 1;
476 uint seq_num = ts_packet.GetSequenceNumber();
478 ((exp_seq_num&0xFFFF) != (seq_num&0xFFFF)))
479 {
480 LOG(VB_RECORD, LOG_INFO, LOC_WH +
481 QString("Sequence number mismatch %1!=%2")
482 .arg(seq_num).arg(exp_seq_num));
483 if (seq_num > exp_seq_num)
484 {
485 m_lostInterval = seq_num - exp_seq_num;
487 }
488 }
489 m_lastSequenceNumber = seq_num;
490 m_lastTimestamp = ts_packet.GetTimeStamp();
491 LOG(VB_RECORD, LOG_DEBUG,
492 QString("Processing RTP packet(seq:%1 ts:%2)")
494
495 m_parent->m_listenerLock.lock();
496
497 int remainder = 0;
498 for (auto sit = m_parent->m_streamDataList.cbegin();
499 sit != m_parent->m_streamDataList.cend(); ++sit)
500 {
501 remainder = sit.key()->ProcessData(
502 ts_packet.GetTSData(), ts_packet.GetTSDataSize());
503 }
504
505 m_parent->m_listenerLock.unlock();
506
507 if (remainder != 0)
508 {
509 LOG(VB_RECORD, LOG_INFO, LOC_WH +
510 QString("data_length = %1 remainder = %2")
511 .arg(ts_packet.GetTSDataSize()).arg(remainder));
512 }
513 }
514 m_parent->m_buffer->FreePacket(packet);
515 }
516}
517
519{
520 if (m_parent->m_rtcpDest.isNull())
521 {
522 // no point sending data if we don't know where to
523 return;
524 }
526 RTCPDataPacket rtcp =
530 QByteArray buf = rtcp.GetData();
531
532 LOG(VB_RECORD, LOG_DEBUG, LOC_WH +
533 QString("Sending RTCPReport to %1:%2")
534 .arg(m_parent->m_rtcpDest.toString())
535 .arg(m_parent->m_rtspRtcpPort));
536 m_parent->m_sockets[1]->writeDatagram(buf.constData(), buf.size(),
540}
bool Setup(ushort clientPort1, ushort clientPort2, ushort &rtpPort, ushort &rtcpPort, uint32_t &ssrc)
Definition: cetonrtsp.cpp:398
bool Play(void)
Definition: cetonrtsp.cpp:453
bool Teardown(void)
Definition: cetonrtsp.cpp:461
bool Describe(void)
Definition: cetonrtsp.cpp:327
bool GetOptions(QStringList &options)
Definition: cetonrtsp.cpp:244
IPTVStreamHandler * m_parent
IPTVStreamHandlerReadHelper(IPTVStreamHandler *p, QUdpSocket *s, uint stream)
void timerEvent(QTimerEvent *event) override
IPTVStreamHandler * m_parent
std::array< QHostAddress, IPTV_SOCKET_COUNT > m_sender
IPTVStreamHandler(const IPTVTuningData &tuning, int inputid)
QHostAddress m_rtcpDest
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
static QMap< QString, uint > s_iptvhandlers_refcnt
friend class IPTVStreamHandlerWriteHelper
IPTVTuningData m_tuning
std::array< QUdpSocket *, IPTV_SOCKET_COUNT > m_sockets
std::array< IPTVStreamHandlerReadHelper *, IPTV_SOCKET_COUNT > m_readHelpers
PacketBuffer * m_buffer
static IPTVStreamHandler * Get(const IPTVTuningData &tuning, int inputid)
friend class IPTVStreamHandlerReadHelper
static QMutex s_iptvhandlers_lock
static QMap< QString, IPTVStreamHandler * > s_iptvhandlers
static void Return(IPTVStreamHandler *&ref, int inputid)
IPTVStreamHandlerWriteHelper * m_writeHelper
QString GetDeviceKey(void) const
bool IsRTP(void) const
bool IsRTSP(void) const
uint GetBitrate(uint i) const
QString GetDeviceName(void) const
QUrl GetURL(uint i) const
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
int exec(void)
Enters the qt event loop. call exit or quit to exit thread.
Definition: mthread.cpp:325
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
UDPPacket PopDataPacket(void)
Fetches a data packet for processing.
virtual void PushFECPacket(const UDPPacket &, unsigned int)=0
UDPPacket GetEmptyPacket(void)
Gets a packet for use in PushDataPacket/PushFECPacket.
virtual void PushDataPacket(const UDPPacket &)=0
void FreePacket(const UDPPacket &packet)
Frees an RTPDataPacket returned by PopDataPacket.
bool HasAvailablePacket(void) const
Returns true if there are ordered packets ready for processing.
RTCP Data Packet.
QByteArray GetData(void) const
RTP Data Packet.
Definition: rtpdatapacket.h:33
bool IsValid(void) const override
IsValid() must return true before any data access methods are called, other than GetDataReference() a...
uint GetTimeStamp(void) const
Definition: rtpdatapacket.h:68
uint GetPayloadType(void) const
Definition: rtpdatapacket.h:58
uint GetSequenceNumber(void) const
Definition: rtpdatapacket.h:63
RTP Transport Stream Data Packet.
unsigned int GetTSDataSize(void) const
const unsigned char * GetTSData(void) const
StreamDataList m_streamDataList
QString m_device
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
void Stop(void)
QRecursiveMutex m_listenerLock
UDP Packet.
Definition: udppacket.h:21
QByteArray & GetDataReference(void)
Definition: udppacket.h:36
unsigned int uint
Definition: freesurround.h:24
#define LOC
#define LOC_WH
static constexpr std::chrono::milliseconds RTCP_TIMER
static constexpr size_t IPTV_SOCKET_COUNT
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
dictionary info
Definition: azlyrics.py:7
def error(message)
Definition: smolt.py:409