修复SpkDownloadMgr重大bug

1. 原有依靠Qt Network的连接超时不可靠,改为自己使用Watchdog值跟踪。
2. 原有代码在请求时大量未对Reply设置正确的workerId,已修正
3. 原有代码在某个worker出错、但其他worker已经全部完成时,不能自动进行重分配,已修正
4. 原有代码ActiveWorkerCount计算方法不准确:每次Link都会增加,但重试和重分配也都会Link,而Worker并不会从
List中删除,因此已改为在任务开始时一次性加满
5. 原有代码忘记在WorkerFinish时检查是否有未写入的cache,也忘记关闭下载文件,已修正
6. 原有代码未将Reply的errorOccurred信号与WorkerError槽连接,导致这个槽并没有发挥什么卵用

测试文件改为有道词典,小一点
增加了等待HEAD请求时光标变成忙的特性
This commit is contained in:
RigoLigoRLC 2022-02-19 02:38:03 +08:00
parent 03f157f620
commit 8f32141726
4 changed files with 83 additions and 5 deletions

View File

@ -184,7 +184,7 @@ namespace SpkUi
connect(mBtnDownload, &QPushButton::clicked, connect(mBtnDownload, &QPushButton::clicked,
[=](){ emit RequestDownload(mAppTitle->text(), mPkgName->text(), [=](){ emit RequestDownload(mAppTitle->text(), mPkgName->text(),
"/store/chat/icalingua/icalingua_2.4.4-Deus-non-vult_amd64.deb"); "/store/reading/youdao-dict/youdao-dict_6.0.0-0~ubuntu_amd64.deb");
}); });
} }

View File

@ -7,6 +7,9 @@
/** /**
* @note SpkDownloadMgr does NOT do download scheduling and other things; it's only a multithreaded * @note SpkDownloadMgr does NOT do download scheduling and other things; it's only a multithreaded
* downloader; it manages the threads that are downloading stuff from the Internet. * downloader; it manages the threads that are downloading stuff from the Internet.
*
* Because of this, SpkDownloadMgr does not support complex download queues, cannot handle
* pauses, and can only work on a sequential list of tasks.
*/ */
class SpkDownloadMgr : public QObject class SpkDownloadMgr : public QObject
@ -34,15 +37,22 @@ class SpkDownloadMgr : public QObject
* unnecessary race conditions and data safety problems. * unnecessary race conditions and data safety problems.
* DownloadWorker is also used in mFailureRetryQueue to indicate the blocks that needed * DownloadWorker is also used in mFailureRetryQueue to indicate the blocks that needed
* to be retried on other servers. * to be retried on other servers.
*
* Each worker has a watch dog value, incremented each time the download speed is
* updated, and zeroed each time the worker has data ready. If the value exceeds a
* preset maximum, then this worker is considered timed out and killed.
*/ */
struct DownloadWorker struct DownloadWorker
{ {
QNetworkReply *Reply; ///< Reply from the network QNetworkReply *Reply; ///< Reply from the network
int Watchdog; ///< Watch dog value watching for a timed out worker
qint64 BeginOffset; ///< Where should a worker start downloading qint64 BeginOffset; ///< Where should a worker start downloading
qint64 BytesNeeded; ///< How many bytes a worker should fetch in total qint64 BytesNeeded; ///< How many bytes a worker should fetch in total
qint64 BytesRecvd; ///< How many bytes a worker has received till now qint64 BytesRecvd; ///< How many bytes a worker has received till now
}; };
constexpr static int WatchDogMaximum = 7;
struct RemoteFileInfo struct RemoteFileInfo
{ {
qint64 Size = -1; qint64 Size = -1;

View File

@ -12,6 +12,8 @@ int main(int argc, char *argv[])
{ {
QApplication a(argc, argv); QApplication a(argc, argv);
qRegisterMetaType<SpkDownloadMgr::TaskResult>("TaskResult");
QString LogPath = ""; QString LogPath = "";
SpkStore store(false, LogPath); SpkStore store(false, LogPath);

View File

@ -5,6 +5,8 @@
#include "spkpopup.h" #include "spkpopup.h"
#include <QEventLoop> #include <QEventLoop>
#include <QDir> #include <QDir>
#include <QApplication>
#include <QDebug>
SpkDownloadMgr::SpkDownloadMgr(QObject *parent) SpkDownloadMgr::SpkDownloadMgr(QObject *parent)
{ {
@ -89,13 +91,18 @@ bool SpkDownloadMgr::StartNewDownload(QString path, int downloadId)
if(mCurrentDownloadId != -1) if(mCurrentDownloadId != -1)
return false; // Already downloading something return false; // Already downloading something
// Reserve the manager and let everyone else wait
mCurrentDownloadId = downloadId;
// Try get the file size first. If one server fails then go to next server // Try get the file size first. If one server fails then go to next server
qApp->setOverrideCursor(QCursor(Qt::WaitCursor));
RemoteFileInfo info; RemoteFileInfo info;
for(int i = 0; i < mServers.size() && info.Size == -1; i++) for(int i = 0; i < mServers.size() && info.Size == -1; i++)
{ {
info = GetRemoteFileInfo(mServers[i] + path); info = GetRemoteFileInfo(mServers[i] + path);
// TODO: Mark dead servers as unusable so they don't get scheduled first? // TODO: Mark dead servers as unusable so they don't get scheduled first?
} }
qApp->setOverrideCursor(QCursor(Qt::ArrowCursor));
if(info.Size == -1) if(info.Size == -1)
{ {
sNotify(tr("Server request failure, %1 cannot be downloaded.").arg(SpkUtils::CutFileName(path))); sNotify(tr("Server request failure, %1 cannot be downloaded.").arg(SpkUtils::CutFileName(path)));
@ -165,6 +172,7 @@ bool SpkDownloadMgr::StartNewDownload(QString path, int downloadId)
{ {
DownloadWorker worker { .BeginOffset = 0, .BytesNeeded = info.Size, .BytesRecvd = 0 }; DownloadWorker worker { .BeginOffset = 0, .BytesNeeded = info.Size, .BytesRecvd = 0 };
worker.Reply = STORE->SendDownloadRequest(mServers[0] + path); worker.Reply = STORE->SendDownloadRequest(mServers[0] + path);
worker.Reply->setProperty("workerId", mScheduledWorkers.size());
mScheduledWorkers.append(worker); mScheduledWorkers.append(worker);
} }
@ -173,6 +181,7 @@ bool SpkDownloadMgr::StartNewDownload(QString path, int downloadId)
{ {
LinkReplyWithMe(i.Reply); LinkReplyWithMe(i.Reply);
i.Reply->setProperty("failCount", 0); // Used for fail retry algorithm i.Reply->setProperty("failCount", 0); // Used for fail retry algorithm
mActiveWorkerCount++;
} }
mProgressEmitterTimer.start(); mProgressEmitterTimer.start();
@ -198,6 +207,8 @@ bool SpkDownloadMgr::CancelCurrentDownload()
for(auto &i : mScheduledWorkers) for(auto &i : mScheduledWorkers)
{ {
auto r = i.Reply; auto r = i.Reply;
if(!r) // Don't bother with finished workers
continue;
r->blockSignals(true); r->blockSignals(true);
r->abort(); r->abort();
r->deleteLater(); r->deleteLater();
@ -207,10 +218,19 @@ bool SpkDownloadMgr::CancelCurrentDownload()
mDestFile.close(); mDestFile.close();
if(!mDestFile.remove()) if(!mDestFile.remove())
{ {
sErr(tr("SpkDownloadMgr: Cannot remove destination file %1 of a cancelled task") sWarn(tr("SpkDownloadMgr: Cannot remove destination file %1 of a cancelled task")
.arg(mDestFile.fileName())); .arg(mDestFile.fileName()));
sNotify(tr("The destination file of the cancelled task can't be deleted!")); sNotify(tr("The destination file of the cancelled task can't be deleted!"));
} }
// Tell the UI to schedule next task and cleanup current status
emit DownloadStopped(FailCancel, mCurrentDownloadId);
mScheduledWorkers.clear();
mFailureRetryQueue.clear();
mCurrentDownloadId = -1;
mDownloadedBytes = 0;
mProgressEmitterTimer.stop();
return true; return true;
} }
@ -224,7 +244,15 @@ void SpkDownloadMgr::WorkerFinish()
if(reply->error() == QNetworkReply::NetworkError::NoError) if(reply->error() == QNetworkReply::NetworkError::NoError)
{ {
// Finished successfully, destroy associated stuff // Finished successfully, write any data possibly left in buffer
auto replyData = reply->readAll();
mDestFile.seek(worker.BeginOffset + worker.BytesRecvd);
mDestFile.write(replyData);
worker.BytesRecvd += replyData.size();
worker.Watchdog = 0;
mDownloadedBytes += replyData.size();
// Destroy associated stuff
reply->deleteLater(); reply->deleteLater();
worker.Reply = nullptr; worker.Reply = nullptr;
@ -239,6 +267,7 @@ void SpkDownloadMgr::WorkerFinish()
mFailureRetryQueue.clear(); mFailureRetryQueue.clear();
mCurrentDownloadId = -1; mCurrentDownloadId = -1;
mDownloadedBytes = 0; mDownloadedBytes = 0;
mDestFile.close(); // Remember to close file!!
mProgressEmitterTimer.stop(); mProgressEmitterTimer.stop();
} }
@ -258,20 +287,51 @@ void SpkDownloadMgr::WorkerDownloadProgress()
mDestFile.seek(worker.BeginOffset + worker.BytesRecvd); mDestFile.seek(worker.BeginOffset + worker.BytesRecvd);
mDestFile.write(replyData); mDestFile.write(replyData);
worker.BytesRecvd += replyData.size(); worker.BytesRecvd += replyData.size();
worker.Watchdog = 0;
mDownloadedBytes += replyData.size(); mDownloadedBytes += replyData.size();
} }
void SpkDownloadMgr::WorkerError(QNetworkReply::NetworkError) void SpkDownloadMgr::WorkerError(QNetworkReply::NetworkError err)
{ {
QNetworkReply *reply = static_cast<QNetworkReply*>(sender()); QNetworkReply *reply = static_cast<QNetworkReply*>(sender());
int id = reply->property("workerId").toInt(); int id = reply->property("workerId").toInt();
DownloadWorker &worker = mScheduledWorkers[id]; DownloadWorker &worker = mScheduledWorkers[id];
switch(err)
{
case QNetworkReply::TimeoutError:
sNotify(tr("A download has timed out, retrying..."));
sWarn(tr("SpkDownloadMgr: %1 download timed out. Retrying").arg(reply->url().toString()));
break;
default:
sNotify(tr("An error occured when downloading, retrying..."));
sWarn(tr("SpkDownloadMgr: %1 fails with error %2. Retrying")
.arg(reply->url().toString())
.arg((int)err));
break;
}
ProcessWorkerError(worker, id); ProcessWorkerError(worker, id);
} }
void SpkDownloadMgr::ProgressTimer() void SpkDownloadMgr::ProgressTimer()
{ {
// Check watchdog
for(auto &i : mScheduledWorkers)
{
if(i.Reply == nullptr) continue;
if(++i.Watchdog > WatchDogMaximum)
{
// This reply has timed out
emit i.Reply->errorOccurred(QNetworkReply::TimeoutError);
// FIXME: Likely a Qt Bug. If you add these two lines (which you should if it works!),
// And the target reply is a reply created on retry, then the signal on the last line
// is NEVER EMITTED.
// i.Reply->blockSignals(true);
// i.Reply->abort();
}
}
emit DownloadProgressed(mDownloadedBytes, mCurrentRemoteFileInfo.Size, mCurrentDownloadId); emit DownloadProgressed(mDownloadedBytes, mCurrentRemoteFileInfo.Size, mCurrentDownloadId);
} }
@ -291,6 +351,8 @@ void SpkDownloadMgr::ProcessWorkerError(DownloadWorker &worker, int id)
reply->deleteLater(); reply->deleteLater();
worker.Reply = nullptr; worker.Reply = nullptr;
mFailureRetryQueue.enqueue(worker); mFailureRetryQueue.enqueue(worker);
if(mActiveWorkerCount < mScheduledWorkers.size())
TryScheduleFailureRetries();
return; return;
} }
@ -301,14 +363,17 @@ void SpkDownloadMgr::ProcessWorkerError(DownloadWorker &worker, int id)
worker.BeginOffset + worker.BytesNeeded); worker.BeginOffset + worker.BytesNeeded);
LinkReplyWithMe(worker.Reply); LinkReplyWithMe(worker.Reply);
worker.Reply->setProperty("failCount", reply->property("failCount").toInt() + 1); worker.Reply->setProperty("failCount", reply->property("failCount").toInt() + 1);
worker.Reply->setProperty("workerId", id);
worker.Watchdog = 0;
reply->deleteLater(); reply->deleteLater();
} }
void SpkDownloadMgr::LinkReplyWithMe(QNetworkReply *reply) void SpkDownloadMgr::LinkReplyWithMe(QNetworkReply *reply)
{ {
mActiveWorkerCount++; // Each time you spin up a request you must do this so it's ok to do it here
connect(reply, &QNetworkReply::readyRead, this, &SpkDownloadMgr::WorkerDownloadProgress); connect(reply, &QNetworkReply::readyRead, this, &SpkDownloadMgr::WorkerDownloadProgress);
connect(reply, &QNetworkReply::finished, this, &SpkDownloadMgr::WorkerFinish); connect(reply, &QNetworkReply::finished, this, &SpkDownloadMgr::WorkerFinish);
connect(reply, &QNetworkReply::errorOccurred, this, &SpkDownloadMgr::WorkerError);
reply->setProperty("linked", 1);
} }
void SpkDownloadMgr::TryScheduleFailureRetries() void SpkDownloadMgr::TryScheduleFailureRetries()
@ -335,6 +400,7 @@ void SpkDownloadMgr::TryScheduleFailureRetries(int i)
STORE->SendDownloadRequest(mServers[i] + mCurrentRemotePath, STORE->SendDownloadRequest(mServers[i] + mCurrentRemotePath,
worker.BeginOffset, worker.BeginOffset,
worker.BeginOffset + worker.BytesNeeded); worker.BeginOffset + worker.BytesNeeded);
mScheduledWorkers[i].Reply->setProperty("workerId", i);
LinkReplyWithMe(mScheduledWorkers[i].Reply); LinkReplyWithMe(mScheduledWorkers[i].Reply);
} }
} }