MythTV  0.27pre
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Groups Pages
bufferedsocketdevice.cpp
Go to the documentation of this file.
1 
2 // Program Name: bufferedsocketdevice.cpp
3 // Created : Oct. 1, 2005
4 //
5 // Purpose :
6 //
7 // Copyright (c) 2005 David Blain <dblain@mythtv.org>
8 //
9 // Licensed under the GPL v2 or later, see COPYING for details
10 //
12 
13 #include <algorithm>
14 
15 #include "mythtimer.h"
16 #include "bufferedsocketdevice.h"
17 #include "upnputil.h"
18 #include "mythlogging.h"
19 
21 //
23 
25 {
26  m_pSocket = new MSocketDevice();
27 
29  m_pSocket->setBlocking ( false );
31 
32  struct linger ling = {1, 1};
33 
34  if ( setsockopt(socket(), SOL_SOCKET, SO_LINGER, (const char *)&ling,
35  sizeof(ling)) < 0)
36  LOG(VB_GENERAL, LOG_ERR,
37  "BufferedSocketDevice: setsockopt - SO_LINGER: " + ENO);
38 
39  m_nDestPort = 0;
40 
42  m_nWriteSize = 0;
43  m_nWriteIndex = 0;
45 }
46 
48 //
50 
52  bool bTakeOwnership /* = false */ )
53 {
54  m_pSocket = pSocket;
55 
56  m_nDestPort = 0;
57 
59  m_nWriteSize = 0;
60  m_nWriteIndex = 0;
61  m_bHandleSocketDelete= bTakeOwnership;
62 
63 }
64 
66 //
68 
70 {
71  Close();
72 }
73 
75 //
77 
79 {
80  Flush();
81  ReadBytes();
82 
83  m_bufRead.clear();
85 
86  if (m_pSocket != NULL)
87  {
88  if (m_pSocket->isValid())
89  m_pSocket->close();
90 
92  delete m_pSocket;
93 
94  m_pSocket = NULL;
95  }
96 
97 }
98 
100 //
102 
103 bool BufferedSocketDevice::Connect( const QHostAddress &addr, quint16 port )
104 {
105  if (m_pSocket == NULL)
106  return false;
107 
108  return m_pSocket->connect( addr, port );
109 }
110 
112 //
114 
116 {
117  return( m_pSocket );
118 }
119 
121 //
123 
125 {
126  if ((m_bHandleSocketDelete) && (m_pSocket != NULL))
127  delete m_pSocket;
128 
129  m_bHandleSocketDelete = false;
130 
131  m_pSocket = pSocket;
132 }
133 
135 //
137 
139  QHostAddress hostAddress, quint16 nPort)
140 {
141  m_DestHostAddress = hostAddress;
142  m_nDestPort = nPort;
143 }
144 
146 //
148 
149 void BufferedSocketDevice::SetReadBufferSize( qulonglong bufSize )
150 {
151  m_nMaxReadBufferSize = bufSize;
152 }
153 
155 //
157 
159 {
160  return m_nMaxReadBufferSize;
161 }
162 
164 //
166 
168 {
169  if (m_pSocket == NULL)
170  return m_bufRead.size();
171 
172  qlonglong maxToRead = 0;
173 
174  if ( m_nMaxReadBufferSize > 0 )
175  {
176  maxToRead = m_nMaxReadBufferSize - m_bufRead.size();
177 
178  if ( maxToRead <= 0 )
179  return m_bufRead.size();
180  }
181 
182  qlonglong nbytes = m_pSocket->bytesAvailable();
183  qlonglong nread;
184 
185  QByteArray *a = 0;
186 
187  if ( nbytes > 0 )
188  {
189  a = new QByteArray();
190  a->resize(nbytes);
191 
192  nread = m_pSocket->readBlock(
193  a->data(), maxToRead ? std::min(nbytes, maxToRead) : nbytes);
194 
195  if (( nread > 0 ) && ( nread != (int)a->size() ))
196  {
197  // unexpected
198  a->resize( nread );
199  }
200  }
201 
202  if (a)
203  {
204 #if 0
205  QString msg;
206  for( long n = 0; n < a->count(); n++ )
207  msg += QString("%1").arg(a->at(n));
208  LOG(VB_GENERAL, LOG_DEBUG, msg);
209 #endif
210 
211  m_bufRead.append( a );
212  }
213 
214  return m_bufRead.size();
215 }
216 
218 //
220 
221 bool BufferedSocketDevice::ConsumeWriteBuf( qulonglong nbytes )
222 {
223  if ( !nbytes || ((qlonglong)nbytes > m_nWriteSize) )
224  return false;
225 
226  m_nWriteSize -= nbytes;
227 
228  for ( ;; )
229  {
230  QByteArray *a = m_bufWrite.front();
231 
232  if ( m_nWriteIndex + nbytes >= (qulonglong)a->size() )
233  {
234  nbytes -= a->size() - m_nWriteIndex;
235  m_bufWrite.pop_front();
236  delete a;
237 
238  m_nWriteIndex = 0;
239 
240  if ( nbytes == 0 )
241  break;
242  }
243  else
244  {
245  m_nWriteIndex += nbytes;
246  break;
247  }
248  }
249 
250  return true;
251 }
252 
254 //
256 
258 {
259 
260  if ((m_pSocket == NULL) || !m_pSocket->isValid())
261  return;
262 
263  bool osBufferFull = false;
264  int consumed = 0;
265 
266  while ( !osBufferFull && ( m_nWriteSize > 0 ) && m_pSocket->isValid())
267  {
269  QByteArray *a = *it;
270 
271  int nwritten = 0;
272  int i = 0;
273 
274  if ( (int)a->size() - m_nWriteIndex < 1460 )
275  {
276  QByteArray out;
277  out.resize(65536);
278 
279  int j = m_nWriteIndex;
280  int s = a->size() - j;
281 
282  while ( a && i+s < (int)out.size() )
283  {
284  memcpy( out.data()+i, a->data()+j, s );
285  j = 0;
286  i += s;
287  ++it;
288  a = *it;
289  s = a ? a->size() : 0;
290  }
291 
292  if (m_nDestPort != 0)
293  nwritten = m_pSocket->writeBlock( out.data(), i, m_DestHostAddress, m_nDestPort );
294  else
295  nwritten = m_pSocket->writeBlock( out.data(), i );
296  }
297  else
298  {
299  // Big block, write it immediately
300  i = a->size() - m_nWriteIndex;
301 
302  if (m_nDestPort != 0)
303  nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i, m_DestHostAddress, m_nDestPort );
304  else
305  nwritten = m_pSocket->writeBlock( a->data() + m_nWriteIndex, i );
306  }
307 
308  if ( nwritten > 0 )
309  {
310  if ( ConsumeWriteBuf( nwritten ) )
311 
312  consumed += nwritten;
313  }
314 
315  if ( nwritten < i )
316  osBufferFull = true;
317  }
318 }
319 
320 
322 //
324 
326 {
327  return (qlonglong)BytesAvailable();
328 }
329 
331 //
333 
334 qlonglong BufferedSocketDevice::At() const
335 {
336  return( 0 );
337 }
338 
340 //
342 
344 {
345  ReadBytes();
346 
347  if ( index > m_bufRead.size() )
348  return false;
349 
350  // throw away data 0..index-1
351  m_bufRead.consumeBytes( (qulonglong)index, 0 );
352 
353  return true;
354 }
355 
357 //
359 
361 {
362  if ( !m_pSocket->isValid() )
363  return true;
364 
365  ReadBytes();
366 
367  return m_bufRead.size() == 0;
368 
369 }
370 
372 //
374 
376 {
377  if ( !m_pSocket->isValid() )
378  return 0;
379 
380  return ReadBytes();
381 }
382 
384 //
386 
388  int msecs, bool *pTimeout /* = NULL*/ )
389 {
390  bool bTimeout = false;
391 
392  if ( !m_pSocket->isValid() )
393  return 0;
394 
395  qlonglong nBytes = BytesAvailable();
396 
397  if (nBytes == 0)
398  {
399 /*
400  The following code is a possible workaround to the lost request problem
401  I just hate looping too much to put it in. I believe there is something
402  I'm missing that is causing the lost packets... Just need to find it.
403 
404  bTimeout = true;
405  int nCount = 0;
406  int msWait = msecs / 100;
407 
408  while (((nBytes = ReadBytes()) == 0 ) &&
409  (nCount++ < 100 ) &&
410  bTimeout &&
411  m_pSocket->isValid() )
412  {
413  // give up control
414 
415  usleep( 1000 ); // should be some multiple of msWait.
416 
417  }
418  }
419 */
420  // -=>TODO: Override the timeout to 1 second... Closes connection sooner
421  // to help recover from lost requests. (hack until better fix found)
422 
423  msecs = 1000;
424 
425  nBytes = m_pSocket->waitForMore( msecs, &bTimeout );
426 
427  if (pTimeout != NULL)
428  *pTimeout = bTimeout;
429  }
430 
431  return nBytes; // nBytes //m_bufRead.size();
432 }
433 
435 //
437 
439 {
440  return m_nWriteSize;
441 
442 }
443 
445 //
447 
449 {
450  while (!m_bufWrite.empty())
451  {
452  delete m_bufWrite.back();
453  m_bufWrite.pop_back();
454  }
456 }
457 
459 //
461 
463 {
464  m_bufRead.clear();
465 }
466 
468 //
470 
471 qlonglong BufferedSocketDevice::ReadBlock( char *data, qulonglong maxlen )
472 {
473  if ( data == 0 && maxlen != 0 )
474  return -1;
475 
476  if ( !m_pSocket->isOpen() )
477  return -1;
478 
479  ReadBytes();
480 
481  if ( maxlen >= (qulonglong)m_bufRead.size() )
482  maxlen = m_bufRead.size();
483 
484  m_bufRead.consumeBytes( maxlen, data );
485 
486  return maxlen;
487 
488 }
489 
491 //
493 
494 qlonglong BufferedSocketDevice::WriteBlock( const char *data, qulonglong len )
495 {
496  if ( len == 0 )
497  return 0;
498 
499  QByteArray *a = m_bufWrite.back();
500 
501  bool writeNow = ( (m_nWriteSize + len >= 1400) || (len > 512) );
502 
503  if ( a && (a->size() + len < 128) )
504  {
505  // small buffer, resize
506  int i = a->size();
507 
508  a->resize( i+len );
509  memcpy( a->data()+i, data, len );
510  }
511  else
512  {
513  // append new buffer
514  m_bufWrite.push_back(new QByteArray(data, len));
515  }
516 
517  m_nWriteSize += len;
518 
519  if ( writeNow )
520  Flush();
521 
522  return len;
523 }
524 
526 //
528 
530  const char *data, qulonglong len)
531 {
532  qlonglong nWritten = 0;
533 
534  // must Flush data just in case caller is mixing buffered & un-buffered calls
535 
536  Flush();
537 
538  if (m_nDestPort != 0)
539  nWritten = m_pSocket->writeBlock( data, len, m_DestHostAddress, m_nDestPort );
540  else
541  nWritten = m_pSocket->writeBlock( data, len );
542 
543  return nWritten;
544 }
545 
547 //
549 
551 {
552  if ( m_pSocket->isOpen() )
553  {
554  ReadBytes();
555 
556  if (m_bufRead.size() > 0 )
557  {
558  uchar c;
559 
560  m_bufRead.consumeBytes( 1, (char*)&c );
561 
562  return c;
563  }
564  }
565 
566  return -1;
567 }
568 
570 //
572 
574 {
575  char buf[2];
576 
577  buf[0] = ch;
578 
579  return WriteBlock(buf, 1) == 1 ? ch : -1;
580 }
581 
583 //
585 
587 {
588  return m_bufRead.ungetch( ch );
589 }
590 
592 //
594 
596 {
597  ReadBytes();
598 
599  if (( BytesAvailable() > 0 ) && m_bufRead.scanNewline( 0 ) )
600  return true;
601 
602  return false;
603 }
604 
606 //
608 
610 {
611  QByteArray a;
612  a.resize(256);
613 
614  ReadBytes();
615 
616  bool nl = m_bufRead.scanNewline( &a );
617 
618  QString s;
619 
620  if ( nl )
621  {
622  At( a.size() ); // skips the data read
623 
624  s = QString( a );
625  }
626 
627  return s;
628 }
629 
631 //
633 
634 QString BufferedSocketDevice::ReadLine( int msecs )
635 {
636  MythTimer timer;
637  QString sLine;
638 
639  if ( CanReadLine() )
640  return( ReadLine() );
641 
642  // ----------------------------------------------------------------------
643  // If the user supplied a timeout, lets loop until we can read a line
644  // or timeout.
645  // ----------------------------------------------------------------------
646 
647  if ( msecs > 0)
648  {
649  bool bTimeout = false;
650 
651  timer.start();
652 
653  while ( !CanReadLine() && !bTimeout )
654  {
655 #if 0
656  LOG(VB_UPNP, LOG_DEBUG, "Can't Read Line... Waiting for more." );
657 #endif
658 
659  WaitForMore( msecs, &bTimeout );
660 
661  if ( timer.elapsed() >= msecs )
662  {
663  bTimeout = true;
664  LOG(VB_UPNP, LOG_INFO, "Exceeded Total Elapsed Wait Time." );
665  }
666  }
667 
668  if (CanReadLine())
669  sLine = ReadLine();
670  }
671 
672  return( sLine );
673 }
674 
676 //
678 
679 quint16 BufferedSocketDevice::Port(void) const
680 {
681  if (m_pSocket)
682  return( m_pSocket->port() );
683 
684  return 0;
685 }
686 
688 //
690 
692 {
693  if (m_pSocket)
694  return( m_pSocket->peerPort() );
695 
696  return 0;
697 }
698 
700 //
702 
703 QHostAddress BufferedSocketDevice::Address() const
704 {
705  if (m_pSocket)
706  return( m_pSocket->address() );
707 
708  QHostAddress tmp;
709 
710  return tmp;
711 }
712 
714 //
716 
718 {
719  if (m_pSocket)
720  return( m_pSocket->peerAddress() );
721 
722  QHostAddress tmp;
723 
724  return tmp;
725 }
726