MythTV  master
MythExternControl.cpp
Go to the documentation of this file.
1 /* -*- Mode: c++ -*-
2  *
3  * Copyright (C) John Poet 2018
4  *
5  * This file is part of MythTV
6  *
7  * This program is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation, either version 3 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program. If not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 #include "MythExternControl.h"
22 #include "mythlogging.h"
23 
24 #include <QFile>
25 #include <QTextStream>
26 
27 #include <unistd.h>
28 #include <poll.h>
29 
30 #include <iostream>
31 
32 using namespace std;
33 
34 const QString VERSION = "0.6";
35 
36 #define LOC Desc()
37 
39  : m_buffer(this)
40  , m_commands(this)
41  , m_run(true)
42  , m_commands_running(true)
43  , m_buffer_running(true)
44  , m_fatal(false)
45  , m_streaming(false)
46  , m_xon(false)
47  , m_ready(false)
48 {
49  setObjectName("Control");
50 
51  m_buffer.Start();
52  m_commands.Start();
53 }
54 
56 {
57  Terminate();
58  m_commands.Join();
59  m_buffer.Join();
60 }
61 
62 Q_SLOT void MythExternControl::Opened(void)
63 {
64  std::lock_guard<std::mutex> lock(m_flow_mutex);
65 
66  m_ready = true;
67  m_flow_cond.notify_all();
68 }
69 
70 Q_SLOT void MythExternControl::Streaming(bool val)
71 {
72  m_streaming = val;
73  m_flow_cond.notify_all();
74 }
75 
77 {
78  emit Close();
79 }
80 
81 Q_SLOT void MythExternControl::Done(void)
82 {
84  {
85  m_run = false;
86  m_flow_cond.notify_all();
87  m_run_cond.notify_all();
88 
89  std::this_thread::sleep_for(std::chrono::microseconds(50));
90 
92  {
93  std::unique_lock<std::mutex> lk(m_flow_mutex);
94  m_flow_cond.wait_for(lk, std::chrono::milliseconds(1000));
95  }
96 
97  LOG(VB_RECORD, LOG_CRIT, LOC + "Terminated.");
98  }
99 }
100 
101 void MythExternControl::Error(const QString & msg)
102 {
103  LOG(VB_RECORD, LOG_CRIT, LOC + msg);
104 
105  std::unique_lock<std::mutex> lk(m_msg_mutex);
106  if (m_errmsg.isEmpty())
107  m_errmsg = msg;
108  else
109  m_errmsg += "; " + msg;
110 }
111 
112 void MythExternControl::Fatal(const QString & msg)
113 {
114  Error(msg);
115  m_fatal = true;
116  Terminate();
117 }
118 
119 Q_SLOT void MythExternControl::SendMessage(const QString & cmd,
120  const QString & serial,
121  const QString & msg)
122 {
123  std::unique_lock<std::mutex> lk(m_msg_mutex);
124  m_commands.SendStatus(cmd, serial, msg);
125 }
126 
127 Q_SLOT void MythExternControl::ErrorMessage(const QString & msg)
128 {
129  std::unique_lock<std::mutex> lk(m_msg_mutex);
130  if (m_errmsg.isEmpty())
131  m_errmsg = msg;
132  else
133  m_errmsg += "; " + msg;
134 }
135 
136 #undef LOC
137 #define LOC QString("%1").arg(m_parent->Desc())
138 
139 void Commands::Close(void)
140 {
141  std::lock_guard<std::mutex> lock(m_parent->m_flow_mutex);
142 
143  emit m_parent->Close();
144  m_parent->m_ready = false;
145  m_parent->m_flow_cond.notify_all();
146 }
147 
148 void Commands::StartStreaming(const QString & serial)
149 {
150  emit m_parent->StartStreaming(serial);
151 }
152 
153 void Commands::StopStreaming(const QString & serial, bool silent)
154 {
155  emit m_parent->StopStreaming(serial, silent);
156 }
157 
158 void Commands::LockTimeout(const QString & serial) const
159 {
160  emit m_parent->LockTimeout(serial);
161 }
162 
163 void Commands::HasTuner(const QString & serial) const
164 {
165  emit m_parent->HasTuner(serial);
166 }
167 
168 void Commands::HasPictureAttributes(const QString & serial) const
169 {
170  emit m_parent->HasPictureAttributes(serial);
171 }
172 
173 void Commands::SetBlockSize(const QString & serial, int blksz)
174 {
175  emit m_parent->SetBlockSize(serial, blksz);
176 }
177 
178 void Commands::TuneChannel(const QString & serial, const QString & channum)
179 {
180  emit m_parent->TuneChannel(serial, channum);
181 }
182 
183 void Commands::LoadChannels(const QString & serial)
184 {
185  emit m_parent->LoadChannels(serial);
186 }
187 
188 void Commands::FirstChannel(const QString & serial)
189 {
190  emit m_parent->FirstChannel(serial);
191 }
192 
193 void Commands::NextChannel(const QString & serial)
194 {
195  emit m_parent->NextChannel(serial);
196 }
197 
198 bool Commands::SendStatus(const QString & command, const QString & status)
199 {
200  int len = write(2, status.toUtf8().constData(), status.size());
201  write(2, "\n", 1);
202 
203  if (len != status.size())
204  {
205  LOG(VB_RECORD, LOG_ERR, LOC +
206  QString("%1: Only wrote %2 of %3 bytes of message '%4'.")
207  .arg(command).arg(len).arg(status.size()).arg(status));
208  return false;
209  }
210 
211  LOG(VB_RECORD, LOG_INFO, LOC + QString("Processing '%1' --> '%2'")
212  .arg(command).arg(status));
213 
214  m_parent->ClearError();
215  return true;
216 }
217 
218 bool Commands::SendStatus(const QString & command, const QString & serial,
219  const QString & status)
220 {
221  QString msg = QString("%1:%2").arg(serial).arg(status);
222 
223  int len = write(2, msg.toUtf8().constData(), msg.size());
224  write(2, "\n", 1);
225 
226  if (len != msg.size())
227  {
228  LOG(VB_RECORD, LOG_ERR, LOC +
229  QString("%1: Only wrote %2 of %3 bytes of message '%4'.")
230  .arg(command).arg(len).arg(msg.size()).arg(msg));
231  return false;
232  }
233 
234  if (!command.isEmpty())
235  {
236  LOG(VB_RECORD, LOG_INFO, LOC + QString("Processing '%1' --> '%2'")
237  .arg(command).arg(msg));
238  }
239 #if 0
240  else
241  LOG(VB_RECORD, LOG_INFO, LOC + QString("%1").arg(msg));
242 #endif
243 
244  m_parent->ClearError();
245  return true;
246 }
247 
248 bool Commands::ProcessCommand(const QString & cmd)
249 {
250  LOG(VB_RECORD, LOG_DEBUG, LOC + QString("Processing '%1'").arg(cmd));
251 
252  std::unique_lock<std::mutex> lk1(m_parent->m_msg_mutex);
253 
254  if (cmd.startsWith("APIVersion?"))
255  {
256  if (m_parent->m_fatal)
257  SendStatus(cmd, "ERR:" + m_parent->ErrorString());
258  else
259  SendStatus(cmd, "OK:2");
260  return true;
261  }
262 
263  QStringList tokens = cmd.split(':', QString::SkipEmptyParts);
264  if (tokens.size() < 2)
265  {
266  SendStatus(cmd, "0",
267  QString("0:ERR:Version 2 API expects serial_no:msg format. "
268  "Saw '%1' instead").arg(cmd));
269  return true;
270  }
271 
272  if (tokens[1].startsWith("APIVersion?"))
273  {
274  if (m_parent->m_fatal)
275  SendStatus(cmd, tokens[0], "ERR:" + m_parent->ErrorString());
276  else
277  SendStatus(cmd, tokens[0], "OK:2");
278  }
279  else if (tokens[1].startsWith("APIVersion"))
280  {
281  if (tokens.size() > 1)
282  {
283  m_apiVersion = tokens[2].toInt();
284  SendStatus(cmd, tokens[0], QString("OK:%1").arg(m_apiVersion));
285  }
286  else
287  SendStatus(cmd, tokens[0], "ERR:Missing API Version number");
288  }
289  else if (tokens[1].startsWith("Version?"))
290  {
291  if (m_parent->m_fatal)
292  SendStatus(cmd, tokens[0], "ERR:" + m_parent->ErrorString());
293  else
294  SendStatus(cmd, tokens[0], "OK:" + VERSION);
295  }
296  else if (tokens[1].startsWith("Description?"))
297  {
298  if (m_parent->m_fatal)
299  SendStatus(cmd, tokens[0], "ERR:" + m_parent->ErrorString());
300  else if (m_parent->m_desc.trimmed().isEmpty())
301  SendStatus(cmd, tokens[0], "WARN:Not set");
302  else
303  SendStatus(cmd, tokens[0], "OK:" + m_parent->m_desc.trimmed());
304  }
305  else if (tokens[1].startsWith("HasLock?"))
306  {
307  if (m_parent->m_ready)
308  SendStatus(cmd, tokens[0], "OK:Yes");
309  else
310  SendStatus(cmd, tokens[0], "OK:No");
311  }
312  else if (tokens[1].startsWith("SignalStrengthPercent"))
313  {
314  if (m_parent->m_ready)
315  SendStatus(cmd, tokens[0], "OK:100");
316  else
317  SendStatus(cmd, tokens[0], "OK:20");
318  }
319  else if (tokens[1].startsWith("LockTimeout"))
320  {
321  LockTimeout(tokens[0]);
322  }
323  else if (tokens[1].startsWith("HasTuner"))
324  {
325  HasTuner(tokens[0]);
326  }
327  else if (tokens[1].startsWith("HasPictureAttributes"))
328  {
329  HasPictureAttributes(tokens[0]);
330  }
331  else if (tokens[1].startsWith("SendBytes"))
332  {
333  // Used when FlowControl is Polling
334  SendStatus(cmd, tokens[0], "ERR:Not supported");
335  }
336  else if (tokens[1].startsWith("XON"))
337  {
338  // Used when FlowControl is XON/XOFF
339  if (m_parent->m_streaming)
340  {
341  SendStatus(cmd, tokens[0], "OK");
342  m_parent->m_xon = true;
343  m_parent->m_flow_cond.notify_all();
344  }
345  else
346  SendStatus(cmd, tokens[0], "WARN:Not streaming");
347  }
348  else if (tokens[1].startsWith("XOFF"))
349  {
350  if (m_parent->m_streaming)
351  {
352  SendStatus(cmd, tokens[0], "OK");
353  // Used when FlowControl is XON/XOFF
354  m_parent->m_xon = false;
355  m_parent->m_flow_cond.notify_all();
356  }
357  else
358  SendStatus(cmd, tokens[0], "WARN:Not streaming");
359  }
360  else if (tokens[1].startsWith("TuneChannel"))
361  {
362  if (tokens.size() > 1)
363  TuneChannel(tokens[0], tokens[2]);
364  else
365  SendStatus(cmd, tokens[0], "ERR:Missing channum");
366  }
367  else if (tokens[1].startsWith("LoadChannels"))
368  {
369  LoadChannels(tokens[0]);
370  }
371  else if (tokens[1].startsWith("FirstChannel"))
372  {
373  FirstChannel(tokens[0]);
374  }
375  else if (tokens[1].startsWith("NextChannel"))
376  {
377  NextChannel(tokens[0]);
378  }
379  else if (tokens[1].startsWith("IsOpen?"))
380  {
381  std::unique_lock<std::mutex> lk2(m_parent->m_run_mutex);
382  if (m_parent->m_fatal)
383  SendStatus(cmd, tokens[0], "ERR:" + m_parent->ErrorString());
384  else if (m_parent->m_ready)
385  SendStatus(cmd, tokens[0], "OK:Open");
386  else
387  SendStatus(cmd, tokens[0], "WARN:Not Open yet");
388  }
389  else if (tokens[1].startsWith("CloseRecorder"))
390  {
391  if (m_parent->m_streaming)
392  StopStreaming(tokens[0], true);
393  m_parent->Terminate();
394  SendStatus(cmd, tokens[0], "OK:Terminating");
395  }
396  else if (tokens[1].startsWith("FlowControl?"))
397  {
398  SendStatus(cmd, tokens[0], "OK:XON/XOFF");
399  }
400  else if (tokens[1].startsWith("BlockSize"))
401  {
402  if (tokens.size() > 1)
403  SetBlockSize(tokens[0], tokens[2].toUInt());
404  else
405  SendStatus(cmd, tokens[0], "ERR:Missing block size");
406  }
407  else if (tokens[1].startsWith("StartStreaming"))
408  {
409  StartStreaming(tokens[0]);
410  }
411  else if (tokens[1].startsWith("StopStreaming"))
412  {
413  /* This does not close the stream! When Myth is done with
414  * this 'recording' ExternalChannel::EnterPowerSavingMode()
415  * will be called, which invokes CloseRecorder() */
416  StopStreaming(tokens[0], false);
417  }
418  else
419  SendStatus(cmd, tokens[0],
420  QString("ERR:Unrecognized command '%1'").arg(tokens[1]));
421 
422  return true;
423 }
424 
425 void Commands::Run(void)
426 {
427  setObjectName("Commands");
428 
429  QString cmd;
430 
431  struct pollfd polls[2];
432  memset(polls, 0, sizeof(polls));
433 
434  polls[0].fd = 0;
435  polls[0].events = POLLIN | POLLPRI;
436  polls[0].revents = 0;
437 
438  QFile input;
439  input.open(stdin, QIODevice::ReadOnly);
440  QTextStream qtin(&input);
441 
442  LOG(VB_RECORD, LOG_INFO, LOC + "Command parser ready.");
443 
444  while (m_parent->m_run)
445  {
446  int timeout = 250;
447  int poll_cnt = 1;
448  int ret = poll(polls, poll_cnt, timeout);
449 
450  if (polls[0].revents & POLLHUP)
451  {
452  LOG(VB_RECORD, LOG_ERR, LOC + "poll eof (POLLHUP)");
453  break;
454  }
455  if (polls[0].revents & POLLNVAL)
456  {
457  m_parent->Fatal("poll error");
458  return;
459  }
460 
461  if (polls[0].revents & POLLIN)
462  {
463  if (ret > 0)
464  {
465  cmd = qtin.readLine();
466  if (!ProcessCommand(cmd))
467  m_parent->Fatal("Invalid command");
468  }
469  else if (ret < 0)
470  {
471  if ((EOVERFLOW == errno))
472  {
473  LOG(VB_RECORD, LOG_ERR, "command overflow");
474  break; // we have an error to handle
475  }
476 
477  if ((EAGAIN == errno) || (EINTR == errno))
478  {
479  LOG(VB_RECORD, LOG_ERR, LOC + "retry command read.");
480  continue; // errors that tell you to try again
481  }
482 
483  LOG(VB_RECORD, LOG_ERR, LOC + "unknown error reading command.");
484  }
485  }
486  }
487 
488  LOG(VB_RECORD, LOG_INFO, LOC + "Command parser: shutting down");
489  m_parent->m_commands_running = false;
490  m_parent->m_flow_cond.notify_all();
491 }
492 
494  : m_parent(parent)
495 {
496  m_heartbeat = std::chrono::system_clock::now();
497 }
498 
499 bool Buffer::Fill(const QByteArray & buffer)
500 {
501  if (buffer.size() < 1)
502  return false;
503 
504  static int s_dropped = 0;
505  static int s_droppedBytes = 0;
506 
507  m_parent->m_flow_mutex.lock();
508  if (m_data.size() < MAX_QUEUE)
509  {
510  block_t blk(reinterpret_cast<const uint8_t *>(buffer.constData()),
511  reinterpret_cast<const uint8_t *>(buffer.constData())
512  + buffer.size());
513 
514  m_data.push(blk);
515  s_dropped = 0;
516 
517  LOG(VB_GENERAL, LOG_DEBUG, LOC +
518  QString("Adding %1 bytes").arg(buffer.size()));
519  }
520  else
521  {
522  s_droppedBytes += buffer.size();
523  LOG(VB_RECORD, LOG_WARNING, LOC +
524  QString("Packet queue overrun. Dropped %1 packets, %2 bytes.")
525  .arg(++s_dropped).arg(s_droppedBytes));
526 
527  std::this_thread::sleep_for(std::chrono::microseconds(250));
528  }
529 
530  m_parent->m_flow_mutex.unlock();
531  m_parent->m_flow_cond.notify_all();
532 
533  m_heartbeat = std::chrono::system_clock::now();
534 
535  return true;
536 }
537 
538 void Buffer::Run(void)
539 {
540  setObjectName("Buffer");
541 
542  bool is_empty = false;
543  bool wait = false;
544  time_t send_time = time (nullptr) + (60 * 5);
545  uint64_t write_total = 0;
546  uint64_t written = 0;
547  uint64_t write_cnt = 0;
548  uint64_t empty_cnt = 0;
549 
550  LOG(VB_RECORD, LOG_INFO, LOC + "Buffer: Ready for data.");
551 
552  while (m_parent->m_run)
553  {
554  {
555  std::unique_lock<std::mutex> lk(m_parent->m_flow_mutex);
556  m_parent->m_flow_cond.wait_for(lk,
557  std::chrono::milliseconds
558  (wait ? 5000 : 25));
559  wait = false;
560  }
561 
562  if (send_time < static_cast<double>(time (nullptr)))
563  {
564  // Every 5 minutes, write out some statistics.
565  send_time = time (nullptr) + (60 * 5);
566  write_total += written;
567  if (m_parent->m_streaming)
568  LOG(VB_RECORD, LOG_NOTICE, LOC +
569  QString("Count: %1, Empty cnt %2, Written %3, Total %4")
570  .arg(write_cnt).arg(empty_cnt)
571  .arg(written).arg(write_total));
572  else
573  LOG(VB_GENERAL, LOG_NOTICE, LOC + "Not streaming.");
574 
575  write_cnt = empty_cnt = written = 0;
576  }
577 
578  if (m_parent->m_streaming)
579  {
580  if (m_parent->m_xon)
581  {
582  block_t pkt;
583  m_parent->m_flow_mutex.lock();
584  if (!m_data.empty())
585  {
586  pkt = m_data.front();
587  m_data.pop();
588  is_empty = m_data.empty();
589  }
590  m_parent->m_flow_mutex.unlock();
591 
592  if (!pkt.empty())
593  {
594  uint sz = write(1, pkt.data(), pkt.size());
595  written += sz;
596  ++write_cnt;
597 
598  if (sz != pkt.size())
599  {
600  LOG(VB_GENERAL, LOG_WARNING, LOC +
601  QString("Only wrote %1 of %2 bytes to mythbackend")
602  .arg(sz).arg(pkt.size()));
603  }
604  }
605 
606  if (is_empty)
607  {
608  wait = true;
609  ++empty_cnt;
610  }
611  }
612  else
613  wait = true;
614  }
615  else
616  {
617  // Clear packet queue
618  m_parent->m_flow_mutex.lock();
619  if (!m_data.empty())
620  {
621  stack_t dummy;
622  std::swap(m_data, dummy);
623  }
624  m_parent->m_flow_mutex.unlock();
625 
626  wait = true;
627  }
628  }
629 
630  LOG(VB_RECORD, LOG_INFO, LOC + "Buffer: shutting down");
631  m_parent->m_buffer_running = false;
632  m_parent->m_flow_cond.notify_all();
633 }
std::atomic< bool > m_run
std::atomic< bool > m_commands_running
def write(text, progress=True)
Definition: mythburn.py:279
VERBOSE_PREAMBLE Most true
Definition: verbosedefs.h:91
MythExternControl * m_parent
void Join(void)
void TuneChannel(const QString &serial, const QString &channum)
void ErrorMessage(const QString &msg)
std::condition_variable m_run_cond
void NextChannel(const QString &serial)
void Run(void)
Buffer(MythExternControl *parent)
void TuneChannel(const QString &serial, const QString &channum)
std::atomic< bool > m_buffer_running
void StopStreaming(const QString &serial, bool silent)
#define LOC
void Error(const QString &msg)
void LockTimeout(const QString &serial) const
bool Fill(const QByteArray &buffer)
void LoadChannels(const QString &serial)
void StopStreaming(const QString &serial, bool silent)
QString ErrorString(void) const
const QString VERSION
void FirstChannel(const QString &serial)
void HasTuner(const QString &serial) const
void Run(void)
VERBOSE_PREAMBLE false
Definition: verbosedefs.h:85
std::condition_variable m_flow_cond
void SetBlockSize(const QString &serial, int blksz)
void Start(void)
std::vector< uint8_t > block_t
void FirstChannel(const QString &serial)
void HasPictureAttributes(const QString &serial) const
std::atomic< bool > m_ready
void SetBlockSize(const QString &serial, int blksz)
void StartStreaming(const QString &serial)
unsigned int uint
Definition: compat.h:140
MythExternControl * m_parent
std::atomic< bool > m_streaming
void Close(void)
bool SendStatus(const QString &command, const QString &status)
void Fatal(const QString &msg)
bool ProcessCommand(const QString &cmd)
void Join(void)
void Streaming(bool val)
void HasPictureAttributes(const QString &serial) const
std::queue< block_t > stack_t
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
void HasTuner(const QString &serial) const
void Start(void)
void StartStreaming(const QString &serial)
void SendMessage(const QString &cmd, const QString &serial, const QString &msg)
stack_t m_data
std::chrono::time_point< std::chrono::system_clock > m_heartbeat
void NextChannel(const QString &serial)
void LoadChannels(const QString &serial)
void Close(void)
std::atomic< bool > m_xon
void LockTimeout(const QString &serial) const