This commit is contained in:
zj
2026-02-11 03:34:53 +08:00
parent d0210d6582
commit 622e894883
2 changed files with 357 additions and 167 deletions

Binary file not shown.

View File

@@ -6,6 +6,7 @@ import json
import time import time
import re import re
import shutil import shutil
import glob
from typing import Tuple, List, Dict, Optional from typing import Tuple, List, Dict, Optional
# 常量配置 # 常量配置
@@ -16,6 +17,36 @@ DEFAULT_GRUB_CONFIG_PATHS = [
] ]
def log_step(step_name: str, detail: str = ""):
"""输出步骤日志"""
separator = "=" * 60
print(f"\n[STEP] {separator}")
print(f"[STEP] {step_name}")
if detail:
print(f"[STEP] 详情: {detail}")
print(f"[STEP] {separator}\n")
def log_info(msg: str):
"""输出信息日志"""
print(f"[INFO] {msg}")
def log_warning(msg: str):
"""输出警告日志"""
print(f"[WARN] {msg}")
def log_error(msg: str):
"""输出错误日志"""
print(f"[ERROR] {msg}")
def log_debug(msg: str):
"""输出调试日志"""
print(f"[DEBUG] {msg}")
def validate_device_path(path: str) -> bool: def validate_device_path(path: str) -> bool:
""" """
验证设备路径格式是否合法(防止命令注入)。 验证设备路径格式是否合法(防止命令注入)。
@@ -24,12 +55,6 @@ def validate_device_path(path: str) -> bool:
""" """
if not path: if not path:
return False return False
# 允许的格式:
# - /dev/sda, /dev/sda1
# - /dev/nvme0n1, /dev/nvme0n1p1
# - /dev/mmcblk0, /dev/mmcblk0p1
# - /dev/mapper/xxx (LVM逻辑卷)
# - /dev/dm-0 等
patterns = [ patterns = [
r'^/dev/[a-zA-Z0-9_-]+$', r'^/dev/[a-zA-Z0-9_-]+$',
r'^/dev/mapper/[a-zA-Z0-9_-]+$', r'^/dev/mapper/[a-zA-Z0-9_-]+$',
@@ -42,15 +67,12 @@ def run_command(command: List[str], description: str = "执行命令",
timeout: int = COMMAND_TIMEOUT) -> Tuple[bool, str, str]: timeout: int = COMMAND_TIMEOUT) -> Tuple[bool, str, str]:
""" """
运行一个系统命令,并捕获其输出和错误。 运行一个系统命令,并捕获其输出和错误。
:param command: 要执行的命令列表 (例如 ["sudo", "lsblk"])
:param description: 命令的描述,用于日志
:param cwd: 更改工作目录
:param shell: 是否使用shell执行命令
:param timeout: 命令超时时间(秒)
:return: (success, stdout, stderr) 表示成功、标准输出、标准错误
""" """
cmd_str = ' '.join(command)
log_info(f"执行命令: {cmd_str}")
log_debug(f"工作目录: {cwd or '当前目录'}, 超时: {timeout}")
try: try:
print(f"[Backend] {description}: {' '.join(command)}")
result = subprocess.run( result = subprocess.run(
command, command,
capture_output=True, capture_output=True,
@@ -60,20 +82,26 @@ def run_command(command: List[str], description: str = "执行命令",
shell=shell, shell=shell,
timeout=timeout timeout=timeout
) )
print(f"[Backend] 命令成功: {description}") if result.stdout:
log_debug(f"stdout: {result.stdout[:500]}") # 限制输出长度
if result.stderr:
log_debug(f"stderr: {result.stderr[:500]}")
log_info(f"✓ 命令成功: {description}")
return True, result.stdout, result.stderr return True, result.stdout, result.stderr
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
print(f"[Backend] 命令失败: {description}") log_error(f" 命令失败: {description}")
print(f"[Backend] 错误输出: {e.stderr}") log_error(f"返回码: {e.returncode}")
log_error(f"stdout: {e.stdout}")
log_error(f"stderr: {e.stderr}")
return False, e.stdout, e.stderr return False, e.stdout, e.stderr
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
print(f"[Backend] 命令超时: {description}") log_error(f"✗ 命令超时(超过 {timeout} 秒): {description}")
return False, "", f"命令执行超时(超过 {timeout} 秒)" return False, "", f"命令执行超时"
except FileNotFoundError: except FileNotFoundError:
print(f"[Backend] 命令未找到: {' '.join(command)}") log_error(f" 命令未找到: {command[0]}")
return False, "", f"命令未找到: {command[0]}" return False, "", f"命令未找到: {command[0]}"
except Exception as e: except Exception as e:
print(f"[Backend] 发生未知错误: {e}") log_error(f" 发生未知错误: {e}")
return False, "", str(e) return False, "", str(e)
@@ -87,18 +115,15 @@ def _process_partition(block_device: Dict, all_disks: List, all_partitions: List
if dev_type == "disk": if dev_type == "disk":
all_disks.append({"name": dev_name}) all_disks.append({"name": dev_name})
# 递归处理子分区
for child in block_device.get("children", []): for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions) _process_partition(child, all_disks, all_partitions, all_efi_partitions)
elif dev_type == "part": elif dev_type == "part":
# 过滤掉Live系统自身的分区
mountpoint = block_device.get("mountpoint") mountpoint = block_device.get("mountpoint")
if mountpoint and (mountpoint == "/" or if mountpoint and (mountpoint == "/" or
mountpoint.startswith("/run/media") or mountpoint.startswith("/run/media") or
mountpoint.startswith("/cdrom") or mountpoint.startswith("/cdrom") or
mountpoint.startswith("/live")): mountpoint.startswith("/live")):
# 继续处理子设备如LVM但自己不被标记为可用分区
for child in block_device.get("children", []): for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions) _process_partition(child, all_disks, all_partitions, all_efi_partitions)
return return
@@ -111,11 +136,10 @@ def _process_partition(block_device: Dict, all_disks: List, all_partitions: List
"uuid": block_device.get("uuid"), "uuid": block_device.get("uuid"),
"partlabel": block_device.get("partlabel"), "partlabel": block_device.get("partlabel"),
"label": block_device.get("label"), "label": block_device.get("label"),
"parttype": (block_device.get("parttype") or "").upper() # EFI分区类型GUID "parttype": (block_device.get("parttype") or "").upper()
} }
all_partitions.append(part_info) all_partitions.append(part_info)
# 识别EFI系统分区 (FAT32文件系统, 且满足以下条件之一)
is_vfat = part_info["fstype"] == "vfat" is_vfat = part_info["fstype"] == "vfat"
has_efi_label = part_info["partlabel"] and "EFI" in part_info["partlabel"].upper() has_efi_label = part_info["partlabel"] and "EFI" in part_info["partlabel"].upper()
has_efi_name = part_info["label"] and "EFI" in part_info["label"].upper() has_efi_name = part_info["label"] and "EFI" in part_info["label"].upper()
@@ -124,24 +148,13 @@ def _process_partition(block_device: Dict, all_disks: List, all_partitions: List
if is_vfat and (has_efi_label or has_efi_name or is_efi_type): if is_vfat and (has_efi_label or has_efi_name or is_efi_type):
all_efi_partitions.append(part_info) all_efi_partitions.append(part_info)
# 递归处理子设备如LVM逻辑卷
for child in block_device.get("children", []): for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions) _process_partition(child, all_disks, all_partitions, all_efi_partitions)
elif dev_type in ["lvm", "dm"]: elif dev_type in ["lvm", "dm"]:
# 处理LVM逻辑卷和Device Mapper设备
mountpoint = block_device.get("mountpoint") mountpoint = block_device.get("mountpoint")
# 过滤Live系统自身的挂载点
if mountpoint and (mountpoint == "/" or
mountpoint.startswith("/run/media") or
mountpoint.startswith("/cdrom") or
mountpoint.startswith("/live")):
pass # 仍然添加到分区列表,但标记一下
# 尝试获取mapper路径如 /dev/mapper/cl-root
lv_name = block_device.get("name", "") lv_name = block_device.get("name", "")
# 如果名称中包含'-'可能是mapper设备
if "-" in lv_name and not lv_name.startswith("dm-"): if "-" in lv_name and not lv_name.startswith("dm-"):
mapper_path = f"/dev/mapper/{lv_name}" mapper_path = f"/dev/mapper/{lv_name}"
else: else:
@@ -161,16 +174,16 @@ def _process_partition(block_device: Dict, all_disks: List, all_partitions: List
} }
all_partitions.append(part_info) all_partitions.append(part_info)
# 递归处理可能的子设备
for child in block_device.get("children", []): for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions) _process_partition(child, all_disks, all_partitions, all_efi_partitions)
def scan_partitions() -> Tuple[bool, List, List, List, str]: def scan_partitions() -> Tuple[bool, List, List, List, str]:
""" """
扫描系统中的所有磁盘和分区,并返回结构化的信息 扫描系统中的所有磁盘和分区。
:return: (success, disks, partitions, efi_partitions, error_message)
""" """
log_step("扫描系统分区", "使用 lsblk 获取磁盘和分区信息")
success, stdout, stderr = run_command( success, stdout, stderr = run_command(
["sudo", "lsblk", "-J", "-o", "NAME,FSTYPE,SIZE,MOUNTPOINT,UUID,PARTLABEL,LABEL,TYPE,PARTTYPE"], ["sudo", "lsblk", "-J", "-o", "NAME,FSTYPE,SIZE,MOUNTPOINT,UUID,PARTLABEL,LABEL,TYPE,PARTTYPE"],
"扫描分区" "扫描分区"
@@ -188,43 +201,46 @@ def scan_partitions() -> Tuple[bool, List, List, List, str]:
for block_device in data.get("blockdevices", []): for block_device in data.get("blockdevices", []):
_process_partition(block_device, all_disks, all_partitions, all_efi_partitions) _process_partition(block_device, all_disks, all_partitions, all_efi_partitions)
log_info(f"扫描完成: 发现 {len(all_disks)} 个磁盘, {len(all_partitions)} 个分区, {len(all_efi_partitions)} 个EFI分区")
for d in all_disks:
log_debug(f" 磁盘: {d['name']}")
for p in all_partitions:
log_debug(f" 分区: {p['name']} ({p['fstype']})")
for e in all_efi_partitions:
log_info(f" EFI分区: {e['name']}")
return True, all_disks, all_partitions, all_efi_partitions, "" return True, all_disks, all_partitions, all_efi_partitions, ""
except json.JSONDecodeError as e: except json.JSONDecodeError as e:
log_error(f"解析 lsblk 输出失败: {e}")
return False, [], [], [], f"解析lsblk输出失败: {e}" return False, [], [], [], f"解析lsblk输出失败: {e}"
except Exception as e: except Exception as e:
log_error(f"处理分区数据时发生未知错误: {e}")
return False, [], [], [], f"处理分区数据时发生未知错误: {e}" return False, [], [], [], f"处理分区数据时发生未知错误: {e}"
def _cleanup_partial_mount(mount_point: str, mounted_boot: bool, mounted_efi: bool, def _cleanup_partial_mount(mount_point: str, mounted_boot: bool, mounted_efi: bool,
bind_paths_mounted: List[str]) -> None: bind_paths_mounted: List[str]) -> None:
""" """清理部分挂载的资源"""
清理部分挂载资源(用于挂载失败时的回滚)。 log_warning(f"清理部分挂载资源: {mount_point}")
"""
print(f"[Backend] 清理部分挂载资源: {mount_point}")
# 逆序卸载已绑定的路径
for target_path in reversed(bind_paths_mounted): for target_path in reversed(bind_paths_mounted):
if os.path.ismount(target_path): if os.path.ismount(target_path):
run_command(["sudo", "umount", target_path], f"清理卸载绑定 {target_path}") run_command(["sudo", "umount", target_path], f"清理卸载绑定 {target_path}")
# 卸载EFI分区
if mounted_efi: if mounted_efi:
efi_mount_point = os.path.join(mount_point, "boot/efi") efi_mount_point = os.path.join(mount_point, "boot/efi")
if os.path.ismount(efi_mount_point): if os.path.ismount(efi_mount_point):
run_command(["sudo", "umount", efi_mount_point], "清理卸载EFI分区") run_command(["sudo", "umount", efi_mount_point], "清理卸载EFI分区")
# 卸载/boot分区
if mounted_boot: if mounted_boot:
boot_mount_point = os.path.join(mount_point, "boot") boot_mount_point = os.path.join(mount_point, "boot")
if os.path.ismount(boot_mount_point): if os.path.ismount(boot_mount_point):
run_command(["sudo", "umount", boot_mount_point], "清理卸载/boot分区") run_command(["sudo", "umount", boot_mount_point], "清理卸载/boot分区")
# 卸载根分区
if os.path.ismount(mount_point): if os.path.ismount(mount_point):
run_command(["sudo", "umount", mount_point], "清理卸载根分区") run_command(["sudo", "umount", mount_point], "清理卸载根分区")
# 删除临时目录
if os.path.exists(mount_point) and not os.path.ismount(mount_point): if os.path.exists(mount_point) and not os.path.ismount(mount_point):
try: try:
os.rmdir(mount_point) os.rmdir(mount_point)
@@ -236,253 +252,451 @@ def mount_target_system(root_partition: str, boot_partition: Optional[str] = Non
efi_partition: Optional[str] = None) -> Tuple[bool, str, str]: efi_partition: Optional[str] = None) -> Tuple[bool, str, str]:
""" """
挂载目标系统的根分区、/boot分区和EFI分区到临时目录。 挂载目标系统的根分区、/boot分区和EFI分区到临时目录。
:param root_partition: 目标系统的根分区设备路径
:param boot_partition: 目标系统的独立 /boot 分区设备路径 (可选)
:param efi_partition: 目标系统的EFI系统分区设备路径 (可选仅UEFI)
:return: (True/False, mount_point, error_message)
""" """
# 验证输入 log_step("挂载目标系统", f"根分区: {root_partition}, /boot: {boot_partition or ''}, EFI: {efi_partition or ''}")
if not validate_device_path(root_partition): if not validate_device_path(root_partition):
log_error(f"无效的根分区路径: {root_partition}")
return False, "", f"无效的根分区路径: {root_partition}" return False, "", f"无效的根分区路径: {root_partition}"
if boot_partition and not validate_device_path(boot_partition): if boot_partition and not validate_device_path(boot_partition):
log_error(f"无效的/boot分区路径: {boot_partition}")
return False, "", f"无效的/boot分区路径: {boot_partition}" return False, "", f"无效的/boot分区路径: {boot_partition}"
if efi_partition and not validate_device_path(efi_partition): if efi_partition and not validate_device_path(efi_partition):
log_error(f"无效的EFI分区路径: {efi_partition}")
return False, "", f"无效的EFI分区路径: {efi_partition}" return False, "", f"无效的EFI分区路径: {efi_partition}"
# 创建临时挂载点
mount_point = "/mnt_grub_repair_" + str(int(time.time())) mount_point = "/mnt_grub_repair_" + str(int(time.time()))
log_info(f"创建临时挂载点: {mount_point}")
try: try:
os.makedirs(mount_point, exist_ok=False) os.makedirs(mount_point, exist_ok=False)
log_info(f"✓ 挂载点创建成功")
except Exception as e: except Exception as e:
log_error(f"创建挂载点失败: {e}")
return False, "", f"创建挂载点失败: {e}" return False, "", f"创建挂载点失败: {e}"
# 跟踪已挂载的资源,用于失败时清理
bind_paths_mounted = [] bind_paths_mounted = []
mounted_boot = False mounted_boot = False
mounted_efi = False mounted_efi = False
# 1. 挂载根分区 # 1. 挂载根分区
log_info(f"[1/4] 挂载根分区 {root_partition}{mount_point}")
success, _, stderr = run_command(["sudo", "mount", root_partition, mount_point], success, _, stderr = run_command(["sudo", "mount", root_partition, mount_point],
f"挂载根分区 {root_partition}") f"挂载根分区 {root_partition}")
if not success: if not success:
log_error(f"挂载根分区失败,清理中...")
os.rmdir(mount_point) os.rmdir(mount_point)
return False, "", f"挂载根分区失败: {stderr}" return False, "", f"挂载根分区失败: {stderr}"
# 2. 如果有独立 /boot 分区 # 检查挂载是否成功
if not os.path.ismount(mount_point):
log_error(f"挂载点未激活: {mount_point}")
return False, "", "挂载根分区失败: 挂载点未激活"
log_info(f"✓ 根分区挂载成功")
# 检查关键目录
for check_dir in ["etc", "boot"]:
full_path = os.path.join(mount_point, check_dir)
if os.path.exists(full_path):
log_info(f" 发现目录: /{check_dir}")
else:
log_warning(f" 未找到目录: /{check_dir}")
# 2. 挂载 /boot 分区
if boot_partition: if boot_partition:
log_info(f"[2/4] 挂载 /boot 分区 {boot_partition}")
boot_mount_point = os.path.join(mount_point, "boot") boot_mount_point = os.path.join(mount_point, "boot")
if not os.path.exists(boot_mount_point): if not os.path.exists(boot_mount_point):
os.makedirs(boot_mount_point) os.makedirs(boot_mount_point)
log_debug(f"创建 /boot 挂载点")
success, _, stderr = run_command(["sudo", "mount", boot_partition, boot_mount_point], success, _, stderr = run_command(["sudo", "mount", boot_partition, boot_mount_point],
f"挂载 /boot 分区 {boot_partition}") f"挂载 /boot 分区 {boot_partition}")
if not success: if not success:
log_error(f"挂载 /boot 分区失败,开始清理...")
_cleanup_partial_mount(mount_point, False, False, []) _cleanup_partial_mount(mount_point, False, False, [])
return False, "", f"挂载 /boot 分区失败: {stderr}" return False, "", f"挂载 /boot 分区失败: {stderr}"
mounted_boot = True mounted_boot = True
log_info(f"✓ /boot 分区挂载成功")
else:
log_info(f"[2/4] 无独立的 /boot 分区,跳过")
# 3. 如果有EFI分区 (UEFI系统) # 3. 挂载 EFI 分区
if efi_partition: if efi_partition:
log_info(f"[3/4] 挂载 EFI 分区 {efi_partition}")
efi_mount_point = os.path.join(mount_point, "boot/efi") efi_mount_point = os.path.join(mount_point, "boot/efi")
if not os.path.exists(efi_mount_point): if not os.path.exists(efi_mount_point):
os.makedirs(efi_mount_point) os.makedirs(efi_mount_point)
log_debug(f"创建 /boot/efi 挂载点")
success, _, stderr = run_command(["sudo", "mount", efi_partition, efi_mount_point], success, _, stderr = run_command(["sudo", "mount", efi_partition, efi_mount_point],
f"挂载 EFI 分区 {efi_partition}") f"挂载 EFI 分区 {efi_partition}")
if not success: if not success:
log_error(f"挂载 EFI 分区失败,开始清理...")
_cleanup_partial_mount(mount_point, mounted_boot, False, []) _cleanup_partial_mount(mount_point, mounted_boot, False, [])
return False, "", f"挂载 EFI 分区失败: {stderr}" return False, "", f"挂载 EFI 分区失败: {stderr}"
mounted_efi = True mounted_efi = True
log_info(f"✓ EFI 分区挂载成功")
# 4. 绑定必要的伪文件系统 # 检查 EFI 分区内容
efi_path = os.path.join(mount_point, "boot/efi/EFI")
if os.path.exists(efi_path):
log_info(f" EFI 分区内容:")
try:
for item in os.listdir(efi_path):
log_info(f" - {item}")
except Exception as e:
log_warning(f" 无法列出 EFI 目录: {e}")
else:
log_info(f"[3/4] 无 EFI 分区,跳过")
# 4. 绑定伪文件系统
log_info(f"[4/4] 绑定伪文件系统 (/dev, /proc, /sys 等)")
bind_paths = ["/dev", "/dev/pts", "/proc", "/sys", "/run"] bind_paths = ["/dev", "/dev/pts", "/proc", "/sys", "/run"]
for path in bind_paths: for path in bind_paths:
target_path = os.path.join(mount_point, path.lstrip('/')) target_path = os.path.join(mount_point, path.lstrip('/'))
if not os.path.exists(target_path): if not os.path.exists(target_path):
os.makedirs(target_path) os.makedirs(target_path)
log_debug(f"创建目录: {target_path}")
success, _, stderr = run_command(["sudo", "mount", "--bind", path, target_path], success, _, stderr = run_command(["sudo", "mount", "--bind", path, target_path],
f"绑定 {path}{target_path}") f"绑定 {path}")
if not success: if not success:
log_error(f"绑定 {path} 失败,开始清理...")
_cleanup_partial_mount(mount_point, mounted_boot, mounted_efi, bind_paths_mounted) _cleanup_partial_mount(mount_point, mounted_boot, mounted_efi, bind_paths_mounted)
return False, "", f"绑定 {path} 失败: {stderr}" return False, "", f"绑定 {path} 失败: {stderr}"
bind_paths_mounted.append(target_path) bind_paths_mounted.append(target_path)
log_info(f"✓ 所有绑定完成")
log_info(f"挂载摘要: 根分区 ✓, /boot {'' if mounted_boot else ''}, EFI {'' if mounted_efi else ''}")
return True, mount_point, "" return True, mount_point, ""
def detect_distro_type(mount_point: str) -> str: def detect_distro_type(mount_point: str) -> str:
""" """
尝试检测目标系统发行版类型。 检测目标系统发行版类型。
:param mount_point: 目标系统根分区的挂载点
:return: "arch", "centos", "debian", "ubuntu", "fedora", "opensuse", "unknown"
""" """
log_step("检测发行版类型", f"挂载点: {mount_point}")
os_release_path = os.path.join(mount_point, "etc/os-release") os_release_path = os.path.join(mount_point, "etc/os-release")
log_info(f"检查文件: {os_release_path}")
if not os.path.exists(os_release_path): if not os.path.exists(os_release_path):
# 尝试 /etc/issue 作为备选 log_warning(f"未找到 {os_release_path}尝试 /etc/issue")
issue_path = os.path.join(mount_point, "etc/issue") issue_path = os.path.join(mount_point, "etc/issue")
if os.path.exists(issue_path): if os.path.exists(issue_path):
try: try:
with open(issue_path, "r") as f: with open(issue_path, "r") as f:
content = f.read().lower() content = f.read().lower()
log_debug(f"/etc/issue 内容: {content[:200]}")
if "ubuntu" in content: if "ubuntu" in content:
log_info(f"通过 /etc/issue 检测到: ubuntu")
return "ubuntu" return "ubuntu"
elif "debian" in content: elif "debian" in content:
log_info(f"通过 /etc/issue 检测到: debian")
return "debian" return "debian"
elif "centos" in content or "rhel" in content: elif "centos" in content or "rhel" in content:
log_info(f"通过 /etc/issue 检测到: centos")
return "centos" return "centos"
elif "fedora" in content: elif "fedora" in content:
log_info(f"通过 /etc/issue 检测到: fedora")
return "fedora" return "fedora"
except Exception: except Exception as e:
pass log_warning(f"读取 /etc/issue 失败: {e}")
return "unknown" return "unknown"
try: try:
with open(os_release_path, "r") as f: with open(os_release_path, "r") as f:
content = f.read() content = f.read()
log_debug(f"/etc/os-release 内容:\n{content}")
# 解析 ID 和 ID_LIKE 字段
id_match = re.search(r'^ID=(.+)$', content, re.MULTILINE) id_match = re.search(r'^ID=(.+)$', content, re.MULTILINE)
id_like_match = re.search(r'^ID_LIKE=(.+)$', content, re.MULTILINE) id_like_match = re.search(r'^ID_LIKE=(.+)$', content, re.MULTILINE)
distro_id = id_match.group(1).strip('"\'') if id_match else "" distro_id = id_match.group(1).strip('"\'') if id_match else ""
id_like = id_like_match.group(1).strip('"\'') if id_like_match else "" id_like = id_like_match.group(1).strip('"\'') if id_like_match else ""
# 直接匹配 log_info(f"ID={distro_id}, ID_LIKE={id_like}")
if distro_id == "ubuntu": if distro_id == "ubuntu":
log_info(f"检测到发行版: ubuntu")
return "ubuntu" return "ubuntu"
elif distro_id == "debian": elif distro_id == "debian":
log_info(f"检测到发行版: debian")
return "debian" return "debian"
elif distro_id in ["arch", "manjaro", "endeavouros"]: elif distro_id in ["arch", "manjaro", "endeavouros"]:
log_info(f"检测到发行版: arch (ID={distro_id})")
return "arch" return "arch"
elif distro_id in ["centos", "rhel", "rocky", "almalinux"]: elif distro_id in ["centos", "rhel", "rocky", "almalinux"]:
log_info(f"检测到发行版: centos (ID={distro_id})")
return "centos" return "centos"
elif distro_id == "fedora": elif distro_id == "fedora":
log_info(f"检测到发行版: fedora")
return "fedora" return "fedora"
elif distro_id in ["opensuse", "opensuse-leap", "opensuse-tumbleweed"]: elif distro_id in ["opensuse", "opensuse-leap", "opensuse-tumbleweed"]:
log_info(f"检测到发行版: opensuse (ID={distro_id})")
return "opensuse" return "opensuse"
# 通过 ID_LIKE 推断
if "ubuntu" in id_like: if "ubuntu" in id_like:
log_info(f"通过 ID_LIKE 推断: ubuntu")
return "ubuntu" return "ubuntu"
elif "debian" in id_like: elif "debian" in id_like:
log_info(f"通过 ID_LIKE 推断: debian")
return "debian" return "debian"
elif "arch" in id_like: elif "arch" in id_like:
log_info(f"通过 ID_LIKE 推断: arch")
return "arch" return "arch"
elif "rhel" in id_like or "centos" in id_like or "fedora" in id_like: elif "rhel" in id_like or "centos" in id_like or "fedora" in id_like:
return "centos" # 使用centos命令集 log_info(f"通过 ID_LIKE 推断: centos")
return "centos"
elif "suse" in id_like: elif "suse" in id_like:
log_info(f"通过 ID_LIKE 推断: opensuse")
return "opensuse" return "opensuse"
log_warning(f"无法识别的发行版ID={distro_id}, ID_LIKE={id_like}")
return "unknown" return "unknown"
except Exception as e: except Exception as e:
print(f"[Backend] 无法读取os-release文件: {e}") log_error(f"读取 os-release 文件失败: {e}")
return "unknown" return "unknown"
def check_chroot_environment(mount_point: str) -> Tuple[bool, str]:
"""
检查 chroot 环境是否可用。
"""
log_step("检查 chroot 环境", f"挂载点: {mount_point}")
# 检查关键命令是否存在
critical_commands = ["grub-install", "grub-mkconfig", "update-grub"]
found_commands = []
for cmd in critical_commands:
success, _, _ = run_command(["sudo", "chroot", mount_point, "which", cmd],
f"检查命令 {cmd}", timeout=10)
if success:
found_commands.append(cmd)
log_info(f" ✓ 找到命令: {cmd}")
else:
log_warning(f" ✗ 未找到命令: {cmd}")
if not found_commands:
log_error(f"chroot 环境中未找到关键的 GRUB 命令")
return False, "chroot 环境中缺少 GRUB 命令"
# 检查 grub-install 版本
success, stdout, _ = run_command(["sudo", "chroot", mount_point, "grub-install", "--version"],
"检查 grub-install 版本", timeout=10)
if success:
log_info(f"grub-install 版本: {stdout.strip()}")
# 检查 EFI 目录UEFI 模式)
efi_dir = os.path.join(mount_point, "boot/efi")
if os.path.ismount(efi_dir):
log_info(f"✓ EFI 分区已挂载到 /boot/efi")
# 检查 EFI 目录权限
try:
stat = os.stat(efi_dir)
log_info(f" EFI 目录权限: {oct(stat.st_mode)}")
except Exception as e:
log_warning(f" 无法获取 EFI 目录权限: {e}")
else:
log_info(f"EFI 分区未挂载(可能是 BIOS 模式)")
return True, ""
def chroot_and_repair_grub(mount_point: str, target_disk: str, def chroot_and_repair_grub(mount_point: str, target_disk: str,
is_uefi: bool = False, distro_type: str = "unknown") -> Tuple[bool, str]: is_uefi: bool = False, distro_type: str = "unknown") -> Tuple[bool, str]:
""" """
Chroot到目标系统并执行GRUB修复命令。 Chroot到目标系统并执行GRUB修复命令。
:param mount_point: 目标系统根分区的挂载点
:param target_disk: GRUB要安装到的物理磁盘
:param is_uefi: 目标系统是否使用UEFI启动
:param distro_type: 目标系统发行版类型
:return: (True/False, error_message)
""" """
log_step("修复 GRUB", f"目标磁盘: {target_disk}, UEFI: {is_uefi}, 发行版: {distro_type}")
if not validate_device_path(target_disk): if not validate_device_path(target_disk):
log_error(f"无效的目标磁盘路径: {target_disk}")
return False, f"无效的目标磁盘路径: {target_disk}" return False, f"无效的目标磁盘路径: {target_disk}"
# 检查 chroot 环境
ok, err = check_chroot_environment(mount_point)
if not ok:
return False, err
chroot_cmd_prefix = ["sudo", "chroot", mount_point] chroot_cmd_prefix = ["sudo", "chroot", mount_point]
# 1. 安装GRUB到目标磁盘 # 1. 安装GRUB
if is_uefi: log_info(f"[1/2] 安装 GRUB 到 {target_disk}")
# UEFI模式首先尝试正常安装注册NVRAM启动项
success, _, stderr = run_command(
chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi",
"--efi-directory=/boot/efi", "--bootloader-id=GRUB"],
"安装UEFI GRUB带NVRAM"
)
# 如果失败是因为EFI变量不支持常见于Live环境修复外部磁盘使用 --no-nvram 重试 if is_uefi:
if not success and ("EFI variables are not supported" in stderr or log_info(f"UEFI 模式安装...")
"efibootmgr" in stderr or
"NVRAM" in stderr): # 检查 EFI 分区是否正确挂载
print(f"[Backend] 警告: EFI变量访问失败尝试不使用NVRAM (--no-nvram) 安装...") efi_check_path = os.path.join(mount_point, "boot/efi/EFI")
success, _, stderr = run_command( if os.path.exists(efi_check_path):
log_info(f"✓ EFI 目录存在: {efi_check_path}")
try:
contents = os.listdir(efi_check_path)
log_info(f" 当前 EFI 目录内容: {contents}")
except Exception as e:
log_warning(f" 无法列出 EFI 目录: {e}")
else:
log_warning(f"✗ EFI 目录不存在: {efi_check_path}")
# 第一次尝试:正常安装
log_info(f"尝试 1/3: 标准 UEFI 安装(带 NVRAM...")
success, stdout, stderr = run_command(
chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi", chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi",
"--efi-directory=/boot/efi", "--bootloader-id=GRUB", "--efi-directory=/boot/efi", "--bootloader-id=GRUB",
"--no-nvram"], "--verbose"],
"安装UEFI GRUB带NVRAM" "安装UEFI GRUB带NVRAM",
timeout=60
) )
# 如果仍然失败,尝试 --removable 选项(某些固件需要)
if not success: if not success:
print(f"[Backend] 警告: 标准安装失败,尝试 --removable 选项...") log_warning(f"标准安装失败,错误: {stderr}")
success, _, stderr = run_command(
if "EFI variables are not supported" in stderr or "efibootmgr" in stderr:
log_info(f"尝试 2/3: 使用 --no-nvram 选项...")
success, stdout, stderr = run_command(
chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi",
"--efi-directory=/boot/efi", "--bootloader-id=GRUB",
"--no-nvram", "--verbose"],
"安装UEFI GRUB不带NVRAM",
timeout=60
)
if not success:
log_info(f"尝试 3/3: 使用 --removable 选项...")
success, stdout, stderr = run_command(
chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi", chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi",
"--efi-directory=/boot/efi", "--efi-directory=/boot/efi",
"--removable"], "--removable", "--verbose"],
"安装UEFI GRUB可移动模式" "安装UEFI GRUB可移动模式",
timeout=60
) )
# 检查安装结果
if success:
log_info(f"✓ GRUB EFI 文件安装成功")
# 检查生成的文件
efi_grub_path = os.path.join(mount_point, "boot/efi/EFI/GRUB")
efi_boot_path = os.path.join(mount_point, "boot/efi/EFI/Boot")
for path in [efi_grub_path, efi_boot_path]:
if os.path.exists(path):
log_info(f" 检查目录: {path}")
try:
files = os.listdir(path)
for f in files:
full = os.path.join(path, f)
size = os.path.getsize(full)
log_info(f" - {f} ({size} bytes)")
except Exception as e:
log_warning(f" 无法列出: {e}")
else: else:
success, _, stderr = run_command( log_info(f"BIOS 模式安装...")
chroot_cmd_prefix + ["grub-install", target_disk], success, stdout, stderr = run_command(
"安装BIOS GRUB" chroot_cmd_prefix + ["grub-install", "--verbose", target_disk],
"安装BIOS GRUB",
timeout=60
) )
if not success: if not success:
log_error(f"GRUB 安装失败")
return False, f"GRUB安装失败: {stderr}" return False, f"GRUB安装失败: {stderr}"
# 2. 更新GRUB配置文件 log_info(f"✓ GRUB 安装成功")
# 2. 更新GRUB配置
log_info(f"[2/2] 更新 GRUB 配置文件")
grub_update_cmd = [] grub_update_cmd = []
config_path = ""
if distro_type in ["debian", "ubuntu"]: if distro_type in ["debian", "ubuntu"]:
grub_update_cmd = ["update-grub"] grub_update_cmd = ["update-grub"]
config_path = "/boot/grub/grub.cfg"
elif distro_type == "arch": elif distro_type == "arch":
grub_update_cmd = ["grub-mkconfig", "-o", "/boot/grub/grub.cfg"] grub_update_cmd = ["grub-mkconfig", "-o", "/boot/grub/grub.cfg"]
config_path = "/boot/grub/grub.cfg"
elif distro_type == "centos": elif distro_type == "centos":
# 检测实际存在的配置文件路径
grub2_path = os.path.join(mount_point, "boot/grub2/grub.cfg") grub2_path = os.path.join(mount_point, "boot/grub2/grub.cfg")
grub_path = os.path.join(mount_point, "boot/grub/grub.cfg")
if os.path.exists(os.path.dirname(grub2_path)): if os.path.exists(os.path.dirname(grub2_path)):
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"] grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"]
config_path = "/boot/grub2/grub.cfg"
else: else:
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub/grub.cfg"] grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub/grub.cfg"]
config_path = "/boot/grub/grub.cfg"
elif distro_type == "fedora": elif distro_type == "fedora":
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"] grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"]
config_path = "/boot/grub2/grub.cfg"
elif distro_type == "opensuse": elif distro_type == "opensuse":
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"] grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"]
config_path = "/boot/grub2/grub.cfg"
else: else:
# 尝试通用命令,先检测配置文件路径
for cfg_path in ["/boot/grub/grub.cfg", "/boot/grub2/grub.cfg"]: for cfg_path in ["/boot/grub/grub.cfg", "/boot/grub2/grub.cfg"]:
full_path = os.path.join(mount_point, cfg_path.lstrip('/')) full_path = os.path.join(mount_point, cfg_path.lstrip('/'))
if os.path.exists(os.path.dirname(full_path)): if os.path.exists(os.path.dirname(full_path)):
grub_update_cmd = ["grub-mkconfig", "-o", cfg_path] grub_update_cmd = ["grub-mkconfig", "-o", cfg_path]
config_path = cfg_path
break break
else: else:
grub_update_cmd = ["grub-mkconfig", "-o", "/boot/grub/grub.cfg"] grub_update_cmd = ["grub-mkconfig", "-o", "/boot/grub/grub.cfg"]
config_path = "/boot/grub/grub.cfg"
success, _, stderr = run_command( log_info(f"使用命令: {' '.join(grub_update_cmd)}")
log_info(f"配置文件路径: {config_path}")
success, stdout, stderr = run_command(
chroot_cmd_prefix + grub_update_cmd, chroot_cmd_prefix + grub_update_cmd,
"更新GRUB配置文件" "更新GRUB配置文件",
timeout=120
) )
if not success: if not success:
log_error(f"GRUB 配置文件更新失败: {stderr}")
return False, f"GRUB配置文件更新失败: {stderr}" return False, f"GRUB配置文件更新失败: {stderr}"
log_info(f"✓ GRUB 配置文件更新成功")
# 检查生成的配置文件
full_config_path = os.path.join(mount_point, config_path.lstrip('/'))
if os.path.exists(full_config_path):
size = os.path.getsize(full_config_path)
log_info(f" 配置文件大小: {size} bytes")
# 检查配置文件中是否包含菜单项
try:
with open(full_config_path, 'r') as f:
content = f.read()
menu_entries = content.count('menuentry')
log_info(f" 发现 {menu_entries} 个启动菜单项")
except Exception as e:
log_warning(f" 无法读取配置文件: {e}")
else:
log_warning(f" 配置文件未找到: {full_config_path}")
# UEFI 模式下输出重要提示
if is_uefi:
log_step("UEFI 修复完成提示")
log_info(f"GRUB EFI 文件已安装到 EFI 分区")
log_info(f"如果无法启动,请检查:")
log_info(f" 1. BIOS/UEFI 设置中是否启用了 UEFI 启动模式")
log_info(f" 2. 启动顺序中是否有 'GRUB''Linux' 选项")
log_info(f" 3. 安全启动 (Secure Boot) 是否已禁用")
log_info(f" 4. EFI 分区中的文件: /EFI/GRUB/grubx64.efi")
return True, "" return True, ""
def unmount_target_system(mount_point: str) -> Tuple[bool, str]: def unmount_target_system(mount_point: str) -> Tuple[bool, str]:
""" """
卸载所有挂载的分区。 卸载所有挂载的分区。
:param mount_point: 目标系统根分区的挂载点
:return: (True/False, error_message)
""" """
print(f"[Backend] 正在卸载分区 from {mount_point}...") log_step("卸载目标系统", f"挂载点: {mount_point}")
success = True success = True
error_msg = "" error_msg = ""
# 按照挂载的逆序卸载:
# 挂载顺序: /dev, /dev/pts, /proc, /sys, /run, /boot/efi, /boot, 根分区
# 卸载顺序: /run, /sys, /proc, /dev/pts, /dev, /boot/efi, /boot, 根分区
# 1. 卸载绑定路径(先卸载子目录)
bind_paths = ["/run", "/sys", "/proc", "/dev/pts", "/dev"] bind_paths = ["/run", "/sys", "/proc", "/dev/pts", "/dev"]
for path in bind_paths: for path in bind_paths:
target_path = os.path.join(mount_point, path.lstrip('/')) target_path = os.path.join(mount_point, path.lstrip('/'))
@@ -491,43 +705,51 @@ def unmount_target_system(mount_point: str) -> Tuple[bool, str]:
if not s: if not s:
success = False success = False
error_msg += f"卸载绑定 {target_path} 失败: {stderr}\n" error_msg += f"卸载绑定 {target_path} 失败: {stderr}\n"
else:
log_debug(f"未挂载,跳过: {target_path}")
# 2. 卸载 /boot/efi (如果存在)
efi_mount_point = os.path.join(mount_point, "boot/efi") efi_mount_point = os.path.join(mount_point, "boot/efi")
if os.path.ismount(efi_mount_point): if os.path.ismount(efi_mount_point):
s, _, stderr = run_command(["sudo", "umount", efi_mount_point], f"卸载 {efi_mount_point}") s, _, stderr = run_command(["sudo", "umount", efi_mount_point], f"卸载 EFI 分区")
if not s: if not s:
success = False success = False
error_msg += f"卸载 {efi_mount_point} 失败: {stderr}\n" error_msg += f"卸载 EFI 分区失败: {stderr}\n"
else:
log_debug(f"EFI 分区未挂载,跳过")
# 3. 卸载 /boot (如果存在且是独立挂载的)
boot_mount_point = os.path.join(mount_point, "boot") boot_mount_point = os.path.join(mount_point, "boot")
if os.path.ismount(boot_mount_point): if os.path.ismount(boot_mount_point):
s, _, stderr = run_command(["sudo", "umount", boot_mount_point], f"卸载 {boot_mount_point}") s, _, stderr = run_command(["sudo", "umount", boot_mount_point], f"卸载 /boot 分区")
if not s: if not s:
success = False success = False
error_msg += f"卸载 {boot_mount_point} 失败: {stderr}\n" error_msg += f"卸载 /boot 分区失败: {stderr}\n"
else:
log_debug(f"/boot 分区未挂载,跳过")
# 4. 卸载根分区
if os.path.ismount(mount_point): if os.path.ismount(mount_point):
s, _, stderr = run_command(["sudo", "umount", mount_point], f"卸载根分区 {mount_point}") s, _, stderr = run_command(["sudo", "umount", mount_point], f"卸载根分区")
if not s: if not s:
success = False success = False
error_msg += f"卸载根分区 {mount_point} 失败: {stderr}\n" error_msg += f"卸载根分区失败: {stderr}\n"
else:
log_debug(f"根分区未挂载,跳过")
# 5. 清理临时挂载点目录
if os.path.exists(mount_point) and not os.path.ismount(mount_point): if os.path.exists(mount_point) and not os.path.ismount(mount_point):
try: try:
shutil.rmtree(mount_point) shutil.rmtree(mount_point)
print(f"[Backend] 清理临时目录 {mount_point}") log_info(f" 清理临时目录: {mount_point}")
except OSError as e: except OSError as e:
print(f"[Backend] 无法删除临时目录 {mount_point}: {e}") log_warning(f"无法删除临时目录 {mount_point}: {e}")
error_msg += f"无法删除临时目录 {mount_point}: {e}\n" error_msg += f"无法删除临时目录: {e}\n"
if success:
log_info(f"✓ 所有分区已卸载")
else:
log_warning(f"部分分区卸载失败")
return success, error_msg return success, error_msg
# 示例如果直接运行backend.py可以进行一些测试
if __name__ == "__main__": if __name__ == "__main__":
print("--- 运行后端测试 ---") print("--- 运行后端测试 ---")
@@ -539,35 +761,3 @@ if __name__ == "__main__":
print("EFI分区:", [p['name'] for p in efi_partitions]) print("EFI分区:", [p['name'] for p in efi_partitions])
else: else:
print("扫描分区失败:", err) print("扫描分区失败:", err)
# 假设有一些分区可供测试,实际运行时需要用户输入
# 例如:
# test_root_partition = "/dev/sdaX"
# test_target_disk = "/dev/sda"
# test_efi_partition = "/dev/sdY"
#
# if test_root_partition and test_target_disk:
# print(f"\n--- 挂载系统测试 (模拟 {test_root_partition}) ---")
# mount_ok, mnt_pt, mount_err = mount_target_system(test_root_partition, efi_partition=test_efi_partition)
# if mount_ok:
# print(f"系统挂载成功到: {mnt_pt}")
# distro = detect_distro_type(mnt_pt)
# print(f"检测到发行版: {distro}")
#
# print("\n--- 修复GRUB测试 ---")
# repair_ok, repair_err = chroot_and_repair_grub(mnt_pt, test_target_disk, is_uefi=True, distro_type=distro)
# if repair_ok:
# print("GRUB修复成功")
# else:
# print("GRUB修复失败:", repair_err)
#
# print("\n--- 卸载系统测试 ---")
# unmount_ok, unmount_err = unmount_target_system(mnt_pt)
# if unmount_ok:
# print("系统卸载成功!")
# else:
# print("系统卸载失败:", unmount_err)
# else:
# print("系统挂载失败:", mount_err)
# else:
# print("\n请在代码中设置 test_root_partition 和 test_target_disk 进行完整测试。")