MythTV  master
MythExternControl.cpp
Go to the documentation of this file.
1 /* -*- Mode: c++ -*-
2  *
3  * Copyright (C) John Poet 2018
4  *
5  * This file is part of MythTV
6  *
7  * This program is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation, either version 3 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program. If not, see <http://www.gnu.org/licenses/>.
19  */
20 
21 #include "MythExternControl.h"
22 #include "mythlogging.h"
23 
24 #include <QFile>
25 #include <QTextStream>
26 
27 #include <unistd.h>
28 #include <poll.h>
29 
30 #include <iostream>
31 
32 using namespace std;
33 
34 const QString VERSION = "0.6";
35 
36 #define LOC Desc()
37 
39  : m_buffer(this)
40  , m_commands(this)
41 {
42  setObjectName("Control");
43 
44  m_buffer.Start();
45  m_commands.Start();
46 }
47 
49 {
50  Terminate();
51  m_commands.Join();
52  m_buffer.Join();
53 }
54 
55 Q_SLOT void MythExternControl::Opened(void)
56 {
57  std::lock_guard<std::mutex> lock(m_flow_mutex);
58 
59  m_ready = true;
60  m_flow_cond.notify_all();
61 }
62 
63 Q_SLOT void MythExternControl::Streaming(bool val)
64 {
65  m_streaming = val;
66  m_flow_cond.notify_all();
67 }
68 
70 {
71  emit Close();
72 }
73 
74 Q_SLOT void MythExternControl::Done(void)
75 {
77  {
78  m_run = false;
79  m_flow_cond.notify_all();
80  m_run_cond.notify_all();
81 
82  std::this_thread::sleep_for(std::chrono::microseconds(50));
83 
85  {
86  std::unique_lock<std::mutex> lk(m_flow_mutex);
87  m_flow_cond.wait_for(lk, std::chrono::milliseconds(1000));
88  }
89 
90  LOG(VB_RECORD, LOG_CRIT, LOC + "Terminated.");
91  }
92 }
93 
94 void MythExternControl::Error(const QString & msg)
95 {
96  LOG(VB_RECORD, LOG_CRIT, LOC + msg);
97 
98  std::unique_lock<std::mutex> lk(m_msg_mutex);
99  if (m_errmsg.isEmpty())
100  m_errmsg = msg;
101  else
102  m_errmsg += "; " + msg;
103 }
104 
105 void MythExternControl::Fatal(const QString & msg)
106 {
107  Error(msg);
108  m_fatal = true;
109  Terminate();
110 }
111 
112 Q_SLOT void MythExternControl::SendMessage(const QString & cmd,
113  const QString & serial,
114  const QString & msg)
115 {
116  std::unique_lock<std::mutex> lk(m_msg_mutex);
117  m_commands.SendStatus(cmd, serial, msg);
118 }
119 
120 Q_SLOT void MythExternControl::ErrorMessage(const QString & msg)
121 {
122  std::unique_lock<std::mutex> lk(m_msg_mutex);
123  if (m_errmsg.isEmpty())
124  m_errmsg = msg;
125  else
126  m_errmsg += "; " + msg;
127 }
128 
129 #undef LOC
130 #define LOC QString("%1").arg(m_parent->Desc())
131 
132 void Commands::Close(void)
133 {
134  std::lock_guard<std::mutex> lock(m_parent->m_flow_mutex);
135 
136  emit m_parent->Close();
137  m_parent->m_ready = false;
138  m_parent->m_flow_cond.notify_all();
139 }
140 
141 void Commands::StartStreaming(const QString & serial)
142 {
143  emit m_parent->StartStreaming(serial);
144 }
145 
146 void Commands::StopStreaming(const QString & serial, bool silent)
147 {
148  emit m_parent->StopStreaming(serial, silent);
149 }
150 
151 void Commands::LockTimeout(const QString & serial) const
152 {
153  emit m_parent->LockTimeout(serial);
154 }
155 
156 void Commands::HasTuner(const QString & serial) const
157 {
158  emit m_parent->HasTuner(serial);
159 }
160 
161 void Commands::HasPictureAttributes(const QString & serial) const
162 {
163  emit m_parent->HasPictureAttributes(serial);
164 }
165 
166 void Commands::SetBlockSize(const QString & serial, int blksz)
167 {
168  emit m_parent->SetBlockSize(serial, blksz);
169 }
170 
171 void Commands::TuneChannel(const QString & serial, const QString & channum)
172 {
173  emit m_parent->TuneChannel(serial, channum);
174 }
175 
176 void Commands::LoadChannels(const QString & serial)
177 {
178  emit m_parent->LoadChannels(serial);
179 }
180 
181 void Commands::FirstChannel(const QString & serial)
182 {
183  emit m_parent->FirstChannel(serial);
184 }
185 
186 void Commands::NextChannel(const QString & serial)
187 {
188  emit m_parent->NextChannel(serial);
189 }
190 
191 bool Commands::SendStatus(const QString & command, const QString & status)
192 {
193  int len = write(2, status.toUtf8().constData(), status.size());
194  write(2, "\n", 1);
195 
196  if (len != status.size())
197  {
198  LOG(VB_RECORD, LOG_ERR, LOC +
199  QString("%1: Only wrote %2 of %3 bytes of message '%4'.")
200  .arg(command).arg(len).arg(status.size()).arg(status));
201  return false;
202  }
203 
204  LOG(VB_RECORD, LOG_INFO, LOC + QString("Processing '%1' --> '%2'")
205  .arg(command).arg(status));
206 
207  m_parent->ClearError();
208  return true;
209 }
210 
211 bool Commands::SendStatus(const QString & command, const QString & serial,
212  const QString & status)
213 {
214  QString msg = QString("%1:%2").arg(serial).arg(status);
215 
216  int len = write(2, msg.toUtf8().constData(), msg.size());
217  write(2, "\n", 1);
218 
219  if (len != msg.size())
220  {
221  LOG(VB_RECORD, LOG_ERR, LOC +
222  QString("%1: Only wrote %2 of %3 bytes of message '%4'.")
223  .arg(command).arg(len).arg(msg.size()).arg(msg));
224  return false;
225  }
226 
227  if (!command.isEmpty())
228  {
229  LOG(VB_RECORD, LOG_INFO, LOC + QString("Processing '%1' --> '%2'")
230  .arg(command).arg(msg));
231  }
232 #if 0
233  else
234  LOG(VB_RECORD, LOG_INFO, LOC + QString("%1").arg(msg));
235 #endif
236 
237  m_parent->ClearError();
238  return true;
239 }
240 
241 bool Commands::ProcessCommand(const QString & cmd)
242 {
243  LOG(VB_RECORD, LOG_DEBUG, LOC + QString("Processing '%1'").arg(cmd));
244 
245  std::unique_lock<std::mutex> lk1(m_parent->m_msg_mutex);
246 
247  if (cmd.startsWith("APIVersion?"))
248  {
249  if (m_parent->m_fatal)
250  SendStatus(cmd, "ERR:" + m_parent->ErrorString());
251  else
252  SendStatus(cmd, "OK:2");
253  return true;
254  }
255 
256  QStringList tokens = cmd.split(':', QString::SkipEmptyParts);
257  if (tokens.size() < 2)
258  {
259  SendStatus(cmd, "0",
260  QString("0:ERR:Version 2 API expects serial_no:msg format. "
261  "Saw '%1' instead").arg(cmd));
262  return true;
263  }
264 
265  if (tokens[1].startsWith("APIVersion?"))
266  {
267  if (m_parent->m_fatal)
268  SendStatus(cmd, tokens[0], "ERR:" + m_parent->ErrorString());
269  else
270  SendStatus(cmd, tokens[0], "OK:2");
271  }
272  else if (tokens[1].startsWith("APIVersion"))
273  {
274  if (tokens.size() > 1)
275  {
276  m_apiVersion = tokens[2].toInt();
277  SendStatus(cmd, tokens[0], QString("OK:%1").arg(m_apiVersion));
278  }
279  else
280  SendStatus(cmd, tokens[0], "ERR:Missing API Version number");
281  }
282  else if (tokens[1].startsWith("Version?"))
283  {
284  if (m_parent->m_fatal)
285  SendStatus(cmd, tokens[0], "ERR:" + m_parent->ErrorString());
286  else
287  SendStatus(cmd, tokens[0], "OK:" + VERSION);
288  }
289  else if (tokens[1].startsWith("Description?"))
290  {
291  if (m_parent->m_fatal)
292  SendStatus(cmd, tokens[0], "ERR:" + m_parent->ErrorString());
293  else if (m_parent->m_desc.trimmed().isEmpty())
294  SendStatus(cmd, tokens[0], "WARN:Not set");
295  else
296  SendStatus(cmd, tokens[0], "OK:" + m_parent->m_desc.trimmed());
297  }
298  else if (tokens[1].startsWith("HasLock?"))
299  {
300  if (m_parent->m_ready)
301  SendStatus(cmd, tokens[0], "OK:Yes");
302  else
303  SendStatus(cmd, tokens[0], "OK:No");
304  }
305  else if (tokens[1].startsWith("SignalStrengthPercent"))
306  {
307  if (m_parent->m_ready)
308  SendStatus(cmd, tokens[0], "OK:100");
309  else
310  SendStatus(cmd, tokens[0], "OK:20");
311  }
312  else if (tokens[1].startsWith("LockTimeout"))
313  {
314  LockTimeout(tokens[0]);
315  }
316  else if (tokens[1].startsWith("HasTuner"))
317  {
318  HasTuner(tokens[0]);
319  }
320  else if (tokens[1].startsWith("HasPictureAttributes"))
321  {
322  HasPictureAttributes(tokens[0]);
323  }
324  else if (tokens[1].startsWith("SendBytes"))
325  {
326  // Used when FlowControl is Polling
327  SendStatus(cmd, tokens[0], "ERR:Not supported");
328  }
329  else if (tokens[1].startsWith("XON"))
330  {
331  // Used when FlowControl is XON/XOFF
332  if (m_parent->m_streaming)
333  {
334  SendStatus(cmd, tokens[0], "OK");
335  m_parent->m_xon = true;
336  m_parent->m_flow_cond.notify_all();
337  }
338  else
339  SendStatus(cmd, tokens[0], "WARN:Not streaming");
340  }
341  else if (tokens[1].startsWith("XOFF"))
342  {
343  if (m_parent->m_streaming)
344  {
345  SendStatus(cmd, tokens[0], "OK");
346  // Used when FlowControl is XON/XOFF
347  m_parent->m_xon = false;
348  m_parent->m_flow_cond.notify_all();
349  }
350  else
351  SendStatus(cmd, tokens[0], "WARN:Not streaming");
352  }
353  else if (tokens[1].startsWith("TuneChannel"))
354  {
355  if (tokens.size() > 1)
356  TuneChannel(tokens[0], tokens[2]);
357  else
358  SendStatus(cmd, tokens[0], "ERR:Missing channum");
359  }
360  else if (tokens[1].startsWith("LoadChannels"))
361  {
362  LoadChannels(tokens[0]);
363  }
364  else if (tokens[1].startsWith("FirstChannel"))
365  {
366  FirstChannel(tokens[0]);
367  }
368  else if (tokens[1].startsWith("NextChannel"))
369  {
370  NextChannel(tokens[0]);
371  }
372  else if (tokens[1].startsWith("IsOpen?"))
373  {
374  std::unique_lock<std::mutex> lk2(m_parent->m_run_mutex);
375  if (m_parent->m_fatal)
376  SendStatus(cmd, tokens[0], "ERR:" + m_parent->ErrorString());
377  else if (m_parent->m_ready)
378  SendStatus(cmd, tokens[0], "OK:Open");
379  else
380  SendStatus(cmd, tokens[0], "WARN:Not Open yet");
381  }
382  else if (tokens[1].startsWith("CloseRecorder"))
383  {
384  if (m_parent->m_streaming)
385  StopStreaming(tokens[0], true);
386  m_parent->Terminate();
387  SendStatus(cmd, tokens[0], "OK:Terminating");
388  }
389  else if (tokens[1].startsWith("FlowControl?"))
390  {
391  SendStatus(cmd, tokens[0], "OK:XON/XOFF");
392  }
393  else if (tokens[1].startsWith("BlockSize"))
394  {
395  if (tokens.size() > 1)
396  SetBlockSize(tokens[0], tokens[2].toUInt());
397  else
398  SendStatus(cmd, tokens[0], "ERR:Missing block size");
399  }
400  else if (tokens[1].startsWith("StartStreaming"))
401  {
402  StartStreaming(tokens[0]);
403  }
404  else if (tokens[1].startsWith("StopStreaming"))
405  {
406  /* This does not close the stream! When Myth is done with
407  * this 'recording' ExternalChannel::EnterPowerSavingMode()
408  * will be called, which invokes CloseRecorder() */
409  StopStreaming(tokens[0], false);
410  }
411  else
412  SendStatus(cmd, tokens[0],
413  QString("ERR:Unrecognized command '%1'").arg(tokens[1]));
414 
415  return true;
416 }
417 
418 void Commands::Run(void)
419 {
420  setObjectName("Commands");
421 
422  QString cmd;
423 
424  struct pollfd polls[2];
425  memset(polls, 0, sizeof(polls));
426 
427  polls[0].fd = 0;
428  polls[0].events = POLLIN | POLLPRI;
429  polls[0].revents = 0;
430 
431  QFile input;
432  input.open(stdin, QIODevice::ReadOnly);
433  QTextStream qtin(&input);
434 
435  LOG(VB_RECORD, LOG_INFO, LOC + "Command parser ready.");
436 
437  while (m_parent->m_run)
438  {
439  int timeout = 250;
440  int poll_cnt = 1;
441  int ret = poll(polls, poll_cnt, timeout);
442 
443  if (polls[0].revents & POLLHUP)
444  {
445  LOG(VB_RECORD, LOG_ERR, LOC + "poll eof (POLLHUP)");
446  break;
447  }
448  if (polls[0].revents & POLLNVAL)
449  {
450  m_parent->Fatal("poll error");
451  return;
452  }
453 
454  if (polls[0].revents & POLLIN)
455  {
456  if (ret > 0)
457  {
458  cmd = qtin.readLine();
459  if (!ProcessCommand(cmd))
460  m_parent->Fatal("Invalid command");
461  }
462  else if (ret < 0)
463  {
464  if ((EOVERFLOW == errno))
465  {
466  LOG(VB_RECORD, LOG_ERR, "command overflow");
467  break; // we have an error to handle
468  }
469 
470  if ((EAGAIN == errno) || (EINTR == errno))
471  {
472  LOG(VB_RECORD, LOG_ERR, LOC + "retry command read.");
473  continue; // errors that tell you to try again
474  }
475 
476  LOG(VB_RECORD, LOG_ERR, LOC + "unknown error reading command.");
477  }
478  }
479  }
480 
481  LOG(VB_RECORD, LOG_INFO, LOC + "Command parser: shutting down");
482  m_parent->m_commands_running = false;
483  m_parent->m_flow_cond.notify_all();
484 }
485 
487  : m_parent(parent)
488 {
489  m_heartbeat = std::chrono::system_clock::now();
490 }
491 
492 bool Buffer::Fill(const QByteArray & buffer)
493 {
494  if (buffer.size() < 1)
495  return false;
496 
497  static int s_dropped = 0;
498  static int s_droppedBytes = 0;
499 
500  m_parent->m_flow_mutex.lock();
501  if (m_data.size() < MAX_QUEUE)
502  {
503  block_t blk(reinterpret_cast<const uint8_t *>(buffer.constData()),
504  reinterpret_cast<const uint8_t *>(buffer.constData())
505  + buffer.size());
506 
507  m_data.push(blk);
508  s_dropped = 0;
509 
510  LOG(VB_GENERAL, LOG_DEBUG, LOC +
511  QString("Adding %1 bytes").arg(buffer.size()));
512  }
513  else
514  {
515  s_droppedBytes += buffer.size();
516  LOG(VB_RECORD, LOG_WARNING, LOC +
517  QString("Packet queue overrun. Dropped %1 packets, %2 bytes.")
518  .arg(++s_dropped).arg(s_droppedBytes));
519 
520  std::this_thread::sleep_for(std::chrono::microseconds(250));
521  }
522 
523  m_parent->m_flow_mutex.unlock();
524  m_parent->m_flow_cond.notify_all();
525 
526  m_heartbeat = std::chrono::system_clock::now();
527 
528  return true;
529 }
530 
531 void Buffer::Run(void)
532 {
533  setObjectName("Buffer");
534 
535  bool is_empty = false;
536  bool wait = false;
537  time_t send_time = time (nullptr) + (60 * 5);
538  uint64_t write_total = 0;
539  uint64_t written = 0;
540  uint64_t write_cnt = 0;
541  uint64_t empty_cnt = 0;
542 
543  LOG(VB_RECORD, LOG_INFO, LOC + "Buffer: Ready for data.");
544 
545  while (m_parent->m_run)
546  {
547  {
548  std::unique_lock<std::mutex> lk(m_parent->m_flow_mutex);
549  m_parent->m_flow_cond.wait_for(lk,
550  std::chrono::milliseconds
551  (wait ? 5000 : 25));
552  wait = false;
553  }
554 
555  if (send_time < static_cast<double>(time (nullptr)))
556  {
557  // Every 5 minutes, write out some statistics.
558  send_time = time (nullptr) + (60 * 5);
559  write_total += written;
560  if (m_parent->m_streaming)
561  {
562  LOG(VB_RECORD, LOG_NOTICE, LOC +
563  QString("Count: %1, Empty cnt %2, Written %3, Total %4")
564  .arg(write_cnt).arg(empty_cnt)
565  .arg(written).arg(write_total));
566  }
567  else
568  {
569  LOG(VB_GENERAL, LOG_NOTICE, LOC + "Not streaming.");
570  }
571 
572  write_cnt = empty_cnt = written = 0;
573  }
574 
575  if (m_parent->m_streaming)
576  {
577  if (m_parent->m_xon)
578  {
579  block_t pkt;
580  m_parent->m_flow_mutex.lock();
581  if (!m_data.empty())
582  {
583  pkt = m_data.front();
584  m_data.pop();
585  is_empty = m_data.empty();
586  }
587  m_parent->m_flow_mutex.unlock();
588 
589  if (!pkt.empty())
590  {
591  uint sz = write(1, pkt.data(), pkt.size());
592  written += sz;
593  ++write_cnt;
594 
595  if (sz != pkt.size())
596  {
597  LOG(VB_GENERAL, LOG_WARNING, LOC +
598  QString("Only wrote %1 of %2 bytes to mythbackend")
599  .arg(sz).arg(pkt.size()));
600  }
601  }
602 
603  if (is_empty)
604  {
605  wait = true;
606  ++empty_cnt;
607  }
608  }
609  else
610  wait = true;
611  }
612  else
613  {
614  // Clear packet queue
615  m_parent->m_flow_mutex.lock();
616  if (!m_data.empty())
617  {
618  stack_t dummy;
619  std::swap(m_data, dummy);
620  }
621  m_parent->m_flow_mutex.unlock();
622 
623  wait = true;
624  }
625  }
626 
627  LOG(VB_RECORD, LOG_INFO, LOC + "Buffer: shutting down");
628  m_parent->m_buffer_running = false;
629  m_parent->m_flow_cond.notify_all();
630 }
std::atomic< bool > m_run
std::atomic< bool > m_commands_running
def write(text, progress=True)
Definition: mythburn.py:308
MythExternControl * m_parent
void Join(void)
void TuneChannel(const QString &serial, const QString &channum)
void ErrorMessage(const QString &msg)
std::condition_variable m_run_cond
void NextChannel(const QString &serial)
void Run(void)
Buffer(MythExternControl *parent)
void TuneChannel(const QString &serial, const QString &channum)
std::atomic< bool > m_buffer_running
void StopStreaming(const QString &serial, bool silent)
#define LOC
void Error(const QString &msg)
void LockTimeout(const QString &serial) const
bool Fill(const QByteArray &buffer)
void LoadChannels(const QString &serial)
void StopStreaming(const QString &serial, bool silent)
QString ErrorString(void) const
const QString VERSION
void FirstChannel(const QString &serial)
void HasTuner(const QString &serial) const
void Run(void)
std::condition_variable m_flow_cond
void SetBlockSize(const QString &serial, int blksz)
void Start(void)
std::vector< uint8_t > block_t
void FirstChannel(const QString &serial)
void HasPictureAttributes(const QString &serial) const
std::atomic< bool > m_ready
void SetBlockSize(const QString &serial, int blksz)
void StartStreaming(const QString &serial)
unsigned int uint
Definition: compat.h:140
MythExternControl * m_parent
std::atomic< bool > m_streaming
void Close(void)
bool SendStatus(const QString &command, const QString &status)
void Fatal(const QString &msg)
bool ProcessCommand(const QString &cmd)
void Join(void)
void Streaming(bool val)
void HasPictureAttributes(const QString &serial) const
std::queue< block_t > stack_t
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
void HasTuner(const QString &serial) const
void Start(void)
void StartStreaming(const QString &serial)
void SendMessage(const QString &cmd, const QString &serial, const QString &msg)
stack_t m_data
std::chrono::time_point< std::chrono::system_clock > m_heartbeat
void NextChannel(const QString &serial)
void LoadChannels(const QString &serial)
~MythExternControl(void) override
void Close(void)
std::atomic< bool > m_xon
void LockTimeout(const QString &serial) const