MythTV  master
DeviceReadBuffer.cpp
Go to the documentation of this file.
1 #include <algorithm>
2 using namespace std;
3 
4 #include "DeviceReadBuffer.h"
5 #include "mythcorecontext.h"
6 #include "mythbaseutil.h"
7 #include "mythlogging.h"
8 #include "tspacket.h"
9 #include "mthread.h"
10 #include "compat.h"
11 
12 #ifndef _WIN32
13 #include <sys/poll.h>
14 #endif
15 
17 #define REPORT_RING_STATS 0
18 
19 #define LOC QString("DevRdB(%1): ").arg(m_videodevice)
20 
22  DeviceReaderCB *cb, bool use_poll, bool error_exit_on_poll_timeout)
23  : MThread("DeviceReadBuffer"),
24  m_readerCB(cb),
25  m_using_poll(use_poll),
26  m_poll_timeout_is_error(error_exit_on_poll_timeout)
27 {
28  for (int i = 0; i < 2; i++)
29  {
30  m_wake_pipe[i] = -1;
31  m_wake_pipe_flags[i] = 0;
32  }
33 
34 #ifdef USING_MINGW
35 #warning mingw DeviceReadBuffer::Poll
36  if (m_using_poll)
37  {
38  LOG(VB_GENERAL, LOG_WARNING, LOC +
39  "mingw DeviceReadBuffer::Poll is not implemented");
40  m_using_poll = false;
41  }
42 #endif
43 }
44 
46 {
47  Stop();
48  if (m_buffer)
49  {
50  delete[] m_buffer;
51  m_buffer = nullptr;
52  }
53 }
54 
// Prepares the reader for a stream: records the stream name and descriptor,
// clears the state flags, (re)allocates the ring buffer and zeroes the
// statistics.  Everything runs with m_lock held.
//
// streamName        stored in m_videodevice; a null QString is stored as "".
// streamfd          stored in m_stream_fd.
// readQuanta        new m_read_quanta; 0 keeps the current value.
// deviceBufferSize  when non-zero, an upper bound applied to m_dev_read_size.
// deviceBufferCount stored in m_dev_buffer_count.
//
// Returns false if the buffer could not be allocated, true otherwise.
//
// NOTE(review): this listing is missing several lines of the function (the
// embedded numbering skips 75, 78, 81, 84-85, 93, 96, 102 and 104-105), so
// the start of the m_size computation, the .arg() list of the failure
// message and some pointer/statistics assignments are not visible here.
55 bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd,
56  uint readQuanta, uint deviceBufferSize,
57  uint deviceBufferCount)
58 {
59  QMutexLocker locker(&m_lock);
60 
// Release any buffer left over from an earlier Setup(); m_buffer is
// reassigned unconditionally by the allocation below.
61  delete[] m_buffer;
62 
63  m_videodevice = streamName;
64  m_videodevice = m_videodevice.isNull() ? "" : m_videodevice;
65  m_stream_fd = streamfd;
66 
67  // Setup device ringbuffer
68  m_eof = false;
69  m_error = false;
70  m_request_pause = false;
71  m_paused = false;
72 
73  m_read_quanta = (readQuanta) ? readQuanta : m_read_quanta;
74  m_dev_buffer_count = deviceBufferCount;
// Tail of a statement whose first line is missing from the listing; it reads
// the "HDRingbufferSize" setting (default 50 * m_read_quanta) and multiplies
// by 1024 -- presumably the assignment to m_size, TODO confirm.
76  "HDRingbufferSize", static_cast<int>(50 * m_read_quanta)) * 1024;
77  m_used = 0;
79  m_dev_read_size = (deviceBufferSize) ?
80  min(m_dev_read_size, (size_t)deviceBufferSize) : m_dev_read_size;
82 
// nothrow: an allocation failure yields nullptr, handled just below.  The
// extra m_dev_read_size bytes give one device read room past m_size.
83  m_buffer = new (nothrow) unsigned char[m_size + m_dev_read_size];
86 
87  // Initialize buffer, if it exists
88  if (!m_buffer)
89  {
90  m_endPtr = nullptr;
91  LOG(VB_GENERAL, LOG_ERR, LOC +
92  QString("Failed to allocate buffer of size %1 = %2 + %3")
94  return false;
95  }
// NOTE(review): the fill length uses m_read_quanta while the allocation above
// uses m_dev_read_size; if m_read_quanta can exceed m_dev_read_size this
// writes past the end of the allocation -- confirm against the header.
97  memset(m_buffer, 0xFF, m_size + m_read_quanta);
98 
99  // Initialize statistics
100  m_max_used = 0;
101  m_avg_used = 0;
103  m_avg_buf_read_cnt = 0;
106 
107  LOG(VB_RECORD, LOG_INFO, LOC + QString("buffer size %1 KB").arg(m_size/1024));
108 
109  return true;
110 }
111 
113 {
114  LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- begin");
115 
116  QMutexLocker locker(&m_lock);
117  if (isRunning() || m_dorun)
118  {
119  m_dorun = false;
120  locker.unlock();
121  WakePoll();
122  wait();
123  locker.relock();
124  }
125 
126  m_dorun = true;
127  m_error = false;
128  m_eof = false;
129 
130  start();
131 
132  LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- middle");
133 
134  while (m_dorun && !isRunning())
135  m_runWait.wait(locker.mutex(), 100);
136 
137  LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- end");
138 }
139 
// Points the reader at a (possibly different) stream without reallocating
// the buffer: stores the stream name (null QString becomes "") and the
// descriptor, marks the buffer as empty (m_used = 0) and clears m_error.
// Runs with m_lock held.  m_eof is not touched by the visible lines.
//
// NOTE(review): embedded lines 149-150 are missing from this listing;
// presumably they rewind the read/write pointers -- TODO confirm.
140 void DeviceReadBuffer::Reset(const QString &streamName, int streamfd)
141 {
142  QMutexLocker locker(&m_lock);
143 
144  m_videodevice = streamName;
145  m_videodevice = m_videodevice.isNull() ? "" : m_videodevice;
146  m_stream_fd = streamfd;
147 
148  m_used = 0;
151 
152  m_error = false;
153 }
154 
156 {
157  LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- begin");
158  QMutexLocker locker(&m_lock);
159  if (isRunning() || m_dorun)
160  {
161  m_dorun = false;
162  locker.unlock();
163  WakePoll();
164  wait();
165  }
166  LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- end");
167 }
168 
170 {
171  QMutexLocker locker(&m_lock);
172  m_request_pause = req;
173  WakePoll();
174 }
175 
177 {
178  QMutexLocker locker(&m_lock);
179  m_paused = val;
180  if (val)
181  m_pauseWait.wakeAll();
182  else
183  m_unpauseWait.wakeAll();
184 }
185 
186 // The WakePoll code is copied from MythSocketThread::WakeReadyReadThread()
188 {
189  char buf[1];
190  buf[0] = '0';
191  ssize_t wret = 0;
192  while (isRunning() && (wret <= 0) && (m_wake_pipe[1] >= 0))
193  {
194  wret = ::write(m_wake_pipe[1], &buf, 1);
195  if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
196  {
197  LOG(VB_GENERAL, LOG_ERR, LOC + "WakePoll failed.");
198  ClosePipes();
199  break;
200  }
201  }
202 }
203 
205 {
206  for (uint i = 0; i < 2; i++)
207  {
208  if (m_wake_pipe[i] >= 0)
209  {
210  ::close(m_wake_pipe[i]);
211  m_wake_pipe[i] = -1;
212  m_wake_pipe_flags[i] = 0;
213  }
214  }
215 }
216 
218 {
219  QMutexLocker locker(&m_lock);
220  return m_paused;
221 }
222 
224 {
225  QMutexLocker locker(&m_lock);
226 
227  if (!m_paused)
228  m_pauseWait.wait(&m_lock, timeout);
229 
230  return m_paused;
231 }
232 
234 {
235  QMutexLocker locker(&m_lock);
236 
237  if (m_paused)
238  m_unpauseWait.wait(&m_lock, timeout);
239 
240  return m_paused;
241 }
242 
244 {
245  QMutexLocker locker(&m_lock);
246  return m_request_pause;
247 }
248 
250 {
251  QMutexLocker locker(&m_lock);
252  return m_error;
253 }
254 
255 bool DeviceReadBuffer::IsEOF(void) const
256 {
257  QMutexLocker locker(&m_lock);
258  return m_eof;
259 }
260 
262 {
263  QMutexLocker locker(&m_lock);
264  return isRunning();
265 }
266 
268 {
269  QMutexLocker locker(&m_lock);
270  return m_size - m_used;
271 }
272 
274 {
275  QMutexLocker locker(&m_lock);
276  return m_used;
277 }
278 
280 {
281  QMutexLocker locker(&m_lock);
282  return m_endPtr - m_writePtr;
283 }
284 
286 {
287  QMutexLocker locker(&m_lock);
288  m_used += len;
289  m_writePtr += len;
291 #if REPORT_RING_STATS
292  m_max_used = max(m_used, m_max_used);
295 #endif
296  m_dataWait.wakeAll();
297 }
298 
300 {
301  QMutexLocker locker(&m_lock);
302  m_used -= len;
303  m_readPtr += len;
305 #if REPORT_RING_STATS
307 #endif
308 }
309 
311 {
312  RunProlog();
313 
314  uint errcnt = 0;
315  uint cnt;
316  ssize_t len;
317  size_t read_size;
318  size_t unused;
319  size_t total;
320  size_t throttle = m_dev_read_size * m_dev_buffer_count / 2;
321 
322  m_lock.lock();
323  m_runWait.wakeAll();
324  m_lock.unlock();
325 
326  if (m_using_poll)
328 
329  while (m_dorun)
330  {
331  if (!HandlePausing())
332  continue;
333 
334  if (!IsOpen())
335  {
336  usleep(5000);
337  continue;
338  }
339 
340  if (m_using_poll && !Poll())
341  continue;
342 
343  {
344  QMutexLocker locker(&m_lock);
345  if (m_error)
346  {
347  LOG(VB_RECORD, LOG_ERR, LOC + "fill_ringbuffer: error state");
348  break;
349  }
350  }
351 
352  /* Some device drivers segment their buffer into small pieces,
353  * So allow for the reading of multiple buffers */
354  for (cnt = 0, len = 0, total = 0;
355  m_dorun && len >= 0 && cnt < m_dev_buffer_count; ++cnt)
356  {
357  // Limit read size for faster return from read
358  unused = static_cast<size_t>(WaitForUnused(m_read_quanta));
359  read_size = min(m_dev_read_size, unused);
360 
361  // if read_size > 0 do the read...
362  if (read_size)
363  {
364  len = read(m_stream_fd, m_writePtr, read_size);
365  if (!CheckForErrors(len, read_size, errcnt))
366  break;
367  errcnt = 0;
368 
369  // if we wrote past the official end of the buffer,
370  // copy to start
371  if (m_writePtr + len > m_endPtr)
372  memcpy(m_buffer, m_endPtr, m_writePtr + len - m_endPtr);
373  IncrWritePointer(len);
374  total += len;
375  }
376  }
377  if (errcnt > 5)
378  break;
379 
380  // Slow down reading if not under load
381  if (errcnt == 0 && total < throttle)
382  usleep(1000);
383  }
384 
385  ClosePipes();
386 
387  m_lock.lock();
388  m_eof = true;
389  m_runWait.wakeAll();
390  m_dataWait.wakeAll();
391  m_pauseWait.wakeAll();
392  m_unpauseWait.wakeAll();
393  m_lock.unlock();
394 
395  RunEpilog();
396 }
397 
399 {
400  if (IsPauseRequested())
401  {
402  SetPaused(true);
403 
404  if (m_readerCB)
406 
407  usleep(5000);
408  return false;
409  }
410  if (IsPaused())
411  {
413  SetPaused(false);
414  }
415  return true;
416 }
417 
418 bool DeviceReadBuffer::Poll(void) const
419 {
420 #ifdef _WIN32
421 # ifdef _MSC_VER
422 # pragma message( "mingw DeviceReadBuffer::Poll" )
423 # else
424 # warning mingw DeviceReadBuffer::Poll
425 # endif
426  LOG(VB_GENERAL, LOG_ERR, LOC +
427  "mingw DeviceReadBuffer::Poll is not implemented");
428  return false;
429 #else
430  bool retval = true;
431  MythTimer timer;
432  timer.start();
433 
434  int poll_cnt = 1;
435  struct pollfd polls[2];
436  memset(polls, 0, sizeof(polls));
437 
438  polls[0].fd = m_stream_fd;
439  polls[0].events = POLLIN | POLLPRI;
440  polls[0].revents = 0;
441 
442  if (m_wake_pipe[0] >= 0)
443  {
444  poll_cnt = 2;
445  polls[1].fd = m_wake_pipe[0];
446  polls[1].events = POLLIN;
447  polls[1].revents = 0;
448  }
449 
450  while (true)
451  {
452  polls[0].revents = 0;
453  polls[1].revents = 0;
454  poll_cnt = (m_wake_pipe[0] >= 0) ? poll_cnt : 1;
455 
456  int timeout = m_max_poll_wait;
457  if (1 == poll_cnt)
458  timeout = 10;
459  else if (m_poll_timeout_is_error)
460  // subtract a bit to allow processing time.
461  timeout = max((int)m_max_poll_wait - timer.elapsed() - 15, 10);
462 
463  int ret = poll(polls, poll_cnt, timeout);
464 
465  if (polls[0].revents & POLLHUP)
466  {
467  LOG(VB_GENERAL, LOG_ERR, LOC + "poll eof (POLLHUP)");
468  break;
469  }
470  if (polls[0].revents & POLLNVAL)
471  {
472  LOG(VB_GENERAL, LOG_ERR, LOC + "poll error" + ENO);
473  m_error = true;
474  return true;
475  }
476 
477  if (!m_dorun || !IsOpen() || IsPauseRequested())
478  {
479  retval = false;
480  break; // are we supposed to pause, stop, etc.
481  }
482 
483  if (polls[0].revents & POLLPRI)
484  {
485  m_readerCB->PriorityEvent(polls[0].fd);
486  }
487 
488  if (polls[0].revents & POLLIN)
489  {
490  if (ret > 0)
491  break; // we have data to read :)
492  if (ret < 0)
493  {
494  if ((EOVERFLOW == errno))
495  break; // we have an error to handle
496 
497  if ((EAGAIN == errno) || (EINTR == errno))
498  continue; // errors that tell you to try again
499 
500  usleep(2500 /*2.5 ms*/);
501  }
502  else // ret == 0
503  {
505  (timer.elapsed() >= (int)m_max_poll_wait))
506  {
507  LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 1");
508  QMutexLocker locker(&m_lock);
509  m_error = true;
510  return true;
511  }
512  }
513  }
514 
515  // Clear out any pending pipe reads
516  if ((poll_cnt > 1) && (polls[1].revents & POLLIN))
517  {
518  char dummy[128];
519  int cnt = (m_wake_pipe_flags[0] & O_NONBLOCK) ? 128 : 1;
520  ::read(m_wake_pipe[0], dummy, cnt);
521  }
522 
523  if (m_poll_timeout_is_error && (timer.elapsed() >= (int)m_max_poll_wait))
524  {
525  LOG(VB_GENERAL, LOG_ERR, LOC + QString("Poll giving up after %1ms")
526  .arg(m_max_poll_wait));
527  QMutexLocker locker(&m_lock);
528  m_error = true;
529  return true;
530  }
531  }
532 
533  int e = timer.elapsed();
534  if (e > (int)m_max_poll_wait)
535  {
536  LOG(VB_GENERAL, LOG_WARNING, LOC +
537  QString("Poll took an unusually long time %1 ms")
538  .arg(timer.elapsed()));
539  }
540 
541  return retval;
542 #endif
543 }
544 
546  ssize_t len, size_t requested_len, uint &errcnt)
547 {
548  if (len > (ssize_t)requested_len)
549  {
550  LOG(VB_GENERAL, LOG_ERR, LOC +
551  "Driver is returning bogus values on read");
552  if (++errcnt > 5)
553  {
554  LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
555  QMutexLocker locker(&m_lock);
556  m_error = true;
557  }
558  return false;
559  }
560 
561 #ifdef _WIN32
562 # ifdef _MSC_VER
563 # pragma message( "mingw DeviceReadBuffer::CheckForErrors" )
564 # else
565 # warning mingw DeviceReadBuffer::CheckForErrors
566 # endif
567  LOG(VB_GENERAL, LOG_ERR, LOC +
568  "mingw DeviceReadBuffer::CheckForErrors is not implemented");
569  return false;
570 #else
571  if (len < 0)
572  {
573  if (EINTR == errno)
574  return false;
575  if (EAGAIN == errno)
576  {
577  usleep(2500);
578  return false;
579  }
580  if (EOVERFLOW == errno)
581  {
582  LOG(VB_GENERAL, LOG_ERR, LOC + "Driver buffers overflowed");
583  return false;
584  }
585 
586  LOG(VB_GENERAL, LOG_ERR, LOC +
587  QString("Problem reading fd(%1)").arg(m_stream_fd) + ENO);
588 
589  if (++errcnt > 5)
590  {
591  LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
592  QMutexLocker locker(&m_lock);
593  m_error = true;
594  return false;
595  }
596 
597  usleep(500);
598  return false;
599  }
600  if (len == 0)
601  {
602  if (++errcnt > 5)
603  {
604  LOG(VB_GENERAL, LOG_ERR, LOC +
605  QString("End-Of-File? fd(%1)").arg(m_stream_fd));
606 
607  m_lock.lock();
608  m_eof = true;
609  m_lock.unlock();
610 
611  return false;
612  }
613  usleep(500);
614  return false;
615  }
616  return true;
617 #endif
618 }
619 
626 uint DeviceReadBuffer::Read(unsigned char *buf, const uint count)
627 {
628  uint avail = WaitForUsed(min(count, (uint)m_readThreshold), 20);
629  size_t cnt = min(count, avail);
630 
631  if (!cnt)
632  return 0;
633 
634  if (m_readPtr + cnt > m_endPtr)
635  {
636  // Process as two pieces
637  size_t len = m_endPtr - m_readPtr;
638  if (len)
639  {
640  memcpy(buf, m_readPtr, len);
641  buf += len;
642  IncrReadPointer(len);
643  }
644  if (cnt > len)
645  {
646  len = cnt - len;
647  memcpy(buf, m_readPtr, len);
648  IncrReadPointer(len);
649  }
650  }
651  else
652  {
653  memcpy(buf, m_readPtr, cnt);
654  IncrReadPointer(cnt);
655  }
656 
657 #if REPORT_RING_STATS
658  ReportStats();
659 #endif
660 
661  return cnt;
662 }
663 
669 {
670  size_t unused = GetUnused();
671 
672  if (unused > m_read_quanta)
673  {
674  while (unused < needed)
675  {
676  unused = GetUnused();
677  if (IsPauseRequested() || !IsOpen() || !m_dorun)
678  return 0;
679  usleep(5000);
680  }
681  if (IsPauseRequested() || !IsOpen() || !m_dorun)
682  return 0;
683  unused = GetUnused();
684  }
685 
686  return unused;
687 }
688 
695 {
696  MythTimer timer;
697  timer.start();
698 
699  QMutexLocker locker(&m_lock);
700  size_t avail = m_used;
701  while ((needed > avail) && isRunning() &&
702  !m_request_pause && !m_error && !m_eof &&
703  (timer.elapsed() < (int)max_wait))
704  {
705  m_dataWait.wait(locker.mutex(), 10);
706  avail = m_used;
707  }
708  return avail;
709 }
710 
712 {
713 #if REPORT_RING_STATS
714  static const int secs = 20;
715  static const double d1_s = 1.0 / secs;
716  if (m_lastReport.elapsed() > secs * 1000 /* msg every 20 seconds */)
717  {
718  QMutexLocker locker(&m_lock);
719  double rsize = 100.0 / m_size;
720  QString msg = QString("fill avg(%1%) ").arg(m_avg_used*rsize,5,'f',2);
721  msg += QString("fill max(%1%) ").arg(m_max_used*rsize,5,'f',2);
722  msg += QString("writes/sec(%1) ").arg(m_avg_buf_write_cnt*d1_s);
723  msg += QString("reads/sec(%1) ").arg(m_avg_buf_read_cnt*d1_s);
724  msg += QString("sleeps/sec(%1)").arg(m_avg_buf_sleep_cnt*d1_s);
725 
726  m_avg_used = 0;
728  m_avg_buf_read_cnt = 0;
730  m_max_used = 0;
732 
733  LOG(VB_GENERAL, LOG_INFO, LOC + msg);
734  }
735 #endif
736 }
737 
738 /*
739  * vim:ts=4:sw=4:ai:et:si:sts=4
740  */
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:215
QWaitCondition m_unpauseWait
DeviceReadBuffer(DeviceReaderCB *cb, bool use_poll=true, bool error_exit_on_poll_timeout=true)
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:294
def write(text, progress=True)
Definition: mythburn.py:279
bool IsRunning(void) const
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:46
QWaitCondition m_dataWait
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
#define O_NONBLOCK
Definition: mythmedia.cpp:25
bool IsEOF(void) const
void ClosePipes(void) const
uint GetContiguousUnused(void) const
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
bool IsOpen(void) const
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:311
volatile bool m_dorun
void WakePoll(void) const
unsigned int uint
Definition: compat.h:140
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
unsigned char * m_buffer
void IncrReadPointer(uint len)
static void setup_pipe(int[2], long[2])
Definition: mythbaseutil.h:16
unsigned char * m_writePtr
bool IsErrored(void) const
def read(device=None, features=[])
Definition: disc.py:35
bool IsPaused(void) const
uint Read(unsigned char *buf, uint count)
Try to read count bytes from the ring buffer into buf.
#define LOC
#define close
Definition: compat.h:16
virtual void ReaderPaused(int fd)=0
uint WaitForUsed(uint needed, uint max_wait) const
bool WaitForPaused(unsigned long timeout)
void SetRequestPause(bool request)
bool isRunning(void) const
Definition: mthread.cpp:274
uint WaitForUnused(uint needed) const
virtual void PriorityEvent(int fd)=0
bool WaitForUnpause(unsigned long timeout)
unsigned char * m_endPtr
DeviceReaderCB * m_readerCB
bool Poll(void) const
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
bool CheckForErrors(ssize_t read_len, size_t requested_len, uint &errcnt)
int elapsed(void) const
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
void IncrWritePointer(uint len)
QWaitCondition m_runWait
int GetNumSetting(const QString &key, int defaultval=0)
void Reset(const QString &streamName, int streamfd)
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
bool Setup(const QString &streamName, int streamfd, uint readQuanta=sizeof(TSPacket), uint deviceBufferSize=0, uint deviceBufferCount=1)
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:202
uint GetUsed(void) const
uint GetUnused(void) const
static void usleep(unsigned long time)
Definition: mthread.cpp:348
QWaitCondition m_pauseWait
unsigned char * m_readPtr
bool IsPauseRequested(void) const
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47