MythTV master
httptsstreamhandler.cpp
Go to the documentation of this file.
1#include <chrono> // for milliseconds
2#include <thread> // for sleep_for
3
4// MythTV headers
6#include "libmythbase/mythversion.h"
8
// Log prefix used by the stream handler code that follows: it embeds the
// m_inputId and m_device members of the object doing the logging.
9#define LOC QString("HTTPTSSH[%1](%2): ").arg(m_inputId).arg(m_device)
10
11// BUFFER_SIZE is a multiple of TS_SIZE
// TS_SIZE is 188 bytes; BUFFER_SIZE is 512 of those units (96256 bytes).
12static constexpr qint64 TS_SIZE { 188 };
13static constexpr qint64 BUFFER_SIZE { TS_SIZE * 512 };
14
// Definition of the static map from a device key string to its handler.
15QMap<QString, HTTPTSStreamHandler*> HTTPTSStreamHandler::s_httphandlers;
18
20 int inputid)
21{
 // Returns the handler registered under the device key of the tuning data,
 // creating and starting a new one when the key is not registered yet.
 // The whole function runs with s_httphandlers_lock held, and every call
 // accounts for one reference in s_httphandlers_refcnt.
22 QMutexLocker locker(&s_httphandlers_lock);
23
24 QString devkey = tuning.GetDeviceKey();
25
26 QMap<QString,HTTPTSStreamHandler*>::iterator it = s_httphandlers.find(devkey);
27
28 if (it == s_httphandlers.end())
29 {
 // First user of this key: build the handler, start it, register it
 // and set its reference count to 1.
30 auto* newhandler = new HTTPTSStreamHandler(tuning, inputid);
31 newhandler->Start();
32 s_httphandlers[devkey] = newhandler;
33 s_httphandlers_refcnt[devkey] = 1;
34
35 LOG(VB_RECORD, LOG_INFO,
36 QString("HTTPTSSH[%1]: Creating new stream handler %2 for %3")
37 .arg(QString::number(inputid), devkey, tuning.GetDeviceName()));
38 }
39 else
40 {
 // Key already registered: only the reference count changes.
41 s_httphandlers_refcnt[devkey]++;
42 uint rcount = s_httphandlers_refcnt[devkey];
43 LOG(VB_RECORD, LOG_INFO,
44 QString("HTTPTSSH[%1]: Using existing stream handler %2 for %3")
45 .arg(QString::number(inputid), devkey, tuning.GetDeviceName()) +
46 QString(" (%1 in use)").arg(rcount));
47 }
48
49 return s_httphandlers[devkey];
50}
51
53{
 // Releases one reference to the handler that ref points to, with
 // s_httphandlers_lock held throughout. ref is dereferenced without a
 // null check, so it must be non-null on entry.
 // NOTE(review): the lookup key here is ref->m_device, while registration
 // uses the tuning device key - presumably the same string; confirm.
54 QMutexLocker locker(&s_httphandlers_lock);
55
56 QString devname = ref->m_device;
57
58 QMap<QString,uint>::iterator rit = s_httphandlers_refcnt.find(devname);
 // No reference count entry for this device: return without touching ref.
59 if (rit == s_httphandlers_refcnt.end())
60 return;
61
62 LOG(VB_RECORD, LOG_INFO, QString("HTTPTSSH[%1]: Return(%2) has %3 handlers")
63 .arg(inputid).arg(devname).arg(*rit));
64
 // More than one reference outstanding: clear ref, decrement the count,
 // and leave the handler itself alone.
65 if (*rit > 1)
66 {
67 ref = nullptr;
68 (*rit)--;
69 return;
70 }
71
 // Last reference: stop and delete the handler, but only when the
 // registered pointer is the same object that is being returned.
72 QMap<QString,HTTPTSStreamHandler*>::iterator it = s_httphandlers.find(devname);
73 if ((it != s_httphandlers.end()) && (*it == ref))
74 {
75 LOG(VB_RECORD, LOG_INFO, QString("HTTPTSSH[%1]: Closing handler for %2")
76 .arg(inputid).arg(devname));
77 ref->Stop();
78 delete *it;
79 s_httphandlers.erase(it);
80 }
81 else
82 {
83 LOG(VB_GENERAL, LOG_ERR,
84 QString("HTTPTSSH[%1] Error: Couldn't find handler for %2")
85 .arg(inputid).arg(devname));
86 }
87
 // The reference count entry is erased and ref is cleared on both the
 // success and the error branch above.
88 s_httphandlers_refcnt.erase(rit);
89 ref = nullptr;
90}
91
93 int inputid)
94 : IPTVStreamHandler(tuning, inputid)
95{
 // Both arguments are forwarded to the IPTVStreamHandler base; the body
 // only writes a log line.
96 LOG(VB_GENERAL, LOG_INFO, LOC + "ctor");
97}
98
100{
 // Writes a log line, then calls Stop() before destruction continues.
101 LOG(VB_GENERAL, LOG_INFO, LOC + "dtor");
102 Stop();
103}
104
106{
 // Thread body: RunProlog()/RunEpilog() and the two SetRunning() calls
 // bracket a loop that keeps going while m_runningDesired is set.
107 RunProlog();
 // Delay applied before retrying after a failed attempt; starts at 250ms.
108 std::chrono::milliseconds open_sleep = 250ms;
109 LOG(VB_RECORD, LOG_INFO, LOC + "run() -- begin");
110 SetRunning(true, false, false);
111
 // The reader is allocated here and deleted after the loop ends.
112 m_reader = new HTTPReader(this);
113 while (m_runningDesired)
114 {
 // NOTE(review): the statement guarding the block below is not visible
 // in this listing. Judging by the log message it is the failure path
 // of a DownloadStream call on the first tuning URL - confirm.
116 {
117 LOG(VB_RECORD, LOG_INFO, LOC + "DownloadStream failed to receive bytes from " + m_tuning.GetURL(0).toString());
 // Sleep, then lengthen the delay by 250ms for the next failure as
 // long as it is still below 10s.
118 std::this_thread::sleep_for(open_sleep);
119 if (open_sleep < 10s)
120 open_sleep += 250ms;
121 continue;
122 }
 // Reached only when the block above was skipped: reset the delay.
123 open_sleep = 250ms;
124 }
125
126 delete m_reader;
127 SetRunning(false, false, false);
128 RunEpilog();
129 LOG(VB_RECORD, LOG_INFO, LOC + "run() -- done");
130}
131
132
// From this point on LOC expands to a prefix built from m_url instead of
// the input id and device used by the earlier definition.
133#undef LOC
134#define LOC QString("HTTPReader(%1): ").arg(m_url)
135
136bool HTTPReader::DownloadStream(const QUrl& url)
137{
138 m_url = url.toString();
139
140 LOG(VB_RECORD, LOG_INFO, LOC + "DownloadStream -- begin");
141
142 QMutexLocker lock(&m_lock);
143 QEventLoop event_loop;
144
145 m_buffer = new uint8_t[BUFFER_SIZE];
146 m_size = 0;
147 m_ok = false;
148
149 // the HTTP request
150 m_replylock.lock();
151 QNetworkRequest m_req = QNetworkRequest(url);
152 m_req.setAttribute(QNetworkRequest::RedirectPolicyAttribute,
153 QNetworkRequest::NoLessSafeRedirectPolicy);
154 m_req.setHeader(QNetworkRequest::UserAgentHeader,
155 QString("MythTV/%1 %2/%3")
156 .arg(MYTH_VERSION_MAJMIN,
157 QSysInfo::productType(),
158 QSysInfo::productVersion()));
159 m_reply = m_mgr.get(m_req);
160 m_replylock.unlock();
161
162 connect(&m_timer, &QTimer::timeout, &event_loop, &QEventLoop::quit);
163 connect(m_reply, &QNetworkReply::finished, &event_loop, &QEventLoop::quit);
164 connect(m_reply,&QIODevice::readyRead, this, &HTTPReader::HttpRead);
165
166 // Configure timeout and size limit
167 m_timer.setSingleShot(true);
168 m_timer.start(10s);
169
170 event_loop.exec(); // blocks stack until quit() is called
171
172 disconnect(&m_timer, &QTimer::timeout, &event_loop, &QEventLoop::quit);
173 disconnect(m_reply, &QNetworkReply::finished, &event_loop, &QEventLoop::quit);
174 disconnect(m_reply,&QIODevice::readyRead, this, &HTTPReader::HttpRead);
175
176 if (m_timer.isActive())
177 m_timer.stop();
178
179 QMutexLocker replylock(&m_replylock);
180 if (m_reply->error() != QNetworkReply::NoError)
181 {
182 LOG(VB_RECORD, LOG_ERR, LOC + "DownloadStream exited with error " +
183 QString("%1 '%2'").arg(m_reply->error()).arg(m_reply->errorString()));
184
185 // Download is not OK when there is a network error
186 m_ok = false;
187 }
188
189 delete m_reply;
190 m_reply = nullptr;
191 delete[] m_buffer;
192 m_buffer=nullptr;
193
194 LOG(VB_RECORD, LOG_INFO, LOC + "DownloadStream -- end");
195 return m_ok;
196}
197
199{
 // NOTE(review): the signature line is not visible in this listing;
 // presumably this is the HttpRead slot that DownloadStream connects to
 // readyRead - confirm.
 // Stops m_timer, marks the download as OK, then runs one ReadBytes and
 // one WriteBytes pass.
200 m_timer.stop();
201 m_ok = true;
202 ReadBytes();
203 WriteBytes();
204}
205
207{
 // Appends data read from m_reply to m_buffer at offset m_size. Both
 // m_replylock and m_bufferlock are held for the whole function.
208 QMutexLocker replylock(&m_replylock);
209 QMutexLocker bufferlock(&m_bufferlock);
210
 // Bail out when there is no reply or no buffer.
 // NOTE(review): the size test is a strict greater-than, so a completely
 // full buffer (m_size == BUFFER_SIZE) still reaches the read call below
 // with a zero byte limit.
211 if(m_reply == nullptr || m_buffer == nullptr || m_size > BUFFER_SIZE)
212 return;
213
 // Read at most the free space left in the buffer; a non-positive result
 // leaves m_size unchanged.
214 qint64 bytesRead = m_reply->read( reinterpret_cast<char*>(m_buffer + m_size), BUFFER_SIZE - m_size);
215 m_size += (bytesRead > 0 ? bytesRead : 0);
216 LOG(VB_RECORD, LOG_DEBUG, LOC + QString("ReadBytes: %1 bytes received").arg(bytesRead));
217}
218
220{
 // Hands the buffered bytes to every key of the parent m_streamDataList
 // via ProcessData, then keeps only the bytes reported as remaining.
 // Nothing happens until at least TS_SIZE bytes are buffered; this size
 // check is made before m_bufferlock is taken.
221 if (m_size < TS_SIZE)
222 return;
223
224 QMutexLocker bufferlock(&m_bufferlock);
225 int remainder = 0;
226 {
 // m_listenerLock is held only for the duration of the iteration.
 // remainder ends up as the return value of the last ProcessData call,
 // or stays 0 when the list is empty.
227 QMutexLocker locker(&m_parent->m_listenerLock);
228 for (auto sit = m_parent->m_streamDataList.cbegin();
229 sit != m_parent->m_streamDataList.cend(); ++sit)
230 {
231 remainder = sit.key()->ProcessData(m_buffer, m_size);
232 }
233 }
234 LOG(VB_RECORD, LOG_DEBUG, LOC + QString("WriteBytes: %1/%2 bytes remain").arg(remainder).arg(m_size));
235
236 /* move the remaining data to the beginning of the buffer */
 // NOTE(review): assumes 0 <= remainder <= m_size; this is not checked.
237 memmove(m_buffer, m_buffer + (m_size - remainder), remainder);
238 m_size = remainder;
239}
240
242{
 // Aborts the current reply, if there is one, with m_replylock held.
243 QMutexLocker replylock(&m_replylock);
244 if (m_reply)
245 {
246 LOG(VB_RECORD, LOG_INFO, LOC + "Cancel: Aborting stream download");
247 m_reply->abort();
248 }
249}
QNetworkAccessManager m_mgr
bool DownloadStream(const QUrl &url)
QNetworkReply * m_reply
uint8_t * m_buffer
HTTPTSStreamHandler * m_parent
static QMap< QString, uint > s_httphandlers_refcnt
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
static HTTPTSStreamHandler * Get(const IPTVTuningData &tuning, int inputid)
static QMap< QString, HTTPTSStreamHandler * > s_httphandlers
static QMutex s_httphandlers_lock
~HTTPTSStreamHandler(void) override
static void Return(HTTPTSStreamHandler *&ref, int inputid)
HTTPTSStreamHandler(const IPTVTuningData &tuning, int inputid)
IPTVTuningData m_tuning
QString GetDeviceKey(void) const
QString GetDeviceName(void) const
QUrl GetURL(uint i) const
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:196
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:209
StreamDataList m_streamDataList
QString m_device
volatile bool m_runningDesired
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
void Stop(void)
QRecursiveMutex m_listenerLock
unsigned int uint
Definition: freesurround.h:24
#define LOC
static constexpr qint64 TS_SIZE
static constexpr qint64 BUFFER_SIZE
@ quit
Definition: lirc_client.h:30
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39