Ticket #10019: 0002-ringbuffer-Adapt-readahead-for-low-bit-rate-high-lat.patch

File 0002-ringbuffer-Adapt-readahead-for-low-bit-rate-high-lat.patch, 10.4 KB (added by Lawrence Rust <lvr@…>, 13 years ago)

Fix ringbuffer problems with high latency streams

  • mythtv/libs/libmythtv/ringbuffer.cpp

    From ca919253268a1cd137fe91b9c7d3390c6b9be9bd Mon Sep 17 00:00:00 2001
    From: Lawrence Rust <lvr@softsystem.co.uk>
    Date: Sun, 31 Jul 2011 17:14:52 +0200
    Subject: [PATCH 2/2] ringbuffer: Adapt readahead for low bit rate / high latency streams
    
    DVB-S radio programs are often low bit rate (64Kbps..256Kbps) and
    experience occasional high packet latency.
    
    MHEG interaction streams are http sourced, have medium bit rate
    (250Kbps..1000Kbps) and often experience congestion resulting in high
    packet latency.
    
    - Add hysteresis to the ringbuffer readahead.  This avoids audio
    underrun and rapid play/wait cycling in http video streams.
    
    - RingBuffer::WaitForReadsAllowed incease timeout to 30 Secs to allow
     for http congestion.
    
    - RingBuffer::WaitFailAvail returns bytes available if less than that
     requested for low bit rate streams (where fill_min is < 32kB).
    
    - Fix a bug in RingBuffer::ReadPriv where if WaitForReadsAllowed times out
      then the next read would cause data to be returned out of sequence
    
    Signed-off-by: Lawrence Rust <lvr@softsystem.co.uk>
    ---
     mythtv/libs/libmythtv/ringbuffer.cpp |  118 +++++++++++++++++++++-------------
     mythtv/libs/libmythtv/ringbuffer.h   |    2 +
     2 files changed, 76 insertions(+), 44 deletions(-)
    
    diff --git a/mythtv/libs/libmythtv/ringbuffer.cpp b/mythtv/libs/libmythtv/ringbuffer.cpp
    index ddd018c..c21d118 100644
    a b bool RingBuffer::IsNearEnd(double fps, uint vvf) const 
    407407    // WARNING: readahead_frames can greatly overestimate or underestimate
    408408    //          the number of frames available in the read ahead buffer
    409409    //          when rh_frames is less than the keyframe distance.
     410    if (fps == 0.)
     411        return false;
    410412    double bytes_per_frame = kbits_per_sec * (1000.0/8.0) / fps;
     413    if (bytes_per_frame == 0.)
     414        return false;
    411415    double readahead_frames = sz / bytes_per_frame;
    412416
    413417    bool near_end = ((vvf + readahead_frames) < 10.0) || (sz < rbs*1.5);
    int RingBuffer::ReadBufAvail(void) const 
    446450    return ret;
    447451}
    448452
     453inline int RingBuffer::ReadBufUsed() const
     454{
     455    return (bufferSize - 1) - ReadBufFree();
     456}
     457
     458inline bool RingBuffer::ReadsAllowed() const
     459{
     460    return ateof || setswitchtonext || commserror ||
     461        // Ensure some hysteresis around fill_min
     462        ReadBufUsed() >= (readsallowed ? 1 : fill_min);
     463}
     464
    449465/** \fn RingBuffer::ResetReadAhead(long long)
    450466 *  \brief Restart the read-ahead thread at the 'newinternal' position.
    451467 *
    void RingBuffer::run(void) 
    888904            }
    889905        }
    890906
    891         int used = bufferSize - ReadBufFree();
    892 
    893907        bool reads_were_allowed = readsallowed;
    894908
    895         if ((0 == read_return) || (numfailures > 5) ||
    896             (readsallowed != (used >= fill_min || ateof ||
    897                               setswitchtonext || commserror)))
     909        if (0 == read_return || numfailures > 5 || ReadsAllowed() != readsallowed)
    898910        {
    899911            // If readpos changes while the lock is released
    900912            // we should not handle the 0 read_return now.
    void RingBuffer::run(void) 
    905917
    906918            commserror |= (numfailures > 5);
    907919
    908             readsallowed = used >= fill_min || ateof ||
    909                 setswitchtonext || commserror;
     920            bool bReadsAllowed = ReadsAllowed();
     921            if (readsallowed != bReadsAllowed)
     922            {
     923                readsallowed = bReadsAllowed;
     924                LOG(VB_FILE, LOG_INFO, LOC + (bReadsAllowed ?
     925                    QString("Reads allowed: %1 bytes available").arg(ReadBufUsed()) :
     926                    QString("Rebuffering %1..%2").arg(ReadBufUsed()).arg(fill_min)) );
     927            }
    910928
    911929            if (0 == read_return && old_readpos == readpos)
    912930            {
    void RingBuffer::run(void) 
    929947
    930948            rwlock.unlock();
    931949            rwlock.lockForRead();
    932             used = bufferSize - ReadBufFree();
    933950        }
    934951
    935952        LOG(VB_FILE, LOG_DEBUG, LOC + "@ end of read ahead loop");
    936953
    937954        if (!readsallowed || commserror || ateof || setswitchtonext ||
    938             (wanttoread <= used && wanttoread > 0))
     955            (wanttoread <= ReadBufUsed() && wanttoread > 0))
    939956        {
    940957            // To give other threads a good chance to handle these
    941958            // conditions, even if they are only requesting a read lock
    void RingBuffer::run(void) 
    943960            generalWait.wakeAll();
    944961            rwlock.unlock();
    945962            usleep(5 * 1000);
    946             rwlock.lockForRead();           
     963            rwlock.lockForRead();
    947964        }
    948965        else
    949966        {
    950967            // yield if we have nothing to do...
    951968            if (!request_pause && reads_were_allowed &&
    952                 (used >= fill_threshold || ateof || setswitchtonext))
     969                (ReadBufUsed() >= fill_threshold || ateof || setswitchtonext || ignoreliveeof))
    953970            {
    954971                generalWait.wait(&rwlock, 50);
    955972            }
    void RingBuffer::run(void) 
    959976                generalWait.wakeAll();
    960977                rwlock.unlock();
    961978                usleep(5 * 1000);
    962                 rwlock.lockForRead();           
     979                rwlock.lockForRead();
    963980            }
    964981        }
    965982    }
    bool RingBuffer::WaitForReadsAllowed(void) 
    10151032    while (!readsallowed && !stopreads &&
    10161033           !request_pause && !commserror && readaheadrunning)
    10171034    {
    1018         generalWait.wait(&rwlock, 1000);
    1019         if (!readsallowed && t.elapsed() > 1000)
     1035        // The timeout should allow for congestion of internet streamed media
     1036        if (t.elapsed() >= 30000)
    10201037        {
    1021             LOG(VB_GENERAL, LOG_WARNING, LOC +
    1022                 "Taking too long to be allowed to read..");
    1023 
    1024             if (t.elapsed() > 10000)
    1025             {
    1026                 LOG(VB_GENERAL, LOG_ERR, LOC + "Took more than 10 seconds to "
    1027                                                "be allowed to read, aborting.");
    1028                 return false;
    1029             }
     1038            LOG(VB_GENERAL, LOG_ERR, LOC +
     1039                QString("Waited %1 seconds to be allowed to read, aborting.")
     1040                .arg(t.elapsed()/1000) );
     1041            return false;
    10301042        }
     1043
     1044        generalWait.wait(&rwlock, 250);
    10311045    }
    10321046
    1033     return readsallowed;
     1047    if (t.elapsed() >= 500)
     1048    {
     1049        LOG(VB_GENERAL, LOG_WARNING, LOC +
     1050            QString("Waited %1 mS to be allowed to read (avail=%2 fill_min=%3)..")
     1051            .arg(t.elapsed()).arg(ReadBufAvail()).arg(fill_min) );
     1052    }
     1053    return true;
    10341054}
    10351055
    10361056bool RingBuffer::WaitForAvail(int count)
    bool RingBuffer::WaitForAvail(int count) 
    10541074        generalWait.wakeAll();
    10551075    }
    10561076
    1057     MythTimer t;
    1058     t.start();
    1059     while ((avail < count) && !stopreads &&
    1060            !request_pause && !commserror && readaheadrunning)
     1077    MythTimer t; t.start();
     1078    wanttoread = count;
     1079    while (avail < count && !stopreads && !request_pause &&
     1080            !commserror && readaheadrunning)
    10611081    {
    1062         wanttoread = count;
    1063         generalWait.wait(&rwlock, 250);
    1064         avail = ReadBufAvail();
    1065 
    1066         if (ateof && avail < count)
    1067             count = avail;
    1068 
    1069         if (avail < count)
     1082        uint elapsed = t.elapsed();
     1083        if (elapsed >= 10000)
    10701084        {
    10711085            int elapsed = t.elapsed();
    10721086            if (elapsed > 500 && low_buffers && avail >= fill_min)
    bool RingBuffer::WaitForAvail(int count) 
    10931107                return false;
    10941108            }
    10951109        }
     1110        else if (elapsed >= 100 && avail)
     1111        {
     1112            LOG(VB_GENERAL, LOG_INFO, LOC +
     1113                QString("Waited %1 mS for %2 bytes (wanted %3)")
     1114                .arg(elapsed).arg(avail).arg(count) );
     1115            count = avail;
     1116            generalWait.wakeAll();
     1117            break;
     1118        }
     1119
     1120        generalWait.wait(&rwlock, 100);
     1121        avail = ReadBufAvail();
     1122        if (ateof && avail < count)
     1123            count = avail;
    10961124    }
    10971125
    10981126    wanttoread = 0;
    int RingBuffer::ReadDirect(void *buf, int count, bool peek) 
    11541182        if (new_pos != old_pos)
    11551183        {
    11561184            LOG(VB_GENERAL, LOG_ERR, LOC +
    1157                 QString("Peek() Failed to return from new "
     1185                QString("Seek() Failed to return from new "
    11581186                        "position %1 to old position %2, now "
    11591187                        "at position %3")
    11601188                    .arg(old_pos - ret).arg(old_pos).arg(new_pos));
    int RingBuffer::ReadDirect(void *buf, int count, bool peek) 
    11741202 */
    11751203int RingBuffer::ReadPriv(void *buf, int count, bool peek)
    11761204{
    1177     QString loc_desc = QString("ReadPriv(..%1, %2)")
     1205    const QString loc_desc = QString("ReadPriv(..%1, %2)")
    11781206        .arg(count).arg(peek?"peek":"normal");
    1179     LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
    1180         QString(" @%1 -- begin").arg(rbrpos));
    11811207
    11821208    rwlock.lockForRead();
     1209
     1210    LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
     1211        QString(" @%1 avail=%2 -- begin").arg(rbrpos).arg(ReadBufAvail()));
     1212
    11831213    if (writemode)
    11841214    {
    11851215        LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
    int RingBuffer::ReadPriv(void *buf, int count, bool peek) 
    12101240        if (request_pause || stopreads ||
    12111241            !readaheadrunning || (ignorereadpos >= 0))
    12121242        {
     1243            LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + " -- direct read");
    12131244            int ret = ReadDirect(buf, count, peek);
    12141245            LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
    12151246                QString(": ReadDirect checksum %1")
    int RingBuffer::ReadPriv(void *buf, int count, bool peek) 
    12241255    if (!WaitForReadsAllowed())
    12251256    {
    12261257        LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForReadsAllowed()");
    1227         rwlock.unlock();
    1228         stopreads = true; // this needs to be outside the lock
    1229         rwlock.lockForWrite();
    1230         wanttoread = 0;
     1258        // NB don't set stopreads or else the next ReadPriv will call ReadDirect
     1259        // which, if there's any readahead, will cause data to be returned out
     1260        // of sequence
    12311261        rwlock.unlock();
    12321262        return 0;
    12331263    }
  • mythtv/libs/libmythtv/ringbuffer.h

    diff --git a/mythtv/libs/libmythtv/ringbuffer.h b/mythtv/libs/libmythtv/ringbuffer.h
    index 45bd956..d7632ff 100644
    a b class MTV_PUBLIC RingBuffer : protected MThread 
    167167
    168168    int ReadBufFree(void) const;
    169169    int ReadBufAvail(void) const;
     170    int ReadBufUsed() const;
     171    bool ReadsAllowed() const;
    170172
    171173    void ResetReadAhead(long long newinternal);
    172174    void KillReadAheadThread(void);