Files
BootRepairTool/backend.py
2026-02-11 03:48:54 +08:00

820 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# backend.py
import subprocess
import os
import json
import time
import re
import shutil
import glob
from typing import Tuple, List, Dict, Optional
# 常量配置
COMMAND_TIMEOUT = 300 # 命令执行超时时间(秒)
DEFAULT_GRUB_CONFIG_PATHS = [
"/boot/grub/grub.cfg",
"/boot/grub2/grub.cfg",
]
def log_step(step_name: str, detail: str = ""):
"""输出步骤日志"""
separator = "=" * 60
print(f"\n[STEP] {separator}")
print(f"[STEP] {step_name}")
if detail:
print(f"[STEP] 详情: {detail}")
print(f"[STEP] {separator}\n")
def log_info(msg: str):
"""输出信息日志"""
print(f"[INFO] {msg}")
def log_warning(msg: str):
"""输出警告日志"""
print(f"[WARN] {msg}")
def log_error(msg: str):
"""输出错误日志"""
print(f"[ERROR] {msg}")
def log_debug(msg: str):
"""输出调试日志"""
print(f"[DEBUG] {msg}")
def validate_device_path(path: str) -> bool:
"""
验证设备路径格式是否合法(防止命令注入)。
:param path: 设备路径(如 /dev/sda1, /dev/mapper/cl-root
:return: 是否合法
"""
if not path:
return False
patterns = [
r'^/dev/[a-zA-Z0-9_-]+$',
r'^/dev/mapper/[a-zA-Z0-9_-]+$',
]
return any(re.match(pattern, path) is not None for pattern in patterns)
def run_command(command: List[str], description: str = "执行命令",
cwd: Optional[str] = None, shell: bool = False,
timeout: int = COMMAND_TIMEOUT) -> Tuple[bool, str, str]:
"""
运行一个系统命令,并捕获其输出和错误。
"""
cmd_str = ' '.join(command)
log_info(f"执行命令: {cmd_str}")
log_debug(f"工作目录: {cwd or '当前目录'}, 超时: {timeout}")
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
check=True,
cwd=cwd,
shell=shell,
timeout=timeout
)
if result.stdout:
log_debug(f"stdout: {result.stdout[:500]}") # 限制输出长度
if result.stderr:
log_debug(f"stderr: {result.stderr[:500]}")
log_info(f"✓ 命令成功: {description}")
return True, result.stdout, result.stderr
except subprocess.CalledProcessError as e:
log_error(f"✗ 命令失败: {description}")
log_error(f"返回码: {e.returncode}")
log_error(f"stdout: {e.stdout}")
log_error(f"stderr: {e.stderr}")
return False, e.stdout, e.stderr
except subprocess.TimeoutExpired:
log_error(f"✗ 命令超时(超过 {timeout} 秒): {description}")
return False, "", f"命令执行超时"
except FileNotFoundError:
log_error(f"✗ 命令未找到: {command[0]}")
return False, "", f"命令未找到: {command[0]}"
except Exception as e:
log_error(f"✗ 发生未知错误: {e}")
return False, "", str(e)
def _process_partition(block_device: Dict, all_disks: List, all_partitions: List,
all_efi_partitions: List) -> None:
"""
处理单个块设备递归处理子分区、LVM逻辑卷等。
"""
dev_name = f"/dev/{block_device.get('name', '')}"
dev_type = block_device.get("type")
if dev_type == "disk":
all_disks.append({"name": dev_name})
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
elif dev_type == "part":
mountpoint = block_device.get("mountpoint")
if mountpoint and (mountpoint == "/" or
mountpoint.startswith("/run/media") or
mountpoint.startswith("/cdrom") or
mountpoint.startswith("/live")):
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
return
part_info = {
"name": dev_name,
"fstype": block_device.get("fstype", "unknown"),
"size": block_device.get("size", "unknown"),
"mountpoint": mountpoint,
"uuid": block_device.get("uuid"),
"partlabel": block_device.get("partlabel"),
"label": block_device.get("label"),
"parttype": (block_device.get("parttype") or "").upper()
}
all_partitions.append(part_info)
is_vfat = part_info["fstype"] == "vfat"
has_efi_label = part_info["partlabel"] and "EFI" in part_info["partlabel"].upper()
has_efi_name = part_info["label"] and "EFI" in part_info["label"].upper()
is_efi_type = part_info["parttype"] == "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"
if is_vfat and (has_efi_label or has_efi_name or is_efi_type):
all_efi_partitions.append(part_info)
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
elif dev_type in ["lvm", "dm"]:
mountpoint = block_device.get("mountpoint")
lv_name = block_device.get("name", "")
if "-" in lv_name and not lv_name.startswith("dm-"):
mapper_path = f"/dev/mapper/{lv_name}"
else:
mapper_path = dev_name
part_info = {
"name": mapper_path,
"fstype": block_device.get("fstype", "unknown"),
"size": block_device.get("size", "unknown"),
"mountpoint": mountpoint,
"uuid": block_device.get("uuid"),
"partlabel": block_device.get("partlabel"),
"label": block_device.get("label"),
"parttype": (block_device.get("parttype") or "").upper(),
"is_lvm": True,
"dm_name": lv_name
}
all_partitions.append(part_info)
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
def scan_partitions() -> Tuple[bool, List, List, List, str]:
"""
扫描系统中的所有磁盘和分区。
"""
log_step("扫描系统分区", "使用 lsblk 获取磁盘和分区信息")
success, stdout, stderr = run_command(
["sudo", "lsblk", "-J", "-o", "NAME,FSTYPE,SIZE,MOUNTPOINT,UUID,PARTLABEL,LABEL,TYPE,PARTTYPE"],
"扫描分区"
)
if not success:
return False, [], [], [], stderr
try:
data = json.loads(stdout)
all_disks = []
all_partitions = []
all_efi_partitions = []
for block_device in data.get("blockdevices", []):
_process_partition(block_device, all_disks, all_partitions, all_efi_partitions)
log_info(f"扫描完成: 发现 {len(all_disks)} 个磁盘, {len(all_partitions)} 个分区, {len(all_efi_partitions)} 个EFI分区")
for d in all_disks:
log_debug(f" 磁盘: {d['name']}")
for p in all_partitions:
log_debug(f" 分区: {p['name']} ({p['fstype']})")
for e in all_efi_partitions:
log_info(f" EFI分区: {e['name']}")
return True, all_disks, all_partitions, all_efi_partitions, ""
except json.JSONDecodeError as e:
log_error(f"解析 lsblk 输出失败: {e}")
return False, [], [], [], f"解析lsblk输出失败: {e}"
except Exception as e:
log_error(f"处理分区数据时发生未知错误: {e}")
return False, [], [], [], f"处理分区数据时发生未知错误: {e}"
def _cleanup_partial_mount(mount_point: str, mounted_boot: bool, mounted_efi: bool,
bind_paths_mounted: List[str]) -> None:
"""清理部分挂载的资源"""
log_warning(f"清理部分挂载资源: {mount_point}")
for target_path in reversed(bind_paths_mounted):
if os.path.ismount(target_path):
run_command(["sudo", "umount", target_path], f"清理卸载绑定 {target_path}")
if mounted_efi:
efi_mount_point = os.path.join(mount_point, "boot/efi")
if os.path.ismount(efi_mount_point):
run_command(["sudo", "umount", efi_mount_point], "清理卸载EFI分区")
if mounted_boot:
boot_mount_point = os.path.join(mount_point, "boot")
if os.path.ismount(boot_mount_point):
run_command(["sudo", "umount", boot_mount_point], "清理卸载/boot分区")
if os.path.ismount(mount_point):
run_command(["sudo", "umount", mount_point], "清理卸载根分区")
if os.path.exists(mount_point) and not os.path.ismount(mount_point):
try:
os.rmdir(mount_point)
except OSError:
pass
def mount_target_system(root_partition: str, boot_partition: Optional[str] = None,
efi_partition: Optional[str] = None) -> Tuple[bool, str, str]:
"""
挂载目标系统的根分区、/boot分区和EFI分区到临时目录。
"""
log_step("挂载目标系统", f"根分区: {root_partition}, /boot: {boot_partition or ''}, EFI: {efi_partition or ''}")
if not validate_device_path(root_partition):
log_error(f"无效的根分区路径: {root_partition}")
return False, "", f"无效的根分区路径: {root_partition}"
if boot_partition and not validate_device_path(boot_partition):
log_error(f"无效的/boot分区路径: {boot_partition}")
return False, "", f"无效的/boot分区路径: {boot_partition}"
if efi_partition and not validate_device_path(efi_partition):
log_error(f"无效的EFI分区路径: {efi_partition}")
return False, "", f"无效的EFI分区路径: {efi_partition}"
mount_point = "/mnt_grub_repair_" + str(int(time.time()))
log_info(f"创建临时挂载点: {mount_point}")
try:
os.makedirs(mount_point, exist_ok=False)
log_info(f"✓ 挂载点创建成功")
except Exception as e:
log_error(f"创建挂载点失败: {e}")
return False, "", f"创建挂载点失败: {e}"
bind_paths_mounted = []
mounted_boot = False
mounted_efi = False
# 1. 挂载根分区
log_info(f"[1/4] 挂载根分区 {root_partition}{mount_point}")
success, _, stderr = run_command(["sudo", "mount", root_partition, mount_point],
f"挂载根分区 {root_partition}")
if not success:
log_error(f"挂载根分区失败,清理中...")
os.rmdir(mount_point)
return False, "", f"挂载根分区失败: {stderr}"
# 检查挂载是否成功
if not os.path.ismount(mount_point):
log_error(f"挂载点未激活: {mount_point}")
return False, "", "挂载根分区失败: 挂载点未激活"
log_info(f"✓ 根分区挂载成功")
# 检查关键目录
for check_dir in ["etc", "boot"]:
full_path = os.path.join(mount_point, check_dir)
if os.path.exists(full_path):
log_info(f" 发现目录: /{check_dir}")
else:
log_warning(f" 未找到目录: /{check_dir}")
# 2. 挂载 /boot 分区
if boot_partition:
log_info(f"[2/4] 挂载 /boot 分区 {boot_partition}")
boot_mount_point = os.path.join(mount_point, "boot")
if not os.path.exists(boot_mount_point):
os.makedirs(boot_mount_point)
log_debug(f"创建 /boot 挂载点")
success, _, stderr = run_command(["sudo", "mount", boot_partition, boot_mount_point],
f"挂载 /boot 分区 {boot_partition}")
if not success:
log_error(f"挂载 /boot 分区失败,开始清理...")
_cleanup_partial_mount(mount_point, False, False, [])
return False, "", f"挂载 /boot 分区失败: {stderr}"
mounted_boot = True
log_info(f"✓ /boot 分区挂载成功")
else:
log_info(f"[2/4] 无独立的 /boot 分区,跳过")
# 3. 挂载 EFI 分区
if efi_partition:
log_info(f"[3/4] 挂载 EFI 分区 {efi_partition}")
efi_mount_point = os.path.join(mount_point, "boot/efi")
if not os.path.exists(efi_mount_point):
os.makedirs(efi_mount_point)
log_debug(f"创建 /boot/efi 挂载点")
success, _, stderr = run_command(["sudo", "mount", efi_partition, efi_mount_point],
f"挂载 EFI 分区 {efi_partition}")
if not success:
log_error(f"挂载 EFI 分区失败,开始清理...")
_cleanup_partial_mount(mount_point, mounted_boot, False, [])
return False, "", f"挂载 EFI 分区失败: {stderr}"
mounted_efi = True
log_info(f"✓ EFI 分区挂载成功")
# 检查 EFI 分区内容
efi_path = os.path.join(mount_point, "boot/efi/EFI")
if os.path.exists(efi_path):
log_info(f" EFI 分区内容:")
try:
for item in os.listdir(efi_path):
log_info(f" - {item}")
except Exception as e:
log_warning(f" 无法列出 EFI 目录: {e}")
else:
log_info(f"[3/4] 无 EFI 分区,跳过")
# 4. 绑定伪文件系统
log_info(f"[4/4] 绑定伪文件系统 (/dev, /proc, /sys 等)")
bind_paths = ["/dev", "/dev/pts", "/proc", "/sys", "/run"]
for path in bind_paths:
target_path = os.path.join(mount_point, path.lstrip('/'))
if not os.path.exists(target_path):
os.makedirs(target_path)
log_debug(f"创建目录: {target_path}")
success, _, stderr = run_command(["sudo", "mount", "--bind", path, target_path],
f"绑定 {path}")
if not success:
log_error(f"绑定 {path} 失败,开始清理...")
_cleanup_partial_mount(mount_point, mounted_boot, mounted_efi, bind_paths_mounted)
return False, "", f"绑定 {path} 失败: {stderr}"
bind_paths_mounted.append(target_path)
log_info(f"✓ 所有绑定完成")
log_info(f"挂载摘要: 根分区 ✓, /boot {'' if mounted_boot else ''}, EFI {'' if mounted_efi else ''}")
return True, mount_point, ""
def detect_distro_type(mount_point: str) -> str:
"""
检测目标系统发行版类型。
"""
log_step("检测发行版类型", f"挂载点: {mount_point}")
os_release_path = os.path.join(mount_point, "etc/os-release")
log_info(f"检查文件: {os_release_path}")
if not os.path.exists(os_release_path):
log_warning(f"未找到 {os_release_path},尝试 /etc/issue")
issue_path = os.path.join(mount_point, "etc/issue")
if os.path.exists(issue_path):
try:
with open(issue_path, "r") as f:
content = f.read().lower()
log_debug(f"/etc/issue 内容: {content[:200]}")
if "ubuntu" in content:
log_info(f"通过 /etc/issue 检测到: ubuntu")
return "ubuntu"
elif "debian" in content:
log_info(f"通过 /etc/issue 检测到: debian")
return "debian"
elif "centos" in content or "rhel" in content:
log_info(f"通过 /etc/issue 检测到: centos")
return "centos"
elif "fedora" in content:
log_info(f"通过 /etc/issue 检测到: fedora")
return "fedora"
except Exception as e:
log_warning(f"读取 /etc/issue 失败: {e}")
return "unknown"
try:
with open(os_release_path, "r") as f:
content = f.read()
log_debug(f"/etc/os-release 内容:\n{content}")
id_match = re.search(r'^ID=(.+)$', content, re.MULTILINE)
id_like_match = re.search(r'^ID_LIKE=(.+)$', content, re.MULTILINE)
distro_id = id_match.group(1).strip('"\'') if id_match else ""
id_like = id_like_match.group(1).strip('"\'') if id_like_match else ""
log_info(f"ID={distro_id}, ID_LIKE={id_like}")
if distro_id == "ubuntu":
log_info(f"检测到发行版: ubuntu")
return "ubuntu"
elif distro_id == "debian":
log_info(f"检测到发行版: debian")
return "debian"
elif distro_id in ["arch", "manjaro", "endeavouros"]:
log_info(f"检测到发行版: arch (ID={distro_id})")
return "arch"
elif distro_id in ["centos", "rhel", "rocky", "almalinux"]:
log_info(f"检测到发行版: centos (ID={distro_id})")
return "centos"
elif distro_id == "fedora":
log_info(f"检测到发行版: fedora")
return "fedora"
elif distro_id in ["opensuse", "opensuse-leap", "opensuse-tumbleweed"]:
log_info(f"检测到发行版: opensuse (ID={distro_id})")
return "opensuse"
if "ubuntu" in id_like:
log_info(f"通过 ID_LIKE 推断: ubuntu")
return "ubuntu"
elif "debian" in id_like:
log_info(f"通过 ID_LIKE 推断: debian")
return "debian"
elif "arch" in id_like:
log_info(f"通过 ID_LIKE 推断: arch")
return "arch"
elif "rhel" in id_like or "centos" in id_like or "fedora" in id_like:
log_info(f"通过 ID_LIKE 推断: centos")
return "centos"
elif "suse" in id_like:
log_info(f"通过 ID_LIKE 推断: opensuse")
return "opensuse"
log_warning(f"无法识别的发行版ID={distro_id}, ID_LIKE={id_like}")
return "unknown"
except Exception as e:
log_error(f"读取 os-release 文件失败: {e}")
return "unknown"
def check_chroot_environment(mount_point: str) -> Tuple[bool, str]:
"""
检查 chroot 环境是否可用。
"""
log_step("检查 chroot 环境", f"挂载点: {mount_point}")
# 检查关键命令是否存在
critical_commands = ["grub-install", "grub-mkconfig", "update-grub"]
found_commands = []
for cmd in critical_commands:
success, _, _ = run_command(["sudo", "chroot", mount_point, "which", cmd],
f"检查命令 {cmd}", timeout=10)
if success:
found_commands.append(cmd)
log_info(f" ✓ 找到命令: {cmd}")
else:
log_warning(f" ✗ 未找到命令: {cmd}")
if not found_commands:
log_error(f"chroot 环境中未找到关键的 GRUB 命令")
return False, "chroot 环境中缺少 GRUB 命令"
# 检查 grub-install 版本
success, stdout, _ = run_command(["sudo", "chroot", mount_point, "grub-install", "--version"],
"检查 grub-install 版本", timeout=10)
if success:
log_info(f"grub-install 版本: {stdout.strip()}")
# 检查 EFI 目录UEFI 模式)
efi_dir = os.path.join(mount_point, "boot/efi")
if os.path.ismount(efi_dir):
log_info(f"✓ EFI 分区已挂载到 /boot/efi")
# 检查 EFI 目录权限
try:
stat = os.stat(efi_dir)
log_info(f" EFI 目录权限: {oct(stat.st_mode)}")
except Exception as e:
log_warning(f" 无法获取 EFI 目录权限: {e}")
else:
log_info(f"EFI 分区未挂载(可能是 BIOS 模式)")
return True, ""
def chroot_and_repair_grub(mount_point: str, target_disk: str,
is_uefi: bool = False, distro_type: str = "unknown") -> Tuple[bool, str]:
"""
Chroot到目标系统并执行GRUB修复命令。
"""
log_step("修复 GRUB", f"目标磁盘: {target_disk}, UEFI: {is_uefi}, 发行版: {distro_type}")
if not validate_device_path(target_disk):
log_error(f"无效的目标磁盘路径: {target_disk}")
return False, f"无效的目标磁盘路径: {target_disk}"
# 检查 chroot 环境
ok, err = check_chroot_environment(mount_point)
if not ok:
return False, err
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
# 1. 安装GRUB
log_info(f"[1/2] 安装 GRUB 到 {target_disk}")
if is_uefi:
log_info(f"UEFI 模式安装...")
# 检查 EFI 分区是否正确挂载
efi_check_path = os.path.join(mount_point, "boot/efi/EFI")
if os.path.exists(efi_check_path):
log_info(f"✓ EFI 目录存在: {efi_check_path}")
try:
contents = os.listdir(efi_check_path)
log_info(f" 当前 EFI 目录内容: {contents}")
# 检查是否已有其他启动项
existing_entries = [d for d in contents if d not in ['BOOT', 'boot']]
if existing_entries:
log_info(f" 发现已有启动项: {existing_entries}")
log_info(f" 建议在BIOS中选择这些启动项之一或继续使用新安装的GRUB")
except Exception as e:
log_warning(f" 无法列出 EFI 目录: {e}")
else:
log_warning(f"✗ EFI 目录不存在: {efi_check_path}")
# 检测是否在Live环境中通过检查/sys/firmware/efi/efivars是否存在且可访问
is_live_env = not os.path.exists("/sys/firmware/efi/efivars") or \
not os.listdir("/sys/firmware/efi/efivars")
if is_live_env:
log_info(f"检测到 Live 环境(无法访问 EFI 变量),将使用 --removable 模式安装")
# 第一次尝试标准安装带NVRAM注册
log_info(f"尝试 1/3: 标准 UEFI 安装(带 NVRAM...")
success, stdout, stderr = run_command(
chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi",
"--efi-directory=/boot/efi", "--bootloader-id=GRUB",
"--verbose"],
"安装UEFI GRUB带NVRAM",
timeout=60
)
if not success:
log_warning(f"标准安装失败")
log_debug(f"错误详情: {stderr}")
# Live环境或NVRAM失败时优先使用 --removable创建 /EFI/Boot/bootx64.efi
if "EFI variables are not supported" in stderr or "efibootmgr" in stderr or is_live_env:
log_info(f"尝试 2/3: Live环境模式--removable安装到 /EFI/Boot/bootx64.efi...")
success, stdout, stderr = run_command(
chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi",
"--efi-directory=/boot/efi",
"--removable", "--verbose"],
"安装UEFI GRUB可移动模式",
timeout=60
)
if success:
log_info(f"✓ 可移动模式安装成功!")
log_info(f" GRUB 已安装到 /EFI/Boot/bootx64.efi")
log_info(f" 这是UEFI通用回退路径应该在大多数BIOS中自动识别")
# 如果 removable 也失败,尝试 --no-nvram标准路径但不注册NVRAM
if not success:
log_info(f"尝试 3/3: 使用 --no-nvram 选项(标准路径但不注册启动项)...")
success, stdout, stderr = run_command(
chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi",
"--efi-directory=/boot/efi", "--bootloader-id=GRUB",
"--no-nvram", "--verbose"],
"安装UEFI GRUB不带NVRAM",
timeout=60
)
# 检查安装结果
if success:
log_info(f"✓ GRUB EFI 文件安装成功")
# 检查生成的文件
efi_paths_to_check = [
os.path.join(mount_point, "boot/efi/EFI/GRUB"),
os.path.join(mount_point, "boot/efi/EFI/Boot"),
os.path.join(mount_point, "boot/efi/efi/GRUB"), # 某些系统使用小写
os.path.join(mount_point, "boot/efi/efi/boot"),
]
found_efi_file = False
for path in efi_paths_to_check:
if os.path.exists(path):
log_info(f" 检查目录: {path}")
try:
files = os.listdir(path)
for f in files:
if f.endswith('.efi'):
full = os.path.join(path, f)
size = os.path.getsize(full)
log_info(f" ✓ EFI文件: {f} ({size} bytes)")
found_efi_file = True
except Exception as e:
log_warning(f" 无法列出: {e}")
if not found_efi_file:
log_warning(f" 未找到任何 .efi 文件,安装可能有问题")
else:
log_info(f"BIOS 模式安装...")
success, stdout, stderr = run_command(
chroot_cmd_prefix + ["grub-install", "--verbose", target_disk],
"安装BIOS GRUB",
timeout=60
)
if not success:
log_error(f"GRUB 安装失败")
return False, f"GRUB安装失败: {stderr}"
log_info(f"✓ GRUB 安装成功")
# 2. 更新GRUB配置
log_info(f"[2/2] 更新 GRUB 配置文件")
grub_update_cmd = []
config_path = ""
if distro_type in ["debian", "ubuntu"]:
grub_update_cmd = ["update-grub"]
config_path = "/boot/grub/grub.cfg"
elif distro_type == "arch":
grub_update_cmd = ["grub-mkconfig", "-o", "/boot/grub/grub.cfg"]
config_path = "/boot/grub/grub.cfg"
elif distro_type == "centos":
grub2_path = os.path.join(mount_point, "boot/grub2/grub.cfg")
if os.path.exists(os.path.dirname(grub2_path)):
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"]
config_path = "/boot/grub2/grub.cfg"
else:
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub/grub.cfg"]
config_path = "/boot/grub/grub.cfg"
elif distro_type == "fedora":
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"]
config_path = "/boot/grub2/grub.cfg"
elif distro_type == "opensuse":
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"]
config_path = "/boot/grub2/grub.cfg"
else:
for cfg_path in ["/boot/grub/grub.cfg", "/boot/grub2/grub.cfg"]:
full_path = os.path.join(mount_point, cfg_path.lstrip('/'))
if os.path.exists(os.path.dirname(full_path)):
grub_update_cmd = ["grub-mkconfig", "-o", cfg_path]
config_path = cfg_path
break
else:
grub_update_cmd = ["grub-mkconfig", "-o", "/boot/grub/grub.cfg"]
config_path = "/boot/grub/grub.cfg"
log_info(f"使用命令: {' '.join(grub_update_cmd)}")
log_info(f"配置文件路径: {config_path}")
success, stdout, stderr = run_command(
chroot_cmd_prefix + grub_update_cmd,
"更新GRUB配置文件",
timeout=120
)
if not success:
log_error(f"GRUB 配置文件更新失败: {stderr}")
return False, f"GRUB配置文件更新失败: {stderr}"
log_info(f"✓ GRUB 配置文件更新成功")
# 检查生成的配置文件
full_config_path = os.path.join(mount_point, config_path.lstrip('/'))
if os.path.exists(full_config_path):
size = os.path.getsize(full_config_path)
log_info(f" 配置文件大小: {size} bytes")
# 检查配置文件中是否包含菜单项
try:
with open(full_config_path, 'r') as f:
content = f.read()
menu_entries = content.count('menuentry')
log_info(f" 发现 {menu_entries} 个启动菜单项")
except Exception as e:
log_warning(f" 无法读取配置文件: {e}")
else:
log_warning(f" 配置文件未找到: {full_config_path}")
# UEFI 模式下输出重要提示
if is_uefi:
log_step("UEFI 修复完成提示")
log_info(f"GRUB EFI 文件已安装到 EFI 分区")
# 检测安装模式并给出相应提示
efi_boot_path = os.path.join(mount_point, "boot/efi/EFI/Boot/bootx64.efi")
efi_grub_path = os.path.join(mount_point, "boot/efi/EFI/GRUB/grubx64.efi")
if os.path.exists(efi_boot_path):
log_info(f"")
log_info(f"【重要】使用可移动模式安装 (/EFI/Boot/bootx64.efi)")
log_info(f"启动方法:")
log_info(f" 1. 在BIOS中选择 'UEFI Hard Drive''UEFI OS' 启动")
log_info(f" 2. 如果有多块硬盘,选择正确的硬盘 (通常是第一块)")
log_info(f" 3. 确保启动模式为 UEFI不是 Legacy/CSM")
elif os.path.exists(efi_grub_path):
log_info(f"")
log_info(f"【重要】使用标准模式安装 (/EFI/GRUB/grubx64.efi)")
log_info(f"启动方法:")
log_info(f" 1. 在BIOS启动菜单中选择 'GRUB''Linux' 启动项")
log_info(f" 2. 如果没有该选项,需要手动添加启动项")
log_info(f" 路径: \\EFI\\GRUB\\grubx64.efi")
log_info(f"")
log_info(f"故障排除:")
log_info(f" • 如果仍无法启动,请检查:")
log_info(f" - BIOS 是否设置为 UEFI 启动模式(非 Legacy/CSM")
log_info(f" - 安全启动 (Secure Boot) 是否已禁用")
log_info(f" - EFI 分区格式是否为 FAT32")
log_info(f"")
log_info(f" • 如果BIOS中有旧启动项如 BBT-TMS-OS请尝试")
log_info(f" - 选择该启动项可能可以启动(如果它指向正确的系统)")
log_info(f" - 或删除旧启动项让BIOS重新检测")
return True, ""
def unmount_target_system(mount_point: str) -> Tuple[bool, str]:
"""
卸载所有挂载的分区。
"""
log_step("卸载目标系统", f"挂载点: {mount_point}")
success = True
error_msg = ""
bind_paths = ["/run", "/sys", "/proc", "/dev/pts", "/dev"]
for path in bind_paths:
target_path = os.path.join(mount_point, path.lstrip('/'))
if os.path.ismount(target_path):
s, _, stderr = run_command(["sudo", "umount", target_path], f"卸载绑定 {target_path}")
if not s:
success = False
error_msg += f"卸载绑定 {target_path} 失败: {stderr}\n"
else:
log_debug(f"未挂载,跳过: {target_path}")
efi_mount_point = os.path.join(mount_point, "boot/efi")
if os.path.ismount(efi_mount_point):
s, _, stderr = run_command(["sudo", "umount", efi_mount_point], f"卸载 EFI 分区")
if not s:
success = False
error_msg += f"卸载 EFI 分区失败: {stderr}\n"
else:
log_debug(f"EFI 分区未挂载,跳过")
boot_mount_point = os.path.join(mount_point, "boot")
if os.path.ismount(boot_mount_point):
s, _, stderr = run_command(["sudo", "umount", boot_mount_point], f"卸载 /boot 分区")
if not s:
success = False
error_msg += f"卸载 /boot 分区失败: {stderr}\n"
else:
log_debug(f"/boot 分区未挂载,跳过")
if os.path.ismount(mount_point):
s, _, stderr = run_command(["sudo", "umount", mount_point], f"卸载根分区")
if not s:
success = False
error_msg += f"卸载根分区失败: {stderr}\n"
else:
log_debug(f"根分区未挂载,跳过")
if os.path.exists(mount_point) and not os.path.ismount(mount_point):
try:
shutil.rmtree(mount_point)
log_info(f"✓ 清理临时目录: {mount_point}")
except OSError as e:
log_warning(f"无法删除临时目录 {mount_point}: {e}")
error_msg += f"无法删除临时目录: {e}\n"
if success:
log_info(f"✓ 所有分区已卸载")
else:
log_warning(f"部分分区卸载失败")
return success, error_msg
if __name__ == "__main__":
print("--- 运行后端测试 ---")
print("\n--- 扫描分区测试 ---")
success, disks, partitions, efi_partitions, err = scan_partitions()
if success:
print("磁盘:", [d['name'] for d in disks])
print("分区:", [p['name'] for p in partitions])
print("EFI分区:", [p['name'] for p in efi_partitions])
else:
print("扫描分区失败:", err)