Files
BootRepairTool/backend.py
2026-02-11 03:23:56 +08:00

574 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# backend.py
import subprocess
import os
import json
import time
import re
import shutil
from typing import Tuple, List, Dict, Optional
# 常量配置
COMMAND_TIMEOUT = 300 # 命令执行超时时间(秒)
DEFAULT_GRUB_CONFIG_PATHS = [
"/boot/grub/grub.cfg",
"/boot/grub2/grub.cfg",
]
def validate_device_path(path: str) -> bool:
"""
验证设备路径格式是否合法(防止命令注入)。
:param path: 设备路径(如 /dev/sda1, /dev/mapper/cl-root
:return: 是否合法
"""
if not path:
return False
# 允许的格式:
# - /dev/sda, /dev/sda1
# - /dev/nvme0n1, /dev/nvme0n1p1
# - /dev/mmcblk0, /dev/mmcblk0p1
# - /dev/mapper/xxx (LVM逻辑卷)
# - /dev/dm-0 等
patterns = [
r'^/dev/[a-zA-Z0-9_-]+$',
r'^/dev/mapper/[a-zA-Z0-9_-]+$',
]
return any(re.match(pattern, path) is not None for pattern in patterns)
def run_command(command: List[str], description: str = "执行命令",
cwd: Optional[str] = None, shell: bool = False,
timeout: int = COMMAND_TIMEOUT) -> Tuple[bool, str, str]:
"""
运行一个系统命令,并捕获其输出和错误。
:param command: 要执行的命令列表 (例如 ["sudo", "lsblk"])
:param description: 命令的描述,用于日志
:param cwd: 更改工作目录
:param shell: 是否使用shell执行命令
:param timeout: 命令超时时间(秒)
:return: (success, stdout, stderr) 表示成功、标准输出、标准错误
"""
try:
print(f"[Backend] {description}: {' '.join(command)}")
result = subprocess.run(
command,
capture_output=True,
text=True,
check=True,
cwd=cwd,
shell=shell,
timeout=timeout
)
print(f"[Backend] 命令成功: {description}")
return True, result.stdout, result.stderr
except subprocess.CalledProcessError as e:
print(f"[Backend] 命令失败: {description}")
print(f"[Backend] 错误输出: {e.stderr}")
return False, e.stdout, e.stderr
except subprocess.TimeoutExpired:
print(f"[Backend] 命令超时: {description}")
return False, "", f"命令执行超时(超过 {timeout} 秒)"
except FileNotFoundError:
print(f"[Backend] 命令未找到: {' '.join(command)}")
return False, "", f"命令未找到: {command[0]}"
except Exception as e:
print(f"[Backend] 发生未知错误: {e}")
return False, "", str(e)
def _process_partition(block_device: Dict, all_disks: List, all_partitions: List,
all_efi_partitions: List) -> None:
"""
处理单个块设备递归处理子分区、LVM逻辑卷等。
"""
dev_name = f"/dev/{block_device.get('name', '')}"
dev_type = block_device.get("type")
if dev_type == "disk":
all_disks.append({"name": dev_name})
# 递归处理子分区
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
elif dev_type == "part":
# 过滤掉Live系统自身的分区
mountpoint = block_device.get("mountpoint")
if mountpoint and (mountpoint == "/" or
mountpoint.startswith("/run/media") or
mountpoint.startswith("/cdrom") or
mountpoint.startswith("/live")):
# 继续处理子设备如LVM但自己不被标记为可用分区
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
return
part_info = {
"name": dev_name,
"fstype": block_device.get("fstype", "unknown"),
"size": block_device.get("size", "unknown"),
"mountpoint": mountpoint,
"uuid": block_device.get("uuid"),
"partlabel": block_device.get("partlabel"),
"label": block_device.get("label"),
"parttype": (block_device.get("parttype") or "").upper() # EFI分区类型GUID
}
all_partitions.append(part_info)
# 识别EFI系统分区 (FAT32文件系统, 且满足以下条件之一)
is_vfat = part_info["fstype"] == "vfat"
has_efi_label = part_info["partlabel"] and "EFI" in part_info["partlabel"].upper()
has_efi_name = part_info["label"] and "EFI" in part_info["label"].upper()
is_efi_type = part_info["parttype"] == "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"
if is_vfat and (has_efi_label or has_efi_name or is_efi_type):
all_efi_partitions.append(part_info)
# 递归处理子设备如LVM逻辑卷
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
elif dev_type in ["lvm", "dm"]:
# 处理LVM逻辑卷和Device Mapper设备
mountpoint = block_device.get("mountpoint")
# 过滤Live系统自身的挂载点
if mountpoint and (mountpoint == "/" or
mountpoint.startswith("/run/media") or
mountpoint.startswith("/cdrom") or
mountpoint.startswith("/live")):
pass # 仍然添加到分区列表,但标记一下
# 尝试获取mapper路径如 /dev/mapper/cl-root
lv_name = block_device.get("name", "")
# 如果名称中包含'-'可能是mapper设备
if "-" in lv_name and not lv_name.startswith("dm-"):
mapper_path = f"/dev/mapper/{lv_name}"
else:
mapper_path = dev_name
part_info = {
"name": mapper_path,
"fstype": block_device.get("fstype", "unknown"),
"size": block_device.get("size", "unknown"),
"mountpoint": mountpoint,
"uuid": block_device.get("uuid"),
"partlabel": block_device.get("partlabel"),
"label": block_device.get("label"),
"parttype": (block_device.get("parttype") or "").upper(),
"is_lvm": True,
"dm_name": lv_name
}
all_partitions.append(part_info)
# 递归处理可能的子设备
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
def scan_partitions() -> Tuple[bool, List, List, List, str]:
"""
扫描系统中的所有磁盘和分区,并返回结构化的信息。
:return: (success, disks, partitions, efi_partitions, error_message)
"""
success, stdout, stderr = run_command(
["sudo", "lsblk", "-J", "-o", "NAME,FSTYPE,SIZE,MOUNTPOINT,UUID,PARTLABEL,LABEL,TYPE,PARTTYPE"],
"扫描分区"
)
if not success:
return False, [], [], [], stderr
try:
data = json.loads(stdout)
all_disks = []
all_partitions = []
all_efi_partitions = []
for block_device in data.get("blockdevices", []):
_process_partition(block_device, all_disks, all_partitions, all_efi_partitions)
return True, all_disks, all_partitions, all_efi_partitions, ""
except json.JSONDecodeError as e:
return False, [], [], [], f"解析lsblk输出失败: {e}"
except Exception as e:
return False, [], [], [], f"处理分区数据时发生未知错误: {e}"
def _cleanup_partial_mount(mount_point: str, mounted_boot: bool, mounted_efi: bool,
bind_paths_mounted: List[str]) -> None:
"""
清理部分挂载的资源(用于挂载失败时的回滚)。
"""
print(f"[Backend] 清理部分挂载资源: {mount_point}")
# 逆序卸载已绑定的路径
for target_path in reversed(bind_paths_mounted):
if os.path.ismount(target_path):
run_command(["sudo", "umount", target_path], f"清理卸载绑定 {target_path}")
# 卸载EFI分区
if mounted_efi:
efi_mount_point = os.path.join(mount_point, "boot/efi")
if os.path.ismount(efi_mount_point):
run_command(["sudo", "umount", efi_mount_point], "清理卸载EFI分区")
# 卸载/boot分区
if mounted_boot:
boot_mount_point = os.path.join(mount_point, "boot")
if os.path.ismount(boot_mount_point):
run_command(["sudo", "umount", boot_mount_point], "清理卸载/boot分区")
# 卸载根分区
if os.path.ismount(mount_point):
run_command(["sudo", "umount", mount_point], "清理卸载根分区")
# 删除临时目录
if os.path.exists(mount_point) and not os.path.ismount(mount_point):
try:
os.rmdir(mount_point)
except OSError:
pass
def mount_target_system(root_partition: str, boot_partition: Optional[str] = None,
efi_partition: Optional[str] = None) -> Tuple[bool, str, str]:
"""
挂载目标系统的根分区、/boot分区和EFI分区到临时目录。
:param root_partition: 目标系统的根分区设备路径
:param boot_partition: 目标系统的独立 /boot 分区设备路径 (可选)
:param efi_partition: 目标系统的EFI系统分区设备路径 (可选仅UEFI)
:return: (True/False, mount_point, error_message)
"""
# 验证输入
if not validate_device_path(root_partition):
return False, "", f"无效的根分区路径: {root_partition}"
if boot_partition and not validate_device_path(boot_partition):
return False, "", f"无效的/boot分区路径: {boot_partition}"
if efi_partition and not validate_device_path(efi_partition):
return False, "", f"无效的EFI分区路径: {efi_partition}"
# 创建临时挂载点
mount_point = "/mnt_grub_repair_" + str(int(time.time()))
try:
os.makedirs(mount_point, exist_ok=False)
except Exception as e:
return False, "", f"创建挂载点失败: {e}"
# 跟踪已挂载的资源,用于失败时清理
bind_paths_mounted = []
mounted_boot = False
mounted_efi = False
# 1. 挂载根分区
success, _, stderr = run_command(["sudo", "mount", root_partition, mount_point],
f"挂载根分区 {root_partition}")
if not success:
os.rmdir(mount_point)
return False, "", f"挂载根分区失败: {stderr}"
# 2. 如果有独立 /boot 分区
if boot_partition:
boot_mount_point = os.path.join(mount_point, "boot")
if not os.path.exists(boot_mount_point):
os.makedirs(boot_mount_point)
success, _, stderr = run_command(["sudo", "mount", boot_partition, boot_mount_point],
f"挂载 /boot 分区 {boot_partition}")
if not success:
_cleanup_partial_mount(mount_point, False, False, [])
return False, "", f"挂载 /boot 分区失败: {stderr}"
mounted_boot = True
# 3. 如果有EFI分区 (UEFI系统)
if efi_partition:
efi_mount_point = os.path.join(mount_point, "boot/efi")
if not os.path.exists(efi_mount_point):
os.makedirs(efi_mount_point)
success, _, stderr = run_command(["sudo", "mount", efi_partition, efi_mount_point],
f"挂载 EFI 分区 {efi_partition}")
if not success:
_cleanup_partial_mount(mount_point, mounted_boot, False, [])
return False, "", f"挂载 EFI 分区失败: {stderr}"
mounted_efi = True
# 4. 绑定必要的伪文件系统
bind_paths = ["/dev", "/dev/pts", "/proc", "/sys", "/run"]
for path in bind_paths:
target_path = os.path.join(mount_point, path.lstrip('/'))
if not os.path.exists(target_path):
os.makedirs(target_path)
success, _, stderr = run_command(["sudo", "mount", "--bind", path, target_path],
f"绑定 {path}{target_path}")
if not success:
_cleanup_partial_mount(mount_point, mounted_boot, mounted_efi, bind_paths_mounted)
return False, "", f"绑定 {path} 失败: {stderr}"
bind_paths_mounted.append(target_path)
return True, mount_point, ""
def detect_distro_type(mount_point: str) -> str:
"""
尝试检测目标系统发行版类型。
:param mount_point: 目标系统根分区的挂载点
:return: "arch", "centos", "debian", "ubuntu", "fedora", "opensuse", "unknown"
"""
os_release_path = os.path.join(mount_point, "etc/os-release")
if not os.path.exists(os_release_path):
# 尝试 /etc/issue 作为备选
issue_path = os.path.join(mount_point, "etc/issue")
if os.path.exists(issue_path):
try:
with open(issue_path, "r") as f:
content = f.read().lower()
if "ubuntu" in content:
return "ubuntu"
elif "debian" in content:
return "debian"
elif "centos" in content or "rhel" in content:
return "centos"
elif "fedora" in content:
return "fedora"
except Exception:
pass
return "unknown"
try:
with open(os_release_path, "r") as f:
content = f.read()
# 解析 ID 和 ID_LIKE 字段
id_match = re.search(r'^ID=(.+)$', content, re.MULTILINE)
id_like_match = re.search(r'^ID_LIKE=(.+)$', content, re.MULTILINE)
distro_id = id_match.group(1).strip('"\'') if id_match else ""
id_like = id_like_match.group(1).strip('"\'') if id_like_match else ""
# 直接匹配
if distro_id == "ubuntu":
return "ubuntu"
elif distro_id == "debian":
return "debian"
elif distro_id in ["arch", "manjaro", "endeavouros"]:
return "arch"
elif distro_id in ["centos", "rhel", "rocky", "almalinux"]:
return "centos"
elif distro_id == "fedora":
return "fedora"
elif distro_id in ["opensuse", "opensuse-leap", "opensuse-tumbleweed"]:
return "opensuse"
# 通过 ID_LIKE 推断
if "ubuntu" in id_like:
return "ubuntu"
elif "debian" in id_like:
return "debian"
elif "arch" in id_like:
return "arch"
elif "rhel" in id_like or "centos" in id_like or "fedora" in id_like:
return "centos" # 使用centos命令集
elif "suse" in id_like:
return "opensuse"
return "unknown"
except Exception as e:
print(f"[Backend] 无法读取os-release文件: {e}")
return "unknown"
def chroot_and_repair_grub(mount_point: str, target_disk: str,
is_uefi: bool = False, distro_type: str = "unknown") -> Tuple[bool, str]:
"""
Chroot到目标系统并执行GRUB修复命令。
:param mount_point: 目标系统根分区的挂载点
:param target_disk: GRUB要安装到的物理磁盘
:param is_uefi: 目标系统是否使用UEFI启动
:param distro_type: 目标系统发行版类型
:return: (True/False, error_message)
"""
if not validate_device_path(target_disk):
return False, f"无效的目标磁盘路径: {target_disk}"
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
# 1. 安装GRUB到目标磁盘
if is_uefi:
# UEFI模式首先尝试正常安装注册NVRAM启动项
success, _, stderr = run_command(
chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi",
"--efi-directory=/boot/efi", "--bootloader-id=GRUB"],
"安装UEFI GRUB带NVRAM"
)
# 如果失败是因为EFI变量不支持常见于Live环境修复外部磁盘使用 --no-nvram 重试
if not success and ("EFI variables are not supported" in stderr or
"efibootmgr" in stderr or
"NVRAM" in stderr):
print(f"[Backend] 警告: EFI变量访问失败尝试不使用NVRAM (--no-nvram) 安装...")
success, _, stderr = run_command(
chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi",
"--efi-directory=/boot/efi", "--bootloader-id=GRUB",
"--no-nvram"],
"安装UEFI GRUB不带NVRAM"
)
# 如果仍然失败,尝试 --removable 选项(某些固件需要)
if not success:
print(f"[Backend] 警告: 标准安装失败,尝试 --removable 选项...")
success, _, stderr = run_command(
chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi",
"--efi-directory=/boot/efi",
"--removable"],
"安装UEFI GRUB可移动模式"
)
else:
success, _, stderr = run_command(
chroot_cmd_prefix + ["grub-install", target_disk],
"安装BIOS GRUB"
)
if not success:
return False, f"GRUB安装失败: {stderr}"
# 2. 更新GRUB配置文件
grub_update_cmd = []
if distro_type in ["debian", "ubuntu"]:
grub_update_cmd = ["update-grub"]
elif distro_type == "arch":
grub_update_cmd = ["grub-mkconfig", "-o", "/boot/grub/grub.cfg"]
elif distro_type == "centos":
# 检测实际存在的配置文件路径
grub2_path = os.path.join(mount_point, "boot/grub2/grub.cfg")
grub_path = os.path.join(mount_point, "boot/grub/grub.cfg")
if os.path.exists(os.path.dirname(grub2_path)):
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"]
else:
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub/grub.cfg"]
elif distro_type == "fedora":
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"]
elif distro_type == "opensuse":
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"]
else:
# 尝试通用命令,先检测配置文件路径
for cfg_path in ["/boot/grub/grub.cfg", "/boot/grub2/grub.cfg"]:
full_path = os.path.join(mount_point, cfg_path.lstrip('/'))
if os.path.exists(os.path.dirname(full_path)):
grub_update_cmd = ["grub-mkconfig", "-o", cfg_path]
break
else:
grub_update_cmd = ["grub-mkconfig", "-o", "/boot/grub/grub.cfg"]
success, _, stderr = run_command(
chroot_cmd_prefix + grub_update_cmd,
"更新GRUB配置文件"
)
if not success:
return False, f"GRUB配置文件更新失败: {stderr}"
return True, ""
def unmount_target_system(mount_point: str) -> Tuple[bool, str]:
"""
卸载所有挂载的分区。
:param mount_point: 目标系统根分区的挂载点
:return: (True/False, error_message)
"""
print(f"[Backend] 正在卸载分区 from {mount_point}...")
success = True
error_msg = ""
# 按照挂载的逆序卸载:
# 挂载顺序: /dev, /dev/pts, /proc, /sys, /run, /boot/efi, /boot, 根分区
# 卸载顺序: /run, /sys, /proc, /dev/pts, /dev, /boot/efi, /boot, 根分区
# 1. 卸载绑定路径(先卸载子目录)
bind_paths = ["/run", "/sys", "/proc", "/dev/pts", "/dev"]
for path in bind_paths:
target_path = os.path.join(mount_point, path.lstrip('/'))
if os.path.ismount(target_path):
s, _, stderr = run_command(["sudo", "umount", target_path], f"卸载绑定 {target_path}")
if not s:
success = False
error_msg += f"卸载绑定 {target_path} 失败: {stderr}\n"
# 2. 卸载 /boot/efi (如果存在)
efi_mount_point = os.path.join(mount_point, "boot/efi")
if os.path.ismount(efi_mount_point):
s, _, stderr = run_command(["sudo", "umount", efi_mount_point], f"卸载 {efi_mount_point}")
if not s:
success = False
error_msg += f"卸载 {efi_mount_point} 失败: {stderr}\n"
# 3. 卸载 /boot (如果存在且是独立挂载的)
boot_mount_point = os.path.join(mount_point, "boot")
if os.path.ismount(boot_mount_point):
s, _, stderr = run_command(["sudo", "umount", boot_mount_point], f"卸载 {boot_mount_point}")
if not s:
success = False
error_msg += f"卸载 {boot_mount_point} 失败: {stderr}\n"
# 4. 卸载根分区
if os.path.ismount(mount_point):
s, _, stderr = run_command(["sudo", "umount", mount_point], f"卸载根分区 {mount_point}")
if not s:
success = False
error_msg += f"卸载根分区 {mount_point} 失败: {stderr}\n"
# 5. 清理临时挂载点目录
if os.path.exists(mount_point) and not os.path.ismount(mount_point):
try:
shutil.rmtree(mount_point)
print(f"[Backend] 清理临时目录 {mount_point}")
except OSError as e:
print(f"[Backend] 无法删除临时目录 {mount_point}: {e}")
error_msg += f"无法删除临时目录 {mount_point}: {e}\n"
return success, error_msg
# 示例如果直接运行backend.py可以进行一些测试
if __name__ == "__main__":
print("--- 运行后端测试 ---")
print("\n--- 扫描分区测试 ---")
success, disks, partitions, efi_partitions, err = scan_partitions()
if success:
print("磁盘:", [d['name'] for d in disks])
print("分区:", [p['name'] for p in partitions])
print("EFI分区:", [p['name'] for p in efi_partitions])
else:
print("扫描分区失败:", err)
# 假设有一些分区可供测试,实际运行时需要用户输入
# 例如:
# test_root_partition = "/dev/sdaX"
# test_target_disk = "/dev/sda"
# test_efi_partition = "/dev/sdY"
#
# if test_root_partition and test_target_disk:
# print(f"\n--- 挂载系统测试 (模拟 {test_root_partition}) ---")
# mount_ok, mnt_pt, mount_err = mount_target_system(test_root_partition, efi_partition=test_efi_partition)
# if mount_ok:
# print(f"系统挂载成功到: {mnt_pt}")
# distro = detect_distro_type(mnt_pt)
# print(f"检测到发行版: {distro}")
#
# print("\n--- 修复GRUB测试 ---")
# repair_ok, repair_err = chroot_and_repair_grub(mnt_pt, test_target_disk, is_uefi=True, distro_type=distro)
# if repair_ok:
# print("GRUB修复成功")
# else:
# print("GRUB修复失败:", repair_err)
#
# print("\n--- 卸载系统测试 ---")
# unmount_ok, unmount_err = unmount_target_system(mnt_pt)
# if unmount_ok:
# print("系统卸载成功!")
# else:
# print("系统卸载失败:", unmount_err)
# else:
# print("系统挂载失败:", mount_err)
# else:
# print("\n请在代码中设置 test_root_partition 和 test_target_disk 进行完整测试。")