MythTV master
streamhandler.cpp
Go to the documentation of this file.
1// -*- Mode: c++ -*-
2
3// C++ headers
4#include <utility>
5
6// MythTV headers
7#include "streamhandler.h"
8
11
12#ifndef O_LARGEFILE
13#define O_LARGEFILE 0
14#endif
15
16#define LOC QString("SH[%1]: ").arg(m_inputId)
17
19{
20 QMutexLocker locker(&m_addRmLock);
21
22 {
23 QMutexLocker locker2(&m_listenerLock);
24 if (!m_streamDataList.empty())
25 {
26 LOG(VB_GENERAL, LOG_ERR, LOC +
27 "dtor & _stream_data_list not empty");
28 }
29 }
30
31 // This should never be triggered.. just to be safe..
32 if (m_running)
33 Stop();
34}
35
37 bool allow_section_reader,
38 bool needs_buffering,
39 const QString& output_file)
40{
41 QMutexLocker locker(&m_addRmLock);
42
43 LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- begin")
44 .arg((uint64_t)data,0,16));
45 if (!data)
46 {
47 LOG(VB_GENERAL, LOG_ERR, LOC +
48 QString("AddListener(0x%1) -- null data")
49 .arg((uint64_t)data,0,16));
50 return;
51 }
52
53 m_listenerLock.lock();
54
55 LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- locked")
56 .arg((uint64_t)data,0,16));
57
58 if (m_streamDataList.empty())
59 {
60 QMutexLocker locker2(&m_startStopLock);
61 m_allowSectionReader = allow_section_reader;
62 m_needsBuffering = needs_buffering;
63 }
64 else
65 {
66 QMutexLocker locker2(&m_startStopLock);
67 m_allowSectionReader &= allow_section_reader;
68 m_needsBuffering |= needs_buffering;
69 }
70
71 m_streamDataList[data] = output_file;
72
73 m_listenerLock.unlock();
74
75 Start();
76
77 LOG(VB_RECORD, LOG_INFO, LOC + QString("AddListener(0x%1) -- end")
78 .arg((uint64_t)data,0,16));
79}
80
82{
83 QMutexLocker locker(&m_addRmLock);
84
85 LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- begin")
86 .arg((uint64_t)data,0,16));
87 if (!data)
88 {
89 LOG(VB_GENERAL, LOG_ERR, LOC +
90 QString("RemoveListener(0x%1) -- null data")
91 .arg((uint64_t)data,0,16));
92 return;
93 }
94
95 m_listenerLock.lock();
96
97 LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- locked")
98 .arg((uint64_t)data,0,16));
99
100 StreamDataList::iterator it = m_streamDataList.find(data);
101
102 if (it != m_streamDataList.end())
103 {
104 if (!(*it).isEmpty())
106 m_streamDataList.erase(it);
107 }
108
109 m_listenerLock.unlock();
110
111 if (m_streamDataList.empty())
112 Stop();
113
114 LOG(VB_RECORD, LOG_INFO, LOC + QString("RemoveListener(0x%1) -- end")
115 .arg((uint64_t)data,0,16));
116}
117
119{
120 QMutexLocker locker(&m_startStopLock);
121
122 if (m_running)
123 {
126 {
127 LOG(VB_RECORD, LOG_INFO, LOC + "Restarting StreamHandler");
128 SetRunningDesired(false);
129 m_restarting = true;
130 locker.unlock();
131 wait();
132 locker.relock();
133 m_restarting = false;
134 }
135 }
136
137 if (m_running)
138 return;
139
140 m_eitPids.clear();
141
142 m_bError = false;
143 SetRunningDesired(true);
145
146 while (!m_running && !m_bError && m_runningDesired)
148
149 if (m_bError)
150 {
151 LOG(VB_GENERAL, LOG_ERR, LOC + "Start failed");
152 SetRunningDesired(false);
153 }
154}
155
157{
158 LOG(VB_RECORD, LOG_DEBUG, LOC + "Stopping");
160 wait();
161 LOG(VB_RECORD, LOG_DEBUG, LOC + "Stopped");
162}
163
165{
166 // This used to use QMutexLocker, but that sometimes left the
167 // mutex locked on exit, so...
168 m_startStopLock.lock();
169 bool r = m_running || m_restarting;
170 m_startStopLock.unlock();
171 return r;
172}
173
174void StreamHandler::SetRunning(bool is_running,
175 bool is_using_buffering,
176 bool is_using_section_reader)
177{
178 QMutexLocker locker(&m_startStopLock);
179 m_running = is_running;
180 m_usingBuffering = is_using_buffering;
181 m_usingSectionReader = is_using_section_reader;
182 m_runningStateChanged.wakeAll();
183}
184
186{
187 m_runningDesired = desired;
188 if (!desired)
189 MThread::exit(0);
190}
191
193{
194#ifdef DEBUG_PID_FILTERS
195 LOG(VB_RECORD, LOG_DEBUG, LOC + QString("AddPIDFilter(0x%1)")
196 .arg(info->m_pid, 0, 16));
197#endif // DEBUG_PID_FILTERS
198
199 QMutexLocker writing_locker(&m_pidLock);
200 m_pidInfo[info->m_pid] = info;
201
202 m_filtersChanged = true;
203
205
206 return true;
207}
208
210{
211#ifdef DEBUG_PID_FILTERS
212 LOG(VB_RECORD, LOG_DEBUG, LOC +
213 QString("RemovePIDFilter(0x%1)").arg(pid, 0, 16));
214#endif // DEBUG_PID_FILTERS
215
216 QMutexLocker write_locker(&m_pidLock);
217
218 PIDInfoMap::iterator it = m_pidInfo.find(pid);
219 if (it == m_pidInfo.end())
220 return false;
221
222 PIDInfo *tmp = *it;
223 m_pidInfo.erase(it);
224
225 bool ok = true;
226 if (tmp->IsOpen())
227 {
228 ok = tmp->Close(m_device);
230
232 }
233
234 delete tmp;
235
236 m_filtersChanged = true;
237
238 return ok;
239}
240
242{
243 QMutexLocker write_locker(&m_pidLock);
244
245#ifdef DEBUG_PID_FILTERS
246 LOG(VB_RECORD, LOG_DEBUG, LOC + "RemoveAllPIDFilters()");
247#endif // DEBUG_PID_FILTERS
248
249 std::vector<int> del_pids;
250 for (auto it = m_pidInfo.begin(); it != m_pidInfo.end(); ++it)
251 del_pids.push_back(it.key());
252
253 bool ok = true;
254 for (int & pid : del_pids)
255 ok &= RemovePIDFilter(pid);
256
257 return UpdateFilters() && ok;
258}
259
261{
262 QMutexLocker read_locker(&m_listenerLock);
263
264 for (auto it1 = m_streamDataList.cbegin(); it1 != m_streamDataList.cend(); ++it1)
265 {
266 std::vector<uint> add_eit;
267 std::vector<uint> del_eit;
268
269 MPEGStreamData *sd = it1.key();
270 if (sd->HasEITPIDChanges(m_eitPids) &&
271 sd->GetEITPIDChanges(m_eitPids, add_eit, del_eit))
272 {
273 for (uint eit : del_eit)
274 {
275 uint_vec_t::iterator it2;
276 it2 = find(m_eitPids.begin(), m_eitPids.end(), eit);
277 if (it2 != m_eitPids.end())
278 m_eitPids.erase(it2);
279 sd->RemoveListeningPID(eit);
280 }
281
282 for (uint eit : add_eit)
283 {
284 m_eitPids.push_back(eit);
285 sd->AddListeningPID(eit);
286 }
287 }
288 }
289}
290
292{
294
295 pid_map_t pids;
296
297 {
298 QMutexLocker read_locker(&m_listenerLock);
299 for (auto it = m_streamDataList.cbegin(); it != m_streamDataList.cend(); ++it)
300 it.key()->GetPIDs(pids);
301 }
302
303 QMap<uint, PIDInfo*> add_pids;
304 std::vector<uint> del_pids;
305
306 {
307 QMutexLocker read_locker(&m_pidLock);
308
309 // PIDs that need to be added..
310 for (auto lit = pids.constBegin(); lit != pids.constEnd(); ++lit)
311 {
312 if ((*lit != 0U) && (!m_pidInfo.contains(lit.key())))
313 {
314 add_pids[lit.key()] = CreatePIDInfo(
315 lit.key(), StreamID::PrivSec, 0);
316 }
317 }
318
319 // PIDs that need to be removed..
320 for (auto fit = m_pidInfo.cbegin(); fit != m_pidInfo.cend(); ++fit)
321 {
322 bool in_pids = pids.contains(fit.key());
323 if (!in_pids)
324 del_pids.push_back(fit.key());
325 }
326 }
327
328 // Remove PIDs
329 bool ok = true;
330 for (uint & pid : del_pids)
331 ok &= RemovePIDFilter(pid);
332
333 // Add PIDs
334 for (auto & pid : add_pids)
335 ok &= AddPIDFilter(pid);
336
337 // Cycle filters if it's been a while
338 if (m_cycleTimer.isRunning() && (m_cycleTimer.elapsed() > 1s))
340
341 return ok;
342}
343
345{
346 QMutexLocker reading_locker(&m_listenerLock);
347
349
350 for (auto it = m_streamDataList.cbegin(); it != m_streamDataList.cend(); ++it)
351 tmp = std::max(tmp, it.key()->GetPIDPriority(pid));
352
353 return tmp;
354}
355
356void StreamHandler::WriteMPTS(const unsigned char * buffer, uint len)
357{
358 if (m_mptsTfw == nullptr)
359 return;
360 m_mptsTfw->Write(buffer, len);
361}
362
363bool StreamHandler::AddNamedOutputFile([[maybe_unused]] const QString &file)
364{
365#if !defined( _WIN32 )
366 QMutexLocker lk(&m_mptsLock);
367
368 m_mptsFiles.insert(file);
369 QString fn = QString("%1.raw").arg(file);
370
371 if (m_mptsFiles.size() == 1)
372 {
373 m_mptsBaseFile = fn;
375 O_WRONLY|O_TRUNC|O_CREAT|O_LARGEFILE,
376 0644);
377 if (!m_mptsTfw->Open())
378 {
379 delete m_mptsTfw;
380 m_mptsTfw = nullptr;
381 return false;
382 }
383 LOG(VB_RECORD, LOG_INFO, LOC +
384 QString("Opened '%1'").arg(m_mptsBaseFile));
385 }
386 else
387 {
388 if (link(m_mptsBaseFile.toLocal8Bit(), fn.toLocal8Bit()) < 0)
389 {
390 LOG(VB_GENERAL, LOG_ERR, LOC +
391 QString("Failed to link '%1' to '%2'")
392 .arg(m_mptsBaseFile, fn) + ENO);
393 }
394 else
395 {
396 LOG(VB_RECORD, LOG_INFO, LOC +
397 QString("linked '%1' to '%2'")
398 .arg(m_mptsBaseFile, fn));
399 }
400 }
401#endif // !defined( _WIN32 )
402 return true;
403}
404
405void StreamHandler::RemoveNamedOutputFile([[maybe_unused]] const QString &file)
406{
407#if !defined( _WIN32 )
408 QMutexLocker lk(&m_mptsLock);
409
410 QSet<QString>::iterator it = m_mptsFiles.find(file);
411 if (it != m_mptsFiles.end())
412 {
413 m_mptsFiles.erase(it);
414 if (m_mptsFiles.isEmpty())
415 {
416 delete m_mptsTfw;
417 m_mptsTfw = nullptr;
418 }
419 }
420#endif // !defined( _WIN32 )
421}
Encapsulates data about MPEG stream and emits events for each table.
virtual bool GetEITPIDChanges(const uint_vec_t &, uint_vec_t &, uint_vec_t &) const
virtual void RemoveListeningPID(uint pid)
virtual bool HasEITPIDChanges(const uint_vec_t &) const
virtual void AddListeningPID(uint pid, PIDPriority priority=kPIDPriorityNormal)
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:283
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
void exit(int retcode=0)
Use this to exit from the thread if you are using a Qt event loop.
Definition: mthread.cpp:278
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
bool isRunning(void) const
Returns true if start() or restart() has been called at least once since construction and since any c...
Definition: mythtimer.cpp:135
QRecursiveMutex m_pidLock
bool AddPIDFilter(PIDInfo *info)
virtual bool AddNamedOutputFile(const QString &filename)
Called with _listener_lock locked just after adding new output file.
StreamDataList m_streamDataList
bool RemovePIDFilter(uint pid)
QString m_mptsBaseFile
MythTimer m_cycleTimer
void WriteMPTS(const unsigned char *buffer, uint len)
Write out a copy of the raw MPTS.
virtual void RemoveNamedOutputFile(const QString &filename)
Called with _listener_lock locked just before removing old output file.
void Start(void)
virtual void CycleFiltersByPriority()
Definition: streamhandler.h:92
virtual void SetRunningDesired(bool desired)
At minimum this sets _running_desired, this may also send signals to anything that might be blocking ...
void UpdateListeningForEIT(void)
QString m_device
PIDInfoMap m_pidInfo
ThreadedFileWriter * m_mptsTfw
volatile bool m_runningDesired
volatile bool m_bError
bool RemoveAllPIDFilters(void)
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
QMutex m_startStopLock
bool m_usingSectionReader
bool UpdateFiltersFromStreamData(void)
QMutex m_addRmLock
bool IsRunning(void) const
std::vector< uint > m_eitPids
PIDPriority GetPIDPriority(uint pid) const
QSet< QString > m_mptsFiles
~StreamHandler() override
void Stop(void)
QWaitCondition m_runningStateChanged
virtual void RemoveListener(MPEGStreamData *data)
virtual void AddListener(MPEGStreamData *data, bool allow_section_reader=false, bool needs_buffering=false, const QString &output_file=QString())
bool m_allowSectionReader
virtual bool UpdateFilters(void)
Definition: streamhandler.h:91
virtual PIDInfo * CreatePIDInfo(uint pid, uint stream_type, int pes_type)
QRecursiveMutex m_listenerLock
@ PrivSec
ISO 13818-1 private tables & ITU H.222.0.
Definition: mpegtables.h:146
This class supports the writing of recordings to disk.
bool Open(void)
Opens the file we will be writing to.
int Write(const void *data, uint count)
Writes data to the end of the write buffer.
static pid_list_t::iterator find(const PIDInfoMap &map, pid_list_t &list, pid_list_t::iterator begin, pid_list_t::iterator end, bool find_open)
unsigned int uint
Definition: freesurround.h:24
static guint32 * tmp
Definition: goom_core.cpp:26
QMap< uint, PIDPriority > pid_map_t
PIDPriority
@ kPIDPriorityNone
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
dictionary info
Definition: azlyrics.py:7
#define LOC
#define O_LARGEFILE