MythTV master
DeviceReadBuffer.cpp
Go to the documentation of this file.
1#include <algorithm>
2#include <cerrno>
3#include <thread>
4
5#include <fcntl.h>
6#include <sys/types.h>
7
8#include <QtGlobal>
9#if QT_VERSION >= QT_VERSION_CHECK(6,5,0)
10#include <QtSystemDetection>
11#endif
12#include <QString>
13
14#include "libmythbase/compat.h"
15#include "libmythbase/mthread.h"
18#ifndef __cpp_size_t_suffix
20#endif
21
22#include "DeviceReadBuffer.h"
23#include "mpeg/tspacket.h"
24
25#ifndef Q_OS_WINDOWS
26#include <sys/poll.h>
27#endif
28
29#ifdef Q_OS_WINDOWS
31#else
32void DeviceReadBuffer::setup_pipe(pipe_fd_array& mypipe, pipe_flag_array& myflags)
33{
34 int pipe_ret = pipe(mypipe.data());
35 if (pipe_ret < 0)
36 {
37 LOG(VB_GENERAL, LOG_ERR, "Failed to open pipes" + ENO);
38 mypipe.fill(-1);
39 }
40 else
41 {
42 errno = 0;
43 long flags = fcntl(mypipe[0], F_GETFL);
44 if (0 == errno)
45 {
46 int ret = fcntl(mypipe[0], F_SETFL, flags|O_NONBLOCK);
47 if (ret < 0)
48 LOG(VB_GENERAL, LOG_ERR,
49 QString("Set pipe flags error") + ENO);
50 }
51 else
52 {
53 LOG(VB_GENERAL, LOG_ERR, QString("Get pipe flags error") + ENO);
54 }
55
56 for (uint i = 0; i < 2; i++)
57 {
58 errno = 0;
59 flags = fcntl(mypipe[i], F_GETFL);
60 if (0 == errno)
61 myflags[i] = flags;
62 }
63 }
64}
65#endif
66
68#define REPORT_RING_STATS 0 // NOLINT(cppcoreguidelines-macro-usage)
69
70#define LOC QString("DevRdB(%1): ").arg(m_videoDevice)
71
73 DeviceReaderCB *cb, bool use_poll, bool error_exit_on_poll_timeout)
74 : MThread("DeviceReadBuffer"),
75 m_readerCB(cb),
76 m_usingPoll(use_poll),
77 m_pollTimeoutIsError(error_exit_on_poll_timeout)
78{
79#ifdef Q_OS_WINDOWS
80# warning mingw DeviceReadBuffer::Poll is not implemented
81 if (m_usingPoll)
82 {
83 LOG(VB_GENERAL, LOG_WARNING, LOC +
84 "mingw DeviceReadBuffer::Poll is not implemented");
85 m_usingPoll = false;
86 }
87#endif
88}
89
91{
92 Stop();
93 if (m_buffer)
94 {
95 delete[] m_buffer;
96 m_buffer = nullptr;
97 }
98}
99
100bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd,
101 uint readQuanta, uint deviceBufferSize,
102 uint deviceBufferCount)
103{
104 QMutexLocker locker(&m_lock);
105
106 delete[] m_buffer;
107
108 m_videoDevice = streamName;
109 m_videoDevice = m_videoDevice.isNull() ? "" : m_videoDevice;
110 m_streamFd = streamfd;
111
112 // Setup device ringbuffer
113 m_eof = false;
114 m_error = false;
115 m_requestPause = false;
116 m_paused = false;
117
118 m_readQuanta = (readQuanta) ? readQuanta : m_readQuanta;
119 m_devBufferCount = deviceBufferCount;
120#ifdef __cpp_size_t_suffix
122 "HDRingbufferSize", static_cast<int>(50 * m_readQuanta)) * 1024UZ;
123#else
125 "HDRingbufferSize", static_cast<int>(50 * m_readQuanta)) * 1024_UZ;
126#endif
127 m_used = 0;
128 m_devReadSize = m_readQuanta * (m_usingPoll ? 256 : 48);
129 m_devReadSize = (deviceBufferSize) ?
130 std::min(m_devReadSize, (size_t)deviceBufferSize) : m_devReadSize;
132
133 m_buffer = new (std::nothrow) unsigned char[m_size + m_devReadSize];
136
137 // Initialize buffer, if it exists
138 if (!m_buffer)
139 {
140 m_endPtr = nullptr;
141 LOG(VB_GENERAL, LOG_ERR, LOC +
142 QString("Failed to allocate buffer of size %1 = %2 + %3")
143 .arg(m_size+m_devReadSize).arg(m_size).arg(m_devReadSize));
144 return false;
145 }
147 memset(m_buffer, 0xFF, m_size + m_readQuanta);
148
149 // Initialize statistics
150 m_maxUsed = 0;
151 m_avgUsed = 0;
153 m_avgBufReadCnt = 0;
156
157 LOG(VB_RECORD, LOG_INFO, LOC + QString("buffer size %1 KB").arg(m_size/1024));
158
159 return true;
160}
161
163{
164 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- begin");
165
166 QMutexLocker locker(&m_lock);
167 if (isRunning() || m_doRun)
168 {
169 m_doRun = false;
170 locker.unlock();
171 WakePoll();
172 wait();
173 locker.relock();
174 }
175
176 m_doRun = true;
177 m_error = false;
178 m_eof = false;
179
180 start();
181
182 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- middle");
183
184 while (m_doRun && !isRunning())
185 m_runWait.wait(locker.mutex(), 100);
186
187 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- end");
188}
189
190void DeviceReadBuffer::Reset(const QString &streamName, int streamfd)
191{
192 QMutexLocker locker(&m_lock);
193
194 m_videoDevice = streamName;
195 m_videoDevice = m_videoDevice.isNull() ? "" : m_videoDevice;
196 m_streamFd = streamfd;
197
198 m_used = 0;
201
202 m_error = false;
203}
204
206{
207 LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- begin");
208 QMutexLocker locker(&m_lock);
209 if (isRunning() || m_doRun)
210 {
211 m_doRun = false;
212 locker.unlock();
213 WakePoll();
214 wait();
215 }
216 LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- end");
217}
218
220{
221 QMutexLocker locker(&m_lock);
222 m_requestPause = req;
223 WakePoll();
224}
225
227{
228 QMutexLocker locker(&m_lock);
229 m_paused = val;
230 if (val)
231 m_pauseWait.wakeAll();
232 else
233 m_unpauseWait.wakeAll();
234}
235
236// The WakePoll code is copied from MythSocketThread::WakeReadyReadThread()
238{
239 std::string buf(1,'\0');
240 ssize_t wret = 0;
241 while (isRunning() && (wret <= 0) && (m_wakePipe[1] >= 0))
242 {
243 wret = ::write(m_wakePipe[1], buf.data(), buf.size());
244 if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
245 {
246 LOG(VB_GENERAL, LOG_ERR, LOC + "WakePoll failed.");
247 ClosePipes();
248 break;
249 }
250 }
251}
252
254{
255 for (uint i = 0; i < 2; i++)
256 {
257 if (m_wakePipe[i] >= 0)
258 {
260 m_wakePipe[i] = -1;
261 m_wakePipeFlags[i] = 0;
262 }
263 }
264}
265
267{
268 QMutexLocker locker(&m_lock);
269 return m_paused;
270}
271
273{
274 QMutexLocker locker(&m_lock);
275
276 if (!m_paused)
277 m_pauseWait.wait(&m_lock, timeout);
278
279 return m_paused;
280}
281
283{
284 QMutexLocker locker(&m_lock);
285
286 if (m_paused)
288
289 return m_paused;
290}
291
293{
294 QMutexLocker locker(&m_lock);
295 return m_requestPause;
296}
297
299{
300 QMutexLocker locker(&m_lock);
301 return m_error;
302}
303
305{
306 QMutexLocker locker(&m_lock);
307 return m_eof;
308}
309
311{
312 QMutexLocker locker(&m_lock);
313 return isRunning();
314}
315
317{
318 QMutexLocker locker(&m_lock);
319 return m_size - m_used;
320}
321
323{
324 QMutexLocker locker(&m_lock);
325 return m_used;
326}
327
329{
330 QMutexLocker locker(&m_lock);
331 return m_endPtr - m_writePtr;
332}
333
335{
336 QMutexLocker locker(&m_lock);
337 m_used += len;
338 m_writePtr += len;
340#if REPORT_RING_STATS
341 m_maxUsed = std::max(m_used, m_maxUsed);
344#endif
345 m_dataWait.wakeAll();
346}
347
349{
350 QMutexLocker locker(&m_lock);
351 m_used -= len;
352 m_readPtr += len;
354#if REPORT_RING_STATS
356#endif
357}
358
360{
361 RunProlog();
362
363 uint errcnt = 0;
364 uint cnt = 0;
365 ssize_t read_len = 0;
366 size_t total = 0;
367 size_t throttle = m_devReadSize * m_devBufferCount / 2;
368
369 m_lock.lock();
370 m_runWait.wakeAll();
371 m_lock.unlock();
372
373 if (m_usingPoll)
375
376 while (m_doRun)
377 {
378 if (!HandlePausing())
379 continue;
380
381 if (!IsOpen())
382 {
383 std::this_thread::sleep_for(5ms);
384 continue;
385 }
386
387 if (m_usingPoll && !Poll())
388 continue;
389
390 {
391 QMutexLocker locker(&m_lock);
392 if (m_error)
393 {
394 LOG(VB_RECORD, LOG_ERR, LOC + "fill_ringbuffer: error state");
395 break;
396 }
397 }
398
399 /* Some device drivers segment their buffer into small pieces,
400 * So allow for the reading of multiple buffers */
401 for (cnt = 0, read_len = 0, total = 0;
402 m_doRun && read_len >= 0 && cnt < m_devBufferCount; ++cnt)
403 {
404 // Limit read size for faster return from read
405 auto unused = static_cast<size_t>(WaitForUnused(m_readQuanta));
406 size_t read_size = std::min(m_devReadSize, unused);
407
408 // if read_size > 0 do the read...
409 if (read_size)
410 {
411 read_len = read(m_streamFd, m_writePtr, read_size);
412 if (!CheckForErrors(read_len, read_size, errcnt))
413 break;
414 errcnt = 0;
415
416 // if we wrote past the official end of the buffer,
417 // copy to start
418 if (m_writePtr + read_len > m_endPtr)
419 memcpy(m_buffer, m_endPtr, m_writePtr + read_len - m_endPtr);
420 IncrWritePointer(read_len);
421 total += read_len;
422 }
423 }
424 if (errcnt > 5)
425 break;
426
427 // Slow down reading if not under load
428 if (errcnt == 0 && total < throttle)
429 std::this_thread::sleep_for(1ms);
430 }
431
432 ClosePipes();
433
434 m_lock.lock();
435 m_eof = true;
436 m_runWait.wakeAll();
437 m_dataWait.wakeAll();
438 m_pauseWait.wakeAll();
439 m_unpauseWait.wakeAll();
440 m_lock.unlock();
441
442 RunEpilog();
443}
444
446{
447 if (IsPauseRequested())
448 {
449 SetPaused(true);
450
451 if (m_readerCB)
453
454 std::this_thread::sleep_for(5ms);
455 return false;
456 }
457 if (IsPaused())
458 {
460 SetPaused(false);
461 }
462 return true;
463}
464
466{
467#ifdef Q_OS_WINDOWS
468# warning mingw DeviceReadBuffer::Poll
469 LOG(VB_GENERAL, LOG_ERR, LOC +
470 "mingw DeviceReadBuffer::Poll is not implemented");
471 return false;
472#else
473 bool retval = true;
474 MythTimer timer;
475 timer.start();
476
477 int poll_cnt = 1;
478 std::array<struct pollfd,2> polls {};
479
480 polls[0].fd = m_streamFd;
481 polls[0].events = POLLIN | POLLPRI;
482 polls[0].revents = 0;
483
484 if (m_wakePipe[0] >= 0)
485 {
486 poll_cnt = 2;
487 polls[1].fd = m_wakePipe[0];
488 polls[1].events = POLLIN;
489 polls[1].revents = 0;
490 }
491
492 while (true)
493 {
494 polls[0].revents = 0;
495 polls[1].revents = 0;
496 poll_cnt = (m_wakePipe[0] >= 0) ? poll_cnt : 1;
497
498 std::chrono::milliseconds timeout = m_maxPollWait;
499 if (1 == poll_cnt)
500 timeout = 10ms;
501 else if (m_pollTimeoutIsError)
502 // subtract a bit to allow processing time.
503 timeout = std::max(m_maxPollWait - timer.elapsed() - 15ms, 10ms);
504
505 int ret = poll(polls.data(), poll_cnt, timeout.count());
506
507 if (polls[0].revents & POLLHUP)
508 {
509 LOG(VB_GENERAL, LOG_ERR, LOC + "poll eof (POLLHUP)");
510 break;
511 }
512 if (polls[0].revents & POLLNVAL)
513 {
514 LOG(VB_GENERAL, LOG_ERR, LOC + "poll error" + ENO);
515 m_error = true;
516 return true;
517 }
518
519 if (!m_doRun || !IsOpen() || IsPauseRequested())
520 {
521 retval = false;
522 break; // are we supposed to pause, stop, etc.
523 }
524
525 if (polls[0].revents & POLLPRI)
526 {
527 m_readerCB->PriorityEvent(polls[0].fd);
528 }
529
530 if (polls[0].revents & POLLIN)
531 {
532 if (ret > 0)
533 break; // we have data to read :)
534 if (ret < 0)
535 {
536 if ((EOVERFLOW == errno))
537 break; // we have an error to handle
538
539 if ((EAGAIN == errno) || (EINTR == errno))
540 continue; // errors that tell you to try again
541
542 std::this_thread::sleep_for(2500us);
543 }
544 else // ret == 0
545 {
547 (timer.elapsed() >= m_maxPollWait))
548 {
549 LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 1");
550 QMutexLocker locker(&m_lock);
551 m_error = true;
552 return true;
553 }
554 }
555 }
556
557 // Clear out any pending pipe reads
558 if ((poll_cnt > 1) && (polls[1].revents & POLLIN))
559 {
560 std::array<char,128> dummy {};
561 int cnt = (m_wakePipeFlags[0] & O_NONBLOCK) ? 128 : 1;
562 ::read(m_wakePipe[0], dummy.data(), cnt);
563 }
564
565 if (m_pollTimeoutIsError && (timer.elapsed() >= m_maxPollWait))
566 {
567 LOG(VB_GENERAL, LOG_ERR, LOC + QString("Poll giving up after %1ms")
568 .arg(m_maxPollWait.count()));
569 QMutexLocker locker(&m_lock);
570 m_error = true;
571 return true;
572 }
573 }
574
575 std::chrono::milliseconds e = timer.elapsed();
576 if (e > m_maxPollWait)
577 {
578 LOG(VB_GENERAL, LOG_WARNING, LOC +
579 QString("Poll took an unusually long time %1 ms")
580 .arg(timer.elapsed().count()));
581 }
582
583 return retval;
584#endif
585}
586
588 ssize_t len, size_t requested_len, uint &errcnt)
589{
590 if (len > (ssize_t)requested_len)
591 {
592 LOG(VB_GENERAL, LOG_ERR, LOC +
593 "Driver is returning bogus values on read");
594 if (++errcnt > 5)
595 {
596 LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
597 QMutexLocker locker(&m_lock);
598 m_error = true;
599 }
600 return false;
601 }
602
603#ifdef Q_OS_WINDOWS
604# warning mingw DeviceReadBuffer::CheckForErrors
605 LOG(VB_GENERAL, LOG_ERR, LOC +
606 "mingw DeviceReadBuffer::CheckForErrors is not implemented");
607 return false;
608#else
609 if (len < 0)
610 {
611 if (EINTR == errno)
612 return false;
613 if (EAGAIN == errno)
614 {
615 std::this_thread::sleep_for(2500us);
616 return false;
617 }
618 if (EOVERFLOW == errno)
619 {
620 LOG(VB_GENERAL, LOG_ERR, LOC + "Driver buffers overflowed");
621 return false;
622 }
623
624 LOG(VB_GENERAL, LOG_ERR, LOC +
625 QString("Problem reading fd(%1)").arg(m_streamFd) + ENO);
626
627 if (++errcnt > 5)
628 {
629 LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
630 QMutexLocker locker(&m_lock);
631 m_error = true;
632 return false;
633 }
634
635 std::this_thread::sleep_for(500ms);
636 return false;
637 }
638 if (len == 0)
639 {
640 if (++errcnt > 5)
641 {
642 LOG(VB_GENERAL, LOG_ERR, LOC +
643 QString("End-Of-File? fd(%1)").arg(m_streamFd));
644
645 m_lock.lock();
646 m_eof = true;
647 m_lock.unlock();
648
649 return false;
650 }
651 std::this_thread::sleep_for(500ms);
652 return false;
653 }
654 return true;
655#endif
656}
657
664uint DeviceReadBuffer::Read(unsigned char *buf, const uint count)
665{
666 uint avail = WaitForUsed(std::min(count, (uint)m_readThreshold), 20ms);
667 size_t cnt = std::min(count, avail);
668
669 if (!cnt)
670 return 0;
671
672 if (m_readPtr + cnt > m_endPtr)
673 {
674 // Process as two pieces
675 size_t len = m_endPtr - m_readPtr;
676 if (len)
677 {
678 memcpy(buf, m_readPtr, len);
679 buf += len;
680 IncrReadPointer(len);
681 }
682 if (cnt > len)
683 {
684 len = cnt - len;
685 memcpy(buf, m_readPtr, len);
686 IncrReadPointer(len);
687 }
688 }
689 else
690 {
691 memcpy(buf, m_readPtr, cnt);
692 IncrReadPointer(cnt);
693 }
694
695#if REPORT_RING_STATS
696 ReportStats();
697#endif
698
699 return cnt;
700}
701
707{
708 size_t unused = GetUnused();
709
710 if (unused > m_readQuanta)
711 {
712 while (unused < needed)
713 {
714 unused = GetUnused();
715 if (IsPauseRequested() || !IsOpen() || !m_doRun)
716 return 0;
717 std::this_thread::sleep_for(5ms);
718 }
719 if (IsPauseRequested() || !IsOpen() || !m_doRun)
720 return 0;
721 unused = GetUnused();
722 }
723
724 return unused;
725}
726
732uint DeviceReadBuffer::WaitForUsed(uint needed, std::chrono::milliseconds max_wait) const
733{
734 MythTimer timer;
735 timer.start();
736
737 QMutexLocker locker(&m_lock);
738 size_t avail = m_used;
739 while ((needed > avail) && isRunning() &&
740 !m_requestPause && !m_error && !m_eof &&
741 (timer.elapsed() < max_wait))
742 {
743 m_dataWait.wait(locker.mutex(), 10);
744 avail = m_used;
745 }
746 return avail;
747}
748
750{
751#if REPORT_RING_STATS
752 static constexpr std::chrono::seconds secs { 20s }; // msg every 20 seconds
753 static constexpr double d1_s = 1.0 / secs.count();
754 if (m_lastReport.elapsed() > duration_cast<std::chrono::milliseconds>(secs))
755 {
756 QMutexLocker locker(&m_lock);
757 double rsize = 100.0 / m_size;
758 QString msg = QString("fill avg(%1%) ").arg(m_avgUsed*rsize,5,'f',2);
759 msg += QString("fill max(%1%) ").arg(m_maxUsed*rsize,5,'f',2);
760 msg += QString("writes/sec(%1) ").arg(m_avgBufWriteCnt*d1_s);
761 msg += QString("reads/sec(%1) ").arg(m_avgBufReadCnt*d1_s);
762 msg += QString("sleeps/sec(%1)").arg(m_avgBufSleepCnt*d1_s);
763
764 m_avgUsed = 0;
766 m_avgBufReadCnt = 0;
768 m_maxUsed = 0;
770
771 LOG(VB_GENERAL, LOG_INFO, LOC + msg);
772 }
773#endif
774}
775
776/*
777 * vim:ts=4:sw=4:ai:et:si:sts=4
778 */
#define LOC
void ClosePipes(void) const
bool IsRunning(void) const
uint GetUsed(void) const
bool WaitForUnpause(unsigned long timeout)
std::array< long, 2 > pipe_flag_array
unsigned char * m_writePtr
DeviceReadBuffer(DeviceReaderCB *cb, bool use_poll=true, bool error_exit_on_poll_timeout=true)
bool IsPauseRequested(void) const
volatile bool m_doRun
unsigned char * m_readPtr
QWaitCondition m_pauseWait
QWaitCondition m_runWait
unsigned char * m_endPtr
bool Setup(const QString &streamName, int streamfd, uint readQuanta=sizeof(TSPacket), uint deviceBufferSize=0, uint deviceBufferCount=1)
void SetRequestPause(bool request)
uint WaitForUnused(uint needed) const
bool IsOpen(void) const
std::chrono::milliseconds m_maxPollWait
uint Read(unsigned char *buf, uint count)
Try to Read count bytes from into buffer.
std::array< int, 2 > pipe_fd_array
~DeviceReadBuffer() override
static void setup_pipe(pipe_fd_array &mypipe, pipe_flag_array &myflags)
bool IsPaused(void) const
void IncrWritePointer(uint len)
pipe_flag_array m_wakePipeFlags
QWaitCondition m_dataWait
QWaitCondition m_unpauseWait
bool Poll(void) const
bool IsErrored(void) const
bool IsEOF(void) const
unsigned char * m_buffer
void WakePoll(void) const
DeviceReaderCB * m_readerCB
void Reset(const QString &streamName, int streamfd)
bool CheckForErrors(ssize_t read_len, size_t requested_len, uint &errcnt)
uint GetUnused(void) const
uint GetContiguousUnused(void) const
uint WaitForUsed(uint needed, std::chrono::milliseconds max_wait) const
void SetPaused(bool val)
pipe_fd_array m_wakePipe
bool WaitForPaused(unsigned long timeout)
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
void IncrReadPointer(uint len)
virtual void PriorityEvent(int fd)=0
virtual void ReaderPaused(int fd)=0
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:49
bool isRunning(void) const
Definition: mthread.cpp:261
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:194
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:281
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:207
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:298
int GetNumSetting(const QString &key, int defaultval=0)
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:14
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
#define O_NONBLOCK
Definition: compat.h:142
unsigned int uint
Definition: compat.h:60
#define close
Definition: compat.h:28
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
def read(device=None, features=[])
Definition: disc.py:35
def write(text, progress=True)
Definition: mythburn.py:306