use crate::handlers::server::get_json_server_url; use crate::models::download::{ DownloadTask, DownloadTaskResponse, InstallStatus, InstallTask, ResponseStatus, }; use crate::utils::{aria2::Aria2Client, UA}; use std::collections::HashMap; use std::net::TcpListener; use std::process::Command; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::sync::Mutex; use super::format::{format_icon_url, format_size, format_speed}; pub struct DownloadManager { download_queue: Mutex>, install_queue: Mutex>, aria2_started: Arc, aria2_port: Arc>, aria2_pid: Arc>>, installing: Arc, last_get_downloads: Arc>>, } impl DownloadManager { pub fn new() -> Self { DownloadManager { download_queue: Mutex::new(HashMap::new()), install_queue: Mutex::new(HashMap::new()), aria2_started: Arc::new(AtomicBool::new(false)), aria2_port: Arc::new(Mutex::new(5144)), aria2_pid: Arc::new(Mutex::new(None)), installing: Arc::new(AtomicBool::new(false)), // 初始化为 false last_get_downloads: Arc::new(Mutex::new(None)), } } pub async fn get_downloads(&self) -> Result, String> { // 检查是否需要更新最后调用时间,并在作用域内立即释放锁 let should_skip_update = { if let Ok(last_call) = self.last_get_downloads.lock() { if let Some(last_time) = *last_call { last_time.elapsed().as_secs_f32() < 1.0 } else { false } } else { false } }; if should_skip_update { return self.get_downloads_internal().await; } // 更新最后调用时间,并在作用域内立即释放锁 { if let Ok(mut last_call) = self.last_get_downloads.lock() { *last_call = Some(std::time::Instant::now()); } } self.get_downloads_internal().await } // 获取所有下载任务 async fn get_downloads_internal(&self) -> Result, String> { // 获取队列中的任务信息并立即克隆所需数据,然后释放锁 let tasks_clone: Vec; let aria2_started; let port; { // 使用作用域限制锁的生命周期 let downloads = self.download_queue.lock().map_err(|e| e.to_string())?; tasks_clone = downloads.values().cloned().collect(); aria2_started = self.aria2_started.load(Ordering::SeqCst); } // 如果 aria2 未启动,直接返回队列中的任务 if !aria2_started { return Ok(tasks_clone .into_iter() .map(|task| DownloadTaskResponse { category: task.category, pkgname: task.pkgname, filename: task.filename, status: ResponseStatus::Error, // 如果 aria2 未启动,标记为错误状态 icon: task.icon, name: task.name, progress: 0.0, speed: None, size: None, }) .collect()); } // 获取端口(在单独的作用域中获取锁) { port = *self.aria2_port.lock().map_err(|e| e.to_string())?; } // 创建 Aria2Client 实例 let aria2_client = Aria2Client::new("127.0.0.1", port, None); // 获取所有活动中的下载任务 let active_downloads = aria2_client .tell_active() .await .map_err(|e| format!("获取活动下载任务失败: {}", e))?; // 获取所有等待中的下载任务(最多100个) let waiting_downloads = aria2_client .tell_waiting(0, 100) .await .map_err(|e| format!("获取等待中下载任务失败: {}", e))?; // 获取所有已停止的下载任务(最多100个) let stopped_downloads = aria2_client .tell_stopped(0, 100) .await .map_err(|e| format!("获取已停止下载任务失败: {}", e))?; // 创建一个映射,用于存储 GID 到 aria2 任务状态的映射 let mut aria2_tasks = HashMap::new(); // 处理活动中的下载任务 for task in active_downloads { if let Some(gid) = task["gid"].as_str() { aria2_tasks.insert(gid.to_string(), (task, ResponseStatus::Downloading)); } } // 处理等待中的下载任务 for task in waiting_downloads { if let Some(gid) = task["gid"].as_str() { let status = if task["status"].as_str() == Some("paused") { ResponseStatus::Paused } else { ResponseStatus::Queued }; aria2_tasks.insert(gid.to_string(), (task, status)); } } // 处理已停止的下载任务 for task in stopped_downloads { if let Some(gid) = task["gid"].as_str() { let status = if task["status"].as_str() == Some("complete") { // 当任务完成时,将其加入到安装队列 let task_info = { let downloads = self.download_queue.lock().map_err(|e| e.to_string())?; downloads.values().find(|t| t.gid == gid).cloned() }; if let Some(task_info) = task_info { let task_id = format!("{}/{}", task_info.category, task_info.pkgname); let should_process = { let mut install_queue = self.install_queue.lock().map_err(|e| e.to_string())?; if !install_queue.contains_key(&task_id) { let install_task = InstallTask { category: task_info.category.clone(), pkgname: task_info.pkgname.clone(), filepath: task["files"][0]["path"].to_string(), status: InstallStatus::Queued, }; install_queue.insert(task_id, install_task); true } else { false } }; // 在所有锁都释放后再处理安装队列 if should_process { let _ = self.process_install_queue().await; } } ResponseStatus::Completed } else { ResponseStatus::Error }; aria2_tasks.insert(gid.to_string(), (task, status)); } } // 将队列中的任务与 aria2 任务状态结合,生成响应 let mut result = Vec::new(); // 获取安装队列(在生成响应之前) let install_queue = self.install_queue.lock().map_err(|e| e.to_string())?; for task in tasks_clone { let task_id = format!("{}/{}", task.category, task.pkgname); let mut response = DownloadTaskResponse { category: task.category.clone(), pkgname: task.pkgname.clone(), filename: task.filename.clone(), status: ResponseStatus::Error, // 默认为错误状态 icon: task.icon.clone(), name: task.name.clone(), progress: 0.0, speed: None, size: None, }; // 首先检查是否在安装队列中 if let Some(install_task) = install_queue.get(&task_id) { response.status = match install_task.status { InstallStatus::Queued => ResponseStatus::Completed, InstallStatus::Installing => ResponseStatus::Installing, InstallStatus::Installed => ResponseStatus::Installed, InstallStatus::Error => ResponseStatus::Error, }; } else { // 如果不在安装队列中,则检查下载状态 if let Some((aria2_task, status)) = aria2_tasks.get(&task.gid) { response.status = status.clone(); // 计算进度(百分比) if let (Some(completed_length), Some(total_length)) = ( aria2_task["completedLength"] .as_str() .and_then(|s| s.parse::().ok()), aria2_task["totalLength"] .as_str() .and_then(|s| s.parse::().ok()), ) { if total_length > 0.0 { let progress = ((completed_length / total_length) * 100.0) as f32; response.progress = (progress * 100.0).round() / 100.0; } } // 获取下载速度 if let Some(download_speed) = aria2_task["downloadSpeed"].as_str() { if let Ok(speed) = download_speed.parse::() { response.speed = Some(format_speed(speed)); } } // 获取文件大小 if let Some(total_length) = aria2_task["totalLength"].as_str() { if let Ok(size) = total_length.parse::() { response.size = Some(format_size(size)); } } } } result.push(response); } Ok(result) } fn start_aria2(&self) { // 寻找可用端口 let mut port = 5144; while TcpListener::bind(format!("127.0.0.1:{}", port)).is_err() { port += 1; } if let Ok(mut port_guard) = self.aria2_port.lock() { *port_guard = port; } // 启动 aria2c let child = Command::new("aria2c") .args([ "--enable-rpc", "--rpc-listen-all=false", &format!("--rpc-listen-port={}", port), &format!("--user-agent={}", UA), "--follow-metalink=mem", "--dir=/tmp/spark-store", // 设置下载目录为 /tmp/spark-store ]) .spawn() .map_err(|e| format!("启动 aria2 失败: {}", e)) .unwrap(); // 保存进程 ID if let Ok(mut pid_guard) = self.aria2_pid.lock() { *pid_guard = Some(child.id()); } self.aria2_started.store(true, Ordering::SeqCst); } // 添加下载任务 pub async fn add_download( &self, category: String, pkgname: String, filename: String, name: String, ) -> Result<(), String> { // 检查并启动 aria2(如果还没启动) if !self.aria2_started.load(Ordering::SeqCst) { self.start_aria2(); } let task_id = format!("{}/{}", category, pkgname); // 获取metalink文件URL和内容(在获取锁之前完成) let json_server_url = get_json_server_url(); let metalink_url = format!( "{}{}/{}/{}.metalink", json_server_url, category, pkgname, filename ); // 发送请求获取metalink文件 let client = reqwest::Client::new(); let response = client .get(&metalink_url) .header("User-Agent", UA) .send() .await .map_err(|e| format!("获取metalink文件失败: {}", e))?; // 检查响应状态 if !response.status().is_success() { return Err(format!("获取metalink文件失败: HTTP {}", response.status())); } // 获取metalink文件内容 let metalink_content = response .bytes() .await .map_err(|e| format!("读取metalink内容失败: {}", e))?; // 创建Aria2Client并添加下载任务 let port = *self.aria2_port.lock().map_err(|e| e.to_string())?; let aria2_client = Aria2Client::new("127.0.0.1", port, None); // 使用Aria2Client添加metalink下载 let gids = aria2_client .add_metalink(&metalink_content, None, None) .await .map_err(|e| format!("添加下载任务失败: {}", e))?; // 获取一次锁,完成所有状态更新操作 let mut downloads = self.download_queue.lock().map_err(|e| e.to_string())?; // 检查任务是否已存在 if downloads.contains_key(&task_id) { return Ok(()); } // 创建下载任务 let task = DownloadTask { category: category.clone(), pkgname: pkgname.clone(), filename, name, icon: format_icon_url(&category, &pkgname), gid: gids[0].clone(), }; // 添加任务到队列 downloads.insert(task_id, task); Ok(()) } // 暂停下载任务 pub async fn pause_download(&self, category: String, pkgname: String) -> Result<(), String> { let task_id = format!("{}/{}", category, pkgname); // 获取任务信息,并在作用域结束时释放锁 let task_gid = { let downloads = self.download_queue.lock().map_err(|e| e.to_string())?; match downloads.get(&task_id) { Some(task) => task.gid.clone(), None => return Err(format!("找不到下载任务: {}", task_id)), } }; // 如果 aria2 未启动,返回错误 if !self.aria2_started.load(Ordering::SeqCst) { return Err("aria2 未启动".to_string()); } // 创建 Aria2Client 实例 let port = *self.aria2_port.lock().map_err(|e| e.to_string())?; let aria2_client = Aria2Client::new("127.0.0.1", port, None); // 调用 aria2 的 pause 方法 aria2_client .pause(task_gid.as_str()) .await .map_err(|e| format!("暂停下载任务失败: {}", e))?; Ok(()) } // 恢复下载任务 pub async fn resume_download(&self, category: String, pkgname: String) -> Result<(), String> { let task_id = format!("{}/{}", category, pkgname); // 获取任务信息,并在作用域结束时释放锁 let task_gid = { let downloads = self.download_queue.lock().map_err(|e| e.to_string())?; match downloads.get(&task_id) { Some(task) => task.gid.clone(), None => return Err(format!("找不到下载任务: {}", task_id)), } }; // 如果 aria2 未启动,返回错误 if !self.aria2_started.load(Ordering::SeqCst) { return Err("aria2 未启动".to_string()); } // 创建 Aria2Client 实例 let port = *self.aria2_port.lock().map_err(|e| e.to_string())?; let aria2_client = Aria2Client::new("127.0.0.1", port, None); // 调用 aria2 的 unpause 方法 aria2_client .unpause(task_gid.as_str()) .await .map_err(|e| format!("恢复下载任务失败: {}", e))?; Ok(()) } // 取消下载任务 pub async fn cancel_download(&self, category: String, pkgname: String) -> Result<(), String> { let task_id = format!("{}/{}", category, pkgname); // 获取任务信息,并在作用域结束时释放锁 let task_gid = { let mut downloads = self.download_queue.lock().map_err(|e| e.to_string())?; match downloads.get(&task_id) { Some(task) => { // 如果 aria2 未启动,只从队列中移除任务 if !self.aria2_started.load(Ordering::SeqCst) { downloads.remove(&task_id); return Ok(()); } task.gid.clone() } None => return Err(format!("找不到下载任务: {}", task_id)), } }; // 创建 Aria2Client 实例 let port = *self.aria2_port.lock().map_err(|e| e.to_string())?; let aria2_client = Aria2Client::new("127.0.0.1", port, None); // 调用 aria2 的 remove 方法 aria2_client .remove(task_gid.as_str()) .await .map_err(|e| format!("取消下载任务失败: {}", e))?; // 从队列中移除任务 let mut downloads = self.download_queue.lock().map_err(|e| e.to_string())?; downloads.remove(&task_id); Ok(()) } // 关闭 aria2 pub fn shutdown_aria2(&self) { if self.aria2_started.load(Ordering::SeqCst) { // 获取保存的 PID if let Ok(pid_guard) = self.aria2_pid.lock() { if let Some(pid) = *pid_guard { // 使用 kill 命令终止特定的进程 if let Ok(output) = Command::new("kill").arg(pid.to_string()).output() { if output.status.success() { println!("成功关闭 aria2 (PID: {})", pid); } else { eprintln!("关闭 aria2 失败 (PID: {})", pid); } } } } self.aria2_started.store(false, Ordering::SeqCst); } } // 开始安装任务 pub async fn process_install_queue(&self) -> Result<(), String> { // 如果已经有任务在安装,直接返回 if self.installing.load(Ordering::SeqCst) { return Ok(()); } // 查找第一个等待安装的任务 let (task_id, task) = { let mut install_queue = self.install_queue.lock().map_err(|e| e.to_string())?; if let Some((id, task)) = install_queue .iter_mut() .find(|(_, task)| matches!(task.status, InstallStatus::Queued)) .map(|(id, task)| (id.clone(), task.clone())) { // 更新任务状态为安装中 if let Some(task) = install_queue.get_mut(&id) { task.status = InstallStatus::Installing; } (id, task) } else { return Ok(()); } }; // 标记为正在安装 self.installing.store(true, Ordering::SeqCst); // 执行安装操作 let install_result = self.install_package(&task).await; // 更新任务状态 { let mut install_queue = self.install_queue.lock().map_err(|e| e.to_string())?; if let Some(task) = install_queue.get_mut(&task_id) { task.status = match &install_result { Ok(_) => InstallStatus::Installed, Err(_) => InstallStatus::Error, }; } } // 安装完成,重置安装状态 self.installing.store(false, Ordering::SeqCst); // 如果安装失败,返回错误 if let Err(e) = install_result { return Err(format!("安装失败: {}", e)); } Ok(()) } // 实际执行安装的方法 async fn install_package(&self, task: &InstallTask) -> Result<(), String> { println!("开始安装包: {}", task.filepath); // 移除可能存在的引号 let filepath = task.filepath.trim_matches('"'); let output = Command::new("pkexec") .arg("ssinstall") .arg(filepath) .arg("--delete-after-install") .output() .map_err(|e| { println!("安装命令执行失败: {}", e); format!("执行安装命令失败: {}", e) })?; if output.status.success() { println!("安装成功完成"); println!("命令输出: {}", String::from_utf8_lossy(&output.stdout)); Ok(()) } else { let error_msg = String::from_utf8_lossy(&output.stderr).to_string(); println!("安装失败: {}", error_msg); println!("命令输出: {}", String::from_utf8_lossy(&output.stdout)); Err(error_msg) } } // 检查是否有下载任务 pub fn has_downloads(&self) -> bool { if let Ok(downloads) = self.download_queue.lock() { !downloads.is_empty() } else { false } } }