MythTV  master
DeviceReadBuffer.cpp
Go to the documentation of this file.
1 #include <algorithm>
2 using namespace std;
3 
4 #include "DeviceReadBuffer.h"
5 #include "mythcorecontext.h"
6 #include "mythbaseutil.h"
7 #include "mythlogging.h"
8 #include "tspacket.h"
9 #include "mthread.h"
10 #include "compat.h"
11 
12 #ifndef _WIN32
13 #include <sys/poll.h>
14 #endif
15 
17 #define REPORT_RING_STATS 0
18 
19 #define LOC QString("DevRdB(%1): ").arg(m_videoDevice)
20 
// Tail of the constructor definition. NOTE(review): the line carrying the
// qualified constructor name is not present in this listing.
// Names the MThread base "DeviceReadBuffer" and stores the callback pointer
// plus the two poll-related flags supplied by the caller.
22  DeviceReaderCB *cb, bool use_poll, bool error_exit_on_poll_timeout)
23  : MThread("DeviceReadBuffer"),
24  m_readerCB(cb),
25  m_usingPoll(use_poll),
26  m_pollTimeoutIsError(error_exit_on_poll_timeout)
27 {
28 #ifdef USING_MINGW
29 #warning mingw DeviceReadBuffer::Poll
// On MinGW builds polling is forced off (with a warning) when it was requested.
30  if (m_usingPoll)
31  {
32  LOG(VB_GENERAL, LOG_WARNING, LOC +
33  "mingw DeviceReadBuffer::Poll is not implemented");
34  m_usingPoll = false;
35  }
36 #endif
37 }
38 
// Presumably the destructor body (signature line absent from this listing).
// Calls Stop() first, then frees the buffer and clears the pointer.
40 {
41  Stop();
42  if (m_buffer)
43  {
44  delete[] m_buffer;
45  m_buffer = nullptr;
46  }
47 }
48 
// Configures the buffer for a stream and (re)allocates its storage.
//   streamName        - stored in m_videoDevice (a null string becomes "")
//   streamfd          - stored in m_streamFd
//   readQuanta        - when 0 the previous m_readQuanta is kept
//   deviceBufferSize  - when non-zero, caps m_devReadSize
//   deviceBufferCount - stored in m_devBufferCount
// Returns false if the allocation fails, true otherwise.
// NOTE(review): the embedded numbering skips lines 69, 75, 78, 79, 90 and 99,
// so some statements appear to be missing from this listing.
49 bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd,
50  uint readQuanta, uint deviceBufferSize,
51  uint deviceBufferCount)
52 {
// Everything below runs with m_lock held.
53  QMutexLocker locker(&m_lock);
54 
// Release any buffer from an earlier Setup(); m_buffer is reassigned below.
55  delete[] m_buffer;
56 
57  m_videoDevice = streamName;
58  m_videoDevice = m_videoDevice.isNull() ? "" : m_videoDevice;
59  m_streamFd = streamfd;
60 
61  // Setup device ringbuffer
62  m_eof = false;
63  m_error = false;
64  m_requestPause = false;
65  m_paused = false;
66 
67  m_readQuanta = (readQuanta) ? readQuanta : m_readQuanta;
68  m_devBufferCount = deviceBufferCount;
// NOTE(review): the start of the next statement is missing; presumably it
// assigns m_size from the "HDRingbufferSize" setting (value * 1024) - confirm.
70  "HDRingbufferSize", static_cast<int>(50 * m_readQuanta)) * 1024;
71  m_used = 0;
// One device read is 256 quanta when polling, 48 otherwise, optionally capped
// by deviceBufferSize.
72  m_devReadSize = m_readQuanta * (m_usingPoll ? 256 : 48);
73  m_devReadSize = (deviceBufferSize) ?
74  min(m_devReadSize, (size_t)deviceBufferSize) : m_devReadSize;
76 
// nothrow: a failed allocation yields nullptr, which is handled just below.
// The extra m_devReadSize bytes sit past the m_size-byte ring area.
77  m_buffer = new (nothrow) unsigned char[m_size + m_devReadSize];
80 
81  // Initialize buffer, if it exists
82  if (!m_buffer)
83  {
84  m_endPtr = nullptr;
85  LOG(VB_GENERAL, LOG_ERR, LOC +
86  QString("Failed to allocate buffer of size %1 = %2 + %3")
87  .arg(m_size+m_devReadSize).arg(m_size).arg(m_devReadSize));
88  return false;
89  }
// NOTE(review): the fill length uses m_readQuanta while the allocation uses
// m_devReadSize; if deviceBufferSize were smaller than m_readQuanta this would
// write past the allocation - confirm callers never pass such a value.
91  memset(m_buffer, 0xFF, m_size + m_readQuanta);
92 
93  // Initialize statistics
94  m_maxUsed = 0;
95  m_avgUsed = 0;
96  m_avgBufWriteCnt = 0;
97  m_avgBufReadCnt = 0;
98  m_avgBufSleepCnt = 0;
100 
101  LOG(VB_RECORD, LOG_INFO, LOC + QString("buffer size %1 KB").arg(m_size/1024));
102 
103  return true;
104 }
105 
// Body that (re)starts the reader thread; the log strings name it Start().
// NOTE(review): the signature line is absent from this listing.
107 {
108  LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- begin");
109 
110  QMutexLocker locker(&m_lock);
// If a previous run is active or flagged, ask it to stop and wait for it.
// The lock is dropped around WakePoll()/wait() and re-taken afterwards.
111  if (isRunning() || m_doRun)
112  {
113  m_doRun = false;
114  locker.unlock();
115  WakePoll();
116  wait();
117  locker.relock();
118  }
119 
// Fresh run: set the run flag and clear error/EOF state before starting.
120  m_doRun = true;
121  m_error = false;
122  m_eof = false;
123 
124  start();
125 
126  LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- middle");
127 
// Block in 100 ms slices on m_runWait until isRunning() is true or m_doRun
// has been cleared.
128  while (m_doRun && !isRunning())
129  m_runWait.wait(locker.mutex(), 100);
130 
131  LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- end");
132 }
133 
134 void DeviceReadBuffer::Reset(const QString &streamName, int streamfd)
135 {
136  QMutexLocker locker(&m_lock);
137 
138  m_videoDevice = streamName;
139  m_videoDevice = m_videoDevice.isNull() ? "" : m_videoDevice;
140  m_streamFd = streamfd;
141 
142  m_used = 0;
145 
146  m_error = false;
147 }
148 
// Body that stops the reader thread; the log strings name it Stop().
// NOTE(review): the signature line is absent from this listing.
150 {
151  LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- begin");
152  QMutexLocker locker(&m_lock);
// Clear the run flag, then release the lock before WakePoll() and wait().
153  if (isRunning() || m_doRun)
154  {
155  m_doRun = false;
156  locker.unlock();
157  WakePoll();
158  wait();
159  }
160  LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- end");
161 }
162 
// Stores `req` in m_requestPause under m_lock, then calls WakePoll().
// NOTE(review): signature line absent; presumably SetRequestPause(bool req).
164 {
165  QMutexLocker locker(&m_lock);
166  m_requestPause = req;
167  WakePoll();
168 }
169 
// Stores `val` in m_paused under m_lock and wakes the matching waiters:
// m_pauseWait when pausing, m_unpauseWait when resuming.
// NOTE(review): signature line absent; presumably SetPaused(bool val).
171 {
172  QMutexLocker locker(&m_lock);
173  m_paused = val;
174  if (val)
175  m_pauseWait.wakeAll();
176  else
177  m_unpauseWait.wakeAll();
178 }
179 
// Writes a single byte to the write end of the wake pipe (m_wakePipe[1]).
// NOTE(review): the signature line is absent from this listing.
180 // The WakePoll code is copied from MythSocketThread::WakeReadyReadThread()
182 {
183  char buf[1];
184  buf[0] = '0';
185  ssize_t wret = 0;
// Retry until one byte is written, the thread is no longer running, or the
// pipe has been closed.
186  while (isRunning() && (wret <= 0) && (m_wakePipe[1] >= 0))
187  {
188  wret = ::write(m_wakePipe[1], &buf, 1);
// EAGAIN/EINTR are retried; any other write error closes the pipes and gives up.
189  if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
190  {
191  LOG(VB_GENERAL, LOG_ERR, LOC + "WakePoll failed.");
192  ClosePipes();
193  break;
194  }
195  }
196 }
197 
// Closes whichever ends of the wake pipe are open, marking each closed end
// as -1 and zeroing its saved flags.
// NOTE(review): signature line absent; presumably ClosePipes().
199 {
200  for (uint i = 0; i < 2; i++)
201  {
202  if (m_wakePipe[i] >= 0)
203  {
204  ::close(m_wakePipe[i]);
205  m_wakePipe[i] = -1;
206  m_wakePipeFlags[i] = 0;
207  }
208  }
209 }
210 
// Returns m_paused, read while holding m_lock.
// NOTE(review): signature line absent; presumably IsPaused() const.
212 {
213  QMutexLocker locker(&m_lock);
214  return m_paused;
215 }
216 
// If not already paused, waits once on m_pauseWait for up to `timeout`, then
// returns the current m_paused value (which may still be false).
// NOTE(review): signature line absent; presumably WaitForPaused(timeout).
218 {
219  QMutexLocker locker(&m_lock);
220 
// Single wait, not a loop: any wake-up or the timeout ends it.
221  if (!m_paused)
222  m_pauseWait.wait(&m_lock, timeout);
223 
224  return m_paused;
225 }
226 
// If currently paused, waits once on m_unpauseWait for up to `timeout`, then
// returns the current m_paused value (true means still paused).
// NOTE(review): signature line absent; presumably WaitForUnpause(timeout).
228 {
229  QMutexLocker locker(&m_lock);
230 
// Single wait, not a loop: any wake-up or the timeout ends it.
231  if (m_paused)
232  m_unpauseWait.wait(&m_lock, timeout);
233 
234  return m_paused;
235 }
236 
// Returns m_requestPause, read while holding m_lock.
// NOTE(review): signature line absent; presumably IsPauseRequested() const.
238 {
239  QMutexLocker locker(&m_lock);
240  return m_requestPause;
241 }
242 
// Returns m_error, read while holding m_lock.
// NOTE(review): signature line absent; presumably IsErrored() const.
244 {
245  QMutexLocker locker(&m_lock);
246  return m_error;
247 }
248 
249 bool DeviceReadBuffer::IsEOF(void) const
250 {
251  QMutexLocker locker(&m_lock);
252  return m_eof;
253 }
254 
// Returns isRunning(), called while holding m_lock.
// NOTE(review): signature line absent; presumably IsRunning() const.
256 {
257  QMutexLocker locker(&m_lock);
258  return isRunning();
259 }
260 
// Returns the free space, m_size - m_used, computed under m_lock.
// NOTE(review): signature line absent; presumably GetUnused() const.
262 {
263  QMutexLocker locker(&m_lock);
264  return m_size - m_used;
265 }
266 
// Returns m_used, read while holding m_lock.
// NOTE(review): signature line absent; presumably GetUsed() const.
268 {
269  QMutexLocker locker(&m_lock);
270  return m_used;
271 }
272 
// Returns the distance from the write pointer to m_endPtr, under m_lock.
// NOTE(review): signature line absent; presumably GetContiguousUnused() const.
274 {
275  QMutexLocker locker(&m_lock);
276  return m_endPtr - m_writePtr;
277 }
278 
// Accounts for `len` newly written bytes: bumps m_used and m_writePtr under
// m_lock and wakes anything waiting on m_dataWait.
// NOTE(review): the signature line and numbered lines 284, 287 and 288 are
// absent from this listing; no pointer wrap-around is visible here - confirm
// against the real file.
280 {
281  QMutexLocker locker(&m_lock);
282  m_used += len;
283  m_writePtr += len;
285 #if REPORT_RING_STATS
286  m_maxUsed = max(m_used, m_maxUsed);
289 #endif
290  m_dataWait.wakeAll();
291 }
292 
// Accounts for `len` consumed bytes: lowers m_used and advances m_readPtr
// under m_lock.
// NOTE(review): the signature line and numbered line 298 are absent from this
// listing; no pointer wrap-around is visible here - confirm against the real
// file.
294 {
295  QMutexLocker locker(&m_lock);
296  m_used -= len;
297  m_readPtr += len;
299 #if REPORT_RING_STATS
300  ++m_avgBufReadCnt;
301 #endif
302 }
303 
// Reader-thread body: repeatedly reads from m_streamFd into the buffer at
// m_writePtr until m_doRun is cleared, an error state is seen, or more than
// five consecutive read errors occur.
// NOTE(review): the signature line is absent; presumably run() of the thread.
305 {
306  RunProlog();
307 
308  uint errcnt = 0;
309  uint cnt = 0;
310  ssize_t len = 0;
311  size_t total = 0;
// Below this many bytes per pass the loop sleeps briefly (see the end of the
// while body).
312  size_t throttle = m_devReadSize * m_devBufferCount / 2;
313 
// Wake anything blocked on m_runWait.
314  m_lock.lock();
315  m_runWait.wakeAll();
316  m_lock.unlock();
317 
// NOTE(review): numbered line 319 (the statement governed by this `if`) is
// missing from the listing; as printed, the `if` would wrongly govern the
// while loop. Presumably the wake pipe is created here - confirm.
318  if (m_usingPoll)
320 
321  while (m_doRun)
322  {
// HandlePausing() returns false while a pause is requested.
323  if (!HandlePausing())
324  continue;
325 
326  if (!IsOpen())
327  {
328  usleep(5000);
329  continue;
330  }
331 
332  if (m_usingPoll && !Poll())
333  continue;
334 
// Check the error flag under the lock; Poll() may have set it.
335  {
336  QMutexLocker locker(&m_lock);
337  if (m_error)
338  {
339  LOG(VB_RECORD, LOG_ERR, LOC + "fill_ringbuffer: error state");
340  break;
341  }
342  }
343 
344  /* Some device drivers segment their buffer into small pieces,
345  * So allow for the reading of multiple buffers */
346  for (cnt = 0, len = 0, total = 0;
347  m_doRun && len >= 0 && cnt < m_devBufferCount; ++cnt)
348  {
349  // Limit read size for faster return from read
350  auto unused = static_cast<size_t>(WaitForUnused(m_readQuanta));
351  size_t read_size = min(m_devReadSize, unused);
352 
353  // if read_size > 0 do the read...
354  if (read_size)
355  {
356  len = read(m_streamFd, m_writePtr, read_size);
// CheckForErrors() returns false for anything but a usable positive length;
// errcnt is only reset after a good read.
357  if (!CheckForErrors(len, read_size, errcnt))
358  break;
359  errcnt = 0;
360 
361  // if we wrote past the official end of the buffer,
362  // copy to start
363  if (m_writePtr + len > m_endPtr)
364  memcpy(m_buffer, m_endPtr, m_writePtr + len - m_endPtr);
365  IncrWritePointer(len);
366  total += len;
367  }
368  }
369  if (errcnt > 5)
370  break;
371 
372  // Slow down reading if not under load
373  if (errcnt == 0 && total < throttle)
374  usleep(1000);
375  }
376 
377  ClosePipes();
378 
// On exit flag EOF and wake every wait condition.
379  m_lock.lock();
380  m_eof = true;
381  m_runWait.wakeAll();
382  m_dataWait.wakeAll();
383  m_pauseWait.wakeAll();
384  m_unpauseWait.wakeAll();
385  m_lock.unlock();
386 
387  RunEpilog();
388 }
389 
// Pause bookkeeping for the reader loop. Returns false while a pause is
// requested (after marking the buffer paused and sleeping 5 ms); otherwise
// clears a lingering paused state and returns true.
// NOTE(review): the signature line and numbered lines 397 and 404 are absent
// from this listing.
391 {
392  if (IsPauseRequested())
393  {
394  SetPaused(true);
395 
// NOTE(review): the statement governed by this `if` (line 397) is missing; as
// printed, the `if` would wrongly govern the usleep below. Presumably a
// callback on m_readerCB - confirm.
396  if (m_readerCB)
398 
399  usleep(5000);
400  return false;
401  }
402  if (IsPaused())
403  {
// NOTE(review): line 404 is missing here - confirm what precedes SetPaused().
405  SetPaused(false);
406  }
407  return true;
408 }
409 
410 bool DeviceReadBuffer::Poll(void) const
411 {
412 #ifdef _WIN32
413 # ifdef _MSC_VER
414 # pragma message( "mingw DeviceReadBuffer::Poll" )
415 # else
416 # warning mingw DeviceReadBuffer::Poll
417 # endif
418  LOG(VB_GENERAL, LOG_ERR, LOC +
419  "mingw DeviceReadBuffer::Poll is not implemented");
420  return false;
421 #else
422  bool retval = true;
423  MythTimer timer;
424  timer.start();
425 
426  int poll_cnt = 1;
427  struct pollfd polls[2];
428  memset(polls, 0, sizeof(polls));
429 
430  polls[0].fd = m_streamFd;
431  polls[0].events = POLLIN | POLLPRI;
432  polls[0].revents = 0;
433 
434  if (m_wakePipe[0] >= 0)
435  {
436  poll_cnt = 2;
437  polls[1].fd = m_wakePipe[0];
438  polls[1].events = POLLIN;
439  polls[1].revents = 0;
440  }
441 
442  while (true)
443  {
444  polls[0].revents = 0;
445  polls[1].revents = 0;
446  poll_cnt = (m_wakePipe[0] >= 0) ? poll_cnt : 1;
447 
448  int timeout = m_maxPollWait;
449  if (1 == poll_cnt)
450  timeout = 10;
451  else if (m_pollTimeoutIsError)
452  // subtract a bit to allow processing time.
453  timeout = max((int)m_maxPollWait - timer.elapsed() - 15, 10);
454 
455  int ret = poll(polls, poll_cnt, timeout);
456 
457  if (polls[0].revents & POLLHUP)
458  {
459  LOG(VB_GENERAL, LOG_ERR, LOC + "poll eof (POLLHUP)");
460  break;
461  }
462  if (polls[0].revents & POLLNVAL)
463  {
464  LOG(VB_GENERAL, LOG_ERR, LOC + "poll error" + ENO);
465  m_error = true;
466  return true;
467  }
468 
469  if (!m_doRun || !IsOpen() || IsPauseRequested())
470  {
471  retval = false;
472  break; // are we supposed to pause, stop, etc.
473  }
474 
475  if (polls[0].revents & POLLPRI)
476  {
477  m_readerCB->PriorityEvent(polls[0].fd);
478  }
479 
480  if (polls[0].revents & POLLIN)
481  {
482  if (ret > 0)
483  break; // we have data to read :)
484  if (ret < 0)
485  {
486  if ((EOVERFLOW == errno))
487  break; // we have an error to handle
488 
489  if ((EAGAIN == errno) || (EINTR == errno))
490  continue; // errors that tell you to try again
491 
492  usleep(2500 /*2.5 ms*/);
493  }
494  else // ret == 0
495  {
496  if (m_pollTimeoutIsError &&
497  (timer.elapsed() >= (int)m_maxPollWait))
498  {
499  LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 1");
500  QMutexLocker locker(&m_lock);
501  m_error = true;
502  return true;
503  }
504  }
505  }
506 
507  // Clear out any pending pipe reads
508  if ((poll_cnt > 1) && (polls[1].revents & POLLIN))
509  {
510  char dummy[128];
511  int cnt = (m_wakePipeFlags[0] & O_NONBLOCK) ? 128 : 1;
512  ::read(m_wakePipe[0], dummy, cnt);
513  }
514 
515  if (m_pollTimeoutIsError && (timer.elapsed() >= (int)m_maxPollWait))
516  {
517  LOG(VB_GENERAL, LOG_ERR, LOC + QString("Poll giving up after %1ms")
518  .arg(m_maxPollWait));
519  QMutexLocker locker(&m_lock);
520  m_error = true;
521  return true;
522  }
523  }
524 
525  int e = timer.elapsed();
526  if (e > (int)m_maxPollWait)
527  {
528  LOG(VB_GENERAL, LOG_WARNING, LOC +
529  QString("Poll took an unusually long time %1 ms")
530  .arg(timer.elapsed()));
531  }
532 
533  return retval;
534 #endif
535 }
536 
// Classifies the result of one device read. Returns true only when `len` is
// positive and no larger than `requested_len`. On the error paths that count
// failures (bogus length, unexpected read error, zero-length read) `errcnt`
// is incremented; once it exceeds 5 the bogus-length and read-error paths set
// m_error and the zero-length path sets m_eof, each under m_lock.
// NOTE(review): the line carrying the qualified function name is absent from
// this listing.
538  ssize_t len, size_t requested_len, uint &errcnt)
539 {
// A length above what was asked for is treated as a driver fault.
540  if (len > (ssize_t)requested_len)
541  {
542  LOG(VB_GENERAL, LOG_ERR, LOC +
543  "Driver is returning bogus values on read");
544  if (++errcnt > 5)
545  {
546  LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
547  QMutexLocker locker(&m_lock);
548  m_error = true;
549  }
550  return false;
551  }
552 
553 #ifdef _WIN32
554 # ifdef _MSC_VER
555 # pragma message( "mingw DeviceReadBuffer::CheckForErrors" )
556 # else
557 # warning mingw DeviceReadBuffer::CheckForErrors
558 # endif
559  LOG(VB_GENERAL, LOG_ERR, LOC +
560  "mingw DeviceReadBuffer::CheckForErrors is not implemented");
561  return false;
562 #else
563  if (len < 0)
564  {
// EINTR, EAGAIN and EOVERFLOW return false without touching errcnt.
565  if (EINTR == errno)
566  return false;
567  if (EAGAIN == errno)
568  {
569  usleep(2500);
570  return false;
571  }
572  if (EOVERFLOW == errno)
573  {
574  LOG(VB_GENERAL, LOG_ERR, LOC + "Driver buffers overflowed");
575  return false;
576  }
577 
578  LOG(VB_GENERAL, LOG_ERR, LOC +
579  QString("Problem reading fd(%1)").arg(m_streamFd) + ENO);
580 
581  if (++errcnt > 5)
582  {
583  LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
584  QMutexLocker locker(&m_lock);
585  m_error = true;
586  return false;
587  }
588 
589  usleep(500);
590  return false;
591  }
// A zero-length read is only treated as end-of-file after more than five
// counted failures; before that it just backs off for 500 us.
592  if (len == 0)
593  {
594  if (++errcnt > 5)
595  {
596  LOG(VB_GENERAL, LOG_ERR, LOC +
597  QString("End-Of-File? fd(%1)").arg(m_streamFd));
598 
599  m_lock.lock();
600  m_eof = true;
601  m_lock.unlock();
602 
603  return false;
604  }
605  usleep(500);
606  return false;
607  }
608  return true;
609 #endif
610 }
611 
618 uint DeviceReadBuffer::Read(unsigned char *buf, const uint count)
619 {
620  uint avail = WaitForUsed(min(count, (uint)m_readThreshold), 20);
621  size_t cnt = min(count, avail);
622 
623  if (!cnt)
624  return 0;
625 
626  if (m_readPtr + cnt > m_endPtr)
627  {
628  // Process as two pieces
629  size_t len = m_endPtr - m_readPtr;
630  if (len)
631  {
632  memcpy(buf, m_readPtr, len);
633  buf += len;
634  IncrReadPointer(len);
635  }
636  if (cnt > len)
637  {
638  len = cnt - len;
639  memcpy(buf, m_readPtr, len);
640  IncrReadPointer(len);
641  }
642  }
643  else
644  {
645  memcpy(buf, m_readPtr, cnt);
646  IncrReadPointer(cnt);
647  }
648 
649 #if REPORT_RING_STATS
650  ReportStats();
651 #endif
652 
653  return cnt;
654 }
655 
// Returns the current free space. While at least `needed` bytes are not yet
// free it polls every 5 ms, but only when the initial free space exceeded
// m_readQuanta. Returns 0 if a pause is requested, the device is not open, or
// m_doRun is cleared.
// NOTE(review): signature line absent; presumably WaitForUnused(uint needed).
661 {
662  size_t unused = GetUnused();
663 
664  if (unused > m_readQuanta)
665  {
666  while (unused < needed)
667  {
668  unused = GetUnused();
669  if (IsPauseRequested() || !IsOpen() || !m_doRun)
670  return 0;
671  usleep(5000);
672  }
673  if (IsPauseRequested() || !IsOpen() || !m_doRun)
674  return 0;
675  unused = GetUnused();
676  }
677 
// When the initial free space is <= m_readQuanta it is returned as-is, without
// the pause/open/run checks.
678  return unused;
679 }
680 
// Waits, in 10 ms slices on m_dataWait, until at least `needed` bytes are
// buffered or roughly `max_wait` ms have elapsed, and returns the number of
// buffered bytes seen last. The wait also ends when the thread is not
// running, a pause is requested, or the error/EOF flag is set.
// NOTE(review): signature line absent; presumably WaitForUsed(needed, max_wait).
687 {
688  MythTimer timer;
689  timer.start();
690 
691  QMutexLocker locker(&m_lock);
692  size_t avail = m_used;
693  while ((needed > avail) && isRunning() &&
694  !m_requestPause && !m_error && !m_eof &&
695  (timer.elapsed() < (int)max_wait))
696  {
697  m_dataWait.wait(locker.mutex(), 10);
698  avail = m_used;
699  }
700  return avail;
701 }
702 
// Statistics logger. The body is compiled only when REPORT_RING_STATS is
// non-zero: every 20 seconds it logs fill percentages and per-second
// write/read/sleep rates, then zeroes the counters.
// NOTE(review): the signature line and numbered line 723 are absent from this
// listing; presumably ReportStats(), as called from Read().
704 {
705 #if REPORT_RING_STATS
706  static const int secs = 20;
707  static const double d1_s = 1.0 / secs;
708  if (m_lastReport.elapsed() > secs * 1000 /* msg every 20 seconds */)
709  {
710  QMutexLocker locker(&m_lock);
// Scale factor turning a byte count into a percentage of the buffer size.
711  double rsize = 100.0 / m_size;
712  QString msg = QString("fill avg(%1%) ").arg(m_avgUsed*rsize,5,'f',2);
713  msg += QString("fill max(%1%) ").arg(m_maxUsed*rsize,5,'f',2);
714  msg += QString("writes/sec(%1) ").arg(m_avgBufWriteCnt*d1_s);
715  msg += QString("reads/sec(%1) ").arg(m_avgBufReadCnt*d1_s);
716  msg += QString("sleeps/sec(%1)").arg(m_avgBufSleepCnt*d1_s);
717 
718  m_avgUsed = 0;
719  m_avgBufWriteCnt = 0;
720  m_avgBufReadCnt = 0;
721  m_avgBufSleepCnt = 0;
722  m_maxUsed = 0;
724 
725  LOG(VB_GENERAL, LOG_INFO, LOC + msg);
726  }
727 #endif
728 }
729 
730 /*
731  * vim:ts=4:sw=4:ai:et:si:sts=4
732  */
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:215
QWaitCondition m_unpauseWait
DeviceReadBuffer(DeviceReaderCB *cb, bool use_poll=true, bool error_exit_on_poll_timeout=true)
def write(text, progress=True)
Definition: mythburn.py:308
bool IsRunning(void) const
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:46
QWaitCondition m_dataWait
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
~DeviceReadBuffer() override
#define O_NONBLOCK
Definition: mythmedia.cpp:25
bool IsEOF(void) const
volatile bool m_doRun
void ClosePipes(void) const
uint GetContiguousUnused(void) const
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
bool IsOpen(void) const
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:311
void WakePoll(void) const
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
unsigned char * m_buffer
void IncrReadPointer(uint len)
static void setup_pipe(int[2], long[2])
Definition: mythbaseutil.h:16
unsigned char * m_writePtr
bool IsErrored(void) const
def read(device=None, features=[])
Definition: disc.py:35
bool IsPaused(void) const
uint Read(unsigned char *buf, uint count)
Try to Read count bytes from into buffer.
#define LOC
#define close
Definition: compat.h:16
virtual void ReaderPaused(int fd)=0
uint WaitForUsed(uint needed, uint max_wait) const
bool WaitForPaused(unsigned long timeout)
void SetRequestPause(bool request)
bool isRunning(void) const
Definition: mthread.cpp:274
uint WaitForUnused(uint needed) const
virtual void PriorityEvent(int fd)=0
bool WaitForUnpause(unsigned long timeout)
unsigned char * m_endPtr
unsigned int uint
Definition: compat.h:140
DeviceReaderCB * m_readerCB
bool Poll(void) const
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:109
bool CheckForErrors(ssize_t read_len, size_t requested_len, uint &errcnt)
void IncrWritePointer(uint len)
QWaitCondition m_runWait
int GetNumSetting(const QString &key, int defaultval=0)
void Reset(const QString &streamName, int streamfd)
bool Setup(const QString &streamName, int streamfd, uint readQuanta=sizeof(TSPacket), uint deviceBufferSize=0, uint deviceBufferCount=1)
int elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:202
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:294
uint GetUsed(void) const
uint GetUnused(void) const
static void usleep(unsigned long time)
Definition: mthread.cpp:348
QWaitCondition m_pauseWait
void SetPaused(bool val)
unsigned char * m_readPtr
bool IsPauseRequested(void) const
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:41