MythTV  master
ringbuffer.cpp
Go to the documentation of this file.
1 // ANSI C headers
2 #include <cmath>
3 #include <cstdio>
4 #include <cstdlib>
5 #include <cerrno>
6 #include <chrono> // for milliseconds
7 #include <thread> // for sleep_for
8 
9 // POSIX C headers
10 #include <sys/types.h>
11 #include <sys/time.h>
12 #include <unistd.h>
13 #include <fcntl.h>
14 
15 // Qt headers
16 #include <QFile>
17 #include <QDateTime>
18 #include <QReadLocker>
19 
20 #include "threadedfilewriter.h"
21 #include "fileringbuffer.h"
22 #include "streamingringbuffer.h"
23 #include "mythmiscutil.h"
24 #include "dvdstream.h"
25 #include "livetvchain.h"
26 #include "mythcontext.h"
27 #include "ringbuffer.h"
28 #include "mythconfig.h"
29 #include "remotefile.h"
30 #include "compat.h"
31 #include "mythdate.h"
32 #include "mythtimer.h"
33 #include "mythlogging.h"
34 #include "DVD/dvdringbuffer.h"
35 #include "Bluray/bdringbuffer.h"
37 #include "mythcdrom.h"
38 
39 const int RingBuffer::kDefaultOpenTimeout = 2000; // ms
40 const int RingBuffer::kLiveTVOpenTimeout = 10000;
41 
42 #define LOC QString("RingBuf(%1): ").arg(m_filename)
43 
45 QStringList RingBuffer::s_subExt;
46 QStringList RingBuffer::s_subExtNoCheck;
47 
48 extern "C" {
49 #include "libavformat/avformat.h"
50 }
52 
53 /*
54  Locking relations:
55  rwlock->poslock->rbrlock->rbwlock
56 
57  A child should never lock any of the parents without locking
58  the parent lock before the child lock.
59  void RingBuffer::Example1()
60  {
61  poslock.lockForWrite();
62  rwlock.lockForRead(); // error!
63  blah(); // <- does not implicitly acquire any locks
64  rwlock.unlock();
65  poslock.unlock();
66  }
67  void RingBuffer::Example2()
68  {
69  rwlock.lockForRead();
70  rbrlock.lockForWrite(); // ok!
71  blah(); // <- does not implicitly acquire any locks
72  rbrlock.unlock();
73  rwlock.unlock();
74  }
75 */
76 
105  const QString &xfilename, bool write,
106  bool usereadahead, int timeout_ms, bool stream_only)
107 {
108  QString lfilename = xfilename;
109  QString lower = lfilename.toLower();
110 
111  if (write)
112  return new FileRingBuffer(lfilename, write, usereadahead, timeout_ms);
113 
114  bool dvddir = false;
115  bool bddir = false;
116  bool httpurl = lower.startsWith("http://") || lower.startsWith("https://");
117  bool iptvurl =
118  lower.startsWith("rtp://") || lower.startsWith("tcp://") ||
119  lower.startsWith("udp://");
120  bool mythurl = lower.startsWith("myth://");
121  bool bdurl = lower.startsWith("bd:");
122  bool dvdurl = lower.startsWith("dvd:");
123  bool imgext = lower.endsWith(".img") || lower.endsWith(".iso");
124 
125  if (imgext)
126  {
127  switch (MythCDROM::inspectImage(lfilename))
128  {
129  case MythCDROM::kBluray:
130  bdurl = true;
131  break;
132 
133  case MythCDROM::kDVD:
134  dvdurl = true;
135  break;
136 
137  default:
138  break;
139  }
140  }
141 
142  if (httpurl || iptvurl)
143  {
144  if (!iptvurl && HLSRingBuffer::TestForHTTPLiveStreaming(lfilename))
145  {
146  return new HLSRingBuffer(lfilename);
147  }
148  return new StreamingRingBuffer(lfilename);
149  }
150  if (!stream_only && mythurl)
151  {
152  struct stat fileInfo;
153  if ((RemoteFile::Exists(lfilename, &fileInfo)) &&
154  (S_ISDIR(fileInfo.st_mode)))
155  {
156  if (RemoteFile::Exists(lfilename + "/VIDEO_TS"))
157  dvddir = true;
158  else if (RemoteFile::Exists(lfilename + "/BDMV"))
159  bddir = true;
160  }
161  }
162  else if (!stream_only && !mythurl)
163  {
164  if (QFile::exists(lfilename + "/VIDEO_TS"))
165  dvddir = true;
166  else if (QFile::exists(lfilename + "/BDMV"))
167  bddir = true;
168  }
169 
170  if (!stream_only && (dvdurl || dvddir))
171  {
172  if (lfilename.startsWith("dvd:")) // URI "dvd:" + path
173  lfilename.remove(0,4); // e.g. "dvd:/dev/dvd"
174 
175  if (!(mythurl || QFile::exists(lfilename)))
176  lfilename = "/dev/dvd";
177  LOG(VB_PLAYBACK, LOG_INFO, "Trying DVD at " + lfilename);
178 
179  return new DVDRingBuffer(lfilename);
180  }
181  if (!stream_only && (bdurl || bddir))
182  {
183  if (lfilename.startsWith("bd:")) // URI "bd:" + path
184  lfilename.remove(0,3); // e.g. "bd:/videos/ET"
185 
186  if (!(mythurl || QFile::exists(lfilename)))
187  lfilename = "/dev/dvd";
188  LOG(VB_PLAYBACK, LOG_INFO, "Trying BD at " + lfilename);
189 
190  return new BDRingBuffer(lfilename);
191  }
192 
193  if (!mythurl && imgext && lfilename.startsWith("dvd:"))
194  {
195  LOG(VB_PLAYBACK, LOG_INFO, "DVD image at " + lfilename);
196  return new DVDStream(lfilename);
197  }
198  if (!mythurl && lower.endsWith(".vob") && lfilename.contains("/VIDEO_TS/"))
199  {
200  LOG(VB_PLAYBACK, LOG_INFO, "DVD VOB at " + lfilename);
201  DVDStream *s = new DVDStream(lfilename);
202  if (s && s->IsOpen())
203  return s;
204 
205  delete s;
206  }
207 
208  return new FileRingBuffer(
209  lfilename, write, usereadahead, timeout_ms);
210 }
211 
213  MThread("RingBuffer"),
214  m_type(rbtype)
215 {
216  {
217  QMutexLocker locker(&s_subExtLock);
218  if (s_subExt.empty())
219  {
220  // Possible subtitle file extensions '.srt', '.sub', '.txt'
221  // since #9294 also .ass and .ssa
222  s_subExt += ".ass";
223  s_subExt += ".srt";
224  s_subExt += ".ssa";
225  s_subExt += ".sub";
226  s_subExt += ".txt";
227 
228  // Extensions for which a subtitle file should not exist
230  s_subExtNoCheck += ".gif";
231  s_subExtNoCheck += ".png";
232  }
233  }
234 }
235 
236 #undef NDEBUG
237 #include <cassert>
238 
247 {
248  assert(!isRunning());
249  wait();
250 
251  delete [] m_readAheadBuffer;
252  m_readAheadBuffer = nullptr;
253 
254  if (m_tfw)
255  {
256  m_tfw->Flush();
257  delete m_tfw;
258  m_tfw = nullptr;
259  }
260 }
261 
265 void RingBuffer::Reset(bool full, bool toAdjust, bool resetInternal)
266 {
267  LOG(VB_FILE, LOG_INFO, LOC + QString("Reset(%1,%2,%3)")
268  .arg(full).arg(toAdjust).arg(resetInternal));
269 
270  m_rwLock.lockForWrite();
271  m_posLock.lockForWrite();
272 
273  m_numFailures = 0;
274  m_commsError = false;
275  m_setSwitchToNext = false;
276 
277  m_writePos = 0;
278  m_readPos = (toAdjust) ? (m_readPos - m_readAdjust) : 0;
279 
280  if (m_readPos != 0)
281  {
282  LOG(VB_GENERAL, LOG_ERR, LOC +
283  QString("RingBuffer::Reset() nonzero readpos. toAdjust: %1 "
284  "readpos: %2 readAdjust: %3")
285  .arg(toAdjust).arg(m_readPos).arg(m_readAdjust));
286  }
287 
288  m_readAdjust = 0;
289  m_readPos = (m_readPos < 0) ? 0 : m_readPos;
290 
291  if (full)
293 
294  if (resetInternal)
296 
297  m_generalWait.wakeAll();
298  m_posLock.unlock();
299  m_rwLock.unlock();
300 }
301 
308 {
309  LOG(VB_FILE, LOG_INFO, LOC +
310  QString("UpdateRawBitrate(%1Kb)").arg(raw_bitrate));
311 
312  // an audio only stream could be as low as 64Kb (DVB radio) and
313  // an MHEG only stream is likely to be reported as 0Kb
314  if (raw_bitrate < 64)
315  {
316  LOG(VB_FILE, LOG_INFO, LOC +
317  QString("Bitrate too low - setting to 64Kb"));
318  raw_bitrate = 64;
319  }
320  else if (raw_bitrate > 100000)
321  {
322  LOG(VB_FILE, LOG_INFO, LOC +
323  QString("Bitrate too high - setting to 100Mb"));
324  raw_bitrate = 100000;
325  }
326 
327  m_rwLock.lockForWrite();
328  m_rawBitrate = raw_bitrate;
330  m_bitrateInitialized = true;
331  m_rwLock.unlock();
332 }
333 
338 void RingBuffer::UpdatePlaySpeed(float play_speed)
339 {
340  m_rwLock.lockForWrite();
341  m_playSpeed = play_speed;
343  m_rwLock.unlock();
344 }
345 
351 void RingBuffer::SetBufferSizeFactors(bool estbitrate, bool matroska)
352 {
353  m_rwLock.lockForWrite();
354  m_unknownBitrate = estbitrate;
355  m_fileIsMatroska = matroska;
356  m_rwLock.unlock();
358 }
359 
368 {
369  uint estbitrate = 0;
370 
371  m_readsAllowed = false;
372  m_readsDesired = false;
373 
374  // loop without sleeping if the buffered data is less than this
375  m_fillThreshold = 7 * m_bufferSize / 8;
376 
377  const uint KB2 = 2*1024;
378  const uint KB4 = 4*1024;
379  const uint KB8 = 8*1024;
380  const uint KB16 = 16*1024;
381  const uint KB32 = 32*1024;
382  const uint KB64 = 64*1024;
383  const uint KB128 = 128*1024;
384  const uint KB256 = 256*1024;
385  const uint KB512 = 512*1024;
386 
387  estbitrate = (uint) max(abs(m_rawBitrate * m_playSpeed),
388  0.5F * m_rawBitrate);
389  estbitrate = min(m_rawBitrate * 3, estbitrate);
390  int const rbs = (estbitrate > 18000) ? KB512 :
391  (estbitrate > 9000) ? KB256 :
392  (estbitrate > 5000) ? KB128 :
393  (estbitrate > 2500) ? KB64 :
394  (estbitrate > 1250) ? KB32 :
395  (estbitrate >= 500) ? KB16 :
396  (estbitrate > 250) ? KB8 :
397  (estbitrate > 125) ? KB4 : KB2;
398  if (rbs < CHUNK)
399  {
400  m_readBlockSize = rbs;
401  }
402  else
403  {
405  }
406 
407  // minimum seconds of buffering before allowing read
408  float secs_min = 0.3;
409  // set the minimum buffering before allowing ffmpeg read
410  m_fillMin = (uint) ((estbitrate * 1000 * secs_min) * 0.125F);
411  // make this a multiple of ffmpeg block size..
412  if (m_fillMin >= CHUNK || rbs >= CHUNK)
413  {
414  if (m_lowBuffers)
415  {
416  LOG(VB_GENERAL, LOG_INFO, LOC +
417  "Buffering optimisations disabled.");
418  }
419  m_lowBuffers = false;
420  m_fillMin = ((m_fillMin / CHUNK) + 1) * CHUNK;
421  m_fillMin = min((uint)m_fillMin, m_bufferSize / 2);
422  }
423  else
424  {
425  m_lowBuffers = true;
426  LOG(VB_GENERAL, LOG_WARNING, "Enabling buffering optimisations "
427  "for low bitrate stream.");
428  }
429 
430  LOG(VB_FILE, LOG_INFO, LOC +
431  QString("CalcReadAheadThresh(%1 Kb)\n\t\t\t -> "
432  "threshold(%2 KB) min read(%3 KB) blk size(%4 KB)")
433  .arg(estbitrate).arg(m_fillThreshold/1024)
434  .arg(m_fillMin/1024).arg(m_readBlockSize/1024));
435 }
436 
437 bool RingBuffer::IsNearEnd(double /*fps*/, uint vvf) const
438 {
439  QReadLocker lock(&m_rwLock);
440 
441  if (!m_ateof && !m_setSwitchToNext)
442  {
443  // file is still being read, so can't be finished
444  return false;
445  }
446 
447  m_posLock.lockForRead();
448  long long rp = m_readPos;
449  long long sz = m_internalReadPos - m_readPos;
450  m_posLock.unlock();
451 
452  // telecom kilobytes (i.e. 1000 per k not 1024)
453  uint tmp = (uint) max(abs(m_rawBitrate * m_playSpeed), 0.5F * m_rawBitrate);
454  uint kbits_per_sec = min(m_rawBitrate * 3, tmp);
455  if (kbits_per_sec == 0)
456  return false;
457 
458  double readahead_time = sz / (kbits_per_sec * (1000.0/8.0));
459 
460  bool near_end = readahead_time <= 1.5;
461  LOG(VB_PLAYBACK, LOG_INFO, LOC + "IsReallyNearEnd()" +
462  QString(" br(%1KB)").arg(kbits_per_sec/8) +
463  QString(" sz(%1KB)").arg(sz / 1000LL) +
464  QString(" vfl(%1)").arg(vvf) +
465  QString(" time(%1)").arg(readahead_time) +
466  QString(" rawbitrate(%1)").arg(m_rawBitrate) +
467  QString(" avail(%1)").arg(sz) +
468  QString(" internal_size(%1)").arg(m_internalReadPos) +
469  QString(" readposition(%1)").arg(rp) +
470  QString(" stopreads(%1)").arg(m_stopReads) +
471  QString(" paused(%1)").arg(m_paused) +
472  QString(" ne:%1").arg(near_end));
473 
474  return near_end;
475 }
476 
479 int RingBuffer::ReadBufFree(void) const
480 {
481  m_rbrLock.lockForRead();
482  m_rbwLock.lockForRead();
483  int ret = ((m_rbwPos >= m_rbrPos) ? m_rbrPos + m_bufferSize : m_rbrPos) - m_rbwPos - 1;
484  m_rbwLock.unlock();
485  m_rbrLock.unlock();
486  return ret;
487 }
488 
491 {
492  QReadLocker lock(&m_rwLock);
493 
494  return ReadBufAvail();
495 }
496 
497 long long RingBuffer::GetRealFileSize(void) const
498 {
499  {
500  QReadLocker lock(&m_rwLock);
501  if (m_readInternalMode)
502  {
503  return ReadBufAvail();
504  }
505  }
506 
507  return GetRealFileSizeInternal();
508 }
509 
510 long long RingBuffer::Seek(long long pos, int whence, bool has_lock)
511 {
512  LOG(VB_FILE, LOG_INFO, LOC + QString("Seek(%1,%2,%3)")
513  .arg(pos).arg((SEEK_SET==whence)?"SEEK_SET":
514  ((SEEK_CUR==whence)?"SEEK_CUR":"SEEK_END"))
515  .arg(has_lock?"locked":"unlocked"));
516 
517  if (!has_lock)
518  {
519  m_rwLock.lockForWrite();
520  }
521 
522  long long ret;
523 
524  if (m_readInternalMode)
525  {
526  m_posLock.lockForWrite();
527  // only valid for SEEK_SET & SEEK_CUR
528  switch (whence)
529  {
530  case SEEK_SET:
531  m_readPos = pos;
532  break;
533  case SEEK_CUR:
534  m_readPos += pos;
535  break;
536  case SEEK_END:
537  m_readPos = ReadBufAvail() - pos;
538  break;
539  }
541  m_posLock.unlock();
542  ret = m_readPos;
543  }
544  else
545  {
546  ret = SeekInternal(pos, whence);
547  }
548 
549  m_generalWait.wakeAll();
550 
551  if (!has_lock)
552  {
553  m_rwLock.unlock();
554  }
555  return ret;
556 }
557 
559 {
560  QWriteLocker lock(&m_rwLock);
561  bool old = m_readInternalMode;
562 
563  if (mode == old)
564  {
565  return old;
566  }
567 
568  m_readInternalMode = mode;
569 
570  if (!mode)
571  {
572  // adjust real read position in ringbuffer
573  m_rbrLock.lockForWrite();
575  m_generalWait.wakeAll();
576  m_rbrLock.unlock();
577  // reset the read offset as we are exiting the internal read mode
578  m_readOffset = 0;
579  }
580 
581  LOG(VB_FILE, LOG_DEBUG, LOC +
582  QString("SetReadInternalMode: %1").arg(mode ? "on" : "off"));
583 
584  return old;
585 }
586 
590 {
591  m_rbrLock.lockForRead();
592  m_rbwLock.lockForRead();
594  m_rbwLock.unlock();
595  m_rbrLock.unlock();
596  return ret;
597 }
598 
610 void RingBuffer::ResetReadAhead(long long newinternal)
611 {
612  LOG(VB_FILE, LOG_INFO, LOC +
613  QString("ResetReadAhead(internalreadpos = %1->%2)")
614  .arg(m_internalReadPos).arg(newinternal));
615 
616  m_readInternalMode = false;
617  m_readOffset = 0;
618 
619  m_rbrLock.lockForWrite();
620  m_rbwLock.lockForWrite();
621 
623 
624  m_rbrPos = 0;
625  m_rbwPos = 0;
626  m_internalReadPos = newinternal;
627  m_ateof = false;
628  m_readsAllowed = false;
629  m_readsDesired = false;
630  m_recentSeek = true;
631  m_setSwitchToNext = false;
632 
633  m_generalWait.wakeAll();
634 
635  m_rbwLock.unlock();
636  m_rbrLock.unlock();
637 }
638 
654 {
655  bool do_start = true;
656 
657  m_rwLock.lockForWrite();
658  if (!m_startReadAhead)
659  {
660  do_start = false;
661  }
662  else if (m_writeMode)
663  {
664  LOG(VB_GENERAL, LOG_WARNING, LOC + "Not starting read ahead thread, "
665  "this is a write only RingBuffer");
666  do_start = false;
667  }
668  else if (m_readAheadRunning)
669  {
670  LOG(VB_GENERAL, LOG_WARNING, LOC + "Not starting read ahead thread, "
671  "already running");
672  do_start = false;
673  }
674 
675  if (!do_start)
676  {
677  m_rwLock.unlock();
678  return;
679  }
680 
681  StartReads();
682 
683  MThread::start();
684 
686  m_generalWait.wait(&m_rwLock);
687 
688  m_rwLock.unlock();
689 }
690 
695 {
696  while (isRunning())
697  {
698  m_rwLock.lockForWrite();
699  m_readAheadRunning = false;
700  StopReads();
701  m_generalWait.wakeAll();
702  m_rwLock.unlock();
703  MThread::wait(5000);
704  }
705 }
706 
712 {
713  LOG(VB_FILE, LOG_INFO, LOC + "StopReads()");
714  m_stopReads = true;
715  m_generalWait.wakeAll();
716 }
717 
723 {
724  LOG(VB_FILE, LOG_INFO, LOC + "StartReads()");
725  m_stopReads = false;
726  m_generalWait.wakeAll();
727 }
728 
734 {
735  LOG(VB_FILE, LOG_INFO, LOC + "Pause()");
736  StopReads();
737 
738  m_rwLock.lockForWrite();
739  m_requestPause = true;
740  m_rwLock.unlock();
741 }
742 
748 {
749  LOG(VB_FILE, LOG_INFO, LOC + "Unpause()");
750  StartReads();
751 
752  m_rwLock.lockForWrite();
753  m_requestPause = false;
754  m_generalWait.wakeAll();
755  m_rwLock.unlock();
756 }
757 
759 bool RingBuffer::isPaused(void) const
760 {
761  m_rwLock.lockForRead();
762  bool ret = !m_readAheadRunning || m_paused;
763  m_rwLock.unlock();
764  return ret;
765 }
766 
771 {
772  MythTimer t;
773  t.start();
774 
775  m_rwLock.lockForRead();
777  {
778  m_generalWait.wait(&m_rwLock, 1000);
779  if (m_readAheadRunning && !m_paused && m_requestPause && t.elapsed() > 1000)
780  {
781  LOG(VB_GENERAL, LOG_WARNING, LOC +
782  QString("Waited %1 ms for ringbuffer pause..")
783  .arg(t.elapsed()));
784  }
785  }
786  m_rwLock.unlock();
787 }
788 
790 {
791  const uint timeout = 500; // ms
792 
793  if (m_requestPause)
794  {
795  if (!m_paused)
796  {
797  m_rwLock.unlock();
798  m_rwLock.lockForWrite();
799 
800  if (m_requestPause)
801  {
802  m_paused = true;
803  m_generalWait.wakeAll();
804  }
805 
806  m_rwLock.unlock();
807  m_rwLock.lockForRead();
808  }
809 
812  }
813 
814  if (!m_requestPause && m_paused)
815  {
816  m_rwLock.unlock();
817  m_rwLock.lockForWrite();
818 
819  if (!m_requestPause)
820  {
821  m_paused = false;
822  m_generalWait.wakeAll();
823  }
824 
825  m_rwLock.unlock();
826  m_rwLock.lockForRead();
827  }
828 
829  return m_requestPause || m_paused;
830 }
831 
833 {
834  m_rwLock.lockForWrite();
835  m_posLock.lockForWrite();
836 
837  uint oldsize = m_bufferSize;
838  uint newsize = BUFFER_SIZE_MINIMUM;
839  if (m_remotefile)
840  {
841  newsize *= BUFFER_FACTOR_NETWORK;
842  if (m_fileIsMatroska)
843  newsize *= BUFFER_FACTOR_MATROSKA;
844  if (m_unknownBitrate)
845  newsize *= BUFFER_FACTOR_BITRATE;
846  }
847 
848  // N.B. Don't try and make it smaller - bad things happen...
849  if (m_readAheadBuffer && oldsize >= newsize)
850  {
851  m_posLock.unlock();
852  m_rwLock.unlock();
853  return;
854  }
855 
856  m_bufferSize = newsize;
857  if (m_readAheadBuffer)
858  {
859  char* newbuffer = new char[m_bufferSize + 1024];
860  memcpy(newbuffer, m_readAheadBuffer + m_rbwPos, oldsize - m_rbwPos);
861  memcpy(newbuffer + (oldsize - m_rbwPos), m_readAheadBuffer, m_rbwPos);
862  delete [] m_readAheadBuffer;
863  m_readAheadBuffer = newbuffer;
865  (m_rbrPos + oldsize - m_rbwPos);
866  m_rbwPos = oldsize;
867  }
868  else
869  {
870  m_readAheadBuffer = new char[m_bufferSize + 1024];
871  }
873  m_posLock.unlock();
874  m_rwLock.unlock();
875 
876  LOG(VB_FILE, LOG_INFO, LOC + QString("Created readAheadBuffer: %1Mb")
877  .arg(newsize >> 20));
878 }
879 
880 void RingBuffer::run(void)
881 {
882  RunProlog();
883 
884  // These variables are used to adjust the read block size
885  struct timeval lastread, now;
886  int readtimeavg = 300;
887  bool ignore_for_read_timing = true;
888  int eofreads = 0;
889 
890  gettimeofday(&lastread, nullptr); // this is just to keep gcc happy
891 
893  m_rwLock.lockForWrite();
894  m_posLock.lockForWrite();
895  m_requestPause = false;
896  ResetReadAhead(0);
897  m_readAheadRunning = true;
898  m_reallyRunning = true;
899  m_generalWait.wakeAll();
900  m_posLock.unlock();
901  m_rwLock.unlock();
902 
903  // NOTE: this must loop at some point hold only
904  // a read lock on rwlock, so that other functions
905  // such as reset and seek can take priority.
906 
907  m_rwLock.lockForRead();
908 
909  LOG(VB_FILE, LOG_INFO, LOC +
910  QString("Initial readblocksize %1K & fill_min %2K")
911  .arg(m_readBlockSize/1024).arg(m_fillMin/1024));
912 
913  while (m_readAheadRunning)
914  {
915  m_rwLock.unlock();
916  bool isopened = IsOpen();
917  m_rwLock.lockForRead();
918 
919  if (!isopened)
920  {
921  LOG(VB_FILE, LOG_WARNING, LOC +
922  QString("File not opened, terminating readahead thread"));
923  m_posLock.lockForWrite();
924  m_readAheadRunning = false;
925  m_generalWait.wakeAll();
926  m_posLock.unlock();
927  break;
928  }
929  if (PauseAndWait())
930  {
931  ignore_for_read_timing = true;
932  LOG(VB_FILE, LOG_DEBUG, LOC +
933  "run: PauseAndWait Not reading continuing");
934  continue;
935  }
936 
937  long long totfree = ReadBufFree();
938 
939  const uint KB32 = 32*1024;
940  const int KB512 = 512*1024;
941  // These are conditions where we don't want to go through
942  // the loop if they are true.
943  if (((totfree < KB32) && m_readsAllowed) ||
945  {
946  ignore_for_read_timing |=
948  m_generalWait.wait(&m_rwLock, (m_stopReads) ? 50 : 1000);
949  LOG(VB_FILE, LOG_DEBUG, LOC +
950  QString("run: Not reading continuing: totfree(%1) "
951  "readsallowed(%2) ignorereadpos(%3) commserror(%4) "
952  "stopreads(%5)")
953  .arg(totfree).arg(m_readsAllowed).arg(m_ignoreReadPos)
954  .arg(m_commsError).arg(m_stopReads));
955  continue;
956  }
957 
958  // These are conditions where we want to sleep to allow
959  // other threads to do stuff.
961  {
962  ignore_for_read_timing = true;
963  m_generalWait.wait(&m_rwLock, 1000);
964  totfree = ReadBufFree();
965  }
966 
967  int read_return = -1;
968  if (totfree >= KB32 && !m_commsError &&
970  {
971  // limit the read size
972  if (m_readBlockSize > totfree)
973  totfree = (totfree / KB32) * KB32; // must be multiple of 32KB
974  else
975  totfree = m_readBlockSize;
976 
977  // adapt blocksize
978  gettimeofday(&now, nullptr);
979  if (!ignore_for_read_timing)
980  {
981  int readinterval = (now.tv_sec - lastread.tv_sec ) * 1000 +
982  (now.tv_usec - lastread.tv_usec) / 1000;
983  readtimeavg = (readtimeavg * 9 + readinterval) / 10;
984 
985  if (readtimeavg < 150 &&
987  m_readBlockSize >= CHUNK /* low_buffers */ &&
988  m_readBlockSize <= KB512)
989  {
990  int old_block_size = m_readBlockSize;
993  if (m_readBlockSize > KB512)
994  {
995  m_readBlockSize = KB512;
996  }
997  LOG(VB_FILE, LOG_INFO, LOC +
998  QString("Avg read interval was %1 msec. "
999  "%2K -> %3K block size")
1000  .arg(readtimeavg)
1001  .arg(old_block_size/1024)
1002  .arg(m_readBlockSize/1024));
1003  readtimeavg = 225;
1004  }
1005  else if (readtimeavg > 300 && m_readBlockSize > CHUNK)
1006  {
1008  LOG(VB_FILE, LOG_INFO, LOC +
1009  QString("Avg read interval was %1 msec. "
1010  "%2K -> %3K block size")
1011  .arg(readtimeavg)
1012  .arg((m_readBlockSize+CHUNK)/1024)
1013  .arg(m_readBlockSize/1024));
1014  readtimeavg = 225;
1015  }
1016  }
1017  lastread = now;
1018 
1019  m_rbwLock.lockForRead();
1020  if (m_rbwPos + totfree > m_bufferSize)
1021  {
1022  totfree = m_bufferSize - m_rbwPos;
1023  LOG(VB_FILE, LOG_DEBUG, LOC +
1024  "Shrinking read, near end of buffer");
1025  }
1026 
1027  if (m_internalReadPos == 0)
1028  {
1029  totfree = max(m_fillMin, m_readBlockSize);
1030  LOG(VB_FILE, LOG_DEBUG, LOC +
1031  "Reading enough data to start playback");
1032  }
1033 
1034  LOG(VB_FILE, LOG_DEBUG, LOC +
1035  QString("safe_read(...@%1, %2) -- begin")
1036  .arg(m_rbwPos).arg(totfree));
1037 
1038  MythTimer sr_timer;
1039  sr_timer.start();
1040 
1041  int rbwposcopy = m_rbwPos;
1042 
1043  // FileRingBuffer::safe_read(RemoteFile*...) acquires poslock;
1044  // so we need to unlock this here to preserve locking order.
1045  m_rbwLock.unlock();
1046 
1047  read_return = safe_read(m_readAheadBuffer + rbwposcopy, totfree);
1048 
1049  int sr_elapsed = sr_timer.elapsed();
1050  uint64_t bps = !sr_elapsed ? 1000000001 :
1051  (uint64_t)(((double)read_return * 8000.0) /
1052  (double)sr_elapsed);
1053  LOG(VB_FILE, LOG_DEBUG, LOC +
1054  QString("safe_read(...@%1, %2) -> %3, took %4 ms %5 avg %6 ms")
1055  .arg(rbwposcopy).arg(totfree).arg(read_return)
1056  .arg(sr_elapsed)
1057  .arg(QString("(%1Mbps)").arg((double)bps / 1000000.0))
1058  .arg(readtimeavg));
1059  UpdateStorageRate(bps);
1060 
1061  if (read_return >= 0)
1062  {
1063  m_posLock.lockForWrite();
1064  m_rbwLock.lockForWrite();
1065 
1066  if (rbwposcopy == m_rbwPos)
1067  {
1068  m_internalReadPos += read_return;
1069  m_rbwPos = (m_rbwPos + read_return) % m_bufferSize;
1070  LOG(VB_FILE, LOG_DEBUG,
1071  LOC + QString("rbwpos += %1K requested %2K in read")
1072  .arg(read_return/1024,3).arg(totfree/1024,3));
1073  }
1074  m_numFailures = 0;
1075 
1076  m_rbwLock.unlock();
1077  m_posLock.unlock();
1078 
1079  LOG(VB_FILE, LOG_DEBUG, LOC +
1080  QString("total read so far: %1 bytes")
1081  .arg(m_internalReadPos));
1082  }
1083  }
1084  else
1085  {
1086  LOG(VB_FILE, LOG_DEBUG, LOC +
1087  QString("We are not reading anything "
1088  "(totfree: %1 commserror:%2 ateof:%3 "
1089  "setswitchtonext:%4")
1090  .arg(totfree).arg(m_commsError).arg(m_ateof).arg(m_setSwitchToNext));
1091  }
1092 
1093  int used = m_bufferSize - ReadBufFree();
1094 
1095  bool reads_were_allowed = m_readsAllowed;
1096 
1097  ignore_for_read_timing =
1098  (totfree < m_readBlockSize) || (read_return < totfree);
1099 
1100  if ((0 == read_return) || (m_numFailures > 5) ||
1101  (m_readsAllowed != (used >= 1 || m_ateof ||
1103  (m_readsDesired != (used >= m_fillMin || m_ateof ||
1105  {
1106  // If readpos changes while the lock is released
1107  // we should not handle the 0 read_return now.
1108  long long old_readpos = m_readPos;
1109 
1110  m_rwLock.unlock();
1111  m_rwLock.lockForWrite();
1112 
1113  m_commsError |= (m_numFailures > 5);
1114 
1116  m_readsDesired =
1118 
1119  if (0 == read_return && old_readpos == m_readPos)
1120  {
1121  eofreads++;
1122  if (eofreads >= 3 && m_readBlockSize >= KB512)
1123  {
1124  // not reading anything
1127  }
1128 
1129  if (m_liveTVChain)
1130  {
1133  {
1134  // we receive new livetv chain element event
1135  // before we receive file closed for writing event
1136  // so don't need to test if file is closed for writing
1137  m_liveTVChain->SwitchToNext(true);
1138  m_setSwitchToNext = true;
1139  }
1141  {
1142  LOG(VB_FILE, LOG_DEBUG, LOC +
1143  QString("EOF encountered, but %1 still being written to")
1144  .arg(m_filename));
1145  // We reached EOF, but file still open for writing and
1146  // no next program in livetvchain
1147  // wait a little bit (60ms same wait as typical safe_read)
1148  m_generalWait.wait(&m_rwLock, 60);
1149  }
1150  }
1152  {
1153  LOG(VB_FILE, LOG_DEBUG, LOC +
1154  QString("EOF encountered, but %1 still being written to")
1155  .arg(m_filename));
1156  // We reached EOF, but file still open for writing,
1157  // typically active in-progress recording
1158  // wait a little bit (60ms same wait as typical safe_read)
1159  m_generalWait.wait(&m_rwLock, 60);
1160  m_beingWritten = true;
1161  }
1162  else
1163  {
1165  {
1166  LOG(VB_FILE, LOG_DEBUG, LOC +
1167  "Waiting for file to grow large enough to process.");
1168  m_generalWait.wait(&m_rwLock, 300);
1169  }
1170  else
1171  {
1172  LOG(VB_FILE, LOG_DEBUG,
1173  LOC + "setting ateof (read_return == 0)");
1174  m_ateof = true;
1175  }
1176  }
1177  }
1178 
1179  m_rwLock.unlock();
1180  m_rwLock.lockForRead();
1181  used = m_bufferSize - ReadBufFree();
1182  }
1183  else
1184  {
1185  eofreads = 0;
1186  }
1187 
1188  LOG(VB_FILE, LOG_DEBUG, LOC + "@ end of read ahead loop");
1189 
1191  (m_wantToRead <= used && m_wantToRead > 0))
1192  {
1193  // To give other threads a good chance to handle these
1194  // conditions, even if they are only requesting a read lock
1195  // like us, yield (currently implemented with short usleep).
1196  m_generalWait.wakeAll();
1197  m_rwLock.unlock();
1198  std::this_thread::sleep_for(std::chrono::milliseconds(5));
1199  m_rwLock.lockForRead();
1200  }
1201  else
1202  {
1203  // yield if we have nothing to do...
1204  if (!m_requestPause && reads_were_allowed &&
1205  (used >= m_fillThreshold || m_ateof || m_setSwitchToNext))
1206  {
1207  m_generalWait.wait(&m_rwLock, 50);
1208  }
1209  else if (m_readsAllowed)
1210  { // if reads are allowed release the lock and yield so the
1211  // reader gets a chance to read before the buffer is full.
1212  m_generalWait.wakeAll();
1213  m_rwLock.unlock();
1214  std::this_thread::sleep_for(std::chrono::milliseconds(5));
1215  m_rwLock.lockForRead();
1216  }
1217  }
1218  }
1219 
1220  m_rwLock.unlock();
1221 
1222  m_rwLock.lockForWrite();
1223  m_rbrLock.lockForWrite();
1224  m_rbwLock.lockForWrite();
1225 
1226  delete [] m_readAheadBuffer;
1227 
1228  m_readAheadBuffer = nullptr;
1229  m_rbrPos = 0;
1230  m_rbwPos = 0;
1231  m_reallyRunning = false;
1232  m_readsAllowed = false;
1233  m_readsDesired = false;
1234 
1235  m_rbwLock.unlock();
1236  m_rbrLock.unlock();
1237  m_rwLock.unlock();
1238 
1239  LOG(VB_FILE, LOG_INFO, LOC + QString("Exiting readahead thread"));
1240 
1241  RunEpilog();
1242 }
1243 
1245 {
1246  m_rwLock.lockForWrite();
1247  m_posLock.lockForRead();
1249  long long ra = m_readAdjust;
1250  m_posLock.unlock();
1251  m_rwLock.unlock();
1252  return ra;
1253 }
1254 
1255 int RingBuffer::Peek(void *buf, int count)
1256 {
1257  int ret = ReadPriv(buf, count, true);
1258  if (ret != count)
1259  {
1260  LOG(VB_GENERAL, LOG_WARNING, LOC +
1261  QString("Peek() requested %1 bytes, but only returning %2")
1262  .arg(count).arg(ret));
1263  }
1264  return ret;
1265 }
1266 
1268 {
1269  // Wait up to 30000 ms for reads allowed (or readsdesired if post seek/open)
1271  m_recentSeek = false;
1272  int timeout_ms = 30000;
1273  int count = 0;
1274  MythTimer t;
1275  t.start();
1276 
1277  while ((t.elapsed() < timeout_ms) && !check && !m_stopReads &&
1279  {
1280  m_generalWait.wait(&m_rwLock, clamp(timeout_ms - t.elapsed(), 10, 100));
1281  if (!check && t.elapsed() > 1000 && (count % 100) == 0)
1282  {
1283  LOG(VB_GENERAL, LOG_WARNING, LOC +
1284  "Taking too long to be allowed to read..");
1285  }
1286  count++;
1287  }
1288  if (t.elapsed() >= timeout_ms)
1289  {
1290  LOG(VB_GENERAL, LOG_ERR, LOC +
1291  QString("Took more than %1 seconds to be allowed to read, aborting.")
1292  .arg(timeout_ms / 1000));
1293  return false;
1294  }
1295  return check;
1296 }
1297 
1299 {
1300  int avail = ReadBufAvail();
1301  if (avail >= count)
1302  return avail;
1303 
1304  count = (m_ateof && avail < count) ? avail : count;
1305 
1306  if (m_liveTVChain && m_setSwitchToNext && avail < count)
1307  {
1308  return avail;
1309  }
1310 
1311  // Make sure that if the read ahead thread is sleeping and
1312  // it should be reading that we start reading right away.
1313  if ((avail < count) && !m_stopReads &&
1315  {
1316  m_generalWait.wakeAll();
1317  }
1318 
1319  MythTimer t;
1320  t.start();
1321  while ((avail < count) && !m_stopReads &&
1323  {
1324  m_wantToRead = count;
1325  m_generalWait.wait(&m_rwLock, clamp(timeout - t.elapsed(), 10, 250));
1326  avail = ReadBufAvail();
1327  if (m_ateof)
1328  break;
1329  if (m_lowBuffers && avail >= m_fillMin)
1330  break;
1331  if (t.elapsed() > timeout)
1332  break;
1333  }
1334 
1335  m_wantToRead = 0;
1336 
1337  return avail;
1338 }
1339 
1340 int RingBuffer::ReadDirect(void *buf, int count, bool peek)
1341 {
1342  long long old_pos = 0;
1343  if (peek)
1344  {
1345  m_posLock.lockForRead();
1346  old_pos = (m_ignoreReadPos >= 0) ? m_ignoreReadPos : m_readPos;
1347  m_posLock.unlock();
1348  }
1349 
1350  MythTimer timer;
1351  timer.start();
1352  int ret = safe_read(buf, count);
1353  int elapsed = timer.elapsed();
1354  uint64_t bps = !elapsed ? 1000000001 :
1355  (uint64_t)(((float)ret * 8000.0F) / (float)elapsed);
1356  UpdateStorageRate(bps);
1357 
1358  m_posLock.lockForWrite();
1359  if (m_ignoreReadPos >= 0 && ret > 0)
1360  {
1361  if (peek)
1362  {
1363  // seek should always succeed since we were at this position
1364  long long cur_pos = -1;
1365  if (m_remotefile)
1366  cur_pos = m_remotefile->Seek(old_pos, SEEK_SET);
1367  else if (m_fd2 >= 0)
1368  cur_pos = lseek64(m_fd2, old_pos, SEEK_SET);
1369  if (cur_pos < 0)
1370  {
1371  LOG(VB_FILE, LOG_ERR, LOC +
1372  "Seek failed repositioning to previous position");
1373  }
1374  }
1375  else
1376  {
1377  m_ignoreReadPos += ret;
1378  }
1379  m_posLock.unlock();
1380  return ret;
1381  }
1382  m_posLock.unlock();
1383 
1384  if (peek && ret > 0)
1385  {
1386  if ((IsDVD() || IsBD()) && old_pos != 0)
1387  {
1388  LOG(VB_GENERAL, LOG_ERR, LOC +
1389  "DVD and Blu-Ray do not support arbitrary "
1390  "peeks except when read-ahead is enabled."
1391  "\n\t\t\tWill seek to beginning of video.");
1392  old_pos = 0;
1393  }
1394 
1395  long long new_pos = Seek(old_pos, SEEK_SET, true);
1396 
1397  if (new_pos != old_pos)
1398  {
1399  LOG(VB_GENERAL, LOG_ERR, LOC +
1400  QString("Peek() Failed to return from new "
1401  "position %1 to old position %2, now "
1402  "at position %3")
1403  .arg(old_pos - ret).arg(old_pos).arg(new_pos));
1404  }
1405  }
1406 
1407  return ret;
1408 }
1409 
1418 int RingBuffer::ReadPriv(void *buf, int count, bool peek)
1419 {
1420  QString loc_desc = QString("ReadPriv(..%1, %2)")
1421  .arg(count).arg(peek?"peek":"normal");
1422  LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
1423  QString(" @%1 -- begin").arg(m_rbrPos));
1424 
1425  m_rwLock.lockForRead();
1426  if (m_writeMode)
1427  {
1428  LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
1429  ": Attempt to read from a write only file");
1430  errno = EBADF;
1431  m_rwLock.unlock();
1432  return -1;
1433  }
1434 
1436  {
1437  m_rwLock.unlock();
1438  m_rwLock.lockForWrite();
1439  // we need a write lock so the read-ahead thread
1440  // can't start mucking with the read position.
1441  // If the read ahead thread was started while we
1442  // didn't hold the lock, we proceed with a normal
1443  // read from the buffer, otherwise we read directly.
1444  if (m_requestPause || m_stopReads ||
1446  {
1447  int ret = ReadDirect(buf, count, peek);
1448  LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
1449  QString(": ReadDirect checksum %1")
1450  .arg(qChecksum((char*)buf,count)));
1451  m_rwLock.unlock();
1452  return ret;
1453  }
1454  m_rwLock.unlock();
1455  m_rwLock.lockForRead();
1456  }
1457 
1458  if (!WaitForReadsAllowed())
1459  {
1460  LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForReadsAllowed()");
1461  m_rwLock.unlock();
1462  m_stopReads = true; // this needs to be outside the lock
1463  m_rwLock.lockForWrite();
1464  m_wantToRead = 0;
1465  m_rwLock.unlock();
1466  return 0;
1467  }
1468 
1469  int avail = ReadBufAvail();
1471 
1472  // Wait up to 10000 ms for any data
1473  int timeout_ms = 10000;
1474  while (!m_readInternalMode && !m_ateof &&
1475  (t.elapsed() < timeout_ms) && m_readAheadRunning &&
1477  {
1478  avail = WaitForAvail(count, min(timeout_ms - t.elapsed(), 100));
1479  if (m_liveTVChain && m_setSwitchToNext && avail < count)
1480  {
1481  LOG(VB_GENERAL, LOG_INFO, LOC +
1482  "Checking to see if there's a new livetv program to switch to..");
1484  break;
1485  }
1486  if (avail > 0)
1487  break;
1488  }
1489  if (t.elapsed() > 6000)
1490  {
1491  LOG(VB_GENERAL, LOG_WARNING, LOC + loc_desc +
1492  QString(" -- waited %1 ms for avail(%2) > count(%3)")
1493  .arg(t.elapsed()).arg(avail).arg(count));
1494  }
1495 
1496  if (m_readInternalMode)
1497  {
1498  LOG(VB_FILE, LOG_DEBUG, LOC +
1499  QString("ReadPriv: %1 bytes available, %2 left")
1500  .arg(avail).arg(avail-m_readOffset));
1501  }
1502  count = min(avail - m_readOffset, count);
1503 
1504  if ((count <= 0) && (m_ateof || m_readInternalMode))
1505  {
1506  // If we're at the end of file return 0 bytes
1507  m_rwLock.unlock();
1508  return count;
1509  }
1510  if (count <= 0)
1511  {
1512  // If we're not at the end of file but have no data
1513  // at this point time out and shutdown read ahead.
1514  LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
1515  QString(" -- timed out waiting for data (%1 ms)")
1516  .arg(t.elapsed()));
1517 
1518  m_rwLock.unlock();
1519  m_stopReads = true; // this needs to be outside the lock
1520  m_rwLock.lockForWrite();
1521  m_ateof = true;
1522  m_wantToRead = 0;
1523  m_generalWait.wakeAll();
1524  m_rwLock.unlock();
1525  return count;
1526  }
1527 
1528  if (peek || m_readInternalMode)
1529  m_rbrLock.lockForRead();
1530  else
1531  m_rbrLock.lockForWrite();
1532 
1533  LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + " -- copying data");
1534 
1535  int rpos;
1536  if (m_rbrPos + m_readOffset > (int) m_bufferSize)
1537  {
1538  rpos = (m_rbrPos + m_readOffset) - m_bufferSize;
1539  }
1540  else
1541  {
1542  rpos = m_rbrPos + m_readOffset;
1543  }
1544  if (rpos + count > (int) m_bufferSize)
1545  {
1546  int firstsize = m_bufferSize - rpos;
1547  int secondsize = count - firstsize;
1548 
1549  memcpy(buf, m_readAheadBuffer + rpos, firstsize);
1550  memcpy((char *)buf + firstsize, m_readAheadBuffer, secondsize);
1551  }
1552  else
1553  {
1554  memcpy(buf, m_readAheadBuffer + rpos, count);
1555  }
1556  LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + QString(" -- checksum %1")
1557  .arg(qChecksum((char*)buf,count)));
1558 
1559  if (!peek)
1560  {
1561  if (m_readInternalMode)
1562  {
1563  m_readOffset += count;
1564  }
1565  else
1566  {
1567  m_rbrPos = (m_rbrPos + count) % m_bufferSize;
1568  m_generalWait.wakeAll();
1569  }
1570  }
1571  m_rbrLock.unlock();
1572  m_rwLock.unlock();
1573 
1574  return count;
1575 }
1576 
1585 int RingBuffer::Read(void *buf, int count)
1586 {
1587  int ret = ReadPriv(buf, count, false);
1588  if (ret > 0)
1589  {
1590  m_posLock.lockForWrite();
1591  m_readPos += ret;
1592  m_posLock.unlock();
1593  UpdateDecoderRate(ret);
1594  }
1595 
1596  return ret;
1597 }
1598 
1599 QString RingBuffer::BitrateToString(uint64_t rate, bool hz)
1600 {
1601  QString msg;
1602  float bitrate;
1603  int range = 0;
1604  if (rate < 1)
1605  {
1606  return "-";
1607  }
1608  if (rate > 1000000000)
1609  {
1610  return QObject::tr(">1Gbps");
1611  }
1612  if (rate >= 1000000)
1613  {
1614  msg = hz ? QObject::tr("%1MHz") : QObject::tr("%1Mbps");
1615  bitrate = (float)rate / (1000000.0F);
1616  range = hz ? 3 : 1;
1617  }
1618  else if (rate >= 1000)
1619  {
1620  msg = hz ? QObject::tr("%1kHz") : QObject::tr("%1kbps");
1621  bitrate = (float)rate / 1000.0F;
1622  range = hz ? 1 : 0;
1623  }
1624  else
1625  {
1626  msg = hz ? QObject::tr("%1Hz") : QObject::tr("%1bps");
1627  bitrate = (float)rate;
1628  }
1629  return msg.arg(bitrate, 0, 'f', range);
1630 }
1631 
1633 {
1635 }
1636 
1638 {
1640 }
1641 
1643 {
1645  return "N/A";
1646 
1647  int avail = (m_rbwPos >= m_rbrPos) ? m_rbwPos - m_rbrPos
1649  return QString("%1%").arg(lroundf((float)avail / (float)m_bufferSize * 100.0F));
1650 }
1651 
1652 uint64_t RingBuffer::UpdateDecoderRate(uint64_t latest)
1653 {
1655  return 0;
1656 
1657  // TODO use QDateTime once we've moved to Qt 4.7
1658  static QTime midnight = QTime(0, 0, 0);
1659  QTime now = QTime::currentTime();
1660  qint64 age = midnight.msecsTo(now);
1661  qint64 oldest = age - 1000;
1662 
1663  m_decoderReadLock.lock();
1664  if (latest)
1665  m_decoderReads.insert(age, latest);
1666 
1667  uint64_t total = 0;
1668  QMutableMapIterator<qint64,uint64_t> it(m_decoderReads);
1669  while (it.hasNext())
1670  {
1671  it.next();
1672  if (it.key() < oldest || it.key() > age)
1673  it.remove();
1674  else
1675  total += it.value();
1676  }
1677 
1678  uint64_t average = (uint64_t)((double)total * 8.0);
1679  m_decoderReadLock.unlock();
1680 
1681  LOG(VB_FILE, LOG_INFO, LOC + QString("Decoder read speed: %1 %2")
1682  .arg(average).arg(m_decoderReads.size()));
1683  return average;
1684 }
1685 
1686 uint64_t RingBuffer::UpdateStorageRate(uint64_t latest)
1687 {
1689  return 0;
1690 
1691  // TODO use QDateTime once we've moved to Qt 4.7
1692  static QTime midnight = QTime(0, 0, 0);
1693  QTime now = QTime::currentTime();
1694  qint64 age = midnight.msecsTo(now);
1695  qint64 oldest = age - 1000;
1696 
1697  m_storageReadLock.lock();
1698  if (latest)
1699  m_storageReads.insert(age, latest);
1700 
1701  uint64_t total = 0;
1702  QMutableMapIterator<qint64,uint64_t> it(m_storageReads);
1703  while (it.hasNext())
1704  {
1705  it.next();
1706  if (it.key() < oldest || it.key() > age)
1707  it.remove();
1708  else
1709  total += it.value();
1710  }
1711 
1712  int size = m_storageReads.size();
1713  m_storageReadLock.unlock();
1714 
1715  uint64_t average = size ? (uint64_t)(((double)total) / (double)size) : 0;
1716 
1717  LOG(VB_FILE, LOG_INFO, LOC + QString("Average storage read speed: %1 %2")
1718  .arg(average).arg(m_storageReads.size()));
1719  return average;
1720 }
1721 
1726 int RingBuffer::Write(const void *buf, uint count)
1727 {
1728  m_rwLock.lockForRead();
1729 
1730  if (!m_writeMode)
1731  {
1732  LOG(VB_GENERAL, LOG_ERR, LOC + "Tried to write to a read only file.");
1733  m_rwLock.unlock();
1734  return -1;
1735  }
1736 
1737  if (!m_tfw && !m_remotefile)
1738  {
1739  m_rwLock.unlock();
1740  return -1;
1741  }
1742 
1743  int ret = -1;
1744  if (m_tfw)
1745  ret = m_tfw->Write(buf, count);
1746  else
1747  ret = m_remotefile->Write(buf, count);
1748 
1749  if (ret > 0)
1750  {
1751  m_posLock.lockForWrite();
1752  m_writePos += ret;
1753  m_posLock.unlock();
1754  }
1755 
1756  m_rwLock.unlock();
1757 
1758  return ret;
1759 }
1760 
1765 {
1766  m_rwLock.lockForRead();
1767  if (m_tfw)
1768  m_tfw->Sync();
1769  m_rwLock.unlock();
1770 }
1771 
1774 long long RingBuffer::WriterSeek(long long pos, int whence, bool has_lock)
1775 {
1776  long long ret = -1;
1777 
1778  if (!has_lock)
1779  m_rwLock.lockForRead();
1780 
1781  m_posLock.lockForWrite();
1782 
1783  if (m_tfw)
1784  {
1785  ret = m_tfw->Seek(pos, whence);
1786  m_writePos = ret;
1787  }
1788 
1789  m_posLock.unlock();
1790 
1791  if (!has_lock)
1792  m_rwLock.unlock();
1793 
1794  return ret;
1795 }
1796 
1801 {
1802  m_rwLock.lockForRead();
1803  if (m_tfw)
1804  m_tfw->Flush();
1805  m_rwLock.unlock();
1806 }
1807 
1812 {
1813  m_rwLock.lockForRead();
1814  if (m_tfw)
1815  m_tfw->SetWriteBufferMinWriteSize(newMinSize);
1816  m_rwLock.unlock();
1817 }
1818 
1823 {
1824  QReadLocker lock(&m_rwLock);
1825 
1826  if (m_tfw)
1827  return m_tfw->SetBlocking(block);
1828  return false;
1829 }
1830 
1846 void RingBuffer::SetOldFile(bool is_old)
1847 {
1848  LOG(VB_FILE, LOG_INFO, LOC + QString("SetOldFile(%1)").arg(is_old));
1849  m_rwLock.lockForWrite();
1850  m_oldfile = is_old;
1851  m_rwLock.unlock();
1852 }
1853 
1855 QString RingBuffer::GetFilename(void) const
1856 {
1857  m_rwLock.lockForRead();
1858  QString tmp = m_filename;
1859  m_rwLock.unlock();
1860  return tmp;
1861 }
1862 
1864 {
1865  m_rwLock.lockForRead();
1866  QString tmp = m_subtitleFilename;
1867  m_rwLock.unlock();
1868  return tmp;
1869 }
1870 
1871 QString RingBuffer::GetLastError(void) const
1872 {
1873  m_rwLock.lockForRead();
1874  QString tmp = m_lastError;
1875  m_rwLock.unlock();
1876  return tmp;
1877 }
1878 
1882 long long RingBuffer::GetWritePosition(void) const
1883 {
1884  m_posLock.lockForRead();
1885  long long ret = m_writePos;
1886  m_posLock.unlock();
1887  return ret;
1888 }
1889 
1894 bool RingBuffer::LiveMode(void) const
1895 {
1896  m_rwLock.lockForRead();
1897  bool ret = (m_liveTVChain);
1898  m_rwLock.unlock();
1899  return ret;
1900 }
1901 
1907 {
1908  m_rwLock.lockForWrite();
1909  m_liveTVChain = chain;
1910  m_rwLock.unlock();
1911 }
1912 
1914 void RingBuffer::IgnoreLiveEOF(bool ignore)
1915 {
1916  m_rwLock.lockForWrite();
1917  m_ignoreLiveEOF = ignore;
1918  m_rwLock.unlock();
1919 }
1920 
1921 const DVDRingBuffer *RingBuffer::DVD(void) const
1922 {
1923  return dynamic_cast<const DVDRingBuffer*>(this);
1924 }
1925 
1926 const BDRingBuffer *RingBuffer::BD(void) const
1927 {
1928  return dynamic_cast<const BDRingBuffer*>(this);
1929 }
1930 
1932 {
1933  return dynamic_cast<DVDRingBuffer*>(this);
1934 }
1935 
1937 {
1938  return dynamic_cast<BDRingBuffer*>(this);
1939 }
1940 
1942 {
1943  QMutexLocker lock(avcodeclock);
1944 
1946  {
1947  avformat_network_init();
1949  }
1950 }
1951 
1952 /* vim: set expandtab tabstop=4 shiftwidth=4: */
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:215
static const int kDefaultOpenTimeout
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:294
def write(text, progress=True)
Definition: mythburn.py:279
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:46
void Pause(void)
Pauses the read-ahead thread.
Definition: ringbuffer.cpp:733
LiveTVChain * m_liveTVChain
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
const DVDRingBuffer * DVD(void) const
int GetReadBufAvail() const
Returns number of bytes available for reading from buffer.
Definition: ringbuffer.cpp:490
static QString BitrateToString(uint64_t rate, bool hz=false)
bool LiveMode(void) const
Returns true if this RingBuffer has been assigned a LiveTVChain.
void WriterFlush(void)
Calls ThreadedFileWriter::Flush(void)
#define BUFFER_FACTOR_BITRATE
QString GetLastError(void) const
#define BUFFER_SIZE_MINIMUM
static bool TestForHTTPLiveStreaming(const QString &filename)
bool WriterSetBlocking(bool lock=true)
Calls ThreadedFileWriter::SetBlocking(bool)
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:311
uint64_t UpdateStorageRate(uint64_t latest=0)
void SetLiveMode(LiveTVChain *chain)
Assigns a LiveTVChain to this RingBuffer.
void StartReads(void)
????
Definition: ringbuffer.cpp:722
uint64_t UpdateDecoderRate(uint64_t latest=0)
QReadWriteLock m_rwLock
QString GetFilename(void) const
Returns name of file used by this RingBuffer.
#define LOC
Definition: ringbuffer.cpp:42
unsigned int uint
Definition: compat.h:140
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
static RingBuffer * Create(const QString &xfilename, bool write, bool usereadahead=true, int timeout_ms=kDefaultOpenTimeout, bool stream_only=false)
Creates a RingBuffer instance.
Definition: ringbuffer.cpp:104
virtual long long SeekInternal(long long pos, int whence)=0
bool HasNext(void) const
long long Seek(long long pos, int whence)
Seek to a position within stream; May be unsafe.
QString GetDecoderRate(void)
QString GetStorageRate(void)
static guint32 * tmp
Definition: goom_core.c:35
void CalcReadAheadThresh(void)
Calculates m_fillMin, m_fillThreshold, and m_readBlockSize from the estimated effective bitrate of th...
Definition: ringbuffer.cpp:367
long long SetAdjustFilesize(void)
void SetBufferSizeFactors(bool estbitrate, bool matroska)
Tells RingBuffer that the raw bitrate may be innacurate and the underlying container is matroska,...
Definition: ringbuffer.cpp:351
int ReadPriv(void *buf, int count, bool peek)
When possible reads from the read-ahead buffer, otherwise reads directly from the device.
#define BUFFER_FACTOR_NETWORK
bool IsDVD(void) const
#define BUFFER_FACTOR_MATROSKA
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
long long WriterSeek(long long pos, int whence, bool has_lock=false)
Calls ThreadedFileWriter::Seek(long long,int).
void ReloadAll(const QStringList &data=QStringList())
QMap< qint64, uint64_t > m_decoderReads
bool PauseAndWait(void)
Definition: ringbuffer.cpp:789
static bool gAVformat_net_initialised
QReadWriteLock m_rbwLock
QString GetAvailableBuffer(void)
QReadWriteLock m_rbrLock
static void AVFormatInitNetwork(void)
void Sync(void)
Calls ThreadedFileWriter::Sync(void)
void KillReadAheadThread(void)
Stops the read-ahead thread, and waits for it to stop.
Definition: ringbuffer.cpp:694
QReadWriteLock m_posLock
int ReadDirect(void *buf, int count, bool peek)
void IgnoreLiveEOF(bool ignore)
Tells RingBuffer whether to ignore the end-of-file.
unsigned char t
Definition: ParseText.cpp:329
virtual ~RingBuffer()=0
Deletes.
Definition: ringbuffer.cpp:246
bool IsOpen(void) const override
Returns true if open for either reading or writing.
Definition: dvdstream.cpp:156
void Flush(void)
Allow DiskLoop() to flush buffer completely ignoring low watermark.
bool isRunning(void) const
Definition: mthread.cpp:274
void SetOldFile(bool is_old)
Tell RingBuffer if this is an old file or not.
static const int kLiveTVOpenTimeout
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: ringbuffer.cpp:880
virtual bool IsOpen(void) const =0
Returns true if open for either reading or writing.
void SetWriteBufferMinWriteSize(uint newMinSize=kMinWriteSize)
Sets the minumum number of bytes to write to disk in a single write.
int ReadBufFree(void) const
Returns number of bytes available for reading into buffer.
Definition: ringbuffer.cpp:479
bool IsBD(void) const
int Peek(void *buf, int count)
float clamp(float val, float minimum, float maximum)
Definition: mythmiscutil.h:41
virtual int safe_read(void *data, uint sz)=0
volatile bool m_stopReads
void Start(void)
Starts the read-ahead thread.
Definition: ringbuffer.cpp:653
int elapsed(void) const
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
ThreadedFileWriter * m_tfw
RingBufferType m_type
void WaitForPause(void)
Waits for Pause(void) to take effect.
Definition: ringbuffer.cpp:770
void StopReads(void)
????
Definition: ringbuffer.cpp:711
static bool Exists(const QString &url, struct stat *fileinfo)
Definition: remotefile.cpp:461
long long GetWritePosition(void) const
Returns how far into a ThreadedFileWriter file we have written.
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
static QStringList s_subExt
void Unpause(void)
Unpauses the read-ahead thread.
Definition: ringbuffer.cpp:747
#define assert(x)
QWaitCondition m_generalWait
Condition to signal that the read ahead thread is running.
QMutex * avcodeclock
This global variable is used to makes certain calls to avlib threadsafe.
bool WaitForReadsAllowed(void)
QString GetSubtitleFilename(void) const
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:202
long long GetRealFileSize(void) const
Returns the size of the file we are reading/writing, or -1 if the query fails.
Definition: ringbuffer.cpp:497
static QMutex s_subExtLock
RemoteFile * m_remotefile
void Reset(bool full=false, bool toAdjust=false, bool resetInternal=false)
Resets the read-ahead thread and our position in the file.
Definition: ringbuffer.cpp:265
int Read(void *buf, int count)
This is the public method for reading from a file, it calls the appropriate read method if the file i...
void Sync(void)
Flush data written to the file descriptor to disk.
bool SetReadInternalMode(bool mode)
Definition: ringbuffer.cpp:558
bool IsNearEnd(double fps, uint vvf) const
Definition: ringbuffer.cpp:437
static ImageType inspectImage(const QString &path)
Definition: mythcdrom.cpp:179
int Write(const void *buf, uint count)
Writes buffer to ThreadedFileWriter::Write(const void*,uint)
void SetWriteBufferMinWriteSize(int newMinSize)
Calls ThreadedFileWriter::SetWriteBufferMinWriteSize(int)
void ResetReadAhead(long long newinternal)
Restart the read-ahead thread at the 'newinternal' position.
Definition: ringbuffer.cpp:610
QMap< qint64, uint64_t > m_storageReads
Implements a file/stream reader/writer.
long long Seek(long long pos, int whence, long long curpos=-1)
Definition: remotefile.cpp:759
static QStringList s_subExtNoCheck
bool IsRegisteredFileForWrite(const QString &file)
void SwitchToNext(bool up)
Sets the recording to switch to.
void UpdateRawBitrate(uint raw_bitrate)
Set the raw bit rate, to allow RingBuffer adjust effective bitrate.
Definition: ringbuffer.cpp:307
long long Seek(long long pos, int whence, bool has_lock=false)
Seeks to a particular position in the file.
Definition: ringbuffer.cpp:510
const BDRingBuffer * BD(void) const
int Write(const void *data, int size)
Definition: remotefile.cpp:834
void UpdatePlaySpeed(float play_speed)
Set the play speed, to allow RingBuffer adjust effective bitrate.
Definition: ringbuffer.cpp:338
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
Keeps track of recordings in a current LiveTV instance.
Definition: livetvchain.h:31
#define CHUNK
virtual long long GetRealFileSizeInternal(void) const
int WaitForAvail(int count, int timeout)
bool SetBlocking(bool block=true)
Set write blocking mode While in blocking mode, ThreadedFileWriter::Write will wait for buffers to be...
Stream content from a DVD image file.
Definition: dvdstream.h:20
void CreateReadAheadBuffer(void)
Definition: ringbuffer.cpp:832
bool isPaused(void) const
Returns false iff read-ahead is not running and read-ahead is not paused.
Definition: ringbuffer.cpp:759
RingBuffer(RingBufferType rbtype)
Definition: ringbuffer.cpp:212
int ReadBufAvail(void) const
Returns number of bytes available for reading from buffer.
Definition: ringbuffer.cpp:589
volatile bool m_recentSeek