更新线程池

This commit is contained in:
AdamSmith 2020-11-25 09:06:03 +08:00
parent 2724d791c5
commit 1efc39eabc
9 changed files with 629 additions and 16 deletions

73
ThreadPool/.gitignore vendored Normal file
View File

@ -0,0 +1,73 @@
# This file is used to ignore files which are generated
# ----------------------------------------------------------------------------
*~
*.autosave
*.a
*.core
*.moc
*.o
*.obj
*.orig
*.rej
*.so
*.so.*
*_pch.h.cpp
*_resource.rc
*.qm
.#*
*.*#
core
!core/
tags
.DS_Store
.directory
*.debug
Makefile*
*.prl
*.app
moc_*.cpp
ui_*.h
qrc_*.cpp
Thumbs.db
*.res
*.rc
/.qmake.cache
/.qmake.stash
# qtcreator generated files
*.pro.user*
# xemacs temporary files
*.flc
# Vim temporary files
.*.swp
# Visual Studio generated files
*.ib_pdb_index
*.idb
*.ilk
*.pdb
*.sln
*.suo
*.vcproj
*vcproj.*.*.user
*.ncb
*.sdf
*.opensdf
*.vcxproj
*vcxproj.*
# MinGW generated files
*.Debug
*.Release
# Python byte code
*.pyc
# Binaries
# --------
*.dll
*.exe

View File

@ -0,0 +1,32 @@
QT -= gui
QT += network
CONFIG += c++11 console
CONFIG -= app_bundle
# The following define makes your compiler emit warnings if you use
# any Qt feature that has been marked deprecated (the exact warnings
# depend on your compiler). Please consult the documentation of the
# deprecated API in order to know how to port your code away from it.
DEFINES += QT_DEPRECATED_WARNINGS
# You can also make your code fail to compile if it uses deprecated APIs.
# In order to do so, uncomment the following line.
# You can also select to disable deprecated APIs only up to a certain version of Qt.
#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000 # disables all the APIs deprecated before Qt 6.0.0
SOURCES += \
ThreadPool.cpp \
Thread.cpp \
main.cpp
# Default rules for deployment.
qnx: target.path = /tmp/$${TARGET}/bin
else: unix:!android: target.path = /opt/$${TARGET}/bin
!isEmpty(target.path): INSTALLS += target
HEADERS += \
PriorityQueue.h \
Task.h \
Thread.h \
ThreadPool.h

View File

@ -0,0 +1,99 @@
#pragma once
#include <QVector>
#include <algorithm>
template <
typename T,
typename Array = QVector<T>,
typename compare_T=std::less<T>
>
class PriorityQueue
{
public:
PriorityQueue();
bool empty();
T front();
void push(const T& val);
void pop();
int size();
private:
void up_adjust(); // 向上调整
void down_adjust(); // 向下调整
Array elements;
compare_T compare;
};
template<typename T, typename Array, typename compare_T>
PriorityQueue<T, Array, compare_T>::PriorityQueue()
{
elements.clear();
}
template<typename T, typename Array, typename compare_T>
bool PriorityQueue<T, Array, compare_T>::empty()
{
return elements.size() == 0;
}
template<typename T, typename Array, typename compare_T>
T PriorityQueue<T, Array, compare_T>::front()
{
return elements[0];
}
template<typename T, typename Array, typename compare_T>
void PriorityQueue<T, Array, compare_T>::push(const T &val)
{
elements.push_back(val);
up_adjust();
}
template<typename T, typename Array, typename compare_T>
void PriorityQueue<T, Array, compare_T>::pop()
{
if (empty()) return;
int count = elements.size();
qSwap(elements[count - 1], elements[0]);
elements.pop_back();
down_adjust();
}
template<typename T, typename Array, typename compare_T>
int PriorityQueue<T, Array, compare_T>::size()
{
return elements.size();
}
template<typename T, typename Array, typename compare_T>
void PriorityQueue<T, Array, compare_T>::up_adjust()
{
int ind = elements.size();
while (ind > 1 && compare(elements[ind/2-1], elements[ind-1])) {
qSwap(elements[ind/2-1], elements[ind-1]);
ind /= 2;
}
}
template<typename T, typename Array, typename compare_T>
void PriorityQueue<T, Array, compare_T>::down_adjust()
{
int ind = 0;
int count = elements.size();
while (ind * 2 + 1 < count) {
int tind = ind;
if (compare(elements[tind], elements[ind*2+1]))
tind = ind * 2 + 1;
if (ind*2+2<count && compare(elements[tind], elements[ind*2+1]))
tind = ind * 2 + 2;
if (ind == tind)
break;
qSwap(elements[ind], elements[tind]);
ind = tind;
}
}

36
ThreadPool/Task.h Normal file
View File

@ -0,0 +1,36 @@
#pragma once
#include <functional>
class Task
{
public:
Task(){};
template<typename Func, typename ...Args>
Task(Func f, Args&& ...args)
{
func = std::bind(f, std::forward<Args>(args)...);
}
virtual void run()
{
func();
}
void setPriority(int priority)
{
m_priority = priority;
}
friend bool operator<(const Task& t1, const Task& t2);
private:
std::function<void()> func;
int m_priority = 100;
};
inline bool operator<(const Task& t1, const Task& t2)
{
return t1.m_priority < t2.m_priority;
}

75
ThreadPool/Thread.cpp Normal file
View File

@ -0,0 +1,75 @@
#include "Thread.h"
#include "ThreadPool.h"
#include <QDebug>
Thread::Thread(int id, ThreadPool *pool, QObject *parent) :
QObject(parent),
m_threadPool(pool),
m_id(id)
{
moveToThread(&m_thread);
connect(&m_thread, &QThread::started, [this](){
emit threadStarted();
run();
});
connect(&m_thread, &QThread::finished, [this](){
emit threadStoped();
});
}
Thread::~Thread()
{
if (isStarted())
stop();
}
int Thread::id() const
{
return m_id;
}
bool Thread::start()
{
bool ok = false;
if (!isStarted()) {
m_thread.start();
ok = true;
}
return ok;
}
bool Thread::stop()
{
bool ok = false;
if (isStarted()) {
m_thread.quit();
m_thread.wait();
emit threadStoped();
ok = true;
}
return ok;
}
bool Thread::isStarted() const
{
return m_thread.isRunning();
}
void Thread::run()
{
// 就是抢任务这一块出问题了,任务输入的太快
while (true) {
QMutexLocker lock(&(m_threadPool->m_mutex));
// qDebug() << QString("线程%1 进入等待!").arg(m_id);
m_threadPool->m_cond.wait(lock.mutex());
// qDebug() << QString("线程%1 被唤醒!").arg(m_id);
if (!m_threadPool->is_running && m_threadPool->m_tasks.empty()) {
return;
}
m_task = m_threadPool->m_tasks.front();
m_threadPool->m_tasks.pop();
m_task.run();
}
}

32
ThreadPool/Thread.h Normal file
View File

@ -0,0 +1,32 @@
#pragma once
#include <QObject>
#include <QThread>
#include <thread>
#include "Task.h"
class ThreadPool;
class Thread : public QObject
{
Q_OBJECT
public:
explicit Thread(int id = 0, ThreadPool *pool = nullptr, QObject *parent= nullptr);
virtual ~Thread();
virtual int id() const;
virtual bool start();
virtual bool stop();
virtual bool isStarted() const;
virtual void run();
signals:
void threadStarted();
void threadStoped();
private:
QThread m_thread;
Task m_task;
ThreadPool *m_threadPool;
int m_id;
};

49
ThreadPool/ThreadPool.cpp Normal file
View File

@ -0,0 +1,49 @@
#include "ThreadPool.h"
#include "Thread.h"
#include <QDebug>
#include <algorithm>
ThreadPool::ThreadPool(size_t threads_num, QObject *parent)
: QObject(parent)
{
for (size_t i = 0; i < threads_num; i++) {
m_threads.push_back(QSharedPointer<Thread>(new Thread(i, this)));
}
}
ThreadPool::~ThreadPool()
{
{
QMutexLocker lock(&m_mutex);
is_running = false;
}
m_cond.wakeAll();
// 等待线程退出
for (auto thread : m_threads) {
thread->stop();
}
m_threads.clear(); // 智能指针自动释放内存
}
void ThreadPool::enqueue(Task &t)
{
QMutexLocker lock(&m_mutex);
if (!is_running) {
return;
}
m_tasks.push(std::move(t));
m_cond.wakeOne();
}
void ThreadPool::start()
{
is_running = true;
for (auto thread : m_threads) {
thread->start();
}
}
int ThreadPool::tasksCount()
{
return m_tasks.size();
}

35
ThreadPool/ThreadPool.h Normal file
View File

@ -0,0 +1,35 @@
#pragma once
#include "PriorityQueue.h"
#include "Task.h"
#include <QThread>
#include <QVector>
#include <QMutex>
#include <QMutexLocker>
#include <QWaitCondition>
#include <QSharedPointer>
#include <QTimer>
class Thread;
using QueueType = PriorityQueue<Task>;
class ThreadPool : public QObject
{
public:
ThreadPool(size_t threads_num = QThread::idealThreadCount(), QObject *parent = nullptr);
~ThreadPool();
void enqueue(Task& t);
void start();
int tasksCount();
private:
QVector<QSharedPointer<Thread>> m_threads;
bool is_running = false;
QueueType m_tasks;
QMutex m_mutex;
QWaitCondition m_cond;
friend class Thread;
};

View File

@ -1,24 +1,206 @@
#include "ThreadPool.hpp" #include "ThreadPool.h"
#include <iostream>
void func(int a, int b) #include <QCoreApplication>
#include <QMutex>
#include <QMutexLocker>
#include <QDebug>
#include <QFile>
#include <QFileInfo>
#include <QtNetwork/QNetworkRequest>
#include <QtNetwork/QNetworkReply>
#include <QtNetwork/QNetworkAccessManager>
#include <QTimer>
qint64 getFileSize(const QString& url);
void multiDownload(const QString &url, qint64 fileSize, const QString &filename);
ThreadPool pool;
QMutex mutex;
int count = 0;
void sum(int a, int b)
{ {
std::cout << a << "+" << b << "=" << a + b << std::endl; QMutexLocker lock(&mutex);
return; count++;
qDebug() << QString("%1+%2=%3").arg(a).arg(b).arg(a+b);
qDebug() << QString("第%1次计算").arg(count);
} }
void event()
{
{
QMutexLocker lock(&mutex);
qDebug() << "当前线程ID为" << QThread::currentThreadId() << ",开始执行任务";
}
QTimer timer;
QObject::connect(&timer, &QTimer::timeout, [](){
QMutexLocker lock(&mutex);
qDebug() << "当前线程ID为" << QThread::currentThreadId() << ",定时器触发成功";
});
timer.setInterval(1000);
timer.start();
}
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
// 测试计算任务
if (false) {
ThreadPool pool;
pool.start();
qDebug() << "线程池启动完毕,当前任务队列有任务" << pool.tasksCount();
for(size_t i = 0; i < 100; i++) {
Task t(sum, i, i);
pool.enqueue(t);
}
qDebug() << "任务添加完毕,当前任务队列有任务" << pool.tasksCount();
QTimer timer;
QObject::connect(&timer, &QTimer::timeout, [&pool](){
qDebug() << "当前任务队列有任务" << pool.tasksCount();
});
timer.setInterval(1000);
timer.start();
}
// 测试定时器任务
if (true) {
pool.start();
qDebug() << "线程池启动完毕,当前任务队列有任务" << pool.tasksCount();
for(size_t i = 0; i < 100; i++) {
Task t(event);
pool.enqueue(t);
}
qDebug() << "任务添加完毕,当前任务队列有任务" << pool.tasksCount();
}
// 测试下载任务
if (false) {
pool.start();
QString url = "https://sample-videos.com/video123/mp4/720/big_buck_bunny_720p_10mb.mp4";
qint64 fileSize = getFileSize(url);
QString filename = QFileInfo(url).fileName();
multiDownload(url, fileSize, filename);
}
return a.exec();
}
/** /**
* 线退 * @brief
* 线 * @param url
*/ */
int main(int argc, char const *argv[]) qint64 getFileSize(const QString& url)
{ {
std::cout << "测试输出!" << std::endl;
ThreadPool<> pool(5);
Task t1(func, 3, 4), t2(func, 5, 6);
pool.addOneTask(&t1);
pool.addOneTask(&t2);
return 0; QNetworkAccessManager requestManager;
QEventLoop event;
QNetworkRequest request;
request.setUrl(QUrl(url));
request.setAttribute(QNetworkRequest::FollowRedirectsAttribute, true);
QNetworkReply *reply = requestManager.head(request);
QObject::connect(reply, &QNetworkReply::errorOccurred, [reply](QNetworkReply::NetworkError error){
if (error != QNetworkReply::NoError) {
qDebug() << reply->errorString();
}
});
QObject::connect(reply, &QNetworkReply::finished, &event, &QEventLoop::quit);
event.exec();
qint64 fileSize = 0;
if (reply->rawHeader("Accept-Ranges") == QByteArrayLiteral("bytes")
&& reply->hasRawHeader(QString("Content-Length").toLocal8Bit())) {
fileSize = reply->header(QNetworkRequest::ContentLengthHeader).toUInt();
}
reply->deleteLater();
return fileSize;
}
/**
* @brief 线
* @param url
* @param fileSize
* @param filename
* @param threadCount
*/
void multiDownload(const QString &url, qint64 fileSize, const QString &filename)
{
int threadCount = QThread::idealThreadCount();
QFile file(filename);
if (file.exists())
file.remove();
if (!file.open(QIODevice::WriteOnly)) {
qDebug() << file.errorString();
return;
}
file.resize(fileSize);
// 任务等分
qint64 segmentSize = fileSize / threadCount;
QVector<QPair<qint64, qint64>> vec(threadCount);
for (int i = 0; i < threadCount; i++) {
vec[i].first = i * segmentSize;
vec[i].second = i * segmentSize + segmentSize - 1;
}
vec[threadCount-1].second = fileSize; // 余数部分加入最后一个
qint64 bytesReceived = 0; // 下载接收的总字节数
QMutex mutex;
auto writeFile = [&](qint64 pos, QByteArray data){
QMutexLocker lock(&mutex);
qDebug() << QString("跳转文件位置%1写入数据%2").arg(pos).arg(data.size());
file.seek(pos);
file.write(data);
bytesReceived += data.size();
if (fileSize == bytesReceived) {
file.close();
qDebug() << "下载完毕";
}
};
// 任务队列
auto downloadFunc = [writeFile, url](const QPair<qint64, qint64>& pair) {
qDebug() << QString("当前线程ID") << QThread::currentThreadId();
QNetworkAccessManager *mgr = new QNetworkAccessManager;
QNetworkRequest request;
request.setUrl(url);
request.setAttribute(QNetworkRequest::FollowRedirectsAttribute, true);
request.setRawHeader("Range", QString("bytes=%1-%2").arg(pair.first).arg(pair.second).toLocal8Bit());
QNetworkReply *reply = mgr->get(request);
qint64 writePos = pair.first;
qint64 currentReceived = 0;
qDebug() << "开始下载数据:" << QString(" %1~%2 ").arg(pair.first).arg(pair.second);
QObject::connect(reply, &QNetworkReply::readyRead, [reply, writePos, writeFile, &currentReceived](){
qDebug() << "测试,呜啦啦啦啦";
qDebug() << "当前 currentReceived 的值为:" << currentReceived;
qDebug() << QString("没有任何响应吗? writePos = %1, currentReceived=%2").arg(writePos).arg(currentReceived);
QByteArray data = reply->readAll();
writeFile(writePos + currentReceived, data);
currentReceived += data.size();
});
QObject::connect(reply, &QNetworkReply::finished, [](){
qDebug() << "线程" << QThread::currentThreadId() << "下载完毕";
});
QObject::connect(reply, &QNetworkReply::errorOccurred, [reply](QNetworkReply::NetworkError error){
qDebug() << "发生了错误,呜啦啦啦" << reply->errorString();
});
QObject::connect(reply, &QNetworkReply::finished, mgr, &QNetworkAccessManager::deleteLater);
// 测试事件循环
QTimer timer;
timer.setInterval(1000);
timer.start();
QObject::connect(&timer, &QTimer::timeout, [](){
qDebug() << "的确有触发事件循环,可喜可贺!";
});
};
qDebug() << QString("主线程ID") << QThread::currentThreadId();
for (auto &pair : vec) {
qDebug() << "输入任务数据,耶耶耶!";
Task t(downloadFunc, pair);
pool.enqueue(t);
}
} }