Ticket #10658: 10658-v2.patch

File 10658-v2.patch, 13.7 KB (added by danielk, 8 years ago)

Rework which allows short reads, but still avoids ever returning 0 bytes.

  • mythtv/libs/libmythtv/avfringbuffer.cpp

    diff --git a/mythtv/libs/libmythtv/avfringbuffer.cpp b/mythtv/libs/libmythtv/avfringbuffer.cpp
    index cb1a095..61cd0c3 100644
    a b int AVFRingBuffer::AVF_Read(URLContext *h, uint8_t *buf, int buf_size) 
    1818    AVFRingBuffer *avfr = (AVFRingBuffer *)h->priv_data;
    1919
    2020    if (!avfr)
    21         return 0;
     21        return AVERROR(EBADF);
     22
     23    int ret = avfr->GetRingBuffer()->Read(buf, buf_size);
    2224
    23     return avfr->GetRingBuffer()->Read(buf, buf_size);
     25    return (ret < 0) ? AVERROR(errno) : ret;
    2426}
    2527
    2628int AVFRingBuffer::AVF_Write(URLContext *h, const uint8_t *buf, int buf_size)
    int AVFRingBuffer::AVF_Write(URLContext *h, const uint8_t *buf, int buf_size) 
    2830    AVFRingBuffer *avfr = (AVFRingBuffer *)h->priv_data;
    2931
    3032    if (!avfr)
    31         return 0;
     33        return AVERROR(EBADF);
    3234
    33     return avfr->GetRingBuffer()->Write(buf, buf_size);
     35    int ret = avfr->GetRingBuffer()->Write(buf, buf_size);
     36   
     37    return (ret < 0) ? AVERROR(errno) : ret;
    3438}
    3539
    3640int64_t AVFRingBuffer::AVF_Seek(URLContext *h, int64_t offset, int whence)
    int64_t AVFRingBuffer::AVF_Seek(URLContext *h, int64_t offset, int whence) 
    3842    AVFRingBuffer *avfr = (AVFRingBuffer *)h->priv_data;
    3943
    4044    if (!avfr)
    41         return 0;
     45        return AVERROR(EBADF);
    4246
    4347    if (whence == AVSEEK_SIZE)
    4448        return avfr->GetRingBuffer()->GetRealFileSize();
  • mythtv/libs/libmythtv/fileringbuffer.cpp

    diff --git a/mythtv/libs/libmythtv/fileringbuffer.cpp b/mythtv/libs/libmythtv/fileringbuffer.cpp
    index 07aad39..03cbbe3 100644
    a b long long FileRingBuffer::Seek(long long pos, int whence, bool has_lock) 
    648648            {
    649649                ateof = false;
    650650                readsallowed = false;
     651                readsdesired = false;
     652                recentseek = true;
    651653            }
    652654            readpos = new_pos;
    653655            poslock.unlock();
    long long FileRingBuffer::Seek(long long pos, int whence, bool has_lock) 
    759761            {
    760762                ateof = false;
    761763                readsallowed = false;
     764                readsdesired = false;
     765                recentseek = true;
    762766            }
    763767
    764768            poslock.unlock();
  • mythtv/libs/libmythtv/ringbuffer.cpp

    diff --git a/mythtv/libs/libmythtv/ringbuffer.cpp b/mythtv/libs/libmythtv/ringbuffer.cpp
    index bd6f734..1e8af00 100644
    a b  
    1717#include "ThreadedFileWriter.h"
    1818#include "fileringbuffer.h"
    1919#include "streamingringbuffer.h"
     20#include "mythmiscutil.h"
    2021#include "livetvchain.h"
    2122#include "mythcontext.h"
    2223#include "ringbuffer.h"
    RingBuffer::RingBuffer(RingBufferType rbtype) : 
    191192    startreadahead(false),    readAheadBuffer(NULL),
    192193    readaheadrunning(false),  reallyrunning(false),
    193194    request_pause(false),     paused(false),
    194     ateof(false),             readsallowed(false),
     195    ateof(false),
     196    readsallowed(false),      readsdesired(false),
     197    recentseek(true),
    195198    setswitchtonext(false),
    196199    rawbitrate(8000),         playspeed(1.0f),
    197200    fill_threshold(65536),    fill_min(-1),
    void RingBuffer::CalcReadAheadThresh(void) 
    348351    uint estbitrate = 0;
    349352
    350353    readsallowed   = false;
     354    readsdesired   = false;
    351355    readblocksize  = max(readblocksize, CHUNK);
    352356
    353357    // loop without sleeping if the buffered data is less than this
    void RingBuffer::ResetReadAhead(long long newinternal) 
    484488    internalreadpos = newinternal;
    485489    ateof = false;
    486490    readsallowed = false;
     491    readsdesired = false;
     492    recentseek = true;
    487493    setswitchtonext = false;
    488494    generalWait.wakeAll();
    489495
    void RingBuffer::run(void) 
    906912        bool reads_were_allowed = readsallowed;
    907913
    908914        if ((0 == read_return) || (numfailures > 5) ||
    909             (readsallowed != (used >= fill_min || ateof ||
    910                               setswitchtonext || commserror)))
     915            (readsallowed != (used >= 1 || ateof ||
     916                              setswitchtonext || commserror)) ||
     917            (readsdesired != (used >= fill_min || ateof ||
     918                                setswitchtonext || commserror)))
    911919        {
    912920            // If readpos changes while the lock is released
    913921            // we should not handle the 0 read_return now.
    void RingBuffer::run(void) 
    918926
    919927            commserror |= (numfailures > 5);
    920928
    921             readsallowed = used >= fill_min || ateof ||
     929            readsallowed = used >= 1 || ateof ||
     930                setswitchtonext || commserror;
     931            readsdesired = used >= fill_min || ateof ||
    922932                setswitchtonext || commserror;
    923933
    924934            if (0 == read_return && old_readpos == readpos)
    void RingBuffer::run(void) 
    987997    rbwpos = 0;
    988998    reallyrunning = false;
    989999    readsallowed = false;
     1000    readsdesired = false;
    9901001    delete [] readAheadBuffer;
    9911002
    9921003    readAheadBuffer = NULL;
    int RingBuffer::Peek(void *buf, int count) 
    10201031    return ret;
    10211032}
    10221033
    1023 bool RingBuffer::WaitForReadsAllowed(void)
    1024 {
    1025     MythTimer t;
    1026     t.start();
    1027 
    1028     while (!readsallowed && !stopreads &&
    1029            !request_pause && !commserror && readaheadrunning)
    1030     {
    1031         generalWait.wait(&rwlock, 1000);
    1032         if (!readsallowed && t.elapsed() > 1000)
    1033         {
    1034             LOG(VB_GENERAL, LOG_WARNING, LOC +
    1035                 "Taking too long to be allowed to read..");
    1036 
    1037             if (t.elapsed() > 10000)
    1038             {
    1039                 LOG(VB_GENERAL, LOG_ERR, LOC + "Took more than 10 seconds to "
    1040                                                "be allowed to read, aborting.");
    1041                 return false;
    1042             }
    1043         }
    1044     }
    1045 
    1046     return readsallowed;
    1047 }
    1048 
    1049 bool RingBuffer::WaitForAvail(int count)
     1034int RingBuffer::WaitForAvail(int count, int how_long_ms)
    10501035{
    10511036    int avail = ReadBufAvail();
     1037    if (avail >= count)
     1038        return avail;
     1039
    10521040    count = (ateof && avail < count) ? avail : count;
    10531041
    10541042    if (livetvchain && setswitchtonext && avail < count)
    bool RingBuffer::WaitForAvail(int count) 
    10561044        LOG(VB_GENERAL, LOG_INFO, LOC +
    10571045            "Checking to see if there's a new livetv program to switch to..");
    10581046        livetvchain->ReloadAll();
    1059         return false;
     1047        return avail;
    10601048    }
    10611049
    10621050    // Make sure that if the read ahead thread is sleeping and
    bool RingBuffer::WaitForAvail(int count) 
    10731061           !request_pause && !commserror && readaheadrunning)
    10741062    {
    10751063        wanttoread = count;
    1076         generalWait.wait(&rwlock, 250);
     1064        generalWait.wait(&rwlock, clamp(how_long_ms - t.elapsed(), 10, 250));
    10771065        avail = ReadBufAvail();
    10781066
    1079         if (ateof && avail < count)
    1080             count = avail;
    1081 
    1082         if (avail < count)
    1083         {
    1084             int elapsed = t.elapsed();
    1085             if (elapsed > 500 && low_buffers && avail >= fill_min)
    1086                 count = avail;
    1087             else if  (((elapsed > 250) && (elapsed < 500))  ||
    1088                      ((elapsed >  500) && (elapsed < 750))  ||
    1089                      ((elapsed > 1000) && (elapsed < 1250)) ||
    1090                      ((elapsed > 2000) && (elapsed < 2250)) ||
    1091                      ((elapsed > 4000) && (elapsed < 4250)) ||
    1092                      ((elapsed > 8000) && (elapsed < 8250)) ||
    1093                      ((elapsed > 9000)))
    1094             {
    1095                 LOG(VB_GENERAL, LOG_INFO, LOC + "Waited " +
    1096                     QString("%1").arg((elapsed / 250) * 0.25f, 3, 'f', 1) +
    1097                     " seconds for data \n\t\t\tto become available..." +
    1098                     QString(" %2 < %3") .arg(avail).arg(count));
    1099             }
    1100 
    1101             if (elapsed > 16000)
    1102             {
    1103                 LOG(VB_GENERAL, LOG_ERR, LOC + "Waited " +
    1104                     QString("%1").arg(elapsed/1000) +
    1105                     " seconds for data, aborting.");
    1106                 return false;
    1107             }
    1108         }
     1067        if (ateof)
     1068            break;
     1069        if (low_buffers && avail >= fill_min)
     1070            break;
     1071        if (t.elapsed() > how_long_ms)
     1072            break;
    11091073    }
    11101074
    11111075    wanttoread = 0;
    11121076
    1113     return avail >= count;
     1077    return avail;
    11141078}
    11151079
    11161080int RingBuffer::ReadDirect(void *buf, int count, bool peek)
    int RingBuffer::ReadDirect(void *buf, int count, bool peek) 
    11831147 *  \param buf   Pointer to where data will be written
    11841148 *  \param count Number of bytes to read
    11851149 *  \param peek  If true, don't increment read count
    1186  *  \return Returns number of bytes read
     1150 *
     1151 *  \note If we return a -1, errno will be set as if the C
     1152 *        read function had been called on a file.
     1153 *
     1154 *  \return Returns number of bytes read, or -1 on error
    11871155 */
    11881156int RingBuffer::ReadPriv(void *buf, int count, bool peek)
    11891157{
    int RingBuffer::ReadPriv(void *buf, int count, bool peek) 
    12341202        rwlock.lockForRead();
    12351203    }
    12361204
    1237     if (!WaitForReadsAllowed())
     1205    // Wait up to 10000 ms for reads allowed (or readsdesired if post seek/open)
     1206    bool &check = (recentseek) ? readsdesired : readsallowed;
     1207    recentseek = false;
     1208    MythTimer t; t.start();
     1209    int timeout_ms = 10000;
     1210    while ((t.elapsed() < timeout_ms) && !check && !stopreads &&
     1211           !request_pause && !commserror && readaheadrunning)
     1212    {
     1213        generalWait.wait(&rwlock, clamp(timeout_ms - t.elapsed(), 10, 100));
     1214    }
     1215    if (!readsallowed)
    12381216    {
    1239         LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForReadsAllowed()");
     1217        LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
     1218            QString(" -- timed out waiting for readsallowed (%1 ms)")
     1219            .arg(t.elapsed()));
     1220
    12401221        rwlock.unlock();
    12411222        stopreads = true; // this needs to be outside the lock
    12421223        rwlock.lockForWrite();
    12431224        wanttoread = 0;
    12441225        rwlock.unlock();
    1245         return 0;
     1226        errno = EIO;
     1227        return -1;
     1228    }
     1229    if (t.elapsed() > 2000)
     1230    {
     1231        LOG(VB_GENERAL, LOG_INFO, LOC + loc_desc +
     1232            QString(" -- waited %1 ms for readsdesired or readsallowed")
     1233            .arg(t.elapsed()));
    12461234    }
    12471235
    1248     if (!WaitForAvail(count))
     1236    // Wait up to 100 ms for reads preferred (at least fill_min of data)
     1237    timeout_ms = 100;
     1238    while ((t.elapsed() < timeout_ms) && !readsallowed && !stopreads &&
     1239           !request_pause && !commserror && readaheadrunning && !ateof)
    12491240    {
    1250         LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForAvail()");
    1251         rwlock.unlock();
    1252         stopreads = true; // this needs to be outside the lock
    1253         rwlock.lockForWrite();
    1254         ateof = true;
    1255         wanttoread = 0;
    1256         rwlock.unlock();
    1257         return 0;
     1241        generalWait.wait(&rwlock, clamp(timeout_ms - t.elapsed(), 10, 100));
     1242    }
     1243
     1244    // Wait up to 100 ms for count data
     1245    timeout_ms = 100;
     1246    int avail = WaitForAvail(count, max(timeout_ms - t.elapsed(), 1));
     1247       
     1248    // Wait up to 10000 ms for any data
     1249    timeout_ms = 10000;
     1250    while ((0 == avail) && (t.elapsed() < timeout_ms) && !stopreads &&
     1251           !request_pause && !commserror && readaheadrunning)
     1252    {
     1253        avail = WaitForAvail(count, max(timeout_ms - t.elapsed(), 1));
     1254    }
     1255    if (t.elapsed() > 2000)
     1256    {
     1257        LOG(VB_GENERAL, LOG_INFO, LOC + loc_desc +
     1258            QString(" -- waited %1 ms for avail(%2) > count(%3)")
     1259            .arg(t.elapsed()).arg(avail).arg(count));
    12581260    }
    12591261
    1260     count = min(ReadBufAvail(), count);
     1262    // Read at most the available number of bytes..
     1263    count = min(avail, count);
    12611264
    1262     if (count <= 0)
     1265    if ((count == 0) && ateof)
     1266    {
     1267        // If we're at the end of file return 0 bytes
     1268        rwlock.unlock();
     1269        return 0;
     1270    }
     1271    else if (count == 0)
    12631272    {
    1264         // this can happen under a few conditions but the most
    1265         // notable is an exit from the read ahead thread or
    1266         // the end of the file stream has been reached.
    1267         LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": ReadBufAvail() == 0");
     1273        // If we're not at the end of file but have no data
     1274        // at this point time out and shutdown read ahead.
     1275        LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
     1276            QString(" -- timed out waiting for data (%1 ms)")
     1277            .arg(t.elapsed()));
     1278
     1279        rwlock.unlock();
     1280        stopreads = true; // this needs to be outside the lock
     1281        rwlock.lockForWrite();
     1282        wanttoread = 0;
    12681283        rwlock.unlock();
    1269         return count;
     1284        errno = EIO;
     1285        return -1;
    12701286    }
    12711287
    12721288    if (peek)
    int RingBuffer::Read(void *buf, int count) 
    13181334        poslock.lockForWrite();
    13191335        readpos += ret;
    13201336        poslock.unlock();
     1337        UpdateDecoderRate(ret);
    13211338    }
    13221339
    1323     UpdateDecoderRate(ret);
    13241340    return ret;
    13251341}
    13261342
  • mythtv/libs/libmythtv/ringbuffer.h

    diff --git a/mythtv/libs/libmythtv/ringbuffer.h b/mythtv/libs/libmythtv/ringbuffer.h
    index f208aed..c489cc8 100644
    a b class MTV_PUBLIC RingBuffer : protected MThread 
    165165
    166166    int ReadPriv(void *buf, int count, bool peek);
    167167    int ReadDirect(void *buf, int count, bool peek);
    168     bool WaitForReadsAllowed(void);
    169     bool WaitForAvail(int count);
     168    int WaitForAvail(int count, int how_long_ms);
    170169
    171170    int ReadBufFree(void) const;
    172171    int ReadBufAvail(void) const;
    class MTV_PUBLIC RingBuffer : protected MThread 
    218217    bool      paused;             // protected by rwlock
    219218    bool      ateof;              // protected by rwlock
    220219    bool      readsallowed;       // protected by rwlock
     220    bool      readsdesired;       // protected by rwlock
     221    volatile bool recentseek;
    221222    bool      setswitchtonext;    // protected by rwlock
    222223    uint      rawbitrate;         // protected by rwlock
    223224    float     playspeed;          // protected by rwlock