8# include <sys/socket.h>
9# include <netinet/in.h>
10# include <netinet/ip.h>
30#define LOC QString("IPTVSH[%1](%2): ").arg(m_inputId).arg(m_device)
43 QMap<QString,IPTVStreamHandler*>::iterator it =
s_iptvhandlers.find(devkey);
52 LOG(VB_RECORD, LOG_INFO,
53 QString(
"IPTVSH[%1]: Creating new stream handler %2 for %3")
54 .arg(QString::number(inputid), devkey, tuning.
GetDeviceName()));
60 LOG(VB_RECORD, LOG_INFO,
61 QString(
"IPTVSH[%1]: Using existing stream handler %2 for %3")
62 .arg(QString::number(inputid), devkey, tuning.
GetDeviceName()) +
63 QString(
" (%1 in use)").arg(rcount));
79 LOG(VB_RECORD, LOG_INFO, QString(
"IPTVSH[%1]: Return(%2) has %3 handlers")
80 .arg(QString::number(inputid), devname).arg(*rit));
89 QMap<QString,IPTVStreamHandler*>::iterator it =
s_iptvhandlers.find(devname);
92 LOG(VB_RECORD, LOG_INFO, QString(
"IPTVSH[%1]: Closing handler for %2")
93 .arg(QString::number(inputid), devname));
100 LOG(VB_GENERAL, LOG_ERR,
101 QString(
"IPTVSH[%1] Error: Couldn't find handler for %2")
102 .arg(QString::number(inputid), devname));
112 , m_useRtpStreaming(m_tuning.IsRTP())
120 LOG(VB_GENERAL, LOG_INFO,
LOC +
"run()");
139 LOG(VB_RECORD, LOG_ERR,
LOC +
140 "RTSP interface did not support the necessary options");
149 LOG(VB_RECORD, LOG_ERR,
LOC +
150 "RTSP Describe command failed");
160 urltuned.setScheme(
"rtp");
163 urltuned.toString(), 0,
"", 0);
171 QUrl url = tuning.
GetURL(i);
175 LOG(VB_RECORD, LOG_DEBUG,
LOC +
176 QString(
"setting up url[%1]:%2").arg(i).arg(url.toString()));
179 int port = start_port ? start_port + 1 : url.port();
180 QString host = url.host();
181 QHostAddress dest_addr(host);
183 if (!host.isEmpty() && dest_addr.isNull())
186 QHostInfo
info = QHostInfo::fromName(host);
187 QList<QHostAddress> list =
info.addresses();
191 LOG(VB_RECORD, LOG_ERR,
LOC +
192 QString(
"Can't resolve hostname:'%1'").arg(host));
196 for (
const auto & addr : std::as_const(list))
199 if (addr.protocol() == QAbstractSocket::IPv6Protocol)
205 LOG(VB_RECORD, LOG_DEBUG,
LOC +
206 QString(
"resolved %1 as %2").arg(host, dest_addr.toString()));
209 bool ipv6 = dest_addr.protocol() == QAbstractSocket::IPv6Protocol;
210 bool is_multicast = ipv6 ?
211 dest_addr.isInSubnet(QHostAddress::parseSubnet(
"ff00::/8")) :
212 (dest_addr.toIPv4Address() & 0xf0000000) == 0xe0000000;
225 int fd = socket(ipv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0);
228 LOG(VB_GENERAL, LOG_ERR,
LOC +
229 "Unable to create socket " +
ENO);
232 int buf_size = 2 * 1024 * std::max(tuning.
GetBitrate(i)/1000, 500U);
234 buf_size = 2 * 1024 * 1024;
235 int err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
236 (
char *)&buf_size,
sizeof(buf_size));
239 LOG(VB_GENERAL, LOG_INFO,
LOC +
240 QString(
"Increasing buffer size to %1 failed")
241 .arg(buf_size) +
ENO);
245 fd, QAbstractSocket::UnconnectedState, QIODevice::ReadOnly);
249 QHostAddress a {QHostAddress::Any};
253 a = QHostAddress::AnyIPv6;
256 LOG(VB_GENERAL, LOG_ERR,
LOC +
"Binding to port failed.");
266 m_sockets[i]->joinMulticastGroup(dest_addr);
267 LOG(VB_GENERAL, LOG_INFO,
LOC + QString(
"Joining %1")
268 .arg(dest_addr.toString()));
271 if (!is_multicast && rtsp && i == 1)
294 LOG(VB_RECORD, LOG_ERR,
LOC +
295 "Starting recording (RTP initialization failed). Aborting.");
339 m_parent(
p), m_socket(s), m_sender(
p->m_sender[stream]),
342 connect(
m_socket, &QIODevice::readyRead,
346#define LOC_WH QString("IPTVSH(%1): ").arg(m_parent->m_device)
351 quint16 senderPort = 0;
352 bool sender_null =
m_sender.isNull();
356 while (
m_socket->hasPendingDatagrams())
360 data.resize(
m_socket->pendingDatagramSize());
361 m_socket->readDatagram(data.data(), data.size(),
362 &sender, &senderPort);
363 if (sender_null || sender ==
m_sender)
370 QString(
"Received on socket(%1) %2 bytes from non expected "
371 "sender:%3 (expected:%4) ignoring")
373 .arg(sender.toString(),
m_sender.toString()));
379 while (
m_socket->hasPendingDatagrams())
383 data.resize(
m_socket->pendingDatagramSize());
384 m_socket->readDatagram(data.data(), data.size(),
385 &sender, &senderPort);
386 if (sender_null || sender ==
m_sender)
393 QString(
"Received on socket(%1) %2 bytes from non expected "
394 "sender:%3 (expected:%4) ignoring")
396 .arg(sender.toString(),
m_sender.toString()));
442 remainder = sit.key()->ProcessData(
443 reinterpret_cast<const unsigned char*
>(data.data()),
451 QString(
"data_length = %1 remainder = %2")
478 ((exp_seq_num&0xFFFF) != (seq_num&0xFFFF)))
481 QString(
"Sequence number mismatch %1!=%2")
482 .arg(seq_num).arg(exp_seq_num));
483 if (seq_num > exp_seq_num)
491 LOG(VB_RECORD, LOG_DEBUG,
492 QString(
"Processing RTP packet(seq:%1 ts:%2)")
501 remainder = sit.key()->ProcessData(
510 QString(
"data_length = %1 remainder = %2")
530 QByteArray buf = rtcp.
GetData();
533 QString(
"Sending RTCPReport to %1:%2")
bool Setup(ushort clientPort1, ushort clientPort2, ushort &rtpPort, ushort &rtcpPort, uint32_t &ssrc)
bool GetOptions(QStringList &options)
IPTVStreamHandler * m_parent
IPTVStreamHandlerReadHelper(IPTVStreamHandler *p, QUdpSocket *s, uint stream)
uint m_previousLastSequenceNumber
void SendRTCPReport(void)
void timerEvent(QTimerEvent *event) override
IPTVStreamHandler * m_parent
~IPTVStreamHandlerWriteHelper() override
uint m_lastSequenceNumber
std::array< QHostAddress, IPTV_SOCKET_COUNT > m_sender
IPTVStreamHandler(const IPTVTuningData &tuning, int inputid)
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
static QMap< QString, uint > s_iptvhandlers_refcnt
friend class IPTVStreamHandlerWriteHelper
std::array< QUdpSocket *, IPTV_SOCKET_COUNT > m_sockets
std::array< IPTVStreamHandlerReadHelper *, IPTV_SOCKET_COUNT > m_readHelpers
static IPTVStreamHandler * Get(const IPTVTuningData &tuning, int inputid)
friend class IPTVStreamHandlerReadHelper
static QMutex s_iptvhandlers_lock
static QMap< QString, IPTVStreamHandler * > s_iptvhandlers
static void Return(IPTVStreamHandler *&ref, int inputid)
IPTVStreamHandlerWriteHelper * m_writeHelper
QString GetDeviceKey(void) const
uint GetBitrate(uint i) const
QString GetDeviceName(void) const
QUrl GetURL(uint i) const
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
int exec(void)
Enters the qt event loop. call exit or quit to exit thread.
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
UDPPacket PopDataPacket(void)
Fetches a data packet for processing.
virtual void PushFECPacket(const UDPPacket &, unsigned int)=0
UDPPacket GetEmptyPacket(void)
Gets a packet for use in PushDataPacket/PushFECPacket.
virtual void PushDataPacket(const UDPPacket &)=0
void FreePacket(const UDPPacket &packet)
Frees an RTPDataPacket returned by PopDataPacket.
bool HasAvailablePacket(void) const
Returns true if there are ordered packets ready for processing.
QByteArray GetData(void) const
bool IsValid(void) const override
IsValid() must return true before any data access methods are called, other than GetDataReference() a...
uint GetTimeStamp(void) const
uint GetPayloadType(void) const
uint GetSequenceNumber(void) const
RTP Transport Stream Data Packet.
unsigned int GetTSDataSize(void) const
const unsigned char * GetTSData(void) const
StreamDataList m_streamDataList
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
QRecursiveMutex m_listenerLock
QByteArray & GetDataReference(void)
static constexpr std::chrono::milliseconds RTCP_TIMER
static constexpr size_t IPTV_SOCKET_COUNT
#define ENO
This can be appended to the LOG args with "+".
#define LOG(_MASK_, _LEVEL_, _QSTRING_)