MythTV  master
threadedfilewriter.cpp
Go to the documentation of this file.
1 // C++ headers
2 #include <cerrno>
3 #include <csignal>
4 #include <cstdio>
5 #include <cstdlib>
6 #include <cstring>
7 #include <fcntl.h>
8 #include <sys/stat.h>
9 #include <sys/types.h>
10 #include <unistd.h>
11 
12 // Qt headers
13 #include <QString>
14 
15 // MythTV headers
16 #include "threadedfilewriter.h"
17 #include "mythlogging.h"
18 #include "mythcorecontext.h"
19 
20 #include "mythtimer.h"
21 #include "compat.h"
22 #include "mythdate.h"
23 
24 #define LOC QString("TFW(%1:%2): ").arg(m_filename).arg(m_fd)
25 
28 {
29  RunProlog();
30  m_parent->DiskLoop();
31  RunEpilog();
32 }
33 
36 {
37  RunProlog();
38  m_parent->SyncLoop();
39  RunEpilog();
40 }
41 
42 const uint ThreadedFileWriter::kMaxBufferSize = 8 * 1024 * 1024;
43 const uint ThreadedFileWriter::kMinWriteSize = 64 * 1024;
44 const uint ThreadedFileWriter::kMaxBlockSize = 1 * 1024 * 1024;
45 
63 bool ThreadedFileWriter::ReOpen(const QString& newFilename)
64 {
65  Flush();
66 
67  m_buflock.lock();
68 
69  if (m_fd >= 0)
70  {
71  close(m_fd);
72  m_fd = -1;
73  }
74 
75  if (m_registered)
76  {
78  }
79 
80  if (!newFilename.isEmpty())
81  m_filename = newFilename;
82 
83  m_buflock.unlock();
84 
85  return Open();
86 }
87 
93 {
94  m_ignore_writes = false;
95 
96  if (m_filename == "-")
97  m_fd = fileno(stdout);
98  else
99  {
100  QByteArray fname = m_filename.toLocal8Bit();
101  m_fd = open(fname.constData(), m_flags, m_mode);
102  }
103 
104  if (m_fd < 0)
105  {
106  LOG(VB_GENERAL, LOG_ERR, LOC +
107  QString("Opening file '%1'.").arg(m_filename) + ENO);
108  return false;
109  }
110 
112  m_registered = true;
113 
114  LOG(VB_FILE, LOG_INFO, LOC + "Open() successful");
115 
116 #ifdef _WIN32
117  _setmode(m_fd, _O_BINARY);
118 #endif
119  if (!m_writeThread)
120  {
121  m_writeThread = new TFWWriteThread(this);
122  m_writeThread->start();
123  }
124 
125  if (!m_syncThread)
126  {
127  m_syncThread = new TFWSyncThread(this);
128  m_syncThread->start();
129  }
130 
131  return true;
132 }
133 
138 {
139  Flush();
140 
141  { /* tell child threads to exit */
142  QMutexLocker locker(&m_buflock);
143  m_in_dtor = true;
144  m_bufferSyncWait.wakeAll();
145  m_bufferHasData.wakeAll();
146  }
147 
148  if (m_writeThread)
149  {
150  m_writeThread->wait();
151  delete m_writeThread;
152  m_writeThread = nullptr;
153  }
154 
155  while (!m_writeBuffers.empty())
156  {
157  delete m_writeBuffers.front();
158  m_writeBuffers.pop_front();
159  }
160 
161  while (!m_emptyBuffers.empty())
162  {
163  delete m_emptyBuffers.front();
164  m_emptyBuffers.pop_front();
165  }
166 
167  if (m_syncThread)
168  {
169  m_syncThread->wait();
170  delete m_syncThread;
171  m_syncThread = nullptr;
172  }
173 
174  if (m_fd >= 0)
175  {
176  close(m_fd);
177  m_fd = -1;
178  }
179 
181  m_registered = false;
182 }
183 
190 int ThreadedFileWriter::Write(const void *data, uint count)
191 {
192  if (count == 0)
193  return 0;
194 
195  QMutexLocker locker(&m_buflock);
196 
197  if (m_ignore_writes)
198  return -1;
199 
200  uint written = 0;
201  uint left = count;
202 
203  while (written < count)
204  {
205  uint towrite = (left > kMaxBlockSize) ? kMaxBlockSize : left;
206 
207  if ((m_totalBufferUse + towrite) > (kMaxBufferSize * (m_blocking ? 1 : 8)))
208  {
209  if (!m_blocking)
210  {
211  LOG(VB_GENERAL, LOG_ERR, LOC +
212  "Maximum buffer size exceeded."
213  "\n\t\t\tfile will be truncated, no further writing "
214  "will be done."
215  "\n\t\t\tThis generally indicates your disk performance "
216  "\n\t\t\tis insufficient to deal with the number of on-going "
217  "\n\t\t\trecordings, or you have a disk failure.");
218  m_ignore_writes = true;
219  return -1;
220  }
221  if (!m_warned)
222  {
223  LOG(VB_GENERAL, LOG_WARNING, LOC +
224  "Maximum buffer size exceeded."
225  "\n\t\t\tThis generally indicates your disk performance "
226  "\n\t\t\tis insufficient or you have a disk failure.");
227  m_warned = true;
228  }
229  // wait until some was written to disk, and try again
230  if (!m_bufferWasFreed.wait(locker.mutex(), 1000))
231  {
232  LOG(VB_GENERAL, LOG_DEBUG, LOC +
233  QString("Taking a long time waiting to write.. "
234  "buffer size %1 (needing %2, %3 to go)")
235  .arg(m_totalBufferUse).arg(towrite)
236  .arg(towrite-(kMaxBufferSize-m_totalBufferUse)));
237  }
238  continue;
239  }
240 
241  TFWBuffer *buf = nullptr;
242 
243  if (!m_writeBuffers.empty() &&
244  (m_writeBuffers.back()->data.size() + towrite) < kMinWriteSize)
245  {
246  buf = m_writeBuffers.back();
247  m_writeBuffers.pop_back();
248  }
249  else
250  {
251  if (!m_emptyBuffers.empty())
252  {
253  buf = m_emptyBuffers.front();
254  m_emptyBuffers.pop_front();
255  buf->data.clear();
256  }
257  else
258  {
259  buf = new TFWBuffer();
260  }
261  }
262 
263  m_totalBufferUse += towrite;
264 
265  const char *cdata = (const char*) data + written;
266  buf->data.insert(buf->data.end(), cdata, cdata+towrite);
267  buf->lastUsed = MythDate::current();
268 
269  m_writeBuffers.push_back(buf);
270 
271  if ((m_writeBuffers.size() > 1) || (buf->data.size() >= kMinWriteSize))
272  {
273  m_bufferHasData.wakeAll();
274  }
275 
276  written += towrite;
277  left -= towrite;
278  }
279 
280  LOG(VB_FILE, LOG_DEBUG, LOC + QString("Write(*, %1) total %2 cnt %3")
281  .arg(count,4).arg(m_totalBufferUse).arg(m_writeBuffers.size()));
282 
283  return count;
284 }
285 
297 long long ThreadedFileWriter::Seek(long long pos, int whence)
298 {
299  QMutexLocker locker(&m_buflock);
300  m_flush = true;
301  while (!m_writeBuffers.empty())
302  {
303  m_bufferHasData.wakeAll();
304  if (!m_bufferEmpty.wait(locker.mutex(), 2000))
305  {
306  LOG(VB_GENERAL, LOG_WARNING, LOC +
307  QString("Taking a long time to flush.. buffer size %1")
308  .arg(m_totalBufferUse));
309  }
310  }
311  m_flush = false;
312  return lseek(m_fd, pos, whence);
313 }
314 
319 {
320  QMutexLocker locker(&m_buflock);
321  m_flush = true;
322  while (!m_writeBuffers.empty())
323  {
324  m_bufferHasData.wakeAll();
325  if (!m_bufferEmpty.wait(locker.mutex(), 2000))
326  {
327  LOG(VB_GENERAL, LOG_WARNING, LOC +
328  QString("Taking a long time to flush.. buffer size %1")
329  .arg(m_totalBufferUse));
330  }
331  }
332  m_flush = false;
333 }
334 
356 {
357  if (m_fd >= 0)
358  {
359 #if defined(_POSIX_SYNCHRONIZED_IO) && _POSIX_SYNCHRONIZED_IO > 0
360  // fdatasync tries to avoid updating metadata, but will in
361  // practice always update metadata if any data is written
362  // as the file will usually have grown.
363  fdatasync(m_fd);
364 #else
365  fsync(m_fd);
366 #endif
367  }
368 }
369 
375 {
376  QMutexLocker locker(&m_buflock);
377  if (newMinSize > 0)
378  m_tfw_min_write_size = newMinSize;
379  m_bufferHasData.wakeAll();
380 }
381 
386 {
387  QMutexLocker locker(&m_buflock);
388  while (!m_in_dtor)
389  {
390  locker.unlock();
391 
392  Sync();
393 
394  locker.relock();
395 
397  {
398  // we aren't going to write to the disk anymore, so can de-register
400  m_registered = false;
401  }
402  m_bufferSyncWait.wait(&m_buflock, 1000);
403  }
404 }
405 
410 {
411 #ifndef _WIN32
412  // don't exit program if file gets larger than quota limit..
413  signal(SIGXFSZ, SIG_IGN);
414 #endif
415 
416  QMutexLocker locker(&m_buflock);
417 
418  // Even if the bytes buffered is less than the minimum write
419  // size we do want to write to the OS buffers periodically.
420  // This timer makes sure we do.
421  MythTimer minWriteTimer, lastRegisterTimer;
422  minWriteTimer.start();
423  lastRegisterTimer.start();
424 
425  uint64_t total_written = 0LL;
426 
427  while (!m_in_dtor)
428  {
429  if (m_ignore_writes)
430  {
431  while (!m_writeBuffers.empty())
432  {
433  delete m_writeBuffers.front();
434  m_writeBuffers.pop_front();
435  }
436  while (!m_emptyBuffers.empty())
437  {
438  delete m_emptyBuffers.front();
439  m_emptyBuffers.pop_front();
440  }
441  m_bufferEmpty.wakeAll();
442  m_bufferHasData.wait(locker.mutex());
443  continue;
444  }
445 
446  if (m_writeBuffers.empty())
447  {
448  m_bufferEmpty.wakeAll();
449  m_bufferHasData.wait(locker.mutex(), 1000);
451  continue;
452  }
453 
454  int mwte = minWriteTimer.elapsed();
455  if (!m_flush && (mwte < 250) && (m_totalBufferUse < kMinWriteSize))
456  {
457  m_bufferHasData.wait(locker.mutex(), 250 - mwte);
459  continue;
460  }
461 
462  if (m_fd == -1)
463  {
464  m_bufferHasData.wait(locker.mutex(), 200);
466  continue;
467  }
468 
469  TFWBuffer *buf = m_writeBuffers.front();
470  m_writeBuffers.pop_front();
471  m_totalBufferUse -= buf->data.size();
472  m_bufferWasFreed.wakeAll();
473  minWriteTimer.start();
474 
476 
477  const void *data = &(buf->data[0]);
478  uint sz = buf->data.size();
479 
480  bool write_ok = true;
481  uint tot = 0;
482  uint errcnt = 0;
483 
484  LOG(VB_FILE, LOG_DEBUG, LOC + QString("write(%1) cnt %2 total %3")
485  .arg(sz).arg(m_writeBuffers.size())
486  .arg(m_totalBufferUse));
487 
488  MythTimer writeTimer;
489  writeTimer.start();
490 
491  while ((tot < sz) && !m_in_dtor)
492  {
493  locker.unlock();
494 
495  int ret = write(m_fd, (char *)data + tot, sz - tot);
496 
497  if (ret < 0)
498  {
499  if (errno == EAGAIN)
500  {
501  LOG(VB_GENERAL, LOG_WARNING, LOC + "Got EAGAIN.");
502  }
503  else
504  {
505  errcnt++;
506  LOG(VB_GENERAL, LOG_ERR, LOC + "File I/O " +
507  QString(" errcnt: %1").arg(errcnt) + ENO);
508  }
509 
510  if ((errcnt >= 3) || (ENOSPC == errno) || (EFBIG == errno))
511  {
512  locker.relock();
513  write_ok = false;
514  break;
515  }
516  }
517  else
518  {
519  tot += ret;
520  total_written += ret;
521  LOG(VB_FILE, LOG_DEBUG, LOC +
522  QString("total written so far: %1 bytes")
523  .arg(total_written));
524  }
525 
526  locker.relock();
527 
528  if ((tot < sz) && !m_in_dtor)
529  m_bufferHasData.wait(locker.mutex(), 50);
530  }
531 
533 
534  if (lastRegisterTimer.elapsed() >= 10000)
535  {
537  m_registered = true;
538  lastRegisterTimer.restart();
539  }
540 
541  buf->lastUsed = MythDate::current();
542  m_emptyBuffers.push_back(buf);
543 
544  if (writeTimer.elapsed() > 1000)
545  {
546  LOG(VB_GENERAL, LOG_WARNING, LOC +
547  QString("write(%1) cnt %2 total %3 -- took a long time, %4 ms")
548  .arg(sz).arg(m_writeBuffers.size())
549  .arg(m_totalBufferUse).arg(writeTimer.elapsed()));
550  }
551 
552  if (!write_ok && ((EFBIG == errno) || (ENOSPC == errno)))
553  {
554  QString msg;
555  switch (errno)
556  {
557  case EFBIG:
558  msg =
559  "Maximum file size exceeded by '%1'"
560  "\n\t\t\t"
561  "You must either change the process ulimits, configure"
562  "\n\t\t\t"
563  "your operating system with \"Large File\" support, "
564  "or use"
565  "\n\t\t\t"
566  "a filesystem which supports 64-bit or 128-bit files."
567  "\n\t\t\t"
568  "HINT: FAT32 is a 32-bit filesystem.";
569  break;
570  case ENOSPC:
571  msg =
572  "No space left on the device for file '%1'"
573  "\n\t\t\t"
574  "file will be truncated, no further writing "
575  "will be done.";
576  break;
577  }
578 
579  LOG(VB_GENERAL, LOG_ERR, LOC + msg.arg(m_filename));
580  m_ignore_writes = true;
581  }
582  }
583 }
584 
586 {
587  QDateTime cur = MythDate::current();
588  QDateTime cur_m_60 = cur.addSecs(-60);
589 
590  QList<TFWBuffer*>::iterator it = m_emptyBuffers.begin();
591  while (it != m_emptyBuffers.end())
592  {
593  if (((*it)->lastUsed < cur_m_60) ||
594  ((*it)->data.capacity() > 3 * (*it)->data.size() &&
595  (*it)->data.capacity() > 64 * 1024))
596  {
597  delete *it;
598  it = m_emptyBuffers.erase(it);
599  continue;
600  }
601  ++it;
602  }
603 }
604 
613 {
614  bool old = m_blocking;
615  m_blocking = block;
616  return old;
617 }
QList< TFWBuffer * > m_emptyBuffers
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:215
#define fsync(FD)
Definition: compat.h:136
ThreadedFileWriter * m_parent
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:294
def write(text, progress=True)
Definition: mythburn.py:279
int restart(void)
Returns milliseconds elapsed since last start() or restart() and resets the count.
Definition: mythtimer.cpp:62
TFWSyncThread * m_syncThread
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
void RegisterFileForWrite(const QString &file, uint64_t size=0LL)
friend class TFWWriteThread
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:311
friend class TFWSyncThread
#define lseek
unsigned int uint
Definition: compat.h:140
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
long long Seek(long long pos, int whence)
Seek to a position within stream; May be unsafe.
QWaitCondition m_bufferWasFreed
bool ReOpen(const QString &newFilename="")
Reopens the file we are writing to or opens a new file.
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
#define close
Definition: compat.h:16
QDateTime current(bool stripped)
Returns current Date and Time in UTC.
Definition: mythdate.cpp:10
QWaitCondition m_bufferEmpty
void Flush(void)
Allow DiskLoop() to flush buffer completely ignoring low watermark.
void SetWriteBufferMinWriteSize(uint newMinSize=kMinWriteSize)
Sets the minimum number of bytes to write to disk in a single write.
void run(void) override
Runs ThreadedFileWriter::DiskLoop(void)
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
void UnregisterFileForWrite(const QString &file)
void DiskLoop(void)
The thread run method that actually calls writes to disk.
ThreadedFileWriter * m_parent
int elapsed(void) const
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
~ThreadedFileWriter()
Commits all writes and closes the file.
bool Open(void)
Opens the file we will be writing to.
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
void run(void) override
Runs ThreadedFileWriter::SyncLoop(void)
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:202
QWaitCondition m_bufferSyncWait
static const uint kMaxBlockSize
Maximum block size to write at a time.
void Sync(void)
Flush data written to the file descriptor to disk.
static const uint kMinWriteSize
Minimum to write to disk in a single write, when not flushing buffer.
TFWWriteThread * m_writeThread
#define LOC
void SyncLoop(void)
The thread run method that calls Sync(void).
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
QList< TFWBuffer * > m_writeBuffers
QWaitCondition m_bufferHasData
bool SetBlocking(bool block=true)
Set write blocking mode While in blocking mode, ThreadedFileWriter::Write will wait for buffers to be...
static const uint kMaxBufferSize