MythTV  master
DeviceReadBuffer.cpp
Go to the documentation of this file.
1 #include <algorithm>
2 using namespace std;
3 
4 #include "DeviceReadBuffer.h"
5 #include "mythcorecontext.h"
6 #include "mythbaseutil.h"
7 #include "mythlogging.h"
8 #include "tspacket.h"
9 #include "mthread.h"
10 #include "compat.h"
11 
12 #ifndef _WIN32
13 #include <sys/poll.h>
14 #endif
15 
17 #define REPORT_RING_STATS 0
18 
19 #define LOC QString("DevRdB(%1): ").arg(m_videodevice)
20 
22  DeviceReaderCB *cb, bool use_poll, bool error_exit_on_poll_timeout)
23  : MThread("DeviceReadBuffer"),
24  m_readerCB(cb),
25  m_using_poll(use_poll),
26  m_poll_timeout_is_error(error_exit_on_poll_timeout)
27 {
28 #ifdef USING_MINGW
29 #warning mingw DeviceReadBuffer::Poll
30  if (m_using_poll)
31  {
32  LOG(VB_GENERAL, LOG_WARNING, LOC +
33  "mingw DeviceReadBuffer::Poll is not implemented");
34  m_using_poll = false;
35  }
36 #endif
37 }
38 
40 {
41  Stop();
42  if (m_buffer)
43  {
44  delete[] m_buffer;
45  m_buffer = nullptr;
46  }
47 }
48 
/**
 * (Re)configures the read buffer for a stream.
 *
 * Takes m_lock, releases any previously allocated m_buffer, records the
 * stream name and descriptor, clears the eof/error/pause flags, sizes and
 * allocates a new buffer, and fills it with 0xFF.
 *
 * streamName        stored in m_videodevice; a null QString becomes "".
 * streamfd          stored in m_stream_fd.
 * readQuanta        when non-zero, replaces m_read_quanta.
 * deviceBufferSize  when non-zero, caps m_dev_read_size at this value.
 * deviceBufferCount stored in m_dev_buffer_count.
 *
 * Returns false (with m_endPtr set to nullptr) when the allocation fails,
 * true otherwise.
 *
 * NOTE(review): the embedded line numbering of this listing skips 69, 72,
 * 75, 78-79, 87, 90 and 96-99, so some statements of this function appear
 * to be missing here -- check against the real source file.
 */
49 bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd,
50  uint readQuanta, uint deviceBufferSize,
51  uint deviceBufferCount)
52 {
53  QMutexLocker locker(&m_lock);
54 
 // Release the previous buffer; m_buffer is reassigned by the new[] below.
55  delete[] m_buffer;
56 
57  m_videodevice = streamName;
 // Normalize a null QString to an empty string.
58  m_videodevice = m_videodevice.isNull() ? "" : m_videodevice;
59  m_stream_fd = streamfd;
60 
61  // Setup device ringbuffer
62  m_eof = false;
63  m_error = false;
64  m_request_pause = false;
65  m_paused = false;
66 
 // A readQuanta of 0 keeps the current m_read_quanta.
67  m_read_quanta = (readQuanta) ? readQuanta : m_read_quanta;
68  m_dev_buffer_count = deviceBufferCount;
 // NOTE(review): the start of this statement is missing from the listing;
 // presumably it assigns m_size from the "HDRingbufferSize" setting
 // (default 50 * m_read_quanta), scaled by 1024 -- confirm.
70  "HDRingbufferSize", static_cast<int>(50 * m_read_quanta)) * 1024;
71  m_used = 0;
 // A deviceBufferSize of 0 leaves m_dev_read_size unchanged; otherwise the
 // smaller of the two values is kept.
73  m_dev_read_size = (deviceBufferSize) ?
74  min(m_dev_read_size, (size_t)deviceBufferSize) : m_dev_read_size;
76 
 // nothrow form: failure is reported through the null check below.
 // The allocation is m_dev_read_size bytes larger than m_size.
77  m_buffer = new (nothrow) unsigned char[m_size + m_dev_read_size];
80 
81  // Initialize buffer, if it exists
82  if (!m_buffer)
83  {
84  m_endPtr = nullptr;
85  LOG(VB_GENERAL, LOG_ERR, LOC +
86  QString("Failed to allocate buffer of size %1 = %2 + %3")
88  return false;
89  }
 // NOTE(review): the fill length is m_size + m_read_quanta, while the
 // allocation above is m_size + m_dev_read_size. If m_read_quanta can
 // exceed m_dev_read_size (e.g. a small deviceBufferSize), this would
 // write past the allocation -- confirm against the missing lines.
91  memset(m_buffer, 0xFF, m_size + m_read_quanta);
92 
93  // Initialize statistics
94  m_max_used = 0;
95  m_avg_used = 0;
100 
101  LOG(VB_RECORD, LOG_INFO, LOC + QString("buffer size %1 KB").arg(m_size/1024));
102 
103  return true;
104 }
105 
107 {
108  LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- begin");
109 
110  QMutexLocker locker(&m_lock);
111  if (isRunning() || m_dorun)
112  {
113  m_dorun = false;
114  locker.unlock();
115  WakePoll();
116  wait();
117  locker.relock();
118  }
119 
120  m_dorun = true;
121  m_error = false;
122  m_eof = false;
123 
124  start();
125 
126  LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- middle");
127 
128  while (m_dorun && !isRunning())
129  m_runWait.wait(locker.mutex(), 100);
130 
131  LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- end");
132 }
133 
/**
 * Points the buffer at a (possibly different) stream without reallocating.
 *
 * Under m_lock: stores the stream name (a null QString becomes "") and the
 * descriptor, zeroes the used-byte count and clears the error flag.
 *
 * NOTE(review): the embedded numbering skips 143-144 after "m_used = 0",
 * so statements appear to be missing from this listing -- presumably the
 * read/write pointers are reset there; confirm against the real source.
 */
134 void DeviceReadBuffer::Reset(const QString &streamName, int streamfd)
135 {
136  QMutexLocker locker(&m_lock);
137 
138  m_videodevice = streamName;
 // Normalize a null QString to an empty string.
139  m_videodevice = m_videodevice.isNull() ? "" : m_videodevice;
140  m_stream_fd = streamfd;
141 
 // Discard whatever was buffered.
142  m_used = 0;
145 
146  m_error = false;
147 }
148 
150 {
151  LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- begin");
152  QMutexLocker locker(&m_lock);
153  if (isRunning() || m_dorun)
154  {
155  m_dorun = false;
156  locker.unlock();
157  WakePoll();
158  wait();
159  }
160  LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- end");
161 }
162 
164 {
165  QMutexLocker locker(&m_lock);
166  m_request_pause = req;
167  WakePoll();
168 }
169 
171 {
172  QMutexLocker locker(&m_lock);
173  m_paused = val;
174  if (val)
175  m_pauseWait.wakeAll();
176  else
177  m_unpauseWait.wakeAll();
178 }
179 
180 // The WakePoll code is copied from MythSocketThread::WakeReadyReadThread()
182 {
183  char buf[1];
184  buf[0] = '0';
185  ssize_t wret = 0;
186  while (isRunning() && (wret <= 0) && (m_wake_pipe[1] >= 0))
187  {
188  wret = ::write(m_wake_pipe[1], &buf, 1);
189  if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
190  {
191  LOG(VB_GENERAL, LOG_ERR, LOC + "WakePoll failed.");
192  ClosePipes();
193  break;
194  }
195  }
196 }
197 
199 {
200  for (uint i = 0; i < 2; i++)
201  {
202  if (m_wake_pipe[i] >= 0)
203  {
204  ::close(m_wake_pipe[i]);
205  m_wake_pipe[i] = -1;
206  m_wake_pipe_flags[i] = 0;
207  }
208  }
209 }
210 
212 {
213  QMutexLocker locker(&m_lock);
214  return m_paused;
215 }
216 
218 {
219  QMutexLocker locker(&m_lock);
220 
221  if (!m_paused)
222  m_pauseWait.wait(&m_lock, timeout);
223 
224  return m_paused;
225 }
226 
228 {
229  QMutexLocker locker(&m_lock);
230 
231  if (m_paused)
232  m_unpauseWait.wait(&m_lock, timeout);
233 
234  return m_paused;
235 }
236 
238 {
239  QMutexLocker locker(&m_lock);
240  return m_request_pause;
241 }
242 
244 {
245  QMutexLocker locker(&m_lock);
246  return m_error;
247 }
248 
249 bool DeviceReadBuffer::IsEOF(void) const
250 {
251  QMutexLocker locker(&m_lock);
252  return m_eof;
253 }
254 
256 {
257  QMutexLocker locker(&m_lock);
258  return isRunning();
259 }
260 
262 {
263  QMutexLocker locker(&m_lock);
264  return m_size - m_used;
265 }
266 
268 {
269  QMutexLocker locker(&m_lock);
270  return m_used;
271 }
272 
274 {
275  QMutexLocker locker(&m_lock);
276  return m_endPtr - m_writePtr;
277 }
278 
280 {
281  QMutexLocker locker(&m_lock);
282  m_used += len;
283  m_writePtr += len;
285 #if REPORT_RING_STATS
286  m_max_used = max(m_used, m_max_used);
289 #endif
290  m_dataWait.wakeAll();
291 }
292 
294 {
295  QMutexLocker locker(&m_lock);
296  m_used -= len;
297  m_readPtr += len;
299 #if REPORT_RING_STATS
301 #endif
302 }
303 
305 {
306  RunProlog();
307 
308  uint errcnt = 0;
309  uint cnt;
310  ssize_t len;
311  size_t read_size;
312  size_t unused;
313  size_t total;
314  size_t throttle = m_dev_read_size * m_dev_buffer_count / 2;
315 
316  m_lock.lock();
317  m_runWait.wakeAll();
318  m_lock.unlock();
319 
320  if (m_using_poll)
322 
323  while (m_dorun)
324  {
325  if (!HandlePausing())
326  continue;
327 
328  if (!IsOpen())
329  {
330  usleep(5000);
331  continue;
332  }
333 
334  if (m_using_poll && !Poll())
335  continue;
336 
337  {
338  QMutexLocker locker(&m_lock);
339  if (m_error)
340  {
341  LOG(VB_RECORD, LOG_ERR, LOC + "fill_ringbuffer: error state");
342  break;
343  }
344  }
345 
346  /* Some device drivers segment their buffer into small pieces,
347  * So allow for the reading of multiple buffers */
348  for (cnt = 0, len = 0, total = 0;
349  m_dorun && len >= 0 && cnt < m_dev_buffer_count; ++cnt)
350  {
351  // Limit read size for faster return from read
352  unused = static_cast<size_t>(WaitForUnused(m_read_quanta));
353  read_size = min(m_dev_read_size, unused);
354 
355  // if read_size > 0 do the read...
356  if (read_size)
357  {
358  len = read(m_stream_fd, m_writePtr, read_size);
359  if (!CheckForErrors(len, read_size, errcnt))
360  break;
361  errcnt = 0;
362 
363  // if we wrote past the official end of the buffer,
364  // copy to start
365  if (m_writePtr + len > m_endPtr)
366  memcpy(m_buffer, m_endPtr, m_writePtr + len - m_endPtr);
367  IncrWritePointer(len);
368  total += len;
369  }
370  }
371  if (errcnt > 5)
372  break;
373 
374  // Slow down reading if not under load
375  if (errcnt == 0 && total < throttle)
376  usleep(1000);
377  }
378 
379  ClosePipes();
380 
381  m_lock.lock();
382  m_eof = true;
383  m_runWait.wakeAll();
384  m_dataWait.wakeAll();
385  m_pauseWait.wakeAll();
386  m_unpauseWait.wakeAll();
387  m_lock.unlock();
388 
389  RunEpilog();
390 }
391 
393 {
394  if (IsPauseRequested())
395  {
396  SetPaused(true);
397 
398  if (m_readerCB)
400 
401  usleep(5000);
402  return false;
403  }
404  if (IsPaused())
405  {
407  SetPaused(false);
408  }
409  return true;
410 }
411 
/**
 * Waits in poll() until the stream descriptor has something to read, a
 * stop/close/pause is noticed, or an error/timeout is flagged.
 *
 * On _WIN32 builds this only logs that it is not implemented and returns
 * false.
 *
 * Returns false only when, after a poll() call, m_dorun is false, IsOpen()
 * is false or IsPauseRequested() is true. Every other exit returns true,
 * including POLLHUP, EOVERFLOW and the paths that set m_error; run()
 * checks m_error after a true return before it reads.
 */
412 bool DeviceReadBuffer::Poll(void) const
413 {
414 #ifdef _WIN32
415 # ifdef _MSC_VER
416 # pragma message( "mingw DeviceReadBuffer::Poll" )
417 # else
418 # warning mingw DeviceReadBuffer::Poll
419 # endif
420  LOG(VB_GENERAL, LOG_ERR, LOC +
421  "mingw DeviceReadBuffer::Poll is not implemented");
422  return false;
423 #else
424  bool retval = true;
 // Measures the total time spent in this call, for the timeout checks.
425  MythTimer timer;
426  timer.start();
427 
428  int poll_cnt = 1;
429  struct pollfd polls[2];
430  memset(polls, 0, sizeof(polls));
431 
 // Slot 0: the stream itself (normal and priority data).
432  polls[0].fd = m_stream_fd;
433  polls[0].events = POLLIN | POLLPRI;
434  polls[0].revents = 0;
435 
 // Slot 1 (optional): read end of the wake pipe, so WakePoll() can
 // interrupt the wait.
436  if (m_wake_pipe[0] >= 0)
437  {
438  poll_cnt = 2;
439  polls[1].fd = m_wake_pipe[0];
440  polls[1].events = POLLIN;
441  polls[1].revents = 0;
442  }
443 
444  while (true)
445  {
446  polls[0].revents = 0;
447  polls[1].revents = 0;
 // Drop back to polling only the stream if the wake pipe has been closed.
448  poll_cnt = (m_wake_pipe[0] >= 0) ? poll_cnt : 1;
449 
 // Timeout in milliseconds: 10 when there is no wake pipe; otherwise
 // m_max_poll_wait, or (when a timeout counts as an error) the time
 // remaining minus 15, but never below 10.
450  int timeout = m_max_poll_wait;
451  if (1 == poll_cnt)
452  timeout = 10;
453  else if (m_poll_timeout_is_error)
454  // subtract a bit to allow processing time.
455  timeout = max((int)m_max_poll_wait - timer.elapsed() - 15, 10);
456 
457  int ret = poll(polls, poll_cnt, timeout);
458 
 // Hang-up on the stream: leave the loop with retval still true.
459  if (polls[0].revents & POLLHUP)
460  {
461  LOG(VB_GENERAL, LOG_ERR, LOC + "poll eof (POLLHUP)");
462  break;
463  }
464  if (polls[0].revents & POLLNVAL)
465  {
466  LOG(VB_GENERAL, LOG_ERR, LOC + "poll error" + ENO);
 // NOTE(review): m_error is written here without holding m_lock, unlike
 // the other places in this function that set it -- confirm intent.
467  m_error = true;
468  return true;
469  }
470 
471  if (!m_dorun || !IsOpen() || IsPauseRequested())
472  {
473  retval = false;
474  break; // are we supposed to pause, stop, etc.
475  }
476 
477  if (polls[0].revents & POLLPRI)
478  {
 // NOTE(review): m_readerCB is dereferenced without a null check here,
 // whereas HandlePausing() tests it first -- confirm it cannot be null.
479  m_readerCB->PriorityEvent(polls[0].fd);
480  }
481 
 // NOTE(review): the ret checks below only run when POLLIN is set in
 // revents; confirm that a failed poll() (ret < 0) or a timed-out poll()
 // (ret == 0) can actually reach this branch.
482  if (polls[0].revents & POLLIN)
483  {
484  if (ret > 0)
485  break; // we have data to read :)
486  if (ret < 0)
487  {
488  if ((EOVERFLOW == errno))
489  break; // we have an error to handle
490 
491  if ((EAGAIN == errno) || (EINTR == errno))
492  continue; // errors that tell you to try again
493 
494  usleep(2500 /*2.5 ms*/);
495  }
496  else // ret == 0
497  {
 // NOTE(review): the opening of this condition is missing from the
 // listing (numbering skips 498); presumably it also tests
 // m_poll_timeout_is_error -- confirm against the real source.
499  (timer.elapsed() >= (int)m_max_poll_wait))
500  {
501  LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 1");
502  QMutexLocker locker(&m_lock);
503  m_error = true;
504  return true;
505  }
506  }
507  }
508 
509  // Clear out any pending pipe reads
510  if ((poll_cnt > 1) && (polls[1].revents & POLLIN))
511  {
512  char dummy[128];
 // Read up to 128 bytes if the pipe is non-blocking, otherwise a single
 // byte; the result of the read is ignored.
513  int cnt = (m_wake_pipe_flags[0] & O_NONBLOCK) ? 128 : 1;
514  ::read(m_wake_pipe[0], dummy, cnt);
515  }
516 
 // Overall deadline: flag an error once m_max_poll_wait ms have elapsed.
517  if (m_poll_timeout_is_error && (timer.elapsed() >= (int)m_max_poll_wait))
518  {
519  LOG(VB_GENERAL, LOG_ERR, LOC + QString("Poll giving up after %1ms")
520  .arg(m_max_poll_wait));
521  QMutexLocker locker(&m_lock);
522  m_error = true;
523  return true;
524  }
525  }
526 
 // Reached via the break statements above; warn when the wait ran long.
527  int e = timer.elapsed();
528  if (e > (int)m_max_poll_wait)
529  {
530  LOG(VB_GENERAL, LOG_WARNING, LOC +
531  QString("Poll took an unusually long time %1 ms")
532  .arg(timer.elapsed()));
533  }
534 
535  return retval;
536 #endif
537 }
538 
540  ssize_t len, size_t requested_len, uint &errcnt)
541 {
542  if (len > (ssize_t)requested_len)
543  {
544  LOG(VB_GENERAL, LOG_ERR, LOC +
545  "Driver is returning bogus values on read");
546  if (++errcnt > 5)
547  {
548  LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
549  QMutexLocker locker(&m_lock);
550  m_error = true;
551  }
552  return false;
553  }
554 
555 #ifdef _WIN32
556 # ifdef _MSC_VER
557 # pragma message( "mingw DeviceReadBuffer::CheckForErrors" )
558 # else
559 # warning mingw DeviceReadBuffer::CheckForErrors
560 # endif
561  LOG(VB_GENERAL, LOG_ERR, LOC +
562  "mingw DeviceReadBuffer::CheckForErrors is not implemented");
563  return false;
564 #else
565  if (len < 0)
566  {
567  if (EINTR == errno)
568  return false;
569  if (EAGAIN == errno)
570  {
571  usleep(2500);
572  return false;
573  }
574  if (EOVERFLOW == errno)
575  {
576  LOG(VB_GENERAL, LOG_ERR, LOC + "Driver buffers overflowed");
577  return false;
578  }
579 
580  LOG(VB_GENERAL, LOG_ERR, LOC +
581  QString("Problem reading fd(%1)").arg(m_stream_fd) + ENO);
582 
583  if (++errcnt > 5)
584  {
585  LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
586  QMutexLocker locker(&m_lock);
587  m_error = true;
588  return false;
589  }
590 
591  usleep(500);
592  return false;
593  }
594  if (len == 0)
595  {
596  if (++errcnt > 5)
597  {
598  LOG(VB_GENERAL, LOG_ERR, LOC +
599  QString("End-Of-File? fd(%1)").arg(m_stream_fd));
600 
601  m_lock.lock();
602  m_eof = true;
603  m_lock.unlock();
604 
605  return false;
606  }
607  usleep(500);
608  return false;
609  }
610  return true;
611 #endif
612 }
613 
620 uint DeviceReadBuffer::Read(unsigned char *buf, const uint count)
621 {
622  uint avail = WaitForUsed(min(count, (uint)m_readThreshold), 20);
623  size_t cnt = min(count, avail);
624 
625  if (!cnt)
626  return 0;
627 
628  if (m_readPtr + cnt > m_endPtr)
629  {
630  // Process as two pieces
631  size_t len = m_endPtr - m_readPtr;
632  if (len)
633  {
634  memcpy(buf, m_readPtr, len);
635  buf += len;
636  IncrReadPointer(len);
637  }
638  if (cnt > len)
639  {
640  len = cnt - len;
641  memcpy(buf, m_readPtr, len);
642  IncrReadPointer(len);
643  }
644  }
645  else
646  {
647  memcpy(buf, m_readPtr, cnt);
648  IncrReadPointer(cnt);
649  }
650 
651 #if REPORT_RING_STATS
652  ReportStats();
653 #endif
654 
655  return cnt;
656 }
657 
663 {
664  size_t unused = GetUnused();
665 
666  if (unused > m_read_quanta)
667  {
668  while (unused < needed)
669  {
670  unused = GetUnused();
671  if (IsPauseRequested() || !IsOpen() || !m_dorun)
672  return 0;
673  usleep(5000);
674  }
675  if (IsPauseRequested() || !IsOpen() || !m_dorun)
676  return 0;
677  unused = GetUnused();
678  }
679 
680  return unused;
681 }
682 
689 {
690  MythTimer timer;
691  timer.start();
692 
693  QMutexLocker locker(&m_lock);
694  size_t avail = m_used;
695  while ((needed > avail) && isRunning() &&
696  !m_request_pause && !m_error && !m_eof &&
697  (timer.elapsed() < (int)max_wait))
698  {
699  m_dataWait.wait(locker.mutex(), 10);
700  avail = m_used;
701  }
702  return avail;
703 }
704 
706 {
707 #if REPORT_RING_STATS
708  static const int secs = 20;
709  static const double d1_s = 1.0 / secs;
710  if (m_lastReport.elapsed() > secs * 1000 /* msg every 20 seconds */)
711  {
712  QMutexLocker locker(&m_lock);
713  double rsize = 100.0 / m_size;
714  QString msg = QString("fill avg(%1%) ").arg(m_avg_used*rsize,5,'f',2);
715  msg += QString("fill max(%1%) ").arg(m_max_used*rsize,5,'f',2);
716  msg += QString("writes/sec(%1) ").arg(m_avg_buf_write_cnt*d1_s);
717  msg += QString("reads/sec(%1) ").arg(m_avg_buf_read_cnt*d1_s);
718  msg += QString("sleeps/sec(%1)").arg(m_avg_buf_sleep_cnt*d1_s);
719 
720  m_avg_used = 0;
722  m_avg_buf_read_cnt = 0;
724  m_max_used = 0;
726 
727  LOG(VB_GENERAL, LOG_INFO, LOC + msg);
728  }
729 #endif
730 }
731 
732 /*
733  * vim:ts=4:sw=4:ai:et:si:sts=4
734  */
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:215
QWaitCondition m_unpauseWait
DeviceReadBuffer(DeviceReaderCB *cb, bool use_poll=true, bool error_exit_on_poll_timeout=true)
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:294
def write(text, progress=True)
Definition: mythburn.py:279
bool IsRunning(void) const
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:46
QWaitCondition m_dataWait
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
#define O_NONBLOCK
Definition: mythmedia.cpp:25
bool IsEOF(void) const
void ClosePipes(void) const
uint GetContiguousUnused(void) const
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
bool IsOpen(void) const
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:311
volatile bool m_dorun
void WakePoll(void) const
unsigned int uint
Definition: compat.h:140
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
unsigned char * m_buffer
void IncrReadPointer(uint len)
static void setup_pipe(int[2], long[2])
Definition: mythbaseutil.h:16
unsigned char * m_writePtr
bool IsErrored(void) const
def read(device=None, features=[])
Definition: disc.py:35
bool IsPaused(void) const
uint Read(unsigned char *buf, uint count)
Try to Read count bytes from into buffer.
#define LOC
#define close
Definition: compat.h:16
virtual void ReaderPaused(int fd)=0
uint WaitForUsed(uint needed, uint max_wait) const
bool WaitForPaused(unsigned long timeout)
void SetRequestPause(bool request)
bool isRunning(void) const
Definition: mthread.cpp:274
uint WaitForUnused(uint needed) const
virtual void PriorityEvent(int fd)=0
bool WaitForUnpause(unsigned long timeout)
unsigned char * m_endPtr
DeviceReaderCB * m_readerCB
bool Poll(void) const
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
bool CheckForErrors(ssize_t read_len, size_t requested_len, uint &errcnt)
int elapsed(void) const
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
void IncrWritePointer(uint len)
QWaitCondition m_runWait
int GetNumSetting(const QString &key, int defaultval=0)
void Reset(const QString &streamName, int streamfd)
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
bool Setup(const QString &streamName, int streamfd, uint readQuanta=sizeof(TSPacket), uint deviceBufferSize=0, uint deviceBufferCount=1)
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:202
uint GetUsed(void) const
uint GetUnused(void) const
static void usleep(unsigned long time)
Definition: mthread.cpp:348
QWaitCondition m_pauseWait
unsigned char * m_readPtr
bool IsPauseRequested(void) const
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47