MythTV master
threadedfilewriter.cpp
Go to the documentation of this file.
1// C++ headers
2#include <cerrno>
3#include <csignal>
4#include <cstdio>
5#include <cstdlib>
6#include <cstring>
7#include <fcntl.h>
8#include <sys/stat.h>
9#include <sys/types.h>
10#include <unistd.h>
11
12// Qt headers
13#include <QString>
14
15// MythTV headers
16#include "threadedfilewriter.h"
17#include "mythlogging.h"
18#include "mythcorecontext.h"
19
20#include "mythtimer.h"
21#include "compat.h"
22#include "mythdate.h"
23
24#define LOC QString("TFW(%1:%2): ").arg(m_filename).arg(m_fd)
25
// Body of a thread run() override; the signature is not part of this listing.
// RunProlog() and RunEpilog() bracket the work done on the thread.
// NOTE(review): the embedded numbering skips line 30, so the statement between
// the two calls is missing from this view. Presumably it invokes one of the
// parent's loops (DiskLoop()/SyncLoop()) -- confirm against the full file.
28{
29 RunProlog();
31 RunEpilog();
32}
33
// Body of a second thread run() override; the signature is not part of this
// listing. RunProlog() and RunEpilog() bracket the work done on the thread.
// NOTE(review): the embedded numbering skips line 38, so the statement between
// the two calls is missing from this view. Presumably it invokes the other
// parent loop (SyncLoop()/DiskLoop()) -- confirm against the full file.
36{
37 RunProlog();
39 RunEpilog();
40}
41
// Cap on the number of bytes held in the write buffers. Write() uses it as the
// limit in blocking mode and multiplies it by 8 in non-blocking mode.
42const uint ThreadedFileWriter::kMaxBufferSize = 8 * 1024 * 1024;
// Largest chunk that Write() copies into a buffer in one step.
44const uint ThreadedFileWriter::kMaxBlockSize = 1 * 1024 * 1024;
45
/**
 * Reopens the file being written to, optionally switching to a new file.
 *
 * Queued data is flushed first. Then, with m_bufLock held, the current file
 * descriptor (if any) is closed and the filename is replaced when a new one
 * is supplied. The lock is released before Open() is called.
 *
 * \param newFilename  Replaces m_filename when it is not empty.
 * \return The result of Open().
 */
63bool ThreadedFileWriter::ReOpen(const QString& newFilename)
64{
// Drain everything queued for the old descriptor before closing it.
65 Flush();
66
67 m_bufLock.lock();
68
69 if (m_fd >= 0)
70 {
71 close(m_fd);
72 m_fd = -1;
73 }
74
// NOTE(review): the embedded numbering skips line 77, so this branch is empty
// in this view. Presumably the dropped statement de-registers the file
// (cf. UnregisterFileForWrite) -- confirm against the full file.
75 if (m_registered)
76 {
78 }
79
80 if (!newFilename.isEmpty())
81 m_filename = newFilename;
82
// m_bufLock is unlocked manually; nothing between lock() and here returns early.
83 m_bufLock.unlock();
84
85 return Open();
86}
87
// Body of the routine that opens the output file (Open() per the listing's
// index; the signature itself is not visible here).
// Returns false if the file cannot be opened, true otherwise.
93{
// A fresh open re-enables writing after an earlier failure set this flag.
94 m_ignoreWrites = false;
95
// The filename "-" selects the process's standard output instead of a file.
96 if (m_filename == "-")
97 m_fd = fileno(stdout);
98 else
99 {
100 QByteArray fname = m_filename.toLocal8Bit();
101 m_fd = open(fname.constData(), m_flags, m_mode);
102 }
103
104 if (m_fd < 0)
105 {
106 LOG(VB_GENERAL, LOG_ERR, LOC +
107 QString("Opening file '%1'.").arg(m_filename) + ENO);
108 return false;
109 }
110
// NOTE(review): the embedded numbering skips line 111; presumably the call that
// registers the file for writing was dropped from this listing -- confirm.
112 m_registered = true;
113
114 LOG(VB_FILE, LOG_INFO, LOC + "Open() successful");
115
116#ifdef _WIN32
117 _setmode(m_fd, _O_BINARY);
118#endif
// The worker threads are created only once; a later ReOpen() reuses them.
// NOTE(review): lines 122 and 128 are missing from this view; presumably they
// start the newly created threads -- confirm against the full file.
119 if (!m_writeThread)
120 {
121 m_writeThread = new TFWWriteThread(this);
123 }
124
125 if (!m_syncThread)
126 {
127 m_syncThread = new TFWSyncThread(this);
129 }
130
131 return true;
132}
133
// Body of the teardown routine (presumably ~ThreadedFileWriter(), since it sets
// m_inDtor; the signature is not visible in this listing).
// Order: flush queued data, tell the worker loops to exit, destroy the
// threads, free all buffers, close the file descriptor.
138{
139 Flush();
140
141 { /* tell child threads to exit */
142 QMutexLocker locker(&m_bufLock);
143 m_inDtor = true;
// Wake both condition variables so sleeping loops notice m_inDtor promptly.
144 m_bufferSyncWait.wakeAll();
145 m_bufferHasData.wakeAll();
146 }
147
// NOTE(review): lines 150 and 169 are missing from this view; presumably each
// thread is waited on before being deleted -- confirm against the full file.
148 if (m_writeThread)
149 {
151 delete m_writeThread;
152 m_writeThread = nullptr;
153 }
154
// Both lists hold raw owning pointers, so each element is deleted by hand.
155 while (!m_writeBuffers.empty())
156 {
157 delete m_writeBuffers.front();
158 m_writeBuffers.pop_front();
159 }
160
161 while (!m_emptyBuffers.empty())
162 {
163 delete m_emptyBuffers.front();
164 m_emptyBuffers.pop_front();
165 }
166
167 if (m_syncThread)
168 {
170 delete m_syncThread;
171 m_syncThread = nullptr;
172 }
173
174 if (m_fd >= 0)
175 {
176 close(m_fd);
177 m_fd = -1;
178 }
179
// NOTE(review): line 180 is missing from this view; presumably the file is
// de-registered here before the flag is cleared -- confirm.
181 m_registered = false;
182}
183
190int ThreadedFileWriter::Write(const void *data, uint count)
191{
192 if (count == 0)
193 return 0;
194
195 QMutexLocker locker(&m_bufLock);
196
197 if (m_ignoreWrites)
198 return -1;
199
200 uint written = 0;
201 uint left = count;
202
203 while (written < count)
204 {
205 uint towrite = (left > kMaxBlockSize) ? kMaxBlockSize : left;
206
207 if ((m_totalBufferUse + towrite) > (kMaxBufferSize * (m_blocking ? 1 : 8)))
208 {
209 if (!m_blocking)
210 {
211 LOG(VB_GENERAL, LOG_ERR, LOC +
212 "Maximum buffer size exceeded."
213 "\n\t\t\tfile will be truncated, no further writing "
214 "will be done."
215 "\n\t\t\tThis generally indicates your disk performance "
216 "\n\t\t\tis insufficient to deal with the number of on-going "
217 "\n\t\t\trecordings, or you have a disk failure.");
218 m_ignoreWrites = true;
219 return -1;
220 }
221 if (!m_warned)
222 {
223 LOG(VB_GENERAL, LOG_WARNING, LOC +
224 "Maximum buffer size exceeded."
225 "\n\t\t\tThis generally indicates your disk performance "
226 "\n\t\t\tis insufficient or you have a disk failure.");
227 m_warned = true;
228 }
229 // wait until some was written to disk, and try again
230 if (!m_bufferWasFreed.wait(locker.mutex(), 1000))
231 {
232 LOG(VB_GENERAL, LOG_DEBUG, LOC +
233 QString("Taking a long time waiting to write.. "
234 "buffer size %1 (needing %2, %3 to go)")
235 .arg(m_totalBufferUse).arg(towrite)
236 .arg(towrite-(kMaxBufferSize-m_totalBufferUse)));
237 }
238 continue;
239 }
240
241 TFWBuffer *buf = nullptr;
242
243 if (!m_writeBuffers.empty() &&
244 (m_writeBuffers.back()->data.size() + towrite) < kMinWriteSize)
245 {
246 buf = m_writeBuffers.back();
247 m_writeBuffers.pop_back();
248 }
249 else
250 {
251 if (!m_emptyBuffers.empty())
252 {
253 buf = m_emptyBuffers.front();
254 m_emptyBuffers.pop_front();
255 buf->data.clear();
256 }
257 else
258 {
259 buf = new TFWBuffer();
260 }
261 }
262
263 m_totalBufferUse += towrite;
264
265 const char *cdata = (const char*) data + written;
266 buf->data.insert(buf->data.end(), cdata, cdata+towrite);
268
269 m_writeBuffers.push_back(buf);
270
271 if ((m_writeBuffers.size() > 1) || (buf->data.size() >= kMinWriteSize))
272 {
273 m_bufferHasData.wakeAll();
274 }
275
276 written += towrite;
277 left -= towrite;
278 }
279
280 LOG(VB_FILE, LOG_DEBUG, LOC + QString("Write(*, %1) total %2 cnt %3")
281 .arg(count,4).arg(m_totalBufferUse).arg(m_writeBuffers.size()));
282
283 return count;
284}
285
297long long ThreadedFileWriter::Seek(long long pos, int whence)
298{
299 QMutexLocker locker(&m_bufLock);
300 m_flush = true;
301 while (!m_writeBuffers.empty())
302 {
303 m_bufferHasData.wakeAll();
304 if (!m_bufferEmpty.wait(locker.mutex(), 2000))
305 {
306 LOG(VB_GENERAL, LOG_WARNING, LOC +
307 QString("Taking a long time to flush.. buffer size %1")
308 .arg(m_totalBufferUse));
309 }
310 }
311 m_flush = false;
312 return lseek(m_fd, pos, whence);
313}
314
319{
320 QMutexLocker locker(&m_bufLock);
321 m_flush = true;
322 while (!m_writeBuffers.empty())
323 {
324 m_bufferHasData.wakeAll();
325 if (!m_bufferEmpty.wait(locker.mutex(), 2000))
326 {
327 LOG(VB_GENERAL, LOG_WARNING, LOC +
328 QString("Taking a long time to flush.. buffer size %1")
329 .arg(m_totalBufferUse));
330 }
331 }
332 m_flush = false;
333}
334
356{
357 if (m_fd >= 0)
358 {
359#if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0
360 // fdatasync tries to avoid updating metadata, but will in
361 // practice always update metadata if any data is written
362 // as the file will usually have grown.
363 fdatasync(m_fd);
364#else
365 fsync(m_fd);
366#endif
367 }
368}
369
375{
376 QMutexLocker locker(&m_bufLock);
377 if (newMinSize > 0)
378 m_tfwMinWriteSize = newMinSize;
379 m_bufferHasData.wakeAll();
380}
381
// Body of the sync thread's loop (SyncLoop() per the listing's index; the
// signature itself is not visible here).
// Until m_inDtor is set it calls Sync() with m_bufLock released, then sleeps
// on m_bufferSyncWait for up to 1000 ms per iteration.
386{
387 QMutexLocker locker(&m_bufLock);
388 while (!m_inDtor)
389 {
// Sync() is called without the lock held.
390 locker.unlock();
391
392 Sync();
393
394 locker.relock();
395
// NOTE(review): the embedded numbering skips lines 396 and 399. As shown, the
// block below runs unconditionally; presumably a guarding `if` and a
// de-register call were dropped from this listing -- confirm.
397 {
398 // we aren't going to write to the disk anymore, so can de-register
400 m_registered = false;
401 }
// The destructor body wakes this condition after setting m_inDtor.
402 m_bufferSyncWait.wait(&m_bufLock, 1000);
403 }
404}
405
// Body of the writer thread's loop (DiskLoop() per the listing's index; the
// signature itself is not visible here).
// Until m_inDtor is set it takes buffers from the front of m_writeBuffers,
// write()s them to m_fd with m_bufLock released, and moves each finished
// buffer to m_emptyBuffers. ENOSPC/EFBIG failures set m_ignoreWrites; after
// that, queued buffers are discarded instead of written.
// NOTE(review): the embedded numbering skips lines 451, 459, 466, 476, 533,
// 537 and 542, so several statements are missing from this view.
410{
411#ifndef _WIN32
412 // don't exit program if file gets larger than quota limit..
413 signal(SIGXFSZ, SIG_IGN);
414#endif
415
416 QMutexLocker locker(&m_bufLock);
417
418 // Even if the bytes buffered is less than the minimum write
419 // size we do want to write to the OS buffers periodically.
420 // This timer makes sure we do.
421 MythTimer minWriteTimer;
422 MythTimer lastRegisterTimer;
423 minWriteTimer.start();
424 lastRegisterTimer.start();
425
426 uint64_t total_written = 0LL;
427
428 while (!m_inDtor)
429 {
// Writing has been disabled: drop everything and sleep until woken.
// NOTE(review): m_totalBufferUse is not reduced when the queued buffers are
// deleted here, and this wait has no timeout.
430 if (m_ignoreWrites)
431 {
432 while (!m_writeBuffers.empty())
433 {
434 delete m_writeBuffers.front();
435 m_writeBuffers.pop_front();
436 }
437 while (!m_emptyBuffers.empty())
438 {
439 delete m_emptyBuffers.front();
440 m_emptyBuffers.pop_front();
441 }
442 m_bufferEmpty.wakeAll();
443 m_bufferHasData.wait(locker.mutex());
444 continue;
445 }
446
// Nothing queued: tell Flush()/Seek() waiters, then sleep for up to a second.
447 if (m_writeBuffers.empty())
448 {
449 m_bufferEmpty.wakeAll();
450 m_bufferHasData.wait(locker.mutex(), 1000);
452 continue;
453 }
454
// Unless flushing, hold small amounts back for up to 250 ms since the last
// write so that more data can accumulate.
455 auto mwte = minWriteTimer.elapsed();
456 if (!m_flush && (mwte < 250ms) && (m_totalBufferUse < kMinWriteSize))
457 {
458 m_bufferHasData.wait(locker.mutex(), (250ms - mwte).count());
460 continue;
461 }
462
// No descriptor (ReOpen() sets m_fd to -1 before Open() runs); retry shortly.
463 if (m_fd == -1)
464 {
465 m_bufferHasData.wait(locker.mutex(), 200);
467 continue;
468 }
469
// Dequeue the oldest buffer and release its share of the accounting right
// away, so a Write() blocked on m_bufferWasFreed can proceed.
470 TFWBuffer *buf = m_writeBuffers.front();
471 m_writeBuffers.pop_front();
472 m_totalBufferUse -= buf->data.size();
473 m_bufferWasFreed.wakeAll();
474 minWriteTimer.start();
475
477
478 const void *data = (buf->data).data();
479 uint sz = buf->data.size();
480
481 bool write_ok = true;
482 uint tot = 0;
483 uint errcnt = 0;
484
485 LOG(VB_FILE, LOG_DEBUG, LOC + QString("write(%1) cnt %2 total %3")
486 .arg(sz).arg(m_writeBuffers.size())
487 .arg(m_totalBufferUse));
488
489 MythTimer writeTimer;
490 writeTimer.start();
491
// Write the buffer out, tolerating short writes. The lock is released around
// each write() call and re-taken before the loop condition is tested again.
492 while ((tot < sz) && !m_inDtor)
493 {
494 locker.unlock();
495
496 int ret = write(m_fd, (char *)data + tot, sz - tot);
497
498 if (ret < 0)
499 {
// EAGAIN is logged but does not count towards the error limit.
500 if (errno == EAGAIN)
501 {
502 LOG(VB_GENERAL, LOG_WARNING, LOC + "Got EAGAIN.");
503 }
504 else
505 {
506 errcnt++;
507 LOG(VB_GENERAL, LOG_ERR, LOC + "File I/O " +
508 QString(" errcnt: %1").arg(errcnt) + ENO);
509 }
510
// NOTE(review): errno is read here after LOG() has run; presumably logging
// leaves errno untouched -- confirm, or capture errno right after write().
511 if ((errcnt >= 3) || (ENOSPC == errno) || (EFBIG == errno))
512 {
513 locker.relock();
514 write_ok = false;
515 break;
516 }
517 }
518 else
519 {
520 tot += ret;
521 total_written += ret;
522 LOG(VB_FILE, LOG_DEBUG, LOC +
523 QString("total written so far: %1 bytes")
524 .arg(total_written));
525 }
526
527 locker.relock();
528
// Pause briefly (up to 50 ms) before retrying the rest of the buffer.
529 if ((tot < sz) && !m_inDtor)
530 m_bufferHasData.wait(locker.mutex(), 50);
531 }
532
534
// Runs at most once every 10 seconds.
535 if (lastRegisterTimer.elapsed() >= 10s)
536 {
538 m_registered = true;
539 lastRegisterTimer.restart();
540 }
541
// The buffer goes to the recycle list whether or not the write succeeded.
543 m_emptyBuffers.push_back(buf);
544
545 if (writeTimer.elapsed() > 1s)
546 {
547 LOG(VB_GENERAL, LOG_WARNING, LOC +
548 QString("write(%1) cnt %2 total %3 -- took a long time, %4 ms")
549 .arg(sz).arg(m_writeBuffers.size())
550 .arg(m_totalBufferUse).arg(writeTimer.elapsed().count()));
551 }
552
// Only "file too big" and "disk full" disable further writing; other
// failures leave m_ignoreWrites unchanged.
// NOTE(review): errno is inspected well after the failing write(), with
// locking and logging calls in between -- confirm it cannot be overwritten.
553 if (!write_ok && ((EFBIG == errno) || (ENOSPC == errno)))
554 {
555 QString msg;
556 switch (errno)
557 {
558 case EFBIG:
559 msg =
560 "Maximum file size exceeded by '%1'"
561 "\n\t\t\t"
562 "You must either change the process ulimits, configure"
563 "\n\t\t\t"
564 "your operating system with \"Large File\" support, "
565 "or use"
566 "\n\t\t\t"
567 "a filesystem which supports 64-bit or 128-bit files."
568 "\n\t\t\t"
569 "HINT: FAT32 is a 32-bit filesystem.";
570 break;
571 case ENOSPC:
572 msg =
573 "No space left on the device for file '%1'"
574 "\n\t\t\t"
575 "file will be truncated, no further writing "
576 "will be done.";
577 break;
578 }
579
580 LOG(VB_GENERAL, LOG_ERR, LOC + msg.arg(m_filename));
581 m_ignoreWrites = true;
582 }
583 }
584}
585
587{
588 QDateTime cur = MythDate::current();
589 QDateTime cur_m_60 = cur.addSecs(-60);
590
591 QList<TFWBuffer*>::iterator it = m_emptyBuffers.begin();
592 while (it != m_emptyBuffers.end())
593 {
594 if (((*it)->lastUsed < cur_m_60) ||
595 ((*it)->data.capacity() > 3 * (*it)->data.size() &&
596 (*it)->data.capacity() > 64 * 1024LL))
597 {
598 delete *it;
599 it = m_emptyBuffers.erase(it);
600 continue;
601 }
602 ++it;
603 }
604}
605
614{
615 bool old = m_blocking;
616 m_blocking = block;
617 return old;
618}
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:283
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
void RegisterFileForWrite(const QString &file, uint64_t size=0LL)
void UnregisterFileForWrite(const QString &file)
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:14
std::chrono::milliseconds restart(void)
Returns milliseconds elapsed since last start() or restart() and resets the count.
Definition: mythtimer.cpp:62
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
void run(void) override
Runs ThreadedFileWriter::SyncLoop(void)
ThreadedFileWriter * m_parent
void run(void) override
Runs ThreadedFileWriter::DiskLoop(void)
ThreadedFileWriter * m_parent
void DiskLoop(void)
The thread run method that actually calls writes to disk.
void SetWriteBufferMinWriteSize(uint newMinSize=kMinWriteSize)
Sets the minimum number of bytes to write to disk in a single write.
QWaitCondition m_bufferHasData
TFWWriteThread * m_writeThread
void Sync(void) const
Flush data written to the file descriptor to disk.
bool SetBlocking(bool block=true)
Set write blocking mode. While in blocking mode, ThreadedFileWriter::Write will wait for buffers to be...
QWaitCondition m_bufferEmpty
long long Seek(long long pos, int whence)
Seek to a position within stream; May be unsafe.
static const uint kMaxBufferSize
friend class TFWSyncThread
bool Open(void)
Opens the file we will be writing to.
void Flush(void)
Allow DiskLoop() to flush buffer completely ignoring low watermark.
static const uint kMinWriteSize
Minimum to write to disk in a single write, when not flushing buffer.
friend class TFWWriteThread
QWaitCondition m_bufferWasFreed
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
QList< TFWBuffer * > m_emptyBuffers
bool ReOpen(const QString &newFilename="")
Reopens the file we are writing to or opens a new file.
void SyncLoop(void)
The thread run method that calls Sync(void).
static const uint kMaxBlockSize
Maximum block size to write at a time.
QList< TFWBuffer * > m_writeBuffers
~ThreadedFileWriter()
Commits all writes and closes the file.
QWaitCondition m_bufferSyncWait
TFWSyncThread * m_syncThread
#define fsync(FD)
Definition: compat.h:72
#define close
Definition: compat.h:39
unsigned int uint
Definition: freesurround.h:24
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
#define lseek
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
QDateTime current(bool stripped)
Returns current Date and Time in UTC.
Definition: mythdate.cpp:15
def write(text, progress=True)
Definition: mythburn.py:307
#define LOC