From ca919253268a1cd137fe91b9c7d3390c6b9be9bd Mon Sep 17 00:00:00 2001
From: Lawrence Rust <lvr@softsystem.co.uk>
Date: Sun, 31 Jul 2011 17:14:52 +0200
Subject: [PATCH 2/2] ringbuffer: Adapt readahead for low bit rate / high latency streams
DVB-S radio programs are often low bit rate (64Kbps..256Kbps) and
experience occasional high packet latency.
MHEG interaction streams are http sourced, have medium bit rate
(250Kbps..1000Kbps) and often experience congestion resulting in high
packet latency.
- Add hysteresis to the ringbuffer readahead. This avoids audio
underrun and rapid play/wait cycling in http video streams.
- RingBuffer::WaitForReadsAllowed incease timeout to 30 Secs to allow
for http congestion.
- RingBuffer::WaitFailAvail returns bytes available if less than that
requested for low bit rate streams (where fill_min is < 32kB).
- Fix a bug in RingBuffer::ReadPriv where if WaitForReadsAllowed times out
then the next read would cause data to be returned out of sequence
Signed-off-by: Lawrence Rust <lvr@softsystem.co.uk>
---
mythtv/libs/libmythtv/ringbuffer.cpp | 118 +++++++++++++++++++++-------------
mythtv/libs/libmythtv/ringbuffer.h | 2 +
2 files changed, 76 insertions(+), 44 deletions(-)
diff --git a/mythtv/libs/libmythtv/ringbuffer.cpp b/mythtv/libs/libmythtv/ringbuffer.cpp
index ddd018c..c21d118 100644
a
|
b
|
bool RingBuffer::IsNearEnd(double fps, uint vvf) const |
407 | 407 | // WARNING: readahead_frames can greatly overestimate or underestimate |
408 | 408 | // the number of frames available in the read ahead buffer |
409 | 409 | // when rh_frames is less than the keyframe distance. |
| 410 | if (fps == 0.) |
| 411 | return false; |
410 | 412 | double bytes_per_frame = kbits_per_sec * (1000.0/8.0) / fps; |
| 413 | if (bytes_per_frame == 0.) |
| 414 | return false; |
411 | 415 | double readahead_frames = sz / bytes_per_frame; |
412 | 416 | |
413 | 417 | bool near_end = ((vvf + readahead_frames) < 10.0) || (sz < rbs*1.5); |
… |
… |
int RingBuffer::ReadBufAvail(void) const |
446 | 450 | return ret; |
447 | 451 | } |
448 | 452 | |
| 453 | inline int RingBuffer::ReadBufUsed() const |
| 454 | { |
| 455 | return (bufferSize - 1) - ReadBufFree(); |
| 456 | } |
| 457 | |
| 458 | inline bool RingBuffer::ReadsAllowed() const |
| 459 | { |
| 460 | return ateof || setswitchtonext || commserror || |
| 461 | // Ensure some hysteresis around fill_min |
| 462 | ReadBufUsed() >= (readsallowed ? 1 : fill_min); |
| 463 | } |
| 464 | |
449 | 465 | /** \fn RingBuffer::ResetReadAhead(long long) |
450 | 466 | * \brief Restart the read-ahead thread at the 'newinternal' position. |
451 | 467 | * |
… |
… |
void RingBuffer::run(void) |
888 | 904 | } |
889 | 905 | } |
890 | 906 | |
891 | | int used = bufferSize - ReadBufFree(); |
892 | | |
893 | 907 | bool reads_were_allowed = readsallowed; |
894 | 908 | |
895 | | if ((0 == read_return) || (numfailures > 5) || |
896 | | (readsallowed != (used >= fill_min || ateof || |
897 | | setswitchtonext || commserror))) |
| 909 | if (0 == read_return || numfailures > 5 || ReadsAllowed() != readsallowed) |
898 | 910 | { |
899 | 911 | // If readpos changes while the lock is released |
900 | 912 | // we should not handle the 0 read_return now. |
… |
… |
void RingBuffer::run(void) |
905 | 917 | |
906 | 918 | commserror |= (numfailures > 5); |
907 | 919 | |
908 | | readsallowed = used >= fill_min || ateof || |
909 | | setswitchtonext || commserror; |
| 920 | bool bReadsAllowed = ReadsAllowed(); |
| 921 | if (readsallowed != bReadsAllowed) |
| 922 | { |
| 923 | readsallowed = bReadsAllowed; |
| 924 | LOG(VB_FILE, LOG_INFO, LOC + (bReadsAllowed ? |
| 925 | QString("Reads allowed: %1 bytes available").arg(ReadBufUsed()) : |
| 926 | QString("Rebuffering %1..%2").arg(ReadBufUsed()).arg(fill_min)) ); |
| 927 | } |
910 | 928 | |
911 | 929 | if (0 == read_return && old_readpos == readpos) |
912 | 930 | { |
… |
… |
void RingBuffer::run(void) |
929 | 947 | |
930 | 948 | rwlock.unlock(); |
931 | 949 | rwlock.lockForRead(); |
932 | | used = bufferSize - ReadBufFree(); |
933 | 950 | } |
934 | 951 | |
935 | 952 | LOG(VB_FILE, LOG_DEBUG, LOC + "@ end of read ahead loop"); |
936 | 953 | |
937 | 954 | if (!readsallowed || commserror || ateof || setswitchtonext || |
938 | | (wanttoread <= used && wanttoread > 0)) |
| 955 | (wanttoread <= ReadBufUsed() && wanttoread > 0)) |
939 | 956 | { |
940 | 957 | // To give other threads a good chance to handle these |
941 | 958 | // conditions, even if they are only requesting a read lock |
… |
… |
void RingBuffer::run(void) |
943 | 960 | generalWait.wakeAll(); |
944 | 961 | rwlock.unlock(); |
945 | 962 | usleep(5 * 1000); |
946 | | rwlock.lockForRead(); |
| 963 | rwlock.lockForRead(); |
947 | 964 | } |
948 | 965 | else |
949 | 966 | { |
950 | 967 | // yield if we have nothing to do... |
951 | 968 | if (!request_pause && reads_were_allowed && |
952 | | (used >= fill_threshold || ateof || setswitchtonext)) |
| 969 | (ReadBufUsed() >= fill_threshold || ateof || setswitchtonext || ignoreliveeof)) |
953 | 970 | { |
954 | 971 | generalWait.wait(&rwlock, 50); |
955 | 972 | } |
… |
… |
void RingBuffer::run(void) |
959 | 976 | generalWait.wakeAll(); |
960 | 977 | rwlock.unlock(); |
961 | 978 | usleep(5 * 1000); |
962 | | rwlock.lockForRead(); |
| 979 | rwlock.lockForRead(); |
963 | 980 | } |
964 | 981 | } |
965 | 982 | } |
… |
… |
bool RingBuffer::WaitForReadsAllowed(void) |
1015 | 1032 | while (!readsallowed && !stopreads && |
1016 | 1033 | !request_pause && !commserror && readaheadrunning) |
1017 | 1034 | { |
1018 | | generalWait.wait(&rwlock, 1000); |
1019 | | if (!readsallowed && t.elapsed() > 1000) |
| 1035 | // The timeout should allow for congestion of internet streamed media |
| 1036 | if (t.elapsed() >= 30000) |
1020 | 1037 | { |
1021 | | LOG(VB_GENERAL, LOG_WARNING, LOC + |
1022 | | "Taking too long to be allowed to read.."); |
1023 | | |
1024 | | if (t.elapsed() > 10000) |
1025 | | { |
1026 | | LOG(VB_GENERAL, LOG_ERR, LOC + "Took more than 10 seconds to " |
1027 | | "be allowed to read, aborting."); |
1028 | | return false; |
1029 | | } |
| 1038 | LOG(VB_GENERAL, LOG_ERR, LOC + |
| 1039 | QString("Waited %1 seconds to be allowed to read, aborting.") |
| 1040 | .arg(t.elapsed()/1000) ); |
| 1041 | return false; |
1030 | 1042 | } |
| 1043 | |
| 1044 | generalWait.wait(&rwlock, 250); |
1031 | 1045 | } |
1032 | 1046 | |
1033 | | return readsallowed; |
| 1047 | if (t.elapsed() >= 500) |
| 1048 | { |
| 1049 | LOG(VB_GENERAL, LOG_WARNING, LOC + |
| 1050 | QString("Waited %1 mS to be allowed to read (avail=%2 fill_min=%3)..") |
| 1051 | .arg(t.elapsed()).arg(ReadBufAvail()).arg(fill_min) ); |
| 1052 | } |
| 1053 | return true; |
1034 | 1054 | } |
1035 | 1055 | |
1036 | 1056 | bool RingBuffer::WaitForAvail(int count) |
… |
… |
bool RingBuffer::WaitForAvail(int count) |
1054 | 1074 | generalWait.wakeAll(); |
1055 | 1075 | } |
1056 | 1076 | |
1057 | | MythTimer t; |
1058 | | t.start(); |
1059 | | while ((avail < count) && !stopreads && |
1060 | | !request_pause && !commserror && readaheadrunning) |
| 1077 | MythTimer t; t.start(); |
| 1078 | wanttoread = count; |
| 1079 | while (avail < count && !stopreads && !request_pause && |
| 1080 | !commserror && readaheadrunning) |
1061 | 1081 | { |
1062 | | wanttoread = count; |
1063 | | generalWait.wait(&rwlock, 250); |
1064 | | avail = ReadBufAvail(); |
1065 | | |
1066 | | if (ateof && avail < count) |
1067 | | count = avail; |
1068 | | |
1069 | | if (avail < count) |
| 1082 | uint elapsed = t.elapsed(); |
| 1083 | if (elapsed >= 10000) |
1070 | 1084 | { |
1071 | 1085 | int elapsed = t.elapsed(); |
1072 | 1086 | if (elapsed > 500 && low_buffers && avail >= fill_min) |
… |
… |
bool RingBuffer::WaitForAvail(int count) |
1093 | 1107 | return false; |
1094 | 1108 | } |
1095 | 1109 | } |
| 1110 | else if (elapsed >= 100 && avail) |
| 1111 | { |
| 1112 | LOG(VB_GENERAL, LOG_INFO, LOC + |
| 1113 | QString("Waited %1 mS for %2 bytes (wanted %3)") |
| 1114 | .arg(elapsed).arg(avail).arg(count) ); |
| 1115 | count = avail; |
| 1116 | generalWait.wakeAll(); |
| 1117 | break; |
| 1118 | } |
| 1119 | |
| 1120 | generalWait.wait(&rwlock, 100); |
| 1121 | avail = ReadBufAvail(); |
| 1122 | if (ateof && avail < count) |
| 1123 | count = avail; |
1096 | 1124 | } |
1097 | 1125 | |
1098 | 1126 | wanttoread = 0; |
… |
… |
int RingBuffer::ReadDirect(void *buf, int count, bool peek) |
1154 | 1182 | if (new_pos != old_pos) |
1155 | 1183 | { |
1156 | 1184 | LOG(VB_GENERAL, LOG_ERR, LOC + |
1157 | | QString("Peek() Failed to return from new " |
| 1185 | QString("Seek() Failed to return from new " |
1158 | 1186 | "position %1 to old position %2, now " |
1159 | 1187 | "at position %3") |
1160 | 1188 | .arg(old_pos - ret).arg(old_pos).arg(new_pos)); |
… |
… |
int RingBuffer::ReadDirect(void *buf, int count, bool peek) |
1174 | 1202 | */ |
1175 | 1203 | int RingBuffer::ReadPriv(void *buf, int count, bool peek) |
1176 | 1204 | { |
1177 | | QString loc_desc = QString("ReadPriv(..%1, %2)") |
| 1205 | const QString loc_desc = QString("ReadPriv(..%1, %2)") |
1178 | 1206 | .arg(count).arg(peek?"peek":"normal"); |
1179 | | LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + |
1180 | | QString(" @%1 -- begin").arg(rbrpos)); |
1181 | 1207 | |
1182 | 1208 | rwlock.lockForRead(); |
| 1209 | |
| 1210 | LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + |
| 1211 | QString(" @%1 avail=%2 -- begin").arg(rbrpos).arg(ReadBufAvail())); |
| 1212 | |
1183 | 1213 | if (writemode) |
1184 | 1214 | { |
1185 | 1215 | LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc + |
… |
… |
int RingBuffer::ReadPriv(void *buf, int count, bool peek) |
1210 | 1240 | if (request_pause || stopreads || |
1211 | 1241 | !readaheadrunning || (ignorereadpos >= 0)) |
1212 | 1242 | { |
| 1243 | LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + " -- direct read"); |
1213 | 1244 | int ret = ReadDirect(buf, count, peek); |
1214 | 1245 | LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + |
1215 | 1246 | QString(": ReadDirect checksum %1") |
… |
… |
int RingBuffer::ReadPriv(void *buf, int count, bool peek) |
1224 | 1255 | if (!WaitForReadsAllowed()) |
1225 | 1256 | { |
1226 | 1257 | LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForReadsAllowed()"); |
1227 | | rwlock.unlock(); |
1228 | | stopreads = true; // this needs to be outside the lock |
1229 | | rwlock.lockForWrite(); |
1230 | | wanttoread = 0; |
| 1258 | // NB don't set stopreads or else the next ReadPriv will call ReadDirect |
| 1259 | // which, if there's any readahead, will cause data to be returned out |
| 1260 | // of sequence |
1231 | 1261 | rwlock.unlock(); |
1232 | 1262 | return 0; |
1233 | 1263 | } |
diff --git a/mythtv/libs/libmythtv/ringbuffer.h b/mythtv/libs/libmythtv/ringbuffer.h
index 45bd956..d7632ff 100644
a
|
b
|
class MTV_PUBLIC RingBuffer : protected MThread |
167 | 167 | |
168 | 168 | int ReadBufFree(void) const; |
169 | 169 | int ReadBufAvail(void) const; |
| 170 | int ReadBufUsed() const; |
| 171 | bool ReadsAllowed() const; |
170 | 172 | |
171 | 173 | void ResetReadAhead(long long newinternal); |
172 | 174 | void KillReadAheadThread(void); |