MythTV  master
bufferedsocketdevice.cpp
Go to the documentation of this file.
1 // Program Name: bufferedsocketdevice.cpp
3 // Created : Oct. 1, 2005
4 //
5 // Purpose :
6 //
7 // Copyright (c) 2005 David Blain <dblain@mythtv.org>
8 //
9 // Licensed under the GPL v2 or later, see LICENSE for details
10 //
12 
13 #include <algorithm>
14 #include <array>
15 #include <chrono> // for milliseconds
16 #include <thread> // for sleep_for
17 #include <utility>
18 
20 #include "libmythbase/mythtimer.h"
21 
22 #include "bufferedsocketdevice.h"
23 #include "upnputil.h"
24 
26 //
28 
30 {
31  m_pSocket = new MSocketDevice();
32 
33  m_pSocket->setSocket ( nSocket, MSocketDevice::Stream );
34  m_pSocket->setBlocking ( false );
35  m_pSocket->setAddressReusable( true );
36 
37  struct linger ling = {1, 1};
38 
39  if ( setsockopt(socket(), SOL_SOCKET, SO_LINGER, (const char *)&ling,
40  sizeof(ling)) < 0)
41  LOG(VB_GENERAL, LOG_ERR,
42  "BufferedSocketDevice: setsockopt - SO_LINGER: " + ENO);
43 }
44 
46 //
48 
49 BufferedSocketDevice::BufferedSocketDevice( MSocketDevice *pSocket /* = nullptr*/,
50  bool bTakeOwnership /* = false */ )
51 {
52  m_pSocket = pSocket;
53 
54  m_nDestPort = 0;
55 
57  m_nWriteSize = 0;
58  m_nWriteIndex = 0;
59  m_bHandleSocketDelete= bTakeOwnership;
60 
61 }
62 
64 //
66 
68 {
69  Close();
70 }
71 
73 //
75 
77 {
78  Flush();
79  ReadBytes();
80 
81  m_bufRead.clear();
83 
84  if (m_pSocket != nullptr)
85  {
86  if (m_pSocket->isValid())
87  m_pSocket->close();
88 
90  delete m_pSocket;
91 
92  m_pSocket = nullptr;
93  }
94 
95 }
96 
98 //
100 
101 bool BufferedSocketDevice::Connect( const QHostAddress &addr, quint16 port )
102 {
103  if (m_pSocket == nullptr)
104  return false;
105 
106  return m_pSocket->connect( addr, port );
107 }
108 
110 //
112 
114 {
115  return( m_pSocket );
116 }
117 
119 //
121 
122 void BufferedSocketDevice::SetSocketDevice( MSocketDevice *pSocket )
123 {
124  if ((m_bHandleSocketDelete) && (m_pSocket != nullptr))
125  delete m_pSocket;
126 
127  m_bHandleSocketDelete = false;
128 
129  m_pSocket = pSocket;
130 }
131 
133 //
135 
137  QHostAddress hostAddress, quint16 nPort)
138 {
139  m_destHostAddress = std::move(hostAddress);
140  m_nDestPort = nPort;
141 }
142 
144 //
146 
147 void BufferedSocketDevice::SetReadBufferSize( qulonglong bufSize )
148 {
149  m_nMaxReadBufferSize = bufSize;
150 }
151 
153 //
155 
157 {
158  return m_nMaxReadBufferSize;
159 }
160 
162 //
164 
166 {
167  if (m_pSocket == nullptr)
168  return m_bufRead.size();
169 
170  qlonglong maxToRead = 0;
171 
172  if ( m_nMaxReadBufferSize > 0 )
173  {
174  maxToRead = m_nMaxReadBufferSize - m_bufRead.size();
175 
176  if ( maxToRead <= 0 )
177  return m_bufRead.size();
178  }
179 
180  qlonglong nbytes = m_pSocket->bytesAvailable();
181 
182  QByteArray *a = nullptr;
183 
184  if ( nbytes > 0 )
185  {
186  a = new QByteArray();
187  a->resize(nbytes);
188 
189  qlonglong nread = m_pSocket->readBlock(
190  a->data(), maxToRead ? std::min(nbytes, maxToRead) : nbytes);
191 
192  if (( nread > 0 ) && ( nread != a->size() ))
193  {
194  // unexpected
195  a->resize( nread );
196  }
197  }
198 
199  if (a)
200  {
201 #if 0
202  QString msg;
203  for( long n = 0; n < a->count(); n++ )
204  msg += QString("%1").arg(a->at(n));
205  LOG(VB_GENERAL, LOG_DEBUG, msg);
206 #endif
207 
208  m_bufRead.append( a );
209  }
210 
211  return m_bufRead.size();
212 }
213 
215 //
217 
218 bool BufferedSocketDevice::ConsumeWriteBuf( qulonglong nbytes )
219 {
220  if ( !nbytes || ((qlonglong)nbytes > m_nWriteSize) )
221  return false;
222 
223  m_nWriteSize -= nbytes;
224 
225  for ( ;; )
226  {
227  QByteArray *a = m_bufWrite.front();
228 
229  if ( m_nWriteIndex + nbytes >= (qulonglong)a->size() )
230  {
231  nbytes -= a->size() - m_nWriteIndex;
232  m_bufWrite.pop_front();
233  delete a;
234 
235  m_nWriteIndex = 0;
236 
237  if ( nbytes == 0 )
238  break;
239  }
240  else
241  {
242  m_nWriteIndex += nbytes;
243  break;
244  }
245  }
246 
247  return true;
248 }
249 
251 //
253 
255 {
256 
257  if ((m_pSocket == nullptr) || !m_pSocket->isValid())
258  return;
259 
260  bool osBufferFull = false;
261  //int consumed = 0;
262 
263  while ( !osBufferFull && ( m_nWriteSize > 0 ) && m_pSocket->isValid())
264  {
265  auto it = m_bufWrite.begin();
266  QByteArray *a = *it;
267 
268  int nwritten = 0;
269  int i = 0;
270 
271  if ( a->size() - m_nWriteIndex < 1460 )
272  {
273  QByteArray out;
274  out.resize(65536);
275 
276  int j = m_nWriteIndex;
277  int s = a->size() - j;
278 
279  while ( a && i+s < out.size() )
280  {
281  memcpy( out.data()+i, a->data()+j, s );
282  j = 0;
283  i += s;
284  ++it;
285  a = *it;
286  s = a ? a->size() : 0;
287  }
288 
289  if (m_nDestPort != 0)
290  nwritten = m_pSocket->writeBlock( out.data(), i, m_destHostAddress, m_nDestPort );
291  else
292  nwritten = m_pSocket->writeBlock( out.data(), i );
293  }
294  else
295  {
296  // Big block, write it immediately
297  i = a->size() - m_nWriteIndex;
298 
299  if (m_nDestPort != 0)
300  nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i, m_destHostAddress, m_nDestPort );
301  else
302  nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i );
303  }
304 
305  if ( nwritten > 0 )
306  {
307  if ( ConsumeWriteBuf( nwritten ) )
308  {
309  //consumed += nwritten;
310  }
311  }
312 
313  if ( nwritten < i )
314  osBufferFull = true;
315  }
316 }
317 
318 
320 //
322 
324 {
325  return (qlonglong)BytesAvailable();
326 }
327 
329 //
331 
333 {
334  return( 0 );
335 }
336 
338 //
340 
341 bool BufferedSocketDevice::At( qlonglong index )
342 {
343  ReadBytes();
344 
345  if ( index > m_bufRead.size() )
346  return false;
347 
348  // throw away data 0..index-1
349  m_bufRead.consumeBytes( (qulonglong)index, nullptr );
350 
351  return true;
352 }
353 
355 //
357 
359 {
360  if ( !m_pSocket->isValid() )
361  return true;
362 
363  ReadBytes();
364 
365  return m_bufRead.size() == 0;
366 
367 }
368 
370 //
372 
374 {
375  if ( !m_pSocket->isValid() )
376  return 0;
377 
378  return ReadBytes();
379 }
380 
382 //
384 
386  std::chrono::milliseconds msecs, bool *pTimeout /* = nullptr*/ )
387 {
388  bool bTimeout = false;
389 
390  if ( !m_pSocket->isValid() )
391  return 0;
392 
393  qlonglong nBytes = BytesAvailable();
394 
395  if (nBytes == 0)
396  {
397 /*
398  The following code is a possible workaround to the lost request problem
399  I just hate looping too much to put it in. I believe there is something
400  I'm missing that is causing the lost packets... Just need to find it.
401 
402  bTimeout = true;
403  int nCount = 0;
404  int msWait = msecs / 100;
405 
406  while (((nBytes = ReadBytes()) == 0 ) &&
407  (nCount++ < 100 ) &&
408  bTimeout &&
409  m_pSocket->isValid() )
410  {
411  // give up control
412 
413  // should be some multiple of msWait.
414  std::this_thread::sleep_for(1ms);
415 
416  }
417  }
418 */
419  // -=>TODO: Override the timeout to 1 second... Closes connection sooner
420  // to help recover from lost requests. (hack until better fix found)
421 
422  msecs = 1s;
423 
424  nBytes = m_pSocket->waitForMore( msecs, &bTimeout );
425 
426  if (pTimeout != nullptr)
427  *pTimeout = bTimeout;
428  }
429 
430  return nBytes; // nBytes //m_bufRead.size();
431 }
432 
434 //
436 
438 {
439  return m_nWriteSize;
440 
441 }
442 
444 //
446 
448 {
449  while (!m_bufWrite.empty())
450  {
451  delete m_bufWrite.back();
452  m_bufWrite.pop_back();
453  }
455 }
456 
458 //
460 
462 {
463  m_bufRead.clear();
464 }
465 
467 //
469 
470 qlonglong BufferedSocketDevice::ReadBlock( char *data, qulonglong maxlen )
471 {
472  if ( data == nullptr && maxlen != 0 )
473  return -1;
474 
475  if ( !m_pSocket->isOpen() )
476  return -1;
477 
478  ReadBytes();
479 
480  if ( maxlen >= (qulonglong)m_bufRead.size() )
481  maxlen = m_bufRead.size();
482 
483  m_bufRead.consumeBytes( maxlen, data );
484 
485  return maxlen;
486 
487 }
488 
490 //
492 
493 qlonglong BufferedSocketDevice::WriteBlock( const char *data, qulonglong len )
494 {
495  if ( len == 0 )
496  return 0;
497 
498  QByteArray *a = m_bufWrite.back();
499 
500  bool writeNow = ( (m_nWriteSize + len >= 1400) || (len > 512) );
501 
502  if ( a && (a->size() + len < 128) )
503  {
504  // small buffer, resize
505  int i = a->size();
506 
507  a->resize( i+len );
508  memcpy( a->data()+i, data, len );
509  }
510  else
511  {
512  // append new buffer
513  m_bufWrite.push_back(new QByteArray(data, len));
514  }
515 
516  m_nWriteSize += len;
517 
518  if ( writeNow )
519  Flush();
520 
521  return len;
522 }
523 
525 //
527 
529  const char *data, qulonglong len)
530 {
531  qlonglong nWritten = 0;
532 
533  // must Flush data just in case caller is mixing buffered & un-buffered calls
534 
535  Flush();
536 
537  if (m_nDestPort != 0)
538  nWritten = m_pSocket->writeBlock( data, len, m_destHostAddress , m_nDestPort );
539  else
540  nWritten = m_pSocket->writeBlock( data, len );
541 
542  return nWritten;
543 }
544 
546 //
548 
550 {
551  if ( m_pSocket->isOpen() )
552  {
553  ReadBytes();
554 
555  if (m_bufRead.size() > 0 )
556  {
557  uchar c = '\0';
558 
559  m_bufRead.consumeBytes( 1, (char*)&c );
560 
561  return c;
562  }
563  }
564 
565  return -1;
566 }
567 
569 //
571 
573 {
574  std::array <char,2> buf { static_cast<char>(ch), 0 };
575 
576  return WriteBlock(buf.data(), 1) == 1 ? ch : -1;
577 }
578 
580 //
582 
584 {
585  return m_bufRead.ungetch( ch );
586 }
587 
589 //
591 
593 {
594  ReadBytes();
595 
596  return ( BytesAvailable() > 0 ) && m_bufRead.scanNewline( nullptr );
597 }
598 
600 //
602 
604 {
605  QByteArray a;
606  a.resize(256);
607 
608  ReadBytes();
609 
610  bool nl = m_bufRead.scanNewline( &a );
611 
612  QString s;
613 
614  if ( nl )
615  {
616  At( a.size() ); // skips the data read
617 
618  s = QString( a );
619  }
620 
621  return s;
622 }
623 
625 //
627 
628 QString BufferedSocketDevice::ReadLine( std::chrono::milliseconds msecs )
629 {
630  MythTimer timer;
631  QString sLine;
632 
633  if ( CanReadLine() )
634  return( ReadLine() );
635 
636  // ----------------------------------------------------------------------
637  // If the user supplied a timeout, lets loop until we can read a line
638  // or timeout.
639  // ----------------------------------------------------------------------
640 
641  if ( msecs > 0ms)
642  {
643  bool bTimeout = false;
644 
645  timer.start();
646 
647  while ( !CanReadLine() && !bTimeout )
648  {
649 #if 0
650  LOG(VB_HTTP, LOG_DEBUG, "Can't Read Line... Waiting for more." );
651 #endif
652 
653  WaitForMore( msecs, &bTimeout );
654 
655  if ( timer.elapsed() >= msecs )
656  {
657  bTimeout = true;
658  LOG(VB_HTTP, LOG_INFO, "Exceeded Total Elapsed Wait Time." );
659  }
660  }
661 
662  if (CanReadLine())
663  sLine = ReadLine();
664  }
665 
666  return( sLine );
667 }
668 
670 //
672 
673 quint16 BufferedSocketDevice::Port(void) const
674 {
675  if (m_pSocket)
676  return( m_pSocket->port() );
677 
678  return 0;
679 }
680 
682 //
684 
686 {
687  if (m_pSocket)
688  return( m_pSocket->peerPort() );
689 
690  return 0;
691 }
692 
694 //
696 
697 QHostAddress BufferedSocketDevice::Address() const
698 {
699  if (m_pSocket)
700  return( m_pSocket->address() );
701 
702  QHostAddress tmp;
703 
704  return tmp;
705 }
706 
708 //
710 
712 {
713  if (m_pSocket)
714  return( m_pSocket->peerAddress() );
715 
716  QHostAddress tmp;
717 
718  return tmp;
719 }
720 
MythTimer::elapsed
std::chrono::milliseconds elapsed(void)
Returns milliseconds elapsed since last start() or restart()
Definition: mythtimer.cpp:91
BufferedSocketDevice::m_bHandleSocketDelete
bool m_bHandleSocketDelete
Definition: bufferedsocketdevice.h:48
ENO
#define ENO
This can be appended to the LOG args with "+".
Definition: mythlogging.h:73
BufferedSocketDevice::BytesAvailable
qulonglong BytesAvailable()
Definition: bufferedsocketdevice.cpp:373
BufferedSocketDevice::Putch
int Putch(int ch)
Definition: bufferedsocketdevice.cpp:572
BufferedSocketDevice::Connect
bool Connect(const QHostAddress &addr, quint16 port)
Definition: bufferedsocketdevice.cpp:101
BufferedSocketDevice::m_pSocket
MSocketDevice * m_pSocket
Definition: bufferedsocketdevice.h:42
MythTimer
A QElapsedTimer based timer to replace use of QTime as a timer.
Definition: mythtimer.h:13
BufferedSocketDevice::SetReadBufferSize
void SetReadBufferSize(qulonglong bufSize)
Definition: bufferedsocketdevice.cpp:147
BufferedSocketDevice::WriteBlock
qlonglong WriteBlock(const char *data, qulonglong len)
Definition: bufferedsocketdevice.cpp:493
BufferedSocketDevice::m_bufRead
MMembuf m_bufRead
Definition: bufferedsocketdevice.h:53
BufferedSocketDevice::m_destHostAddress
QHostAddress m_destHostAddress
Definition: bufferedsocketdevice.h:50
BufferedSocketDevice::m_nMaxReadBufferSize
qulonglong m_nMaxReadBufferSize
Definition: bufferedsocketdevice.h:44
MythTimer::start
void start(void)
starts measuring elapsed time.
Definition: mythtimer.cpp:47
LOG
#define LOG(_MASK_, _LEVEL_, _QSTRING_)
Definition: mythlogging.h:39
BufferedSocketDevice::Address
QHostAddress Address() const
Definition: bufferedsocketdevice.cpp:697
BufferedSocketDevice::ClearPendingData
void ClearPendingData()
Definition: bufferedsocketdevice.cpp:447
BufferedSocketDevice::Port
quint16 Port() const
Definition: bufferedsocketdevice.cpp:673
BufferedSocketDevice::ReadBlock
qlonglong ReadBlock(char *data, qulonglong maxlen)
Definition: bufferedsocketdevice.cpp:470
tmp
static guint32 * tmp
Definition: goom_core.cpp:26
BufferedSocketDevice::~BufferedSocketDevice
virtual ~BufferedSocketDevice()
Definition: bufferedsocketdevice.cpp:67
BufferedSocketDevice::Ungetch
int Ungetch(int ch)
Definition: bufferedsocketdevice.cpp:583
BufferedSocketDevice::SetDestAddress
void SetDestAddress(QHostAddress hostAddress, quint16 nPort)
Definition: bufferedsocketdevice.cpp:136
BufferedSocketDevice::CanReadLine
bool CanReadLine()
Definition: bufferedsocketdevice.cpp:592
mythlogging.h
BufferedSocketDevice::m_nDestPort
quint16 m_nDestPort
Definition: bufferedsocketdevice.h:51
BufferedSocketDevice::ReadLine
QString ReadLine()
Definition: bufferedsocketdevice.cpp:603
BufferedSocketDevice::WriteBlockDirect
qlonglong WriteBlockDirect(const char *data, qulonglong len)
Definition: bufferedsocketdevice.cpp:528
BufferedSocketDevice::WaitForMore
qulonglong WaitForMore(std::chrono::milliseconds msecs, bool *timeout=nullptr)
Definition: bufferedsocketdevice.cpp:385
BufferedSocketDevice::BufferedSocketDevice
BufferedSocketDevice(int nSocket)
Definition: bufferedsocketdevice.cpp:29
BufferedSocketDevice::PeerAddress
QHostAddress PeerAddress() const
Definition: bufferedsocketdevice.cpp:711
bufferedsocketdevice.h
BufferedSocketDevice::SocketDevice
MSocketDevice * SocketDevice()
Definition: bufferedsocketdevice.cpp:113
BufferedSocketDevice::AtEnd
bool AtEnd()
Definition: bufferedsocketdevice.cpp:358
BufferedSocketDevice::Flush
void Flush()
Definition: bufferedsocketdevice.cpp:254
BufferedSocketDevice::Close
void Close()
Definition: bufferedsocketdevice.cpp:76
BufferedSocketDevice::Getch
int Getch()
Definition: bufferedsocketdevice.cpp:549
BufferedSocketDevice::Size
qint64 Size()
Definition: bufferedsocketdevice.cpp:323
BufferedSocketDevice::At
static qint64 At()
Definition: bufferedsocketdevice.cpp:332
BufferedSocketDevice::ConsumeWriteBuf
bool ConsumeWriteBuf(qulonglong nbytes)
Definition: bufferedsocketdevice.cpp:218
BufferedSocketDevice::socket
int socket()
Definition: bufferedsocketdevice.h:116
BufferedSocketDevice::PeerPort
quint16 PeerPort() const
Definition: bufferedsocketdevice.cpp:685
BufferedSocketDevice::m_nWriteSize
qint64 m_nWriteSize
write total buf size
Definition: bufferedsocketdevice.h:45
BufferedSocketDevice::m_nWriteIndex
qint64 m_nWriteIndex
write index
Definition: bufferedsocketdevice.h:46
BufferedSocketDevice::ReadBytes
int ReadBytes()
Definition: bufferedsocketdevice.cpp:165
mythtimer.h
BufferedSocketDevice::ReadBufferSize
qulonglong ReadBufferSize() const
Definition: bufferedsocketdevice.cpp:156
BufferedSocketDevice::ClearReadBuffer
void ClearReadBuffer()
Definition: bufferedsocketdevice.cpp:461
BufferedSocketDevice::SetSocketDevice
void SetSocketDevice(MSocketDevice *pSocket)
Definition: bufferedsocketdevice.cpp:122
upnputil.h
BufferedSocketDevice::m_bufWrite
std::deque< QByteArray * > m_bufWrite
Definition: bufferedsocketdevice.h:54
BufferedSocketDevice::BytesToWrite
qulonglong BytesToWrite() const
Definition: bufferedsocketdevice.cpp:437