MythTV  master
threadedfilewriter.cpp
Go to the documentation of this file.
1 // C++ headers
2 #include <cerrno>
3 #include <csignal>
4 #include <cstdio>
5 #include <cstdlib>
6 #include <cstring>
7 #include <fcntl.h>
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 
12 // Qt headers
13 #include <QString>
14 
15 // MythTV headers
16 #include "threadedfilewriter.h"
17 #include "mythlogging.h"
18 #include "mythcorecontext.h"
19 
20 #include "mythtimer.h"
21 #include "compat.h"
22 #include "mythdate.h"
23 
24 #define LOC QString("TFW(%1:%2): ").arg(m_filename).arg(m_fd)
25 
28 {
29  RunProlog();
30  m_parent->DiskLoop();
31  RunEpilog();
32 }
33 
36 {
37  RunProlog();
38  m_parent->SyncLoop();
39  RunEpilog();
40 }
41 
42 const uint ThreadedFileWriter::kMaxBufferSize = 8 * 1024 * 1024;
43 const uint ThreadedFileWriter::kMinWriteSize = 64 * 1024;
44 const uint ThreadedFileWriter::kMaxBlockSize = 1 * 1024 * 1024;
45 
63 bool ThreadedFileWriter::ReOpen(const QString& newFilename)
64 {
65  Flush();
66 
67  m_bufLock.lock();
68 
69  if (m_fd >= 0)
70  {
71  close(m_fd);
72  m_fd = -1;
73  }
74 
75  if (m_registered)
76  {
78  }
79 
80  if (!newFilename.isEmpty())
81  m_filename = newFilename;
82 
83  m_bufLock.unlock();
84 
85  return Open();
86 }
87 
93 {
94  m_ignoreWrites = false;
95 
96  if (m_filename == "-")
97  m_fd = fileno(stdout);
98  else
99  {
100  QByteArray fname = m_filename.toLocal8Bit();
101  m_fd = open(fname.constData(), m_flags, m_mode);
102  }
103 
104  if (m_fd < 0)
105  {
106  LOG(VB_GENERAL, LOG_ERR, LOC +
107  QString("Opening file '%1'.").arg(m_filename) + ENO);
108  return false;
109  }
110 
112  m_registered = true;
113 
114  LOG(VB_FILE, LOG_INFO, LOC + "Open() successful");
115 
116 #ifdef _WIN32
117  _setmode(m_fd, _O_BINARY);
118 #endif
119  if (!m_writeThread)
120  {
121  m_writeThread = new TFWWriteThread(this);
122  m_writeThread->start();
123  }
124 
125  if (!m_syncThread)
126  {
127  m_syncThread = new TFWSyncThread(this);
128  m_syncThread->start();
129  }
130 
131  return true;
132 }
133 
138 {
139  Flush();
140 
141  { /* tell child threads to exit */
142  QMutexLocker locker(&m_bufLock);
143  m_inDtor = true;
144  m_bufferSyncWait.wakeAll();
145  m_bufferHasData.wakeAll();
146  }
147 
148  if (m_writeThread)
149  {
150  m_writeThread->wait();
151  delete m_writeThread;
152  m_writeThread = nullptr;
153  }
154 
155  while (!m_writeBuffers.empty())
156  {
157  delete m_writeBuffers.front();
158  m_writeBuffers.pop_front();
159  }
160 
161  while (!m_emptyBuffers.empty())
162  {
163  delete m_emptyBuffers.front();
164  m_emptyBuffers.pop_front();
165  }
166 
167  if (m_syncThread)
168  {
169  m_syncThread->wait();
170  delete m_syncThread;
171  m_syncThread = nullptr;
172  }
173 
174  if (m_fd >= 0)
175  {
176  close(m_fd);
177  m_fd = -1;
178  }
179 
181  m_registered = false;
182 }
183 
190 int ThreadedFileWriter::Write(const void *data, uint count)
191 {
192  if (count == 0)
193  return 0;
194 
195  QMutexLocker locker(&m_bufLock);
196 
197  if (m_ignoreWrites)
198  return -1;
199 
200  uint written = 0;
201  uint left = count;
202 
203  while (written < count)
204  {
205  uint towrite = (left > kMaxBlockSize) ? kMaxBlockSize : left;
206 
207  if ((m_totalBufferUse + towrite) > (kMaxBufferSize * (m_blocking ? 1 : 8)))
208  {
209  if (!m_blocking)
210  {
211  LOG(VB_GENERAL, LOG_ERR, LOC +
212  "Maximum buffer size exceeded."
213  "\n\t\t\tfile will be truncated, no further writing "
214  "will be done."
215  "\n\t\t\tThis generally indicates your disk performance "
216  "\n\t\t\tis insufficient to deal with the number of on-going "
217  "\n\t\t\trecordings, or you have a disk failure.");
218  m_ignoreWrites = true;
219  return -1;
220  }
221  if (!m_warned)
222  {
223  LOG(VB_GENERAL, LOG_WARNING, LOC +
224  "Maximum buffer size exceeded."
225  "\n\t\t\tThis generally indicates your disk performance "
226  "\n\t\t\tis insufficient or you have a disk failure.");
227  m_warned = true;
228  }
229  // wait until some was written to disk, and try again
230  if (!m_bufferWasFreed.wait(locker.mutex(), 1000))
231  {
232  LOG(VB_GENERAL, LOG_DEBUG, LOC +
233  QString("Taking a long time waiting to write.. "
234  "buffer size %1 (needing %2, %3 to go)")
235  .arg(m_totalBufferUse).arg(towrite)
236  .arg(towrite-(kMaxBufferSize-m_totalBufferUse)));
237  }
238  continue;
239  }
240 
241  TFWBuffer *buf = nullptr;
242 
243  if (!m_writeBuffers.empty() &&
244  (m_writeBuffers.back()->data.size() + towrite) < kMinWriteSize)
245  {
246  buf = m_writeBuffers.back();
247  m_writeBuffers.pop_back();
248  }
249  else
250  {
251  if (!m_emptyBuffers.empty())
252  {
253  buf = m_emptyBuffers.front();
254  m_emptyBuffers.pop_front();
255  buf->data.clear();
256  }
257  else
258  {
259  buf = new TFWBuffer();
260  }
261  }
262 
263  m_totalBufferUse += towrite;
264 
265  const char *cdata = (const char*) data + written;
266  buf->data.insert(buf->data.end(), cdata, cdata+towrite);
267  buf->lastUsed = MythDate::current();
268 
269  m_writeBuffers.push_back(buf);
270 
271  if ((m_writeBuffers.size() > 1) || (buf->data.size() >= kMinWriteSize))
272  {
273  m_bufferHasData.wakeAll();
274  }
275 
276  written += towrite;
277  left -= towrite;
278  }
279 
280  LOG(VB_FILE, LOG_DEBUG, LOC + QString("Write(*, %1) total %2 cnt %3")
281  .arg(count,4).arg(m_totalBufferUse).arg(m_writeBuffers.size()));
282 
283  return count;
284 }
285 
297 long long ThreadedFileWriter::Seek(long long pos, int whence)
298 {
299  QMutexLocker locker(&m_bufLock);
300  m_flush = true;
301  while (!m_writeBuffers.empty())
302  {
303  m_bufferHasData.wakeAll();
304  if (!m_bufferEmpty.wait(locker.mutex(), 2000))
305  {
306  LOG(VB_GENERAL, LOG_WARNING, LOC +
307  QString("Taking a long time to flush.. buffer size %1")
308  .arg(m_totalBufferUse));
309  }
310  }
311  m_flush = false;
312  return lseek(m_fd, pos, whence);
313 }
314 
319 {
320  QMutexLocker locker(&m_bufLock);
321  m_flush = true;
322  while (!m_writeBuffers.empty())
323  {
324  m_bufferHasData.wakeAll();
325  if (!m_bufferEmpty.wait(locker.mutex(), 2000))
326  {
327  LOG(VB_GENERAL, LOG_WARNING, LOC +
328  QString("Taking a long time to flush.. buffer size %1")
329  .arg(m_totalBufferUse));
330  }
331  }
332  m_flush = false;
333 }
334 
355 void ThreadedFileWriter::Sync(void) const
356 {
357  if (m_fd >= 0)
358  {
359 #if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0
360  // fdatasync tries to avoid updating metadata, but will in
361  // practice always update metadata if any data is written
362  // as the file will usually have grown.
363  fdatasync(m_fd);
364 #else
365  fsync(m_fd);
366 #endif
367  }
368 }
369 
375 {
376  QMutexLocker locker(&m_bufLock);
377  if (newMinSize > 0)
378  m_tfwMinWriteSize = newMinSize;
379  m_bufferHasData.wakeAll();
380 }
381 
386 {
387  QMutexLocker locker(&m_bufLock);
388  while (!m_inDtor)
389  {
390  locker.unlock();
391 
392  Sync();
393 
394  locker.relock();
395 
397  {
398  // we aren't going to write to the disk anymore, so can de-register
400  m_registered = false;
401  }
402  m_bufferSyncWait.wait(&m_bufLock, 1000);
403  }
404 }
405 
410 {
411 #ifndef _WIN32
412  // don't exit program if file gets larger than quota limit..
413  signal(SIGXFSZ, SIG_IGN);
414 #endif
415 
416  QMutexLocker locker(&m_bufLock);
417 
418  // Even if the bytes buffered is less than the minimum write
419  // size we do want to write to the OS buffers periodically.
420  // This timer makes sure we do.
421  MythTimer minWriteTimer;
422  MythTimer lastRegisterTimer;
423  minWriteTimer.start();
424  lastRegisterTimer.start();
425 
426  uint64_t total_written = 0LL;
427 
428  while (!m_inDtor)
429  {
430  if (m_ignoreWrites)
431  {
432  while (!m_writeBuffers.empty())
433  {
434  delete m_writeBuffers.front();
435  m_writeBuffers.pop_front();
436  }
437  while (!m_emptyBuffers.empty())
438  {
439  delete m_emptyBuffers.front();
440  m_emptyBuffers.pop_front();
441  }
442  m_bufferEmpty.wakeAll();
443  m_bufferHasData.wait(locker.mutex());
444  continue;
445  }
446 
447  if (m_writeBuffers.empty())
448  {
449  m_bufferEmpty.wakeAll();
450  m_bufferHasData.wait(locker.mutex(), 1000);
452  continue;
453  }
454 
455  auto mwte = minWriteTimer.elapsed();
456  if (!m_flush && (mwte < 250ms) && (m_totalBufferUse < kMinWriteSize))
457  {
458  m_bufferHasData.wait(locker.mutex(), (250ms - mwte).count());
460  continue;
461  }
462 
463  if (m_fd == -1)
464  {
465  m_bufferHasData.wait(locker.mutex(), 200);
467  continue;
468  }
469 
470  TFWBuffer *buf = m_writeBuffers.front();
471  m_writeBuffers.pop_front();
472  m_totalBufferUse -= buf->data.size();
473  m_bufferWasFreed.wakeAll();
474  minWriteTimer.start();
475 
477 
478  const void *data = (buf->data).data();
479  uint sz = buf->data.size();
480 
481  bool write_ok = true;
482  uint tot = 0;
483  uint errcnt = 0;
484 
485  LOG(VB_FILE, LOG_DEBUG, LOC + QString("write(%1) cnt %2 total %3")
486  .arg(sz).arg(m_writeBuffers.size())
487  .arg(m_totalBufferUse));
488 
489  MythTimer writeTimer;
490  writeTimer.start();
491 
492  while ((tot < sz) && !m_inDtor)
493  {
494  locker.unlock();
495 
496  int ret = write(m_fd, (char *)data + tot, sz - tot);
497 
498  if (ret < 0)
499  {
500  if (errno == EAGAIN)
501  {
502  LOG(VB_GENERAL, LOG_WARNING, LOC + "Got EAGAIN.");
503  }
504  else
505  {
506  errcnt++;
507  LOG(VB_GENERAL, LOG_ERR, LOC + "File I/O " +
508  QString(" errcnt: %1").arg(errcnt) + ENO);
509  }
510 
511  if ((errcnt >= 3) || (ENOSPC == errno) || (EFBIG == errno))
512  {
513  locker.relock();
514  write_ok = false;
515  break;
516  }
517  }
518  else
519  {
520  tot += ret;
521  total_written += ret;
522  LOG(VB_FILE, LOG_DEBUG, LOC +
523  QString("total written so far: %1 bytes")
524  .arg(total_written));
525  }
526 
527  locker.relock();
528 
529  if ((tot < sz) && !m_inDtor)
530  m_bufferHasData.wait(locker.mutex(), 50);
531  }
532 
534 
535  if (lastRegisterTimer.elapsed() >= 10s)
536  {
538  m_registered = true;
539  lastRegisterTimer.restart();
540  }
541 
542  buf->lastUsed = MythDate::current();
543  m_emptyBuffers.push_back(buf);
544 
545  if (writeTimer.elapsed() > 1s)
546  {
547  LOG(VB_GENERAL, LOG_WARNING, LOC +
548  QString("write(%1) cnt %2 total %3 -- took a long time, %4 ms")
549  .arg(sz).arg(m_writeBuffers.size())
550  .arg(m_totalBufferUse).arg(writeTimer.elapsed().count()));
551  }
552 
553  if (!write_ok && ((EFBIG == errno) || (ENOSPC == errno)))
554  {
555  QString msg;
556  switch (errno)
557  {
558  case EFBIG:
559  msg =
560  "Maximum file size exceeded by '%1'"
561  "\n\t\t\t"
562  "You must either change the process ulimits, configure"
563  "\n\t\t\t"
564  "your operating system with \"Large File\" support, "
565  "or use"
566  "\n\t\t\t"
567  "a filesystem which supports 64-bit or 128-bit files."
568  "\n\t\t\t"
569  "HINT: FAT32 is a 32-bit filesystem.";
570  break;
571  case ENOSPC:
572  msg =
573  "No space left on the device for file '%1'"
574  "\n\t\t\t"
575  "file will be truncated, no further writing "
576  "will be done.";
577  break;
578  }
579 
580  LOG(VB_GENERAL, LOG_ERR, LOC + msg.arg(m_filename));
581  m_ignoreWrites = true;
582  }
583  }
584 }
585 
587 {
588  QDateTime cur = MythDate::current();
589  QDateTime cur_m_60 = cur.addSecs(-60);
590 
591  QList<TFWBuffer*>::iterator it = m_emptyBuffers.begin();
592  while (it != m_emptyBuffers.end())
593  {
594  if (((*it)->lastUsed < cur_m_60) ||
595  ((*it)->data.capacity() > 3 * (*it)->data.size() &&
596  (*it)->data.capacity() > 64 * 1024LL))
597  {
598  delete *it;
599  it = m_emptyBuffers.erase(it);
600  continue;
601  }
602  ++it;
603  }
604 }
605 
614 {
615  bool old = m_blocking;
616  m_blocking = block;
617  return old;
618 }
ThreadedFileWriter::m_bufLock
QMutex m_bufLock
Definition: threadedfilewriter.h:93
MythTimer::elapsed
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
ThreadedFileWriter::m_flags
int m_flags
Definition: threadedfilewriter.h:75
ThreadedFileWriter::Seek
long long Seek(long long pos, int whence)
Seek to a position within stream; May be unsafe.
Definition: threadedfilewriter.cpp:297
MThread::start
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:283
ENO
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:73
ThreadedFileWriter::m_totalBufferUse
uint m_totalBufferUse
Definition: threadedfilewriter.h:84
ThreadedFileWriter::m_mode
mode_t m_mode
Definition: threadedfilewriter.h:76
ThreadedFileWriter::Sync
void Sync(void) const
Flush data written to the file descriptor to disk.
Definition: threadedfilewriter.cpp:355
ThreadedFileWriter::m_bufferWasFreed
QWaitCondition m_bufferWasFreed
Definition: threadedfilewriter.h:105
MythTimer
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
MythCoreContext::UnregisterFileForWrite
void UnregisterFileForWrite(const QString &file)
Definition: mythcorecontext.cpp:2140
MThread::wait
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
MythCoreContext::RegisterFileForWrite
void RegisterFileForWrite(const QString &file, uint64_t size=0LL)
Definition: mythcorecontext.cpp:2122
mythburn.write
def write(text, progress=True)
Definition: mythburn.py:308
ThreadedFileWriter::m_registered
bool m_registered
Definition: threadedfilewriter.h:116
ThreadedFileWriter::m_blocking
bool m_blocking
Definition: threadedfilewriter.h:115
ThreadedFileWriter::m_bufferEmpty
QWaitCondition m_bufferEmpty
Definition: threadedfilewriter.h:102
ThreadedFileWriter::Flush
void Flush(void)
Allow DiskLoop() to flush buffer completely ignoring low watermark.
Definition: threadedfilewriter.cpp:318
ThreadedFileWriter::m_emptyBuffers
QList< TFWBuffer * > m_emptyBuffers
Definition: threadedfilewriter.h:95
MythTimer::start
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
MThread::RunProlog
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
TFWWriteThread::run
void run(void) override
Runs ThreadedFileWriter::DiskLoop(void)
Definition: threadedfilewriter.cpp:27
ThreadedFileWriter::TFWBuffer::lastUsed
QDateTime lastUsed
Definition: threadedfilewriter.h:91
ThreadedFileWriter::m_filename
QString m_filename
Definition: threadedfilewriter.h:74
ThreadedFileWriter::m_writeBuffers
QList< TFWBuffer * > m_writeBuffers
Definition: threadedfilewriter.h:94
MythDate::current
QDateTime current(bool stripped)
Returns current Date and Time in UTC.
Definition: mythdate.cpp:14
close
#define close
Definition: compat.h:43
lseek
#define lseek
Definition: mythiowrapper.cpp:239
ThreadedFileWriter::SetWriteBufferMinWriteSize
void SetWriteBufferMinWriteSize(uint newMinSize=kMinWriteSize)
Sets the minumum number of bytes to write to disk in a single write. This is ignored during a Flush(v...
Definition: threadedfilewriter.cpp:374
threadedfilewriter.h
mythdate.h
mythlogging.h
ThreadedFileWriter::m_syncThread
TFWSyncThread * m_syncThread
Definition: threadedfilewriter.h:99
ThreadedFileWriter::m_tfwMinWriteSize
uint m_tfwMinWriteSize
Definition: threadedfilewriter.h:83
compat.h
MythTimer::restart
std::chrono::milliseconds restart(void)
Returns milliseconds elapsed since last start() or restart() and resets the count.
Definition: mythtimer.cpp:62
ThreadedFileWriter::m_bufferSyncWait
QWaitCondition m_bufferSyncWait
Definition: threadedfilewriter.h:104
ThreadedFileWriter::TFWBuffer::data
std::vector< char > data
Definition: threadedfilewriter.h:90
ThreadedFileWriter::m_flush
bool m_flush
Definition: threadedfilewriter.h:80
MThread::RunEpilog
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
ThreadedFileWriter::m_writeThread
TFWWriteThread * m_writeThread
Definition: threadedfilewriter.h:98
hardwareprofile.distros.mythtv_data.main.stdout
stdout
Definition: main.py:87
uint
unsigned int uint
Definition: compat.h:81
gCoreContext
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
Definition: mythcorecontext.cpp:55
ThreadedFileWriter::kMaxBufferSize
static const uint kMaxBufferSize
Definition: threadedfilewriter.h:108
ThreadedFileWriter::Write
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
Definition: threadedfilewriter.cpp:190
ThreadedFileWriter::TrimEmptyBuffers
void TrimEmptyBuffers(void)
Definition: threadedfilewriter.cpp:586
ThreadedFileWriter::m_ignoreWrites
bool m_ignoreWrites
Definition: threadedfilewriter.h:82
mythcorecontext.h
ThreadedFileWriter::ReOpen
bool ReOpen(const QString &newFilename="")
Reopens the file we are writing to or opens a new file.
Definition: threadedfilewriter.cpp:63
ThreadedFileWriter::SyncLoop
void SyncLoop(void)
The thread run method that calls Sync(void).
Definition: threadedfilewriter.cpp:385
TFWSyncThread::m_parent
ThreadedFileWriter * m_parent
Definition: threadedfilewriter.h:39
ThreadedFileWriter::~ThreadedFileWriter
~ThreadedFileWriter()
Commits all writes and closes the file.
Definition: threadedfilewriter.cpp:137
ThreadedFileWriter::m_warned
bool m_warned
Definition: threadedfilewriter.h:114
ThreadedFileWriter::m_bufferHasData
QWaitCondition m_bufferHasData
Definition: threadedfilewriter.h:103
mythtimer.h
ThreadedFileWriter::DiskLoop
void DiskLoop(void)
The thread run method that actually calls writes to disk.
Definition: threadedfilewriter.cpp:409
LOC
#define LOC
Definition: threadedfilewriter.cpp:24
ThreadedFileWriter::m_fd
int m_fd
Definition: threadedfilewriter.h:77
ThreadedFileWriter::Open
bool Open(void)
Opens the file we will be writing to.
Definition: threadedfilewriter.cpp:92
TFWSyncThread::run
void run(void) override
Runs ThreadedFileWriter::SyncLoop(void)
Definition: threadedfilewriter.cpp:35
ThreadedFileWriter::TFWWriteThread
friend class TFWWriteThread
Definition: threadedfilewriter.h:44
ThreadedFileWriter::SetBlocking
bool SetBlocking(bool block=true)
Set write blocking mode While in blocking mode, ThreadedFileWriter::Write will wait for buffers to be...
Definition: threadedfilewriter.cpp:613
ThreadedFileWriter::TFWSyncThread
friend class TFWSyncThread
Definition: threadedfilewriter.h:45
ThreadedFileWriter::kMinWriteSize
static const uint kMinWriteSize
Minimum to write to disk in a single write, when not flushing buffer.
Definition: threadedfilewriter.h:110
ThreadedFileWriter::TFWBuffer
Definition: threadedfilewriter.h:87
ThreadedFileWriter::kMaxBlockSize
static const uint kMaxBlockSize
Maximum block size to write at a time.
Definition: threadedfilewriter.h:112
TFWWriteThread::m_parent
ThreadedFileWriter * m_parent
Definition: threadedfilewriter.h:29
ThreadedFileWriter::m_inDtor
bool m_inDtor
Definition: threadedfilewriter.h:81
fsync
#define fsync(FD)
Definition: compat.h:76