MythTV master
DeviceReadBuffer.cpp
Go to the documentation of this file.
1#include <algorithm>
2#include <cerrno>
3#include <fcntl.h>
4#include <sys/types.h>
5
6#include <QString>
7
13
14#include "DeviceReadBuffer.h"
15#include "mpeg/tspacket.h"
16
17#ifndef _WIN32
18#include <sys/poll.h>
19#endif
20
21#ifdef _WIN32
23#else
24void DeviceReadBuffer::setup_pipe(pipe_fd_array& mypipe, pipe_flag_array& myflags)
25{
26 int pipe_ret = pipe(mypipe.data());
27 if (pipe_ret < 0)
28 {
29 LOG(VB_GENERAL, LOG_ERR, "Failed to open pipes" + ENO);
30 mypipe.fill(-1);
31 }
32 else
33 {
34 errno = 0;
35 long flags = fcntl(mypipe[0], F_GETFL);
36 if (0 == errno)
37 {
38 int ret = fcntl(mypipe[0], F_SETFL, flags|O_NONBLOCK);
39 if (ret < 0)
40 LOG(VB_GENERAL, LOG_ERR,
41 QString("Set pipe flags error") + ENO);
42 }
43 else
44 {
45 LOG(VB_GENERAL, LOG_ERR, QString("Get pipe flags error") + ENO);
46 }
47
48 for (uint i = 0; i < 2; i++)
49 {
50 errno = 0;
51 flags = fcntl(mypipe[i], F_GETFL);
52 if (0 == errno)
53 myflags[i] = flags;
54 }
55 }
56}
57#endif
58
60#define REPORT_RING_STATS 0 // NOLINT(cppcoreguidelines-macro-usage)
61
62#define LOC QString("DevRdB(%1): ").arg(m_videoDevice)
63
65 DeviceReaderCB *cb, bool use_poll, bool error_exit_on_poll_timeout)
66 : MThread("DeviceReadBuffer"),
67 m_readerCB(cb),
68 m_usingPoll(use_poll),
69 m_pollTimeoutIsError(error_exit_on_poll_timeout)
70{
71#ifdef __MINGW32__
72#warning mingw DeviceReadBuffer::Poll
73 if (m_usingPoll)
74 {
75 LOG(VB_GENERAL, LOG_WARNING, LOC +
76 "mingw DeviceReadBuffer::Poll is not implemented");
77 m_usingPoll = false;
78 }
79#endif
80}
81
83{
84 Stop();
85 if (m_buffer)
86 {
87 delete[] m_buffer;
88 m_buffer = nullptr;
89 }
90}
91
// Prepares the buffer for reading from a stream: stores the device name and
// descriptor, clears the eof/error/pause state, computes the read sizes,
// (re)allocates m_buffer and zeroes the statistics counters.  Runs with
// m_lock held for the whole call.
//
// streamName        Name stored in m_videoDevice (a null QString becomes "").
// streamfd          Descriptor stored in m_streamFd.
// readQuanta        If non-zero, replaces m_readQuanta.
// deviceBufferSize  If non-zero, upper bound applied to m_devReadSize.
// deviceBufferCount Stored in m_devBufferCount.
//
// Returns false if the buffer allocation fails, true otherwise.
//
// NOTE(review): several lines of this function are absent from this listing
// (the embedded line numbers skip 112, 118, 121-122, 133, 139, 141-142);
// the comments below only describe the statements that are visible.
92bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd,
93 uint readQuanta, uint deviceBufferSize,
94 uint deviceBufferCount)
95{
96 QMutexLocker locker(&m_lock);
97
// Release any buffer left over from a previous Setup() call.
98 delete[] m_buffer;
99
100 m_videoDevice = streamName;
// Normalise a null QString to an empty one.
101 m_videoDevice = m_videoDevice.isNull() ? "" : m_videoDevice;
102 m_streamFd = streamfd;
103
104 // Setup device ringbuffer
105 m_eof = false;
106 m_error = false;
107 m_requestPause = false;
108 m_paused = false;
109
// A readQuanta of 0 keeps the current m_readQuanta.
110 m_readQuanta = (readQuanta) ? readQuanta : m_readQuanta;
111 m_devBufferCount = deviceBufferCount;
// NOTE(review): the beginning of the next statement is missing from this
// listing; presumably it assigns m_size from the "HDRingbufferSize"
// setting (default 50 * m_readQuanta) times 1024 -- confirm against the
// real source.
113 "HDRingbufferSize", static_cast<int>(50 * m_readQuanta)) * 1024_UZ;
114 m_used = 0;
// Per-read size: a larger multiple of the quanta when polling is in use.
115 m_devReadSize = m_readQuanta * (m_usingPoll ? 256 : 48);
// Cap the per-read size at deviceBufferSize when one was supplied.
116 m_devReadSize = (deviceBufferSize) ?
117 std::min(m_devReadSize, (size_t)deviceBufferSize) : m_devReadSize;
119
// Allocate m_devReadSize extra bytes beyond m_size; nothrow so that a
// failed allocation yields nullptr instead of an exception.
120 m_buffer = new (std::nothrow) unsigned char[m_size + m_devReadSize];
123
124 // Initialize buffer, if it exists
125 if (!m_buffer)
126 {
127 m_endPtr = nullptr;
128 LOG(VB_GENERAL, LOG_ERR, LOC +
129 QString("Failed to allocate buffer of size %1 = %2 + %3")
130 .arg(m_size+m_devReadSize).arg(m_size).arg(m_devReadSize));
131 return false;
132 }
// Fill the first m_size + m_readQuanta bytes with 0xFF.
134 memset(m_buffer, 0xFF, m_size + m_readQuanta);
135
136 // Initialize statistics
137 m_maxUsed = 0;
138 m_avgUsed = 0;
140 m_avgBufReadCnt = 0;
143
144 LOG(VB_RECORD, LOG_INFO, LOC + QString("buffer size %1 KB").arg(m_size/1024));
145
146 return true;
147}
148
150{
151 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- begin");
152
153 QMutexLocker locker(&m_lock);
154 if (isRunning() || m_doRun)
155 {
156 m_doRun = false;
157 locker.unlock();
158 WakePoll();
159 wait();
160 locker.relock();
161 }
162
163 m_doRun = true;
164 m_error = false;
165 m_eof = false;
166
167 start();
168
169 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- middle");
170
171 while (m_doRun && !isRunning())
172 m_runWait.wait(locker.mutex(), 100);
173
174 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- end");
175}
176
// Points the buffer at a (possibly different) stream and discards the
// buffered byte count and the error flag.  Runs with m_lock held.
//
// streamName  Name stored in m_videoDevice (a null QString becomes "").
// streamfd    Descriptor stored in m_streamFd.
//
// NOTE(review): listing lines 186-187 are absent here; presumably they
// reset the read/write pointers -- confirm against the real source.
177void DeviceReadBuffer::Reset(const QString &streamName, int streamfd)
178{
179 QMutexLocker locker(&m_lock);
180
181 m_videoDevice = streamName;
// Normalise a null QString to an empty one.
182 m_videoDevice = m_videoDevice.isNull() ? "" : m_videoDevice;
183 m_streamFd = streamfd;
184
// Mark the buffer as holding no data.
185 m_used = 0;
188
189 m_error = false;
190}
191
193{
194 LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- begin");
195 QMutexLocker locker(&m_lock);
196 if (isRunning() || m_doRun)
197 {
198 m_doRun = false;
199 locker.unlock();
200 WakePoll();
201 wait();
202 }
203 LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- end");
204}
205
207{
208 QMutexLocker locker(&m_lock);
209 m_requestPause = req;
210 WakePoll();
211}
212
214{
215 QMutexLocker locker(&m_lock);
216 m_paused = val;
217 if (val)
218 m_pauseWait.wakeAll();
219 else
220 m_unpauseWait.wakeAll();
221}
222
223// The WakePoll code is copied from MythSocketThread::WakeReadyReadThread()
225{
226 std::string buf(1,'\0');
227 ssize_t wret = 0;
228 while (isRunning() && (wret <= 0) && (m_wakePipe[1] >= 0))
229 {
230 wret = ::write(m_wakePipe[1], buf.data(), buf.size());
231 if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
232 {
233 LOG(VB_GENERAL, LOG_ERR, LOC + "WakePoll failed.");
234 ClosePipes();
235 break;
236 }
237 }
238}
239
241{
242 for (uint i = 0; i < 2; i++)
243 {
244 if (m_wakePipe[i] >= 0)
245 {
247 m_wakePipe[i] = -1;
248 m_wakePipeFlags[i] = 0;
249 }
250 }
251}
252
254{
255 QMutexLocker locker(&m_lock);
256 return m_paused;
257}
258
260{
261 QMutexLocker locker(&m_lock);
262
263 if (!m_paused)
264 m_pauseWait.wait(&m_lock, timeout);
265
266 return m_paused;
267}
268
270{
271 QMutexLocker locker(&m_lock);
272
273 if (m_paused)
275
276 return m_paused;
277}
278
280{
281 QMutexLocker locker(&m_lock);
282 return m_requestPause;
283}
284
286{
287 QMutexLocker locker(&m_lock);
288 return m_error;
289}
290
292{
293 QMutexLocker locker(&m_lock);
294 return m_eof;
295}
296
298{
299 QMutexLocker locker(&m_lock);
300 return isRunning();
301}
302
304{
305 QMutexLocker locker(&m_lock);
306 return m_size - m_used;
307}
308
310{
311 QMutexLocker locker(&m_lock);
312 return m_used;
313}
314
316{
317 QMutexLocker locker(&m_lock);
318 return m_endPtr - m_writePtr;
319}
320
322{
323 QMutexLocker locker(&m_lock);
324 m_used += len;
325 m_writePtr += len;
327#if REPORT_RING_STATS
328 m_maxUsed = std::max(m_used, m_maxUsed);
331#endif
332 m_dataWait.wakeAll();
333}
334
336{
337 QMutexLocker locker(&m_lock);
338 m_used -= len;
339 m_readPtr += len;
341#if REPORT_RING_STATS
343#endif
344}
345
347{
348 RunProlog();
349
350 uint errcnt = 0;
351 uint cnt = 0;
352 ssize_t read_len = 0;
353 size_t total = 0;
354 size_t throttle = m_devReadSize * m_devBufferCount / 2;
355
356 m_lock.lock();
357 m_runWait.wakeAll();
358 m_lock.unlock();
359
360 if (m_usingPoll)
362
363 while (m_doRun)
364 {
365 if (!HandlePausing())
366 continue;
367
368 if (!IsOpen())
369 {
370 usleep(5ms);
371 continue;
372 }
373
374 if (m_usingPoll && !Poll())
375 continue;
376
377 {
378 QMutexLocker locker(&m_lock);
379 if (m_error)
380 {
381 LOG(VB_RECORD, LOG_ERR, LOC + "fill_ringbuffer: error state");
382 break;
383 }
384 }
385
386 /* Some device drivers segment their buffer into small pieces,
387 * So allow for the reading of multiple buffers */
388 for (cnt = 0, read_len = 0, total = 0;
389 m_doRun && read_len >= 0 && cnt < m_devBufferCount; ++cnt)
390 {
391 // Limit read size for faster return from read
392 auto unused = static_cast<size_t>(WaitForUnused(m_readQuanta));
393 size_t read_size = std::min(m_devReadSize, unused);
394
395 // if read_size > 0 do the read...
396 if (read_size)
397 {
398 read_len = read(m_streamFd, m_writePtr, read_size);
399 if (!CheckForErrors(read_len, read_size, errcnt))
400 break;
401 errcnt = 0;
402
403 // if we wrote past the official end of the buffer,
404 // copy to start
405 if (m_writePtr + read_len > m_endPtr)
406 memcpy(m_buffer, m_endPtr, m_writePtr + read_len - m_endPtr);
407 IncrWritePointer(read_len);
408 total += read_len;
409 }
410 }
411 if (errcnt > 5)
412 break;
413
414 // Slow down reading if not under load
415 if (errcnt == 0 && total < throttle)
416 usleep(1ms);
417 }
418
419 ClosePipes();
420
421 m_lock.lock();
422 m_eof = true;
423 m_runWait.wakeAll();
424 m_dataWait.wakeAll();
425 m_pauseWait.wakeAll();
426 m_unpauseWait.wakeAll();
427 m_lock.unlock();
428
429 RunEpilog();
430}
431
433{
434 if (IsPauseRequested())
435 {
436 SetPaused(true);
437
438 if (m_readerCB)
440
441 usleep(5ms);
442 return false;
443 }
444 if (IsPaused())
445 {
447 SetPaused(false);
448 }
449 return true;
450}
451
453{
454#ifdef _WIN32
455# ifdef _MSC_VER
456# pragma message( "mingw DeviceReadBuffer::Poll" )
457# else
458# warning mingw DeviceReadBuffer::Poll
459# endif
460 LOG(VB_GENERAL, LOG_ERR, LOC +
461 "mingw DeviceReadBuffer::Poll is not implemented");
462 return false;
463#else
464 bool retval = true;
465 MythTimer timer;
466 timer.start();
467
468 int poll_cnt = 1;
469 std::array<struct pollfd,2> polls {};
470
471 polls[0].fd = m_streamFd;
472 polls[0].events = POLLIN | POLLPRI;
473 polls[0].revents = 0;
474
475 if (m_wakePipe[0] >= 0)
476 {
477 poll_cnt = 2;
478 polls[1].fd = m_wakePipe[0];
479 polls[1].events = POLLIN;
480 polls[1].revents = 0;
481 }
482
483 while (true)
484 {
485 polls[0].revents = 0;
486 polls[1].revents = 0;
487 poll_cnt = (m_wakePipe[0] >= 0) ? poll_cnt : 1;
488
489 std::chrono::milliseconds timeout = m_maxPollWait;
490 if (1 == poll_cnt)
491 timeout = 10ms;
492 else if (m_pollTimeoutIsError)
493 // subtract a bit to allow processing time.
494 timeout = std::max(m_maxPollWait - timer.elapsed() - 15ms, 10ms);
495
496 int ret = poll(polls.data(), poll_cnt, timeout.count());
497
498 if (polls[0].revents & POLLHUP)
499 {
500 LOG(VB_GENERAL, LOG_ERR, LOC + "poll eof (POLLHUP)");
501 break;
502 }
503 if (polls[0].revents & POLLNVAL)
504 {
505 LOG(VB_GENERAL, LOG_ERR, LOC + "poll error" + ENO);
506 m_error = true;
507 return true;
508 }
509
510 if (!m_doRun || !IsOpen() || IsPauseRequested())
511 {
512 retval = false;
513 break; // are we supposed to pause, stop, etc.
514 }
515
516 if (polls[0].revents & POLLPRI)
517 {
518 m_readerCB->PriorityEvent(polls[0].fd);
519 }
520
521 if (polls[0].revents & POLLIN)
522 {
523 if (ret > 0)
524 break; // we have data to read :)
525 if (ret < 0)
526 {
527 if ((EOVERFLOW == errno))
528 break; // we have an error to handle
529
530 if ((EAGAIN == errno) || (EINTR == errno))
531 continue; // errors that tell you to try again
532
533 usleep(2.5ms);
534 }
535 else // ret == 0
536 {
538 (timer.elapsed() >= m_maxPollWait))
539 {
540 LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 1");
541 QMutexLocker locker(&m_lock);
542 m_error = true;
543 return true;
544 }
545 }
546 }
547
548 // Clear out any pending pipe reads
549 if ((poll_cnt > 1) && (polls[1].revents & POLLIN))
550 {
551 std::array<char,128> dummy {};
552 int cnt = (m_wakePipeFlags[0] & O_NONBLOCK) ? 128 : 1;
553 ::read(m_wakePipe[0], dummy.data(), cnt);
554 }
555
556 if (m_pollTimeoutIsError && (timer.elapsed() >= m_maxPollWait))
557 {
558 LOG(VB_GENERAL, LOG_ERR, LOC + QString("Poll giving up after %1ms")
559 .arg(m_maxPollWait.count()));
560 QMutexLocker locker(&m_lock);
561 m_error = true;
562 return true;
563 }
564 }
565
566 std::chrono::milliseconds e = timer.elapsed();
567 if (e > m_maxPollWait)
568 {
569 LOG(VB_GENERAL, LOG_WARNING, LOC +
570 QString("Poll took an unusually long time %1 ms")
571 .arg(timer.elapsed().count()));
572 }
573
574 return retval;
575#endif
576}
577
579 ssize_t len, size_t requested_len, uint &errcnt)
580{
581 if (len > (ssize_t)requested_len)
582 {
583 LOG(VB_GENERAL, LOG_ERR, LOC +
584 "Driver is returning bogus values on read");
585 if (++errcnt > 5)
586 {
587 LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
588 QMutexLocker locker(&m_lock);
589 m_error = true;
590 }
591 return false;
592 }
593
594#ifdef _WIN32
595# ifdef _MSC_VER
596# pragma message( "mingw DeviceReadBuffer::CheckForErrors" )
597# else
598# warning mingw DeviceReadBuffer::CheckForErrors
599# endif
600 LOG(VB_GENERAL, LOG_ERR, LOC +
601 "mingw DeviceReadBuffer::CheckForErrors is not implemented");
602 return false;
603#else
604 if (len < 0)
605 {
606 if (EINTR == errno)
607 return false;
608 if (EAGAIN == errno)
609 {
610 usleep(2.5ms);
611 return false;
612 }
613 if (EOVERFLOW == errno)
614 {
615 LOG(VB_GENERAL, LOG_ERR, LOC + "Driver buffers overflowed");
616 return false;
617 }
618
619 LOG(VB_GENERAL, LOG_ERR, LOC +
620 QString("Problem reading fd(%1)").arg(m_streamFd) + ENO);
621
622 if (++errcnt > 5)
623 {
624 LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
625 QMutexLocker locker(&m_lock);
626 m_error = true;
627 return false;
628 }
629
630 usleep(500ms);
631 return false;
632 }
633 if (len == 0)
634 {
635 if (++errcnt > 5)
636 {
637 LOG(VB_GENERAL, LOG_ERR, LOC +
638 QString("End-Of-File? fd(%1)").arg(m_streamFd));
639
640 m_lock.lock();
641 m_eof = true;
642 m_lock.unlock();
643
644 return false;
645 }
646 usleep(500ms);
647 return false;
648 }
649 return true;
650#endif
651}
652
659uint DeviceReadBuffer::Read(unsigned char *buf, const uint count)
660{
661 uint avail = WaitForUsed(std::min(count, (uint)m_readThreshold), 20ms);
662 size_t cnt = std::min(count, avail);
663
664 if (!cnt)
665 return 0;
666
667 if (m_readPtr + cnt > m_endPtr)
668 {
669 // Process as two pieces
670 size_t len = m_endPtr - m_readPtr;
671 if (len)
672 {
673 memcpy(buf, m_readPtr, len);
674 buf += len;
675 IncrReadPointer(len);
676 }
677 if (cnt > len)
678 {
679 len = cnt - len;
680 memcpy(buf, m_readPtr, len);
681 IncrReadPointer(len);
682 }
683 }
684 else
685 {
686 memcpy(buf, m_readPtr, cnt);
687 IncrReadPointer(cnt);
688 }
689
690#if REPORT_RING_STATS
691 ReportStats();
692#endif
693
694 return cnt;
695}
696
702{
703 size_t unused = GetUnused();
704
705 if (unused > m_readQuanta)
706 {
707 while (unused < needed)
708 {
709 unused = GetUnused();
710 if (IsPauseRequested() || !IsOpen() || !m_doRun)
711 return 0;
712 usleep(5ms);
713 }
714 if (IsPauseRequested() || !IsOpen() || !m_doRun)
715 return 0;
716 unused = GetUnused();
717 }
718
719 return unused;
720}
721
727uint DeviceReadBuffer::WaitForUsed(uint needed, std::chrono::milliseconds max_wait) const
728{
729 MythTimer timer;
730 timer.start();
731
732 QMutexLocker locker(&m_lock);
733 size_t avail = m_used;
734 while ((needed > avail) && isRunning() &&
735 !m_requestPause && !m_error && !m_eof &&
736 (timer.elapsed() < max_wait))
737 {
738 m_dataWait.wait(locker.mutex(), 10);
739 avail = m_used;
740 }
741 return avail;
742}
743
745{
746#if REPORT_RING_STATS
747 static constexpr std::chrono::seconds secs { 20s }; // msg every 20 seconds
748 static constexpr double d1_s = 1.0 / secs.count();
749 if (m_lastReport.elapsed() > duration_cast<std::chrono::milliseconds>(secs))
750 {
751 QMutexLocker locker(&m_lock);
752 double rsize = 100.0 / m_size;
753 QString msg = QString("fill avg(%1%) ").arg(m_avgUsed*rsize,5,'f',2);
754 msg += QString("fill max(%1%) ").arg(m_maxUsed*rsize,5,'f',2);
755 msg += QString("writes/sec(%1) ").arg(m_avgBufWriteCnt*d1_s);
756 msg += QString("reads/sec(%1) ").arg(m_avgBufReadCnt*d1_s);
757 msg += QString("sleeps/sec(%1)").arg(m_avgBufSleepCnt*d1_s);
758
759 m_avgUsed = 0;
761 m_avgBufReadCnt = 0;
763 m_maxUsed = 0;
765
766 LOG(VB_GENERAL, LOG_INFO, LOC + msg);
767 }
768#endif
769}
770
771/*
772 * vim:ts=4:sw=4:ai:et:si:sts=4
773 */
#define LOC
void ClosePipes(void) const
bool IsRunning(void) const
uint GetUsed(void) const
bool WaitForUnpause(unsigned long timeout)
std::array< long, 2 > pipe_flag_array
unsigned char * m_writePtr
DeviceReadBuffer(DeviceReaderCB *cb, bool use_poll=true, bool error_exit_on_poll_timeout=true)
bool IsPauseRequested(void) const
volatile bool m_doRun
unsigned char * m_readPtr
QWaitCondition m_pauseWait
QWaitCondition m_runWait
unsigned char * m_endPtr
bool Setup(const QString &streamName, int streamfd, uint readQuanta=sizeof(TSPacket), uint deviceBufferSize=0, uint deviceBufferCount=1)
void SetRequestPause(bool request)
uint WaitForUnused(uint needed) const
bool IsOpen(void) const
std::chrono::milliseconds m_maxPollWait
uint Read(unsigned char *buf, uint count)
Try to read count bytes from the buffer into buf.
std::array< int, 2 > pipe_fd_array
~DeviceReadBuffer() override
static void setup_pipe(pipe_fd_array &mypipe, pipe_flag_array &myflags)
bool IsPaused(void) const
void IncrWritePointer(uint len)
pipe_flag_array m_wakePipeFlags
QWaitCondition m_dataWait
QWaitCondition m_unpauseWait
bool Poll(void) const
bool IsErrored(void) const
bool IsEOF(void) const
unsigned char * m_buffer
void WakePoll(void) const
DeviceReaderCB * m_readerCB
void Reset(const QString &streamName, int streamfd)
bool CheckForErrors(ssize_t read_len, size_t requested_len, uint &errcnt)
uint GetUnused(void) const
uint GetContiguousUnused(void) const
uint WaitForUsed(uint needed, std::chrono::milliseconds max_wait) const
void SetPaused(bool val)
pipe_fd_array m_wakePipe
bool WaitForPaused(unsigned long timeout)
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
void IncrReadPointer(uint len)
virtual void PriorityEvent(int fd)=0
virtual void ReaderPaused(int fd)=0
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:49
bool isRunning(void) const
Definition: mthread.cpp:263
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
static void usleep(std::chrono::microseconds time)
Definition: mthread.cpp:335
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:283
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
int GetNumSetting(const QString &key, int defaultval=0)
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:14
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
#define O_NONBLOCK
Definition: compat.h:292
#define close
Definition: compat.h:39
unsigned int uint
Definition: freesurround.h:24
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
def read(device=None, features=[])
Definition: disc.py:35
def write(text, progress=True)
Definition: mythburn.py:307