# backend.py
import subprocess
import os
import json
import time
import re
import shutil
from typing import Tuple, List, Dict, Optional

# Configuration constants
COMMAND_TIMEOUT = 300  # Command execution timeout (seconds)
DEFAULT_GRUB_CONFIG_PATHS = [
    "/boot/grub/grub.cfg",
    "/boot/grub2/grub.cfg",
]

# Accepted device path shapes. They are applied with fullmatch() so that a
# trailing newline (which `$` tolerates with re.match) cannot slip through.
_DEVICE_PATH_PATTERNS = (
    # /dev/sda, /dev/sda1, /dev/nvme0n1p1, /dev/mmcblk0p1, /dev/dm-0, ...
    re.compile(r'/dev/[a-zA-Z0-9_-]+'),
    # /dev/mapper/xxx (LVM logical volumes)
    re.compile(r'/dev/mapper/[a-zA-Z0-9_-]+'),
    # LVM names may also contain '+' and '.'; the first character is
    # restricted so that "." and ".." can never match.
    re.compile(r'/dev/mapper/[a-zA-Z0-9_+][a-zA-Z0-9_+.-]*'),
)

# Mount point prefixes that are treated as belonging to the running Live system.
_LIVE_MOUNT_PREFIXES = ("/run/media", "/cdrom", "/live")

# GPT partition type GUID of an EFI System Partition.
_EFI_SYSTEM_PARTITION_GUID = "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"

# efivarfs lives on its own mount below /sys; a plain (non-recursive) bind of
# /sys does not carry it into the chroot.
_EFIVARS_PATH = "/sys/firmware/efi/efivars"

# Distribution families whose GRUB tools are shipped with a "grub2-" prefix.
_GRUB2_DISTROS = ("centos", "fedora", "opensuse")


def validate_device_path(path: str) -> bool:
    """
    Check that a device path has an accepted format (guards against
    command injection).

    Accepted shapes:
      - /dev/sda, /dev/sda1
      - /dev/nvme0n1, /dev/nvme0n1p1
      - /dev/mmcblk0, /dev/mmcblk0p1
      - /dev/mapper/xxx (LVM logical volumes)
      - /dev/dm-0 and similar

    :param path: Device path (e.g. /dev/sda1, /dev/mapper/cl-root)
    :return: True if the whole string matches one of the accepted shapes
    """
    if not isinstance(path, str) or not path:
        return False
    return any(pattern.fullmatch(path) is not None for pattern in _DEVICE_PATH_PATTERNS)


def run_command(command: List[str], description: str = "执行命令", cwd: Optional[str] = None,
                shell: bool = False, timeout: int = COMMAND_TIMEOUT) -> Tuple[bool, str, str]:
    """
    Run a system command and capture its output.

    Every failure mode is converted into a ``(False, stdout, stderr)`` result;
    this function does not raise.

    :param command: Command as an argument list (e.g. ["sudo", "lsblk"])
    :param description: Human-readable description used in log output
    :param cwd: Working directory for the command
    :param shell: Passed through to subprocess.run
    :param timeout: Timeout in seconds
    :return: (success, stdout, stderr)
    """
    try:
        print(f"[Backend] {description}: {' '.join(command)}")
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,  # non-zero exit status raises CalledProcessError
            cwd=cwd,
            shell=shell,
            timeout=timeout
        )
        print(f"[Backend] 命令成功: {description}")
        return True, result.stdout, result.stderr
    except subprocess.CalledProcessError as e:
        print(f"[Backend] 命令失败: {description}")
        print(f"[Backend] 错误输出: {e.stderr}")
        return False, e.stdout, e.stderr
    except subprocess.TimeoutExpired:
        print(f"[Backend] 命令超时: {description}")
        return False, "", f"命令执行超时(超过 {timeout} 秒)"
    except FileNotFoundError:
        print(f"[Backend] 命令未找到: {' '.join(command)}")
        return False, "", f"命令未找到: {command[0]}"
    except Exception as e:
        # Boundary handler: callers rely on a result tuple, never an exception.
        print(f"[Backend] 发生未知错误: {e}")
        return False, "", str(e)


def _is_live_mountpoint(mountpoint: Optional[str]) -> bool:
    """Return True if the mount point looks like it belongs to the Live system."""
    if not mountpoint:
        return False
    return mountpoint == "/" or mountpoint.startswith(_LIVE_MOUNT_PREFIXES)


def _partition_info(block_device: Dict, name: str) -> Dict:
    """Build the partition record shared by plain partitions and LVM/DM volumes."""
    # lsblk -J emits JSON null for empty columns, so the key is present with a
    # None value and dict.get(key, default) would never apply the default.
    return {
        "name": name,
        "fstype": block_device.get("fstype") or "unknown",
        "size": block_device.get("size") or "unknown",
        "mountpoint": block_device.get("mountpoint"),
        "uuid": block_device.get("uuid"),
        "partlabel": block_device.get("partlabel"),
        "label": block_device.get("label"),
        "parttype": (block_device.get("parttype") or "").upper()  # partition type GUID
    }


def _process_partition(block_device: Dict, all_disks: List, all_partitions: List,
                       all_efi_partitions: List) -> None:
    """
    Process one lsblk block device entry and recurse into its children
    (partitions, LVM logical volumes, ...).

    Results are appended to the three lists passed in. Entries whose type is
    not disk/part/lvm/dm are ignored together with their children.
    NOTE(review): this means volumes stacked on e.g. "crypt" or "raid" devices
    are not discovered - confirm whether that is intended.

    :param block_device: One entry of lsblk's JSON output
    :param all_disks: Receives {"name": ...} for each disk
    :param all_partitions: Receives a record for each usable partition/volume
    :param all_efi_partitions: Receives records recognised as EFI System Partitions
    """
    dev_type = block_device.get("type")
    if dev_type not in ("disk", "part", "lvm", "dm"):
        return

    dev_name = f"/dev/{block_device.get('name', '')}"

    if dev_type == "disk":
        all_disks.append({"name": dev_name})

    elif dev_type == "part":
        # Partitions of the Live system itself are not offered as targets,
        # but their children (e.g. LVM) are still visited below.
        if not _is_live_mountpoint(block_device.get("mountpoint")):
            part_info = _partition_info(block_device, dev_name)
            all_partitions.append(part_info)

            # EFI System Partition: vfat plus either an "EFI" partition label,
            # an "EFI" filesystem label, or the ESP partition type GUID.
            is_vfat = part_info["fstype"] == "vfat"
            has_efi_label = part_info["partlabel"] and "EFI" in part_info["partlabel"].upper()
            has_efi_name = part_info["label"] and "EFI" in part_info["label"].upper()
            is_efi_type = part_info["parttype"] == _EFI_SYSTEM_PARTITION_GUID
            if is_vfat and (has_efi_label or has_efi_name or is_efi_type):
                all_efi_partitions.append(part_info)

    else:
        # LVM logical volumes and device-mapper devices. These are always
        # listed, even when mounted at a Live-system mount point.
        lv_name = block_device.get("name", "")
        # Names such as "cl-root" are exposed under /dev/mapper; "dm-N" names
        # are addressed directly under /dev.
        if "-" in lv_name and not lv_name.startswith("dm-"):
            device_path = f"/dev/mapper/{lv_name}"
        else:
            device_path = dev_name

        part_info = _partition_info(block_device, device_path)
        part_info["is_lvm"] = True
        part_info["dm_name"] = lv_name
        all_partitions.append(part_info)

    for child in block_device.get("children") or []:
        _process_partition(child, all_disks, all_partitions, all_efi_partitions)


def scan_partitions() -> Tuple[bool, List, List, List, str]:
    """
    Scan all disks and partitions via lsblk and return structured information.

    :return: (success, disks, partitions, efi_partitions, error_message)
    """
    success, stdout, stderr = run_command(
        ["sudo", "lsblk", "-J", "-o", "NAME,FSTYPE,SIZE,MOUNTPOINT,UUID,PARTLABEL,LABEL,TYPE,PARTTYPE"],
        "扫描分区"
    )

    if not success:
        return False, [], [], [], stderr

    try:
        data = json.loads(stdout)
        all_disks = []
        all_partitions = []
        all_efi_partitions = []

        for block_device in data.get("blockdevices", []):
            _process_partition(block_device, all_disks, all_partitions, all_efi_partitions)

        return True, all_disks, all_partitions, all_efi_partitions, ""

    except json.JSONDecodeError as e:
        return False, [], [], [], f"解析lsblk输出失败: {e}"
    except Exception as e:
        # Boundary handler: report malformed lsblk data instead of crashing.
        return False, [], [], [], f"处理分区数据时发生未知错误: {e}"


def _cleanup_partial_mount(mount_point: str, mounted_boot: bool, mounted_efi: bool,
                           bind_paths_mounted: List[str]) -> None:
    """
    Roll back a partially completed mount sequence (best effort).

    :param mount_point: Temporary mount point of the target root
    :param mounted_boot: Whether the separate /boot partition was mounted
    :param mounted_efi: Whether the EFI partition was mounted
    :param bind_paths_mounted: Bind-mount targets that were mounted, in mount order
    """
    print(f"[Backend] 清理部分挂载资源: {mount_point}")

    # Unmount bind mounts in reverse order so nested mounts go first.
    for target_path in reversed(bind_paths_mounted):
        if os.path.ismount(target_path):
            run_command(["sudo", "umount", target_path], f"清理卸载绑定 {target_path}")

    # EFI partition (nested below /boot, so it goes before /boot).
    if mounted_efi:
        efi_mount_point = os.path.join(mount_point, "boot/efi")
        if os.path.ismount(efi_mount_point):
            run_command(["sudo", "umount", efi_mount_point], "清理卸载EFI分区")

    # /boot partition.
    if mounted_boot:
        boot_mount_point = os.path.join(mount_point, "boot")
        if os.path.ismount(boot_mount_point):
            run_command(["sudo", "umount", boot_mount_point], "清理卸载/boot分区")

    # Root partition.
    if os.path.ismount(mount_point):
        run_command(["sudo", "umount", mount_point], "清理卸载根分区")

    # Remove the temporary directory; rmdir only succeeds if it is empty.
    if os.path.exists(mount_point) and not os.path.ismount(mount_point):
        try:
            os.rmdir(mount_point)
        except OSError:
            pass


def _mount_at(source: str, target: str, description: str, bind: bool = False) -> Tuple[bool, str]:
    """Create ``target`` if needed and mount ``source`` on it; return (success, error)."""
    try:
        os.makedirs(target, exist_ok=True)
    except OSError as e:
        # Reported as a result so the caller can roll back what is mounted.
        return False, f"无法创建目录 {target}: {e}"

    command = ["sudo", "mount"]
    if bind:
        command.append("--bind")
    command += [source, target]
    success, _, stderr = run_command(command, description)
    return success, stderr


def mount_target_system(root_partition: str, boot_partition: Optional[str] = None,
                        efi_partition: Optional[str] = None) -> Tuple[bool, str, str]:
    """
    Mount the target system's root, /boot and EFI partitions below a fresh
    temporary directory and bind the pseudo filesystems needed for chroot.

    On any failure everything mounted so far is unmounted again.

    :param root_partition: Device path of the target root partition
    :param boot_partition: Device path of a separate /boot partition (optional)
    :param efi_partition: Device path of the EFI System Partition (optional, UEFI only)
    :return: (True/False, mount_point, error_message)
    """
    # Validate input
    if not validate_device_path(root_partition):
        return False, "", f"无效的根分区路径: {root_partition}"
    if boot_partition and not validate_device_path(boot_partition):
        return False, "", f"无效的/boot分区路径: {boot_partition}"
    if efi_partition and not validate_device_path(efi_partition):
        return False, "", f"无效的EFI分区路径: {efi_partition}"

    # Create the temporary mount point
    mount_point = "/mnt_grub_repair_" + str(int(time.time()))
    try:
        os.makedirs(mount_point, exist_ok=False)
    except OSError as e:
        return False, "", f"创建挂载点失败: {e}"

    # Track what has been mounted so a failure can be rolled back.
    bind_paths_mounted = []
    mounted_boot = False
    mounted_efi = False

    # 1. Root partition
    success, _, stderr = run_command(["sudo", "mount", root_partition, mount_point],
                                     f"挂载根分区 {root_partition}")
    if not success:
        try:
            os.rmdir(mount_point)
        except OSError as e:
            print(f"[Backend] 无法删除临时目录 {mount_point}: {e}")
        return False, "", f"挂载根分区失败: {stderr}"

    # 2. Separate /boot partition, if any
    if boot_partition:
        success, stderr = _mount_at(boot_partition, os.path.join(mount_point, "boot"),
                                    f"挂载 /boot 分区 {boot_partition}")
        if not success:
            _cleanup_partial_mount(mount_point, False, False, [])
            return False, "", f"挂载 /boot 分区失败: {stderr}"
        mounted_boot = True

    # 3. EFI partition, if any (UEFI systems)
    if efi_partition:
        success, stderr = _mount_at(efi_partition, os.path.join(mount_point, "boot/efi"),
                                    f"挂载 EFI 分区 {efi_partition}")
        if not success:
            _cleanup_partial_mount(mount_point, mounted_boot, False, [])
            return False, "", f"挂载 EFI 分区失败: {stderr}"
        mounted_efi = True

    # 4. Bind the required pseudo filesystems
    bind_paths = ["/dev", "/dev/pts", "/proc", "/sys", "/run"]
    for path in bind_paths:
        target_path = os.path.join(mount_point, path.lstrip('/'))
        success, stderr = _mount_at(path, target_path, f"绑定 {path} 到 {target_path}", bind=True)
        if not success:
            _cleanup_partial_mount(mount_point, mounted_boot, mounted_efi, bind_paths_mounted)
            return False, "", f"绑定 {path} 失败: {stderr}"
        bind_paths_mounted.append(target_path)

    # 5. Best effort: expose EFI variables inside the chroot. The bind of /sys
    #    above is not recursive, so efivarfs has to be bound separately. A
    #    failure here is only logged because BIOS repairs do not need it.
    if os.path.ismount(_EFIVARS_PATH):
        efivars_target = os.path.join(mount_point, _EFIVARS_PATH.lstrip('/'))
        success, stderr = _mount_at(_EFIVARS_PATH, efivars_target,
                                    f"绑定 {_EFIVARS_PATH} 到 {efivars_target}", bind=True)
        if not success:
            print(f"[Backend] 警告: 绑定 {_EFIVARS_PATH} 失败: {stderr}")

    return True, mount_point, ""


def _os_release_field(content: str, key: str) -> str:
    """Return the lower-cased, unquoted value of ``key`` from os-release text ("" if absent)."""
    match = re.search(rf'^{key}=(.*)$', content, re.MULTILINE)
    if not match:
        return ""
    # Strip surrounding whitespace (including '\r') before removing the quotes,
    # otherwise a trailing space keeps the closing quote in place.
    return match.group(1).strip().strip('"\'').strip().lower()


def detect_distro_type(mount_point: str) -> str:
    """
    Try to detect the distribution family of the mounted target system.

    Reads etc/os-release below ``mount_point`` (ID first, then ID_LIKE); if
    that file does not exist, falls back to keyword matching on etc/issue.

    :param mount_point: Mount point of the target root partition
    :return: "arch", "centos", "debian", "ubuntu", "fedora", "opensuse" or "unknown"
    """
    os_release_path = os.path.join(mount_point, "etc/os-release")
    if not os.path.exists(os_release_path):
        # Fall back to /etc/issue
        issue_path = os.path.join(mount_point, "etc/issue")
        if os.path.exists(issue_path):
            try:
                with open(issue_path, "r", encoding="utf-8", errors="replace") as f:
                    content = f.read().lower()
            except OSError:
                return "unknown"
            if "ubuntu" in content:
                return "ubuntu"
            if "debian" in content:
                return "debian"
            if "centos" in content or "rhel" in content:
                return "centos"
            if "fedora" in content:
                return "fedora"
            if "arch linux" in content:
                return "arch"
            if "suse" in content:
                return "opensuse"
        return "unknown"

    try:
        with open(os_release_path, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
    except OSError as e:
        print(f"[Backend] 无法读取os-release文件: {e}")
        return "unknown"

    distro_id = _os_release_field(content, "ID")
    id_like = _os_release_field(content, "ID_LIKE")

    # Direct match on ID
    if distro_id == "ubuntu":
        return "ubuntu"
    if distro_id == "debian":
        return "debian"
    if distro_id in ("arch", "manjaro", "endeavouros"):
        return "arch"
    if distro_id in ("centos", "rhel", "rocky", "almalinux"):
        return "centos"
    if distro_id == "fedora":
        return "fedora"
    if distro_id in ("opensuse", "opensuse-leap", "opensuse-tumbleweed"):
        return "opensuse"

    # Infer from ID_LIKE
    if "ubuntu" in id_like:
        return "ubuntu"
    if "debian" in id_like:
        return "debian"
    if "arch" in id_like:
        return "arch"
    if "rhel" in id_like or "centos" in id_like or "fedora" in id_like:
        return "centos"  # handled with the centos command set
    if "suse" in id_like:
        return "opensuse"

    return "unknown"


def _chroot_has_command(mount_point: str, command: str) -> bool:
    """Return True if ``command`` exists in a standard bin/sbin directory of the target."""
    # lexists: an absolute symlink inside the target must not be resolved
    # against the host filesystem.
    return any(
        os.path.lexists(os.path.join(mount_point, directory, command))
        for directory in ("usr/sbin", "usr/bin", "sbin", "bin")
    )


def _select_grub_tool(mount_point: str, distro_type: str, tool: str) -> str:
    """
    Pick "grub-<tool>" or "grub2-<tool>" for the target system.

    The distribution family decides the preferred name; the other name is used
    only when the preferred one is missing from the target and the other exists.
    """
    if distro_type in _GRUB2_DISTROS:
        preferred, fallback = f"grub2-{tool}", f"grub-{tool}"
    else:
        preferred, fallback = f"grub-{tool}", f"grub2-{tool}"
    if not _chroot_has_command(mount_point, preferred) and _chroot_has_command(mount_point, fallback):
        return fallback
    return preferred


def _pick_grub_cfg_path(mount_point: str, candidates: List[str], default: str) -> str:
    """Return the first candidate whose directory exists in the target, else ``default``."""
    for cfg_path in candidates:
        full_path = os.path.join(mount_point, cfg_path.lstrip('/'))
        if os.path.exists(os.path.dirname(full_path)):
            return cfg_path
    return default


def chroot_and_repair_grub(mount_point: str, target_disk: str, is_uefi: bool = False,
                           distro_type: str = "unknown") -> Tuple[bool, str]:
    """
    Chroot into the target system, install GRUB and regenerate its configuration.

    :param mount_point: Mount point of the target root partition
    :param target_disk: Physical disk GRUB is installed to (used for BIOS installs)
    :param is_uefi: Whether the target system boots via UEFI
    :param distro_type: Distribution family as returned by detect_distro_type
    :return: (True/False, error_message)
    """
    if not validate_device_path(target_disk):
        return False, f"无效的目标磁盘路径: {target_disk}"

    chroot_cmd_prefix = ["sudo", "chroot", mount_point]

    # 1. Install GRUB. RHEL/Fedora/SUSE families ship the tool as grub2-install.
    grub_install = _select_grub_tool(mount_point, distro_type, "install")
    if is_uefi:
        success, _, stderr = run_command(
            chroot_cmd_prefix + [grub_install, "--target=x86_64-efi",
                                 "--efi-directory=/boot/efi", "--bootloader-id=GRUB"],
            "安装UEFI GRUB"
        )
    else:
        success, _, stderr = run_command(
            chroot_cmd_prefix + [grub_install, target_disk],
            "安装BIOS GRUB"
        )
    if not success:
        return False, f"GRUB安装失败: {stderr}"

    # 2. Regenerate the GRUB configuration file
    if distro_type in ("debian", "ubuntu"):
        grub_update_cmd = ["update-grub"]
    elif distro_type == "arch":
        grub_update_cmd = ["grub-mkconfig", "-o", "/boot/grub/grub.cfg"]
    else:
        grub_mkconfig = _select_grub_tool(mount_point, distro_type, "mkconfig")
        if distro_type == "centos":
            # Use /boot/grub2 when that directory exists, otherwise /boot/grub.
            cfg_path = _pick_grub_cfg_path(mount_point, ["/boot/grub2/grub.cfg"],
                                           "/boot/grub/grub.cfg")
        elif distro_type in ("fedora", "opensuse"):
            cfg_path = "/boot/grub2/grub.cfg"
        else:
            # Unknown distribution: probe the usual configuration locations.
            cfg_path = _pick_grub_cfg_path(mount_point, DEFAULT_GRUB_CONFIG_PATHS,
                                           "/boot/grub/grub.cfg")
        grub_update_cmd = [grub_mkconfig, "-o", cfg_path]

    success, _, stderr = run_command(
        chroot_cmd_prefix + grub_update_cmd,
        "更新GRUB配置文件"
    )
    if not success:
        return False, f"GRUB配置文件更新失败: {stderr}"

    return True, ""


def unmount_target_system(mount_point: str) -> Tuple[bool, str]:
    """
    Unmount everything mounted by mount_target_system and remove the
    temporary mount point directory.

    All steps are attempted even if an earlier one fails; the error messages
    are accumulated.

    :param mount_point: Mount point of the target root partition
    :return: (True/False, error_message) - False if any umount failed
    """
    print(f"[Backend] 正在卸载分区 from {mount_point}...")
    success = True
    error_msg = ""

    # Unmount in reverse mount order; nested mounts first.
    # 1. Bind mounts (efivars sits below /sys, /dev/pts below /dev)
    bind_paths = ["/run", _EFIVARS_PATH, "/sys", "/proc", "/dev/pts", "/dev"]
    for path in bind_paths:
        target_path = os.path.join(mount_point, path.lstrip('/'))
        if os.path.ismount(target_path):
            s, _, stderr = run_command(["sudo", "umount", target_path], f"卸载绑定 {target_path}")
            if not s:
                success = False
                error_msg += f"卸载绑定 {target_path} 失败: {stderr}\n"

    # 2./3. /boot/efi, then /boot (only if they are separate mounts)
    for relative_path in ("boot/efi", "boot"):
        nested_mount_point = os.path.join(mount_point, relative_path)
        if os.path.ismount(nested_mount_point):
            s, _, stderr = run_command(["sudo", "umount", nested_mount_point],
                                       f"卸载 {nested_mount_point}")
            if not s:
                success = False
                error_msg += f"卸载 {nested_mount_point} 失败: {stderr}\n"

    # 4. Root partition
    if os.path.ismount(mount_point):
        s, _, stderr = run_command(["sudo", "umount", mount_point], f"卸载根分区 {mount_point}")
        if not s:
            success = False
            error_msg += f"卸载根分区 {mount_point} 失败: {stderr}\n"

    # 5. Remove the temporary mount point directory. rmdir (not rmtree) is
    #    deliberate: the directory is empty once the root is unmounted, and a
    #    recursive delete would destroy data if a wrong path were passed in.
    if os.path.exists(mount_point) and not os.path.ismount(mount_point):
        try:
            os.rmdir(mount_point)
            print(f"[Backend] 清理临时目录 {mount_point}")
        except OSError as e:
            print(f"[Backend] 无法删除临时目录 {mount_point}: {e}")
            error_msg += f"无法删除临时目录 {mount_point}: {e}\n"

    return success, error_msg


# Example: running backend.py directly performs a simple scan test
if __name__ == "__main__":
    print("--- 运行后端测试 ---")

    print("\n--- 扫描分区测试 ---")
    success, disks, partitions, efi_partitions, err = scan_partitions()
    if success:
        print("磁盘:", [d['name'] for d in disks])
        print("分区:", [p['name'] for p in partitions])
        print("EFI分区:", [p['name'] for p in efi_partitions])
    else:
        print("扫描分区失败:", err)

    # A full mount/repair/unmount test needs real partitions supplied by the
    # user. Disabled example, kept for reference:
    # test_root_partition = "/dev/sdaX"
    # test_target_disk = "/dev/sda"
    # test_efi_partition = "/dev/sdY"
    #
    # if test_root_partition and test_target_disk:
    #     print(f"\n--- 挂载系统测试 (模拟 {test_root_partition}) ---")
    #     mount_ok, mnt_pt, mount_err = mount_target_system(test_root_partition, efi_partition=test_efi_partition)
    #     if mount_ok:
    #         print(f"系统挂载成功到: {mnt_pt}")
    #         distro = detect_distro_type(mnt_pt)
    #         print(f"检测到发行版: {distro}")
    #
    #         print("\n--- 修复GRUB测试 ---")
    #         repair_ok, repair_err = chroot_and_repair_grub(mnt_pt, test_target_disk, is_uefi=True, distro_type=distro)
    #         if repair_ok:
    #             print("GRUB修复成功!")
    #         else:
    #             print("GRUB修复失败:", repair_err)
    #
    #         print("\n--- 卸载系统测试 ---")
    #         unmount_ok, unmount_err = unmount_target_system(mnt_pt)
    #         if unmount_ok:
    #             print("系统卸载成功!")
    #         else:
    #             print("系统卸载失败:", unmount_err)
    #     else:
    #         print("系统挂载失败:", mount_err)
    # else:
    #     print("\n请在代码中设置 test_root_partition 和 test_target_disk 进行完整测试。")