diff --git a/mythtv/libs/libmythbase/threadedfilewriter.cpp b/mythtv/libs/libmythbase/threadedfilewriter.cpp
index aa2fd9d..deec603 100644
a
|
b
|
void TFWWriteThread::run(void) |
32 | 32 | RunEpilog(); |
33 | 33 | } |
34 | 34 | |
35 | | /// \brief Runs ThreadedFileWriter::SyncLoop(void) |
36 | | void TFWSyncThread::run(void) |
37 | | { |
38 | | RunProlog(); |
39 | | m_parent->SyncLoop(); |
40 | | RunEpilog(); |
41 | | } |
42 | | |
43 | 35 | const uint ThreadedFileWriter::kMaxBufferSize = 8 * 1024 * 1024; |
44 | 36 | const uint ThreadedFileWriter::kMinWriteSize = 64 * 1024; |
45 | 37 | const uint ThreadedFileWriter::kMaxBlockSize = 1 * 1024 * 1024; |
… |
… |
const uint ThreadedFileWriter::kMaxBlockSize = 1 * 1024 * 1024; |
50 | 42 | * This class allows us manage the buffering when writing to |
51 | 43 | * disk. We write to the kernel image of the disk using one |
52 | 44 | * thread, and sync the kernel's image of the disk to hardware |
53 | | * using another thread. The goal here so to block as little as |
| 45 | * at most every seconds. The goal here so to block as little as |
54 | 46 | * possible when the classes using this class want to add data |
55 | 47 | * to the stream. |
56 | 48 | */ |
… |
… |
ThreadedFileWriter::ThreadedFileWriter(const QString &fname, |
68 | 60 | ignore_writes(false), tfw_min_write_size(kMinWriteSize), |
69 | 61 | totalBufferUse(0), |
70 | 62 | // threads |
71 | | writeThread(NULL), syncThread(NULL), |
| 63 | writeThread(NULL), |
72 | 64 | m_warned(false), m_blocking(false) |
73 | 65 | { |
74 | 66 | filename.detach(); |
… |
… |
bool ThreadedFileWriter::ReOpen(QString newFilename) |
88 | 80 | |
89 | 81 | if (fd >= 0) |
90 | 82 | { |
| 83 | Sync(); |
91 | 84 | close(fd); |
92 | 85 | fd = -1; |
93 | 86 | } |
… |
… |
bool ThreadedFileWriter::Open(void) |
135 | 128 | writeThread->start(); |
136 | 129 | } |
137 | 130 | |
138 | | if (!syncThread) |
139 | | { |
140 | | syncThread = new TFWSyncThread(this); |
141 | | syncThread->start(); |
142 | | } |
143 | | |
144 | 131 | return true; |
145 | 132 | } |
146 | 133 | } |
… |
… |
ThreadedFileWriter::~ThreadedFileWriter() |
155 | 142 | { /* tell child threads to exit */ |
156 | 143 | QMutexLocker locker(&buflock); |
157 | 144 | in_dtor = true; |
158 | | bufferSyncWait.wakeAll(); |
159 | 145 | bufferHasData.wakeAll(); |
160 | 146 | } |
161 | 147 | |
… |
… |
ThreadedFileWriter::~ThreadedFileWriter() |
178 | 164 | emptyBuffers.pop_front(); |
179 | 165 | } |
180 | 166 | |
181 | | if (syncThread) |
182 | | { |
183 | | syncThread->wait(); |
184 | | delete syncThread; |
185 | | syncThread = NULL; |
186 | | } |
187 | | |
188 | 167 | if (fd >= 0) |
189 | 168 | { |
| 169 | Sync(); |
190 | 170 | close(fd); |
191 | 171 | fd = -1; |
192 | 172 | } |
… |
… |
void ThreadedFileWriter::Flush(void) |
365 | 345 | */ |
366 | 346 | void ThreadedFileWriter::Sync(void) |
367 | 347 | { |
| 348 | LOG(VB_FILE, LOG_INFO, LOC + "Sync() called"); |
| 349 | |
368 | 350 | if (fd >= 0) |
369 | 351 | { |
370 | 352 | #if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0 |
… |
… |
void ThreadedFileWriter::SetWriteBufferMinWriteSize(uint newMinSize) |
390 | 372 | bufferHasData.wakeAll(); |
391 | 373 | } |
392 | 374 | |
393 | | /** \fn ThreadedFileWriter::SyncLoop(void) |
394 | | * \brief The thread run method that calls Sync(void). |
395 | | */ |
396 | | void ThreadedFileWriter::SyncLoop(void) |
397 | | { |
398 | | QMutexLocker locker(&buflock); |
399 | | while (!in_dtor) |
400 | | { |
401 | | locker.unlock(); |
402 | | |
403 | | Sync(); |
404 | | |
405 | | locker.relock(); |
406 | | bufferSyncWait.wait(&buflock, 1000); |
407 | | } |
408 | | } |
409 | | |
410 | 375 | /** \fn ThreadedFileWriter::DiskLoop(void) |
411 | 376 | * \brief The thread run method that actually calls writes to disk. |
412 | 377 | */ |
… |
… |
void ThreadedFileWriter::DiskLoop(void) |
422 | 387 | // Even if the bytes buffered is less than the minimum write |
423 | 388 | // size we do want to write to the OS buffers periodically. |
424 | 389 | // This timer makes sure we do. |
425 | | MythTimer minWriteTimer; |
| 390 | MythTimer minWriteTimer, lastSyncTimer; |
426 | 391 | minWriteTimer.start(); |
| 392 | lastSyncTimer.start(); |
427 | 393 | |
428 | 394 | while (!in_dtor) |
429 | 395 | { |
… |
… |
void ThreadedFileWriter::DiskLoop(void) |
482 | 448 | uint tot = 0; |
483 | 449 | uint errcnt = 0; |
484 | 450 | |
485 | | LOG(VB_FILE, LOG_DEBUG, LOC + QString("write(%1) cnt %2 total %3") |
| 451 | LOG(VB_FILE, LOG_INFO, LOC + QString("write(%1) cnt %2 total %3") |
486 | 452 | .arg(sz).arg(writeBuffers.size()) |
487 | 453 | .arg(totalBufferUse)); |
488 | 454 | |
… |
… |
void ThreadedFileWriter::DiskLoop(void) |
526 | 492 | bufferHasData.wait(locker.mutex(), 50); |
527 | 493 | } |
528 | 494 | |
| 495 | if (lastSyncTimer.elapsed() >= 1000) |
| 496 | { |
| 497 | locker.unlock(); |
| 498 | Sync(); |
| 499 | locker.relock(); |
| 500 | lastSyncTimer.restart(); |
| 501 | } |
| 502 | |
529 | 503 | ////////////////////////////////////////// |
530 | 504 | |
531 | 505 | buf->lastUsed = MythDate::current(); |
… |
… |
bool ThreadedFileWriter::SetBlocking(bool block) |
604 | 578 | bool old = m_blocking; |
605 | 579 | m_blocking = block; |
606 | 580 | return old; |
607 | | } |
608 | | No newline at end of file |
| 581 | } |
diff --git a/mythtv/libs/libmythbase/threadedfilewriter.h b/mythtv/libs/libmythbase/threadedfilewriter.h
index 1142d27..e06fc29 100644
a
|
b
|
class TFWWriteThread : public MThread |
28 | 28 | ThreadedFileWriter *m_parent; |
29 | 29 | }; |
30 | 30 | |
31 | | class TFWSyncThread : public MThread |
32 | | { |
33 | | public: |
34 | | TFWSyncThread(ThreadedFileWriter *p) : MThread("TFWSync"), m_parent(p) {} |
35 | | virtual ~TFWSyncThread() { wait(); m_parent = NULL; } |
36 | | virtual void run(void); |
37 | | private: |
38 | | ThreadedFileWriter *m_parent; |
39 | | }; |
40 | | |
41 | 31 | class MBASE_PUBLIC ThreadedFileWriter |
42 | 32 | { |
43 | 33 | friend class TFWWriteThread; |
44 | | friend class TFWSyncThread; |
| 34 | |
45 | 35 | public: |
46 | 36 | ThreadedFileWriter(const QString &fname, int flags, mode_t mode); |
47 | 37 | ~ThreadedFileWriter(); |
… |
… |
class MBASE_PUBLIC ThreadedFileWriter |
60 | 50 | |
61 | 51 | protected: |
62 | 52 | void DiskLoop(void); |
63 | | void SyncLoop(void); |
64 | 53 | void TrimEmptyBuffers(void); |
65 | 54 | |
66 | 55 | private: |
… |
… |
class MBASE_PUBLIC ThreadedFileWriter |
90 | 79 | |
91 | 80 | // threads |
92 | 81 | TFWWriteThread *writeThread; |
93 | | TFWSyncThread *syncThread; |
94 | 82 | |
95 | 83 | // wait conditions |
96 | 84 | QWaitCondition bufferEmpty; |
97 | 85 | QWaitCondition bufferHasData; |
98 | | QWaitCondition bufferSyncWait; |
99 | 86 | QWaitCondition bufferWasFreed; |
100 | 87 | |
101 | 88 | // constants |
diff --git a/mythtv/libs/libmythtv/fileringbuffer.cpp b/mythtv/libs/libmythtv/fileringbuffer.cpp
index 2548703..3c7cc92 100644
a
|
b
|
int FileRingBuffer::safe_read(int fd, void *data, uint sz) |
454 | 454 | |
455 | 455 | while (tot < sz) |
456 | 456 | { |
| 457 | LOG(VB_FILE, LOG_DEBUG, LOC + |
| 458 | QString("read(%1) -- begin").arg(sz)); |
457 | 459 | ret = read(fd2, (char *)data + tot, sz - tot); |
| 460 | LOG(VB_FILE, LOG_DEBUG, LOC + |
| 461 | QString("read(%1) -> %2 end").arg(sz).arg(ret)); |
| 462 | |
458 | 463 | if (ret < 0) |
459 | 464 | { |
460 | 465 | if (errno == EAGAIN) |
diff --git a/mythtv/libs/libmythtv/ringbuffer.cpp b/mythtv/libs/libmythtv/ringbuffer.cpp
index cbca8a6..5dbd48f 100644
a
|
b
|
void RingBuffer::CalcReadAheadThresh(void) |
348 | 348 | uint estbitrate = 0; |
349 | 349 | |
350 | 350 | readsallowed = false; |
| 351 | LOG(VB_FILE, LOG_INFO, LOC + |
| 352 | "CalcReadAheadThresh: readsallowed set to false."); |
351 | 353 | readblocksize = max(readblocksize, CHUNK); |
352 | 354 | |
353 | 355 | // loop without sleeping if the buffered data is less than this |
… |
… |
void RingBuffer::ResetReadAhead(long long newinternal) |
484 | 486 | internalreadpos = newinternal; |
485 | 487 | ateof = false; |
486 | 488 | readsallowed = false; |
| 489 | LOG(VB_FILE, LOG_INFO, LOC + |
| 490 | "ResetReadAhead: readsallowed set to false."); |
487 | 491 | setswitchtonext = false; |
488 | 492 | generalWait.wakeAll(); |
489 | 493 | |
… |
… |
void RingBuffer::run(void) |
762 | 766 | |
763 | 767 | LOG(VB_FILE, LOG_INFO, LOC + |
764 | 768 | QString("Initial readblocksize %1K & fill_min %2K") |
765 | | .arg(readblocksize/1024).arg(fill_min/1024)); |
| 769 | .arg(readblocksize/1024).arg(fill_min/1024)); |
766 | 770 | |
767 | 771 | while (readaheadrunning) |
768 | 772 | { |
769 | 773 | if (PauseAndWait()) |
770 | 774 | { |
771 | 775 | ignore_for_read_timing = true; |
| 776 | LOG(VB_FILE, LOG_DEBUG, LOC + "run: PauseAndWait Not reading continuing"); |
772 | 777 | continue; |
773 | 778 | } |
774 | 779 | |
… |
… |
void RingBuffer::run(void) |
783 | 788 | ignore_for_read_timing |= |
784 | 789 | (ignorereadpos >= 0) || commserror || stopreads; |
785 | 790 | generalWait.wait(&rwlock, (stopreads) ? 50 : 1000); |
| 791 | LOG(VB_FILE, LOG_DEBUG, LOC + "run: Not reading continuing"); |
786 | 792 | continue; |
787 | 793 | } |
788 | 794 | |
… |
… |
void RingBuffer::run(void) |
843 | 849 | ignore_for_read_timing = (totfree < readblocksize) ? true : false; |
844 | 850 | lastread = now; |
845 | 851 | |
| 852 | LOG(VB_FILE, LOG_INFO, LOC + "run(): rbwlock.lockForRead() start"); |
846 | 853 | rbwlock.lockForRead(); |
| 854 | LOG(VB_FILE, LOG_INFO, LOC + "run(): rbwlock.lockForRead() done"); |
847 | 855 | if (rbwpos + totfree > bufferSize) |
848 | 856 | { |
849 | 857 | totfree = bufferSize - rbwpos; |
850 | | LOG(VB_FILE, LOG_DEBUG, LOC + |
| 858 | LOG(VB_FILE, LOG_INFO, LOC + |
851 | 859 | "Shrinking read, near end of buffer"); |
852 | 860 | } |
853 | 861 | |
… |
… |
void RingBuffer::run(void) |
858 | 866 | "Reading enough data to start playback"); |
859 | 867 | } |
860 | 868 | |
861 | | LOG(VB_FILE, LOG_DEBUG, LOC + |
| 869 | LOG(VB_FILE, LOG_INFO, LOC + |
862 | 870 | QString("safe_read(...@%1, %2) -- begin") |
863 | 871 | .arg(rbwpos).arg(totfree)); |
864 | 872 | |
… |
… |
void RingBuffer::run(void) |
896 | 904 | LOC + QString("rbwpos += %1K requested %2K in read") |
897 | 905 | .arg(read_return/1024,3).arg(totfree/1024,3)); |
898 | 906 | } |
| 907 | else |
| 908 | { |
| 909 | LOG(VB_FILE, LOG_INFO, LOC + "rbwpos != rbwposcopy"); |
| 910 | } |
899 | 911 | rbwlock.unlock(); |
900 | 912 | poslock.unlock(); |
901 | 913 | } |
902 | 914 | } |
| 915 | else |
| 916 | { |
| 917 | LOG(VB_FILE, LOG_INFO, LOC + QString("We are not reading anything (totfree: %1 commserror:%2 ateof:%3 setswitchtonext:%4").arg(totfree).arg(commserror).arg(ateof).arg(setswitchtonext)); |
| 918 | } |
903 | 919 | |
904 | 920 | int used = bufferSize - ReadBufFree(); |
905 | 921 | |
… |
… |
void RingBuffer::run(void) |
945 | 961 | used = bufferSize - ReadBufFree(); |
946 | 962 | } |
947 | 963 | |
948 | | LOG(VB_FILE, LOG_DEBUG, LOC + "@ end of read ahead loop"); |
| 964 | LOG(VB_FILE, LOG_INFO, LOC + "@ end of read ahead loop"); |
949 | 965 | |
950 | 966 | if (!readsallowed || commserror || ateof || setswitchtonext || |
951 | 967 | (wanttoread <= used && wanttoread > 0)) |
… |
… |
bool RingBuffer::WaitForAvail(int count) |
1084 | 1100 | int elapsed = t.elapsed(); |
1085 | 1101 | if (elapsed > 500 && low_buffers && avail >= fill_min) |
1086 | 1102 | count = avail; |
1087 | | else if (((elapsed > 250) && (elapsed < 500)) || |
1088 | | ((elapsed > 500) && (elapsed < 750)) || |
| 1103 | else if (((elapsed > 500) && (elapsed < 750)) || |
1089 | 1104 | ((elapsed > 1000) && (elapsed < 1250)) || |
1090 | 1105 | ((elapsed > 2000) && (elapsed < 2250)) || |
1091 | 1106 | ((elapsed > 4000) && (elapsed < 4250)) || |
1092 | 1107 | ((elapsed > 8000) && (elapsed < 8250)) || |
1093 | 1108 | ((elapsed > 9000))) |
1094 | 1109 | { |
| 1110 | LOG(VB_FILE, LOG_INFO, LOC + QString("used = %1").arg(bufferSize - ReadBufFree())); |
1095 | 1111 | LOG(VB_GENERAL, LOG_INFO, LOC + "Waited " + |
1096 | 1112 | QString("%1").arg((elapsed / 250) * 0.25f, 3, 'f', 1) + |
1097 | 1113 | " seconds for data \n\t\t\tto become available..." + |
1098 | | QString(" %2 < %3") .arg(avail).arg(count)); |
| 1114 | QString(" %2 < %3") .arg(avail).arg(count) + QString(" thread:%1").arg(QThread::currentThreadId())); |
1099 | 1115 | } |
1100 | 1116 | |
1101 | 1117 | if (elapsed > 16000) |