31#include <QJsonDocument>
36using namespace std::chrono_literals;
46 setObjectName(
"Control");
86 std::this_thread::sleep_for(50us);
94 LOG(VB_RECORD, LOG_CRIT,
LOC +
"Terminated.");
100 LOG(VB_RECORD, LOG_CRIT,
LOC + msg);
117 const QString & serial,
118 const QString & message,
119 const QString & status)
135#define LOC QString("%1").arg(m_parent->Desc())
207 const QString & status,
208 const QString & serial,
209 const QString & response)
212 if (!serial.isEmpty())
213 query[
"serial"] = serial;
214 query[
"command"] = command;
215 query[
"status"] = status;
216 if (!response.isEmpty())
217 query[
"message"] = response;
219 QByteArray msgbuf = QJsonDocument(query).toJson(QJsonDocument::Compact);
220 int len =
write(2, msgbuf.constData(), msgbuf.size());
221 len +=
write(2,
"\n", 1);
223 if (len != msgbuf.size() + 1)
225 LOG(VB_RECORD, LOG_ERR,
LOC +
226 QString(
"%1: Only wrote %2 of %3 bytes of message '%4'.")
227 .arg(command).arg(len).arg(msgbuf.size()).arg(QString(msgbuf)));
231 if (!command.isEmpty())
237 LOG(VB_RECORD, LOG_DEBUG,
LOC +
238 QString(
"Processing '%1' --> '%2' (Repeated 25 times)")
239 .arg(command, QString(msgbuf)));
246 LOG(VB_RECORD, LOG_DEBUG,
247 LOC + QString(
"Processing '%1' --> '%2' (Repeated %2 times)")
251 LOG(VB_RECORD, LOG_DEBUG,
LOC +
252 QString(
"Processing '%1' --> '%2'")
253 .arg(command, QString(msgbuf)));
271 LOG(VB_RECORD, LOG_DEBUG,
LOC + QString(
"Processing '%1'").arg(query));
275 if (query.startsWith(
"APIVersion?"))
277 write(2,
"OK:3\n", 5);
281 QJsonParseError parseError {};
286 QVariantMap elements;
287 QByteArray cmdbuf = query.toUtf8();
290 doc = QJsonDocument::fromJson(cmdbuf, &parseError);
291 elements = doc.toVariant().toMap();
293 cmd = elements[
"command"].toString();
294 serial = elements[
"serial"].toString();
296 if (parseError.error != QJsonParseError::NoError)
299 QString(
"ExternalRecorder sent invalid JSON message: %1: %2")
300 .arg(parseError.offset).arg(parseError.errorString()));
309 if (elements[
"command"] ==
"APIVersion")
314 else if (cmd ==
"Version?")
318 else if (cmd ==
"Description?")
325 else if (cmd ==
"HasLock?")
329 else if (cmd ==
"SignalStrengthPercent?")
333 else if (cmd ==
"LockTimeout?")
337 else if (cmd ==
"HasTuner?")
341 else if (cmd ==
"HasPictureAttributes?")
345 else if (cmd ==
"SendBytes")
348 SendStatus(cmd,
"ERR", serial,
"Not supported");
350 else if (cmd ==
"XON")
355 SendStatus(cmd,
"OK", serial,
"Started Streaming");
361 SendStatus(cmd,
"Warn", serial,
"Not Streaming");
364 else if (cmd ==
"XOFF")
368 SendStatus(cmd,
"OK", serial,
"Stopped Streaming");
375 SendStatus(cmd,
"Warn", serial,
"Not Streaming");
378 else if (cmd ==
"TuneChannel")
382 else if (cmd ==
"TuneStatus?")
386 else if (cmd ==
"LoadChannels")
390 else if (cmd ==
"FirstChannel")
394 else if (cmd ==
"NextChannel")
398 else if (cmd ==
"IsOpen?")
404 SendStatus(cmd,
"WARN", serial,
"Not Open yet");
406 else if (cmd ==
"CloseRecorder")
414 else if (cmd ==
"FlowControl?")
418 else if (cmd ==
"BlockSize")
420 if (!elements.contains(
"value"))
421 SendStatus(cmd,
"ERR", serial,
"Missing block size value");
425 else if (cmd ==
"StartStreaming")
429 else if (cmd ==
"StopStreaming")
438 SendStatus(cmd,
"ERR", serial, QString(
"Unrecognized command '%1'").arg(query));
446 setObjectName(
"Commands");
450 std::array<struct pollfd,2> polls {};
453 polls[0].events = POLLIN | POLLPRI;
454 polls[0].revents = 0;
457 if (!input.open(stdin, QIODevice::ReadOnly))
459 LOG(VB_RECORD, LOG_ERR,
LOC +
"Opening of stdin failed");
462 QTextStream qtin(&input);
464 LOG(VB_RECORD, LOG_INFO,
LOC +
"Command parser ready.");
470 int ret = poll(polls.data(), poll_cnt,
timeout);
472 if (polls[0].revents & POLLHUP)
474 LOG(VB_RECORD, LOG_ERR,
LOC +
"poll eof (POLLHUP)");
477 if (polls[0].revents & POLLNVAL)
483 if (polls[0].revents & POLLIN)
487 cmd = qtin.readLine();
493 if ((EOVERFLOW == errno))
495 LOG(VB_RECORD, LOG_ERR,
"command overflow");
499 if ((EAGAIN == errno) || (EINTR == errno))
501 LOG(VB_RECORD, LOG_ERR,
LOC +
"retry command read.");
505 LOG(VB_RECORD, LOG_ERR,
LOC +
"unknown error reading command.");
510 LOG(VB_RECORD, LOG_INFO,
LOC +
"Command parser: shutting down");
523 if (buffer.size() < 1)
526 static int s_dropped = 0;
527 static int s_droppedBytes = 0;
539 block_t blk(
reinterpret_cast<const uint8_t *
>(buffer.constData()),
540 reinterpret_cast<const uint8_t *
>(buffer.constData())
546 LOG(VB_GENERAL, LOG_DEBUG,
LOC +
547 QString(
"Adding %1 bytes").arg(buffer.size()));
551 s_droppedBytes += buffer.size();
552 LOG(VB_RECORD, LOG_WARNING,
LOC +
553 QString(
"Packet queue overrun. Dropped %1 packets, %2 bytes.")
554 .arg(++s_dropped).arg(s_droppedBytes));
556 std::this_thread::sleep_for(250us);
569 setObjectName(
"Buffer");
571 bool is_empty =
false;
573 auto send_time = std::chrono::system_clock::now() + 5min;
574 uint64_t write_total = 0;
575 uint64_t written = 0;
576 uint64_t write_cnt = 0;
577 uint64_t empty_cnt = 0;
579 LOG(VB_RECORD, LOG_INFO,
LOC +
"Buffer: Ready for data.");
589 if (send_time < std::chrono::system_clock::now())
592 send_time = std::chrono::system_clock::now() + 5min;
593 write_total += written;
596 LOG(VB_RECORD, LOG_NOTICE,
LOC +
597 QString(
"Count: %1, Empty cnt %2, Written %3, Total %4")
598 .arg(write_cnt).arg(empty_cnt)
599 .arg(written).arg(write_total));
603 LOG(VB_GENERAL, LOG_NOTICE,
LOC +
"Not streaming.");
606 write_cnt = empty_cnt = written = 0;
619 is_empty =
m_data.empty();
625 uint sz =
write(1, pkt.data(), pkt.size());
629 if (sz != pkt.size())
631 LOG(VB_GENERAL, LOG_WARNING,
LOC +
632 QString(
"Only wrote %1 of %2 bytes to mythbackend")
633 .arg(sz).arg(pkt.size()));
663 LOG(VB_RECORD, LOG_INFO,
LOC +
"Buffer: shutting down");
std::queue< block_t > stack_t
std::chrono::time_point< std::chrono::system_clock > m_heartbeat
MythExternControl * m_parent
static constexpr uint16_t kMaxQueue
bool Fill(const QByteArray &buffer)
std::vector< uint8_t > block_t
Buffer(MythExternControl *parent)
bool ProcessCommand(const QString &query)
void HasPictureAttributes(const QString &serial) const
void StopStreaming(const QString &serial, bool silent)
void HasTuner(const QString &serial) const
void TuneStatus(const QString &serial)
void LoadChannels(const QString &serial)
void StartStreaming(const QString &serial)
void TuneChannel(const QString &serial, const QVariantMap &args)
void FirstChannel(const QString &serial)
MythExternControl * m_parent
void NextChannel(const QString &serial)
void SetBlockSize(const QString &serial, int blksz)
void LockTimeout(const QString &serial) const
bool SendStatus(const QString &command, const QString &status, const QString &serial, const QString &response="")
void ErrorMessage(const QString &msg)
void TuneStatus(const QString &serial)
void StartStreaming(const QString &serial)
void SetBlockSize(const QString &serial, int blksz)
void HasTuner(const QString &serial)
std::atomic< bool > m_streaming
void Error(const QString &msg)
void HasPictureAttributes(const QString &serial)
QString ErrorString(void) const
std::condition_variable m_runCond
~MythExternControl(void) override
std::atomic< bool > m_xon
void SendMessage(const QString &command, const QString &serial, const QString &message, const QString &status="")
void FirstChannel(const QString &serial)
void Fatal(const QString &msg)
std::atomic< bool > m_run
void TuneChannel(const QString &serial, const QVariantMap &args)
void NextChannel(const QString &serial)
std::atomic< bool > m_commandsRunning
void LoadChannels(const QString &serial)
void StopStreaming(const QString &serial, bool silent)
std::condition_variable m_flowCond
void LockTimeout(const QString &serial)
std::atomic< bool > m_bufferRunning
std::atomic< bool > m_ready
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
def write(text, progress=True)