Use 8 seconds timeout for request duration.

This commit is contained in:
John Preston 2019-12-06 10:05:38 +03:00
parent 7fb24d77c0
commit ae8fb14f9e
2 changed files with 125 additions and 48 deletions

View File

@ -27,11 +27,9 @@ constexpr auto kRetryAddSessionTimeout = 8 * crl::time(1000);
constexpr auto kRetryAddSessionSuccesses = 3;
constexpr auto kMaxTrackedSuccesses = kRetryAddSessionSuccesses
* kMaxTrackedSessionRemoves;
constexpr auto kRemoveSessionAfterTimeouts = 2;
constexpr auto kRemoveSessionAfterTimeouts = 4;
constexpr auto kResetDownloadPrioritiesTimeout = crl::time(200);
constexpr auto kGrowMaxWaitedDurationThreshold = crl::time(500);
constexpr auto kGrowSessionsDurationThreshold = crl::time(500);
constexpr auto kBadRequestDurationThreshold = crl::time(2000);
constexpr auto kBadRequestDurationThreshold = 8 * crl::time(1000);
// Each (session remove by timeouts) we wait for time:
// kRetryAddSessionTimeout * max(removesCount, kMaxTrackedSessionRemoves)
@ -80,6 +78,13 @@ auto DownloadManagerMtproto::Queue::nextTask() const -> Task* {
return (i != all.end()) ? i->get() : nullptr;
}
void DownloadManagerMtproto::Queue::removeSession(int index) {
auto &&all = ranges::view::concat(_tasks, _previousGeneration);
for (const auto task : all) {
task->removeSession(index);
}
}
DownloadManagerMtproto::DcSessionBalanceData::DcSessionBalanceData()
: maxWaitedAmount(kStartWaitedInSession) {
}
@ -166,14 +171,14 @@ bool DownloadManagerMtproto::trySendNextPart(MTP::DcId dcId, Queue &queue) {
return false;
}
void DownloadManagerMtproto::changeRequestedAmount(
int DownloadManagerMtproto::changeRequestedAmount(
MTP::DcId dcId,
int index,
int delta) {
const auto i = _balanceData.find(dcId);
Assert(i != _balanceData.end());
Assert(index < i->second.sessions.size());
i->second.sessions[index].requested += delta;
const auto result = (i->second.sessions[index].requested += delta);
const auto findNonEmptySession = [](const DcBalanceData &data) {
using namespace rpl::mappers;
return ranges::find_if(
@ -186,44 +191,53 @@ void DownloadManagerMtproto::changeRequestedAmount(
} else if (findNonEmptySession(i->second) == end(i->second.sessions)) {
killSessionsSchedule(dcId);
}
return result;
}
void DownloadManagerMtproto::requestSucceeded(
MTP::DcId dcId,
int index,
crl::time duration) {
int amountAtRequestStart,
crl::time timeAtRequestStart) {
using namespace rpl::mappers;
DEBUG_LOG(("Download (%1,%2) request done, duration %3."
).arg(dcId
).arg(index
).arg(duration));
const auto guard = gsl::finally([&] {
checkSendNext(dcId, _queues[dcId]);
});
const auto i = _balanceData.find(dcId);
Assert(i != end(_balanceData));
auto &dc = i->second;
Assert(index < dc.sessions.size());
auto &data = dc.sessions[index];
const auto guard = gsl::finally([&] {
checkSendNext(dcId, _queues[dcId]);
});
const auto parts = data.maxWaitedAmount / kDownloadPartSize;
if (duration < kGrowMaxWaitedDurationThreshold * parts) {
if (data.maxWaitedAmount < kMaxWaitedInSession) {
data.maxWaitedAmount = std::min(
data.maxWaitedAmount + kDownloadPartSize,
kMaxWaitedInSession);
DEBUG_LOG(("Download (%1,%2) increased max waited amount %3."
).arg(dcId
).arg(index
).arg(data.maxWaitedAmount));
}
const auto overloaded = (timeAtRequestStart <= dc.lastSessionRemove)
|| (amountAtRequestStart > data.maxWaitedAmount);
const auto parts = amountAtRequestStart / kDownloadPartSize;
const auto duration = (crl::now() - timeAtRequestStart);
DEBUG_LOG(("Download (%1,%2) request done, duration: %3, parts: %4%5"
).arg(dcId
).arg(index
).arg(duration
).arg(parts
).arg(overloaded ? " (overloaded)" : ""));
if (overloaded) {
return;
}
if (duration >= kBadRequestDurationThreshold * parts) {
if (duration >= kBadRequestDurationThreshold) {
DEBUG_LOG(("Duration too large, signaling time out."));
sessionTimedOut(dcId, index);
return;
} else if (duration >= kGrowSessionsDurationThreshold * parts) {
return;
}
if (amountAtRequestStart == data.maxWaitedAmount
&& data.maxWaitedAmount < kMaxWaitedInSession) {
data.maxWaitedAmount = std::min(
data.maxWaitedAmount + kDownloadPartSize,
kMaxWaitedInSession);
DEBUG_LOG(("Download (%1,%2) increased max waited amount %3."
).arg(dcId
).arg(index
).arg(data.maxWaitedAmount));
}
data.successes = std::min(data.successes + 1, kMaxTrackedSuccesses);
const auto notEnough = ranges::find_if(
@ -247,10 +261,22 @@ void DownloadManagerMtproto::requestSucceeded(
if (dc.lastSessionRemove && now < dc.lastSessionRemove + delay) {
return;
}
DEBUG_LOG(("Download (%1,%2) added session."
).arg(dcId
).arg(dc.sessions.size()));
dc.sessions.emplace_back();
DEBUG_LOG(("Download (%1,%2) adding, now sessions: %3"
).arg(dcId
).arg(dc.sessions.size() - 1
).arg(dc.sessions.size()));
}
int DownloadManagerMtproto::chooseSessionIndex(MTP::DcId dcId) const {
const auto i = _balanceData.find(dcId);
Assert(i != end(_balanceData));
const auto &sessions = i->second.sessions;
const auto j = ranges::min_element(
sessions,
ranges::less(),
&DcSessionBalanceData::requested);
return (j - begin(sessions));
}
void DownloadManagerMtproto::sessionTimedOut(MTP::DcId dcId, int index) {
@ -278,7 +304,10 @@ void DownloadManagerMtproto::removeSession(MTP::DcId dcId) {
auto &dc = _balanceData[dcId];
Assert(dc.sessions.size() > kStartSessionsCount);
const auto index = int(dc.sessions.size() - 1);
DEBUG_LOG(("Download (%1,%2) removing session.").arg(dcId).arg(index));
DEBUG_LOG(("Download (%1,%2) removing, now sessions: %3"
).arg(dcId
).arg(index
).arg(index));
auto &queue = _queues[dcId];
if (dc.sessionRemoveIndex == index) {
dc.sessionRemoveTimes = std::min(
@ -288,8 +317,17 @@ void DownloadManagerMtproto::removeSession(MTP::DcId dcId) {
dc.sessionRemoveIndex = index;
dc.sessionRemoveTimes = 1;
}
auto &session = dc.sessions.back();
// Make sure we don't send anything to that session while redirecting.
session.requested += kMaxWaitedInSession * kMaxSessionsCount;
_queues[dcId].removeSession(index);
Assert(session.requested == kMaxWaitedInSession * kMaxSessionsCount);
dc.sessions.pop_back();
MTP::killSession(MTP::downloadDcId(dcId, index));
dc.lastSessionRemove = crl::now();
// dc.sessions.pop_back();
}
void DownloadManagerMtproto::killSessionsSchedule(MTP::DcId dcId) {
@ -406,16 +444,37 @@ void DownloadMtprotoTask::refreshFileReferenceFrom(
}
}
void DownloadMtprotoTask::loadPart(int dcIndex) {
makeRequest({ takeNextRequestOffset(), dcIndex });
void DownloadMtprotoTask::loadPart(int sessionIndex) {
makeRequest({ takeNextRequestOffset(), sessionIndex });
}
mtpRequestId DownloadMtprotoTask::sendRequest(const RequestData &requestData) {
void DownloadMtprotoTask::removeSession(int sessionIndex) {
struct Redirect {
mtpRequestId requestId = 0;
int offset = 0;
};
auto redirect = std::vector<Redirect>();
for (const auto &[requestId, requestData] : _sentRequests) {
if (requestData.sessionIndex == sessionIndex) {
redirect.reserve(_sentRequests.size());
redirect.push_back({ requestId, requestData.offset });
}
}
for (const auto &[requestId, offset] : redirect) {
cancelRequest(requestId);
const auto newIndex = _owner->chooseSessionIndex(dcId());
Assert(newIndex < sessionIndex);
makeRequest({ offset, newIndex });
}
}
mtpRequestId DownloadMtprotoTask::sendRequest(
const RequestData &requestData) {
const auto offset = requestData.offset;
const auto limit = Storage::kDownloadPartSize;
const auto shiftedDcId = MTP::downloadDcId(
_cdnDcId ? _cdnDcId : dcId(),
requestData.dcIndex);
requestData.sessionIndex);
if (_cdnDcId) {
return api().request(MTPupload_GetCdnFile(
MTP_bytes(_cdnToken),
@ -488,7 +547,7 @@ void DownloadMtprotoTask::requestMoreCdnFileHashes() {
const auto requestData = _cdnUncheckedParts.cbegin()->first;
const auto shiftedDcId = MTP::downloadDcId(
dcId(),
requestData.dcIndex);
requestData.sessionIndex);
_cdnHashesRequestId = api().request(MTPupload_GetCdnFileHashes(
MTP_bytes(_cdnToken),
MTP_int(requestData.offset)
@ -533,7 +592,7 @@ void DownloadMtprotoTask::cdnPartLoaded(const MTPupload_CdnFile &result, mtpRequ
FinishRequestReason::Redirect);
const auto shiftedDcId = MTP::downloadDcId(
dcId(),
requestData.dcIndex);
requestData.sessionIndex);
const auto requestId = api().request(MTPupload_ReuploadCdnFile(
MTP_bytes(_cdnToken),
data.vrequest_token()
@ -665,15 +724,16 @@ void DownloadMtprotoTask::getCdnFileHashesDone(
void DownloadMtprotoTask::placeSentRequest(
mtpRequestId requestId,
const RequestData &requestData) {
_owner->changeRequestedAmount(
const auto amount = _owner->changeRequestedAmount(
dcId(),
requestData.dcIndex,
requestData.sessionIndex,
Storage::kDownloadPartSize);
const auto [i, ok1] = _sentRequests.emplace(requestId, requestData);
const auto [j, ok2] = _requestByOffset.emplace(
requestData.offset,
requestId);
i->second.requestedInSession = amount;
i->second.sent = crl::now();
Ensures(ok1 && ok2);
@ -692,14 +752,17 @@ auto DownloadMtprotoTask::finishSentRequest(
const auto result = it->second;
_owner->changeRequestedAmount(
dcId(),
result.dcIndex,
result.sessionIndex,
-Storage::kDownloadPartSize);
_sentRequests.erase(it);
const auto ok = _requestByOffset.remove(result.offset);
if (reason == FinishRequestReason::Success) {
const auto duration = crl::now() - result.sent;
_owner->requestSucceeded(dcId(), result.dcIndex, duration);
_owner->requestSucceeded(
dcId(),
result.sessionIndex,
result.requestedInSession,
result.sent);
}
Ensures(ok);
@ -731,10 +794,16 @@ void DownloadMtprotoTask::cancelRequestForOffset(int offset) {
}
void DownloadMtprotoTask::cancelRequest(mtpRequestId requestId) {
const auto hashes = (_cdnHashesRequestId == requestId);
api().request(requestId).cancel();
[[maybe_unused]] const auto data = finishSentRequest(
requestId,
FinishRequestReason::Cancel);
if (hashes && !_cdnUncheckedParts.empty()) {
crl::on_main(this, [=] {
requestMoreCdnFileHashes();
});
}
}
void DownloadMtprotoTask::addToQueue() {

View File

@ -42,8 +42,13 @@ public:
return _taskFinishedObservable;
}
void changeRequestedAmount(MTP::DcId dcId, int index, int delta);
void requestSucceeded(MTP::DcId dcId, int index, crl::time duration);
int changeRequestedAmount(MTP::DcId dcId, int index, int delta);
void requestSucceeded(
MTP::DcId dcId,
int index,
int amountAtRequestStart,
crl::time timeAtRequestStart);
[[nodiscard]] int chooseSessionIndex(MTP::DcId dcId) const;
private:
class Queue final {
@ -53,6 +58,7 @@ private:
void resetGeneration();
[[nodiscard]] bool empty() const;
[[nodiscard]] Task *nextTask() const;
void removeSession(int index);
private:
std::vector<not_null<Task*>> _tasks;
@ -129,7 +135,8 @@ public:
[[nodiscard]] const Location &location() const;
[[nodiscard]] virtual bool readyToRequest() const = 0;
void loadPart(int dcIndex);
void loadPart(int sessionIndex);
void removeSession(int sessionIndex);
void refreshFileReferenceFrom(
const Data::UpdatedFileReferences &updates,
@ -152,7 +159,8 @@ protected:
private:
struct RequestData {
int offset = 0;
int dcIndex = 0;
int sessionIndex = 0;
int requestedInSession = 0;
crl::time sent = 0;
inline bool operator<(const RequestData &other) const {