29 #include <QTextStream>
30 #include <QJsonDocument>
31 #include <QJsonObject>
35 using namespace std::chrono_literals;
45 setObjectName(
"Control");
85 std::this_thread::sleep_for(50us);
93 LOG(VB_RECORD, LOG_CRIT,
LOC +
"Terminated.");
99 LOG(VB_RECORD, LOG_CRIT,
LOC + msg);
116 const QString & serial,
117 const QString & message,
118 const QString & status)
134 #define LOC QString("%1").arg(m_parent->Desc())
206 const QString & status,
207 const QString & serial,
208 const QString & response)
211 if (!serial.isEmpty())
212 query[
"serial"] = serial;
213 query[
"command"] = command;
214 query[
"status"] = status;
215 if (!response.isEmpty())
216 query[
"message"] = response;
218 QByteArray msgbuf = QJsonDocument(query).toJson(QJsonDocument::Compact);
219 int len =
write(2, msgbuf.constData(), msgbuf.size());
220 len +=
write(2,
"\n", 1);
222 if (len != msgbuf.size() + 1)
224 LOG(VB_RECORD, LOG_ERR,
LOC +
225 QString(
"%1: Only wrote %2 of %3 bytes of message '%4'.")
226 .arg(command).arg(len).arg(msgbuf.size()).arg(QString(msgbuf)));
230 if (!command.isEmpty())
236 LOG(VB_RECORD, LOG_DEBUG,
LOC +
237 QString(
"Processing '%1' --> '%2' (Repeated 25 times)")
238 .arg(command, QString(msgbuf)));
245 LOG(VB_RECORD, LOG_DEBUG,
246 LOC + QString(
"Processing '%1' --> '%2' (Repeated %2 times)")
250 LOG(VB_RECORD, LOG_DEBUG,
LOC +
251 QString(
"Processing '%1' --> '%2'")
252 .arg(command, QString(msgbuf)));
270 LOG(VB_RECORD, LOG_DEBUG,
LOC + QString(
"Processing '%1'").arg(query));
274 if (query.startsWith(
"APIVersion?"))
276 write(2,
"OK:3\n", 5);
280 QJsonParseError parseError {};
285 QVariantMap elements;
286 QByteArray cmdbuf = query.toUtf8();
289 doc = QJsonDocument::fromJson(cmdbuf, &parseError);
290 elements = doc.toVariant().toMap();
292 cmd = elements[
"command"].toString();
293 serial = elements[
"serial"].toString();
295 if (parseError.error != QJsonParseError::NoError)
298 QString(
"ExternalRecorder sent invalid JSON message: %1: %2")
299 .arg(parseError.offset).arg(parseError.errorString()));
308 if (elements[
"command"] ==
"APIVersion")
313 else if (cmd ==
"Version?")
317 else if (cmd ==
"Description?")
324 else if (cmd ==
"HasLock?")
328 else if (cmd ==
"SignalStrengthPercent?")
332 else if (cmd ==
"LockTimeout?")
336 else if (cmd ==
"HasTuner?")
340 else if (cmd ==
"HasPictureAttributes?")
344 else if (cmd ==
"SendBytes")
347 SendStatus(cmd,
"ERR", serial,
"Not supported");
349 else if (cmd ==
"XON")
354 SendStatus(cmd,
"OK", serial,
"Started Streaming");
360 SendStatus(cmd,
"Warn", serial,
"Not Streaming");
363 else if (cmd ==
"XOFF")
367 SendStatus(cmd,
"OK", serial,
"Stopped Streaming");
374 SendStatus(cmd,
"Warn", serial,
"Not Streaming");
377 else if (cmd ==
"TuneChannel")
381 else if (cmd ==
"TuneStatus?")
385 else if (cmd ==
"LoadChannels")
389 else if (cmd ==
"FirstChannel")
393 else if (cmd ==
"NextChannel")
397 else if (cmd ==
"IsOpen?")
403 SendStatus(cmd,
"WARN", serial,
"Not Open yet");
405 else if (cmd ==
"CloseRecorder")
413 else if (cmd ==
"FlowControl?")
417 else if (cmd ==
"BlockSize")
419 if (elements.find(
"value") == elements.end())
420 SendStatus(cmd,
"ERR", serial,
"Missing block size value");
424 else if (cmd ==
"StartStreaming")
428 else if (cmd ==
"StopStreaming")
437 SendStatus(cmd,
"ERR", serial, QString(
"Unrecognized command '%1'").arg(query));
445 setObjectName(
"Commands");
449 std::array<struct pollfd,2> polls {};
452 polls[0].events = POLLIN | POLLPRI;
453 polls[0].revents = 0;
456 input.open(stdin, QIODevice::ReadOnly);
457 QTextStream qtin(&input);
459 LOG(VB_RECORD, LOG_INFO,
LOC +
"Command parser ready.");
465 int ret = poll(polls.data(), poll_cnt,
timeout);
467 if (polls[0].revents & POLLHUP)
469 LOG(VB_RECORD, LOG_ERR,
LOC +
"poll eof (POLLHUP)");
472 if (polls[0].revents & POLLNVAL)
478 if (polls[0].revents & POLLIN)
482 cmd = qtin.readLine();
488 if ((EOVERFLOW == errno))
490 LOG(VB_RECORD, LOG_ERR,
"command overflow");
494 if ((EAGAIN == errno) || (EINTR == errno))
496 LOG(VB_RECORD, LOG_ERR,
LOC +
"retry command read.");
500 LOG(VB_RECORD, LOG_ERR,
LOC +
"unknown error reading command.");
505 LOG(VB_RECORD, LOG_INFO,
LOC +
"Command parser: shutting down");
518 if (buffer.size() < 1)
521 static int s_dropped = 0;
522 static int s_droppedBytes = 0;
534 block_t blk(
reinterpret_cast<const uint8_t *
>(buffer.constData()),
535 reinterpret_cast<const uint8_t *
>(buffer.constData())
541 LOG(VB_GENERAL, LOG_DEBUG,
LOC +
542 QString(
"Adding %1 bytes").arg(buffer.size()));
546 s_droppedBytes += buffer.size();
547 LOG(VB_RECORD, LOG_WARNING,
LOC +
548 QString(
"Packet queue overrun. Dropped %1 packets, %2 bytes.")
549 .arg(++s_dropped).arg(s_droppedBytes));
551 std::this_thread::sleep_for(250us);
564 setObjectName(
"Buffer");
566 bool is_empty =
false;
568 auto send_time = std::chrono::system_clock::now() + 5min;
569 uint64_t write_total = 0;
570 uint64_t written = 0;
571 uint64_t write_cnt = 0;
572 uint64_t empty_cnt = 0;
574 LOG(VB_RECORD, LOG_INFO,
LOC +
"Buffer: Ready for data.");
584 if (send_time < std::chrono::system_clock::now())
587 send_time = std::chrono::system_clock::now() + 5min;
588 write_total += written;
591 LOG(VB_RECORD, LOG_NOTICE,
LOC +
592 QString(
"Count: %1, Empty cnt %2, Written %3, Total %4")
593 .arg(write_cnt).arg(empty_cnt)
594 .arg(written).arg(write_total));
598 LOG(VB_GENERAL, LOG_NOTICE,
LOC +
"Not streaming.");
601 write_cnt = empty_cnt = written = 0;
614 is_empty =
m_data.empty();
620 uint sz =
write(1, pkt.data(), pkt.size());
624 if (sz != pkt.size())
626 LOG(VB_GENERAL, LOG_WARNING,
LOC +
627 QString(
"Only wrote %1 of %2 bytes to mythbackend")
628 .arg(sz).arg(pkt.size()));
658 LOG(VB_RECORD, LOG_INFO,
LOC +
"Buffer: shutting down");