Ticket #6330: locklesstfw-v2.diff

File locklesstfw-v2.diff, 28.3 KB (added by Matthias "mortalmatt" Dahl <devel@…>, 11 years ago)

ThreadedFileWriterLockless? v2

  • mythtv/libs/libmythtv/ThreadedFileWriterLockless.h

     
     1#ifndef TFW_LOCKLESS_H_
     2#define TFW_LOCKLESS_H_
     3
     4#include <pthread.h>
     5
     6#include <QAtomicInt>
     7#include <QAtomicPointer>
     8#include <QMutex>
     9#include <QWaitCondition>
     10#include <QString>
     11
     12class ThreadedFileWriterLockless
     13{
     14  public:
     15    ThreadedFileWriterLockless(const QString &fname, int flags, mode_t mode);
     16    ~ThreadedFileWriterLockless();
     17
     18    bool Open(void);
     19    long long Seek(long long pos, int whence);
     20    uint Write(const void *data, uint count);
     21
     22    void SetWriteBufferSize(uint newSize = DEFAULT_BUFFER_SIZE);
     23    void SetWriteBufferMinWriteSize(uint newMinSize = MIN_WRITE_SIZE);
     24
     25    uint BufUsed(void) const;
     26    uint BufFree(void) const;
     27
     28    void Sync(void);
     29    void Flush(void);
     30
     31    static uint safe_write(int fd, const void *data, uint count, bool &ok);
     32
     33  protected:
     34    static void *BootWriter(void *);
     35    void DiskWriterLoop(void);
     36
     37    long long WriteFwdFree(const char* readPtr, const char* writePtr) const;
     38    long long ReadFwdFree(const char* readPtr, const char* writePtr) const;
     39    long long WriteFwdContinuousFree(const char* readPtr, const char* writePtr) const;
     40    long long ReadFwdContinuousFree(const char* readPtr, const char* writePtr) const;
     41
     42    void SetupBuffer(uint reqBufferSize);
     43
     44  private:
     45    // file handling related
     46    QString    fileName;
     47    int        fileFlags;
     48    mode_t     fileMode;
     49    int        fileFD;
     50    QAtomicInt fileMinWriteSize; // IMPORTANT CHANGE: kilobytes
     51    long long  fileBytesWritten;
     52
     53    // state flags
     54    QAtomicInt isOpening;
     55    QAtomicInt isFlushing;
     56    QAtomicInt isWriterRunning;
     57    QAtomicInt isResettingBuffer;
     58    QAtomicInt isInDestructor;
     59    QAtomicInt isIgnoringWritesToBuffer;
     60    QAtomicInt isIgnoringWritesToDisk;
     61
     62    // locks and wait conditions
     63    mutable QMutex bufferWriterLock;
     64    mutable QMutex diskWriterLock;
     65
     66    QWaitCondition bufferEmpty;
     67    QWaitCondition bufferHasData;
     68    QWaitCondition bufferWroteData;
     69
     70    // buffer and related
     71    char                *buffer;
     72    char                *bufferEnd;
     73    unsigned long        bufferSize;
     74    QAtomicPointer<char> bufferReadPtr;
     75    QAtomicPointer<char> bufferWritePtr;
     76    QAtomicPointer<char> bufferAheadWritePtr;
     77    QAtomicInt           bufferReservationSlot;
     78    QAtomicInt           bufferReservationNextActiveSlot;
     79    QAtomicInt           numberOfActiveWrites;
     80
     81    // threads
     82    pthread_t       writer;
     83
     84  private:
     85    // constants
     86    /// default buffer size
     87    static const uint DEFAULT_BUFFER_SIZE;
     88    /// maximum chunk size to write to disk in one go
     89    static const uint MAX_WRITE_SIZE;
     90    /// minimum chunk size to write to disk in one go (except when flushing)
     91    static const uint MIN_WRITE_SIZE;
     92    /// number of extra bytes to reserve before and after the actual buffer
     93    static const uint BUFFER_CUSHION;
     94    /// maximum write errors for a single call to safe_write before giving up
     95    static const uint MAX_WRITE_ERRORS;
     96};
     97#endif
  • mythtv/libs/libmythtv/libmythtv.pro

     
    247247SOURCES += dtvmultiplex.cpp
    248248SOURCES += dtvconfparser.cpp        dtvconfparserhelpers.cpp
    249249
     250exists( $$[QT_INSTALL_HEADERS]/QtCore/QAtomicPointer ):exists( $$[QT_INSTALL_HEADERS]/QtCore/QAtomicInt ) {
     251    HEADERS -= ThreadedFileWriter.h
     252    HEADERS += ThreadedFileWriterLockless.h
     253    SOURCES -= ThreadedFileWriter.cpp
     254    SOURCES += ThreadedFileWriterLockless.cpp
     255}
     256
    250257using_frontend {
    251258    # Recording profile stuff
    252259    HEADERS += profilegroup.h
  • mythtv/libs/libmythtv/ThreadedFileWriterLockless.cpp

     
     1// ANSI C headers
     2#include <cstdio>
     3#include <cstdlib>
     4#include <cerrno>
     5#include <ctime>
     6#include <climits>
     7
     8// Unix C headers
     9#include <sys/types.h>
     10#include <sys/stat.h>
     11#include <unistd.h>
     12#include <signal.h>
     13#include <fcntl.h>
     14
     15// MythTV headers
     16#include "ThreadedFileWriterLockless.h"
     17#include "mythcontext.h"
     18#include "compat.h"
     19#include "mythverbose.h"
     20
     21#define LOC QString("TFWL: ")
     22#define LOC_ERR QString("TFWL, Error: ")
     23
     24const uint ThreadedFileWriterLockless::DEFAULT_BUFFER_SIZE = 2*1024*1024;
     25const uint ThreadedFileWriterLockless::MAX_WRITE_SIZE      = DEFAULT_BUFFER_SIZE / 4;
     26const uint ThreadedFileWriterLockless::MIN_WRITE_SIZE      = DEFAULT_BUFFER_SIZE / 1024 / 32;
     27const uint ThreadedFileWriterLockless::BUFFER_CUSHION      = 512;
     28const uint ThreadedFileWriterLockless::MAX_WRITE_ERRORS    = 10;
     29
     30/** \class ThreadedFileWriterLockless
     31 *  \brief A (almost) lock-free threaded disk writer that buffers data written
     32 *         to it and writes it to the disk in a seperate thread.
     33 *
     34 *  This class implements a threaded ringbuffer. All data written via ::Write()
     35 *  is inserted into the ringbuffer while a seperate thread reads the data from
     36 *  the buffer and writes it to disk.
     37 *
     38 *  The implementation is thread-safe and thus also reentrant.
     39 */
     40
     41/** \fn ThreadedFileWriterLockless::safe_write(int fd, const void *data, uint count, bool &ok)
     42 *  \brief Writes all data to disk and retries in case of errors.
     43 *
     44 *  The standard POSIX write() can return early with no or just a portion of
     45 *  the desired data written to disk. This function tries to make sure that
     46 *  all data gets written to disk by...
     47 *
     48 *    1) endlessly retrying in case of EAGAIN or EINTR errors
     49 *    2) retrying up to MAX_WRITE_ERRORS times in case of other errors
     50 *    3) writing out the rest amount of data until no data is left
     51 *
     52 *  \param fd    File descriptor
     53 *  \param data  Pointer to data to write
     54 *  \param count Size of data to write in bytes
     55 *
     56 *  \return Number of written bytes
     57 */
     58uint ThreadedFileWriterLockless::safe_write(int fd, const void *data,
     59                                            uint count, bool &ok)
     60{
     61    uint bytesWritten = 0;
     62    uint errors       = 0;
     63
     64    while(bytesWritten < count && errors < MAX_WRITE_ERRORS)
     65    {
     66        int ret = write(fd, (char*)data + bytesWritten, count - bytesWritten);
     67
     68        if(ret >= 0)
     69            bytesWritten += ret;
     70        else if(errno != EAGAIN || errno != EINTR)
     71            ++errors;
     72    }
     73
     74    if(errors < MAX_WRITE_ERRORS)
     75        ok = true;
     76    else
     77        ok = false;
     78
     79    return bytesWritten;
     80}
     81
     82/** \fn ThreadedFileWriterLockless::BootWriter(void *tfw)
     83 *  \brief Helper function which simply runs the DiskWriterLoop().
     84 *
     85 *  \param tfw Pointer to ThreadedFileWriterLockless instance
     86 */
     87void *ThreadedFileWriterLockless::BootWriter(void *tfw)
     88{
     89#ifndef USING_MINGW
     90    signal(SIGXFSZ, SIG_IGN);
     91#endif
     92    ((ThreadedFileWriterLockless *)tfw)->DiskWriterLoop();
     93    return NULL;
     94}
     95
     96/** \fn ThreadedFileWriterLockless::ThreadedFileWriterLockless(const QString &fname, int flags, mode_t mode)
     97 *  \brief Constructor
     98 *
     99 *  The constructor sets the filename, the access/creation/status flags and
     100 *  the file permissions. It does not create the file nor does it start the
     101 *  writer thread.
     102 *
     103 *  \param fname  Filename
     104 *  \param flags  Access/Creation/Status flags (see POSIX open())
     105 *  \param mode   File permissions (see POSIX open())
     106 */
     107ThreadedFileWriterLockless::ThreadedFileWriterLockless(const QString &fname,
     108                                                       int flags, mode_t mode) :
     109    // file handling related
     110    fileName(fname), fileFlags(flags), fileMode(mode), fileFD(-1),
     111    fileMinWriteSize(0), fileBytesWritten(0),
     112    // state flags
     113    isFlushing(0), isWriterRunning(0), isResettingBuffer(0),  isInDestructor(0),
     114    isIgnoringWritesToBuffer(0), isIgnoringWritesToDisk(0),
     115    // buffer and related
     116    buffer(NULL), bufferEnd(NULL), bufferSize(0), bufferReadPtr(NULL),
     117    bufferWritePtr(NULL), bufferAheadWritePtr(NULL),
     118    bufferReservationSlot(0), bufferReservationNextActiveSlot(0),
     119    numberOfActiveWrites(0)
     120{
     121    fileName.detach();
     122}
     123
     124/** \fn ThreadedFileWriterLockless::Open(void)
     125 *  \brief Opens the file we will be writing to and starts the DiskWriter thread.
     126 *
     127 *  \return TRUE if file was opened successfully and DiskWriter thread started
     128 */
     129bool ThreadedFileWriterLockless::Open(void)
     130{
     131    if(isWriterRunning)
     132    {
     133        VERBOSE(VB_IMPORTANT, LOC_ERR +
     134                QString("Opening file '%1' failed. Writer already running")
     135                .arg(fileName));
     136
     137        return false;
     138    }
     139    else if(!isOpening.testAndSetOrdered(0, 1))
     140    {
     141        return false;
     142    }
     143
     144    isFlushing               = 0;
     145    isResettingBuffer        = 0;
     146    isInDestructor           = 0;
     147    isIgnoringWritesToBuffer = 0;
     148    isIgnoringWritesToDisk   = 0;
     149
     150    bool result = false;
     151
     152    if (fileName == "-")
     153        fileFD = fileno(stdout);
     154    else
     155        fileFD = open(fileName.toAscii().constData(), fileFlags, fileMode);
     156
     157    if (fileFD < 0)
     158    {
     159        VERBOSE(VB_IMPORTANT, LOC_ERR +
     160                QString("Opening file '%1'.").arg(fileName) + ENO);
     161
     162        result = false;
     163    }
     164    else
     165    {
     166        SetupBuffer(DEFAULT_BUFFER_SIZE);
     167        fileMinWriteSize = MIN_WRITE_SIZE;
     168
     169        pthread_create(&writer, NULL, BootWriter, this);
     170/*
     171        if(gContext->GetNumSetting("RealtimePriority", 1))
     172        {
     173            // thread priority in respect to SCHED_RR: 1
     174            struct sched_param sp = {2};
     175
     176            if(!pthread_setschedparam(writer, SCHED_RR, &sp))
     177            {
     178                VERBOSE(VB_GENERAL, LOC +
     179                        "Using realtime priority.");
     180            }
     181            else
     182            {
     183                VERBOSE(VB_IMPORTANT, LOC_ERR +
     184                        "Realtime priority requires sufficient user priviledges.");
     185            }
     186        }
     187*/
     188        result = true;
     189    }
     190
     191    isOpening.fetchAndStoreOrdered(0);
     192    return result;
     193}
     194
     195/** \fn ThreadedFileWriterLockless::~ThreadedFileWriterLockless()
     196 *  \brief Destructor which commits all writes, ends the DiskWriter thread
     197 *         and closes the file.
     198 */
     199ThreadedFileWriterLockless::~ThreadedFileWriterLockless()
     200{
     201    isInDestructor.fetchAndStoreOrdered(1);
     202    isIgnoringWritesToBuffer.fetchAndAddOrdered(1);
     203
     204    if (fileFD >= 0)
     205    {
     206        // wake everyone
     207        bufferHasData.wakeAll();
     208        bufferEmpty.wakeAll();
     209
     210        // if there is still data, flush it to disk
     211        Flush();
     212
     213        pthread_join(writer, NULL);
     214
     215        close(fileFD);
     216    }
     217
     218    if (buffer)
     219        delete [] (buffer - BUFFER_CUSHION);
     220}
     221
     222/** \fn ThreadedFileWriterLockless::Write(const void *data, uint count)
     223 *  \brief Writes data to the internal buffer.
     224 *
     225 *  This function writes data to the internal buffer. It is guaranteed that
     226 *  the data won't get fragmented if there are parallel calls to this function.
     227 *  However in order to keep the internal data structures sane and prevent
     228 *  deadlocks and similar problems, the following limitations exist:
     229 *
     230 *   - if there concurrent calls and the ringbuffer is filled up to the point
     231 *     that there is not enough free space, a fallback to a FIFO-behaviour
     232 *     happens until all calls are served
     233 *
     234 *   - all concurrent calls will return one after one in the order their data
     235 *     were written to the internal ringbuffer. Up to that point tough, they
     236 *     are fully concurrent.
     237 *
     238 *  NOTE: Currently it is not supported to write data larger than the current
     239 *        ringbuffer size.
     240 *
     241 *  \param data  Pointer to data
     242 *  \param count Size of data in bytes
     243 *
     244 *  \return Returns the number of bytes written
     245 */
     246uint ThreadedFileWriterLockless::Write(const void *data, uint count)
     247{
     248    if(count == 0 || isIgnoringWritesToBuffer || isResettingBuffer
     249                  || isFlushing || isInDestructor || buffer == NULL)
     250        return 0;
     251
     252    // TODO: implement proper handling of data chunks larger than buffer size
     253    if (count >= bufferSize)
     254    {
     255        VERBOSE(VB_IMPORTANT, LOC_ERR +
     256                QString("ERROR: Unable to buffer data larger than buffer size: "
     257                        "%1 > %2.").arg(count).arg(bufferSize));
     258
     259        return 0;
     260    }
     261
     262    // register active write() - important for proper thread safe behaviour
     263    // of other functions like Flush()
     264    numberOfActiveWrites.fetchAndAddOrdered(1);
     265
     266    // if other other write() calls are queued, force this one to the queue as
     267    // well for proper fairness and to prevent starvation
     268    bool forcedQueue = (bufferReservationNextActiveSlot != bufferReservationSlot);
     269    bool hasSlot     = false;
     270    int  slotNumber  = 0;
     271
     272    char* localReadPtr           = NULL;
     273    char* localWritePtr          = NULL;
     274    char* newBufferAheadWritePtr = NULL;
     275    do
     276    {
     277        bool locked = false;
     278
     279        if(forcedQueue)
     280        {
     281            if(!hasSlot)
     282            {
     283                hasSlot    = true;
     284                slotNumber = bufferReservationSlot.fetchAndAddOrdered(-1);
     285            }
     286
     287            // for proper serialization of the waking waiters
     288            bufferWriterLock.lock();
     289            locked = true;
     290
     291            do
     292            {
     293                bufferWroteData.wait(&bufferWriterLock, 100);
     294
     295                if(isIgnoringWritesToBuffer || isResettingBuffer || isFlushing
     296                   || isInDestructor)
     297                {
     298                    bufferReservationNextActiveSlot.fetchAndAddOrdered(-1);
     299                    numberOfActiveWrites.fetchAndAddOrdered(-1);
     300                    bufferWriterLock.unlock();
     301                    return 0;
     302                }
     303            }
     304            while(bufferReservationNextActiveSlot != slotNumber);
     305
     306            forcedQueue = false;
     307        }
     308
     309        localReadPtr = static_cast<char*>(bufferReadPtr);
     310        localWritePtr= static_cast<char*>(bufferAheadWritePtr);
     311
     312        if(!forcedQueue && WriteFwdFree(localReadPtr, localWritePtr) >= count)
     313        {
     314            uint contSize = WriteFwdContinuousFree(localReadPtr, localWritePtr);
     315
     316            if(contSize >= count)
     317            {
     318                newBufferAheadWritePtr = localWritePtr + count;
     319            }
     320            else
     321            {
     322                newBufferAheadWritePtr = buffer + (count - contSize);
     323            }
     324        }
     325        else
     326        {
     327            forcedQueue = true;
     328        }
     329
     330        if(locked)
     331        {
     332            locked = false;
     333            bufferWriterLock.unlock();
     334        }
     335    }
     336    while(forcedQueue || !bufferAheadWritePtr.testAndSetOrdered(localWritePtr, newBufferAheadWritePtr));
     337
     338    if(hasSlot)
     339        bufferReservationNextActiveSlot.fetchAndAddOrdered(-1);
     340
     341    long long offset  = 0;
     342    uint totalWritten = 0;
     343    uint dataOffset   = 0;
     344    if(WriteFwdContinuousFree(localReadPtr, localWritePtr) < count)
     345    {
     346        uint chunkSize = WriteFwdContinuousFree(localReadPtr, localWritePtr);
     347        memcpy(localWritePtr, data, chunkSize);
     348
     349        count          -= chunkSize;
     350        totalWritten   += chunkSize;
     351        offset          = chunkSize;
     352        dataOffset      = chunkSize;
     353
     354        if(localWritePtr + offset > bufferEnd)
     355            offset = -(localWritePtr - buffer);
     356    }
     357    memcpy(localWritePtr + offset, (char*)data + dataOffset, count);
     358    totalWritten += count;
     359
     360    while(!bufferWritePtr.testAndSetOrdered(localWritePtr, newBufferAheadWritePtr))
     361    {
     362        struct timespec sleepTime = {0, 10};
     363        nanosleep(&sleepTime, NULL);
     364    }
     365
     366    bufferHasData.wakeAll();
     367
     368    numberOfActiveWrites.fetchAndAddOrdered(-1);
     369
     370    return totalWritten;
     371}
     372
     373/** \fn ThreadedFileWriterLockless::Seek(long long pos, int whence)
     374 *  \brief Seek to a position within the disk file
     375 *
     376 *  \warning This method has not been tested yet!
     377 *
     378 *  \param pos    New file offset (with regards to whence)
     379 *  \param whence Specifies how to interpret offset (see lseek())
     380 *
     381 *  \return Resulting offset
     382 */
     383long long ThreadedFileWriterLockless::Seek(long long pos, int whence)
     384{
     385    isIgnoringWritesToBuffer.fetchAndAddOrdered(1);
     386
     387    Flush();
     388
     389    long long result = lseek(fileFD, pos, whence);
     390
     391    isIgnoringWritesToBuffer.fetchAndAddOrdered(-1);
     392
     393    return result;
     394}
     395
     396/** \fn ThreadedFileWriterLockless::Flush(void)
     397 *  \brief Instructs the DiskWriter thread to flush all remaining data in the
     398 *         ringbuffer to the disk. If the DiskWriter thread is currently ignoring
     399 *         writes to the disk because of e.g. previous errors, the buffer just
     400 *         gets discarded.
     401 *
     402 *  \warning This call is expensive and no concurrent buffer writes are accepted.
     403 */
     404void ThreadedFileWriterLockless::Flush(void)
     405{
     406    // other functions rely on the fact that once Flush() returns, the
     407    // internal buffer is indeed emptied
     408    while(!isFlushing.testAndSetOrdered(0, 1))
     409        usleep(250);
     410
     411    isIgnoringWritesToBuffer.fetchAndAddOrdered(1);
     412
     413    // wake up DiskWriter thread
     414    bufferHasData.wakeAll();
     415
     416    diskWriterLock.lock();
     417    // loop as long as there is still data in the buffer or an ongoing write()
     418    // is registered which could potentially fill up the buffer
     419    while (BufUsed() > 0 || numberOfActiveWrites > 0)
     420    {
     421        if (!bufferEmpty.wait(&diskWriterLock, 2000))
     422           VERBOSE(VB_IMPORTANT, LOC + "Taking a long time to flush...");
     423    }
     424    diskWriterLock.unlock();
     425
     426    isIgnoringWritesToBuffer.fetchAndAddOrdered(-1);
     427
     428    isFlushing.fetchAndStoreOrdered(0);
     429}
     430
     431/** \fn ThreadedFileWriterLockless::Sync(void)
     432 *  \brief Instructs the OS to sync all file data associated data to the disk.
     433 *
     434 *  \warning This is a very expensive call which blocks until the data is on
     435 *           the disk. Some filesystems (e.g. ext3) sync the entire filesystem
     436 *           data to disk, not only the file associated ones. This can cause
     437 *           high latency and delays if used extensively. Use with care.
     438 */
     439void ThreadedFileWriterLockless::Sync(void)
     440{
     441    if (fileFD >= 0)
     442    {
     443#if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0
     444        fdatasync(fileFD);
     445#else
     446        fsync(fileFD);
     447#endif
     448    }
     449}
     450
     451/** \fn ThreadedFileWriterLockless::SetWriteBufferSize(uint newSize)
     452 *  \brief Sets the total size of the internal ringbuffer.
     453 *
     454 *  All data currently in the ringbuffer is flushed to the disk. During the
     455 *  time of the resetting, no writes to the buffer are accepted.
     456 *
     457 *  \warning This is an expensive call.
     458 *
     459 *  \param newSize New ringbuffer size in bytes
     460 */
     461void ThreadedFileWriterLockless::SetWriteBufferSize(uint newSize)
     462{
     463    if (newSize <= 0)
     464        return;
     465
     466    while(!isResettingBuffer.testAndSetOrdered(0, 1))
     467        usleep(250);
     468
     469    isIgnoringWritesToBuffer.fetchAndAddOrdered(1);
     470
     471    Flush();
     472    SetupBuffer(newSize);
     473
     474    isIgnoringWritesToBuffer.fetchAndAddOrdered(-1);
     475
     476    isResettingBuffer.fetchAndStoreOrdered(0);
     477}
     478
     479/** \fn ThreadedFileWriterLockless::SetWriteBufferMinWriteSize(uint newMinSize)
     480 *  \brief Sets the minumum number of bytes to write to disk in a single write.
     481 *
     482 *  \warning The size gets stored in kilobytes internally and is thus
     483*            susceptible to rounding effects.
     484 *
     485 *  \param newMinSize New minimum size in bytes for a single write
     486 */
     487void ThreadedFileWriterLockless::SetWriteBufferMinWriteSize(uint newMinSize)
     488{
     489    if (newMinSize <= 0)
     490        return;
     491
     492    newMinSize /= 1024;
     493    if(newMinSize > INT_MAX)
     494        newMinSize = INT_MAX;
     495
     496    fileMinWriteSize.fetchAndStoreOrdered(newMinSize);
     497}
     498
     499/** \fn ThreadedFileWriterLockless::DiskLoop(void)
     500 *  \brief DiskWriter thread which reads data from the ringbuffer and writes it
     501 *         out to the disk.
     502 *
     503 *  The DiskWriterLoop() tries to free space as soon as possible but is limited
     504 *  by the minimum and maximum write size for a single write. Setting those
     505 *  higher can increase throughput but also latency.
     506 *
     507 */
     508void ThreadedFileWriterLockless::DiskWriterLoop(void)
     509{
     510    while (!isInDestructor || BufUsed() > 0)
     511    {
     512        uint bufferUsage = BufUsed();
     513
     514        if (!bufferUsage)
     515            bufferEmpty.wakeAll();
     516
     517        if (!bufferUsage || isResettingBuffer || (!isInDestructor && !isFlushing &&
     518             (((uint)(fileMinWriteSize * 1024) > bufferUsage) &&
     519             ((fileMinWriteSize * 1024) <= fileBytesWritten))))
     520        {
     521            diskWriterLock.lock();
     522            bufferHasData.wait(&diskWriterLock, 250);
     523            diskWriterLock.unlock();
     524            continue;
     525        }
     526
     527        uint  currWritten   = 0;
     528        char* localReadPtr  = bufferReadPtr;
     529        char* localWritePtr = bufferWritePtr;
     530        bool  wasWriteOk    = true;
     531
     532        while(!isResettingBuffer && wasWriteOk && bufferUsage > 0
     533              && ReadFwdFree(localReadPtr, localWritePtr) > 0)
     534        {
     535            long long continuousSize = ReadFwdContinuousFree(localReadPtr, localWritePtr);
     536            long long chunkSize = (bufferUsage <= continuousSize) ? bufferUsage : continuousSize;
     537
     538            if(chunkSize > MAX_WRITE_SIZE && !isFlushing)
     539                chunkSize = MAX_WRITE_SIZE;
     540
     541            if(!isIgnoringWritesToDisk)
     542                chunkSize = safe_write(fileFD, localReadPtr, chunkSize, wasWriteOk);
     543
     544            bufferUsage  -= chunkSize;
     545            currWritten  += chunkSize;
     546            localReadPtr += chunkSize;
     547
     548            if(localReadPtr > bufferEnd)
     549                localReadPtr = buffer;
     550
     551            bufferReadPtr.fetchAndStoreOrdered(localReadPtr);
     552            bufferWroteData.wakeAll();
     553        }
     554
     555        if (!isIgnoringWritesToDisk && !wasWriteOk && (EFBIG == errno))
     556        {
     557            QString msg =
     558                "Maximum file size exceeded by '%1'"
     559                "\n\t\t\t"
     560                "You must either change the process ulimits, configure"
     561                "\n\t\t\t"
     562                "your operating system with \"Large File\" support, or use"
     563                "\n\t\t\t"
     564                "a filesystem which supports 64-bit or 128-bit files."
     565                "\n\t\t\t"
     566                "HINT: FAT32 is a 32-bit filesystem.";
     567
     568            VERBOSE(VB_IMPORTANT, msg.arg(fileName));
     569            isIgnoringWritesToBuffer.fetchAndAddOrdered(1);
     570            isIgnoringWritesToDisk.fetchAndAddOrdered(1);
     571        }
     572
     573        // if we wrote anything, make sure it gets uncached
     574        if(!isIgnoringWritesToDisk && currWritten > 0)
     575        {
     576            posix_fadvise(fileFD, fileBytesWritten, currWritten, POSIX_FADV_DONTNEED);
     577        }
     578
     579        if(!isIgnoringWritesToDisk)
     580            fileBytesWritten += currWritten;
     581    }
     582}
     583
     584/** \fn ThreadedFileWriterLockless::BufUsed(void) const
     585 *  \brief Gets the buffer usage in bytes which are queued for disk write
     586 *
     587 *  \warning It is not safe to call this function while a buffer reset is
     588 *           in progress!
     589 *
     590 *  \return Buffer usage in bytes
     591 */
     592uint ThreadedFileWriterLockless::BufUsed(void) const
     593{
     594    return (bufferSize-1) - BufFree();
     595}
     596
     597/** \fn ThreadedFileWriterLockless::BufFree(void) const
     598 *  \brief Gets the free buffer space in bytes
     599 *
     600 *  \warning It is not safe to call this function while a buffer reset is
     601 *           in progress!
     602 *
     603 *  \return Free buffer space in bytes
     604 */
     605uint ThreadedFileWriterLockless::BufFree(void) const
     606{
     607    return WriteFwdFree(bufferReadPtr, bufferWritePtr);
     608}
     609
     610/** \fn ThreadedFileWriterLockless::WriteFwdFree(void) const
     611 *  \brief Gets the number of bytes available for write to the ring buffer
     612 *         from the write pointer with regards to the passed pointers.
     613 *
     614 *  \warning It is not safe to call this function while a buffer reset is
     615 *           in progress!
     616 *
     617 *  \return Free buffer space in bytes for write.
     618 */
     619long long ThreadedFileWriterLockless::WriteFwdFree(const char* readPtr, const char* writePtr) const
     620{
     621    return (readPtr - writePtr) + ((readPtr <= writePtr) ? bufferSize : 0) - 1;
     622}
     623
     624/** \fn ThreadedFileWriterLockless::ReadFwdFree(void) const
     625 *  \brief Gets the number of bytes available for read from the ring buffer
     626 *         from the read pointer with regards to the passed pointers.
     627 *
     628 *  \warning It is not safe to call this function while a buffer reset is
     629 *           in progress!
     630 *
     631 *  \return Bytes available for read from the buffer
     632 */
     633long long ThreadedFileWriterLockless::ReadFwdFree(const char *readPtr,
     634                                                  const char *writePtr) const
     635{
     636    return (writePtr - readPtr) + ((readPtr <= writePtr) ? 0 : bufferSize);
     637}
     638
     639/** \fn ThreadedFileWriterLockless::WriteFwdContinuousFree(void) const
     640 *  \brief Gets the number of connected bytes available for write to the ring buffer
     641 *         from the write pointer with regards to the passed pointers.
     642 *
     643 *  \warning It is not safe to call this function while a buffer reset is
     644 *           in progress!
     645 *
     646 *  \return Free buffer space in bytes for write.
     647 */
     648long long ThreadedFileWriterLockless::WriteFwdContinuousFree(const char* readPtr,
     649                                                             const char* writePtr) const
     650{
     651    uint offset = (readPtr > buffer) ? 1 : 0;
     652
     653    return (readPtr <= writePtr) ? (bufferEnd - writePtr) + offset
     654                          : (readPtr - writePtr) - 1;
     655}
     656
     657/** \fn ThreadedFileWriterLockless::ReadFwdContinuousFree(void) const
     658 *  \brief Gets the number of connected bytes available for read from the ring
     659 *         buffer from the read pointer with regards to the passed pointers.
     660 *
     661 *  \warning It is not safe to call this function while a buffer reset is
     662 *           in progress!
     663 *
     664 *  \return Bytes available for read from the buffer
     665 */
     666long long ThreadedFileWriterLockless::ReadFwdContinuousFree(const char* readPtr,
     667                                                            const char* writePtr) const
     668{
     669    return ((readPtr <= writePtr) ? writePtr : (bufferEnd + 1)) - readPtr;
     670}
     671
     672/** \fn ThreadedFileWriterLockless::SetupBuffer(void)
     673 *  \brief Helper function which sets up the buffer and all necessary pointers.
     674 *
     675 *  \warning This function does not make sure no buffer accesses happen while
     676 *           the new buffer is being set up.
     677 *
     678 *  \param reqBufferSize Requested buffer size
     679 */
     680void ThreadedFileWriterLockless::SetupBuffer(uint reqBufferSize)
     681{
     682    if(buffer)
     683        delete [] (buffer - BUFFER_CUSHION);
     684
     685    buffer = new char[1 + reqBufferSize + 2 * BUFFER_CUSHION];
     686    bzero(buffer, reqBufferSize + 2 * BUFFER_CUSHION);
     687
     688    buffer    += BUFFER_CUSHION;
     689    bufferSize = reqBufferSize;
     690    bufferEnd  = buffer + (reqBufferSize-1);
     691
     692    bufferWritePtr.fetchAndStoreOrdered(buffer);
     693    bufferAheadWritePtr.fetchAndStoreOrdered(buffer);
     694    bufferReadPtr.fetchAndStoreOrdered(buffer);
     695}
  • mythtv/libs/libmythtv/RingBuffer.cpp

     
    2323#include "RingBuffer.h"
    2424#include "remotefile.h"
    2525#include "remoteencoder.h"
     26
     27#if QT_VERSION < 0x040400
    2628#include "ThreadedFileWriter.h"
     29#else
     30#include "ThreadedFileWriterLockless.h"
     31#endif
     32
    2733#include "livetvchain.h"
    2834#include "DVDRingBuffer.h"
    2935#include "util.h"
     
    132138
    133139    if (write)
    134140    {
     141#if QT_VERSION < 0x040400
    135142        tfw = new ThreadedFileWriter(
     143#else
     144        tfw = new ThreadedFileWriterLockless(
     145#endif
    136146            filename, O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE, 0644);
    137147
    138148        if (!tfw->Open())
  • mythtv/libs/libmythtv/RingBuffer.h

     
    1616
    1717class RemoteFile;
    1818class RemoteEncoder;
     19#if QT_VERSION < 0x040400
    1920class ThreadedFileWriter;
     21#else
     22class ThreadedFileWriterLockless;
     23#endif
    2024class DVDRingBufferPriv;
    2125class LiveTVChain;
    2226
     
    118122  private:
    119123    QString filename;
    120124
     125#if QT_VERSION < 0x040400
    121126    ThreadedFileWriter *tfw;
     127#else
     128    ThreadedFileWriterLockless *tfw;
     129#endif
    122130    int fd2;
    123131
    124132    bool writemode;