MythTV  master
fifowriter.cpp
Go to the documentation of this file.
1 #include <cstdio>
2 #include <cstdlib>
3 #include <unistd.h>
4 #include <fcntl.h>
5 #include <cassert>
6 #include <cerrno>
7 #include <sys/time.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <ctime>
11 #include <cmath>
12 
13 #include "fifowriter.h"
14 #include "compat.h"
15 #include "mythlogging.h"
16 
17 #include "mythconfig.h"
18 #if CONFIG_DARWIN
19  #include <sys/aio.h> // O_SYNC
20 #endif
21 
22 #include <iostream>
23 using namespace std;
24 
25 FIFOWriter::FIFOWriter(int count, bool sync) :
26  m_numFifos(count),
27  m_useSync(sync)
28 {
29  if (count <= 0)
30  return;
31 
32  m_fifoBuf = new fifo_buf *[count];
33  m_fbInptr = new fifo_buf *[count];
34  m_fbOutptr = new fifo_buf *[count];
35  m_fifoThrds = new FIFOThread[count];
36  m_fifoLock = new QMutex[count];
37  m_fullCond = new QWaitCondition[count];
38  m_emptyCond = new QWaitCondition[count];
39  m_filename = new QString [count];
40  m_fbDesc = new QString [count];
41  m_maxBlkSize = new long[count];
42  m_killWr = new int[count];
43  m_fbCount = new int[count];
44  m_fbMaxCount = new int[count];
45 }
46 
48 {
49  if (m_numFifos <= 0)
50  return;
51 
52  for (int i = 0; i < m_numFifos; i++)
53  {
54  QMutexLocker flock(&m_fifoLock[i]);
55  m_killWr[i] = 1;
56  m_emptyCond[i].wakeAll();
57  }
58 
59  for (int i = 0; i < m_numFifos; i++)
60  {
61  m_fifoThrds[i].wait();
62  }
63 
64  m_numFifos = 0;
65 
66  delete [] m_maxBlkSize;
67  delete [] m_fifoBuf;
68  delete [] m_fbInptr;
69  delete [] m_fbOutptr;
70  delete [] m_fifoThrds;
71  delete [] m_fullCond;
72  delete [] m_emptyCond;
73  delete [] m_fifoLock;
74  delete [] m_filename;
75  delete [] m_fbDesc;
76  delete [] m_killWr;
77  delete [] m_fbCount;
78  delete [] m_fbMaxCount;
79 }
80 
81 bool FIFOWriter::FIFOInit(int id, const QString& desc, const QString& name, long size,
82  int num_bufs)
83 {
84  if (id < 0 || id >= m_numFifos)
85  return false;
86 
87  QByteArray fname = name.toLatin1();
88  const char *aname = fname.constData();
89  if (mkfifo(aname, S_IREAD | S_IWRITE | S_IRGRP | S_IROTH) == -1)
90  {
91  LOG(VB_GENERAL, LOG_ERR, QString("Couldn't create fifo for file: '%1'")
92  .arg(name) + ENO);
93  return false;
94  }
95  LOG(VB_GENERAL, LOG_INFO, QString("Created %1 fifo: %2")
96  .arg(desc).arg(name));
97  m_maxBlkSize[id] = size;
98  m_filename[id] = name;
99  m_fbDesc[id] = desc;
100  m_killWr[id] = 0;
101  m_fbCount[id] = (m_useSync) ? 2 : num_bufs;
102  m_fbMaxCount[id] = 512;
103  m_fifoBuf[id] = new fifo_buf;
104  struct fifo_buf *fifoptr = m_fifoBuf[id];
105  for (int i = 0; i < m_fbCount[id]; i++)
106  {
107  fifoptr->data = new unsigned char[m_maxBlkSize[id]];
108  if (i == m_fbCount[id] - 1)
109  fifoptr->next = m_fifoBuf[id];
110  else
111  fifoptr->next = new struct fifo_buf;
112  fifoptr = fifoptr->next;
113  }
114  m_fbInptr[id] = m_fifoBuf[id];
115  m_fbOutptr[id] = m_fifoBuf[id];
116 
117  m_fifoThrds[id].SetParent(this);
118  m_fifoThrds[id].SetId(id);
119  m_fifoThrds[id].start();
120 
121  while (0 == m_killWr[id] && !m_fifoThrds[id].isRunning())
122  usleep(1000);
123 
124  return m_fifoThrds[id].isRunning();
125 }
126 
127 void FIFOThread::run(void)
128 {
129  RunProlog();
130  if (m_parent && m_id != -1)
132  RunEpilog();
133 }
134 
136 {
137  int fd = -1;
138 
139  QMutexLocker flock(&m_fifoLock[id]);
140  while (true)
141  {
142  if ((m_fbInptr[id] == m_fbOutptr[id]) && (0 == m_killWr[id]))
143  m_emptyCond[id].wait(flock.mutex());
144  flock.unlock();
145  if (m_killWr[id])
146  break;
147  if (fd < 0)
148  {
149  QByteArray fname = m_filename[id].toLatin1();
150  fd = open(fname.constData(), O_WRONLY| O_SYNC);
151  }
152  if (fd >= 0)
153  {
154  int written = 0;
155  while (written < m_fbOutptr[id]->blksize)
156  {
157  int ret = write(fd, m_fbOutptr[id]->data+written,
158  m_fbOutptr[id]->blksize-written);
159  if (ret < 0)
160  {
161  LOG(VB_GENERAL, LOG_ERR,
162  QString("FIFOW: write failed with %1")
163  .arg(strerror(errno)));
165  break;
166  }
167  written += ret;
168  }
169  }
170  flock.relock();
171  m_fbOutptr[id] = m_fbOutptr[id]->next;
172  m_fullCond[id].wakeAll();
173  }
174 
175  if (fd != -1)
176  close(fd);
177 
178  unlink(m_filename[id].toLocal8Bit().constData());
179 
180  while (m_fifoBuf[id]->next != m_fifoBuf[id])
181  {
182  struct fifo_buf *tmpfifo = m_fifoBuf[id]->next->next;
183  delete [] m_fifoBuf[id]->next->data;
184  delete m_fifoBuf[id]->next;
185  m_fifoBuf[id]->next = tmpfifo;
186  }
187  delete [] m_fifoBuf[id]->data;
188  delete m_fifoBuf[id];
189 }
190 
191 void FIFOWriter::FIFOWrite(int id, void *buffer, long blksize)
192 {
193  QMutexLocker flock(&m_fifoLock[id]);
194  while (m_fbInptr[id]->next == m_fbOutptr[id])
195  {
196  bool blocking = false;
197  if (!m_useSync)
198  {
199  for(int i = 0; i < m_numFifos; i++)
200  {
201  if (i == id)
202  continue;
203  if (m_fbInptr[i] == m_fbOutptr[i])
204  blocking = true;
205  }
206  }
207 
208  if (blocking && m_fbCount[id] < m_fbMaxCount[id])
209  {
210  struct fifo_buf *tmpfifo = m_fbInptr[id]->next;
211  m_fbInptr[id]->next = new struct fifo_buf;
212  m_fbInptr[id]->next->data = new unsigned char[m_maxBlkSize[id]];
213  m_fbInptr[id]->next->next = tmpfifo;
214  QString msg = QString("allocating additonal buffer for : %1(%2)")
215  .arg(m_fbDesc[id]).arg(++m_fbCount[id]);
216  LOG(VB_FILE, LOG_INFO, msg);
217  }
218  else
219  {
220  m_fullCond[id].wait(flock.mutex(), 1000);
221  }
222  }
223  if (blksize > m_maxBlkSize[id])
224  {
225  delete [] m_fbInptr[id]->data;
226  m_fbInptr[id]->data = new unsigned char[blksize];
227  }
228  memcpy(m_fbInptr[id]->data,buffer,blksize);
229  m_fbInptr[id]->blksize = blksize;
230  m_fbInptr[id] = m_fbInptr[id]->next;
231  m_emptyCond[id].wakeAll();
232 }
233 
235 {
236  int count = 0;
237  while (count < m_numFifos)
238  {
239  count = 0;
240  for (int i = 0; i < m_numFifos; i++)
241  {
242  QMutexLocker flock(&m_fifoLock[i]);
243  if (m_fbInptr[i] == m_fbOutptr[i])
244  {
245  m_killWr[i] = 1;
246  m_emptyCond[i].wakeAll();
247  count++;
248  }
249  }
250  usleep(1000);
251  }
252 }
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:215
#define mkfifo(path, mode)
Definition: compat.h:227
def write(text, progress=True)
Definition: mythburn.py:279
FIFOWriter * m_parent
Definition: fifowriter.h:26
struct fifo_buf * next
Definition: fifowriter.h:47
int * m_fbMaxCount
Definition: fifowriter.h:66
QString * m_filename
Definition: fifowriter.h:60
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: fifowriter.cpp:127
QMutex * m_fifoLock
Definition: fifowriter.h:56
#define O_SYNC
Definition: compat.h:225
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:311
fifo_buf ** m_fifoBuf
Definition: fifowriter.h:51
long * m_maxBlkSize
Definition: fifowriter.h:63
fifo_buf ** m_fbOutptr
Definition: fifowriter.h:53
FIFOThread * m_fifoThrds
Definition: fifowriter.h:55
QWaitCondition * m_fullCond
Definition: fifowriter.h:57
fifo_buf ** m_fbInptr
Definition: fifowriter.h:52
static bool isRunning(const char *program)
Returns true if a program containing the specified string is running on this machine.
#define S_IROTH
Definition: compat.h:224
#define close
Definition: compat.h:16
QString * m_fbDesc
Definition: fifowriter.h:61
unsigned char * data
Definition: fifowriter.h:48
void FIFOWriteThread(int id)
Definition: fifowriter.cpp:135
~FIFOWriter(void)
Definition: fifowriter.cpp:47
bool isRunning(void) const
Definition: mthread.cpp:274
bool FIFOInit(int id, const QString &desc, const QString &name, long size, int num_bufs)
Definition: fifowriter.cpp:81
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
#define S_IRGRP
Definition: compat.h:223
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
PictureAttribute next(PictureAttributeSupported Supported, PictureAttribute Attribute)
int * m_killWr
Definition: fifowriter.h:64
int * m_fbCount
Definition: fifowriter.h:65
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:202
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:294
void FIFODrain(void)
Definition: fifowriter.cpp:234
QWaitCondition * m_emptyCond
Definition: fifowriter.h:58
bool m_useSync
Definition: fifowriter.h:68
int m_numFifos
Definition: fifowriter.h:67
void FIFOWrite(int id, void *buf, long size)
Definition: fifowriter.cpp:191
void SetParent(FIFOWriter *parent)
Definition: fifowriter.h:23
FIFOWriter(int count, bool sync)
Definition: fifowriter.cpp:25
void SetId(int id)
Definition: fifowriter.h:22