MythTV master
asistreamhandler.cpp
Go to the documentation of this file.
1// -*- Mode: c++ -*-
2
3// POSIX headers
4#include <fcntl.h>
5#include <unistd.h>
6#ifndef _WIN32
7#include <sys/select.h>
8#include <sys/ioctl.h>
9#endif
10
11// Qt headers
12#include <QString>
13#include <QFile>
14
15// MythTV headers
17
18#include "asichannel.h"
19#include "asistreamhandler.h"
20#include "cardutil.h"
21#include "dtvsignalmonitor.h"
22#include "mpeg/mpegstreamdata.h"
24
25// DVEO ASI headers
26#include <dveo/asi.h>
27#include <dveo/master.h>
28
29#define LOC QString("ASISH[%1](%2): ").arg(m_inputId).arg(m_device)
30
31QMap<QString,ASIStreamHandler*> ASIStreamHandler::s_handlers;
32QMap<QString,uint> ASIStreamHandler::s_handlersRefCnt;
34
36 int inputid)
37{
38 QMutexLocker locker(&s_handlersLock);
39
40 const QString& devkey = devname;
41
42 QMap<QString,ASIStreamHandler*>::iterator it = s_handlers.find(devkey);
43
44 if (it == s_handlers.end())
45 {
46 auto *newhandler = new ASIStreamHandler(devname, inputid);
47 newhandler->Open();
48 s_handlers[devkey] = newhandler;
49 s_handlersRefCnt[devkey] = 1;
50
51 LOG(VB_RECORD, LOG_INFO,
52 QString("ASISH[%1]: Creating new stream handler %2 for %3")
53 .arg(QString::number(inputid), devkey, devname));
54 }
55 else
56 {
57 s_handlersRefCnt[devkey]++;
58 uint rcount = s_handlersRefCnt[devkey];
59 LOG(VB_RECORD, LOG_INFO,
60 QString("ASISH[%1]: Using existing stream handler %2 for %3")
61 .arg(QString::number(inputid), devkey, devname) +
62 QString(" (%1 in use)").arg(rcount));
63 }
64
65 return s_handlers[devkey];
66}
67
69{
70 QMutexLocker locker(&s_handlersLock);
71
72 QString devname = ref->m_device;
73
74 QMap<QString,uint>::iterator rit = s_handlersRefCnt.find(devname);
75 if (rit == s_handlersRefCnt.end())
76 return;
77
78 QMap<QString,ASIStreamHandler*>::iterator it = s_handlers.find(devname);
79
80 if (*rit > 1)
81 {
82 ref = nullptr;
83 (*rit)--;
84 return;
85 }
86
87 if ((it != s_handlers.end()) && (*it == ref))
88 {
89 LOG(VB_RECORD, LOG_INFO, QString("ASISH[%1]: Closing handler for %2")
90 .arg(inputid).arg(devname));
91 ref->Close();
92 delete *it;
93 s_handlers.erase(it);
94 }
95 else
96 {
97 LOG(VB_GENERAL, LOG_ERR,
98 QString("ASISH[%1] Error: Couldn't find handler for %2")
99 .arg(inputid).arg(devname));
100 }
101
102 s_handlersRefCnt.erase(rit);
103 ref = nullptr;
104}
105
106ASIStreamHandler::ASIStreamHandler(const QString &device, int inputid)
107 : StreamHandler(device, inputid)
108{
109 setObjectName("ASISH");
110}
111
113{
114 m_clockSource = cs;
115 // TODO we should make it possible to set this immediately
116 // not wait for the next open
117}
118
120{
121 m_rxMode = m;
122 // TODO we should make it possible to set this immediately
123 // not wait for the next open
124}
125
127{
128 if (m_drb && m_runningDesired && !desired)
129 m_drb->Stop();
131}
132
134{
135 RunProlog();
136
137 LOG(VB_RECORD, LOG_INFO, LOC + "run(): begin");
138
139 if (!Open())
140 {
141 LOG(VB_GENERAL, LOG_ERR, LOC + QString("Failed to open device %1 : %2")
142 .arg(m_device, strerror(errno)));
143 m_bError = true;
144 return;
145 }
146
147 auto *drb = new DeviceReadBuffer(this, true, false);
148 bool ok = drb->Setup(m_device, m_fd, m_packetSize, m_bufSize,
149 m_numBuffers / 4);
150 if (!ok)
151 {
152 LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to allocate DRB buffer");
153 delete drb;
154 drb = nullptr;
155 Close();
156 m_bError = true;
157 RunEpilog();
158 return;
159 }
160
161 uint buffer_size = m_packetSize * 15000;
162 auto *buffer = new unsigned char[buffer_size];
163 if (!buffer)
164 {
165 LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to allocate buffer");
166 delete drb;
167 drb = nullptr;
168 Close();
169 m_bError = true;
170 RunEpilog();
171 return;
172 }
173 memset(buffer, 0, buffer_size);
174
175 SetRunning(true, true, false);
176
177 drb->Start();
178
179 {
180 QMutexLocker locker(&m_startStopLock);
181 m_drb = drb;
182 }
183
184 int remainder = 0;
185 while (m_runningDesired && !m_bError)
186 {
188
189 ssize_t len = 0;
190
191 len = drb->Read(
192 &(buffer[remainder]), buffer_size - remainder);
193
194 if (!m_runningDesired)
195 break;
196
197 // Check for DRB errors
198 if (drb->IsErrored())
199 {
200 LOG(VB_GENERAL, LOG_ERR, LOC + "Device error detected");
201 m_bError = true;
202 }
203
204 if (drb->IsEOF())
205 {
206 LOG(VB_GENERAL, LOG_ERR, LOC + "Device EOF detected");
207 m_bError = true;
208 }
209
210 len += remainder;
211
212 if (len < 10) // 10 bytes = 4 bytes TS header + 6 bytes PES header
213 {
214 remainder = len;
215 continue;
216 }
217
218 if (!m_listenerLock.tryLock())
219 {
220 remainder = len;
221 continue;
222 }
223
224 if (m_streamDataList.empty())
225 {
226 m_listenerLock.unlock();
227 continue;
228 }
229
230 for (auto sit = m_streamDataList.cbegin();
231 sit != m_streamDataList.cend(); ++sit)
232 remainder = sit.key()->ProcessData(buffer, len);
233
234 WriteMPTS(buffer, len - remainder);
235
236 m_listenerLock.unlock();
237
238 if (remainder > 0 && (len > remainder)) // leftover bytes
239 memmove(buffer, &(buffer[len - remainder]), remainder);
240 }
241 LOG(VB_RECORD, LOG_INFO, LOC + "run(): " + "shutdown");
242
244
245 {
246 QMutexLocker locker(&m_startStopLock);
247 m_drb = nullptr;
248 }
249
250 if (drb->IsRunning())
251 drb->Stop();
252
253 delete drb;
254 delete[] buffer;
255 Close();
256
257 LOG(VB_RECORD, LOG_INFO, LOC + "run(): " + "end");
258
259 SetRunning(false, true, false);
260 RunEpilog();
261}
262
264{
265 if (m_fd >= 0)
266 return true;
267
268 QString error;
270 if (m_deviceNum < 0)
271 {
272 LOG(VB_GENERAL, LOG_ERR, LOC + error);
273 return false;
274 }
275
277 if (m_bufSize <= 0)
278 {
279 LOG(VB_GENERAL, LOG_ERR, LOC + error);
280 return false;
281 }
282
284 if (m_numBuffers <= 0)
285 {
286 LOG(VB_GENERAL, LOG_ERR, LOC + error);
287 return false;
288 }
289
291 {
292 LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to set RX Mode: " + error);
293 return false;
294 }
295
296 // actually open the device
297 m_fd = open(m_device.toLocal8Bit().constData(), O_RDONLY, 0);
298 if (m_fd < 0)
299 {
300 LOG(VB_GENERAL, LOG_ERR, LOC +
301 QString("Failed to open '%1'").arg(m_device) + ENO);
302 return false;
303 }
304
305 // get the rx capabilities
306 unsigned int cap = 0;
307 if (ioctl(m_fd, ASI_IOC_RXGETCAP, &cap) < 0)
308 {
309 LOG(VB_GENERAL, LOG_ERR, LOC +
310 QString("Failed to query capabilities '%1'").arg(m_device) + ENO);
311 Close();
312 return false;
313 }
314 // TODO? do stuff with capabilities..
315
316 // we need to handle 188 & 204 byte packets..
317 switch (m_rxMode)
318 {
319 case kASIRXRawMode:
322 break;
323 case kASIRXSyncOn204:
325 break;
326 case kASIRXSyncOn188:
330 break;
331 }
332
333 // pid counter?
334
335 return m_fd >= 0;
336}
337
339{
340 if (m_fd >= 0)
341 {
342 close(m_fd);
343 m_fd = -1;
344 }
345}
346
348{
349 int val = 0;
350 if(ioctl(fd, ASI_IOC_RXGETEVENTS, &val) < 0)
351 {
352 LOG(VB_GENERAL, LOG_ERR, LOC + QString("Failed to open device %1: ")
353 .arg(m_device) + ENO);
354 //TODO: Handle error
355 return;
356 }
357 if(val & ASI_EVENT_RX_BUFFER)
358 {
359 LOG(VB_RECORD, LOG_ERR, LOC +
360 QString("Driver receive buffer queue overrun detected %1")
361 .arg(m_device));
362 }
363 if(val & ASI_EVENT_RX_FIFO)
364 {
365 LOG(VB_RECORD, LOG_ERR, LOC +
366 QString("Driver receive FIFO overrun detected %1")
367 .arg(m_device));
368 }
369 if(val & ASI_EVENT_RX_CARRIER)
370 {
371 LOG(VB_RECORD, LOG_NOTICE, LOC +
372 QString("Carrier Status change detected %1")
373 .arg(m_device));
374 }
375 if(val & ASI_EVENT_RX_LOS)
376 {
377 LOG(VB_RECORD, LOG_ERR, LOC +
378 QString("Loss of Packet Sync detected %1")
379 .arg(m_device));
380 }
381 if(val & ASI_EVENT_RX_AOS)
382 {
383 LOG(VB_RECORD, LOG_NOTICE, LOC +
384 QString("Acquisition of Sync detected %1")
385 .arg(m_device));
386 }
387 if(val & ASI_EVENT_RX_DATA)
388 {
389 LOG(VB_RECORD, LOG_NOTICE, LOC +
390 QString("Receive data status change detected %1")
391 .arg(m_device));
392 }
393}
#define LOC
ASIRXMode
@ kASIRXSyncOnActualSize
@ kASIRXSyncOn188
@ kASIRXSyncOn204
@ kASIRXSyncOnActualConvertTo188
@ kASIRXSyncOn204ConvertTo188
@ kASIRXRawMode
ASIClockSource
static ASIStreamHandler * Get(const QString &devname, int inputid)
static QMutex s_handlersLock
static QMap< QString, ASIStreamHandler * > s_handlers
DeviceReadBuffer * m_drb
ASIClockSource m_clockSource
void SetRunningDesired(bool desired) override
At minimum this sets _running_desired, this may also send signals to anything that might be blocking ...
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
ASIStreamHandler(const QString &device, int inputid)
void SetRXMode(ASIRXMode m)
void PriorityEvent(int fd) override
void SetClockSource(ASIClockSource cs)
static void Return(ASIStreamHandler *&ref, int inputid)
static QMap< QString, uint > s_handlersRefCnt
static int GetASIDeviceNumber(const QString &device, QString *error=nullptr)
Definition: cardutil.cpp:3254
static uint GetASINumBuffers(uint device_num, QString *error=nullptr)
Definition: cardutil.cpp:3333
static uint GetASIBufferSize(uint device_num, QString *error=nullptr)
Definition: cardutil.cpp:3309
static bool SetASIMode(uint device_num, uint mode, QString *error=nullptr)
Definition: cardutil.cpp:3380
Buffers reads from device files.
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
void setObjectName(const QString &name)
Definition: mthread.cpp:238
StreamDataList m_streamDataList
void WriteMPTS(const unsigned char *buffer, uint len)
Write out a copy of the raw MPTS.
virtual void SetRunningDesired(bool desired)
At minimum this sets _running_desired, this may also send signals to anything that might be blocking ...
QString m_device
volatile bool m_runningDesired
volatile bool m_bError
bool RemoveAllPIDFilters(void)
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
QMutex m_startStopLock
bool UpdateFiltersFromStreamData(void)
QRecursiveMutex m_listenerLock
static constexpr unsigned int kDVBEmissionSize
Definition: tspacket.h:263
static constexpr unsigned int kSize
Definition: tspacket.h:261
unsigned int uint
Definition: compat.h:68
#define close
Definition: compat.h:30
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
def error(message)
Definition: smolt.py:409