MythTV  master
streamhandler.cpp
Go to the documentation of this file.
1 // -*- Mode: c++ -*-
2 
3 // MythTV headers
4 #include "streamhandler.h"
5 
6 #include "threadedfilewriter.h"
7 #include <utility>
8 
9 #ifndef O_LARGEFILE
10 #define O_LARGEFILE 0
11 #endif
12 
13 #define LOC QString("SH[%1](%2): ").arg(m_inputid).arg(m_device)
14 
16 {
17  QMutexLocker locker(&m_add_rm_lock);
18 
19  {
20  QMutexLocker locker2(&m_listener_lock);
21  if (!m_stream_data_list.empty())
22  {
23  LOG(VB_GENERAL, LOG_ERR, LOC +
24  "dtor & _stream_data_list not empty");
25  }
26  }
27 
28  // This should never be triggered.. just to be safe..
29  if (m_running)
30  Stop();
31 }
32 
34  bool allow_section_reader,
35  bool needs_buffering,
36  QString output_file)
37 {
38  QMutexLocker locker(&m_add_rm_lock);
39 
40  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- begin")
41  .arg((uint64_t)data,0,16));
42  if (!data)
43  {
44  LOG(VB_GENERAL, LOG_ERR, LOC +
45  QString("AddListener(0x%1) -- null data")
46  .arg((uint64_t)data,0,16));
47  return;
48  }
49 
50  m_listener_lock.lock();
51 
52  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- locked")
53  .arg((uint64_t)data,0,16));
54 
55  if (m_stream_data_list.empty())
56  {
57  QMutexLocker locker2(&m_start_stop_lock);
58  m_allow_section_reader = allow_section_reader;
59  m_needs_buffering = needs_buffering;
60  }
61  else
62  {
63  QMutexLocker locker2(&m_start_stop_lock);
64  m_allow_section_reader &= allow_section_reader;
65  m_needs_buffering |= needs_buffering;
66  }
67 
68  m_stream_data_list[data] = std::move(output_file);
69 
70  m_listener_lock.unlock();
71 
72  Start();
73 
74  LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- end")
75  .arg((uint64_t)data,0,16));
76 }
77 
79 {
80  QMutexLocker locker(&m_add_rm_lock);
81 
82  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- begin")
83  .arg((uint64_t)data,0,16));
84  if (!data)
85  {
86  LOG(VB_GENERAL, LOG_ERR, LOC +
87  QString("RemoveListener(0x%1) -- null data")
88  .arg((uint64_t)data,0,16));
89  return;
90  }
91 
92  m_listener_lock.lock();
93 
94  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- locked")
95  .arg((uint64_t)data,0,16));
96 
97  StreamDataList::iterator it = m_stream_data_list.find(data);
98 
99  if (it != m_stream_data_list.end())
100  {
101  if (!(*it).isEmpty())
103  m_stream_data_list.erase(it);
104  }
105 
106  m_listener_lock.unlock();
107 
108  if (m_stream_data_list.empty())
109  Stop();
110 
111  LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- end")
112  .arg((uint64_t)data,0,16));
113 }
114 
116 {
117  QMutexLocker locker(&m_start_stop_lock);
118 
119  if (m_running)
120  {
123  {
124  LOG(VB_RECORD, LOG_INFO, LOC + "Restarting StreamHandler");
125  SetRunningDesired(false);
126  m_restarting = true;
127  locker.unlock();
128  wait();
129  locker.relock();
130  m_restarting = false;
131  }
132  }
133 
134  if (m_running)
135  return;
136 
137  m_eit_pids.clear();
138 
139  m_bError = false;
140  SetRunningDesired(true);
141  MThread::start();
142 
143  while (!m_running && !m_bError && m_running_desired)
145 
146  if (m_bError)
147  {
148  LOG(VB_GENERAL, LOG_ERR, LOC + "Start failed");
149  SetRunningDesired(false);
150  }
151 }
152 
154 {
155  LOG(VB_RECORD, LOG_DEBUG, LOC + "Stopping");
156  SetRunningDesired(false);
157  wait();
158  LOG(VB_RECORD, LOG_DEBUG, LOC + "Stopped");
159 }
160 
161 bool StreamHandler::IsRunning(void) const
162 {
163  // This used to use QMutexLocker, but that sometimes left the
164  // mutex locked on exit, so...
165  m_start_stop_lock.lock();
166  bool r = m_running || m_restarting;
167  m_start_stop_lock.unlock();
168  return r;
169 }
170 
171 void StreamHandler::SetRunning(bool is_running,
172  bool is_using_buffering,
173  bool is_using_section_reader)
174 {
175  QMutexLocker locker(&m_start_stop_lock);
176  m_running = is_running;
177  m_using_buffering = is_using_buffering;
178  m_using_section_reader = is_using_section_reader;
179  m_running_state_changed.wakeAll();
180 }
181 
183 {
184  m_running_desired = desired;
185  if (!desired)
186  MThread::exit(0);
187 }
188 
190 {
191 #ifdef DEBUG_PID_FILTERS
192  LOG(VB_RECORD, LOG_DEBUG, LOC + QString("AddPIDFilter(0x%1)")
193  .arg(info->_pid, 0, 16));
194 #endif // DEBUG_PID_FILTERS
195 
196  QMutexLocker writing_locker(&m_pid_lock);
197  m_pid_info[info->_pid] = info;
198 
200 
201  return true;
202 }
203 
205 {
206 #ifdef DEBUG_PID_FILTERS
207  LOG(VB_RECORD, LOG_DEBUG, LOC +
208  QString("RemovePIDFilter(0x%1)").arg(pid, 0, 16));
209 #endif // DEBUG_PID_FILTERS
210 
211  QMutexLocker write_locker(&m_pid_lock);
212 
213  PIDInfoMap::iterator it = m_pid_info.find(pid);
214  if (it == m_pid_info.end())
215  return false;
216 
217  PIDInfo *tmp = *it;
218  m_pid_info.erase(it);
219 
220  bool ok = true;
221  if (tmp->IsOpen())
222  {
223  ok = tmp->Close(m_device);
225 
227  }
228 
229  delete tmp;
230 
231  return ok;
232 }
233 
235 {
236  QMutexLocker write_locker(&m_pid_lock);
237 
238 #ifdef DEBUG_PID_FILTERS
239  LOG(VB_RECORD, LOG_DEBUG, LOC + "RemoveAllPIDFilters()");
240 #endif // DEBUG_PID_FILTERS
241 
242  vector<int> del_pids;
243  PIDInfoMap::iterator it = m_pid_info.begin();
244  for (; it != m_pid_info.end(); ++it)
245  del_pids.push_back(it.key());
246 
247  bool ok = true;
248  vector<int>::iterator dit = del_pids.begin();
249  for (; dit != del_pids.end(); ++dit)
250  ok &= RemovePIDFilter(*dit);
251 
252  return UpdateFilters() && ok;
253 }
254 
256 {
257  vector<uint> add_eit, del_eit;
258 
259  QMutexLocker read_locker(&m_listener_lock);
260 
261  StreamDataList::const_iterator it1 = m_stream_data_list.begin();
262  for (; it1 != m_stream_data_list.end(); ++it1)
263  {
264  MPEGStreamData *sd = it1.key();
265  if (sd->HasEITPIDChanges(m_eit_pids) &&
266  sd->GetEITPIDChanges(m_eit_pids, add_eit, del_eit))
267  {
268  for (size_t i = 0; i < del_eit.size(); i++)
269  {
270  uint_vec_t::iterator it2;
271  it2 = find(m_eit_pids.begin(), m_eit_pids.end(), del_eit[i]);
272  if (it2 != m_eit_pids.end())
273  m_eit_pids.erase(it2);
274  sd->RemoveListeningPID(del_eit[i]);
275  }
276 
277  for (size_t i = 0; i < add_eit.size(); i++)
278  {
279  m_eit_pids.push_back(add_eit[i]);
280  sd->AddListeningPID(add_eit[i]);
281  }
282  }
283  }
284 }
285 
287 {
289 
290  pid_map_t pids;
291 
292  {
293  QMutexLocker read_locker(&m_listener_lock);
294  StreamDataList::const_iterator it = m_stream_data_list.begin();
295  for (; it != m_stream_data_list.end(); ++it)
296  it.key()->GetPIDs(pids);
297  }
298 
299  QMap<uint, PIDInfo*> add_pids;
300  vector<uint> del_pids;
301 
302  {
303  QMutexLocker read_locker(&m_pid_lock);
304 
305  // PIDs that need to be added..
306  pid_map_t::const_iterator lit = pids.constBegin();
307  for (; lit != pids.constEnd(); ++lit)
308  {
309  if (*lit && (m_pid_info.find(lit.key()) == m_pid_info.end()))
310  {
311  add_pids[lit.key()] = CreatePIDInfo(
312  lit.key(), StreamID::PrivSec, 0);
313  }
314  }
315 
316  // PIDs that need to be removed..
317  PIDInfoMap::const_iterator fit = m_pid_info.begin();
318  for (; fit != m_pid_info.end(); ++fit)
319  {
320  bool in_pids = pids.find(fit.key()) != pids.end();
321  if (!in_pids)
322  del_pids.push_back(fit.key());
323  }
324  }
325 
326  // Remove PIDs
327  bool ok = true;
328  vector<uint>::iterator dit = del_pids.begin();
329  for (; dit != del_pids.end(); ++dit)
330  ok &= RemovePIDFilter(*dit);
331 
332  // Add PIDs
333  QMap<uint, PIDInfo*>::iterator ait = add_pids.begin();
334  for (; ait != add_pids.end(); ++ait)
335  ok &= AddPIDFilter(*ait);
336 
337  // Cycle filters if it's been a while
338  if (m_cycle_timer.isRunning() && (m_cycle_timer.elapsed() > 1000))
340 
341  return ok;
342 }
343 
345 {
346  QMutexLocker reading_locker(&m_listener_lock);
347 
349 
350  StreamDataList::const_iterator it = m_stream_data_list.begin();
351  for (; it != m_stream_data_list.end(); ++it)
352  tmp = max(tmp, it.key()->GetPIDPriority(pid));
353 
354  return tmp;
355 }
356 
357 void StreamHandler::WriteMPTS(unsigned char * buffer, uint len)
358 {
359  if (m_mpts_tfw == nullptr)
360  return;
361  m_mpts_tfw->Write(buffer, len);
362 }
363 
364 bool StreamHandler::AddNamedOutputFile(const QString &file)
365 {
366 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
367  QMutexLocker lk(&m_mpts_lock);
368 
369  m_mpts_files.insert(file);
370  QString fn = QString("%1.raw").arg(file);
371 
372  if (m_mpts_files.size() == 1)
373  {
374  m_mpts_base_file = fn;
376  O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE,
377  0644);
378  if (!m_mpts_tfw->Open())
379  {
380  delete m_mpts_tfw;
381  m_mpts_tfw = nullptr;
382  return false;
383  }
384  LOG(VB_RECORD, LOG_INFO, LOC +
385  QString("Opened '%1'").arg(m_mpts_base_file));
386  }
387  else
388  {
389  if (link(m_mpts_base_file.toLocal8Bit(), fn.toLocal8Bit()) < 0)
390  {
391  LOG(VB_GENERAL, LOG_ERR, LOC +
392  QString("Failed to link '%1' to '%2'")
393  .arg(m_mpts_base_file)
394  .arg(fn) +
395  ENO);
396  }
397  else
398  {
399  LOG(VB_RECORD, LOG_INFO, LOC +
400  QString("linked '%1' to '%2'")
401  .arg(m_mpts_base_file)
402  .arg(fn));
403  }
404  }
405 
406 #endif // !defined( USING_MINGW ) && !defined( _MSC_VER )
407  return true;
408 }
409 
410 void StreamHandler::RemoveNamedOutputFile(const QString &file)
411 {
412 #if !defined( USING_MINGW ) && !defined( _MSC_VER )
413  QMutexLocker lk(&m_mpts_lock);
414 
415  QSet<QString>::iterator it = m_mpts_files.find(file);
416  if (it != m_mpts_files.end())
417  {
418  m_mpts_files.erase(it);
419  if (m_mpts_files.isEmpty())
420  {
421  delete m_mpts_tfw;
422  m_mpts_tfw = nullptr;
423  }
424  }
425 #endif // !defined( USING_MINGW ) && !defined( _MSC_VER )
426 }
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:294
#define O_LARGEFILE
virtual PIDInfo * CreatePIDInfo(uint pid, uint stream_type, int pes_type)
Definition: streamhandler.h:98
QString m_mpts_base_file
volatile bool m_running_desired
bool m_needs_buffering
bool UpdateFiltersFromStreamData(void)
static pid_list_t::iterator find(const PIDInfoMap &map, pid_list_t &list, pid_list_t::iterator begin, pid_list_t::iterator end, bool find_open)
QMap< uint, PIDPriority > pid_map_t
QSet< QString > m_mpts_files
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:311
QWaitCondition m_running_state_changed
QString m_device
virtual void RemoveListener(MPEGStreamData *data)
ISO 13818-1 private tables & ITU H.222.0.
Definition: mpegtables.h:143
unsigned int uint
Definition: compat.h:140
QMutex m_listener_lock
static guint32 * tmp
Definition: goom_core.c:35
bool isRunning(void) const
Returns true if start() or restart() has been called at least once since construction and since any c...
Definition: mythtimer.cpp:134
unsigned char r
Definition: ParseText.cpp:329
virtual bool UpdateFilters(void)
Definition: streamhandler.h:89
virtual void RemoveListeningPID(uint pid)
virtual void RemoveNamedOutputFile(const QString &filename)
Called with _listener_lock locked just before removing old output file.
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
void Stop(void)
void Start(void)
#define LOC
QMutex m_add_rm_lock
uint m_open_pid_filters
virtual void AddListener(MPEGStreamData *data, bool allow_section_reader=false, bool needs_buffering=false, QString output_file=QString())
bool RemovePIDFilter(uint pid)
bool m_using_buffering
void exit(int retcode=0)
Use this to exit from the thread if you are using a Qt event loop.
Definition: mthread.cpp:289
PIDInfoMap m_pid_info
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
int elapsed(void) const
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
virtual void AddListeningPID(uint pid, PIDPriority priority=kPIDPriorityNormal)
bool m_using_section_reader
PIDPriority
bool Open(void)
Opens the file we will be writing to.
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
MythTimer m_cycle_timer
StreamDataList m_stream_data_list
virtual void SetRunningDesired(bool desired)
At minimum this sets _running_desired, this may also send signals to anything that might be blocking ...
bool AddPIDFilter(PIDInfo *info)
volatile bool m_bError
bool m_allow_section_reader
virtual void CycleFiltersByPriority()
Definition: streamhandler.h:90
QMutex m_mpts_lock
This class supports the writing of recordings to disk.
ThreadedFileWriter * m_mpts_tfw
QMutex m_start_stop_lock
void WriteMPTS(unsigned char *buffer, uint len)
Write out a copy of the raw MPTS.
virtual bool HasEITPIDChanges(const uint_vec_t &) const
virtual bool GetEITPIDChanges(const uint_vec_t &, uint_vec_t &, uint_vec_t &) const
void UpdateListeningForEIT(void)
uint _pid
Definition: streamhandler.h:40
virtual bool AddNamedOutputFile(const QString &filename)
Called with _listener_lock locked just after adding new output file.
Encapsulates data about MPEG stream and emits events for each table.
bool IsRunning(void) const
vector< uint > m_eit_pids
bool RemoveAllPIDFilters(void)
PIDPriority GetPIDPriority(uint pid) const