MythTV  master
streamhandler.cpp
Go to the documentation of this file.
1 // -*- Mode: c++ -*-
2 
3 // MythTV headers
4 #include "streamhandler.h"
5 
6 #include "threadedfilewriter.h"
7 #include <utility>
8 
9 #ifndef O_LARGEFILE
10 #define O_LARGEFILE 0
11 #endif
12 
13 #define LOC QString("SH[%1](%2): ").arg(m_inputId).arg(m_device)
14 
// StreamHandler::~StreamHandler() -- destructor body.
// NOTE(review): the signature line (listing line 15) is missing from this
// extract; the name comes from the cross-reference index at the end of the
// page, which maps streamhandler.cpp:15 to ~StreamHandler.
//
// Holds m_addRmLock for the whole body, logs an error if any listener is
// still present in m_streamDataList, and calls Stop() if m_running is set.
16 {
17  QMutexLocker locker(&m_addRmLock);
18 
    // Nested scope: m_listenerLock is held only for this check and is
    // released before the m_running test below.
19  {
20  QMutexLocker locker2(&m_listenerLock);
21  if (!m_streamDataList.empty())
22  {
23  LOG(VB_GENERAL, LOG_ERR, LOC +
24  "dtor & _stream_data_list not empty");
25  }
26  }
27 
28  // This should never be triggered.. just to be safe..
29  if (m_running)
30  Stop();
31 }
32 
// StreamHandler::AddListener -- registers `data` as a listener.
// NOTE(review): the first signature line (listing line 33, which declares the
// `data` parameter) is missing from this extract; the index at the end of the
// page gives the full prototype as
//   AddListener(MPEGStreamData *data, bool allow_section_reader,
//               bool needs_buffering, QString output_file).
//
// data                  listener to register; a null pointer is logged as an
//                       error and the call returns without doing anything.
// allow_section_reader  copied into m_allowSectionReader for the first
//                       listener, AND-ed into it for later ones.
// needs_buffering       copied into m_needsBuffering for the first listener,
//                       OR-ed into it for later ones.
// output_file           stored (moved) as the map value for `data`.
//
// m_addRmLock is held for the whole call; Start() is called on every
// successful registration.
34  bool allow_section_reader,
35  bool needs_buffering,
36  QString output_file)
37 {
38  QMutexLocker locker(&m_addRmLock);
39 
40  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- begin")
41  .arg((uint64_t)data,0,16));
42  if (!data)
43  {
44  LOG(VB_GENERAL, LOG_ERR, LOC +
45  QString("AddListener(0x%1) -- null data")
46  .arg((uint64_t)data,0,16));
47  return;
48  }
49 
    // Locked by hand (not via QMutexLocker); the matching unlock() is below,
    // before Start() is called.
50  m_listenerLock.lock();
51 
52  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- locked")
53  .arg((uint64_t)data,0,16));
54 
    // The two capability flags are written under m_startStopLock. The first
    // listener sets them outright; later listeners can only clear
    // m_allowSectionReader and can only set m_needsBuffering.
55  if (m_streamDataList.empty())
56  {
57  QMutexLocker locker2(&m_startStopLock);
58  m_allowSectionReader = allow_section_reader;
59  m_needsBuffering = needs_buffering;
60  }
61  else
62  {
63  QMutexLocker locker2(&m_startStopLock);
64  m_allowSectionReader &= allow_section_reader;
65  m_needsBuffering |= needs_buffering;
66  }
67 
    // Inserts `data`, or overwrites the stored file name if it is already
    // registered.
68  m_streamDataList[data] = std::move(output_file);
69 
70  m_listenerLock.unlock();
71 
    // Called with m_listenerLock released and m_addRmLock still held.
72  Start();
73 
74  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- end")
75  .arg((uint64_t)data,0,16));
76 }
77 
// StreamHandler::RemoveListener -- unregisters `data`.
// NOTE(review): the signature line (listing line 78) is missing from this
// extract; the index gives it as RemoveListener(MPEGStreamData *data).
//
// A null `data` is logged as an error and ignored. Otherwise the entry for
// `data` (if any) is erased from m_streamDataList, and Stop() is called when
// the list is empty afterwards. m_addRmLock is held for the whole call.
79 {
80  QMutexLocker locker(&m_addRmLock);
81 
82  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- begin")
83  .arg((uint64_t)data,0,16));
84  if (!data)
85  {
86  LOG(VB_GENERAL, LOG_ERR, LOC +
87  QString("RemoveListener(0x%1) -- null data")
88  .arg((uint64_t)data,0,16));
89  return;
90  }
91 
    // Locked by hand; the matching unlock() follows the erase below.
92  m_listenerLock.lock();
93 
94  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- locked")
95  .arg((uint64_t)data,0,16));
96 
97  StreamDataList::iterator it = m_streamDataList.find(data);
98 
99  if (it != m_streamDataList.end())
100  {
    // NOTE(review): listing line 102 is missing from this extract, so the
    // statement guarded by this `if` is not visible. Presumably it hands the
    // non-empty file name to RemoveNamedOutputFile() -- confirm against the
    // real file. As extracted, the `if` would appear to guard the erase.
101  if (!(*it).isEmpty())
103  m_streamDataList.erase(it);
104  }
105 
106  m_listenerLock.unlock();
107 
    // This emptiness check runs after m_listenerLock was released; only
    // m_addRmLock is still held here.
108  if (m_streamDataList.empty())
109  Stop();
110 
111  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- end")
112  .arg((uint64_t)data,0,16));
113 }
114 
// StreamHandler::Start -- starts the handler thread, restarting it first when
// required.
// NOTE(review): the signature line (listing line 115) is missing from this
// extract; the index gives it as `void Start(void)`. Listing lines 121-122
// and 144 are also missing, see the notes below.
//
// Runs under m_startStopLock. Returns early if the handler is (still)
// running; otherwise clears m_eitPids and m_bError, requests running, starts
// the MThread, and logs "Start failed" if m_bError ends up set.
116 {
117  QMutexLocker locker(&m_startStopLock);
118 
119  if (m_running)
120  {
    // NOTE(review): listing lines 121-122 are missing; they presumably hold
    // the `if (...)` condition that decides whether a restart is needed and
    // that owns the braces below -- confirm against the real file.
123  {
124  LOG(VB_RECORD, LOG_INFO, LOC + "Restarting StreamHandler");
125  SetRunningDesired(false);
126  m_restarting = true;
    // m_startStopLock is dropped around wait() and re-acquired afterwards.
    // m_restarting stays true for that window, so IsRunning() keeps
    // returning true while the old thread exits.
127  locker.unlock();
128  wait();
129  locker.relock();
130  m_restarting = false;
131  }
132  }
133 
    // Still running (no restart was performed): nothing more to do.
134  if (m_running)
135  return;
136 
137  m_eitPids.clear();
138 
139  m_bError = false;
140  SetRunningDesired(true);
141  MThread::start();
142 
    // NOTE(review): listing line 144, the body of this loop, is missing from
    // the extract. Presumably it waits on m_runningStateChanged (which
    // SetRunning() wakes) -- confirm against the real file. As extracted, the
    // `while` would take the following `if` as its body.
143  while (!m_running && !m_bError && m_runningDesired)
145 
146  if (m_bError)
147  {
148  LOG(VB_GENERAL, LOG_ERR, LOC + "Start failed");
149  SetRunningDesired(false);
150  }
151 }
152 
// StreamHandler::Stop -- waits for the handler thread to exit, logging before
// and after.
// NOTE(review): the signature line (listing line 153) is missing from this
// extract; the index gives it as `void Stop(void)`. Listing line 156 is also
// missing; presumably it asks the thread to stop (e.g. via
// SetRunningDesired(false)) before the wait() -- confirm against the real
// file.
154 {
155  LOG(VB_RECORD, LOG_DEBUG, LOC + "Stopping");
157  wait();
158  LOG(VB_RECORD, LOG_DEBUG, LOC + "Stopped");
159 }
160 
161 bool StreamHandler::IsRunning(void) const
162 {
163  // This used to use QMutexLocker, but that sometimes left the
164  // mutex locked on exit, so...
165  m_startStopLock.lock();
166  bool r = m_running || m_restarting;
167  m_startStopLock.unlock();
168  return r;
169 }
170 
171 void StreamHandler::SetRunning(bool is_running,
172  bool is_using_buffering,
173  bool is_using_section_reader)
174 {
175  QMutexLocker locker(&m_startStopLock);
176  m_running = is_running;
177  m_usingBuffering = is_using_buffering;
178  m_usingSectionReader = is_using_section_reader;
179  m_runningStateChanged.wakeAll();
180 }
181 
// StreamHandler::SetRunningDesired -- records whether the handler thread
// should keep running.
// NOTE(review): the signature line (listing line 182) is missing from this
// extract; the index gives it as `virtual void SetRunningDesired(bool desired)`.
//
// Stores `desired` in m_runningDesired. When `desired` is false it also calls
// MThread::exit(0), which the index describes as the way to exit a thread
// that uses a Qt event loop.
183 {
184  m_runningDesired = desired;
185  if (!desired)
186  MThread::exit(0);
187 }
188 
// StreamHandler::AddPIDFilter -- records `info` in m_pidInfo under its PID.
// NOTE(review): the signature line (listing line 189) is missing from this
// extract; the index gives it as `bool AddPIDFilter(PIDInfo *info)`.
//
// Always returns true in the visible code. `info` is dereferenced without a
// null check.
190 {
191 #ifdef DEBUG_PID_FILTERS
192  LOG(VB_RECORD, LOG_DEBUG, LOC + QString("AddPIDFilter(0x%1)")
193  .arg(info->m_pid, 0, 16));
194 #endif // DEBUG_PID_FILTERS
195 
196  QMutexLocker writing_locker(&m_pidLock);
    // Overwrites any pointer already stored for this PID; the previous
    // PIDInfo, if there was one, is not deleted here.
197  m_pidInfo[info->m_pid] = info;
198 
    // NOTE(review): listing line 199 is missing from this extract, so one
    // statement that runs under m_pidLock is not visible -- confirm against
    // the real file.
200 
201  return true;
202 }
203 
// StreamHandler::RemovePIDFilter -- removes and destroys the filter for `pid`.
// NOTE(review): the signature line (listing line 204) is missing from this
// extract; the index gives it as `bool RemovePIDFilter(uint pid)`.
//
// Returns false if `pid` has no entry in m_pidInfo. Otherwise the entry is
// erased, the PIDInfo is closed on m_device if it was open, and it is then
// deleted. The return value is the result of Close(), or true if the filter
// was not open.
205 {
206 #ifdef DEBUG_PID_FILTERS
207  LOG(VB_RECORD, LOG_DEBUG, LOC +
208  QString("RemovePIDFilter(0x%1)").arg(pid, 0, 16));
209 #endif // DEBUG_PID_FILTERS
210 
211  QMutexLocker write_locker(&m_pidLock);
212 
213  PIDInfoMap::iterator it = m_pidInfo.find(pid);
214  if (it == m_pidInfo.end())
215  return false;
216 
    // Take the pointer out of the map first; it is deleted at the end of the
    // function.
217  PIDInfo *tmp = *it;
218  m_pidInfo.erase(it);
219 
220  bool ok = true;
221  if (tmp->IsOpen())
222  {
223  ok = tmp->Close(m_device);
    // NOTE(review): listing lines 224 and 226 are missing from this extract.
    // The index lists m_openPidFilters and CycleFiltersByPriority(), neither
    // of which appears in the visible code, so presumably the open-filter
    // count is adjusted and the filters re-cycled here -- confirm against the
    // real file.
225 
227  }
228 
229  delete tmp;
230 
231  return ok;
232 }
233 
// StreamHandler::RemoveAllPIDFilters -- removes every filter in m_pidInfo.
// NOTE(review): the signature line (listing line 234) is missing from this
// extract; the index gives it as `bool RemoveAllPIDFilters(void)`.
//
// Returns true only if UpdateFilters() succeeds and every individual
// RemovePIDFilter() call succeeded.
235 {
    // NOTE(review): RemovePIDFilter(), called below, locks m_pidLock again on
    // this same thread. That only works if m_pidLock is a recursive mutex;
    // the index lists it simply as `QMutex m_pidLock` -- confirm how it is
    // constructed.
236  QMutexLocker write_locker(&m_pidLock);
237 
238 #ifdef DEBUG_PID_FILTERS
239  LOG(VB_RECORD, LOG_DEBUG, LOC + "RemoveAllPIDFilters()");
240 #endif // DEBUG_PID_FILTERS
241 
    // Snapshot the keys first: RemovePIDFilter() erases from m_pidInfo, so the
    // map is not modified while it is being iterated.
242  vector<int> del_pids;
243  for (auto it = m_pidInfo.begin(); it != m_pidInfo.end(); ++it)
244  del_pids.push_back(it.key());
245 
246  bool ok = true;
247  for (int & pid : del_pids)
248  ok &= RemovePIDFilter(pid);
249 
    // UpdateFilters() is the left operand, so it is called even when a
    // removal above failed.
250  return UpdateFilters() && ok;
251 }
252 
// StreamHandler::UpdateListeningForEIT -- brings m_eitPids and each listener's
// listening PIDs in line with the EIT PID changes the listeners report.
// NOTE(review): the signature line (listing line 253) is missing from this
// extract; the index gives it as `void UpdateListeningForEIT(void)`.
//
// Holds m_listenerLock for the whole walk over m_streamDataList.
254 {
    // NOTE(review): these two vectors are shared by every loop iteration and
    // are not cleared here between listeners. Whether GetEITPIDChanges()
    // resets them is not visible in this file -- confirm.
255  vector<uint> add_eit;
256  vector<uint> del_eit;
257 
258  QMutexLocker read_locker(&m_listenerLock);
259 
260  for (auto it1 = m_streamDataList.cbegin(); it1 != m_streamDataList.cend(); ++it1)
261  {
262  MPEGStreamData *sd = it1.key();
    // HasEITPIDChanges() is tested first; GetEITPIDChanges() is only called
    // (and add_eit / del_eit only consulted) when it returns true.
263  if (sd->HasEITPIDChanges(m_eitPids) &&
264  sd->GetEITPIDChanges(m_eitPids, add_eit, del_eit))
265  {
    // Drop each removed PID from m_eitPids (first match only) and tell this
    // listener to stop listening to it.
266  for (uint eit : del_eit)
267  {
268  uint_vec_t::iterator it2;
269  it2 = find(m_eitPids.begin(), m_eitPids.end(), eit);
270  if (it2 != m_eitPids.end())
271  m_eitPids.erase(it2);
272  sd->RemoveListeningPID(eit);
273  }
274 
    // Append each new PID to m_eitPids (no duplicate check here) and have this
    // listener start listening to it.
275  for (uint eit : add_eit)
276  {
277  m_eitPids.push_back(eit);
278  sd->AddListeningPID(eit);
279  }
280  }
281  }
282 }
283 
// StreamHandler::UpdateFiltersFromStreamData -- reconciles m_pidInfo with the
// set of PIDs the listeners currently want.
// NOTE(review): the signature line (listing line 284) is missing from this
// extract; the index gives it as `bool UpdateFiltersFromStreamData(void)`.
// Listing lines 286 and 332 are also missing, see the notes below.
//
// Returns true only if every RemovePIDFilter() / AddPIDFilter() call made
// here succeeded.
285 {
    // NOTE(review): listing line 286 is missing from this extract. Presumably
    // it calls UpdateListeningForEIT() before the PIDs are collected --
    // confirm against the real file.
287 
288  pid_map_t pids;
289 
    // Gather the wanted PIDs from every listener, under m_listenerLock only.
290  {
291  QMutexLocker read_locker(&m_listenerLock);
292  for (auto it = m_streamDataList.cbegin(); it != m_streamDataList.cend(); ++it)
293  it.key()->GetPIDs(pids);
294  }
295 
296  QMap<uint, PIDInfo*> add_pids;
297  vector<uint> del_pids;
298 
    // Work out the differences under m_pidLock. The lock is released again
    // before the Remove/Add calls below, which each take it themselves.
299  {
300  QMutexLocker read_locker(&m_pidLock);
301 
302  // PIDs that need to be added..
303  for (auto lit = pids.constBegin(); lit != pids.constEnd(); ++lit)
304  {
    // *lit is the map value (a PIDPriority, per the pid_map_t typedef in the
    // index); entries whose value is zero are skipped.
305  if ((*lit != 0U) && (m_pidInfo.find(lit.key()) == m_pidInfo.end()))
306  {
307  add_pids[lit.key()] = CreatePIDInfo(
308  lit.key(), StreamID::PrivSec, 0);
309  }
310  }
311 
312  // PIDs that need to be removed..
    // Membership alone decides removal: a PID that is present in `pids` with
    // a zero value is neither added above nor removed here.
313  for (auto fit = m_pidInfo.cbegin(); fit != m_pidInfo.cend(); ++fit)
314  {
315  bool in_pids = pids.find(fit.key()) != pids.end();
316  if (!in_pids)
317  del_pids.push_back(fit.key());
318  }
319  }
320 
321  // Remove PIDs
322  bool ok = true;
323  for (uint & pid : del_pids)
324  ok &= RemovePIDFilter(pid);
325 
326  // Add PIDs
    // Despite its name, `pid` is a PIDInfo* here: range-for over a QMap
    // yields the mapped values, and AddPIDFilter() takes a PIDInfo*.
327  for (auto & pid : add_pids)
328  ok &= AddPIDFilter(pid);
329 
330  // Cycle filters if it's been a while
    // NOTE(review): listing line 332, the body of this `if`, is missing from
    // the extract. Presumably it calls CycleFiltersByPriority() -- confirm
    // against the real file. As extracted, the `if` would guard the return.
331  if (m_cycleTimer.isRunning() && (m_cycleTimer.elapsed() > 1000))
333 
334  return ok;
335 }
336 
// StreamHandler::GetPIDPriority -- highest priority any listener assigns to
// `pid`.
// NOTE(review): the signature line (listing line 337) is missing from this
// extract; the index gives it as `PIDPriority GetPIDPriority(uint pid) const`.
//
// Holds m_listenerLock while every listener in m_streamDataList is queried.
338 {
339  QMutexLocker reading_locker(&m_listenerLock);
340 
    // NOTE(review): listing line 341, which declares and initialises `tmp`, is
    // missing from this extract. Presumably it starts at kPIDPriorityNone
    // (listed in the index) -- confirm against the real file.
342 
343  for (auto it = m_streamDataList.cbegin(); it != m_streamDataList.cend(); ++it)
344  tmp = max(tmp, it.key()->GetPIDPriority(pid));
345 
346  return tmp;
347 }
348 
349 void StreamHandler::WriteMPTS(const unsigned char * buffer, uint len)
350 {
351  if (m_mptsTfw == nullptr)
352  return;
353  m_mptsTfw->Write(buffer, len);
354 }
355 
// StreamHandler::AddNamedOutputFile -- registers `file` as a raw-MPTS output.
// NOTE(review): the signature line (listing line 356) is missing from this
// extract; the index gives it as
// `virtual bool AddNamedOutputFile(const QString &filename)`, while the body
// refers to the parameter as `file`.
//
// The first registered file gets a ThreadedFileWriter on "<file>.raw"; each
// later file is hard-linked to that first ".raw" file. Returns false only
// when the writer cannot be opened; a failed link() is logged but still
// returns true. On MinGW / MSVC builds the body is compiled out and the
// function just returns true.
357 {
358 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
359  QMutexLocker lk(&m_mptsLock);
360 
361  m_mptsFiles.insert(file);
362  QString fn = QString("%1.raw").arg(file);
363 
    // A size of exactly 1 right after the insert means this is the only
    // registered file, so the shared writer has to be created.
364  if (m_mptsFiles.size() == 1)
365  {
366  m_mptsBaseFile = fn;
367  m_mptsTfw = new ThreadedFileWriter(fn,
368  O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE,
369  0644);
370  if (!m_mptsTfw->Open())
371  {
    // NOTE(review): on this failure path `file` stays in m_mptsFiles and
    // m_mptsBaseFile keeps the new name. A later call would then take the
    // link() branch with no writer open -- confirm whether that is intended.
372  delete m_mptsTfw;
373  m_mptsTfw = nullptr;
374  return false;
375  }
376  LOG(VB_RECORD, LOG_INFO, LOC +
377  QString("Opened '%1'").arg(m_mptsBaseFile));
378  }
379  else
380  {
    // NOTE(review): listing lines 385 and 393 are missing from this extract.
    // Each format string below has two placeholders but only one visible
    // .arg(); the missing lines presumably supply m_mptsBaseFile for '%1' --
    // confirm against the real file.
381  if (link(m_mptsBaseFile.toLocal8Bit(), fn.toLocal8Bit()) < 0)
382  {
383  LOG(VB_GENERAL, LOG_ERR, LOC +
384  QString("Failed to link '%1' to '%2'")
386  .arg(fn) +
387  ENO);
388  }
389  else
390  {
391  LOG(VB_RECORD, LOG_INFO, LOC +
392  QString("linked '%1' to '%2'")
394  .arg(fn));
395  }
396  }
397 
398 #endif // !defined( USING_MINGW ) && !defined( _MSC_VER )
399  return true;
400 }
401 
// StreamHandler::RemoveNamedOutputFile -- unregisters `file` as a raw-MPTS
// output.
// NOTE(review): the signature line (listing line 402) is missing from this
// extract; the index gives it as
// `virtual void RemoveNamedOutputFile(const QString &filename)`, while the
// body refers to the parameter as `file`.
//
// Removes `file` from m_mptsFiles under m_mptsLock; a name that is not
// registered is ignored. When the last name is removed the shared writer is
// deleted and m_mptsTfw reset to nullptr. Nothing is unlinked from disk here.
// On MinGW / MSVC builds the body is compiled out.
403 {
404 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
405  QMutexLocker lk(&m_mptsLock);
406 
407  QSet<QString>::iterator it = m_mptsFiles.find(file);
408  if (it != m_mptsFiles.end())
409  {
410  m_mptsFiles.erase(it);
    // Last registered output gone: destroy the writer so that WriteMPTS()
    // becomes a no-op.
411  if (m_mptsFiles.isEmpty())
412  {
413  delete m_mptsTfw;
414  m_mptsTfw = nullptr;
415  }
416  }
417 #endif // !defined( USING_MINGW ) && !defined( _MSC_VER )
418 }
MThread::start
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:292
ENO
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:72
streamhandler.h
StreamHandler::RemoveAllPIDFilters
bool RemoveAllPIDFilters(void)
Definition: streamhandler.cpp:234
StreamHandler::SetRunning
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
Definition: streamhandler.cpp:171
StreamHandler::WriteMPTS
void WriteMPTS(const unsigned char *buffer, uint len)
Write out a copy of the raw MPTS.
Definition: streamhandler.cpp:349
StreamHandler::Stop
void Stop(void)
Definition: streamhandler.cpp:153
StreamHandler::CycleFiltersByPriority
virtual void CycleFiltersByPriority()
Definition: streamhandler.h:90
StreamHandler::m_mptsTfw
ThreadedFileWriter * m_mptsTfw
Definition: streamhandler.h:136
PIDInfo
Definition: streamhandler.h:26
StreamID::PrivSec
@ PrivSec
ISO 13818-1 private tables & ITU H.222.0.
Definition: mpegtables.h:146
arg
arg(title).arg(filename).arg(doDelete))
StreamHandler::m_eitPids
vector< uint > m_eitPids
Definition: streamhandler.h:131
MythTimer::isRunning
bool isRunning(void) const
Returns true if start() or restart() has been called at least once since construction and since any c...
Definition: mythtimer.cpp:134
O_LARGEFILE
#define O_LARGEFILE
Definition: streamhandler.cpp:10
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:23
build_compdb.file
file
Definition: build_compdb.py:55
kPIDPriorityNone
@ kPIDPriorityNone
Definition: mpegstreamdata.h:78
StreamHandler::AddNamedOutputFile
virtual bool AddNamedOutputFile(const QString &filename)
Called with m_listenerLock locked just after adding new output file.
Definition: streamhandler.cpp:356
StreamHandler::m_startStopLock
QMutex m_startStopLock
Definition: streamhandler.h:117
StreamHandler::GetPIDPriority
PIDPriority GetPIDPriority(uint pid) const
Definition: streamhandler.cpp:337
StreamHandler::RemoveListener
virtual void RemoveListener(MPEGStreamData *data)
Definition: streamhandler.cpp:78
StreamHandler::AddListener
virtual void AddListener(MPEGStreamData *data, bool allow_section_reader=false, bool needs_buffering=false, QString output_file=QString())
Definition: streamhandler.cpp:33
tmp
static guint32 * tmp
Definition: goom_core.cpp:30
StreamHandler::m_openPidFilters
uint m_openPidFilters
Definition: streamhandler.h:133
StreamHandler::m_usingSectionReader
bool m_usingSectionReader
Definition: streamhandler.h:127
StreamHandler::m_needsBuffering
bool m_needsBuffering
Definition: streamhandler.h:112
threadedfilewriter.h
StreamHandler::m_running
bool m_running
Definition: streamhandler.h:124
StreamHandler::m_pidInfo
PIDInfoMap m_pidInfo
Definition: streamhandler.h:132
StreamHandler::SetRunningDesired
virtual void SetRunningDesired(bool desired)
At minimum this sets m_runningDesired; this may also send signals to anything that might be blocking ...
Definition: streamhandler.cpp:182
StreamHandler::m_mptsLock
QMutex m_mptsLock
Definition: streamhandler.h:139
StreamHandler::m_usingBuffering
bool m_usingBuffering
Definition: streamhandler.h:126
StreamHandler::UpdateFiltersFromStreamData
bool UpdateFiltersFromStreamData(void)
Definition: streamhandler.cpp:284
MPEGStreamData
Encapsulates data about MPEG stream and emits events for each table.
Definition: mpegstreamdata.h:85
StreamHandler::RemoveNamedOutputFile
virtual void RemoveNamedOutputFile(const QString &filename)
Called with m_listenerLock locked just before removing old output file.
Definition: streamhandler.cpp:402
uint
unsigned int uint
Definition: compat.h:140
StreamHandler::CreatePIDInfo
virtual PIDInfo * CreatePIDInfo(uint pid, uint stream_type, int pes_type)
Definition: streamhandler.h:98
pid_map_t
QMap< uint, PIDPriority > pid_map_t
Definition: mpegstreamdata.h:83
StreamHandler::~StreamHandler
~StreamHandler() override
Definition: streamhandler.cpp:15
ThreadedFileWriter::Write
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
Definition: threadedfilewriter.cpp:190
StreamHandler::UpdateListeningForEIT
void UpdateListeningForEIT(void)
Definition: streamhandler.cpp:253
StreamHandler::AddPIDFilter
bool AddPIDFilter(PIDInfo *info)
Definition: streamhandler.cpp:189
LOC
#define LOC
Definition: streamhandler.cpp:13
ThreadedFileWriter
This class supports the writing of recordings to disk.
Definition: threadedfilewriter.h:43
StreamHandler::m_mptsFiles
QSet< QString > m_mptsFiles
Definition: streamhandler.h:137
StreamHandler::UpdateFilters
virtual bool UpdateFilters(void)
Definition: streamhandler.h:89
MythTimer::elapsed
int elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
StreamHandler::m_cycleTimer
MythTimer m_cycleTimer
Definition: streamhandler.h:134
StreamHandler::m_listenerLock
QMutex m_listenerLock
Definition: streamhandler.h:142
StreamHandler::m_pidLock
QMutex m_pidLock
Definition: streamhandler.h:130
PIDInfo::m_pid
uint m_pid
Definition: streamhandler.h:40
StreamHandler::m_runningDesired
volatile bool m_runningDesired
Definition: streamhandler.h:118
StreamHandler::m_runningStateChanged
QWaitCondition m_runningStateChanged
Definition: streamhandler.h:128
StreamHandler::m_mptsBaseFile
QString m_mptsBaseFile
Definition: streamhandler.h:138
StreamHandler::m_streamDataList
StreamDataList m_streamDataList
Definition: streamhandler.h:143
StreamHandler::RemovePIDFilter
bool RemovePIDFilter(uint pid)
Definition: streamhandler.cpp:204
MThread::exit
void exit(int retcode=0)
Use this to exit from the thread if you are using a Qt event loop.
Definition: mthread.cpp:287
MPEGStreamData::GetEITPIDChanges
virtual bool GetEITPIDChanges(const uint_vec_t &, uint_vec_t &, uint_vec_t &) const
Definition: mpegstreamdata.h:105
StreamHandler::m_bError
volatile bool m_bError
Definition: streamhandler.h:123
MPEGStreamData::AddListeningPID
virtual void AddListeningPID(uint pid, PIDPriority priority=kPIDPriorityNormal)
Definition: mpegstreamdata.h:120
ThreadedFileWriter::Open
bool Open(void)
Opens the file we will be writing to.
Definition: threadedfilewriter.cpp:92
StreamHandler::m_addRmLock
QMutex m_addRmLock
Definition: streamhandler.h:115
StreamHandler::m_restarting
bool m_restarting
Definition: streamhandler.h:125
MThread::wait
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:309
StreamHandler::IsRunning
bool IsRunning(void) const
Definition: streamhandler.cpp:161
MPEGStreamData::HasEITPIDChanges
virtual bool HasEITPIDChanges(const uint_vec_t &) const
Definition: mpegstreamdata.h:103
StreamHandler::m_device
QString m_device
Definition: streamhandler.h:110
StreamHandler::Start
void Start(void)
Definition: streamhandler.cpp:115
PIDPriority
PIDPriority
Definition: mpegstreamdata.h:76
MPEGStreamData::RemoveListeningPID
virtual void RemoveListeningPID(uint pid)
Definition: mpegstreamdata.h:132
find
static pid_list_t::iterator find(const PIDInfoMap &map, pid_list_t &list, pid_list_t::iterator begin, pid_list_t::iterator end, bool find_open)
Definition: dvbstreamhandler.cpp:356
StreamHandler::m_allowSectionReader
bool m_allowSectionReader
Definition: streamhandler.h:113