MythTV master
jobqueue.cpp
Go to the documentation of this file.
1
2#include <unistd.h>
3#include <sys/types.h>
4#include <sys/stat.h>
5#include <iostream>
6#include <cstdlib>
7#include <fcntl.h>
8#include <pthread.h>
9
10#include <QDateTime>
11#include <QFileInfo>
12#include <QEvent>
13#include <QCoreApplication>
14#include <QTimeZone>
15
16#include "libmythbase/compat.h"
18#include "libmythbase/mthread.h"
20#include "libmythbase/mythconfig.h"
23#include "libmythbase/mythdb.h"
29
30#include "jobqueue.h"
31#include "previewgenerator.h"
32#include "recordinginfo.h"
33#include "recordingprofile.h"
34
35#define LOC QString("JobQueue: ")
36
37// Consider anything less than 4 hours as a "recent" job.
38static constexpr int64_t kRecentInterval {4LL * 60 * 60};
39
40JobQueue::JobQueue(bool master) :
41 m_hostname(gCoreContext->GetHostName()),
42 m_runningJobsLock(new QRecursiveMutex()),
43 m_isMaster(master),
44 m_queueThread(new MThread("JobQueue", this))
45{
46 m_jobQueueCPU = gCoreContext->GetNumSetting("JobQueueCPU", 0);
47
48#if !CONFIG_VALGRIND
49 QMutexLocker locker(&m_queueThreadCondLock);
50 //NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
51 m_processQueue = true;
53#else
54 LOG(VB_GENERAL, LOG_ERR, LOC +
55 "The JobQueue has been disabled because "
56 "you compiled with the --enable-valgrind option.");
57#endif // CONFIG_VALGRIND
58
60}
61
63{
65 m_processQueue = false;
66 m_queueThreadCond.wakeAll();
67 m_queueThreadCondLock.unlock();
68
70 delete m_queueThread;
71 m_queueThread = nullptr;
72
74
75 delete m_runningJobsLock;
76}
77
78void JobQueue::customEvent(QEvent *e)
79{
80 if (e->type() == MythEvent::kMythEventMessage)
81 {
82 auto *me = dynamic_cast<MythEvent *>(e);
83 if (me == nullptr)
84 return;
85 QString message = me->Message();
86
87 if (message.startsWith("LOCAL_JOB"))
88 {
89 // LOCAL_JOB action ID jobID
90 // LOCAL_JOB action type chanid recstartts hostname
91 QString msg;
92 message = message.simplified();
93 QStringList tokens = message.split(" ", Qt::SkipEmptyParts);
94 const QString& action = tokens[1];
95 int jobID = -1;
96
97 if (tokens[2] == "ID")
98 jobID = tokens[3].toInt();
99 else
100 {
101 jobID = GetJobID(
102 tokens[2].toInt(),
103 tokens[3].toUInt(),
104 MythDate::fromString(tokens[4]));
105 }
106
107 m_runningJobsLock->lock();
108 if (!m_runningJobs.contains(jobID))
109 {
110 msg = QString("Unable to determine jobID for message: "
111 "%1. Program will not be flagged.")
112 .arg(message);
113 LOG(VB_GENERAL, LOG_ERR, LOC + msg);
114 m_runningJobsLock->unlock();
115 return;
116 }
117 m_runningJobsLock->unlock();
118
119 msg = QString("Received message '%1'").arg(message);
120 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
121
122 if ((action == "STOP") ||
123 (action == "PAUSE") ||
124 (action == "RESTART") ||
125 (action == "RESUME" ))
126 {
127 m_runningJobsLock->lock();
128
129 if (action == "STOP")
131 else if (action == "PAUSE")
133 else if (action == "RESUME")
135 else if (action == "RESTART")
137
138 m_runningJobsLock->unlock();
139 }
140 }
141 }
142}
143
145{
147 m_queueThreadCond.wakeAll();
148 m_queueThreadCondLock.unlock();
149
150 RecoverQueue();
151
153 m_queueThreadCond.wait(&m_queueThreadCondLock, 10 * 1000UL);
154 m_queueThreadCondLock.unlock();
155
156 ProcessQueue();
157}
158
160{
161 LOG(VB_JOBQUEUE, LOG_INFO, LOC + "ProcessQueue() started");
162
163 QString logInfo;
164 //int flags;
165 QString hostname;
166
167 QMap<int, int> jobStatus;
168 QString message;
169 QMap<int, JobQueueEntry> jobs;
170 bool atMax = false;
171 QMap<int, RunningJobInfo>::Iterator rjiter;
172
173 QMutexLocker locker(&m_queueThreadCondLock);
174 while (m_processQueue)
175 {
176 locker.unlock();
177
178 bool startedJobAlready = false;
179 auto sleepTime = gCoreContext->GetDurSetting<std::chrono::seconds>("JobQueueCheckFrequency", 30s);
180 int maxJobs = gCoreContext->GetNumSetting("JobQueueMaxSimultaneousJobs", 3);
181 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
182 QString("Currently set to run up to %1 job(s) max.")
183 .arg(maxJobs));
184
185 jobStatus.clear();
186
187 m_runningJobsLock->lock();
188 for (rjiter = m_runningJobs.begin(); rjiter != m_runningJobs.end();
189 ++rjiter)
190 {
191 if ((*rjiter).pginfo)
192 (*rjiter).pginfo->UpdateInUseMark();
193 }
194 m_runningJobsLock->unlock();
195
196 m_jobsRunning = 0;
197 GetJobsInQueue(jobs);
198
199 if (!jobs.empty())
200 {
201 bool inTimeWindow = InJobRunWindow();
202 for (const auto & job : std::as_const(jobs))
203 {
204 int status = job.status;
205 hostname = job.hostname;
206
207 if (((status == JOB_RUNNING) ||
208 (status == JOB_STARTING) ||
209 (status == JOB_PAUSED)) &&
210 (hostname == m_hostname))
212 }
213
214 message = QString("Currently Running %1 jobs.")
215 .arg(m_jobsRunning);
216 if (!inTimeWindow)
217 {
218 message += QString(" Jobs in Queue, but we are outside of the "
219 "Job Queue time window, no new jobs can be "
220 "started.");
221 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
222 }
223 else if (m_jobsRunning >= maxJobs)
224 {
225 message += " (At Maximum, no new jobs can be started until "
226 "a running job completes)";
227
228 if (!atMax)
229 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
230
231 atMax = true;
232 }
233 else
234 {
235 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
236 atMax = false;
237 }
238
239
240 for ( int x = 0;
241 (x < jobs.size()) && (m_jobsRunning < maxJobs); x++)
242 {
243 int jobID = jobs[x].id;
244 int cmds = jobs[x].cmds;
245 //flags = jobs[x].flags;
246 int status = jobs[x].status;
247 hostname = jobs[x].hostname;
248
249 if (!jobs[x].chanid)
250 logInfo = QString("jobID #%1").arg(jobID);
251 else
252 logInfo = QString("chanid %1 @ %2").arg(jobs[x].chanid)
253 .arg(jobs[x].startts);
254
255 // Should we even be looking at this job?
256 if ((inTimeWindow) &&
257 (!hostname.isEmpty()) &&
258 (hostname != m_hostname))
259 {
260 // Setting the status here will prevent us from processing
261 // any other jobs for this recording until this one is
262 // completed on the remote host.
263 jobStatus[jobID] = status;
264
265 message = QString("Skipping '%1' job for %2, "
266 "should run on '%3' instead")
267 .arg(JobText(jobs[x].type), logInfo,
268 hostname);
269 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
270 continue;
271 }
272
273 // Check to see if there was a previous job that is not done
274 if (inTimeWindow)
275 {
276 int otherJobID = GetRunningJobID(jobs[x].chanid,
277 jobs[x].recstartts);
278 if (otherJobID && (jobStatus.contains(otherJobID)) &&
279 (!(jobStatus[otherJobID] & JOB_DONE)))
280 {
281 message =
282 QString("Skipping '%1' job for %2, "
283 "Job ID %3 is already running for "
284 "this recording with a status of '%4'")
285 .arg(JobText(jobs[x].type), logInfo,
286 QString::number(otherJobID),
287 StatusText(jobStatus[otherJobID]));
288 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
289 continue;
290 }
291 }
292
293 jobStatus[jobID] = status;
294
295 // Are we allowed to run this job?
296 if ((inTimeWindow) && (!AllowedToRun(jobs[x])))
297 {
298 message = QString("Skipping '%1' job for %2, "
299 "not allowed to run on this backend.")
300 .arg(JobText(jobs[x].type), logInfo);
301 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
302 continue;
303 }
304
305 // Is this job scheduled for the future
306 if (jobs[x].schedruntime > MythDate::current())
307 {
308 message = QString("Skipping '%1' job for %2, this job is "
309 "not scheduled to run until %3.")
310 .arg(JobText(jobs[x].type), logInfo,
311 jobs[x].schedruntime
313 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
314 continue;
315 }
316
317 if (cmds & JOB_STOP)
318 {
319 // if we're trying to stop a job and it's not queued
320 // then lets send a STOP command
321 if (status != JOB_QUEUED) {
322 message = QString("Stopping '%1' job for %2")
323 .arg(JobText(jobs[x].type), logInfo);
324 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
325
326 m_runningJobsLock->lock();
327 if (m_runningJobs.contains(jobID))
329 m_runningJobsLock->unlock();
330
331 // ChangeJobCmds(m_db, jobID, JOB_RUN);
332 continue;
333
334 // if we're trying to stop a job and it's still queued
335 // then let's just change the status to cancelled so
336 // we don't try to run it from the queue
337 }
338
339 message = QString("Cancelling '%1' job for %2")
340 .arg(JobText(jobs[x].type), logInfo);
341 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
342
343 // at the bottom of this loop we requeue any jobs that
344 // are not currently queued and also not associated
345 // with a hostname so we must claim this job before we
346 // can cancel it
348 {
349 message = QString("Unable to claim '%1' job for %2")
350 .arg(JobText(jobs[x].type), logInfo);
351 LOG(VB_JOBQUEUE, LOG_ERR, LOC + message);
352 continue;
353 }
354
355 ChangeJobStatus(jobID, JOB_CANCELLED, "");
357 continue;
358 }
359
360 if ((cmds & JOB_PAUSE) && (status != JOB_QUEUED))
361 {
362 message = QString("Pausing '%1' job for %2")
363 .arg(JobText(jobs[x].type), logInfo);
364 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
365
366 m_runningJobsLock->lock();
367 if (m_runningJobs.contains(jobID))
369 m_runningJobsLock->unlock();
370
372 continue;
373 }
374
375 if ((cmds & JOB_RESTART) && (status != JOB_QUEUED))
376 {
377 message = QString("Restart '%1' job for %2")
378 .arg(JobText(jobs[x].type), logInfo);
379 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
380
381 m_runningJobsLock->lock();
382 if (m_runningJobs.contains(jobID))
384 m_runningJobsLock->unlock();
385
387 continue;
388 }
389
390 if (status != JOB_QUEUED)
391 {
392
393 if (hostname.isEmpty())
394 {
395 message = QString("Resetting '%1' job for %2 to %3 "
396 "status, because no hostname is set.")
397 .arg(JobText(jobs[x].type),
398 logInfo,
399 StatusText(JOB_QUEUED));
400 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
401
402 ChangeJobStatus(jobID, JOB_QUEUED, "");
404 }
405 else if (inTimeWindow)
406 {
407 message = QString("Skipping '%1' job for %2, "
408 "current job status is '%3'")
409 .arg(JobText(jobs[x].type),
410 logInfo,
411 StatusText(status));
412 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
413 }
414 continue;
415 }
416
417 // never start or claim more than one job in a single run
418 if (startedJobAlready)
419 continue;
420
421 if ((inTimeWindow) &&
422 (hostname.isEmpty()) &&
424 {
425 message = QString("Unable to claim '%1' job for %2")
426 .arg(JobText(jobs[x].type), logInfo);
427 LOG(VB_JOBQUEUE, LOG_ERR, LOC + message);
428 continue;
429 }
430
431 if (!inTimeWindow)
432 {
433 message = QString("Skipping '%1' job for %2, "
434 "current time is outside of the "
435 "Job Queue processing window.")
436 .arg(JobText(jobs[x].type), logInfo);
437 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
438 continue;
439 }
440
441 message = QString("Processing '%1' job for %2, "
442 "current status is '%3'")
443 .arg(JobText(jobs[x].type), logInfo,
444 StatusText(status));
445 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
446
447 ProcessJob(jobs[x]);
448
449 startedJobAlready = true;
450 }
451 }
452
453 if (QCoreApplication::applicationName() == MYTH_APPNAME_MYTHJOBQUEUE)
454 {
455 if (m_jobsRunning > 0)
456 {
458 {
460 LOG(VB_JOBQUEUE, LOG_INFO, QString("%1 jobs running. "
461 "Blocking shutdown.").arg(m_jobsRunning));
462 }
463 }
464 else
465 {
467 {
469 LOG(VB_JOBQUEUE, LOG_INFO, "No jobs running. "
470 "Allowing shutdown.");
471 }
472 }
473 }
474
475
476 locker.relock();
477 if (m_processQueue)
478 {
479 std::chrono::milliseconds st = (startedJobAlready) ? 5s : sleepTime;
480 if (st > 0ms)
481 m_queueThreadCond.wait(locker.mutex(), st.count());
482 }
483 }
484}
485
486bool JobQueue::QueueRecordingJobs(const RecordingInfo &recinfo, int jobTypes)
487{
488 if (jobTypes == JOB_NONE)
489 jobTypes = recinfo.GetAutoRunJobs();
490
491 if (recinfo.IsCommercialFree())
492 jobTypes &= (~JOB_COMMFLAG);
493
494 if (jobTypes != JOB_NONE)
495 {
496 QString jobHost = QString("");
497
498 if (gCoreContext->GetBoolSetting("JobsRunOnRecordHost", false))
499 jobHost = recinfo.GetHostname();
500
501 return JobQueue::QueueJobs(
502 jobTypes, recinfo.GetChanID(), recinfo.GetRecordingStartTime(),
503 "", "", jobHost);
504 }
505 return false;
506}
507
508bool JobQueue::QueueJob(int jobType, uint chanid, const QDateTime &recstartts,
509 const QString& args, const QString& comment, QString host,
510 int flags, int status, QDateTime schedruntime)
511{
512 int tmpStatus = JOB_UNKNOWN;
513 int tmpCmd = JOB_UNKNOWN;
514 int chanidInt = -1;
515
516 if(!schedruntime.isValid())
517 schedruntime = MythDate::current();
518
520
521 // In order to replace a job, we must have a chanid/recstartts combo
522 if (chanid)
523 {
524 int jobID = -1;
525 query.prepare("SELECT status, id, cmds FROM jobqueue "
526 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
527 "AND type = :JOBTYPE;");
528 query.bindValue(":CHANID", chanid);
529 query.bindValue(":STARTTIME", recstartts);
530 query.bindValue(":JOBTYPE", jobType);
531
532 if (!query.exec())
533 {
534 MythDB::DBError("Error in JobQueue::QueueJob()", query);
535 return false;
536 }
537 if (query.next())
538 {
539 tmpStatus = query.value(0).toInt();
540 jobID = query.value(1).toInt();
541 tmpCmd = query.value(2).toInt();
542 }
543 switch (tmpStatus)
544 {
545 case JOB_UNKNOWN:
546 break;
547 case JOB_STARTING:
548 case JOB_RUNNING:
549 case JOB_PAUSED:
550 case JOB_STOPPING:
551 case JOB_ERRORING:
552 case JOB_ABORTING:
553 return false;
554 default:
556 break;
557 }
558 if (! (tmpStatus & JOB_DONE) && (tmpCmd & JOB_STOP))
559 return false;
560
561 chanidInt = chanid;
562 }
563
564 if (host.isNull())
565 host = QString("");
566
567 query.prepare("INSERT INTO jobqueue (chanid, starttime, inserttime, type, "
568 "status, statustime, schedruntime, hostname, args, comment, "
569 "flags) "
570 "VALUES (:CHANID, :STARTTIME, now(), :JOBTYPE, :STATUS, "
571 "now(), :SCHEDRUNTIME, :HOST, :ARGS, :COMMENT, :FLAGS);");
572
573 query.bindValue(":CHANID", chanidInt);
574 query.bindValue(":STARTTIME", recstartts);
575 query.bindValue(":JOBTYPE", jobType);
576 query.bindValue(":STATUS", status);
577 query.bindValue(":SCHEDRUNTIME", schedruntime);
578 query.bindValue(":HOST", host);
579 query.bindValue(":ARGS", args);
580 query.bindValue(":COMMENT", comment);
581 query.bindValue(":FLAGS", flags);
582
583 if (!query.exec())
584 {
585 MythDB::DBError("Error in JobQueue::StartJob()", query);
586 return false;
587 }
588
589 return true;
590}
591
592bool JobQueue::QueueJobs(int jobTypes, uint chanid, const QDateTime &recstartts,
593 const QString& args, const QString& comment, const QString& host)
594{
595 if (gCoreContext->GetBoolSetting("AutoTranscodeBeforeAutoCommflag", false))
596 {
597 if (jobTypes & JOB_METADATA)
598 QueueJob(JOB_METADATA, chanid, recstartts, args, comment, host);
599 if (jobTypes & JOB_TRANSCODE)
600 QueueJob(JOB_TRANSCODE, chanid, recstartts, args, comment, host);
601 if (jobTypes & JOB_COMMFLAG)
602 QueueJob(JOB_COMMFLAG, chanid, recstartts, args, comment, host);
603 }
604 else
605 {
606 if (jobTypes & JOB_METADATA)
607 QueueJob(JOB_METADATA, chanid, recstartts, args, comment, host);
608 if (jobTypes & JOB_COMMFLAG)
609 QueueJob(JOB_COMMFLAG, chanid, recstartts, args, comment, host);
610 if (jobTypes & JOB_TRANSCODE)
611 {
612 QDateTime schedruntime = MythDate::current();
613
614 int defer = gCoreContext->GetNumSetting("DeferAutoTranscodeDays", 0);
615 if (defer)
616 {
617#if QT_VERSION < QT_VERSION_CHECK(6,5,0)
618 schedruntime = QDateTime(schedruntime.addDays(defer).date(),
619 QTime(0,0,0), Qt::UTC);
620#else
621 schedruntime = QDateTime(schedruntime.addDays(defer).date(),
622 QTime(0,0,0),
623 QTimeZone(QTimeZone::UTC));
624#endif
625 }
626
627 QueueJob(JOB_TRANSCODE, chanid, recstartts, args, comment, host,
628 0, JOB_QUEUED, schedruntime);
629 }
630 }
631
632 if (jobTypes & JOB_USERJOB1)
633 QueueJob(JOB_USERJOB1, chanid, recstartts, args, comment, host);
634 if (jobTypes & JOB_USERJOB2)
635 QueueJob(JOB_USERJOB2, chanid, recstartts, args, comment, host);
636 if (jobTypes & JOB_USERJOB3)
637 QueueJob(JOB_USERJOB3, chanid, recstartts, args, comment, host);
638 if (jobTypes & JOB_USERJOB4)
639 QueueJob(JOB_USERJOB4, chanid, recstartts, args, comment, host);
640
641 return true;
642}
643
644int JobQueue::GetJobID(int jobType, uint chanid, const QDateTime &recstartts)
645{
647
648 query.prepare("SELECT id FROM jobqueue "
649 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
650 "AND type = :JOBTYPE;");
651 query.bindValue(":CHANID", chanid);
652 query.bindValue(":STARTTIME", recstartts);
653 query.bindValue(":JOBTYPE", jobType);
654
655 if (!query.exec())
656 {
657 MythDB::DBError("Error in JobQueue::GetJobID()", query);
658 return -1;
659 }
660 if (query.next())
661 return query.value(0).toInt();
662 return -1;
663}
664
666 int jobID, int &jobType, uint &chanid, QDateTime &recstartts)
667{
669
670 query.prepare("SELECT type, chanid, starttime FROM jobqueue "
671 "WHERE id = :ID;");
672
673 query.bindValue(":ID", jobID);
674
675 if (!query.exec())
676 {
677 MythDB::DBError("Error in JobQueue::GetJobInfoFromID()", query);
678 return false;
679 }
680 if (query.next())
681 {
682 jobType = query.value(0).toInt();
683 chanid = query.value(1).toUInt();
684 recstartts = MythDate::as_utc(query.value(2).toDateTime());
685 return true;
686 }
687 return false;
688}
689
691 int jobID, int &jobType, uint &chanid, QString &recstartts)
692{
693 QDateTime tmpStarttime;
694
695 bool result = JobQueue::GetJobInfoFromID(
696 jobID, jobType, chanid, tmpStarttime);
697
698 if (result)
699 recstartts = MythDate::toString(tmpStarttime, MythDate::kFilename);
700
701 return result;
702}
703
704int JobQueue::GetJobTypeFromName(const QString &name)
705{
706 if (!JobNameToType.contains(name))
707 {
708 LOG(VB_GENERAL, LOG_ERR, QString("'%1' is an invalid Job Name.")
709 .arg(name));
710 return JOB_NONE;
711 }
712 return JobNameToType[name];
713}
714
716{
717 QString message = QString("GLOBAL_JOB PAUSE ID %1").arg(jobID);
718 MythEvent me(message);
720
722}
723
725{
726 QString message = QString("GLOBAL_JOB RESUME ID %1").arg(jobID);
727 MythEvent me(message);
729
731}
732
734{
735 QString message = QString("GLOBAL_JOB RESTART ID %1").arg(jobID);
736 MythEvent me(message);
738
740}
741
743{
744 QString message = QString("GLOBAL_JOB STOP ID %1").arg(jobID);
745 MythEvent me(message);
747
749}
750
751bool JobQueue::DeleteAllJobs(uint chanid, const QDateTime &recstartts)
752{
754 QString message;
755
756 query.prepare("UPDATE jobqueue SET status = :CANCELLED "
757 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
758 "AND status = :QUEUED;");
759
760 query.bindValue(":CANCELLED", JOB_CANCELLED);
761 query.bindValue(":CHANID", chanid);
762 query.bindValue(":STARTTIME", recstartts);
763 query.bindValue(":QUEUED", JOB_QUEUED);
764
765 if (!query.exec())
766 MythDB::DBError("Cancel Pending Jobs", query);
767
768 query.prepare("UPDATE jobqueue SET cmds = :CMD "
769 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
770 "AND status <> :CANCELLED;");
771 query.bindValue(":CMD", JOB_STOP);
772 query.bindValue(":CHANID", chanid);
773 query.bindValue(":STARTTIME", recstartts);
774 query.bindValue(":CANCELLED", JOB_CANCELLED);
775
776 if (!query.exec())
777 {
778 MythDB::DBError("Stop Unfinished Jobs", query);
779 return false;
780 }
781
782 // wait until running job(s) are done
783 bool jobsAreRunning = true;
784 std::chrono::seconds totalSlept = 0s;
785 std::chrono::seconds maxSleep = 90s;
786 while (jobsAreRunning && totalSlept < maxSleep)
787 {
788 usleep(1000);
789 query.prepare("SELECT id FROM jobqueue "
790 "WHERE chanid = :CHANID and starttime = :STARTTIME "
791 "AND status NOT IN "
792 "(:FINISHED,:ABORTED,:ERRORED,:CANCELLED);");
793 query.bindValue(":CHANID", chanid);
794 query.bindValue(":STARTTIME", recstartts);
795 query.bindValue(":FINISHED", JOB_FINISHED);
796 query.bindValue(":ABORTED", JOB_ABORTED);
797 query.bindValue(":ERRORED", JOB_ERRORED);
798 query.bindValue(":CANCELLED", JOB_CANCELLED);
799
800 if (!query.exec())
801 {
802 MythDB::DBError("Stop Unfinished Jobs", query);
803 return false;
804 }
805
806 if (query.size() == 0)
807 {
808 jobsAreRunning = false;
809 continue;
810 }
811 if ((totalSlept % 5s) == 0s)
812 {
813 message = QString("Waiting on %1 jobs still running for "
814 "chanid %2 @ %3").arg(query.size())
815 .arg(chanid).arg(recstartts.toString(Qt::ISODate));
816 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
817 }
818
819 sleep(1);
820 totalSlept++;
821 }
822
823 if (totalSlept <= maxSleep)
824 {
825 query.prepare("DELETE FROM jobqueue "
826 "WHERE chanid = :CHANID AND starttime = :STARTTIME;");
827 query.bindValue(":CHANID", chanid);
828 query.bindValue(":STARTTIME", recstartts);
829
830 if (!query.exec())
831 MythDB::DBError("Delete All Jobs", query);
832 }
833 else
834 {
835 query.prepare("SELECT id, type, status, comment FROM jobqueue "
836 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
837 "AND status <> :CANCELLED ORDER BY id;");
838
839 query.bindValue(":CHANID", chanid);
840 query.bindValue(":STARTTIME", recstartts);
841 query.bindValue(":CANCELLED", JOB_CANCELLED);
842
843 if (!query.exec())
844 {
845 MythDB::DBError("Error in JobQueue::DeleteAllJobs(), Unable "
846 "to query list of Jobs left in Queue.", query);
847 return false;
848 }
849
850 LOG(VB_GENERAL, LOG_ERR, LOC +
851 QString( "In DeleteAllJobs: There are Jobs "
852 "left in the JobQueue that are still running for "
853 "chanid %1 @ %2.").arg(chanid)
854 .arg(recstartts.toString(Qt::ISODate)));
855
856 while (query.next())
857 {
858 LOG(VB_GENERAL, LOG_ERR, LOC +
859 QString("Job ID %1: '%2' with status '%3' and comment '%4'")
860 .arg(query.value(0).toString(),
861 JobText(query.value(1).toInt()),
862 StatusText(query.value(2).toInt()),
863 query.value(3).toString()));
864 }
865
866 return false;
867 }
868
869 return true;
870}
871
873{
874 return JobQueue::SafeDeleteJob(jobID, 0, 0, QDateTime());
875}
876
877bool JobQueue::SafeDeleteJob(int jobID, int jobType, int chanid,
878 const QDateTime& recstartts)
879{
880 if (jobID < 0)
881 return false;
882
883 if (chanid)
884 {
885
886 int thisJob = GetJobID(jobType, chanid, recstartts);
887 QString msg;
888
889 if( thisJob != jobID)
890 {
891 msg = QString("JobType, chanid and starttime don't match jobID %1");
892 LOG(VB_JOBQUEUE, LOG_ERR, LOC + msg.arg(jobID));
893 return false;
894 }
895
896 if (JobQueue::IsJobRunning(jobType, chanid, recstartts))
897 {
898 msg = QString("Can't remove running JobID %1");
899 LOG(VB_GENERAL, LOG_ERR, LOC + msg.arg(jobID));
900 return false;
901 }
902 }
903
905
906 query.prepare("DELETE FROM jobqueue WHERE id = :ID;");
907
908 query.bindValue(":ID", jobID);
909
910 if (!query.exec())
911 {
912 MythDB::DBError("Error in JobQueue::SafeDeleteJob()", query);
913 return false;
914 }
915
916 return true;
917}
918
919bool JobQueue::ChangeJobCmds(int jobID, int newCmds)
920{
921 if (jobID < 0)
922 return false;
923
925
926 query.prepare("UPDATE jobqueue SET cmds = :CMDS WHERE id = :ID;");
927
928 query.bindValue(":CMDS", newCmds);
929 query.bindValue(":ID", jobID);
930
931 if (!query.exec())
932 {
933 MythDB::DBError("Error in JobQueue::ChangeJobCmds()", query);
934 return false;
935 }
936
937 return true;
938}
939
940bool JobQueue::ChangeJobCmds(int jobType, uint chanid,
941 const QDateTime &recstartts, int newCmds)
942{
944
945 query.prepare("UPDATE jobqueue SET cmds = :CMDS WHERE type = :TYPE "
946 "AND chanid = :CHANID AND starttime = :STARTTIME;");
947
948 query.bindValue(":CMDS", newCmds);
949 query.bindValue(":TYPE", jobType);
950 query.bindValue(":CHANID", chanid);
951 query.bindValue(":STARTTIME", recstartts);
952
953 if (!query.exec())
954 {
955 MythDB::DBError("Error in JobQueue::ChangeJobCmds()", query);
956 return false;
957 }
958
959 return true;
960}
961
962bool JobQueue::ChangeJobFlags(int jobID, int newFlags)
963{
964 if (jobID < 0)
965 return false;
966
968
969 query.prepare("UPDATE jobqueue SET flags = :FLAGS WHERE id = :ID;");
970
971 query.bindValue(":FLAGS", newFlags);
972 query.bindValue(":ID", jobID);
973
974 if (!query.exec())
975 {
976 MythDB::DBError("Error in JobQueue::ChangeJobFlags()", query);
977 return false;
978 }
979
980 return true;
981}
982
983bool JobQueue::ChangeJobStatus(int jobID, int newStatus, const QString& comment)
984{
985 if (jobID < 0)
986 return false;
987
988 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("ChangeJobStatus(%1, %2, '%3')")
989 .arg(jobID).arg(StatusText(newStatus), comment));
990
992
993 query.prepare("UPDATE jobqueue SET status = :STATUS, comment = :COMMENT "
994 "WHERE id = :ID AND status <> :NEWSTATUS;");
995
996 query.bindValue(":STATUS", newStatus);
997 query.bindValue(":COMMENT", comment);
998 query.bindValue(":ID", jobID);
999 query.bindValue(":NEWSTATUS", newStatus);
1000
1001 if (!query.exec())
1002 {
1003 MythDB::DBError("Error in JobQueue::ChangeJobStatus()", query);
1004 return false;
1005 }
1006
1007 return true;
1008}
1009
1010bool JobQueue::ChangeJobComment(int jobID, const QString& comment)
1011{
1012 if (jobID < 0)
1013 return false;
1014
1015 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("ChangeJobComment(%1, '%2')")
1016 .arg(jobID).arg(comment));
1017
1019
1020 query.prepare("UPDATE jobqueue SET comment = :COMMENT "
1021 "WHERE id = :ID;");
1022
1023 query.bindValue(":COMMENT", comment);
1024 query.bindValue(":ID", jobID);
1025
1026 if (!query.exec())
1027 {
1028 MythDB::DBError("Error in JobQueue::ChangeJobComment()", query);
1029 return false;
1030 }
1031
1032 return true;
1033}
1034
1035bool JobQueue::ChangeJobArgs(int jobID, const QString& args)
1036{
1037 if (jobID < 0)
1038 return false;
1039
1041
1042 query.prepare("UPDATE jobqueue SET args = :ARGS "
1043 "WHERE id = :ID;");
1044
1045 query.bindValue(":ARGS", args);
1046 query.bindValue(":ID", jobID);
1047
1048 if (!query.exec())
1049 {
1050 MythDB::DBError("Error in JobQueue::ChangeJobArgs()", query);
1051 return false;
1052 }
1053
1054 return true;
1055}
1056
1057int JobQueue::GetRunningJobID(uint chanid, const QDateTime &recstartts)
1058{
1059 m_runningJobsLock->lock();
1060 for (const auto& jInfo : std::as_const(m_runningJobs))
1061 {
1062 if ((jInfo.pginfo->GetChanID() == chanid) &&
1063 (jInfo.pginfo->GetRecordingStartTime() == recstartts))
1064 {
1065 m_runningJobsLock->unlock();
1066
1067 return jInfo.id;
1068 }
1069 }
1070 m_runningJobsLock->unlock();
1071
1072 return 0;
1073}
1074
1076{
1077 return (status == JOB_QUEUED);
1078}
1079
1081{
1082 return ((status != JOB_UNKNOWN) && (status != JOB_QUEUED) &&
1083 ((status & JOB_DONE) == 0));
1084}
1085
1086bool JobQueue::IsJobRunning(int jobType,
1087 uint chanid, const QDateTime &recstartts)
1088{
1089 return IsJobStatusRunning(GetJobStatus(jobType, chanid, recstartts));
1090}
1091
1092bool JobQueue::IsJobRunning(int jobType, const ProgramInfo &pginfo)
1093{
1095 jobType, pginfo.GetChanID(), pginfo.GetRecordingStartTime());
1096}
1097
1099 int jobType, uint chanid, const QDateTime &recstartts)
1100{
1101 int tmpStatus = GetJobStatus(jobType, chanid, recstartts);
1102
1103 return (tmpStatus != JOB_UNKNOWN) && ((tmpStatus & JOB_DONE) == 0);
1104}
1105
1107 int jobType, uint chanid, const QDateTime &recstartts)
1108{
1109 return IsJobStatusQueued(GetJobStatus(jobType, chanid, recstartts));
1110}
1111
1112QString JobQueue::JobText(int jobType)
1113{
1114 switch (jobType)
1115 {
1116 case JOB_TRANSCODE: return tr("Transcode");
1117 case JOB_COMMFLAG: return tr("Flag Commercials");
1118 case JOB_METADATA: return tr("Look up Metadata");
1119 case JOB_PREVIEW: return tr("Preview Generation");
1120 }
1121
1122 if (jobType & JOB_USERJOB)
1123 {
1124 QString settingName =
1125 QString("UserJobDesc%1").arg(UserJobTypeToIndex(jobType));
1126 return gCoreContext->GetSetting(settingName, settingName);
1127 }
1128
1129 return tr("Unknown Job");
1130}
1131
1132// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
1133#define JOBSTATUS_STATUSTEXT(A,B,C) case A: return C;
1134
1135QString JobQueue::StatusText(int status)
1136{
1137 switch (status)
1138 {
1140 default: break;
1141 }
1142 return tr("Undefined");
1143}
1144
1145bool JobQueue::InJobRunWindow(std::chrono::minutes orStartsWithinMins)
1146{
1147 QString queueStartTimeStr;
1148 QString queueEndTimeStr;
1149 QTime queueStartTime;
1150 QTime queueEndTime;
1151 QTime curTime = QTime::currentTime();
1152 bool inTimeWindow = false;
1153 orStartsWithinMins = orStartsWithinMins < 0min ? 0min : orStartsWithinMins;
1154
1155 queueStartTimeStr = gCoreContext->GetSetting("JobQueueWindowStart", "00:00");
1156 queueEndTimeStr = gCoreContext->GetSetting("JobQueueWindowEnd", "23:59");
1157
1158 queueStartTime = QTime::fromString(queueStartTimeStr, "hh:mm");
1159 if (!queueStartTime.isValid())
1160 {
1161 LOG(VB_GENERAL, LOG_ERR,
1162 QString("Invalid JobQueueWindowStart time '%1', using 00:00")
1163 .arg(queueStartTimeStr));
1164 queueStartTime = QTime(0, 0);
1165 }
1166
1167 queueEndTime = QTime::fromString(queueEndTimeStr, "hh:mm");
1168 if (!queueEndTime.isValid())
1169 {
1170 LOG(VB_GENERAL, LOG_ERR,
1171 QString("Invalid JobQueueWindowEnd time '%1', using 23:59")
1172 .arg(queueEndTimeStr));
1173 queueEndTime = QTime(23, 59);
1174 }
1175
1176 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1177 QString("Currently set to run new jobs from %1 to %2")
1178 .arg(queueStartTimeStr, queueEndTimeStr));
1179
1180 if ((queueStartTime <= curTime) && (curTime < queueEndTime))
1181 { // NOLINT(bugprone-branch-clone)
1182 inTimeWindow = true;
1183 }
1184 else if ((queueStartTime > queueEndTime) &&
1185 ((curTime < queueEndTime) || (queueStartTime <= curTime)))
1186 {
1187 inTimeWindow = true;
1188 }
1189 else if (orStartsWithinMins > 0min)
1190 {
1191 // Check if the window starts soon
1192 if (curTime <= queueStartTime)
1193 {
1194 // Start time hasn't passed yet today
1195 if (queueStartTime.secsTo(curTime) <= duration_cast<std::chrono::seconds>(orStartsWithinMins).count())
1196 {
1197 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1198 QString("Job run window will start within %1 minutes")
1199 .arg(orStartsWithinMins.count()));
1200 inTimeWindow = true;
1201 }
1202 }
1203 else
1204 {
1205 // We passed the start time for today, try tomorrow
1206 QDateTime curDateTime = MythDate::current();
1207#if QT_VERSION < QT_VERSION_CHECK(6,5,0)
1208 QDateTime startDateTime = QDateTime(
1209 curDateTime.date(), queueStartTime, Qt::UTC).addDays(1);
1210#else
1211 QDateTime startDateTime =
1212 QDateTime(curDateTime.date(), queueStartTime,
1213 QTimeZone(QTimeZone::UTC)).addDays(1);
1214#endif
1215
1216 if (curDateTime.secsTo(startDateTime) <= duration_cast<std::chrono::seconds>(orStartsWithinMins).count())
1217 {
1218 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1219 QString("Job run window will start "
1220 "within %1 minutes (tomorrow)")
1221 .arg(orStartsWithinMins.count()));
1222 inTimeWindow = true;
1223 }
1224 }
1225 }
1226
1227 return inTimeWindow;
1228}
1229
1230bool JobQueue::HasRunningOrPendingJobs(std::chrono::minutes startingWithinMins)
1231{
1232 /* startingWithinMins <= 0 - look for any pending jobs
1233 > 0 - only consider pending starting within this time */
1234 QMap<int, JobQueueEntry> jobs;
1235 QMap<int, JobQueueEntry>::Iterator it;
1236 QDateTime maxSchedRunTime = MythDate::current();
1237 bool checkForQueuedJobs = (startingWithinMins <= 0min
1238 || InJobRunWindow(startingWithinMins));
1239
1240 if (checkForQueuedJobs && startingWithinMins > 0min) {
1241 maxSchedRunTime = maxSchedRunTime.addSecs(duration_cast<std::chrono::seconds>(startingWithinMins).count());
1242 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1243 QString("HasRunningOrPendingJobs: checking for jobs "
1244 "starting before: %1")
1245 .arg(maxSchedRunTime.toString(Qt::ISODate)));
1246 }
1247
1249
1250 if (!jobs.empty()) {
1251 for (it = jobs.begin(); it != jobs.end(); ++it)
1252 {
1253 int tmpStatus = (*it).status;
1254 if (tmpStatus == JOB_RUNNING) {
1255 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1256 QString("HasRunningOrPendingJobs: found running job"));
1257 return true;
1258 }
1259
1260 if (checkForQueuedJobs) {
1261 if ((tmpStatus != JOB_UNKNOWN) && (!(tmpStatus & JOB_DONE))) {
1262 if (startingWithinMins <= 0min) {
1263 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1264 "HasRunningOrPendingJobs: found pending job");
1265 return true;
1266 }
1267 if ((*it).schedruntime <= maxSchedRunTime) {
1268 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1269 QString("HasRunningOrPendingJobs: found pending "
1270 "job scheduled to start at: %1")
1271 .arg((*it).schedruntime.toString(Qt::ISODate)));
1272 return true;
1273 }
1274 }
1275 }
1276 }
1277 }
1278 return false;
1279}
1280
1281
1282int JobQueue::GetJobsInQueue(QMap<int, JobQueueEntry> &jobs, int findJobs)
1283{
1284 JobQueueEntry thisJob;
1286 QDateTime recentDate = MythDate::current().addSecs(-kRecentInterval);
1287 QString logInfo;
1288 int jobCount = 0;
1289 bool commflagWhileRecording =
1290 gCoreContext->GetBoolSetting("AutoCommflagWhileRecording", false);
1291
1292 jobs.clear();
1293
1294 query.prepare("SELECT j.id, j.chanid, j.starttime, j.inserttime, j.type, "
1295 "j.cmds, j.flags, j.status, j.statustime, j.hostname, "
1296 "j.args, j.comment, r.endtime, j.schedruntime "
1297 "FROM jobqueue j "
1298 "LEFT JOIN recorded r "
1299 " ON j.chanid = r.chanid AND j.starttime = r.starttime "
1300 "ORDER BY j.schedruntime, j.id;");
1301
1302 if (!query.exec())
1303 {
1304 MythDB::DBError("Error in JobQueue::GetJobs(), Unable to "
1305 "query list of Jobs in Queue.", query);
1306 return 0;
1307 }
1308
1309 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1310 QString("GetJobsInQueue: findJobs search bitmask %1, "
1311 "found %2 total jobs")
1312 .arg(findJobs).arg(query.size()));
1313
1314 while (query.next())
1315 {
1316 bool wantThisJob = false;
1317
1318 thisJob.id = query.value(0).toInt();
1319 thisJob.recstartts = MythDate::as_utc(query.value(2).toDateTime());
1320 thisJob.schedruntime = MythDate::as_utc(query.value(13).toDateTime());
1321 thisJob.type = query.value(4).toInt();
1322 thisJob.status = query.value(7).toInt();
1323 thisJob.statustime = MythDate::as_utc(query.value(8).toDateTime());
1324 thisJob.startts = MythDate::toString(
1326
1327 // -1 indicates the chanid is empty
1328 if (query.value(1).toInt() == -1)
1329 {
1330 thisJob.chanid = 0;
1331 logInfo = QString("jobID #%1").arg(thisJob.id);
1332 }
1333 else
1334 {
1335 thisJob.chanid = query.value(1).toUInt();
1336 logInfo = QString("chanid %1 @ %2").arg(thisJob.chanid)
1337 .arg(thisJob.startts);
1338 }
1339
1340 if ((MythDate::as_utc(query.value(12).toDateTime()) > MythDate::current()) &&
1341 ((!commflagWhileRecording) ||
1342 ((thisJob.type != JOB_COMMFLAG) &&
1343 (thisJob.type != JOB_METADATA))))
1344 {
1345 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1346 QString("GetJobsInQueue: Ignoring '%1' Job "
1347 "for %2 in %3 state. Endtime in future.")
1348 .arg(JobText(thisJob.type),
1349 logInfo, StatusText(thisJob.status)));
1350 continue;
1351 }
1352
1353 if ((findJobs & JOB_LIST_ALL) ||
1354 ((findJobs & JOB_LIST_DONE) &&
1355 (thisJob.status & JOB_DONE)) ||
1356 ((findJobs & JOB_LIST_NOT_DONE) &&
1357 (!(thisJob.status & JOB_DONE))) ||
1358 ((findJobs & JOB_LIST_ERROR) &&
1359 (thisJob.status == JOB_ERRORED)) ||
1360 ((findJobs & JOB_LIST_RECENT) &&
1361 (thisJob.statustime > recentDate)))
1362 wantThisJob = true;
1363
1364 if (!wantThisJob)
1365 {
1366 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1367 QString("GetJobsInQueue: Ignore '%1' Job for %2 in %3 state.")
1368 .arg(JobText(thisJob.type),
1369 logInfo, StatusText(thisJob.status)));
1370 continue;
1371 }
1372
1373 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1374 QString("GetJobsInQueue: Found '%1' Job for %2 in %3 state.")
1375 .arg(JobText(thisJob.type),
1376 logInfo, StatusText(thisJob.status)));
1377
1378 thisJob.inserttime = MythDate::as_utc(query.value(3).toDateTime());
1379 thisJob.cmds = query.value(5).toInt();
1380 thisJob.flags = query.value(6).toInt();
1381 thisJob.hostname = query.value(9).toString();
1382 thisJob.args = query.value(10).toString();
1383 thisJob.comment = query.value(11).toString();
1384
1385 if ((thisJob.type & JOB_USERJOB) &&
1386 (UserJobTypeToIndex(thisJob.type) == 0))
1387 {
1388 thisJob.type = JOB_NONE;
1389 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1390 QString("GetJobsInQueue: Unknown Job Type: %1")
1391 .arg(thisJob.type));
1392 }
1393
1394 if (thisJob.type != JOB_NONE)
1395 jobs[jobCount++] = thisJob;
1396 }
1397
1398 return jobCount;
1399}
1400
1401bool JobQueue::ChangeJobHost(int jobID, const QString& newHostname)
1402{
1404
1405 if (!newHostname.isEmpty())
1406 {
1407 query.prepare("UPDATE jobqueue SET hostname = :NEWHOSTNAME "
1408 "WHERE hostname = :EMPTY AND id = :ID;");
1409 query.bindValue(":NEWHOSTNAME", newHostname);
1410 query.bindValue(":EMPTY", "");
1411 query.bindValue(":ID", jobID);
1412 }
1413 else
1414 {
1415 query.prepare("UPDATE jobqueue SET hostname = :EMPTY "
1416 "WHERE id = :ID;");
1417 query.bindValue(":EMPTY", "");
1418 query.bindValue(":ID", jobID);
1419 }
1420
1421 if (!query.exec())
1422 {
1423 MythDB::DBError(QString("Error in JobQueue::ChangeJobHost(), "
1424 "Unable to set hostname to '%1' for "
1425 "job %2.").arg(newHostname).arg(jobID),
1426 query);
1427 return false;
1428 }
1429
1430 return query.numRowsAffected() > 0;
1431}
1432
1434{
1435 QString allowSetting;
1436
1437 if ((!job.hostname.isEmpty()) &&
1438 (job.hostname != m_hostname))
1439 return false;
1440
1441 if (job.type & JOB_USERJOB)
1442 {
1443 allowSetting =
1444 QString("JobAllowUserJob%1").arg(UserJobTypeToIndex(job.type));
1445 }
1446 else
1447 {
1448 switch (job.type)
1449 {
1450 case JOB_TRANSCODE: allowSetting = "JobAllowTranscode";
1451 break;
1452 case JOB_COMMFLAG: allowSetting = "JobAllowCommFlag";
1453 break;
1454 case JOB_METADATA: allowSetting = "JobAllowMetadata";
1455 break;
1456 case JOB_PREVIEW: allowSetting = "JobAllowPreview";
1457 break;
1458 default: return false;
1459 }
1460 }
1461
1462 return gCoreContext->GetBoolSetting(allowSetting, true);
1463}
1464
1466{
1468
1469 query.prepare("SELECT cmds FROM jobqueue WHERE id = :ID;");
1470
1471 query.bindValue(":ID", jobID);
1472
1473 if (query.exec())
1474 {
1475 if (query.next())
1476 return (enum JobCmds)query.value(0).toInt();
1477 }
1478 else
1479 {
1480 MythDB::DBError("Error in JobQueue::GetJobCmd()", query);
1481 }
1482
1483 return JOB_RUN;
1484}
1485
1487{
1489
1490 query.prepare("SELECT args FROM jobqueue WHERE id = :ID;");
1491
1492 query.bindValue(":ID", jobID);
1493
1494 if (query.exec())
1495 {
1496 if (query.next())
1497 return query.value(0).toString();
1498 }
1499 else
1500 {
1501 MythDB::DBError("Error in JobQueue::GetJobArgs()", query);
1502 }
1503
1504 return {""};
1505}
1506
1508{
1510
1511 query.prepare("SELECT flags FROM jobqueue WHERE id = :ID;");
1512
1513 query.bindValue(":ID", jobID);
1514
1515 if (query.exec())
1516 {
1517 if (query.next())
1518 return (enum JobFlags)query.value(0).toInt();
1519 }
1520 else
1521 {
1522 MythDB::DBError("Error in JobQueue::GetJobFlags()", query);
1523 }
1524
1525 return JOB_NO_FLAGS;
1526}
1527
1529{
1531
1532 query.prepare("SELECT status FROM jobqueue WHERE id = :ID;");
1533
1534 query.bindValue(":ID", jobID);
1535
1536 if (query.exec())
1537 {
1538 if (query.next())
1539 return (enum JobStatus)query.value(0).toInt();
1540 }
1541 else
1542 {
1543 MythDB::DBError("Error in JobQueue::GetJobStatus()", query);
1544 }
1545 return JOB_UNKNOWN;
1546}
1547
1549 int jobType, uint chanid, const QDateTime &recstartts)
1550{
1552
1553 query.prepare("SELECT status FROM jobqueue WHERE type = :TYPE "
1554 "AND chanid = :CHANID AND starttime = :STARTTIME;");
1555
1556 query.bindValue(":TYPE", jobType);
1557 query.bindValue(":CHANID", chanid);
1558 query.bindValue(":STARTTIME", recstartts);
1559
1560 if (query.exec())
1561 {
1562 if (query.next())
1563 return (enum JobStatus)query.value(0).toInt();
1564 }
1565 else
1566 {
1567 MythDB::DBError("Error in JobQueue::GetJobStatus()", query);
1568 }
1569 return JOB_UNKNOWN;
1570}
1571
1572void JobQueue::RecoverQueue(bool justOld)
1573{
1574 QMap<int, JobQueueEntry> jobs;
1575 QString msg;
1576 QString logInfo;
1577
1578 msg = QString("RecoverQueue: Checking for unfinished jobs to "
1579 "recover.");
1580 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
1581
1582 GetJobsInQueue(jobs);
1583
1584 if (!jobs.empty())
1585 {
1586 QMap<int, JobQueueEntry>::Iterator it;
1587 QDateTime oldDate = MythDate::current().addDays(-1);
1588 QString hostname = gCoreContext->GetHostName();
1589
1590 for (it = jobs.begin(); it != jobs.end(); ++it)
1591 {
1592 int tmpCmds = (*it).cmds;
1593 int tmpStatus = (*it).status;
1594
1595 if (!(*it).chanid)
1596 logInfo = QString("jobID #%1").arg((*it).id);
1597 else
1598 logInfo = QString("chanid %1 @ %2").arg((*it).chanid)
1599 .arg((*it).startts);
1600
1601 if (((tmpStatus == JOB_STARTING) ||
1602 (tmpStatus == JOB_RUNNING) ||
1603 (tmpStatus == JOB_PAUSED) ||
1604 (tmpCmds & JOB_STOP) ||
1605 (tmpStatus == JOB_STOPPING)) &&
1606 (((!justOld) &&
1607 ((*it).hostname == hostname)) ||
1608 ((*it).statustime < oldDate)))
1609 {
1610 msg = QString("RecoverQueue: Recovering '%1' for %2 "
1611 "from '%3' state.")
1612 .arg(JobText((*it).type),
1613 logInfo, StatusText((*it).status));
1614 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
1615
1616 ChangeJobStatus((*it).id, JOB_QUEUED, "");
1617 ChangeJobCmds((*it).id, JOB_RUN);
1618 if (!gCoreContext->GetBoolSetting("JobsRunOnRecordHost", false))
1619 ChangeJobHost((*it).id, "");
1620 }
1621 else
1622 {
1623#if 0
1624 msg = QString("RecoverQueue: Ignoring '%1' for %2 "
1625 "in '%3' state.")
1626 .arg(JobText((*it).type))
1627 .arg(logInfo).arg(StatusText((*it).status));
1628 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
1629#endif
1630 }
1631 }
1632 }
1633}
1634
1636{
1637 MSqlQuery delquery(MSqlQuery::InitCon());
1638 QDateTime donePurgeDate = MythDate::current().addDays(-2);
1639 QDateTime errorsPurgeDate = MythDate::current().addDays(-4);
1640
1641 delquery.prepare("DELETE FROM jobqueue "
1642 "WHERE (status in (:FINISHED, :ABORTED, :CANCELLED) "
1643 "AND statustime < :DONEPURGEDATE) "
1644 "OR (status in (:ERRORED) "
1645 "AND statustime < :ERRORSPURGEDATE) ");
1646 delquery.bindValue(":FINISHED", JOB_FINISHED);
1647 delquery.bindValue(":ABORTED", JOB_ABORTED);
1648 delquery.bindValue(":CANCELLED", JOB_CANCELLED);
1649 delquery.bindValue(":ERRORED", JOB_ERRORED);
1650 delquery.bindValue(":DONEPURGEDATE", donePurgeDate);
1651 delquery.bindValue(":ERRORSPURGEDATE", errorsPurgeDate);
1652
1653 if (!delquery.exec())
1654 {
1655 MythDB::DBError("JobQueue::CleanupOldJobsInQueue: Error deleting "
1656 "old finished jobs.", delquery);
1657 }
1658}
1659
1660bool JobQueue::InJobRunWindow(QDateTime jobstarttsRaw)
1661{
1662 if (!jobstarttsRaw.isValid())
1663 {
1664 jobstarttsRaw = QDateTime::currentDateTime();
1665 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Invalid date/time passed, "
1666 "using %1").arg(
1667 jobstarttsRaw.toString()));
1668 }
1669
1670 QString hostname(gCoreContext->GetHostName());
1671
1673 "JobQueueWindowStart", hostname, "00:00")));
1674
1676 "JobQueueWindowEnd", hostname, "23:59")));
1677
1678 QTime scheduleTime(QTime::fromString(jobstarttsRaw.toString("hh:mm")));
1679
1680 if (scheduleTime < windowStart || scheduleTime > windowEnd)
1681 {
1682 LOG(VB_JOBQUEUE, LOG_ERR, LOC + "Time not within job queue window, " +
1683 "job not queued");
1684 return false;
1685 }
1686
1687 return true;
1688}
1689
1691{
1692 int jobID = job.id;
1693
1695 {
1696 LOG(VB_JOBQUEUE, LOG_ERR, LOC +
1697 "ProcessJob(): Unable to open database connection");
1698 return;
1699 }
1700
1701 ChangeJobStatus(jobID, JOB_PENDING);
1702 ProgramInfo *pginfo = nullptr;
1703
1704 if (job.chanid)
1705 {
1706 pginfo = new ProgramInfo(job.chanid, job.recstartts);
1707
1708 if (!pginfo->GetChanID())
1709 {
1710 LOG(VB_JOBQUEUE, LOG_ERR, LOC +
1711 QString("Unable to retrieve program info for chanid %1 @ %2")
1712 .arg(job.chanid)
1713 .arg(job.recstartts.toString(Qt::ISODate)));
1714
1715 ChangeJobStatus(jobID, JOB_ERRORED,
1716 tr("Unable to retrieve program info from database"));
1717
1718 delete pginfo;
1719
1720 return;
1721 }
1722
1723 pginfo->SetPathname(pginfo->GetPlaybackURL());
1724 }
1725
1726
1727 m_runningJobsLock->lock();
1728
1729 ChangeJobStatus(jobID, JOB_STARTING);
1730 RunningJobInfo jInfo;
1731 jInfo.type = job.type;
1732 jInfo.id = jobID;
1733 jInfo.flag = JOB_RUN;
1734 jInfo.desc = GetJobDescription(job.type);
1735 jInfo.command = GetJobCommand(jobID, job.type, pginfo);
1736 jInfo.pginfo = pginfo;
1737
1738 m_runningJobs[jobID] = jInfo;
1739
1740 if (pginfo)
1741 pginfo->MarkAsInUse(true, kJobQueueInUseID);
1742
1743 if (pginfo && pginfo->GetRecordingGroup() == "Deleted")
1744 {
1745 ChangeJobStatus(jobID, JOB_CANCELLED,
1746 tr("Program has been deleted"));
1748 }
1749 else if ((job.type == JOB_TRANSCODE) ||
1750 (m_runningJobs[jobID].command == "mythtranscode"))
1751 {
1753 }
1754 else if ((job.type == JOB_COMMFLAG) ||
1755 (m_runningJobs[jobID].command == "mythcommflag"))
1756 {
1758 }
1759 else if ((job.type == JOB_METADATA) ||
1760 (m_runningJobs[jobID].command == "mythmetadatalookup"))
1761 {
1763 }
1764 else if (job.type & JOB_USERJOB)
1765 {
1767 }
1768 else
1769 {
1770 ChangeJobStatus(jobID, JOB_ERRORED,
1771 tr("UNKNOWN JobType, unable to process!"));
1773 }
1774
1775 m_runningJobsLock->unlock();
1776}
1777
1778void JobQueue::StartChildJob(void *(*ChildThreadRoutine)(void *), int jobID)
1779{
1780 auto *jts = new JobThreadStruct;
1781 jts->jq = this;
1782 jts->jobID = jobID;
1783
1784 pthread_t childThread = 0;
1785 pthread_attr_t attr;
1786 pthread_attr_init(&attr);
1787 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
1788 pthread_create(&childThread, &attr, ChildThreadRoutine, jts);
1789 pthread_attr_destroy(&attr);
1790}
1791
1793{
1794 if (jobType == JOB_TRANSCODE)
1795 return "Transcode";
1796 if (jobType == JOB_COMMFLAG)
1797 return "Commercial Detection";
1798 if (!(jobType & JOB_USERJOB))
1799 return "Unknown Job";
1800
1801 QString descSetting =
1802 QString("UserJobDesc%1").arg(UserJobTypeToIndex(jobType));
1803
1804 return gCoreContext->GetSetting(descSetting, "Unknown Job");
1805}
1806
1807QString JobQueue::GetJobCommand(int id, int jobType, ProgramInfo *tmpInfo)
1808{
1809 QString command;
1811
1812 if (jobType == JOB_TRANSCODE)
1813 {
1814 command = gCoreContext->GetSetting("JobQueueTranscodeCommand");
1815 if (command.trimmed().isEmpty())
1816 command = "mythtranscode";
1817
1818 if (command == "mythtranscode")
1819 return command;
1820 }
1821 else if (jobType == JOB_COMMFLAG)
1822 {
1823 command = gCoreContext->GetSetting("JobQueueCommFlagCommand");
1824 if (command.trimmed().isEmpty())
1825 command = "mythcommflag";
1826
1827 if (command == "mythcommflag")
1828 return command;
1829 }
1830 else if (jobType & JOB_USERJOB)
1831 {
1832 command = gCoreContext->GetSetting(
1833 QString("UserJob%1").arg(UserJobTypeToIndex(jobType)), "");
1834 }
1835
1836 if (!command.isEmpty())
1837 {
1838 command.replace("%JOBID%", QString("%1").arg(id));
1839 }
1840
1841 if (!command.isEmpty() && tmpInfo)
1842 {
1843 tmpInfo->SubstituteMatches(command);
1844
1845 command.replace("%VERBOSELEVEL%", QString("%1").arg(verboseMask));
1846 command.replace("%VERBOSEMODE%", QString("%1").arg(logPropagateArgs));
1847
1848 uint transcoder = tmpInfo->QueryTranscoderID();
1849 command.replace("%TRANSPROFILE%",
1851 "autodetect" : QString::number(transcoder));
1852 }
1853
1854 return command;
1855}
1856
1858{
1859 m_runningJobsLock->lock();
1860
1861 if (m_runningJobs.contains(id))
1862 {
1863 ProgramInfo *pginfo = m_runningJobs[id].pginfo;
1864 if (pginfo)
1865 {
1866 pginfo->MarkAsInUse(false, kJobQueueInUseID);
1867 delete pginfo;
1868 }
1869
1870 m_runningJobs.remove(id);
1871 }
1872
1873 m_runningJobsLock->unlock();
1874}
1875
1877{
1878 // Pretty print "bytes" as KB, MB, GB, TB, etc., subject to the desired
1879 // number of units
1880 struct PpTab_t {
1881 const char *m_suffix;
1882 unsigned int m_max;
1883 int m_precision;
1884 };
1885 static constexpr std::array<const PpTab_t,9> kPpTab {{
1886 { "bytes", 9999, 0 },
1887 { "kB", 999, 0 },
1888 { "MB", 999, 1 },
1889 { "GB", 999, 1 },
1890 { "TB", 999, 1 },
1891 { "PB", 999, 1 },
1892 { "EB", 999, 1 },
1893 { "ZB", 999, 1 },
1894 { "YB", 0, 0 },
1895 }};
1896 float fbytes = bytes;
1897
1898 unsigned int ii = 0;
1899 while (kPpTab[ii].m_max && fbytes > kPpTab[ii].m_max) {
1900 fbytes /= 1024;
1901 ii++;
1902 }
1903
1904 return QString("%1 %2")
1905 .arg(fbytes, 0, 'f', kPpTab[ii].m_precision)
1906 .arg(kPpTab[ii].m_suffix);
1907}
1908
1910{
1911 auto *jts = (JobThreadStruct *)param;
1912 JobQueue *jq = jts->jq;
1913
1914 MThread::ThreadSetup(QString("Transcode_%1").arg(jts->jobID));
1915 jq->DoTranscodeThread(jts->jobID);
1917
1918 delete jts;
1919
1920 return nullptr;
1921}
1922
1924{
1925 // We can't currently transcode non-recording files w/o a ProgramInfo
1926 m_runningJobsLock->lock();
1927 if (!m_runningJobs[jobID].pginfo)
1928 {
1929 LOG(VB_JOBQUEUE, LOG_ERR, LOC +
1930 "The JobQueue cannot currently transcode files that do not "
1931 "have a chanid/starttime in the recorded table.");
1932 ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found");
1934 m_runningJobsLock->unlock();
1935 return;
1936 }
1937
1938 ProgramInfo *program_info = m_runningJobs[jobID].pginfo;
1939 m_runningJobsLock->unlock();
1940
1941 ChangeJobStatus(jobID, JOB_RUNNING);
1942
1943 // make sure flags are up to date
1944 program_info->Reload();
1945
1946 bool useCutlist = program_info->HasCutlist() &&
1947 ((GetJobFlags(jobID) & JOB_USE_CUTLIST) != 0);
1948
1949 uint transcoder = program_info->QueryTranscoderID();
1950 QString profilearg =
1952 "autodetect" : QString::number(transcoder);
1953
1954 QString path;
1955 QString command;
1956
1957 m_runningJobsLock->lock();
1958 if (m_runningJobs[jobID].command == "mythtranscode")
1959 {
1960 path = GetAppBinDir() + "mythtranscode";
1961 command = QString("%1 -j %2 --profile %3")
1962 .arg(path).arg(jobID).arg(profilearg);
1963 if (useCutlist)
1964 command += " --honorcutlist";
1965 command += logPropagateArgs;
1966 }
1967 else
1968 {
1969 command = m_runningJobs[jobID].command;
1970
1971 QStringList tokens = command.split(" ", Qt::SkipEmptyParts);
1972 if (!tokens.empty())
1973 path = tokens[0];
1974 }
1975 m_runningJobsLock->unlock();
1976
1977 if (m_jobQueueCPU < 2)
1978 {
1979 myth_nice(17);
1980 myth_ioprio((0 == m_jobQueueCPU) ? 8 : 7);
1981 }
1982
1983 QString transcoderName;
1985 {
1986 transcoderName = "Autodetect";
1987 }
1988 else
1989 {
1991 query.prepare("SELECT name FROM recordingprofiles WHERE id = :ID;");
1992 query.bindValue(":ID", transcoder);
1993 if (query.exec() && query.next())
1994 {
1995 transcoderName = query.value(0).toString();
1996 }
1997 else
1998 {
1999 /* Unexpected value; log it. */
2000 transcoderName = QString("Autodetect(%1)").arg(transcoder);
2001 }
2002 }
2003
2004 bool retry = true;
2005 int retrylimit = 3;
2006 while (retry)
2007 {
2008 retry = false;
2009
2010 ChangeJobStatus(jobID, JOB_STARTING);
2012
2013 QString filename = program_info->GetPlaybackURL(false, true);
2014
2015 long long filesize = 0;
2016 long long origfilesize = QFileInfo(filename).size();
2017
2018 QString msg = QString("Transcode %1")
2020
2021 QString details = QString("%1: %2 (%3)")
2022 .arg(program_info->toString(ProgramInfo::kTitleSubtitle),
2023 transcoderName, PrettyPrint(origfilesize));
2024
2025 LOG(VB_GENERAL, LOG_INFO, LOC + QString("%1 for %2")
2026 .arg(msg, details));
2027
2028 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
2029 .arg(command));
2030
2031 GetMythDB()->GetDBManager()->CloseDatabases();
2032 uint result = myth_system(command);
2033 int status = GetJobStatus(jobID);
2034
2035 if ((result == GENERIC_EXIT_DAEMONIZING_ERROR) ||
2036 (result == GENERIC_EXIT_CMD_NOT_FOUND))
2037 {
2038 ChangeJobStatus(jobID, JOB_ERRORED,
2039 tr("ERROR: Unable to find mythtranscode, check backend logs."));
2041
2042 msg = QString("Transcode %1").arg(StatusText(GetJobStatus(jobID)));
2043 details = QString("%1: %2 does not exist or is not executable")
2044 .arg(program_info->toString(ProgramInfo::kTitleSubtitle),path);
2045
2046 LOG(VB_GENERAL, LOG_ERR, LOC +
2047 QString("%1 for %2").arg(msg, details));
2048 }
2049 else if (result == GENERIC_EXIT_RESTART && retrylimit > 0)
2050 {
2051 LOG(VB_JOBQUEUE, LOG_INFO, LOC + "Transcode command restarting");
2052 retry = true;
2053 retrylimit--;
2054
2056 }
2057 else
2058 {
2059 if (status == JOB_FINISHED)
2060 {
2061 ChangeJobStatus(jobID, JOB_FINISHED, tr("Finished."));
2062 retry = false;
2063
2064 program_info->Reload(); // Refresh, the basename may have changed
2065 filename = program_info->GetPlaybackURL(false, true);
2066 QFileInfo st(filename);
2067
2068 if (st.exists())
2069 {
2070 filesize = st.size();
2071 /*: %1 is transcoder name, %2 is the original file size
2072 and %3 is the current file size */
2073 QString comment = tr("%1: %2 => %3")
2074 .arg(transcoderName,
2075 PrettyPrint(origfilesize),
2076 PrettyPrint(filesize));
2077 ChangeJobComment(jobID, comment);
2078
2079 if (filesize > 0)
2080 program_info->SaveFilesize(filesize);
2081
2082 details = QString("%1: %2 (%3)")
2083 .arg(program_info->toString(
2085 transcoderName,
2086 PrettyPrint(filesize));
2087 }
2088 else
2089 {
2090 QString comment =
2091 QString("could not stat '%1'").arg(filename);
2092
2093 ChangeJobStatus(jobID, JOB_FINISHED, comment);
2094
2095 details = QString("%1: %2")
2096 .arg(program_info->toString(
2098 comment);
2099 }
2100
2102 }
2103 else
2104 {
2106
2107 QString comment = tr("exit status %1, job status was \"%2\"")
2108 .arg(result)
2109 .arg(StatusText(status));
2110
2111 ChangeJobStatus(jobID, JOB_ERRORED, comment);
2112
2113 details = QString("%1: %2 (%3)")
2114 .arg(program_info->toString(
2116 transcoderName,
2117 comment);
2118 }
2119
2120 msg = QString("Transcode %1").arg(StatusText(GetJobStatus(jobID)));
2121 LOG(VB_GENERAL, LOG_INFO, LOC + msg + ": " + details);
2122 }
2123 }
2124
2125 if (retrylimit == 0)
2126 {
2127 LOG(VB_JOBQUEUE, LOG_ERR, LOC + "Retry limit exceeded for transcoder, "
2128 "setting job status to errored.");
2129 ChangeJobStatus(jobID, JOB_ERRORED, tr("Retry limit exceeded"));
2130 }
2131
2133}
2134
2136{
2137 auto *jts = (JobThreadStruct *)param;
2138 JobQueue *jq = jts->jq;
2139
2140 MThread::ThreadSetup(QString("Metadata_%1").arg(jts->jobID));
2141 jq->DoMetadataLookupThread(jts->jobID);
2143
2144 delete jts;
2145
2146 return nullptr;
2147}
2148
2150{
2151 // We can't currently lookup non-recording files w/o a ProgramInfo
2152 m_runningJobsLock->lock();
2153 if (!m_runningJobs[jobID].pginfo)
2154 {
2155 LOG(VB_JOBQUEUE, LOG_ERR, LOC +
2156 "The JobQueue cannot currently perform lookups for items which do "
2157 "not have a chanid/starttime in the recorded table.");
2158 ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found");
2160 m_runningJobsLock->unlock();
2161 return;
2162 }
2163
2164 ProgramInfo *program_info = m_runningJobs[jobID].pginfo;
2165 m_runningJobsLock->unlock();
2166
2167 QString details = QString("%1 recorded from channel %3")
2168 .arg(program_info->toString(ProgramInfo::kTitleSubtitle),
2169 program_info->toString(ProgramInfo::kRecordingKey));
2170
2172 {
2173 QString msg = QString("Metadata Lookup failed. Could not open "
2174 "new database connection for %1. "
2175 "Program cannot be looked up.")
2176 .arg(details);
2177 LOG(VB_GENERAL, LOG_ERR, LOC + msg);
2178
2179 ChangeJobStatus(jobID, JOB_ERRORED,
2180 tr("Could not open new database connection for "
2181 "metadata lookup."));
2182
2183 delete program_info;
2184 return;
2185 }
2186
2187 LOG(VB_GENERAL, LOG_INFO,
2188 LOC + "Metadata Lookup Starting for " + details);
2189
2190 uint retVal = 0;
2191 QString path;
2192 QString command;
2193
2194 path = GetAppBinDir() + "mythmetadatalookup";
2195 command = QString("%1 -j %2")
2196 .arg(path).arg(jobID);
2197 command += logPropagateArgs;
2198
2199 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
2200 .arg(command));
2201
2202 GetMythDB()->GetDBManager()->CloseDatabases();
2203 retVal = myth_system(command);
2204 int priority = LOG_NOTICE;
2205 QString comment;
2206
2207 m_runningJobsLock->lock();
2208
2209 if ((retVal == GENERIC_EXIT_DAEMONIZING_ERROR) ||
2210 (retVal == GENERIC_EXIT_CMD_NOT_FOUND))
2211 {
2212 comment = tr("Unable to find mythmetadatalookup");
2213 ChangeJobStatus(jobID, JOB_ERRORED, comment);
2214 priority = LOG_WARNING;
2215 }
2216 else if (m_runningJobs[jobID].flag == JOB_STOP)
2217 {
2218 comment = tr("Aborted by user");
2219 ChangeJobStatus(jobID, JOB_ABORTED, comment);
2220 priority = LOG_WARNING;
2221 }
2222 else if (retVal == GENERIC_EXIT_NO_RECORDING_DATA)
2223 {
2224 comment = tr("Unable to open file or init decoder");
2225 ChangeJobStatus(jobID, JOB_ERRORED, comment);
2226 priority = LOG_WARNING;
2227 }
2228 else if (retVal >= GENERIC_EXIT_NOT_OK) // 256 or above - error
2229 {
2230 comment = tr("Failed with exit status %1").arg(retVal);
2231 ChangeJobStatus(jobID, JOB_ERRORED, comment);
2232 priority = LOG_WARNING;
2233 }
2234 else
2235 {
2236 comment = tr("Metadata Lookup Complete.");
2237 ChangeJobStatus(jobID, JOB_FINISHED, comment);
2238
2239 program_info->SendUpdateEvent();
2240 }
2241
2242 QString msg = tr("Metadata Lookup %1", "Job ID")
2244
2245 if (!comment.isEmpty())
2246 details += QString(" (%1)").arg(comment);
2247
2248 if (priority <= LOG_WARNING)
2249 LOG(VB_GENERAL, LOG_ERR, LOC + msg + ": " + details);
2250
2252 m_runningJobsLock->unlock();
2253}
2254
2256{
2257 auto *jts = (JobThreadStruct *)param;
2258 JobQueue *jq = jts->jq;
2259
2260 MThread::ThreadSetup(QString("Commflag_%1").arg(jts->jobID));
2261 jq->DoFlagCommercialsThread(jts->jobID);
2263
2264 delete jts;
2265
2266 return nullptr;
2267}
2268
2270{
2271 // We can't currently commflag non-recording files w/o a ProgramInfo
2272 m_runningJobsLock->lock();
2273 if (!m_runningJobs[jobID].pginfo)
2274 {
2275 LOG(VB_JOBQUEUE, LOG_ERR, LOC +
2276 "The JobQueue cannot currently commflag files that do not "
2277 "have a chanid/starttime in the recorded table.");
2278 ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found");
2280 m_runningJobsLock->unlock();
2281 return;
2282 }
2283
2284 ProgramInfo *program_info = m_runningJobs[jobID].pginfo;
2285 m_runningJobsLock->unlock();
2286
2287 QString details = QString("%1 recorded from channel %3")
2288 .arg(program_info->toString(ProgramInfo::kTitleSubtitle),
2289 program_info->toString(ProgramInfo::kRecordingKey));
2290
2292 {
2293 QString msg = QString("Commercial Detection failed. Could not open "
2294 "new database connection for %1. "
2295 "Program cannot be flagged.")
2296 .arg(details);
2297 LOG(VB_GENERAL, LOG_ERR, LOC + msg);
2298
2299 ChangeJobStatus(jobID, JOB_ERRORED,
2300 tr("Could not open new database connection for "
2301 "commercial detector."));
2302
2303 delete program_info;
2304 return;
2305 }
2306
2307 LOG(VB_GENERAL, LOG_INFO,
2308 LOC + "Commercial Detection Starting for " + details);
2309
2310 uint breaksFound = 0;
2311 QString path;
2312 QString command;
2313
2314 m_runningJobsLock->lock();
2315 if (m_runningJobs[jobID].command == "mythcommflag")
2316 {
2317 path = GetAppBinDir() + "mythcommflag";
2318 command = QString("%1 -j %2 --noprogress")
2319 .arg(path).arg(jobID);
2320 command += logPropagateArgs;
2321 }
2322 else
2323 {
2324 command = m_runningJobs[jobID].command;
2325 QStringList tokens = command.split(" ", Qt::SkipEmptyParts);
2326 if (!tokens.empty())
2327 path = tokens[0];
2328 }
2329 m_runningJobsLock->unlock();
2330
2331 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
2332 .arg(command));
2333
2334 GetMythDB()->GetDBManager()->CloseDatabases();
2335 breaksFound = myth_system(command, kMSLowExitVal);
2336 int priority = LOG_NOTICE;
2337 QString comment;
2338
2339 m_runningJobsLock->lock();
2340
2341 if ((breaksFound == GENERIC_EXIT_DAEMONIZING_ERROR) ||
2342 (breaksFound == GENERIC_EXIT_CMD_NOT_FOUND))
2343 {
2344 comment = tr("Unable to find mythcommflag");
2345 ChangeJobStatus(jobID, JOB_ERRORED, comment);
2346 priority = LOG_WARNING;
2347 }
2348 else if (m_runningJobs[jobID].flag == JOB_STOP)
2349 {
2350 comment = tr("Aborted by user");
2351 ChangeJobStatus(jobID, JOB_ABORTED, comment);
2352 priority = LOG_WARNING;
2353 }
2354 else if (breaksFound == GENERIC_EXIT_NO_RECORDING_DATA)
2355 {
2356 comment = tr("Unable to open file or init decoder");
2357 ChangeJobStatus(jobID, JOB_ERRORED, comment);
2358 priority = LOG_WARNING;
2359 }
2360 else if (breaksFound >= GENERIC_EXIT_NOT_OK) // 256 or above - error
2361 {
2362 comment = tr("Failed with exit status %1").arg(breaksFound);
2363 ChangeJobStatus(jobID, JOB_ERRORED, comment);
2364 priority = LOG_WARNING;
2365 }
2366 else
2367 {
2368 comment = tr("%n commercial break(s)", "", breaksFound);
2369 ChangeJobStatus(jobID, JOB_FINISHED, comment);
2370
2371 program_info->SendUpdateEvent();
2372
2373 if (!program_info->IsLocal())
2374 program_info->SetPathname(program_info->GetPlaybackURL(false,true));
2375 if (program_info->IsLocal())
2376 {
2377 auto *pg = new PreviewGenerator(program_info, QString(),
2379 pg->Run();
2380 pg->deleteLater();
2381 }
2382 }
2383
2384 QString msg = tr("Commercial Detection %1", "Job ID")
2386
2387 if (!comment.isEmpty())
2388 details += QString(" (%1)").arg(comment);
2389
2390 if (priority <= LOG_WARNING)
2391 LOG(VB_GENERAL, LOG_ERR, LOC + msg + ": " + details);
2392
2394 m_runningJobsLock->unlock();
2395}
2396
2397void *JobQueue::UserJobThread(void *param)
2398{
2399 auto *jts = (JobThreadStruct *)param;
2400 JobQueue *jq = jts->jq;
2401
2402 MThread::ThreadSetup(QString("UserJob_%1").arg(jts->jobID));
2403 jq->DoUserJobThread(jts->jobID);
2405
2406 delete jts;
2407
2408 return nullptr;
2409}
2410
2412{
2413 m_runningJobsLock->lock();
2414 ProgramInfo *pginfo = m_runningJobs[jobID].pginfo;
2415 QString jobDesc = m_runningJobs[jobID].desc;
2416 QString command = m_runningJobs[jobID].command;
2417 m_runningJobsLock->unlock();
2418
2419 ChangeJobStatus(jobID, JOB_RUNNING);
2420
2421 QString msg;
2422
2423 if (pginfo)
2424 {
2425 msg = QString("Started %1 for %2 recorded from channel %3")
2426 .arg(jobDesc,
2429 }
2430 else
2431 {
2432 msg = QString("Started %1 for jobID %2").arg(jobDesc).arg(jobID);
2433 }
2434
2435 LOG(VB_GENERAL, LOG_INFO, LOC + QString(msg.toLocal8Bit().constData()));
2436
2437 switch (m_jobQueueCPU)
2438 {
2439 case 0: myth_nice(17);
2440 myth_ioprio(8);
2441 break;
2442 case 1: myth_nice(10);
2443 myth_ioprio(7);
2444 break;
2445 case 2:
2446 default: break;
2447 }
2448
2449 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
2450 .arg(command));
2451 GetMythDB()->GetDBManager()->CloseDatabases();
2452 uint result = myth_system(command);
2453
2454 if ((result == GENERIC_EXIT_DAEMONIZING_ERROR) ||
2455 (result == GENERIC_EXIT_CMD_NOT_FOUND))
2456 {
2457 msg = QString("User Job '%1' failed, unable to find "
2458 "executable, check your PATH and backend logs.")
2459 .arg(command);
2460 LOG(VB_GENERAL, LOG_ERR, LOC + msg);
2461 LOG(VB_GENERAL, LOG_NOTICE, LOC + QString("Current PATH: '%1'")
2462 .arg(qEnvironmentVariable("PATH")));
2463
2464 ChangeJobStatus(jobID, JOB_ERRORED,
2465 tr("ERROR: Unable to find executable, check backend logs."));
2466 }
2467 else if (result != 0)
2468 {
2469 msg = QString("User Job '%1' failed.").arg(command);
2470 LOG(VB_GENERAL, LOG_ERR, LOC + msg);
2471
2472 ChangeJobStatus(jobID, JOB_ERRORED,
2473 tr("ERROR: User Job returned non-zero, check logs."));
2474 }
2475 else
2476 {
2477 if (pginfo)
2478 {
2479 msg = QString("Finished %1 for %2 recorded from channel %3")
2480 .arg(jobDesc,
2483 }
2484 else
2485 {
2486 msg = QString("Finished %1 for jobID %2").arg(jobDesc).arg(jobID);
2487 }
2488
2489 LOG(VB_GENERAL, LOG_INFO, LOC + QString(msg.toLocal8Bit().constData()));
2490
2491 ChangeJobStatus(jobID, JOB_FINISHED, tr("Successfully Completed."));
2492
2493 if (pginfo)
2494 pginfo->SendUpdateEvent();
2495 }
2496
2498}
2499
2501{
2502 if (jobType & JOB_USERJOB)
2503 {
2504 int x = ((jobType & JOB_USERJOB)>> 8);
2505 int bits = 1;
2506 while ((x != 0) && ((x & 0x01) == 0))
2507 {
2508 bits++;
2509 x = x >> 1;
2510 }
2511 if ( bits > 4 )
2512 return JOB_NONE;
2513
2514 return bits;
2515 }
2516 return JOB_NONE;
2517}
2518
2519/* vim: set expandtab tabstop=4 shiftwidth=4: */
static bool QueueRecordingJobs(const RecordingInfo &recinfo, int jobTypes=JOB_NONE)
Definition: jobqueue.cpp:486
QMap< int, RunningJobInfo > m_runningJobs
Definition: jobqueue.h:266
static bool ChangeJobHost(int jobID, const QString &newHostname)
Definition: jobqueue.cpp:1401
static bool ChangeJobFlags(int jobID, int newFlags)
Definition: jobqueue.cpp:962
static void RecoverQueue(bool justOld=false)
Definition: jobqueue.cpp:1572
static QString GetJobCommand(int id, int jobType, ProgramInfo *tmpInfo)
Definition: jobqueue.cpp:1807
static bool RestartJob(int jobID)
Definition: jobqueue.cpp:733
static bool SafeDeleteJob(int jobID, int jobType, int chanid, const QDateTime &recstartts)
Definition: jobqueue.cpp:877
static bool ChangeJobCmds(int jobID, int newCmds)
Definition: jobqueue.cpp:919
static QString GetJobArgs(int jobID)
Definition: jobqueue.cpp:1486
QWaitCondition m_queueThreadCond
Definition: jobqueue.h:271
static void CleanupOldJobsInQueue()
Definition: jobqueue.cpp:1635
void DoUserJobThread(int jobID)
Definition: jobqueue.cpp:2411
bool AllowedToRun(const JobQueueEntry &job)
Definition: jobqueue.cpp:1433
static bool GetJobInfoFromID(int jobID, int &jobType, uint &chanid, QDateTime &recstartts)
Definition: jobqueue.cpp:665
static void * TranscodeThread(void *param)
Definition: jobqueue.cpp:1909
static void * UserJobThread(void *param)
Definition: jobqueue.cpp:2397
static bool InJobRunWindow(QDateTime jobstarttsRaw)
Definition: jobqueue.cpp:1660
static int GetJobsInQueue(QMap< int, JobQueueEntry > &jobs, int findJobs=JOB_LIST_NOT_DONE)
Definition: jobqueue.cpp:1282
void DoFlagCommercialsThread(int jobID)
Definition: jobqueue.cpp:2269
static enum JobFlags GetJobFlags(int jobID)
Definition: jobqueue.cpp:1507
static QString JobText(int jobType)
Definition: jobqueue.cpp:1112
QRecursiveMutex * m_runningJobsLock
Definition: jobqueue.h:265
static bool ChangeJobArgs(int jobID, const QString &args="")
Definition: jobqueue.cpp:1035
static enum JobCmds GetJobCmd(int jobID)
Definition: jobqueue.cpp:1465
int m_jobQueueCPU
Definition: jobqueue.h:258
MThread * m_queueThread
Definition: jobqueue.h:270
static bool DeleteAllJobs(uint chanid, const QDateTime &recstartts)
Definition: jobqueue.cpp:751
static bool DeleteJob(int jobID)
Definition: jobqueue.cpp:872
int m_jobsRunning
Definition: jobqueue.h:257
static bool QueueJob(int jobType, uint chanid, const QDateTime &recstartts, const QString &args="", const QString &comment="", QString host="", int flags=0, int status=JOB_QUEUED, QDateTime schedruntime=QDateTime())
Definition: jobqueue.cpp:508
static bool IsJobStatusQueued(int status)
Definition: jobqueue.cpp:1075
static int GetJobID(int jobType, uint chanid, const QDateTime &recstartts)
Definition: jobqueue.cpp:644
void StartChildJob(void *(*ChildThreadRoutine)(void *), int jobID)
Definition: jobqueue.cpp:1778
static bool StopJob(int jobID)
Definition: jobqueue.cpp:742
QMutex m_queueThreadCondLock
Definition: jobqueue.h:272
static bool ResumeJob(int jobID)
Definition: jobqueue.cpp:724
static bool ChangeJobComment(int jobID, const QString &comment="")
Definition: jobqueue.cpp:1010
void ProcessJob(const JobQueueEntry &job)
Definition: jobqueue.cpp:1690
static bool IsJobQueuedOrRunning(int jobType, uint chanid, const QDateTime &recstartts)
Definition: jobqueue.cpp:1098
static void * FlagCommercialsThread(void *param)
Definition: jobqueue.cpp:2255
void ProcessQueue(void)
Definition: jobqueue.cpp:159
static bool IsJobRunning(int jobType, uint chanid, const QDateTime &recstartts)
Definition: jobqueue.cpp:1086
static QString GetJobDescription(int jobType)
Definition: jobqueue.cpp:1792
static bool ChangeJobStatus(int jobID, int newStatus, const QString &comment="")
Definition: jobqueue.cpp:983
static void * MetadataLookupThread(void *param)
Definition: jobqueue.cpp:2135
static QString PrettyPrint(off_t bytes)
Definition: jobqueue.cpp:1876
static enum JobStatus GetJobStatus(int jobID)
Definition: jobqueue.cpp:1528
bool m_processQueue
Definition: jobqueue.h:273
void RemoveRunningJob(int id)
Definition: jobqueue.cpp:1857
void run(void) override
Definition: jobqueue.cpp:144
int GetRunningJobID(uint chanid, const QDateTime &recstartts)
Definition: jobqueue.cpp:1057
static bool PauseJob(int jobID)
Definition: jobqueue.cpp:715
static bool IsJobQueued(int jobType, uint chanid, const QDateTime &recstartts)
Definition: jobqueue.cpp:1106
void DoMetadataLookupThread(int jobID)
Definition: jobqueue.cpp:2149
void DoTranscodeThread(int jobID)
Definition: jobqueue.cpp:1923
static bool QueueJobs(int jobTypes, uint chanid, const QDateTime &recstartts, const QString &args="", const QString &comment="", const QString &host="")
Definition: jobqueue.cpp:592
static QString StatusText(int status)
Definition: jobqueue.cpp:1135
~JobQueue(void) override
Definition: jobqueue.cpp:62
static int GetJobTypeFromName(const QString &name)
Definition: jobqueue.cpp:704
QString m_hostname
Definition: jobqueue.h:255
JobQueue(bool master)
Definition: jobqueue.cpp:40
static bool HasRunningOrPendingJobs(std::chrono::minutes startingWithinMins=0min)
Definition: jobqueue.cpp:1230
static int UserJobTypeToIndex(int JobType)
Definition: jobqueue.cpp:2500
static bool IsJobStatusRunning(int status)
Definition: jobqueue.cpp:1080
void customEvent(QEvent *e) override
Definition: jobqueue.cpp:78
QSqlQuery wrapper that fetches a DB connection from the connection pool.
Definition: mythdbcon.h:128
bool prepare(const QString &query)
QSqlQuery::prepare() is not thread safe in Qt <= 3.3.2.
Definition: mythdbcon.cpp:837
QVariant value(int i) const
Definition: mythdbcon.h:204
int size(void) const
Definition: mythdbcon.h:214
static bool testDBConnection()
Checks DB connection + login (login info via Mythcontext)
Definition: mythdbcon.cpp:876
int numRowsAffected() const
Definition: mythdbcon.h:217
bool exec(void)
Wrap QSqlQuery::exec() so we can display SQL.
Definition: mythdbcon.cpp:618
void bindValue(const QString &placeholder, const QVariant &val)
Add a single binding.
Definition: mythdbcon.cpp:888
bool next(void)
Wrap QSqlQuery::next() so we can display the query results.
Definition: mythdbcon.cpp:812
static MSqlQueryInfo InitCon(ConnectionReuse _reuse=kNormalConnection)
Only use this in combination with MSqlQuery constructor.
Definition: mythdbcon.cpp:550
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:49
static void ThreadCleanup(void)
This is to be called on exit in those few threads that haven't been ported to MThread.
Definition: mthread.cpp:226
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:283
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
static void ThreadSetup(const QString &name)
This is to be called on startup in those few threads that haven't been ported to MThread.
Definition: mthread.cpp:221
QString GetHostName(void)
QString GetSetting(const QString &key, const QString &defaultval="")
QString GetSettingOnHost(const QString &key, const QString &host, const QString &defaultval="")
void BlockShutdown(void)
void dispatch(const MythEvent &event)
int GetNumSetting(const QString &key, int defaultval=0)
bool IsBlockingClient(void) const
is this client blocking shutdown
void AllowShutdown(void)
std::enable_if_t< std::chrono::__is_duration< T >::value, T > GetDurSetting(const QString &key, T defaultval=T::zero())
bool GetBoolSetting(const QString &key, bool defaultval=false)
static void DBError(const QString &where, const MSqlQuery &query)
Definition: mythdb.cpp:226
This class is used as a container for messages.
Definition: mythevent.h:17
const QString & Message() const
Definition: mythevent.h:65
static const Type kMythEventMessage
Definition: mythevent.h:79
void addListener(QObject *listener)
Add a listener to the observable.
void removeListener(QObject *listener)
Remove a listener to the observable.
This class creates a preview image of a recording.
Holds information on recordings and videos.
Definition: programinfo.h:68
uint GetChanID(void) const
This is the unique key used in the database to locate tuning information.
Definition: programinfo.h:373
void SaveTranscodeStatus(TranscodingStatus trans)
Set "transcoded" field in "recorded" table to "trans".
QString toString(Verbosity v=kLongDescription, const QString &sep=":", const QString &grp="\"") const
virtual void SaveFilesize(uint64_t fsize)
Sets recording file size in database, and sets "filesize" field.
QString GetRecordingGroup(void) const
Definition: programinfo.h:420
uint QueryTranscoderID(void) const
bool HasCutlist(void) const
Definition: programinfo.h:484
QString GetHostname(void) const
Definition: programinfo.h:422
void MarkAsInUse(bool inuse, const QString &usedFor="")
Tracks a recording's in use status, to prevent deletion and to allow the storage scheduler to perform...
QDateTime GetRecordingStartTime(void) const
Approximate time the recording started.
Definition: programinfo.h:405
bool IsLocal(void) const
Definition: programinfo.h:352
bool Reload(void)
bool IsCommercialFree(void) const
Definition: programinfo.h:482
virtual void SubstituteMatches(QString &str)
Subsitute MATCH% type variable names in the given string.
QString GetPlaybackURL(bool checkMaster=false, bool forceCheckLocal=false)
Returns filename or URL to be used to play back this recording.
void SendUpdateEvent(void) const
Sends event out that the ProgramInfo should be reloaded.
void SetPathname(const QString &pn)
Holds information on a TV Program one might wish to record.
Definition: recordinginfo.h:36
int GetAutoRunJobs(void) const
Returns a bitmap of which jobs are attached to this RecordingInfo.
static const uint kTranscoderAutodetect
sentinel value
@ GENERIC_EXIT_RESTART
Need to restart transcoding.
Definition: exitcodes.h:34
@ GENERIC_EXIT_CMD_NOT_FOUND
Command not found.
Definition: exitcodes.h:15
@ GENERIC_EXIT_DAEMONIZING_ERROR
Error daemonizing or execl.
Definition: exitcodes.h:31
@ GENERIC_EXIT_NO_RECORDING_DATA
No program/recording data.
Definition: exitcodes.h:32
@ GENERIC_EXIT_NOT_OK
Exited with error.
Definition: exitcodes.h:14
unsigned int uint
Definition: freesurround.h:24
#define LOC
Definition: jobqueue.cpp:35
static constexpr int64_t kRecentInterval
Definition: jobqueue.cpp:38
#define JOBSTATUS_STATUSTEXT(A, B, C)
Definition: jobqueue.cpp:1133
@ JOB_LIST_DONE
Definition: jobqueue.h:68
@ JOB_LIST_ALL
Definition: jobqueue.h:67
@ JOB_LIST_RECENT
Definition: jobqueue.h:71
@ JOB_LIST_NOT_DONE
Definition: jobqueue.h:69
@ JOB_LIST_ERROR
Definition: jobqueue.h:70
#define JOBSTATUS_MAP(F)
Definition: jobqueue.h:25
@ JOB_USERJOB3
Definition: jobqueue.h:86
@ JOB_METADATA
Definition: jobqueue.h:80
@ JOB_USERJOB1
Definition: jobqueue.h:84
@ JOB_USERJOB
Definition: jobqueue.h:83
@ JOB_USERJOB2
Definition: jobqueue.h:85
@ JOB_PREVIEW
Definition: jobqueue.h:81
@ JOB_NONE
Definition: jobqueue.h:75
@ JOB_COMMFLAG
Definition: jobqueue.h:79
@ JOB_USERJOB4
Definition: jobqueue.h:87
@ JOB_TRANSCODE
Definition: jobqueue.h:78
static QMap< QString, int > JobNameToType
Definition: jobqueue.h:90
JobStatus
Definition: jobqueue.h:44
JobCmds
Definition: jobqueue.h:50
@ JOB_STOP
Definition: jobqueue.h:54
@ JOB_RESTART
Definition: jobqueue.h:55
@ JOB_RESUME
Definition: jobqueue.h:53
@ JOB_RUN
Definition: jobqueue.h:51
@ JOB_PAUSE
Definition: jobqueue.h:52
JobFlags
Definition: jobqueue.h:58
@ JOB_USE_CUTLIST
Definition: jobqueue.h:60
@ JOB_NO_FLAGS
Definition: jobqueue.h:59
uint64_t verboseMask
Definition: logging.cpp:97
QString logPropagateArgs
Definition: logging.cpp:82
static constexpr const char * MYTH_APPNAME_MYTHJOBQUEUE
Definition: mythappname.h:5
int jobID
MythCoreContext * gCoreContext
This global variable contains the MythCoreContext instance for the app.
MythDB * GetMythDB(void)
Definition: mythdb.cpp:51
QString GetAppBinDir(void)
Definition: mythdirs.cpp:260
#define off_t
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
bool myth_nice(int val)
bool myth_ioprio(int)
Allows setting the I/O priority of the current process/thread.
@ kMSLowExitVal
allow exit values 0-127 only
Definition: mythsystem.h:47
uint myth_system(const QString &command, uint flags, std::chrono::seconds timeout)
QDateTime as_utc(const QDateTime &old_dt)
Returns copy of QDateTime with TimeSpec set to UTC.
Definition: mythdate.cpp:28
QString toString(const QDateTime &raw_dt, uint format)
Returns formatted string representing the time.
Definition: mythdate.cpp:93
@ kFilename
Default UTC, "yyyyMMddhhmmss".
Definition: mythdate.h:18
@ ISODate
Default UTC.
Definition: mythdate.h:17
QDateTime fromString(const QString &dtstr)
Converts kFilename && kISODate formats to QDateTime.
Definition: mythdate.cpp:39
QDateTime current(bool stripped)
Returns current Date and Time in UTC.
Definition: mythdate.cpp:15
string hostname
Definition: caa.py:17
const QString kJobQueueInUseID
@ TRANSCODING_COMPLETE
Definition: programtypes.h:158
@ TRANSCODING_RUNNING
Definition: programtypes.h:159
@ TRANSCODING_NOT_TRANSCODED
Definition: programtypes.h:157
QDateTime schedruntime
Definition: jobqueue.h:104
QString hostname
Definition: jobqueue.h:112
QDateTime statustime
Definition: jobqueue.h:111
QString comment
Definition: jobqueue.h:114
QDateTime recstartts
Definition: jobqueue.h:103
QString args
Definition: jobqueue.h:113
QString startts
Definition: jobqueue.h:105
QDateTime inserttime
Definition: jobqueue.h:106
ProgramInfo * pginfo
Definition: jobqueue.h:123
QString command
Definition: jobqueue.h:122
QString desc
Definition: jobqueue.h:121