MythTV  master
streamhandler.cpp
Go to the documentation of this file.
1 // -*- Mode: c++ -*-
2 
3 // MythTV headers
4 #include "streamhandler.h"
5 
6 #include "threadedfilewriter.h"
7 #include <utility>
8 
9 #ifndef O_LARGEFILE
10 #define O_LARGEFILE 0
11 #endif
12 
13 #define LOC QString("SH[%1]: ").arg(m_inputId)
14 
// Body of StreamHandler::~StreamHandler(). The cross-reference index places
// the definition at listing line 15; that signature line is absent from this
// listing, so only the body is shown.
//
// Holds m_addRmLock for the whole body, reports an error if listeners are
// still registered, and stops the handler if it is still marked running.
16 {
17  QMutexLocker locker(&m_addRmLock);
18 
19  {
// m_listenerLock is only held for this inner scope (the emptiness check).
20  QMutexLocker locker2(&m_listenerLock);
21  if (!m_streamDataList.empty())
22  {
23  LOG(VB_GENERAL, LOG_ERR, LOC +
24  "dtor & _stream_data_list not empty");
25  }
26  }
27 
28  // This should never be triggered.. just to be safe..
29  if (m_running)
30  Stop();
31 }
32 
// Tail of the StreamHandler::AddListener() signature plus its body.
// NOTE(review): the first signature line (listing line 33, which names the
// function and declares the `data` parameter) is absent from this listing;
// the index gives the declaration as
// AddListener(MPEGStreamData *data, bool allow_section_reader=false,
//             bool needs_buffering=false, const QString &output_file=QString()).
//
// Registers `data` in m_streamDataList with `output_file` as its mapped
// value, merges the section-reader/buffering flags, then calls Start().
// A null `data` is logged as an error and ignored.
34  bool allow_section_reader,
35  bool needs_buffering,
36  const QString& output_file)
37 {
// m_addRmLock is held for the whole call.
38  QMutexLocker locker(&m_addRmLock);
39 
40  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- begin")
41  .arg((uint64_t)data,0,16));
42  if (!data)
43  {
44  LOG(VB_GENERAL, LOG_ERR, LOC +
45  QString("AddListener(0x%1) -- null data")
46  .arg((uint64_t)data,0,16));
47  return;
48  }
49 
// Manual lock; the matching unlock() is below, before Start() is called.
50  m_listenerLock.lock();
51 
52  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- locked")
53  .arg((uint64_t)data,0,16));
54 
// First listener: take its flags as-is. Further listeners: the section
// reader stays allowed only if every listener allows it (&=), and buffering
// is needed as soon as any listener needs it (|=). Both branches update the
// flags under m_startStopLock.
55  if (m_streamDataList.empty())
56  {
57  QMutexLocker locker2(&m_startStopLock);
58  m_allowSectionReader = allow_section_reader;
59  m_needsBuffering = needs_buffering;
60  }
61  else
62  {
63  QMutexLocker locker2(&m_startStopLock);
64  m_allowSectionReader &= allow_section_reader;
65  m_needsBuffering |= needs_buffering;
66  }
67 
// operator[] inserts the entry or overwrites the file name of an existing one.
68  m_streamDataList[data] = output_file;
69 
70  m_listenerLock.unlock();
71 
// Start() is called with m_listenerLock released (m_addRmLock still held).
72  Start();
73 
74  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- end")
75  .arg((uint64_t)data,0,16));
76 }
77 
// Body of StreamHandler::RemoveListener(MPEGStreamData *data) (name and
// parameter taken from the cross-reference index; the signature line,
// listing line 78, is absent from this listing).
//
// Removes `data` from m_streamDataList and calls Stop() once the list is
// empty. A null `data` is logged as an error and ignored.
79 {
// m_addRmLock is held for the whole call.
80  QMutexLocker locker(&m_addRmLock);
81 
82  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- begin")
83  .arg((uint64_t)data,0,16));
84  if (!data)
85  {
86  LOG(VB_GENERAL, LOG_ERR, LOC +
87  QString("RemoveListener(0x%1) -- null data")
88  .arg((uint64_t)data,0,16));
89  return;
90  }
91 
// Manual lock; the matching unlock() is below, after the entry is erased.
92  m_listenerLock.lock();
93 
94  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- locked")
95  .arg((uint64_t)data,0,16));
96 
97  StreamDataList::iterator it = m_streamDataList.find(data);
98 
99  if (it != m_streamDataList.end())
100  {
// NOTE(review): listing line 102 is absent. As printed, erase() becomes the
// body of the isEmpty() test, so entries with an empty file name would never
// be erased. Presumably the missing line is the real `if` body (the index
// describes RemoveNamedOutputFile as "called ... just before removing old
// output file") and erase() is unconditional -- confirm against the original
// file before relying on this listing.
101  if (!(*it).isEmpty())
103  m_streamDataList.erase(it);
104  }
105 
106  m_listenerLock.unlock();
107 
// This emptiness check runs after m_listenerLock was released; only
// m_addRmLock is still held at this point.
108  if (m_streamDataList.empty())
109  Stop();
110 
111  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- end")
112  .arg((uint64_t)data,0,16));
113 }
114 
// Body of StreamHandler::Start(void) (name taken from the cross-reference
// index; the signature line, listing line 115, is absent from this listing).
//
// Optionally restarts an already running handler, then, if not running,
// clears the EIT PID list, resets the error flag, requests running and
// starts the thread via MThread::start().
116 {
117  QMutexLocker locker(&m_startStopLock);
118 
119  if (m_running)
120  {
// NOTE(review): listing lines 121-122 are absent. The inner block below is
// presumably guarded by a condition that decides whether a restart is
// needed -- the condition is not visible here; confirm against the original.
123  {
124  LOG(VB_RECORD, LOG_INFO, LOC + "Restarting StreamHandler");
125  SetRunningDesired(false);
126  m_restarting = true;
// m_startStopLock is released around wait() and re-acquired afterwards;
// m_restarting stays true for that window (IsRunning() reports
// m_running || m_restarting).
127  locker.unlock();
128  wait();
129  locker.relock();
130  m_restarting = false;
131  }
132  }
133 
134  if (m_running)
135  return;
136 
137  m_eitPids.clear();
138 
139  m_bError = false;
140  SetRunningDesired(true);
141  MThread::start();
142 
// NOTE(review): listing line 144 (the loop body) is absent. As printed, the
// `if (m_bError)` statement below would become the body of this while loop.
// Presumably the missing line waits on m_runningStateChanged, which
// SetRunning() wakes -- confirm against the original file.
143  while (!m_running && !m_bError && m_runningDesired)
145 
146  if (m_bError)
147  {
148  LOG(VB_GENERAL, LOG_ERR, LOC + "Start failed");
149  SetRunningDesired(false);
150  }
151 }
152 
// Body of StreamHandler::Stop(void) (name taken from the cross-reference
// index; the signature line, listing line 153, is absent from this listing).
//
// Logs, blocks in wait() (MThread::wait per the index: waits for the thread
// to exit), then logs again.
154 {
155  LOG(VB_RECORD, LOG_DEBUG, LOC + "Stopping");
// NOTE(review): listing line 156 is absent. Presumably it asks the thread to
// stop (e.g. via SetRunningDesired(false)) before the wait below -- confirm
// against the original file.
157  wait();
158  LOG(VB_RECORD, LOG_DEBUG, LOC + "Stopped");
159 }
160 
161 bool StreamHandler::IsRunning(void) const
162 {
163  // This used to use QMutexLocker, but that sometimes left the
164  // mutex locked on exit, so...
165  m_startStopLock.lock();
166  bool r = m_running || m_restarting;
167  m_startStopLock.unlock();
168  return r;
169 }
170 
171 void StreamHandler::SetRunning(bool is_running,
172  bool is_using_buffering,
173  bool is_using_section_reader)
174 {
175  QMutexLocker locker(&m_startStopLock);
176  m_running = is_running;
177  m_usingBuffering = is_using_buffering;
178  m_usingSectionReader = is_using_section_reader;
179  m_runningStateChanged.wakeAll();
180 }
181 
// Body of StreamHandler::SetRunningDesired(bool desired) (name and parameter
// taken from the cross-reference index; the signature line, listing line
// 182, is absent from this listing).
//
// Stores the flag and, when running is no longer desired, calls
// MThread::exit(0). No lock is taken inside this body; the visible callers
// in Start() call it while holding m_startStopLock.
183 {
184  m_runningDesired = desired;
185  if (!desired)
186  MThread::exit(0);
187 }
188 
// Body of StreamHandler::AddPIDFilter(PIDInfo *info) (name and parameter
// taken from the cross-reference index; the signature line, listing line
// 189, is absent from this listing).
//
// Stores `info` in m_pidInfo keyed by info->m_pid and marks the filter set
// as changed. Returns true on the path shown here.
190 {
191 #ifdef DEBUG_PID_FILTERS
192  LOG(VB_RECORD, LOG_DEBUG, LOC + QString("AddPIDFilter(0x%1)")
193  .arg(info->m_pid, 0, 16));
194 #endif // DEBUG_PID_FILTERS
195 
196  QMutexLocker writing_locker(&m_pidLock);
// operator[] replaces any PIDInfo pointer already stored for this PID; the
// previous pointer is not deleted in this body.
197  m_pidInfo[info->m_pid] = info;
198 
199  m_filtersChanged = true;
200 
// NOTE(review): listing line 201 is absent; whatever statement sits between
// setting m_filtersChanged and the return is not visible here -- confirm
// against the original file.
202 
203  return true;
204 }
205 
// Body of StreamHandler::RemovePIDFilter(uint pid) (name and parameter taken
// from the cross-reference index; the signature line, listing line 206, is
// absent from this listing).
//
// Removes the PIDInfo registered for `pid`, closes it if it is open, and
// deletes it. Returns false if `pid` is not registered; otherwise returns
// the result of Close() (true if the filter was not open).
207 {
208 #ifdef DEBUG_PID_FILTERS
209  LOG(VB_RECORD, LOG_DEBUG, LOC +
210  QString("RemovePIDFilter(0x%1)").arg(pid, 0, 16));
211 #endif // DEBUG_PID_FILTERS
212 
213  QMutexLocker write_locker(&m_pidLock);
214 
215  PIDInfoMap::iterator it = m_pidInfo.find(pid);
216  if (it == m_pidInfo.end())
217  return false;
218 
// Take the pointer out of the map first; the object is deleted below.
219  PIDInfo *tmp = *it;
220  m_pidInfo.erase(it);
221 
222  bool ok = true;
223  if (tmp->IsOpen())
224  {
225  ok = tmp->Close(m_device);
// NOTE(review): listing lines 226 and 228 are absent; any statements that
// follow Close() inside this branch are not visible here -- confirm against
// the original file.
227 
229  }
230 
231  delete tmp;
232 
233  m_filtersChanged = true;
234 
235  return ok;
236 }
237 
// Body of StreamHandler::RemoveAllPIDFilters(void) (name taken from the
// cross-reference index; the signature line, listing line 238, is absent
// from this listing).
//
// Removes every registered PID filter, then calls UpdateFilters(). Returns
// true only if every removal and UpdateFilters() succeeded.
239 {
// m_pidLock is declared as a QRecursiveMutex (see index); RemovePIDFilter()
// locks it again while this locker is still held.
240  QMutexLocker write_locker(&m_pidLock);
241 
242 #ifdef DEBUG_PID_FILTERS
243  LOG(VB_RECORD, LOG_DEBUG, LOC + "RemoveAllPIDFilters()");
244 #endif // DEBUG_PID_FILTERS
245 
// Snapshot the keys first: RemovePIDFilter() erases from m_pidInfo, so the
// map is not iterated while it is being modified.
246  std::vector<int> del_pids;
247  for (auto it = m_pidInfo.begin(); it != m_pidInfo.end(); ++it)
248  del_pids.push_back(it.key());
249 
250  bool ok = true;
251  for (int & pid : del_pids)
252  ok &= RemovePIDFilter(pid);
253 
// UpdateFilters() is the left operand, so it runs even when a removal failed.
254  return UpdateFilters() && ok;
255 }
256 
// Body of StreamHandler::UpdateListeningForEIT(void) (name taken from the
// cross-reference index; the signature line, listing line 257, is absent
// from this listing).
//
// For each registered stream data object, asks it for EIT PID changes
// relative to m_eitPids and applies them to both m_eitPids and that object's
// listening PIDs.
258 {
// The locker is named read_locker, but it is a plain QMutexLocker on
// m_listenerLock, held for the whole loop.
259  QMutexLocker read_locker(&m_listenerLock);
260 
261  for (auto it1 = m_streamDataList.cbegin(); it1 != m_streamDataList.cend(); ++it1)
262  {
263  std::vector<uint> add_eit;
264  std::vector<uint> del_eit;
265 
266  MPEGStreamData *sd = it1.key();
// GetEITPIDChanges() is only called when HasEITPIDChanges() returned true.
267  if (sd->HasEITPIDChanges(m_eitPids) &&
268  sd->GetEITPIDChanges(m_eitPids, add_eit, del_eit))
269  {
270  for (uint eit : del_eit)
271  {
// Drop the first matching entry from m_eitPids if present; the stream data
// object is told to stop listening whether or not it was found.
272  uint_vec_t::iterator it2;
273  it2 = find(m_eitPids.begin(), m_eitPids.end(), eit);
274  if (it2 != m_eitPids.end())
275  m_eitPids.erase(it2);
276  sd->RemoveListeningPID(eit);
277  }
278 
279  for (uint eit : add_eit)
280  {
// No duplicate check is made here before appending to m_eitPids.
281  m_eitPids.push_back(eit);
282  sd->AddListeningPID(eit);
283  }
284  }
285  }
286 }
287 
// Body of StreamHandler::UpdateFiltersFromStreamData(void) (name taken from
// the cross-reference index; the signature line, listing line 288, is absent
// from this listing).
//
// Collects the PIDs wanted by all registered stream data objects, compares
// them with the filters in m_pidInfo, removes filters that are no longer
// wanted and adds filters for newly wanted PIDs. Returns the combined
// success of those removals and additions.
289 {
// NOTE(review): listing line 290 is absent; the first statement of the body
// is not visible here -- confirm against the original file.
291 
292  pid_map_t pids;
293 
294  {
// m_listenerLock is only held while the wanted PIDs are gathered.
295  QMutexLocker read_locker(&m_listenerLock);
296  for (auto it = m_streamDataList.cbegin(); it != m_streamDataList.cend(); ++it)
297  it.key()->GetPIDs(pids);
298  }
299 
300  QMap<uint, PIDInfo*> add_pids;
301  std::vector<uint> del_pids;
302 
303  {
// m_pidLock is held while the add/remove sets are computed; it is released
// before RemovePIDFilter()/AddPIDFilter() are called below.
304  QMutexLocker read_locker(&m_pidLock);
305 
306  // PIDs that need to be added..
307  for (auto lit = pids.constBegin(); lit != pids.constEnd(); ++lit)
308  {
// *lit is the mapped value (pid_map_t is QMap<uint, PIDPriority> per the
// index); entries whose value is zero are skipped.
309  if ((*lit != 0U) && (m_pidInfo.find(lit.key()) == m_pidInfo.end()))
310  {
311  add_pids[lit.key()] = CreatePIDInfo(
312  lit.key(), StreamID::PrivSec, 0);
313  }
314  }
315 
316  // PIDs that need to be removed..
317  for (auto fit = m_pidInfo.cbegin(); fit != m_pidInfo.cend(); ++fit)
318  {
// Only key presence is tested here, so a PID present in `pids` with a zero
// value is neither added above nor removed here.
319  bool in_pids = pids.find(fit.key()) != pids.end();
320  if (!in_pids)
321  del_pids.push_back(fit.key());
322  }
323  }
324 
325  // Remove PIDs
326  bool ok = true;
327  for (uint & pid : del_pids)
328  ok &= RemovePIDFilter(pid);
329 
330  // Add PIDs
// Each element passed to AddPIDFilter() is a PIDInfo* created above.
331  for (auto & pid : add_pids)
332  ok &= AddPIDFilter(pid);
333 
334  // Cycle filters if it's been a while
// NOTE(review): listing line 336 (the body of this `if`) is absent. As
// printed, `return ok;` would become the conditional body. Presumably the
// missing line calls CycleFiltersByPriority() (listed in the index) --
// confirm against the original file.
335  if (m_cycleTimer.isRunning() && (m_cycleTimer.elapsed() > 1s))
337 
338  return ok;
339 }
340 
// Body of StreamHandler::GetPIDPriority(uint pid) const (name, parameter and
// PIDPriority return type taken from the cross-reference index; the
// signature line, listing line 341, is absent from this listing).
//
// Returns the highest priority that any registered stream data object
// reports for `pid`.
342 {
343  QMutexLocker reading_locker(&m_listenerLock);
344 
// NOTE(review): listing line 345 is absent; it presumably declares and
// initialises `tmp`, which is used below without a visible declaration (the
// index lists kPIDPriorityNone) -- confirm against the original file.
346 
347  for (auto it = m_streamDataList.cbegin(); it != m_streamDataList.cend(); ++it)
348  tmp = std::max(tmp, it.key()->GetPIDPriority(pid));
349 
350  return tmp;
351 }
352 
353 void StreamHandler::WriteMPTS(const unsigned char * buffer, uint len)
354 {
355  if (m_mptsTfw == nullptr)
356  return;
357  m_mptsTfw->Write(buffer, len);
358 }
359 
// Body of StreamHandler::AddNamedOutputFile() (name taken from the
// cross-reference index, which declares the parameter as
// `const QString &filename`; the body refers to it as `file`. The signature
// line, listing line 360, is absent from this listing).
//
// Outside MinGW/MSVC builds: records `file` in m_mptsFiles. The first
// recorded file opens a ThreadedFileWriter on "<file>.raw"; each later file
// gets a hard link from the base file to its own "<file>.raw".
// Returns false only when opening the writer fails. A failed link() is
// logged but the function still returns true. On MinGW/MSVC the body is
// compiled out and the function just returns true.
361 {
362 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
363  QMutexLocker lk(&m_mptsLock);
364 
365  m_mptsFiles.insert(file);
366  QString fn = QString("%1.raw").arg(file);
367 
368  if (m_mptsFiles.size() == 1)
369  {
370  m_mptsBaseFile = fn;
371  m_mptsTfw = new ThreadedFileWriter(fn,
372  O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE,
373  0644);
374  if (!m_mptsTfw->Open())
375  {
// NOTE(review): on this failure path `file` stays in m_mptsFiles (it was
// inserted above and is not removed here), so a later call would take the
// link() branch while m_mptsTfw is null -- verify whether that is intended.
376  delete m_mptsTfw;
377  m_mptsTfw = nullptr;
378  return false;
379  }
380  LOG(VB_RECORD, LOG_INFO, LOC +
381  QString("Opened '%1'").arg(m_mptsBaseFile));
382  }
383  else
384  {
385  if (link(m_mptsBaseFile.toLocal8Bit(), fn.toLocal8Bit()) < 0)
386  {
387  LOG(VB_GENERAL, LOG_ERR, LOC +
388  QString("Failed to link '%1' to '%2'")
389  .arg(m_mptsBaseFile, fn) + ENO);
390  }
391  else
392  {
393  LOG(VB_RECORD, LOG_INFO, LOC +
394  QString("linked '%1' to '%2'")
395  .arg(m_mptsBaseFile, fn));
396  }
397  }
398 
399 #endif // !defined( USING_MINGW ) && !defined( _MSC_VER )
400  return true;
401 }
402 
// Body of StreamHandler::RemoveNamedOutputFile() (name taken from the
// cross-reference index, which declares the parameter as
// `const QString &filename`; the body refers to it as `file`. The signature
// line, listing line 403, is absent from this listing).
//
// Outside MinGW/MSVC builds: forgets `file` and, once no files remain,
// deletes the ThreadedFileWriter and resets m_mptsTfw to nullptr. Unknown
// file names are ignored. Nothing in this body removes the "<file>.raw"
// file or link from disk.
404 {
405 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
406  QMutexLocker lk(&m_mptsLock);
407 
408  QSet<QString>::iterator it = m_mptsFiles.find(file);
409  if (it != m_mptsFiles.end())
410  {
411  m_mptsFiles.erase(it);
// The writer is torn down only when the last recorded file is removed.
412  if (m_mptsFiles.isEmpty())
413  {
414  delete m_mptsTfw;
415  m_mptsTfw = nullptr;
416  }
417  }
418 #endif // !defined( USING_MINGW ) && !defined( _MSC_VER )
419 }
MythTimer::elapsed
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
MThread::start
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:286
ENO
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:72
streamhandler.h
StreamHandler::RemoveAllPIDFilters
bool RemoveAllPIDFilters(void)
Definition: streamhandler.cpp:238
StreamHandler::SetRunning
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
Definition: streamhandler.cpp:171
StreamHandler::WriteMPTS
void WriteMPTS(const unsigned char *buffer, uint len)
Write out a copy of the raw MPTS.
Definition: streamhandler.cpp:353
StreamHandler::Stop
void Stop(void)
Definition: streamhandler.cpp:153
StreamHandler::CycleFiltersByPriority
virtual void CycleFiltersByPriority()
Definition: streamhandler.h:92
StreamHandler::m_mptsTfw
ThreadedFileWriter * m_mptsTfw
Definition: streamhandler.h:143
MThread::wait
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:303
PIDInfo
Definition: streamhandler.h:28
MythTimer::isRunning
bool isRunning(void) const
Returns true if start() or restart() has been called at least once since construction and since any c...
Definition: mythtimer.cpp:135
O_LARGEFILE
#define O_LARGEFILE
Definition: streamhandler.cpp:10
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:23
build_compdb.file
file
Definition: build_compdb.py:55
kPIDPriorityNone
@ kPIDPriorityNone
Definition: mpegstreamdata.h:77
StreamHandler::AddNamedOutputFile
virtual bool AddNamedOutputFile(const QString &filename)
Called with _listener_lock locked just after adding new output file.
Definition: streamhandler.cpp:360
StreamHandler::m_startStopLock
QMutex m_startStopLock
Definition: streamhandler.h:119
StreamHandler::m_filtersChanged
bool m_filtersChanged
Definition: streamhandler.h:140
StreamHandler::GetPIDPriority
PIDPriority GetPIDPriority(uint pid) const
Definition: streamhandler.cpp:341
StreamHandler::RemoveListener
virtual void RemoveListener(MPEGStreamData *data)
Definition: streamhandler.cpp:78
tmp
static guint32 * tmp
Definition: goom_core.cpp:31
StreamHandler::m_openPidFilters
uint m_openPidFilters
Definition: streamhandler.h:139
StreamHandler::m_usingSectionReader
bool m_usingSectionReader
Definition: streamhandler.h:129
StreamHandler::m_needsBuffering
bool m_needsBuffering
Definition: streamhandler.h:114
threadedfilewriter.h
StreamHandler::m_listenerLock
QRecursiveMutex m_listenerLock
Definition: streamhandler.h:152
StreamHandler::m_running
bool m_running
Definition: streamhandler.h:126
StreamHandler::m_pidInfo
PIDInfoMap m_pidInfo
Definition: streamhandler.h:138
StreamHandler::m_pidLock
QRecursiveMutex m_pidLock
Definition: streamhandler.h:135
StreamHandler::SetRunningDesired
virtual void SetRunningDesired(bool desired)
At minimum this sets _running_desired, this may also send signals to anything that might be blocking ...
Definition: streamhandler.cpp:182
StreamHandler::m_mptsLock
QMutex m_mptsLock
Definition: streamhandler.h:146
StreamHandler::m_usingBuffering
bool m_usingBuffering
Definition: streamhandler.h:128
StreamHandler::UpdateFiltersFromStreamData
bool UpdateFiltersFromStreamData(void)
Definition: streamhandler.cpp:288
MPEGStreamData
Encapsulates data about MPEG stream and emits events for each table.
Definition: mpegstreamdata.h:84
StreamHandler::RemoveNamedOutputFile
virtual void RemoveNamedOutputFile(const QString &filename)
Called with _listener_lock locked just before removing old output file.
Definition: streamhandler.cpp:403
uint
unsigned int uint
Definition: compat.h:144
StreamHandler::CreatePIDInfo
virtual PIDInfo * CreatePIDInfo(uint pid, uint stream_type, int pes_type)
Definition: streamhandler.h:100
pid_map_t
QMap< uint, PIDPriority > pid_map_t
Definition: mpegstreamdata.h:82
StreamHandler::~StreamHandler
~StreamHandler() override
Definition: streamhandler.cpp:15
ThreadedFileWriter::Write
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
Definition: threadedfilewriter.cpp:190
StreamHandler::UpdateListeningForEIT
void UpdateListeningForEIT(void)
Definition: streamhandler.cpp:257
StreamHandler::AddPIDFilter
bool AddPIDFilter(PIDInfo *info)
Definition: streamhandler.cpp:189
LOC
#define LOC
Definition: streamhandler.cpp:13
ThreadedFileWriter
This class supports the writing of recordings to disk.
Definition: threadedfilewriter.h:42
StreamHandler::m_mptsFiles
QSet< QString > m_mptsFiles
Definition: streamhandler.h:144
StreamHandler::UpdateFilters
virtual bool UpdateFilters(void)
Definition: streamhandler.h:91
StreamHandler::m_cycleTimer
MythTimer m_cycleTimer
Definition: streamhandler.h:141
PIDInfo::m_pid
uint m_pid
Definition: streamhandler.h:42
StreamHandler::m_runningDesired
volatile bool m_runningDesired
Definition: streamhandler.h:120
StreamHandler::m_runningStateChanged
QWaitCondition m_runningStateChanged
Definition: streamhandler.h:130
StreamHandler::m_mptsBaseFile
QString m_mptsBaseFile
Definition: streamhandler.h:145
StreamHandler::m_streamDataList
StreamDataList m_streamDataList
Definition: streamhandler.h:154
StreamHandler::RemovePIDFilter
bool RemovePIDFilter(uint pid)
Definition: streamhandler.cpp:206
MThread::exit
void exit(int retcode=0)
Use this to exit from the thread if you are using a Qt event loop.
Definition: mthread.cpp:281
MPEGStreamData::GetEITPIDChanges
virtual bool GetEITPIDChanges(const uint_vec_t &, uint_vec_t &, uint_vec_t &) const
Definition: mpegstreamdata.h:104
StreamHandler::m_bError
volatile bool m_bError
Definition: streamhandler.h:125
StreamHandler::m_eitPids
std::vector< uint > m_eitPids
Definition: streamhandler.h:137
StreamHandler::AddListener
virtual void AddListener(MPEGStreamData *data, bool allow_section_reader=false, bool needs_buffering=false, const QString &output_file=QString())
Definition: streamhandler.cpp:33
MPEGStreamData::AddListeningPID
virtual void AddListeningPID(uint pid, PIDPriority priority=kPIDPriorityNormal)
Definition: mpegstreamdata.h:119
ThreadedFileWriter::Open
bool Open(void)
Opens the file we will be writing to.
Definition: threadedfilewriter.cpp:92
StreamHandler::m_addRmLock
QMutex m_addRmLock
Definition: streamhandler.h:117
StreamHandler::m_restarting
bool m_restarting
Definition: streamhandler.h:127
StreamHandler::IsRunning
bool IsRunning(void) const
Definition: streamhandler.cpp:161
MPEGStreamData::HasEITPIDChanges
virtual bool HasEITPIDChanges(const uint_vec_t &) const
Definition: mpegstreamdata.h:102
StreamHandler::m_device
QString m_device
Definition: streamhandler.h:112
StreamHandler::Start
void Start(void)
Definition: streamhandler.cpp:115
PIDPriority
PIDPriority
Definition: mpegstreamdata.h:75
MPEGStreamData::RemoveListeningPID
virtual void RemoveListeningPID(uint pid)
Definition: mpegstreamdata.h:134
StreamID::PrivSec
@ PrivSec
ISO 13818-1 private tables & ITU H.222.0.
Definition: mpegtables.h:146
find
static pid_list_t::iterator find(const PIDInfoMap &map, pid_list_t &list, pid_list_t::iterator begin, pid_list_t::iterator end, bool find_open)
Definition: dvbstreamhandler.cpp:356
StreamHandler::m_allowSectionReader
bool m_allowSectionReader
Definition: streamhandler.h:115