添加线程池

This commit is contained in:
AdamSmith 2020-11-20 10:06:01 +08:00
parent 8db95b4ea6
commit 2724d791c5
4 changed files with 224 additions and 1 deletions

View File

@ -20,3 +20,8 @@
* CoverEars(迅雷不及掩耳Qt版多线程下载器) https://github.com/xj361685640/CoverEars-Qt * CoverEars(迅雷不及掩耳Qt版多线程下载器) https://github.com/xj361685640/CoverEars-Qt
![](./MultiplethreadDownload/screenshot.png) ![](./MultiplethreadDownload/screenshot.png)
# C++ 相关DEMO
用来测试练习C++技术的。
C++实现的线程池,有些问题 [点击查看](./ThreadPool)

79
ThreadPool/HeapQueue.hpp Normal file
View File

@ -0,0 +1,79 @@
#pragma once
#include <vector>
#include <functional>
template<
typename T,
typename Array=std::vector<T>,
typename compare_T=std::less<T>
>
class HeapQueue
{
public:
HeapQueue() { elements.clear(); }
bool empty() { return elements.size() == 0; }
T front() { return elements[0]; }
/**
*
*/
void push(const T &val)
{
elements.push_back(val);
up_update();
}
/**
*
*/
void pop() {
if (empty())
return ;
int n = elements.size();
std::swap(elements[n-1], elements[0]);
elements.pop_back();
down_update();
return ;
}
private:
/**
*
*/
void up_update()
{
int ind = elements.size();
while (ind > 1 && compare(elements[ind / 2 - 1], elements[ind - 1]))
{
std::swap(elements[ind / 2 - 1], elements[ind - 1]);
ind /= 2;
}
}
/**
*
*/
void down_update()
{
int ind = 0, n = elements.size();
while (ind * 2 + 1 < n)
{
int tind = ind;
if (compare(elements[tind], elements[ind * 2 + 1]))
tind = ind * 2 + 1;
if (ind * 2 + 2 < n && compare(elements[tind], elements[ind * 2 + 2]))
tind = ind * 2 + 2;
if (ind == tind)
break;
std::swap(elements[ind], elements[tind]);
ind = tind;
}
}
Array elements;
compare_T compare;
};

115
ThreadPool/ThreadPool.hpp Normal file
View File

@ -0,0 +1,115 @@
#pragma once
#include <cstdlib>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <functional>
class Task
{
public:
template<typename Func, typename ...Args>
Task(Func&& f, Args&& ...args)
{
func = std::bind(std::forward<Func>(f), std::forward<Args>(args)...);
}
void run()
{
std::cout << "开始运行任务函数!" << std::endl;
func();
return;
}
private:
std::function<void()> func;
};
/**
* 线
*/
template<typename QueueType = std::queue<Task*>>
class ThreadPool
{
public:
ThreadPool(size_t n)
{
for (int i = 0; i < n; i++) {
threads.push_back(new std::thread(
&ThreadPool::thread_worker, this
));
}
}
~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(m_mutex);
is_running = false; // 线程池即将要销毁停止工作了
}
m_cond.notify_all();
for(auto &worker : threads) {
worker->join();
delete worker;
worker = nullptr;
}
}
/**
* 线
* 线
*/
void thread_worker()
{
while (is_running) {
Task *t = getOneTask();
if (t == nullptr) {
std::cout << "获取任务失败 调用了个寂寞" << std::endl;
return;
} else {
std::cout << "拿到了任务,耶耶耶" << std::endl;
}
t->run();
}
}
Task *getOneTask()
{
{
// 进入线程临界区时加锁
std::unique_lock<std::mutex> lock(m_mutex);
// 等待任务
m_cond.wait(lock, [this](){
return is_running && tasks.empty();
});
Task *t = nullptr;
if (is_running) {
t = tasks.front();
tasks.pop();
}
return t;
}
}
void addOneTask(Task *t)
{
std::unique_lock<std::mutex> lock(m_mutex);
// 线程池停止,不允许再添加新的任务
if (!is_running) {
return;
}
tasks.push(t);
std::cout << "添加任务成功!";
m_cond.notify_one();
}
private:
std::vector<std::thread *> threads;
bool is_running = true;
QueueType tasks;
std::mutex m_mutex;
std::condition_variable m_cond;
};

24
ThreadPool/main.cpp Normal file
View File

@ -0,0 +1,24 @@
#include "ThreadPool.hpp"
#include <iostream>
void func(int a, int b)
{
std::cout << a << "+" << b << "=" << a + b << std::endl;
return;
}
/**
* 线退
* 线
*/
int main(int argc, char const *argv[])
{
std::cout << "测试输出!" << std::endl;
ThreadPool<> pool(5);
Task t1(func, 3, 4), t2(func, 5, 6);
pool.addOneTask(&t1);
pool.addOneTask(&t2);
return 0;
}