From 1efc39eabc3da46c43fc67050eb885baf485f23d Mon Sep 17 00:00:00 2001 From: AdamSmith Date: Wed, 25 Nov 2020 09:06:03 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E7=BA=BF=E7=A8=8B=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ThreadPool/.gitignore | 73 +++++++++++ ThreadPool/CustomThreadPool.pro | 32 +++++ ThreadPool/PriorityQueue.h | 99 +++++++++++++++ ThreadPool/Task.h | 36 ++++++ ThreadPool/Thread.cpp | 75 +++++++++++ ThreadPool/Thread.h | 32 +++++ ThreadPool/ThreadPool.cpp | 49 ++++++++ ThreadPool/ThreadPool.h | 35 ++++++ ThreadPool/main.cpp | 214 +++++++++++++++++++++++++++++--- 9 files changed, 629 insertions(+), 16 deletions(-) create mode 100644 ThreadPool/.gitignore create mode 100644 ThreadPool/CustomThreadPool.pro create mode 100644 ThreadPool/PriorityQueue.h create mode 100644 ThreadPool/Task.h create mode 100644 ThreadPool/Thread.cpp create mode 100644 ThreadPool/Thread.h create mode 100644 ThreadPool/ThreadPool.cpp create mode 100644 ThreadPool/ThreadPool.h diff --git a/ThreadPool/.gitignore b/ThreadPool/.gitignore new file mode 100644 index 0000000..fab7372 --- /dev/null +++ b/ThreadPool/.gitignore @@ -0,0 +1,73 @@ +# This file is used to ignore files which are generated +# ---------------------------------------------------------------------------- + +*~ +*.autosave +*.a +*.core +*.moc +*.o +*.obj +*.orig +*.rej +*.so +*.so.* +*_pch.h.cpp +*_resource.rc +*.qm +.#* +*.*# +core +!core/ +tags +.DS_Store +.directory +*.debug +Makefile* +*.prl +*.app +moc_*.cpp +ui_*.h +qrc_*.cpp +Thumbs.db +*.res +*.rc +/.qmake.cache +/.qmake.stash + +# qtcreator generated files +*.pro.user* + +# xemacs temporary files +*.flc + +# Vim temporary files +.*.swp + +# Visual Studio generated files +*.ib_pdb_index +*.idb +*.ilk +*.pdb +*.sln +*.suo +*.vcproj +*vcproj.*.*.user +*.ncb +*.sdf +*.opensdf +*.vcxproj +*vcxproj.* + +# MinGW generated files +*.Debug +*.Release + +# Python byte code +*.pyc + +# Binaries +# -------- +*.dll +*.exe + diff --git a/ThreadPool/CustomThreadPool.pro b/ThreadPool/CustomThreadPool.pro new file mode 100644 index 0000000..c6e67ae --- /dev/null +++ b/ThreadPool/CustomThreadPool.pro @@ -0,0 +1,32 @@ +QT -= gui +QT += network + +CONFIG += c++11 console +CONFIG -= app_bundle + +# The following define makes your compiler emit warnings if you use +# any Qt feature that has been marked deprecated (the exact warnings +# depend on your compiler). Please consult the documentation of the +# deprecated API in order to know how to port your code away from it. +DEFINES += QT_DEPRECATED_WARNINGS + +# You can also make your code fail to compile if it uses deprecated APIs. +# In order to do so, uncomment the following line. +# You can also select to disable deprecated APIs only up to a certain version of Qt. +#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000 # disables all the APIs deprecated before Qt 6.0.0 + +SOURCES += \ + ThreadPool.cpp \ + Thread.cpp \ + main.cpp + +# Default rules for deployment. +qnx: target.path = /tmp/$${TARGET}/bin +else: unix:!android: target.path = /opt/$${TARGET}/bin +!isEmpty(target.path): INSTALLS += target + +HEADERS += \ + PriorityQueue.h \ + Task.h \ + Thread.h \ + ThreadPool.h diff --git a/ThreadPool/PriorityQueue.h b/ThreadPool/PriorityQueue.h new file mode 100644 index 0000000..92705b6 --- /dev/null +++ b/ThreadPool/PriorityQueue.h @@ -0,0 +1,99 @@ +#pragma once + +#include +#include + +template < + typename T, + typename Array = QVector, + typename compare_T=std::less +> +class PriorityQueue +{ +public: + PriorityQueue(); + bool empty(); + T front(); + void push(const T& val); + void pop(); + int size(); + +private: + void up_adjust(); // 向上调整 + void down_adjust(); // 向下调整 + + Array elements; + compare_T compare; +}; + +template +PriorityQueue::PriorityQueue() +{ + elements.clear(); +} + +template +bool PriorityQueue::empty() +{ + return elements.size() == 0; +} + +template +T PriorityQueue::front() +{ + return elements[0]; +} + +template +void PriorityQueue::push(const T &val) +{ + elements.push_back(val); + up_adjust(); +} + +template +void PriorityQueue::pop() +{ + if (empty()) return; + int count = elements.size(); + qSwap(elements[count - 1], elements[0]); + elements.pop_back(); + down_adjust(); +} + +template +int PriorityQueue::size() +{ + return elements.size(); +} + +template +void PriorityQueue::up_adjust() +{ + int ind = elements.size(); + while (ind > 1 && compare(elements[ind/2-1], elements[ind-1])) { + qSwap(elements[ind/2-1], elements[ind-1]); + ind /= 2; + } +} + +template +void PriorityQueue::down_adjust() +{ + int ind = 0; + int count = elements.size(); + while (ind * 2 + 1 < count) { + int tind = ind; + if (compare(elements[tind], elements[ind*2+1])) + tind = ind * 2 + 1; + + if (ind*2+2 + +class Task +{ +public: + Task(){}; + + template + Task(Func f, Args&& ...args) + { + func = std::bind(f, std::forward(args)...); + } + + virtual void run() + { + func(); + } + + void setPriority(int priority) + { + m_priority = priority; + } + + friend bool operator<(const Task& t1, const Task& t2); + +private: + std::function func; + int m_priority = 100; +}; + +inline bool operator<(const Task& t1, const Task& t2) +{ + return t1.m_priority < t2.m_priority; +} diff --git a/ThreadPool/Thread.cpp b/ThreadPool/Thread.cpp new file mode 100644 index 0000000..8c117f8 --- /dev/null +++ b/ThreadPool/Thread.cpp @@ -0,0 +1,75 @@ +#include "Thread.h" +#include "ThreadPool.h" +#include + +Thread::Thread(int id, ThreadPool *pool, QObject *parent) : + QObject(parent), + m_threadPool(pool), + m_id(id) +{ + + moveToThread(&m_thread); + connect(&m_thread, &QThread::started, [this](){ + emit threadStarted(); + run(); + }); + connect(&m_thread, &QThread::finished, [this](){ + emit threadStoped(); + }); +} + +Thread::~Thread() +{ + if (isStarted()) + stop(); +} + +int Thread::id() const +{ + return m_id; +} + +bool Thread::start() +{ + bool ok = false; + if (!isStarted()) { + m_thread.start(); + ok = true; + } + return ok; +} + +bool Thread::stop() +{ + bool ok = false; + if (isStarted()) { + m_thread.quit(); + m_thread.wait(); + emit threadStoped(); + ok = true; + } + return ok; +} + +bool Thread::isStarted() const +{ + return m_thread.isRunning(); +} + +void Thread::run() +{ + // 就是抢任务这一块出问题了,任务输入的太快 + while (true) { + QMutexLocker lock(&(m_threadPool->m_mutex)); +// qDebug() << QString("线程%1 进入等待!").arg(m_id); + m_threadPool->m_cond.wait(lock.mutex()); +// qDebug() << QString("线程%1 被唤醒!").arg(m_id); + if (!m_threadPool->is_running && m_threadPool->m_tasks.empty()) { + return; + } + m_task = m_threadPool->m_tasks.front(); + m_threadPool->m_tasks.pop(); + m_task.run(); + } +} + diff --git a/ThreadPool/Thread.h b/ThreadPool/Thread.h new file mode 100644 index 0000000..b352e50 --- /dev/null +++ b/ThreadPool/Thread.h @@ -0,0 +1,32 @@ +#pragma once + +#include +#include +#include +#include "Task.h" + +class ThreadPool; + +class Thread : public QObject +{ + Q_OBJECT +public: + explicit Thread(int id = 0, ThreadPool *pool = nullptr, QObject *parent= nullptr); + virtual ~Thread(); + virtual int id() const; + virtual bool start(); + virtual bool stop(); + virtual bool isStarted() const; + virtual void run(); + +signals: + void threadStarted(); + void threadStoped(); + +private: + QThread m_thread; + Task m_task; + ThreadPool *m_threadPool; + int m_id; +}; + diff --git a/ThreadPool/ThreadPool.cpp b/ThreadPool/ThreadPool.cpp new file mode 100644 index 0000000..d5d791e --- /dev/null +++ b/ThreadPool/ThreadPool.cpp @@ -0,0 +1,49 @@ +#include "ThreadPool.h" +#include "Thread.h" +#include +#include + +ThreadPool::ThreadPool(size_t threads_num, QObject *parent) + : QObject(parent) +{ + for (size_t i = 0; i < threads_num; i++) { + m_threads.push_back(QSharedPointer(new Thread(i, this))); + } +} + +ThreadPool::~ThreadPool() +{ + { + QMutexLocker lock(&m_mutex); + is_running = false; + } + m_cond.wakeAll(); + // 等待线程退出 + for (auto thread : m_threads) { + thread->stop(); + } + m_threads.clear(); // 智能指针自动释放内存 +} + +void ThreadPool::enqueue(Task &t) +{ + QMutexLocker lock(&m_mutex); + if (!is_running) { + return; + } + m_tasks.push(std::move(t)); + m_cond.wakeOne(); +} + +void ThreadPool::start() +{ + is_running = true; + for (auto thread : m_threads) { + thread->start(); + } +} + +int ThreadPool::tasksCount() +{ + return m_tasks.size(); +} diff --git a/ThreadPool/ThreadPool.h b/ThreadPool/ThreadPool.h new file mode 100644 index 0000000..494c922 --- /dev/null +++ b/ThreadPool/ThreadPool.h @@ -0,0 +1,35 @@ +#pragma once + +#include "PriorityQueue.h" +#include "Task.h" + +#include +#include +#include +#include +#include +#include +#include + +class Thread; + + +using QueueType = PriorityQueue; +class ThreadPool : public QObject +{ +public: + ThreadPool(size_t threads_num = QThread::idealThreadCount(), QObject *parent = nullptr); + ~ThreadPool(); + void enqueue(Task& t); + void start(); + int tasksCount(); + +private: + QVector> m_threads; + bool is_running = false; + QueueType m_tasks; + QMutex m_mutex; + QWaitCondition m_cond; + + friend class Thread; +}; diff --git a/ThreadPool/main.cpp b/ThreadPool/main.cpp index 54a673f..263051d 100644 --- a/ThreadPool/main.cpp +++ b/ThreadPool/main.cpp @@ -1,24 +1,206 @@ -#include "ThreadPool.hpp" -#include +#include "ThreadPool.h" -void func(int a, int b) +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +qint64 getFileSize(const QString& url); +void multiDownload(const QString &url, qint64 fileSize, const QString &filename); + +ThreadPool pool; + +QMutex mutex; +int count = 0; + +void sum(int a, int b) { - std::cout << a << "+" << b << "=" << a + b << std::endl; - return; + QMutexLocker lock(&mutex); + count++; + qDebug() << QString("%1+%2=%3").arg(a).arg(b).arg(a+b); + qDebug() << QString("第%1次计算").arg(count); } +void event() +{ + { + QMutexLocker lock(&mutex); + qDebug() << "当前线程ID为:" << QThread::currentThreadId() << ",开始执行任务"; + } + QTimer timer; + QObject::connect(&timer, &QTimer::timeout, [](){ + QMutexLocker lock(&mutex); + qDebug() << "当前线程ID为:" << QThread::currentThreadId() << ",定时器触发成功"; + }); + timer.setInterval(1000); + timer.start(); +} + +int main(int argc, char *argv[]) +{ + QCoreApplication a(argc, argv); + + // 测试计算任务 + if (false) { + ThreadPool pool; + pool.start(); + qDebug() << "线程池启动完毕,当前任务队列有任务" << pool.tasksCount(); + for(size_t i = 0; i < 100; i++) { + Task t(sum, i, i); + pool.enqueue(t); + } + qDebug() << "任务添加完毕,当前任务队列有任务" << pool.tasksCount(); + QTimer timer; + QObject::connect(&timer, &QTimer::timeout, [&pool](){ + qDebug() << "当前任务队列有任务" << pool.tasksCount(); + }); + timer.setInterval(1000); + timer.start(); + } + + // 测试定时器任务 + if (true) { + pool.start(); + qDebug() << "线程池启动完毕,当前任务队列有任务" << pool.tasksCount(); + for(size_t i = 0; i < 100; i++) { + Task t(event); + pool.enqueue(t); + } + qDebug() << "任务添加完毕,当前任务队列有任务" << pool.tasksCount(); + } + + // 测试下载任务 + if (false) { + pool.start(); + QString url = "https://sample-videos.com/video123/mp4/720/big_buck_bunny_720p_10mb.mp4"; + qint64 fileSize = getFileSize(url); + QString filename = QFileInfo(url).fileName(); + multiDownload(url, fileSize, filename); + } + return a.exec(); +} + + /** - * 这个运行还是有点问题的,会卡顿,应该是线程退出的问题 - * 不像另外一个线程池,能够很快地运行 - */ -int main(int argc, char const *argv[]) + * @brief 获取要下载的文件大小 + * @param url + */ +qint64 getFileSize(const QString& url) { - std::cout << "测试输出!" << std::endl; - ThreadPool<> pool(5); - Task t1(func, 3, 4), t2(func, 5, 6); - - pool.addOneTask(&t1); - pool.addOneTask(&t2); - return 0; + QNetworkAccessManager requestManager; + QEventLoop event; + QNetworkRequest request; + request.setUrl(QUrl(url)); + request.setAttribute(QNetworkRequest::FollowRedirectsAttribute, true); + QNetworkReply *reply = requestManager.head(request); + QObject::connect(reply, &QNetworkReply::errorOccurred, [reply](QNetworkReply::NetworkError error){ + if (error != QNetworkReply::NoError) { + qDebug() << reply->errorString(); + } + }); + QObject::connect(reply, &QNetworkReply::finished, &event, &QEventLoop::quit); + event.exec(); + qint64 fileSize = 0; + if (reply->rawHeader("Accept-Ranges") == QByteArrayLiteral("bytes") + && reply->hasRawHeader(QString("Content-Length").toLocal8Bit())) { + fileSize = reply->header(QNetworkRequest::ContentLengthHeader).toUInt(); + } + reply->deleteLater(); + return fileSize; +} + + +/** + * @brief 多线程下载 + * @param url + * @param fileSize + * @param filename + * @param threadCount + */ +void multiDownload(const QString &url, qint64 fileSize, const QString &filename) +{ + int threadCount = QThread::idealThreadCount(); + + QFile file(filename); + if (file.exists()) + file.remove(); + if (!file.open(QIODevice::WriteOnly)) { + qDebug() << file.errorString(); + return; + } + file.resize(fileSize); + + // 任务等分 + qint64 segmentSize = fileSize / threadCount; + QVector> vec(threadCount); + for (int i = 0; i < threadCount; i++) { + vec[i].first = i * segmentSize; + vec[i].second = i * segmentSize + segmentSize - 1; + } + vec[threadCount-1].second = fileSize; // 余数部分加入最后一个 + + qint64 bytesReceived = 0; // 下载接收的总字节数 + + QMutex mutex; + + auto writeFile = [&](qint64 pos, QByteArray data){ + QMutexLocker lock(&mutex); + qDebug() << QString("跳转文件位置%1,写入数据%2").arg(pos).arg(data.size()); + file.seek(pos); + file.write(data); + bytesReceived += data.size(); + if (fileSize == bytesReceived) { + file.close(); + qDebug() << "下载完毕"; + } + }; + + // 任务队列 + auto downloadFunc = [writeFile, url](const QPair& pair) { + qDebug() << QString("当前线程ID") << QThread::currentThreadId(); + QNetworkAccessManager *mgr = new QNetworkAccessManager; + QNetworkRequest request; + request.setUrl(url); + request.setAttribute(QNetworkRequest::FollowRedirectsAttribute, true); + request.setRawHeader("Range", QString("bytes=%1-%2").arg(pair.first).arg(pair.second).toLocal8Bit()); + QNetworkReply *reply = mgr->get(request); + qint64 writePos = pair.first; + qint64 currentReceived = 0; + qDebug() << "开始下载数据:" << QString(" %1~%2 ").arg(pair.first).arg(pair.second); + QObject::connect(reply, &QNetworkReply::readyRead, [reply, writePos, writeFile, ¤tReceived](){ + qDebug() << "测试,呜啦啦啦啦"; + qDebug() << "当前 currentReceived 的值为:" << currentReceived; + qDebug() << QString("没有任何响应吗? writePos = %1, currentReceived=%2").arg(writePos).arg(currentReceived); + QByteArray data = reply->readAll(); + writeFile(writePos + currentReceived, data); + currentReceived += data.size(); + }); + QObject::connect(reply, &QNetworkReply::finished, [](){ + qDebug() << "线程" << QThread::currentThreadId() << "下载完毕"; + }); + QObject::connect(reply, &QNetworkReply::errorOccurred, [reply](QNetworkReply::NetworkError error){ + qDebug() << "发生了错误,呜啦啦啦" << reply->errorString(); + }); + QObject::connect(reply, &QNetworkReply::finished, mgr, &QNetworkAccessManager::deleteLater); + // 测试事件循环 + QTimer timer; + timer.setInterval(1000); + timer.start(); + QObject::connect(&timer, &QTimer::timeout, [](){ + qDebug() << "的确有触发事件循环,可喜可贺!"; + }); + }; + qDebug() << QString("主线程ID") << QThread::currentThreadId(); + for (auto &pair : vec) { + qDebug() << "输入任务数据,耶耶耶!"; + Task t(downloadFunc, pair); + pool.enqueue(t); + } }