Ticket #6330: locklesstfw-v3.diff

File locklesstfw-v3.diff, 28.1 KB (added by Matthias "mortalmatt" Dahl <devel@…>, 11 years ago)
  • mythtv/libs/libmythtv/ThreadedFileWriterLockless.h

     
     1#ifndef TFW_LOCKLESS_H_
     2#define TFW_LOCKLESS_H_
     3
     4#include <pthread.h>
     5
     6#include <QAtomicInt>
     7#include <QAtomicPointer>
     8#include <QMutex>
     9#include <QWaitCondition>
     10#include <QString>
     11
     12class ThreadedFileWriterLockless
     13{
     14  public:
     15    ThreadedFileWriterLockless(const QString &fname, int flags, mode_t mode);
     16    ~ThreadedFileWriterLockless();
     17
     18    bool Open(void);
     19    long long Seek(long long pos, int whence);
     20    uint Write(const void *data, uint count);
     21
     22    void SetWriteBufferSize(uint newSize = DEFAULT_BUFFER_SIZE);
     23    void SetWriteBufferMinWriteSize(uint newMinSize = MIN_WRITE_SIZE);
     24
     25    uint BufUsed(void) const;
     26    uint BufFree(void) const;
     27
     28    void Sync(void);
     29    void Flush(void);
     30
     31    static uint safe_write(int fd, const void *data, uint count, bool &ok);
     32
     33  protected:
     34    static void *BootWriter(void *);
     35    void DiskWriterLoop(void);
     36
     37    long long WriteFwdFree(const char* readPtr, const char* writePtr) const;
     38    long long ReadFwdFree(const char* readPtr, const char* writePtr) const;
     39    long long WriteFwdContinuousFree(const char* readPtr, const char* writePtr) const;
     40    long long ReadFwdContinuousFree(const char* readPtr, const char* writePtr) const;
     41
     42    void SetupBuffer(uint reqBufferSize);
     43
     44  private:
     45    // file handling related
     46    QString              fileName;
     47    int                  fileFlags;
     48    mode_t               fileMode;
     49    int                  fileFD;
     50    QAtomicPointer<uint> fileMinWriteSize;
     51    long long            fileBytesWritten;
     52
     53    // state flags
     54    QAtomicInt isOpening;
     55    QAtomicInt isFlushing;
     56    QAtomicInt isWriterRunning;
     57    QAtomicInt isResettingBuffer;
     58    QAtomicInt isInDestructor;
     59    QAtomicInt isIgnoringWritesToBuffer;
     60    QAtomicInt isIgnoringWritesToDisk;
     61
     62    // locks and wait conditions
     63    mutable QMutex bufferWriterLock;
     64    mutable QMutex diskWriterLock;
     65
     66    QWaitCondition bufferEmpty;
     67    QWaitCondition bufferHasData;
     68    QWaitCondition bufferWroteData;
     69
     70    // buffer and related
     71    char                *buffer;
     72    char                *bufferEnd;
     73    unsigned long        bufferSize;
     74    QAtomicPointer<char> bufferReadPtr;
     75    QAtomicPointer<char> bufferWritePtr;
     76    QAtomicPointer<char> bufferAheadWritePtr;
     77    QAtomicInt           bufferReservationSlot;
     78    QAtomicInt           bufferReservationNextActiveSlot;
     79    QAtomicInt           numberOfActiveWrites;
     80
     81    // threads
     82    pthread_t writer;
     83
     84  private:
     85    // constants
     86    /// default buffer size
     87    static const uint DEFAULT_BUFFER_SIZE;
     88    /// maximum chunk size to write to disk in one go
     89    static const uint MAX_WRITE_SIZE;
     90    /// minimum chunk size to write to disk in one go (except when flushing)
     91    static const uint MIN_WRITE_SIZE;
     92    /// number of extra bytes to reserve before and after the actual buffer
     93    static const uint BUFFER_CUSHION;
     94    /// maximum write errors for a single call to safe_write before giving up
     95    static const uint MAX_WRITE_ERRORS;
     96};
     97#endif
  • mythtv/libs/libmythtv/libmythtv.pro

     
    163163HEADERS += channelsettings.h        previewgenerator.h
    164164HEADERS += transporteditor.h
    165165HEADERS += myth_imgconvert.h
     166HEADERS += ThreadedFileWriterLockless.h
    166167
    167168# Remove when everything is switched to MythUI
    168169HEADERS += proglist_qt.h
     
    186187SOURCES += progdetails.cpp
    187188SOURCES += channelsettings.cpp      previewgenerator.cpp
    188189SOURCES += transporteditor.cpp
     190SOURCES += ThreadedFileWriterLockless.cpp
    189191
    190192contains( CONFIG_SWSCALE, yes ) {
    191193    SOURCES += myth_imgconvert.cpp
  • mythtv/libs/libmythtv/ThreadedFileWriterLockless.cpp

     
     1// ANSI C headers
     2#include <cerrno>
     3#include <cstring>
     4
     5// Unix C headers
     6#include <sys/types.h>
     7#include <sys/stat.h>
     8#include <fcntl.h>
     9#include <unistd.h>
     10#include <signal.h>
     11
     12// MythTV headers
     13#include "ThreadedFileWriterLockless.h"
     14#include "mythcontext.h"
     15#include "compat.h"
     16#include "mythverbose.h"
     17
     18#define LOC QString("TFWL: ")
     19#define LOC_ERR QString("TFWL, Error: ")
     20
     21const uint ThreadedFileWriterLockless::DEFAULT_BUFFER_SIZE = 2*1024*1024;
     22const uint ThreadedFileWriterLockless::MAX_WRITE_SIZE      = DEFAULT_BUFFER_SIZE / 4;
     23const uint ThreadedFileWriterLockless::MIN_WRITE_SIZE      = DEFAULT_BUFFER_SIZE / 1024 / 32;
     24const uint ThreadedFileWriterLockless::BUFFER_CUSHION      = 512;
     25const uint ThreadedFileWriterLockless::MAX_WRITE_ERRORS    = 10;
     26
     27/** \class ThreadedFileWriterLockless
     28 *  \brief A (almost) lock-free threaded disk writer that buffers data written
     29 *         to it and writes it to the disk in a seperate thread.
     30 *
     31 *  This class implements a threaded ringbuffer. All data written via ::Write()
     32 *  is inserted into the ringbuffer while a seperate thread reads the data from
     33 *  the buffer and writes it to disk.
     34 *
     35 *  The implementation is thread-safe and thus also reentrant.
     36 */
     37
     38/** \fn ThreadedFileWriterLockless::safe_write(int fd, const void *data, uint count, bool &ok)
     39 *  \brief Writes all data to disk and retries in case of errors.
     40 *
     41 *  The standard POSIX write() can return early with no or just a portion of
     42 *  the desired data written to disk. This function tries to make sure that
     43 *  all data gets written to disk by...
     44 *
     45 *    1) endlessly retrying in case of EAGAIN or EINTR errors
     46 *    2) retrying up to MAX_WRITE_ERRORS times in case of other errors
     47 *    3) writing out the rest amount of data until no data is left
     48 *
     49 *  \param fd    File descriptor
     50 *  \param data  Pointer to data to write
     51 *  \param count Size of data to write in bytes
     52 *
     53 *  \return Number of written bytes
     54 */
     55uint ThreadedFileWriterLockless::safe_write(int fd, const void *data,
     56                                            uint count, bool &ok)
     57{
     58    uint bytesWritten = 0;
     59    uint errors       = 0;
     60
     61    while(bytesWritten < count && errors < MAX_WRITE_ERRORS)
     62    {
     63        int ret = write(fd, (char*)data + bytesWritten, count - bytesWritten);
     64
     65        if(ret >= 0)
     66            bytesWritten += ret;
     67        else if(errno != EAGAIN || errno != EINTR)
     68            ++errors;
     69    }
     70
     71    if(errors < MAX_WRITE_ERRORS)
     72        ok = true;
     73    else
     74        ok = false;
     75
     76    return bytesWritten;
     77}
     78
     79/** \fn ThreadedFileWriterLockless::BootWriter(void *tfw)
     80 *  \brief Helper function which simply runs the DiskWriterLoop().
     81 *
     82 *  \param tfw Pointer to ThreadedFileWriterLockless instance
     83 */
     84void *ThreadedFileWriterLockless::BootWriter(void *tfw)
     85{
     86#ifndef USING_MINGW
     87    signal(SIGXFSZ, SIG_IGN);
     88#endif
     89    ((ThreadedFileWriterLockless *)tfw)->DiskWriterLoop();
     90    return NULL;
     91}
     92
     93/** \fn ThreadedFileWriterLockless::ThreadedFileWriterLockless(const QString &fname, int flags, mode_t mode)
     94 *  \brief Constructor
     95 *
     96 *  The constructor sets the filename, the access/creation/status flags and
     97 *  the file permissions. It does not create the file nor does it start the
     98 *  writer thread.
     99 *
     100 *  \param fname  Filename
     101 *  \param flags  Access/Creation/Status flags (see POSIX open())
     102 *  \param mode   File permissions (see POSIX open())
     103 */
     104ThreadedFileWriterLockless::ThreadedFileWriterLockless(const QString &fname,
     105                                                       int flags, mode_t mode) :
     106    // file handling related
     107    fileName(fname), fileFlags(flags), fileMode(mode), fileFD(-1),
     108    fileMinWriteSize(NULL), fileBytesWritten(0),
     109    // state flags
     110    isFlushing(0), isWriterRunning(0), isResettingBuffer(0),  isInDestructor(0),
     111    isIgnoringWritesToBuffer(0), isIgnoringWritesToDisk(0),
     112    // buffer and related
     113    buffer(NULL), bufferEnd(NULL), bufferSize(0), bufferReadPtr(NULL),
     114    bufferWritePtr(NULL), bufferAheadWritePtr(NULL),
     115    bufferReservationSlot(0), bufferReservationNextActiveSlot(0),
     116    numberOfActiveWrites(0)
     117{
     118    fileName.detach();
     119}
     120
     121/** \fn ThreadedFileWriterLockless::Open(void)
     122 *  \brief Opens the file we will be writing to and starts the DiskWriter thread.
     123 *
     124 *  \return TRUE if file was opened successfully and DiskWriter thread started
     125 */
     126bool ThreadedFileWriterLockless::Open(void)
     127{
     128    if(isWriterRunning)
     129    {
     130        VERBOSE(VB_IMPORTANT, LOC_ERR +
     131                QString("Opening file '%1' failed. Writer already running")
     132                .arg(fileName));
     133
     134        return false;
     135    }
     136    else if(!isOpening.testAndSetOrdered(0, 1))
     137    {
     138        return false;
     139    }
     140
     141    isFlushing               = 0;
     142    isResettingBuffer        = 0;
     143    isInDestructor           = 0;
     144    isIgnoringWritesToBuffer = 0;
     145    isIgnoringWritesToDisk   = 0;
     146
     147    bool result = false;
     148
     149    if (fileName == "-")
     150        fileFD = fileno(stdout);
     151    else
     152        fileFD = open(fileName.toAscii().constData(), fileFlags, fileMode);
     153
     154    if (fileFD < 0)
     155    {
     156        VERBOSE(VB_IMPORTANT, LOC_ERR +
     157                QString("Opening file '%1'.").arg(fileName) + ENO);
     158
     159        result = false;
     160    }
     161    else
     162    {
     163        SetupBuffer(DEFAULT_BUFFER_SIZE);
     164        if(fileMinWriteSize == NULL)
     165            fileMinWriteSize = new uint(MIN_WRITE_SIZE);
     166
     167        pthread_create(&writer, NULL, BootWriter, this);
     168/*
     169        if(gContext->GetNumSetting("RealtimePriority", 1))
     170        {
     171            // thread priority in respect to SCHED_RR: 1
     172            struct sched_param sp = {2};
     173
     174            if(!pthread_setschedparam(writer, SCHED_RR, &sp))
     175            {
     176                VERBOSE(VB_GENERAL, LOC +
     177                        "Using realtime priority.");
     178            }
     179            else
     180            {
     181                VERBOSE(VB_IMPORTANT, LOC_ERR +
     182                        "Realtime priority requires sufficient user priviledges.");
     183            }
     184        }
     185*/
     186        result = true;
     187    }
     188
     189    isOpening.fetchAndStoreOrdered(0);
     190    return result;
     191}
     192
     193/** \fn ThreadedFileWriterLockless::~ThreadedFileWriterLockless()
     194 *  \brief Destructor which commits all writes, ends the DiskWriter thread
     195 *         and closes the file.
     196 */
     197ThreadedFileWriterLockless::~ThreadedFileWriterLockless()
     198{
     199    isInDestructor.fetchAndStoreOrdered(1);
     200    isIgnoringWritesToBuffer.fetchAndAddOrdered(1);
     201
     202    if (fileFD >= 0)
     203    {
     204        // wake everyone
     205        bufferHasData.wakeAll();
     206        bufferEmpty.wakeAll();
     207
     208        // if there is still data, flush it to disk
     209        Flush();
     210
     211        pthread_join(writer, NULL);
     212
     213        close(fileFD);
     214    }
     215
     216    if (buffer)
     217        delete [] (buffer - BUFFER_CUSHION);
     218
     219    if (fileMinWriteSize)
     220        delete fileMinWriteSize;
     221}
     222
     223/** \fn ThreadedFileWriterLockless::Write(const void *data, uint count)
     224 *  \brief Writes data to the internal buffer.
     225 *
     226 *  This function writes data to the internal buffer. It is guaranteed that
     227 *  the data won't get fragmented if there are parallel calls to this function.
     228 *  However in order to keep the internal data structures sane and prevent
     229 *  deadlocks and similar problems, the following limitations exist:
     230 *
     231 *   - if there concurrent calls and the ringbuffer is filled up to the point
     232 *     that there is not enough free space, a fallback to a FIFO-behaviour
     233 *     happens until all calls are served
     234 *
     235 *   - all concurrent calls will return one after one in the order their data
     236 *     were written to the internal ringbuffer. Up to that point tough, they
     237 *     are fully concurrent.
     238 *
     239 *  NOTE: Currently it is not supported to write data larger than the current
     240 *        ringbuffer size.
     241 *
     242 *  \param data  Pointer to data
     243 *  \param count Size of data in bytes
     244 *
     245 *  \return Returns the number of bytes written
     246 */
     247uint ThreadedFileWriterLockless::Write(const void *data, uint count)
     248{
     249    if(count == 0 || isIgnoringWritesToBuffer || isResettingBuffer
     250                  || isFlushing || isInDestructor || buffer == NULL)
     251        return 0;
     252
     253    // TODO: implement proper handling of data chunks larger than buffer size
     254    if (count >= bufferSize)
     255    {
     256        VERBOSE(VB_IMPORTANT, LOC_ERR +
     257                QString("ERROR: Unable to buffer data larger than buffer size: "
     258                        "%1 > %2.").arg(count).arg(bufferSize));
     259
     260        return 0;
     261    }
     262
     263    // register active write() - important for proper thread safe behaviour
     264    // of other functions like Flush()
     265    numberOfActiveWrites.fetchAndAddOrdered(1);
     266
     267    // if other other write() calls are queued, force this one to the queue as
     268    // well for proper fairness and to prevent starvation
     269    bool forcedQueue = (bufferReservationNextActiveSlot != bufferReservationSlot);
     270    bool hasSlot     = false;
     271    int  slotNumber  = 0;
     272
     273    char* localReadPtr           = NULL;
     274    char* localWritePtr          = NULL;
     275    char* newBufferAheadWritePtr = NULL;
     276    do
     277    {
     278        bool locked = false;
     279
     280        if(forcedQueue)
     281        {
     282            if(!hasSlot)
     283            {
     284                hasSlot    = true;
     285                slotNumber = bufferReservationSlot.fetchAndAddOrdered(-1);
     286            }
     287
     288            // for proper serialization of the waking waiters
     289            bufferWriterLock.lock();
     290            locked = true;
     291
     292            do
     293            {
     294                bufferWroteData.wait(&bufferWriterLock, 100);
     295
     296                if(isIgnoringWritesToBuffer || isResettingBuffer || isFlushing
     297                   || isInDestructor)
     298                {
     299                    bufferReservationNextActiveSlot.fetchAndAddOrdered(-1);
     300                    numberOfActiveWrites.fetchAndAddOrdered(-1);
     301                    bufferWriterLock.unlock();
     302                    return 0;
     303                }
     304            }
     305            while(bufferReservationNextActiveSlot != slotNumber);
     306
     307            forcedQueue = false;
     308        }
     309
     310        localReadPtr = static_cast<char*>(bufferReadPtr);
     311        localWritePtr= static_cast<char*>(bufferAheadWritePtr);
     312
     313        if(!forcedQueue && WriteFwdFree(localReadPtr, localWritePtr) >= count)
     314        {
     315            uint contSize = WriteFwdContinuousFree(localReadPtr, localWritePtr);
     316
     317            if(contSize >= count)
     318            {
     319                newBufferAheadWritePtr = localWritePtr + count;
     320            }
     321            else
     322            {
     323                newBufferAheadWritePtr = buffer + (count - contSize);
     324            }
     325        }
     326        else
     327        {
     328            forcedQueue = true;
     329        }
     330
     331        if(locked)
     332        {
     333            locked = false;
     334            bufferWriterLock.unlock();
     335        }
     336    }
     337    while(forcedQueue || !bufferAheadWritePtr.testAndSetOrdered(localWritePtr, newBufferAheadWritePtr));
     338
     339    if(hasSlot)
     340        bufferReservationNextActiveSlot.fetchAndAddOrdered(-1);
     341
     342    long long offset  = 0;
     343    uint totalWritten = 0;
     344    uint dataOffset   = 0;
     345    if(WriteFwdContinuousFree(localReadPtr, localWritePtr) < count)
     346    {
     347        uint chunkSize = WriteFwdContinuousFree(localReadPtr, localWritePtr);
     348        memcpy(localWritePtr, data, chunkSize);
     349
     350        count          -= chunkSize;
     351        totalWritten   += chunkSize;
     352        offset          = chunkSize;
     353        dataOffset      = chunkSize;
     354
     355        if(localWritePtr + offset > bufferEnd)
     356            offset = -(localWritePtr - buffer);
     357    }
     358    memcpy(localWritePtr + offset, (char*)data + dataOffset, count);
     359    totalWritten += count;
     360
     361    while(!bufferWritePtr.testAndSetOrdered(localWritePtr, newBufferAheadWritePtr))
     362    {
     363        struct timespec sleepTime = {0, 10};
     364        nanosleep(&sleepTime, NULL);
     365    }
     366
     367    bufferHasData.wakeAll();
     368
     369    numberOfActiveWrites.fetchAndAddOrdered(-1);
     370
     371    return totalWritten;
     372}
     373
     374/** \fn ThreadedFileWriterLockless::Seek(long long pos, int whence)
     375 *  \brief Seek to a position within the disk file
     376 *
     377 *  \warning This method has not been tested yet!
     378 *
     379 *  \param pos    New file offset (with regards to whence)
     380 *  \param whence Specifies how to interpret offset (see lseek())
     381 *
     382 *  \return Resulting offset
     383 */
     384long long ThreadedFileWriterLockless::Seek(long long pos, int whence)
     385{
     386    isIgnoringWritesToBuffer.fetchAndAddOrdered(1);
     387
     388    Flush();
     389
     390    long long result = lseek(fileFD, pos, whence);
     391
     392    isIgnoringWritesToBuffer.fetchAndAddOrdered(-1);
     393
     394    return result;
     395}
     396
     397/** \fn ThreadedFileWriterLockless::Flush(void)
     398 *  \brief Instructs the DiskWriter thread to flush all remaining data in the
     399 *         ringbuffer to the disk. If the DiskWriter thread is currently ignoring
     400 *         writes to the disk because of e.g. previous errors, the buffer just
     401 *         gets discarded.
     402 *
     403 *  \warning This call is expensive and no concurrent buffer writes are accepted.
     404 */
     405void ThreadedFileWriterLockless::Flush(void)
     406{
     407    // other functions rely on the fact that once Flush() returns, the
     408    // internal buffer is indeed emptied
     409    while(!isFlushing.testAndSetOrdered(0, 1))
     410        usleep(250);
     411
     412    isIgnoringWritesToBuffer.fetchAndAddOrdered(1);
     413
     414    // wake up DiskWriter thread
     415    bufferHasData.wakeAll();
     416
     417    diskWriterLock.lock();
     418    // loop as long as there is still data in the buffer or an ongoing write()
     419    // is registered which could potentially fill up the buffer
     420    while (BufUsed() > 0 || numberOfActiveWrites > 0)
     421    {
     422        if (!bufferEmpty.wait(&diskWriterLock, 2000))
     423           VERBOSE(VB_IMPORTANT, LOC + "Taking a long time to flush...");
     424    }
     425    diskWriterLock.unlock();
     426
     427    isIgnoringWritesToBuffer.fetchAndAddOrdered(-1);
     428
     429    isFlushing.fetchAndStoreOrdered(0);
     430}
     431
     432/** \fn ThreadedFileWriterLockless::Sync(void)
     433 *  \brief Instructs the OS to sync all file data associated data to the disk.
     434 *
     435 *  \warning This is a very expensive call which blocks until the data is on
     436 *           the disk. Some filesystems (e.g. ext3) sync the entire filesystem
     437 *           data to disk, not only the file associated ones. This can cause
     438 *           high latency and delays if used extensively. Use with care.
     439 */
     440void ThreadedFileWriterLockless::Sync(void)
     441{
     442    if (fileFD >= 0)
     443    {
     444#if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0
     445        fdatasync(fileFD);
     446#else
     447        fsync(fileFD);
     448#endif
     449    }
     450}
     451
     452/** \fn ThreadedFileWriterLockless::SetWriteBufferSize(uint newSize)
     453 *  \brief Sets the total size of the internal ringbuffer.
     454 *
     455 *  All data currently in the ringbuffer is flushed to the disk. During the
     456 *  time of the resetting, no writes to the buffer are accepted.
     457 *
     458 *  \warning This is an expensive call.
     459 *
     460 *  \param newSize New ringbuffer size in bytes
     461 */
     462void ThreadedFileWriterLockless::SetWriteBufferSize(uint newSize)
     463{
     464    if (newSize <= 0)
     465        return;
     466
     467    while(!isResettingBuffer.testAndSetOrdered(0, 1))
     468        usleep(250);
     469
     470    isIgnoringWritesToBuffer.fetchAndAddOrdered(1);
     471
     472    Flush();
     473    SetupBuffer(newSize);
     474
     475    isIgnoringWritesToBuffer.fetchAndAddOrdered(-1);
     476
     477    isResettingBuffer.fetchAndStoreOrdered(0);
     478}
     479
     480/** \fn ThreadedFileWriterLockless::SetWriteBufferMinWriteSize(uint newMinSize)
     481 *  \brief Sets the minumum number of bytes to write to disk in a single write.
     482 *
     483 *  \param newMinSize New minimum size in bytes for a single write
     484 */
     485void ThreadedFileWriterLockless::SetWriteBufferMinWriteSize(uint newMinSize)
     486{
     487    if (newMinSize <= 0)
     488        return;
     489
     490    delete(fileMinWriteSize.fetchAndStoreOrdered(new uint(newMinSize)));
     491}
     492
     493/** \fn ThreadedFileWriterLockless::DiskLoop(void)
     494 *  \brief DiskWriter thread which reads data from the ringbuffer and writes it
     495 *         out to the disk.
     496 *
     497 *  The DiskWriterLoop() tries to free space as soon as possible but is limited
     498 *  by the minimum and maximum write size for a single write. Setting those
     499 *  higher can increase throughput but also latency.
     500 *
     501 */
     502void ThreadedFileWriterLockless::DiskWriterLoop(void)
     503{
     504    while (!isInDestructor || BufUsed() > 0)
     505    {
     506        uint bufferUsage = BufUsed();
     507
     508        if (!bufferUsage)
     509            bufferEmpty.wakeAll();
     510
     511        if (!bufferUsage || isResettingBuffer || (!isInDestructor && !isFlushing &&
     512            *fileMinWriteSize > bufferUsage && *fileMinWriteSize <= fileBytesWritten))
     513        {
     514            diskWriterLock.lock();
     515            bufferHasData.wait(&diskWriterLock, 250);
     516            diskWriterLock.unlock();
     517            continue;
     518        }
     519
     520        uint  currWritten   = 0;
     521        char* localReadPtr  = bufferReadPtr;
     522        char* localWritePtr = bufferWritePtr;
     523        bool  wasWriteOk    = true;
     524
     525        while(!isResettingBuffer && wasWriteOk && bufferUsage > 0
     526              && ReadFwdFree(localReadPtr, localWritePtr) > 0)
     527        {
     528            long long continuousSize = ReadFwdContinuousFree(localReadPtr, localWritePtr);
     529            long long chunkSize = (bufferUsage <= continuousSize) ? bufferUsage : continuousSize;
     530
     531            if(chunkSize > MAX_WRITE_SIZE && !isFlushing)
     532                chunkSize = MAX_WRITE_SIZE;
     533
     534            if(!isIgnoringWritesToDisk)
     535                chunkSize = safe_write(fileFD, localReadPtr, chunkSize, wasWriteOk);
     536
     537            bufferUsage  -= chunkSize;
     538            currWritten  += chunkSize;
     539            localReadPtr += chunkSize;
     540
     541            if(localReadPtr > bufferEnd)
     542                localReadPtr = buffer;
     543
     544            bufferReadPtr.fetchAndStoreOrdered(localReadPtr);
     545            bufferWroteData.wakeAll();
     546        }
     547
     548        if (!isIgnoringWritesToDisk && !wasWriteOk && (EFBIG == errno))
     549        {
     550            QString msg =
     551                "Maximum file size exceeded by '%1'"
     552                "\n\t\t\t"
     553                "You must either change the process ulimits, configure"
     554                "\n\t\t\t"
     555                "your operating system with \"Large File\" support, or use"
     556                "\n\t\t\t"
     557                "a filesystem which supports 64-bit or 128-bit files."
     558                "\n\t\t\t"
     559                "HINT: FAT32 is a 32-bit filesystem.";
     560
     561            VERBOSE(VB_IMPORTANT, msg.arg(fileName));
     562            isIgnoringWritesToBuffer.fetchAndAddOrdered(1);
     563            isIgnoringWritesToDisk.fetchAndAddOrdered(1);
     564        }
     565
     566        // if we wrote anything, make sure it gets uncached
     567        if(!isIgnoringWritesToDisk && currWritten > 0)
     568        {
     569            posix_fadvise(fileFD, fileBytesWritten, currWritten, POSIX_FADV_DONTNEED);
     570        }
     571
     572        if(!isIgnoringWritesToDisk)
     573            fileBytesWritten += currWritten;
     574    }
     575}
     576
     577/** \fn ThreadedFileWriterLockless::BufUsed(void) const
     578 *  \brief Gets the buffer usage in bytes which are queued for disk write
     579 *
     580 *  \warning It is not safe to call this function while a buffer reset is
     581 *           in progress!
     582 *
     583 *  \return Buffer usage in bytes
     584 */
     585uint ThreadedFileWriterLockless::BufUsed(void) const
     586{
     587    return (bufferSize-1) - BufFree();
     588}
     589
     590/** \fn ThreadedFileWriterLockless::BufFree(void) const
     591 *  \brief Gets the free buffer space in bytes
     592 *
     593 *  \warning It is not safe to call this function while a buffer reset is
     594 *           in progress!
     595 *
     596 *  \return Free buffer space in bytes
     597 */
     598uint ThreadedFileWriterLockless::BufFree(void) const
     599{
     600    return WriteFwdFree(bufferReadPtr, bufferWritePtr);
     601}
     602
     603/** \fn ThreadedFileWriterLockless::WriteFwdFree(void) const
     604 *  \brief Gets the number of bytes available for write to the ring buffer
     605 *         from the write pointer with regards to the passed pointers.
     606 *
     607 *  \warning It is not safe to call this function while a buffer reset is
     608 *           in progress!
     609 *
     610 *  \return Free buffer space in bytes for write.
     611 */
     612long long ThreadedFileWriterLockless::WriteFwdFree(const char* readPtr, const char* writePtr) const
     613{
     614    return (readPtr - writePtr) + ((readPtr <= writePtr) ? bufferSize : 0) - 1;
     615}
     616
     617/** \fn ThreadedFileWriterLockless::ReadFwdFree(void) const
     618 *  \brief Gets the number of bytes available for read from the ring buffer
     619 *         from the read pointer with regards to the passed pointers.
     620 *
     621 *  \warning It is not safe to call this function while a buffer reset is
     622 *           in progress!
     623 *
     624 *  \return Bytes available for read from the buffer
     625 */
     626long long ThreadedFileWriterLockless::ReadFwdFree(const char *readPtr,
     627                                                  const char *writePtr) const
     628{
     629    return (writePtr - readPtr) + ((readPtr <= writePtr) ? 0 : bufferSize);
     630}
     631
     632/** \fn ThreadedFileWriterLockless::WriteFwdContinuousFree(void) const
     633 *  \brief Gets the number of connected bytes available for write to the ring buffer
     634 *         from the write pointer with regards to the passed pointers.
     635 *
     636 *  \warning It is not safe to call this function while a buffer reset is
     637 *           in progress!
     638 *
     639 *  \return Free buffer space in bytes for write.
     640 */
     641long long ThreadedFileWriterLockless::WriteFwdContinuousFree(const char* readPtr,
     642                                                             const char* writePtr) const
     643{
     644    uint offset = (readPtr > buffer) ? 1 : 0;
     645
     646    return (readPtr <= writePtr) ? (bufferEnd - writePtr) + offset
     647                          : (readPtr - writePtr) - 1;
     648}
     649
     650/** \fn ThreadedFileWriterLockless::ReadFwdContinuousFree(void) const
     651 *  \brief Gets the number of connected bytes available for read from the ring
     652 *         buffer from the read pointer with regards to the passed pointers.
     653 *
     654 *  \warning It is not safe to call this function while a buffer reset is
     655 *           in progress!
     656 *
     657 *  \return Bytes available for read from the buffer
     658 */
     659long long ThreadedFileWriterLockless::ReadFwdContinuousFree(const char* readPtr,
     660                                                            const char* writePtr) const
     661{
     662    return ((readPtr <= writePtr) ? writePtr : (bufferEnd + 1)) - readPtr;
     663}
     664
     665/** \fn ThreadedFileWriterLockless::SetupBuffer(void)
     666 *  \brief Helper function which sets up the buffer and all necessary pointers.
     667 *
     668 *  \warning This function does not make sure no buffer accesses happen while
     669 *           the new buffer is being set up.
     670 *
     671 *  \param reqBufferSize Requested buffer size
     672 */
     673void ThreadedFileWriterLockless::SetupBuffer(uint reqBufferSize)
     674{
     675    if(buffer)
     676        delete [] (buffer - BUFFER_CUSHION);
     677
     678    buffer = new char[1 + reqBufferSize + 2 * BUFFER_CUSHION];
     679    bzero(buffer, reqBufferSize + 2 * BUFFER_CUSHION);
     680
     681    buffer    += BUFFER_CUSHION;
     682    bufferSize = reqBufferSize;
     683    bufferEnd  = buffer + (reqBufferSize-1);
     684
     685    bufferWritePtr.fetchAndStoreOrdered(buffer);
     686    bufferAheadWritePtr.fetchAndStoreOrdered(buffer);
     687    bufferReadPtr.fetchAndStoreOrdered(buffer);
     688}
  • mythtv/libs/libmythtv/RingBuffer.cpp

     
    2323#include "RingBuffer.h"
    2424#include "remotefile.h"
    2525#include "remoteencoder.h"
     26
    2627#include "ThreadedFileWriter.h"
     28#include "ThreadedFileWriterLockless.h"
     29
    2730#include "livetvchain.h"
    2831#include "DVDRingBuffer.h"
    2932#include "util.h"
     
    132135
    133136    if (write)
    134137    {
     138#if QT_VERSION < 0x040400
    135139        tfw = new ThreadedFileWriter(
     140#else
     141        tfw = new ThreadedFileWriterLockless(
     142#endif
    136143            filename, O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE, 0644);
    137144
    138145        if (!tfw->Open())
  • mythtv/libs/libmythtv/RingBuffer.h

     
    1616
    1717class RemoteFile;
    1818class RemoteEncoder;
     19
    1920class ThreadedFileWriter;
     21class ThreadedFileWriterLockless;
     22
    2023class DVDRingBufferPriv;
    2124class LiveTVChain;
    2225
     
    118121  private:
    119122    QString filename;
    120123
     124#if QT_VERSION < 0x040400
    121125    ThreadedFileWriter *tfw;
     126#else
     127    ThreadedFileWriterLockless *tfw;
     128#endif
    122129    int fd2;
    123130
    124131    bool writemode;