MythTV  master
bufferedsocketdevice.cpp
Go to the documentation of this file.
1 // Program Name: bufferedsocketdevice.cpp
3 // Created : Oct. 1, 2005
4 //
5 // Purpose :
6 //
7 // Copyright (c) 2005 David Blain <dblain@mythtv.org>
8 //
9 // Licensed under the GPL v2 or later, see COPYING for details
10 //
12 
13 #include <algorithm>
14 #include <chrono> // for milliseconds
15 #include <thread> // for sleep_for
16 #include <utility>
17 
18 #include "mythtimer.h"
19 #include "bufferedsocketdevice.h"
20 #include "upnputil.h"
21 #include "mythlogging.h"
22 
24 //
26 
// Body of the constructor that the listing's index names
// `BufferedSocketDevice(int nSocket)` (defined at line 27; that signature line
// is absent from this listing).
//
// Creates a new MSocketDevice, attaches the caller's descriptor `nSocket` to it
// as a stream socket, makes it non-blocking and address-reusable, then sets
// SO_LINGER on the descriptor returned by socket().
28 {
29  m_pSocket = new MSocketDevice();
30 
31  m_pSocket->setSocket ( nSocket, MSocketDevice::Stream );
32  m_pSocket->setBlocking ( false );
33  m_pSocket->setAddressReusable( true );
34 
// presumably {l_onoff = 1, l_linger = 1} (POSIX field order), i.e. linger
// enabled with a 1 second timeout — confirm against the platform's struct linger.
35  struct linger ling = {1, 1};
36 
// A failing setsockopt is only logged; construction continues regardless.
37  if ( setsockopt(socket(), SOL_SOCKET, SO_LINGER, (const char *)&ling,
38  sizeof(ling)) < 0)
39  LOG(VB_GENERAL, LOG_ERR,
40  "BufferedSocketDevice: setsockopt - SO_LINGER: " + ENO);
41 }
42 
44 //
46 
// Constructs a buffered wrapper around an existing (possibly null) socket.
//
// pSocket        - socket object to wrap; stored as-is, may be nullptr.
// bTakeOwnership - copied into m_bHandleSocketDelete.
//
// Resets the destination port and the write-buffer bookkeeping to zero.
// NOTE(review): the listing's embedded numbers jump from 53 to 55, so one
// original line is absent here; m_nMaxReadBufferSize is not assigned in the
// visible lines — confirm against the full file that it is initialised.
47 BufferedSocketDevice::BufferedSocketDevice( MSocketDevice *pSocket /* = nullptr*/,
48  bool bTakeOwnership /* = false */ )
49 {
50  m_pSocket = pSocket;
51 
52  m_nDestPort = 0;
53 
55  m_nWriteSize = 0;
56  m_nWriteIndex = 0;
57  m_bHandleSocketDelete= bTakeOwnership;
58 
59 }
60 
62 //
64 
// Body of the destructor (the index lists `virtual ~BufferedSocketDevice()`
// at line 65; that signature line is absent from this listing).
// All teardown is delegated to Close().
66 {
67  Close();
68 }
69 
71 //
73 
// Body of Close() (the index lists `void Close()` at line 74; the signature
// line is absent from this listing).
//
// Pushes out pending writes, pulls in whatever the socket still has, drops the
// read buffer, then closes and releases the socket object.
// NOTE(review): the embedded numbers skip 80 and 87, so two original lines are
// absent from this listing. As shown, `delete m_pSocket` is unconditional —
// confirm against the full file whether it is guarded (e.g. by
// m_bHandleSocketDelete) and what the line after m_bufRead.clear() does.
75 {
76  Flush();
77  ReadBytes();
78 
// Whatever ReadBytes() just buffered is discarded here.
79  m_bufRead.clear();
81 
82  if (m_pSocket != nullptr)
83  {
84  if (m_pSocket->isValid())
85  m_pSocket->close();
86 
88  delete m_pSocket;
89 
// Null the pointer so later calls see "no socket".
90  m_pSocket = nullptr;
91  }
92 
93 }
94 
96 //
98 
99 bool BufferedSocketDevice::Connect( const QHostAddress &addr, quint16 port )
100 {
101  if (m_pSocket == nullptr)
102  return false;
103 
104  return m_pSocket->connect( addr, port );
105 }
106 
108 //
110 
// Body of SocketDevice() (the index lists `MSocketDevice * SocketDevice()` at
// line 111; the signature line is absent from this listing).
// Returns the wrapped socket pointer unchanged; it may be nullptr.
112 {
113  return( m_pSocket );
114 }
115 
117 //
119 
120 void BufferedSocketDevice::SetSocketDevice( MSocketDevice *pSocket )
121 {
122  if ((m_bHandleSocketDelete) && (m_pSocket != nullptr))
123  delete m_pSocket;
124 
125  m_bHandleSocketDelete = false;
126 
127  m_pSocket = pSocket;
128 }
129 
131 //
133 
135  QHostAddress hostAddress, quint16 nPort)
136 {
// SetDestAddress (the index lists
// `void SetDestAddress(QHostAddress hostAddress, quint16 nPort)` at line 134;
// the first signature line is absent from this listing).
// Remembers an explicit destination. Flush() and WriteBlockDirect() pass this
// address/port to writeBlock() whenever m_nDestPort is non-zero.
// hostAddress is taken by value and moved into the member.
137  m_destHostAddress = std::move(hostAddress);
138  m_nDestPort = nPort;
139 }
140 
142 //
144 
145 void BufferedSocketDevice::SetReadBufferSize( qulonglong bufSize )
146 {
147  m_nMaxReadBufferSize = bufSize;
148 }
149 
151 //
153 
// Body of ReadBufferSize() (the index lists `qulonglong ReadBufferSize() const`
// at line 154; the signature line is absent from this listing).
// Returns the limit last stored in m_nMaxReadBufferSize.
155 {
156  return m_nMaxReadBufferSize;
157 }
158 
160 //
162 
164 {
165  if (m_pSocket == nullptr)
166  return m_bufRead.size();
167 
168  qlonglong maxToRead = 0;
169 
170  if ( m_nMaxReadBufferSize > 0 )
171  {
172  maxToRead = m_nMaxReadBufferSize - m_bufRead.size();
173 
174  if ( maxToRead <= 0 )
175  return m_bufRead.size();
176  }
177 
178  qlonglong nbytes = m_pSocket->bytesAvailable();
179 
180  QByteArray *a = nullptr;
181 
182  if ( nbytes > 0 )
183  {
184  a = new QByteArray();
185  a->resize(nbytes);
186 
187  qlonglong nread = m_pSocket->readBlock(
188  a->data(), maxToRead ? std::min(nbytes, maxToRead) : nbytes);
189 
190  if (( nread > 0 ) && ( nread != a->size() ))
191  {
192  // unexpected
193  a->resize( nread );
194  }
195  }
196 
197  if (a)
198  {
199 #if 0
200  QString msg;
201  for( long n = 0; n < a->count(); n++ )
202  msg += QString("%1").arg(a->at(n));
203  LOG(VB_GENERAL, LOG_DEBUG, msg);
204 #endif
205 
206  m_bufRead.append( a );
207  }
208 
209  return m_bufRead.size();
210 }
211 
213 //
215 
216 bool BufferedSocketDevice::ConsumeWriteBuf( qulonglong nbytes )
217 {
218  if ( !nbytes || ((qlonglong)nbytes > m_nWriteSize) )
219  return false;
220 
221  m_nWriteSize -= nbytes;
222 
223  for ( ;; )
224  {
225  QByteArray *a = m_bufWrite.front();
226 
227  if ( m_nWriteIndex + nbytes >= (qulonglong)a->size() )
228  {
229  nbytes -= a->size() - m_nWriteIndex;
230  m_bufWrite.pop_front();
231  delete a;
232 
233  m_nWriteIndex = 0;
234 
235  if ( nbytes == 0 )
236  break;
237  }
238  else
239  {
240  m_nWriteIndex += nbytes;
241  break;
242  }
243  }
244 
245  return true;
246 }
247 
249 //
251 
253 {
254 
255  if ((m_pSocket == nullptr) || !m_pSocket->isValid())
256  return;
257 
258  bool osBufferFull = false;
259  //int consumed = 0;
260 
261  while ( !osBufferFull && ( m_nWriteSize > 0 ) && m_pSocket->isValid())
262  {
263  auto it = m_bufWrite.begin();
264  QByteArray *a = *it;
265 
266  int nwritten = 0;
267  int i = 0;
268 
269  if ( a->size() - m_nWriteIndex < 1460 )
270  {
271  QByteArray out;
272  out.resize(65536);
273 
274  int j = m_nWriteIndex;
275  int s = a->size() - j;
276 
277  while ( a && i+s < out.size() )
278  {
279  memcpy( out.data()+i, a->data()+j, s );
280  j = 0;
281  i += s;
282  ++it;
283  a = *it;
284  s = a ? a->size() : 0;
285  }
286 
287  if (m_nDestPort != 0)
288  nwritten = m_pSocket->writeBlock( out.data(), i, m_destHostAddress, m_nDestPort );
289  else
290  nwritten = m_pSocket->writeBlock( out.data(), i );
291  }
292  else
293  {
294  // Big block, write it immediately
295  i = a->size() - m_nWriteIndex;
296 
297  if (m_nDestPort != 0)
298  nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i, m_destHostAddress, m_nDestPort );
299  else
300  nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i );
301  }
302 
303  if ( nwritten > 0 )
304  {
305  if ( ConsumeWriteBuf( nwritten ) )
306  {
307  //consumed += nwritten;
308  }
309  }
310 
311  if ( nwritten < i )
312  osBufferFull = true;
313  }
314 }
315 
316 
318 //
320 
// Body of Size() (the index lists `qint64 Size()` at line 321; the signature
// line is absent from this listing).
// Reports the value of BytesAvailable(), converted to a signed 64-bit integer.
322 {
323  return (qlonglong)BytesAvailable();
324 }
325 
327 //
329 
// Body of the parameterless At() (the index lists `static qint64 At()` at
// line 330; the signature line is absent from this listing).
// Always returns 0.
331 {
332  return( 0 );
333 }
334 
336 //
338 
339 bool BufferedSocketDevice::At( qlonglong index )
340 {
341  ReadBytes();
342 
343  if ( index > m_bufRead.size() )
344  return false;
345 
346  // throw away data 0..index-1
347  m_bufRead.consumeBytes( (qulonglong)index, nullptr );
348 
349  return true;
350 }
351 
353 //
355 
357 {
358  if ( !m_pSocket->isValid() )
359  return true;
360 
361  ReadBytes();
362 
363  return m_bufRead.size() == 0;
364 
365 }
366 
368 //
370 
372 {
373  if ( !m_pSocket->isValid() )
374  return 0;
375 
376  return ReadBytes();
377 }
378 
380 //
382 
384  int msecs, bool *pTimeout /* = nullptr*/ )
385 {
386  bool bTimeout = false;
387 
388  if ( !m_pSocket->isValid() )
389  return 0;
390 
391  qlonglong nBytes = BytesAvailable();
392 
393  if (nBytes == 0)
394  {
395 /*
396  The following code is a possible workaround to the lost request problem
397  I just hate looping too much to put it in. I believe there is something
398  I'm missing that is causing the lost packets... Just need to find it.
399 
400  bTimeout = true;
401  int nCount = 0;
402  int msWait = msecs / 100;
403 
404  while (((nBytes = ReadBytes()) == 0 ) &&
405  (nCount++ < 100 ) &&
406  bTimeout &&
407  m_pSocket->isValid() )
408  {
409  // give up control
410 
411  // should be some multiple of msWait.
412  std::this_thread::sleep_for(std::chrono::milliseconds(1));
413 
414  }
415  }
416 */
417  // -=>TODO: Override the timeout to 1 second... Closes connection sooner
418  // to help recover from lost requests. (hack until better fix found)
419 
420  msecs = 1000;
421 
422  nBytes = m_pSocket->waitForMore( msecs, &bTimeout );
423 
424  if (pTimeout != nullptr)
425  *pTimeout = bTimeout;
426  }
427 
428  return nBytes; // nBytes //m_bufRead.size();
429 }
430 
432 //
434 
// Body of BytesToWrite() (the index lists `qulonglong BytesToWrite() const` at
// line 435; the signature line is absent from this listing).
// Returns m_nWriteSize, the running total maintained by WriteBlock() and
// ConsumeWriteBuf().
436 {
437  return m_nWriteSize;
438 
439 }
440 
442 //
444 
// Body of ClearPendingData() (the index lists `void ClearPendingData()` at
// line 445; the signature line is absent from this listing).
// Deletes every queued write chunk, working from the back of m_bufWrite.
// NOTE(review): the embedded numbers skip 452, so one original line is absent
// here; the visible lines do not reset m_nWriteSize / m_nWriteIndex — confirm
// against the full file that the counters are cleared too.
446 {
447  while (!m_bufWrite.empty())
448  {
449  delete m_bufWrite.back();
450  m_bufWrite.pop_back();
451  }
453 }
454 
456 //
458 
// Body of ClearReadBuffer() (the index lists `void ClearReadBuffer()` at line
// 459; the signature line is absent from this listing).
// Discards everything currently held in the read buffer; the socket itself is
// not touched.
460 {
461  m_bufRead.clear();
462 }
463 
465 //
467 
468 qlonglong BufferedSocketDevice::ReadBlock( char *data, qulonglong maxlen )
469 {
470  if ( data == nullptr && maxlen != 0 )
471  return -1;
472 
473  if ( !m_pSocket->isOpen() )
474  return -1;
475 
476  ReadBytes();
477 
478  if ( maxlen >= (qulonglong)m_bufRead.size() )
479  maxlen = m_bufRead.size();
480 
481  m_bufRead.consumeBytes( maxlen, data );
482 
483  return maxlen;
484 
485 }
486 
488 //
490 
491 qlonglong BufferedSocketDevice::WriteBlock( const char *data, qulonglong len )
492 {
493  if ( len == 0 )
494  return 0;
495 
496  QByteArray *a = m_bufWrite.back();
497 
498  bool writeNow = ( (m_nWriteSize + len >= 1400) || (len > 512) );
499 
500  if ( a && (a->size() + len < 128) )
501  {
502  // small buffer, resize
503  int i = a->size();
504 
505  a->resize( i+len );
506  memcpy( a->data()+i, data, len );
507  }
508  else
509  {
510  // append new buffer
511  m_bufWrite.push_back(new QByteArray(data, len));
512  }
513 
514  m_nWriteSize += len;
515 
516  if ( writeNow )
517  Flush();
518 
519  return len;
520 }
521 
523 //
525 
527  const char *data, qulonglong len)
528 {
529  qlonglong nWritten = 0;
530 
531  // must Flush data just in case caller is mixing buffered & un-buffered calls
532 
533  Flush();
534 
535  if (m_nDestPort != 0)
536  nWritten = m_pSocket->writeBlock( data, len, m_destHostAddress , m_nDestPort );
537  else
538  nWritten = m_pSocket->writeBlock( data, len );
539 
540  return nWritten;
541 }
542 
544 //
546 
548 {
549  if ( m_pSocket->isOpen() )
550  {
551  ReadBytes();
552 
553  if (m_bufRead.size() > 0 )
554  {
555  uchar c = '\0';
556 
557  m_bufRead.consumeBytes( 1, (char*)&c );
558 
559  return c;
560  }
561  }
562 
563  return -1;
564 }
565 
567 //
569 
// Body of Putch() (the index lists `int Putch(int ch)` at line 570; the
// signature line is absent from this listing).
// Queues a single byte through WriteBlock(). Returns ch when exactly one byte
// was accepted, -1 otherwise.
571 {
// Only buf[0] is ever used; ch is narrowed to char on assignment.
572  char buf[2];
573 
574  buf[0] = ch;
575 
576  return WriteBlock(buf, 1) == 1 ? ch : -1;
577 }
578 
580 //
582 
// Body of Ungetch() (the index lists `int Ungetch(int ch)` at line 583; the
// signature line is absent from this listing).
// Forwards ch to the read buffer's ungetch() and returns its result.
584 {
585  return m_bufRead.ungetch( ch );
586 }
587 
589 //
591 
// Body of CanReadLine() (the index lists `bool CanReadLine()` at line 592;
// the signature line is absent from this listing).
// Tops up the read buffer, then returns true only if BytesAvailable() is
// non-zero and scanNewline() reports a line in the buffer.
593 {
594  ReadBytes();
595 
// scanNewline is called with nullptr here, so no line text is requested.
596  return ( BytesAvailable() > 0 ) && m_bufRead.scanNewline( nullptr );
597 }
598 
600 //
602 
// Body of the parameterless ReadLine() (the index lists `QString ReadLine()`
// at line 603; the signature line is absent from this listing).
// Returns the next line from the read buffer, or an empty (default) QString
// if scanNewline() finds none. Does not wait for more data.
604 {
605  QByteArray a;
606  a.resize(256);
607 
608  ReadBytes();
609 
// presumably scanNewline() stores the line in `a` and adjusts its size —
// confirm in MMembuf; the code below relies on a.size() being the line length.
610  bool nl = m_bufRead.scanNewline( &a );
611 
612  QString s;
613 
614  if ( nl )
615  {
616  At( a.size() ); // skips the data read
617 
618  s = QString( a );
619  }
620 
621  return s;
622 }
623 
625 //
627 
628 QString BufferedSocketDevice::ReadLine( int msecs )
629 {
630  MythTimer timer;
631  QString sLine;
632 
633  if ( CanReadLine() )
634  return( ReadLine() );
635 
636  // ----------------------------------------------------------------------
637  // If the user supplied a timeout, lets loop until we can read a line
638  // or timeout.
639  // ----------------------------------------------------------------------
640 
641  if ( msecs > 0)
642  {
643  bool bTimeout = false;
644 
645  timer.start();
646 
647  while ( !CanReadLine() && !bTimeout )
648  {
649 #if 0
650  LOG(VB_HTTP, LOG_DEBUG, "Can't Read Line... Waiting for more." );
651 #endif
652 
653  WaitForMore( msecs, &bTimeout );
654 
655  if ( timer.elapsed() >= msecs )
656  {
657  bTimeout = true;
658  LOG(VB_HTTP, LOG_INFO, "Exceeded Total Elapsed Wait Time." );
659  }
660  }
661 
662  if (CanReadLine())
663  sLine = ReadLine();
664  }
665 
666  return( sLine );
667 }
668 
670 //
672 
673 quint16 BufferedSocketDevice::Port(void) const
674 {
675  if (m_pSocket)
676  return( m_pSocket->port() );
677 
678  return 0;
679 }
680 
682 //
684 
// Body of PeerPort() (the index lists `quint16 PeerPort() const` at line 685;
// the signature line is absent from this listing).
// Returns the wrapped socket's peerPort(), or 0 when no socket is attached.
686 {
687  if (m_pSocket)
688  return( m_pSocket->peerPort() );
689 
690  return 0;
691 }
692 
694 //
696 
697 QHostAddress BufferedSocketDevice::Address() const
698 {
699  if (m_pSocket)
700  return( m_pSocket->address() );
701 
702  QHostAddress tmp;
703 
704  return tmp;
705 }
706 
708 //
710 
// Body of PeerAddress() (the index lists `QHostAddress PeerAddress() const`
// at line 711; the signature line is absent from this listing).
// Returns the wrapped socket's peerAddress(), or a default-constructed
// QHostAddress when no socket is attached.
712 {
713  if (m_pSocket)
714  return( m_pSocket->peerAddress() );
715 
716  QHostAddress tmp;
717 
718  return tmp;
719 }
720 
BufferedSocketDevice::m_bHandleSocketDelete
bool m_bHandleSocketDelete
Definition: bufferedsocketdevice.h:48
ENO
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:72
BufferedSocketDevice::BytesAvailable
qulonglong BytesAvailable()
Definition: bufferedsocketdevice.cpp:371
BufferedSocketDevice::Putch
int Putch(int ch)
Definition: bufferedsocketdevice.cpp:570
BufferedSocketDevice::Connect
bool Connect(const QHostAddress &addr, quint16 port)
Definition: bufferedsocketdevice.cpp:99
BufferedSocketDevice::m_pSocket
MSocketDevice * m_pSocket
Definition: bufferedsocketdevice.h:42
MythTimer
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:14
BufferedSocketDevice::SetReadBufferSize
void SetReadBufferSize(qulonglong bufSize)
Definition: bufferedsocketdevice.cpp:145
BufferedSocketDevice::WriteBlock
qlonglong WriteBlock(const char *data, qulonglong len)
Definition: bufferedsocketdevice.cpp:491
BufferedSocketDevice::m_bufRead
MMembuf m_bufRead
Definition: bufferedsocketdevice.h:53
BufferedSocketDevice::m_destHostAddress
QHostAddress m_destHostAddress
Definition: bufferedsocketdevice.h:50
BufferedSocketDevice::m_nMaxReadBufferSize
qulonglong m_nMaxReadBufferSize
Definition: bufferedsocketdevice.h:44
MythTimer::start
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:23
BufferedSocketDevice::Address
QHostAddress Address() const
Definition: bufferedsocketdevice.cpp:697
BufferedSocketDevice::m_bufWrite
deque< QByteArray * > m_bufWrite
Definition: bufferedsocketdevice.h:54
BufferedSocketDevice::ClearPendingData
void ClearPendingData()
Definition: bufferedsocketdevice.cpp:445
BufferedSocketDevice::Port
quint16 Port() const
Definition: bufferedsocketdevice.cpp:673
BufferedSocketDevice::ReadBlock
qlonglong ReadBlock(char *data, qulonglong maxlen)
Definition: bufferedsocketdevice.cpp:468
tmp
static guint32 * tmp
Definition: goom_core.cpp:30
BufferedSocketDevice::~BufferedSocketDevice
virtual ~BufferedSocketDevice()
Definition: bufferedsocketdevice.cpp:65
BufferedSocketDevice::Ungetch
int Ungetch(int ch)
Definition: bufferedsocketdevice.cpp:583
BufferedSocketDevice::SetDestAddress
void SetDestAddress(QHostAddress hostAddress, quint16 nPort)
Definition: bufferedsocketdevice.cpp:134
BufferedSocketDevice::WaitForMore
qulonglong WaitForMore(int msecs, bool *timeout=nullptr)
Definition: bufferedsocketdevice.cpp:383
BufferedSocketDevice::CanReadLine
bool CanReadLine()
Definition: bufferedsocketdevice.cpp:592
mythlogging.h
BufferedSocketDevice::m_nDestPort
quint16 m_nDestPort
Definition: bufferedsocketdevice.h:51
BufferedSocketDevice::ReadLine
QString ReadLine()
Definition: bufferedsocketdevice.cpp:603
BufferedSocketDevice::WriteBlockDirect
qlonglong WriteBlockDirect(const char *data, qulonglong len)
Definition: bufferedsocketdevice.cpp:526
BufferedSocketDevice::BufferedSocketDevice
BufferedSocketDevice(int nSocket)
Definition: bufferedsocketdevice.cpp:27
BufferedSocketDevice::PeerAddress
QHostAddress PeerAddress() const
Definition: bufferedsocketdevice.cpp:711
bufferedsocketdevice.h
BufferedSocketDevice::SocketDevice
MSocketDevice * SocketDevice()
Definition: bufferedsocketdevice.cpp:111
BufferedSocketDevice::AtEnd
bool AtEnd()
Definition: bufferedsocketdevice.cpp:356
BufferedSocketDevice::Flush
void Flush()
Definition: bufferedsocketdevice.cpp:252
BufferedSocketDevice::Close
void Close()
Definition: bufferedsocketdevice.cpp:74
BufferedSocketDevice::Getch
int Getch()
Definition: bufferedsocketdevice.cpp:547
BufferedSocketDevice::Size
qint64 Size()
Definition: bufferedsocketdevice.cpp:321
BufferedSocketDevice::At
static qint64 At()
Definition: bufferedsocketdevice.cpp:330
BufferedSocketDevice::ConsumeWriteBuf
bool ConsumeWriteBuf(qulonglong nbytes)
Definition: bufferedsocketdevice.cpp:216
BufferedSocketDevice::socket
int socket()
Definition: bufferedsocketdevice.h:116
MythTimer::elapsed
int elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:90
BufferedSocketDevice::PeerPort
quint16 PeerPort() const
Definition: bufferedsocketdevice.cpp:685
BufferedSocketDevice::m_nWriteSize
qint64 m_nWriteSize
write total buf size
Definition: bufferedsocketdevice.h:45
BufferedSocketDevice::m_nWriteIndex
qint64 m_nWriteIndex
write index
Definition: bufferedsocketdevice.h:46
BufferedSocketDevice::ReadBytes
int ReadBytes()
Definition: bufferedsocketdevice.cpp:163
mythtimer.h
BufferedSocketDevice::ReadBufferSize
qulonglong ReadBufferSize() const
Definition: bufferedsocketdevice.cpp:154
BufferedSocketDevice::ClearReadBuffer
void ClearReadBuffer()
Definition: bufferedsocketdevice.cpp:459
BufferedSocketDevice::SetSocketDevice
void SetSocketDevice(MSocketDevice *pSocket)
Definition: bufferedsocketdevice.cpp:120
upnputil.h
BufferedSocketDevice::BytesToWrite
qulonglong BytesToWrite() const
Definition: bufferedsocketdevice.cpp:435