MythTV  master
ExternalStreamHandler.cpp
Go to the documentation of this file.
1 // -*- Mode: c++ -*-
2 
3 // POSIX headers
4 #include <thread>
5 #include <iostream>
6 #include <fcntl.h>
7 #include <unistd.h>
8 #include <algorithm>
9 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
10 #include <poll.h>
11 #include <sys/ioctl.h>
12 #endif
13 
14 #include <QtGlobal>
15 #ifdef Q_OS_ANDROID
16 #include <sys/wait.h>
17 #endif
18 
19 // Qt headers
20 #include <QString>
21 #include <QFile>
22 #include <QJsonDocument>
23 #include <QJsonObject>
24 
25 // MythTV headers
26 #include "config.h"
27 #include "libmythbase/exitcodes.h"
28 
29 #include "ExternalChannel.h"
30 #include "ExternalStreamHandler.h"
31 //#include "ThreadedFileWriter.h"
32 #include "cardutil.h"
33 #include "dtvsignalmonitor.h"
34 #include "mpeg/mpegstreamdata.h"
35 #include "mpeg/streamlisteners.h"
36 
// Log-message prefix. Expands to an expression that reads m_inputId and m_loc,
// so it can only be used where both of those names are in scope.
37 #define LOC QString("ExternSH[%1](%2): ").arg(m_inputId).arg(m_loc)
38 
39 ExternIO::ExternIO(const QString & app,
40  const QStringList & args)
41  : m_app(QFileInfo(app)),
42  m_status(&m_statusBuf, QIODevice::ReadWrite)
43 {
44  if (!m_app.exists())
45  {
46  m_error = QString("ExternIO: '%1' does not exist.").arg(app);
47  return;
48  }
49  if (!m_app.isReadable() || !m_app.isFile())
50  {
51  m_error = QString("ExternIO: '%1' is not readable.")
52  .arg(m_app.canonicalFilePath());
53  return;
54  }
55  if (!m_app.isExecutable())
56  {
57  m_error = QString("ExternIO: '%1' is not executable.")
58  .arg(m_app.canonicalFilePath());
59  return;
60  }
61 
62  m_args = args;
63  m_args.prepend(m_app.baseName());
64 
65  m_status.setString(&m_statusBuf);
66 }
67 
69 {
70  close(m_appIn);
71  close(m_appOut);
72  close(m_appErr);
73 
74  // waitpid(m_pid, &status, 0);
75  delete[] m_buffer;
76 }
77 
78 bool ExternIO::Ready([[maybe_unused]] int fd,
79  [[maybe_unused]] std::chrono::milliseconds timeout,
80  [[maybe_unused]] const QString & what)
81 {
82 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
83  std::array<struct pollfd,2> m_poll {};
84 
85  m_poll[0].fd = fd;
86  m_poll[0].events = POLLIN | POLLPRI;
87  int ret = poll(m_poll.data(), 1, timeout.count());
88 
89  if (m_poll[0].revents & POLLHUP)
90  {
91  m_error = what + " poll eof (POLLHUP)";
92  return false;
93  }
94  if (m_poll[0].revents & POLLNVAL)
95  {
96  LOG(VB_GENERAL, LOG_ERR, "poll error");
97  return false;
98  }
99  if (m_poll[0].revents & POLLIN)
100  {
101  if (ret > 0)
102  return true;
103 
104  if ((EOVERFLOW == errno))
105  m_error = "poll overflow";
106  return false;
107  }
108 #endif // !defined( USING_MINGW ) && !defined( _MSC_VER )
109  return false;
110 }
111 
112 int ExternIO::Read(QByteArray & buffer, int maxlen, std::chrono::milliseconds timeout)
113 {
114  if (Error())
115  {
116  LOG(VB_RECORD, LOG_ERR,
117  QString("ExternIO::Read: already in error state: '%1'")
118  .arg(m_error));
119  return 0;
120  }
121 
122  if (!Ready(m_appOut, timeout, "data"))
123  return 0;
124 
125  if (m_bufSize < maxlen)
126  {
127  m_bufSize = maxlen;
128  delete [] m_buffer;
129  m_buffer = new char[m_bufSize];
130  }
131 
132  int len = read(m_appOut, m_buffer, maxlen);
133 
134  if (len < 0)
135  {
136  if (errno == EAGAIN)
137  {
138  if (++m_errCnt > kMaxErrorCnt)
139  {
140  m_error = "Failed to read from External Recorder: " + ENO;
141  LOG(VB_RECORD, LOG_WARNING,
142  "External Recorder not ready. Giving up.");
143  }
144  else
145  {
146  LOG(VB_RECORD, LOG_WARNING,
147  QString("External Recorder not ready. Will retry (%1/%2).")
148  .arg(m_errCnt).arg(kMaxErrorCnt));
149  std::this_thread::sleep_for(100ms);
150  }
151  }
152  else
153  {
154  m_error = "Failed to read from External Recorder: " + ENO;
155  LOG(VB_RECORD, LOG_ERR, m_error);
156  }
157  }
158  else
159  {
160  m_errCnt = 0;
161  }
162 
163  if (len == 0)
164  return 0;
165 
166  buffer.append(m_buffer, len);
167 
168  LOG(VB_RECORD, LOG_DEBUG,
169  QString("ExternIO::Read '%1' bytes, buffer size %2")
170  .arg(len).arg(buffer.size()));
171 
172  return len;
173 }
174 
175 QByteArray ExternIO::GetStatus(std::chrono::milliseconds timeout)
176 {
177  if (Error())
178  {
179  LOG(VB_RECORD, LOG_ERR,
180  QString("ExternIO::GetStatus: already in error state: '%1'")
181  .arg(m_error));
182  return {};
183  }
184 
185  std::chrono::milliseconds waitfor = m_status.atEnd() ? timeout : 0ms;
186  if (Ready(m_appErr, waitfor, "status"))
187  {
188  std::array<char,2048> buffer {};
189  int len = read(m_appErr, buffer.data(), buffer.size());
190  m_status << QString::fromLatin1(buffer.data(), len);
191  }
192 
193  if (m_status.atEnd())
194  return {};
195 
196  QString msg = m_status.readLine();
197 
198  LOG(VB_RECORD, LOG_DEBUG, QString("ExternIO::GetStatus '%1'")
199  .arg(msg));
200 
201  return msg.toUtf8();
202 }
203 
204 int ExternIO::Write(const QByteArray & buffer)
205 {
206  if (Error())
207  {
208  LOG(VB_RECORD, LOG_ERR,
209  QString("ExternIO::Write: already in error state: '%1'")
210  .arg(m_error));
211  return -1;
212  }
213 
214  LOG(VB_RECORD, LOG_DEBUG, QString("ExternIO::Write('%1')")
215  .arg(QString(buffer).simplified()));
216 
217  int len = write(m_appIn, buffer.constData(), buffer.size());
218  if (len != buffer.size())
219  {
220  if (len > 0)
221  {
222  LOG(VB_RECORD, LOG_WARNING,
223  QString("ExternIO::Write: only wrote %1 of %2 bytes '%3'")
224  .arg(len).arg(buffer.size()).arg(QString(buffer)));
225  }
226  else
227  {
228  m_error = QString("ExternIO: Failed to write '%1' to app's stdin: ")
229  .arg(QString(buffer)) + ENO;
230  return -1;
231  }
232  }
233 
234  return len;
235 }
236 
237 bool ExternIO::Run(void)
238 {
239  LOG(VB_RECORD, LOG_INFO, QString("ExternIO::Run()"));
240 
241  Fork();
242  GetStatus(10ms);
243 
244  return true;
245 }
246 
247 /* Return true if the process is not, or is no longer running */
248 bool ExternIO::KillIfRunning([[maybe_unused]] const QString & cmd)
249 {
250 #if defined(Q_OS_DARWIN) || defined(__FreeBSD__) || defined(__OpenBSD__)
251  return false;
252 #elif defined USING_MINGW
253  return false;
254 #elif defined( _MSC_VER )
255  return false;
256 #else
257  QString grp = QString("pgrep -x -f -- \"%1\" 2>&1 > /dev/null").arg(cmd);
258  QString kil = QString("pkill --signal 15 -x -f -- \"%1\" 2>&1 > /dev/null")
259  .arg(cmd);
260 
261  int res_grp = system(grp.toUtf8().constData());
262  if (WEXITSTATUS(res_grp) == 1)
263  {
264  LOG(VB_RECORD, LOG_DEBUG, QString("'%1' not running.").arg(cmd));
265  return true;
266  }
267 
268  LOG(VB_RECORD, LOG_WARNING, QString("'%1' already running, killing...")
269  .arg(cmd));
270  int res_kil = system(kil.toUtf8().constData());
271  if (WEXITSTATUS(res_kil) == 1)
272  LOG(VB_GENERAL, LOG_WARNING, QString("'%1' failed: %2")
273  .arg(kil, ENO));
274 
275  res_grp = system(grp.toUtf8().constData());
276  if (WEXITSTATUS(res_grp) == 1)
277  {
278  LOG(WEXITSTATUS(res_kil) == 0 ? VB_RECORD : VB_GENERAL, LOG_WARNING,
279  QString("'%1' terminated.").arg(cmd));
280  return true;
281  }
282 
283  std::this_thread::sleep_for(50ms);
284 
285  kil = QString("pkill --signal 9 -x -f \"%1\" 2>&1 > /dev/null").arg(cmd);
286  res_kil = system(kil.toUtf8().constData());
287  if (WEXITSTATUS(res_kil) > 0)
288  LOG(VB_GENERAL, LOG_WARNING, QString("'%1' failed: %2")
289  .arg(kil, ENO));
290 
291  res_grp = system(grp.toUtf8().constData());
292  LOG(WEXITSTATUS(res_kil) == 0 ? VB_RECORD : VB_GENERAL, LOG_WARNING,
293  QString("'%1' %2.")
294  .arg(cmd, WEXITSTATUS(res_grp) == 0 ? "sill running" : "terminated"));
295 
296  return (WEXITSTATUS(res_grp) != 0);
297 #endif
298 }
299 
300 void ExternIO::Fork(void)
301 {
302 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
303  if (Error())
304  {
305  LOG(VB_RECORD, LOG_INFO, QString("ExternIO in bad state: '%1'")
306  .arg(m_error));
307  return;
308  }
309 
310  QString full_command = QString("%1").arg(m_args.join(" "));
311 
312  if (!KillIfRunning(full_command))
313  {
314  // Give it one more chance.
315  std::this_thread::sleep_for(50ms);
316  if (!KillIfRunning(full_command))
317  {
318  m_error = QString("Unable to kill existing '%1'.")
319  .arg(full_command);
320  LOG(VB_GENERAL, LOG_ERR, m_error);
321  return;
322  }
323  }
324 
325 
326  LOG(VB_RECORD, LOG_INFO, QString("ExternIO::Fork '%1'").arg(full_command));
327 
328  std::array<int,2> in = {-1, -1};
329  std::array<int,2> out = {-1, -1};
330  std::array<int,2> err = {-1, -1};
331 
332  if (pipe(in.data()) < 0)
333  {
334  m_error = "pipe(in) failed: " + ENO;
335  return;
336  }
337  if (pipe(out.data()) < 0)
338  {
339  m_error = "pipe(out) failed: " + ENO;
340  close(in[0]);
341  close(in[1]);
342  return;
343  }
344  if (pipe(err.data()) < 0)
345  {
346  m_error = "pipe(err) failed: " + ENO;
347  close(in[0]);
348  close(in[1]);
349  close(out[0]);
350  close(out[1]);
351  return;
352  }
353 
354  m_pid = fork();
355  if (m_pid < 0)
356  {
357  // Failed
358  m_error = "fork() failed: " + ENO;
359  return;
360  }
361  if (m_pid > 0)
362  {
363  // Parent
364  close(in[0]);
365  close(out[1]);
366  close(err[1]);
367  m_appIn = in[1];
368  m_appOut = out[0];
369  m_appErr = err[0];
370 
371  bool error = false;
372  error = (fcntl(m_appIn, F_SETFL, O_NONBLOCK) == -1);
373  error |= (fcntl(m_appOut, F_SETFL, O_NONBLOCK) == -1);
374  error |= (fcntl(m_appErr, F_SETFL, O_NONBLOCK) == -1);
375 
376  if (error)
377  {
378  LOG(VB_GENERAL, LOG_WARNING,
379  "ExternIO::Fork(): Failed to set O_NONBLOCK for FD: " + ENO);
380  std::this_thread::sleep_for(2s);
382  }
383 
384  LOG(VB_RECORD, LOG_INFO, "Spawned");
385  return;
386  }
387 
388  // Child
389  close(in[1]);
390  close(out[0]);
391  close(err[0]);
392  if (dup2( in[0], 0) < 0)
393  {
394  std::cerr << "dup2(stdin) failed: " << strerror(errno);
396  }
397  else if (dup2(out[1], 1) < 0)
398  {
399  std::cerr << "dup2(stdout) failed: " << strerror(errno);
401  }
402  else if (dup2(err[1], 2) < 0)
403  {
404  std::cerr << "dup2(stderr) failed: " << strerror(errno);
406  }
407 
408  /* Close all open file descriptors except stdin/stdout/stderr */
409 #if HAVE_CLOSE_RANGE
410  close_range(3, sysconf(_SC_OPEN_MAX) - 1, 0);
411 #else
412  for (int i = sysconf(_SC_OPEN_MAX) - 1; i > 2; --i)
413  close(i);
414 #endif
415 
416  /* Set the process group id to be the same as the pid of this
417  * child process. This ensures that any subprocesses launched by this
418  * process can be killed along with the process itself. */
419  if (setpgid(0,0) < 0)
420  {
421  std::cerr << "ExternIO: "
422  << "setpgid() failed: "
423  << strerror(errno) << std::endl;
424  }
425 
426  /* run command */
427  char *command = strdup(m_app.canonicalFilePath()
428  .toUtf8().constData());
429  // Copy QStringList to char**
430  char **arguments = new char*[m_args.size() + 1];
431  for (int i = 0; i < m_args.size(); ++i)
432  {
433  int len = m_args[i].size() + 1;
434  arguments[i] = new char[len];
435  memcpy(arguments[i], m_args[i].toStdString().c_str(), len);
436  }
437  arguments[m_args.size()] = nullptr;
438 
439  if (execv(command, arguments) < 0)
440  {
441  // Can't use LOG due to locking fun.
442  std::cerr << "ExternIO: "
443  << "execv() failed: "
444  << strerror(errno) << std::endl;
445  }
446  else
447  {
448  std::cerr << "ExternIO: "
449  << "execv() should not be here?: "
450  << strerror(errno) << std::endl;
451  }
452 
453 #endif // !defined( USING_MINGW ) && !defined( _MSC_VER )
454 
455  /* Failed to exec */
456  _exit(GENERIC_EXIT_DAEMONIZING_ERROR); // this exit is ok
457 }
458 
459 
// Definition of the static registry of stream handlers, keyed by major id.
460 QMap<int, ExternalStreamHandler*> ExternalStreamHandler::s_handlers;
463 
465  int inputid, int majorid)
466 {
467  QMutexLocker locker(&s_handlersLock);
468 
469  QMap<int, ExternalStreamHandler*>::iterator it = s_handlers.find(majorid);
470 
471  if (it == s_handlers.end())
472  {
473  auto *newhandler = new ExternalStreamHandler(devname, inputid, majorid);
474  s_handlers[majorid] = newhandler;
475  s_handlersRefCnt[majorid] = 1;
476 
477  LOG(VB_RECORD, LOG_INFO,
478  QString("ExternSH[%1:%2]: Creating new stream handler for %3 "
479  "(1 in use)")
480  .arg(inputid).arg(majorid).arg(devname));
481  }
482  else
483  {
484  ++s_handlersRefCnt[majorid];
485  uint rcount = s_handlersRefCnt[majorid];
486  LOG(VB_RECORD, LOG_INFO,
487  QString("ExternSH[%1:%2]: Using existing stream handler for %3")
488  .arg(inputid).arg(majorid).arg(devname) +
489  QString(" (%1 in use)").arg(rcount));
490  }
491 
492  return s_handlers[majorid];
493 }
494 
496  int inputid)
497 {
498  QMutexLocker locker(&s_handlersLock);
499 
500  int majorid = ref->m_majorId;
501 
502  QMap<int, uint>::iterator rit = s_handlersRefCnt.find(majorid);
503  if (rit == s_handlersRefCnt.end())
504  return;
505 
506  QMap<int, ExternalStreamHandler*>::iterator it =
507  s_handlers.find(majorid);
508 
509  if (*rit > 1)
510  {
511  ref = nullptr;
512  --(*rit);
513 
514  LOG(VB_RECORD, LOG_INFO,
515  QString("ExternSH[%1:%2]: Return handler (%3 still in use)")
516  .arg(inputid).arg(majorid).arg(*rit));
517 
518  return;
519  }
520 
521  if ((it != s_handlers.end()) && (*it == ref))
522  {
523  LOG(VB_RECORD, LOG_INFO,
524  QString("ExternSH[%1:%2]: Closing handler (0 in use)")
525  .arg(inputid).arg(majorid));
526  delete *it;
527  s_handlers.erase(it);
528  }
529  else
530  {
531  LOG(VB_GENERAL, LOG_ERR,
532  QString("ExternSH[%1:%2]: Error: No handler to return!")
533  .arg(inputid).arg(majorid));
534  }
535 
536  s_handlersRefCnt.erase(rit);
537  ref = nullptr;
538 }
539 
540 /*
541  ExternalStreamHandler
542  */
543 
545  int inputid,
546  int majorid)
547  : StreamHandler(path, inputid)
548  , m_loc(m_device)
549  , m_majorId(majorid)
550 {
551  setObjectName("ExternSH");
552 
553  m_args = path.split(' ',Qt::SkipEmptyParts) +
554  logPropagateArgs.split(' ', Qt::SkipEmptyParts);
555  //NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
556  m_app = m_args.first();
557  m_args.removeFirst();
558 
559  // Pass one (and only one) 'quiet'
560  if (!m_args.contains("--quiet") && !m_args.contains("-q"))
561  m_args << "--quiet";
562 
563  m_args << "--inputid" << QString::number(majorid);
564  LOG(VB_RECORD, LOG_INFO, LOC + QString("args \"%1\"")
565  .arg(m_args.join(" ")));
566 
567  if (!OpenApp())
568  {
569  LOG(VB_GENERAL, LOG_ERR, LOC +
570  QString("Failed to start %1").arg(m_device));
571  }
572 }
573 
575 {
576  return m_streamingCnt.loadAcquire();
577 }
578 
580 {
581  QString result;
582  QString ready_cmd;
583  QByteArray buffer;
584  int sz = 0;
585  uint len = 0;
586  uint read_len = 0;
587  uint restart_cnt = 0;
588  MythTimer status_timer;
589  MythTimer nodata_timer;
590 
591  bool good_data = false;
592  uint data_proc_err = 0;
593  uint data_short_err = 0;
594 
595  if (!m_io)
596  {
597  LOG(VB_GENERAL, LOG_ERR, LOC +
598  QString("%1 is not running.").arg(m_device));
599  }
600 
601  status_timer.start();
602 
603  RunProlog();
604 
605  LOG(VB_RECORD, LOG_INFO, LOC + "run(): begin");
606 
607  SetRunning(true, true, false);
608 
609  if (m_pollMode)
610  ready_cmd = "SendBytes";
611  else
612  ready_cmd = "XON";
613 
614  uint remainder = 0;
615  while (m_runningDesired && !m_bError)
616  {
617  if (!IsTSOpen())
618  {
619  LOG(VB_RECORD, LOG_WARNING, LOC + "TS not open yet.");
620  std::this_thread::sleep_for(10ms);
621  continue;
622  }
623 
624  if (StreamingCount() == 0)
625  {
626  std::this_thread::sleep_for(10ms);
627  continue;
628  }
629 
631 
632  if (!m_xon || m_pollMode)
633  {
634  if (buffer.size() > TOO_FAST_SIZE)
635  {
636  LOG(VB_RECORD, LOG_WARNING, LOC +
637  "Internal buffer too full to accept more data from "
638  "external application.");
639  }
640  else
641  {
642  if (!ProcessCommand(ready_cmd, result))
643  {
644  if (result.startsWith("ERR"))
645  {
646  LOG(VB_GENERAL, LOG_ERR, LOC +
647  QString("Aborting: %1 -> %2")
648  .arg(ready_cmd, result));
649  m_bError = true;
650  continue;
651  }
652 
653  if (restart_cnt++)
654  std::this_thread::sleep_for(20s);
655  if (!RestartStream())
656  {
657  LOG(VB_RECORD, LOG_ERR, LOC +
658  "Failed to restart stream.");
659  m_bError = true;
660  }
661  continue;
662  }
663  m_xon = true;
664  }
665  }
666 
667  if (m_xon)
668  {
669  if (status_timer.elapsed() >= 2s)
670  {
671  // Since we may never need to send the XOFF
672  // command, occasionally check to see if the
673  // External recorder needs to report an issue.
674  if (CheckForError())
675  {
676  if (restart_cnt++)
677  std::this_thread::sleep_for(20s);
678  if (!RestartStream())
679  {
680  LOG(VB_RECORD, LOG_ERR, LOC +
681  "Failed to restart stream.");
682  m_bError = true;
683  }
684  continue;
685  }
686 
687  status_timer.restart();
688  }
689 
690  if (buffer.size() > TOO_FAST_SIZE)
691  {
692  if (!m_pollMode)
693  {
694  // Data is coming a little too fast, so XOFF
695  // to give us time to process it.
696  if (!ProcessCommand(QString("XOFF"), result))
697  {
698  if (result.startsWith("ERR"))
699  {
700  LOG(VB_GENERAL, LOG_ERR, LOC +
701  QString("Aborting: XOFF -> %2")
702  .arg(result));
703  m_bError = true;
704  }
705  }
706  m_xon = false;
707  }
708  }
709 
710  read_len = 0;
711  if (m_io != nullptr)
712  {
713  sz = PACKET_SIZE - remainder;
714  if (sz > 0)
715  read_len = m_io->Read(buffer, sz, 100ms);
716  }
717  }
718  else
719  {
720  read_len = 0;
721  }
722 
723  if (read_len == 0)
724  {
725  if (!nodata_timer.isRunning())
726  nodata_timer.start();
727  else
728  {
729  if (nodata_timer.elapsed() >= 50s)
730  {
731  LOG(VB_GENERAL, LOG_WARNING, LOC +
732  "No data for 50 seconds, Restarting stream.");
733  if (!RestartStream())
734  {
735  LOG(VB_RECORD, LOG_ERR, LOC +
736  "Failed to restart stream.");
737  m_bError = true;
738  }
739  nodata_timer.stop();
740  continue;
741  }
742  }
743 
744  std::this_thread::sleep_for(50ms);
745 
746  // HLS type streams may only produce data every ~10 seconds
747  if (nodata_timer.elapsed() < 12s && buffer.size() < TS_PACKET_SIZE)
748  continue;
749  }
750  else
751  {
752  nodata_timer.stop();
753  restart_cnt = 0;
754  }
755 
756  if (m_io == nullptr)
757  {
758  LOG(VB_GENERAL, LOG_ERR, LOC + "I/O thread has disappeared!");
759  m_bError = true;
760  break;
761  }
762  if (m_io->Error())
763  {
764  LOG(VB_GENERAL, LOG_ERR, LOC +
765  QString("Fatal Error from External Recorder: %1")
766  .arg(m_io->ErrorString()));
767  CloseApp();
768  m_bError = true;
769  break;
770  }
771 
772  len = remainder = buffer.size();
773 
774  if (len == 0)
775  continue;
776 
777  if (len < TS_PACKET_SIZE)
778  {
779  if (m_xon && data_short_err++ == 0)
780  LOG(VB_RECORD, LOG_INFO, LOC + "Waiting for a full TS packet.");
781  std::this_thread::sleep_for(50us);
782  continue;
783  }
784  if (data_short_err)
785  {
786  if (data_short_err > 1)
787  {
788  LOG(VB_RECORD, LOG_INFO, LOC +
789  QString("Waited for a full TS packet %1 times.")
790  .arg(data_short_err));
791  }
792  data_short_err = 0;
793  }
794 
795  if (!m_streamLock.tryLock())
796  continue;
797 
798  if (!m_listenerLock.tryLock())
799  continue;
800 
801  for (auto sit = m_streamDataList.cbegin();
802  sit != m_streamDataList.cend(); ++sit)
803  {
804  remainder = sit.key()->ProcessData
805  (reinterpret_cast<const uint8_t *>
806  (buffer.constData()), buffer.size());
807  }
808 
809  m_listenerLock.unlock();
810 
811  if (m_replay)
812  {
813  m_replayBuffer += buffer.left(len - remainder);
814  if (m_replayBuffer.size() > (50 * PACKET_SIZE))
815  {
816  m_replayBuffer.remove(0, len - remainder);
817  LOG(VB_RECORD, LOG_WARNING, LOC +
818  QString("Replay size truncated to %1 bytes")
819  .arg(m_replayBuffer.size()));
820  }
821  }
822 
823  m_streamLock.unlock();
824 
825  if (remainder == 0)
826  {
827  buffer.clear();
828  good_data = (len != 0U);
829  }
830  else if (len > remainder) // leftover bytes
831  {
832  buffer.remove(0, len - remainder);
833  good_data = (len != 0U);
834  }
835  else if (len == remainder)
836  {
837  good_data = false;
838  }
839 
840  if (good_data)
841  {
842  if (data_proc_err)
843  {
844  if (data_proc_err > 1)
845  {
846  LOG(VB_RECORD, LOG_WARNING, LOC +
847  QString("Failed to process the data received %1 times.")
848  .arg(data_proc_err));
849  }
850  data_proc_err = 0;
851  }
852  }
853  else
854  {
855  if (data_proc_err++ == 0)
856  {
857  LOG(VB_RECORD, LOG_WARNING, LOC +
858  "Failed to process the data received");
859  }
860  }
861  }
862 
863  LOG(VB_RECORD, LOG_INFO, LOC + "run(): " +
864  QString("%1 shutdown").arg(m_bError ? "Error" : "Normal"));
865 
867  SetRunning(false, true, false);
868 
869  LOG(VB_RECORD, LOG_INFO, LOC + "run(): " + "end");
870 
871  RunEpilog();
872 }
873 
875 {
876  QString result;
877 
878  if (ProcessCommand("APIVersion?", result, 10s))
879  {
880  QStringList tokens = result.split(':', Qt::SkipEmptyParts);
881  if (tokens.size() > 1)
882  m_apiVersion = tokens[1].toUInt();
883  m_apiVersion = std::min(m_apiVersion, static_cast<int>(MAX_API_VERSION));
884  if (m_apiVersion < 1)
885  {
886  LOG(VB_RECORD, LOG_ERR, LOC +
887  QString("Bad response to 'APIVersion?' - '%1'. "
888  "Expecting 1, 2 or 3").arg(result));
889  m_apiVersion = 1;
890  }
891 
892  ProcessCommand(QString("APIVersion:%1").arg(m_apiVersion), result);
893  return true;
894  }
895 
896  return false;
897 }
898 
900 {
901  if (m_apiVersion > 1)
902  {
903  QString result;
904 
905  if (ProcessCommand("Description?", result))
906  m_loc = result.mid(3);
907  else
908  m_loc = m_device;
909  }
910 
911  return m_loc;
912 }
913 
915 {
916  {
917  QMutexLocker locker(&m_ioLock);
918 
919  if (m_io)
920  {
921  LOG(VB_RECORD, LOG_WARNING, LOC + "OpenApp: already open!");
922  return true;
923  }
924 
925  m_io = new ExternIO(m_app, m_args);
926 
927  if (m_io == nullptr)
928  {
929  LOG(VB_GENERAL, LOG_ERR, LOC + "ExternIO failed: " + ENO);
930  m_bError = true;
931  }
932  else
933  {
934  LOG(VB_RECORD, LOG_INFO, LOC + QString("Spawn '%1'").arg(m_device));
935  m_io->Run();
936  if (m_io->Error())
937  {
938  LOG(VB_GENERAL, LOG_ERR,
939  "Failed to start External Recorder: " + m_io->ErrorString());
940  delete m_io;
941  m_io = nullptr;
942  m_bError = true;
943  return false;
944  }
945  }
946  }
947 
948  QString result;
949 
950  if (!SetAPIVersion())
951  {
952  // Try again using API version 2
953  m_apiVersion = 2;
954  if (!SetAPIVersion())
955  m_apiVersion = 1;
956  }
957 
958  if (!IsAppOpen())
959  {
960  LOG(VB_RECORD, LOG_ERR, LOC + "Application is not responding.");
961  m_bError = true;
962  return false;
963  }
964 
966 
967  // Gather capabilities
968  if (!ProcessCommand("HasTuner?", result))
969  {
970  LOG(VB_RECORD, LOG_ERR, LOC +
971  QString("Bad response to 'HasTuner?' - '%1'").arg(result));
972  m_bError = true;
973  return false;
974  }
975  m_hasTuner = result.startsWith("OK:Yes");
976 
977  if (!ProcessCommand("HasPictureAttributes?", result))
978  {
979  LOG(VB_RECORD, LOG_ERR, LOC +
980  QString("Bad response to 'HasPictureAttributes?' - '%1'")
981  .arg(result));
982  m_bError = true;
983  return false;
984  }
985  m_hasPictureAttributes = result.startsWith("OK:Yes");
986 
987  /* Operate in "poll" or "xon/xoff" mode */
988  m_pollMode = ProcessCommand("FlowControl?", result) &&
989  result.startsWith("OK:Poll");
990 
991  LOG(VB_RECORD, LOG_INFO, LOC + "App opened successfully");
992  LOG(VB_RECORD, LOG_INFO, LOC +
993  QString("Capabilities: tuner(%1) "
994  "Picture attributes(%2) "
995  "Flow control(%3)")
996  .arg(m_hasTuner ? "yes" : "no",
997  m_hasPictureAttributes ? "yes" : "no",
998  m_pollMode ? "Polling" : "XON/XOFF")
999  );
1000 
1001  /* Let the external app know how many bytes we will read without blocking */
1002  ProcessCommand(QString("BlockSize:%1").arg(PACKET_SIZE), result);
1003 
1004  return true;
1005 }
1006 
1008 {
1009  if (m_io == nullptr)
1010  {
1011  LOG(VB_RECORD, LOG_WARNING, LOC +
1012  "WARNING: Unable to communicate with external app.");
1013  return false;
1014  }
1015 
1016  QString result;
1017  return ProcessCommand("Version?", result, 10s);
1018 }
1019 
1021 {
1022  if (m_tsOpen)
1023  return true;
1024 
1025  QString result;
1026 
1027  if (!ProcessCommand("IsOpen?", result))
1028  return false;
1029 
1030  m_tsOpen = true;
1031  return m_tsOpen;
1032 }
1033 
1035 {
1036  m_ioLock.lock();
1037  if (m_io)
1038  {
1039  QString result;
1040 
1041  LOG(VB_RECORD, LOG_INFO, LOC + "CloseRecorder");
1042  m_ioLock.unlock();
1043  ProcessCommand("CloseRecorder", result, 10s);
1044  m_ioLock.lock();
1045 
1046  if (!result.startsWith("OK"))
1047  {
1048  LOG(VB_RECORD, LOG_INFO, LOC +
1049  "CloseRecorder failed, sending kill.");
1050 
1051  QString full_command = QString("%1").arg(m_args.join(" "));
1052 
1053  if (!ExternIO::KillIfRunning(full_command))
1054  {
1055  // Give it one more chance.
1056  std::this_thread::sleep_for(50ms);
1057  if (!ExternIO::KillIfRunning(full_command))
1058  {
1059  LOG(VB_GENERAL, LOG_ERR,
1060  QString("Unable to kill existing '%1'.")
1061  .arg(full_command));
1062  return;
1063  }
1064  }
1065  }
1066  delete m_io;
1067  m_io = nullptr;
1068  }
1069  m_ioLock.unlock();
1070 }
1071 
1073 {
1074  bool streaming = (StreamingCount() > 0);
1075 
1076  LOG(VB_RECORD, LOG_INFO, LOC + "Restarting stream.");
1077  m_damaged = true;
1078 
1079  if (streaming)
1080  StopStreaming();
1081 
1082  std::this_thread::sleep_for(1s);
1083 
1084  if (streaming)
1085  return StartStreaming();
1086 
1087  return true;
1088 }
1089 
1091 {
1092  if (m_replay)
1093  {
1094  QString result;
1095 
1096  // Let the external app know that we could be busy for a little while
1097  if (!m_pollMode)
1098  {
1099  ProcessCommand(QString("XOFF"), result);
1100  m_xon = false;
1101  }
1102 
1103  /* If the input is not a 'broadcast' it may only have one
1104  * copy of the SPS right at the beginning of the stream,
1105  * so make sure we don't miss it!
1106  */
1107  QMutexLocker listen_lock(&m_listenerLock);
1108 
1109  if (!m_streamDataList.empty())
1110  {
1111  for (auto sit = m_streamDataList.cbegin();
1112  sit != m_streamDataList.cend(); ++sit)
1113  {
1114  sit.key()->ProcessData(reinterpret_cast<const uint8_t *>
1115  (m_replayBuffer.constData()),
1116  m_replayBuffer.size());
1117  }
1118  }
1119  LOG(VB_RECORD, LOG_INFO, LOC + QString("Replayed %1 bytes")
1120  .arg(m_replayBuffer.size()));
1121  m_replayBuffer.clear();
1122  m_replay = false;
1123 
1124  // Let the external app know that we are ready
1125  if (!m_pollMode)
1126  {
1127  if (ProcessCommand(QString("XON"), result))
1128  m_xon = true;
1129  }
1130  }
1131 }
1132 
1134 {
1135  QString result;
1136 
1137  QMutexLocker locker(&m_streamLock);
1138 
1140 
1141  LOG(VB_RECORD, LOG_INFO, LOC +
1142  QString("StartStreaming with %1 current listeners")
1143  .arg(StreamingCount()));
1144 
1145  if (!IsAppOpen())
1146  {
1147  LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder not started.");
1148  return false;
1149  }
1150 
1151  if (StreamingCount() == 0)
1152  {
1153  if (!ProcessCommand("StartStreaming", result, 15s))
1154  {
1155  LogLevel_t level = LOG_ERR;
1156  if (result.startsWith("warn", Qt::CaseInsensitive))
1157  level = LOG_WARNING;
1158  else
1159  m_bError = true;
1160 
1161  LOG(VB_GENERAL, level, LOC + QString("StartStreaming failed: '%1'")
1162  .arg(result));
1163 
1164  return false;
1165  }
1166 
1167  LOG(VB_RECORD, LOG_INFO, LOC + "Streaming started");
1168  }
1169  else
1170  {
1171  LOG(VB_RECORD, LOG_INFO, LOC + "Already streaming");
1172  }
1173 
1174  m_streamingCnt.ref();
1175 
1176  LOG(VB_RECORD, LOG_INFO, LOC +
1177  QString("StartStreaming %1 listeners")
1178  .arg(StreamingCount()));
1179 
1180  return true;
1181 }
1182 
1184 {
1185  QMutexLocker locker(&m_streamLock);
1186 
1187  LOG(VB_RECORD, LOG_INFO, LOC +
1188  QString("StopStreaming %1 listeners")
1189  .arg(StreamingCount()));
1190 
1191  if (StreamingCount() == 0)
1192  {
1193  LOG(VB_RECORD, LOG_INFO, LOC +
1194  "StopStreaming requested, but we are not streaming!");
1195  return true;
1196  }
1197 
1198  if (m_streamingCnt.deref())
1199  {
1200  LOG(VB_RECORD, LOG_INFO, LOC +
1201  QString("StopStreaming delayed, still have %1 listeners")
1202  .arg(StreamingCount()));
1203  return true;
1204  }
1205 
1206  LOG(VB_RECORD, LOG_INFO, LOC + "StopStreaming");
1207 
1208  if (!m_pollMode && m_xon)
1209  {
1210  QString result;
1211  ProcessCommand(QString("XOFF"), result);
1212  m_xon = false;
1213  }
1214 
1215  if (!IsAppOpen())
1216  {
1217  LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder not started.");
1218  return false;
1219  }
1220 
1221  QString result;
1222  if (!ProcessCommand("StopStreaming", result, 10s))
1223  {
1224  LogLevel_t level = LOG_ERR;
1225  if (result.startsWith("warn", Qt::CaseInsensitive))
1226  level = LOG_WARNING;
1227  else
1228  m_bError = true;
1229 
1230  LOG(VB_GENERAL, level, LOC + QString("StopStreaming: '%1'")
1231  .arg(result));
1232 
1233  return false;
1234  }
1235 
1236  PurgeBuffer();
1237  LOG(VB_RECORD, LOG_INFO, LOC + "Streaming stopped");
1238 
1239  return true;
1240 }
1241 
1242 bool ExternalStreamHandler::ProcessCommand(const QString & cmd,
1243  QString & result,
1244  std::chrono::milliseconds timeout,
1245  uint retry_cnt)
1246 {
1247  QMutexLocker locker(&m_processLock);
1248 
1249  if (m_apiVersion == 3)
1250  {
1251  QVariantMap vcmd;
1252  QVariantMap vresult;
1253  QByteArray response;
1254  QStringList tokens = cmd.split(':');
1255  vcmd["command"] = tokens[0];
1256  if (tokens.size() > 1)
1257  vcmd["value"] = tokens[1];
1258 
1259  LOG(VB_RECORD, LOG_DEBUG, LOC +
1260  QString("Arguments: %1").arg(tokens.join("\n")));
1261 
1262  bool r = ProcessJson(vcmd, vresult, response, timeout, retry_cnt);
1263  result = QString("%1:%2").arg(vresult["status"].toString(),
1264  vresult["message"].toString());
1265  return r;
1266  }
1267  if (m_apiVersion == 2)
1268  return ProcessVer2(cmd, result, timeout, retry_cnt);
1269  if (m_apiVersion == 1)
1270  return ProcessVer1(cmd, result, timeout, retry_cnt);
1271 
1272  LOG(VB_RECORD, LOG_ERR, LOC +
1273  QString("Invalid API version %1. Expected 1 or 2").arg(m_apiVersion));
1274  return false;
1275 }
1276 
1277 bool ExternalStreamHandler::ProcessVer1(const QString & cmd,
1278  QString & result,
1279  std::chrono::milliseconds timeout,
1280  uint retry_cnt)
1281 {
1282  LOG(VB_RECORD, LOG_DEBUG, LOC + QString("ProcessVer1('%1')")
1283  .arg(cmd));
1284 
1285  for (uint cnt = 0; cnt < retry_cnt; ++cnt)
1286  {
1287  QMutexLocker locker(&m_ioLock);
1288 
1289  if (!m_io)
1290  {
1291  LOG(VB_RECORD, LOG_ERR, LOC + "External I/O not ready!");
1292  return false;
1293  }
1294 
1295  QByteArray buf(cmd.toUtf8(), cmd.size());
1296  buf += '\n';
1297 
1298  if (m_io->Error())
1299  {
1300  LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder in bad state: " +
1301  m_io->ErrorString());
1302  return false;
1303  }
1304 
1305  /* Try to keep in sync, if External app was too slow in responding
1306  * to previous query, consume the response before sending new query */
1307  m_io->GetStatus(0ms);
1308 
1309  /* Send new query */
1310  m_io->Write(buf);
1311 
1313  while (timer.elapsed() < timeout)
1314  {
1315  result = m_io->GetStatus(timeout);
1316  if (m_io->Error())
1317  {
1318  LOG(VB_GENERAL, LOG_ERR, LOC +
1319  "Failed to read from External Recorder: " +
1320  m_io->ErrorString());
1321  m_bError = true;
1322  return false;
1323  }
1324 
1325  // Out-of-band error message
1326  if (result.startsWith("STATUS:ERR") ||
1327  result.startsWith("0:STATUS:ERR"))
1328  {
1329  LOG(VB_RECORD, LOG_ERR, LOC + result);
1330  result.remove(0, result.indexOf(":ERR") + 1);
1331  return false;
1332  }
1333  // STATUS message are "out of band".
1334  // Ignore them while waiting for a responds to a command
1335  if (!result.startsWith("STATUS") && !result.startsWith("0:STATUS"))
1336  break;
1337  LOG(VB_RECORD, LOG_INFO, LOC +
1338  QString("Ignoring response '%1'").arg(result));
1339  }
1340 
1341  if (result.size() < 1)
1342  {
1343  LOG(VB_GENERAL, LOG_WARNING, LOC +
1344  QString("External Recorder did not respond to '%1'").arg(cmd));
1345  }
1346  else
1347  {
1348  bool okay = result.startsWith("OK");
1349  if (okay || result.startsWith("WARN") || result.startsWith("ERR"))
1350  {
1351  LogLevel_t level = LOG_INFO;
1352 
1353  m_ioErrCnt = 0;
1354  if (!okay)
1355  level = LOG_WARNING;
1356  else if (cmd.startsWith("SendBytes"))
1357  level = LOG_DEBUG;
1358 
1359  LOG(VB_RECORD, level,
1360  LOC + QString("ProcessCommand('%1') = '%2' took %3ms %4")
1361  .arg(cmd, result,
1362  QString::number(timer.elapsed().count()),
1363  okay ? "" : "<-- NOTE"));
1364 
1365  return okay;
1366  }
1367  LOG(VB_GENERAL, LOG_WARNING, LOC +
1368  QString("External Recorder invalid response to '%1': '%2'")
1369  .arg(cmd, result));
1370  }
1371 
1372  if (++m_ioErrCnt > 10)
1373  {
1374  LOG(VB_GENERAL, LOG_ERR, LOC + "Too many I/O errors.");
1375  m_bError = true;
1376  break;
1377  }
1378  }
1379 
1380  return false;
1381 }
1382 
1383 bool ExternalStreamHandler::ProcessVer2(const QString & command,
1384  QString & result,
1385  std::chrono::milliseconds timeout,
1386  uint retry_cnt)
1387 {
1388  QString status;
1389  QString raw;
1390 
1391  for (uint cnt = 0; cnt < retry_cnt; ++cnt)
1392  {
1393  QString cmd = QString("%1:%2").arg(++m_serialNo).arg(command);
1394 
1395  LOG(VB_RECORD, LOG_DEBUG, LOC + QString("ProcessVer2('%1') serial(%2)")
1396  .arg(cmd).arg(m_serialNo));
1397 
1398  QMutexLocker locker(&m_ioLock);
1399 
1400  if (!m_io)
1401  {
1402  LOG(VB_RECORD, LOG_ERR, LOC + "External I/O not ready!");
1403  return false;
1404  }
1405 
1406  QByteArray buf(cmd.toUtf8(), cmd.size());
1407  buf += '\n';
1408 
1409  if (m_io->Error())
1410  {
1411  LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder in bad state: " +
1412  m_io->ErrorString());
1413  return false;
1414  }
1415 
1416  /* Send query */
1417  m_io->Write(buf);
1418 
1419  QStringList tokens;
1420 
1422  while (timer.elapsed() < timeout)
1423  {
1424  result = m_io->GetStatus(timeout);
1425  if (m_io->Error())
1426  {
1427  LOG(VB_GENERAL, LOG_ERR, LOC +
1428  "Failed to read from External Recorder: " +
1429  m_io->ErrorString());
1430  m_bError = true;
1431  return false;
1432  }
1433 
1434  if (!result.isEmpty())
1435  {
1436  raw = result;
1437  tokens = result.split(':', Qt::SkipEmptyParts);
1438 
1439  // Look for result with the serial number of this query
1440  if (tokens.size() > 1 && tokens[0].toUInt() >= m_serialNo)
1441  break;
1442 
1443  /* Other messages are "out of band" */
1444 
1445  // Check for error message missing serial#
1446  if (tokens[0].startsWith("ERR"))
1447  break;
1448 
1449  // Remove serial#
1450  tokens.removeFirst();
1451  result = tokens.join(':');
1452  bool err = (tokens.size() > 1 && tokens[1].startsWith("ERR"));
1453  LOG(VB_RECORD, (err ? LOG_WARNING : LOG_INFO), LOC + raw);
1454  if (err)
1455  {
1456  // Remove "STATUS"
1457  tokens.removeFirst();
1458  result = tokens.join(':');
1459  return false;
1460  }
1461  }
1462  }
1463 
1464  if (timer.elapsed() >= timeout)
1465  {
1466  LOG(VB_RECORD, LOG_ERR, LOC +
1467  QString("ProcessVer2: Giving up waiting for response for "
1468  "command '%2'").arg(cmd));
1469  }
1470  else if (tokens.size() < 2)
1471  {
1472  LOG(VB_RECORD, LOG_ERR, LOC +
1473  QString("Did not receive a valid response "
1474  "for command '%1', received '%2'").arg(cmd, result));
1475  }
1476  else if (tokens[0].toUInt() > m_serialNo)
1477  {
1478  LOG(VB_RECORD, LOG_ERR, LOC +
1479  QString("ProcessVer2: Looking for serial no %1, "
1480  "but received %2 for command '%2'")
1481  .arg(QString::number(m_serialNo), tokens[0], cmd));
1482  }
1483  else
1484  {
1485  tokens.removeFirst();
1486  status = tokens[0].trimmed();
1487  result = tokens.join(':');
1488 
1489  bool okay = (status == "OK");
1490  if (okay || status.startsWith("WARN") || status.startsWith("ERR"))
1491  {
1492  LogLevel_t level = LOG_INFO;
1493 
1494  m_ioErrCnt = 0;
1495  if (!okay)
1496  level = LOG_WARNING;
1497  else if (command.startsWith("SendBytes") ||
1498  (command.startsWith("TuneStatus") &&
1499  result == "OK:InProgress"))
1500  level = LOG_DEBUG;
1501 
1502  LOG(VB_RECORD, level,
1503  LOC + QString("ProcessV2('%1') = '%2' took %3ms %4")
1504  .arg(cmd, result, QString::number(timer.elapsed().count()),
1505  okay ? "" : "<-- NOTE"));
1506 
1507  return okay;
1508  }
1509  LOG(VB_GENERAL, LOG_WARNING, LOC +
1510  QString("External Recorder invalid response to '%1': '%2'")
1511  .arg(cmd, result));
1512  }
1513 
1514  if (++m_ioErrCnt > 10)
1515  {
1516  LOG(VB_GENERAL, LOG_ERR, LOC + "Too many I/O errors.");
1517  m_bError = true;
1518  break;
1519  }
1520  }
1521 
1522  return false;
1523 }
1524 
1525 bool ExternalStreamHandler::ProcessJson(const QVariantMap & vmsg,
1526  QVariantMap & elements,
1527  QByteArray & response,
1528  std::chrono::milliseconds timeout,
1529  uint retry_cnt)
1530 {
1531  for (uint cnt = 0; cnt < retry_cnt; ++cnt)
1532  {
1533  QVariantMap query(vmsg);
1534 
1535  uint serial = ++m_serialNo;
1536  query["serial"] = serial;
1537  QString cmd = query["command"].toString();
1538 
1539  QJsonDocument qdoc;
1540  qdoc = QJsonDocument::fromVariant(query);
1541  QByteArray cmdbuf = qdoc.toJson(QJsonDocument::Compact);
1542 
1543  LOG(VB_RECORD, LOG_DEBUG, LOC +
1544  QString("ProcessJson: %1").arg(QString(cmdbuf)));
1545 
1546  if (m_io->Error())
1547  {
1548  LOG(VB_GENERAL, LOG_ERR, LOC + "External Recorder in bad state: " +
1549  m_io->ErrorString());
1550  return false;
1551  }
1552 
1553  /* Send query */
1554  m_io->Write(cmdbuf);
1555  m_io->Write("\n");
1556 
1558  while (timer.elapsed() < timeout)
1559  {
1560  response = m_io->GetStatus(timeout);
1561  if (m_io->Error())
1562  {
1563  LOG(VB_GENERAL, LOG_ERR, LOC +
1564  "Failed to read from External Recorder: " +
1565  m_io->ErrorString());
1566  m_bError = true;
1567  return false;
1568  }
1569 
1570  if (!response.isEmpty())
1571  {
1572  QJsonParseError parseError {};
1573  QJsonDocument doc;
1574 
1575  doc = QJsonDocument::fromJson(response, &parseError);
1576 
1577  if (parseError.error != QJsonParseError::NoError)
1578  {
1579  LOG(VB_GENERAL, LOG_ERR, LOC +
1580  QString("ExternalRecorder returned invalid JSON message: %1: %2\n%3\nfor\n%4")
1581  .arg(parseError.offset)
1582  .arg(parseError.errorString(),
1583  QString(response),
1584  QString(cmdbuf)));
1585  }
1586  else
1587  {
1588  elements = doc.toVariant().toMap();
1589  if (elements.find("serial") == elements.end())
1590  continue;
1591 
1592  serial = elements["serial"].toInt();
1593  if (serial >= m_serialNo)
1594  break;
1595 
1596  if (elements.find("status") != elements.end() &&
1597  elements["status"] != "OK")
1598  {
1599  LOG(VB_RECORD, LOG_WARNING, LOC + QString("%1: %2")
1600  .arg(elements["status"].toString(),
1601  elements["message"].toString()));
1602  }
1603  }
1604  }
1605  }
1606 
1607  if (timer.elapsed() >= timeout)
1608  {
1609  LOG(VB_RECORD, LOG_ERR, LOC +
1610  QString("ProcessJson: Giving up waiting for response for "
1611  "command '%2'").arg(QString(cmdbuf)));
1612 
1613  }
1614 
1615  if (serial > m_serialNo)
1616  {
1617  LOG(VB_RECORD, LOG_ERR, LOC +
1618  QString("ProcessJson: Looking for serial no %1, "
1619  "but received %2 for command '%2'")
1620  .arg(QString::number(m_serialNo))
1621  .arg(serial)
1622  .arg(QString(cmdbuf)));
1623  }
1624  else if (elements.find("status") == elements.end())
1625  {
1626  LOG(VB_RECORD, LOG_ERR, LOC +
1627  QString("ProcessJson: ExternalRecorder 'status' not found in %1")
1628  .arg(QString(response)));
1629  }
1630  else
1631  {
1632  QString status = elements["status"].toString();
1633  bool okay = (status == "OK");
1634  if (okay || status == "WARN" || status == "ERR")
1635  {
1636  LogLevel_t level = LOG_INFO;
1637 
1638  m_ioErrCnt = 0;
1639  if (!okay)
1640  level = LOG_WARNING;
1641  else if (cmd == "SendBytes" ||
1642  (cmd == "TuneStatus?" &&
1643  elements["message"] == "InProgress"))
1644  level = LOG_DEBUG;
1645 
1646  LOG(VB_RECORD, level,
1647  LOC + QString("ProcessJson('%1') = %2:%3:%4 took %5ms %6")
1648  .arg(QString(cmdbuf))
1649  .arg(elements["serial"].toInt())
1650  .arg(elements["status"].toString(),
1651  elements["message"].toString(),
1652  QString::number(timer.elapsed().count()),
1653  okay ? "" : "<-- NOTE")
1654  );
1655 
1656  return okay;
1657  }
1658  LOG(VB_GENERAL, LOG_WARNING, LOC +
1659  QString("External Recorder invalid response to '%1': '%2'")
1660  .arg(QString(cmdbuf),
1661  QString(response)));
1662  }
1663 
1664  if (++m_ioErrCnt > 10)
1665  {
1666  LOG(VB_GENERAL, LOG_ERR, LOC + "Too many I/O errors.");
1667  m_bError = true;
1668  break;
1669  }
1670  }
1671 
1672  return false;
1673 }
1674 
1676 {
1677  QByteArray response;
1678  bool err = false;
1679 
1680  QMutexLocker locker(&m_ioLock);
1681 
1682  if (!m_io)
1683  {
1684  LOG(VB_RECORD, LOG_ERR, LOC + "External I/O not ready!");
1685  return true;
1686  }
1687 
1688  if (m_io->Error())
1689  {
1690  LOG(VB_GENERAL, LOG_ERR, "External Recorder in bad state: " +
1691  m_io->ErrorString());
1692  return true;
1693  }
1694 
1695  do
1696  {
1697  response = m_io->GetStatus(0ms);
1698  if (!response.isEmpty())
1699  {
1700  if (m_apiVersion > 2)
1701  {
1702  QJsonParseError parseError {};
1703  QJsonDocument doc;
1704  QVariantMap elements;
1705 
1706  doc = QJsonDocument::fromJson(response, &parseError);
1707 
1708  if (parseError.error != QJsonParseError::NoError)
1709  {
1710  LOG(VB_GENERAL, LOG_ERR, LOC +
1711  QString("ExternalRecorder returned invalid JSON message: %1: %2\n%3\n")
1712  .arg(parseError.offset)
1713  .arg(parseError.errorString(), QString(response)));
1714  }
1715  else
1716  {
1717  elements = doc.toVariant().toMap();
1718  if (elements.find("command") != elements.end() &&
1719  elements["command"] == "STATUS")
1720  {
1721  LogLevel_t level { LOG_INFO };
1722  QString status = elements["status"].toString();
1723  if (status.startsWith("err", Qt::CaseInsensitive))
1724  {
1725  level = LOG_ERR;
1726  err |= true;
1727  }
1728  else if (status.startsWith("warn",
1729  Qt::CaseInsensitive))
1730  {
1731  level = LOG_WARNING;
1732  }
1733  else if (status.startsWith("damage",
1734  Qt::CaseInsensitive))
1735  {
1736  level = LOG_WARNING;
1737  m_damaged |= true;
1738  }
1739  LOG(VB_RECORD, level,
1740  LOC + elements["message"].toString());
1741  }
1742  }
1743  }
1744  else
1745  {
1746  QString res = QString(response);
1747  if (m_apiVersion == 2)
1748  {
1749  QStringList tokens = res.split(':', Qt::SkipEmptyParts);
1750  tokens.removeFirst();
1751  res = tokens.join(':');
1752  for (int idx = 1; idx < tokens.size(); ++idx)
1753  {
1754  err |= tokens[idx].startsWith("ERR",
1755  Qt::CaseInsensitive);
1756  m_damaged |= tokens[idx].startsWith("damage",
1757  Qt::CaseInsensitive);
1758  }
1759  }
1760  else
1761  {
1762  err |= res.startsWith("STATUS:ERR",
1763  Qt::CaseInsensitive);
1764  m_damaged |= res.startsWith("STATUS:DAMAGE",
1765  Qt::CaseInsensitive);
1766  }
1767 
1768  LOG(VB_RECORD, (err ? LOG_WARNING : LOG_INFO), LOC + res);
1769  }
1770  }
1771  }
1772  while (!response.isEmpty());
1773 
1774  return err;
1775 }
1776 
1778 {
1779  if (m_io)
1780  {
1781  QByteArray buffer;
1782  m_io->Read(buffer, PACKET_SIZE, 1ms);
1783  m_io->GetStatus(1ms);
1784  }
1785 }
1786 
1788 {
1789  // TODO report on buffer overruns, etc.
1790 }
WEXITSTATUS
#define WEXITSTATUS(w)
Definition: compat.h:197
ExternalStreamHandler::ProcessVer1
bool ProcessVer1(const QString &cmd, QString &result, std::chrono::milliseconds timeout, uint retry_cnt)
Definition: ExternalStreamHandler.cpp:1277
ExternalStreamHandler::ExternalStreamHandler
ExternalStreamHandler(const QString &path, int inputid, int majorid)
Definition: ExternalStreamHandler.cpp:544
build_compdb.args
args
Definition: build_compdb.py:11
ExternalStreamHandler::m_pollMode
bool m_pollMode
Definition: ExternalStreamHandler.h:139
MythTimer::elapsed
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
ExternalStreamHandler::IsTSOpen
bool IsTSOpen(void)
Definition: ExternalStreamHandler.cpp:1020
O_NONBLOCK
#define O_NONBLOCK
Definition: compat.h:341
ExternalStreamHandler::m_ioLock
QMutex m_ioLock
Definition: ExternalStreamHandler.h:133
MythDate::toString
QString toString(const QDateTime &raw_dt, uint format)
Returns formatted string representing the time.
Definition: mythdate.cpp:93
ENO
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
hardwareprofile.smolt.timeout
float timeout
Definition: smolt.py:102
ExternalStreamHandler::m_apiVersion
int m_apiVersion
Definition: ExternalStreamHandler.h:141
ExternalStreamHandler::m_serialNo
uint m_serialNo
Definition: ExternalStreamHandler.h:142
error
static void error(const char *str,...)
Definition: vbi.cpp:37
StreamHandler::RemoveAllPIDFilters
bool RemoveAllPIDFilters(void)
Definition: streamhandler.cpp:240
StreamHandler::SetRunning
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
Definition: streamhandler.cpp:173
ExternalStreamHandler::PriorityEvent
void PriorityEvent(int fd) override
Definition: ExternalStreamHandler.cpp:1787
MythTimer
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
StreamHandler
Definition: streamhandler.h:56
ExternalStreamHandler::m_hasPictureAttributes
bool m_hasPictureAttributes
Definition: ExternalStreamHandler.h:144
ExternalStreamHandler::StreamingCount
int StreamingCount(void) const
Definition: ExternalStreamHandler.cpp:574
discid.disc.read
def read(device=None, features=[])
Definition: disc.py:35
ExternIO::~ExternIO
~ExternIO(void)
Definition: ExternalStreamHandler.cpp:68
ExternalStreamHandler::run
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: ExternalStreamHandler.cpp:579
mythburn.write
def write(text, progress=True)
Definition: mythburn.py:308
ExternalStreamHandler::m_io
ExternIO * m_io
Definition: ExternalStreamHandler.h:134
MThread::setObjectName
void setObjectName(const QString &name)
Definition: mthread.cpp:238
MythTimer::stop
void stop(void)
Stops timer, next call to isRunning() will return false and any calls to elapsed() or restart() will ...
Definition: mythtimer.cpp:78
MythTimer::isRunning
bool isRunning(void) const
Returns true if start() or restart() has been called at least once since construction and since any c...
Definition: mythtimer.cpp:135
ExternalStreamHandler::StopStreaming
bool StopStreaming(void)
Definition: ExternalStreamHandler.cpp:1183
ExternIO::m_error
QString m_error
Definition: ExternalStreamHandler.h:57
ExternalStreamHandler::m_xon
bool m_xon
Definition: ExternalStreamHandler.h:148
MythTimer::start
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
MThread::RunProlog
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
ExternalStreamHandler::m_processLock
QMutex m_processLock
Definition: ExternalStreamHandler.h:159
streamlisteners.h
ExternIO::ExternIO
ExternIO(const QString &app, const QStringList &args)
Definition: ExternalStreamHandler.cpp:39
ExternalStreamHandler::s_handlers
static QMap< int, ExternalStreamHandler * > s_handlers
Definition: ExternalStreamHandler.h:153
ExternIO::m_bufSize
int m_bufSize
Definition: ExternalStreamHandler.h:59
ExternIO::Error
bool Error(void) const
Definition: ExternalStreamHandler.h:42
ExternIO::m_buffer
char * m_buffer
Definition: ExternalStreamHandler.h:60
ExternalStreamHandler.h
ExternIO
Definition: ExternalStreamHandler.h:29
ExternalStreamHandler::s_handlersRefCnt
static QMap< int, uint > s_handlersRefCnt
Definition: ExternalStreamHandler.h:154
ExternalStreamHandler::TOO_FAST_SIZE
@ TOO_FAST_SIZE
Definition: ExternalStreamHandler.h:74
close
#define close
Definition: compat.h:43
ExternalStreamHandler
Definition: ExternalStreamHandler.h:69
ExternalStreamHandler::m_hasTuner
bool m_hasTuner
Definition: ExternalStreamHandler.h:143
ExternIO::KillIfRunning
static bool KillIfRunning(const QString &cmd)
Definition: ExternalStreamHandler.cpp:248
StreamHandler::m_listenerLock
QRecursiveMutex m_listenerLock
Definition: streamhandler.h:144
ExternIO::kMaxErrorCnt
static constexpr uint8_t kMaxErrorCnt
Definition: ExternalStreamHandler.h:31
ExternalStreamHandler::ReplayStream
void ReplayStream(void)
Definition: ExternalStreamHandler.cpp:1090
ExternalStreamHandler::m_tsOpen
bool m_tsOpen
Definition: ExternalStreamHandler.h:137
ExternIO::Fork
void Fork(void)
Definition: ExternalStreamHandler.cpp:300
ExternalStreamHandler::m_replayBuffer
QByteArray m_replayBuffer
Definition: ExternalStreamHandler.h:146
ExternalStreamHandler::CloseApp
void CloseApp(void)
Definition: ExternalStreamHandler.cpp:1034
ExternIO::m_appErr
int m_appErr
Definition: ExternalStreamHandler.h:55
ExternIO::Ready
bool Ready(int fd, std::chrono::milliseconds timeout, const QString &what)
Definition: ExternalStreamHandler.cpp:78
ExternIO::m_pid
pid_t m_pid
Definition: ExternalStreamHandler.h:56
ExternIO::ErrorString
QString ErrorString(void) const
Definition: ExternalStreamHandler.h:43
StreamHandler::UpdateFiltersFromStreamData
bool UpdateFiltersFromStreamData(void)
Definition: streamhandler.cpp:290
MythTimer::restart
std::chrono::milliseconds restart(void)
Returns milliseconds elapsed since last start() or restart() and resets the count.
Definition: mythtimer.cpp:62
ExternalStreamHandler::PurgeBuffer
void PurgeBuffer(void)
Definition: ExternalStreamHandler.cpp:1777
MThread::RunEpilog
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
ExternIO::m_app
QFileInfo m_app
Definition: ExternalStreamHandler.h:51
ExternalStreamHandler::Get
static ExternalStreamHandler * Get(const QString &devname, int inputid, int majorid)
Definition: ExternalStreamHandler.cpp:464
ExternIO::Run
bool Run(void)
Definition: ExternalStreamHandler.cpp:237
ExternalStreamHandler::RestartStream
bool RestartStream(void)
Definition: ExternalStreamHandler.cpp:1072
ExternalStreamHandler::m_majorId
int m_majorId
Definition: ExternalStreamHandler.h:132
ExternalStreamHandler::SetAPIVersion
bool SetAPIVersion(void)
Definition: ExternalStreamHandler.cpp:874
ExternalStreamHandler::s_handlersLock
static QMutex s_handlersLock
Definition: ExternalStreamHandler.h:152
ExternIO::m_appOut
int m_appOut
Definition: ExternalStreamHandler.h:54
ExternIO::m_appIn
int m_appIn
Definition: ExternalStreamHandler.h:53
ExternalStreamHandler::ProcessJson
bool ProcessJson(const QVariantMap &vmsg, QVariantMap &elements, QByteArray &response, std::chrono::milliseconds timeout=4s, uint retry_cnt=3)
Definition: ExternalStreamHandler.cpp:1525
ExternalStreamHandler::TS_PACKET_SIZE
@ TS_PACKET_SIZE
Definition: ExternalStreamHandler.h:72
GENERIC_EXIT_DAEMONIZING_ERROR
@ GENERIC_EXIT_DAEMONIZING_ERROR
Error daemonizing or execl.
Definition: exitcodes.h:31
ExternalStreamHandler::StartStreaming
bool StartStreaming(void)
Definition: ExternalStreamHandler.cpp:1133
LOC
#define LOC
Definition: ExternalStreamHandler.cpp:37
ExternalStreamHandler::UpdateDescription
QString UpdateDescription(void)
Definition: ExternalStreamHandler.cpp:899
mpegstreamdata.h
ExternalStreamHandler::IsAppOpen
bool IsAppOpen(void)
Definition: ExternalStreamHandler.cpp:1007
ExternalStreamHandler::CheckForError
bool CheckForError(void)
Definition: ExternalStreamHandler.cpp:1675
ExternalStreamHandler::ProcessVer2
bool ProcessVer2(const QString &command, QString &result, std::chrono::milliseconds timeout, uint retry_cnt)
Definition: ExternalStreamHandler.cpp:1383
ExternalStreamHandler::m_damaged
bool m_damaged
Definition: ExternalStreamHandler.h:149
ExternalStreamHandler::m_ioErrCnt
int m_ioErrCnt
Definition: ExternalStreamHandler.h:138
ExternalStreamHandler::m_replay
bool m_replay
Definition: ExternalStreamHandler.h:147
cardutil.h
StreamHandler::m_runningDesired
volatile bool m_runningDesired
Definition: streamhandler.h:119
ExternalStreamHandler::MAX_API_VERSION
@ MAX_API_VERSION
Definition: ExternalStreamHandler.h:71
ExternalStreamHandler::PACKET_SIZE
@ PACKET_SIZE
Definition: ExternalStreamHandler.h:73
ExternalChannel.h
ExternIO::Write
int Write(const QByteArray &buffer)
Definition: ExternalStreamHandler.cpp:204
GENERIC_EXIT_PIPE_FAILURE
@ GENERIC_EXIT_PIPE_FAILURE
Error creating I/O pipes.
Definition: exitcodes.h:29
ExternIO::m_statusBuf
QString m_statusBuf
Definition: ExternalStreamHandler.h:62
MythTimer::kStartRunning
@ kStartRunning
Definition: mythtimer.h:17
ExternalStreamHandler::m_args
QStringList m_args
Definition: ExternalStreamHandler.h:135
StreamHandler::m_streamDataList
StreamDataList m_streamDataList
Definition: streamhandler.h:145
ExternIO::GetStatus
QByteArray GetStatus(std::chrono::milliseconds timeout=2500ms)
Definition: ExternalStreamHandler.cpp:175
ExternalStreamHandler::ProcessCommand
bool ProcessCommand(const QString &cmd, QString &result, std::chrono::milliseconds timeout=4s, uint retry_cnt=3)
Definition: ExternalStreamHandler.cpp:1242
ExternalStreamHandler::m_streamingCnt
QAtomicInt m_streamingCnt
Definition: ExternalStreamHandler.h:156
StreamHandler::m_bError
volatile bool m_bError
Definition: streamhandler.h:124
logPropagateArgs
QString logPropagateArgs
Definition: logging.cpp:83
ExternalStreamHandler::m_loc
QString m_loc
Definition: ExternalStreamHandler.h:131
ExternIO::Read
int Read(QByteArray &buffer, int maxlen, std::chrono::milliseconds timeout=2500ms)
Definition: ExternalStreamHandler.cpp:112
ExternIO::m_status
QTextStream m_status
Definition: ExternalStreamHandler.h:63
exitcodes.h
StreamHandler::m_device
QString m_device
Definition: streamhandler.h:111
ExternalStreamHandler::m_streamLock
QMutex m_streamLock
Definition: ExternalStreamHandler.h:157
ExternalStreamHandler::Return
static void Return(ExternalStreamHandler *&ref, int inputid)
Definition: ExternalStreamHandler.cpp:495
dtvsignalmonitor.h
ExternalStreamHandler::OpenApp
bool OpenApp(void)
Definition: ExternalStreamHandler.cpp:914
ExternIO::m_args
QStringList m_args
Definition: ExternalStreamHandler.h:52
ExternIO::m_errCnt
int m_errCnt
Definition: ExternalStreamHandler.h:64
uint
unsigned int uint
Definition: freesurround.h:24
ExternalStreamHandler::m_app
QString m_app
Definition: ExternalStreamHandler.h:136