MythTV  0.27pre
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Groups Pages
jobqueue.cpp
Go to the documentation of this file.
1 
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <sys/stat.h>
5 #include <iostream>
6 #include <cstdlib>
7 #include <fcntl.h>
8 #include <pthread.h>
9 using namespace std;
10 
11 #include <QDateTime>
12 #include <QFileInfo>
13 #include <QRegExp>
14 #include <QEvent>
15 #include <QCoreApplication>
16 
17 #include "mythconfig.h"
18 
19 #include "exitcodes.h"
20 #include "jobqueue.h"
21 #include "programinfo.h"
22 #include "mythcorecontext.h"
23 #include "mythdate.h"
24 #include "previewgenerator.h"
25 #include "compat.h"
26 #include "recordingprofile.h"
27 #include "recordinginfo.h"
28 #include "mthread.h"
29 
30 #include "mythdb.h"
31 #include "mythdirs.h"
32 #include "mythsystem.h"
33 #include "mythlogging.h"
34 #include "mythmiscutil.h"
35 
36 #ifndef O_STREAMING
37 #define O_STREAMING 0
38 #endif
39 
40 #ifndef O_LARGEFILE
41 #define O_LARGEFILE 0
42 #endif
43 
44 #define LOC QString("JobQueue: ")
45 
46 JobQueue::JobQueue(bool master) :
47  m_hostname(gCoreContext->GetHostName()),
48  jobsRunning(0),
49  jobQueueCPU(0),
50  m_pginfo(NULL),
51  runningJobsLock(new QMutex(QMutex::Recursive)),
52  isMaster(master),
53  queueThread(new MThread("JobQueue", this)),
54  processQueue(false)
55 {
56  jobQueueCPU = gCoreContext->GetNumSetting("JobQueueCPU", 0);
57 
58 #ifndef USING_VALGRIND
59  QMutexLocker locker(&queueThreadCondLock);
60  processQueue = true;
61  queueThread->start();
62 #else
63  LOG(VB_GENERAL, LOG_ERR, LOC +
64  "The JobQueue has been disabled because "
65  "you compiled with the --enable-valgrind option.");
66 #endif // USING_VALGRIND
67 
69 }
70 
72 {
73  queueThreadCondLock.lock();
74  processQueue = false;
75  queueThreadCond.wakeAll();
76  queueThreadCondLock.unlock();
77 
78  queueThread->wait();
79  delete queueThread;
80  queueThread = NULL;
81 
83 
84  delete runningJobsLock;
85 }
86 
87 void JobQueue::customEvent(QEvent *e)
88 {
89  if ((MythEvent::Type)(e->type()) == MythEvent::MythEventMessage)
90  {
91  MythEvent *me = (MythEvent *)e;
92  QString message = me->Message();
93 
94  if (message.startsWith("LOCAL_JOB"))
95  {
96  // LOCAL_JOB action ID jobID
97  // LOCAL_JOB action type chanid recstartts hostname
98  QString msg;
99  message = message.simplified();
100  QStringList tokens = message.split(" ", QString::SkipEmptyParts);
101  QString action = tokens[1];
102  int jobID = -1;
103 
104  if (tokens[2] == "ID")
105  jobID = tokens[3].toInt();
106  else
107  {
108  jobID = GetJobID(
109  tokens[2].toInt(),
110  tokens[3].toUInt(),
111  MythDate::fromString(tokens[4]));
112  }
113 
114  runningJobsLock->lock();
115  if (!runningJobs.contains(jobID))
116  {
117  msg = QString("Unable to determine jobID for message: "
118  "%1. Program will not be flagged.")
119  .arg(message);
120  LOG(VB_GENERAL, LOG_ERR, LOC + msg);
121  runningJobsLock->unlock();
122  return;
123  }
124  runningJobsLock->unlock();
125 
126  msg = QString("Received message '%1'").arg(message);
127  LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
128 
129  if ((action == "STOP") ||
130  (action == "PAUSE") ||
131  (action == "RESTART") ||
132  (action == "RESUME" ))
133  {
134  runningJobsLock->lock();
135 
136  if (action == "STOP")
137  runningJobs[jobID].flag = JOB_STOP;
138  else if (action == "PAUSE")
139  runningJobs[jobID].flag = JOB_PAUSE;
140  else if (action == "RESUME")
141  runningJobs[jobID].flag = JOB_RUN;
142  else if (action == "RESTART")
143  runningJobs[jobID].flag = JOB_RESTART;
144 
145  runningJobsLock->unlock();
146  }
147  }
148  }
149 }
150 
151 void JobQueue::run(void)
152 {
153  queueThreadCondLock.lock();
154  queueThreadCond.wakeAll();
155  queueThreadCondLock.unlock();
156 
157  RecoverQueue();
158 
159  queueThreadCondLock.lock();
160  queueThreadCond.wait(&queueThreadCondLock, 10 * 1000);
161  queueThreadCondLock.unlock();
162 
163  ProcessQueue();
164 }
165 
167 {
168  LOG(VB_JOBQUEUE, LOG_INFO, LOC + "ProcessQueue() started");
169 
170  QString logInfo;
171  int jobID;
172  int cmds;
173  //int flags;
174  int status;
175  QString hostname;
176  int sleepTime;
177 
178  QMap<int, int> jobStatus;
179  int maxJobs;
180  QString message;
181  QMap<int, JobQueueEntry> jobs;
182  bool atMax = false;
183  bool inTimeWindow = true;
184  bool startedJobAlready = false;
185  QMap<int, RunningJobInfo>::Iterator rjiter;
186 
187  QMutexLocker locker(&queueThreadCondLock);
188  while (processQueue)
189  {
190  locker.unlock();
191 
192  startedJobAlready = false;
193  sleepTime = gCoreContext->GetNumSetting("JobQueueCheckFrequency", 30);
194  maxJobs = gCoreContext->GetNumSetting("JobQueueMaxSimultaneousJobs", 3);
195  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
196  QString("Currently set to run up to %1 job(s) max.")
197  .arg(maxJobs));
198 
199  jobStatus.clear();
200 
201  runningJobsLock->lock();
202  for (rjiter = runningJobs.begin(); rjiter != runningJobs.end();
203  ++rjiter)
204  {
205  if ((*rjiter).pginfo)
206  (*rjiter).pginfo->UpdateInUseMark();
207  }
208  runningJobsLock->unlock();
209 
210  jobsRunning = 0;
211  GetJobsInQueue(jobs);
212 
213  if (jobs.size())
214  {
215  inTimeWindow = InJobRunWindow();
216  for (int x = 0; x < jobs.size(); x++)
217  {
218  status = jobs[x].status;
219  hostname = jobs[x].hostname;
220 
221  if (((status == JOB_RUNNING) ||
222  (status == JOB_STARTING) ||
223  (status == JOB_PAUSED)) &&
224  (hostname == m_hostname))
225  jobsRunning++;
226  }
227 
228  message = QString("Currently Running %1 jobs.")
229  .arg(jobsRunning);
230  if (!inTimeWindow)
231  {
232  message += QString(" Jobs in Queue, but we are outside of the "
233  "Job Queue time window, no new jobs can be "
234  "started.");
235  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
236  }
237  else if (jobsRunning >= maxJobs)
238  {
239  message += " (At Maximum, no new jobs can be started until "
240  "a running job completes)";
241 
242  if (!atMax)
243  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
244 
245  atMax = true;
246  }
247  else
248  {
249  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
250  atMax = false;
251  }
252 
253 
254  for ( int x = 0;
255  (x < jobs.size()) && (jobsRunning < maxJobs); x++)
256  {
257  jobID = jobs[x].id;
258  cmds = jobs[x].cmds;
259  //flags = jobs[x].flags;
260  status = jobs[x].status;
261  hostname = jobs[x].hostname;
262 
263  if (!jobs[x].chanid)
264  logInfo = QString("jobID #%1").arg(jobID);
265  else
266  logInfo = QString("chanid %1 @ %2").arg(jobs[x].chanid)
267  .arg(jobs[x].startts);
268 
269  // Should we even be looking at this job?
270  if ((inTimeWindow) &&
271  (!hostname.isEmpty()) &&
272  (hostname != m_hostname))
273  {
274  // Setting the status here will prevent us from processing
275  // any other jobs for this recording until this one is
276  // completed on the remote host.
277  jobStatus[jobID] = status;
278 
279  message = QString("Skipping '%1' job for %2, "
280  "should run on '%3' instead")
281  .arg(JobText(jobs[x].type)).arg(logInfo)
282  .arg(hostname);
283  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
284  continue;
285  }
286 
287  // Check to see if there was a previous job that is not done
288  if (inTimeWindow)
289  {
290  int otherJobID = GetRunningJobID(jobs[x].chanid,
291  jobs[x].recstartts);
292  if (otherJobID && (jobStatus.contains(otherJobID)) &&
293  (!(jobStatus[otherJobID] & JOB_DONE)))
294  {
295  message =
296  QString("Skipping '%1' job for %2, "
297  "Job ID %3 is already running for "
298  "this recording with a status of '%4'")
299  .arg(JobText(jobs[x].type)).arg(logInfo)
300  .arg(otherJobID)
301  .arg(StatusText(jobStatus[otherJobID]));
302  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
303  continue;
304  }
305  }
306 
307  jobStatus[jobID] = status;
308 
309  // Are we allowed to run this job?
310  if ((inTimeWindow) && (!AllowedToRun(jobs[x])))
311  {
312  message = QString("Skipping '%1' job for %2, "
313  "not allowed to run on this backend.")
314  .arg(JobText(jobs[x].type)).arg(logInfo);
315  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
316  continue;
317  }
318 
319  // Is this job scheduled for the future
320  if (jobs[x].schedruntime > MythDate::current())
321  {
322  message = QString("Skipping '%1' job for %2, this job is "
323  "not scheduled to run until %3.")
324  .arg(JobText(jobs[x].type)).arg(logInfo)
325  .arg(jobs[x].schedruntime
327  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
328  continue;
329  }
330 
331  if (cmds & JOB_STOP)
332  {
333  // if we're trying to stop a job and it's not queued
334  // then lets send a STOP command
335  if (status != JOB_QUEUED) {
336  message = QString("Stopping '%1' job for %2")
337  .arg(JobText(jobs[x].type))
338  .arg(logInfo);
339  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
340 
341  runningJobsLock->lock();
342  if (runningJobs.contains(jobID))
343  runningJobs[jobID].flag = JOB_STOP;
344  runningJobsLock->unlock();
345 
346  // ChangeJobCmds(m_db, jobID, JOB_RUN);
347  continue;
348 
349  // if we're trying to stop a job and it's still queued
350  // then let's just change the status to cancelled so
351  // we don't try to run it from the queue
352  } else {
353  message = QString("Cancelling '%1' job for %2")
354  .arg(JobText(jobs[x].type))
355  .arg(logInfo);
356  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
357 
358  // at the bottom of this loop we requeue any jobs that
359  // are not currently queued and also not associated
360  // with a hostname so we must claim this job before we
361  // can cancel it
362  if (!ChangeJobHost(jobID, m_hostname))
363  {
364  message = QString("Unable to claim '%1' job for %2")
365  .arg(JobText(jobs[x].type))
366  .arg(logInfo);
367  LOG(VB_JOBQUEUE, LOG_ERR, LOC + message);
368  continue;
369  }
370 
371  ChangeJobStatus(jobID, JOB_CANCELLED, "");
372  ChangeJobCmds(jobID, JOB_RUN);
373  continue;
374  }
375  }
376 
377  if ((cmds & JOB_PAUSE) && (status != JOB_QUEUED))
378  {
379  message = QString("Pausing '%1' job for %2")
380  .arg(JobText(jobs[x].type)).arg(logInfo);
381  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
382 
383  runningJobsLock->lock();
384  if (runningJobs.contains(jobID))
385  runningJobs[jobID].flag = JOB_PAUSE;
386  runningJobsLock->unlock();
387 
388  ChangeJobCmds(jobID, JOB_RUN);
389  continue;
390  }
391 
392  if ((cmds & JOB_RESTART) && (status != JOB_QUEUED))
393  {
394  message = QString("Restart '%1' job for %2")
395  .arg(JobText(jobs[x].type)).arg(logInfo);
396  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
397 
398  runningJobsLock->lock();
399  if (runningJobs.contains(jobID))
400  runningJobs[jobID].flag = JOB_RUN;
401  runningJobsLock->unlock();
402 
403  ChangeJobCmds(jobID, JOB_RUN);
404  continue;
405  }
406 
407  if (status != JOB_QUEUED)
408  {
409 
410  if (hostname.isEmpty())
411  {
412  message = QString("Resetting '%1' job for %2 to %3 "
413  "status, because no hostname is set.")
414  .arg(JobText(jobs[x].type))
415  .arg(logInfo)
416  .arg(StatusText(JOB_QUEUED));
417  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
418 
419  ChangeJobStatus(jobID, JOB_QUEUED, "");
420  ChangeJobCmds(jobID, JOB_RUN);
421  }
422  else if (inTimeWindow)
423  {
424  message = QString("Skipping '%1' job for %2, "
425  "current job status is '%3'")
426  .arg(JobText(jobs[x].type))
427  .arg(logInfo)
428  .arg(StatusText(status));
429  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
430  }
431  continue;
432  }
433 
434  // never start or claim more than one job in a single run
435  if (startedJobAlready)
436  continue;
437 
438  if ((inTimeWindow) &&
439  (hostname.isEmpty()) &&
440  (!ChangeJobHost(jobID, m_hostname)))
441  {
442  message = QString("Unable to claim '%1' job for %2")
443  .arg(JobText(jobs[x].type)).arg(logInfo);
444  LOG(VB_JOBQUEUE, LOG_ERR, LOC + message);
445  continue;
446  }
447 
448  if (!inTimeWindow)
449  {
450  message = QString("Skipping '%1' job for %2, "
451  "current time is outside of the "
452  "Job Queue processing window.")
453  .arg(JobText(jobs[x].type)).arg(logInfo);
454  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
455  continue;
456  }
457 
458  message = QString("Processing '%1' job for %2, "
459  "current status is '%3'")
460  .arg(JobText(jobs[x].type)).arg(logInfo)
461  .arg(StatusText(status));
462  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
463 
464  ProcessJob(jobs[x]);
465 
466  startedJobAlready = true;
467  }
468  }
469 
470  if (QCoreApplication::applicationName() == MYTH_APPNAME_MYTHJOBQUEUE)
471  {
472  if (jobsRunning > 0)
473  {
474  if (!(gCoreContext->IsBlockingClient()))
475  {
477  LOG(VB_JOBQUEUE, LOG_INFO, QString("%1 jobs running. "
478  "Blocking shutdown.").arg(jobsRunning));
479  }
480  }
481  else
482  {
484  {
486  LOG(VB_JOBQUEUE, LOG_INFO, "No jobs running. "
487  "Allowing shutdown.");
488  }
489  }
490  }
491 
492 
493  locker.relock();
494  if (processQueue)
495  {
496  int st = (startedJobAlready) ? (5 * 1000) : (sleepTime * 1000);
497  if (st > 0)
498  queueThreadCond.wait(locker.mutex(), st);
499  }
500  }
501 }
502 
503 bool JobQueue::QueueRecordingJobs(const RecordingInfo &recinfo, int jobTypes)
504 {
505  if (jobTypes == JOB_NONE)
506  jobTypes = recinfo.GetAutoRunJobs();
507 
508  if (recinfo.IsCommercialFree())
509  jobTypes &= (~JOB_COMMFLAG);
510 
511  if (jobTypes != JOB_NONE)
512  {
513  QString jobHost = QString("");
514 
515  if (gCoreContext->GetNumSetting("JobsRunOnRecordHost", 0))
516  jobHost = recinfo.GetHostname();
517 
518  return JobQueue::QueueJobs(
519  jobTypes, recinfo.GetChanID(), recinfo.GetRecordingStartTime(),
520  "", "", jobHost);
521  }
522  else
523  return false;
524 
525  return true;
526 }
527 
528 bool JobQueue::QueueJob(int jobType, uint chanid, const QDateTime &recstartts,
529  QString args, QString comment, QString host,
530  int flags, int status, QDateTime schedruntime)
531 {
532  int tmpStatus = JOB_UNKNOWN;
533  int tmpCmd = JOB_UNKNOWN;
534  int jobID = -1;
535  int chanidInt = -1;
536 
537  if(!schedruntime.isValid())
538  schedruntime = MythDate::current();
539 
540  MSqlQuery query(MSqlQuery::InitCon());
541 
542  // In order to replace a job, we must have a chanid/recstartts combo
543  if (chanid)
544  {
545  query.prepare("SELECT status, id, cmds FROM jobqueue "
546  "WHERE chanid = :CHANID AND starttime = :STARTTIME "
547  "AND type = :JOBTYPE;");
548  query.bindValue(":CHANID", chanid);
549  query.bindValue(":STARTTIME", recstartts);
550  query.bindValue(":JOBTYPE", jobType);
551 
552  if (!query.exec())
553  {
554  MythDB::DBError("Error in JobQueue::QueueJob()", query);
555  return false;
556  }
557  else
558  {
559  if (query.next())
560  {
561  tmpStatus = query.value(0).toInt();
562  jobID = query.value(1).toInt();
563  tmpCmd = query.value(2).toInt();
564  }
565  }
566  switch (tmpStatus)
567  {
568  case JOB_UNKNOWN:
569  break;
570  case JOB_STARTING:
571  case JOB_RUNNING:
572  case JOB_PAUSED:
573  case JOB_STOPPING:
574  case JOB_ERRORING:
575  case JOB_ABORTING:
576  return false;
577  default:
578  DeleteJob(jobID);
579  break;
580  }
581  if (! (tmpStatus & JOB_DONE) && (tmpCmd & JOB_STOP))
582  return false;
583 
584  chanidInt = chanid;
585  }
586 
587  if (host.isNull())
588  host = QString("");
589 
590  query.prepare("INSERT INTO jobqueue (chanid, starttime, inserttime, type, "
591  "status, statustime, schedruntime, hostname, args, comment, "
592  "flags) "
593  "VALUES (:CHANID, :STARTTIME, now(), :JOBTYPE, :STATUS, "
594  "now(), :SCHEDRUNTIME, :HOST, :ARGS, :COMMENT, :FLAGS);");
595 
596  query.bindValue(":CHANID", chanidInt);
597  query.bindValue(":STARTTIME", recstartts);
598  query.bindValue(":JOBTYPE", jobType);
599  query.bindValue(":STATUS", status);
600  query.bindValue(":SCHEDRUNTIME", schedruntime);
601  query.bindValue(":HOST", host);
602  query.bindValue(":ARGS", args);
603  query.bindValue(":COMMENT", comment);
604  query.bindValue(":FLAGS", flags);
605 
606  if (!query.exec())
607  {
608  MythDB::DBError("Error in JobQueue::StartJob()", query);
609  return false;
610  }
611 
612  return true;
613 }
614 
615 bool JobQueue::QueueJobs(int jobTypes, uint chanid, const QDateTime &recstartts,
616  QString args, QString comment, QString host)
617 {
618  if (gCoreContext->GetNumSetting("AutoTranscodeBeforeAutoCommflag", 0))
619  {
620  if (jobTypes & JOB_METADATA)
621  QueueJob(JOB_METADATA, chanid, recstartts, args, comment, host);
622  if (jobTypes & JOB_TRANSCODE)
623  QueueJob(JOB_TRANSCODE, chanid, recstartts, args, comment, host);
624  if (jobTypes & JOB_COMMFLAG)
625  QueueJob(JOB_COMMFLAG, chanid, recstartts, args, comment, host);
626  }
627  else
628  {
629  if (jobTypes & JOB_METADATA)
630  QueueJob(JOB_METADATA, chanid, recstartts, args, comment, host);
631  if (jobTypes & JOB_COMMFLAG)
632  QueueJob(JOB_COMMFLAG, chanid, recstartts, args, comment, host);
633  if (jobTypes & JOB_TRANSCODE)
634  {
635  QDateTime schedruntime = MythDate::current();
636 
637  int defer = gCoreContext->GetNumSetting("DeferAutoTranscodeDays", 0);
638  if (defer)
639  {
640  schedruntime = QDateTime(schedruntime.addDays(defer).date(),
641  QTime(0,0,0), Qt::UTC);
642  }
643 
644  QueueJob(JOB_TRANSCODE, chanid, recstartts, args, comment, host,
645  0, JOB_QUEUED, schedruntime);
646  }
647  }
648 
649  if (jobTypes & JOB_USERJOB1)
650  QueueJob(JOB_USERJOB1, chanid, recstartts, args, comment, host);
651  if (jobTypes & JOB_USERJOB2)
652  QueueJob(JOB_USERJOB2, chanid, recstartts, args, comment, host);
653  if (jobTypes & JOB_USERJOB3)
654  QueueJob(JOB_USERJOB3, chanid, recstartts, args, comment, host);
655  if (jobTypes & JOB_USERJOB4)
656  QueueJob(JOB_USERJOB4, chanid, recstartts, args, comment, host);
657 
658  return true;
659 }
660 
661 int JobQueue::GetJobID(int jobType, uint chanid, const QDateTime &recstartts)
662 {
663  MSqlQuery query(MSqlQuery::InitCon());
664 
665  query.prepare("SELECT id FROM jobqueue "
666  "WHERE chanid = :CHANID AND starttime = :STARTTIME "
667  "AND type = :JOBTYPE;");
668  query.bindValue(":CHANID", chanid);
669  query.bindValue(":STARTTIME", recstartts);
670  query.bindValue(":JOBTYPE", jobType);
671 
672  if (!query.exec())
673  {
674  MythDB::DBError("Error in JobQueue::GetJobID()", query);
675  return -1;
676  }
677  else
678  {
679  if (query.next())
680  return query.value(0).toInt();
681  }
682 
683  return -1;
684 }
685 
687  int jobID, int &jobType, uint &chanid, QDateTime &recstartts)
688 {
689  MSqlQuery query(MSqlQuery::InitCon());
690 
691  query.prepare("SELECT type, chanid, starttime FROM jobqueue "
692  "WHERE id = :ID;");
693 
694  query.bindValue(":ID", jobID);
695 
696  if (!query.exec())
697  {
698  MythDB::DBError("Error in JobQueue::GetJobInfoFromID()", query);
699  return false;
700  }
701  else
702  {
703  if (query.next())
704  {
705  jobType = query.value(0).toInt();
706  chanid = query.value(1).toUInt();
707  recstartts = MythDate::as_utc(query.value(2).toDateTime());
708  return true;
709  }
710  }
711 
712  return false;
713 }
714 
716  int jobID, int &jobType, uint &chanid, QString &recstartts)
717 {
718  QDateTime tmpStarttime;
719 
720  bool result = JobQueue::GetJobInfoFromID(
721  jobID, jobType, chanid, tmpStarttime);
722 
723  if (result)
724  recstartts = MythDate::toString(tmpStarttime, MythDate::kFilename);
725 
726  return result;
727 }
728 
730 {
731  QString message = QString("GLOBAL_JOB PAUSE ID %1").arg(jobID);
732  MythEvent me(message);
733  gCoreContext->dispatch(me);
734 
735  return ChangeJobCmds(jobID, JOB_PAUSE);
736 }
737 
739 {
740  QString message = QString("GLOBAL_JOB RESUME ID %1").arg(jobID);
741  MythEvent me(message);
742  gCoreContext->dispatch(me);
743 
744  return ChangeJobCmds(jobID, JOB_RESUME);
745 }
746 
748 {
749  QString message = QString("GLOBAL_JOB RESTART ID %1").arg(jobID);
750  MythEvent me(message);
751  gCoreContext->dispatch(me);
752 
753  return ChangeJobCmds(jobID, JOB_RESTART);
754 }
755 
757 {
758  QString message = QString("GLOBAL_JOB STOP ID %1").arg(jobID);
759  MythEvent me(message);
760  gCoreContext->dispatch(me);
761 
762  return ChangeJobCmds(jobID, JOB_STOP);
763 }
764 
765 bool JobQueue::DeleteAllJobs(uint chanid, const QDateTime &recstartts)
766 {
767  MSqlQuery query(MSqlQuery::InitCon());
768  QString message;
769 
770  query.prepare("UPDATE jobqueue SET status = :CANCELLED "
771  "WHERE chanid = :CHANID AND starttime = :STARTTIME "
772  "AND status = :QUEUED;");
773 
774  query.bindValue(":CANCELLED", JOB_CANCELLED);
775  query.bindValue(":CHANID", chanid);
776  query.bindValue(":STARTTIME", recstartts);
777  query.bindValue(":QUEUED", JOB_QUEUED);
778 
779  if (!query.exec())
780  MythDB::DBError("Cancel Pending Jobs", query);
781 
782  query.prepare("UPDATE jobqueue SET cmds = :CMD "
783  "WHERE chanid = :CHANID AND starttime = :STARTTIME "
784  "AND status <> :CANCELLED;");
785  query.bindValue(":CMD", JOB_STOP);
786  query.bindValue(":CHANID", chanid);
787  query.bindValue(":STARTTIME", recstartts);
788  query.bindValue(":CANCELLED", JOB_CANCELLED);
789 
790  if (!query.exec())
791  {
792  MythDB::DBError("Stop Unfinished Jobs", query);
793  return false;
794  }
795 
796  // wait until running job(s) are done
797  bool jobsAreRunning = true;
798  int totalSlept = 0;
799  int maxSleep = 90;
800  while (jobsAreRunning && totalSlept < maxSleep)
801  {
802  usleep(1000);
803  query.prepare("SELECT id FROM jobqueue "
804  "WHERE chanid = :CHANID and starttime = :STARTTIME "
805  "AND status NOT IN "
806  "(:FINISHED,:ABORTED,:ERRORED,:CANCELLED);");
807  query.bindValue(":CHANID", chanid);
808  query.bindValue(":STARTTIME", recstartts);
809  query.bindValue(":FINISHED", JOB_FINISHED);
810  query.bindValue(":ABORTED", JOB_ABORTED);
811  query.bindValue(":ERRORED", JOB_ERRORED);
812  query.bindValue(":CANCELLED", JOB_CANCELLED);
813 
814  if (!query.exec())
815  {
816  MythDB::DBError("Stop Unfinished Jobs", query);
817  return false;
818  }
819 
820  if (query.size() == 0)
821  {
822  jobsAreRunning = false;
823  break;
824  }
825  else if ((totalSlept % 5) == 0)
826  {
827  message = QString("Waiting on %1 jobs still running for "
828  "chanid %2 @ %3").arg(query.size())
829  .arg(chanid).arg(recstartts.toString(Qt::ISODate));
830  LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
831  }
832 
833  sleep(1);
834  totalSlept++;
835  }
836 
837  if (totalSlept <= maxSleep)
838  {
839  query.prepare("DELETE FROM jobqueue "
840  "WHERE chanid = :CHANID AND starttime = :STARTTIME;");
841  query.bindValue(":CHANID", chanid);
842  query.bindValue(":STARTTIME", recstartts);
843 
844  if (!query.exec())
845  MythDB::DBError("Delete All Jobs", query);
846  }
847  else
848  {
849  query.prepare("SELECT id, type, status, comment FROM jobqueue "
850  "WHERE chanid = :CHANID AND starttime = :STARTTIME "
851  "AND status <> :CANCELLED ORDER BY id;");
852 
853  query.bindValue(":CHANID", chanid);
854  query.bindValue(":STARTTIME", recstartts);
855  query.bindValue(":CANCELLED", JOB_CANCELLED);
856 
857  if (!query.exec())
858  {
859  MythDB::DBError("Error in JobQueue::DeleteAllJobs(), Unable "
860  "to query list of Jobs left in Queue.", query);
861  return 0;
862  }
863 
864  LOG(VB_GENERAL, LOG_ERR, LOC +
865  QString( "In DeleteAllJobs: There are Jobs "
866  "left in the JobQueue that are still running for "
867  "chanid %1 @ %2.").arg(chanid)
868  .arg(recstartts.toString(Qt::ISODate)));
869 
870  while (query.next())
871  {
872  LOG(VB_GENERAL, LOG_ERR, LOC +
873  QString("Job ID %1: '%2' with status '%3' and comment '%4'")
874  .arg(query.value(0).toInt())
875  .arg(JobText(query.value(1).toInt()))
876  .arg(StatusText(query.value(2).toInt()))
877  .arg(query.value(3).toString()));
878  }
879 
880  return false;
881  }
882 
883  return true;
884 }
885 
887 {
888  if (jobID < 0)
889  return false;
890 
891  MSqlQuery query(MSqlQuery::InitCon());
892 
893  query.prepare("DELETE FROM jobqueue WHERE id = :ID;");
894 
895  query.bindValue(":ID", jobID);
896 
897  if (!query.exec())
898  {
899  MythDB::DBError("Error in JobQueue::DeleteJob()", query);
900  return false;
901  }
902 
903  return true;
904 }
905 
906 bool JobQueue::ChangeJobCmds(int jobID, int newCmds)
907 {
908  if (jobID < 0)
909  return false;
910 
911  MSqlQuery query(MSqlQuery::InitCon());
912 
913  query.prepare("UPDATE jobqueue SET cmds = :CMDS WHERE id = :ID;");
914 
915  query.bindValue(":CMDS", newCmds);
916  query.bindValue(":ID", jobID);
917 
918  if (!query.exec())
919  {
920  MythDB::DBError("Error in JobQueue::ChangeJobCmds()", query);
921  return false;
922  }
923 
924  return true;
925 }
926 
927 bool JobQueue::ChangeJobCmds(int jobType, uint chanid,
928  const QDateTime &recstartts, int newCmds)
929 {
930  MSqlQuery query(MSqlQuery::InitCon());
931 
932  query.prepare("UPDATE jobqueue SET cmds = :CMDS WHERE type = :TYPE "
933  "AND chanid = :CHANID AND starttime = :STARTTIME;");
934 
935  query.bindValue(":CMDS", newCmds);
936  query.bindValue(":TYPE", jobType);
937  query.bindValue(":CHANID", chanid);
938  query.bindValue(":STARTTIME", recstartts);
939 
940  if (!query.exec())
941  {
942  MythDB::DBError("Error in JobQueue::ChangeJobCmds()", query);
943  return false;
944  }
945 
946  return true;
947 }
948 
949 bool JobQueue::ChangeJobFlags(int jobID, int newFlags)
950 {
951  if (jobID < 0)
952  return false;
953 
954  MSqlQuery query(MSqlQuery::InitCon());
955 
956  query.prepare("UPDATE jobqueue SET flags = :FLAGS WHERE id = :ID;");
957 
958  query.bindValue(":FLAGS", newFlags);
959  query.bindValue(":ID", jobID);
960 
961  if (!query.exec())
962  {
963  MythDB::DBError("Error in JobQueue::ChangeJobFlags()", query);
964  return false;
965  }
966 
967  return true;
968 }
969 
970 bool JobQueue::ChangeJobStatus(int jobID, int newStatus, QString comment)
971 {
972  if (jobID < 0)
973  return false;
974 
975  LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("ChangeJobStatus(%1, %2, '%3')")
976  .arg(jobID).arg(StatusText(newStatus)).arg(comment));
977 
978  MSqlQuery query(MSqlQuery::InitCon());
979 
980  query.prepare("UPDATE jobqueue SET status = :STATUS, comment = :COMMENT "
981  "WHERE id = :ID AND status <> :NEWSTATUS;");
982 
983  query.bindValue(":STATUS", newStatus);
984  query.bindValue(":COMMENT", comment);
985  query.bindValue(":ID", jobID);
986  query.bindValue(":NEWSTATUS", newStatus);
987 
988  if (!query.exec())
989  {
990  MythDB::DBError("Error in JobQueue::ChangeJobStatus()", query);
991  return false;
992  }
993 
994  return true;
995 }
996 
997 bool JobQueue::ChangeJobComment(int jobID, QString comment)
998 {
999  if (jobID < 0)
1000  return false;
1001 
1002  LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("ChangeJobComment(%1, '%2')")
1003  .arg(jobID).arg(comment));
1004 
1005  MSqlQuery query(MSqlQuery::InitCon());
1006 
1007  query.prepare("UPDATE jobqueue SET comment = :COMMENT "
1008  "WHERE id = :ID;");
1009 
1010  query.bindValue(":COMMENT", comment);
1011  query.bindValue(":ID", jobID);
1012 
1013  if (!query.exec())
1014  {
1015  MythDB::DBError("Error in JobQueue::ChangeJobComment()", query);
1016  return false;
1017  }
1018 
1019  return true;
1020 }
1021 
1022 bool JobQueue::ChangeJobArgs(int jobID, QString args)
1023 {
1024  if (jobID < 0)
1025  return false;
1026 
1027  MSqlQuery query(MSqlQuery::InitCon());
1028 
1029  query.prepare("UPDATE jobqueue SET args = :ARGS "
1030  "WHERE id = :ID;");
1031 
1032  query.bindValue(":ARGS", args);
1033  query.bindValue(":ID", jobID);
1034 
1035  if (!query.exec())
1036  {
1037  MythDB::DBError("Error in JobQueue::ChangeJobArgs()", query);
1038  return false;
1039  }
1040 
1041  return true;
1042 }
1043 
1044 int JobQueue::GetRunningJobID(uint chanid, const QDateTime &recstartts)
1045 {
1046  runningJobsLock->lock();
1047  QMap<int, RunningJobInfo>::iterator it = runningJobs.begin();
1048  for (; it != runningJobs.end(); ++it)
1049  {
1050  RunningJobInfo jInfo = *it;
1051 
1052  if ((jInfo.pginfo->GetChanID() == chanid) &&
1053  (jInfo.pginfo->GetRecordingStartTime() == recstartts))
1054  {
1055  runningJobsLock->unlock();
1056 
1057  return jInfo.id;
1058  }
1059  }
1060  runningJobsLock->unlock();
1061 
1062  return 0;
1063 }
1064 
1065 bool JobQueue::IsJobRunning(int jobType,
1066  uint chanid, const QDateTime &recstartts)
1067 {
1068  int tmpStatus = GetJobStatus(jobType, chanid, recstartts);
1069 
1070  if ((tmpStatus != JOB_UNKNOWN) && (tmpStatus != JOB_QUEUED) &&
1071  (!(tmpStatus & JOB_DONE)))
1072  return true;
1073 
1074  return false;
1075 }
1076 
1077 bool JobQueue::IsJobRunning(int jobType, const ProgramInfo &pginfo)
1078 {
1079  return JobQueue::IsJobRunning(
1080  jobType, pginfo.GetChanID(), pginfo.GetRecordingStartTime());
1081 }
1082 
1084  int jobType, uint chanid, const QDateTime &recstartts)
1085 {
1086  int tmpStatus = GetJobStatus(jobType, chanid, recstartts);
1087 
1088  if ((tmpStatus != JOB_UNKNOWN) && (!(tmpStatus & JOB_DONE)))
1089  return true;
1090 
1091  return false;
1092 }
1093 
1095  int jobType, uint chanid, const QDateTime &recstartts)
1096 {
1097  int tmpStatus = GetJobStatus(jobType, chanid, recstartts);
1098 
1099  if (tmpStatus & JOB_QUEUED)
1100  return true;
1101 
1102  return false;
1103 }
1104 
1105 QString JobQueue::JobText(int jobType)
1106 {
1107  switch (jobType)
1108  {
1109  case JOB_TRANSCODE: return tr("Transcode");
1110  case JOB_COMMFLAG: return tr("Flag Commercials");
1111  case JOB_METADATA: return tr("Look up Metadata");
1112  }
1113 
1114  if (jobType & JOB_USERJOB)
1115  {
1116  QString settingName =
1117  QString("UserJobDesc%1").arg(UserJobTypeToIndex(jobType));
1118  return gCoreContext->GetSetting(settingName, settingName);
1119  }
1120 
1121  return tr("Unknown Job");
1122 }
1123 
1124 QString JobQueue::StatusText(int status)
1125 {
1126  switch (status)
1127  {
1128 #define JOBSTATUS_STATUSTEXT(A,B,C) case A: return C;
1129  JOBSTATUS_MAP(JOBSTATUS_STATUSTEXT)
1130  default: break;
1131  }
1132  return tr("Undefined");
1133 }
1134 
1135 bool JobQueue::InJobRunWindow(int orStartsWithinMins)
1136 {
1137  QString queueStartTimeStr;
1138  QString queueEndTimeStr;
1139  QTime queueStartTime;
1140  QTime queueEndTime;
1141  QTime curTime = QTime::currentTime();
1142  bool inTimeWindow = false;
1143  orStartsWithinMins = orStartsWithinMins < 0 ? 0 : orStartsWithinMins;
1144 
1145  queueStartTimeStr = gCoreContext->GetSetting("JobQueueWindowStart", "00:00");
1146  queueEndTimeStr = gCoreContext->GetSetting("JobQueueWindowEnd", "23:59");
1147 
1148  queueStartTime = QTime::fromString(queueStartTimeStr, "hh:mm");
1149  if (!queueStartTime.isValid())
1150  {
1151  LOG(VB_GENERAL, LOG_ERR,
1152  QString("Invalid JobQueueWindowStart time '%1', using 00:00")
1153  .arg(queueStartTimeStr));
1154  queueStartTime = QTime(0, 0);
1155  }
1156 
1157  queueEndTime = QTime::fromString(queueEndTimeStr, "hh:mm");
1158  if (!queueEndTime.isValid())
1159  {
1160  LOG(VB_GENERAL, LOG_ERR,
1161  QString("Invalid JobQueueWindowEnd time '%1', using 23:59")
1162  .arg(queueEndTimeStr));
1163  queueEndTime = QTime(23, 59);
1164  }
1165 
1166  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1167  QString("Currently set to run new jobs from %1 to %2")
1168  .arg(queueStartTimeStr).arg(queueEndTimeStr));
1169 
1170  if ((queueStartTime <= curTime) && (curTime < queueEndTime))
1171  {
1172  inTimeWindow = true;
1173  }
1174  else if ((queueStartTime > queueEndTime) &&
1175  ((curTime < queueEndTime) || (queueStartTime <= curTime)))
1176  {
1177  inTimeWindow = true;
1178  }
1179  else if (orStartsWithinMins > 0)
1180  {
1181  // Check if the window starts soon
1182  if (curTime <= queueStartTime)
1183  {
1184  // Start time hasn't passed yet today
1185  if (queueStartTime.secsTo(curTime) <= (orStartsWithinMins * 60))
1186  {
1187  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1188  QString("Job run window will start within %1 minutes")
1189  .arg(orStartsWithinMins));
1190  inTimeWindow = true;
1191  }
1192  }
1193  else
1194  {
1195  // We passed the start time for today, try tomorrow
1196  QDateTime curDateTime = MythDate::current();
1197  QDateTime startDateTime = QDateTime(
1198  curDateTime.date(), queueStartTime, Qt::UTC).addDays(1);
1199 
1200  if (curDateTime.secsTo(startDateTime) <= (orStartsWithinMins * 60))
1201  {
1202  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1203  QString("Job run window will start "
1204  "within %1 minutes (tomorrow)")
1205  .arg(orStartsWithinMins));
1206  inTimeWindow = true;
1207  }
1208  }
1209  }
1210 
1211  return inTimeWindow;
1212 }
1213 
1214 bool JobQueue::HasRunningOrPendingJobs(int startingWithinMins)
1215 {
1216  /* startingWithinMins <= 0 - look for any pending jobs
1217  > 0 - only consider pending starting within this time */
1218  QMap<int, JobQueueEntry> jobs;
1219  QMap<int, JobQueueEntry>::Iterator it;
1220  QDateTime maxSchedRunTime = MythDate::current();
1221  int tmpStatus = 0;
1222  bool checkForQueuedJobs = (startingWithinMins <= 0
1223  || InJobRunWindow(startingWithinMins));
1224 
1225  if (checkForQueuedJobs && startingWithinMins > 0) {
1226  maxSchedRunTime = maxSchedRunTime.addSecs(startingWithinMins * 60);
1227  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1228  QString("HasRunningOrPendingJobs: checking for jobs "
1229  "starting before: %1")
1230  .arg(maxSchedRunTime.toString(Qt::ISODate)));
1231  }
1232 
1234 
1235  if (jobs.size()) {
1236  for (it = jobs.begin(); it != jobs.end(); ++it)
1237  {
1238  tmpStatus = (*it).status;
1239  if (tmpStatus == JOB_RUNNING) {
1240  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1241  QString("HasRunningOrPendingJobs: found running job"));
1242  return true;
1243  }
1244 
1245  if (checkForQueuedJobs) {
1246  if ((tmpStatus != JOB_UNKNOWN) && (!(tmpStatus & JOB_DONE))) {
1247  if (startingWithinMins <= 0) {
1248  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1249  "HasRunningOrPendingJobs: found pending job");
1250  return true;
1251  }
1252  else if ((*it).schedruntime <= maxSchedRunTime) {
1253  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1254  QString("HasRunningOrPendingJobs: found pending "
1255  "job scheduled to start at: %1")
1256  .arg((*it).schedruntime.toString(Qt::ISODate)));
1257  return true;
1258  }
1259  }
1260  }
1261  }
1262  }
1263  return false;
1264 }
1265 
1266 
1267 int JobQueue::GetJobsInQueue(QMap<int, JobQueueEntry> &jobs, int findJobs)
1268 {
1269  JobQueueEntry thisJob;
1270  MSqlQuery query(MSqlQuery::InitCon());
1271  QDateTime recentDate = MythDate::current().addSecs(-4 * 3600);
1272  QString logInfo;
1273  int jobCount = 0;
1274  bool commflagWhileRecording =
1275  gCoreContext->GetNumSetting("AutoCommflagWhileRecording", 0);
1276 
1277  jobs.clear();
1278 
1279  query.prepare("SELECT j.id, j.chanid, j.starttime, j.inserttime, j.type, "
1280  "j.cmds, j.flags, j.status, j.statustime, j.hostname, "
1281  "j.args, j.comment, r.endtime, j.schedruntime "
1282  "FROM jobqueue j "
1283  "LEFT JOIN recorded r "
1284  " ON j.chanid = r.chanid AND j.starttime = r.starttime "
1285  "ORDER BY j.schedruntime, j.id;");
1286 
1287  if (!query.exec())
1288  {
1289  MythDB::DBError("Error in JobQueue::GetJobs(), Unable to "
1290  "query list of Jobs in Queue.", query);
1291  return 0;
1292  }
1293 
1294  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1295  QString("GetJobsInQueue: findJobs search bitmask %1, "
1296  "found %2 total jobs")
1297  .arg(findJobs).arg(query.size()));
1298 
1299  while (query.next())
1300  {
1301  bool wantThisJob = false;
1302 
1303  thisJob.id = query.value(0).toInt();
1304  thisJob.recstartts = MythDate::as_utc(query.value(2).toDateTime());
1305  thisJob.schedruntime = MythDate::as_utc(query.value(13).toDateTime());
1306  thisJob.type = query.value(4).toInt();
1307  thisJob.status = query.value(7).toInt();
1308  thisJob.statustime = MythDate::as_utc(query.value(8).toDateTime());
1309  thisJob.startts = MythDate::toString(
1310  thisJob.recstartts, MythDate::kFilename);
1311 
1312  // -1 indicates the chanid is empty
1313  if (query.value(1).toInt() == -1)
1314  {
1315  thisJob.chanid = 0;
1316  logInfo = QString("jobID #%1").arg(thisJob.id);
1317  }
1318  else
1319  {
1320  thisJob.chanid = query.value(1).toUInt();
1321  logInfo = QString("chanid %1 @ %2").arg(thisJob.chanid)
1322  .arg(thisJob.startts);
1323  }
1324 
1325  if ((MythDate::as_utc(query.value(12).toDateTime()) > MythDate::current()) &&
1326  ((!commflagWhileRecording) ||
1327  ((thisJob.type != JOB_COMMFLAG) &&
1328  (thisJob.type != JOB_METADATA))))
1329  {
1330  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1331  QString("GetJobsInQueue: Ignoring '%1' Job "
1332  "for %2 in %3 state. Endtime in future.")
1333  .arg(JobText(thisJob.type))
1334  .arg(logInfo).arg(StatusText(thisJob.status)));
1335  continue;
1336  }
1337 
1338  if ((findJobs & JOB_LIST_ALL) ||
1339  ((findJobs & JOB_LIST_DONE) &&
1340  (thisJob.status & JOB_DONE)) ||
1341  ((findJobs & JOB_LIST_NOT_DONE) &&
1342  (!(thisJob.status & JOB_DONE))) ||
1343  ((findJobs & JOB_LIST_ERROR) &&
1344  (thisJob.status == JOB_ERRORED)) ||
1345  ((findJobs & JOB_LIST_RECENT) &&
1346  (thisJob.statustime > recentDate)))
1347  wantThisJob = true;
1348 
1349  if (!wantThisJob)
1350  {
1351  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1352  QString("GetJobsInQueue: Ignore '%1' Job for %2 in %3 state.")
1353  .arg(JobText(thisJob.type))
1354  .arg(logInfo).arg(StatusText(thisJob.status)));
1355  continue;
1356  }
1357 
1358  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1359  QString("GetJobsInQueue: Found '%1' Job for %2 in %3 state.")
1360  .arg(JobText(thisJob.type))
1361  .arg(logInfo).arg(StatusText(thisJob.status)));
1362 
1363  thisJob.inserttime = MythDate::as_utc(query.value(3).toDateTime());
1364  thisJob.cmds = query.value(5).toInt();
1365  thisJob.flags = query.value(6).toInt();
1366  thisJob.hostname = query.value(9).toString();
1367  thisJob.args = query.value(10).toString();
1368  thisJob.comment = query.value(11).toString();
1369 
1370  if ((thisJob.type & JOB_USERJOB) &&
1371  (UserJobTypeToIndex(thisJob.type) == 0))
1372  {
1373  thisJob.type = JOB_NONE;
1374  LOG(VB_JOBQUEUE, LOG_INFO, LOC +
1375  QString("GetJobsInQueue: Unknown Job Type: %1")
1376  .arg(thisJob.type));
1377  }
1378 
1379  if (thisJob.type != JOB_NONE)
1380  jobs[jobCount++] = thisJob;
1381  }
1382 
1383  return jobCount;
1384 }
1385 
1386 bool JobQueue::ChangeJobHost(int jobID, QString newHostname)
1387 {
1388  MSqlQuery query(MSqlQuery::InitCon());
1389 
1390  if (!newHostname.isEmpty())
1391  {
1392  query.prepare("UPDATE jobqueue SET hostname = :NEWHOSTNAME "
1393  "WHERE hostname = :EMPTY AND id = :ID;");
1394  query.bindValue(":NEWHOSTNAME", newHostname);
1395  query.bindValue(":EMPTY", "");
1396  query.bindValue(":ID", jobID);
1397  }
1398  else
1399  {
1400  query.prepare("UPDATE jobqueue SET hostname = :EMPTY "
1401  "WHERE id = :ID;");
1402  query.bindValue(":EMPTY", "");
1403  query.bindValue(":ID", jobID);
1404  }
1405 
1406  if (!query.exec())
1407  {
1408  MythDB::DBError(QString("Error in JobQueue::ChangeJobHost(), "
1409  "Unable to set hostname to '%1' for "
1410  "job %2.").arg(newHostname).arg(jobID),
1411  query);
1412  return false;
1413  }
1414 
1415  if (query.numRowsAffected() > 0)
1416  return true;
1417 
1418  return false;
1419 }
1420 
1422 {
1423  QString allowSetting;
1424 
1425  if ((!job.hostname.isEmpty()) &&
1426  (job.hostname != m_hostname))
1427  return false;
1428 
1429  if (job.type & JOB_USERJOB)
1430  {
1431  allowSetting =
1432  QString("JobAllowUserJob%1").arg(UserJobTypeToIndex(job.type));
1433  }
1434  else
1435  {
1436  switch (job.type)
1437  {
1438  case JOB_TRANSCODE: allowSetting = "JobAllowTranscode";
1439  break;
1440  case JOB_COMMFLAG: allowSetting = "JobAllowCommFlag";
1441  break;
1442  case JOB_METADATA: allowSetting = "JobAllowMetadata";
1443  break;
1444  default: return false;
1445  }
1446  }
1447 
1448  if (gCoreContext->GetNumSetting(allowSetting, 1))
1449  return true;
1450 
1451  return false;
1452 }
1453 
1455 {
1456  MSqlQuery query(MSqlQuery::InitCon());
1457 
1458  query.prepare("SELECT cmds FROM jobqueue WHERE id = :ID;");
1459 
1460  query.bindValue(":ID", jobID);
1461 
1462  if (query.exec())
1463  {
1464  if (query.next())
1465  return (enum JobCmds)query.value(0).toInt();
1466  }
1467  else
1468  {
1469  MythDB::DBError("Error in JobQueue::GetJobCmd()", query);
1470  }
1471 
1472  return JOB_RUN;
1473 }
1474 
1476 {
1477  MSqlQuery query(MSqlQuery::InitCon());
1478 
1479  query.prepare("SELECT args FROM jobqueue WHERE id = :ID;");
1480 
1481  query.bindValue(":ID", jobID);
1482 
1483  if (query.exec())
1484  {
1485  if (query.next())
1486  return query.value(0).toString();
1487  }
1488  else
1489  {
1490  MythDB::DBError("Error in JobQueue::GetJobArgs()", query);
1491  }
1492 
1493  return QString("");
1494 }
1495 
1497 {
1498  MSqlQuery query(MSqlQuery::InitCon());
1499 
1500  query.prepare("SELECT flags FROM jobqueue WHERE id = :ID;");
1501 
1502  query.bindValue(":ID", jobID);
1503 
1504  if (query.exec())
1505  {
1506  if (query.next())
1507  return (enum JobFlags)query.value(0).toInt();
1508  }
1509  else
1510  {
1511  MythDB::DBError("Error in JobQueue::GetJobFlags()", query);
1512  }
1513 
1514  return JOB_NO_FLAGS;
1515 }
1516 
1518 {
1519  MSqlQuery query(MSqlQuery::InitCon());
1520 
1521  query.prepare("SELECT status FROM jobqueue WHERE id = :ID;");
1522 
1523  query.bindValue(":ID", jobID);
1524 
1525  if (query.exec())
1526  {
1527  if (query.next())
1528  return (enum JobStatus)query.value(0).toInt();
1529  }
1530  else
1531  {
1532  MythDB::DBError("Error in JobQueue::GetJobStatus()", query);
1533  }
1534  return JOB_UNKNOWN;
1535 }
1536 
1538  int jobType, uint chanid, const QDateTime &recstartts)
1539 {
1540  MSqlQuery query(MSqlQuery::InitCon());
1541 
1542  query.prepare("SELECT status FROM jobqueue WHERE type = :TYPE "
1543  "AND chanid = :CHANID AND starttime = :STARTTIME;");
1544 
1545  query.bindValue(":TYPE", jobType);
1546  query.bindValue(":CHANID", chanid);
1547  query.bindValue(":STARTTIME", recstartts);
1548 
1549  if (query.exec())
1550  {
1551  if (query.next())
1552  return (enum JobStatus)query.value(0).toInt();
1553  }
1554  else
1555  {
1556  MythDB::DBError("Error in JobQueue::GetJobStatus()", query);
1557  }
1558  return JOB_UNKNOWN;
1559 }
1560 
1561 void JobQueue::RecoverQueue(bool justOld)
1562 {
1563  QMap<int, JobQueueEntry> jobs;
1564  QString msg;
1565  QString logInfo;
1566 
1567  msg = QString("RecoverQueue: Checking for unfinished jobs to "
1568  "recover.");
1569  LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
1570 
1571  GetJobsInQueue(jobs);
1572 
1573  if (jobs.size())
1574  {
1575  QMap<int, JobQueueEntry>::Iterator it;
1576  QDateTime oldDate = MythDate::current().addDays(-1);
1577  QString hostname = gCoreContext->GetHostName();
1578  int tmpStatus;
1579  int tmpCmds;
1580 
1581  for (it = jobs.begin(); it != jobs.end(); ++it)
1582  {
1583  tmpCmds = (*it).cmds;
1584  tmpStatus = (*it).status;
1585 
1586  if (!(*it).chanid)
1587  logInfo = QString("jobID #%1").arg((*it).id);
1588  else
1589  logInfo = QString("chanid %1 @ %2").arg((*it).chanid)
1590  .arg((*it).startts);
1591 
1592  if (((tmpStatus == JOB_STARTING) ||
1593  (tmpStatus == JOB_RUNNING) ||
1594  (tmpStatus == JOB_PAUSED) ||
1595  (tmpCmds & JOB_STOP) ||
1596  (tmpStatus == JOB_STOPPING)) &&
1597  (((!justOld) &&
1598  ((*it).hostname == hostname)) ||
1599  ((*it).statustime < oldDate)))
1600  {
1601  msg = QString("RecoverQueue: Recovering '%1' for %2 "
1602  "from '%3' state.")
1603  .arg(JobText((*it).type))
1604  .arg(logInfo).arg(StatusText((*it).status));
1605  LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
1606 
1607  ChangeJobStatus((*it).id, JOB_QUEUED, "");
1608  ChangeJobCmds((*it).id, JOB_RUN);
1609  if (!gCoreContext->GetNumSetting("JobsRunOnRecordHost", 0))
1610  ChangeJobHost((*it).id, "");
1611  }
1612  else
1613  {
1614 #if 0
1615  msg = QString("RecoverQueue: Ignoring '%1' for %2 "
1616  "in '%3' state.")
1617  .arg(JobText((*it).type))
1618  .arg(logInfo).arg(StatusText((*it).status));
1619  LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
1620 #endif
1621  }
1622  }
1623  }
1624 }
1625 
1627 {
1628  MSqlQuery delquery(MSqlQuery::InitCon());
1629  QDateTime donePurgeDate = MythDate::current().addDays(-2);
1630  QDateTime errorsPurgeDate = MythDate::current().addDays(-4);
1631 
1632  delquery.prepare("DELETE FROM jobqueue "
1633  "WHERE (status in (:FINISHED, :ABORTED, :CANCELLED) "
1634  "AND statustime < :DONEPURGEDATE) "
1635  "OR (status in (:ERRORED) "
1636  "AND statustime < :ERRORSPURGEDATE) ");
1637  delquery.bindValue(":FINISHED", JOB_FINISHED);
1638  delquery.bindValue(":ABORTED", JOB_ABORTED);
1639  delquery.bindValue(":CANCELLED", JOB_CANCELLED);
1640  delquery.bindValue(":ERRORED", JOB_ERRORED);
1641  delquery.bindValue(":DONEPURGEDATE", donePurgeDate);
1642  delquery.bindValue(":ERRORSPURGEDATE", errorsPurgeDate);
1643 
1644  if (!delquery.exec())
1645  {
1646  MythDB::DBError("JobQueue::CleanupOldJobsInQueue: Error deleting "
1647  "old finished jobs.", delquery);
1648  }
1649 }
1650 
1652 {
1653  int jobID = job.id;
1654  QString name = QString("jobqueue%1%2").arg(jobID).arg(random());
1655 
1657  {
1658  LOG(VB_JOBQUEUE, LOG_ERR, LOC +
1659  "ProcessJob(): Unable to open database connection");
1660  return;
1661  }
1662 
1663  ChangeJobStatus(jobID, JOB_PENDING);
1664  ProgramInfo *pginfo = NULL;
1665 
1666  if (job.chanid)
1667  {
1668  pginfo = new ProgramInfo(job.chanid, job.recstartts);
1669 
1670  if (!pginfo->GetChanID())
1671  {
1672  LOG(VB_JOBQUEUE, LOG_ERR, LOC +
1673  QString("Unable to retrieve program info for chanid %1 @ %2")
1674  .arg(job.chanid)
1675  .arg(job.recstartts.toString(Qt::ISODate)));
1676 
1677  ChangeJobStatus(jobID, JOB_ERRORED,
1678  tr("Unable to retrieve program info from database"));
1679 
1680  delete pginfo;
1681 
1682  return;
1683  }
1684 
1685  pginfo->SetPathname(pginfo->GetPlaybackURL());
1686  }
1687 
1688 
1689  runningJobsLock->lock();
1690 
1691  ChangeJobStatus(jobID, JOB_STARTING);
1692  RunningJobInfo jInfo;
1693  jInfo.type = job.type;
1694  jInfo.id = jobID;
1695  jInfo.flag = JOB_RUN;
1696  jInfo.desc = GetJobDescription(job.type);
1697  jInfo.command = GetJobCommand(jobID, job.type, pginfo);
1698  jInfo.pginfo = pginfo;
1699 
1700  runningJobs[jobID] = jInfo;
1701 
1702  if (pginfo)
1703  pginfo->MarkAsInUse(true, kJobQueueInUseID);
1704 
1705  if (pginfo && pginfo->GetRecordingGroup() == "Deleted")
1706  {
1707  ChangeJobStatus(jobID, JOB_CANCELLED,
1708  tr("Program has been deleted"));
1709  RemoveRunningJob(jobID);
1710  }
1711  else if ((job.type == JOB_TRANSCODE) ||
1712  (runningJobs[jobID].command == "mythtranscode"))
1713  {
1715  }
1716  else if ((job.type == JOB_COMMFLAG) ||
1717  (runningJobs[jobID].command == "mythcommflag"))
1718  {
1720  }
1721  else if ((job.type == JOB_METADATA) ||
1722  (runningJobs[jobID].command == "mythmetadatalookup"))
1723  {
1725  }
1726  else if (job.type & JOB_USERJOB)
1727  {
1728  StartChildJob(UserJobThread, jobID);
1729  }
1730  else
1731  {
1732  ChangeJobStatus(jobID, JOB_ERRORED,
1733  tr("UNKNOWN JobType, unable to process!"));
1734  RemoveRunningJob(jobID);
1735  }
1736 
1737  runningJobsLock->unlock();
1738 }
1739 
1740 void JobQueue::StartChildJob(void *(*ChildThreadRoutine)(void *), int jobID)
1741 {
1742  JobThreadStruct *jts = new JobThreadStruct;
1743  jts->jq = this;
1744  jts->jobID = jobID;
1745 
1746  pthread_t childThread;
1747  pthread_attr_t attr;
1748  pthread_attr_init(&attr);
1749  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
1750  pthread_create(&childThread, &attr, ChildThreadRoutine, jts);
1751  pthread_attr_destroy(&attr);
1752 }
1753 
1754 QString JobQueue::GetJobDescription(int jobType)
1755 {
1756  if (jobType == JOB_TRANSCODE)
1757  return "Transcode";
1758  else if (jobType == JOB_COMMFLAG)
1759  return "Commercial Detection";
1760  else if (!(jobType & JOB_USERJOB))
1761  return "Unknown Job";
1762 
1763  QString descSetting =
1764  QString("UserJobDesc%1").arg(UserJobTypeToIndex(jobType));
1765 
1766  return gCoreContext->GetSetting(descSetting, "Unknown Job");
1767 }
1768 
1769 QString JobQueue::GetJobCommand(int id, int jobType, ProgramInfo *tmpInfo)
1770 {
1771  QString command;
1772  MSqlQuery query(MSqlQuery::InitCon());
1773 
1774  if (jobType == JOB_TRANSCODE)
1775  {
1776  command = gCoreContext->GetSetting("JobQueueTranscodeCommand");
1777  if (command.trimmed().isEmpty())
1778  command = "mythtranscode";
1779 
1780  if (command == "mythtranscode")
1781  return command;
1782  }
1783  else if (jobType == JOB_COMMFLAG)
1784  {
1785  command = gCoreContext->GetSetting("JobQueueCommFlagCommand");
1786  if (command.trimmed().isEmpty())
1787  command = "mythcommflag";
1788 
1789  if (command == "mythcommflag")
1790  return command;
1791  }
1792  else if (jobType & JOB_USERJOB)
1793  {
1794  command = gCoreContext->GetSetting(
1795  QString("UserJob%1").arg(UserJobTypeToIndex(jobType)), "");
1796  }
1797 
1798  if (!command.isEmpty())
1799  {
1800  command.replace("%JOBID%", QString("%1").arg(id));
1801  }
1802 
1803  if (!command.isEmpty() && tmpInfo)
1804  {
1805  tmpInfo->SubstituteMatches(command);
1806 
1807  command.replace("%VERBOSELEVEL%", QString("%1").arg(verboseMask));
1808  command.replace("%VERBOSEMODE%", QString("%1").arg(logPropagateArgs));
1809 
1810  uint transcoder = tmpInfo->QueryTranscoderID();
1811  command.replace("%TRANSPROFILE%",
1812  (RecordingProfile::TranscoderAutodetect == transcoder) ?
1813  "autodetect" : QString::number(transcoder));
1814  }
1815 
1816  return command;
1817 }
1818 
1820 {
1821  runningJobsLock->lock();
1822 
1823  if (runningJobs.contains(id))
1824  {
1825  ProgramInfo *pginfo = runningJobs[id].pginfo;
1826  if (pginfo)
1827  {
1828  pginfo->MarkAsInUse(false, kJobQueueInUseID);
1829  delete pginfo;
1830  }
1831 
1832  runningJobs.remove(id);
1833  }
1834 
1835  runningJobsLock->unlock();
1836 }
1837 
1838 QString JobQueue::PrettyPrint(off_t bytes)
1839 {
1840  // Pretty print "bytes" as KB, MB, GB, TB, etc., subject to the desired
1841  // number of units
1842  static const struct {
1843  const char *suffix;
1844  unsigned int max;
1845  int precision;
1846  } pptab[] = {
1847  { "bytes", 9999, 0 },
1848  { "kB", 999, 0 },
1849  { "MB", 999, 1 },
1850  { "GB", 999, 1 },
1851  { "TB", 999, 1 },
1852  { "PB", 999, 1 },
1853  { "EB", 999, 1 },
1854  { "ZB", 999, 1 },
1855  { "YB", 0, 0 },
1856  };
1857  unsigned int ii;
1858  float fbytes = bytes;
1859 
1860  ii = 0;
1861  while (pptab[ii].max && fbytes > pptab[ii].max) {
1862  fbytes /= 1024;
1863  ii++;
1864  }
1865 
1866  return QString("%1 %2")
1867  .arg(fbytes, 0, 'f', pptab[ii].precision)
1868  .arg(pptab[ii].suffix);
1869 }
1870 
1871 void *JobQueue::TranscodeThread(void *param)
1872 {
1873  JobThreadStruct *jts = (JobThreadStruct *)param;
1874  JobQueue *jq = jts->jq;
1875 
1876  MThread::ThreadSetup(QString("Transcode_%1").arg(jts->jobID));
1877  jq->DoTranscodeThread(jts->jobID);
1879 
1880  delete jts;
1881 
1882  return NULL;
1883 }
1884 
1886 {
1887  // We can't currently transcode non-recording files w/o a ProgramInfo
1888  runningJobsLock->lock();
1889  if (!runningJobs[jobID].pginfo)
1890  {
1891  LOG(VB_JOBQUEUE, LOG_ERR, LOC +
1892  "The JobQueue cannot currently transcode files that do not "
1893  "have a chanid/starttime in the recorded table.");
1894  ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found");
1895  RemoveRunningJob(jobID);
1896  runningJobsLock->unlock();
1897  return;
1898  }
1899 
1900  ProgramInfo *program_info = runningJobs[jobID].pginfo;
1901  runningJobsLock->unlock();
1902 
1903  ChangeJobStatus(jobID, JOB_RUNNING);
1904 
1905  // make sure flags are up to date
1906  program_info->Reload();
1907 
1908  bool useCutlist = program_info->HasCutlist() &&
1909  !!(GetJobFlags(jobID) & JOB_USE_CUTLIST);
1910 
1911  uint transcoder = program_info->QueryTranscoderID();
1912  QString profilearg =
1913  (RecordingProfile::TranscoderAutodetect == transcoder) ?
1914  "autodetect" : QString::number(transcoder);
1915 
1916  QString path;
1917  QString command;
1918 
1919  runningJobsLock->lock();
1920  if (runningJobs[jobID].command == "mythtranscode")
1921  {
1922  path = GetInstallPrefix() + "/bin/mythtranscode";
1923  command = QString("%1 -j %2 --profile %3")
1924  .arg(path).arg(jobID).arg(profilearg);
1925  if (useCutlist)
1926  command += " --honorcutlist";
1927  command += logPropagateArgs;
1928  }
1929  else
1930  {
1931  command = runningJobs[jobID].command;
1932 
1933  QStringList tokens = command.split(" ", QString::SkipEmptyParts);
1934  if (!tokens.empty())
1935  path = tokens[0];
1936  }
1937  runningJobsLock->unlock();
1938 
1939  if (jobQueueCPU < 2)
1940  {
1941  myth_nice(17);
1942  myth_ioprio((0 == jobQueueCPU) ? 8 : 7);
1943  }
1944 
1945  QString transcoderName;
1946  if (transcoder == RecordingProfile::TranscoderAutodetect)
1947  {
1948  transcoderName = "Autodetect";
1949  }
1950  else
1951  {
1952  MSqlQuery query(MSqlQuery::InitCon());
1953  query.prepare("SELECT name FROM recordingprofiles WHERE id = :ID;");
1954  query.bindValue(":ID", transcoder);
1955  if (query.exec() && query.next())
1956  {
1957  transcoderName = query.value(0).toString();
1958  }
1959  else
1960  {
1961  /* Unexpected value; log it. */
1962  transcoderName = QString("Autodetect(%1)").arg(transcoder);
1963  }
1964  }
1965 
1966  QString msg;
1967  bool retry = true;
1968  int retrylimit = 3;
1969  while (retry)
1970  {
1971  retry = false;
1972 
1973  ChangeJobStatus(jobID, JOB_STARTING);
1975 
1976  QString filename = program_info->GetPlaybackURL(false, true);
1977 
1978  long long filesize = 0;
1979  long long origfilesize = QFileInfo(filename).size();
1980 
1981  QString msg = QString("Transcode %1")
1982  .arg(StatusText(GetJobStatus(jobID)));
1983 
1984  QString detailstr = QString("%1: %2 (%3)")
1985  .arg(program_info->toString(ProgramInfo::kTitleSubtitle))
1986  .arg(transcoderName)
1987  .arg(PrettyPrint(origfilesize));
1988  QByteArray details = detailstr.toLocal8Bit();
1989 
1990  LOG(VB_GENERAL, LOG_INFO, LOC + QString("%1 for %2")
1991  .arg(msg).arg(details.constData()));
1992 
1993  LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
1994  .arg(command));
1995 
1996  GetMythDB()->GetDBManager()->CloseDatabases();
1997  uint result = myth_system(command);
1998  int status = GetJobStatus(jobID);
1999 
2000  if ((result == GENERIC_EXIT_DAEMONIZING_ERROR) ||
2001  (result == GENERIC_EXIT_CMD_NOT_FOUND))
2002  {
2003  ChangeJobStatus(jobID, JOB_ERRORED,
2004  tr("ERROR: Unable to find mythtranscode, check backend logs."));
2006 
2007  msg = QString("Transcode %1").arg(StatusText(GetJobStatus(jobID)));
2008  detailstr = QString("%1: %2 does not exist or is not executable")
2009  .arg(program_info->toString(ProgramInfo::kTitleSubtitle))
2010  .arg(path);
2011  details = detailstr.toLocal8Bit();
2012 
2013  LOG(VB_GENERAL, LOG_ERR, LOC +
2014  QString("%1 for %2").arg(msg).arg(details.constData()));
2015  }
2016  else if (result == GENERIC_EXIT_RESTART && retrylimit > 0)
2017  {
2018  LOG(VB_JOBQUEUE, LOG_INFO, LOC + "Transcode command restarting");
2019  retry = true;
2020  retrylimit--;
2021 
2023  }
2024  else
2025  {
2026  if (status == JOB_FINISHED)
2027  {
2028  ChangeJobStatus(jobID, JOB_FINISHED, tr("Finished."));
2029  retry = false;
2030 
2031  filename = program_info->GetPlaybackURL(false, true);
2032  QFileInfo st(filename);
2033 
2034  if (st.exists())
2035  {
2036  filesize = st.size();
2037  /*: %1 is transcoder name, %2 is the original file size
2038  and %3 is the current file size */
2039  QString comment = tr("%1: %2 => %3")
2040  .arg(transcoderName)
2041  .arg(PrettyPrint(origfilesize))
2042  .arg(PrettyPrint(filesize));
2043  ChangeJobComment(jobID, comment);
2044 
2045  if (filesize > 0)
2046  program_info->SaveFilesize(filesize);
2047 
2048  details = (QString("%1: %2 (%3)")
2049  .arg(program_info->toString(
2051  .arg(transcoderName)
2052  .arg(PrettyPrint(filesize))).toLocal8Bit();
2053  }
2054  else
2055  {
2056  QString comment =
2057  QString("could not stat '%1'").arg(filename);
2058 
2059  ChangeJobStatus(jobID, JOB_FINISHED, comment);
2060 
2061  details = (QString("%1: %2")
2062  .arg(program_info->toString(
2064  .arg(comment)).toLocal8Bit();
2065  }
2066 
2068  }
2069  else
2070  {
2072 
2073  QString comment = tr("exit status %1, job status was \"%2\"")
2074  .arg(result)
2075  .arg(StatusText(status));
2076 
2077  ChangeJobStatus(jobID, JOB_ERRORED, comment);
2078 
2079  details = (QString("%1: %2 (%3)")
2080  .arg(program_info->toString(
2082  .arg(transcoderName)
2083  .arg(comment)).toLocal8Bit().constData();
2084  }
2085 
2086  msg = QString("Transcode %1").arg(StatusText(GetJobStatus(jobID)));
2087  LOG(VB_GENERAL, LOG_INFO, LOC + msg + ": " + details);
2088  }
2089  }
2090 
2091  if (retrylimit == 0)
2092  {
2093  LOG(VB_JOBQUEUE, LOG_ERR, LOC + "Retry limit exceeded for transcoder, "
2094  "setting job status to errored.");
2095  ChangeJobStatus(jobID, JOB_ERRORED, tr("Retry limit exceeded"));
2096  }
2097 
2098  RemoveRunningJob(jobID);
2099 }
2100 
2102 {
2103  JobThreadStruct *jts = (JobThreadStruct *)param;
2104  JobQueue *jq = jts->jq;
2105 
2106  MThread::ThreadSetup(QString("Metadata_%1").arg(jts->jobID));
2107  jq->DoMetadataLookupThread(jts->jobID);
2109 
2110  delete jts;
2111 
2112  return NULL;
2113 }
2114 
2116 {
2117  // We can't currently lookup non-recording files w/o a ProgramInfo
2118  runningJobsLock->lock();
2119  if (!runningJobs[jobID].pginfo)
2120  {
2121  LOG(VB_JOBQUEUE, LOG_ERR, LOC +
2122  "The JobQueue cannot currently perform lookups for items which do "
2123  "not have a chanid/starttime in the recorded table.");
2124  ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found");
2125  RemoveRunningJob(jobID);
2126  runningJobsLock->unlock();
2127  return;
2128  }
2129 
2130  ProgramInfo *program_info = runningJobs[jobID].pginfo;
2131  runningJobsLock->unlock();
2132 
2133  QString detailstr = QString("%1 recorded from channel %3")
2134  .arg(program_info->toString(ProgramInfo::kTitleSubtitle))
2135  .arg(program_info->toString(ProgramInfo::kRecordingKey));
2136  QByteArray details = detailstr.toLocal8Bit();
2137 
2139  {
2140  QString msg = QString("Metadata Lookup failed. Could not open "
2141  "new database connection for %1. "
2142  "Program cannot be looked up.")
2143  .arg(details.constData());
2144  LOG(VB_GENERAL, LOG_ERR, LOC + msg);
2145 
2146  ChangeJobStatus(jobID, JOB_ERRORED,
2147  tr("Could not open new database connection for "
2148  "metadata lookup."));
2149 
2150  delete program_info;
2151  return;
2152  }
2153 
2154  QString msg = tr("Metadata Lookup Starting");
2155  LOG(VB_GENERAL, LOG_INFO,
2156  LOC + "Metadata Lookup Starting for " + detailstr);
2157 
2158  uint retVal = 0;
2159  QString path;
2160  QString command;
2161 
2162  path = GetInstallPrefix() + "/bin/mythmetadatalookup";
2163  command = QString("%1 -j %2")
2164  .arg(path).arg(jobID);
2165  command += logPropagateArgs;
2166 
2167  LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
2168  .arg(command));
2169 
2170  GetMythDB()->GetDBManager()->CloseDatabases();
2171  retVal = myth_system(command);
2172  int priority = LOG_NOTICE;
2173  QString comment;
2174 
2175  runningJobsLock->lock();
2176 
2177  if ((retVal == GENERIC_EXIT_DAEMONIZING_ERROR) ||
2178  (retVal == GENERIC_EXIT_CMD_NOT_FOUND))
2179  {
2180  comment = tr("Unable to find mythmetadatalookup");
2181  ChangeJobStatus(jobID, JOB_ERRORED, comment);
2182  priority = LOG_WARNING;
2183  }
2184  else if (runningJobs[jobID].flag == JOB_STOP)
2185  {
2186  comment = tr("Aborted by user");
2187  ChangeJobStatus(jobID, JOB_ABORTED, comment);
2188  priority = LOG_WARNING;
2189  }
2190  else if (retVal == GENERIC_EXIT_NO_RECORDING_DATA)
2191  {
2192  comment = tr("Unable to open file or init decoder");
2193  ChangeJobStatus(jobID, JOB_ERRORED, comment);
2194  priority = LOG_WARNING;
2195  }
2196  else if (retVal >= GENERIC_EXIT_NOT_OK) // 256 or above - error
2197  {
2198  comment = tr("Failed with exit status %1").arg(retVal);
2199  ChangeJobStatus(jobID, JOB_ERRORED, comment);
2200  priority = LOG_WARNING;
2201  }
2202  else
2203  {
2204  comment = tr("Metadata Lookup Complete.");
2205  ChangeJobStatus(jobID, JOB_FINISHED, comment);
2206 
2207  program_info->SendUpdateEvent();
2208  }
2209 
2210  msg = tr("Metadata Lookup %1", "Job ID")
2211  .arg(StatusText(GetJobStatus(jobID)));
2212 
2213  if (!comment.isEmpty())
2214  {
2215  detailstr += QString(" (%1)").arg(comment);
2216  details = detailstr.toLocal8Bit();
2217  }
2218 
2219  if (priority <= LOG_WARNING)
2220  LOG(VB_GENERAL, LOG_ERR, LOC + msg + ": " + details.constData());
2221 
2222  RemoveRunningJob(jobID);
2223  runningJobsLock->unlock();
2224 }
2225 
2227 {
2228  JobThreadStruct *jts = (JobThreadStruct *)param;
2229  JobQueue *jq = jts->jq;
2230 
2231  MThread::ThreadSetup(QString("Commflag_%1").arg(jts->jobID));
2232  jq->DoFlagCommercialsThread(jts->jobID);
2234 
2235  delete jts;
2236 
2237  return NULL;
2238 }
2239 
2241 {
2242  // We can't currently commflag non-recording files w/o a ProgramInfo
2243  runningJobsLock->lock();
2244  if (!runningJobs[jobID].pginfo)
2245  {
2246  LOG(VB_JOBQUEUE, LOG_ERR, LOC +
2247  "The JobQueue cannot currently commflag files that do not "
2248  "have a chanid/starttime in the recorded table.");
2249  ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found");
2250  RemoveRunningJob(jobID);
2251  runningJobsLock->unlock();
2252  return;
2253  }
2254 
2255  ProgramInfo *program_info = runningJobs[jobID].pginfo;
2256  runningJobsLock->unlock();
2257 
2258  QString detailstr = QString("%1 recorded from channel %3")
2259  .arg(program_info->toString(ProgramInfo::kTitleSubtitle))
2260  .arg(program_info->toString(ProgramInfo::kRecordingKey));
2261  QByteArray details = detailstr.toLocal8Bit();
2262 
2264  {
2265  QString msg = QString("Commercial Detection failed. Could not open "
2266  "new database connection for %1. "
2267  "Program cannot be flagged.")
2268  .arg(details.constData());
2269  LOG(VB_GENERAL, LOG_ERR, LOC + msg);
2270 
2271  ChangeJobStatus(jobID, JOB_ERRORED,
2272  tr("Could not open new database connection for "
2273  "commercial detector."));
2274 
2275  delete program_info;
2276  return;
2277  }
2278 
2279  QString msg = tr("Commercial Detection Starting");
2280  LOG(VB_GENERAL, LOG_INFO,
2281  LOC + "Commercial Detection Starting for " + detailstr);
2282 
2283  uint breaksFound = 0;
2284  QString path;
2285  QString command;
2286 
2287  runningJobsLock->lock();
2288  if (runningJobs[jobID].command == "mythcommflag")
2289  {
2290  path = GetInstallPrefix() + "/bin/mythcommflag";
2291  command = QString("%1 -j %2 --noprogress")
2292  .arg(path).arg(jobID);
2293  command += logPropagateArgs;
2294  }
2295  else
2296  {
2297  command = runningJobs[jobID].command;
2298  QStringList tokens = command.split(" ", QString::SkipEmptyParts);
2299  if (!tokens.empty())
2300  path = tokens[0];
2301  }
2302  runningJobsLock->unlock();
2303 
2304  LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
2305  .arg(command));
2306 
2307  GetMythDB()->GetDBManager()->CloseDatabases();
2308  breaksFound = myth_system(command, kMSLowExitVal);
2309  int priority = LOG_NOTICE;
2310  QString comment;
2311 
2312  runningJobsLock->lock();
2313 
2314  if ((breaksFound == GENERIC_EXIT_DAEMONIZING_ERROR) ||
2315  (breaksFound == GENERIC_EXIT_CMD_NOT_FOUND))
2316  {
2317  comment = tr("Unable to find mythcommflag");
2318  ChangeJobStatus(jobID, JOB_ERRORED, comment);
2319  priority = LOG_WARNING;
2320  }
2321  else if (runningJobs[jobID].flag == JOB_STOP)
2322  {
2323  comment = tr("Aborted by user");
2324  ChangeJobStatus(jobID, JOB_ABORTED, comment);
2325  priority = LOG_WARNING;
2326  }
2327  else if (breaksFound == GENERIC_EXIT_NO_RECORDING_DATA)
2328  {
2329  comment = tr("Unable to open file or init decoder");
2330  ChangeJobStatus(jobID, JOB_ERRORED, comment);
2331  priority = LOG_WARNING;
2332  }
2333  else if (breaksFound >= GENERIC_EXIT_NOT_OK) // 256 or above - error
2334  {
2335  comment = tr("Failed with exit status %1").arg(breaksFound);
2336  ChangeJobStatus(jobID, JOB_ERRORED, comment);
2337  priority = LOG_WARNING;
2338  }
2339  else
2340  {
2341  comment = tr("%n commercial break(s)", "", breaksFound);
2342  ChangeJobStatus(jobID, JOB_FINISHED, comment);
2343 
2344  program_info->SendUpdateEvent();
2345 
2346  if (!program_info->IsLocal())
2347  program_info->SetPathname(program_info->GetPlaybackURL(false,true));
2348  if (program_info->IsLocal())
2349  {
2351  program_info, QString(), PreviewGenerator::kLocal);
2352  pg->Run();
2353  pg->deleteLater();
2354  }
2355  }
2356 
2357  msg = tr("Commercial Detection %1", "Job ID")
2358  .arg(StatusText(GetJobStatus(jobID)));
2359 
2360  if (!comment.isEmpty())
2361  {
2362  detailstr += QString(" (%1)").arg(comment);
2363  details = detailstr.toLocal8Bit();
2364  }
2365 
2366  if (priority <= LOG_WARNING)
2367  LOG(VB_GENERAL, LOG_ERR, LOC + msg + ": " + details.constData());
2368 
2369  RemoveRunningJob(jobID);
2370  runningJobsLock->unlock();
2371 }
2372 
2373 void *JobQueue::UserJobThread(void *param)
2374 {
2375  JobThreadStruct *jts = (JobThreadStruct *)param;
2376  JobQueue *jq = jts->jq;
2377 
2378  MThread::ThreadSetup(QString("UserJob_%1").arg(jts->jobID));
2379  jq->DoUserJobThread(jts->jobID);
2381 
2382  delete jts;
2383 
2384  return NULL;
2385 }
2386 
2388 {
2389  runningJobsLock->lock();
2390  ProgramInfo *pginfo = runningJobs[jobID].pginfo;
2391  QString jobDesc = runningJobs[jobID].desc;
2392  QString command = runningJobs[jobID].command;
2393  runningJobsLock->unlock();
2394 
2395  ChangeJobStatus(jobID, JOB_RUNNING);
2396 
2397  QString msg;
2398 
2399  if (pginfo)
2400  {
2401  msg = QString("Started %1 for %2 recorded from channel %3")
2402  .arg(jobDesc)
2403  .arg(pginfo->toString(ProgramInfo::kTitleSubtitle))
2404  .arg(pginfo->toString(ProgramInfo::kRecordingKey));
2405  }
2406  else
2407  msg = QString("Started %1 for jobID %2").arg(jobDesc).arg(jobID);
2408 
2409  LOG(VB_GENERAL, LOG_INFO, LOC + QString(msg.toLocal8Bit().constData()));
2410 
2411  switch (jobQueueCPU)
2412  {
2413  case 0: myth_nice(17);
2414  myth_ioprio(8);
2415  break;
2416  case 1: myth_nice(10);
2417  myth_ioprio(7);
2418  break;
2419  case 2:
2420  default: break;
2421  }
2422 
2423  LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
2424  .arg(command));
2425  GetMythDB()->GetDBManager()->CloseDatabases();
2426  uint result = myth_system(command);
2427 
2428  if ((result == GENERIC_EXIT_DAEMONIZING_ERROR) ||
2429  (result == GENERIC_EXIT_CMD_NOT_FOUND))
2430  {
2431  msg = QString("User Job '%1' failed, unable to find "
2432  "executable, check your PATH and backend logs.")
2433  .arg(command);
2434  LOG(VB_GENERAL, LOG_ERR, LOC + msg);
2435  LOG(VB_GENERAL, LOG_NOTICE, LOC + QString("Current PATH: '%1'")
2436  .arg(getenv("PATH")));
2437 
2438  ChangeJobStatus(jobID, JOB_ERRORED,
2439  tr("ERROR: Unable to find executable, check backend logs."));
2440  }
2441  else if (result != 0)
2442  {
2443  msg = QString("User Job '%1' failed.").arg(command);
2444  LOG(VB_GENERAL, LOG_ERR, LOC + msg);
2445 
2446  ChangeJobStatus(jobID, JOB_ERRORED,
2447  tr("ERROR: User Job returned non-zero, check logs."));
2448  }
2449  else
2450  {
2451  if (pginfo)
2452  {
2453  msg = QString("Finished %1 for %2 recorded from channel %3")
2454  .arg(jobDesc)
2455  .arg(pginfo->toString(ProgramInfo::kTitleSubtitle))
2456  .arg(pginfo->toString(ProgramInfo::kRecordingKey));
2457  }
2458  else
2459  msg = QString("Finished %1 for jobID %2").arg(jobDesc).arg(jobID);
2460 
2461  LOG(VB_GENERAL, LOG_INFO, LOC + QString(msg.toLocal8Bit().constData()));
2462 
2463  ChangeJobStatus(jobID, JOB_FINISHED, tr("Successfully Completed."));
2464 
2465  if (pginfo)
2466  pginfo->SendUpdateEvent();
2467  }
2468 
2469  RemoveRunningJob(jobID);
2470 }
2471 
2473 {
2474  if (jobType & JOB_USERJOB)
2475  {
2476  int x = ((jobType & JOB_USERJOB)>> 8);
2477  int bits = 1;
2478  while ((x != 0) && ((x & 0x01) == 0))
2479  {
2480  bits++;
2481  x = x >> 1;
2482  }
2483  if ( bits > 4 )
2484  return JOB_NONE;
2485 
2486  return bits;
2487  }
2488  return JOB_NONE;
2489 }
2490 
2491 /* vim: set expandtab tabstop=4 shiftwidth=4: */