MythTV master
ExternalStreamHandler.cpp
Go to the documentation of this file.
1// -*- Mode: c++ -*-
2
3#include <QtGlobal>
4#if QT_VERSION >= QT_VERSION_CHECK(6,0,0)
5#include <QtSystemDetection>
6#endif
7
8// POSIX headers
9#include <thread>
10#include <iostream>
11#include <fcntl.h>
12#include <unistd.h>
13#include <algorithm>
14#ifndef Q_OS_WINDOWS
15#include <poll.h>
16#include <sys/ioctl.h>
17#endif
18
19#ifdef Q_OS_ANDROID
20#include <sys/wait.h>
21#endif
22
23// Qt headers
24#include <QString>
25#include <QFile>
26#include <QJsonDocument>
27#include <QJsonObject>
28
29// MythTV headers
30#include "config.h"
34
35#include "ExternalChannel.h"
37//#include "ThreadedFileWriter.h"
38#include "cardutil.h"
39#include "dtvsignalmonitor.h"
40#include "mpeg/mpegstreamdata.h"
42
43#define LOC QString("ExternSH[%1](%2): ").arg(m_inputId).arg(m_loc)
44
45ExternIO::ExternIO(const QString & app,
46 const QStringList & args)
47 : m_app(QFileInfo(app)),
48 m_status(&m_statusBuf, QIODevice::ReadWrite)
49{
50 if (!m_app.exists())
51 {
52 m_error = QString("ExternIO: '%1' does not exist.").arg(app);
53 return;
54 }
55 if (!m_app.isReadable() || !m_app.isFile())
56 {
57 m_error = QString("ExternIO: '%1' is not readable.")
58 .arg(m_app.canonicalFilePath());
59 return;
60 }
61 if (!m_app.isExecutable())
62 {
63 m_error = QString("ExternIO: '%1' is not executable.")
64 .arg(m_app.canonicalFilePath());
65 return;
66 }
67
68 m_args = args;
69 m_args.prepend(m_app.baseName());
70
71 m_status.setString(&m_statusBuf);
72}
73
75{
79
80 // waitpid(m_pid, &status, 0);
81 delete[] m_buffer;
82}
83
84bool ExternIO::Ready([[maybe_unused]] int fd,
85 [[maybe_unused]] std::chrono::milliseconds timeout,
86 [[maybe_unused]] const QString & what)
87{
88#ifndef Q_OS_WINDOWS
89 std::array<struct pollfd,2> m_poll {};
90
91 m_poll[0].fd = fd;
92 m_poll[0].events = POLLIN | POLLPRI;
93 int ret = poll(m_poll.data(), 1, timeout.count());
94
95 if (m_poll[0].revents & POLLHUP)
96 {
97 m_error = what + " poll eof (POLLHUP)";
98 return false;
99 }
100 if (m_poll[0].revents & POLLNVAL)
101 {
102 LOG(VB_GENERAL, LOG_ERR, "poll error");
103 return false;
104 }
105 if (m_poll[0].revents & POLLIN)
106 {
107 if (ret > 0)
108 return true;
109
110 if ((EOVERFLOW == errno))
111 m_error = "poll overflow";
112 return false;
113 }
114#endif // !defined( Q_OS_WINDOWS )
115 return false;
116}
117
118int ExternIO::Read(QByteArray & buffer, int maxlen, std::chrono::milliseconds timeout)
119{
120 if (Error())
121 {
122 LOG(VB_RECORD, LOG_ERR,
123 QString("ExternIO::Read: already in error state: '%1'")
124 .arg(m_error));
125 return 0;
126 }
127
128 if (!Ready(m_appOut, timeout, "data"))
129 return 0;
130
131 if (m_bufSize < maxlen)
132 {
133 m_bufSize = maxlen;
134 delete [] m_buffer;
135 m_buffer = new char[m_bufSize];
136 }
137
138 int len = read(m_appOut, m_buffer, maxlen);
139
140 if (len < 0)
141 {
142 if (errno == EAGAIN)
143 {
144 if (++m_errCnt > kMaxErrorCnt)
145 {
146 m_error = "Failed to read from External Recorder: " + ENO;
147 LOG(VB_RECORD, LOG_WARNING,
148 "External Recorder not ready. Giving up.");
149 }
150 else
151 {
152 LOG(VB_RECORD, LOG_WARNING,
153 QString("External Recorder not ready. Will retry (%1/%2).")
154 .arg(m_errCnt).arg(kMaxErrorCnt));
155 std::this_thread::sleep_for(100ms);
156 }
157 }
158 else
159 {
160 m_error = "Failed to read from External Recorder: " + ENO;
161 LOG(VB_RECORD, LOG_ERR, m_error);
162 }
163 }
164 else
165 {
166 m_errCnt = 0;
167 }
168
169 if (len == 0)
170 return 0;
171
172 buffer.append(m_buffer, len);
173
174 LOG(VB_RECORD, LOG_DEBUG,
175 QString("ExternIO::Read '%1' bytes, buffer size %2")
176 .arg(len).arg(buffer.size()));
177
178 return len;
179}
180
181QByteArray ExternIO::GetStatus(std::chrono::milliseconds timeout)
182{
183 if (Error())
184 {
185 LOG(VB_RECORD, LOG_ERR,
186 QString("ExternIO::GetStatus: already in error state: '%1'")
187 .arg(m_error));
188 return {};
189 }
190
191 std::chrono::milliseconds waitfor = m_status.atEnd() ? timeout : 0ms;
192 if (Ready(m_appErr, waitfor, "status"))
193 {
194 std::array<char,2048> buffer {};
195 int len = read(m_appErr, buffer.data(), buffer.size());
196 m_status << QString::fromLatin1(buffer.data(), len);
197 }
198
199 if (m_status.atEnd())
200 return {};
201
202 QString msg = m_status.readLine();
203
204 LOG(VB_RECORD, LOG_DEBUG, QString("ExternIO::GetStatus '%1'")
205 .arg(msg));
206
207 return msg.toUtf8();
208}
209
210int ExternIO::Write(const QByteArray & buffer)
211{
212 if (Error())
213 {
214 LOG(VB_RECORD, LOG_ERR,
215 QString("ExternIO::Write: already in error state: '%1'")
216 .arg(m_error));
217 return -1;
218 }
219
220 LOG(VB_RECORD, LOG_DEBUG, QString("ExternIO::Write('%1')")
221 .arg(QString(buffer).simplified()));
222
223 int len = write(m_appIn, buffer.constData(), buffer.size());
224 if (len != buffer.size())
225 {
226 if (len > 0)
227 {
228 LOG(VB_RECORD, LOG_WARNING,
229 QString("ExternIO::Write: only wrote %1 of %2 bytes '%3'")
230 .arg(len).arg(buffer.size()).arg(QString(buffer)));
231 }
232 else
233 {
234 m_error = QString("ExternIO: Failed to write '%1' to app's stdin: ")
235 .arg(QString(buffer)) + ENO;
236 return -1;
237 }
238 }
239
240 return len;
241}
242
244{
245 LOG(VB_RECORD, LOG_INFO, QString("ExternIO::Run()"));
246
247 Fork();
248 GetStatus(10ms);
249
250 return true;
251}
252
253/* Return true if the process is not, or is no longer running */
254bool ExternIO::KillIfRunning([[maybe_unused]] const QString & cmd)
255{
256#ifdef Q_OS_BSD4
257 return false;
258#elif defined( Q_OS_WINDOWS )
259 return false;
260#else
261 QString grp = QString("pgrep -x -f -- \"%1\" 2>&1 > /dev/null").arg(cmd);
262 QString kil = QString("pkill --signal 15 -x -f -- \"%1\" 2>&1 > /dev/null")
263 .arg(cmd);
264
265 int res_grp = system(grp.toUtf8().constData());
266 if (WEXITSTATUS(res_grp) == 1)
267 {
268 LOG(VB_RECORD, LOG_DEBUG, QString("'%1' not running.").arg(cmd));
269 return true;
270 }
271
272 LOG(VB_RECORD, LOG_WARNING, QString("'%1' already running, killing...")
273 .arg(cmd));
274 int res_kil = system(kil.toUtf8().constData());
275 if (WEXITSTATUS(res_kil) == 1)
276 LOG(VB_GENERAL, LOG_WARNING, QString("'%1' failed: %2")
277 .arg(kil, ENO));
278
279 res_grp = system(grp.toUtf8().constData());
280 if (WEXITSTATUS(res_grp) == 1)
281 {
282 LOG(WEXITSTATUS(res_kil) == 0 ? VB_RECORD : VB_GENERAL, LOG_WARNING,
283 QString("'%1' terminated.").arg(cmd));
284 return true;
285 }
286
287 std::this_thread::sleep_for(50ms);
288
289 kil = QString("pkill --signal 9 -x -f \"%1\" 2>&1 > /dev/null").arg(cmd);
290 res_kil = system(kil.toUtf8().constData());
291 if (WEXITSTATUS(res_kil) > 0)
292 LOG(VB_GENERAL, LOG_WARNING, QString("'%1' failed: %2")
293 .arg(kil, ENO));
294
295 res_grp = system(grp.toUtf8().constData());
296 LOG(WEXITSTATUS(res_kil) == 0 ? VB_RECORD : VB_GENERAL, LOG_WARNING,
297 QString("'%1' %2.")
298 .arg(cmd, WEXITSTATUS(res_grp) == 0 ? "sill running" : "terminated"));
299
300 return (WEXITSTATUS(res_grp) != 0);
301#endif
302}
303
305{
306#ifndef Q_OS_WINDOWS
307 if (Error())
308 {
309 LOG(VB_RECORD, LOG_INFO, QString("ExternIO in bad state: '%1'")
310 .arg(m_error));
311 return;
312 }
313
314 QString full_command = QString("%1").arg(m_args.join(" "));
315
316 if (!KillIfRunning(full_command))
317 {
318 // Give it one more chance.
319 std::this_thread::sleep_for(50ms);
320 if (!KillIfRunning(full_command))
321 {
322 m_error = QString("Unable to kill existing '%1'.")
323 .arg(full_command);
324 LOG(VB_GENERAL, LOG_ERR, m_error);
325 return;
326 }
327 }
328
329
330 LOG(VB_RECORD, LOG_INFO, QString("ExternIO::Fork '%1'").arg(full_command));
331
332 std::array<int,2> in = {-1, -1};
333 std::array<int,2> out = {-1, -1};
334 std::array<int,2> err = {-1, -1};
335
336 if (pipe(in.data()) < 0)
337 {
338 m_error = "pipe(in) failed: " + ENO;
339 return;
340 }
341 if (pipe(out.data()) < 0)
342 {
343 m_error = "pipe(out) failed: " + ENO;
344 close(in[0]);
345 close(in[1]);
346 return;
347 }
348 if (pipe(err.data()) < 0)
349 {
350 m_error = "pipe(err) failed: " + ENO;
351 close(in[0]);
352 close(in[1]);
353 close(out[0]);
354 close(out[1]);
355 return;
356 }
357
358 m_pid = fork();
359 if (m_pid < 0)
360 {
361 // Failed
362 m_error = "fork() failed: " + ENO;
363 return;
364 }
365 if (m_pid > 0)
366 {
367 // Parent
368 close(in[0]);
369 close(out[1]);
370 close(err[1]);
371 m_appIn = in[1];
372 m_appOut = out[0];
373 m_appErr = err[0];
374
375 bool error = false;
376 error = (fcntl(m_appIn, F_SETFL, O_NONBLOCK) == -1);
377 error |= (fcntl(m_appOut, F_SETFL, O_NONBLOCK) == -1);
378 error |= (fcntl(m_appErr, F_SETFL, O_NONBLOCK) == -1);
379
380 if (error)
381 {
382 LOG(VB_GENERAL, LOG_WARNING,
383 "ExternIO::Fork(): Failed to set O_NONBLOCK for FD: " + ENO);
384 std::this_thread::sleep_for(2s);
386 }
387
388 LOG(VB_RECORD, LOG_INFO, "Spawned");
389 return;
390 }
391
392 // Child
393 close(in[1]);
394 close(out[0]);
395 close(err[0]);
396 if (dup2( in[0], 0) < 0)
397 {
398 std::cerr << "dup2(stdin) failed: " << strerror(errno);
400 }
401 else if (dup2(out[1], 1) < 0)
402 {
403 std::cerr << "dup2(stdout) failed: " << strerror(errno);
405 }
406 else if (dup2(err[1], 2) < 0)
407 {
408 std::cerr << "dup2(stderr) failed: " << strerror(errno);
410 }
411
412 /* Close all open file descriptors except stdin/stdout/stderr */
413#if HAVE_CLOSE_RANGE
414 close_range(3, sysconf(_SC_OPEN_MAX) - 1, 0);
415#else
416 for (int i = sysconf(_SC_OPEN_MAX) - 1; i > 2; --i)
417 close(i);
418#endif
419
420 /* Set the process group id to be the same as the pid of this
421 * child process. This ensures that any subprocesses launched by this
422 * process can be killed along with the process itself. */
423 if (setpgid(0,0) < 0)
424 {
425 std::cerr << "ExternIO: "
426 << "setpgid() failed: "
427 << strerror(errno) << std::endl;
428 }
429
430 /* run command */
431 char *command = strdup(m_app.canonicalFilePath()
432 .toUtf8().constData());
433 // Copy QStringList to char**
434 char **arguments = new char*[m_args.size() + 1];
435 for (int i = 0; i < m_args.size(); ++i)
436 {
437 int len = m_args[i].size() + 1;
438 arguments[i] = new char[len];
439 memcpy(arguments[i], m_args[i].toStdString().c_str(), len);
440 }
441 arguments[m_args.size()] = nullptr;
442
443 if (execv(command, arguments) < 0)
444 {
445 // Can't use LOG due to locking fun.
446 std::cerr << "ExternIO: "
447 << "execv() failed: "
448 << strerror(errno) << std::endl;
449 }
450 else
451 {
452 std::cerr << "ExternIO: "
453 << "execv() should not be here?: "
454 << strerror(errno) << std::endl;
455 }
456
457#endif // !defined( Q_OS_WINDOWS )
458
459 /* Failed to exec */
460 _exit(GENERIC_EXIT_DAEMONIZING_ERROR); // this exit is ok
461}
462
463
464QMap<int, ExternalStreamHandler*> ExternalStreamHandler::s_handlers;
467
469 int inputid, int majorid)
470{
471 QMutexLocker locker(&s_handlersLock);
472
473 QMap<int, ExternalStreamHandler*>::iterator it = s_handlers.find(majorid);
474
475 if (it == s_handlers.end())
476 {
477 auto *newhandler = new ExternalStreamHandler(devname, inputid, majorid);
478 s_handlers[majorid] = newhandler;
479 s_handlersRefCnt[majorid] = 1;
480
481 LOG(VB_RECORD, LOG_INFO,
482 QString("ExternSH[%1:%2]: Creating new stream handler for %3 "
483 "(1 in use)")
484 .arg(inputid).arg(majorid).arg(devname));
485 }
486 else
487 {
488 ++s_handlersRefCnt[majorid];
489 uint rcount = s_handlersRefCnt[majorid];
490 LOG(VB_RECORD, LOG_INFO,
491 QString("ExternSH[%1:%2]: Using existing stream handler for %3")
492 .arg(inputid).arg(majorid).arg(devname) +
493 QString(" (%1 in use)").arg(rcount));
494 }
495
496 return s_handlers[majorid];
497}
498
500 int inputid)
501{
502 QMutexLocker locker(&s_handlersLock);
503
504 int majorid = ref->m_majorId;
505
506 QMap<int, uint>::iterator rit = s_handlersRefCnt.find(majorid);
507 if (rit == s_handlersRefCnt.end())
508 return;
509
510 QMap<int, ExternalStreamHandler*>::iterator it =
511 s_handlers.find(majorid);
512
513 if (*rit > 1)
514 {
515 ref = nullptr;
516 --(*rit);
517
518 LOG(VB_RECORD, LOG_INFO,
519 QString("ExternSH[%1:%2]: Return handler (%3 still in use)")
520 .arg(inputid).arg(majorid).arg(*rit));
521
522 return;
523 }
524
525 if ((it != s_handlers.end()) && (*it == ref))
526 {
527 LOG(VB_RECORD, LOG_INFO,
528 QString("ExternSH[%1:%2]: Closing handler (0 in use)")
529 .arg(inputid).arg(majorid));
530 delete *it;
531 s_handlers.erase(it);
532 }
533 else
534 {
535 LOG(VB_GENERAL, LOG_ERR,
536 QString("ExternSH[%1:%2]: Error: No handler to return!")
537 .arg(inputid).arg(majorid));
538 }
539
540 s_handlersRefCnt.erase(rit);
541 ref = nullptr;
542}
543
544/*
545 ExternalStreamHandler
546 */
547
549 int inputid,
550 int majorid)
551 : StreamHandler(path, inputid)
552 , m_loc(m_device)
553 , m_majorId(majorid)
554{
555 setObjectName("ExternSH");
556
557 m_args = path.split(' ',Qt::SkipEmptyParts) +
558 logPropagateArgs.split(' ', Qt::SkipEmptyParts);
559 //NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
560 m_app = m_args.first();
561 m_args.removeFirst();
562
563 // Pass one (and only one) 'quiet'
564 if (!m_args.contains("--quiet") && !m_args.contains("-q"))
565 m_args << "--quiet";
566
567 m_args << "--inputid" << QString::number(majorid);
568 LOG(VB_RECORD, LOG_INFO, LOC + QString("args \"%1\"")
569 .arg(m_args.join(" ")));
570
571 if (!OpenApp())
572 {
573 LOG(VB_GENERAL, LOG_ERR, LOC +
574 QString("Failed to start %1").arg(m_device));
575 }
576}
577
579{
580 return m_streamingCnt.loadAcquire();
581}
582
584{
585 QString result;
586 QString ready_cmd;
587 QByteArray buffer;
588 int sz = 0;
589 uint len = 0;
590 uint read_len = 0;
591 uint restart_cnt = 0;
592 MythTimer status_timer;
593 MythTimer nodata_timer;
594
595 bool good_data = false;
596 uint data_proc_err = 0;
597 uint data_short_err = 0;
598
599 if (!m_io)
600 {
601 LOG(VB_GENERAL, LOG_ERR, LOC +
602 QString("%1 is not running.").arg(m_device));
603 }
604
605 status_timer.start();
606
607 RunProlog();
608
609 LOG(VB_RECORD, LOG_INFO, LOC + "run(): begin");
610
611 SetRunning(true, true, false);
612
613 if (m_pollMode)
614 ready_cmd = "SendBytes";
615 else
616 ready_cmd = "XON";
617
618 uint remainder = 0;
619 while (m_runningDesired && !m_bError)
620 {
621 if (!IsTSOpen())
622 {
623 LOG(VB_RECORD, LOG_WARNING, LOC + "TS not open yet.");
624 std::this_thread::sleep_for(10ms);
625 continue;
626 }
627
628 if (StreamingCount() == 0)
629 {
630 std::this_thread::sleep_for(10ms);
631 continue;
632 }
633
635
636 if (!m_xon || m_pollMode)
637 {
638 if (buffer.size() > TOO_FAST_SIZE)
639 {
640 LOG(VB_RECORD, LOG_WARNING, LOC +
641 "Internal buffer too full to accept more data from "
642 "external application.");
643 }
644 else
645 {
646 if (!ProcessCommand(ready_cmd, result))
647 {
648 if (result.startsWith("ERR"))
649 {
650 LOG(VB_GENERAL, LOG_ERR, LOC +
651 QString("Aborting: %1 -> %2")
652 .arg(ready_cmd, result));
653 m_bError = true;
654 continue;
655 }
656
657 if (restart_cnt++)
658 std::this_thread::sleep_for(20s);
659 if (!RestartStream())
660 {
661 LOG(VB_RECORD, LOG_ERR, LOC +
662 "Failed to restart stream.");
663 m_bError = true;
664 }
665 continue;
666 }
667 m_xon = true;
668 }
669 }
670
671 if (m_xon)
672 {
673 if (status_timer.elapsed() >= 2s)
674 {
675 // Since we may never need to send the XOFF
676 // command, occationally check to see if the
677 // External recorder needs to report an issue.
678 if (CheckForError())
679 {
680 if (restart_cnt++)
681 std::this_thread::sleep_for(20s);
682 if (!RestartStream())
683 {
684 LOG(VB_RECORD, LOG_ERR, LOC +
685 "Failed to restart stream.");
686 m_bError = true;
687 }
688 continue;
689 }
690
691 status_timer.restart();
692 }
693
694 if (buffer.size() > TOO_FAST_SIZE)
695 {
696 if (!m_pollMode)
697 {
698 // Data is comming a little too fast, so XOFF
699 // to give us time to process it.
700 if (!ProcessCommand(QString("XOFF"), result))
701 {
702 if (result.startsWith("ERR"))
703 {
704 LOG(VB_GENERAL, LOG_ERR, LOC +
705 QString("Aborting: XOFF -> %2")
706 .arg(result));
707 m_bError = true;
708 }
709 }
710 m_xon = false;
711 }
712 }
713
714 read_len = 0;
715 if (m_io != nullptr)
716 {
717 sz = PACKET_SIZE - remainder;
718 if (sz > 0)
719 read_len = m_io->Read(buffer, sz, 100ms);
720 }
721 }
722 else
723 {
724 read_len = 0;
725 }
726
727 if (read_len == 0)
728 {
729 if (!nodata_timer.isRunning())
730 nodata_timer.start();
731 else
732 {
733 if (nodata_timer.elapsed() >= 50s)
734 {
735 LOG(VB_GENERAL, LOG_WARNING, LOC +
736 "No data for 50 seconds, Restarting stream.");
737 if (!RestartStream())
738 {
739 LOG(VB_RECORD, LOG_ERR, LOC +
740 "Failed to restart stream.");
741 m_bError = true;
742 }
743 nodata_timer.stop();
744 continue;
745 }
746 }
747
748 std::this_thread::sleep_for(50ms);
749
750 // HLS type streams may only produce data every ~10 seconds
751 if (nodata_timer.elapsed() < 12s && buffer.size() < TS_PACKET_SIZE)
752 continue;
753 }
754 else
755 {
756 nodata_timer.stop();
757 restart_cnt = 0;
758 }
759
760 if (m_io == nullptr)
761 {
762 LOG(VB_GENERAL, LOG_ERR, LOC + "I/O thread has disappeared!");
763 m_bError = true;
764 break;
765 }
766 if (m_io->Error())
767 {
768 LOG(VB_GENERAL, LOG_ERR, LOC +
769 QString("Fatal Error from External Recorder: %1")
770 .arg(m_io->ErrorString()));
771 CloseApp();
772 m_bError = true;
773 break;
774 }
775
776 len = remainder = buffer.size();
777
778 if (len == 0)
779 continue;
780
781 if (len < TS_PACKET_SIZE)
782 {
783 if (m_xon && data_short_err++ == 0)
784 LOG(VB_RECORD, LOG_INFO, LOC + "Waiting for a full TS packet.");
785 std::this_thread::sleep_for(50us);
786 continue;
787 }
788 if (data_short_err)
789 {
790 if (data_short_err > 1)
791 {
792 LOG(VB_RECORD, LOG_INFO, LOC +
793 QString("Waited for a full TS packet %1 times.")
794 .arg(data_short_err));
795 }
796 data_short_err = 0;
797 }
798
799 if (!m_streamLock.tryLock())
800 continue;
801
802 if (!m_listenerLock.tryLock())
803 continue;
804
805 for (auto sit = m_streamDataList.cbegin();
806 sit != m_streamDataList.cend(); ++sit)
807 {
808 remainder = sit.key()->ProcessData
809 (reinterpret_cast<const uint8_t *>
810 (buffer.constData()), buffer.size());
811 }
812
813 m_listenerLock.unlock();
814
815 if (m_replay)
816 {
817 m_replayBuffer += buffer.left(len - remainder);
818 if (m_replayBuffer.size() > (50 * PACKET_SIZE))
819 {
820 m_replayBuffer.remove(0, len - remainder);
821 LOG(VB_RECORD, LOG_WARNING, LOC +
822 QString("Replay size truncated to %1 bytes")
823 .arg(m_replayBuffer.size()));
824 }
825 }
826
827 m_streamLock.unlock();
828
829 if (remainder == 0)
830 {
831 buffer.clear();
832 good_data = (len != 0U);
833 }
834 else if (len > remainder) // leftover bytes
835 {
836 buffer.remove(0, len - remainder);
837 good_data = (len != 0U);
838 }
839 else if (len == remainder)
840 {
841 good_data = false;
842 }
843
844 if (good_data)
845 {
846 if (data_proc_err)
847 {
848 if (data_proc_err > 1)
849 {
850 LOG(VB_RECORD, LOG_WARNING, LOC +
851 QString("Failed to process the data received %1 times.")
852 .arg(data_proc_err));
853 }
854 data_proc_err = 0;
855 }
856 }
857 else
858 {
859 if (data_proc_err++ == 0)
860 {
861 LOG(VB_RECORD, LOG_WARNING, LOC +
862 "Failed to process the data received");
863 }
864 }
865 }
866
867 LOG(VB_RECORD, LOG_INFO, LOC + "run(): " +
868 QString("%1 shutdown").arg(m_bError ? "Error" : "Normal"));
869
871 SetRunning(false, true, false);
872
873 LOG(VB_RECORD, LOG_INFO, LOC + "run(): " + "end");
874
875 RunEpilog();
876}
877
879{
880 QString result;
881
882 if (ProcessCommand("APIVersion?", result, 10s))
883 {
884 QStringList tokens = result.split(':', Qt::SkipEmptyParts);
885 if (tokens.size() > 1)
886 m_apiVersion = tokens[1].toUInt();
887 m_apiVersion = std::min(m_apiVersion, static_cast<int>(MAX_API_VERSION));
888 if (m_apiVersion < 1)
889 {
890 LOG(VB_RECORD, LOG_ERR, LOC +
891 QString("Bad response to 'APIVersion?' - '%1'. "
892 "Expecting 1, 2 or 3").arg(result));
893 m_apiVersion = 1;
894 }
895
896 ProcessCommand(QString("APIVersion:%1").arg(m_apiVersion), result);
897 return true;
898 }
899
900 return false;
901}
902
904{
905 if (m_apiVersion > 1)
906 {
907 QString result;
908
909 if (ProcessCommand("Description?", result))
910 m_loc = result.mid(3);
911 else
912 m_loc = m_device;
913 }
914
915 return m_loc;
916}
917
919{
920 {
921 QMutexLocker locker(&m_ioLock);
922
923 if (m_io)
924 {
925 LOG(VB_RECORD, LOG_WARNING, LOC + "OpenApp: already open!");
926 return true;
927 }
928
929 m_io = new ExternIO(m_app, m_args);
930
931 if (m_io == nullptr)
932 {
933 LOG(VB_GENERAL, LOG_ERR, LOC + "ExternIO failed: " + ENO);
934 m_bError = true;
935 }
936 else
937 {
938 LOG(VB_RECORD, LOG_INFO, LOC + QString("Spawn '%1'").arg(m_device));
939 m_io->Run();
940 if (m_io->Error())
941 {
942 LOG(VB_GENERAL, LOG_ERR,
943 "Failed to start External Recorder: " + m_io->ErrorString());
944 delete m_io;
945 m_io = nullptr;
946 m_bError = true;
947 return false;
948 }
949 }
950 }
951
952 QString result;
953
954 if (!SetAPIVersion())
955 {
956 // Try again using API version 2
957 m_apiVersion = 2;
958 if (!SetAPIVersion())
959 m_apiVersion = 1;
960 }
961
962 if (!IsAppOpen())
963 {
964 LOG(VB_RECORD, LOG_ERR, LOC + "Application is not responding.");
965 m_bError = true;
966 return false;
967 }
968
970
971 // Gather capabilities
972 if (!ProcessCommand("HasTuner?", result))
973 {
974 LOG(VB_RECORD, LOG_ERR, LOC +
975 QString("Bad response to 'HasTuner?' - '%1'").arg(result));
976 m_bError = true;
977 return false;
978 }
979 m_hasTuner = result.startsWith("OK:Yes");
980
981 if (!ProcessCommand("HasPictureAttributes?", result))
982 {
983 LOG(VB_RECORD, LOG_ERR, LOC +
984 QString("Bad response to 'HasPictureAttributes?' - '%1'")
985 .arg(result));
986 m_bError = true;
987 return false;
988 }
989 m_hasPictureAttributes = result.startsWith("OK:Yes");
990
991 /* Operate in "poll" or "xon/xoff" mode */
992 m_pollMode = ProcessCommand("FlowControl?", result) &&
993 result.startsWith("OK:Poll");
994
995 LOG(VB_RECORD, LOG_INFO, LOC + "App opened successfully");
996 LOG(VB_RECORD, LOG_INFO, LOC +
997 QString("Capabilities: tuner(%1) "
998 "Picture attributes(%2) "
999 "Flow control(%3)")
1000 .arg(m_hasTuner ? "yes" : "no",
1001 m_hasPictureAttributes ? "yes" : "no",
1002 m_pollMode ? "Polling" : "XON/XOFF")
1003 );
1004
1005 /* Let the external app know how many bytes will read without blocking */
1006 ProcessCommand(QString("BlockSize:%1").arg(PACKET_SIZE), result);
1007
1008 return true;
1009}
1010
1012{
1013 if (m_io == nullptr)
1014 {
1015 LOG(VB_RECORD, LOG_WARNING, LOC +
1016 "WARNING: Unable to communicate with external app.");
1017 return false;
1018 }
1019
1020 QString result;
1021 return ProcessCommand("Version?", result, 10s);
1022}
1023
1025{
1026 if (m_tsOpen)
1027 return true;
1028
1029 QString result;
1030
1031 if (!ProcessCommand("IsOpen?", result))
1032 return false;
1033
1034 m_tsOpen = true;
1035 return m_tsOpen;
1036}
1037
1039{
1040 m_ioLock.lock();
1041 if (m_io)
1042 {
1043 QString result;
1044
1045 LOG(VB_RECORD, LOG_INFO, LOC + "CloseRecorder");
1046 m_ioLock.unlock();
1047 ProcessCommand("CloseRecorder", result, 10s);
1048 m_ioLock.lock();
1049
1050 if (!result.startsWith("OK"))
1051 {
1052 LOG(VB_RECORD, LOG_INFO, LOC +
1053 "CloseRecorder failed, sending kill.");
1054
1055 QString full_command = QString("%1").arg(m_args.join(" "));
1056
1057 if (!ExternIO::KillIfRunning(full_command))
1058 {
1059 // Give it one more chance.
1060 std::this_thread::sleep_for(50ms);
1061 if (!ExternIO::KillIfRunning(full_command))
1062 {
1063 LOG(VB_GENERAL, LOG_ERR,
1064 QString("Unable to kill existing '%1'.")
1065 .arg(full_command));
1066 return;
1067 }
1068 }
1069 }
1070 delete m_io;
1071 m_io = nullptr;
1072 }
1073 m_ioLock.unlock();
1074}
1075
1077{
1078 bool streaming = (StreamingCount() > 0);
1079
1080 LOG(VB_RECORD, LOG_WARNING, LOC + "Restarting stream.");
1081 m_damaged = true;
1082
1083 if (streaming)
1084 StopStreaming();
1085
1086 std::this_thread::sleep_for(1s);
1087
1088 if (streaming)
1089 return StartStreaming();
1090
1091 return true;
1092}
1093
1095{
1096 if (m_replay)
1097 {
1098 QString result;
1099
1100 // Let the external app know that we could be busy for a little while
1101 if (!m_pollMode)
1102 {
1103 ProcessCommand(QString("XOFF"), result);
1104 m_xon = false;
1105 }
1106
1107 /* If the input is not a 'broadcast' it may only have one
1108 * copy of the SPS right at the beginning of the stream,
1109 * so make sure we don't miss it!
1110 */
1111 QMutexLocker listen_lock(&m_listenerLock);
1112
1113 if (!m_streamDataList.empty())
1114 {
1115 for (auto sit = m_streamDataList.cbegin();
1116 sit != m_streamDataList.cend(); ++sit)
1117 {
1118 sit.key()->ProcessData(reinterpret_cast<const uint8_t *>
1119 (m_replayBuffer.constData()),
1120 m_replayBuffer.size());
1121 }
1122 }
1123 LOG(VB_RECORD, LOG_INFO, LOC + QString("Replayed %1 bytes")
1124 .arg(m_replayBuffer.size()));
1125 m_replayBuffer.clear();
1126 m_replay = false;
1127
1128 // Let the external app know that we are ready
1129 if (!m_pollMode)
1130 {
1131 if (ProcessCommand(QString("XON"), result))
1132 m_xon = true;
1133 }
1134 }
1135}
1136
1138{
1139 QString result;
1140
1141 QMutexLocker locker(&m_streamLock);
1142
1144
1145 LOG(VB_RECORD, LOG_INFO, LOC +
1146 QString("StartStreaming with %1 current listeners")
1147 .arg(StreamingCount()));
1148
1149 if (!IsAppOpen())
1150 {
1151 LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder not started.");
1152 return false;
1153 }
1154
1155 if (StreamingCount() == 0)
1156 {
1157 if (!ProcessCommand("StartStreaming", result, 15s))
1158 {
1159 LogLevel_t level = LOG_ERR;
1160 if (result.startsWith("warn", Qt::CaseInsensitive))
1161 level = LOG_WARNING;
1162 else
1163 m_bError = true;
1164
1165 LOG(VB_GENERAL, level, LOC + QString("StartStreaming failed: '%1'")
1166 .arg(result));
1167
1168 return false;
1169 }
1170
1171 LOG(VB_RECORD, LOG_INFO, LOC + "Streaming started");
1172 }
1173 else
1174 {
1175 LOG(VB_RECORD, LOG_INFO, LOC + "Already streaming");
1176 }
1177
1178 m_streamingCnt.ref();
1179
1180 LOG(VB_RECORD, LOG_INFO, LOC +
1181 QString("StartStreaming %1 listeners")
1182 .arg(StreamingCount()));
1183
1184 return true;
1185}
1186
1188{
1189 QMutexLocker locker(&m_streamLock);
1190
1191 LOG(VB_RECORD, LOG_INFO, LOC +
1192 QString("StopStreaming %1 listeners")
1193 .arg(StreamingCount()));
1194
1195 if (StreamingCount() == 0)
1196 {
1197 LOG(VB_RECORD, LOG_INFO, LOC +
1198 "StopStreaming requested, but we are not streaming!");
1199 return true;
1200 }
1201
1202 if (m_streamingCnt.deref())
1203 {
1204 LOG(VB_RECORD, LOG_INFO, LOC +
1205 QString("StopStreaming delayed, still have %1 listeners")
1206 .arg(StreamingCount()));
1207 return true;
1208 }
1209
1210 LOG(VB_RECORD, LOG_INFO, LOC + "StopStreaming");
1211
1212 if (!m_pollMode && m_xon)
1213 {
1214 QString result;
1215 ProcessCommand(QString("XOFF"), result);
1216 m_xon = false;
1217 }
1218
1219 if (!IsAppOpen())
1220 {
1221 LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder not started.");
1222 return false;
1223 }
1224
1225 QString result;
1226 if (!ProcessCommand("StopStreaming", result, 10s))
1227 {
1228 LogLevel_t level = LOG_ERR;
1229 if (result.startsWith("warn", Qt::CaseInsensitive))
1230 level = LOG_WARNING;
1231 else
1232 m_bError = true;
1233
1234 LOG(VB_GENERAL, level, LOC + QString("StopStreaming: '%1'")
1235 .arg(result));
1236
1237 return false;
1238 }
1239
1240 PurgeBuffer();
1241 LOG(VB_RECORD, LOG_INFO, LOC + "Streaming stopped");
1242
1243 return true;
1244}
1245
1247 QString & result,
1248 std::chrono::milliseconds timeout,
1249 uint retry_cnt)
1250{
1251 QMutexLocker locker(&m_processLock);
1252
1253 if (m_apiVersion == 3)
1254 {
1255 QVariantMap vcmd;
1256 QVariantMap vresult;
1257 QByteArray response;
1258 QStringList tokens = cmd.split(':');
1259 vcmd["command"] = tokens[0];
1260 if (tokens.size() > 1)
1261 vcmd["value"] = tokens[1];
1262
1263 LOG(VB_RECORD, LOG_DEBUG, LOC +
1264 QString("Arguments: %1").arg(tokens.join("\n")));
1265
1266 bool r = ProcessJson(vcmd, vresult, response, timeout, retry_cnt);
1267 result = QString("%1:%2").arg(vresult["status"].toString(),
1268 vresult["message"].toString());
1269 return r;
1270 }
1271 if (m_apiVersion == 2)
1272 return ProcessVer2(cmd, result, timeout, retry_cnt);
1273 if (m_apiVersion == 1)
1274 return ProcessVer1(cmd, result, timeout, retry_cnt);
1275
1276 LOG(VB_RECORD, LOG_ERR, LOC +
1277 QString("Invalid API version %1. Expected 1 or 2").arg(m_apiVersion));
1278 return false;
1279}
1280
1281bool ExternalStreamHandler::ProcessVer1(const QString & cmd,
1282 QString & result,
1283 std::chrono::milliseconds timeout,
1284 uint retry_cnt)
1285{
1286 LOG(VB_RECORD, LOG_DEBUG, LOC + QString("ProcessVer1('%1')")
1287 .arg(cmd));
1288
1289 for (uint cnt = 0; cnt < retry_cnt; ++cnt)
1290 {
1291 QMutexLocker locker(&m_ioLock);
1292
1293 if (!m_io)
1294 {
1295 LOG(VB_RECORD, LOG_ERR, LOC + "External I/O not ready!");
1296 return false;
1297 }
1298
1299 QByteArray buf(cmd.toUtf8(), cmd.size());
1300 buf += '\n';
1301
1302 if (m_io->Error())
1303 {
1304 LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder in bad state: " +
1305 m_io->ErrorString());
1306 return false;
1307 }
1308
1309 /* Try to keep in sync, if External app was too slow in responding
1310 * to previous query, consume the response before sending new query */
1311 m_io->GetStatus(0ms);
1312
1313 /* Send new query */
1314 m_io->Write(buf);
1315
1317 while (timer.elapsed() < timeout)
1318 {
1319 result = m_io->GetStatus(timeout);
1320 if (m_io->Error())
1321 {
1322 LOG(VB_GENERAL, LOG_ERR, LOC +
1323 "Failed to read from External Recorder: " +
1324 m_io->ErrorString());
1325 m_bError = true;
1326 return false;
1327 }
1328
1329 // Out-of-band error message
1330 if (result.startsWith("STATUS:ERR") ||
1331 result.startsWith("0:STATUS:ERR"))
1332 {
1333 LOG(VB_RECORD, LOG_ERR, LOC + result);
1334 result.remove(0, result.indexOf(":ERR") + 1);
1335 return false;
1336 }
1337 // STATUS message are "out of band".
1338 // Ignore them while waiting for a responds to a command
1339 if (!result.startsWith("STATUS") && !result.startsWith("0:STATUS"))
1340 break;
1341 LOG(VB_RECORD, LOG_INFO, LOC +
1342 QString("Ignoring response '%1'").arg(result));
1343 }
1344
1345 if (result.size() < 1)
1346 {
1347 LOG(VB_GENERAL, LOG_WARNING, LOC +
1348 QString("External Recorder did not respond to '%1'").arg(cmd));
1349 }
1350 else
1351 {
1352 bool okay = result.startsWith("OK");
1353 if (okay || result.startsWith("WARN") || result.startsWith("ERR"))
1354 {
1355 LogLevel_t level = LOG_INFO;
1356
1357 m_ioErrCnt = 0;
1358 if (!okay)
1359 level = LOG_WARNING;
1360 else if (cmd.startsWith("SendBytes"))
1361 level = LOG_DEBUG;
1362
1363 LOG(VB_RECORD, level,
1364 LOC + QString("ProcessCommand('%1') = '%2' took %3ms %4")
1365 .arg(cmd, result,
1366 QString::number(timer.elapsed().count()),
1367 okay ? "" : "<-- NOTE"));
1368
1369 return okay;
1370 }
1371 LOG(VB_GENERAL, LOG_WARNING, LOC +
1372 QString("External Recorder invalid response to '%1': '%2'")
1373 .arg(cmd, result));
1374 }
1375
1376 if (++m_ioErrCnt > 10)
1377 {
1378 LOG(VB_GENERAL, LOG_ERR, LOC + "Too many I/O errors.");
1379 m_bError = true;
1380 break;
1381 }
1382 }
1383
1384 return false;
1385}
1386
1387bool ExternalStreamHandler::ProcessVer2(const QString & command,
1388 QString & result,
1389 std::chrono::milliseconds timeout,
1390 uint retry_cnt)
1391{
1392 QString status;
1393 QString raw;
1394
1395 for (uint cnt = 0; cnt < retry_cnt; ++cnt)
1396 {
1397 QString cmd = QString("%1:%2").arg(++m_serialNo).arg(command);
1398
1399 LOG(VB_RECORD, LOG_DEBUG, LOC + QString("ProcessVer2('%1') serial(%2)")
1400 .arg(cmd).arg(m_serialNo));
1401
1402 QMutexLocker locker(&m_ioLock);
1403
1404 if (!m_io)
1405 {
1406 LOG(VB_RECORD, LOG_ERR, LOC + "External I/O not ready!");
1407 return false;
1408 }
1409
1410 QByteArray buf(cmd.toUtf8(), cmd.size());
1411 buf += '\n';
1412
1413 if (m_io->Error())
1414 {
1415 LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder in bad state: " +
1416 m_io->ErrorString());
1417 return false;
1418 }
1419
1420 /* Send query */
1421 m_io->Write(buf);
1422
1423 QStringList tokens;
1424
1426 while (timer.elapsed() < timeout)
1427 {
1428 result = m_io->GetStatus(timeout);
1429 if (m_io->Error())
1430 {
1431 LOG(VB_GENERAL, LOG_ERR, LOC +
1432 "Failed to read from External Recorder: " +
1433 m_io->ErrorString());
1434 m_bError = true;
1435 return false;
1436 }
1437
1438 if (!result.isEmpty())
1439 {
1440 raw = result;
1441 tokens = result.split(':', Qt::SkipEmptyParts);
1442
1443 // Look for result with the serial number of this query
1444 if (tokens.size() > 1 && tokens[0].toUInt() >= m_serialNo)
1445 break;
1446
1447 /* Other messages are "out of band" */
1448
1449 // Check for error message missing serial#
1450 if (tokens[0].startsWith("ERR"))
1451 break;
1452
1453 // Remove serial#
1454 tokens.removeFirst();
1455 result = tokens.join(':');
1456 bool err = (tokens.size() > 1 && tokens[1].startsWith("ERR"));
1457 LOG(VB_RECORD, (err ? LOG_WARNING : LOG_INFO), LOC + raw);
1458 if (err)
1459 {
1460 // Remove "STATUS"
1461 tokens.removeFirst();
1462 result = tokens.join(':');
1463 return false;
1464 }
1465 }
1466 }
1467
1468 if (timer.elapsed() >= timeout)
1469 {
1470 LOG(VB_RECORD, LOG_ERR, LOC +
1471 QString("ProcessVer2: Giving up waiting for response for "
1472 "command '%2'").arg(cmd));
1473 }
1474 else if (tokens.size() < 2)
1475 {
1476 LOG(VB_RECORD, LOG_ERR, LOC +
1477 QString("Did not receive a valid response "
1478 "for command '%1', received '%2'").arg(cmd, result));
1479 }
1480 else if (tokens[0].toUInt() > m_serialNo)
1481 {
1482 LOG(VB_RECORD, LOG_ERR, LOC +
1483 QString("ProcessVer2: Looking for serial no %1, "
1484 "but received %2 for command '%2'")
1485 .arg(QString::number(m_serialNo), tokens[0], cmd));
1486 }
1487 else
1488 {
1489 tokens.removeFirst();
1490 status = tokens[0].trimmed();
1491 result = tokens.join(':');
1492
1493 bool okay = (status == "OK");
1494 if (okay || status.startsWith("WARN") || status.startsWith("ERR"))
1495 {
1496 LogLevel_t level = LOG_INFO;
1497
1498 m_ioErrCnt = 0;
1499 if (!okay)
1500 level = LOG_WARNING;
1501 else if (command.startsWith("SendBytes") ||
1502 (command.startsWith("TuneStatus") &&
1503 result == "OK:InProgress"))
1504 level = LOG_DEBUG;
1505
1506 LOG(VB_RECORD, level,
1507 LOC + QString("ProcessV2('%1') = '%2' took %3ms %4")
1508 .arg(cmd, result, QString::number(timer.elapsed().count()),
1509 okay ? "" : "<-- NOTE"));
1510
1511 return okay;
1512 }
1513 LOG(VB_GENERAL, LOG_WARNING, LOC +
1514 QString("External Recorder invalid response to '%1': '%2'")
1515 .arg(cmd, result));
1516 }
1517
1518 if (++m_ioErrCnt > 10)
1519 {
1520 LOG(VB_GENERAL, LOG_ERR, LOC + "Too many I/O errors.");
1521 m_bError = true;
1522 break;
1523 }
1524 }
1525
1526 return false;
1527}
1528
1529bool ExternalStreamHandler::ProcessJson(const QVariantMap & vmsg,
1530 QVariantMap & elements,
1531 QByteArray & response,
1532 std::chrono::milliseconds timeout,
1533 uint retry_cnt)
1534{
1535 for (uint cnt = 0; cnt < retry_cnt; ++cnt)
1536 {
1537 QVariantMap query(vmsg);
1538
1539 uint serial = ++m_serialNo;
1540 query["serial"] = serial;
1541 QString cmd = query["command"].toString();
1542
1543 QJsonDocument qdoc;
1544 qdoc = QJsonDocument::fromVariant(query);
1545 QByteArray cmdbuf = qdoc.toJson(QJsonDocument::Compact);
1546
1547 LOG(VB_RECORD, LOG_DEBUG, LOC +
1548 QString("ProcessJson: %1").arg(QString(cmdbuf)));
1549
1550 if (m_io->Error())
1551 {
1552 LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder in bad state: " +
1553 m_io->ErrorString());
1554 return false;
1555 }
1556
1557 /* Send query */
1558 m_io->Write(cmdbuf);
1559 m_io->Write("\n");
1560
1562 while (timer.elapsed() < timeout)
1563 {
1564 response = m_io->GetStatus(timeout);
1565 if (m_io->Error())
1566 {
1567 LOG(VB_GENERAL, LOG_ERR, LOC +
1568 "Failed to read from External Recorder: " +
1569 m_io->ErrorString());
1570 m_bError = true;
1571 return false;
1572 }
1573
1574 if (!response.isEmpty())
1575 {
1576 QJsonParseError parseError {};
1577 QJsonDocument doc;
1578
1579 doc = QJsonDocument::fromJson(response, &parseError);
1580
1581 if (parseError.error != QJsonParseError::NoError)
1582 {
1583 LOG(VB_GENERAL, LOG_ERR, LOC +
1584 QString("ExternalRecorder returned invalid JSON message: %1: %2\n%3\nfor\n%4")
1585 .arg(parseError.offset)
1586 .arg(parseError.errorString(),
1587 QString(response),
1588 QString(cmdbuf)));
1589 }
1590 else
1591 {
1592 elements = doc.toVariant().toMap();
1593 if (!elements.contains("serial"))
1594 continue;
1595
1596 serial = elements["serial"].toInt();
1597 if (serial >= m_serialNo)
1598 break;
1599
1600 if (elements.contains("status"))
1601 {
1602 LogLevel_t level { LOG_INFO };
1603
1604 if (elements["status"] == "ERR")
1605 level = LOG_ERR;
1606 else if (elements["status"] == "WARN")
1607 level = LOG_WARNING;
1608
1609 LOG(VB_RECORD, level, LOC + QString("%1: %2")
1610 .arg(elements["status"].toString(),
1611 elements["message"].toString()));
1612 }
1613 }
1614 }
1615 }
1616
1617 if (timer.elapsed() >= timeout)
1618 {
1619 LOG(VB_RECORD, LOG_ERR, LOC +
1620 QString("ProcessJson: Giving up waiting for response for "
1621 "command '%2'").arg(QString(cmdbuf)));
1622
1623 }
1624
1625 if (serial > m_serialNo)
1626 {
1627 LOG(VB_RECORD, LOG_ERR, LOC +
1628 QString("ProcessJson: Looking for serial no %1, "
1629 "but received %2 for command '%2'")
1630 .arg(QString::number(m_serialNo))
1631 .arg(serial)
1632 .arg(QString(cmdbuf)));
1633 }
1634 else if (!elements.contains("status"))
1635 {
1636 LOG(VB_RECORD, LOG_ERR, LOC +
1637 QString("ProcessJson: ExternalRecorder 'status' not found in %1")
1638 .arg(QString(response)));
1639 }
1640 else
1641 {
1642 QString status = elements["status"].toString();
1643 bool okay = (status == "OK");
1644 if (okay || status == "WARN" || status == "ERR")
1645 {
1646 LogLevel_t level = LOG_INFO;
1647
1648 m_ioErrCnt = 0;
1649 if (!okay)
1650 level = LOG_WARNING;
1651 else if (cmd == "SendBytes" ||
1652 (cmd == "TuneStatus?" &&
1653 elements["message"] == "InProgress"))
1654 level = LOG_DEBUG;
1655
1656 LOG(VB_RECORD, level,
1657 LOC + QString("ProcessJson('%1') = %2:%3:%4 took %5ms %6")
1658 .arg(QString(cmdbuf))
1659 .arg(elements["serial"].toInt())
1660 .arg(elements["status"].toString(),
1661 elements["message"].toString(),
1662 QString::number(timer.elapsed().count()),
1663 okay ? "" : "<-- NOTE")
1664 );
1665
1666 return okay;
1667 }
1668 LOG(VB_GENERAL, LOG_WARNING, LOC +
1669 QString("External Recorder invalid response to '%1': '%2'")
1670 .arg(QString(cmdbuf),
1671 QString(response)));
1672 }
1673
1674 if (++m_ioErrCnt > 10)
1675 {
1676 LOG(VB_GENERAL, LOG_ERR, LOC + "Too many I/O errors.");
1677 m_bError = true;
1678 break;
1679 }
1680 }
1681
1682 return false;
1683}
1684
1686{
1687 QByteArray response;
1688 bool err = false;
1689
1690 QMutexLocker locker(&m_ioLock);
1691
1692 if (!m_io)
1693 {
1694 LOG(VB_RECORD, LOG_ERR, LOC + "External I/O not ready!");
1695 return true;
1696 }
1697
1698 if (m_io->Error())
1699 {
1700 LOG(VB_GENERAL, LOG_ERR, "External Recorder in bad state: " +
1701 m_io->ErrorString());
1702 return true;
1703 }
1704
1705 response = m_io->GetStatus(0ms);
1706 while (!response.isEmpty())
1707 {
1708 if (m_apiVersion > 2)
1709 {
1710 QJsonParseError parseError {};
1711 QJsonDocument doc;
1712 QVariantMap elements;
1713
1714 doc = QJsonDocument::fromJson(response, &parseError);
1715
1716 if (parseError.error != QJsonParseError::NoError)
1717 {
1718 LOG(VB_GENERAL, LOG_ERR, LOC +
1719 QString("ExternalRecorder returned invalid JSON message: %1: %2\n%3\n")
1720 .arg(parseError.offset)
1721 .arg(parseError.errorString(), QString(response)));
1722 }
1723 else
1724 {
1725 elements = doc.toVariant().toMap();
1726 if (elements.contains("command") &&
1727 elements["command"] == "STATUS")
1728 {
1729 LogLevel_t level { LOG_INFO };
1730 QString status = elements["status"].toString();
1731 if (status.startsWith("err", Qt::CaseInsensitive))
1732 {
1733 level = LOG_ERR;
1734 err |= true;
1735 }
1736 else if (status.startsWith("warn",
1737 Qt::CaseInsensitive))
1738 {
1739 level = LOG_WARNING;
1740 }
1741 else if (status.startsWith("damage",
1742 Qt::CaseInsensitive))
1743 {
1744 level = LOG_WARNING;
1745 m_damaged |= true;
1746 }
1747 LOG(VB_RECORD, level,
1748 LOC + QString("%1:%2%3")
1749 .arg(status, elements["message"].toString(),
1750 m_damaged ? " (Damaged)" : ""));
1751 }
1752 }
1753 }
1754 else
1755 {
1756 QString res = QString(response);
1757 if (m_apiVersion == 2)
1758 {
1759 QStringList tokens = res.split(':', Qt::SkipEmptyParts);
1760 tokens.removeFirst();
1761 res = tokens.join(':');
1762 for (int idx = 1; idx < tokens.size(); ++idx)
1763 {
1764 err |= tokens[idx].startsWith("ERR",
1765 Qt::CaseInsensitive);
1766 m_damaged |= tokens[idx].startsWith("damage",
1767 Qt::CaseInsensitive);
1768 }
1769 }
1770 else
1771 {
1772 err |= res.startsWith("STATUS:ERR",
1773 Qt::CaseInsensitive);
1774 m_damaged |= res.startsWith("STATUS:DAMAGE",
1775 Qt::CaseInsensitive);
1776 }
1777
1778 LOG(VB_RECORD, (err ? LOG_WARNING : LOG_INFO), LOC + res);
1779 }
1780
1781 response = m_io->GetStatus(0ms);
1782 }
1783
1784 return err;
1785}
1786
1788{
1789 if (m_io)
1790 {
1791 QByteArray buffer;
1792 m_io->Read(buffer, PACKET_SIZE, 1ms);
1793 m_io->GetStatus(1ms);
1794 }
1795}
1796
1798{
1799 // TODO report on buffer overruns, etc.
1800}
#define LOC
QStringList m_args
bool Ready(int fd, std::chrono::milliseconds timeout, const QString &what)
int Write(const QByteArray &buffer)
static bool KillIfRunning(const QString &cmd)
int Read(QByteArray &buffer, int maxlen, std::chrono::milliseconds timeout=2500ms)
static constexpr uint8_t kMaxErrorCnt
QByteArray GetStatus(std::chrono::milliseconds timeout=2500ms)
QTextStream m_status
bool Error(void) const
ExternIO(const QString &app, const QStringList &args)
QString ErrorString(void) const
bool ProcessVer1(const QString &cmd, QString &result, std::chrono::milliseconds timeout, uint retry_cnt)
static ExternalStreamHandler * Get(const QString &devname, int inputid, int majorid)
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
ExternalStreamHandler(const QString &path, int inputid, int majorid)
bool ProcessVer2(const QString &command, QString &result, std::chrono::milliseconds timeout, uint retry_cnt)
static QMap< int, uint > s_handlersRefCnt
bool ProcessCommand(const QString &cmd, QString &result, std::chrono::milliseconds timeout=4s, uint retry_cnt=3)
static QMap< int, ExternalStreamHandler * > s_handlers
bool ProcessJson(const QVariantMap &vmsg, QVariantMap &elements, QByteArray &response, std::chrono::milliseconds timeout=4s, uint retry_cnt=3)
void PriorityEvent(int fd) override
static void Return(ExternalStreamHandler *&ref, int inputid)
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
void setObjectName(const QString &name)
Definition: mthread.cpp:238
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:14
std::chrono::milliseconds restart(void)
Returns milliseconds elapsed since last start() or restart() and resets the count.
Definition: mythtimer.cpp:62
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
bool isRunning(void) const
Returns true if start() or restart() has been called at least once since construction and since any c...
Definition: mythtimer.cpp:135
void stop(void)
Stops timer, next call to isRunning() will return false and any calls to elapsed() or restart() will ...
Definition: mythtimer.cpp:78
@ kStartRunning
Definition: mythtimer.h:17
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
StreamDataList m_streamDataList
QString m_device
volatile bool m_runningDesired
volatile bool m_bError
bool RemoveAllPIDFilters(void)
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
bool UpdateFiltersFromStreamData(void)
QRecursiveMutex m_listenerLock
#define O_NONBLOCK
Definition: compat.h:215
unsigned int uint
Definition: compat.h:68
#define close
Definition: compat.h:31
#define WEXITSTATUS(w)
Definition: compat.h:144
@ GENERIC_EXIT_DAEMONIZING_ERROR
Error daemonizing or execl.
Definition: exitcodes.h:31
@ GENERIC_EXIT_PIPE_FAILURE
Error creating I/O pipes.
Definition: exitcodes.h:29
QString logPropagateArgs
Definition: logging.cpp:85
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
Convenience inline random number generator functions.
QString toString(const QDateTime &raw_dt, uint format)
Returns formatted string representing the time.
Definition: mythdate.cpp:93
def read(device=None, features=[])
Definition: disc.py:35
def error(message)
Definition: smolt.py:409
def write(text, progress=True)
Definition: mythburn.py:307