# operation_history.py """操作历史与撤销管理模块""" import json import os import time import logging import subprocess from datetime import datetime from typing import List, Dict, Optional, Callable from dataclasses import dataclass, asdict from enum import Enum logger = logging.getLogger(__name__) class OperationStatus(Enum): """操作状态""" PENDING = "待执行" RUNNING = "执行中" SUCCESS = "成功" FAILED = "失败" CANCELLED = "已取消" ROLLED_BACK = "已撤销" class OperationType(Enum): """操作类型""" CREATE_PARTITION = "创建分区" DELETE_PARTITION = "删除分区" FORMAT_PARTITION = "格式化分区" MOUNT_PARTITION = "挂载分区" UNMOUNT_PARTITION = "卸载分区" CREATE_RAID = "创建 RAID" DELETE_RAID = "删除 RAID" STOP_RAID = "停止 RAID" CREATE_PV = "创建物理卷" DELETE_PV = "删除物理卷" CREATE_VG = "创建卷组" DELETE_VG = "删除卷组" CREATE_LV = "创建逻辑卷" DELETE_LV = "删除逻辑卷" ACTIVATE_LV = "激活逻辑卷" DEACTIVATE_LV = "停用逻辑卷" WIPE_PARTITION_TABLE = "擦除分区表" @dataclass class OperationRecord: """操作记录""" id: str timestamp: str operation_type: str device_path: str status: str details: Dict rollback_data: Optional[Dict] = None error_message: Optional[str] = None execution_time: Optional[float] = None class OperationHistory: """操作历史管理器""" def __init__(self, history_file: str = None): if history_file is None: # 默认存储在用户目录 home_dir = os.path.expanduser("~") self.history_dir = os.path.join(home_dir, ".linux_storage_manager") os.makedirs(self.history_dir, exist_ok=True) self.history_file = os.path.join(self.history_dir, "operation_history.json") self.backup_dir = os.path.join(self.history_dir, "backups") os.makedirs(self.backup_dir, exist_ok=True) else: self.history_file = history_file self.backup_dir = os.path.join(os.path.dirname(history_file), "backups") os.makedirs(self.backup_dir, exist_ok=True) self.records: List[OperationRecord] = [] self.pending_operations: List[OperationRecord] = [] self._load_history() def _load_history(self): """加载历史记录""" if os.path.exists(self.history_file): try: with open(self.history_file, 'r', encoding='utf-8') as f: data = json.load(f) self.records = [OperationRecord(**record) for record in data] logger.info(f"已加载 {len(self.records)} 条历史记录") except Exception as e: logger.error(f"加载历史记录失败: {e}") self.records = [] else: self.records = [] def _save_history(self): """保存历史记录""" try: with open(self.history_file, 'w', encoding='utf-8') as f: json.dump([asdict(r) for r in self.records], f, ensure_ascii=False, indent=2) except Exception as e: logger.error(f"保存历史记录失败: {e}") def add_record(self, operation_type: OperationType, device_path: str, details: Dict, rollback_data: Optional[Dict] = None) -> str: """添加操作记录""" record_id = f"{int(time.time() * 1000)}_{operation_type.name}" record = OperationRecord( id=record_id, timestamp=datetime.now().isoformat(), operation_type=operation_type.value, device_path=device_path, status=OperationStatus.SUCCESS.value, details=details, rollback_data=rollback_data ) self.records.append(record) self._save_history() logger.info(f"添加操作记录: {record_id} - {operation_type.value}") return record_id def update_status(self, record_id: str, status: OperationStatus, error_message: str = None, execution_time: float = None): """更新操作状态""" for record in self.records: if record.id == record_id: record.status = status.value if error_message: record.error_message = error_message if execution_time: record.execution_time = execution_time # 从待执行列表中移除 if status in [OperationStatus.SUCCESS, OperationStatus.FAILED, OperationStatus.CANCELLED]: self.pending_operations = [ r for r in self.pending_operations if r.id != record_id ] self._save_history() logger.info(f"更新操作状态: {record_id} -> {status.value}") break def get_recent_records(self, limit: int = 50) -> List[OperationRecord]: """获取最近的操作记录""" return sorted(self.records, key=lambda r: r.timestamp, reverse=True)[:limit] def get_pending_operations(self) -> List[OperationRecord]: """获取待执行的操作""" return self.pending_operations def export_history(self, file_path: str): """导出历史记录""" try: with open(file_path, 'w', encoding='utf-8') as f: json.dump([asdict(r) for r in self.records], f, ensure_ascii=False, indent=2) return True except Exception as e: logger.error(f"导出历史记录失败: {e}") return False def clear_history(self, days: int = None): """清理历史记录""" if days is None: # 清空所有记录 self.records = [] else: # 清理指定天数之前的记录 cutoff = time.time() - (days * 24 * 60 * 60) self.records = [ r for r in self.records if datetime.fromisoformat(r.timestamp).timestamp() > cutoff ] self._save_history() class PartitionTableBackup: """分区表备份管理器""" def __init__(self, backup_dir: str = None): if backup_dir is None: home_dir = os.path.expanduser("~") self.backup_dir = os.path.join( home_dir, ".linux_storage_manager", "partition_backups" ) else: self.backup_dir = backup_dir os.makedirs(self.backup_dir, exist_ok=True) def backup_partition_table(self, device_path: str) -> Optional[str]: """备份分区表""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") device_name = os.path.basename(device_path) backup_file = os.path.join( self.backup_dir, f"{device_name}_pt_{timestamp}.bin" ) try: # 使用 sfdisk 备份分区表 result = subprocess.run( ["sudo", "sfdisk", "-d", device_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8', check=True, timeout=30 ) with open(backup_file, 'w') as f: f.write(result.stdout) logger.info(f"分区表已备份: {backup_file}") return backup_file except subprocess.CalledProcessError as e: logger.error(f"备份分区表失败: {e}") return None except Exception as e: logger.error(f"备份分区表出错: {e}") return None def restore_partition_table(self, device_path: str, backup_file: str) -> bool: """恢复分区表""" try: if not os.path.exists(backup_file): logger.error(f"备份文件不存在: {backup_file}") return False with open(backup_file, 'r') as f: partition_data = f.read() # 使用 sfdisk 恢复分区表 result = subprocess.run( ["sudo", "sfdisk", device_path], input=partition_data, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, encoding='utf-8', check=True, timeout=30 ) logger.info(f"分区表已从 {backup_file} 恢复到 {device_path}") return True except subprocess.CalledProcessError as e: logger.error(f"恢复分区表失败: {e.stderr}") return False except Exception as e: logger.error(f"恢复分区表出错: {e}") return False def list_backups(self, device_path: str = None) -> List[Dict]: """列出可用的备份""" backups = [] try: for filename in os.listdir(self.backup_dir): if filename.endswith('.bin'): filepath = os.path.join(self.backup_dir, filename) stat = os.stat(filepath) # 解析文件名 parts = filename.rsplit('_pt_', 1) if len(parts) == 2: device_name = parts[0] timestamp = parts[1].replace('.bin', '') if device_path is None or device_name == os.path.basename(device_path): backups.append({ 'file': filepath, 'device': device_name, 'timestamp': timestamp, 'size': stat.st_size, 'created': datetime.fromtimestamp(stat.st_mtime).isoformat() }) except Exception as e: logger.error(f"列出备份失败: {e}") return sorted(backups, key=lambda x: x['created'], reverse=True) def delete_backup(self, backup_file: str) -> bool: """删除备份""" try: if os.path.exists(backup_file): os.remove(backup_file) logger.info(f"备份已删除: {backup_file}") return True return False except Exception as e: logger.error(f"删除备份失败: {e}") return False class OperationQueue: """操作队列管理器""" def __init__(self, history: OperationHistory): self.history = history self.queue: List[Dict] = [] self.running = False self.current_operation = None self.callbacks: Dict[str, List[Callable]] = { 'on_start': [], 'on_complete': [], 'on_error': [], 'on_cancel': [] } def add_operation(self, operation_type: OperationType, device_path: str, execute_func: Callable, details: Dict, rollback_data: Dict = None) -> str: """添加操作到队列""" record_id = self.history.add_record( operation_type, device_path, details, rollback_data ) operation = { 'id': record_id, 'type': operation_type, 'device_path': device_path, 'execute_func': execute_func, 'status': OperationStatus.PENDING } self.queue.append(operation) logger.info(f"操作已加入队列: {record_id}") return record_id def start_execution(self): """开始执行队列""" if self.running: return self.running = True self._execute_next() def _execute_next(self): """执行下一个操作""" if not self.queue: self.running = False return operation = self.queue[0] self.current_operation = operation # 更新状态为运行中 self.history.update_status( operation['id'], OperationStatus.RUNNING ) operation['status'] = OperationStatus.RUNNING # 触发开始回调 for callback in self.callbacks['on_start']: callback(operation) # 执行操作 start_time = time.time() try: result = operation['execute_func']() execution_time = time.time() - start_time if result: operation['status'] = OperationStatus.SUCCESS self.history.update_status( operation['id'], OperationStatus.SUCCESS, execution_time=execution_time ) # 触发完成回调 for callback in self.callbacks['on_complete']: callback(operation, True) else: operation['status'] = OperationStatus.FAILED self.history.update_status( operation['id'], OperationStatus.FAILED, error_message="操作执行失败", execution_time=execution_time ) # 触发错误回调 for callback in self.callbacks['on_error']: callback(operation, "操作执行失败") except Exception as e: execution_time = time.time() - start_time operation['status'] = OperationStatus.FAILED self.history.update_status( operation['id'], OperationStatus.FAILED, error_message=str(e), execution_time=execution_time ) # 触发错误回调 for callback in self.callbacks['on_error']: callback(operation, str(e)) finally: # 从队列中移除 if operation in self.queue: self.queue.remove(operation) self.current_operation = None # 继续执行下一个 if self.running and self.queue: self._execute_next() else: self.running = False def cancel_current(self): """取消当前操作""" if self.current_operation: self.history.update_status( self.current_operation['id'], OperationStatus.CANCELLED ) self.current_operation['status'] = OperationStatus.CANCELLED # 触发取消回调 for callback in self.callbacks['on_cancel']: callback(self.current_operation) def clear_queue(self): """清空队列""" for op in self.queue: if op['status'] == OperationStatus.PENDING: self.history.update_status( op['id'], OperationStatus.CANCELLED ) self.queue = [] self.running = False def register_callback(self, event: str, callback: Callable): """注册回调函数""" if event in self.callbacks: self.callbacks[event].append(callback) def get_queue_status(self) -> Dict: """获取队列状态""" return { 'running': self.running, 'pending_count': len([o for o in self.queue if o['status'] == OperationStatus.PENDING]), 'current_operation': self.current_operation } def get_pending_operations(self) -> List[Dict]: """获取待执行的操作列表""" return [o for o in self.queue if o['status'] == OperationStatus.PENDING]