Ticket #10019: 0001-ringbuffer-Adapt-readahead-for-low-bit-rate-64kbps-r.patch

File 0001-ringbuffer-Adapt-readahead-for-low-bit-rate-64kbps-r.patch, 16.3 KB (added by Lawrence Rust <lvr@…>, 13 years ago)
  • mythtv/libs/libmythtv/fileringbuffer.cpp

    From 7fd9d945f0e0abaf2f381adb9e763a3e9ee6b78b Mon Sep 17 00:00:00 2001
    From: Lawrence Rust <lvr@softsystem.co.uk>
    Date: Sun, 31 Jul 2011 17:14:52 +0200
    Subject: [PATCH 1/9] ringbuffer: Adapt readahead for low bit rate (64kbps radio) streams
    
    - The existing readahead block size is too large for 64/128 kBps audio
    streams and can cause audio underruns.
    
    - Make WaitFailAvail return the bytes available if less than that requested
    for low bit rate streams (where fill_min is < 32kB).
    
    - Make fileringbuffer::safe_read retry if reading from a remote file. Low
    bit rate sources such as radio can have considerable latency.
    
    Signed-off-by: Lawrence Rust <lvr@softsystem.co.uk>
    ---
     mythtv/libs/libmythtv/fileringbuffer.cpp |   48 +++++---
     mythtv/libs/libmythtv/ringbuffer.cpp     |  193 ++++++++++++++++--------------
     mythtv/libs/libmythtv/ringbuffer.h       |    2 +
     3 files changed, 138 insertions(+), 105 deletions(-)
    
    diff --git a/mythtv/libs/libmythtv/fileringbuffer.cpp b/mythtv/libs/libmythtv/fileringbuffer.cpp
    index a3b73f3..f4b90d7 100644
    a b bool FileRingBuffer::OpenFile(const QString &lfilename, uint retry_ms) 
    356356    commserror = false;
    357357    numfailures = 0;
    358358
    359     rawbitrate = 8000;
     359    // The initial bitrate needs to be set with consideration for low bit rate
     360    // streams (e.g. radio @ 64Kbps) such that fill_min bytes are received
     361    // in a reasonable time period to enable decoders to peek the first few KB
     362    // to determine type & settings.
     363    if (is_local)
     364        rawbitrate = 256; // Allow for radio
     365    else
     366        rawbitrate = 128; // remotefile
     367
    360368    CalcReadAheadThresh();
    361369
    362370    bool ok = fd2 >= 0 || remotefile;
    int FileRingBuffer::safe_read(int fd, void *data, uint sz) 
    458466 */
    459467int FileRingBuffer::safe_read(RemoteFile *rf, void *data, uint sz)
    460468{
    461     int ret = rf->Read(data, sz);
    462     if (ret < 0)
    463     {
    464         LOG(VB_GENERAL, LOG_ERR, LOC +
    465             "safe_read(RemoteFile* ...): read failed");
    466            
    467         poslock.lockForRead();
    468         rf->Seek(internalreadpos - readAdjust, SEEK_SET);
    469         poslock.unlock();
    470         numfailures++;
    471     }
    472     else if (ret == 0)
     469    for (int retries = 0; ; ++retries)
    473470    {
    474         LOG(VB_FILE, LOG_INFO, LOC +
    475             "safe_read(RemoteFile* ...): at EOF");
     471        int ret = rf->Read(data, sz);
     472        if (ret > 0)
     473            return ret;
     474        else if (ret < 0)
     475        {
     476            LOG(VB_GENERAL, LOG_ERR, LOC +
     477                "safe_read(RemoteFile* ...): read failed");
     478
     479            poslock.lockForRead();
     480            rf->Seek(internalreadpos - readAdjust, SEEK_SET);
     481            poslock.unlock();
     482            numfailures++;
     483            return ret;
     484        }
     485        // Retry for 300mS if liveTV for low bit rate (radio) streams
     486        else if (!livetvchain || retries >= 5)
     487            break;
     488
     489        usleep(60000);
    476490    }
    477491
    478     return ret;
     492    LOG(VB_FILE, LOG_INFO, LOC +
     493        "safe_read(RemoteFile* ...): at EOF");
     494    return 0;
    479495}
    480496
    481497long long FileRingBuffer::GetReadPosition(void) const
  • mythtv/libs/libmythtv/ringbuffer.cpp

    diff --git a/mythtv/libs/libmythtv/ringbuffer.cpp b/mythtv/libs/libmythtv/ringbuffer.cpp
    index 1caacef..cd3b6ce 100644
    a b void RingBuffer::UpdateRawBitrate(uint raw_bitrate) 
    289289{
    290290    LOG(VB_FILE, LOG_INFO, LOC +
    291291        QString("UpdateRawBitrate(%1Kb)").arg(raw_bitrate));
    292     if (raw_bitrate < 2500)
     292    // NB DVB-S radio can be 64kbps
     293    if (raw_bitrate < 64)
    293294    {
    294295        LOG(VB_FILE, LOG_INFO, LOC +
    295296            QString("UpdateRawBitrate(%1Kb) - ignoring bitrate,")
    void RingBuffer::SetBufferSizeFactors(bool estbitrate, bool matroska) 
    339340 */
    340341void RingBuffer::CalcReadAheadThresh(void)
    341342{
    342     uint estbitrate = 0;
    343 
    344343    readsallowed   = false;
    345344    readblocksize  = max(readblocksize, CHUNK);
    346345
    347     // loop without sleeping if the buffered data is less than this
    348     fill_threshold = 7 * bufferSize / 8;
    349 
    350     const uint KB32  =  32*1024;
    351     const uint KB64  =  64*1024;
    352     const uint KB128 = 128*1024;
    353     const uint KB256 = 256*1024;
    354     const uint KB512 = 512*1024;
    355 
    356     estbitrate     = (uint) max(abs(rawbitrate * playspeed),
     346    uint estbitrate = (uint) max(abs(rawbitrate * playspeed),
    357347                                0.5f * rawbitrate);
    358     estbitrate     = min(rawbitrate * 3, estbitrate);
    359     int rbs        = (estbitrate > 2500)  ? KB64  : KB32;
    360     rbs            = (estbitrate > 5000)  ? KB128 : rbs;
    361     rbs            = (estbitrate > 9000)  ? KB256 : rbs;
    362     rbs            = (estbitrate > 18000) ? KB512 : rbs;
    363     readblocksize  = max(rbs,readblocksize);
     348    estbitrate      = min(rawbitrate * 3, estbitrate);
     349
     350    int const KB1 = 1024;
     351    int const rbs = estbitrate > 18000 ? 512*KB1 :
     352                    estbitrate > 9000  ? 256*KB1 :
     353                    estbitrate > 5000  ? 128*KB1 :
     354                    estbitrate > 2500  ? 64*KB1 :
     355                    estbitrate >  250  ? 32*KB1 : // 32KB~=0.25s @ 1Mbps
     356                                         16*KB1 ;
     357     if (rbs < CHUNK)
     358         readblocksize = rbs;
     359     else
     360         readblocksize = max(rbs,readblocksize);
    364361
    365362    // minumum seconds of buffering before allowing read
    366     float secs_min = 0.25;
     363    float const secs_min = 0.25f;
    367364    // set the minimum buffering before allowing ffmpeg read
    368     fill_min        = (uint) ((estbitrate * secs_min) * 0.125f);
    369     // make this a multiple of ffmpeg block size..
    370     fill_min        = ((fill_min / KB32) + 1) * KB32;
     365    fill_min = (uint) (estbitrate * ((KB1 / 8) * secs_min));
     366    if (fill_min < readblocksize)
     367        fill_min = readblocksize;
     368    if (fill_min > CHUNK)
     369        fill_min = ((fill_min + CHUNK - 1) / CHUNK) * CHUNK;
     370    if ((uint)fill_min >= bufferSize)
     371        fill_min = bufferSize - 1;
     372
     373    // loop without sleeping if the buffered data is less than this
     374    fill_threshold = max((uint)fill_min, bufferSize / 8);
    371375
    372376    LOG(VB_FILE, LOG_INFO, LOC +
    373377        QString("CalcReadAheadThresh(%1 Kb)\n\t\t\t -> "
    bool RingBuffer::IsNearEnd(double fps, uint vvf) const 
    389393    // WARNING: readahead_frames can greatly overestimate or underestimate
    390394    //          the number of frames available in the read ahead buffer
    391395    //          when rh_frames is less than the keyframe distance.
     396    if (fps == 0.)
     397        return false;
    392398    double bytes_per_frame = kbits_per_sec * (1000.0/8.0) / fps;
     399    if (bytes_per_frame == 0.)
     400        return false;
    393401    double readahead_frames = sz / bytes_per_frame;
    394402
    395403    bool near_end = ((vvf + readahead_frames) < 10.0) || (sz < rbs*1.5);
    int RingBuffer::ReadBufAvail(void) const 
    428436    return ret;
    429437}
    430438
     439inline int RingBuffer::ReadBufUsed() const
     440{
     441    return (bufferSize - 1) - ReadBufFree();
     442}
     443
     444inline bool RingBuffer::ReadsAllowed() const
     445{
     446    return ateof || setswitchtonext || commserror ||
     447        // Ensure some hysteresis around fill_min
     448        ReadBufUsed() >= (readsallowed ? 1 : fill_min);
     449}
     450
    431451/** \fn RingBuffer::ResetReadAhead(long long)
    432452 *  \brief Restart the read-ahead thread at the 'newinternal' position.
    433453 *
    void RingBuffer::run(void) 
    780800                readtimeavg = (readtimeavg * 9 + readinterval) / 10;
    781801
    782802                if (readtimeavg < 150 &&
    783                     (uint)readblocksize < (BUFFER_SIZE_MINIMUM >>2))
     803                    (uint)readblocksize < (BUFFER_SIZE_MINIMUM >>2) &&
     804                    readblocksize >= CHUNK)
    784805                {
    785806                    int old_block_size = readblocksize;
    786807                    readblocksize = 3 * readblocksize / 2;
    void RingBuffer::run(void) 
    869890            }
    870891        }
    871892
    872         int used = bufferSize - ReadBufFree();
    873 
    874893        bool reads_were_allowed = readsallowed;
    875894
    876         if ((0 == read_return) || (numfailures > 5) ||
    877             (readsallowed != (used >= fill_min || ateof ||
    878                               setswitchtonext || commserror)))
     895        if ((0 == read_return) || (numfailures > 5) || ReadsAllowed() != readsallowed)
    879896        {
    880897            // If readpos changes while the lock is released
    881898            // we should not handle the 0 read_return now.
    void RingBuffer::run(void) 
    886903
    887904            commserror |= (numfailures > 5);
    888905
    889             readsallowed = used >= fill_min || ateof ||
    890                 setswitchtonext || commserror;
     906            bool bReadsAllowed = ReadsAllowed();
     907            if (readsallowed != bReadsAllowed)
     908            {
     909                readsallowed = bReadsAllowed;
     910                LOG(VB_FILE, LOG_INFO, LOC + (bReadsAllowed ?
     911                    QString("Reads allowed: %1 bytes available").arg(ReadBufUsed()) :
     912                    QString("Rebuffering %1..%2").arg(ReadBufUsed()).arg(fill_min)) );
     913            }
    891914
    892915            if (0 == read_return && old_readpos == readpos)
    893916            {
    void RingBuffer::run(void) 
    910933
    911934            rwlock.unlock();
    912935            rwlock.lockForRead();
    913             used = bufferSize - ReadBufFree();
    914936        }
    915937
    916938        LOG(VB_FILE, LOG_DEBUG, LOC + "@ end of read ahead loop");
    917939
    918940        if (!readsallowed || commserror || ateof || setswitchtonext ||
    919             (wanttoread <= used && wanttoread > 0))
     941            (wanttoread <= ReadBufUsed() && wanttoread > 0))
    920942        {
    921943            // To give other threads a good chance to handle these
    922944            // conditions, even if they are only requesting a read lock
    void RingBuffer::run(void) 
    930952        {
    931953            // yield if we have nothing to do...
    932954            if (!request_pause && reads_were_allowed &&
    933                 (used >= fill_threshold || ateof || setswitchtonext))
     955                (ReadBufUsed() >= fill_threshold || ateof || setswitchtonext || ignoreliveeof))
    934956            {
    935957                generalWait.wait(&rwlock, 50);
    936958            }
    void RingBuffer::run(void) 
    940962                generalWait.wakeAll();
    941963                rwlock.unlock();
    942964                usleep(5 * 1000);
    943                 rwlock.lockForRead();           
     965                rwlock.lockForRead();
    944966            }
    945967        }
    946968    }
    bool RingBuffer::WaitForReadsAllowed(void) 
    9961018    while (!readsallowed && !stopreads &&
    9971019           !request_pause && !commserror && readaheadrunning)
    9981020    {
    999         generalWait.wait(&rwlock, 1000);
    1000         if (!readsallowed && t.elapsed() > 1000)
     1021        // The timeout should allow for congestion of internet streamed media
     1022        if (t.elapsed() >= 30000)
    10011023        {
    1002             LOG(VB_GENERAL, LOG_WARNING, LOC +
    1003                 "Taking too long to be allowed to read..");
    1004 
    1005             if (t.elapsed() > 10000)
    1006             {
    1007                 LOG(VB_GENERAL, LOG_ERR, LOC + "Took more than 10 seconds to "
    1008                                                "be allowed to read, aborting.");
    1009                 return false;
    1010             }
     1024            LOG(VB_GENERAL, LOG_ERR, LOC +
     1025                QString("Waited %1 seconds to be allowed to read, aborting.")
     1026                .arg(t.elapsed()/1000) );
     1027            return false;
    10111028        }
     1029
     1030        generalWait.wait(&rwlock, 250);
    10121031    }
    10131032
    1014     return readsallowed;
     1033    if (t.elapsed() >= 500)
     1034    {
     1035        LOG(VB_GENERAL, LOG_WARNING, LOC +
     1036            QString("Waited %1 mS to be allowed to read (avail=%2 fill_min=%3)..")
     1037            .arg(t.elapsed()).arg(ReadBufAvail()).arg(fill_min) );
     1038    }
     1039    return true;
    10151040}
    10161041
    10171042bool RingBuffer::WaitForAvail(int count)
    bool RingBuffer::WaitForAvail(int count) 
    10351060        generalWait.wakeAll();
    10361061    }
    10371062
    1038     MythTimer t;
    1039     t.start();
    1040     while ((avail < count) && !stopreads &&
    1041            !request_pause && !commserror && readaheadrunning)
     1063    MythTimer t; t.start();
     1064    wanttoread = count;
     1065    while (avail < count && !stopreads && !request_pause &&
     1066            !commserror && readaheadrunning)
    10421067    {
    1043         wanttoread = count;
    1044         generalWait.wait(&rwlock, 250);
    1045         avail = ReadBufAvail();
    1046 
    1047         if (ateof && avail < count)
    1048             count = avail;
    1049 
    1050         if (avail < count)
     1068        uint elapsed = t.elapsed();
     1069        if (elapsed >= 10000)
    10511070        {
    1052             int elapsed = t.elapsed();
    1053             if  (((elapsed > 250)  && (elapsed < 500))  ||
    1054                  ((elapsed > 500)  && (elapsed < 750))  ||
    1055                  ((elapsed > 1000) && (elapsed < 1250)) ||
    1056                  ((elapsed > 2000) && (elapsed < 2250)) ||
    1057                  ((elapsed > 4000) && (elapsed < 4250)) ||
    1058                  ((elapsed > 8000) && (elapsed < 8250)) ||
    1059                  ((elapsed > 9000)))
    1060             {
    1061                 LOG(VB_GENERAL, LOG_INFO, LOC + "Waited " +
    1062                     QString("%1").arg((elapsed / 250) * 0.25f, 3, 'f', 1) +
    1063                     " seconds for data \n\t\t\tto become available..." +
    1064                     QString(" %2 < %3") .arg(avail).arg(count));
    1065             }
    1066 
    1067             if (elapsed > 16000)
    1068             {
    1069                 LOG(VB_GENERAL, LOG_ERR, LOC + "Waited " +
    1070                     QString("%1").arg(elapsed/1000) +
    1071                     " seconds for data, aborting.");
    1072                 return false;
    1073             }
     1071            LOG(VB_GENERAL, LOG_ERR, LOC +
     1072                QString("Timed out waiting for data available (wanted=%1, avail=%2)")
     1073                .arg(count).arg(avail) );
     1074            break;
    10741075        }
     1076        else if (elapsed >= 100 && avail)
     1077        {
     1078            LOG(VB_GENERAL, LOG_INFO, LOC +
     1079                QString("Waited %1 mS for %2 bytes (wanted %3)")
     1080                .arg(elapsed).arg(avail).arg(count) );
     1081            count = avail;
     1082            generalWait.wakeAll();
     1083            break;
     1084        }
     1085
     1086        generalWait.wait(&rwlock, 100);
     1087        avail = ReadBufAvail();
    10751088    }
    10761089
    10771090    wanttoread = 0;
    int RingBuffer::ReadDirect(void *buf, int count, bool peek) 
    11331146        if (new_pos != old_pos)
    11341147        {
    11351148            LOG(VB_GENERAL, LOG_ERR, LOC +
    1136                 QString("Peek() Failed to return from new "
     1149                QString("Seek() Failed to return from new "
    11371150                        "position %1 to old position %2, now "
    11381151                        "at position %3")
    11391152                    .arg(old_pos - ret).arg(old_pos).arg(new_pos));
    int RingBuffer::ReadDirect(void *buf, int count, bool peek) 
    11531166 */
    11541167int RingBuffer::ReadPriv(void *buf, int count, bool peek)
    11551168{
    1156     QString loc_desc = QString("ReadPriv(..%1, %2)")
     1169    const QString loc_desc = QString("ReadPriv(..%1, %2)")
    11571170        .arg(count).arg(peek?"peek":"normal");
    1158     LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
    1159         QString(" @%1 -- begin").arg(rbrpos));
    11601171
    11611172    rwlock.lockForRead();
     1173
     1174    LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
     1175        QString(" @%1 avail=%2 -- begin").arg(rbrpos).arg(ReadBufAvail()));
     1176
    11621177    if (writemode)
    11631178    {
    11641179        LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
    int RingBuffer::ReadPriv(void *buf, int count, bool peek) 
    11891204        if (request_pause || stopreads ||
    11901205            !readaheadrunning || (ignorereadpos >= 0))
    11911206        {
     1207            LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + " -- direct read");
    11921208            int ret = ReadDirect(buf, count, peek);
    11931209            LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
    11941210                QString(": ReadDirect checksum %1")
    int RingBuffer::ReadPriv(void *buf, int count, bool peek) 
    12031219    if (!WaitForReadsAllowed())
    12041220    {
    12051221        LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForReadsAllowed()");
    1206         rwlock.unlock();
    1207         stopreads = true; // this needs to be outside the lock
    1208         rwlock.lockForWrite();
    1209         wanttoread = 0;
     1222        // NB don't set stopreads or else the next ReadPriv will call ReadDirect
     1223        // which, if there's any readahead, will cause data to be returned out
     1224        // of sequence
    12101225        rwlock.unlock();
    12111226        return 0;
    12121227    }
  • mythtv/libs/libmythtv/ringbuffer.h

    diff --git a/mythtv/libs/libmythtv/ringbuffer.h b/mythtv/libs/libmythtv/ringbuffer.h
    index f551ecd..8dc633d 100644
    a b class MTV_PUBLIC RingBuffer : protected MThread 
    166166
    167167    int ReadBufFree(void) const;
    168168    int ReadBufAvail(void) const;
     169    int ReadBufUsed() const;
     170    bool ReadsAllowed() const;
    169171
    170172    void ResetReadAhead(long long newinternal);
    171173    void KillReadAheadThread(void);