460 lines
16 KiB
Python
460 lines
16 KiB
Python
# operation_history.py
|
|
"""操作历史与撤销管理模块"""
|
|
|
|
import json
|
|
import os
|
|
import time
|
|
import logging
|
|
import subprocess
|
|
from datetime import datetime
|
|
from typing import List, Dict, Optional, Callable
|
|
from dataclasses import dataclass, asdict
|
|
from enum import Enum
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OperationStatus(Enum):
    """Lifecycle states of a recorded operation.

    The value of each member is the Chinese label that gets stored in the
    ``status`` field of an operation record, so the values must stay
    byte-for-byte stable.
    """

    # Queued, not started yet.
    PENDING = "待执行"
    # Currently being executed.
    RUNNING = "执行中"
    # Finished without error.
    SUCCESS = "成功"
    # Finished with an error.
    FAILED = "失败"
    # Cancelled before or during execution.
    CANCELLED = "已取消"
    # Undone after it had been applied.
    ROLLED_BACK = "已撤销"
|
|
|
|
|
|
class OperationType(Enum):
    """Kinds of storage operations that can be recorded.

    Member values are the Chinese display labels stored in the
    ``operation_type`` field of an operation record; member names are used
    as the suffix of generated record ids.
    """

    # --- Partitions ---
    CREATE_PARTITION = "创建分区"
    DELETE_PARTITION = "删除分区"
    FORMAT_PARTITION = "格式化分区"
    MOUNT_PARTITION = "挂载分区"
    UNMOUNT_PARTITION = "卸载分区"

    # --- Software RAID ---
    CREATE_RAID = "创建 RAID"
    DELETE_RAID = "删除 RAID"
    STOP_RAID = "停止 RAID"

    # --- LVM physical volumes / volume groups / logical volumes ---
    CREATE_PV = "创建物理卷"
    DELETE_PV = "删除物理卷"
    CREATE_VG = "创建卷组"
    DELETE_VG = "删除卷组"
    CREATE_LV = "创建逻辑卷"
    DELETE_LV = "删除逻辑卷"
    ACTIVATE_LV = "激活逻辑卷"
    DEACTIVATE_LV = "停用逻辑卷"

    # --- Partition table ---
    WIPE_PARTITION_TABLE = "擦除分区表"
|
|
|
|
|
|
@dataclass
class OperationRecord:
    """One entry of the operation history.

    All fields are plain JSON-serialisable values so that a record can be
    round-tripped through ``dataclasses.asdict`` and ``OperationRecord(**d)``.
    """

    # Unique identifier of the record.
    id: str
    # Creation time as an ISO-8601 string.
    timestamp: str
    # Label of the operation kind (an ``OperationType`` value).
    operation_type: str
    # Device the operation targets.
    device_path: str
    # Label of the current state (an ``OperationStatus`` value).
    status: str
    # Free-form description of the operation.
    details: Dict

    # Optional data attached for undoing the operation.
    rollback_data: Optional[Dict] = None
    # Optional failure description.
    error_message: Optional[str] = None
    # Optional duration of the execution, in seconds.
    execution_time: Optional[float] = None
|
|
|
|
|
|
class OperationHistory:
    """Keep a JSON-backed log of storage operations.

    Records live in ``self.records`` and are written back to
    ``self.history_file`` after every change made through this class.
    """

    def __init__(self, history_file: str = None):
        """Set up storage locations and load any existing history.

        Args:
            history_file: Path of the JSON history file. When ``None`` the
                file ``~/.linux_storage_manager/operation_history.json`` is
                used.

        A ``backups`` directory is created next to the history file.
        """
        if history_file is None:
            # Default location: a dot-directory in the user's home.
            home_dir = os.path.expanduser("~")
            self.history_dir = os.path.join(home_dir, ".linux_storage_manager")
            self.history_file = os.path.join(
                self.history_dir, "operation_history.json"
            )
        else:
            self.history_file = history_file
            # Set in both branches so the attribute always exists; this is
            # "" for a bare file name, which keeps paths cwd-relative.
            self.history_dir = os.path.dirname(history_file)

        self.backup_dir = os.path.join(self.history_dir, "backups")
        # makedirs also creates history_dir as a parent when it is missing.
        os.makedirs(self.backup_dir, exist_ok=True)

        self.records: List[OperationRecord] = []
        self.pending_operations: List[OperationRecord] = []
        self._load_history()

    def _load_history(self):
        """Load records from ``self.history_file``.

        A missing file, or any error while reading or parsing it, results in
        an empty record list; errors are logged, not raised.
        """
        self.records = []
        if not os.path.exists(self.history_file):
            return

        try:
            with open(self.history_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            self.records = [OperationRecord(**record) for record in data]
            logger.info(f"已加载 {len(self.records)} 条历史记录")
        except Exception as e:
            logger.error(f"加载历史记录失败: {e}")
            self.records = []

    def _save_history(self):
        """Write all records to ``self.history_file``.

        The data is written to a sibling temporary file first and then moved
        over the real file, so a failed dump (for example non-serialisable
        ``details``) no longer truncates the existing history. Errors are
        logged, not raised.
        """
        tmp_path = f"{self.history_file}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump([asdict(r) for r in self.records], f,
                          ensure_ascii=False, indent=2)
            os.replace(tmp_path, self.history_file)
        except Exception as e:
            logger.error(f"保存历史记录失败: {e}")
            # Best effort: do not leave a half-written temp file behind.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def add_record(self, operation_type: OperationType, device_path: str,
                   details: Dict, rollback_data: Optional[Dict] = None,
                   status: OperationStatus = OperationStatus.SUCCESS) -> str:
        """Append a new record and persist the history.

        Args:
            operation_type: Kind of operation being recorded.
            device_path: Device the operation targets.
            details: Free-form description stored with the record.
            rollback_data: Optional data stored for undoing the operation.
            status: Initial status of the record. Defaults to ``SUCCESS``,
                which is what this method always stored before the
                parameter existed. A ``PENDING`` record is also added to
                ``self.pending_operations``.

        Returns:
            The id of the new record.
        """
        # The base id has millisecond resolution, so two records of the same
        # type created in the same millisecond would collide; add a numeric
        # suffix until the id is unique.
        base_id = f"{int(time.time() * 1000)}_{operation_type.name}"
        existing_ids = {r.id for r in self.records}
        record_id = base_id
        suffix = 1
        while record_id in existing_ids:
            record_id = f"{base_id}_{suffix}"
            suffix += 1

        record = OperationRecord(
            id=record_id,
            timestamp=datetime.now().isoformat(),
            operation_type=operation_type.value,
            device_path=device_path,
            status=status.value,
            details=details,
            rollback_data=rollback_data,
        )
        self.records.append(record)
        if status is OperationStatus.PENDING:
            self.pending_operations.append(record)

        self._save_history()
        logger.info(f"添加操作记录: {record_id} - {operation_type.value}")
        return record_id

    def update_status(self, record_id: str, status: OperationStatus,
                      error_message: str = None, execution_time: float = None):
        """Change the status of the record with id ``record_id``.

        Args:
            record_id: Id returned by :meth:`add_record`.
            status: New status.
            error_message: Stored on the record when not ``None``.
            execution_time: Stored on the record when not ``None``; a value
                of ``0.0`` is recorded as well.

        Unknown ids are logged and otherwise ignored. When the new status is
        a finished one, the record is dropped from
        ``self.pending_operations``.
        """
        record = next((r for r in self.records if r.id == record_id), None)
        if record is None:
            logger.warning(f"未找到操作记录: {record_id}")
            return

        record.status = status.value
        # Compare against None so that falsy-but-valid values such as an
        # execution time of 0.0 are not silently discarded.
        if error_message is not None:
            record.error_message = error_message
        if execution_time is not None:
            record.execution_time = execution_time

        finished = (
            OperationStatus.SUCCESS,
            OperationStatus.FAILED,
            OperationStatus.CANCELLED,
            OperationStatus.ROLLED_BACK,
        )
        if status in finished:
            self.pending_operations = [
                r for r in self.pending_operations if r.id != record_id
            ]

        self._save_history()
        logger.info(f"更新操作状态: {record_id} -> {status.value}")

    def get_recent_records(self, limit: int = 50) -> List[OperationRecord]:
        """Return up to ``limit`` records, newest timestamp first."""
        return sorted(self.records, key=lambda r: r.timestamp, reverse=True)[:limit]

    def get_pending_operations(self) -> List[OperationRecord]:
        """Return the list of records currently tracked as pending."""
        return self.pending_operations

    def export_history(self, file_path: str):
        """Write all records as JSON to ``file_path``.

        Returns:
            ``True`` on success, ``False`` if writing failed (the error is
            logged).
        """
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump([asdict(r) for r in self.records], f,
                          ensure_ascii=False, indent=2)
            return True
        except Exception as e:
            logger.error(f"导出历史记录失败: {e}")
            return False

    def clear_history(self, days: int = None):
        """Remove records and persist the result.

        Args:
            days: When ``None`` every record is removed. Otherwise only
                records whose timestamp is older than ``days`` days are
                removed.
        """
        if days is None:
            self.records = []
        else:
            cutoff = time.time() - (days * 24 * 60 * 60)
            self.records = [
                r for r in self.records if self._is_newer_than(r, cutoff)
            ]
        self._save_history()

    @staticmethod
    def _is_newer_than(record: OperationRecord, cutoff: float) -> bool:
        """Return whether ``record`` should survive a cleanup at ``cutoff``.

        Records whose timestamp cannot be parsed are kept, so one malformed
        entry neither aborts the cleanup nor gets deleted by accident.
        """
        try:
            return datetime.fromisoformat(record.timestamp).timestamp() > cutoff
        except (TypeError, ValueError):
            logger.warning(f"无法解析记录时间戳: {record.id}")
            return True
|
|
|
|
|
|
class PartitionTableBackup:
    """Create, list, restore and delete partition-table dumps.

    Dumps are produced with ``sfdisk -d`` and stored as text files named
    ``<device>_pt_<YYYYmmdd_HHMMSS>.bin`` inside ``self.backup_dir``.
    """

    # File-name pieces shared by the writer and the listing parser.
    _BACKUP_SUFFIX = ".bin"
    _NAME_SEPARATOR = "_pt_"

    def __init__(self, backup_dir: str = None):
        """Choose the backup directory and make sure it exists.

        Args:
            backup_dir: Directory for the dump files. When ``None``,
                ``~/.linux_storage_manager/partition_backups`` is used.
        """
        if backup_dir is None:
            home_dir = os.path.expanduser("~")
            self.backup_dir = os.path.join(
                home_dir, ".linux_storage_manager", "partition_backups"
            )
        else:
            self.backup_dir = backup_dir
        os.makedirs(self.backup_dir, exist_ok=True)

    def backup_partition_table(self, device_path: str) -> Optional[str]:
        """Dump the partition table of ``device_path`` to a new file.

        Runs ``sudo sfdisk -d <device_path>`` with a 30 second timeout.

        Returns:
            The path of the written backup file, or ``None`` if the command
            or the write failed (the error is logged).
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        device_name = os.path.basename(device_path)
        backup_file = os.path.join(
            self.backup_dir,
            f"{device_name}_pt_{timestamp}.bin"
        )

        try:
            result = subprocess.run(
                ["sudo", "sfdisk", "-d", device_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                check=True,
                timeout=30
            )

            # Same encoding as the subprocess output so the dump round-trips
            # regardless of the process locale.
            with open(backup_file, 'w', encoding='utf-8') as f:
                f.write(result.stdout)

            logger.info(f"分区表已备份: {backup_file}")
            return backup_file

        except subprocess.CalledProcessError as e:
            # Include the tool's own message, as the restore path does.
            logger.error(f"备份分区表失败: {e.stderr or e}")
            return None
        except Exception as e:
            logger.error(f"备份分区表出错: {e}")
            return None

    def restore_partition_table(self, device_path: str, backup_file: str) -> bool:
        """Write the dump in ``backup_file`` back to ``device_path``.

        Feeds the file content to ``sudo sfdisk <device_path>`` on standard
        input with a 30 second timeout.

        Returns:
            ``True`` when sfdisk exited successfully, ``False`` when the
            backup file is missing or anything failed (the error is logged).
        """
        try:
            if not os.path.exists(backup_file):
                logger.error(f"备份文件不存在: {backup_file}")
                return False

            with open(backup_file, 'r', encoding='utf-8') as f:
                partition_data = f.read()

            # Only ``input`` is passed: subprocess.run() raises ValueError
            # when ``input`` and ``stdin`` are both given, which previously
            # made every restore fail before sfdisk was even started.
            subprocess.run(
                ["sudo", "sfdisk", device_path],
                input=partition_data,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                check=True,
                timeout=30
            )

            logger.info(f"分区表已从 {backup_file} 恢复到 {device_path}")
            return True

        except subprocess.CalledProcessError as e:
            logger.error(f"恢复分区表失败: {e.stderr}")
            return False
        except Exception as e:
            logger.error(f"恢复分区表出错: {e}")
            return False

    def list_backups(self, device_path: str = None) -> List[Dict]:
        """List the backup files found in ``self.backup_dir``.

        Args:
            device_path: When given, only backups whose device name equals
                the base name of this path are returned.

        Returns:
            A list of dicts with the keys ``file``, ``device``,
            ``timestamp``, ``size`` and ``created``, sorted by ``created``
            (file modification time) with the newest first. Files that do
            not follow the ``<device>_pt_<timestamp>.bin`` pattern are
            skipped.
        """
        backups = []
        wanted_device = None if device_path is None else os.path.basename(device_path)

        try:
            for filename in os.listdir(self.backup_dir):
                if not filename.endswith(self._BACKUP_SUFFIX):
                    continue

                parts = filename.rsplit(self._NAME_SEPARATOR, 1)
                if len(parts) != 2:
                    continue
                device_name = parts[0]
                timestamp = parts[1][:-len(self._BACKUP_SUFFIX)]

                if wanted_device is not None and device_name != wanted_device:
                    continue

                filepath = os.path.join(self.backup_dir, filename)
                try:
                    stat = os.stat(filepath)
                except OSError as e:
                    # A single unreadable or vanished file must not hide
                    # all the other backups.
                    logger.warning(f"无法读取备份文件信息: {filepath}: {e}")
                    continue

                backups.append({
                    'file': filepath,
                    'device': device_name,
                    'timestamp': timestamp,
                    'size': stat.st_size,
                    'created': datetime.fromtimestamp(stat.st_mtime).isoformat()
                })

        except Exception as e:
            logger.error(f"列出备份失败: {e}")

        return sorted(backups, key=lambda x: x['created'], reverse=True)

    def delete_backup(self, backup_file: str) -> bool:
        """Delete ``backup_file``.

        Returns:
            ``True`` if the file existed and was removed, ``False`` if it
            did not exist or could not be removed (the error is logged).
        """
        try:
            if os.path.exists(backup_file):
                os.remove(backup_file)
                logger.info(f"备份已删除: {backup_file}")
                return True
            return False
        except Exception as e:
            logger.error(f"删除备份失败: {e}")
            return False
|
|
|
|
|
|
class OperationQueue:
    """Run queued operations one after another and track them in a history.

    Each queued operation is a dict with the keys ``id``, ``type``,
    ``device_path``, ``execute_func`` and ``status``. Execution is
    synchronous: :meth:`start_execution` returns once the queue is drained
    or has been cleared.
    """

    def __init__(self, history: OperationHistory):
        """Create an empty queue that reports to ``history``."""
        self.history = history
        self.queue: List[Dict] = []
        self.running = False
        self.current_operation = None
        self.callbacks: Dict[str, List[Callable]] = {
            'on_start': [],
            'on_complete': [],
            'on_error': [],
            'on_cancel': []
        }

    def add_operation(self, operation_type: OperationType, device_path: str,
                      execute_func: Callable, details: Dict,
                      rollback_data: Optional[Dict] = None) -> str:
        """Record an operation in the history and append it to the queue.

        Args:
            operation_type: Kind of operation.
            device_path: Device the operation targets.
            execute_func: Zero-argument callable that performs the work; a
                truthy return value means success.
            details: Free-form description stored in the history record.
            rollback_data: Optional undo data stored in the history record.

        Returns:
            The id of the history record, which is also the queue entry id.
        """
        record_id = self.history.add_record(
            operation_type, device_path, details, rollback_data
        )
        # add_record() stores new records as SUCCESS; the operation has not
        # run yet, so correct the persisted status right away.
        self.history.update_status(record_id, OperationStatus.PENDING)

        operation = {
            'id': record_id,
            'type': operation_type,
            'device_path': device_path,
            'execute_func': execute_func,
            'status': OperationStatus.PENDING
        }
        self.queue.append(operation)

        logger.info(f"操作已加入队列: {record_id}")
        return record_id

    def start_execution(self):
        """Start draining the queue; does nothing if it is already running."""
        if self.running:
            return

        self.running = True
        self._execute_next()

    def _execute_next(self):
        """Run the operation at the head of the queue, then the following ones.

        The first operation is always run; further operations are run while
        ``self.running`` is still true and the queue is not empty. This is a
        loop instead of recursion so the stack depth no longer grows with
        the number of queued operations. ``self.running`` is false when the
        method returns.
        """
        if not self.queue:
            self.running = False
            return

        try:
            while True:
                self._run_operation(self.queue[0])
                if not (self.running and self.queue):
                    break
        finally:
            self.running = False

    def _run_operation(self, operation: Dict):
        """Execute one operation, record its outcome and fire callbacks."""
        self.current_operation = operation
        try:
            self.history.update_status(operation['id'], OperationStatus.RUNNING)
            operation['status'] = OperationStatus.RUNNING
            self._fire_callbacks('on_start', operation)

            # Only the operation itself is inside this try block, so that a
            # failing callback can no longer turn a successful operation
            # into a failed one.
            start_time = time.time()
            error_message = None
            try:
                succeeded = bool(operation['execute_func']())
                if not succeeded:
                    error_message = "操作执行失败"
            except Exception as e:
                succeeded = False
                error_message = str(e)
            execution_time = time.time() - start_time

            # NOTE(review): a cancel_current() issued while execute_func was
            # running is overwritten here, as in the previous version.
            if succeeded:
                operation['status'] = OperationStatus.SUCCESS
                self.history.update_status(
                    operation['id'],
                    OperationStatus.SUCCESS,
                    execution_time=execution_time
                )
                self._fire_callbacks('on_complete', operation, True)
            else:
                operation['status'] = OperationStatus.FAILED
                self.history.update_status(
                    operation['id'],
                    OperationStatus.FAILED,
                    error_message=error_message,
                    execution_time=execution_time
                )
                self._fire_callbacks('on_error', operation, error_message)
        finally:
            # clear_queue() may already have emptied the queue.
            if operation in self.queue:
                self.queue.remove(operation)
            self.current_operation = None

    def _fire_callbacks(self, event: str, *args):
        """Call every callback registered for ``event`` with ``args``.

        Exceptions raised by a callback are logged and do not stop the
        remaining callbacks or the queue.
        """
        for callback in list(self.callbacks[event]):
            try:
                callback(*args)
            except Exception:
                logger.exception(f"回调执行出错: {event}")

    def cancel_current(self):
        """Mark the currently running operation as cancelled.

        This only updates the status and fires the ``on_cancel`` callbacks;
        it does not interrupt ``execute_func``.
        """
        operation = self.current_operation
        if not operation:
            return

        self.history.update_status(operation['id'], OperationStatus.CANCELLED)
        operation['status'] = OperationStatus.CANCELLED
        self._fire_callbacks('on_cancel', operation)

    def clear_queue(self):
        """Cancel every pending operation, empty the queue and stop running."""
        for op in self.queue:
            if op['status'] == OperationStatus.PENDING:
                self.history.update_status(op['id'], OperationStatus.CANCELLED)
        self.queue = []
        self.running = False

    def register_callback(self, event: str, callback: Callable):
        """Register ``callback`` for ``event``.

        Known events are ``on_start``, ``on_complete``, ``on_error`` and
        ``on_cancel``. Unknown event names are ignored, with a warning in
        the log.
        """
        if event in self.callbacks:
            self.callbacks[event].append(callback)
        else:
            logger.warning(f"未知的回调事件: {event}")

    def get_queue_status(self) -> Dict:
        """Return the ``running`` flag, pending count and current operation."""
        return {
            'running': self.running,
            'pending_count': len(self.get_pending_operations()),
            'current_operation': self.current_operation
        }

    def get_pending_operations(self) -> List[Dict]:
        """Return the queued operations whose status is still ``PENDING``."""
        return [o for o in self.queue if o['status'] == OperationStatus.PENDING]
|