Files
spark-store-neo/src-tauri/src/utils/download_manager.rs
2025-03-07 03:34:08 +08:00

549 lines
21 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::collections::HashMap;
use std::sync::Mutex;
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::net::TcpListener;
use crate::models::download::{DownloadTask, DownloadTaskResponse, InstallStatus, InstallTask, ResponseStatus};
use crate::handlers::server::get_json_server_url;
use crate::utils::{UA, aria2::Aria2Client};
use super::format::{format_size, format_speed, format_icon_url};
pub struct DownloadManager {
download_queue: Mutex<HashMap<String, DownloadTask>>,
install_queue: Mutex<HashMap<String, InstallTask>>,
aria2_started: Arc<AtomicBool>,
aria2_port: Arc<Mutex<u16>>,
aria2_pid: Arc<Mutex<Option<u32>>>,
installing: Arc<AtomicBool>,
last_get_downloads: Arc<Mutex<Option<std::time::Instant>>>,
}
impl DownloadManager {
pub fn new() -> Self {
DownloadManager {
download_queue: Mutex::new(HashMap::new()),
install_queue: Mutex::new(HashMap::new()),
aria2_started: Arc::new(AtomicBool::new(false)),
aria2_port: Arc::new(Mutex::new(5144)),
aria2_pid: Arc::new(Mutex::new(None)),
installing: Arc::new(AtomicBool::new(false)), // 初始化为 false
last_get_downloads: Arc::new(Mutex::new(None)),
}
}
pub async fn get_downloads(&self) -> Result<Vec<DownloadTaskResponse>, String> {
// 检查是否需要更新最后调用时间,并在作用域内立即释放锁
let should_skip_update = {
if let Ok(last_call) = self.last_get_downloads.lock() {
if let Some(last_time) = *last_call {
last_time.elapsed().as_secs_f32() < 1.0
} else {
false
}
} else {
false
}
};
if should_skip_update {
return self.get_downloads_internal().await;
}
// 更新最后调用时间,并在作用域内立即释放锁
{
if let Ok(mut last_call) = self.last_get_downloads.lock() {
*last_call = Some(std::time::Instant::now());
}
}
self.get_downloads_internal().await
}
// 获取所有下载任务
async fn get_downloads_internal(&self) -> Result<Vec<DownloadTaskResponse>, String> {
// 获取队列中的任务信息并立即克隆所需数据,然后释放锁
let tasks_clone: Vec<DownloadTask>;
let aria2_started;
let port;
{
// 使用作用域限制锁的生命周期
let downloads = self.download_queue.lock().map_err(|e| e.to_string())?;
tasks_clone = downloads.values().cloned().collect();
aria2_started = self.aria2_started.load(Ordering::SeqCst);
}
// 如果 aria2 未启动,直接返回队列中的任务
if !aria2_started {
return Ok(tasks_clone.into_iter().map(|task| DownloadTaskResponse {
category: task.category,
pkgname: task.pkgname,
filename: task.filename,
status: ResponseStatus::Error, // 如果 aria2 未启动,标记为错误状态
icon: task.icon,
name: task.name,
progress: 0.0,
speed: None,
size: None,
}).collect());
}
// 获取端口(在单独的作用域中获取锁)
{
port = *self.aria2_port.lock().map_err(|e| e.to_string())?;
}
// 创建 Aria2Client 实例
let aria2_client = Aria2Client::new("127.0.0.1", port, None);
// 获取所有活动中的下载任务
let active_downloads = aria2_client.tell_active().await
.map_err(|e| format!("获取活动下载任务失败: {}", e))?;
// 获取所有等待中的下载任务最多100个
let waiting_downloads = aria2_client.tell_waiting(0, 100).await
.map_err(|e| format!("获取等待中下载任务失败: {}", e))?;
// 获取所有已停止的下载任务最多100个
let stopped_downloads = aria2_client.tell_stopped(0, 100).await
.map_err(|e| format!("获取已停止下载任务失败: {}", e))?;
// 创建一个映射,用于存储 GID 到 aria2 任务状态的映射
let mut aria2_tasks = HashMap::new();
// 处理活动中的下载任务
for task in active_downloads {
if let Some(gid) = task["gid"].as_str() {
aria2_tasks.insert(gid.to_string(), (task, ResponseStatus::Downloading));
}
}
// 处理等待中的下载任务
for task in waiting_downloads {
if let Some(gid) = task["gid"].as_str() {
let status = if task["status"].as_str() == Some("paused") {
ResponseStatus::Paused
} else {
ResponseStatus::Queued
};
aria2_tasks.insert(gid.to_string(), (task, status));
}
}
// 处理已停止的下载任务
for task in stopped_downloads {
if let Some(gid) = task["gid"].as_str() {
let status = if task["status"].as_str() == Some("complete") {
// 当任务完成时,将其加入到安装队列
let task_info = {
let downloads = self.download_queue.lock().map_err(|e| e.to_string())?;
downloads.values()
.find(|t| t.gid == gid)
.cloned()
};
if let Some(task_info) = task_info {
let task_id = format!("{}/{}", task_info.category, task_info.pkgname);
let should_process = {
let mut install_queue = self.install_queue.lock().map_err(|e| e.to_string())?;
if !install_queue.contains_key(&task_id) {
let install_task = InstallTask {
category: task_info.category.clone(),
pkgname: task_info.pkgname.clone(),
filepath: task["files"][0]["path"].to_string(),
status: InstallStatus::Queued,
};
install_queue.insert(task_id, install_task);
true
} else {
false
}
};
// 在所有锁都释放后再处理安装队列
if should_process {
let _ = self.process_install_queue().await;
}
}
ResponseStatus::Completed
} else {
ResponseStatus::Error
};
aria2_tasks.insert(gid.to_string(), (task, status));
}
}
// 将队列中的任务与 aria2 任务状态结合,生成响应
let mut result = Vec::new();
// 获取安装队列(在生成响应之前)
let install_queue = self.install_queue.lock().map_err(|e| e.to_string())?;
for task in tasks_clone {
let task_id = format!("{}/{}", task.category, task.pkgname);
let mut response = DownloadTaskResponse {
category: task.category.clone(),
pkgname: task.pkgname.clone(),
filename: task.filename.clone(),
status: ResponseStatus::Error, // 默认为错误状态
icon: task.icon.clone(),
name: task.name.clone(),
progress: 0.0,
speed: None,
size: None,
};
// 首先检查是否在安装队列中
if let Some(install_task) = install_queue.get(&task_id) {
response.status = match install_task.status {
InstallStatus::Queued => ResponseStatus::Completed,
InstallStatus::Installing => ResponseStatus::Installing,
InstallStatus::Installed => ResponseStatus::Installed,
InstallStatus::Error => ResponseStatus::Error,
};
} else {
// 如果不在安装队列中,则检查下载状态
if let Some((aria2_task, status)) = aria2_tasks.get(&task.gid) {
response.status = status.clone();
// 计算进度(百分比)
if let (Some(completed_length), Some(total_length)) = (
aria2_task["completedLength"].as_str().and_then(|s| s.parse::<f64>().ok()),
aria2_task["totalLength"].as_str().and_then(|s| s.parse::<f64>().ok())
) {
if total_length > 0.0 {
let progress = ((completed_length / total_length) * 100.0) as f32;
response.progress = (progress * 100.0).round() / 100.0;
}
}
// 获取下载速度
if let Some(download_speed) = aria2_task["downloadSpeed"].as_str() {
if let Ok(speed) = download_speed.parse::<u64>() {
response.speed = Some(format_speed(speed));
}
}
// 获取文件大小
if let Some(total_length) = aria2_task["totalLength"].as_str() {
if let Ok(size) = total_length.parse::<u64>() {
response.size = Some(format_size(size));
}
}
}
}
result.push(response);
}
Ok(result)
}
fn start_aria2(&self) {
// 寻找可用端口
let mut port = 5144;
while TcpListener::bind(format!("127.0.0.1:{}", port)).is_err() {
port += 1;
}
if let Ok(mut port_guard) = self.aria2_port.lock() {
*port_guard = port;
}
// 启动 aria2c
let child = Command::new("aria2c")
.args([
"--enable-rpc",
"--rpc-listen-all=false",
&format!("--rpc-listen-port={}", port),
&format!("--user-agent={}", UA),
"--follow-metalink=mem",
"--dir=/tmp/spark-store", // 设置下载目录为 /tmp/spark-store
])
.spawn()
.map_err(|e| format!("启动 aria2 失败: {}", e)).unwrap();
// 保存进程 ID
if let Ok(mut pid_guard) = self.aria2_pid.lock() {
*pid_guard = Some(child.id());
}
self.aria2_started.store(true, Ordering::SeqCst);
}
// 添加下载任务
pub async fn add_download(&self, category: String, pkgname: String, filename: String, name: String) -> Result<(), String> {
// 检查并启动 aria2如果还没启动
if !self.aria2_started.load(Ordering::SeqCst) {
self.start_aria2();
}
let task_id = format!("{}/{}", category, pkgname);
// 获取metalink文件URL和内容在获取锁之前完成
let json_server_url = get_json_server_url();
let metalink_url = format!("{}{}/{}/{}.metalink", json_server_url, category, pkgname, filename);
// 发送请求获取metalink文件
let client = reqwest::Client::new();
let response = client
.get(&metalink_url)
.header("User-Agent", UA)
.send()
.await
.map_err(|e| format!("获取metalink文件失败: {}", e))?;
// 检查响应状态
if !response.status().is_success() {
return Err(format!("获取metalink文件失败: HTTP {}", response.status()));
}
// 获取metalink文件内容
let metalink_content = response.bytes()
.await
.map_err(|e| format!("读取metalink内容失败: {}", e))?;
// 创建Aria2Client并添加下载任务
let port = *self.aria2_port.lock().map_err(|e| e.to_string())?;
let aria2_client = Aria2Client::new("127.0.0.1", port, None);
// 使用Aria2Client添加metalink下载
let gids = aria2_client.add_metalink(
&metalink_content,
None,
None
)
.await
.map_err(|e| format!("添加下载任务失败: {}", e))?;
// 获取一次锁,完成所有状态更新操作
let mut downloads = self.download_queue.lock().map_err(|e| e.to_string())?;
// 检查任务是否已存在
if downloads.contains_key(&task_id) {
return Ok(());
}
// 创建下载任务
let task = DownloadTask {
category: category.clone(),
pkgname: pkgname.clone(),
filename,
name,
icon: format_icon_url(&category, &pkgname),
gid: gids[0].clone()
};
// 添加任务到队列
downloads.insert(task_id, task);
Ok(())
}
// 暂停下载任务
pub async fn pause_download(&self, category: String, pkgname: String) -> Result<(), String> {
let task_id = format!("{}/{}", category, pkgname);
// 获取任务信息,并在作用域结束时释放锁
let task_gid = {
let downloads = self.download_queue.lock().map_err(|e| e.to_string())?;
match downloads.get(&task_id) {
Some(task) => task.gid.clone(),
None => return Err(format!("找不到下载任务: {}", task_id)),
}
};
// 如果 aria2 未启动,返回错误
if !self.aria2_started.load(Ordering::SeqCst) {
return Err("aria2 未启动".to_string());
}
// 创建 Aria2Client 实例
let port = *self.aria2_port.lock().map_err(|e| e.to_string())?;
let aria2_client = Aria2Client::new("127.0.0.1", port, None);
// 调用 aria2 的 pause 方法
aria2_client.pause(task_gid.as_str()).await
.map_err(|e| format!("暂停下载任务失败: {}", e))?;
Ok(())
}
// 恢复下载任务
pub async fn resume_download(&self, category: String, pkgname: String) -> Result<(), String> {
let task_id = format!("{}/{}", category, pkgname);
// 获取任务信息,并在作用域结束时释放锁
let task_gid = {
let downloads = self.download_queue.lock().map_err(|e| e.to_string())?;
match downloads.get(&task_id) {
Some(task) => task.gid.clone(),
None => return Err(format!("找不到下载任务: {}", task_id)),
}
};
// 如果 aria2 未启动,返回错误
if !self.aria2_started.load(Ordering::SeqCst) {
return Err("aria2 未启动".to_string());
}
// 创建 Aria2Client 实例
let port = *self.aria2_port.lock().map_err(|e| e.to_string())?;
let aria2_client = Aria2Client::new("127.0.0.1", port, None);
// 调用 aria2 的 unpause 方法
aria2_client.unpause(task_gid.as_str()).await
.map_err(|e| format!("恢复下载任务失败: {}", e))?;
Ok(())
}
// 取消下载任务
pub async fn cancel_download(&self, category: String, pkgname: String) -> Result<(), String> {
let task_id = format!("{}/{}", category, pkgname);
// 获取任务信息,并在作用域结束时释放锁
let task_gid = {
let mut downloads = self.download_queue.lock().map_err(|e| e.to_string())?;
match downloads.get(&task_id) {
Some(task) => {
// 如果 aria2 未启动,只从队列中移除任务
if !self.aria2_started.load(Ordering::SeqCst) {
downloads.remove(&task_id);
return Ok(());
}
task.gid.clone()
},
None => return Err(format!("找不到下载任务: {}", task_id)),
}
};
// 创建 Aria2Client 实例
let port = *self.aria2_port.lock().map_err(|e| e.to_string())?;
let aria2_client = Aria2Client::new("127.0.0.1", port, None);
// 调用 aria2 的 remove 方法
aria2_client.remove(task_gid.as_str()).await
.map_err(|e| format!("取消下载任务失败: {}", e))?;
// 从队列中移除任务
let mut downloads = self.download_queue.lock().map_err(|e| e.to_string())?;
downloads.remove(&task_id);
Ok(())
}
// 关闭 aria2
pub fn shutdown_aria2(&self) {
if self.aria2_started.load(Ordering::SeqCst) {
// 获取保存的 PID
if let Ok(pid_guard) = self.aria2_pid.lock() {
if let Some(pid) = *pid_guard {
// 使用 kill 命令终止特定的进程
if let Ok(output) = Command::new("kill")
.arg(pid.to_string())
.output() {
if output.status.success() {
println!("成功关闭 aria2 (PID: {})", pid);
} else {
eprintln!("关闭 aria2 失败 (PID: {})", pid);
}
}
}
}
self.aria2_started.store(false, Ordering::SeqCst);
}
}
// 开始安装任务
pub async fn process_install_queue(&self) -> Result<(), String> {
// 如果已经有任务在安装,直接返回
if self.installing.load(Ordering::SeqCst) {
return Ok(());
}
// 查找第一个等待安装的任务
let (task_id, task) = {
let mut install_queue = self.install_queue.lock().map_err(|e| e.to_string())?;
if let Some((id, task)) = install_queue.iter_mut()
.find(|(_, task)| matches!(task.status, InstallStatus::Queued))
.map(|(id, task)| (id.clone(), task.clone()))
{
// 更新任务状态为安装中
if let Some(task) = install_queue.get_mut(&id) {
task.status = InstallStatus::Installing;
}
(id, task)
} else {
return Ok(());
}
};
// 标记为正在安装
self.installing.store(true, Ordering::SeqCst);
// 执行安装操作
let install_result = self.install_package(&task).await;
// 更新任务状态
{
let mut install_queue = self.install_queue.lock().map_err(|e| e.to_string())?;
if let Some(task) = install_queue.get_mut(&task_id) {
task.status = match &install_result {
Ok(_) => InstallStatus::Installed,
Err(_) => InstallStatus::Error,
};
}
}
// 安装完成,重置安装状态
self.installing.store(false, Ordering::SeqCst);
// 如果安装失败,返回错误
if let Err(e) = install_result {
return Err(format!("安装失败: {}", e));
}
Ok(())
}
// 实际执行安装的方法
async fn install_package(&self, task: &InstallTask) -> Result<(), String> {
println!("开始安装包: {}", task.filepath);
// 移除可能存在的引号
let filepath = task.filepath.trim_matches('"');
let output = Command::new("pkexec")
.arg("ssinstall")
.arg(filepath)
.arg("--delete-after-install")
.output()
.map_err(|e| {
println!("安装命令执行失败: {}", e);
format!("执行安装命令失败: {}", e)
})?;
if output.status.success() {
println!("安装成功完成");
println!("命令输出: {}", String::from_utf8_lossy(&output.stdout));
Ok(())
} else {
let error_msg = String::from_utf8_lossy(&output.stderr).to_string();
println!("安装失败: {}", error_msg);
println!("命令输出: {}", String::from_utf8_lossy(&output.stdout));
Err(error_msg)
}
}
// 检查是否有下载任务
pub fn has_downloads(&self) -> bool {
if let Ok(downloads) = self.download_queue.lock() {
!downloads.is_empty()
} else {
false
}
}
}