From 7fd9d945f0e0abaf2f381adb9e763a3e9ee6b78b Mon Sep 17 00:00:00 2001
From: Lawrence Rust <lvr@softsystem.co.uk>
Date: Sun, 31 Jul 2011 17:14:52 +0200
Subject: [PATCH 1/9] ringbuffer: Adapt readahead for low bit rate (64kbps radio) streams
- The existing readahead block size is too large for 64/128 kBps audio
streams and can cause audio underruns.
- Make WaitFailAvail return the bytes available if less than that requested
for low bit rate streams (where fill_min is < 32kB).
- Make fileringbuffer::safe_read retry if reading from a remote file. Low
bit rate sources such as radio can have considerable latency.
Signed-off-by: Lawrence Rust <lvr@softsystem.co.uk>
---
mythtv/libs/libmythtv/fileringbuffer.cpp | 48 +++++---
mythtv/libs/libmythtv/ringbuffer.cpp | 193 ++++++++++++++++--------------
mythtv/libs/libmythtv/ringbuffer.h | 2 +
3 files changed, 138 insertions(+), 105 deletions(-)
diff --git a/mythtv/libs/libmythtv/fileringbuffer.cpp b/mythtv/libs/libmythtv/fileringbuffer.cpp
index a3b73f3..f4b90d7 100644
a
|
b
|
bool FileRingBuffer::OpenFile(const QString &lfilename, uint retry_ms) |
356 | 356 | commserror = false; |
357 | 357 | numfailures = 0; |
358 | 358 | |
359 | | rawbitrate = 8000; |
| 359 | // The initial bitrate needs to be set with consideration for low bit rate |
| 360 | // streams (e.g. radio @ 64Kbps) such that fill_min bytes are received |
| 361 | // in a reasonable time period to enable decoders to peek the first few KB |
| 362 | // to determine type & settings. |
| 363 | if (is_local) |
| 364 | rawbitrate = 256; // Allow for radio |
| 365 | else |
| 366 | rawbitrate = 128; // remotefile |
| 367 | |
360 | 368 | CalcReadAheadThresh(); |
361 | 369 | |
362 | 370 | bool ok = fd2 >= 0 || remotefile; |
… |
… |
int FileRingBuffer::safe_read(int fd, void *data, uint sz) |
458 | 466 | */ |
459 | 467 | int FileRingBuffer::safe_read(RemoteFile *rf, void *data, uint sz) |
460 | 468 | { |
461 | | int ret = rf->Read(data, sz); |
462 | | if (ret < 0) |
463 | | { |
464 | | LOG(VB_GENERAL, LOG_ERR, LOC + |
465 | | "safe_read(RemoteFile* ...): read failed"); |
466 | | |
467 | | poslock.lockForRead(); |
468 | | rf->Seek(internalreadpos - readAdjust, SEEK_SET); |
469 | | poslock.unlock(); |
470 | | numfailures++; |
471 | | } |
472 | | else if (ret == 0) |
| 469 | for (int retries = 0; ; ++retries) |
473 | 470 | { |
474 | | LOG(VB_FILE, LOG_INFO, LOC + |
475 | | "safe_read(RemoteFile* ...): at EOF"); |
| 471 | int ret = rf->Read(data, sz); |
| 472 | if (ret > 0) |
| 473 | return ret; |
| 474 | else if (ret < 0) |
| 475 | { |
| 476 | LOG(VB_GENERAL, LOG_ERR, LOC + |
| 477 | "safe_read(RemoteFile* ...): read failed"); |
| 478 | |
| 479 | poslock.lockForRead(); |
| 480 | rf->Seek(internalreadpos - readAdjust, SEEK_SET); |
| 481 | poslock.unlock(); |
| 482 | numfailures++; |
| 483 | return ret; |
| 484 | } |
| 485 | // Retry for 300mS if liveTV for low bit rate (radio) streams |
| 486 | else if (!livetvchain || retries >= 5) |
| 487 | break; |
| 488 | |
| 489 | usleep(60000); |
476 | 490 | } |
477 | 491 | |
478 | | return ret; |
| 492 | LOG(VB_FILE, LOG_INFO, LOC + |
| 493 | "safe_read(RemoteFile* ...): at EOF"); |
| 494 | return 0; |
479 | 495 | } |
480 | 496 | |
481 | 497 | long long FileRingBuffer::GetReadPosition(void) const |
diff --git a/mythtv/libs/libmythtv/ringbuffer.cpp b/mythtv/libs/libmythtv/ringbuffer.cpp
index 1caacef..cd3b6ce 100644
a
|
b
|
void RingBuffer::UpdateRawBitrate(uint raw_bitrate) |
289 | 289 | { |
290 | 290 | LOG(VB_FILE, LOG_INFO, LOC + |
291 | 291 | QString("UpdateRawBitrate(%1Kb)").arg(raw_bitrate)); |
292 | | if (raw_bitrate < 2500) |
| 292 | // NB DVB-S radio can be 64kbps |
| 293 | if (raw_bitrate < 64) |
293 | 294 | { |
294 | 295 | LOG(VB_FILE, LOG_INFO, LOC + |
295 | 296 | QString("UpdateRawBitrate(%1Kb) - ignoring bitrate,") |
… |
… |
void RingBuffer::SetBufferSizeFactors(bool estbitrate, bool matroska) |
339 | 340 | */ |
340 | 341 | void RingBuffer::CalcReadAheadThresh(void) |
341 | 342 | { |
342 | | uint estbitrate = 0; |
343 | | |
344 | 343 | readsallowed = false; |
345 | 344 | readblocksize = max(readblocksize, CHUNK); |
346 | 345 | |
347 | | // loop without sleeping if the buffered data is less than this |
348 | | fill_threshold = 7 * bufferSize / 8; |
349 | | |
350 | | const uint KB32 = 32*1024; |
351 | | const uint KB64 = 64*1024; |
352 | | const uint KB128 = 128*1024; |
353 | | const uint KB256 = 256*1024; |
354 | | const uint KB512 = 512*1024; |
355 | | |
356 | | estbitrate = (uint) max(abs(rawbitrate * playspeed), |
| 346 | uint estbitrate = (uint) max(abs(rawbitrate * playspeed), |
357 | 347 | 0.5f * rawbitrate); |
358 | | estbitrate = min(rawbitrate * 3, estbitrate); |
359 | | int rbs = (estbitrate > 2500) ? KB64 : KB32; |
360 | | rbs = (estbitrate > 5000) ? KB128 : rbs; |
361 | | rbs = (estbitrate > 9000) ? KB256 : rbs; |
362 | | rbs = (estbitrate > 18000) ? KB512 : rbs; |
363 | | readblocksize = max(rbs,readblocksize); |
| 348 | estbitrate = min(rawbitrate * 3, estbitrate); |
| 349 | |
| 350 | int const KB1 = 1024; |
| 351 | int const rbs = estbitrate > 18000 ? 512*KB1 : |
| 352 | estbitrate > 9000 ? 256*KB1 : |
| 353 | estbitrate > 5000 ? 128*KB1 : |
| 354 | estbitrate > 2500 ? 64*KB1 : |
| 355 | estbitrate > 250 ? 32*KB1 : // 32KB~=0.25s @ 1Mbps |
| 356 | 16*KB1 ; |
| 357 | if (rbs < CHUNK) |
| 358 | readblocksize = rbs; |
| 359 | else |
| 360 | readblocksize = max(rbs,readblocksize); |
364 | 361 | |
365 | 362 | // minumum seconds of buffering before allowing read |
366 | | float secs_min = 0.25; |
| 363 | float const secs_min = 0.25f; |
367 | 364 | // set the minimum buffering before allowing ffmpeg read |
368 | | fill_min = (uint) ((estbitrate * secs_min) * 0.125f); |
369 | | // make this a multiple of ffmpeg block size.. |
370 | | fill_min = ((fill_min / KB32) + 1) * KB32; |
| 365 | fill_min = (uint) (estbitrate * ((KB1 / 8) * secs_min)); |
| 366 | if (fill_min < readblocksize) |
| 367 | fill_min = readblocksize; |
| 368 | if (fill_min > CHUNK) |
| 369 | fill_min = ((fill_min + CHUNK - 1) / CHUNK) * CHUNK; |
| 370 | if ((uint)fill_min >= bufferSize) |
| 371 | fill_min = bufferSize - 1; |
| 372 | |
| 373 | // loop without sleeping if the buffered data is less than this |
| 374 | fill_threshold = max((uint)fill_min, bufferSize / 8); |
371 | 375 | |
372 | 376 | LOG(VB_FILE, LOG_INFO, LOC + |
373 | 377 | QString("CalcReadAheadThresh(%1 Kb)\n\t\t\t -> " |
… |
… |
bool RingBuffer::IsNearEnd(double fps, uint vvf) const |
389 | 393 | // WARNING: readahead_frames can greatly overestimate or underestimate |
390 | 394 | // the number of frames available in the read ahead buffer |
391 | 395 | // when rh_frames is less than the keyframe distance. |
| 396 | if (fps == 0.) |
| 397 | return false; |
392 | 398 | double bytes_per_frame = kbits_per_sec * (1000.0/8.0) / fps; |
| 399 | if (bytes_per_frame == 0.) |
| 400 | return false; |
393 | 401 | double readahead_frames = sz / bytes_per_frame; |
394 | 402 | |
395 | 403 | bool near_end = ((vvf + readahead_frames) < 10.0) || (sz < rbs*1.5); |
… |
… |
int RingBuffer::ReadBufAvail(void) const |
428 | 436 | return ret; |
429 | 437 | } |
430 | 438 | |
| 439 | inline int RingBuffer::ReadBufUsed() const |
| 440 | { |
| 441 | return (bufferSize - 1) - ReadBufFree(); |
| 442 | } |
| 443 | |
| 444 | inline bool RingBuffer::ReadsAllowed() const |
| 445 | { |
| 446 | return ateof || setswitchtonext || commserror || |
| 447 | // Ensure some hysteresis around fill_min |
| 448 | ReadBufUsed() >= (readsallowed ? 1 : fill_min); |
| 449 | } |
| 450 | |
431 | 451 | /** \fn RingBuffer::ResetReadAhead(long long) |
432 | 452 | * \brief Restart the read-ahead thread at the 'newinternal' position. |
433 | 453 | * |
… |
… |
void RingBuffer::run(void) |
780 | 800 | readtimeavg = (readtimeavg * 9 + readinterval) / 10; |
781 | 801 | |
782 | 802 | if (readtimeavg < 150 && |
783 | | (uint)readblocksize < (BUFFER_SIZE_MINIMUM >>2)) |
| 803 | (uint)readblocksize < (BUFFER_SIZE_MINIMUM >>2) && |
| 804 | readblocksize >= CHUNK) |
784 | 805 | { |
785 | 806 | int old_block_size = readblocksize; |
786 | 807 | readblocksize = 3 * readblocksize / 2; |
… |
… |
void RingBuffer::run(void) |
869 | 890 | } |
870 | 891 | } |
871 | 892 | |
872 | | int used = bufferSize - ReadBufFree(); |
873 | | |
874 | 893 | bool reads_were_allowed = readsallowed; |
875 | 894 | |
876 | | if ((0 == read_return) || (numfailures > 5) || |
877 | | (readsallowed != (used >= fill_min || ateof || |
878 | | setswitchtonext || commserror))) |
| 895 | if ((0 == read_return) || (numfailures > 5) || ReadsAllowed() != readsallowed) |
879 | 896 | { |
880 | 897 | // If readpos changes while the lock is released |
881 | 898 | // we should not handle the 0 read_return now. |
… |
… |
void RingBuffer::run(void) |
886 | 903 | |
887 | 904 | commserror |= (numfailures > 5); |
888 | 905 | |
889 | | readsallowed = used >= fill_min || ateof || |
890 | | setswitchtonext || commserror; |
| 906 | bool bReadsAllowed = ReadsAllowed(); |
| 907 | if (readsallowed != bReadsAllowed) |
| 908 | { |
| 909 | readsallowed = bReadsAllowed; |
| 910 | LOG(VB_FILE, LOG_INFO, LOC + (bReadsAllowed ? |
| 911 | QString("Reads allowed: %1 bytes available").arg(ReadBufUsed()) : |
| 912 | QString("Rebuffering %1..%2").arg(ReadBufUsed()).arg(fill_min)) ); |
| 913 | } |
891 | 914 | |
892 | 915 | if (0 == read_return && old_readpos == readpos) |
893 | 916 | { |
… |
… |
void RingBuffer::run(void) |
910 | 933 | |
911 | 934 | rwlock.unlock(); |
912 | 935 | rwlock.lockForRead(); |
913 | | used = bufferSize - ReadBufFree(); |
914 | 936 | } |
915 | 937 | |
916 | 938 | LOG(VB_FILE, LOG_DEBUG, LOC + "@ end of read ahead loop"); |
917 | 939 | |
918 | 940 | if (!readsallowed || commserror || ateof || setswitchtonext || |
919 | | (wanttoread <= used && wanttoread > 0)) |
| 941 | (wanttoread <= ReadBufUsed() && wanttoread > 0)) |
920 | 942 | { |
921 | 943 | // To give other threads a good chance to handle these |
922 | 944 | // conditions, even if they are only requesting a read lock |
… |
… |
void RingBuffer::run(void) |
930 | 952 | { |
931 | 953 | // yield if we have nothing to do... |
932 | 954 | if (!request_pause && reads_were_allowed && |
933 | | (used >= fill_threshold || ateof || setswitchtonext)) |
| 955 | (ReadBufUsed() >= fill_threshold || ateof || setswitchtonext || ignoreliveeof)) |
934 | 956 | { |
935 | 957 | generalWait.wait(&rwlock, 50); |
936 | 958 | } |
… |
… |
void RingBuffer::run(void) |
940 | 962 | generalWait.wakeAll(); |
941 | 963 | rwlock.unlock(); |
942 | 964 | usleep(5 * 1000); |
943 | | rwlock.lockForRead(); |
| 965 | rwlock.lockForRead(); |
944 | 966 | } |
945 | 967 | } |
946 | 968 | } |
… |
… |
bool RingBuffer::WaitForReadsAllowed(void) |
996 | 1018 | while (!readsallowed && !stopreads && |
997 | 1019 | !request_pause && !commserror && readaheadrunning) |
998 | 1020 | { |
999 | | generalWait.wait(&rwlock, 1000); |
1000 | | if (!readsallowed && t.elapsed() > 1000) |
| 1021 | // The timeout should allow for congestion of internet streamed media |
| 1022 | if (t.elapsed() >= 30000) |
1001 | 1023 | { |
1002 | | LOG(VB_GENERAL, LOG_WARNING, LOC + |
1003 | | "Taking too long to be allowed to read.."); |
1004 | | |
1005 | | if (t.elapsed() > 10000) |
1006 | | { |
1007 | | LOG(VB_GENERAL, LOG_ERR, LOC + "Took more than 10 seconds to " |
1008 | | "be allowed to read, aborting."); |
1009 | | return false; |
1010 | | } |
| 1024 | LOG(VB_GENERAL, LOG_ERR, LOC + |
| 1025 | QString("Waited %1 seconds to be allowed to read, aborting.") |
| 1026 | .arg(t.elapsed()/1000) ); |
| 1027 | return false; |
1011 | 1028 | } |
| 1029 | |
| 1030 | generalWait.wait(&rwlock, 250); |
1012 | 1031 | } |
1013 | 1032 | |
1014 | | return readsallowed; |
| 1033 | if (t.elapsed() >= 500) |
| 1034 | { |
| 1035 | LOG(VB_GENERAL, LOG_WARNING, LOC + |
| 1036 | QString("Waited %1 mS to be allowed to read (avail=%2 fill_min=%3)..") |
| 1037 | .arg(t.elapsed()).arg(ReadBufAvail()).arg(fill_min) ); |
| 1038 | } |
| 1039 | return true; |
1015 | 1040 | } |
1016 | 1041 | |
1017 | 1042 | bool RingBuffer::WaitForAvail(int count) |
… |
… |
bool RingBuffer::WaitForAvail(int count) |
1035 | 1060 | generalWait.wakeAll(); |
1036 | 1061 | } |
1037 | 1062 | |
1038 | | MythTimer t; |
1039 | | t.start(); |
1040 | | while ((avail < count) && !stopreads && |
1041 | | !request_pause && !commserror && readaheadrunning) |
| 1063 | MythTimer t; t.start(); |
| 1064 | wanttoread = count; |
| 1065 | while (avail < count && !stopreads && !request_pause && |
| 1066 | !commserror && readaheadrunning) |
1042 | 1067 | { |
1043 | | wanttoread = count; |
1044 | | generalWait.wait(&rwlock, 250); |
1045 | | avail = ReadBufAvail(); |
1046 | | |
1047 | | if (ateof && avail < count) |
1048 | | count = avail; |
1049 | | |
1050 | | if (avail < count) |
| 1068 | uint elapsed = t.elapsed(); |
| 1069 | if (elapsed >= 10000) |
1051 | 1070 | { |
1052 | | int elapsed = t.elapsed(); |
1053 | | if (((elapsed > 250) && (elapsed < 500)) || |
1054 | | ((elapsed > 500) && (elapsed < 750)) || |
1055 | | ((elapsed > 1000) && (elapsed < 1250)) || |
1056 | | ((elapsed > 2000) && (elapsed < 2250)) || |
1057 | | ((elapsed > 4000) && (elapsed < 4250)) || |
1058 | | ((elapsed > 8000) && (elapsed < 8250)) || |
1059 | | ((elapsed > 9000))) |
1060 | | { |
1061 | | LOG(VB_GENERAL, LOG_INFO, LOC + "Waited " + |
1062 | | QString("%1").arg((elapsed / 250) * 0.25f, 3, 'f', 1) + |
1063 | | " seconds for data \n\t\t\tto become available..." + |
1064 | | QString(" %2 < %3") .arg(avail).arg(count)); |
1065 | | } |
1066 | | |
1067 | | if (elapsed > 16000) |
1068 | | { |
1069 | | LOG(VB_GENERAL, LOG_ERR, LOC + "Waited " + |
1070 | | QString("%1").arg(elapsed/1000) + |
1071 | | " seconds for data, aborting."); |
1072 | | return false; |
1073 | | } |
| 1071 | LOG(VB_GENERAL, LOG_ERR, LOC + |
| 1072 | QString("Timed out waiting for data available (wanted=%1, avail=%2)") |
| 1073 | .arg(count).arg(avail) ); |
| 1074 | break; |
1074 | 1075 | } |
| 1076 | else if (elapsed >= 100 && avail) |
| 1077 | { |
| 1078 | LOG(VB_GENERAL, LOG_INFO, LOC + |
| 1079 | QString("Waited %1 mS for %2 bytes (wanted %3)") |
| 1080 | .arg(elapsed).arg(avail).arg(count) ); |
| 1081 | count = avail; |
| 1082 | generalWait.wakeAll(); |
| 1083 | break; |
| 1084 | } |
| 1085 | |
| 1086 | generalWait.wait(&rwlock, 100); |
| 1087 | avail = ReadBufAvail(); |
1075 | 1088 | } |
1076 | 1089 | |
1077 | 1090 | wanttoread = 0; |
… |
… |
int RingBuffer::ReadDirect(void *buf, int count, bool peek) |
1133 | 1146 | if (new_pos != old_pos) |
1134 | 1147 | { |
1135 | 1148 | LOG(VB_GENERAL, LOG_ERR, LOC + |
1136 | | QString("Peek() Failed to return from new " |
| 1149 | QString("Seek() Failed to return from new " |
1137 | 1150 | "position %1 to old position %2, now " |
1138 | 1151 | "at position %3") |
1139 | 1152 | .arg(old_pos - ret).arg(old_pos).arg(new_pos)); |
… |
… |
int RingBuffer::ReadDirect(void *buf, int count, bool peek) |
1153 | 1166 | */ |
1154 | 1167 | int RingBuffer::ReadPriv(void *buf, int count, bool peek) |
1155 | 1168 | { |
1156 | | QString loc_desc = QString("ReadPriv(..%1, %2)") |
| 1169 | const QString loc_desc = QString("ReadPriv(..%1, %2)") |
1157 | 1170 | .arg(count).arg(peek?"peek":"normal"); |
1158 | | LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + |
1159 | | QString(" @%1 -- begin").arg(rbrpos)); |
1160 | 1171 | |
1161 | 1172 | rwlock.lockForRead(); |
| 1173 | |
| 1174 | LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + |
| 1175 | QString(" @%1 avail=%2 -- begin").arg(rbrpos).arg(ReadBufAvail())); |
| 1176 | |
1162 | 1177 | if (writemode) |
1163 | 1178 | { |
1164 | 1179 | LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc + |
… |
… |
int RingBuffer::ReadPriv(void *buf, int count, bool peek) |
1189 | 1204 | if (request_pause || stopreads || |
1190 | 1205 | !readaheadrunning || (ignorereadpos >= 0)) |
1191 | 1206 | { |
| 1207 | LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + " -- direct read"); |
1192 | 1208 | int ret = ReadDirect(buf, count, peek); |
1193 | 1209 | LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + |
1194 | 1210 | QString(": ReadDirect checksum %1") |
… |
… |
int RingBuffer::ReadPriv(void *buf, int count, bool peek) |
1203 | 1219 | if (!WaitForReadsAllowed()) |
1204 | 1220 | { |
1205 | 1221 | LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForReadsAllowed()"); |
1206 | | rwlock.unlock(); |
1207 | | stopreads = true; // this needs to be outside the lock |
1208 | | rwlock.lockForWrite(); |
1209 | | wanttoread = 0; |
| 1222 | // NB don't set stopreads or else the next ReadPriv will call ReadDirect |
| 1223 | // which, if there's any readahead, will cause data to be returned out |
| 1224 | // of sequence |
1210 | 1225 | rwlock.unlock(); |
1211 | 1226 | return 0; |
1212 | 1227 | } |
diff --git a/mythtv/libs/libmythtv/ringbuffer.h b/mythtv/libs/libmythtv/ringbuffer.h
index f551ecd..8dc633d 100644
a
|
b
|
class MTV_PUBLIC RingBuffer : protected MThread |
166 | 166 | |
167 | 167 | int ReadBufFree(void) const; |
168 | 168 | int ReadBufAvail(void) const; |
| 169 | int ReadBufUsed() const; |
| 170 | bool ReadsAllowed() const; |
169 | 171 | |
170 | 172 | void ResetReadAhead(long long newinternal); |
171 | 173 | void KillReadAheadThread(void); |