Changeset 1ae6a1fd6 in mythtv


Ignore:
Timestamp:
May 11, 2014, 11:38:11 AM (10 years ago)
Author:
Jean-Yves Avenard <jyavenard@…>
Branches:
fixes/0.27
Children:
3ed8cef18
Parents:
4668d05b58
git-author:
Jean-Yves Avenard <jyavenard@…> (05/11/14 11:38:11)
git-committer:
Jean-Yves Avenard <jyavenard@…> (05/11/14 11:43:54)
Message:

Rework IPTV and RTSP recorders

Lots of issues fixed and addition of missing (but required) features

  • Add IPv6 support
  • Handle RTCP channel (not 100% sure it’s the right information sent, but it allows to keep alive the connection with some server, and doesn’t impact playback)
  • Handle Keep Alive and maintain the RTSP TCP connection life so the rtsp server doesn’t drop us (seen with VLC rtsp server and FreeBox? IPTV stream)
  • Can use hostnames in URL instead of just IP addresses
  • Proper support of rtsp’s SDP

Fixes #11949

(cherry picked from commit 57b64c4af16aedee1608119f8e68ee01e2343d6f)

Conflicts:

mythtv/libs/libmythtv/recorders/iptvstreamhandler.cpp

Location:
mythtv/libs/libmythtv
Files:
1 added
5 edited

Legend:

Unmodified
Added
Removed
  • mythtv/libs/libmythtv/libmythtv.pro

    r4668d05b58 r1ae6a1fd6  
    640640    HEADERS += recorders/rtp/rtpdatapacket.h
    641641    HEADERS += recorders/rtp/rtpfecpacket.h
     642    HEADERS += recorders/rtp/rtcpdatapacket.h
    642643
    643644    SOURCES += recorders/cetonrtsp.cpp
  • mythtv/libs/libmythtv/recorders/cetonrtsp.cpp

    r4668d05b58 r1ae6a1fd6  
    88#include <QTcpSocket>
    99#include <QUrl>
     10#include <QVector>
    1011
    1112// MythTV includes
    1213#include "cetonrtsp.h"
    1314#include "mythlogging.h"
    14 
    15 
    16 #define LOC QString("CetonRTSP(%1): ").arg(_requestUrl)
     15#include "mythsocket.h"
     16
     17
     18#define LOC QString("CetonRTSP(%1): ").arg(_requestUrl.toString())
    1719
    1820QMutex CetonRTSP::_rtspMutex;
    1921
    2022CetonRTSP::CetonRTSP(const QString &ip, uint tuner, ushort port) :
    21     _ip(ip),
    22     _port(port),
     23    _socket(NULL),
    2324    _sequenceNumber(0),
    2425    _sessionId("0"),
    25     _responseCode(-1)
    26 {
    27     _requestUrl = QString("rtsp://%1:%2/cetonmpeg%3")
    28         .arg(ip).arg(port).arg(tuner);
     26    _responseCode(-1),
     27    _timeout(60),
     28    _timer(0)
     29{
     30    _requestUrl.setHost(ip);
     31    _requestUrl.setPort(port);
     32    _requestUrl.setScheme("rtsp");
     33    _requestUrl.setPath(QString("cetonmpeg%1").arg(tuner));
    2934}
    3035
    3136CetonRTSP::CetonRTSP(const QUrl &url) :
    32     _ip(url.host()),
    33     _port((url.port() >= 0) ? url.port() : 554),
     37    _socket(NULL),
    3438    _sequenceNumber(0),
    3539    _sessionId("0"),
    36     _responseCode(-1)
    37 {
    38     _requestUrl = url.toString();
     40    _requestUrl(url),
     41    _responseCode(-1),
     42    _timeout(60),
     43    _timer(0)
     44{
     45    if (url.port() < 0)
     46    {
     47        // default rtsp port
     48        _requestUrl.setPort(554);
     49    }
     50}
     51
     52CetonRTSP::~CetonRTSP()
     53{
     54    StopKeepAlive();
    3955}
    4056
    4157bool CetonRTSP::ProcessRequest(
    42     const QString &method, const QStringList* headers)
     58    const QString &method, const QStringList* headers,
     59    bool use_control, bool waitforanswer)
    4360{
    4461    QMutexLocker locker(&_rtspMutex);
    45     QTcpSocket socket;
    46     socket.connectToHost(_ip, _port);
     62
     63    _responseHeaders.clear();
     64    _responseContent.clear();
     65
     66    // Create socket if socket object has never been created or in non-connected state
     67    if (!_socket || _socket->state() != QAbstractSocket::ConnectedState)
     68    {
     69        if (!_socket)
     70        {
     71            _socket = new QTcpSocket();
     72        }
     73        else
     74        {
     75            _socket->close();
     76        }
     77        _socket->connectToHost(_requestUrl.host(), _requestUrl.port(),
     78                               QAbstractSocket::ReadWrite);
     79        bool ok = _socket->waitForConnected();
     80
     81        if (!ok)
     82        {
     83            LOG(VB_GENERAL, LOG_ERR, LOC +
     84                QString("Could not connect to server %1:%2")
     85                .arg(_requestUrl.host()).arg(_requestUrl.port()));
     86            delete _socket;
     87            _socket = NULL;
     88            return false;
     89        }
     90    }
     91    else
     92    {
     93        // empty socket's waiting data just in case
     94        _socket->waitForReadyRead(30);
     95        do
     96        {
     97            QVector<char> trash;
     98            uint avail = _socket->bytesAvailable();
     99            trash.resize(std::max((uint)trash.size(), avail));
     100            _socket->read(trash.data(), avail);
     101            _socket->waitForReadyRead(30);
     102        }
     103        while (_socket->bytesAvailable() > 0);
     104    }
    47105
    48106    QStringList requestHeaders;
    49     requestHeaders.append(QString("%1 %2 RTSP/1.0").arg(method, _requestUrl));
     107    requestHeaders.append(QString("%1 %2 RTSP/1.0")
     108        .arg(method)
     109        .arg(use_control ? _controlUrl.toString() :  _requestUrl.toString()));
    50110    requestHeaders.append(QString("User-Agent: MythTV Ceton Recorder"));
    51111    requestHeaders.append(QString("CSeq: %1").arg(++_sequenceNumber));
     
    65125
    66126    LOG(VB_RECORD, LOG_DEBUG, LOC + QString("write: %1").arg(request));
    67     socket.write(request.toLatin1());
     127    _socket->write(request.toLatin1());
    68128
    69129    _responseHeaders.clear();
    70130    _responseContent.clear();
     131
     132    if (!waitforanswer)
     133        return true;
    71134
    72135    QRegExp firstLineRegex(
     
    80143    while (true)
    81144    {
    82         if (!socket.canReadLine())
    83         {
    84             bool ready = socket.waitForReadyRead();
     145        if (!_socket->canReadLine())
     146        {
     147            bool ready = _socket->waitForReadyRead(30 * 1000);
    85148            if (!ready)
    86149            {
    87                 LOG(VB_RECORD, LOG_ERR, LOC + "RTSP server did not respond");
     150                LOG(VB_RECORD, LOG_ERR, LOC + "RTSP server did not respond after 30s");
    88151                return false;
    89152            }
     
    91154        }
    92155
    93         QString line = socket.readLine();
     156        QString line = _socket->readLine();
    94157        LOG(VB_RECORD, LOG_DEBUG, LOC + QString("read: %1").arg(line));
    95158
     
    109172            _responseMessage = parts.at(2);
    110173
     174            if (_responseCode != 200)
     175            {
     176                _responseMessage =
     177                    QString("Server couldn't process the request: '%1'")
     178                    .arg(_responseMessage);
     179                return false;
     180            }
    111181            firstLine = false;
    112182            continue;
     
    126196    }
    127197
    128     QString cSeq = _responseHeaders.value("CSeq");
     198    QString cSeq;
     199
     200    if (_responseHeaders.contains("CSeq"))
     201    {
     202        cSeq = _responseHeaders["CSeq"];
     203    }
     204    else
     205    {
     206        // Handle broken implementation, such as VLC
     207        // doesn't respect the case of "CSeq", so find it regardless of the spelling
     208        foreach (QString key, _responseHeaders.keys())
     209        {
     210            if (key.compare("CSeq", Qt::CaseInsensitive) == 0)
     211            {
     212                cSeq = _responseHeaders.value(key);
     213                break;
     214            }
     215        }
     216    }
    129217    if (cSeq != QString("%1").arg(_sequenceNumber))
    130218    {
     
    143231        while (bytesRead < contentLength)
    144232        {
    145             if (socket.bytesAvailable() == 0)
    146                 socket.waitForReadyRead();
    147 
    148             int count = socket.read(data+bytesRead, contentLength-bytesRead);
     233            if (_socket->bytesAvailable() == 0)
     234                _socket->waitForReadyRead();
     235
     236            int count = _socket->read(data+bytesRead, contentLength-bytesRead);
    149237            if (count == -1)
    150238            {
     
    155243            bytesRead += count;
    156244        }
     245        LOG(VB_RECORD, LOG_DEBUG, LOC +
     246            QString("received: %1").arg(_responseContent.constData()));
    157247    }
    158248    return true;
     
    169259}
    170260
     261/**
     262 * splitLines. prepare SDP content for easy read
     263 */
     264QStringList CetonRTSP::splitLines(const QByteArray &lines)
     265{
     266    QStringList list;
     267    QTextStream stream(lines);
     268    QString line;
     269
     270    do
     271    {
     272        line = stream.readLine();
     273        if (!line.isNull())
     274        {
     275            list.append(line);
     276        }
     277    }
     278    while (!line.isNull());
     279
     280    return list;
     281}
     282
     283/**
     284 * readParamaters. Scan a line like: Session: 1234556;destination=xx;client_port
     285 * and return the first entry and fill the arguments in the provided Params
     286 */
     287QString CetonRTSP::readParamaters(const QString &key, Params &parameters)
     288{
     289    QString val;
     290
     291    if (!_responseHeaders.contains(key))
     292    {
     293        return val;
     294    }
     295
     296    QStringList header = _responseHeaders.value(key).split(";");
     297
     298    for (int i = 0; i < header.size(); i++)
     299    {
     300        QString entry = header[i].trimmed();
     301
     302        if (i ==0)
     303        {
     304            val = entry;
     305            continue;
     306        }
     307        QStringList args = entry.split("=");
     308
     309        parameters.insert(args[0].trimmed(),
     310                          args.size() > 1 ? args[1].trimmed() : QString());
     311    }
     312    return val;
     313}
     314
     315/**
     316 * Return the base URL for the last DESCRIBE answer
     317 */
     318QUrl CetonRTSP::GetBaseUrl(void)
     319{
     320    if (_responseHeaders.contains("Content-Base"))
     321    {
     322        return _responseHeaders["Content-Base"];
     323    }
     324    if (_responseHeaders.contains("Content-Location"))
     325    {
     326        return _responseHeaders["Content-Location"];
     327    }
     328    return _requestUrl;
     329}
     330
    171331bool CetonRTSP::Describe(void)
    172332{
    173     if (!ProcessRequest("DESCRIBE"))
     333    QStringList headers;
     334
     335    headers.append("Accept: application/sdp");
     336
     337    if (!ProcessRequest("DESCRIBE", &headers))
    174338        return false;
    175339
    176     if (!_responseContent.contains("m=video 0 RTP/AVP 33"))
     340    // find control url
     341    QStringList lines = splitLines(_responseContent);
     342    bool found = false;
     343    QUrl base = _controlUrl = GetBaseUrl();
     344
     345    foreach (QString line, lines)
     346    {
     347        if (line.startsWith("m="))
     348        {
     349            if (found)
     350            {
     351                // another new stream, no need to parse further
     352                break;
     353            }
     354            if (!line.startsWith("m=video"))
     355            {
     356                // not a video stream
     357                continue;
     358            }
     359            QStringList args = line.split(" ");
     360            if (args[2] == "RTP/AVP" && args[3] == "33")
     361            {
     362                found = true;
     363            }
     364            continue;
     365        }
     366        if (line.startsWith("c="))
     367        {
     368            // TODO, connection parameter
     369            // assume we will always get a control entry
     370            continue;
     371        }
     372        if (line.startsWith("a=control:"))
     373        {
     374            // Per RFC: a=control:rtsp://example.com/foo
     375            // This attribute may contain either relative and absolute URLs,
     376            // following the rules and conventions set out in RFC 1808 [25].
     377            QString url = line.mid(10).trimmed();
     378            _controlUrl = url;
     379            if (url == "*")
     380            {
     381                _controlUrl = base;
     382            }
     383            else if (_controlUrl.isRelative())
     384            {
     385                _controlUrl = base.resolved(_controlUrl);
     386            }
     387            continue;
     388        }
     389    }
     390
     391    if (!found)
    177392    {
    178393        LOG(VB_RECORD, LOG_ERR, LOC + "expected content to be type "
    179394            "\"m=video 0 RTP/AVP 33\" but it appears not to be");
     395        _controlUrl = QUrl();
    180396        return false;
    181397    }
     
    184400}
    185401
    186 bool CetonRTSP::Setup(ushort clientPort1, ushort clientPort2)
     402bool CetonRTSP::Setup(ushort clientPort1, ushort clientPort2,
     403                      ushort &rtpPort, ushort &rtcpPort,
     404                      uint32_t &ssrc)
    187405{
    188406    LOG(VB_GENERAL, LOG_INFO, QString("CetonRTSP: ") +
     
    195413        .arg(clientPort1).arg(clientPort2));
    196414
    197     if (!ProcessRequest("SETUP", &extraHeaders))
     415    if (!ProcessRequest("SETUP", &extraHeaders, true))
    198416        return false;
    199417
    200     _sessionId = _responseHeaders.value("Session");
    201     if (_sessionId.size() < 8)
     418    Params params;
     419    QString session = readParamaters("Session", params);
     420
     421    if (session.isEmpty())
    202422    {
    203423        LOG(VB_RECORD, LOG_ERR, LOC +
     
    205425        return false;
    206426    }
     427    if (session.size() < 8)
     428    {
     429        LOG(VB_RECORD, LOG_WARNING, LOC +
     430            "invalid session id received");
     431    }
     432    _sessionId = session;
     433
     434    if (params.contains("timeout"))
     435    {
     436        _timeout = params["timeout"].toInt();
     437    }
     438
     439    QString transport = readParamaters("Transport", params);
     440    if (params.contains("ssrc"))
     441    {
     442        bool ok;
     443        ssrc = params["ssrc"].toUInt(&ok, 16);
     444    }
     445    if (params.contains("server_port"))
     446    {
     447        QString line = params["server_port"];
     448        QStringList val = line.split("-");
     449
     450        rtpPort = val[0].toInt();
     451        rtcpPort = val.size() > 1 ? val[1].toInt() : 0;
     452    }
    207453
    208454    return true;
     
    211457bool CetonRTSP::Play(void)
    212458{
    213     return ProcessRequest("PLAY");
     459    bool result = ProcessRequest("PLAY");
     460
     461    StartKeepAlive();
     462    return result;
    214463}
    215464
    216465bool CetonRTSP::Teardown(void)
    217466{
     467    StopKeepAlive();
     468
    218469    bool result = ProcessRequest("TEARDOWN");
     470
     471    QMutexLocker locker(&_rtspMutex);
     472
     473    delete _socket;
     474    _socket = NULL;
     475
    219476    _sessionId = "0";
    220477    return result;
    221478}
     479
     480void CetonRTSP::StartKeepAlive()
     481{
     482    if (_timer)
     483        return;
     484    int timeout = std::max(_timeout - 5, 5);
     485    LOG(VB_RECORD, LOG_DEBUG, LOC +
     486        QString("Start KeepAlive, every %1s").arg(timeout));
     487    _timer = startTimer(timeout * 1000);
     488}
     489
     490void CetonRTSP::StopKeepAlive()
     491{
     492    if (_timer)
     493    {
     494        killTimer(_timer);
     495        LOG(VB_RECORD, LOG_DEBUG, LOC + "Stop KeepAlive");
     496    }
     497    _timer = 0;
     498}
     499
     500void CetonRTSP::timerEvent(QTimerEvent*)
     501{
     502    QStringList dummy;
     503
     504    LOG(VB_RECORD, LOG_DEBUG, LOC + "Sending KeepAlive");
     505    ProcessRequest("GET_PARAMETER", NULL, false, false);
     506}
  • mythtv/libs/libmythtv/recorders/cetonrtsp.h

    r4668d05b58 r1ae6a1fd6  
    88#define CETONRTSP_H
    99
    10 #include <QObject>
    11 #include <QHash>
     10#include <QMap>
    1211#include <QString>
    1312#include <QMutex>
     13#include <QUrl>
    1414
    15 class QUrl;
     15class QTcpSocket;
     16class QUdpSocket;
    1617
    17 class CetonRTSP
     18typedef QMap<QString, QString> Params;
     19
     20class CetonRTSP : QObject
    1821{
     22    Q_OBJECT
     23
    1924  public:
    2025    explicit CetonRTSP(const QString &ip, uint tuner, ushort port);
    2126    explicit CetonRTSP(const QUrl&);
     27    ~CetonRTSP();
    2228
    2329    bool GetOptions(QStringList &options);
    2430    bool Describe(void);
    25     bool Setup(ushort clientPort1, ushort clientPort2);
     31    bool Setup(ushort clientPort1, ushort clientPort2,
     32               ushort &rtpPort, ushort &rtcpPort, uint32_t &ssrc);
    2633    bool Play(void);
    2734    bool Teardown(void);
    2835
    29   protected:
     36    void StartKeepAlive(void);
     37    void StopKeepAlive(void);
     38
     39protected:
    3040    bool ProcessRequest(
    31         const QString &method, const QStringList *headers = NULL);
     41        const QString &method, const QStringList *headers = NULL,
     42                        bool use_control = false, bool waitforanswer = true);
    3243
    3344  private:
    34     QString     _ip;
    35     ushort      _port;
     45    QStringList splitLines(const QByteArray &lines);
     46    QString readParamaters(const QString &key, Params &parameters);
     47    QUrl GetBaseUrl(void);
     48    void timerEvent(QTimerEvent*);
     49
     50    QTcpSocket *_socket;
    3651    uint        _sequenceNumber;
    3752    QString     _sessionId;
    38     QString     _requestUrl;
     53    QUrl        _requestUrl;
     54    QUrl        _controlUrl;
    3955
    4056    int                     _responseCode;
    4157    QString                 _responseMessage;
    42     QHash<QString,QString>  _responseHeaders;
     58    Params                  _responseHeaders;
    4359    QByteArray              _responseContent;
     60    int                     _timeout;
     61    int                     _timer;
    4462
    4563    static QMutex _rtspMutex;
  • mythtv/libs/libmythtv/recorders/iptvstreamhandler.cpp

    r4668d05b58 r1ae6a1fd6  
    1515// Qt headers
    1616#include <QUdpSocket>
     17#include <QByteArray>
     18#include <QHostInfo>
    1719
    1820// MythTV headers
     
    2325#include "rtpdatapacket.h"
    2426#include "rtpfecpacket.h"
     27#include "rtcpdatapacket.h"
    2528#include "mythlogging.h"
    2629#include "cetonrtsp.h"
     
    108111    m_tuning(tuning),
    109112    m_write_helper(NULL),
    110     m_buffer(NULL)
     113    m_buffer(NULL),
     114    m_rtsp_rtp_port(0),
     115    m_rtsp_rtcp_port(0),
     116    m_rtsp_ssrc(0)
    111117{
    112118    memset(m_sockets, 0, sizeof(m_sockets));
     
    134140        // Check RTSP capabilities
    135141        QStringList options;
    136         if (!(rtsp->GetOptions(options)     && options.contains("OPTIONS") &&
    137               options.contains("DESCRIBE")  && options.contains("SETUP")    &&
    138               options.contains("PLAY")      && options.contains("TEARDOWN")))
     142        if (!(rtsp->GetOptions(options)     && options.contains("DESCRIBE") &&
     143              options.contains("SETUP")     && options.contains("PLAY")     &&
     144              options.contains("TEARDOWN")))
    139145        {
    140146            LOG(VB_RECORD, LOG_ERR, LOC +
     
    156162        }
    157163
    158         tuning = IPTVTuningData(
    159             QString("rtp://%1@%2:0")
    160             .arg(m_tuning.GetURL(0).host())
    161             .arg(QHostAddress(QHostAddress::Any).toString()), 0,
    162             IPTVTuningData::kNone,
    163             QString("rtp://%1@%2:0")
    164             .arg(m_tuning.GetURL(0).host())
    165             .arg(QHostAddress(QHostAddress::Any).toString()), 0,
    166             "", 0);
     164        m_use_rtp_streaming = true;
     165
     166        QUrl urltuned = m_tuning.GetURL(0);
     167        urltuned.setScheme("rtp");
     168        urltuned.setPort(0);
     169        tuning = IPTVTuningData(urltuned.toString(), 0, IPTVTuningData::kNone,
     170                                urltuned.toString(), 0, "", 0);
    167171    }
    168172
    169173    bool error = false;
     174    int start_port = 0;
    170175    for (uint i = 0; i < IPTV_SOCKET_COUNT; i++)
    171176    {
     
    174179            continue;
    175180
     181        LOG(VB_RECORD, LOG_DEBUG, LOC +
     182            QString("setting up url[%1]:%2").arg(i).arg(url.toString()));
     183
     184        // always ensure we use consecutive port numbers
     185        int port = start_port ? start_port + 1 : url.port();
     186        QString host = url.host();
     187        QHostAddress dest_addr(host);
     188
     189        if (!host.isEmpty() && dest_addr.isNull())
     190        {
     191            // address is a hostname, attempts to resolve it
     192            QHostInfo info = QHostInfo::fromName(host);
     193            QList<QHostAddress> list = info.addresses();
     194
     195            if (list.isEmpty())
     196            {
     197                LOG(VB_RECORD, LOG_ERR, LOC +
     198                    QString("Can't resolve hostname:'%1'").arg(host));
     199            }
     200            else
     201            {
     202                for (int i=0; i < list.size(); i++)
     203                {
     204                    dest_addr = list[i];
     205                    if (list[i].protocol() == QAbstractSocket::IPv6Protocol)
     206                    {
     207                        // We prefer first IPv4
     208                        break;
     209                    }
     210                }
     211                LOG(VB_RECORD, LOG_DEBUG, LOC +
     212                    QString("resolved %1 as %2").arg(host).arg(dest_addr.toString()));
     213            }
     214        }
     215        bool ipv6 = dest_addr.protocol() == QAbstractSocket::IPv6Protocol;
     216        bool is_multicast = ipv6 ?
     217            dest_addr.isInSubnet(QHostAddress::parseSubnet("ff00::/8")) :
     218            (dest_addr.toIPv4Address() & 0xf0000000) == 0xe0000000;
     219
    176220        m_sockets[i] = new QUdpSocket();
     221        if (!is_multicast)
     222        {
     223            // this allow to filter incoming traffic, and make sure it's from
     224            // the requested server
     225            m_sender[i] = dest_addr;
     226        }
    177227        m_read_helpers[i] = new IPTVStreamHandlerReadHelper(
    178228            this, m_sockets[i], i);
     
    180230        // we need to open the descriptor ourselves so we
    181231        // can set some socket options
    182         int fd = socket(AF_INET, SOCK_DGRAM, 0); // create IPv4 socket
     232        int fd = socket(ipv6 ? AF_INET6 : AF_INET, SOCK_DGRAM, 0); // create IPv4 socket
    183233        if (fd < 0)
    184234        {
     
    190240        if (!tuning.GetBitrate(i))
    191241            buf_size = 2 * 1024 * 1024;
    192         int ok = setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
     242        int err = setsockopt(fd, SOL_SOCKET, SO_RCVBUF,
    193243                            (char *)&buf_size, sizeof(buf_size));
    194         if (ok)
     244        if (err)
    195245        {
    196246            LOG(VB_GENERAL, LOG_INFO, LOC +
     
    198248                .arg(buf_size) + ENO);
    199249        }
    200         /*
    201           int broadcast = 1;
    202           ok = setsockopt(fd, SOL_SOCKET, SO_BROADCAST,
    203           (char *)&broadcast, sizeof(broadcast));
    204           if (ok)
    205           {
    206           LOG(VB_GENERAL, LOG_INFO, LOC +
    207           QString("Enabling broadcast failed") + ENO);
    208           }
    209         */
     250
    210251        m_sockets[i]->setSocketDescriptor(
    211252            fd, QAbstractSocket::UnconnectedState, QIODevice::ReadOnly);
    212253
    213         QHostAddress dest_addr(tuning.GetURL(i).host());
    214 
    215         if (!m_sockets[i]->bind(dest_addr, url.port()))
     254        // we bind to destination address if it's a multicast address, or
     255        // the local ones otherwise
     256        if (!m_sockets[i]->bind(is_multicast ?
     257                                dest_addr :
     258                                (ipv6 ? QHostAddress::AnyIPv6 : QHostAddress::Any),
     259                                port))
    216260        {
    217261            LOG(VB_GENERAL, LOG_ERR, LOC + "Binding to port failed.");
    218262            error = true;
    219263        }
    220 
    221         if (dest_addr != QHostAddress::Any)
    222         {
    223             //m_sockets[i]->joinMulticastGroup(dest_addr); // needs Qt 4.8
     264        else
     265        {
     266            start_port = m_sockets[i]->localPort();
     267        }
     268
     269        if (is_multicast)
     270        {
     271            m_sockets[i]->joinMulticastGroup(dest_addr);
    224272            LOG(VB_GENERAL, LOG_INFO, LOC + QString("Joining %1")
    225273                .arg(dest_addr.toString()));
    226             struct ip_mreq imr;
    227             memset(&imr, 0, sizeof(struct ip_mreq));
    228             imr.imr_multiaddr.s_addr = inet_addr(
    229                 dest_addr.toString().toLatin1().constData());
    230             imr.imr_interface.s_addr = htonl(INADDR_ANY);
    231             if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
    232                            &imr, sizeof(imr)) < 0)
    233             {
    234                 LOG(VB_GENERAL, LOG_ERR, LOC +
    235                     "setsockopt - IP_ADD_MEMBERSHIP " + ENO);
    236             }
    237         }
    238 
    239         if (!url.userInfo().isEmpty())
    240             m_sender[i] = QHostAddress(url.userInfo());
     274        }
     275
     276        if (!is_multicast && rtsp && i == 1)
     277        {
     278            m_rtcp_dest = dest_addr;
     279        }
    241280    }
    242281
     
    247286        else
    248287            m_buffer = new UDPPacketBuffer(tuning.GetBitrate(0));
    249         m_write_helper = new IPTVStreamHandlerWriteHelper(this);
     288        m_write_helper =
     289            new IPTVStreamHandlerWriteHelper(this);
    250290        m_write_helper->Start();
    251291    }
     
    254294    {
    255295        // Start Streaming
    256         if (!rtsp->Setup(m_sockets[0]->localPort(),
    257                          m_sockets[1]->localPort()) ||
     296        if (!rtsp->Setup(m_sockets[0]->localPort(), m_sockets[1]->localPort(),
     297                         m_rtsp_rtp_port, m_rtsp_rtcp_port, m_rtsp_ssrc) ||
    258298            !rtsp->Play())
    259299        {
     
    261301                "Starting recording (RTP initialization failed). Aborting.");
    262302            error = true;
     303        }
     304        if (m_rtsp_rtcp_port > 0)
     305        {
     306            m_write_helper->SendRTCPReport();
     307            m_write_helper->StartRTCPRR();
    263308        }
    264309    }
     
    304349            this,     SLOT(ReadPending()));
    305350}
     351
     352#define LOC_WH QString("IPTVSH(%1): ").arg(m_parent->_device)
    306353
    307354void IPTVStreamHandlerReadHelper::ReadPending(void)
     
    321368                                   &sender, &senderPort);
    322369            if (sender_null || sender == m_sender)
     370            {
    323371                m_parent->m_buffer->PushDataPacket(packet);
     372            }
     373            else
     374            {
     375                LOG(VB_RECORD, LOG_WARNING, LOC_WH +
     376                    QString("Received on socket(%1) %2 bytes from non expected "
     377                            "sender:%3 (expected:%4) ignoring")
     378                    .arg(m_stream).arg(data.size())
     379                    .arg(sender.toString()).arg(m_sender.toString()));
     380            }
    324381        }
    325382    }
     
    334391                                   &sender, &senderPort);
    335392            if (sender_null || sender == m_sender)
     393            {
    336394                m_parent->m_buffer->PushFECPacket(packet, m_stream - 1);
    337         }
    338     }
    339 }
    340 
    341 #define LOC_WH QString("IPTVSH(%1): ").arg(m_parent->_device)
    342 
    343 void IPTVStreamHandlerWriteHelper::timerEvent(QTimerEvent*)
    344 {
     395            }
     396            else
     397            {
     398                LOG(VB_RECORD, LOG_WARNING, LOC_WH +
     399                    QString("Received on socket(%1) %2 bytes from non expected "
     400                            "sender:%3 (expected:%4) ignoring")
     401                    .arg(m_stream).arg(data.size())
     402                    .arg(sender.toString()).arg(m_sender.toString()));
     403            }
     404        }
     405    }
     406}
     407
     408IPTVStreamHandlerWriteHelper::IPTVStreamHandlerWriteHelper(IPTVStreamHandler *p)
     409  : m_parent(p),                m_timer(0),             m_timer_rtcp(0),
     410    m_last_sequence_number(0),  m_last_timestamp(0),
     411    m_lost(0),                  m_lost_interval(0)
     412{
     413}
     414
     415IPTVStreamHandlerWriteHelper::~IPTVStreamHandlerWriteHelper()
     416{
     417    if (m_timer)
     418    {
     419        killTimer(m_timer);
     420    }
     421    if (m_timer_rtcp)
     422    {
     423        killTimer(m_timer_rtcp);
     424    }
     425    m_timer = 0;
     426    m_timer_rtcp = 0;
     427    m_parent = NULL;
     428}
     429
     430void IPTVStreamHandlerWriteHelper::timerEvent(QTimerEvent* event)
     431{
     432    if (event->timerId() == m_timer_rtcp)
     433    {
     434        SendRTCPReport();
     435        return;
     436    }
     437
    345438    if (!m_parent->m_buffer->HasAvailablePacket())
    346439        return;
     
    402495                    QString("Sequence number mismatch %1!=%2")
    403496                    .arg(seq_num).arg(exp_seq_num));
     497                if (seq_num > exp_seq_num)
     498                {
     499                    m_lost_interval = seq_num - exp_seq_num;
     500                    m_lost += m_lost_interval;
     501                }
    404502            }
    405503            m_last_sequence_number = seq_num;
     504            m_last_timestamp = ts_packet.GetTimeStamp();
     505            LOG(VB_RECORD, LOG_DEBUG,
     506                QString("Processing RTP packet(seq:%1 ts:%2)")
     507                .arg(m_last_sequence_number).arg(m_last_timestamp));
    406508
    407509            m_parent->_listener_lock.lock();
     
    425527            }
    426528        }
    427 
    428529        m_parent->m_buffer->FreePacket(packet);
    429530    }
    430531}
     532
     533void IPTVStreamHandlerWriteHelper::SendRTCPReport(void)
     534{
     535    if (m_parent->m_rtcp_dest.isNull())
     536    {
     537        // no point sending data if we don't know where to
     538        return;
     539    }
     540    int seq_delta = m_last_sequence_number - m_previous_last_sequence_number;
     541    RTCPDataPacket rtcp =
     542        RTCPDataPacket(m_last_timestamp, m_last_timestamp + RTCP_TIMER * 1000,
     543                       m_last_sequence_number, m_last_sequence_number + seq_delta,
     544                       m_lost, m_lost_interval, m_parent->m_rtsp_ssrc);
     545    QByteArray buf = rtcp.GetData();
     546
     547    LOG(VB_RECORD, LOG_DEBUG, LOC_WH +
     548        QString("Sending RTCPReport to %1:%2")
     549        .arg(m_parent->m_rtcp_dest.toString())
     550        .arg(m_parent->m_rtsp_rtcp_port));
     551    m_parent->m_sockets[1]->writeDatagram(buf.constData(), buf.size(),
     552                                          m_parent->m_rtcp_dest, m_parent->m_rtsp_rtcp_port);
     553    m_previous_last_sequence_number = m_last_sequence_number;
     554}
  • mythtv/libs/libmythtv/recorders/iptvstreamhandler.h

    r4668d05b58 r1ae6a1fd6  
    1616#include "streamhandler.h"
    1717
    18 #define IPTV_SOCKET_COUNT 3
     18#define IPTV_SOCKET_COUNT   3
     19#define RTCP_TIMER          10
    1920
    2021class IPTVStreamHandler;
     
    4647    Q_OBJECT
    4748
    48   public:
    49     IPTVStreamHandlerWriteHelper(IPTVStreamHandler *p) :
    50         m_parent(p), m_timer(0), m_last_sequence_number(0) { }
    51     ~IPTVStreamHandlerWriteHelper()
    52     {
    53         killTimer(m_timer);
    54         m_timer = 0;
    55         m_parent = NULL;
    56     }
     49public:
     50    IPTVStreamHandlerWriteHelper(IPTVStreamHandler *);
     51    ~IPTVStreamHandlerWriteHelper();
    5752
    5853    void Start(void)
     
    6055        m_timer = startTimer(200);
    6156    }
     57    void StartRTCPRR(void)
     58    {
     59        m_timer_rtcp = startTimer(RTCP_TIMER * 1000);
     60    }
    6261
    63   private:
     62    void SendRTCPReport(void);
     63
     64private:
    6465    void timerEvent(QTimerEvent*);
    6566
    66   private:
     67private:
    6768    IPTVStreamHandler *m_parent;
    68     int m_timer;
    69     uint m_last_sequence_number;
     69    int m_timer, m_timer_rtcp;
     70    uint m_last_sequence_number, m_last_timestamp, m_previous_last_sequence_number;
     71    int m_lost, m_lost_interval;
    7072};
    7173
     
    98100    IPTVStreamHandlerWriteHelper *m_write_helper;
    99101    PacketBuffer *m_buffer;
     102
    100103    bool m_use_rtp_streaming;
     104    ushort m_rtsp_rtp_port, m_rtsp_rtcp_port;
     105    uint32_t m_rtsp_ssrc;
     106    QHostAddress m_rtcp_dest;
    101107
    102108    // for implementing Get & Return
     
    104110    static QMap<QString, IPTVStreamHandler*> s_handlers;
    105111    static QMap<QString, uint>               s_handlers_refcnt;
     112
     113private:
     114    void timerEvent(QTimerEvent*);
     115
    106116};
    107117
Note: See TracChangeset for help on using the changeset viewer.