MythTV master
threadedfilewriter.cpp
Go to the documentation of this file.
1// C++ headers
2#include <cerrno>
3#include <csignal>
4#include <cstdio>
5#include <cstdlib>
6#include <cstring>
7#include <fcntl.h>
8#include <sys/stat.h>
9#include <sys/types.h>
10#include <unistd.h>
11
12// Qt headers
13#include <QtGlobal>
14#if QT_VERSION >= QT_VERSION_CHECK(6,0,0)
15#include <QtSystemDetection>
16#endif
17#include <QString>
18
19// MythTV headers
20#include "threadedfilewriter.h"
21#include "mythlogging.h"
22#include "mythcorecontext.h"
23
24#include "mythtimer.h"
25#include "compat.h"
26#include "mythdate.h"
27
28#define LOC QString("TFW(%1:%2): ").arg(m_filename).arg(m_fd)
29
32{
33 RunProlog();
35 RunEpilog();
36}
37
40{
41 RunProlog();
43 RunEpilog();
44}
45
46const uint ThreadedFileWriter::kMaxBufferSize = 8 * 1024 * 1024;
48const uint ThreadedFileWriter::kMaxBlockSize = 1 * 1024 * 1024;
49
67bool ThreadedFileWriter::ReOpen(const QString& newFilename)
68{
69 Flush();
70
71 m_bufLock.lock();
72
73 if (m_fd >= 0)
74 {
75 close(m_fd);
76 m_fd = -1;
77 }
78
79 if (m_registered)
80 {
82 }
83
84 if (!newFilename.isEmpty())
85 m_filename = newFilename;
86
87 m_bufLock.unlock();
88
89 return Open();
90}
91
97{
98 m_ignoreWrites = false;
99
100 if (m_filename == "-")
101 m_fd = fileno(stdout);
102 else
103 {
104 QByteArray fname = m_filename.toLocal8Bit();
105 m_fd = open(fname.constData(), m_flags, m_mode);
106 }
107
108 if (m_fd < 0)
109 {
110 LOG(VB_GENERAL, LOG_ERR, LOC +
111 QString("Opening file '%1'.").arg(m_filename) + ENO);
112 return false;
113 }
114
116 m_registered = true;
117
118 LOG(VB_FILE, LOG_INFO, LOC + "Open() successful");
119
120#ifdef Q_OS_WINDOWS
121 _setmode(m_fd, _O_BINARY);
122#endif
123 if (!m_writeThread)
124 {
125 m_writeThread = new TFWWriteThread(this);
127 }
128
129 if (!m_syncThread)
130 {
131 m_syncThread = new TFWSyncThread(this);
133 }
134
135 return true;
136}
137
142{
143 Flush();
144
145 { /* tell child threads to exit */
146 QMutexLocker locker(&m_bufLock);
147 m_inDtor = true;
148 m_bufferSyncWait.wakeAll();
149 m_bufferHasData.wakeAll();
150 }
151
152 if (m_writeThread)
153 {
155 delete m_writeThread;
156 m_writeThread = nullptr;
157 }
158
159 while (!m_writeBuffers.empty())
160 {
161 delete m_writeBuffers.front();
162 m_writeBuffers.pop_front();
163 }
164
165 while (!m_emptyBuffers.empty())
166 {
167 delete m_emptyBuffers.front();
168 m_emptyBuffers.pop_front();
169 }
170
171 if (m_syncThread)
172 {
174 delete m_syncThread;
175 m_syncThread = nullptr;
176 }
177
178 if (m_fd >= 0)
179 {
180 close(m_fd);
181 m_fd = -1;
182 }
183
185 m_registered = false;
186}
187
194int ThreadedFileWriter::Write(const void *data, uint count)
195{
196 if (count == 0)
197 return 0;
198
199 QMutexLocker locker(&m_bufLock);
200
201 if (m_ignoreWrites)
202 return -1;
203
204 uint written = 0;
205 uint left = count;
206
207 while (written < count)
208 {
209 uint towrite = (left > kMaxBlockSize) ? kMaxBlockSize : left;
210
211 if ((m_totalBufferUse + towrite) > (kMaxBufferSize * (m_blocking ? 1 : 8)))
212 {
213 if (!m_blocking)
214 {
215 LOG(VB_GENERAL, LOG_ERR, LOC +
216 "Maximum buffer size exceeded."
217 "\n\t\t\tfile will be truncated, no further writing "
218 "will be done."
219 "\n\t\t\tThis generally indicates your disk performance "
220 "\n\t\t\tis insufficient to deal with the number of on-going "
221 "\n\t\t\trecordings, or you have a disk failure.");
222 m_ignoreWrites = true;
223 return -1;
224 }
225 if (!m_warned)
226 {
227 LOG(VB_GENERAL, LOG_WARNING, LOC +
228 "Maximum buffer size exceeded."
229 "\n\t\t\tThis generally indicates your disk performance "
230 "\n\t\t\tis insufficient or you have a disk failure.");
231 m_warned = true;
232 }
233 // wait until some was written to disk, and try again
234 if (!m_bufferWasFreed.wait(locker.mutex(), 1000))
235 {
236 LOG(VB_GENERAL, LOG_DEBUG, LOC +
237 QString("Taking a long time waiting to write.. "
238 "buffer size %1 (needing %2, %3 to go)")
239 .arg(m_totalBufferUse).arg(towrite)
240 .arg(towrite-(kMaxBufferSize-m_totalBufferUse)));
241 }
242 continue;
243 }
244
245 TFWBuffer *buf = nullptr;
246
247 if (!m_writeBuffers.empty() &&
248 (m_writeBuffers.back()->data.size() + towrite) < kMinWriteSize)
249 {
250 buf = m_writeBuffers.back();
251 m_writeBuffers.pop_back();
252 }
253 else
254 {
255 if (!m_emptyBuffers.empty())
256 {
257 buf = m_emptyBuffers.front();
258 m_emptyBuffers.pop_front();
259 buf->data.clear();
260 }
261 else
262 {
263 buf = new TFWBuffer();
264 }
265 }
266
267 m_totalBufferUse += towrite;
268
269 const char *cdata = (const char*) data + written;
270 buf->data.insert(buf->data.end(), cdata, cdata+towrite);
272
273 m_writeBuffers.push_back(buf);
274
275 if ((m_writeBuffers.size() > 1) || (buf->data.size() >= kMinWriteSize))
276 {
277 m_bufferHasData.wakeAll();
278 }
279
280 written += towrite;
281 left -= towrite;
282 }
283
284 LOG(VB_FILE, LOG_DEBUG, LOC + QString("Write(*, %1) total %2 cnt %3")
285 .arg(count,4).arg(m_totalBufferUse).arg(m_writeBuffers.size()));
286
287 return count;
288}
289
301long long ThreadedFileWriter::Seek(long long pos, int whence)
302{
303 QMutexLocker locker(&m_bufLock);
304 m_flush = true;
305 while (!m_writeBuffers.empty())
306 {
307 m_bufferHasData.wakeAll();
308 if (!m_bufferEmpty.wait(locker.mutex(), 2000))
309 {
310 LOG(VB_GENERAL, LOG_WARNING, LOC +
311 QString("Taking a long time to flush.. buffer size %1")
312 .arg(m_totalBufferUse));
313 }
314 }
315 m_flush = false;
316 return lseek(m_fd, pos, whence);
317}
318
323{
324 QMutexLocker locker(&m_bufLock);
325 m_flush = true;
326 while (!m_writeBuffers.empty())
327 {
328 m_bufferHasData.wakeAll();
329 if (!m_bufferEmpty.wait(locker.mutex(), 2000))
330 {
331 LOG(VB_GENERAL, LOG_WARNING, LOC +
332 QString("Taking a long time to flush.. buffer size %1")
333 .arg(m_totalBufferUse));
334 }
335 }
336 m_flush = false;
337}
338
360{
361 if (m_fd >= 0)
362 {
363#if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0
364 // fdatasync tries to avoid updating metadata, but will in
365 // practice always update metadata if any data is written
366 // as the file will usually have grown.
367 fdatasync(m_fd);
368#else
369 fsync(m_fd);
370#endif
371 }
372}
373
379{
380 QMutexLocker locker(&m_bufLock);
381 if (newMinSize > 0)
382 m_tfwMinWriteSize = newMinSize;
383 m_bufferHasData.wakeAll();
384}
385
390{
391 QMutexLocker locker(&m_bufLock);
392 while (!m_inDtor)
393 {
394 locker.unlock();
395
396 Sync();
397
398 locker.relock();
399
401 {
402 // we aren't going to write to the disk anymore, so can de-register
404 m_registered = false;
405 }
406 m_bufferSyncWait.wait(&m_bufLock, 1000);
407 }
408}
409
414{
415#ifndef Q_OS_WINDOWS
416 // don't exit program if file gets larger than quota limit..
417 signal(SIGXFSZ, SIG_IGN);
418#endif
419
420 QMutexLocker locker(&m_bufLock);
421
422 // Even if the bytes buffered is less than the minimum write
423 // size we do want to write to the OS buffers periodically.
424 // This timer makes sure we do.
425 MythTimer minWriteTimer;
426 MythTimer lastRegisterTimer;
427 minWriteTimer.start();
428 lastRegisterTimer.start();
429
430 uint64_t total_written = 0LL;
431
432 while (!m_inDtor)
433 {
434 if (m_ignoreWrites)
435 {
436 while (!m_writeBuffers.empty())
437 {
438 delete m_writeBuffers.front();
439 m_writeBuffers.pop_front();
440 }
441 while (!m_emptyBuffers.empty())
442 {
443 delete m_emptyBuffers.front();
444 m_emptyBuffers.pop_front();
445 }
446 m_bufferEmpty.wakeAll();
447 m_bufferHasData.wait(locker.mutex());
448 continue;
449 }
450
451 if (m_writeBuffers.empty())
452 {
453 m_bufferEmpty.wakeAll();
454 m_bufferHasData.wait(locker.mutex(), 1000);
456 continue;
457 }
458
459 auto mwte = minWriteTimer.elapsed();
460 if (!m_flush && (mwte < 250ms) && (m_totalBufferUse < kMinWriteSize))
461 {
462 m_bufferHasData.wait(locker.mutex(), (250ms - mwte).count());
464 continue;
465 }
466
467 if (m_fd == -1)
468 {
469 m_bufferHasData.wait(locker.mutex(), 200);
471 continue;
472 }
473
474 TFWBuffer *buf = m_writeBuffers.front();
475 m_writeBuffers.pop_front();
476 m_totalBufferUse -= buf->data.size();
477 m_bufferWasFreed.wakeAll();
478 minWriteTimer.start();
479
481
482 const void *data = (buf->data).data();
483 uint sz = buf->data.size();
484
485 bool write_ok = true;
486 uint tot = 0;
487 uint errcnt = 0;
488
489 LOG(VB_FILE, LOG_DEBUG, LOC + QString("write(%1) cnt %2 total %3")
490 .arg(sz).arg(m_writeBuffers.size())
491 .arg(m_totalBufferUse));
492
493 MythTimer writeTimer;
494 writeTimer.start();
495
496 while ((tot < sz) && !m_inDtor)
497 {
498 locker.unlock();
499
500 int ret = write(m_fd, (char *)data + tot, sz - tot);
501
502 if (ret < 0)
503 {
504 if (errno == EAGAIN)
505 {
506 LOG(VB_GENERAL, LOG_WARNING, LOC + "Got EAGAIN.");
507 }
508 else
509 {
510 errcnt++;
511 LOG(VB_GENERAL, LOG_ERR, LOC + "File I/O " +
512 QString(" errcnt: %1").arg(errcnt) + ENO);
513 }
514
515 if ((errcnt >= 3) || (ENOSPC == errno) || (EFBIG == errno))
516 {
517 locker.relock();
518 write_ok = false;
519 break;
520 }
521 }
522 else
523 {
524 tot += ret;
525 total_written += ret;
526 LOG(VB_FILE, LOG_DEBUG, LOC +
527 QString("total written so far: %1 bytes")
528 .arg(total_written));
529 }
530
531 locker.relock();
532
533 if ((tot < sz) && !m_inDtor)
534 m_bufferHasData.wait(locker.mutex(), 50);
535 }
536
538
539 if (lastRegisterTimer.elapsed() >= 10s)
540 {
542 m_registered = true;
543 lastRegisterTimer.restart();
544 }
545
547 m_emptyBuffers.push_back(buf);
548
549 if (writeTimer.elapsed() > 1s)
550 {
551 LOG(VB_GENERAL, LOG_WARNING, LOC +
552 QString("write(%1) cnt %2 total %3 -- took a long time, %4 ms")
553 .arg(sz).arg(m_writeBuffers.size())
554 .arg(m_totalBufferUse).arg(writeTimer.elapsed().count()));
555 }
556
557 if (!write_ok && ((EFBIG == errno) || (ENOSPC == errno)))
558 {
559 QString msg;
560 switch (errno)
561 {
562 case EFBIG:
563 msg =
564 "Maximum file size exceeded by '%1'"
565 "\n\t\t\t"
566 "You must either change the process ulimits, configure"
567 "\n\t\t\t"
568 "your operating system with \"Large File\" support, "
569 "or use"
570 "\n\t\t\t"
571 "a filesystem which supports 64-bit or 128-bit files."
572 "\n\t\t\t"
573 "HINT: FAT32 is a 32-bit filesystem.";
574 break;
575 case ENOSPC:
576 msg =
577 "No space left on the device for file '%1'"
578 "\n\t\t\t"
579 "file will be truncated, no further writing "
580 "will be done.";
581 break;
582 }
583
584 LOG(VB_GENERAL, LOG_ERR, LOC + msg.arg(m_filename));
585 m_ignoreWrites = true;
586 }
587 }
588}
589
591{
592 QDateTime cur = MythDate::current();
593 QDateTime cur_m_60 = cur.addSecs(-60);
594
595 QList<TFWBuffer*>::iterator it = m_emptyBuffers.begin();
596 while (it != m_emptyBuffers.end())
597 {
598 if (((*it)->lastUsed < cur_m_60) ||
599 ((*it)->data.capacity() > 3 * (*it)->data.size() &&
600 (*it)->data.capacity() > 64 * 1024LL))
601 {
602 delete *it;
603 it = m_emptyBuffers.erase(it);
604 continue;
605 }
606 ++it;
607 }
608}
609
618{
619 bool old = m_blocking;
620 m_blocking = block;
621 return old;
622}
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:283
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
void RegisterFileForWrite(const QString &file, uint64_t size=0LL)
void UnregisterFileForWrite(const QString &file)
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:14
std::chrono::milliseconds restart(void)
Returns milliseconds elapsed since last start() or restart() and resets the count.
Definition: mythtimer.cpp:62
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
void run(void) override
Runs ThreadedFileWriter::SyncLoop(void)
ThreadedFileWriter * m_parent
void run(void) override
Runs ThreadedFileWriter::DiskLoop(void)
ThreadedFileWriter * m_parent
void DiskLoop(void)
The thread run method that actually calls writes to disk.
void SetWriteBufferMinWriteSize(uint newMinSize=kMinWriteSize)
Sets the minumum number of bytes to write to disk in a single write.
QWaitCondition m_bufferHasData
TFWWriteThread * m_writeThread
void Sync(void) const
Flush data written to the file descriptor to disk.
bool SetBlocking(bool block=true)
Set write blocking mode While in blocking mode, ThreadedFileWriter::Write will wait for buffers to be...
QWaitCondition m_bufferEmpty
long long Seek(long long pos, int whence)
Seek to a position within stream; May be unsafe.
static const uint kMaxBufferSize
friend class TFWSyncThread
bool Open(void)
Opens the file we will be writing to.
void Flush(void)
Allow DiskLoop() to flush buffer completely ignoring low watermark.
static const uint kMinWriteSize
Minimum to write to disk in a single write, when not flushing buffer.
friend class TFWWriteThread
QWaitCondition m_bufferWasFreed
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
QList< TFWBuffer * > m_emptyBuffers
bool ReOpen(const QString &newFilename="")
Reopens the file we are writing to or opens a new file.
void SyncLoop(void)
The thread run method that calls Sync(void).
static const uint kMaxBlockSize
Maximum block size to write at a time.
QList< TFWBuffer * > m_writeBuffers
~ThreadedFileWriter()
Commits all writes and closes the file.
QWaitCondition m_bufferSyncWait
TFWSyncThread * m_syncThread
#define fsync(FD)
Definition: compat.h:64
unsigned int uint
Definition: compat.h:68
#define close
Definition: compat.h:31
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
QDateTime current(bool stripped)
Returns current Date and Time in UTC.
Definition: mythdate.cpp:15
def write(text, progress=True)
Definition: mythburn.py:307
#define LOC