MythTV  master
streamhandler.cpp
Go to the documentation of this file.
1 // -*- Mode: c++ -*-
2 
3 // C++ headers
4 #include <utility>
5 
6 // MythTV headers
7 #include "streamhandler.h"
8 
10 
11 #ifndef O_LARGEFILE
12 #define O_LARGEFILE 0
13 #endif
14 
15 #define LOC QString("SH[%1]: ").arg(m_inputId)
16 
18 {
19  QMutexLocker locker(&m_addRmLock);
20 
21  {
22  QMutexLocker locker2(&m_listenerLock);
23  if (!m_streamDataList.empty())
24  {
25  LOG(VB_GENERAL, LOG_ERR, LOC +
26  "dtor & _stream_data_list not empty");
27  }
28  }
29 
30  // This should never be triggered.. just to be safe..
31  if (m_running)
32  Stop();
33 }
34 
36  bool allow_section_reader,
37  bool needs_buffering,
38  const QString& output_file)
39 {
40  QMutexLocker locker(&m_addRmLock);
41 
42  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- begin")
43  .arg((uint64_t)data,0,16));
44  if (!data)
45  {
46  LOG(VB_GENERAL, LOG_ERR, LOC +
47  QString("AddListener(0x%1) -- null data")
48  .arg((uint64_t)data,0,16));
49  return;
50  }
51 
52  m_listenerLock.lock();
53 
54  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- locked")
55  .arg((uint64_t)data,0,16));
56 
57  if (m_streamDataList.empty())
58  {
59  QMutexLocker locker2(&m_startStopLock);
60  m_allowSectionReader = allow_section_reader;
61  m_needsBuffering = needs_buffering;
62  }
63  else
64  {
65  QMutexLocker locker2(&m_startStopLock);
66  m_allowSectionReader &= allow_section_reader;
67  m_needsBuffering |= needs_buffering;
68  }
69 
70  m_streamDataList[data] = output_file;
71 
72  m_listenerLock.unlock();
73 
74  Start();
75 
76  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- end")
77  .arg((uint64_t)data,0,16));
78 }
79 
81 {
82  QMutexLocker locker(&m_addRmLock);
83 
84  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- begin")
85  .arg((uint64_t)data,0,16));
86  if (!data)
87  {
88  LOG(VB_GENERAL, LOG_ERR, LOC +
89  QString("RemoveListener(0x%1) -- null data")
90  .arg((uint64_t)data,0,16));
91  return;
92  }
93 
94  m_listenerLock.lock();
95 
96  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- locked")
97  .arg((uint64_t)data,0,16));
98 
99  StreamDataList::iterator it = m_streamDataList.find(data);
100 
101  if (it != m_streamDataList.end())
102  {
103  if (!(*it).isEmpty())
105  m_streamDataList.erase(it);
106  }
107 
108  m_listenerLock.unlock();
109 
110  if (m_streamDataList.empty())
111  Stop();
112 
113  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- end")
114  .arg((uint64_t)data,0,16));
115 }
116 
118 {
119  QMutexLocker locker(&m_startStopLock);
120 
121  if (m_running)
122  {
125  {
126  LOG(VB_RECORD, LOG_INFO, LOC + "Restarting StreamHandler");
127  SetRunningDesired(false);
128  m_restarting = true;
129  locker.unlock();
130  wait();
131  locker.relock();
132  m_restarting = false;
133  }
134  }
135 
136  if (m_running)
137  return;
138 
139  m_eitPids.clear();
140 
141  m_bError = false;
142  SetRunningDesired(true);
143  MThread::start();
144 
145  while (!m_running && !m_bError && m_runningDesired)
147 
148  if (m_bError)
149  {
150  LOG(VB_GENERAL, LOG_ERR, LOC + "Start failed");
151  SetRunningDesired(false);
152  }
153 }
154 
156 {
157  LOG(VB_RECORD, LOG_DEBUG, LOC + "Stopping");
159  wait();
160  LOG(VB_RECORD, LOG_DEBUG, LOC + "Stopped");
161 }
162 
163 bool StreamHandler::IsRunning(void) const
164 {
165  // This used to use QMutexLocker, but that sometimes left the
166  // mutex locked on exit, so...
167  m_startStopLock.lock();
168  bool r = m_running || m_restarting;
169  m_startStopLock.unlock();
170  return r;
171 }
172 
173 void StreamHandler::SetRunning(bool is_running,
174  bool is_using_buffering,
175  bool is_using_section_reader)
176 {
177  QMutexLocker locker(&m_startStopLock);
178  m_running = is_running;
179  m_usingBuffering = is_using_buffering;
180  m_usingSectionReader = is_using_section_reader;
181  m_runningStateChanged.wakeAll();
182 }
183 
185 {
186  m_runningDesired = desired;
187  if (!desired)
188  MThread::exit(0);
189 }
190 
192 {
193 #ifdef DEBUG_PID_FILTERS
194  LOG(VB_RECORD, LOG_DEBUG, LOC + QString("AddPIDFilter(0x%1)")
195  .arg(info->m_pid, 0, 16));
196 #endif // DEBUG_PID_FILTERS
197 
198  QMutexLocker writing_locker(&m_pidLock);
199  m_pidInfo[info->m_pid] = info;
200 
201  m_filtersChanged = true;
202 
204 
205  return true;
206 }
207 
209 {
210 #ifdef DEBUG_PID_FILTERS
211  LOG(VB_RECORD, LOG_DEBUG, LOC +
212  QString("RemovePIDFilter(0x%1)").arg(pid, 0, 16));
213 #endif // DEBUG_PID_FILTERS
214 
215  QMutexLocker write_locker(&m_pidLock);
216 
217  PIDInfoMap::iterator it = m_pidInfo.find(pid);
218  if (it == m_pidInfo.end())
219  return false;
220 
221  PIDInfo *tmp = *it;
222  m_pidInfo.erase(it);
223 
224  bool ok = true;
225  if (tmp->IsOpen())
226  {
227  ok = tmp->Close(m_device);
229 
231  }
232 
233  delete tmp;
234 
235  m_filtersChanged = true;
236 
237  return ok;
238 }
239 
241 {
242  QMutexLocker write_locker(&m_pidLock);
243 
244 #ifdef DEBUG_PID_FILTERS
245  LOG(VB_RECORD, LOG_DEBUG, LOC + "RemoveAllPIDFilters()");
246 #endif // DEBUG_PID_FILTERS
247 
248  std::vector<int> del_pids;
249  for (auto it = m_pidInfo.begin(); it != m_pidInfo.end(); ++it)
250  del_pids.push_back(it.key());
251 
252  bool ok = true;
253  for (int & pid : del_pids)
254  ok &= RemovePIDFilter(pid);
255 
256  return UpdateFilters() && ok;
257 }
258 
260 {
261  QMutexLocker read_locker(&m_listenerLock);
262 
263  for (auto it1 = m_streamDataList.cbegin(); it1 != m_streamDataList.cend(); ++it1)
264  {
265  std::vector<uint> add_eit;
266  std::vector<uint> del_eit;
267 
268  MPEGStreamData *sd = it1.key();
269  if (sd->HasEITPIDChanges(m_eitPids) &&
270  sd->GetEITPIDChanges(m_eitPids, add_eit, del_eit))
271  {
272  for (uint eit : del_eit)
273  {
274  uint_vec_t::iterator it2;
275  it2 = find(m_eitPids.begin(), m_eitPids.end(), eit);
276  if (it2 != m_eitPids.end())
277  m_eitPids.erase(it2);
278  sd->RemoveListeningPID(eit);
279  }
280 
281  for (uint eit : add_eit)
282  {
283  m_eitPids.push_back(eit);
284  sd->AddListeningPID(eit);
285  }
286  }
287  }
288 }
289 
291 {
293 
294  pid_map_t pids;
295 
296  {
297  QMutexLocker read_locker(&m_listenerLock);
298  for (auto it = m_streamDataList.cbegin(); it != m_streamDataList.cend(); ++it)
299  it.key()->GetPIDs(pids);
300  }
301 
302  QMap<uint, PIDInfo*> add_pids;
303  std::vector<uint> del_pids;
304 
305  {
306  QMutexLocker read_locker(&m_pidLock);
307 
308  // PIDs that need to be added..
309  for (auto lit = pids.constBegin(); lit != pids.constEnd(); ++lit)
310  {
311  if ((*lit != 0U) && (m_pidInfo.find(lit.key()) == m_pidInfo.end()))
312  {
313  add_pids[lit.key()] = CreatePIDInfo(
314  lit.key(), StreamID::PrivSec, 0);
315  }
316  }
317 
318  // PIDs that need to be removed..
319  for (auto fit = m_pidInfo.cbegin(); fit != m_pidInfo.cend(); ++fit)
320  {
321  bool in_pids = pids.find(fit.key()) != pids.end();
322  if (!in_pids)
323  del_pids.push_back(fit.key());
324  }
325  }
326 
327  // Remove PIDs
328  bool ok = true;
329  for (uint & pid : del_pids)
330  ok &= RemovePIDFilter(pid);
331 
332  // Add PIDs
333  for (auto & pid : add_pids)
334  ok &= AddPIDFilter(pid);
335 
336  // Cycle filters if it's been a while
337  if (m_cycleTimer.isRunning() && (m_cycleTimer.elapsed() > 1s))
339 
340  return ok;
341 }
342 
344 {
345  QMutexLocker reading_locker(&m_listenerLock);
346 
348 
349  for (auto it = m_streamDataList.cbegin(); it != m_streamDataList.cend(); ++it)
350  tmp = std::max(tmp, it.key()->GetPIDPriority(pid));
351 
352  return tmp;
353 }
354 
355 void StreamHandler::WriteMPTS(const unsigned char * buffer, uint len)
356 {
357  if (m_mptsTfw == nullptr)
358  return;
359  m_mptsTfw->Write(buffer, len);
360 }
361 
363 {
364 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
365  QMutexLocker lk(&m_mptsLock);
366 
367  m_mptsFiles.insert(file);
368  QString fn = QString("%1.raw").arg(file);
369 
370  if (m_mptsFiles.size() == 1)
371  {
372  m_mptsBaseFile = fn;
373  m_mptsTfw = new ThreadedFileWriter(fn,
374  O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE,
375  0644);
376  if (!m_mptsTfw->Open())
377  {
378  delete m_mptsTfw;
379  m_mptsTfw = nullptr;
380  return false;
381  }
382  LOG(VB_RECORD, LOG_INFO, LOC +
383  QString("Opened '%1'").arg(m_mptsBaseFile));
384  }
385  else
386  {
387  if (link(m_mptsBaseFile.toLocal8Bit(), fn.toLocal8Bit()) < 0)
388  {
389  LOG(VB_GENERAL, LOG_ERR, LOC +
390  QString("Failed to link '%1' to '%2'")
391  .arg(m_mptsBaseFile, fn) + ENO);
392  }
393  else
394  {
395  LOG(VB_RECORD, LOG_INFO, LOC +
396  QString("linked '%1' to '%2'")
397  .arg(m_mptsBaseFile, fn));
398  }
399  }
400 #else
401  Q_UNUSED(file);
402 #endif // !defined( USING_MINGW ) && !defined( _MSC_VER )
403  return true;
404 }
405 
407 {
408 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
409  QMutexLocker lk(&m_mptsLock);
410 
411  QSet<QString>::iterator it = m_mptsFiles.find(file);
412  if (it != m_mptsFiles.end())
413  {
414  m_mptsFiles.erase(it);
415  if (m_mptsFiles.isEmpty())
416  {
417  delete m_mptsTfw;
418  m_mptsTfw = nullptr;
419  }
420  }
421 #else
422  Q_UNUSED(file);
423 #endif // !defined( USING_MINGW ) && !defined( _MSC_VER )
424 }
MythTimer::elapsed
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
MThread::start
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:283
ENO
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:73
streamhandler.h
StreamHandler::RemoveAllPIDFilters
bool RemoveAllPIDFilters(void)
Definition: streamhandler.cpp:240
StreamHandler::SetRunning
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
Definition: streamhandler.cpp:173
StreamHandler::WriteMPTS
void WriteMPTS(const unsigned char *buffer, uint len)
Write out a copy of the raw MPTS.
Definition: streamhandler.cpp:355
StreamHandler::Stop
void Stop(void)
Definition: streamhandler.cpp:155
StreamHandler::CycleFiltersByPriority
virtual void CycleFiltersByPriority()
Definition: streamhandler.h:93
StreamHandler::m_mptsTfw
ThreadedFileWriter * m_mptsTfw
Definition: streamhandler.h:144
MThread::wait
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
PIDInfo
Definition: streamhandler.h:29
MythTimer::isRunning
bool isRunning(void) const
Returns true if start() or restart() has been called at least once since construction and since any c...
Definition: mythtimer.cpp:135
O_LARGEFILE
#define O_LARGEFILE
Definition: streamhandler.cpp:12
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
build_compdb.file
file
Definition: build_compdb.py:55
kPIDPriorityNone
@ kPIDPriorityNone
Definition: mpegstreamdata.h:78
StreamHandler::AddNamedOutputFile
virtual bool AddNamedOutputFile(const QString &filename)
Called with _listener_lock locked just after adding new output file.
Definition: streamhandler.cpp:362
StreamHandler::m_startStopLock
QMutex m_startStopLock
Definition: streamhandler.h:120
StreamHandler::m_filtersChanged
bool m_filtersChanged
Definition: streamhandler.h:141
StreamHandler::GetPIDPriority
PIDPriority GetPIDPriority(uint pid) const
Definition: streamhandler.cpp:343
StreamHandler::RemoveListener
virtual void RemoveListener(MPEGStreamData *data)
Definition: streamhandler.cpp:80
tmp
static guint32 * tmp
Definition: goom_core.cpp:26
StreamHandler::m_openPidFilters
uint m_openPidFilters
Definition: streamhandler.h:140
StreamHandler::m_usingSectionReader
bool m_usingSectionReader
Definition: streamhandler.h:130
StreamHandler::m_needsBuffering
bool m_needsBuffering
Definition: streamhandler.h:115
threadedfilewriter.h
StreamHandler::m_listenerLock
QRecursiveMutex m_listenerLock
Definition: streamhandler.h:153
StreamHandler::m_running
bool m_running
Definition: streamhandler.h:127
StreamHandler::m_pidInfo
PIDInfoMap m_pidInfo
Definition: streamhandler.h:139
StreamHandler::m_pidLock
QRecursiveMutex m_pidLock
Definition: streamhandler.h:136
StreamHandler::SetRunningDesired
virtual void SetRunningDesired(bool desired)
At minimum this sets _running_desired, this may also send signals to anything that might be blocking ...
Definition: streamhandler.cpp:184
StreamHandler::m_mptsLock
QMutex m_mptsLock
Definition: streamhandler.h:147
StreamHandler::m_usingBuffering
bool m_usingBuffering
Definition: streamhandler.h:129
StreamHandler::UpdateFiltersFromStreamData
bool UpdateFiltersFromStreamData(void)
Definition: streamhandler.cpp:290
MPEGStreamData
Encapsulates data about MPEG stream and emits events for each table.
Definition: mpegstreamdata.h:85
StreamHandler::RemoveNamedOutputFile
virtual void RemoveNamedOutputFile(const QString &filename)
Called with _listener_lock locked just before removing old output file.
Definition: streamhandler.cpp:406
uint
unsigned int uint
Definition: compat.h:79
StreamHandler::CreatePIDInfo
virtual PIDInfo * CreatePIDInfo(uint pid, uint stream_type, int pes_type)
Definition: streamhandler.h:101
pid_map_t
QMap< uint, PIDPriority > pid_map_t
Definition: mpegstreamdata.h:83
StreamHandler::~StreamHandler
~StreamHandler() override
Definition: streamhandler.cpp:17
ThreadedFileWriter::Write
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
Definition: threadedfilewriter.cpp:190
StreamHandler::UpdateListeningForEIT
void UpdateListeningForEIT(void)
Definition: streamhandler.cpp:259
StreamHandler::AddPIDFilter
bool AddPIDFilter(PIDInfo *info)
Definition: streamhandler.cpp:191
LOC
#define LOC
Definition: streamhandler.cpp:15
ThreadedFileWriter
This class supports the writing of recordings to disk.
Definition: threadedfilewriter.h:42
StreamHandler::m_mptsFiles
QSet< QString > m_mptsFiles
Definition: streamhandler.h:145
StreamHandler::UpdateFilters
virtual bool UpdateFilters(void)
Definition: streamhandler.h:92
StreamHandler::m_cycleTimer
MythTimer m_cycleTimer
Definition: streamhandler.h:142
PIDInfo::m_pid
uint m_pid
Definition: streamhandler.h:43
StreamHandler::m_runningDesired
volatile bool m_runningDesired
Definition: streamhandler.h:121
StreamHandler::m_runningStateChanged
QWaitCondition m_runningStateChanged
Definition: streamhandler.h:131
StreamHandler::m_mptsBaseFile
QString m_mptsBaseFile
Definition: streamhandler.h:146
StreamHandler::m_streamDataList
StreamDataList m_streamDataList
Definition: streamhandler.h:155
StreamHandler::RemovePIDFilter
bool RemovePIDFilter(uint pid)
Definition: streamhandler.cpp:208
MThread::exit
void exit(int retcode=0)
Use this to exit from the thread if you are using a Qt event loop.
Definition: mthread.cpp:278
MPEGStreamData::GetEITPIDChanges
virtual bool GetEITPIDChanges(const uint_vec_t &, uint_vec_t &, uint_vec_t &) const
Definition: mpegstreamdata.h:105
StreamHandler::m_bError
volatile bool m_bError
Definition: streamhandler.h:126
StreamHandler::m_eitPids
std::vector< uint > m_eitPids
Definition: streamhandler.h:138
StreamHandler::AddListener
virtual void AddListener(MPEGStreamData *data, bool allow_section_reader=false, bool needs_buffering=false, const QString &output_file=QString())
Definition: streamhandler.cpp:35
MPEGStreamData::AddListeningPID
virtual void AddListeningPID(uint pid, PIDPriority priority=kPIDPriorityNormal)
Definition: mpegstreamdata.h:120
ThreadedFileWriter::Open
bool Open(void)
Opens the file we will be writing to.
Definition: threadedfilewriter.cpp:92
StreamHandler::m_addRmLock
QMutex m_addRmLock
Definition: streamhandler.h:118
StreamHandler::m_restarting
bool m_restarting
Definition: streamhandler.h:128
StreamHandler::IsRunning
bool IsRunning(void) const
Definition: streamhandler.cpp:163
MPEGStreamData::HasEITPIDChanges
virtual bool HasEITPIDChanges(const uint_vec_t &) const
Definition: mpegstreamdata.h:103
StreamHandler::m_device
QString m_device
Definition: streamhandler.h:113
StreamHandler::Start
void Start(void)
Definition: streamhandler.cpp:117
PIDPriority
PIDPriority
Definition: mpegstreamdata.h:76
MPEGStreamData::RemoveListeningPID
virtual void RemoveListeningPID(uint pid)
Definition: mpegstreamdata.h:135
StreamID::PrivSec
@ PrivSec
ISO 13818-1 private tables & ITU H.222.0.
Definition: mpegtables.h:146
find
static pid_list_t::iterator find(const PIDInfoMap &map, pid_list_t &list, pid_list_t::iterator begin, pid_list_t::iterator end, bool find_open)
Definition: dvbstreamhandler.cpp:363
StreamHandler::m_allowSectionReader
bool m_allowSectionReader
Definition: streamhandler.h:116