MythTV master
mythfifowriter.cpp
Go to the documentation of this file.
1#include <QtGlobal>
2
3// MythTV
6
7#ifdef Q_OS_DARWIN
8#include <sys/aio.h>
9#endif
10#include "io/mythfifowriter.h"
11
12// Std
13#include <cstdio>
14#include <cstdlib>
15#include <unistd.h>
16#include <fcntl.h>
17#include <cerrno>
18#include <sys/time.h>
19#include <sys/types.h>
20#include <sys/stat.h>
21#include <cmath>
22#include <iostream>
23#include <thread>
24
26 : MThread("FIFOThread")
27{
28}
29
31{
32 wait();
33 m_parent = nullptr;
34 m_id = -1;
35}
36
38{
39 m_id = Id;
40}
41
43{
44 m_parent = Parent;
45}
46
48{
49 RunProlog();
50 if (m_parent && m_id != -1)
52 RunEpilog();
53}
54
56 : m_numFifos(Count),
57 m_useSync(Sync)
58{
59 if (Count < 1)
60 return;
61
62 m_fifoBuf = new MythFifoBuffer*[Count];
63 m_fbInptr = new MythFifoBuffer*[Count];
64 m_fbOutptr = new MythFifoBuffer*[Count];
65 m_fifoThrds = new MythFIFOThread[Count];
66 m_fifoLock = new QMutex[Count];
67 m_fullCond = new QWaitCondition[Count];
68 m_emptyCond = new QWaitCondition[Count];
69 m_filename = new QString [Count];
70 m_fbDesc = new QString [Count];
71 m_maxBlkSize = new long[Count];
72 m_killWr = new int[Count];
73 m_fbCount = new int[Count];
74 m_fbMaxCount = new int[Count];
75}
76
78{
79 if (m_numFifos < 1)
80 return;
81
82 for (uint i = 0; i < m_numFifos; i++)
83 {
84 QMutexLocker flock(&m_fifoLock[i]);
85 m_killWr[i] = 1;
86 m_emptyCond[i].wakeAll();
87 }
88
89 for (uint i = 0; i < m_numFifos; i++)
90 m_fifoThrds[i].wait();
91
92 m_numFifos = 0;
93
94 delete [] m_maxBlkSize;
95 delete [] m_fifoBuf;
96 delete [] m_fbInptr;
97 delete [] m_fbOutptr;
98 delete [] m_fifoThrds;
99 delete [] m_fullCond;
100 delete [] m_emptyCond;
101 delete [] m_fifoLock;
102 delete [] m_filename;
103 delete [] m_fbDesc;
104 delete [] m_killWr;
105 delete [] m_fbCount;
106 delete [] m_fbMaxCount;
107}
108
109bool MythFIFOWriter::FIFOInit(uint Id, const QString& Desc, const QString& Name,
110 long Size, int NumBufs)
111{
112 if (Id >= m_numFifos)
113 return false;
114
115 QByteArray fname = Name.toLatin1();
116 const char *aname = fname.constData();
117 if (mkfifo(aname, S_IREAD | S_IWRITE | S_IRGRP | S_IROTH) == -1)
118 {
119 LOG(VB_GENERAL, LOG_ERR, QString("Couldn't create fifo for file: '%1'").arg(Name) + ENO);
120 return false;
121 }
122
123 LOG(VB_GENERAL, LOG_INFO, QString("Created %1 fifo: %2").arg(Desc, Name));
124
125 m_maxBlkSize[Id] = Size;
126 m_filename[Id] = Name;
127 m_fbDesc[Id] = Desc;
128 m_killWr[Id] = 0;
129 m_fbCount[Id] = (m_useSync) ? 2 : NumBufs;
130 m_fbMaxCount[Id] = 512;
131 m_fifoBuf[Id] = new MythFifoBuffer;
132 struct MythFifoBuffer *fifoptr = m_fifoBuf[Id];
133 for (int i = 0; i < m_fbCount[Id]; i++)
134 {
135 fifoptr->m_data = new unsigned char[static_cast<unsigned long>(m_maxBlkSize[Id])];
136 if (i == m_fbCount[Id] - 1)
137 {
138 fifoptr->m_next = m_fifoBuf[Id];
139 }
140 else
141 {
142 fifoptr->m_next = new struct MythFifoBuffer;
143 }
144 fifoptr = fifoptr->m_next;
145 }
146 m_fbInptr[Id] = m_fifoBuf[Id];
147 m_fbOutptr[Id] = m_fifoBuf[Id];
148
149 m_fifoThrds[Id].SetParent(this);
150 m_fifoThrds[Id].SetId(static_cast<int>(Id));
151 m_fifoThrds[Id].start();
152
153 while (0 == m_killWr[Id] && !m_fifoThrds[Id].isRunning())
154 std::this_thread::sleep_for(1ms);
155
156 return m_fifoThrds[Id].isRunning();
157}
158
160{
161 int fd = -1;
162
163 QMutexLocker flock(&m_fifoLock[Id]);
164 while (true)
165 {
166 if ((m_fbInptr[Id] == m_fbOutptr[Id]) && (0 == m_killWr[Id]))
167 m_emptyCond[Id].wait(flock.mutex());
168 flock.unlock();
169 if (m_killWr[Id])
170 break;
171 if (fd < 0)
172 {
173 QByteArray fname = m_filename[Id].toLatin1();
174 fd = open(fname.constData(), O_WRONLY| O_SYNC);
175 }
176 if (fd >= 0)
177 {
178 int written = 0;
179 while (written < m_fbOutptr[Id]->m_blockSize)
180 {
181 int ret = static_cast<int>(write(fd, m_fbOutptr[Id]->m_data + written,
182 static_cast<size_t>(m_fbOutptr[Id]->m_blockSize-written)));
183 if (ret < 0)
184 {
185 LOG(VB_GENERAL, LOG_ERR, QString("FIFOW: write failed with %1")
186 .arg(strerror(errno)));
188 break;
189 }
190 written += ret;
191 }
192 }
193 flock.relock();
194 m_fbOutptr[Id] = m_fbOutptr[Id]->m_next;
195 m_fullCond[Id].wakeAll();
196 }
197
198 if (fd != -1)
199 close(fd);
200
201 unlink(m_filename[Id].toLocal8Bit().constData());
202
203 while (m_fifoBuf[Id]->m_next != m_fifoBuf[Id])
204 {
205 struct MythFifoBuffer *tmpfifo = m_fifoBuf[Id]->m_next->m_next;
206 delete [] m_fifoBuf[Id]->m_next->m_data;
207 delete m_fifoBuf[Id]->m_next;
208 m_fifoBuf[Id]->m_next = tmpfifo;
209 }
210 delete [] m_fifoBuf[Id]->m_data;
211 delete m_fifoBuf[Id];
212}
213
214void MythFIFOWriter::FIFOWrite(uint Id, void *Buffer, long Size)
215{
216 QMutexLocker flock(&m_fifoLock[Id]);
217 while (m_fbInptr[Id]->m_next == m_fbOutptr[Id])
218 {
219 bool blocking = false;
220 if (!m_useSync)
221 {
222 for (uint i = 0; i < m_numFifos; i++)
223 {
224 if (i == Id)
225 continue;
226 if (m_fbInptr[i] == m_fbOutptr[i])
227 blocking = true;
228 }
229 }
230
231 if (blocking && m_fbCount[Id] < m_fbMaxCount[Id])
232 {
233 struct MythFifoBuffer *tmpfifo = m_fbInptr[Id]->m_next;
234 m_fbInptr[Id]->m_next = new struct MythFifoBuffer;
235 m_fbInptr[Id]->m_next->m_data = new unsigned char[static_cast<unsigned long>(m_maxBlkSize[Id])];
236 m_fbInptr[Id]->m_next->m_next = tmpfifo;
237 QString msg = QString("allocating additonal buffer for : %1(%2)")
238 .arg(m_fbDesc[Id]).arg(++m_fbCount[Id]);
239 LOG(VB_FILE, LOG_INFO, msg);
240 }
241 else
242 {
243 m_fullCond[Id].wait(flock.mutex(), 1000);
244 }
245 }
246
247 if (Size > m_maxBlkSize[Id])
248 {
249 delete [] m_fbInptr[Id]->m_data;
250 m_fbInptr[Id]->m_data = new unsigned char[static_cast<unsigned long>(Size)];
251 }
252
253 memcpy(m_fbInptr[Id]->m_data,Buffer, static_cast<size_t>(Size));
254 m_fbInptr[Id]->m_blockSize = Size;
255 m_fbInptr[Id] = m_fbInptr[Id]->m_next;
256 m_emptyCond[Id].wakeAll();
257}
258
260{
261 uint count = 0;
262 while (count < m_numFifos)
263 {
264 count = 0;
265 for (uint i = 0; i < m_numFifos; i++)
266 {
267 QMutexLocker flock(&m_fifoLock[i]);
268 if (m_fbInptr[i] == m_fbOutptr[i])
269 {
270 m_killWr[i] = 1;
271 m_emptyCond[i].wakeAll();
272 count++;
273 }
274 }
275 std::this_thread::sleep_for(1ms);
276 }
277}
This is a wrapper around QThread that does several additional things.
Definition: mthread.h:49
bool isRunning(void) const
Definition: mthread.cpp:261
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:194
void start(QThread::Priority p=QThread::InheritPriority)
Tell MThread to start running the thread in the near future.
Definition: mthread.cpp:281
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:207
bool wait(std::chrono::milliseconds time=std::chrono::milliseconds::max())
Wait for the MThread to exit, with a maximum timeout.
Definition: mthread.cpp:298
MythFIFOWriter * m_parent
void SetParent(MythFIFOWriter *Parent)
~MythFIFOThread() override
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
void SetId(int Id)
void FIFODrain(void)
QWaitCondition * m_fullCond
MythFifoBuffer ** m_fbOutptr
MythFifoBuffer ** m_fifoBuf
QString * m_filename
void FIFOWriteThread(int Id)
MythFifoBuffer ** m_fbInptr
QMutex * m_fifoLock
QString * m_fbDesc
MythFIFOThread * m_fifoThrds
bool FIFOInit(uint Id, const QString &Desc, const QString &Name, long Size, int NumBufs)
void FIFOWrite(uint Id, void *Buffer, long Size)
MythFIFOWriter(uint Count, bool Sync)
QWaitCondition * m_emptyCond
#define mkfifo(path, mode)
Definition: compat.h:92
unsigned int uint
Definition: compat.h:60
#define close
Definition: compat.h:28
#define O_SYNC
Definition: compat.h:88
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:74
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
static bool isRunning(const char *program)
Returns true if a program containing the specified string is running on this machine.
def write(text, progress=True)
Definition: mythburn.py:306
struct MythFifoBuffer * m_next