MythTV  master
httptsstreamhandler.cpp
Go to the documentation of this file.
1 // MythTV headers
2 #include "httptsstreamhandler.h"
3 #include "mythlogging.h"
4 
5 #include <chrono> // for milliseconds
6 #include <thread> // for sleep_for
7 
8 #define LOC QString("HTTPTSSH[%1](%2): ").arg(m_inputid).arg(m_device)
9 
10 // BUFFER_SIZE is a multiple of TS_SIZE
11 #define TS_SIZE 188
12 #define BUFFER_SIZE (512 * TS_SIZE)
13 
14 QMap<QString, HTTPTSStreamHandler*> HTTPTSStreamHandler::s_httphandlers;
17 
19  int inputid)
20 {
21  QMutexLocker locker(&s_httphandlers_lock);
22 
23  QString devkey = tuning.GetDeviceKey();
24 
25  QMap<QString,HTTPTSStreamHandler*>::iterator it = s_httphandlers.find(devkey);
26 
27  if (it == s_httphandlers.end())
28  {
29  HTTPTSStreamHandler* newhandler = new HTTPTSStreamHandler(tuning, inputid);
30  newhandler->Start();
31  s_httphandlers[devkey] = newhandler;
32  s_httphandlers_refcnt[devkey] = 1;
33 
34  LOG(VB_RECORD, LOG_INFO,
35  QString("HTTPTSSH[%1]: Creating new stream handler %2 for %3")
36  .arg(inputid).arg(devkey).arg(tuning.GetDeviceName()));
37  }
38  else
39  {
40  s_httphandlers_refcnt[devkey]++;
41  uint rcount = s_httphandlers_refcnt[devkey];
42  LOG(VB_RECORD, LOG_INFO,
43  QString("HTTPTSSH[%1]: Using existing stream handler %2 for %3")
44  .arg(inputid).arg(devkey).arg(tuning.GetDeviceName()) +
45  QString(" (%1 in use)").arg(rcount));
46  }
47 
48  return s_httphandlers[devkey];
49 }
50 
52 {
53  QMutexLocker locker(&s_httphandlers_lock);
54 
55  QString devname = ref->m_device;
56 
57  QMap<QString,uint>::iterator rit = s_httphandlers_refcnt.find(devname);
58  if (rit == s_httphandlers_refcnt.end())
59  return;
60 
61  LOG(VB_RECORD, LOG_INFO, QString("HTTPTSSH[%1]: Return(%2) has %3 handlers")
62  .arg(inputid).arg(devname).arg(*rit));
63 
64  if (*rit > 1)
65  {
66  ref = nullptr;
67  (*rit)--;
68  return;
69  }
70 
71  QMap<QString,HTTPTSStreamHandler*>::iterator it = s_httphandlers.find(devname);
72  if ((it != s_httphandlers.end()) && (*it == ref))
73  {
74  LOG(VB_RECORD, LOG_INFO, QString("HTTPTSSH[%1]: Closing handler for %1")
75  .arg(inputid).arg(devname));
76  ref->Stop();
77  delete *it;
78  s_httphandlers.erase(it);
79  }
80  else
81  {
82  LOG(VB_GENERAL, LOG_ERR,
83  QString("HTTPTSSH[%1] Error: Couldn't find handler for %2")
84  .arg(inputid).arg(devname));
85  }
86 
87  s_httphandlers_refcnt.erase(rit);
88  ref = nullptr;
89 }
90 
92  int inputid)
93  : IPTVStreamHandler(tuning, inputid)
94 {
95  LOG(VB_GENERAL, LOG_INFO, LOC + "ctor");
96 }
97 
99 {
100  LOG(VB_GENERAL, LOG_INFO, LOC + "dtor");
101  Stop();
102 }
103 
105 {
106  RunProlog();
107  int open_sleep = 250;
108  LOG(VB_RECORD, LOG_INFO, LOC + "run() -- begin");
109  SetRunning(true, false, false);
110 
111  m_reader = new HTTPReader(this);
112  while (m_running_desired)
113  {
115  {
116  LOG(VB_RECORD, LOG_INFO, LOC + "DownloadStream failed to receive bytes from " + m_tuning.GetURL(0).toString());
117  std::this_thread::sleep_for(std::chrono::milliseconds(open_sleep));
118  if (open_sleep < 10000)
119  open_sleep += 250;
120  continue;
121  }
122  open_sleep = 250;
123  }
124 
125  delete m_reader;
126  SetRunning(false, false, false);
127  RunEpilog();
128  LOG(VB_RECORD, LOG_INFO, LOC + "run() -- done");
129 }
130 
131 
132 #undef LOC
133 #define LOC QString("HTTPReader(%1): ").arg(m_url)
134 
135 bool HTTPReader::DownloadStream(const QUrl& url)
136 {
137  m_url = url.toString();
138 
139  LOG(VB_RECORD, LOG_INFO, LOC + "DownloadStream -- begin");
140 
141  QMutexLocker lock(&m_lock);
142  QEventLoop event_loop;
143 
144  m_buffer = new uint8_t[BUFFER_SIZE];
145  m_size = 0;
146  m_ok = false;
147 
148  // the HTTP request
149  m_replylock.lock();
150  m_reply = m_mgr.get(QNetworkRequest(url));
151  m_replylock.unlock();
152 
153  connect(&m_timer, SIGNAL(timeout()), &event_loop, SLOT(quit()));
154  connect(m_reply, SIGNAL(finished()), &event_loop, SLOT(quit()));
155  connect(m_reply,SIGNAL(readyRead()), this, SLOT(HttpRead()));
156 
157  // Configure timeout and size limit
158  m_timer.setSingleShot(true);
159  m_timer.start(10000);
160 
161  event_loop.exec(); // blocks stack until quit() is called
162 
163  disconnect(&m_timer, SIGNAL(timeout()), &event_loop, SLOT(quit()));
164  disconnect(m_reply, SIGNAL(finished()), &event_loop, SLOT(quit()));
165  disconnect(m_reply,SIGNAL(readyRead()), this, SLOT(HttpRead()));
166 
167  if (m_timer.isActive())
168  m_timer.stop();
169 
170  QMutexLocker replylock(&m_replylock);
171  if (m_reply->error() != QNetworkReply::NoError)
172  {
173  LOG(VB_RECORD, LOG_ERR, LOC + "DownloadStream exited with " + m_reply->errorString());
174  }
175 
176  delete m_reply;
177  m_reply = nullptr;
178  delete[] m_buffer;
179  m_buffer=nullptr;
180 
181  LOG(VB_RECORD, LOG_INFO, LOC + "DownloadStream -- end");
182  return m_ok;
183 }
184 
186 {
187  m_timer.stop();
188  m_ok = true;
189  ReadBytes();
190  WriteBytes();
191 }
192 
194 {
195  QMutexLocker replylock(&m_replylock);
196  QMutexLocker bufferlock(&m_bufferlock);
197 
198  if(m_reply == nullptr || m_buffer == nullptr || m_size > BUFFER_SIZE)
199  return;
200 
201  qint64 bytesRead = m_reply->read( reinterpret_cast<char*>(m_buffer + m_size), BUFFER_SIZE - m_size);
202  m_size += (bytesRead > 0 ? bytesRead : 0);
203  LOG(VB_RECORD, LOG_DEBUG, LOC + QString("ReadBytes: %1 bytes received").arg(bytesRead));
204 }
205 
207 {
208  if (m_size < TS_SIZE)
209  return;
210 
211  QMutexLocker bufferlock(&m_bufferlock);
212  int remainder = 0;
213  {
214  QMutexLocker locker(&m_parent->m_listener_lock);
215  HTTPTSStreamHandler::StreamDataList::const_iterator sit;
216  sit = m_parent->m_stream_data_list.begin();
217  for (; sit != m_parent->m_stream_data_list.end(); ++sit)
218  {
219  remainder = sit.key()->ProcessData(m_buffer, m_size);
220  }
221  }
222  LOG(VB_RECORD, LOG_DEBUG, LOC + QString("WriteBytes: %1/%2 bytes remain").arg(remainder).arg(m_size));
223 
224  /* move the remaining data to the beginning of the buffer */
225  memmove(m_buffer, m_buffer + (m_size - remainder), remainder);
226  m_size = remainder;
227 }
228 
230 {
231  QMutexLocker replylock(&m_replylock);
232  if (m_reply)
233  {
234  LOG(VB_RECORD, LOG_INFO, LOC + "Cancel: Aborting stream download");
235  m_reply->abort();
236  }
237 }
void RunEpilog(void)
Cleans up a thread's resources, call this if you reimplement run().
Definition: mthread.cpp:215
virtual ~HTTPTSStreamHandler(void)
HTTPTSStreamHandler(const IPTVTuningData &tuning, int inputid)
QUrl GetURL(uint i) const
IPTVTuningData m_tuning
static QMap< QString, HTTPTSStreamHandler * > s_httphandlers
volatile bool m_running_desired
bool DownloadStream(const QUrl &url)
QString m_device
unsigned int uint
Definition: compat.h:140
QMutex m_listener_lock
HTTPTSStreamHandler * m_parent
static QMutex s_httphandlers_lock
void run(void) override
Runs the Qt event loop unless we have a QRunnable, in which case we run the runnable run instead.
static QMap< QString, uint > s_httphandlers_refcnt
void SetRunning(bool running, bool using_buffering, bool using_section_reader)
void Stop(void)
void Start(void)
static void Return(HTTPTSStreamHandler *&ref, int inputid)
static HTTPTSStreamHandler * Get(const IPTVTuningData &tuning, int inputid)
#define BUFFER_SIZE
QString GetDeviceName(void) const
QString GetDeviceKey(void) const
uint8_t * m_buffer
#define LOG(_MASK_, _LEVEL_, _STRING_)
Definition: mythlogging.h:41
StreamDataList m_stream_data_list
QNetworkAccessManager m_mgr
#define TS_SIZE
void RunProlog(void)
Sets up a thread, call this if you reimplement run().
Definition: mthread.cpp:202
#define LOC
QNetworkReply * m_reply