MythTV  master
mythfifowriter.cpp
Go to the documentation of this file.
1 #include <QtGlobal>
2 
3 // MythTV
4 #include "libmythbase/compat.h"
6 
7 #ifdef Q_OS_DARWIN
8 #include <sys/aio.h>
9 #endif
10 #include "io/mythfifowriter.h"
11 
12 // Std
13 #include <cstdio>
14 #include <cstdlib>
15 #include <unistd.h>
16 #include <fcntl.h>
17 #include <cerrno>
18 #include <sys/time.h>
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <ctime>
22 #include <cmath>
23 #include <iostream>
24 
// MythFIFOThread default constructor: does nothing beyond passing the thread
// name "FIFOThread" to the MThread base class.
26  : MThread("FIFOThread")
27 {
28 }
29 
// MythFIFOThread destructor body.
31 {
// MThread::wait() is called with its default (maximum) timeout, so this
// blocks until the thread has exited before the members are reset.
32  wait();
33  m_parent = nullptr;
34  m_id = -1;
35 }
36 
// MythFIFOThread::SetId(int Id) body: stores the FIFO slot index that run()
// later checks against -1.
38 {
39  m_id = Id;
40 }
41 
// MythFIFOThread::SetParent(MythFIFOWriter *Parent) body: stores the owning
// writer. The pointer is not owned by the thread object.
43 {
44  m_parent = Parent;
45 }
46 
// MythFIFOThread::run() body (MThread override). The thread's work is
// bracketed by RunProlog()/RunEpilog(), as MThread requires of run()
// reimplementations.
48 {
49  RunProlog();
// Only do any work when both a parent and a slot id have been set.
// NOTE(review): the statement controlled by this `if` (listing line 51) is
// not visible in this copy; presumably it is
// m_parent->FIFOWriteThread(m_id) -- confirm against the original file.
50  if (m_parent && m_id != -1)
52  RunEpilog();
53 }
54 
// MythFIFOWriter(uint Count, bool Sync) constructor.
// Records the number of FIFO slots and the sync flag, then allocates one
// parallel array per piece of per-slot state, each with Count entries.
56  : m_numFifos(Count),
57  m_useSync(Sync)
58 {
// With no slots requested nothing is allocated; the destructor applies the
// same m_numFifos < 1 test before freeing.
59  if (Count < 1)
60  return;
61 
// The pointer, long and int arrays below are created with plain new[] and
// therefore hold indeterminate values until FIFOInit() fills in a slot.
62  m_fifoBuf = new MythFifoBuffer*[Count];
63  m_fbInptr = new MythFifoBuffer*[Count];
64  m_fbOutptr = new MythFifoBuffer*[Count];
65  m_fifoThrds = new MythFIFOThread[Count];
66  m_fifoLock = new QMutex[Count];
67  m_fullCond = new QWaitCondition[Count];
68  m_emptyCond = new QWaitCondition[Count];
69  m_filename = new QString [Count];
70  m_fbDesc = new QString [Count];
71  m_maxBlkSize = new long[Count];
72  m_killWr = new int[Count];
73  m_fbCount = new int[Count];
74  m_fbMaxCount = new int[Count];
75 }
76 
// MythFIFOWriter destructor body: stops every writer thread, waits for them
// to exit and then releases the per-slot arrays.
78 {
// Nothing was allocated by the constructor in this case.
79  if (m_numFifos < 1)
80  return;
81 
// Phase 1: under each slot's lock, raise the stop flag and wake a writer
// that may be sleeping on the "ring is empty" condition.
82  for (uint i = 0; i < m_numFifos; i++)
83  {
84  QMutexLocker flock(&m_fifoLock[i]);
85  m_killWr[i] = 1;
86  m_emptyCond[i].wakeAll();
87  }
88 
// Phase 2: wait for each thread to finish before freeing anything it uses.
89  for (uint i = 0; i < m_numFifos; i++)
90  m_fifoThrds[i].wait();
91 
92  m_numFifos = 0;
93 
// Only the arrays are freed here; the ring nodes and their data buffers are
// deleted at the end of FIFOWriteThread().
94  delete [] m_maxBlkSize;
95  delete [] m_fifoBuf;
96  delete [] m_fbInptr;
97  delete [] m_fbOutptr;
98  delete [] m_fifoThrds;
99  delete [] m_fullCond;
100  delete [] m_emptyCond;
101  delete [] m_fifoLock;
102  delete [] m_filename;
103  delete [] m_fbDesc;
104  delete [] m_killWr;
105  delete [] m_fbCount;
106  delete [] m_fbMaxCount;
107 }
108 
109 bool MythFIFOWriter::FIFOInit(uint Id, const QString& Desc, const QString& Name,
110  long Size, int NumBufs)
111 {
112  if (Id >= m_numFifos)
113  return false;
114 
115  QByteArray fname = Name.toLatin1();
116  const char *aname = fname.constData();
117  if (mkfifo(aname, S_IREAD | S_IWRITE | S_IRGRP | S_IROTH) == -1)
118  {
119  LOG(VB_GENERAL, LOG_ERR, QString("Couldn't create fifo for file: '%1'").arg(Name) + ENO);
120  return false;
121  }
122 
123  LOG(VB_GENERAL, LOG_INFO, QString("Created %1 fifo: %2").arg(Desc, Name));
124 
125  m_maxBlkSize[Id] = Size;
126  m_filename[Id] = Name;
127  m_fbDesc[Id] = Desc;
128  m_killWr[Id] = 0;
129  m_fbCount[Id] = (m_useSync) ? 2 : NumBufs;
130  m_fbMaxCount[Id] = 512;
131  m_fifoBuf[Id] = new MythFifoBuffer;
132  struct MythFifoBuffer *fifoptr = m_fifoBuf[Id];
133  for (int i = 0; i < m_fbCount[Id]; i++)
134  {
135  fifoptr->m_data = new unsigned char[static_cast<unsigned long>(m_maxBlkSize[Id])];
136  if (i == m_fbCount[Id] - 1)
137  {
138  fifoptr->m_next = m_fifoBuf[Id];
139  }
140  else
141  {
142  fifoptr->m_next = new struct MythFifoBuffer;
143  }
144  fifoptr = fifoptr->m_next;
145  }
146  m_fbInptr[Id] = m_fifoBuf[Id];
147  m_fbOutptr[Id] = m_fifoBuf[Id];
148 
149  m_fifoThrds[Id].SetParent(this);
150  m_fifoThrds[Id].SetId(static_cast<int>(Id));
151  m_fifoThrds[Id].start();
152 
153  while (0 == m_killWr[Id] && !m_fifoThrds[Id].isRunning())
154  usleep(1000);
155 
156  return m_fifoThrds[Id].isRunning();
157 }
158 
// MythFIFOWriter::FIFOWriteThread(int Id) body: consumer loop for slot Id.
// Takes queued blocks from the ring at m_fbOutptr[Id], writes each one to
// the named pipe, and on a stop request closes the pipe, removes it from
// the filesystem and frees the ring.
160 {
161  int fd = -1;
162 
163  QMutexLocker flock(&m_fifoLock[Id]);
164  while (true)
165  {
// Ring empty and no stop request: sleep until m_emptyCond is woken by
// FIFOWrite(), FIFODrain() or the destructor.
// NOTE(review): this is an `if`, not a loop, so the empty test is not
// repeated after waking -- confirm that this is intended.
166  if ((m_fbInptr[Id] == m_fbOutptr[Id]) && (0 == m_killWr[Id]))
167  m_emptyCond[Id].wait(flock.mutex());
// The pipe I/O below runs with the lock released so producers can keep
// queueing. A stop request leaves the loop with the lock released, before
// any block still queued is written.
168  flock.unlock();
169  if (m_killWr[Id])
170  break;
// The pipe is opened lazily and the descriptor is reused for later blocks.
// If open() fails it is retried on the next block.
171  if (fd < 0)
172  {
173  QByteArray fname = m_filename[Id].toLatin1();
174  fd = open(fname.constData(), O_WRONLY| O_SYNC);
175  }
// When no descriptor is available the current block is skipped: the out
// pointer is still advanced below.
176  if (fd >= 0)
177  {
// write() may accept fewer bytes than requested, so loop until the whole
// block has gone out.
178  int written = 0;
179  while (written < m_fbOutptr[Id]->m_blockSize)
180  {
181  int ret = static_cast<int>(write(fd, m_fbOutptr[Id]->m_data + written,
182  static_cast<size_t>(m_fbOutptr[Id]->m_blockSize-written)));
183  if (ret < 0)
184  {
185  LOG(VB_GENERAL, LOG_ERR, QString("FIFOW: write failed with %1")
186  .arg(strerror(errno)));
// Leaves only the inner loop: the rest of this block is dropped and the
// thread carries on with the next one.
188  break;
189  }
190  written += ret;
191  }
192  }
// Re-take the lock to release the consumed node and wake a producer that
// may be waiting for free space.
193  flock.relock();
194  m_fbOutptr[Id] = m_fbOutptr[Id]->m_next;
195  m_fullCond[Id].wakeAll();
196  }
197 
198  if (fd != -1)
199  close(fd);
200 
// NOTE(review): the path is encoded with toLocal8Bit() here but with
// toLatin1() for open() above -- confirm the difference is intentional.
201  unlink(m_filename[Id].toLocal8Bit().constData());
202 
// Free the ring: repeatedly unlink and delete the node after the head until
// the head points at itself, then delete the head.
203  while (m_fifoBuf[Id]->m_next != m_fifoBuf[Id])
204  {
205  struct MythFifoBuffer *tmpfifo = m_fifoBuf[Id]->m_next->m_next;
206  delete [] m_fifoBuf[Id]->m_next->m_data;
207  delete m_fifoBuf[Id]->m_next;
208  m_fifoBuf[Id]->m_next = tmpfifo;
209  }
210  delete [] m_fifoBuf[Id]->m_data;
211  delete m_fifoBuf[Id];
212 }
213 
214 void MythFIFOWriter::FIFOWrite(uint Id, void *Buffer, long Size)
215 {
216  QMutexLocker flock(&m_fifoLock[Id]);
217  while (m_fbInptr[Id]->m_next == m_fbOutptr[Id])
218  {
219  bool blocking = false;
220  if (!m_useSync)
221  {
222  for (uint i = 0; i < m_numFifos; i++)
223  {
224  if (i == Id)
225  continue;
226  if (m_fbInptr[i] == m_fbOutptr[i])
227  blocking = true;
228  }
229  }
230 
231  if (blocking && m_fbCount[Id] < m_fbMaxCount[Id])
232  {
233  struct MythFifoBuffer *tmpfifo = m_fbInptr[Id]->m_next;
234  m_fbInptr[Id]->m_next = new struct MythFifoBuffer;
235  m_fbInptr[Id]->m_next->m_data = new unsigned char[static_cast<unsigned long>(m_maxBlkSize[Id])];
236  m_fbInptr[Id]->m_next->m_next = tmpfifo;
237  QString msg = QString("allocating additonal buffer for : %1(%2)")
238  .arg(m_fbDesc[Id]).arg(++m_fbCount[Id]);
239  LOG(VB_FILE, LOG_INFO, msg);
240  }
241  else
242  {
243  m_fullCond[Id].wait(flock.mutex(), 1000);
244  }
245  }
246 
247  if (Size > m_maxBlkSize[Id])
248  {
249  delete [] m_fbInptr[Id]->m_data;
250  m_fbInptr[Id]->m_data = new unsigned char[static_cast<unsigned long>(Size)];
251  }
252 
253  memcpy(m_fbInptr[Id]->m_data,Buffer, static_cast<size_t>(Size));
254  m_fbInptr[Id]->m_blockSize = Size;
255  m_fbInptr[Id] = m_fbInptr[Id]->m_next;
256  m_emptyCond[Id].wakeAll();
257 }
258 
// MythFIFOWriter::FIFODrain(void) body: polls until a single pass over all
// slots finds every ring empty, telling the writer of each empty slot to
// stop along the way.
260 {
261  uint count = 0;
262  while (count < m_numFifos)
263  {
// Restart the tally on every pass; the loop ends only when all slots were
// seen empty within the same pass.
264  count = 0;
265  for (uint i = 0; i < m_numFifos; i++)
266  {
267  QMutexLocker flock(&m_fifoLock[i]);
// "in == out" means the ring holds no queued block. Such a slot gets its
// stop flag set right away, even if other slots still hold data.
268  if (m_fbInptr[i] == m_fbOutptr[i])
269  {
270  m_killWr[i] = 1;
271  m_emptyCond[i].wakeAll();
272  count++;
273  }
274  }
// Sleep 1 ms between passes to give the writer threads time to make
// progress.
275  usleep(1000);
276  }
277 }
O_SYNC
#define O_SYNC
Definition: compat.h:144
MThread::start
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:283
ENO
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:73
MythFIFOWriter::m_fbOutptr
MythFifoBuffer ** m_fbOutptr
Definition: mythfifowriter.h:56
MythFIFOWriter::m_maxBlkSize
long * m_maxBlkSize
Definition: mythfifowriter.h:66
MythFIFOWriter::m_emptyCond
QWaitCondition * m_emptyCond
Definition: mythfifowriter.h:61
MythFIFOWriter::m_fbDesc
QString * m_fbDesc
Definition: mythfifowriter.h:64
MThread::wait
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:300
MythFIFOThread::~MythFIFOThread
~MythFIFOThread() override
Definition: mythfifowriter.cpp:30
mythburn.write
def write(text, progress=True)
Definition: mythburn.py:308
MythFIFOThread::MythFIFOThread
MythFIFOThread()
Definition: mythfifowriter.cpp:25
MythFIFOWriter::~MythFIFOWriter
~MythFIFOWriter(void)
Definition: mythfifowriter.cpp:77
MythFIFOWriter::m_fullCond
QWaitCondition * m_fullCond
Definition: mythfifowriter.h:60
MythFIFOThread
Definition: mythfifowriter.h:15
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
MThread::RunProlog
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
MythFIFOWriter::m_useSync
bool m_useSync
Definition: mythfifowriter.h:71
MythFIFOWriter
Definition: mythfifowriter.h:31
MythFIFOThread::m_id
int m_id
Definition: mythfifowriter.h:28
MythFIFOWriter::m_fbInptr
MythFifoBuffer ** m_fbInptr
Definition: mythfifowriter.h:55
MythFIFOWriter::FIFODrain
void FIFODrain(void)
Definition: mythfifowriter.cpp:259
MythFIFOThread::SetParent
void SetParent(MythFIFOWriter *Parent)
Definition: mythfifowriter.cpp:42
close
#define close
Definition: compat.h:43
MythFIFOWriter::MythFifoBuffer
Definition: mythfifowriter.h:47
MythFIFOWriter::MythFifoBuffer::m_blockSize
long m_blockSize
Definition: mythfifowriter.h:51
mythlogging.h
compat.h
MythFIFOThread::run
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: mythfifowriter.cpp:47
MythFIFOWriter::FIFOInit
bool FIFOInit(uint Id, const QString &Desc, const QString &Name, long Size, int NumBufs)
Definition: mythfifowriter.cpp:109
isRunning
static bool isRunning(const char *program)
Returns true if a program containing the specified string is running on this machine.
Definition: mythshutdown.cpp:207
mythfifowriter.h
MThread::RunEpilog
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
uint
unsigned int uint
Definition: compat.h:81
MythFIFOWriter::MythFifoBuffer::m_next
struct MythFifoBuffer * m_next
Definition: mythfifowriter.h:49
S_IRGRP
#define S_IRGRP
Definition: replex.cpp:55
MythFIFOWriter::FIFOWrite
void FIFOWrite(uint Id, void *Buffer, long Size)
Definition: mythfifowriter.cpp:214
MythFIFOWriter::m_filename
QString * m_filename
Definition: mythfifowriter.h:63
MythFIFOWriter::m_fifoLock
QMutex * m_fifoLock
Definition: mythfifowriter.h:59
MythFIFOWriter::m_fbCount
int * m_fbCount
Definition: mythfifowriter.h:68
MythFIFOWriter::MythFifoBuffer::m_data
unsigned char * m_data
Definition: mythfifowriter.h:50
MythFIFOWriter::m_fbMaxCount
int * m_fbMaxCount
Definition: mythfifowriter.h:69
Buffer
Definition: MythExternControl.h:36
Name
Definition: channelsettings.cpp:71
MythFIFOWriter::FIFOWriteThread
void FIFOWriteThread(int Id)
Definition: mythfifowriter.cpp:159
MythFIFOWriter::m_numFifos
uint m_numFifos
Definition: mythfifowriter.h:70
MThread
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:48
MThread::isRunning
bool isRunning(void) const
Definition: mthread.cpp:263
MythFIFOWriter::m_killWr
int * m_killWr
Definition: mythfifowriter.h:67
MythFIFOThread::m_parent
MythFIFOWriter * m_parent
Definition: mythfifowriter.h:27
MythFIFOWriter::MythFIFOWriter
MythFIFOWriter(uint Count, bool Sync)
Definition: mythfifowriter.cpp:55
MythFIFOWriter::m_fifoThrds
MythFIFOThread * m_fifoThrds
Definition: mythfifowriter.h:58
MythFIFOThread::SetId
void SetId(int Id)
Definition: mythfifowriter.cpp:37
MythFIFOWriter::m_fifoBuf
MythFifoBuffer ** m_fifoBuf
Definition: mythfifowriter.h:54
mkfifo
#define mkfifo(path, mode)
Definition: compat.h:148
S_IROTH
#define S_IROTH
Definition: replex.cpp:57