MythTV master
ExternalStreamHandler.cpp
Go to the documentation of this file.
1// -*- Mode: c++ -*-
2
3// POSIX headers
4#include <thread>
5#include <iostream>
6#include <fcntl.h>
7#include <unistd.h>
8#include <algorithm>
9#ifndef _WIN32
10#include <poll.h>
11#include <sys/ioctl.h>
12#endif
13
14#include <QtGlobal>
15#ifdef Q_OS_ANDROID
16#include <sys/wait.h>
17#endif
18
19// Qt headers
20#include <QString>
21#include <QFile>
22#include <QJsonDocument>
23#include <QJsonObject>
24
25// MythTV headers
26#include "config.h"
30
31#include "ExternalChannel.h"
33//#include "ThreadedFileWriter.h"
34#include "cardutil.h"
35#include "dtvsignalmonitor.h"
36#include "mpeg/mpegstreamdata.h"
38
39#define LOC QString("ExternSH[%1](%2): ").arg(m_inputId).arg(m_loc)
40
41ExternIO::ExternIO(const QString & app,
42 const QStringList & args)
43 : m_app(QFileInfo(app)),
44 m_status(&m_statusBuf, QIODevice::ReadWrite)
45{
46 if (!m_app.exists())
47 {
48 m_error = QString("ExternIO: '%1' does not exist.").arg(app);
49 return;
50 }
51 if (!m_app.isReadable() || !m_app.isFile())
52 {
53 m_error = QString("ExternIO: '%1' is not readable.")
54 .arg(m_app.canonicalFilePath());
55 return;
56 }
57 if (!m_app.isExecutable())
58 {
59 m_error = QString("ExternIO: '%1' is not executable.")
60 .arg(m_app.canonicalFilePath());
61 return;
62 }
63
64 m_args = args;
65 m_args.prepend(m_app.baseName());
66
67 m_status.setString(&m_statusBuf);
68}
69
71{
75
76 // waitpid(m_pid, &status, 0);
77 delete[] m_buffer;
78}
79
80bool ExternIO::Ready([[maybe_unused]] int fd,
81 [[maybe_unused]] std::chrono::milliseconds timeout,
82 [[maybe_unused]] const QString & what)
83{
84#ifndef _WIN32
85 std::array<struct pollfd,2> m_poll {};
86
87 m_poll[0].fd = fd;
88 m_poll[0].events = POLLIN | POLLPRI;
89 int ret = poll(m_poll.data(), 1, timeout.count());
90
91 if (m_poll[0].revents & POLLHUP)
92 {
93 m_error = what + " poll eof (POLLHUP)";
94 return false;
95 }
96 if (m_poll[0].revents & POLLNVAL)
97 {
98 LOG(VB_GENERAL, LOG_ERR, "poll error");
99 return false;
100 }
101 if (m_poll[0].revents & POLLIN)
102 {
103 if (ret > 0)
104 return true;
105
106 if ((EOVERFLOW == errno))
107 m_error = "poll overflow";
108 return false;
109 }
110#endif // !defined( _WIN32 )
111 return false;
112}
113
114int ExternIO::Read(QByteArray & buffer, int maxlen, std::chrono::milliseconds timeout)
115{
116 if (Error())
117 {
118 LOG(VB_RECORD, LOG_ERR,
119 QString("ExternIO::Read: already in error state: '%1'")
120 .arg(m_error));
121 return 0;
122 }
123
124 if (!Ready(m_appOut, timeout, "data"))
125 return 0;
126
127 if (m_bufSize < maxlen)
128 {
129 m_bufSize = maxlen;
130 delete [] m_buffer;
131 m_buffer = new char[m_bufSize];
132 }
133
134 int len = read(m_appOut, m_buffer, maxlen);
135
136 if (len < 0)
137 {
138 if (errno == EAGAIN)
139 {
140 if (++m_errCnt > kMaxErrorCnt)
141 {
142 m_error = "Failed to read from External Recorder: " + ENO;
143 LOG(VB_RECORD, LOG_WARNING,
144 "External Recorder not ready. Giving up.");
145 }
146 else
147 {
148 LOG(VB_RECORD, LOG_WARNING,
149 QString("External Recorder not ready. Will retry (%1/%2).")
150 .arg(m_errCnt).arg(kMaxErrorCnt));
151 std::this_thread::sleep_for(100ms);
152 }
153 }
154 else
155 {
156 m_error = "Failed to read from External Recorder: " + ENO;
157 LOG(VB_RECORD, LOG_ERR, m_error);
158 }
159 }
160 else
161 {
162 m_errCnt = 0;
163 }
164
165 if (len == 0)
166 return 0;
167
168 buffer.append(m_buffer, len);
169
170 LOG(VB_RECORD, LOG_DEBUG,
171 QString("ExternIO::Read '%1' bytes, buffer size %2")
172 .arg(len).arg(buffer.size()));
173
174 return len;
175}
176
177QByteArray ExternIO::GetStatus(std::chrono::milliseconds timeout)
178{
179 if (Error())
180 {
181 LOG(VB_RECORD, LOG_ERR,
182 QString("ExternIO::GetStatus: already in error state: '%1'")
183 .arg(m_error));
184 return {};
185 }
186
187 std::chrono::milliseconds waitfor = m_status.atEnd() ? timeout : 0ms;
188 if (Ready(m_appErr, waitfor, "status"))
189 {
190 std::array<char,2048> buffer {};
191 int len = read(m_appErr, buffer.data(), buffer.size());
192 m_status << QString::fromLatin1(buffer.data(), len);
193 }
194
195 if (m_status.atEnd())
196 return {};
197
198 QString msg = m_status.readLine();
199
200 LOG(VB_RECORD, LOG_DEBUG, QString("ExternIO::GetStatus '%1'")
201 .arg(msg));
202
203 return msg.toUtf8();
204}
205
206int ExternIO::Write(const QByteArray & buffer)
207{
208 if (Error())
209 {
210 LOG(VB_RECORD, LOG_ERR,
211 QString("ExternIO::Write: already in error state: '%1'")
212 .arg(m_error));
213 return -1;
214 }
215
216 LOG(VB_RECORD, LOG_DEBUG, QString("ExternIO::Write('%1')")
217 .arg(QString(buffer).simplified()));
218
219 int len = write(m_appIn, buffer.constData(), buffer.size());
220 if (len != buffer.size())
221 {
222 if (len > 0)
223 {
224 LOG(VB_RECORD, LOG_WARNING,
225 QString("ExternIO::Write: only wrote %1 of %2 bytes '%3'")
226 .arg(len).arg(buffer.size()).arg(QString(buffer)));
227 }
228 else
229 {
230 m_error = QString("ExternIO: Failed to write '%1' to app's stdin: ")
231 .arg(QString(buffer)) + ENO;
232 return -1;
233 }
234 }
235
236 return len;
237}
238
240{
241 LOG(VB_RECORD, LOG_INFO, QString("ExternIO::Run()"));
242
243 Fork();
244 GetStatus(10ms);
245
246 return true;
247}
248
249/* Return true if the process is not, or is no longer running */
250bool ExternIO::KillIfRunning([[maybe_unused]] const QString & cmd)
251{
252#if defined(Q_OS_DARWIN) || defined(__FreeBSD__) || defined(__OpenBSD__)
253 return false;
254#elif defined( _WIN32 )
255 return false;
256#else
257 QString grp = QString("pgrep -x -f -- \"%1\" 2>&1 > /dev/null").arg(cmd);
258 QString kil = QString("pkill --signal 15 -x -f -- \"%1\" 2>&1 > /dev/null")
259 .arg(cmd);
260
261 int res_grp = system(grp.toUtf8().constData());
262 if (WEXITSTATUS(res_grp) == 1)
263 {
264 LOG(VB_RECORD, LOG_DEBUG, QString("'%1' not running.").arg(cmd));
265 return true;
266 }
267
268 LOG(VB_RECORD, LOG_WARNING, QString("'%1' already running, killing...")
269 .arg(cmd));
270 int res_kil = system(kil.toUtf8().constData());
271 if (WEXITSTATUS(res_kil) == 1)
272 LOG(VB_GENERAL, LOG_WARNING, QString("'%1' failed: %2")
273 .arg(kil, ENO));
274
275 res_grp = system(grp.toUtf8().constData());
276 if (WEXITSTATUS(res_grp) == 1)
277 {
278 LOG(WEXITSTATUS(res_kil) == 0 ? VB_RECORD : VB_GENERAL, LOG_WARNING,
279 QString("'%1' terminated.").arg(cmd));
280 return true;
281 }
282
283 std::this_thread::sleep_for(50ms);
284
285 kil = QString("pkill --signal 9 -x -f \"%1\" 2>&1 > /dev/null").arg(cmd);
286 res_kil = system(kil.toUtf8().constData());
287 if (WEXITSTATUS(res_kil) > 0)
288 LOG(VB_GENERAL, LOG_WARNING, QString("'%1' failed: %2")
289 .arg(kil, ENO));
290
291 res_grp = system(grp.toUtf8().constData());
292 LOG(WEXITSTATUS(res_kil) == 0 ? VB_RECORD : VB_GENERAL, LOG_WARNING,
293 QString("'%1' %2.")
294 .arg(cmd, WEXITSTATUS(res_grp) == 0 ? "sill running" : "terminated"));
295
296 return (WEXITSTATUS(res_grp) != 0);
297#endif
298}
299
301{
302#ifndef _WIN32
303 if (Error())
304 {
305 LOG(VB_RECORD, LOG_INFO, QString("ExternIO in bad state: '%1'")
306 .arg(m_error));
307 return;
308 }
309
310 QString full_command = QString("%1").arg(m_args.join(" "));
311
312 if (!KillIfRunning(full_command))
313 {
314 // Give it one more chance.
315 std::this_thread::sleep_for(50ms);
316 if (!KillIfRunning(full_command))
317 {
318 m_error = QString("Unable to kill existing '%1'.")
319 .arg(full_command);
320 LOG(VB_GENERAL, LOG_ERR, m_error);
321 return;
322 }
323 }
324
325
326 LOG(VB_RECORD, LOG_INFO, QString("ExternIO::Fork '%1'").arg(full_command));
327
328 std::array<int,2> in = {-1, -1};
329 std::array<int,2> out = {-1, -1};
330 std::array<int,2> err = {-1, -1};
331
332 if (pipe(in.data()) < 0)
333 {
334 m_error = "pipe(in) failed: " + ENO;
335 return;
336 }
337 if (pipe(out.data()) < 0)
338 {
339 m_error = "pipe(out) failed: " + ENO;
340 close(in[0]);
341 close(in[1]);
342 return;
343 }
344 if (pipe(err.data()) < 0)
345 {
346 m_error = "pipe(err) failed: " + ENO;
347 close(in[0]);
348 close(in[1]);
349 close(out[0]);
350 close(out[1]);
351 return;
352 }
353
354 m_pid = fork();
355 if (m_pid < 0)
356 {
357 // Failed
358 m_error = "fork() failed: " + ENO;
359 return;
360 }
361 if (m_pid > 0)
362 {
363 // Parent
364 close(in[0]);
365 close(out[1]);
366 close(err[1]);
367 m_appIn = in[1];
368 m_appOut = out[0];
369 m_appErr = err[0];
370
371 bool error = false;
372 error = (fcntl(m_appIn, F_SETFL, O_NONBLOCK) == -1);
373 error |= (fcntl(m_appOut, F_SETFL, O_NONBLOCK) == -1);
374 error |= (fcntl(m_appErr, F_SETFL, O_NONBLOCK) == -1);
375
376 if (error)
377 {
378 LOG(VB_GENERAL, LOG_WARNING,
379 "ExternIO::Fork(): Failed to set O_NONBLOCK for FD: " + ENO);
380 std::this_thread::sleep_for(2s);
382 }
383
384 LOG(VB_RECORD, LOG_INFO, "Spawned");
385 return;
386 }
387
388 // Child
389 close(in[1]);
390 close(out[0]);
391 close(err[0]);
392 if (dup2( in[0], 0) < 0)
393 {
394 std::cerr << "dup2(stdin) failed: " << strerror(errno);
396 }
397 else if (dup2(out[1], 1) < 0)
398 {
399 std::cerr << "dup2(stdout) failed: " << strerror(errno);
401 }
402 else if (dup2(err[1], 2) < 0)
403 {
404 std::cerr << "dup2(stderr) failed: " << strerror(errno);
406 }
407
408 /* Close all open file descriptors except stdin/stdout/stderr */
409#if HAVE_CLOSE_RANGE
410 close_range(3, sysconf(_SC_OPEN_MAX) - 1, 0);
411#else
412 for (int i = sysconf(_SC_OPEN_MAX) - 1; i > 2; --i)
413 close(i);
414#endif
415
416 /* Set the process group id to be the same as the pid of this
417 * child process. This ensures that any subprocesses launched by this
418 * process can be killed along with the process itself. */
419 if (setpgid(0,0) < 0)
420 {
421 std::cerr << "ExternIO: "
422 << "setpgid() failed: "
423 << strerror(errno) << std::endl;
424 }
425
426 /* run command */
427 char *command = strdup(m_app.canonicalFilePath()
428 .toUtf8().constData());
429 // Copy QStringList to char**
430 char **arguments = new char*[m_args.size() + 1];
431 for (int i = 0; i < m_args.size(); ++i)
432 {
433 int len = m_args[i].size() + 1;
434 arguments[i] = new char[len];
435 memcpy(arguments[i], m_args[i].toStdString().c_str(), len);
436 }
437 arguments[m_args.size()] = nullptr;
438
439 if (execv(command, arguments) < 0)
440 {
441 // Can't use LOG due to locking fun.
442 std::cerr << "ExternIO: "
443 << "execv() failed: "
444 << strerror(errno) << std::endl;
445 }
446 else
447 {
448 std::cerr << "ExternIO: "
449 << "execv() should not be here?: "
450 << strerror(errno) << std::endl;
451 }
452
453#endif // !defined( _WIN32 )
454
455 /* Failed to exec */
456 _exit(GENERIC_EXIT_DAEMONIZING_ERROR); // this exit is ok
457}
458
459
460QMap<int, ExternalStreamHandler*> ExternalStreamHandler::s_handlers;
463
465 int inputid, int majorid)
466{
467 QMutexLocker locker(&s_handlersLock);
468
469 QMap<int, ExternalStreamHandler*>::iterator it = s_handlers.find(majorid);
470
471 if (it == s_handlers.end())
472 {
473 auto *newhandler = new ExternalStreamHandler(devname, inputid, majorid);
474 s_handlers[majorid] = newhandler;
475 s_handlersRefCnt[majorid] = 1;
476
477 LOG(VB_RECORD, LOG_INFO,
478 QString("ExternSH[%1:%2]: Creating new stream handler for %3 "
479 "(1 in use)")
480 .arg(inputid).arg(majorid).arg(devname));
481 }
482 else
483 {
484 ++s_handlersRefCnt[majorid];
485 uint rcount = s_handlersRefCnt[majorid];
486 LOG(VB_RECORD, LOG_INFO,
487 QString("ExternSH[%1:%2]: Using existing stream handler for %3")
488 .arg(inputid).arg(majorid).arg(devname) +
489 QString(" (%1 in use)").arg(rcount));
490 }
491
492 return s_handlers[majorid];
493}
494
496 int inputid)
497{
498 QMutexLocker locker(&s_handlersLock);
499
500 int majorid = ref->m_majorId;
501
502 QMap<int, uint>::iterator rit = s_handlersRefCnt.find(majorid);
503 if (rit == s_handlersRefCnt.end())
504 return;
505
506 QMap<int, ExternalStreamHandler*>::iterator it =
507 s_handlers.find(majorid);
508
509 if (*rit > 1)
510 {
511 ref = nullptr;
512 --(*rit);
513
514 LOG(VB_RECORD, LOG_INFO,
515 QString("ExternSH[%1:%2]: Return handler (%3 still in use)")
516 .arg(inputid).arg(majorid).arg(*rit));
517
518 return;
519 }
520
521 if ((it != s_handlers.end()) && (*it == ref))
522 {
523 LOG(VB_RECORD, LOG_INFO,
524 QString("ExternSH[%1:%2]: Closing handler (0 in use)")
525 .arg(inputid).arg(majorid));
526 delete *it;
527 s_handlers.erase(it);
528 }
529 else
530 {
531 LOG(VB_GENERAL, LOG_ERR,
532 QString("ExternSH[%1:%2]: Error: No handler to return!")
533 .arg(inputid).arg(majorid));
534 }
535
536 s_handlersRefCnt.erase(rit);
537 ref = nullptr;
538}
539
540/*
541 ExternalStreamHandler
542 */
543
545 int inputid,
546 int majorid)
547 : StreamHandler(path, inputid)
548 , m_loc(m_device)
549 , m_majorId(majorid)
550{
551 setObjectName("ExternSH");
552
553 m_args = path.split(' ',Qt::SkipEmptyParts) +
554 logPropagateArgs.split(' ', Qt::SkipEmptyParts);
555 //NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
556 m_app = m_args.first();
557 m_args.removeFirst();
558
559 // Pass one (and only one) 'quiet'
560 if (!m_args.contains("--quiet") && !m_args.contains("-q"))
561 m_args << "--quiet";
562
563 m_args << "--inputid" << QString::number(majorid);
564 LOG(VB_RECORD, LOG_INFO, LOC + QString("args \"%1\"")
565 .arg(m_args.join(" ")));
566
567 if (!OpenApp())
568 {
569 LOG(VB_GENERAL, LOG_ERR, LOC +
570 QString("Failed to start %1").arg(m_device));
571 }
572}
573
575{
576 return m_streamingCnt.loadAcquire();
577}
578
580{
581 QString result;
582 QString ready_cmd;
583 QByteArray buffer;
584 int sz = 0;
585 uint len = 0;
586 uint read_len = 0;
587 uint restart_cnt = 0;
588 MythTimer status_timer;
589 MythTimer nodata_timer;
590
591 bool good_data = false;
592 uint data_proc_err = 0;
593 uint data_short_err = 0;
594
595 if (!m_io)
596 {
597 LOG(VB_GENERAL, LOG_ERR, LOC +
598 QString("%1 is not running.").arg(m_device));
599 }
600
601 status_timer.start();
602
603 RunProlog();
604
605 LOG(VB_RECORD, LOG_INFO, LOC + "run(): begin");
606
607 SetRunning(true, true, false);
608
609 if (m_pollMode)
610 ready_cmd = "SendBytes";
611 else
612 ready_cmd = "XON";
613
614 uint remainder = 0;
615 while (m_runningDesired && !m_bError)
616 {
617 if (!IsTSOpen())
618 {
619 LOG(VB_RECORD, LOG_WARNING, LOC + "TS not open yet.");
620 std::this_thread::sleep_for(10ms);
621 continue;
622 }
623
624 if (StreamingCount() == 0)
625 {
626 std::this_thread::sleep_for(10ms);
627 continue;
628 }
629
631
632 if (!m_xon || m_pollMode)
633 {
634 if (buffer.size() > TOO_FAST_SIZE)
635 {
636 LOG(VB_RECORD, LOG_WARNING, LOC +
637 "Internal buffer too full to accept more data from "
638 "external application.");
639 }
640 else
641 {
642 if (!ProcessCommand(ready_cmd, result))
643 {
644 if (result.startsWith("ERR"))
645 {
646 LOG(VB_GENERAL, LOG_ERR, LOC +
647 QString("Aborting: %1 -> %2")
648 .arg(ready_cmd, result));
649 m_bError = true;
650 continue;
651 }
652
653 if (restart_cnt++)
654 std::this_thread::sleep_for(20s);
655 if (!RestartStream())
656 {
657 LOG(VB_RECORD, LOG_ERR, LOC +
658 "Failed to restart stream.");
659 m_bError = true;
660 }
661 continue;
662 }
663 m_xon = true;
664 }
665 }
666
667 if (m_xon)
668 {
669 if (status_timer.elapsed() >= 2s)
670 {
671 // Since we may never need to send the XOFF
672 // command, occationally check to see if the
673 // External recorder needs to report an issue.
674 if (CheckForError())
675 {
676 if (restart_cnt++)
677 std::this_thread::sleep_for(20s);
678 if (!RestartStream())
679 {
680 LOG(VB_RECORD, LOG_ERR, LOC +
681 "Failed to restart stream.");
682 m_bError = true;
683 }
684 continue;
685 }
686
687 status_timer.restart();
688 }
689
690 if (buffer.size() > TOO_FAST_SIZE)
691 {
692 if (!m_pollMode)
693 {
694 // Data is comming a little too fast, so XOFF
695 // to give us time to process it.
696 if (!ProcessCommand(QString("XOFF"), result))
697 {
698 if (result.startsWith("ERR"))
699 {
700 LOG(VB_GENERAL, LOG_ERR, LOC +
701 QString("Aborting: XOFF -> %2")
702 .arg(result));
703 m_bError = true;
704 }
705 }
706 m_xon = false;
707 }
708 }
709
710 read_len = 0;
711 if (m_io != nullptr)
712 {
713 sz = PACKET_SIZE - remainder;
714 if (sz > 0)
715 read_len = m_io->Read(buffer, sz, 100ms);
716 }
717 }
718 else
719 {
720 read_len = 0;
721 }
722
723 if (read_len == 0)
724 {
725 if (!nodata_timer.isRunning())
726 nodata_timer.start();
727 else
728 {
729 if (nodata_timer.elapsed() >= 50s)
730 {
731 LOG(VB_GENERAL, LOG_WARNING, LOC +
732 "No data for 50 seconds, Restarting stream.");
733 if (!RestartStream())
734 {
735 LOG(VB_RECORD, LOG_ERR, LOC +
736 "Failed to restart stream.");
737 m_bError = true;
738 }
739 nodata_timer.stop();
740 continue;
741 }
742 }
743
744 std::this_thread::sleep_for(50ms);
745
746 // HLS type streams may only produce data every ~10 seconds
747 if (nodata_timer.elapsed() < 12s && buffer.size() < TS_PACKET_SIZE)
748 continue;
749 }
750 else
751 {
752 nodata_timer.stop();
753 restart_cnt = 0;
754 }
755
756 if (m_io == nullptr)
757 {
758 LOG(VB_GENERAL, LOG_ERR, LOC + "I/O thread has disappeared!");
759 m_bError = true;
760 break;
761 }
762 if (m_io->Error())
763 {
764 LOG(VB_GENERAL, LOG_ERR, LOC +
765 QString("Fatal Error from External Recorder: %1")
766 .arg(m_io->ErrorString()));
767 CloseApp();
768 m_bError = true;
769 break;
770 }
771
772 len = remainder = buffer.size();
773
774 if (len == 0)
775 continue;
776
777 if (len < TS_PACKET_SIZE)
778 {
779 if (m_xon && data_short_err++ == 0)
780 LOG(VB_RECORD, LOG_INFO, LOC + "Waiting for a full TS packet.");
781 std::this_thread::sleep_for(50us);
782 continue;
783 }
784 if (data_short_err)
785 {
786 if (data_short_err > 1)
787 {
788 LOG(VB_RECORD, LOG_INFO, LOC +
789 QString("Waited for a full TS packet %1 times.")
790 .arg(data_short_err));
791 }
792 data_short_err = 0;
793 }
794
795 if (!m_streamLock.tryLock())
796 continue;
797
798 if (!m_listenerLock.tryLock())
799 continue;
800
801 for (auto sit = m_streamDataList.cbegin();
802 sit != m_streamDataList.cend(); ++sit)
803 {
804 remainder = sit.key()->ProcessData
805 (reinterpret_cast<const uint8_t *>
806 (buffer.constData()), buffer.size());
807 }
808
809 m_listenerLock.unlock();
810
811 if (m_replay)
812 {
813 m_replayBuffer += buffer.left(len - remainder);
814 if (m_replayBuffer.size() > (50 * PACKET_SIZE))
815 {
816 m_replayBuffer.remove(0, len - remainder);
817 LOG(VB_RECORD, LOG_WARNING, LOC +
818 QString("Replay size truncated to %1 bytes")
819 .arg(m_replayBuffer.size()));
820 }
821 }
822
823 m_streamLock.unlock();
824
825 if (remainder == 0)
826 {
827 buffer.clear();
828 good_data = (len != 0U);
829 }
830 else if (len > remainder) // leftover bytes
831 {
832 buffer.remove(0, len - remainder);
833 good_data = (len != 0U);
834 }
835 else if (len == remainder)
836 {
837 good_data = false;
838 }
839
840 if (good_data)
841 {
842 if (data_proc_err)
843 {
844 if (data_proc_err > 1)
845 {
846 LOG(VB_RECORD, LOG_WARNING, LOC +
847 QString("Failed to process the data received %1 times.")
848 .arg(data_proc_err));
849 }
850 data_proc_err = 0;
851 }
852 }
853 else
854 {
855 if (data_proc_err++ == 0)
856 {
857 LOG(VB_RECORD, LOG_WARNING, LOC +
858 "Failed to process the data received");
859 }
860 }
861 }
862
863 LOG(VB_RECORD, LOG_INFO, LOC + "run(): " +
864 QString("%1 shutdown").arg(m_bError ? "Error" : "Normal"));
865
867 SetRunning(false, true, false);
868
869 LOG(VB_RECORD, LOG_INFO, LOC + "run(): " + "end");
870
871 RunEpilog();
872}
873
875{
876 QString result;
877
878 if (ProcessCommand("APIVersion?", result, 10s))
879 {
880 QStringList tokens = result.split(':', Qt::SkipEmptyParts);
881 if (tokens.size() > 1)
882 m_apiVersion = tokens[1].toUInt();
883 m_apiVersion = std::min(m_apiVersion, static_cast<int>(MAX_API_VERSION));
884 if (m_apiVersion < 1)
885 {
886 LOG(VB_RECORD, LOG_ERR, LOC +
887 QString("Bad response to 'APIVersion?' - '%1'. "
888 "Expecting 1, 2 or 3").arg(result));
889 m_apiVersion = 1;
890 }
891
892 ProcessCommand(QString("APIVersion:%1").arg(m_apiVersion), result);
893 return true;
894 }
895
896 return false;
897}
898
900{
901 if (m_apiVersion > 1)
902 {
903 QString result;
904
905 if (ProcessCommand("Description?", result))
906 m_loc = result.mid(3);
907 else
908 m_loc = m_device;
909 }
910
911 return m_loc;
912}
913
915{
916 {
917 QMutexLocker locker(&m_ioLock);
918
919 if (m_io)
920 {
921 LOG(VB_RECORD, LOG_WARNING, LOC + "OpenApp: already open!");
922 return true;
923 }
924
925 m_io = new ExternIO(m_app, m_args);
926
927 if (m_io == nullptr)
928 {
929 LOG(VB_GENERAL, LOG_ERR, LOC + "ExternIO failed: " + ENO);
930 m_bError = true;
931 }
932 else
933 {
934 LOG(VB_RECORD, LOG_INFO, LOC + QString("Spawn '%1'").arg(m_device));
935 m_io->Run();
936 if (m_io->Error())
937 {
938 LOG(VB_GENERAL, LOG_ERR,
939 "Failed to start External Recorder: " + m_io->ErrorString());
940 delete m_io;
941 m_io = nullptr;
942 m_bError = true;
943 return false;
944 }
945 }
946 }
947
948 QString result;
949
950 if (!SetAPIVersion())
951 {
952 // Try again using API version 2
953 m_apiVersion = 2;
954 if (!SetAPIVersion())
955 m_apiVersion = 1;
956 }
957
958 if (!IsAppOpen())
959 {
960 LOG(VB_RECORD, LOG_ERR, LOC + "Application is not responding.");
961 m_bError = true;
962 return false;
963 }
964
966
967 // Gather capabilities
968 if (!ProcessCommand("HasTuner?", result))
969 {
970 LOG(VB_RECORD, LOG_ERR, LOC +
971 QString("Bad response to 'HasTuner?' - '%1'").arg(result));
972 m_bError = true;
973 return false;
974 }
975 m_hasTuner = result.startsWith("OK:Yes");
976
977 if (!ProcessCommand("HasPictureAttributes?", result))
978 {
979 LOG(VB_RECORD, LOG_ERR, LOC +
980 QString("Bad response to 'HasPictureAttributes?' - '%1'")
981 .arg(result));
982 m_bError = true;
983 return false;
984 }
985 m_hasPictureAttributes = result.startsWith("OK:Yes");
986
987 /* Operate in "poll" or "xon/xoff" mode */
988 m_pollMode = ProcessCommand("FlowControl?", result) &&
989 result.startsWith("OK:Poll");
990
991 LOG(VB_RECORD, LOG_INFO, LOC + "App opened successfully");
992 LOG(VB_RECORD, LOG_INFO, LOC +
993 QString("Capabilities: tuner(%1) "
994 "Picture attributes(%2) "
995 "Flow control(%3)")
996 .arg(m_hasTuner ? "yes" : "no",
997 m_hasPictureAttributes ? "yes" : "no",
998 m_pollMode ? "Polling" : "XON/XOFF")
999 );
1000
1001 /* Let the external app know how many bytes will read without blocking */
1002 ProcessCommand(QString("BlockSize:%1").arg(PACKET_SIZE), result);
1003
1004 return true;
1005}
1006
1008{
1009 if (m_io == nullptr)
1010 {
1011 LOG(VB_RECORD, LOG_WARNING, LOC +
1012 "WARNING: Unable to communicate with external app.");
1013 return false;
1014 }
1015
1016 QString result;
1017 return ProcessCommand("Version?", result, 10s);
1018}
1019
1021{
1022 if (m_tsOpen)
1023 return true;
1024
1025 QString result;
1026
1027 if (!ProcessCommand("IsOpen?", result))
1028 return false;
1029
1030 m_tsOpen = true;
1031 return m_tsOpen;
1032}
1033
1035{
1036 m_ioLock.lock();
1037 if (m_io)
1038 {
1039 QString result;
1040
1041 LOG(VB_RECORD, LOG_INFO, LOC + "CloseRecorder");
1042 m_ioLock.unlock();
1043 ProcessCommand("CloseRecorder", result, 10s);
1044 m_ioLock.lock();
1045
1046 if (!result.startsWith("OK"))
1047 {
1048 LOG(VB_RECORD, LOG_INFO, LOC +
1049 "CloseRecorder failed, sending kill.");
1050
1051 QString full_command = QString("%1").arg(m_args.join(" "));
1052
1053 if (!ExternIO::KillIfRunning(full_command))
1054 {
1055 // Give it one more chance.
1056 std::this_thread::sleep_for(50ms);
1057 if (!ExternIO::KillIfRunning(full_command))
1058 {
1059 LOG(VB_GENERAL, LOG_ERR,
1060 QString("Unable to kill existing '%1'.")
1061 .arg(full_command));
1062 return;
1063 }
1064 }
1065 }
1066 delete m_io;
1067 m_io = nullptr;
1068 }
1069 m_ioLock.unlock();
1070}
1071
1073{
1074 bool streaming = (StreamingCount() > 0);
1075
1076 LOG(VB_RECORD, LOG_WARNING, LOC + "Restarting stream.");
1077 m_damaged = true;
1078
1079 if (streaming)
1080 StopStreaming();
1081
1082 std::this_thread::sleep_for(1s);
1083
1084 if (streaming)
1085 return StartStreaming();
1086
1087 return true;
1088}
1089
1091{
1092 if (m_replay)
1093 {
1094 QString result;
1095
1096 // Let the external app know that we could be busy for a little while
1097 if (!m_pollMode)
1098 {
1099 ProcessCommand(QString("XOFF"), result);
1100 m_xon = false;
1101 }
1102
1103 /* If the input is not a 'broadcast' it may only have one
1104 * copy of the SPS right at the beginning of the stream,
1105 * so make sure we don't miss it!
1106 */
1107 QMutexLocker listen_lock(&m_listenerLock);
1108
1109 if (!m_streamDataList.empty())
1110 {
1111 for (auto sit = m_streamDataList.cbegin();
1112 sit != m_streamDataList.cend(); ++sit)
1113 {
1114 sit.key()->ProcessData(reinterpret_cast<const uint8_t *>
1115 (m_replayBuffer.constData()),
1116 m_replayBuffer.size());
1117 }
1118 }
1119 LOG(VB_RECORD, LOG_INFO, LOC + QString("Replayed %1 bytes")
1120 .arg(m_replayBuffer.size()));
1121 m_replayBuffer.clear();
1122 m_replay = false;
1123
1124 // Let the external app know that we are ready
1125 if (!m_pollMode)
1126 {
1127 if (ProcessCommand(QString("XON"), result))
1128 m_xon = true;
1129 }
1130 }
1131}
1132
1134{
1135 QString result;
1136
1137 QMutexLocker locker(&m_streamLock);
1138
1140
1141 LOG(VB_RECORD, LOG_INFO, LOC +
1142 QString("StartStreaming with %1 current listeners")
1143 .arg(StreamingCount()));
1144
1145 if (!IsAppOpen())
1146 {
1147 LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder not started.");
1148 return false;
1149 }
1150
1151 if (StreamingCount() == 0)
1152 {
1153 if (!ProcessCommand("StartStreaming", result, 15s))
1154 {
1155 LogLevel_t level = LOG_ERR;
1156 if (result.startsWith("warn", Qt::CaseInsensitive))
1157 level = LOG_WARNING;
1158 else
1159 m_bError = true;
1160
1161 LOG(VB_GENERAL, level, LOC + QString("StartStreaming failed: '%1'")
1162 .arg(result));
1163
1164 return false;
1165 }
1166
1167 LOG(VB_RECORD, LOG_INFO, LOC + "Streaming started");
1168 }
1169 else
1170 {
1171 LOG(VB_RECORD, LOG_INFO, LOC + "Already streaming");
1172 }
1173
1174 m_streamingCnt.ref();
1175
1176 LOG(VB_RECORD, LOG_INFO, LOC +
1177 QString("StartStreaming %1 listeners")
1178 .arg(StreamingCount()));
1179
1180 return true;
1181}
1182
1184{
1185 QMutexLocker locker(&m_streamLock);
1186
1187 LOG(VB_RECORD, LOG_INFO, LOC +
1188 QString("StopStreaming %1 listeners")
1189 .arg(StreamingCount()));
1190
1191 if (StreamingCount() == 0)
1192 {
1193 LOG(VB_RECORD, LOG_INFO, LOC +
1194 "StopStreaming requested, but we are not streaming!");
1195 return true;
1196 }
1197
1198 if (m_streamingCnt.deref())
1199 {
1200 LOG(VB_RECORD, LOG_INFO, LOC +
1201 QString("StopStreaming delayed, still have %1 listeners")
1202 .arg(StreamingCount()));
1203 return true;
1204 }
1205
1206 LOG(VB_RECORD, LOG_INFO, LOC + "StopStreaming");
1207
1208 if (!m_pollMode && m_xon)
1209 {
1210 QString result;
1211 ProcessCommand(QString("XOFF"), result);
1212 m_xon = false;
1213 }
1214
1215 if (!IsAppOpen())
1216 {
1217 LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder not started.");
1218 return false;
1219 }
1220
1221 QString result;
1222 if (!ProcessCommand("StopStreaming", result, 10s))
1223 {
1224 LogLevel_t level = LOG_ERR;
1225 if (result.startsWith("warn", Qt::CaseInsensitive))
1226 level = LOG_WARNING;
1227 else
1228 m_bError = true;
1229
1230 LOG(VB_GENERAL, level, LOC + QString("StopStreaming: '%1'")
1231 .arg(result));
1232
1233 return false;
1234 }
1235
1236 PurgeBuffer();
1237 LOG(VB_RECORD, LOG_INFO, LOC + "Streaming stopped");
1238
1239 return true;
1240}
1241
1243 QString & result,
1244 std::chrono::milliseconds timeout,
1245 uint retry_cnt)
1246{
1247 QMutexLocker locker(&m_processLock);
1248
1249 if (m_apiVersion == 3)
1250 {
1251 QVariantMap vcmd;
1252 QVariantMap vresult;
1253 QByteArray response;
1254 QStringList tokens = cmd.split(':');
1255 vcmd["command"] = tokens[0];
1256 if (tokens.size() > 1)
1257 vcmd["value"] = tokens[1];
1258
1259 LOG(VB_RECORD, LOG_DEBUG, LOC +
1260 QString("Arguments: %1").arg(tokens.join("\n")));
1261
1262 bool r = ProcessJson(vcmd, vresult, response, timeout, retry_cnt);
1263 result = QString("%1:%2").arg(vresult["status"].toString(),
1264 vresult["message"].toString());
1265 return r;
1266 }
1267 if (m_apiVersion == 2)
1268 return ProcessVer2(cmd, result, timeout, retry_cnt);
1269 if (m_apiVersion == 1)
1270 return ProcessVer1(cmd, result, timeout, retry_cnt);
1271
1272 LOG(VB_RECORD, LOG_ERR, LOC +
1273 QString("Invalid API version %1. Expected 1 or 2").arg(m_apiVersion));
1274 return false;
1275}
1276
1277bool ExternalStreamHandler::ProcessVer1(const QString & cmd,
1278 QString & result,
1279 std::chrono::milliseconds timeout,
1280 uint retry_cnt)
1281{
1282 LOG(VB_RECORD, LOG_DEBUG, LOC + QString("ProcessVer1('%1')")
1283 .arg(cmd));
1284
1285 for (uint cnt = 0; cnt < retry_cnt; ++cnt)
1286 {
1287 QMutexLocker locker(&m_ioLock);
1288
1289 if (!m_io)
1290 {
1291 LOG(VB_RECORD, LOG_ERR, LOC + "External I/O not ready!");
1292 return false;
1293 }
1294
1295 QByteArray buf(cmd.toUtf8(), cmd.size());
1296 buf += '\n';
1297
1298 if (m_io->Error())
1299 {
1300 LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder in bad state: " +
1301 m_io->ErrorString());
1302 return false;
1303 }
1304
1305 /* Try to keep in sync, if External app was too slow in responding
1306 * to previous query, consume the response before sending new query */
1307 m_io->GetStatus(0ms);
1308
1309 /* Send new query */
1310 m_io->Write(buf);
1311
1313 while (timer.elapsed() < timeout)
1314 {
1315 result = m_io->GetStatus(timeout);
1316 if (m_io->Error())
1317 {
1318 LOG(VB_GENERAL, LOG_ERR, LOC +
1319 "Failed to read from External Recorder: " +
1320 m_io->ErrorString());
1321 m_bError = true;
1322 return false;
1323 }
1324
1325 // Out-of-band error message
1326 if (result.startsWith("STATUS:ERR") ||
1327 result.startsWith("0:STATUS:ERR"))
1328 {
1329 LOG(VB_RECORD, LOG_ERR, LOC + result);
1330 result.remove(0, result.indexOf(":ERR") + 1);
1331 return false;
1332 }
1333 // STATUS message are "out of band".
1334 // Ignore them while waiting for a responds to a command
1335 if (!result.startsWith("STATUS") && !result.startsWith("0:STATUS"))
1336 break;
1337 LOG(VB_RECORD, LOG_INFO, LOC +
1338 QString("Ignoring response '%1'").arg(result));
1339 }
1340
1341 if (result.size() < 1)
1342 {
1343 LOG(VB_GENERAL, LOG_WARNING, LOC +
1344 QString("External Recorder did not respond to '%1'").arg(cmd));
1345 }
1346 else
1347 {
1348 bool okay = result.startsWith("OK");
1349 if (okay || result.startsWith("WARN") || result.startsWith("ERR"))
1350 {
1351 LogLevel_t level = LOG_INFO;
1352
1353 m_ioErrCnt = 0;
1354 if (!okay)
1355 level = LOG_WARNING;
1356 else if (cmd.startsWith("SendBytes"))
1357 level = LOG_DEBUG;
1358
1359 LOG(VB_RECORD, level,
1360 LOC + QString("ProcessCommand('%1') = '%2' took %3ms %4")
1361 .arg(cmd, result,
1362 QString::number(timer.elapsed().count()),
1363 okay ? "" : "<-- NOTE"));
1364
1365 return okay;
1366 }
1367 LOG(VB_GENERAL, LOG_WARNING, LOC +
1368 QString("External Recorder invalid response to '%1': '%2'")
1369 .arg(cmd, result));
1370 }
1371
1372 if (++m_ioErrCnt > 10)
1373 {
1374 LOG(VB_GENERAL, LOG_ERR, LOC + "Too many I/O errors.");
1375 m_bError = true;
1376 break;
1377 }
1378 }
1379
1380 return false;
1381}
1382
1383bool ExternalStreamHandler::ProcessVer2(const QString & command,
1384 QString & result,
1385 std::chrono::milliseconds timeout,
1386 uint retry_cnt)
1387{
1388 QString status;
1389 QString raw;
1390
1391 for (uint cnt = 0; cnt < retry_cnt; ++cnt)
1392 {
1393 QString cmd = QString("%1:%2").arg(++m_serialNo).arg(command);
1394
1395 LOG(VB_RECORD, LOG_DEBUG, LOC + QString("ProcessVer2('%1') serial(%2)")
1396 .arg(cmd).arg(m_serialNo));
1397
1398 QMutexLocker locker(&m_ioLock);
1399
1400 if (!m_io)
1401 {
1402 LOG(VB_RECORD, LOG_ERR, LOC + "External I/O not ready!");
1403 return false;
1404 }
1405
1406 QByteArray buf(cmd.toUtf8(), cmd.size());
1407 buf += '\n';
1408
1409 if (m_io->Error())
1410 {
1411 LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder in bad state: " +
1412 m_io->ErrorString());
1413 return false;
1414 }
1415
1416 /* Send query */
1417 m_io->Write(buf);
1418
1419 QStringList tokens;
1420
1422 while (timer.elapsed() < timeout)
1423 {
1424 result = m_io->GetStatus(timeout);
1425 if (m_io->Error())
1426 {
1427 LOG(VB_GENERAL, LOG_ERR, LOC +
1428 "Failed to read from External Recorder: " +
1429 m_io->ErrorString());
1430 m_bError = true;
1431 return false;
1432 }
1433
1434 if (!result.isEmpty())
1435 {
1436 raw = result;
1437 tokens = result.split(':', Qt::SkipEmptyParts);
1438
1439 // Look for result with the serial number of this query
1440 if (tokens.size() > 1 && tokens[0].toUInt() >= m_serialNo)
1441 break;
1442
1443 /* Other messages are "out of band" */
1444
1445 // Check for error message missing serial#
1446 if (tokens[0].startsWith("ERR"))
1447 break;
1448
1449 // Remove serial#
1450 tokens.removeFirst();
1451 result = tokens.join(':');
1452 bool err = (tokens.size() > 1 && tokens[1].startsWith("ERR"));
1453 LOG(VB_RECORD, (err ? LOG_WARNING : LOG_INFO), LOC + raw);
1454 if (err)
1455 {
1456 // Remove "STATUS"
1457 tokens.removeFirst();
1458 result = tokens.join(':');
1459 return false;
1460 }
1461 }
1462 }
1463
1464 if (timer.elapsed() >= timeout)
1465 {
1466 LOG(VB_RECORD, LOG_ERR, LOC +
1467 QString("ProcessVer2: Giving up waiting for response for "
1468 "command '%2'").arg(cmd));
1469 }
1470 else if (tokens.size() < 2)
1471 {
1472 LOG(VB_RECORD, LOG_ERR, LOC +
1473 QString("Did not receive a valid response "
1474 "for command '%1', received '%2'").arg(cmd, result));
1475 }
1476 else if (tokens[0].toUInt() > m_serialNo)
1477 {
1478 LOG(VB_RECORD, LOG_ERR, LOC +
1479 QString("ProcessVer2: Looking for serial no %1, "
1480 "but received %2 for command '%2'")
1481 .arg(QString::number(m_serialNo), tokens[0], cmd));
1482 }
1483 else
1484 {
1485 tokens.removeFirst();
1486 status = tokens[0].trimmed();
1487 result = tokens.join(':');
1488
1489 bool okay = (status == "OK");
1490 if (okay || status.startsWith("WARN") || status.startsWith("ERR"))
1491 {
1492 LogLevel_t level = LOG_INFO;
1493
1494 m_ioErrCnt = 0;
1495 if (!okay)
1496 level = LOG_WARNING;
1497 else if (command.startsWith("SendBytes") ||
1498 (command.startsWith("TuneStatus") &&
1499 result == "OK:InProgress"))
1500 level = LOG_DEBUG;
1501
1502 LOG(VB_RECORD, level,
1503 LOC + QString("ProcessV2('%1') = '%2' took %3ms %4")
1504 .arg(cmd, result, QString::number(timer.elapsed().count()),
1505 okay ? "" : "<-- NOTE"));
1506
1507 return okay;
1508 }
1509 LOG(VB_GENERAL, LOG_WARNING, LOC +
1510 QString("External Recorder invalid response to '%1': '%2'")
1511 .arg(cmd, result));
1512 }
1513
1514 if (++m_ioErrCnt > 10)
1515 {
1516 LOG(VB_GENERAL, LOG_ERR, LOC + "Too many I/O errors.");
1517 m_bError = true;
1518 break;
1519 }
1520 }
1521
1522 return false;
1523}
1524
1525bool ExternalStreamHandler::ProcessJson(const QVariantMap & vmsg,
1526 QVariantMap & elements,
1527 QByteArray & response,
1528 std::chrono::milliseconds timeout,
1529 uint retry_cnt)
1530{
1531 for (uint cnt = 0; cnt < retry_cnt; ++cnt)
1532 {
1533 QVariantMap query(vmsg);
1534
1535 uint serial = ++m_serialNo;
1536 query["serial"] = serial;
1537 QString cmd = query["command"].toString();
1538
1539 QJsonDocument qdoc;
1540 qdoc = QJsonDocument::fromVariant(query);
1541 QByteArray cmdbuf = qdoc.toJson(QJsonDocument::Compact);
1542
1543 LOG(VB_RECORD, LOG_DEBUG, LOC +
1544 QString("ProcessJson: %1").arg(QString(cmdbuf)));
1545
1546 if (m_io->Error())
1547 {
1548 LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder in bad state: " +
1549 m_io->ErrorString());
1550 return false;
1551 }
1552
1553 /* Send query */
1554 m_io->Write(cmdbuf);
1555 m_io->Write("\n");
1556
1558 while (timer.elapsed() < timeout)
1559 {
1560 response = m_io->GetStatus(timeout);
1561 if (m_io->Error())
1562 {
1563 LOG(VB_GENERAL, LOG_ERR, LOC +
1564 "Failed to read from External Recorder: " +
1565 m_io->ErrorString());
1566 m_bError = true;
1567 return false;
1568 }
1569
1570 if (!response.isEmpty())
1571 {
1572 QJsonParseError parseError {};
1573 QJsonDocument doc;
1574
1575 doc = QJsonDocument::fromJson(response, &parseError);
1576
1577 if (parseError.error != QJsonParseError::NoError)
1578 {
1579 LOG(VB_GENERAL, LOG_ERR, LOC +
1580 QString("ExternalRecorder returned invalid JSON message: %1: %2\n%3\nfor\n%4")
1581 .arg(parseError.offset)
1582 .arg(parseError.errorString(),
1583 QString(response),
1584 QString(cmdbuf)));
1585 }
1586 else
1587 {
1588 elements = doc.toVariant().toMap();
1589 if (!elements.contains("serial"))
1590 continue;
1591
1592 serial = elements["serial"].toInt();
1593 if (serial >= m_serialNo)
1594 break;
1595
1596 if (elements.contains("status"))
1597 {
1598 LogLevel_t level { LOG_INFO };
1599
1600 if (elements["status"] == "ERR")
1601 level = LOG_ERR;
1602 else if (elements["status"] == "WARN")
1603 level = LOG_WARNING;
1604
1605 LOG(VB_RECORD, level, LOC + QString("%1: %2")
1606 .arg(elements["status"].toString(),
1607 elements["message"].toString()));
1608 }
1609 }
1610 }
1611 }
1612
1613 if (timer.elapsed() >= timeout)
1614 {
1615 LOG(VB_RECORD, LOG_ERR, LOC +
1616 QString("ProcessJson: Giving up waiting for response for "
1617 "command '%2'").arg(QString(cmdbuf)));
1618
1619 }
1620
1621 if (serial > m_serialNo)
1622 {
1623 LOG(VB_RECORD, LOG_ERR, LOC +
1624 QString("ProcessJson: Looking for serial no %1, "
1625 "but received %2 for command '%2'")
1626 .arg(QString::number(m_serialNo))
1627 .arg(serial)
1628 .arg(QString(cmdbuf)));
1629 }
1630 else if (!elements.contains("status"))
1631 {
1632 LOG(VB_RECORD, LOG_ERR, LOC +
1633 QString("ProcessJson: ExternalRecorder 'status' not found in %1")
1634 .arg(QString(response)));
1635 }
1636 else
1637 {
1638 QString status = elements["status"].toString();
1639 bool okay = (status == "OK");
1640 if (okay || status == "WARN" || status == "ERR")
1641 {
1642 LogLevel_t level = LOG_INFO;
1643
1644 m_ioErrCnt = 0;
1645 if (!okay)
1646 level = LOG_WARNING;
1647 else if (cmd == "SendBytes" ||
1648 (cmd == "TuneStatus?" &&
1649 elements["message"] == "InProgress"))
1650 level = LOG_DEBUG;
1651
1652 LOG(VB_RECORD, level,
1653 LOC + QString("ProcessJson('%1') = %2:%3:%4 took %5ms %6")
1654 .arg(QString(cmdbuf))
1655 .arg(elements["serial"].toInt())
1656 .arg(elements["status"].toString(),
1657 elements["message"].toString(),
1658 QString::number(timer.elapsed().count()),
1659 okay ? "" : "<-- NOTE")
1660 );
1661
1662 return okay;
1663 }
1664 LOG(VB_GENERAL, LOG_WARNING, LOC +
1665 QString("External Recorder invalid response to '%1': '%2'")
1666 .arg(QString(cmdbuf),
1667 QString(response)));
1668 }
1669
1670 if (++m_ioErrCnt > 10)
1671 {
1672 LOG(VB_GENERAL, LOG_ERR, LOC + "Too many I/O errors.");
1673 m_bError = true;
1674 break;
1675 }
1676 }
1677
1678 return false;
1679}
1680
1682{
1683 QByteArray response;
1684 bool err = false;
1685
1686 QMutexLocker locker(&m_ioLock);
1687
1688 if (!m_io)
1689 {
1690 LOG(VB_RECORD, LOG_ERR, LOC + "External I/O not ready!");
1691 return true;
1692 }
1693
1694 if (m_io->Error())
1695 {
1696 LOG(VB_GENERAL, LOG_ERR, "External Recorder in bad state: " +
1697 m_io->ErrorString());
1698 return true;
1699 }
1700
1701 response = m_io->GetStatus(0ms);
1702 while (!response.isEmpty())
1703 {
1704 if (m_apiVersion > 2)
1705 {
1706 QJsonParseError parseError {};
1707 QJsonDocument doc;
1708 QVariantMap elements;
1709
1710 doc = QJsonDocument::fromJson(response, &parseError);
1711
1712 if (parseError.error != QJsonParseError::NoError)
1713 {
1714 LOG(VB_GENERAL, LOG_ERR, LOC +
1715 QString("ExternalRecorder returned invalid JSON message: %1: %2\n%3\n")
1716 .arg(parseError.offset)
1717 .arg(parseError.errorString(), QString(response)));
1718 }
1719 else
1720 {
1721 elements = doc.toVariant().toMap();
1722 if (elements.contains("command") &&
1723 elements["command"] == "STATUS")
1724 {
1725 LogLevel_t level { LOG_INFO };
1726 QString status = elements["status"].toString();
1727 if (status.startsWith("err", Qt::CaseInsensitive))
1728 {
1729 level = LOG_ERR;
1730 err |= true;
1731 }
1732 else if (status.startsWith("warn",
1733 Qt::CaseInsensitive))
1734 {
1735 level = LOG_WARNING;
1736 }
1737 else if (status.startsWith("damage",
1738 Qt::CaseInsensitive))
1739 {
1740 level = LOG_WARNING;
1741 m_damaged |= true;
1742 }
1743 LOG(VB_RECORD, level,
1744 LOC + QString("%1:%2%3")
1745 .arg(status, elements["message"].toString(),
1746 m_damaged ? " (Damaged)" : ""));
1747 }
1748 }
1749 }
1750 else
1751 {
1752 QString res = QString(response);
1753 if (m_apiVersion == 2)
1754 {
1755 QStringList tokens = res.split(':', Qt::SkipEmptyParts);
1756 tokens.removeFirst();
1757 res = tokens.join(':');
1758 for (int idx = 1; idx < tokens.size(); ++idx)
1759 {
1760 err |= tokens[idx].startsWith("ERR",
1761 Qt::CaseInsensitive);
1762 m_damaged |= tokens[idx].startsWith("damage",
1763 Qt::CaseInsensitive);
1764 }
1765 }
1766 else
1767 {
1768 err |= res.startsWith("STATUS:ERR",
1769 Qt::CaseInsensitive);
1770 m_damaged |= res.startsWith("STATUS:DAMAGE",
1771 Qt::CaseInsensitive);
1772 }
1773
1774 LOG(VB_RECORD, (err ? LOG_WARNING : LOG_INFO), LOC + res);
1775 }
1776
1777 response = m_io->GetStatus(0ms);
1778 }
1779
1780 return err;
1781}
1782
1784{
1785 if (m_io)
1786 {
1787 QByteArray buffer;
1788 m_io->Read(buffer, PACKET_SIZE, 1ms);
1789 m_io->GetStatus(1ms);
1790 }
1791}
1792
1794{
1795 // TODO report on buffer overruns, etc.
1796}
#define LOC
QStringList m_args
bool Ready(int fd, std::chrono::milliseconds timeout, const QString &what)
int Write(const QByteArray &buffer)
static bool KillIfRunning(const QString &cmd)
int Read(QByteArray &buffer, int maxlen, std::chrono::milliseconds timeout=2500ms)
static constexpr uint8_t kMaxErrorCnt
QByteArray GetStatus(std::chrono::milliseconds timeout=2500ms)
QTextStream m_status
bool Error(void) const
ExternIO(const QString &app, const QStringList &args)
QString ErrorString(void) const
bool ProcessVer1(const QString &cmd, QString &result, std::chrono::milliseconds timeout, uint retry_cnt)
static ExternalStreamHandler * Get(const QString &devname, int inputid, int majorid)
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
ExternalStreamHandler(const QString &path, int inputid, int majorid)
bool ProcessVer2(const QString &command, QString &result, std::chrono::milliseconds timeout, uint retry_cnt)
static QMap< int, uint > s_handlersRefCnt
bool ProcessCommand(const QString &cmd, QString &result, std::chrono::milliseconds timeout=4s, uint retry_cnt=3)
static QMap< int, ExternalStreamHandler * > s_handlers
bool ProcessJson(const QVariantMap &vmsg, QVariantMap &elements, QByteArray &response, std::chrono::milliseconds timeout=4s, uint retry_cnt=3)
void PriorityEvent(int fd) override
static void Return(ExternalStreamHandler *&ref, int inputid)
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
void setObjectName(const QString &name)
Definition: mthread.cpp:238
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:14
std::chrono::milliseconds restart(void)
Returns milliseconds elapsed since last start() or restart() and resets the count.
Definition: mythtimer.cpp:62
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
bool isRunning(void) const
Returns true if start() or restart() has been called at least once since construction and since any c...
Definition: mythtimer.cpp:135
void stop(void)
Stops timer, next call to isRunning() will return false and any calls to elapsed() or restart() will ...
Definition: mythtimer.cpp:78
@ kStartRunning
Definition: mythtimer.h:17
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
StreamDataList m_streamDataList
QString m_device
volatile bool m_runningDesired
volatile bool m_bError
bool RemoveAllPIDFilters(void)
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
bool UpdateFiltersFromStreamData(void)
QRecursiveMutex m_listenerLock
#define O_NONBLOCK
Definition: compat.h:258
unsigned int uint
Definition: compat.h:68
#define close
Definition: compat.h:30
#define WEXITSTATUS(w)
Definition: compat.h:187
@ GENERIC_EXIT_DAEMONIZING_ERROR
Error daemonizing or execl.
Definition: exitcodes.h:31
@ GENERIC_EXIT_PIPE_FAILURE
Error creating I/O pipes.
Definition: exitcodes.h:29
QString logPropagateArgs
Definition: logging.cpp:82
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
Convenience inline random number generator functions.
QString toString(const QDateTime &raw_dt, uint format)
Returns formatted string representing the time.
Definition: mythdate.cpp:93
def read(device=None, features=[])
Definition: disc.py:35
def error(message)
Definition: smolt.py:409
def write(text, progress=True)
Definition: mythburn.py:307