MythTV  master
ringbuffer.cpp
Go to the documentation of this file.
1 // ANSI C headers
2 #include <cmath>
3 #include <cstdio>
4 #include <cstdlib>
5 #include <cerrno>
6 #include <chrono> // for milliseconds
7 #include <thread> // for sleep_for
8 
9 // POSIX C headers
10 #include <sys/types.h>
11 #include <sys/time.h>
12 #include <unistd.h>
13 #include <fcntl.h>
14 
15 // Qt headers
16 #include <QFile>
17 #include <QDateTime>
18 #include <QReadLocker>
19 
20 #include "threadedfilewriter.h"
21 #include "fileringbuffer.h"
22 #include "streamingringbuffer.h"
23 #include "mythmiscutil.h"
24 #include "dvdstream.h"
25 #include "livetvchain.h"
26 #include "mythcontext.h"
27 #include "ringbuffer.h"
28 #include "mythconfig.h"
29 #include "remotefile.h"
30 #include "compat.h"
31 #include "mythdate.h"
32 #include "mythtimer.h"
33 #include "mythlogging.h"
34 #include "DVD/dvdringbuffer.h"
35 #include "Bluray/bdringbuffer.h"
37 #include "mythcdrom.h"
38 
39 const int RingBuffer::kDefaultOpenTimeout = 2000; // ms
40 const int RingBuffer::kLiveTVOpenTimeout = 10000;
41 
42 #define LOC QString("RingBuf(%1): ").arg(m_filename)
43 
45 QStringList RingBuffer::s_subExt;
46 QStringList RingBuffer::s_subExtNoCheck;
47 
48 extern "C" {
49 #include "libavformat/avformat.h"
50 }
52 
53 /*
54  Locking relations:
55  rwlock->poslock->rbrlock->rbwlock
56 
57  A child lock must never be held while acquiring one of its parent
58  locks: always take the parent lock before the child lock.
59  void RingBuffer::Example1()
60  {
61  poslock.lockForWrite();
62  rwlock.lockForRead(); // error!
63  blah(); // <- does not implicitly acquire any locks
64  rwlock.unlock();
65  poslock.unlock();
66  }
67  void RingBuffer::Example2()
68  {
69  rwlock.lockForRead();
70  rbrlock.lockForWrite(); // ok!
71  blah(); // <- does not implicitly acquire any locks
72  rbrlock.unlock();
73  rwlock.unlock();
74  }
75 */
76 
105  const QString &xfilename, bool write,
106  bool usereadahead, int timeout_ms, bool stream_only)
107 {
108  QString lfilename = xfilename;
109  QString lower = lfilename.toLower();
110 
111  if (write)
112  return new FileRingBuffer(lfilename, write, usereadahead, timeout_ms);
113 
114  bool dvddir = false;
115  bool bddir = false;
116  bool httpurl = lower.startsWith("http://") || lower.startsWith("https://");
117  bool iptvurl =
118  lower.startsWith("rtp://") || lower.startsWith("tcp://") ||
119  lower.startsWith("udp://");
120  bool mythurl = lower.startsWith("myth://");
121  bool bdurl = lower.startsWith("bd:");
122  bool dvdurl = lower.startsWith("dvd:");
123  bool imgext = lower.endsWith(".img") || lower.endsWith(".iso");
124 
125  if (imgext)
126  {
127  switch (MythCDROM::inspectImage(lfilename))
128  {
129  case MythCDROM::kBluray:
130  bdurl = true;
131  break;
132 
133  case MythCDROM::kDVD:
134  dvdurl = true;
135  break;
136 
137  default:
138  break;
139  }
140  }
141 
142  if (httpurl || iptvurl)
143  {
144  if (!iptvurl && HLSRingBuffer::TestForHTTPLiveStreaming(lfilename))
145  {
146  return new HLSRingBuffer(lfilename);
147  }
148  return new StreamingRingBuffer(lfilename);
149  }
150  if (!stream_only && mythurl)
151  {
152  struct stat fileInfo {};
153  if ((RemoteFile::Exists(lfilename, &fileInfo)) &&
154  (S_ISDIR(fileInfo.st_mode)))
155  {
156  if (RemoteFile::Exists(lfilename + "/VIDEO_TS"))
157  dvddir = true;
158  else if (RemoteFile::Exists(lfilename + "/BDMV"))
159  bddir = true;
160  }
161  }
162  else if (!stream_only && !mythurl)
163  {
164  if (QFile::exists(lfilename + "/VIDEO_TS"))
165  dvddir = true;
166  else if (QFile::exists(lfilename + "/BDMV"))
167  bddir = true;
168  }
169 
170  if (!stream_only && (dvdurl || dvddir))
171  {
172  if (lfilename.startsWith("dvd:")) // URI "dvd:" + path
173  lfilename.remove(0,4); // e.g. "dvd:/dev/dvd"
174 
175  if (!(mythurl || QFile::exists(lfilename)))
176  lfilename = "/dev/dvd";
177  LOG(VB_PLAYBACK, LOG_INFO, "Trying DVD at " + lfilename);
178 
179  return new DVDRingBuffer(lfilename);
180  }
181  if (!stream_only && (bdurl || bddir))
182  {
183  if (lfilename.startsWith("bd:")) // URI "bd:" + path
184  lfilename.remove(0,3); // e.g. "bd:/videos/ET"
185 
186  if (!(mythurl || QFile::exists(lfilename)))
187  lfilename = "/dev/dvd";
188  LOG(VB_PLAYBACK, LOG_INFO, "Trying BD at " + lfilename);
189 
190  return new BDRingBuffer(lfilename);
191  }
192 
193  if (!mythurl && imgext && lfilename.startsWith("dvd:"))
194  {
195  LOG(VB_PLAYBACK, LOG_INFO, "DVD image at " + lfilename);
196  return new DVDStream(lfilename);
197  }
198  if (!mythurl && lower.endsWith(".vob") && lfilename.contains("/VIDEO_TS/"))
199  {
200  LOG(VB_PLAYBACK, LOG_INFO, "DVD VOB at " + lfilename);
201  auto *s = new DVDStream(lfilename);
202  if (s && s->IsOpen())
203  return s;
204 
205  delete s;
206  }
207 
208  return new FileRingBuffer(
209  lfilename, write, usereadahead, timeout_ms);
210 }
211 
213  MThread("RingBuffer"),
214  m_type(rbtype)
215 {
216  {
217  QMutexLocker locker(&s_subExtLock);
218  if (s_subExt.empty())
219  {
220  // Possible subtitle file extensions '.srt', '.sub', '.txt'
221  // since #9294 also .ass and .ssa
222  s_subExt += ".ass";
223  s_subExt += ".srt";
224  s_subExt += ".ssa";
225  s_subExt += ".sub";
226  s_subExt += ".txt";
227 
228  // Extensions for which a subtitle file should not exist
230  s_subExtNoCheck += ".gif";
231  s_subExtNoCheck += ".png";
232  }
233  }
234 }
235 
236 #undef NDEBUG
237 #include <cassert>
238 
247 {
248  assert(!isRunning());
249  wait();
250 
251  delete [] m_readAheadBuffer;
252  m_readAheadBuffer = nullptr;
253 
254  if (m_tfw)
255  {
256  m_tfw->Flush();
257  delete m_tfw;
258  m_tfw = nullptr;
259  }
260 }
261 
/**
 * \brief Resets the read/write positions and the error state of the buffer.
 *
 * Runs with m_rwLock and m_posLock both held for writing. Clears
 * m_numFailures, m_commsError and m_setSwitchToNext, zeroes m_writePos and
 * m_readAdjust, and wakes every thread waiting on m_generalWait.
 *
 * \param full          Guards a statement that is missing from this listing
 *                      (see note in the body).
 * \param toAdjust      If true the new read position is the old one minus
 *                      m_readAdjust (clamped at 0); otherwise it is 0.
 * \param resetInternal Guards a statement that is missing from this listing
 *                      (see note in the body).
 */
265 void RingBuffer::Reset(bool full, bool toAdjust, bool resetInternal)
266 {
267  LOG(VB_FILE, LOG_INFO, LOC + QString("Reset(%1,%2,%3)")
268  .arg(full).arg(toAdjust).arg(resetInternal));
269 
// Lock order: m_rwLock first, then m_posLock.
270  m_rwLock.lockForWrite();
271  m_posLock.lockForWrite();
272 
273  m_numFailures = 0;
274  m_commsError = false;
275  m_setSwitchToNext = false;
276 
277  m_writePos = 0;
278  m_readPos = (toAdjust) ? (m_readPos - m_readAdjust) : 0;
279 
// A non-zero position after the adjustment is reported as an error.
280  if (m_readPos != 0)
281  {
282  LOG(VB_GENERAL, LOG_ERR, LOC +
283  QString("RingBuffer::Reset() nonzero readpos. toAdjust: %1 "
284  "readpos: %2 readAdjust: %3")
285  .arg(toAdjust).arg(m_readPos).arg(m_readAdjust));
286  }
287 
288  m_readAdjust = 0;
// Never leave a negative read position behind.
289  m_readPos = (m_readPos < 0) ? 0 : m_readPos;
290 
// NOTE(review): listing lines 292 and 295 (the statements controlled by the
// two `if`s below) are absent from this extract. As written, the `if`s would
// chain onto wakeAll(); restore the missing statements from upstream.
291  if (full)
293 
294  if (resetInternal)
296 
297  m_generalWait.wakeAll();
298  m_posLock.unlock();
299  m_rwLock.unlock();
300 }
301 
308 {
309  LOG(VB_FILE, LOG_INFO, LOC +
310  QString("UpdateRawBitrate(%1Kb)").arg(raw_bitrate));
311 
312  // an audio only stream could be as low as 64Kb (DVB radio) and
313  // an MHEG only stream is likely to be reported as 0Kb
314  if (raw_bitrate < 64)
315  {
316  LOG(VB_FILE, LOG_INFO, LOC +
317  QString("Bitrate too low - setting to 64Kb"));
318  raw_bitrate = 64;
319  }
320  else if (raw_bitrate > 100000)
321  {
322  LOG(VB_FILE, LOG_INFO, LOC +
323  QString("Bitrate too high - setting to 100Mb"));
324  raw_bitrate = 100000;
325  }
326 
327  m_rwLock.lockForWrite();
328  m_rawBitrate = raw_bitrate;
330  m_bitrateInitialized = true;
331  m_rwLock.unlock();
332 }
333 
/**
 * \brief Stores a new playback speed in m_playSpeed under the m_rwLock
 *        write lock.
 *
 * \param play_speed New value for m_playSpeed.
 */
338 void RingBuffer::UpdatePlaySpeed(float play_speed)
339 {
340  m_rwLock.lockForWrite();
341  m_playSpeed = play_speed;
// NOTE(review): listing line 342 is absent from this extract; a statement
// executed while the lock is still held is missing here.
343  m_rwLock.unlock();
344 }
345 
/**
 * \brief Records two flags, m_unknownBitrate and m_fileIsMatroska, under
 *        the m_rwLock write lock.
 *
 * \param estbitrate Stored in m_unknownBitrate.
 * \param matroska   Stored in m_fileIsMatroska.
 */
351 void RingBuffer::SetBufferSizeFactors(bool estbitrate, bool matroska)
352 {
353  m_rwLock.lockForWrite();
354  m_unknownBitrate = estbitrate;
355  m_fileIsMatroska = matroska;
356  m_rwLock.unlock();
// NOTE(review): listing line 357 is absent from this extract; a statement
// executed after the lock is released is missing here.
358 }
359 
368 {
369  uint estbitrate = 0;
370 
371  m_readsAllowed = false;
372  m_readsDesired = false;
373 
374  // loop without sleeping if the buffered data is less than this
375  m_fillThreshold = 7 * m_bufferSize / 8;
376 
377  const uint KB2 = 2*1024;
378  const uint KB4 = 4*1024;
379  const uint KB8 = 8*1024;
380  const uint KB16 = 16*1024;
381  const uint KB32 = 32*1024;
382  const uint KB64 = 64*1024;
383  const uint KB128 = 128*1024;
384  const uint KB256 = 256*1024;
385  const uint KB512 = 512*1024;
386 
387  estbitrate = (uint) max(abs(m_rawBitrate * m_playSpeed),
388  0.5F * m_rawBitrate);
389  estbitrate = min(m_rawBitrate * 3, estbitrate);
390  int const rbs = (estbitrate > 18000) ? KB512 :
391  (estbitrate > 9000) ? KB256 :
392  (estbitrate > 5000) ? KB128 :
393  (estbitrate > 2500) ? KB64 :
394  (estbitrate > 1250) ? KB32 :
395  (estbitrate >= 500) ? KB16 :
396  (estbitrate > 250) ? KB8 :
397  (estbitrate > 125) ? KB4 : KB2;
398  if (rbs < CHUNK)
399  {
400  m_readBlockSize = rbs;
401  }
402  else
403  {
405  }
406 
407  // minimum seconds of buffering before allowing read
408  float secs_min = 0.3;
409  // set the minimum buffering before allowing ffmpeg read
410  m_fillMin = (uint) ((estbitrate * 1000 * secs_min) * 0.125F);
411  // make this a multiple of ffmpeg block size..
412  if (m_fillMin >= CHUNK || rbs >= CHUNK)
413  {
414  if (m_lowBuffers)
415  {
416  LOG(VB_GENERAL, LOG_INFO, LOC +
417  "Buffering optimisations disabled.");
418  }
419  m_lowBuffers = false;
420  m_fillMin = ((m_fillMin / CHUNK) + 1) * CHUNK;
421  m_fillMin = min((uint)m_fillMin, m_bufferSize / 2);
422  }
423  else
424  {
425  m_lowBuffers = true;
426  LOG(VB_GENERAL, LOG_WARNING, "Enabling buffering optimisations "
427  "for low bitrate stream.");
428  }
429 
430  LOG(VB_FILE, LOG_INFO, LOC +
431  QString("CalcReadAheadThresh(%1 Kb)\n\t\t\t -> "
432  "threshold(%2 KB) min read(%3 KB) blk size(%4 KB)")
433  .arg(estbitrate).arg(m_fillThreshold/1024)
434  .arg(m_fillMin/1024).arg(m_readBlockSize/1024));
435 }
436 
437 bool RingBuffer::IsNearEnd(double /*fps*/, uint vvf) const
438 {
439  QReadLocker lock(&m_rwLock);
440 
441  if (!m_ateof && !m_setSwitchToNext)
442  {
443  // file is still being read, so can't be finished
444  return false;
445  }
446 
447  m_posLock.lockForRead();
448  long long rp = m_readPos;
449  long long sz = m_internalReadPos - m_readPos;
450  m_posLock.unlock();
451 
452  // telecom kilobytes (i.e. 1000 per k not 1024)
453  uint tmp = (uint) max(abs(m_rawBitrate * m_playSpeed), 0.5F * m_rawBitrate);
454  uint kbits_per_sec = min(m_rawBitrate * 3, tmp);
455  if (kbits_per_sec == 0)
456  return false;
457 
458  double readahead_time = sz / (kbits_per_sec * (1000.0/8.0));
459 
460  bool near_end = readahead_time <= 1.5;
461  LOG(VB_PLAYBACK, LOG_INFO, LOC + "IsReallyNearEnd()" +
462  QString(" br(%1KB)").arg(kbits_per_sec/8) +
463  QString(" sz(%1KB)").arg(sz / 1000LL) +
464  QString(" vfl(%1)").arg(vvf) +
465  QString(" time(%1)").arg(readahead_time) +
466  QString(" rawbitrate(%1)").arg(m_rawBitrate) +
467  QString(" avail(%1)").arg(sz) +
468  QString(" internal_size(%1)").arg(m_internalReadPos) +
469  QString(" readposition(%1)").arg(rp) +
470  QString(" stopreads(%1)").arg(m_stopReads) +
471  QString(" paused(%1)").arg(m_paused) +
472  QString(" ne:%1").arg(near_end));
473 
474  return near_end;
475 }
476 
479 int RingBuffer::ReadBufFree(void) const
480 {
481  m_rbrLock.lockForRead();
482  m_rbwLock.lockForRead();
483  int ret = ((m_rbwPos >= m_rbrPos) ? m_rbrPos + m_bufferSize : m_rbrPos) - m_rbwPos - 1;
484  m_rbwLock.unlock();
485  m_rbrLock.unlock();
486  return ret;
487 }
488 
491 {
492  QReadLocker lock(&m_rwLock);
493 
494  return ReadBufAvail();
495 }
496 
497 long long RingBuffer::GetRealFileSize(void) const
498 {
499  {
500  QReadLocker lock(&m_rwLock);
501  if (m_readInternalMode)
502  {
503  return ReadBufAvail();
504  }
505  }
506 
507  return GetRealFileSizeInternal();
508 }
509 
/**
 * \brief Moves the read position.
 *
 * In internal read mode only m_readPos is changed (under m_posLock);
 * otherwise the request is forwarded to SeekInternal(). Waiters on
 * m_generalWait are woken in both cases.
 *
 * \param pos      Offset, interpreted according to \p whence.
 * \param whence   SEEK_SET, SEEK_CUR or SEEK_END.
 * \param has_lock If false, m_rwLock is write-locked for the duration of the
 *                 call; if true, no locking of m_rwLock is done here.
 * \return m_readPos in internal read mode, else the SeekInternal() result.
 */
510 long long RingBuffer::Seek(long long pos, int whence, bool has_lock)
511 {
512  LOG(VB_FILE, LOG_INFO, LOC + QString("Seek(%1,%2,%3)")
513  .arg(pos).arg((SEEK_SET==whence)?"SEEK_SET":
514  ((SEEK_CUR==whence)?"SEEK_CUR":"SEEK_END"))
515  .arg(has_lock?"locked":"unlocked"));
516 
517  if (!has_lock)
518  {
519  m_rwLock.lockForWrite();
520  }
521 
522  long long ret;
523 
524  if (m_readInternalMode)
525  {
526  m_posLock.lockForWrite();
527  // only valid for SEEK_SET & SEEK_CUR
528  switch (whence)
529  {
530  case SEEK_SET:
531  m_readPos = pos;
532  break;
533  case SEEK_CUR:
534  m_readPos += pos;
535  break;
536  case SEEK_END:
// Measured back from the amount currently buffered.
537  m_readPos = ReadBufAvail() - pos;
538  break;
539  }
// NOTE(review): listing line 540 is absent from this extract; a statement
// executed while m_posLock is still held is missing here.
541  m_posLock.unlock();
542  ret = m_readPos;
543  }
544  else
545  {
546  ret = SeekInternal(pos, whence);
547  }
548 
549  m_generalWait.wakeAll();
550 
551  if (!has_lock)
552  {
553  m_rwLock.unlock();
554  }
555  return ret;
556 }
557 
559 {
560  QWriteLocker lock(&m_rwLock);
561  bool old = m_readInternalMode;
562 
563  if (mode == old)
564  {
565  return old;
566  }
567 
568  m_readInternalMode = mode;
569 
570  if (!mode)
571  {
572  // adjust real read position in ringbuffer
573  m_rbrLock.lockForWrite();
575  m_generalWait.wakeAll();
576  m_rbrLock.unlock();
577  // reset the read offset as we are exiting the internal read mode
578  m_readOffset = 0;
579  }
580 
581  LOG(VB_FILE, LOG_DEBUG, LOC +
582  QString("SetReadInternalMode: %1").arg(mode ? "on" : "off"));
583 
584  return old;
585 }
586 
590 {
591  m_rbrLock.lockForRead();
592  m_rbwLock.lockForRead();
594  m_rbwLock.unlock();
595  m_rbrLock.unlock();
596  return ret;
597 }
598 
/**
 * \brief Empties the read-ahead ring and restarts it at a new position.
 *
 * Leaves internal read mode, zeroes m_readOffset and both ring positions,
 * clears m_ateof, m_readsAllowed, m_readsDesired and m_setSwitchToNext,
 * sets m_recentSeek, and wakes waiters on m_generalWait.
 *
 * This function takes m_rbrLock and m_rbwLock itself but neither m_rwLock
 * nor m_posLock.
 *
 * \param newinternal New value for m_internalReadPos.
 */
610 void RingBuffer::ResetReadAhead(long long newinternal)
611 {
612  LOG(VB_FILE, LOG_INFO, LOC +
613  QString("ResetReadAhead(internalreadpos = %1->%2)")
614  .arg(m_internalReadPos).arg(newinternal));
615 
616  m_readInternalMode = false;
617  m_readOffset = 0;
618 
// Lock order: m_rbrLock first, then m_rbwLock.
619  m_rbrLock.lockForWrite();
620  m_rbwLock.lockForWrite();
621 
// NOTE(review): listing line 622 is absent from this extract; a statement
// executed with both ring locks held is missing here.
623 
624  m_rbrPos = 0;
625  m_rbwPos = 0;
626  m_internalReadPos = newinternal;
627  m_ateof = false;
628  m_readsAllowed = false;
629  m_readsDesired = false;
630  m_recentSeek = true;
631  m_setSwitchToNext = false;
632 
633  m_generalWait.wakeAll();
634 
635  m_rbwLock.unlock();
636  m_rbrLock.unlock();
637 }
638 
654 {
655  bool do_start = true;
656 
657  m_rwLock.lockForWrite();
658  if (!m_startReadAhead)
659  {
660  do_start = false;
661  }
662  else if (m_writeMode)
663  {
664  LOG(VB_GENERAL, LOG_WARNING, LOC + "Not starting read ahead thread, "
665  "this is a write only RingBuffer");
666  do_start = false;
667  }
668  else if (m_readAheadRunning)
669  {
670  LOG(VB_GENERAL, LOG_WARNING, LOC + "Not starting read ahead thread, "
671  "already running");
672  do_start = false;
673  }
674 
675  if (!do_start)
676  {
677  m_rwLock.unlock();
678  return;
679  }
680 
681  StartReads();
682 
683  MThread::start();
684 
686  m_generalWait.wait(&m_rwLock);
687 
688  m_rwLock.unlock();
689 }
690 
695 {
696  while (isRunning())
697  {
698  m_rwLock.lockForWrite();
699  m_readAheadRunning = false;
700  StopReads();
701  m_generalWait.wakeAll();
702  m_rwLock.unlock();
703  MThread::wait(5000);
704  }
705 }
706 
712 {
713  LOG(VB_FILE, LOG_INFO, LOC + "StopReads()");
714  m_stopReads = true;
715  m_generalWait.wakeAll();
716 }
717 
723 {
724  LOG(VB_FILE, LOG_INFO, LOC + "StartReads()");
725  m_stopReads = false;
726  m_generalWait.wakeAll();
727 }
728 
734 {
735  LOG(VB_FILE, LOG_INFO, LOC + "Pause()");
736  StopReads();
737 
738  m_rwLock.lockForWrite();
739  m_requestPause = true;
740  m_rwLock.unlock();
741 }
742 
748 {
749  LOG(VB_FILE, LOG_INFO, LOC + "Unpause()");
750  StartReads();
751 
752  m_rwLock.lockForWrite();
753  m_requestPause = false;
754  m_generalWait.wakeAll();
755  m_rwLock.unlock();
756 }
757 
759 bool RingBuffer::isPaused(void) const
760 {
761  m_rwLock.lockForRead();
762  bool ret = !m_readAheadRunning || m_paused;
763  m_rwLock.unlock();
764  return ret;
765 }
766 
771 {
772  MythTimer t;
773  t.start();
774 
775  m_rwLock.lockForRead();
777  {
778  m_generalWait.wait(&m_rwLock, 1000);
779  if (m_readAheadRunning && !m_paused && m_requestPause && t.elapsed() > 1000)
780  {
781  LOG(VB_GENERAL, LOG_WARNING, LOC +
782  QString("Waited %1 ms for ringbuffer pause..")
783  .arg(t.elapsed()));
784  }
785  }
786  m_rwLock.unlock();
787 }
788 
790 {
791  const uint timeout = 500; // ms
792 
793  if (m_requestPause)
794  {
795  if (!m_paused)
796  {
797  m_rwLock.unlock();
798  m_rwLock.lockForWrite();
799 
800  if (m_requestPause)
801  {
802  m_paused = true;
803  m_generalWait.wakeAll();
804  }
805 
806  m_rwLock.unlock();
807  m_rwLock.lockForRead();
808  }
809 
812  }
813 
814  if (!m_requestPause && m_paused)
815  {
816  m_rwLock.unlock();
817  m_rwLock.lockForWrite();
818 
819  if (!m_requestPause)
820  {
821  m_paused = false;
822  m_generalWait.wakeAll();
823  }
824 
825  m_rwLock.unlock();
826  m_rwLock.lockForRead();
827  }
828 
829  return m_requestPause || m_paused;
830 }
831 
833 {
834  m_rwLock.lockForWrite();
835  m_posLock.lockForWrite();
836 
837  uint oldsize = m_bufferSize;
838  uint newsize = BUFFER_SIZE_MINIMUM;
839  if (m_remotefile)
840  {
841  newsize *= BUFFER_FACTOR_NETWORK;
842  if (m_fileIsMatroska)
843  newsize *= BUFFER_FACTOR_MATROSKA;
844  if (m_unknownBitrate)
845  newsize *= BUFFER_FACTOR_BITRATE;
846  }
847 
848  // N.B. Don't try and make it smaller - bad things happen...
849  if (m_readAheadBuffer && oldsize >= newsize)
850  {
851  m_posLock.unlock();
852  m_rwLock.unlock();
853  return;
854  }
855 
856  m_bufferSize = newsize;
857  if (m_readAheadBuffer)
858  {
859  char* newbuffer = new char[m_bufferSize + 1024];
860  memcpy(newbuffer, m_readAheadBuffer + m_rbwPos, oldsize - m_rbwPos);
861  memcpy(newbuffer + (oldsize - m_rbwPos), m_readAheadBuffer, m_rbwPos);
862  delete [] m_readAheadBuffer;
863  m_readAheadBuffer = newbuffer;
865  (m_rbrPos + oldsize - m_rbwPos);
866  m_rbwPos = oldsize;
867  }
868  else
869  {
870  m_readAheadBuffer = new char[m_bufferSize + 1024];
871  }
873  m_posLock.unlock();
874  m_rwLock.unlock();
875 
876  LOG(VB_FILE, LOG_INFO, LOC + QString("Created readAheadBuffer: %1Mb")
877  .arg(newsize >> 20));
878 }
879 
/**
 * \brief Body of the read-ahead thread.
 *
 * Visible behaviour:
 *  - Between RunProlog() and RunEpilog(), first resets the read-ahead state
 *    (ResetReadAhead(0)) and sets m_readAheadRunning / m_reallyRunning while
 *    holding m_rwLock and m_posLock for writing.
 *  - Then loops while m_readAheadRunning, normally holding m_rwLock for
 *    reading. Each pass checks IsOpen() and PauseAndWait(), asks
 *    ReadBufFree() how much room there is, and reads into
 *    m_readAheadBuffer via safe_read(), advancing m_internalReadPos and
 *    m_rbwPos.
 *  - A zero-byte read, too many failures, or a change in the reads
 *    allowed/desired state is handled under a temporary m_rwLock write lock.
 *  - On exit, frees m_readAheadBuffer and clears the ring positions and
 *    flags under write locks on m_rwLock, m_rbrLock and m_rbwLock.
 *
 * NOTE(review): this extract has gaps in the listing numbering (893, 945,
 * 948, 961, 970, 987, 992-993, 1008, 1103, 1105, 1116, 1118, 1126-1127,
 * 1132-1133, 1141, 1152, 1165, 1191). Several conditions and statements are
 * therefore cut short below; restore them from upstream before building.
 */
880 void RingBuffer::run(void)
881 {
882  RunProlog();
883 
884  // These variables are used to adjust the read block size
885  struct timeval lastread {};
886  struct timeval now {};
887  int readtimeavg = 300;
888  bool ignore_for_read_timing = true;
889  int eofreads = 0;
890 
891  gettimeofday(&lastread, nullptr); // this is just to keep gcc happy
892 
894  m_rwLock.lockForWrite();
895  m_posLock.lockForWrite();
896  m_requestPause = false;
897  ResetReadAhead(0);
898  m_readAheadRunning = true;
899  m_reallyRunning = true;
900  m_generalWait.wakeAll();
901  m_posLock.unlock();
902  m_rwLock.unlock();
903 
904  // NOTE: this must loop at some point hold only
905  // a read lock on rwlock, so that other functions
906  // such as reset and seek can take priority.
907 
908  m_rwLock.lockForRead();
909 
910  LOG(VB_FILE, LOG_INFO, LOC +
911  QString("Initial readblocksize %1K & fill_min %2K")
912  .arg(m_readBlockSize/1024).arg(m_fillMin/1024));
913 
914  while (m_readAheadRunning)
915  {
// IsOpen() is called with m_rwLock released.
916  m_rwLock.unlock();
917  bool isopened = IsOpen();
918  m_rwLock.lockForRead();
919 
920  if (!isopened)
921  {
922  LOG(VB_FILE, LOG_WARNING, LOC +
923  QString("File not opened, terminating readahead thread"));
924  m_posLock.lockForWrite();
925  m_readAheadRunning = false;
926  m_generalWait.wakeAll();
927  m_posLock.unlock();
928  break;
929  }
930  if (PauseAndWait())
931  {
932  ignore_for_read_timing = true;
933  LOG(VB_FILE, LOG_DEBUG, LOC +
934  "run: PauseAndWait Not reading continuing");
935  continue;
936  }
937 
938  long long totfree = ReadBufFree();
939 
940  const uint KB32 = 32*1024;
941  const int KB512 = 512*1024;
942  // These are conditions where we don't want to go through
943  // the loop if they are true.
// NOTE(review): the rest of this condition (listing line 945) and the
// right-hand side of the assignment below (line 948) are missing.
944  if (((totfree < KB32) && m_readsAllowed) ||
946  {
947  ignore_for_read_timing |=
949  m_generalWait.wait(&m_rwLock, (m_stopReads) ? 50 : 1000);
950  LOG(VB_FILE, LOG_DEBUG, LOC +
951  QString("run: Not reading continuing: totfree(%1) "
952  "readsallowed(%2) ignorereadpos(%3) commserror(%4) "
953  "stopreads(%5)")
954  .arg(totfree).arg(m_readsAllowed).arg(m_ignoreReadPos)
955  .arg(m_commsError).arg(m_stopReads));
956  continue;
957  }
958 
959  // These are conditions where we want to sleep to allow
960  // other threads to do stuff.
// NOTE(review): the `if (...)` guarding this block (listing line 961) is missing.
962  {
963  ignore_for_read_timing = true;
964  m_generalWait.wait(&m_rwLock, 1000);
965  totfree = ReadBufFree();
966  }
967 
968  int read_return = -1;
// NOTE(review): the end of this condition (listing line 970) is missing.
969  if (totfree >= KB32 && !m_commsError &&
971  {
972  // limit the read size
973  if (m_readBlockSize > totfree)
974  totfree = (totfree / KB32) * KB32; // must be multiple of 32KB
975  else
976  totfree = m_readBlockSize;
977 
978  // adapt blocksize
979  gettimeofday(&now, nullptr);
980  if (!ignore_for_read_timing)
981  {
// Milliseconds since the previous read, folded into a running average
// weighted 9:1 towards the old value.
982  int readinterval = (now.tv_sec - lastread.tv_sec ) * 1000 +
983  (now.tv_usec - lastread.tv_usec) / 1000;
984  readtimeavg = (readtimeavg * 9 + readinterval) / 10;
985 
// NOTE(review): part of this condition (listing line 987) and the statements
// that change m_readBlockSize (lines 992-993 and 1008) are missing.
986  if (readtimeavg < 150 &&
988  m_readBlockSize >= CHUNK /* low_buffers */ &&
989  m_readBlockSize <= KB512)
990  {
991  int old_block_size = m_readBlockSize;
994  if (m_readBlockSize > KB512)
995  {
996  m_readBlockSize = KB512;
997  }
998  LOG(VB_FILE, LOG_INFO, LOC +
999  QString("Avg read interval was %1 msec. "
1000  "%2K -> %3K block size")
1001  .arg(readtimeavg)
1002  .arg(old_block_size/1024)
1003  .arg(m_readBlockSize/1024));
1004  readtimeavg = 225;
1005  }
1006  else if (readtimeavg > 300 && m_readBlockSize > CHUNK)
1007  {
1009  LOG(VB_FILE, LOG_INFO, LOC +
1010  QString("Avg read interval was %1 msec. "
1011  "%2K -> %3K block size")
1012  .arg(readtimeavg)
1013  .arg((m_readBlockSize+CHUNK)/1024)
1014  .arg(m_readBlockSize/1024));
1015  readtimeavg = 225;
1016  }
1017  }
1018  lastread = now;
1019 
1020  m_rbwLock.lockForRead();
// Do not read past the end of the buffer in one call.
1021  if (m_rbwPos + totfree > m_bufferSize)
1022  {
1023  totfree = m_bufferSize - m_rbwPos;
1024  LOG(VB_FILE, LOG_DEBUG, LOC +
1025  "Shrinking read, near end of buffer");
1026  }
1027 
1028  if (m_internalReadPos == 0)
1029  {
1030  totfree = max(m_fillMin, m_readBlockSize);
1031  LOG(VB_FILE, LOG_DEBUG, LOC +
1032  "Reading enough data to start playback");
1033  }
1034 
1035  LOG(VB_FILE, LOG_DEBUG, LOC +
1036  QString("safe_read(...@%1, %2) -- begin")
1037  .arg(m_rbwPos).arg(totfree));
1038 
1039  MythTimer sr_timer;
1040  sr_timer.start();
1041 
// Copy of the write position taken under m_rbwLock; compared against
// m_rbwPos again after the read.
1042  int rbwposcopy = m_rbwPos;
1043 
1044  // FileRingBuffer::safe_read(RemoteFile*...) acquires poslock;
1045  // so we need to unlock this here to preserve locking order.
1046  m_rbwLock.unlock();
1047 
1048  read_return = safe_read(m_readAheadBuffer + rbwposcopy, totfree);
1049 
1050  int sr_elapsed = sr_timer.elapsed();
// bytes * 8000 / ms = bits per second; a fixed value is used when the
// timer reports 0 ms.
1051  uint64_t bps = !sr_elapsed ? 1000000001 :
1052  (uint64_t)(((double)read_return * 8000.0) /
1053  (double)sr_elapsed);
1054  LOG(VB_FILE, LOG_DEBUG, LOC +
1055  QString("safe_read(...@%1, %2) -> %3, took %4 ms %5 avg %6 ms")
1056  .arg(rbwposcopy).arg(totfree).arg(read_return)
1057  .arg(sr_elapsed)
1058  .arg(QString("(%1Mbps)").arg((double)bps / 1000000.0))
1059  .arg(readtimeavg));
1060  UpdateStorageRate(bps);
1061 
1062  if (read_return >= 0)
1063  {
1064  m_posLock.lockForWrite();
1065  m_rbwLock.lockForWrite();
1066 
// Only commit the data if the write position was not changed while the
// locks were released for the read.
1067  if (rbwposcopy == m_rbwPos)
1068  {
1069  m_internalReadPos += read_return;
1070  m_rbwPos = (m_rbwPos + read_return) % m_bufferSize;
1071  LOG(VB_FILE, LOG_DEBUG,
1072  LOC + QString("rbwpos += %1K requested %2K in read")
1073  .arg(read_return/1024,3).arg(totfree/1024,3));
1074  }
1075  m_numFailures = 0;
1076 
1077  m_rbwLock.unlock();
1078  m_posLock.unlock();
1079 
1080  LOG(VB_FILE, LOG_DEBUG, LOC +
1081  QString("total read so far: %1 bytes")
1082  .arg(m_internalReadPos));
1083  }
1084  }
1085  else
1086  {
1087  LOG(VB_FILE, LOG_DEBUG, LOC +
1088  QString("We are not reading anything "
1089  "(totfree: %1 commserror:%2 ateof:%3 "
1090  "setswitchtonext:%4")
1091  .arg(totfree).arg(m_commsError).arg(m_ateof).arg(m_setSwitchToNext));
1092  }
1093 
1094  int used = m_bufferSize - ReadBufFree();
1095 
1096  bool reads_were_allowed = m_readsAllowed;
1097 
1098  ignore_for_read_timing =
1099  (totfree < m_readBlockSize) || (read_return < totfree);
1100 
// NOTE(review): the tails of these two comparisons (listing lines 1103 and
// 1105) are missing.
1101  if ((0 == read_return) || (m_numFailures > 5) ||
1102  (m_readsAllowed != (used >= 1 || m_ateof ||
1104  (m_readsDesired != (used >= m_fillMin || m_ateof ||
1106  {
1107  // If readpos changes while the lock is released
1108  // we should not handle the 0 read_return now.
1109  long long old_readpos = m_readPos;
1110 
1111  m_rwLock.unlock();
1112  m_rwLock.lockForWrite();
1113 
1114  m_commsError |= (m_numFailures > 5);
1115 
// NOTE(review): listing lines 1116 and 1118 are missing; only the left-hand
// side of the m_readsDesired assignment survives.
1117  m_readsDesired =
1119 
1120  if (0 == read_return && old_readpos == m_readPos)
1121  {
1122  eofreads++;
1123  if (eofreads >= 3 && m_readBlockSize >= KB512)
1124  {
1125  // not reading anything
1128  }
1129 
1130  if (m_liveTVChain)
1131  {
// NOTE(review): the conditions selecting the branches below (listing lines
// 1132-1133, 1141, 1152 and 1165) are missing.
1134  {
1135  // we receive new livetv chain element event
1136  // before we receive file closed for writing event
1137  // so don't need to test if file is closed for writing
1138  m_liveTVChain->SwitchToNext(true);
1139  m_setSwitchToNext = true;
1140  }
1142  {
1143  LOG(VB_FILE, LOG_DEBUG, LOC +
1144  QString("EOF encountered, but %1 still being written to")
1145  .arg(m_filename));
1146  // We reached EOF, but file still open for writing and
1147  // no next program in livetvchain
1148  // wait a little bit (60ms same wait as typical safe_read)
1149  m_generalWait.wait(&m_rwLock, 60);
1150  }
1151  }
1153  {
1154  LOG(VB_FILE, LOG_DEBUG, LOC +
1155  QString("EOF encountered, but %1 still being written to")
1156  .arg(m_filename));
1157  // We reached EOF, but file still open for writing,
1158  // typically active in-progress recording
1159  // wait a little bit (60ms same wait as typical safe_read)
1160  m_generalWait.wait(&m_rwLock, 60);
1161  m_beingWritten = true;
1162  }
1163  else
1164  {
1166  {
1167  LOG(VB_FILE, LOG_DEBUG, LOC +
1168  "Waiting for file to grow large enough to process.");
1169  m_generalWait.wait(&m_rwLock, 300);
1170  }
1171  else
1172  {
1173  LOG(VB_FILE, LOG_DEBUG,
1174  LOC + "setting ateof (read_return == 0)");
1175  m_ateof = true;
1176  }
1177  }
1178  }
1179 
// Drop back from the write lock to the read lock held by the loop.
1180  m_rwLock.unlock();
1181  m_rwLock.lockForRead();
1182  used = m_bufferSize - ReadBufFree();
1183  }
1184  else
1185  {
1186  eofreads = 0;
1187  }
1188 
1189  LOG(VB_FILE, LOG_DEBUG, LOC + "@ end of read ahead loop");
1190 
// NOTE(review): the start of this condition (listing line 1191) is missing.
1192  (m_wantToRead <= used && m_wantToRead > 0))
1193  {
1194  // To give other threads a good chance to handle these
1195  // conditions, even if they are only requesting a read lock
1196  // like us, yield (currently implemented with short usleep).
1197  m_generalWait.wakeAll();
1198  m_rwLock.unlock();
1199  std::this_thread::sleep_for(std::chrono::milliseconds(5));
1200  m_rwLock.lockForRead();
1201  }
1202  else
1203  {
1204  // yield if we have nothing to do...
1205  if (!m_requestPause && reads_were_allowed &&
1206  (used >= m_fillThreshold || m_ateof || m_setSwitchToNext))
1207  {
1208  m_generalWait.wait(&m_rwLock, 50);
1209  }
1210  else if (m_readsAllowed)
1211  { // if reads are allowed release the lock and yield so the
1212  // reader gets a chance to read before the buffer is full.
1213  m_generalWait.wakeAll();
1214  m_rwLock.unlock();
1215  std::this_thread::sleep_for(std::chrono::milliseconds(5));
1216  m_rwLock.lockForRead();
1217  }
1218  }
1219  }
1220 
1221  m_rwLock.unlock();
1222 
// Tear-down: free the buffer and reset the ring state under write locks.
1223  m_rwLock.lockForWrite();
1224  m_rbrLock.lockForWrite();
1225  m_rbwLock.lockForWrite();
1226 
1227  delete [] m_readAheadBuffer;
1228 
1229  m_readAheadBuffer = nullptr;
1230  m_rbrPos = 0;
1231  m_rbwPos = 0;
1232  m_reallyRunning = false;
1233  m_readsAllowed = false;
1234  m_readsDesired = false;
1235 
1236  m_rbwLock.unlock();
1237  m_rbrLock.unlock();
1238  m_rwLock.unlock();
1239 
1240  LOG(VB_FILE, LOG_INFO, LOC + QString("Exiting readahead thread"));
1241 
1242  RunEpilog();
1243 }
1244 
1246 {
1247  m_rwLock.lockForWrite();
1248  m_posLock.lockForRead();
1250  long long ra = m_readAdjust;
1251  m_posLock.unlock();
1252  m_rwLock.unlock();
1253  return ra;
1254 }
1255 
1256 int RingBuffer::Peek(void *buf, int count)
1257 {
1258  int ret = ReadPriv(buf, count, true);
1259  if (ret != count)
1260  {
1261  LOG(VB_GENERAL, LOG_WARNING, LOC +
1262  QString("Peek() requested %1 bytes, but only returning %2")
1263  .arg(count).arg(ret));
1264  }
1265  return ret;
1266 }
1267 
1269 {
1270  // Wait up to 30000 ms for reads allowed (or readsdesired if post seek/open)
1272  m_recentSeek = false;
1273  int timeout_ms = 30000;
1274  int count = 0;
1275  MythTimer t;
1276  t.start();
1277 
1278  while ((t.elapsed() < timeout_ms) && !check && !m_stopReads &&
1280  {
1281  m_generalWait.wait(&m_rwLock, clamp(timeout_ms - t.elapsed(), 10, 100));
1282  if (!check && t.elapsed() > 1000 && (count % 100) == 0)
1283  {
1284  LOG(VB_GENERAL, LOG_WARNING, LOC +
1285  "Taking too long to be allowed to read..");
1286  }
1287  count++;
1288  }
1289  if (t.elapsed() >= timeout_ms)
1290  {
1291  LOG(VB_GENERAL, LOG_ERR, LOC +
1292  QString("Took more than %1 seconds to be allowed to read, aborting.")
1293  .arg(timeout_ms / 1000));
1294  return false;
1295  }
1296  return check;
1297 }
1298 
1300 {
1301  int avail = ReadBufAvail();
1302  if (avail >= count)
1303  return avail;
1304 
1305  count = (m_ateof && avail < count) ? avail : count;
1306 
1307  if (m_liveTVChain && m_setSwitchToNext && avail < count)
1308  {
1309  return avail;
1310  }
1311 
1312  // Make sure that if the read ahead thread is sleeping and
1313  // it should be reading that we start reading right away.
1314  if ((avail < count) && !m_stopReads &&
1316  {
1317  m_generalWait.wakeAll();
1318  }
1319 
1320  MythTimer t;
1321  t.start();
1322  while ((avail < count) && !m_stopReads &&
1324  {
1325  m_wantToRead = count;
1326  m_generalWait.wait(&m_rwLock, clamp(timeout - t.elapsed(), 10, 250));
1327  avail = ReadBufAvail();
1328  if (m_ateof)
1329  break;
1330  if (m_lowBuffers && avail >= m_fillMin)
1331  break;
1332  if (t.elapsed() > timeout)
1333  break;
1334  }
1335 
1336  m_wantToRead = 0;
1337 
1338  return avail;
1339 }
1340 
1341 int RingBuffer::ReadDirect(void *buf, int count, bool peek)
1342 {
1343  long long old_pos = 0;
1344  if (peek)
1345  {
1346  m_posLock.lockForRead();
1347  old_pos = (m_ignoreReadPos >= 0) ? m_ignoreReadPos : m_readPos;
1348  m_posLock.unlock();
1349  }
1350 
1351  MythTimer timer;
1352  timer.start();
1353  int ret = safe_read(buf, count);
1354  int elapsed = timer.elapsed();
1355  uint64_t bps = !elapsed ? 1000000001 :
1356  (uint64_t)(((float)ret * 8000.0F) / (float)elapsed);
1357  UpdateStorageRate(bps);
1358 
1359  m_posLock.lockForWrite();
1360  if (m_ignoreReadPos >= 0 && ret > 0)
1361  {
1362  if (peek)
1363  {
1364  // seek should always succeed since we were at this position
1365  long long cur_pos = -1;
1366  if (m_remotefile)
1367  cur_pos = m_remotefile->Seek(old_pos, SEEK_SET);
1368  else if (m_fd2 >= 0)
1369  cur_pos = lseek64(m_fd2, old_pos, SEEK_SET);
1370  if (cur_pos < 0)
1371  {
1372  LOG(VB_FILE, LOG_ERR, LOC +
1373  "Seek failed repositioning to previous position");
1374  }
1375  }
1376  else
1377  {
1378  m_ignoreReadPos += ret;
1379  }
1380  m_posLock.unlock();
1381  return ret;
1382  }
1383  m_posLock.unlock();
1384 
1385  if (peek && ret > 0)
1386  {
1387  if ((IsDVD() || IsBD()) && old_pos != 0)
1388  {
1389  LOG(VB_GENERAL, LOG_ERR, LOC +
1390  "DVD and Blu-Ray do not support arbitrary "
1391  "peeks except when read-ahead is enabled."
1392  "\n\t\t\tWill seek to beginning of video.");
1393  old_pos = 0;
1394  }
1395 
1396  long long new_pos = Seek(old_pos, SEEK_SET, true);
1397 
1398  if (new_pos != old_pos)
1399  {
1400  LOG(VB_GENERAL, LOG_ERR, LOC +
1401  QString("Peek() Failed to return from new "
1402  "position %1 to old position %2, now "
1403  "at position %3")
1404  .arg(old_pos - ret).arg(old_pos).arg(new_pos));
1405  }
1406  }
1407 
1408  return ret;
1409 }
1410 
1419 int RingBuffer::ReadPriv(void *buf, int count, bool peek)
1420 {
1421  QString loc_desc = QString("ReadPriv(..%1, %2)")
1422  .arg(count).arg(peek?"peek":"normal");
1423  LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
1424  QString(" @%1 -- begin").arg(m_rbrPos));
1425 
1426  m_rwLock.lockForRead();
1427  if (m_writeMode)
1428  {
1429  LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
1430  ": Attempt to read from a write only file");
1431  errno = EBADF;
1432  m_rwLock.unlock();
1433  return -1;
1434  }
1435 
1437  {
1438  m_rwLock.unlock();
1439  m_rwLock.lockForWrite();
1440  // we need a write lock so the read-ahead thread
1441  // can't start mucking with the read position.
1442  // If the read ahead thread was started while we
1443  // didn't hold the lock, we proceed with a normal
1444  // read from the buffer, otherwise we read directly.
1445  if (m_requestPause || m_stopReads ||
1447  {
1448  int ret = ReadDirect(buf, count, peek);
1449  LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
1450  QString(": ReadDirect checksum %1")
1451  .arg(qChecksum((char*)buf,count)));
1452  m_rwLock.unlock();
1453  return ret;
1454  }
1455  m_rwLock.unlock();
1456  m_rwLock.lockForRead();
1457  }
1458 
1459  if (!WaitForReadsAllowed())
1460  {
1461  LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForReadsAllowed()");
1462  m_rwLock.unlock();
1463  m_stopReads = true; // this needs to be outside the lock
1464  m_rwLock.lockForWrite();
1465  m_wantToRead = 0;
1466  m_rwLock.unlock();
1467  return 0;
1468  }
1469 
1470  int avail = ReadBufAvail();
1472 
1473  // Wait up to 10000 ms for any data
1474  int timeout_ms = 10000;
1475  while (!m_readInternalMode && !m_ateof &&
1476  (t.elapsed() < timeout_ms) && m_readAheadRunning &&
1478  {
1479  avail = WaitForAvail(count, min(timeout_ms - t.elapsed(), 100));
1480  if (m_liveTVChain && m_setSwitchToNext && avail < count)
1481  {
1482  LOG(VB_GENERAL, LOG_INFO, LOC +
1483  "Checking to see if there's a new livetv program to switch to..");
1485  break;
1486  }
1487  if (avail > 0)
1488  break;
1489  }
1490  if (t.elapsed() > 6000)
1491  {
1492  LOG(VB_GENERAL, LOG_WARNING, LOC + loc_desc +
1493  QString(" -- waited %1 ms for avail(%2) > count(%3)")
1494  .arg(t.elapsed()).arg(avail).arg(count));
1495  }
1496 
1497  if (m_readInternalMode)
1498  {
1499  LOG(VB_FILE, LOG_DEBUG, LOC +
1500  QString("ReadPriv: %1 bytes available, %2 left")
1501  .arg(avail).arg(avail-m_readOffset));
1502  }
1503  count = min(avail - m_readOffset, count);
1504 
1505  if ((count <= 0) && (m_ateof || m_readInternalMode))
1506  {
1507  // If we're at the end of file return 0 bytes
1508  m_rwLock.unlock();
1509  return count;
1510  }
1511  if (count <= 0)
1512  {
1513  // If we're not at the end of file but have no data
1514  // at this point time out and shutdown read ahead.
1515  LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
1516  QString(" -- timed out waiting for data (%1 ms)")
1517  .arg(t.elapsed()));
1518 
1519  m_rwLock.unlock();
1520  m_stopReads = true; // this needs to be outside the lock
1521  m_rwLock.lockForWrite();
1522  m_ateof = true;
1523  m_wantToRead = 0;
1524  m_generalWait.wakeAll();
1525  m_rwLock.unlock();
1526  return count;
1527  }
1528 
1529  if (peek || m_readInternalMode)
1530  m_rbrLock.lockForRead();
1531  else
1532  m_rbrLock.lockForWrite();
1533 
1534  LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + " -- copying data");
1535 
1536  int rpos;
1537  if (m_rbrPos + m_readOffset > (int) m_bufferSize)
1538  {
1539  rpos = (m_rbrPos + m_readOffset) - m_bufferSize;
1540  }
1541  else
1542  {
1543  rpos = m_rbrPos + m_readOffset;
1544  }
1545  if (rpos + count > (int) m_bufferSize)
1546  {
1547  int firstsize = m_bufferSize - rpos;
1548  int secondsize = count - firstsize;
1549 
1550  memcpy(buf, m_readAheadBuffer + rpos, firstsize);
1551  memcpy((char *)buf + firstsize, m_readAheadBuffer, secondsize);
1552  }
1553  else
1554  {
1555  memcpy(buf, m_readAheadBuffer + rpos, count);
1556  }
1557  LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + QString(" -- checksum %1")
1558  .arg(qChecksum((char*)buf,count)));
1559 
1560  if (!peek)
1561  {
1562  if (m_readInternalMode)
1563  {
1564  m_readOffset += count;
1565  }
1566  else
1567  {
1568  m_rbrPos = (m_rbrPos + count) % m_bufferSize;
1569  m_generalWait.wakeAll();
1570  }
1571  }
1572  m_rbrLock.unlock();
1573  m_rwLock.unlock();
1574 
1575  return count;
1576 }
1577 
1586 int RingBuffer::Read(void *buf, int count)
1587 {
1588  int ret = ReadPriv(buf, count, false);
1589  if (ret > 0)
1590  {
1591  m_posLock.lockForWrite();
1592  m_readPos += ret;
1593  m_posLock.unlock();
1594  UpdateDecoderRate(ret);
1595  }
1596 
1597  return ret;
1598 }
1599 
1600 QString RingBuffer::BitrateToString(uint64_t rate, bool hz)
1601 {
1602  QString msg;
1603  float bitrate;
1604  int range = 0;
1605  if (rate < 1)
1606  {
1607  return "-";
1608  }
1609  if (rate > 1000000000)
1610  {
1611  return QObject::tr(">1Gbps");
1612  }
1613  if (rate >= 1000000)
1614  {
1615  msg = hz ? QObject::tr("%1MHz") : QObject::tr("%1Mbps");
1616  bitrate = (float)rate / (1000000.0F);
1617  range = hz ? 3 : 1;
1618  }
1619  else if (rate >= 1000)
1620  {
1621  msg = hz ? QObject::tr("%1kHz") : QObject::tr("%1kbps");
1622  bitrate = (float)rate / 1000.0F;
1623  range = hz ? 1 : 0;
1624  }
1625  else
1626  {
1627  msg = hz ? QObject::tr("%1Hz") : QObject::tr("%1bps");
1628  bitrate = (float)rate;
1629  }
1630  return msg.arg(bitrate, 0, 'f', range);
1631 }
1632 
1634 {
1636 }
1637 
1639 {
1641 }
1642 
1644 {
1646  return "N/A";
1647 
1648  int avail = (m_rbwPos >= m_rbrPos) ? m_rbwPos - m_rbrPos
1650  return QString("%1%").arg(lroundf((float)avail / (float)m_bufferSize * 100.0F));
1651 }
1652 
1653 uint64_t RingBuffer::UpdateDecoderRate(uint64_t latest)
1654 {
1656  return 0;
1657 
1658  // TODO use QDateTime once we've moved to Qt 4.7
1659  static QTime s_midnight = QTime(0, 0, 0);
1660  QTime now = QTime::currentTime();
1661  qint64 age = s_midnight.msecsTo(now);
1662  qint64 oldest = age - 1000;
1663 
1664  m_decoderReadLock.lock();
1665  if (latest)
1666  m_decoderReads.insert(age, latest);
1667 
1668  uint64_t total = 0;
1669  QMutableMapIterator<qint64,uint64_t> it(m_decoderReads);
1670  while (it.hasNext())
1671  {
1672  it.next();
1673  if (it.key() < oldest || it.key() > age)
1674  it.remove();
1675  else
1676  total += it.value();
1677  }
1678 
1679  auto average = (uint64_t)((double)total * 8.0);
1680  m_decoderReadLock.unlock();
1681 
1682  LOG(VB_FILE, LOG_INFO, LOC + QString("Decoder read speed: %1 %2")
1683  .arg(average).arg(m_decoderReads.size()));
1684  return average;
1685 }
1686 
1687 uint64_t RingBuffer::UpdateStorageRate(uint64_t latest)
1688 {
1690  return 0;
1691 
1692  // TODO use QDateTime once we've moved to Qt 4.7
1693  static QTime s_midnight = QTime(0, 0, 0);
1694  QTime now = QTime::currentTime();
1695  qint64 age = s_midnight.msecsTo(now);
1696  qint64 oldest = age - 1000;
1697 
1698  m_storageReadLock.lock();
1699  if (latest)
1700  m_storageReads.insert(age, latest);
1701 
1702  uint64_t total = 0;
1703  QMutableMapIterator<qint64,uint64_t> it(m_storageReads);
1704  while (it.hasNext())
1705  {
1706  it.next();
1707  if (it.key() < oldest || it.key() > age)
1708  it.remove();
1709  else
1710  total += it.value();
1711  }
1712 
1713  int size = m_storageReads.size();
1714  m_storageReadLock.unlock();
1715 
1716  uint64_t average = size ? (uint64_t)(((double)total) / (double)size) : 0;
1717 
1718  LOG(VB_FILE, LOG_INFO, LOC + QString("Average storage read speed: %1 %2")
1719  .arg(average).arg(m_storageReads.size()));
1720  return average;
1721 }
1722 
1727 int RingBuffer::Write(const void *buf, uint count)
1728 {
1729  m_rwLock.lockForRead();
1730 
1731  if (!m_writeMode)
1732  {
1733  LOG(VB_GENERAL, LOG_ERR, LOC + "Tried to write to a read only file.");
1734  m_rwLock.unlock();
1735  return -1;
1736  }
1737 
1738  if (!m_tfw && !m_remotefile)
1739  {
1740  m_rwLock.unlock();
1741  return -1;
1742  }
1743 
1744  int ret = -1;
1745  if (m_tfw)
1746  ret = m_tfw->Write(buf, count);
1747  else
1748  ret = m_remotefile->Write(buf, count);
1749 
1750  if (ret > 0)
1751  {
1752  m_posLock.lockForWrite();
1753  m_writePos += ret;
1754  m_posLock.unlock();
1755  }
1756 
1757  m_rwLock.unlock();
1758 
1759  return ret;
1760 }
1761 
1766 {
1767  m_rwLock.lockForRead();
1768  if (m_tfw)
1769  m_tfw->Sync();
1770  m_rwLock.unlock();
1771 }
1772 
1775 long long RingBuffer::WriterSeek(long long pos, int whence, bool has_lock)
1776 {
1777  long long ret = -1;
1778 
1779  if (!has_lock)
1780  m_rwLock.lockForRead();
1781 
1782  m_posLock.lockForWrite();
1783 
1784  if (m_tfw)
1785  {
1786  ret = m_tfw->Seek(pos, whence);
1787  m_writePos = ret;
1788  }
1789 
1790  m_posLock.unlock();
1791 
1792  if (!has_lock)
1793  m_rwLock.unlock();
1794 
1795  return ret;
1796 }
1797 
1802 {
1803  m_rwLock.lockForRead();
1804  if (m_tfw)
1805  m_tfw->Flush();
1806  m_rwLock.unlock();
1807 }
1808 
1813 {
1814  m_rwLock.lockForRead();
1815  if (m_tfw)
1816  m_tfw->SetWriteBufferMinWriteSize(newMinSize);
1817  m_rwLock.unlock();
1818 }
1819 
1824 {
1825  QReadLocker lock(&m_rwLock);
1826 
1827  if (m_tfw)
1828  return m_tfw->SetBlocking(block);
1829  return false;
1830 }
1831 
1847 void RingBuffer::SetOldFile(bool is_old)
1848 {
1849  LOG(VB_FILE, LOG_INFO, LOC + QString("SetOldFile(%1)").arg(is_old));
1850  m_rwLock.lockForWrite();
1851  m_oldfile = is_old;
1852  m_rwLock.unlock();
1853 }
1854 
1856 QString RingBuffer::GetFilename(void) const
1857 {
1858  m_rwLock.lockForRead();
1859  QString tmp = m_filename;
1860  m_rwLock.unlock();
1861  return tmp;
1862 }
1863 
1865 {
1866  m_rwLock.lockForRead();
1867  QString tmp = m_subtitleFilename;
1868  m_rwLock.unlock();
1869  return tmp;
1870 }
1871 
1872 QString RingBuffer::GetLastError(void) const
1873 {
1874  m_rwLock.lockForRead();
1875  QString tmp = m_lastError;
1876  m_rwLock.unlock();
1877  return tmp;
1878 }
1879 
1883 long long RingBuffer::GetWritePosition(void) const
1884 {
1885  m_posLock.lockForRead();
1886  long long ret = m_writePos;
1887  m_posLock.unlock();
1888  return ret;
1889 }
1890 
1895 bool RingBuffer::LiveMode(void) const
1896 {
1897  m_rwLock.lockForRead();
1898  bool ret = (m_liveTVChain);
1899  m_rwLock.unlock();
1900  return ret;
1901 }
1902 
1908 {
1909  m_rwLock.lockForWrite();
1910  m_liveTVChain = chain;
1911  m_rwLock.unlock();
1912 }
1913 
1915 void RingBuffer::IgnoreLiveEOF(bool ignore)
1916 {
1917  m_rwLock.lockForWrite();
1918  m_ignoreLiveEOF = ignore;
1919  m_rwLock.unlock();
1920 }
1921 
1922 const DVDRingBuffer *RingBuffer::DVD(void) const
1923 {
1924  return dynamic_cast<const DVDRingBuffer*>(this);
1925 }
1926 
1927 const BDRingBuffer *RingBuffer::BD(void) const
1928 {
1929  return dynamic_cast<const BDRingBuffer*>(this);
1930 }
1931 
1933 {
1934  return dynamic_cast<DVDRingBuffer*>(this);
1935 }
1936 
1938 {
1939  return dynamic_cast<BDRingBuffer*>(this);
1940 }
1941 
1943 {
1944  QMutexLocker lock(avcodeclock);
1945 
1947  {
1948  avformat_network_init();
1950  }
1951 }
1952 
1953 /* vim: set expandtab tabstop=4 shiftwidth=4: */
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:215
static const int kDefaultOpenTimeout
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:294
def write(text, progress=True)
Definition: mythburn.py:279
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:46
void Pause(void)
Pauses the read-ahead thread.
Definition: ringbuffer.cpp:733
LiveTVChain * m_liveTVChain
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
const DVDRingBuffer * DVD(void) const
int GetReadBufAvail() const
Returns number of bytes available for reading from buffer.
Definition: ringbuffer.cpp:490
static QString BitrateToString(uint64_t rate, bool hz=false)
bool LiveMode(void) const
Returns true if this RingBuffer has been assigned a LiveTVChain.
void WriterFlush(void)
Calls ThreadedFileWriter::Flush(void)
#define BUFFER_FACTOR_BITRATE
QString GetLastError(void) const
#define BUFFER_SIZE_MINIMUM
static bool TestForHTTPLiveStreaming(const QString &filename)
bool WriterSetBlocking(bool lock=true)
Calls ThreadedFileWriter::SetBlocking(bool)
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:311
uint64_t UpdateStorageRate(uint64_t latest=0)
void SetLiveMode(LiveTVChain *chain)
Assigns a LiveTVChain to this RingBuffer.
void StartReads(void)
????
Definition: ringbuffer.cpp:722
uint64_t UpdateDecoderRate(uint64_t latest=0)
QReadWriteLock m_rwLock
QString GetFilename(void) const
Returns name of file used by this RingBuffer.
#define LOC
Definition: ringbuffer.cpp:42
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
static RingBuffer * Create(const QString &xfilename, bool write, bool usereadahead=true, int timeout_ms=kDefaultOpenTimeout, bool stream_only=false)
Creates a RingBuffer instance.
Definition: ringbuffer.cpp:104
virtual long long SeekInternal(long long pos, int whence)=0
bool HasNext(void) const
long long Seek(long long pos, int whence)
Seek to a position within stream; May be unsafe.
QString GetDecoderRate(void)
QString GetStorageRate(void)
static guint32 * tmp
Definition: goom_core.c:35
void CalcReadAheadThresh(void)
Calculates m_fillMin, m_fillThreshold, and m_readBlockSize from the estimated effective bitrate of th...
Definition: ringbuffer.cpp:367
long long SetAdjustFilesize(void)
void SetBufferSizeFactors(bool estbitrate, bool matroska)
Tells RingBuffer that the raw bitrate may be innacurate and the underlying container is matroska,...
Definition: ringbuffer.cpp:351
int ReadPriv(void *buf, int count, bool peek)
When possible reads from the read-ahead buffer, otherwise reads directly from the device.
#define BUFFER_FACTOR_NETWORK
bool IsDVD(void) const
#define BUFFER_FACTOR_MATROSKA
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
long long WriterSeek(long long pos, int whence, bool has_lock=false)
Calls ThreadedFileWriter::Seek(long long,int).
void ReloadAll(const QStringList &data=QStringList())
QMap< qint64, uint64_t > m_decoderReads
bool PauseAndWait(void)
Definition: ringbuffer.cpp:789
static bool gAVformat_net_initialised
QReadWriteLock m_rbwLock
QString GetAvailableBuffer(void)
QReadWriteLock m_rbrLock
static void AVFormatInitNetwork(void)
void Sync(void)
Calls ThreadedFileWriter::Sync(void)
void KillReadAheadThread(void)
Stops the read-ahead thread, and waits for it to stop.
Definition: ringbuffer.cpp:694
QReadWriteLock m_posLock
int ReadDirect(void *buf, int count, bool peek)
void IgnoreLiveEOF(bool ignore)
Tells RingBuffer whether to ignore the end-of-file.
virtual ~RingBuffer()=0
Deletes.
Definition: ringbuffer.cpp:246
void Flush(void)
Allow DiskLoop() to flush buffer completely ignoring low watermark.
bool isRunning(void) const
Definition: mthread.cpp:274
void SetOldFile(bool is_old)
Tell RingBuffer if this is an old file or not.
static const int kLiveTVOpenTimeout
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: ringbuffer.cpp:880
virtual bool IsOpen(void) const =0
Returns true if open for either reading or writing.
void SetWriteBufferMinWriteSize(uint newMinSize=kMinWriteSize)
Sets the minumum number of bytes to write to disk in a single write.
unsigned int uint
Definition: compat.h:140
int ReadBufFree(void) const
Returns number of bytes available for reading into buffer.
Definition: ringbuffer.cpp:479
bool IsBD(void) const
int Peek(void *buf, int count)
float clamp(float val, float minimum, float maximum)
Definition: mythmiscutil.h:41
virtual int safe_read(void *data, uint sz)=0
volatile bool m_stopReads
void Start(void)
Starts the read-ahead thread.
Definition: ringbuffer.cpp:653
ThreadedFileWriter * m_tfw
RingBufferType m_type
void WaitForPause(void)
Waits for Pause(void) to take effect.
Definition: ringbuffer.cpp:770
void StopReads(void)
????
Definition: ringbuffer.cpp:711
static bool Exists(const QString &url, struct stat *fileinfo)
Definition: remotefile.cpp:461
long long GetWritePosition(void) const
Returns how far into a ThreadedFileWriter file we have written.
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
static QStringList s_subExt
void Unpause(void)
Unpauses the read-ahead thread.
Definition: ringbuffer.cpp:747
#define assert(x)
QWaitCondition m_generalWait
Condition to signal that the read ahead thread is running.
QMutex * avcodeclock
This global variable is used to makes certain calls to avlib threadsafe.
bool WaitForReadsAllowed(void)
QString GetSubtitleFilename(void) const
int elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:202
long long GetRealFileSize(void) const
Returns the size of the file we are reading/writing, or -1 if the query fails.
Definition: ringbuffer.cpp:497
static QMutex s_subExtLock
RemoteFile * m_remotefile
void Reset(bool full=false, bool toAdjust=false, bool resetInternal=false)
Resets the read-ahead thread and our position in the file.
Definition: ringbuffer.cpp:265
int Read(void *buf, int count)
This is the public method for reading from a file, it calls the appropriate read method if the file i...
void Sync(void)
Flush data written to the file descriptor to disk.
bool SetReadInternalMode(bool mode)
Definition: ringbuffer.cpp:558
bool IsNearEnd(double fps, uint vvf) const
Definition: ringbuffer.cpp:437
static ImageType inspectImage(const QString &path)
Definition: mythcdrom.cpp:179
int Write(const void *buf, uint count)
Writes buffer to ThreadedFileWriter::Write(const void*,uint)
void SetWriteBufferMinWriteSize(int newMinSize)
Calls ThreadedFileWriter::SetWriteBufferMinWriteSize(int)
void ResetReadAhead(long long newinternal)
Restart the read-ahead thread at the 'newinternal' position.
Definition: ringbuffer.cpp:610
QMap< qint64, uint64_t > m_storageReads
Implements a file/stream reader/writer.
long long Seek(long long pos, int whence, long long curpos=-1)
Definition: remotefile.cpp:759
static QStringList s_subExtNoCheck
bool IsRegisteredFileForWrite(const QString &file)
void SwitchToNext(bool up)
Sets the recording to switch to.
void UpdateRawBitrate(uint raw_bitrate)
Set the raw bit rate, to allow RingBuffer adjust effective bitrate.
Definition: ringbuffer.cpp:307
long long Seek(long long pos, int whence, bool has_lock=false)
Seeks to a particular position in the file.
Definition: ringbuffer.cpp:510
const BDRingBuffer * BD(void) const
int Write(const void *data, int size)
Definition: remotefile.cpp:834
void UpdatePlaySpeed(float play_speed)
Set the play speed, to allow RingBuffer adjust effective bitrate.
Definition: ringbuffer.cpp:338
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
Keeps track of recordings in a current LiveTV instance.
Definition: livetvchain.h:31
#define CHUNK
virtual long long GetRealFileSizeInternal(void) const
int WaitForAvail(int count, int timeout)
bool SetBlocking(bool block=true)
Set write blocking mode While in blocking mode, ThreadedFileWriter::Write will wait for buffers to be...
Stream content from a DVD image file.
Definition: dvdstream.h:20
void CreateReadAheadBuffer(void)
Definition: ringbuffer.cpp:832
bool isPaused(void) const
Returns false iff read-ahead is not running and read-ahead is not paused.
Definition: ringbuffer.cpp:759
RingBuffer(RingBufferType rbtype)
Definition: ringbuffer.cpp:212
int ReadBufAvail(void) const
Returns number of bytes available for reading from buffer.
Definition: ringbuffer.cpp:589
volatile bool m_recentSeek