MythTV  master
fifowriter.cpp
Go to the documentation of this file.
1 #include <cstdio>
2 #include <cstdlib>
3 #include <unistd.h>
4 #include <fcntl.h>
5 #include <cassert>
6 #include <cerrno>
7 #include <sys/time.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <ctime>
11 #include <cmath>
12 
13 #include "fifowriter.h"
14 #include "compat.h"
15 #include "mythlogging.h"
16 
17 #include "mythconfig.h"
18 #if CONFIG_DARWIN
19  #include <sys/aio.h> // O_SYNC
20 #endif
21 
22 #include <iostream>
23 using namespace std;
24 
25 FIFOWriter::FIFOWriter(int count, bool sync) :
26  m_num_fifos(count),
27  m_usesync(sync)
28 {
29  if (count <= 0)
30  return;
31 
32  m_fifo_buf = new struct fifo_buf *[count];
33  m_fb_inptr = new struct fifo_buf *[count];
34  m_fb_outptr = new struct fifo_buf *[count];
35  m_fifothrds = new FIFOThread[count];
36  m_fifo_lock = new QMutex[count];
37  m_full_cond = new QWaitCondition[count];
38  m_empty_cond = new QWaitCondition[count];
39  m_filename = new QString [count];
40  m_fbdesc = new QString [count];
41  m_maxblksize = new long[count];
42  m_killwr = new int[count];
43  m_fbcount = new int[count];
44  m_fbmaxcount = new int[count];
45 }
46 
48 {
49  if (m_num_fifos <= 0)
50  return;
51 
52  for (int i = 0; i < m_num_fifos; i++)
53  {
54  QMutexLocker flock(&m_fifo_lock[i]);
55  m_killwr[i] = 1;
56  m_empty_cond[i].wakeAll();
57  }
58 
59  for (int i = 0; i < m_num_fifos; i++)
60  {
61  m_fifothrds[i].wait();
62  }
63 
64  m_num_fifos = 0;
65 
66  delete [] m_maxblksize;
67  delete [] m_fifo_buf;
68  delete [] m_fb_inptr;
69  delete [] m_fb_outptr;
70  delete [] m_fifothrds;
71  delete [] m_full_cond;
72  delete [] m_empty_cond;
73  delete [] m_fifo_lock;
74  delete [] m_filename;
75  delete [] m_fbdesc;
76  delete [] m_killwr;
77  delete [] m_fbcount;
78  delete [] m_fbmaxcount;
79 }
80 
81 bool FIFOWriter::FIFOInit(int id, const QString& desc, const QString& name, long size,
82  int num_bufs)
83 {
84  if (id < 0 || id >= m_num_fifos)
85  return false;
86 
87  QByteArray fname = name.toLatin1();
88  const char *aname = fname.constData();
89  if (mkfifo(aname, S_IREAD | S_IWRITE | S_IRGRP | S_IROTH) == -1)
90  {
91  LOG(VB_GENERAL, LOG_ERR, QString("Couldn't create fifo for file: '%1'")
92  .arg(name) + ENO);
93  return false;
94  }
95  LOG(VB_GENERAL, LOG_INFO, QString("Created %1 fifo: %2")
96  .arg(desc).arg(name));
97  m_maxblksize[id] = size;
98  m_filename[id] = name;
99  m_fbdesc[id] = desc;
100  m_killwr[id] = 0;
101  m_fbcount[id] = (m_usesync) ? 2 : num_bufs;
102  m_fbmaxcount[id] = 512;
103  m_fifo_buf[id] = new struct fifo_buf;
104  struct fifo_buf *fifoptr = m_fifo_buf[id];
105  for (int i = 0; i < m_fbcount[id]; i++)
106  {
107  fifoptr->data = new unsigned char[m_maxblksize[id]];
108  if (i == m_fbcount[id] - 1)
109  fifoptr->next = m_fifo_buf[id];
110  else
111  fifoptr->next = new struct fifo_buf;
112  fifoptr = fifoptr->next;
113  }
114  m_fb_inptr[id] = m_fifo_buf[id];
115  m_fb_outptr[id] = m_fifo_buf[id];
116 
117  m_fifothrds[id].SetParent(this);
118  m_fifothrds[id].SetId(id);
119  m_fifothrds[id].start();
120 
121  while (0 == m_killwr[id] && !m_fifothrds[id].isRunning())
122  usleep(1000);
123 
124  return m_fifothrds[id].isRunning();
125 }
126 
127 void FIFOThread::run(void)
128 {
129  RunProlog();
130  if (m_parent && m_id != -1)
132  RunEpilog();
133 }
134 
136 {
137  int fd = -1;
138 
139  QMutexLocker flock(&m_fifo_lock[id]);
140  while (true)
141  {
142  if ((m_fb_inptr[id] == m_fb_outptr[id]) && (0 == m_killwr[id]))
143  m_empty_cond[id].wait(flock.mutex());
144  flock.unlock();
145  if (m_killwr[id])
146  break;
147  if (fd < 0)
148  {
149  QByteArray fname = m_filename[id].toLatin1();
150  fd = open(fname.constData(), O_WRONLY| O_SYNC);
151  }
152  if (fd >= 0)
153  {
154  int written = 0;
155  while (written < m_fb_outptr[id]->blksize)
156  {
157  int ret = write(fd, m_fb_outptr[id]->data+written,
158  m_fb_outptr[id]->blksize-written);
159  if (ret < 0)
160  {
161  LOG(VB_GENERAL, LOG_ERR,
162  QString("FIFOW: write failed with %1")
163  .arg(strerror(errno)));
165  break;
166  }
167  written += ret;
168  }
169  }
170  flock.relock();
171  m_fb_outptr[id] = m_fb_outptr[id]->next;
172  m_full_cond[id].wakeAll();
173  }
174 
175  if (fd != -1)
176  close(fd);
177 
178  unlink(m_filename[id].toLocal8Bit().constData());
179 
180  while (m_fifo_buf[id]->next != m_fifo_buf[id])
181  {
182  struct fifo_buf *tmpfifo = m_fifo_buf[id]->next->next;
183  delete [] m_fifo_buf[id]->next->data;
184  delete m_fifo_buf[id]->next;
185  m_fifo_buf[id]->next = tmpfifo;
186  }
187  delete [] m_fifo_buf[id]->data;
188  delete m_fifo_buf[id];
189 }
190 
191 void FIFOWriter::FIFOWrite(int id, void *buffer, long blksize)
192 {
193  QMutexLocker flock(&m_fifo_lock[id]);
194  while (m_fb_inptr[id]->next == m_fb_outptr[id])
195  {
196  bool blocking = false;
197  if (!m_usesync)
198  {
199  for(int i = 0; i < m_num_fifos; i++)
200  {
201  if (i == id)
202  continue;
203  if (m_fb_inptr[i] == m_fb_outptr[i])
204  blocking = true;
205  }
206  }
207 
208  if (blocking && m_fbcount[id] < m_fbmaxcount[id])
209  {
210  struct fifo_buf *tmpfifo;
211  tmpfifo = m_fb_inptr[id]->next;
212  m_fb_inptr[id]->next = new struct fifo_buf;
213  m_fb_inptr[id]->next->data = new unsigned char[m_maxblksize[id]];
214  m_fb_inptr[id]->next->next = tmpfifo;
215  QString msg = QString("allocating additonal buffer for : %1(%2)")
216  .arg(m_fbdesc[id]).arg(++m_fbcount[id]);
217  LOG(VB_FILE, LOG_INFO, msg);
218  }
219  else
220  {
221  m_full_cond[id].wait(flock.mutex(), 1000);
222  }
223  }
224  if (blksize > m_maxblksize[id])
225  {
226  delete [] m_fb_inptr[id]->data;
227  m_fb_inptr[id]->data = new unsigned char[blksize];
228  }
229  memcpy(m_fb_inptr[id]->data,buffer,blksize);
230  m_fb_inptr[id]->blksize = blksize;
231  m_fb_inptr[id] = m_fb_inptr[id]->next;
232  m_empty_cond[id].wakeAll();
233 }
234 
236 {
237  int count = 0;
238  while (count < m_num_fifos)
239  {
240  count = 0;
241  for (int i = 0; i < m_num_fifos; i++)
242  {
243  QMutexLocker flock(&m_fifo_lock[i]);
244  if (m_fb_inptr[i] == m_fb_outptr[i])
245  {
246  m_killwr[i] = 1;
247  m_empty_cond[i].wakeAll();
248  count++;
249  }
250  }
251  usleep(1000);
252  }
253 }
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:215
#define mkfifo(path, mode)
Definition: compat.h:227
void start(QThread::Priority=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:294
def write(text, progress=True)
Definition: mythburn.py:279
FIFOWriter * m_parent
Definition: fifowriter.h:26
struct fifo_buf * next
Definition: fifowriter.h:47
long * m_maxblksize
Definition: fifowriter.h:63
QString * m_filename
Definition: fifowriter.h:60
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
Definition: fifowriter.cpp:127
#define O_SYNC
Definition: compat.h:225
bool wait(unsigned long time=ULONG_MAX)
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:311
FIFOThread * m_fifothrds
Definition: fifowriter.h:55
QString * m_fbdesc
Definition: fifowriter.h:61
fifo_buf_t ** m_fifo_buf
Definition: fifowriter.h:51
QWaitCondition * m_full_cond
Definition: fifowriter.h:57
int m_num_fifos
Definition: fifowriter.h:67
static bool isRunning(const char *program)
Returns true if a program containing the specified string is running on this machine.
#define S_IROTH
Definition: compat.h:224
#define close
Definition: compat.h:16
int * m_fbcount
Definition: fifowriter.h:65
unsigned char * data
Definition: fifowriter.h:48
void FIFOWriteThread(int id)
Definition: fifowriter.cpp:135
~FIFOWriter(void)
Definition: fifowriter.cpp:47
QMutex * m_fifo_lock
Definition: fifowriter.h:56
fifo_buf_t ** m_fb_inptr
Definition: fifowriter.h:52
bool isRunning(void) const
Definition: mthread.cpp:274
int * m_fbmaxcount
Definition: fifowriter.h:66
bool FIFOInit(int id, const QString &desc, const QString &name, long size, int num_bufs)
Definition: fifowriter.cpp:81
fifo_buf_t ** m_fb_outptr
Definition: fifowriter.h:53
const char * name
Definition: ParseText.cpp:328
int * m_killwr
Definition: fifowriter.h:64
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:99
#define S_IRGRP
Definition: compat.h:223
PictureAttribute next(PictureAttributeSupported supported, PictureAttribute attribute)
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:202
void FIFODrain(void)
Definition: fifowriter.cpp:235
void FIFOWrite(int id, void *buf, long size)
Definition: fifowriter.cpp:191
void SetParent(FIFOWriter *parent)
Definition: fifowriter.h:23
FIFOWriter(int count, bool sync)
Definition: fifowriter.cpp:25
bool m_usesync
Definition: fifowriter.h:68
QWaitCondition * m_empty_cond
Definition: fifowriter.h:58
void SetId(int id)
Definition: fifowriter.h:22