MythTV master
MythExternControl.cpp
Go to the documentation of this file.
1/* -*- Mode: c++ -*-
2 *
3 * Copyright (C) John Poet 2018
4 *
5 * This file is part of MythTV
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 */
20
21#include "MythExternControl.h"
22
23#include <array>
24#include <iostream>
25#include <mutex>
26#include <poll.h>
27#include <unistd.h>
28
29#include <QFile>
30#include <QTextStream>
31#include <QJsonDocument>
32#include <QJsonObject>
33
35
36using namespace std::chrono_literals;
37
38const QString VERSION = "2.0";
39
40#define LOC Desc()
41
43 : m_buffer(this)
44 , m_commands(this)
45{
46 setObjectName("Control");
47
50}
51
53{
54 Terminate();
56 m_buffer.Join();
57}
58
60{
61 std::scoped_lock lock(m_flowMutex);
62
63 m_ready = true;
64 m_flowCond.notify_all();
65}
66
67Q_SLOT void MythExternControl::Streaming(bool val)
68{
69 m_streaming = val;
70 m_flowCond.notify_all();
71}
72
74{
75 emit Close();
76}
77
// Slot: shut the control object down. The statements visible here clear
// m_run, wake every waiter on m_flowCond and m_runCond, pause for 50
// microseconds, wait on m_flowCond for up to one second, and finally log
// "Terminated." at critical level.
//
// NOTE(review): the listing numbers fused to each line skip 80 and 88, i.e.
// the lines directly above the two bare `{` below are absent from this copy.
// They are presumably the conditions guarding those blocks -- restore them
// from the repository before building or changing this function.
78Q_SLOT void MythExternControl::Done(void)
79{
81 {
// Ask the worker loops (they test m_run) to stop, then wake anything that
// is blocked on either condition variable so it can notice.
82 m_run = false;
83 m_flowCond.notify_all();
84 m_runCond.notify_all();
85
86 std::this_thread::sleep_for(50us);
87
89 {
// Timed wait: returns on a notify_all() of m_flowCond or after one second.
90 std::unique_lock<std::mutex> lk(m_flowMutex);
91 m_flowCond.wait_for(lk, 1s);
92 }
93
94 LOG(VB_RECORD, LOG_CRIT, LOC + "Terminated.");
95 }
96}
97
98void MythExternControl::Error(const QString & msg)
99{
100 LOG(VB_RECORD, LOG_CRIT, LOC + msg);
101
102 std::unique_lock<std::mutex> lk(m_msgMutex);
103 if (m_errmsg.isEmpty())
104 m_errmsg = msg;
105 else
106 m_errmsg += "; " + msg;
107}
108
109void MythExternControl::Fatal(const QString & msg)
110{
111 Error(msg);
112 m_fatal = true;
113 Terminate();
114}
115
116Q_SLOT void MythExternControl::SendMessage(const QString & command,
117 const QString & serial,
118 const QString & message,
119 const QString & status)
120{
121 std::unique_lock<std::mutex> lk(m_msgMutex);
122 m_commands.SendStatus(command, status, serial, message);
123}
124
125Q_SLOT void MythExternControl::ErrorMessage(const QString & msg)
126{
127 std::unique_lock<std::mutex> lk(m_msgMutex);
128 if (m_errmsg.isEmpty())
129 m_errmsg = msg;
130 else
131 m_errmsg += "; " + msg;
132}
133
134#undef LOC
135#define LOC QString("%1").arg(m_parent->Desc())
136
138{
139 std::scoped_lock lock(m_parent->m_flowMutex);
140
141 emit m_parent->Close();
142 m_parent->m_ready = false;
143 m_parent->m_flowCond.notify_all();
144}
145
146void Commands::StartStreaming(const QString & serial)
147{
148 emit m_parent->StartStreaming(serial);
149}
150
151void Commands::StopStreaming(const QString & serial, bool silent)
152{
153 emit m_parent->StopStreaming(serial, silent);
154}
155
156void Commands::LockTimeout(const QString & serial) const
157{
158 emit m_parent->LockTimeout(serial);
159}
160
161void Commands::HasTuner(const QString & serial) const
162{
163 emit m_parent->HasTuner(serial);
164}
165
166void Commands::HasPictureAttributes(const QString & serial) const
167{
168 emit m_parent->HasPictureAttributes(serial);
169}
170
171void Commands::SetBlockSize(const QString & serial, int blksz)
172{
173 emit m_parent->SetBlockSize(serial, blksz);
174}
175
176void Commands::TuneChannel(const QString & serial, const QVariantMap & args)
177{
178 emit m_parent->TuneChannel(serial, args);
179}
180
181void Commands::TuneStatus(const QString & serial)
182{
183 emit m_parent->TuneStatus(serial);
184}
185
186void Commands::LoadChannels(const QString & serial)
187{
188 emit m_parent->LoadChannels(serial);
189}
190
191void Commands::FirstChannel(const QString & serial)
192{
193 emit m_parent->FirstChannel(serial);
194}
195
196void Commands::NextChannel(const QString & serial)
197{
198 emit m_parent->NextChannel(serial);
199}
200
202{
203 emit m_parent->Cleanup();
204}
205
206bool Commands::SendStatus(const QString & command,
207 const QString & status,
208 const QString & serial,
209 const QString & response)
210{
211 QJsonObject query;
212 if (!serial.isEmpty())
213 query["serial"] = serial;
214 query["command"] = command;
215 query["status"] = status;
216 if (!response.isEmpty())
217 query["message"] = response;
218
219 QByteArray msgbuf = QJsonDocument(query).toJson(QJsonDocument::Compact);
220 int len = write(2, msgbuf.constData(), msgbuf.size());
221 len += write(2, "\n", 1);
222
223 if (len != msgbuf.size() + 1)
224 {
225 LOG(VB_RECORD, LOG_ERR, LOC +
226 QString("%1: Only wrote %2 of %3 bytes of message '%4'.")
227 .arg(command).arg(len).arg(msgbuf.size()).arg(QString(msgbuf)));
228 return false;
229 }
230
231 if (!command.isEmpty())
232 {
233 if (command + response + status == m_prevStatus)
234 {
235 if (++m_repCmdCnt % 25 == 0)
236 {
237 LOG(VB_RECORD, LOG_DEBUG, LOC +
238 QString("Processing '%1' --> '%2' (Repeated 25 times)")
239 .arg(command, QString(msgbuf)));
240 }
241 }
242 else
243 {
244 if (m_repCmdCnt)
245 {
246 LOG(VB_RECORD, LOG_DEBUG,
247 LOC + QString("Processing '%1' --> '%2' (Repeated %2 times)")
248 .arg(m_prevMsgBuf).arg(m_repCmdCnt % 25));
249 m_repCmdCnt = 0;
250 }
251 LOG(VB_RECORD, LOG_DEBUG, LOC +
252 QString("Processing '%1' --> '%2'")
253 .arg(command, QString(msgbuf)));
254 }
255 m_prevStatus = command + response + status;
256 m_prevMsgBuf = QString(msgbuf);
257 }
258 else
259 {
260 m_prevStatus.clear();
261 m_prevMsgBuf.clear();
262 m_repCmdCnt = 0;
263 }
264
266 return true;
267}
268
// Parse one command line and act on it.
//
// The literal prefix "APIVersion?" is answered directly with "OK:3\n" on
// file descriptor 2. Anything else is parsed as a JSON object whose
// "command" and "serial" members select the action: simple queries are
// answered here through SendStatus(), the rest are handed to the member
// functions of the same name.
//
// Returns false when the text is not valid JSON or when m_parent->m_fatal
// is set (an "ERR" status is sent in both cases); returns true otherwise,
// including for unrecognized commands, which are answered with "ERR".
//
// m_parent->m_msgMutex is held from just after the first log line until
// the function returns.
//
// NOTE(review): the listing numbers fused to each line skip 353, 366, 408
// and 410. The first two leave the XON/XOFF `else` branches without a
// visible `if`; restore the missing lines from the repository before
// building or changing this function.
269bool Commands::ProcessCommand(const QString & query)
270{
271 LOG(VB_RECORD, LOG_DEBUG, LOC + QString("Processing '%1'").arg(query));
272
273 std::unique_lock<std::mutex> lk1(m_parent->m_msgMutex);
274
// Plain-text handshake, handled before any JSON parsing.
275 if (query.startsWith("APIVersion?"))
276 {
277 write(2, "OK:3\n", 5);
278 return true;
279 }
280
281 QJsonParseError parseError {};
282 QJsonDocument doc;
283 QJsonObject jObj;
284 QString cmd;
285 QString serial;
286 QVariantMap elements;
287 QByteArray cmdbuf = query.toUtf8();
288
// NOTE(review): jObj is filled from `doc` before `doc` is assigned on the
// next line, and is not read again in the visible code -- looks like dead
// code; confirm.
289 jObj = doc.object();
290 doc = QJsonDocument::fromJson(cmdbuf, &parseError);
291 elements = doc.toVariant().toMap();
292
293 cmd = elements["command"].toString();
294 serial = elements["serial"].toString();
295
// On the two error paths the raw query text, not `cmd`, is passed as the
// command argument of SendStatus().
296 if (parseError.error != QJsonParseError::NoError)
297 {
298 SendStatus(query, "ERR", serial,
299 QString("ExternalRecorder sent invalid JSON message: %1: %2")
300 .arg(parseError.offset).arg(parseError.errorString()));
301 return false;
302 }
303 if (m_parent->m_fatal)
304 {
305 SendStatus(query, "ERR", serial, m_parent->ErrorString());
306 return false;
307 }
308
// JSON form of the version exchange: remember the value sent and echo it.
309 if (elements["command"] == "APIVersion")
310 {
311 m_apiVersion = elements["value"].toInt();
312 SendStatus(cmd, "OK", serial, QString::number(m_apiVersion));
313 }
314 else if (cmd == "Version?")
315 {
316 SendStatus(cmd, "OK", serial, VERSION);
317 }
318 else if (cmd == "Description?")
319 {
320 if (m_parent->m_desc.trimmed().isEmpty())
321 SendStatus(cmd, "OK", serial, "Not set");
322 else
323 SendStatus(cmd, "OK", serial, m_parent->m_desc.trimmed());
324 }
// Lock state and signal strength are both derived from m_ready alone.
325 else if (cmd == "HasLock?")
326 {
327 SendStatus(cmd, "OK", serial, m_parent->m_ready ? "Yes" : "No");
328 }
329 else if (cmd == "SignalStrengthPercent?")
330 {
331 SendStatus(cmd, "OK", serial, m_parent->m_ready ? "100" : "20");
332 }
333 else if (cmd == "LockTimeout?")
334 {
335 LockTimeout(serial);
336 }
337 else if (cmd == "HasTuner?")
338 {
339 HasTuner(serial);
340 }
341 else if (cmd == "HasPictureAttributes?")
342 {
343 HasPictureAttributes(serial);
344 }
345 else if (cmd == "SendBytes")
346 {
347 // Used when FlowControl is Polling
348 SendStatus(cmd, "ERR", serial, "Not supported");
349 }
350 else if (cmd == "XON")
351 {
352 // Used when FlowControl is XON/XOFF
// NOTE(review): listing line 353 (the condition for this block) is missing.
354 {
355 SendStatus(cmd, "OK", serial, "Started Streaming");
356 m_parent->m_xon = true;
357 m_parent->m_flowCond.notify_all();
358 }
359 else
360 {
361 SendStatus(cmd, "Warn", serial, "Not Streaming");
362 }
363 }
364 else if (cmd == "XOFF")
365 {
// NOTE(review): listing line 366 (the condition for this block) is missing.
367 {
368 SendStatus(cmd, "OK", serial, "Stopped Streaming");
369 // Used when FlowControl is XON/XOFF
370 m_parent->m_xon = false;
371 m_parent->m_flowCond.notify_all();
372 }
373 else
374 {
375 SendStatus(cmd, "Warn", serial, "Not Streaming");
376 }
377 }
// The whole parsed map is passed along so the handler can read any extra
// members of the request.
378 else if (cmd == "TuneChannel")
379 {
380 TuneChannel(serial, elements);
381 }
382 else if (cmd == "TuneStatus?")
383 {
384 TuneStatus(serial);
385 }
386 else if (cmd == "LoadChannels")
387 {
388 LoadChannels(serial);
389 }
390 else if (cmd == "FirstChannel")
391 {
392 FirstChannel(serial);
393 }
394 else if (cmd == "NextChannel")
395 {
396 NextChannel(serial);
397 }
398 else if (cmd == "IsOpen?")
399 {
// m_runMutex is taken here while m_msgMutex (lk1) is still held.
// NOTE(review): this branch reports "WARN" while XON/XOFF report "Warn";
// confirm which spelling the receiver expects.
400 std::unique_lock<std::mutex> lk2(m_parent->m_runMutex);
401 if (m_parent->m_ready)
402 SendStatus(cmd, "OK", serial, "Open");
403 else
404 SendStatus(cmd, "WARN", serial, "Not Open yet");
405 }
406 else if (cmd == "CloseRecorder")
407 {
// NOTE(review): listing lines 408 and 410 are missing around the next call.
409 StopStreaming(serial, true);
411 SendStatus(cmd, "OK", serial, "Terminating");
412 Cleanup();
413 }
414 else if (cmd == "FlowControl?")
415 {
416 SendStatus(cmd, "OK", serial, "XON/XOFF");
417 }
418 else if (cmd == "BlockSize")
419 {
// NOTE(review): the value is read with toUInt() but SetBlockSize() takes an
// int, so it is converted implicitly.
420 if (!elements.contains("value"))
421 SendStatus(cmd, "ERR", serial, "Missing block size value");
422 else
423 SetBlockSize(serial, elements["value"].toUInt());
424 }
425 else if (cmd == "StartStreaming")
426 {
427 StartStreaming(serial);
428 }
429 else if (cmd == "StopStreaming")
430 {
431 /* This does not close the stream! When Myth is done with
432 * this 'recording' ExternalChannel::EnterPowerSavingMode()
433 * will be called, which invokes CloseRecorder() */
434 StopStreaming(serial, false);
435 }
436 else
437 {
438 SendStatus(cmd, "ERR", serial, QString("Unrecognized command '%1'").arg(query));
439 }
440
441 return true;
442}
443
445{
446 setObjectName("Commands");
447
448 QString cmd;
449
450 std::array<struct pollfd,2> polls {};
451
452 polls[0].fd = 0;
453 polls[0].events = POLLIN | POLLPRI;
454 polls[0].revents = 0;
455
456 QFile input;
457 if (!input.open(stdin, QIODevice::ReadOnly))
458 {
459 LOG(VB_RECORD, LOG_ERR, LOC + "Opening of stdin failed");
460 return;
461 }
462 QTextStream qtin(&input);
463
464 LOG(VB_RECORD, LOG_INFO, LOC + "Command parser ready.");
465
466 while (m_parent->m_run)
467 {
468 int timeout = 250;
469 int poll_cnt = 1;
470 int ret = poll(polls.data(), poll_cnt, timeout);
471
472 if (polls[0].revents & POLLHUP)
473 {
474 LOG(VB_RECORD, LOG_ERR, LOC + "poll eof (POLLHUP)");
475 break;
476 }
477 if (polls[0].revents & POLLNVAL)
478 {
479 m_parent->Fatal("poll error");
480 return;
481 }
482
483 if (polls[0].revents & POLLIN)
484 {
485 if (ret > 0)
486 {
487 cmd = qtin.readLine();
488 if (!ProcessCommand(cmd))
489 m_parent->Fatal("Invalid command");
490 }
491 else if (ret < 0)
492 {
493 if ((EOVERFLOW == errno))
494 {
495 LOG(VB_RECORD, LOG_ERR, "command overflow");
496 break; // we have an error to handle
497 }
498
499 if ((EAGAIN == errno) || (EINTR == errno))
500 {
501 LOG(VB_RECORD, LOG_ERR, LOC + "retry command read.");
502 continue; // errors that tell you to try again
503 }
504
505 LOG(VB_RECORD, LOG_ERR, LOC + "unknown error reading command.");
506 }
507 }
508 }
509
510 LOG(VB_RECORD, LOG_INFO, LOC + "Command parser: shutting down");
512 m_parent->m_flowCond.notify_all();
513}
514
516 : m_parent(parent)
517{
518 m_heartbeat = std::chrono::system_clock::now();
519}
520
521bool Buffer::Fill(const QByteArray & buffer)
522{
523 if (buffer.size() < 1)
524 return false;
525
526 static int s_dropped = 0;
527 static int s_droppedBytes = 0;
528
529 m_parent->m_flowMutex.lock();
530
531 if (!m_dataSeen)
532 {
533 m_dataSeen = true;
534 emit m_parent->DataStarted();
535 }
536
537 if (m_data.size() < kMaxQueue)
538 {
539 block_t blk(reinterpret_cast<const uint8_t *>(buffer.constData()),
540 reinterpret_cast<const uint8_t *>(buffer.constData())
541 + buffer.size());
542
543 m_data.push(blk);
544 s_dropped = 0;
545
546 LOG(VB_GENERAL, LOG_DEBUG, LOC +
547 QString("Adding %1 bytes").arg(buffer.size()));
548 }
549 else
550 {
551 s_droppedBytes += buffer.size();
552 LOG(VB_RECORD, LOG_WARNING, LOC +
553 QString("Packet queue overrun. Dropped %1 packets, %2 bytes.")
554 .arg(++s_dropped).arg(s_droppedBytes));
555
556 std::this_thread::sleep_for(250us);
557 }
558
559 m_parent->m_flowMutex.unlock();
560 m_parent->m_flowCond.notify_all();
561
562 m_heartbeat = std::chrono::system_clock::now();
563
564 return true;
565}
566
// Writer loop: while m_parent->m_run is set, wait on m_parent->m_flowCond,
// then take at most one queued block from m_data and write it to file
// descriptor 1. Roughly every five minutes a statistics line is logged and
// the per-interval counters are reset. On exit m_parent->m_bufferRunning is
// cleared and waiters on m_flowCond are woken.
//
// NOTE(review): the listing numbers fused to each line skip 594 and 609,
// i.e. the `if` heads belonging to the two `else` branches marked below are
// absent from this copy. Restore them from the repository before building
// or changing this function.
567void Buffer::Run(void)
568{
569 setObjectName("Buffer");
570
571 bool is_empty = false;
572 bool wait = false;
573 auto send_time = std::chrono::system_clock::now() + 5min;
574 uint64_t write_total = 0;
575 uint64_t written = 0;
576 uint64_t write_cnt = 0;
577 uint64_t empty_cnt = 0;
578
579 LOG(VB_RECORD, LOG_INFO, LOC + "Buffer: Ready for data.");
580
581 while (m_parent->m_run)
582 {
583 {
// Long timeout (5s) when the previous pass set `wait`, short (25ms)
// otherwise; a notify_all() on m_flowCond ends the wait early.
584 std::unique_lock<std::mutex> lk(m_parent->m_flowMutex);
585 m_parent->m_flowCond.wait_for(lk, wait ? 5s : 25ms);
586 wait = false;
587 }
588
589 if (send_time < std::chrono::system_clock::now())
590 {
591 // Every 5 minutes, write out some statistics.
592 send_time = std::chrono::system_clock::now() + 5min;
593 write_total += written;
// NOTE(review): listing line 594 (the condition for this block) is missing.
595 {
596 LOG(VB_RECORD, LOG_NOTICE, LOC +
597 QString("Count: %1, Empty cnt %2, Written %3, Total %4")
598 .arg(write_cnt).arg(empty_cnt)
599 .arg(written).arg(write_total));
600 }
601 else
602 {
603 LOG(VB_GENERAL, LOG_NOTICE, LOC + "Not streaming.");
604 }
605
606 write_cnt = empty_cnt = written = 0;
607 }
608
// NOTE(review): listing line 609 (the condition for this block) is missing.
610 {
611 if (m_parent->m_xon)
612 {
// Copy the front block out under the lock, then write it after unlocking.
// is_empty is only refreshed when a block was actually popped, so it keeps
// its previous value on a pass that finds the queue already empty.
613 block_t pkt;
614 m_parent->m_flowMutex.lock();
615 if (!m_data.empty())
616 {
617 pkt = m_data.front();
618 m_data.pop();
619 is_empty = m_data.empty();
620 }
621 m_parent->m_flowMutex.unlock();
622
623 if (!pkt.empty())
624 {
// NOTE(review): write() reports failure as -1; stored in the unsigned `sz`
// that becomes a very large value, which is then added to `written`.
625 uint sz = write(1, pkt.data(), pkt.size());
626 written += sz;
627 ++write_cnt;
628
629 if (sz != pkt.size())
630 {
631 LOG(VB_GENERAL, LOG_WARNING, LOC +
632 QString("Only wrote %1 of %2 bytes to mythbackend")
633 .arg(sz).arg(pkt.size()));
634 }
635 }
636
637 if (is_empty)
638 {
639 wait = true;
640 ++empty_cnt;
641 }
642 }
643 else
644 {
645 wait = true;
646 }
647 }
648 else
649 {
650 // Clear packet queue
// Swapping with an empty local queue discards everything that was queued.
651 m_parent->m_flowMutex.lock();
652 if (!m_data.empty())
653 {
654 stack_t dummy;
655 std::swap(m_data, dummy);
656 }
657 m_parent->m_flowMutex.unlock();
658
659 wait = true;
660 }
661 }
662
663 LOG(VB_RECORD, LOG_INFO, LOC + "Buffer: shutting down");
664 m_parent->m_bufferRunning = false;
665 m_parent->m_flowCond.notify_all();
666}
const QString VERSION
#define LOC
stack_t m_data
std::queue< block_t > stack_t
std::chrono::time_point< std::chrono::system_clock > m_heartbeat
MythExternControl * m_parent
static constexpr uint16_t kMaxQueue
bool m_dataSeen
bool Fill(const QByteArray &buffer)
void Start(void)
std::vector< uint8_t > block_t
Buffer(MythExternControl *parent)
void Run(void)
void Join(void)
size_t m_repCmdCnt
bool ProcessCommand(const QString &query)
void HasPictureAttributes(const QString &serial) const
void StopStreaming(const QString &serial, bool silent)
void Cleanup(void)
void HasTuner(const QString &serial) const
QString m_prevMsgBuf
void TuneStatus(const QString &serial)
void LoadChannels(const QString &serial)
void StartStreaming(const QString &serial)
void TuneChannel(const QString &serial, const QVariantMap &args)
void FirstChannel(const QString &serial)
void Close(void)
void Start(void)
MythExternControl * m_parent
void NextChannel(const QString &serial)
void Join(void)
void SetBlockSize(const QString &serial, int blksz)
void LockTimeout(const QString &serial) const
bool SendStatus(const QString &command, const QString &status, const QString &serial, const QString &response="")
void Run(void)
QString m_prevStatus
void ErrorMessage(const QString &msg)
void TuneStatus(const QString &serial)
void StartStreaming(const QString &serial)
void SetBlockSize(const QString &serial, int blksz)
void HasTuner(const QString &serial)
std::atomic< bool > m_streaming
void Error(const QString &msg)
void HasPictureAttributes(const QString &serial)
QString ErrorString(void) const
std::condition_variable m_runCond
~MythExternControl(void) override
std::atomic< bool > m_xon
void SendMessage(const QString &command, const QString &serial, const QString &message, const QString &status="")
void FirstChannel(const QString &serial)
void DataStarted(void)
void Cleanup(void)
void Fatal(const QString &msg)
std::atomic< bool > m_run
void TuneChannel(const QString &serial, const QVariantMap &args)
void NextChannel(const QString &serial)
std::atomic< bool > m_commandsRunning
void LoadChannels(const QString &serial)
void Streaming(bool val)
void StopStreaming(const QString &serial, bool silent)
std::condition_variable m_flowCond
void LockTimeout(const QString &serial)
std::atomic< bool > m_bufferRunning
void Close(void)
std::atomic< bool > m_ready
unsigned int uint
Definition: compat.h:68
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
def write(text, progress=True)
Definition: mythburn.py:307