Ticket #6330: locklesstfw-v1.diff

File locklesstfw-v1.diff, 26.6 KB (added by Matthias "mortalmatt" Dahl, 11 years ago)

ThreadedFileWriterLockless? v1

  • mythtv/libs/libmythtv/ThreadedFileWriterLockless.h

     
     1#ifndef TFW_LOCKLESS_H_
     2#define TFW_LOCKLESS_H_
     3
     4#include <pthread.h>
     5
     6#include <QAtomicInt>
     7#include <QAtomicPointer>
     8#include <QMutex>
     9#include <QWaitCondition>
     10#include <QString>
     11
     12class ThreadedFileWriterLockless
     13{
     14  public:
     15    ThreadedFileWriterLockless(const QString &fname, int flags, mode_t mode);
     16    ~ThreadedFileWriterLockless();
     17
     18    bool Open(void);
     19    long long Seek(long long pos, int whence);
     20    uint Write(const void *data, uint count);
     21
     22    void SetWriteBufferSize(uint newSize = DEFAULT_BUFFER_SIZE);
     23    void SetWriteBufferMinWriteSize(uint newMinSize = MIN_WRITE_SIZE);
     24
     25    uint BufUsed(void) const;
     26    uint BufFree(void) const;
     27
     28    void Sync(void);
     29    void Flush(void);
     30
     31    static uint safe_write(int fd, const void *data, uint count, bool &ok);
     32
     33  protected:
     34    static void *BootWriter(void *);
     35    void DiskWriterLoop(void);
     36
     37    long long WriteFwdFree(const char* readPtr, const char* writePtr) const;
     38    long long ReadFwdFree(const char* readPtr, const char* writePtr) const;
     39    long long WriteFwdContinuousFree(const char* readPtr, const char* writePtr) const;
     40    long long ReadFwdContinuousFree(const char* readPtr, const char* writePtr) const;
     41
     42    void SetupBuffer(uint reqBufferSize);
     43
     44  private:
     45    // file handling related
     46    QString         fileName;
     47    int             fileFlags;
     48    mode_t          fileMode;
     49    int             fileFD;
     50    QAtomicInt      fileMinWriteSize; // IMPORTANT CHANGE: kilobytes
     51    long long       fileBytesWritten;
     52
     53    // state flags
     54    QAtomicInt  isFlushing;
     55    QAtomicInt  isWriterRunning;
     56    QAtomicInt  isResettingBuffer;
     57    QAtomicInt  isInDestructor;
     58    QAtomicInt  isIgnoringWritesToBuffer;
     59    QAtomicInt  isIgnoringWritesToDisk;
     60
     61    // locks and wait conditions
     62    mutable QMutex  bufferWriterLock;
     63    mutable QMutex  diskWriterLock;
     64
     65    QWaitCondition  bufferEmpty;
     66    QWaitCondition  bufferHasData;
     67    QWaitCondition  bufferWroteData;
     68
     69    // buffer and related
     70    char                *buffer;
     71    char                *bufferEnd;
     72    unsigned long        bufferSize;
     73    QAtomicPointer<char> bufferReadPtr;
     74    QAtomicPointer<char> bufferWritePtr;
     75    QAtomicPointer<char> bufferAheadWritePtr;
     76    QAtomicInt           bufferReservationSlot;
     77    QAtomicInt           bufferReservationNextActiveSlot;
     78
     79    // threads
     80    pthread_t       writer;
     81
     82  private:
     83    // constants
     84    /// default buffer size
     85    static const uint DEFAULT_BUFFER_SIZE;
     86    /// maximum chunk size to write to disk in one go
     87    static const uint MAX_WRITE_SIZE;
     88    /// minimum chunk size to write to disk in one go (except when flushing)
     89    static const uint MIN_WRITE_SIZE;
     90    /// number of extra bytes to reserve before and after the actual buffer
     91    static const uint BUFFER_CUSHION;
     92    /// maximum write errors for a single call to safe_write before giving up
     93    static const uint MAX_WRITE_ERRORS;
     94};
     95#endif
  • mythtv/libs/libmythtv/libmythtv.pro

     
    247247SOURCES += dtvmultiplex.cpp
    248248SOURCES += dtvconfparser.cpp        dtvconfparserhelpers.cpp
    249249
     250exists( $$[QT_INSTALL_HEADERS]/QtCore/QAtomicPointer ):exists( $$[QT_INSTALL_HEADERS]/QtCore/QAtomicInt ) {
     251    HEADERS -= ThreadedFileWriter.h
     252    HEADERS += ThreadedFileWriterLockless.h
     253    SOURCES -= ThreadedFileWriter.cpp
     254    SOURCES += ThreadedFileWriterLockless.cpp
     255}
     256
    250257using_frontend {
    251258    # Recording profile stuff
    252259    HEADERS += profilegroup.h
  • mythtv/libs/libmythtv/ThreadedFileWriterLockless.cpp

     
     1// ANSI C headers
     2#include <cstdio>
     3#include <cstdlib>
     4#include <cerrno>
     5#include <ctime>
     6#include <climits>
     7
     8// Unix C headers
     9#include <sys/types.h>
     10#include <sys/stat.h>
     11#include <unistd.h>
     12#include <signal.h>
     13#include <fcntl.h>
     14
     15// MythTV headers
     16#include "ThreadedFileWriterLockless.h"
     17#include "mythcontext.h"
     18#include "compat.h"
     19#include "mythverbose.h"
     20
     21#if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0
     22#define HAVE_FDATASYNC
     23#endif
     24
     25#define LOC QString("TFWL: ")
     26#define LOC_ERR QString("TFWL, Error: ")
     27
     28const uint ThreadedFileWriterLockless::DEFAULT_BUFFER_SIZE = 2*1024*1024;
     29const uint ThreadedFileWriterLockless::MAX_WRITE_SIZE      = DEFAULT_BUFFER_SIZE / 4;
     30const uint ThreadedFileWriterLockless::MIN_WRITE_SIZE      = DEFAULT_BUFFER_SIZE / 1024 / 32;
     31const uint ThreadedFileWriterLockless::BUFFER_CUSHION      = 512;
     32const uint ThreadedFileWriterLockless::MAX_WRITE_ERRORS    = 10;
     33
     34/** \class ThreadedFileWriterLockless
     35 *  \brief A (almost) lock-free threaded disk writer that buffers data written
     36 *         to it and writes it to the disk in a seperate thread.
     37 *
     38 *  This class implements a threaded ringbuffer. All data written via ::Write()
     39 *  is inserted into the ringbuffer while a seperate thread reads the data from
     40 *  the buffer and writes it to disk.
     41 *
     42 *  The implementation is thread-safe and thus also reentrant.
     43 */
     44
     45/** \fn ThreadedFileWriterLockless::safe_write(int fd, const void *data, uint count, bool &ok)
     46 *  \brief Writes all data to disk and retries in case of errors.
     47 *
     48 *  The standard POSIX write() can return early with no or just a portion of
     49 *  the desired data written to disk. This function tries to make sure that
     50 *  all data gets written to disk by...
     51 *
     52 *    1) endlessly retrying in case of EAGAIN or EINTR errors
     53 *    2) retrying up to MAX_WRITE_ERRORS times in case of other errors
     54 *    3) writing out the rest amount of data until no data is left
     55 *
     56 *  \param fd    File descriptor
     57 *  \param data  Pointer to data to write
     58 *  \param count Size of data to write in bytes
     59 *
     60 *  \return Number of written bytes
     61 */
     62uint ThreadedFileWriterLockless::safe_write(int fd, const void *data,
     63                                            uint count, bool &ok)
     64{
     65    uint bytesWritten = 0;
     66    uint errors       = 0;
     67
     68    while(bytesWritten < count && errors < MAX_WRITE_ERRORS)
     69    {
     70        int ret = write(fd, (char*)data + bytesWritten, count - bytesWritten);
     71
     72        if(ret >= 0)
     73            bytesWritten += ret;
     74        else if(errno != EAGAIN || errno != EINTR)
     75            ++errors;
     76    }
     77
     78    if(errors < MAX_WRITE_ERRORS)
     79        ok = true;
     80    else
     81        ok = false;
     82
     83    return bytesWritten;
     84}
     85
     86/** \fn ThreadedFileWriterLockless::BootWriter(void *tfw)
     87 *  \brief Helper function which simply runs the DiskWriterLoop().
     88 *
     89 *  \param tfw Pointer to ThreadedFileWriterLockless instance
     90 */
     91void *ThreadedFileWriterLockless::BootWriter(void *tfw)
     92{
     93#ifndef USING_MINGW
     94    signal(SIGXFSZ, SIG_IGN);
     95#endif
     96    ((ThreadedFileWriterLockless *)tfw)->DiskWriterLoop();
     97    return NULL;
     98}
     99
     100/** \fn ThreadedFileWriterLockless::ThreadedFileWriterLockless(const QString &fname, int flags, mode_t mode)
     101 *  \brief Constructor
     102 *
     103 *  The constructor sets the filename, the access/creation/status flags and
     104 *  the file permissions. It does not create the file nor does it start the
     105 *  writer thread.
     106 *
     107 *  \param fname  Filename
     108 *  \param flags  Access/Creation/Status flags (see POSIX open())
     109 *  \param mode   File permissions (see POSIX open())
     110 */
     111ThreadedFileWriterLockless::ThreadedFileWriterLockless(const QString &fname,
     112                                                       int flags, mode_t mode) :
     113    // file handling related
     114    fileName(fname), fileFlags(flags), fileMode(mode), fileFD(-1),
     115    fileMinWriteSize(0), fileBytesWritten(0),
     116    // state flags
     117    isFlushing(0), isWriterRunning(0), isResettingBuffer(0),  isInDestructor(0),
     118    isIgnoringWritesToBuffer(0), isIgnoringWritesToDisk(0),
     119    // buffer and related
     120    buffer(NULL), bufferEnd(NULL), bufferSize(0), bufferReadPtr(NULL),
     121    bufferWritePtr(NULL), bufferAheadWritePtr(NULL),
     122    bufferReservationSlot(0), bufferReservationNextActiveSlot(0)
     123{
     124    fileName.detach();
     125}
     126
     127/** \fn ThreadedFileWriterLockless::Open(void)
     128 *  \brief Opens the file we will be writing to and starts the DiskWriter thread.
     129 *
     130 *  \return TRUE if file was opened successfully and DiskWriter thread started
     131 */
     132bool ThreadedFileWriterLockless::Open(void)
     133{
     134    // TODO: don't fail with an error, just flush the buffer and end the writer
     135    if(isWriterRunning)
     136    {
     137        VERBOSE(VB_IMPORTANT, LOC_ERR +
     138                QString("Opening file '%1' failed. Writer already running")
     139                .arg(fileName));
     140
     141        return false;
     142    }
     143
     144    isFlushing               = 0;
     145    isResettingBuffer        = 0;
     146    isInDestructor           = 0;
     147    isIgnoringWritesToBuffer = 0;
     148    isIgnoringWritesToDisk   = 0;
     149
     150    if (fileName == "-")
     151        fileFD = fileno(stdout);
     152    else
     153        fileFD = open(fileName.toAscii().constData(), fileFlags, fileMode);
     154
     155    if (fileFD < 0)
     156    {
     157        VERBOSE(VB_IMPORTANT, LOC_ERR +
     158                QString("Opening file '%1'.").arg(fileName) + ENO);
     159        return false;
     160    }
     161    else
     162    {
     163        SetupBuffer(DEFAULT_BUFFER_SIZE);
     164        fileMinWriteSize = MIN_WRITE_SIZE;
     165
     166        pthread_create(&writer, NULL, BootWriter, this);
     167/*
     168        if(gContext->GetNumSetting("RealtimePriority", 1))
     169        {
     170            // thread priority in respect to SCHED_RR: 1
     171            struct sched_param sp = {2};
     172
     173            if(!pthread_setschedparam(writer, SCHED_RR, &sp))
     174            {
     175                VERBOSE(VB_GENERAL, LOC +
     176                        "Using realtime priority.");
     177            }
     178            else
     179            {
     180                VERBOSE(VB_IMPORTANT, LOC_ERR +
     181                        "Realtime priority requires sufficient user priviledges.");
     182            }
     183        }
     184*/
     185        return true;
     186    }
     187}
     188
     189/** \fn ThreadedFileWriterLockless::~ThreadedFileWriterLockless()
     190 *  \brief Destructor which commits all writes, ends the DiskWriter thread
     191 *         and closes the file.
     192 */
     193ThreadedFileWriterLockless::~ThreadedFileWriterLockless()
     194{
     195    isIgnoringWritesToBuffer.fetchAndStoreOrdered(1);
     196
     197    if (fileFD >= 0)
     198    {
     199        Flush();
     200        isInDestructor.fetchAndStoreOrdered(1); /* tells child thread to exit */
     201
     202        // TODO: check if this is enough... or still required.
     203        bufferHasData.wakeAll();
     204        bufferEmpty.wakeAll();
     205        pthread_join(writer, NULL);
     206
     207        close(fileFD);
     208    }
     209
     210    if (buffer)
     211        delete [] (buffer - BUFFER_CUSHION);
     212}
     213
     214/** \fn ThreadedFileWriterLockless::Write(const void *data, uint count)
     215 *  \brief Writes data to the internal buffer.
     216 *
     217 *  This function writes data to the internal buffer. It is guaranteed that
     218 *  the data won't get fragmented if there are parallel calls to this function.
     219 *  However in order to keep the internal data structures sane and prevent
     220 *  deadlocks and similar problems, the following limitations exist:
     221 *
     222 *   - if there concurrent calls and the ringbuffer is filled up to the point
     223 *     that there is not enough free space, a fallback to a FIFO-behaviour
     224 *     happens until all calls are served
     225 *
     226 *   - all concurrent calls will return one after one in the order their data
     227 *     were written to the internal ringbuffer. Up to that point tough, they
     228 *     are fully concurrent.
     229 *
     230 *  NOTE: Currently it is not supported to write data larger than the current
     231 *        ringbuffer size.
     232 *
     233 *  \param data  Pointer to data
     234 *  \param count Size of data in bytes
     235 *
     236 *  \return Returns the number of bytes written
     237 */
     238uint ThreadedFileWriterLockless::Write(const void *data, uint count)
     239{
     240    if(count == 0 || isIgnoringWritesToBuffer || isResettingBuffer
     241                  || isInDestructor)
     242        return 0;
     243
     244    // TODO: implement proper handling of data chunks larger than buffer size
     245    if (count >= bufferSize)
     246    {
     247        VERBOSE(VB_IMPORTANT, LOC_ERR +
     248                QString("ERROR: Unable to buffer data larger than buffer size: "
     249                        "%1 > %2.").arg(count).arg(bufferSize));
     250
     251        return 0;
     252    }
     253
     254    bool hasSlot     = false;
     255    bool forcedQueue = (bufferReservationNextActiveSlot != bufferReservationSlot);
     256    int  slotNumber  = 0;
     257
     258    char* localReadPtr           = NULL;
     259    char* localWritePtr          = NULL;
     260    char* newBufferAheadWritePtr = NULL;
     261    do
     262    {
     263        bool locked = false;
     264
     265        if(forcedQueue)
     266        {
     267            if(!hasSlot)
     268            {
     269                hasSlot    = true;
     270                slotNumber = bufferReservationSlot.fetchAndAddOrdered(-1);
     271            }
     272
     273            bufferWriterLock.lock();
     274            locked = true;
     275
     276            do
     277            {
     278                bufferWroteData.wait(&bufferWriterLock, 100);
     279            }
     280            while(bufferReservationNextActiveSlot != slotNumber);
     281
     282            forcedQueue = false;
     283        }
     284
     285        localReadPtr = static_cast<char*>(bufferReadPtr);
     286        localWritePtr= static_cast<char*>(bufferAheadWritePtr);
     287
     288        if(!forcedQueue && WriteFwdFree(localReadPtr, localWritePtr) >= count)
     289        {
     290            uint contSize = WriteFwdContinuousFree(localReadPtr, localWritePtr);
     291
     292            if(contSize >= count)
     293            {
     294                newBufferAheadWritePtr = localWritePtr + count;
     295            }
     296            else
     297            {
     298                newBufferAheadWritePtr = buffer + (count - contSize);
     299            }
     300        }
     301        else
     302        {
     303            forcedQueue = true;
     304        }
     305
     306        if(locked)
     307        {
     308            locked = false;
     309            bufferWriterLock.unlock();
     310        }
     311    }
     312    while(forcedQueue || !bufferAheadWritePtr.testAndSetOrdered(localWritePtr, newBufferAheadWritePtr));
     313
     314    if(hasSlot)
     315        bufferReservationNextActiveSlot.fetchAndAddOrdered(-1);
     316
     317    long long offset  = 0;
     318    uint totalWritten = 0;
     319    uint dataOffset   = 0;
     320    if(WriteFwdContinuousFree(localReadPtr, localWritePtr) < count)
     321    {
     322        uint chunkSize = WriteFwdContinuousFree(localReadPtr, localWritePtr);
     323        memcpy(localWritePtr, data, chunkSize);
     324
     325        count          -= chunkSize;
     326        totalWritten   += chunkSize;
     327        offset          = chunkSize;
     328        dataOffset      = chunkSize;
     329
     330        if(localWritePtr + offset > bufferEnd)
     331            offset = -(localWritePtr - buffer);
     332    }
     333    memcpy(localWritePtr + offset, (char*)data + dataOffset, count);
     334    totalWritten += count;
     335
     336    while(!bufferWritePtr.testAndSetOrdered(localWritePtr, newBufferAheadWritePtr))
     337    {
     338        struct timespec sleepTime = {0, 10};
     339        nanosleep(&sleepTime, NULL);
     340    }
     341
     342    bufferHasData.wakeAll();
     343
     344    return totalWritten;
     345}
     346
     347/** \fn ThreadedFileWriterLockless::Seek(long long pos, int whence)
     348 *  \brief Seek to a position within the disk file
     349 *
     350 *  \warning This method has not been tested yet!
     351 *
     352 *  \param pos    New file offset (with regards to whence)
     353 *  \param whence Specifies how to interpret offset (see lseek())
     354 *
     355 *  \return Resulting offset
     356 */
     357long long ThreadedFileWriterLockless::Seek(long long pos, int whence)
     358{
     359    isIgnoringWritesToBuffer.fetchAndStoreOrdered(1);
     360
     361    Flush();
     362
     363    long long result = lseek(fileFD, pos, whence);
     364
     365    isIgnoringWritesToBuffer.fetchAndStoreOrdered(0);
     366
     367    return result;
     368}
     369
     370/** \fn ThreadedFileWriterLockless::Flush(void)
     371 *  \brief Instructs the DiskWriter thread to flush all remaining data in the
     372 *         ringbuffer to the disk. If the DiskWriter thread is currently ignoring
     373 *         writes to the disk because of e.g. previous errors, the buffer just
     374 *         gets discarded.
     375 *
     376 *  \warning This call is expensive and no concurrent buffer writes are accepted.
     377 */
     378void ThreadedFileWriterLockless::Flush(void)
     379{
     380    bool resetIgnoreBufferWrites = true;
     381
     382    if(!isIgnoringWritesToBuffer.testAndSetOrdered(0, 1))
     383        resetIgnoreBufferWrites = false;
     384
     385    isFlushing.fetchAndStoreOrdered(1);
     386
     387    // TODO: check isIgnoringWritesTo*, otherwise we might wait forever
     388
     389    bufferHasData.wakeAll();
     390
     391    diskWriterLock.lock();
     392    while (BufUsed() > 0)
     393    {
     394        if (!bufferEmpty.wait(&diskWriterLock, 2000))
     395           VERBOSE(VB_IMPORTANT, LOC + "Taking a long time to flush...");
     396    }
     397    diskWriterLock.unlock();
     398
     399    isFlushing.fetchAndStoreOrdered(0);
     400
     401    if(resetIgnoreBufferWrites)
     402        isIgnoringWritesToBuffer.fetchAndStoreOrdered(0);
     403}
     404
     405/** \fn ThreadedFileWriterLockless::Sync(void)
     406 *  \brief Instructs the OS to sync all file data associated data to the disk.
     407 *
     408 *  \warning This is a very expensive call which blocks until the data is on
     409 *           the disk. Some filesystems (e.g. ext3) sync the entire filesystem
     410 *           data to disk, not only the file associated ones. This can cause
     411 *           high latency and delays if used extensively. Use with care.
     412 */
     413void ThreadedFileWriterLockless::Sync(void)
     414{
     415    if (fileFD >= 0)
     416    {
     417#ifdef HAVE_FDATASYNC
     418        fdatasync(fileFD);
     419#else
     420        fsync(fileFD);
     421#endif
     422    }
     423}
     424
     425/** \fn ThreadedFileWriterLockless::SetWriteBufferSize(uint newSize)
     426 *  \brief Sets the total size of the internal ringbuffer.
     427 *
     428 *  All data currently in the ringbuffer is flushed to the disk. During the
     429 *  time of the resetting, no writes to the buffer are accepted.
     430 *
     431 *  \warning This is an expensive call.
     432 *
     433 *  \param newSize New ringbuffer size in bytes
     434 */
     435void ThreadedFileWriterLockless::SetWriteBufferSize(uint newSize)
     436{
     437    if (newSize <= 0)
     438        return;
     439
     440    // TODO: spinlock
     441    if(!isResettingBuffer.testAndSetOrdered(0, 1))
     442        return;
     443
     444    isIgnoringWritesToBuffer.fetchAndStoreOrdered(1);
     445
     446    Flush();
     447    SetupBuffer(newSize);
     448
     449    isIgnoringWritesToBuffer.fetchAndStoreOrdered(0);
     450    isResettingBuffer.fetchAndStoreOrdered(0);
     451}
     452
     453/** \fn ThreadedFileWriterLockless::SetWriteBufferMinWriteSize(uint newMinSize)
     454 *  \brief Sets the minumum number of bytes to write to disk in a single write.
     455 *
     456 *  \warning The size gets stored in kilobytes internally and is thus
     457*            susceptible to rounding effects.
     458 *
     459 *  \param newMinSize New minimum size in bytes for a single write
     460 */
     461void ThreadedFileWriterLockless::SetWriteBufferMinWriteSize(uint newMinSize)
     462{
     463    if (newMinSize <= 0)
     464        return;
     465
     466    newMinSize /= 1024;
     467    if(newMinSize > INT_MAX)
     468        newMinSize = INT_MAX;
     469
     470    fileMinWriteSize.fetchAndStoreOrdered(newMinSize);
     471}
     472
     473/** \fn ThreadedFileWriterLockless::DiskLoop(void)
     474 *  \brief DiskWriter thread which reads data from the ringbuffer and writes it
     475 *         out to the disk.
     476 *
     477 *  The DiskWriterLoop() tries to free space as soon as possible but is limited
     478 *  by the minimum and maximum write size for a single write. Setting those
     479 *  higher can increase throughput but also latency.
     480 *
     481 */
     482void ThreadedFileWriterLockless::DiskWriterLoop(void)
     483{
     484    while (!isInDestructor || BufUsed() > 0)
     485    {
     486        uint bufferUsage = BufUsed();
     487
     488        if (!bufferUsage)
     489            bufferEmpty.wakeAll();
     490
     491        if (!bufferUsage || isResettingBuffer || (!isInDestructor && !isFlushing &&
     492             (((uint)(fileMinWriteSize * 1024) > bufferUsage) &&
     493             ((fileMinWriteSize * 1024) <= fileBytesWritten))))
     494        {
     495            diskWriterLock.lock();
     496            bufferHasData.wait(&diskWriterLock, 250);
     497            diskWriterLock.unlock();
     498            continue;
     499        }
     500
     501        long long  currWriteSize = (bufferUsage > MAX_WRITE_SIZE && !isFlushing) ?
     502                                    MAX_WRITE_SIZE : bufferUsage;
     503
     504        uint  currWritten   = 0;
     505        char* localReadPtr  = bufferReadPtr;
     506        char* localWritePtr = bufferWritePtr;
     507        bool  wasWriteOk    = true;
     508        uint  chunkSize     = 0;
     509
     510        while(wasWriteOk && currWriteSize > 0 && ReadFwdFree(localReadPtr, localWritePtr) > 0)
     511        {
     512            long long continuousSize = ReadFwdContinuousFree(localReadPtr, localWritePtr);
     513
     514            chunkSize = (currWriteSize <= continuousSize) ? currWriteSize : continuousSize;
     515
     516            if(!isIgnoringWritesToDisk)
     517                chunkSize = safe_write(fileFD, localReadPtr, chunkSize, wasWriteOk);
     518
     519            currWriteSize -= chunkSize;
     520            currWritten   += chunkSize;
     521            localReadPtr  += chunkSize;
     522
     523            if(localReadPtr > bufferEnd)
     524                localReadPtr = buffer;
     525        }
     526
     527        bufferReadPtr.fetchAndStoreOrdered(localReadPtr);
     528
     529        bufferWroteData.wakeAll();
     530
     531        if (!isIgnoringWritesToDisk && !wasWriteOk && (EFBIG == errno))
     532        {
     533            QString msg =
     534                "Maximum file size exceeded by '%1'"
     535                "\n\t\t\t"
     536                "You must either change the process ulimits, configure"
     537                "\n\t\t\t"
     538                "your operating system with \"Large File\" support, or use"
     539                "\n\t\t\t"
     540                "a filesystem which supports 64-bit or 128-bit files."
     541                "\n\t\t\t"
     542                "HINT: FAT32 is a 32-bit filesystem.";
     543
     544            VERBOSE(VB_IMPORTANT, msg.arg(fileName));
     545            isIgnoringWritesToBuffer.fetchAndStoreOrdered(1);
     546            isIgnoringWritesToDisk.fetchAndStoreOrdered(1);
     547        }
     548
     549        // if we wrote anything, make sure it gets uncached
     550        if(!isIgnoringWritesToDisk && currWritten > 0)
     551        {
     552            posix_fadvise(fileFD, fileBytesWritten, currWritten, POSIX_FADV_DONTNEED);
     553        }
     554
     555        if(!isIgnoringWritesToDisk)
     556            fileBytesWritten += currWritten;
     557    }
     558}
     559
     560/** \fn ThreadedFileWriterLockless::BufUsed(void) const
     561 *  \brief Gets the buffer usage in bytes which are queued for disk write
     562 *  \return Buffer usage in bytes
     563 */
     564uint ThreadedFileWriterLockless::BufUsed(void) const
     565{
     566    return (bufferSize-1) - BufFree();
     567}
     568
     569/** \fn ThreadedFileWriterLockless::BufFree(void) const
     570 *  \brief Gets the free buffer space in bytes
     571 *  \return Free buffer space in bytes
     572 */
     573uint ThreadedFileWriterLockless::BufFree(void) const
     574{
     575    return WriteFwdFree(bufferReadPtr, bufferWritePtr);
     576}
     577
     578/** \fn ThreadedFileWriterLockless::WriteFwdFree(void) const
     579 *  \brief Gets the number of bytes available for write to the ring buffer
     580 *         from the write pointer with regards to the passed pointers.
     581 *
     582 *  \return Free buffer space in bytes for write.
     583 */
     584long long ThreadedFileWriterLockless::WriteFwdFree(const char* readPtr, const char* writePtr) const
     585{
     586    return (readPtr - writePtr) + ((readPtr <= writePtr) ? bufferSize : 0) - 1;
     587}
     588
     589/** \fn ThreadedFileWriterLockless::ReadFwdFree(void) const
     590 *  \brief Gets the number of bytes available for read from the ring buffer
     591 *         from the read pointer with regards to the passed pointers.
     592 *
     593 *  \return Bytes available for read from the buffer
     594 */
     595long long ThreadedFileWriterLockless::ReadFwdFree(const char *readPtr,
     596                                                  const char *writePtr) const
     597{
     598    return (writePtr - readPtr) + ((readPtr <= writePtr) ? 0 : bufferSize);
     599}
     600
     601/** \fn ThreadedFileWriterLockless::WriteFwdContinuousFree(void) const
     602 *  \brief Gets the number of connected bytes available for write to the ring buffer
     603 *         from the write pointer with regards to the passed pointers.
     604 *
     605 *  \return Free buffer space in bytes for write.
     606 */
     607long long ThreadedFileWriterLockless::WriteFwdContinuousFree(const char* readPtr,
     608                                                             const char* writePtr) const
     609{
     610    uint offset = (readPtr > buffer) ? 1 : 0;
     611
     612    return (readPtr <= writePtr) ? (bufferEnd - writePtr) + offset
     613                          : (readPtr - writePtr) - 1;
     614}
     615
     616/** \fn ThreadedFileWriterLockless::ReadFwdContinuousFree(void) const
     617 *  \brief Gets the number of connected bytes available for read from the ring
     618 *         buffer from the read pointer with regards to the passed pointers.
     619 *
     620 *  \return Bytes available for read from the buffer
     621 */
     622long long ThreadedFileWriterLockless::ReadFwdContinuousFree(const char* readPtr,
     623                                                            const char* writePtr) const
     624{
     625    return ((readPtr <= writePtr) ? writePtr : (bufferEnd + 1)) - readPtr;
     626}
     627
     628/** \fn ThreadedFileWriterLockless::SetupBuffer(void)
     629 *  \brief Helper function which sets up the buffer and all necessary pointers.
     630 *
     631 *  \param reqBufferSize Requested buffer size
     632 */
     633void ThreadedFileWriterLockless::SetupBuffer(uint reqBufferSize)
     634{
     635    if(buffer)
     636        delete [] (buffer - BUFFER_CUSHION);
     637
     638    buffer = new char[1 + reqBufferSize + 2 * BUFFER_CUSHION];
     639    bzero(buffer, reqBufferSize + 2 * BUFFER_CUSHION);
     640
     641    buffer    += BUFFER_CUSHION;
     642    bufferSize = reqBufferSize;
     643    bufferEnd  = buffer + (reqBufferSize-1);
     644
     645    bufferWritePtr      = buffer;
     646    bufferAheadWritePtr = buffer;
     647    bufferReadPtr       = buffer;
     648
     649    bufferReservationSlot           = 0;
     650    bufferReservationNextActiveSlot = 0;
     651}
  • mythtv/libs/libmythtv/RingBuffer.cpp

     
    2323#include "RingBuffer.h"
    2424#include "remotefile.h"
    2525#include "remoteencoder.h"
     26
     27#if QT_VERSION < 0x040400
    2628#include "ThreadedFileWriter.h"
     29#else
     30#include "ThreadedFileWriterLockless.h"
     31#endif
     32
    2733#include "livetvchain.h"
    2834#include "DVDRingBuffer.h"
    2935#include "util.h"
     
    132138
    133139    if (write)
    134140    {
     141#if QT_VERSION < 0x040400
    135142        tfw = new ThreadedFileWriter(
     143#else
     144        tfw = new ThreadedFileWriterLockless(
     145#endif
    136146            filename, O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE, 0644);
    137147
    138148        if (!tfw->Open())
  • mythtv/libs/libmythtv/RingBuffer.h

     
    1616
    1717class RemoteFile;
    1818class RemoteEncoder;
     19#if QT_VERSION < 0x040400
    1920class ThreadedFileWriter;
     21#else
     22class ThreadedFileWriterLockless;
     23#endif
    2024class DVDRingBufferPriv;
    2125class LiveTVChain;
    2226
     
    118122  private:
    119123    QString filename;
    120124
     125#if QT_VERSION < 0x040400
    121126    ThreadedFileWriter *tfw;
     127#else
     128    ThreadedFileWriterLockless *tfw;
     129#endif
    122130    int fd2;
    123131
    124132    bool writemode;