MythTV master
DeviceReadBuffer.cpp
Go to the documentation of this file.
1#include <algorithm>
2#include <cerrno>
3#include <fcntl.h>
4#include <sys/types.h>
5
6#include <QtGlobal>
7#if QT_VERSION >= QT_VERSION_CHECK(6,0,0)
8#include <QtSystemDetection>
9#endif
10#include <QString>
11
12#include "libmythbase/compat.h"
13#include "libmythbase/mthread.h"
17
18#include "DeviceReadBuffer.h"
19#include "mpeg/tspacket.h"
20
21#ifndef Q_OS_WINDOWS
22#include <sys/poll.h>
23#endif
24
25#ifdef Q_OS_WINDOWS
27#else
28void DeviceReadBuffer::setup_pipe(pipe_fd_array& mypipe, pipe_flag_array& myflags)
29{
30 int pipe_ret = pipe(mypipe.data());
31 if (pipe_ret < 0)
32 {
33 LOG(VB_GENERAL, LOG_ERR, "Failed to open pipes" + ENO);
34 mypipe.fill(-1);
35 }
36 else
37 {
38 errno = 0;
39 long flags = fcntl(mypipe[0], F_GETFL);
40 if (0 == errno)
41 {
42 int ret = fcntl(mypipe[0], F_SETFL, flags|O_NONBLOCK);
43 if (ret < 0)
44 LOG(VB_GENERAL, LOG_ERR,
45 QString("Set pipe flags error") + ENO);
46 }
47 else
48 {
49 LOG(VB_GENERAL, LOG_ERR, QString("Get pipe flags error") + ENO);
50 }
51
52 for (uint i = 0; i < 2; i++)
53 {
54 errno = 0;
55 flags = fcntl(mypipe[i], F_GETFL);
56 if (0 == errno)
57 myflags[i] = flags;
58 }
59 }
60}
61#endif
62
64#define REPORT_RING_STATS 0 // NOLINT(cppcoreguidelines-macro-usage)
65
66#define LOC QString("DevRdB(%1): ").arg(m_videoDevice)
67
69 DeviceReaderCB *cb, bool use_poll, bool error_exit_on_poll_timeout)
70 : MThread("DeviceReadBuffer"),
71 m_readerCB(cb),
72 m_usingPoll(use_poll),
73 m_pollTimeoutIsError(error_exit_on_poll_timeout)
74{
75#ifdef Q_OS_WINDOWS
76# if !defined(_MSC_VER) || __cplusplus >= 202302L
77# warning mingw DeviceReadBuffer::Poll is not implemented
78# endif
79 if (m_usingPoll)
80 {
81 LOG(VB_GENERAL, LOG_WARNING, LOC +
82 "mingw DeviceReadBuffer::Poll is not implemented");
83 m_usingPoll = false;
84 }
85#endif
86}
87
89{
90 Stop();
91 if (m_buffer)
92 {
93 delete[] m_buffer;
94 m_buffer = nullptr;
95 }
96}
97
/**
 *  \brief (Re)configure the buffer for a stream and allocate its storage.
 *
 *  Holds m_lock for the whole call.  Any previous buffer is released, the
 *  stream name and descriptor are stored, the eof/error/pause state is
 *  cleared and a new buffer of m_size + m_devReadSize bytes is allocated.
 *
 *  NOTE(review): several statements of this function are absent from this
 *  listing (the line numbers skip 118, 124, 127-128, 139, 145 and 147-148),
 *  so the comments below only describe what is visible here.
 *
 *  \param streamName        device name used in log messages; a null
 *                           string is stored as ""
 *  \param streamfd          descriptor stored in m_streamFd
 *  \param readQuanta        read granularity; 0 keeps the current
 *                           m_readQuanta
 *  \param deviceBufferSize  if non-zero, upper bound for m_devReadSize
 *  \param deviceBufferCount stored in m_devBufferCount
 *  \return false if the buffer could not be allocated, true otherwise
 */
98bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd,
99 uint readQuanta, uint deviceBufferSize,
100 uint deviceBufferCount)
101{
102 QMutexLocker locker(&m_lock);
103
// Release the buffer of any earlier Setup(); m_buffer is reassigned below.
104 delete[] m_buffer;
105
106 m_videoDevice = streamName;
// Normalise a null QString to an empty one.
107 m_videoDevice = m_videoDevice.isNull() ? "" : m_videoDevice;
108 m_streamFd = streamfd;
109
110 // Setup device ringbuffer
111 m_eof = false;
112 m_error = false;
113 m_requestPause = false;
114 m_paused = false;
115
// A readQuanta of 0 means "keep the value already configured".
116 m_readQuanta = (readQuanta) ? readQuanta : m_readQuanta;
117 m_devBufferCount = deviceBufferCount;
// NOTE(review): the first line of the next statement is not visible in this
// listing.  Presumably it assigns m_size from the "HDRingbufferSize" setting
// (default 50 * m_readQuanta) multiplied by 1024 -- confirm against the file.
119 "HDRingbufferSize", static_cast<int>(50 * m_readQuanta)) * 1024_UZ;
120 m_used = 0;
// Per-read size: 256 quanta when polling, 48 otherwise ...
121 m_devReadSize = m_readQuanta * (m_usingPoll ? 256 : 48);
// ... capped at deviceBufferSize when the caller supplied one.
122 m_devReadSize = (deviceBufferSize) ?
123 std::min(m_devReadSize, (size_t)deviceBufferSize) : m_devReadSize;
125
// nothrow: an allocation failure yields nullptr and is handled below.
126 m_buffer = new (std::nothrow) unsigned char[m_size + m_devReadSize];
129
130 // Initialize buffer, if it exists
131 if (!m_buffer)
132 {
133 m_endPtr = nullptr;
134 LOG(VB_GENERAL, LOG_ERR, LOC +
135 QString("Failed to allocate buffer of size %1 = %2 + %3")
136 .arg(m_size+m_devReadSize).arg(m_size).arg(m_devReadSize));
137 return false;
138 }
// NOTE(review): the allocation above is m_size + m_devReadSize bytes, but
// this fill covers m_size + m_readQuanta bytes.  If deviceBufferSize is
// smaller than m_readQuanta, m_devReadSize < m_readQuanta and the fill would
// run past the allocation -- confirm callers never pass such a value.
140 memset(m_buffer, 0xFF, m_size + m_readQuanta);
141
142 // Initialize statistics
143 m_maxUsed = 0;
144 m_avgUsed = 0;
146 m_avgBufReadCnt = 0;
149
150 LOG(VB_RECORD, LOG_INFO, LOC + QString("buffer size %1 KB").arg(m_size/1024));
151
152 return true;
153}
154
156{
157 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- begin");
158
159 QMutexLocker locker(&m_lock);
160 if (isRunning() || m_doRun)
161 {
162 m_doRun = false;
163 locker.unlock();
164 WakePoll();
165 wait();
166 locker.relock();
167 }
168
169 m_doRun = true;
170 m_error = false;
171 m_eof = false;
172
173 start();
174
175 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- middle");
176
177 while (m_doRun && !isRunning())
178 m_runWait.wait(locker.mutex(), 100);
179
180 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- end");
181}
182
/**
 *  \brief Point the buffer at a (possibly different) stream and discard
 *         the count of buffered bytes.
 *
 *  Holds m_lock for the whole call.  The visible statements store the new
 *  stream name and descriptor, set m_used to 0 and clear m_error.
 *
 *  NOTE(review): original lines 192-193 are absent from this listing, so
 *  further state may be reset there -- confirm against the real file.
 *
 *  \param streamName device name used in log messages; a null string is
 *                    stored as ""
 *  \param streamfd   descriptor stored in m_streamFd
 */
183void DeviceReadBuffer::Reset(const QString &streamName, int streamfd)
184{
185 QMutexLocker locker(&m_lock);
186
187 m_videoDevice = streamName;
// Normalise a null QString to an empty one.
188 m_videoDevice = m_videoDevice.isNull() ? "" : m_videoDevice;
189 m_streamFd = streamfd;
190
// Forget any data still counted as buffered.
191 m_used = 0;
194
195 m_error = false;
196}
197
199{
200 LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- begin");
201 QMutexLocker locker(&m_lock);
202 if (isRunning() || m_doRun)
203 {
204 m_doRun = false;
205 locker.unlock();
206 WakePoll();
207 wait();
208 }
209 LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- end");
210}
211
213{
214 QMutexLocker locker(&m_lock);
215 m_requestPause = req;
216 WakePoll();
217}
218
220{
221 QMutexLocker locker(&m_lock);
222 m_paused = val;
223 if (val)
224 m_pauseWait.wakeAll();
225 else
226 m_unpauseWait.wakeAll();
227}
228
229// The WakePoll code is copied from MythSocketThread::WakeReadyReadThread()
231{
232 std::string buf(1,'\0');
233 ssize_t wret = 0;
234 while (isRunning() && (wret <= 0) && (m_wakePipe[1] >= 0))
235 {
236 wret = ::write(m_wakePipe[1], buf.data(), buf.size());
237 if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
238 {
239 LOG(VB_GENERAL, LOG_ERR, LOC + "WakePoll failed.");
240 ClosePipes();
241 break;
242 }
243 }
244}
245
247{
248 for (uint i = 0; i < 2; i++)
249 {
250 if (m_wakePipe[i] >= 0)
251 {
253 m_wakePipe[i] = -1;
254 m_wakePipeFlags[i] = 0;
255 }
256 }
257}
258
260{
261 QMutexLocker locker(&m_lock);
262 return m_paused;
263}
264
266{
267 QMutexLocker locker(&m_lock);
268
269 if (!m_paused)
270 m_pauseWait.wait(&m_lock, timeout);
271
272 return m_paused;
273}
274
276{
277 QMutexLocker locker(&m_lock);
278
279 if (m_paused)
281
282 return m_paused;
283}
284
286{
287 QMutexLocker locker(&m_lock);
288 return m_requestPause;
289}
290
292{
293 QMutexLocker locker(&m_lock);
294 return m_error;
295}
296
298{
299 QMutexLocker locker(&m_lock);
300 return m_eof;
301}
302
304{
305 QMutexLocker locker(&m_lock);
306 return isRunning();
307}
308
310{
311 QMutexLocker locker(&m_lock);
312 return m_size - m_used;
313}
314
316{
317 QMutexLocker locker(&m_lock);
318 return m_used;
319}
320
322{
323 QMutexLocker locker(&m_lock);
324 return m_endPtr - m_writePtr;
325}
326
328{
329 QMutexLocker locker(&m_lock);
330 m_used += len;
331 m_writePtr += len;
333#if REPORT_RING_STATS
334 m_maxUsed = std::max(m_used, m_maxUsed);
337#endif
338 m_dataWait.wakeAll();
339}
340
342{
343 QMutexLocker locker(&m_lock);
344 m_used -= len;
345 m_readPtr += len;
347#if REPORT_RING_STATS
349#endif
350}
351
353{
354 RunProlog();
355
356 uint errcnt = 0;
357 uint cnt = 0;
358 ssize_t read_len = 0;
359 size_t total = 0;
360 size_t throttle = m_devReadSize * m_devBufferCount / 2;
361
362 m_lock.lock();
363 m_runWait.wakeAll();
364 m_lock.unlock();
365
366 if (m_usingPoll)
368
369 while (m_doRun)
370 {
371 if (!HandlePausing())
372 continue;
373
374 if (!IsOpen())
375 {
376 usleep(5ms);
377 continue;
378 }
379
380 if (m_usingPoll && !Poll())
381 continue;
382
383 {
384 QMutexLocker locker(&m_lock);
385 if (m_error)
386 {
387 LOG(VB_RECORD, LOG_ERR, LOC + "fill_ringbuffer: error state");
388 break;
389 }
390 }
391
392 /* Some device drivers segment their buffer into small pieces,
393 * So allow for the reading of multiple buffers */
394 for (cnt = 0, read_len = 0, total = 0;
395 m_doRun && read_len >= 0 && cnt < m_devBufferCount; ++cnt)
396 {
397 // Limit read size for faster return from read
398 auto unused = static_cast<size_t>(WaitForUnused(m_readQuanta));
399 size_t read_size = std::min(m_devReadSize, unused);
400
401 // if read_size > 0 do the read...
402 if (read_size)
403 {
404 read_len = read(m_streamFd, m_writePtr, read_size);
405 if (!CheckForErrors(read_len, read_size, errcnt))
406 break;
407 errcnt = 0;
408
409 // if we wrote past the official end of the buffer,
410 // copy to start
411 if (m_writePtr + read_len > m_endPtr)
412 memcpy(m_buffer, m_endPtr, m_writePtr + read_len - m_endPtr);
413 IncrWritePointer(read_len);
414 total += read_len;
415 }
416 }
417 if (errcnt > 5)
418 break;
419
420 // Slow down reading if not under load
421 if (errcnt == 0 && total < throttle)
422 usleep(1ms);
423 }
424
425 ClosePipes();
426
427 m_lock.lock();
428 m_eof = true;
429 m_runWait.wakeAll();
430 m_dataWait.wakeAll();
431 m_pauseWait.wakeAll();
432 m_unpauseWait.wakeAll();
433 m_lock.unlock();
434
435 RunEpilog();
436}
437
439{
440 if (IsPauseRequested())
441 {
442 SetPaused(true);
443
444 if (m_readerCB)
446
447 usleep(5ms);
448 return false;
449 }
450 if (IsPaused())
451 {
453 SetPaused(false);
454 }
455 return true;
456}
457
459{
460#ifdef Q_OS_WINDOWS
461# ifdef _MSC_VER
462# pragma message( "mingw DeviceReadBuffer::Poll" )
463# else
464# warning mingw DeviceReadBuffer::Poll
465# endif
466 LOG(VB_GENERAL, LOG_ERR, LOC +
467 "mingw DeviceReadBuffer::Poll is not implemented");
468 return false;
469#else
470 bool retval = true;
471 MythTimer timer;
472 timer.start();
473
474 int poll_cnt = 1;
475 std::array<struct pollfd,2> polls {};
476
477 polls[0].fd = m_streamFd;
478 polls[0].events = POLLIN | POLLPRI;
479 polls[0].revents = 0;
480
481 if (m_wakePipe[0] >= 0)
482 {
483 poll_cnt = 2;
484 polls[1].fd = m_wakePipe[0];
485 polls[1].events = POLLIN;
486 polls[1].revents = 0;
487 }
488
489 while (true)
490 {
491 polls[0].revents = 0;
492 polls[1].revents = 0;
493 poll_cnt = (m_wakePipe[0] >= 0) ? poll_cnt : 1;
494
495 std::chrono::milliseconds timeout = m_maxPollWait;
496 if (1 == poll_cnt)
497 timeout = 10ms;
498 else if (m_pollTimeoutIsError)
499 // subtract a bit to allow processing time.
500 timeout = std::max(m_maxPollWait - timer.elapsed() - 15ms, 10ms);
501
502 int ret = poll(polls.data(), poll_cnt, timeout.count());
503
504 if (polls[0].revents & POLLHUP)
505 {
506 LOG(VB_GENERAL, LOG_ERR, LOC + "poll eof (POLLHUP)");
507 break;
508 }
509 if (polls[0].revents & POLLNVAL)
510 {
511 LOG(VB_GENERAL, LOG_ERR, LOC + "poll error" + ENO);
512 m_error = true;
513 return true;
514 }
515
516 if (!m_doRun || !IsOpen() || IsPauseRequested())
517 {
518 retval = false;
519 break; // are we supposed to pause, stop, etc.
520 }
521
522 if (polls[0].revents & POLLPRI)
523 {
524 m_readerCB->PriorityEvent(polls[0].fd);
525 }
526
527 if (polls[0].revents & POLLIN)
528 {
529 if (ret > 0)
530 break; // we have data to read :)
531 if (ret < 0)
532 {
533 if ((EOVERFLOW == errno))
534 break; // we have an error to handle
535
536 if ((EAGAIN == errno) || (EINTR == errno))
537 continue; // errors that tell you to try again
538
539 usleep(2.5ms);
540 }
541 else // ret == 0
542 {
544 (timer.elapsed() >= m_maxPollWait))
545 {
546 LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 1");
547 QMutexLocker locker(&m_lock);
548 m_error = true;
549 return true;
550 }
551 }
552 }
553
554 // Clear out any pending pipe reads
555 if ((poll_cnt > 1) && (polls[1].revents & POLLIN))
556 {
557 std::array<char,128> dummy {};
558 int cnt = (m_wakePipeFlags[0] & O_NONBLOCK) ? 128 : 1;
559 ::read(m_wakePipe[0], dummy.data(), cnt);
560 }
561
562 if (m_pollTimeoutIsError && (timer.elapsed() >= m_maxPollWait))
563 {
564 LOG(VB_GENERAL, LOG_ERR, LOC + QString("Poll giving up after %1ms")
565 .arg(m_maxPollWait.count()));
566 QMutexLocker locker(&m_lock);
567 m_error = true;
568 return true;
569 }
570 }
571
572 std::chrono::milliseconds e = timer.elapsed();
573 if (e > m_maxPollWait)
574 {
575 LOG(VB_GENERAL, LOG_WARNING, LOC +
576 QString("Poll took an unusually long time %1 ms")
577 .arg(timer.elapsed().count()));
578 }
579
580 return retval;
581#endif
582}
583
585 ssize_t len, size_t requested_len, uint &errcnt)
586{
587 if (len > (ssize_t)requested_len)
588 {
589 LOG(VB_GENERAL, LOG_ERR, LOC +
590 "Driver is returning bogus values on read");
591 if (++errcnt > 5)
592 {
593 LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
594 QMutexLocker locker(&m_lock);
595 m_error = true;
596 }
597 return false;
598 }
599
600#ifdef Q_OS_WINDOWS
601# ifdef _MSC_VER
602# pragma message( "mingw DeviceReadBuffer::CheckForErrors" )
603# else
604# warning mingw DeviceReadBuffer::CheckForErrors
605# endif
606 LOG(VB_GENERAL, LOG_ERR, LOC +
607 "mingw DeviceReadBuffer::CheckForErrors is not implemented");
608 return false;
609#else
610 if (len < 0)
611 {
612 if (EINTR == errno)
613 return false;
614 if (EAGAIN == errno)
615 {
616 usleep(2.5ms);
617 return false;
618 }
619 if (EOVERFLOW == errno)
620 {
621 LOG(VB_GENERAL, LOG_ERR, LOC + "Driver buffers overflowed");
622 return false;
623 }
624
625 LOG(VB_GENERAL, LOG_ERR, LOC +
626 QString("Problem reading fd(%1)").arg(m_streamFd) + ENO);
627
628 if (++errcnt > 5)
629 {
630 LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
631 QMutexLocker locker(&m_lock);
632 m_error = true;
633 return false;
634 }
635
636 usleep(500ms);
637 return false;
638 }
639 if (len == 0)
640 {
641 if (++errcnt > 5)
642 {
643 LOG(VB_GENERAL, LOG_ERR, LOC +
644 QString("End-Of-File? fd(%1)").arg(m_streamFd));
645
646 m_lock.lock();
647 m_eof = true;
648 m_lock.unlock();
649
650 return false;
651 }
652 usleep(500ms);
653 return false;
654 }
655 return true;
656#endif
657}
658
665uint DeviceReadBuffer::Read(unsigned char *buf, const uint count)
666{
667 uint avail = WaitForUsed(std::min(count, (uint)m_readThreshold), 20ms);
668 size_t cnt = std::min(count, avail);
669
670 if (!cnt)
671 return 0;
672
673 if (m_readPtr + cnt > m_endPtr)
674 {
675 // Process as two pieces
676 size_t len = m_endPtr - m_readPtr;
677 if (len)
678 {
679 memcpy(buf, m_readPtr, len);
680 buf += len;
681 IncrReadPointer(len);
682 }
683 if (cnt > len)
684 {
685 len = cnt - len;
686 memcpy(buf, m_readPtr, len);
687 IncrReadPointer(len);
688 }
689 }
690 else
691 {
692 memcpy(buf, m_readPtr, cnt);
693 IncrReadPointer(cnt);
694 }
695
696#if REPORT_RING_STATS
697 ReportStats();
698#endif
699
700 return cnt;
701}
702
708{
709 size_t unused = GetUnused();
710
711 if (unused > m_readQuanta)
712 {
713 while (unused < needed)
714 {
715 unused = GetUnused();
716 if (IsPauseRequested() || !IsOpen() || !m_doRun)
717 return 0;
718 usleep(5ms);
719 }
720 if (IsPauseRequested() || !IsOpen() || !m_doRun)
721 return 0;
722 unused = GetUnused();
723 }
724
725 return unused;
726}
727
733uint DeviceReadBuffer::WaitForUsed(uint needed, std::chrono::milliseconds max_wait) const
734{
735 MythTimer timer;
736 timer.start();
737
738 QMutexLocker locker(&m_lock);
739 size_t avail = m_used;
740 while ((needed > avail) && isRunning() &&
741 !m_requestPause && !m_error && !m_eof &&
742 (timer.elapsed() < max_wait))
743 {
744 m_dataWait.wait(locker.mutex(), 10);
745 avail = m_used;
746 }
747 return avail;
748}
749
751{
752#if REPORT_RING_STATS
753 static constexpr std::chrono::seconds secs { 20s }; // msg every 20 seconds
754 static constexpr double d1_s = 1.0 / secs.count();
755 if (m_lastReport.elapsed() > duration_cast<std::chrono::milliseconds>(secs))
756 {
757 QMutexLocker locker(&m_lock);
758 double rsize = 100.0 / m_size;
759 QString msg = QString("fill avg(%1%) ").arg(m_avgUsed*rsize,5,'f',2);
760 msg += QString("fill max(%1%) ").arg(m_maxUsed*rsize,5,'f',2);
761 msg += QString("writes/sec(%1) ").arg(m_avgBufWriteCnt*d1_s);
762 msg += QString("reads/sec(%1) ").arg(m_avgBufReadCnt*d1_s);
763 msg += QString("sleeps/sec(%1)").arg(m_avgBufSleepCnt*d1_s);
764
765 m_avgUsed = 0;
767 m_avgBufReadCnt = 0;
769 m_maxUsed = 0;
771
772 LOG(VB_GENERAL, LOG_INFO, LOC + msg);
773 }
774#endif
775}
776
777/*
778 * vim:ts=4:sw=4:ai:et:si:sts=4
779 */
#define LOC
void ClosePipes(void) const
bool IsRunning(void) const
uint GetUsed(void) const
bool WaitForUnpause(unsigned long timeout)
std::array< long, 2 > pipe_flag_array
unsigned char * m_writePtr
DeviceReadBuffer(DeviceReaderCB *cb, bool use_poll=true, bool error_exit_on_poll_timeout=true)
bool IsPauseRequested(void) const
volatile bool m_doRun
unsigned char * m_readPtr
QWaitCondition m_pauseWait
QWaitCondition m_runWait
unsigned char * m_endPtr
bool Setup(const QString &streamName, int streamfd, uint readQuanta=sizeof(TSPacket), uint deviceBufferSize=0, uint deviceBufferCount=1)
void SetRequestPause(bool request)
uint WaitForUnused(uint needed) const
bool IsOpen(void) const
std::chrono::milliseconds m_maxPollWait
uint Read(unsigned char *buf, uint count)
Try to read count bytes into buffer.
std::array< int, 2 > pipe_fd_array
~DeviceReadBuffer() override
static void setup_pipe(pipe_fd_array &mypipe, pipe_flag_array &myflags)
bool IsPaused(void) const
void IncrWritePointer(uint len)
pipe_flag_array m_wakePipeFlags
QWaitCondition m_dataWait
QWaitCondition m_unpauseWait
bool Poll(void) const
bool IsErrored(void) const
bool IsEOF(void) const
unsigned char * m_buffer
void WakePoll(void) const
DeviceReaderCB * m_readerCB
void Reset(const QString &streamName, int streamfd)
bool CheckForErrors(ssize_t read_len, size_t requested_len, uint &errcnt)
uint GetUnused(void) const
uint GetContiguousUnused(void) const
uint WaitForUsed(uint needed, std::chrono::milliseconds max_wait) const
void SetPaused(bool val)
pipe_fd_array m_wakePipe
bool WaitForPaused(unsigned long timeout)
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
void IncrReadPointer(uint len)
virtual void PriorityEvent(int fd)=0
virtual void ReaderPaused(int fd)=0
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:49
bool isRunning(void) const
Definition: mthread.cpp:263
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
static void usleep(std::chrono::microseconds time)
Definition: mthread.cpp:335
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:283
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
int GetNumSetting(const QString &key, int defaultval=0)
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:14
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
#define O_NONBLOCK
Definition: compat.h:215
unsigned int uint
Definition: compat.h:68
#define close
Definition: compat.h:31
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
def read(device=None, features=[])
Definition: disc.py:35
def write(text, progress=True)
Definition: mythburn.py:307