Files
BootRepairTool/backend.py
2026-02-12 03:07:06 +08:00

2543 lines
96 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# backend.py
# 参考 Calamares 引导安装模块优化
# 支持多架构、EFI fallback、Secure Boot、特殊文件系统等
import subprocess
import os
import json
import time
import re
import shutil
import glob
import platform
from typing import Tuple, List, Dict, Optional, Callable
from enum import Enum
# 常量配置
COMMAND_TIMEOUT = 300 # 命令执行超时时间(秒)
DEFAULT_GRUB_CONFIG_PATHS = [
"/boot/grub/grub.cfg",
"/boot/grub2/grub.cfg",
]
# EFI 架构参数映射 (参考 Calamares)
EFI_ARCH_PARAMETERS = {
"32": {
"target": "i386-efi",
"grub_file": "grubia32.efi",
"boot_file": "bootia32.efi",
"shim_file": "shimia32.efi"
},
"64": {
"x86_64": {
"target": "x86_64-efi",
"grub_file": "grubx64.efi",
"boot_file": "bootx64.efi",
"shim_file": "shimx64.efi"
},
"aarch64": {
"target": "arm64-efi",
"grub_file": "grubaa64.efi",
"boot_file": "bootaa64.efi",
"shim_file": "shimaa64.efi"
},
"loongarch64": {
"target": "loongarch64-efi",
"grub_file": "grubloongarch64.efi",
"boot_file": "bootloongarch64.efi",
"shim_file": "shimloongarch64.efi"
}
}
}
class LogLevel(Enum):
"""日志级别"""
DEBUG = "debug"
INFO = "info"
WARNING = "warning"
ERROR = "error"
SUCCESS = "success"
STEP = "step"
# 全局日志回调函数
_log_callback: Optional[Callable[[str, LogLevel], None]] = None
def set_log_callback(callback: Callable[[str, LogLevel], None]):
"""设置日志回调函数,用于前端显示"""
global _log_callback
_log_callback = callback
def _log(message: str, level: LogLevel = LogLevel.INFO):
"""内部日志函数"""
# 同时输出到控制台和回调
prefix = {
LogLevel.DEBUG: "[DEBUG]",
LogLevel.INFO: "[INFO]",
LogLevel.WARNING: "[WARN]",
LogLevel.ERROR: "[ERROR]",
LogLevel.SUCCESS: "[SUCCESS]",
LogLevel.STEP: "[STEP]"
}.get(level, "[INFO]")
print(f"{prefix} {message}")
if _log_callback:
try:
_log_callback(message, level)
except Exception:
pass
def log_step(step_name: str, detail: str = ""):
"""输出步骤日志"""
separator = "=" * 60
_log(f"{separator}", LogLevel.STEP)
_log(f"{step_name}", LogLevel.STEP)
if detail:
_log(f"详情: {detail}", LogLevel.STEP)
_log(f"{separator}", LogLevel.STEP)
def log_info(msg: str):
"""输出信息日志"""
_log(msg, LogLevel.INFO)
def log_warning(msg: str):
"""输出警告日志"""
_log(msg, LogLevel.WARNING)
def log_error(msg: str):
"""输出错误日志"""
_log(msg, LogLevel.ERROR)
def log_debug(msg: str):
"""输出调试日志"""
_log(msg, LogLevel.DEBUG)
def log_success(msg: str):
"""输出成功日志"""
_log(msg, LogLevel.SUCCESS)
def validate_device_path(path: str) -> bool:
"""
验证设备路径格式是否合法(防止命令注入)。
支持标准分区、LVM、LUKS、RAID、ZFS等设备。
"""
if not path:
return False
patterns = [
r'^/dev/[a-zA-Z0-9_-]+$',
r'^/dev/mapper/[a-zA-Z0-9_-]+$',
r'^/dev/mapper/[a-zA-Z0-9_]+-[a-zA-Z0-9_-]+$',
r'^/dev/md[0-9]+$',
r'^/dev/nvme[0-9]+n[0-9]+(p[0-9]+)?$',
r'^/dev/mmcblk[0-9]+(p[0-9]+)?$',
r'^/dev/zd[0-9]+$',
]
return any(re.match(pattern, path) is not None for pattern in patterns)
def run_command(command: List[str], description: str = "执行命令",
cwd: Optional[str] = None, shell: bool = False,
timeout: int = COMMAND_TIMEOUT,
env: Optional[Dict[str, str]] = None) -> Tuple[bool, str, str]:
"""
运行一个系统命令,并捕获其输出和错误。
支持环境变量设置用于ZFS等特殊文件系统
"""
cmd_str = ' '.join(command)
log_info(f"执行命令: {cmd_str}")
log_debug(f"工作目录: {cwd or '当前目录'}, 超时: {timeout}")
# 准备环境变量
run_env = None
if env:
run_env = os.environ.copy()
run_env.update(env)
log_debug(f"环境变量: {env}")
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
check=True,
cwd=cwd,
shell=shell,
timeout=timeout,
env=run_env
)
if result.stdout:
log_debug(f"stdout: {result.stdout[:500]}")
if result.stderr:
log_debug(f"stderr: {result.stderr[:500]}")
log_success(f"✓ 命令成功: {description}")
return True, result.stdout, result.stderr
except subprocess.CalledProcessError as e:
log_error(f"✗ 命令失败: {description}")
log_error(f"返回码: {e.returncode}")
log_error(f"stdout: {e.stdout}")
log_error(f"stderr: {e.stderr}")
return False, e.stdout, e.stderr
except subprocess.TimeoutExpired:
log_error(f"✗ 命令超时(超过 {timeout} 秒): {description}")
return False, "", f"命令执行超时"
except FileNotFoundError:
log_error(f"✗ 命令未找到: {command[0]}")
return False, "", f"命令未找到: {command[0]}"
except Exception as e:
log_error(f"✗ 发生未知错误: {e}")
return False, "", str(e)
def get_efi_word_size() -> str:
"""
检测 EFI 位数32或64位
通过读取 /sys/firmware/efi/fw_platform_size 获取。
如果内核较旧不支持,默认假设为 64 位。
"""
# 首先检查是否是 EFI 系统
if not os.path.exists("/sys/firmware/efi"):
log_debug("非 EFI 系统,返回 64 位作为默认值")
return "64"
try:
# 尝试读取新内核的 fw_platform_size
if os.path.exists("/sys/firmware/efi/fw_platform_size"):
with open("/sys/firmware/efi/fw_platform_size", "r") as f:
efi_bitness = f.read(2).strip()
if efi_bitness in ["32", "64"]:
log_debug(f"检测到 EFI 位数: {efi_bitness}")
return efi_bitness
# 旧内核:检查是否存在 64 位 EFI 相关文件
# 如果存在 /sys/firmware/efi/vars 或 /sys/firmware/efi/efivars假设为 64 位
if os.path.exists("/sys/firmware/efi/vars") or os.path.exists("/sys/firmware/efi/efivars"):
log_debug("检测到 EFI 环境,假设为 64 位(旧内核)")
return "64"
except FileNotFoundError:
log_debug("无法读取 EFI 位数文件,假设为 64 位")
except Exception as e:
log_warning(f"读取 EFI 位数时出错: {e}")
return "64"
def get_grub_efi_parameters() -> Optional[Tuple[str, str, str]]:
"""
获取 GRUB EFI 安装参数。
返回: (target_name, grub.efi_name, boot.efi_name)
参考 Calamares 的实现。
"""
efi_bitness = get_efi_word_size()
cpu_type = platform.machine()
log_debug(f"系统架构: {cpu_type}, EFI位数: {efi_bitness}")
if efi_bitness == "32":
# 假设所有 32 位都是传统 x86
params = EFI_ARCH_PARAMETERS["32"]
return params["target"], params["grub_file"], params["boot_file"]
elif efi_bitness == "64" and cpu_type == "aarch64":
params = EFI_ARCH_PARAMETERS["64"]["aarch64"]
return params["target"], params["grub_file"], params["boot_file"]
elif efi_bitness == "64" and cpu_type == "loongarch64":
params = EFI_ARCH_PARAMETERS["64"]["loongarch64"]
return params["target"], params["grub_file"], params["boot_file"]
elif efi_bitness == "64":
# 如果不是 ARM则为 AMD64
params = EFI_ARCH_PARAMETERS["64"]["x86_64"]
return params["target"], params["grub_file"], params["boot_file"]
log_warning(f"无法确定 GRUB EFI 参数: bits={efi_bitness}, cpu={cpu_type}")
return None
def get_shim_filename() -> Optional[str]:
"""获取当前架构的 shim 文件名"""
efi_bitness = get_efi_word_size()
cpu_type = platform.machine()
if efi_bitness == "32":
return EFI_ARCH_PARAMETERS["32"]["shim_file"]
elif efi_bitness == "64" and cpu_type == "aarch64":
return EFI_ARCH_PARAMETERS["64"]["aarch64"]["shim_file"]
elif efi_bitness == "64" and cpu_type == "loongarch64":
return EFI_ARCH_PARAMETERS["64"]["loongarch64"]["shim_file"]
elif efi_bitness == "64":
return EFI_ARCH_PARAMETERS["64"]["x86_64"]["shim_file"]
return None
def is_live_environment() -> bool:
"""
检测是否在 Live 环境中运行。
Live 环境通常无法访问 EFI 变量。
"""
efivars_path = "/sys/firmware/efi/efivars"
if not os.path.exists(efivars_path):
log_debug("Live 环境检测: /sys/firmware/efi/efivars 不存在")
return True
try:
files = os.listdir(efivars_path)
if not files:
log_debug("Live 环境检测: efivars 目录为空")
return True
except PermissionError:
log_debug("Live 环境检测: 无法访问 efivars")
return True
log_debug("Live 环境检测: 看起来不是 Live 环境")
return False
def check_secure_boot_status() -> Tuple[bool, bool]:
"""
检查 Secure Boot 状态。
返回: (是否支持检测, 是否启用)
"""
try:
# 方法1: 通过 mokutil 检查
success, stdout, _ = run_command(
["mokutil", "--sb-state"],
"检查 Secure Boot 状态",
timeout=10
)
if success:
if "enabled" in stdout.lower():
log_info("Secure Boot 状态: 已启用")
return True, True
elif "disabled" in stdout.lower():
log_info("Secure Boot 状态: 已禁用")
return True, False
except Exception:
pass
# 方法2: 通过 efi 变量检查
try:
sb_var_path = "/sys/firmware/efi/efivars/SecureBoot-8be4df61-93ca-11d2-aa0d-00e098032b8c"
if os.path.exists(sb_var_path):
with open(sb_var_path, "rb") as f:
data = f.read()
if len(data) >= 5:
# 第5个字节是 Secure Boot 状态
sb_enabled = data[4] == 1
log_info(f"Secure Boot 状态: {'已启用' if sb_enabled else '已禁用'}")
return True, sb_enabled
except Exception as e:
log_debug(f"通过 efi 变量检查 Secure Boot 失败: {e}")
log_warning("无法检测 Secure Boot 状态")
return False, False
def _process_partition(block_device: Dict, all_disks: List, all_partitions: List,
all_efi_partitions: List) -> None:
"""
处理单个块设备递归处理子分区、LVM逻辑卷、btrfs子卷等。
"""
dev_name = f"/dev/{block_device.get('name', '')}"
dev_type = block_device.get("type")
# 跳过 loop 设备和 Live 系统自身的挂载
mountpoint = block_device.get("mountpoint")
if mountpoint and (mountpoint.startswith("/run/media") or
mountpoint.startswith("/cdrom") or
mountpoint.startswith("/live") or
mountpoint.startswith("/iso")):
log_debug(f"跳过 Live 系统挂载点: {dev_name} -> {mountpoint}")
return
if dev_type == "disk":
# 跳过 loop 和 rom 设备
if block_device.get("rota") is not None or block_device.get("size", "0") == "0":
pass # 可能是有效磁盘
all_disks.append({
"name": dev_name,
"size": block_device.get("size", "unknown"),
"model": block_device.get("model", "") # 可能为空(旧版本 lsblk
})
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
elif dev_type == "part":
# 跳过 Live 系统的根分区
if mountpoint and mountpoint == "/":
# 检查是否是 Live 系统的根(通过检查是否有 /live 或特定标记)
live_marker = os.path.join(mountpoint, "live")
if os.path.exists(live_marker):
log_debug(f"跳过 Live 系统根分区: {dev_name}")
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
return
part_info = {
"name": dev_name,
"fstype": block_device.get("fstype", "unknown"),
"size": block_device.get("size", "unknown"),
"mountpoint": mountpoint,
"uuid": block_device.get("uuid"),
"partlabel": block_device.get("partlabel"),
"label": block_device.get("label"),
"parttype": (block_device.get("parttype") or "").upper(),
"fsver": block_device.get("fsver", ""),
"is_subvolume": False
}
# 检测 btrfs 子卷
if part_info["fstype"] == "btrfs":
part_info["is_btrfs"] = True
# 尝试获取子卷信息
try:
success, stdout, _ = run_command(
["btrfs", "subvolume", "list", dev_name],
f"检测 btrfs 子卷: {dev_name}",
timeout=10
)
if success:
subvolumes = []
for line in stdout.strip().split('\n'):
if line:
# 格式: ID 256 gen 16 top level 5 path @
match = re.search(r'path\s+(\S+)', line)
if match:
subvolumes.append(match.group(1))
if subvolumes:
part_info["subvolumes"] = subvolumes
log_debug(f"{dev_name} 的 btrfs 子卷: {subvolumes}")
except Exception as e:
log_debug(f"检测 btrfs 子卷失败: {e}")
all_partitions.append(part_info)
# 检测 EFI 系统分区
is_vfat = part_info["fstype"] == "vfat"
has_efi_label = part_info["partlabel"] and "EFI" in part_info["partlabel"].upper()
has_efi_name = part_info["label"] and "EFI" in part_info["label"].upper()
# EFI 系统分区的 GPT 类型 GUID
is_efi_type = part_info["parttype"] == "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"
# 或者检测标志
is_efi_fs = block_device.get("fsver") == "FAT32" and is_vfat
if is_vfat and (has_efi_label or has_efi_name or is_efi_type):
all_efi_partitions.append(part_info)
log_debug(f"检测到 EFI 分区: {dev_name}")
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
elif dev_type in ["lvm", "dm"]:
lv_name = block_device.get("name", "")
# 处理 LVM 命名格式 (vg-lv)
if "-" in lv_name and not lv_name.startswith("dm-"):
mapper_path = f"/dev/mapper/{lv_name}"
else:
mapper_path = dev_name
part_info = {
"name": mapper_path,
"fstype": block_device.get("fstype", "unknown"),
"size": block_device.get("size", "unknown"),
"mountpoint": mountpoint,
"uuid": block_device.get("uuid"),
"partlabel": block_device.get("partlabel"),
"label": block_device.get("label"),
"parttype": (block_device.get("parttype") or "").upper(),
"is_lvm": True,
"dm_name": lv_name
}
# 检测 LUKS 加密
if part_info["fstype"] == "crypto_LUKS":
part_info["is_luks"] = True
log_debug(f"检测到 LUKS 加密卷: {mapper_path}")
all_partitions.append(part_info)
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
elif dev_type == "crypt":
# 已解密的 LUKS 设备
part_info = {
"name": dev_name,
"fstype": block_device.get("fstype", "unknown"),
"size": block_device.get("size", "unknown"),
"mountpoint": mountpoint,
"uuid": block_device.get("uuid"),
"is_luks_open": True,
"parent": block_device.get("pkname", "") # 可能为空(旧版本 lsblk
}
all_partitions.append(part_info)
for child in block_device.get("children", []):
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
def scan_partitions() -> Tuple[bool, List, List, List, str]:
"""
扫描系统中的所有磁盘和分区。
支持标准分区、LVM、LUKS、btrfs 等。
"""
log_step("扫描系统分区", "使用 lsblk 获取磁盘和分区信息")
# 使用基础列名以确保兼容性(旧版本 lsblk 可能不支持 FSVER, ROTA, MODEL, PKNAME
success, stdout, stderr = run_command(
["sudo", "lsblk", "-J", "-o", "NAME,FSTYPE,SIZE,MOUNTPOINT,UUID,PARTLABEL,LABEL,TYPE,PARTTYPE"],
"扫描分区"
)
if not success:
return False, [], [], [], stderr
try:
data = json.loads(stdout)
all_disks = []
all_partitions = []
all_efi_partitions = []
for block_device in data.get("blockdevices", []):
_process_partition(block_device, all_disks, all_partitions, all_efi_partitions)
log_info(f"扫描完成: 发现 {len(all_disks)} 个磁盘, {len(all_partitions)} 个分区, {len(all_efi_partitions)} 个EFI分区")
for d in all_disks:
log_debug(f" 磁盘: {d['name']} ({d.get('model', '')})")
for p in all_partitions:
log_debug(f" 分区: {p['name']} ({p['fstype']})")
for e in all_efi_partitions:
log_info(f" EFI分区: {e['name']}")
return True, all_disks, all_partitions, all_efi_partitions, ""
except json.JSONDecodeError as e:
log_error(f"解析 lsblk 输出失败: {e}")
return False, [], [], [], f"解析lsblk输出失败: {e}"
except Exception as e:
log_error(f"处理分区数据时发生未知错误: {e}")
return False, [], [], [], f"处理分区数据时发生未知错误: {e}"
def detect_filesystem_type(partition: str) -> str:
"""
检测指定分区的文件系统类型。
使用 blkid 获取准确的文件系统信息。
"""
try:
success, stdout, _ = run_command(
["blkid", "-s", "TYPE", "-o", "value", partition],
f"检测文件系统类型: {partition}",
timeout=10
)
if success:
fs_type = stdout.strip()
log_debug(f"{partition} 的文件系统类型: {fs_type}")
return fs_type
except Exception as e:
log_debug(f"检测文件系统类型失败: {e}")
return "unknown"
def is_btrfs_subvolume(mount_point: str) -> bool:
"""检查指定路径是否是 btrfs 子卷"""
try:
success, stdout, _ = run_command(
["btrfs", "subvolume", "get-default", mount_point],
"检查 btrfs 子卷",
timeout=10
)
return success
except Exception:
return False
def get_btrfs_root_subvolume(partition: str) -> Optional[str]:
"""
获取 btrfs 分区的根子卷名称。
常见的子卷命名: @, @root, root
"""
try:
success, stdout, _ = run_command(
["btrfs", "subvolume", "list", partition],
f"获取 btrfs 子卷: {partition}",
timeout=10
)
if success:
subvolumes = []
for line in stdout.strip().split('\n'):
match = re.search(r'path\s+(\S+)', line)
if match:
subvolumes.append(match.group(1))
# 优先查找常见的根子卷名称
for candidate in ["@", "root", "@root", "ROOT", "@ROOT"]:
if candidate in subvolumes:
log_debug(f"找到 btrfs 根子卷: {candidate}")
return candidate
# 如果都没找到,返回第一个
if subvolumes:
log_debug(f"使用第一个 btrfs 子卷: {subvolumes[0]}")
return subvolumes[0]
except Exception as e:
log_debug(f"获取 btrfs 子卷失败: {e}")
return None
def _cleanup_partial_mount(mount_point: str, mounted_boot: bool, mounted_efi: bool,
bind_paths_mounted: List[str]) -> None:
"""清理部分挂载的资源"""
log_warning(f"清理部分挂载资源: {mount_point}")
for target_path in reversed(bind_paths_mounted):
if os.path.ismount(target_path):
run_command(["sudo", "umount", target_path], f"清理卸载绑定 {target_path}")
if mounted_efi:
efi_mount_point = os.path.join(mount_point, "boot/efi")
if os.path.ismount(efi_mount_point):
run_command(["sudo", "umount", efi_mount_point], "清理卸载EFI分区")
if mounted_boot:
boot_mount_point = os.path.join(mount_point, "boot")
if os.path.ismount(boot_mount_point):
run_command(["sudo", "umount", boot_mount_point], "清理卸载/boot分区")
if os.path.ismount(mount_point):
run_command(["sudo", "umount", mount_point], "清理卸载根分区")
if os.path.exists(mount_point) and not os.path.ismount(mount_point):
try:
os.rmdir(mount_point)
except OSError:
pass
def mount_target_system(root_partition: str, boot_partition: Optional[str] = None,
efi_partition: Optional[str] = None,
btrfs_subvolume: Optional[str] = None) -> Tuple[bool, str, str]:
"""
挂载目标系统的根分区、/boot分区和EFI分区到临时目录。
支持 btrfs 子卷挂载。
"""
log_step("挂载目标系统", f"根分区: {root_partition}, /boot: {boot_partition or ''}, EFI: {efi_partition or ''}")
if not validate_device_path(root_partition):
log_error(f"无效的根分区路径: {root_partition}")
return False, "", f"无效的根分区路径: {root_partition}"
if boot_partition and not validate_device_path(boot_partition):
log_error(f"无效的/boot分区路径: {boot_partition}")
return False, "", f"无效的/boot分区路径: {boot_partition}"
if efi_partition and not validate_device_path(efi_partition):
log_error(f"无效的EFI分区路径: {efi_partition}")
return False, "", f"无效的EFI分区路径: {efi_partition}"
mount_point = "/mnt_grub_repair_" + str(int(time.time()))
log_info(f"创建临时挂载点: {mount_point}")
try:
os.makedirs(mount_point, exist_ok=False)
log_info(f"✓ 挂载点创建成功")
except Exception as e:
log_error(f"创建挂载点失败: {e}")
return False, "", f"创建挂载点失败: {e}"
bind_paths_mounted = []
mounted_boot = False
mounted_efi = False
# 1. 挂载根分区
log_info(f"[1/4] 挂载根分区 {root_partition}{mount_point}")
# 检测文件系统类型
fs_type = detect_filesystem_type(root_partition)
# 构建挂载命令
mount_cmd = ["sudo", "mount"]
# btrfs 子卷处理
if fs_type == "btrfs" and btrfs_subvolume:
mount_cmd.extend(["-o", f"subvol={btrfs_subvolume}"])
log_info(f" 使用 btrfs 子卷: {btrfs_subvolume}")
mount_cmd.extend([root_partition, mount_point])
success, _, stderr = run_command(mount_cmd, f"挂载根分区 {root_partition}")
if not success:
log_error(f"挂载根分区失败,清理中...")
os.rmdir(mount_point)
return False, "", f"挂载根分区失败: {stderr}"
if not os.path.ismount(mount_point):
log_error(f"挂载点未激活: {mount_point}")
return False, "", "挂载根分区失败: 挂载点未激活"
log_success(f"✓ 根分区挂载成功")
# 检查关键目录
for check_dir in ["etc", "boot"]:
full_path = os.path.join(mount_point, check_dir)
if os.path.exists(full_path):
log_info(f" 发现目录: /{check_dir}")
else:
log_warning(f" 未找到目录: /{check_dir}")
# 2. 挂载 /boot 分区
if boot_partition:
log_info(f"[2/4] 挂载 /boot 分区 {boot_partition}")
boot_mount_point = os.path.join(mount_point, "boot")
if not os.path.exists(boot_mount_point):
os.makedirs(boot_mount_point)
log_debug(f"创建 /boot 挂载点")
success, _, stderr = run_command(["sudo", "mount", boot_partition, boot_mount_point],
f"挂载 /boot 分区 {boot_partition}")
if not success:
log_error(f"挂载 /boot 分区失败,开始清理...")
_cleanup_partial_mount(mount_point, False, False, [])
return False, "", f"挂载 /boot 分区失败: {stderr}"
mounted_boot = True
log_success(f"✓ /boot 分区挂载成功")
else:
log_info(f"[2/4] 无独立的 /boot 分区,跳过")
# 3. 挂载 EFI 分区
if efi_partition:
log_info(f"[3/4] 挂载 EFI 分区 {efi_partition}")
efi_mount_point = os.path.join(mount_point, "boot/efi")
if not os.path.exists(efi_mount_point):
os.makedirs(efi_mount_point)
log_debug(f"创建 /boot/efi 挂载点")
success, _, stderr = run_command(["sudo", "mount", efi_partition, efi_mount_point],
f"挂载 EFI 分区 {efi_partition}")
if not success:
log_error(f"挂载 EFI 分区失败,开始清理...")
_cleanup_partial_mount(mount_point, mounted_boot, False, [])
return False, "", f"挂载 EFI 分区失败: {stderr}"
mounted_efi = True
log_success(f"✓ EFI 分区挂载成功")
# 检查 EFI 分区内容
efi_path = os.path.join(mount_point, "boot/efi/EFI")
if os.path.exists(efi_path):
log_info(f" EFI 分区内容:")
try:
for item in os.listdir(efi_path):
log_info(f" - {item}")
except Exception as e:
log_warning(f" 无法列出 EFI 目录: {e}")
else:
log_info(f"[3/4] 无 EFI 分区,跳过")
# 4. 绑定伪文件系统
log_info(f"[4/4] 绑定伪文件系统 (/dev, /proc, /sys 等)")
bind_paths = ["/dev", "/dev/pts", "/proc", "/sys", "/run"]
for path in bind_paths:
target_path = os.path.join(mount_point, path.lstrip('/'))
if not os.path.exists(target_path):
os.makedirs(target_path)
log_debug(f"创建目录: {target_path}")
success, _, stderr = run_command(["sudo", "mount", "--bind", path, target_path],
f"绑定 {path}")
if not success:
log_error(f"绑定 {path} 失败,开始清理...")
_cleanup_partial_mount(mount_point, mounted_boot, mounted_efi, bind_paths_mounted)
return False, "", f"绑定 {path} 失败: {stderr}"
bind_paths_mounted.append(target_path)
log_success(f"✓ 所有绑定完成")
log_info(f"挂载摘要: 根分区 ✓, /boot {'' if mounted_boot else ''}, EFI {'' if mounted_efi else ''}")
return True, mount_point, ""
def detect_distro_type(mount_point: str) -> str:
"""
检测目标系统发行版类型。
支持更多发行版Arch, Debian, Ubuntu, CentOS/RHEL/Rocky/Alma,
Fedora, openSUSE, Void, Gentoo, NixOS 等。
"""
log_step("检测发行版类型", f"挂载点: {mount_point}")
os_release_path = os.path.join(mount_point, "etc/os-release")
log_info(f"检查文件: {os_release_path}")
if not os.path.exists(os_release_path):
log_warning(f"未找到 {os_release_path},尝试 /etc/issue")
issue_path = os.path.join(mount_point, "etc/issue")
if os.path.exists(issue_path):
try:
with open(issue_path, "r") as f:
content = f.read().lower()
log_debug(f"/etc/issue 内容: {content[:200]}")
if "ubuntu" in content:
log_info(f"通过 /etc/issue 检测到: ubuntu")
return "ubuntu"
elif "debian" in content:
log_info(f"通过 /etc/issue 检测到: debian")
return "debian"
elif "centos" in content or "rhel" in content:
log_info(f"通过 /etc/issue 检测到: centos")
return "centos"
elif "fedora" in content:
log_info(f"通过 /etc/issue 检测到: fedora")
return "fedora"
elif "void" in content:
log_info(f"通过 /etc/issue 检测到: void")
return "void"
elif "gentoo" in content:
log_info(f"通过 /etc/issue 检测到: gentoo")
return "gentoo"
except Exception as e:
log_warning(f"读取 /etc/issue 失败: {e}")
# 检查其他发行版特定文件
if os.path.exists(os.path.join(mount_point, "etc/arch-release")):
log_info("通过 arch-release 检测到: arch")
return "arch"
if os.path.exists(os.path.join(mount_point, "etc/gentoo-release")):
log_info("通过 gentoo-release 检测到: gentoo")
return "gentoo"
if os.path.exists(os.path.join(mount_point, "etc/nixos/configuration.nix")):
log_info("通过 configuration.nix 检测到: nixos")
return "nixos"
return "unknown"
try:
with open(os_release_path, "r") as f:
content = f.read()
log_debug(f"/etc/os-release 内容:\n{content}")
id_match = re.search(r'^ID=(.+)$', content, re.MULTILINE)
id_like_match = re.search(r'^ID_LIKE=(.+)$', content, re.MULTILINE)
distro_id = id_match.group(1).strip('"\'') if id_match else ""
id_like = id_like_match.group(1).strip('"\'') if id_like_match else ""
log_info(f"ID={distro_id}, ID_LIKE={id_like}")
# 直接匹配
distro_map = {
"ubuntu": "ubuntu",
"debian": "debian",
"arch": "arch",
"manjaro": "arch",
"endeavouros": "arch",
"garuda": "arch",
"cachyos": "arch",
"centos": "centos",
"rhel": "centos",
"rocky": "centos",
"almalinux": "centos",
"fedora": "fedora",
"opensuse": "opensuse",
"opensuse-leap": "opensuse",
"opensuse-tumbleweed": "opensuse",
"void": "void",
"gentoo": "gentoo",
"nixos": "nixos",
}
if distro_id in distro_map:
result = distro_map[distro_id]
log_info(f"检测到发行版: {result} (ID={distro_id})")
return result
# 通过 ID_LIKE 推断
like_map = {
"ubuntu": "ubuntu",
"debian": "debian",
"arch": "arch",
"rhel": "centos",
"centos": "centos",
"fedora": "fedora",
"suse": "opensuse",
}
for key, value in like_map.items():
if key in id_like:
log_info(f"通过 ID_LIKE 推断: {value}")
return value
log_warning(f"无法识别的发行版ID={distro_id}, ID_LIKE={id_like}")
return "unknown"
except Exception as e:
log_error(f"读取 os-release 文件失败: {e}")
return "unknown"
def _get_grub_packages(distro_type: str) -> Tuple[List[str], str]:
"""
获取安装 GRUB 包所需的包列表和包管理器命令。
返回: (包列表, 包管理器基命令)
"""
package_map = {
"centos": (["grub2-tools", "grub2-pc"], "yum install -y"),
"fedora": (["grub2-tools", "grub2-pc"], "dnf install -y"),
"rhel": (["grub2-tools", "grub2-pc"], "yum install -y"),
"rocky": (["grub2-tools", "grub2-pc"], "dnf install -y"),
"almalinux": (["grub2-tools", "grub2-pc"], "dnf install -y"),
"debian": (["grub-pc"], "apt-get install -y"),
"ubuntu": (["grub-pc"], "apt-get install -y"),
"arch": (["grub"], "pacman -S --noconfirm"),
"manjaro": (["grub"], "pacman -S --noconfirm"),
"opensuse": (["grub2"], "zypper install -y"),
"void": (["grub"], "xbps-install -y"),
"gentoo": (["sys-boot/grub"], "emerge"),
}
return package_map.get(distro_type, (["grub"], "包管理器安装"))
def _auto_install_grub(mount_point: str, distro_type: str) -> bool:
"""
自动在 chroot 环境中安装 GRUB 包。
返回是否成功。
"""
log_step("自动安装 GRUB 包", f"发行版: {distro_type}")
packages, pkg_cmd = _get_grub_packages(distro_type)
if not packages:
log_warning(f"未知的发行版 '{distro_type}',无法自动安装 GRUB")
return False
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
# 1. 首先检查网络连接(通过 ping
log_info("检查网络连接...")
success, _, _ = run_command(
chroot_cmd_prefix + ["ping", "-c", "1", "8.8.8.8"],
"检查网络连接",
timeout=10
)
if not success:
# 尝试使用主机的网络配置
log_warning("chroot 环境无网络,尝试从 Live 环境复制 DNS 配置...")
# 复制 /etc/resolv.conf
resolv_source = "/etc/resolv.conf"
resolv_target = os.path.join(mount_point, "etc/resolv.conf")
if os.path.exists(resolv_source):
try:
shutil.copy2(resolv_source, resolv_target)
log_info("已复制 DNS 配置")
except Exception as e:
log_warning(f"复制 DNS 配置失败: {e}")
# 再次检查网络
success, _, _ = run_command(
chroot_cmd_prefix + ["ping", "-c", "1", "8.8.8.8"],
"再次检查网络连接",
timeout=10
)
if not success:
log_error("chroot 环境无法连接网络,无法自动安装 GRUB")
log_info("请手动安装 GRUB 包后再运行此工具")
return False
log_success("网络连接正常")
# 2. 更新包列表(某些发行版需要)
update_cmds = {
"debian": ["apt-get", "update"],
"ubuntu": ["apt-get", "update"],
"arch": ["pacman", "-Sy"],
"manjaro": ["pacman", "-Sy"],
}
if distro_type in update_cmds:
log_info(f"更新 {distro_type} 包列表...")
run_command(
chroot_cmd_prefix + update_cmds[distro_type],
f"更新 {distro_type} 包列表",
timeout=120
)
# 3. 安装 GRUB 包
log_info(f"安装 GRUB 包: {', '.join(packages)}")
install_cmd = chroot_cmd_prefix + pkg_cmd.split() + packages
success, stdout, stderr = run_command(
install_cmd,
f"安装 GRUB 包 ({' '.join(packages)})",
timeout=300
)
# 即使 yum 报告"已安装",也要检查实际命令是否存在
# CentOS/RHEL 的命令可能在 /usr/sbin/ 下,需要确保 PATH 包含该路径
if success:
log_info("包安装命令执行完成,检查命令是否可用...")
# 尝试查找 grub 相关命令的实际路径
grub_paths = [
"/usr/sbin/grub2-install",
"/usr/sbin/grub2-mkconfig",
"/usr/sbin/grub-install",
"/usr/sbin/grub-mkconfig",
"/sbin/grub2-install",
"/sbin/grub2-mkconfig",
]
found_any = False
for grub_path in grub_paths:
full_path = mount_point + grub_path
if os.path.exists(full_path):
log_info(f" 找到 GRUB 命令: {grub_path}")
found_any = True
if found_any:
log_success(f"✓ GRUB 包已安装")
return True
else:
log_warning("包管理器报告成功,但未找到 GRUB 命令")
log_info("尝试重新安装...")
# 强制重新安装
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux"]:
reinstall_cmd = chroot_cmd_prefix + ["yum", "reinstall", "-y"] + packages
run_command(reinstall_cmd, "强制重新安装 GRUB 包", timeout=300)
# 再次检查
for grub_path in grub_paths:
full_path = mount_point + grub_path
if os.path.exists(full_path):
log_info(f" 找到 GRUB 命令: {grub_path}")
return True
log_error("仍无法找到 GRUB 命令")
return False
else:
log_error(f"✗ GRUB 包安装失败: {stderr}")
return False
def _get_grub_cmd_path(mount_point: str, distro_type: str) -> Tuple[Optional[str], Optional[str]]:
"""
查找 GRUB 命令的完整路径。
返回: (grub_install_path, grub_mkconfig_path) 或 (None, None)
"""
# 可能的命令名
install_names = ["grub2-install", "grub-install"]
mkconfig_names = ["grub2-mkconfig", "grub-mkconfig"]
# 可能的目录
dirs = ["/usr/sbin", "/sbin", "/usr/bin", "/bin"]
install_path = None
mkconfig_path = None
for name in install_names:
for dir_path in dirs:
full_path = os.path.join(mount_point, dir_path.lstrip('/'), name)
if os.path.exists(full_path):
install_path = os.path.join(dir_path, name)
break
if install_path:
break
for name in mkconfig_names:
for dir_path in dirs:
full_path = os.path.join(mount_point, dir_path.lstrip('/'), name)
if os.path.exists(full_path):
mkconfig_path = os.path.join(dir_path, name)
break
if mkconfig_path:
break
return install_path, mkconfig_path
# 全局变量存储找到的命令路径
_grub_install_cmd: Optional[str] = None
_grub_mkconfig_cmd: Optional[str] = None
def check_chroot_environment(mount_point: str, distro_type: str = "unknown") -> Tuple[bool, str]:
"""
检查 chroot 环境是否可用。
检测可用的 GRUB 命令及其版本。
如果缺少 GRUB 命令,尝试自动安装。
"""
global _grub_install_cmd, _grub_mkconfig_cmd
log_step("检查 chroot 环境", f"挂载点: {mount_point}")
# 首先尝试找到命令路径
install_path, mkconfig_path = _get_grub_cmd_path(mount_point, distro_type)
if install_path and mkconfig_path:
log_info(f" ✓ 找到 grub-install: {install_path}")
log_info(f" ✓ 找到 grub-mkconfig: {mkconfig_path}")
_grub_install_cmd = install_path
_grub_mkconfig_cmd = mkconfig_path
else:
log_warning("未找到 GRUB 命令,尝试自动安装...")
# 尝试自动安装
if _auto_install_grub(mount_point, distro_type):
# 重新查找
install_path, mkconfig_path = _get_grub_cmd_path(mount_point, distro_type)
if install_path and mkconfig_path:
log_success("✓ 自动安装成功GRUB 命令已可用")
_grub_install_cmd = install_path
_grub_mkconfig_cmd = mkconfig_path
else:
log_error("✗ 自动安装后仍未找到 GRUB 命令")
return False, "自动安装 GRUB 失败,请手动安装"
else:
log_error("=" * 60)
log_error(f"无法自动安装 GRUB 包")
log_error(f"")
log_error(f"请手动安装 GRUB 包:")
packages, pkg_cmd = _get_grub_packages(distro_type)
log_error(f" chroot {mount_point}")
log_error(f" {pkg_cmd} {' '.join(packages)}")
log_error(f" exit")
log_error(f"=" * 60)
return False, "自动安装 GRUB 失败"
# 检查 grub-install 版本
success, stdout, _ = run_command(["sudo", "chroot", mount_point, _grub_install_cmd, "--version"],
"检查 grub-install 版本", timeout=10)
if success:
log_info(f"grub-install 版本: {stdout.strip()}")
# 检查 EFI 目录UEFI 模式)
efi_dir = os.path.join(mount_point, "boot/efi")
if os.path.ismount(efi_dir):
log_info(f"✓ EFI 分区已挂载到 /boot/efi")
try:
stat = os.stat(efi_dir)
log_debug(f" EFI 目录权限: {oct(stat.st_mode)}")
except Exception as e:
log_warning(f" 无法获取 EFI 目录权限: {e}")
else:
log_info(f"EFI 分区未挂载(可能是 BIOS 模式)")
# 检查 Secure Boot 相关工具
sb_tools = ["mokutil", "sbsign", "sbverify"]
for tool in sb_tools:
success, _, _ = run_command(["sudo", "chroot", mount_point, "which", tool],
f"检查 Secure Boot 工具 {tool}", timeout=5)
if success:
log_info(f" ✓ 找到 Secure Boot 工具: {tool}")
return True, ""
def check_and_install_efi_modules(mount_point: str, distro_type: str, is_uefi: bool) -> Tuple[bool, str]:
"""
检查并安装 UEFI GRUB 模块。
对于 UEFI 模式,需要检查 /usr/lib/grub/x86_64-efi/ 目录是否存在。
如果不存在,自动安装 grub2-efi 包。
返回: (是否成功, 错误信息)
"""
if not is_uefi:
return True, ""
log_step("检查 UEFI GRUB 模块")
# 检测架构
efi_params = get_grub_efi_parameters()
if not efi_params:
return False, "无法确定 EFI 架构"
efi_target, _, _ = efi_params
# efi_target 格式: x86_64-efi, i386-efi, arm64-efi
arch_dir = efi_target # 如: x86_64-efi
# 检查模块目录
module_dir = os.path.join(mount_point, f"usr/lib/grub/{arch_dir}")
modinfo_file = os.path.join(module_dir, "modinfo.sh")
if os.path.exists(modinfo_file):
log_success(f"✓ UEFI GRUB 模块已存在: {module_dir}")
return True, ""
log_warning(f"UEFI GRUB 模块不存在: {module_dir}")
log_info("尝试安装 UEFI GRUB 模块包...")
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
# 根据发行版选择正确的 EFI 包
# 注意CentOS/RHEL 8+ 需要 shim 包来支持 UEFI Secure Boot
efi_packages_map = {
"centos": ["grub2-efi-x64", "grub2-efi-x64-modules", "shim-x64"],
"rhel": ["grub2-efi-x64", "grub2-efi-x64-modules", "shim-x64"],
"fedora": ["grub2-efi-x64", "grub2-efi-x64-modules", "shim-x64"],
"rocky": ["grub2-efi-x64", "grub2-efi-x64-modules", "shim-x64"],
"almalinux": ["grub2-efi-x64", "grub2-efi-x64-modules", "shim-x64"],
"debian": ["grub-efi-amd64", "shim-signed"],
"ubuntu": ["grub-efi-amd64", "shim-signed"],
"arch": ["grub", "efibootmgr"], # Arch 的 grub 包包含 EFI 支持
"manjaro": ["grub", "efibootmgr"],
}
efi_packages = efi_packages_map.get(distro_type, [])
if not efi_packages:
log_warning(f"未知的发行版 '{distro_type}',无法自动安装 EFI 模块")
return False, f"无法确定 {distro_type} 的 EFI 包名"
log_info(f"将安装以下 EFI 包: {', '.join(efi_packages)}")
# 安装 EFI 包前,先确保网络连接(复制 DNS 配置)
resolv_source = "/etc/resolv.conf"
resolv_target = os.path.join(mount_point, "etc/resolv.conf")
if os.path.exists(resolv_source):
try:
shutil.copy2(resolv_source, resolv_target)
log_info("已复制 DNS 配置到 chroot 环境")
except Exception as e:
log_debug(f"复制 DNS 配置失败: {e}")
# 安装 EFI 包
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux"]:
success, _, stderr = run_command(
chroot_cmd_prefix + ["yum", "install", "-y"] + efi_packages,
f"安装 UEFI GRUB 包",
timeout=300
)
elif distro_type in ["debian", "ubuntu"]:
success, _, stderr = run_command(
chroot_cmd_prefix + ["apt-get", "update"],
f"更新包列表",
timeout=120
)
success, _, stderr = run_command(
chroot_cmd_prefix + ["apt-get", "install", "-y"] + efi_packages,
f"安装 UEFI GRUB 包",
timeout=300
)
elif distro_type in ["arch", "manjaro"]:
success, _, stderr = run_command(
chroot_cmd_prefix + ["pacman", "-Sy", "--noconfirm"] + efi_packages,
f"安装 UEFI GRUB 包",
timeout=300
)
else:
return False, f"不支持的发行版: {distro_type}"
if not success:
log_error(f"UEFI GRUB 包安装失败")
return False, f"无法安装 UEFI GRUB 包: {stderr}"
# 重新检查模块
if os.path.exists(modinfo_file):
log_success(f"✓ UEFI GRUB 模块安装成功")
return True, ""
else:
log_error("UEFI GRUB 模块安装后仍不存在")
return False, "UEFI 模块安装失败"
def _manual_install_efi_files(mount_point: str, efi_target: str, efi_grub_file: str,
efi_boot_file: str, bootloader_id: str = "GRUB") -> bool:
"""
手动安装 EFI 文件(当 grub2-install 因为 Secure Boot 不支持时使用)。
直接从 /boot/efi/EFI/<distro>/ 复制已有的 EFI 文件,或使用 shim 链式加载。
返回: 是否成功
"""
log_step("手动安装 EFI 文件", f"目标: {efi_grub_file}")
efi_firmware_dir = os.path.join(mount_point, "boot/efi/EFI")
# 创建目标目录
target_dir = os.path.join(efi_firmware_dir, bootloader_id)
try:
os.makedirs(target_dir, exist_ok=True)
log_info(f"创建/确认 EFI 目录: {target_dir}")
except Exception as e:
log_error(f"创建 EFI 目录失败: {e}")
return False
# 查找源 EFI 文件
# 可能的源路径:
source_paths = []
# 1. 从 /boot/efi/EFI/<distro> 复制
for subdir in ["centos", "redhat", "fedora", "rocky", "almalinux", "BOOT", "boot"]:
source_paths.append(os.path.join(efi_firmware_dir, subdir, efi_grub_file))
source_paths.append(os.path.join(efi_firmware_dir, subdir, "shimx64.efi"))
source_paths.append(os.path.join(efi_firmware_dir, subdir, "shim.efi"))
source_paths.append(os.path.join(efi_firmware_dir, subdir, "grubx64.efi"))
# 2. 从 /boot/efi/ 直接查找
source_paths.append(os.path.join(mount_point, "boot/efi", efi_grub_file))
source_paths.append(os.path.join(mount_point, "boot/efi", "shimx64.efi"))
# 3. 从 /usr/lib/grub/x86_64-efi/ 复制 grubx64.efi
grub_modules_dir = os.path.join(mount_point, f"usr/lib/grub/{efi_target}")
source_paths.append(os.path.join(grub_modules_dir, "grubx64.efi"))
# 4. CentOS/RHEL 特定的路径
source_paths.append(os.path.join(mount_point, "boot/efi/EFI/centos/grubx64.efi"))
source_paths.append(os.path.join(mount_point, "boot/efi/EFI/centos/shimx64.efi"))
source_paths.append(os.path.join(mount_point, "usr/share/grub2/grubx64.efi"))
# 5. 尝试查找任何 .efi 文件
try:
if os.path.exists(efi_firmware_dir):
for root, dirs, files in os.walk(efi_firmware_dir):
for f in files:
if f.endswith('.efi'):
source_paths.append(os.path.join(root, f))
except:
pass
# 查找 shim 和 grub
shim_source = None
grub_source = None
for path in source_paths:
if os.path.exists(path):
log_info(f"找到 EFI 文件: {path}")
if "shim" in path.lower():
shim_source = path
elif "grub" in path.lower():
grub_source = path
# 复制文件
try:
if grub_source:
grub_target = os.path.join(target_dir, efi_grub_file)
shutil.copy2(grub_source, grub_target)
log_success(f"✓ 复制 GRUB: {grub_source} -> {grub_target}")
if shim_source:
# 如果使用 shim复制 shim 并重命名为 grubx64.efi
# 或者保留 shim 文件名并在 NVRAM 中注册
shim_target = os.path.join(target_dir, "shimx64.efi")
shutil.copy2(shim_source, shim_target)
log_success(f"✓ 复制 Shim: {shim_source} -> {shim_target}")
# 同时复制为 bootloader 名称,便于启动
if efi_grub_file != "shimx64.efi":
shutil.copy2(shim_source, os.path.join(target_dir, efi_grub_file))
# 如果没有找到任何文件,尝试从 grub 模块生成
if not grub_source and not shim_source:
log_warning("未找到现有 EFI 文件,尝试使用 grub-mkimage 生成...")
# 获取模块列表(必需模块)
modules = "part_gpt part_msdos fat ext2 xfs btrfs normal boot linux configfile search search_fs_uuid search_fs_file"
# 构建 chroot 内的输出路径
chroot_output_path = f"/boot/efi/EFI/{bootloader_id}/{efi_grub_file}"
# 尝试使用 grub-mkimage 生成 EFI 文件
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
mkimage_cmd = chroot_cmd_prefix + [
"grub2-mkimage",
"-o", chroot_output_path,
"-O", efi_target,
"-p", "/boot/grub2",
] + modules.split()
success, _, stderr = run_command(
mkimage_cmd,
"使用 grub-mkimage 生成 EFI 文件",
timeout=60
)
# 检查主机路径上的文件是否生成成功
host_output_path = os.path.join(mount_point, chroot_output_path.lstrip('/'))
if success and os.path.exists(host_output_path):
log_success(f"✓ 成功生成 EFI 文件: {host_output_path}")
else:
log_error(f"生成 EFI 文件失败: {stderr}")
return False
return True
except Exception as e:
log_error(f"手动安装 EFI 文件失败: {e}")
return False
def install_efi_fallback(mount_point: str, efi_target: str, efi_grub_file: str,
efi_boot_file: str, bootloader_id: str = "GRUB") -> bool:
"""
安装 EFI fallback 引导文件。
将 GRUB EFI 文件复制到 /EFI/Boot/bootx64.efi这是 UEFI 的通用回退路径。
参考 Calamares 的实现。
"""
log_step("安装 EFI Fallback", f"目标文件: {efi_boot_file}")
efi_firmware_dir = os.path.join(mount_point, "boot/efi/EFI")
# 处理 VFAT 大小写问题(某些固件对大小写敏感)
# 检查 EFI 目录的实际大小写
if os.path.exists(efi_firmware_dir):
try:
entries = os.listdir(os.path.join(mount_point, "boot/efi"))
for entry in entries:
if entry.upper() == "EFI":
efi_firmware_dir = os.path.join(mount_point, "boot/efi", entry)
break
except Exception as e:
log_debug(f"检查 EFI 目录大小写失败: {e}")
# 创建 Boot 目录(处理大小写)
boot_dir = os.path.join(efi_firmware_dir, "Boot")
try:
if os.path.exists(efi_firmware_dir):
entries = os.listdir(efi_firmware_dir)
for entry in entries:
if entry.upper() == "BOOT":
boot_dir = os.path.join(efi_firmware_dir, entry)
break
except Exception:
pass
if not os.path.exists(boot_dir):
try:
os.makedirs(boot_dir)
log_info(f"创建 Boot 目录: {boot_dir}")
except Exception as e:
log_error(f"创建 Boot 目录失败: {e}")
return False
# 源文件路径
source_paths = [
os.path.join(efi_firmware_dir, bootloader_id, efi_grub_file),
os.path.join(efi_firmware_dir, bootloader_id.lower(), efi_grub_file.lower()),
]
source_file = None
for path in source_paths:
if os.path.exists(path):
source_file = path
break
if not source_file:
log_warning(f"找不到源 EFI 文件,尝试查找任何 .efi 文件")
# 尝试在 bootloader_id 目录下找到任何 .efi 文件
for subdir in [bootloader_id, bootloader_id.lower(), "grub", "GRUB"]:
grub_dir = os.path.join(efi_firmware_dir, subdir)
if os.path.exists(grub_dir):
try:
for f in os.listdir(grub_dir):
if f.endswith(".efi"):
source_file = os.path.join(grub_dir, f)
log_info(f"找到替代源文件: {source_file}")
break
except Exception:
pass
if source_file:
break
if not source_file:
log_error(f"无法找到源 EFI 文件")
return False
# 目标文件路径
target_file = os.path.join(boot_dir, efi_boot_file)
try:
shutil.copy2(source_file, target_file)
log_success(f"✓ EFI fallback 安装成功: {source_file} -> {target_file}")
return True
except Exception as e:
log_error(f"复制 EFI fallback 文件失败: {e}")
return False
def check_and_restore_kernel(mount_point: str, distro_type: str, has_separate_boot: bool) -> Tuple[bool, str]:
"""
检查并恢复 /boot 目录下的内核文件。
如果内核文件缺失,尝试从 /usr/lib/modules 复制或重新安装内核包。
返回: (是否成功, 错误信息)
"""
log_step("检查内核文件", f"独立 /boot 分区: {has_separate_boot}")
boot_dir = os.path.join(mount_point, "boot")
# 检查现有的内核文件
vmlinuz_files = []
initramfs_files = []
try:
if os.path.exists(boot_dir):
for f in os.listdir(boot_dir):
if f.startswith("vmlinuz-"):
vmlinuz_files.append(f)
elif f.startswith("initramfs-") and f.endswith(".img"):
initramfs_files.append(f)
except Exception as e:
log_warning(f"检查 /boot 目录失败: {e}")
log_info(f"发现 {len(vmlinuz_files)} 个内核文件, {len(initramfs_files)} 个 initramfs 文件")
# 如果内核文件存在,检查是否完整
if vmlinuz_files and initramfs_files:
log_success("✓ 内核文件已存在")
return True, ""
log_warning("内核文件缺失,尝试恢复...")
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
# 方法1: 从 /usr/lib/modules 复制内核(如果存在)
usr_lib_modules = os.path.join(mount_point, "usr/lib/modules")
if os.path.exists(usr_lib_modules):
log_info("检查 /usr/lib/modules 中的内核...")
try:
kernel_versions = []
for d in os.listdir(usr_lib_modules):
# 跳过 rescue 和普通目录,查找版本号格式的目录
if os.path.isdir(os.path.join(usr_lib_modules, d)) and not d.endswith("kdump"):
kernel_versions.append(d)
if kernel_versions:
log_info(f"找到内核版本: {kernel_versions}")
for kernel_ver in kernel_versions:
# 检查该版本的内核文件是否存在
kernel_source = os.path.join(usr_lib_modules, kernel_ver, "vmlinuz")
if not os.path.exists(kernel_source):
# 某些系统使用不同的路径
kernel_source = os.path.join(mount_point, f"usr/lib/modules/{kernel_ver}/vmlinuz")
if os.path.exists(kernel_source):
kernel_target = os.path.join(boot_dir, f"vmlinuz-{kernel_ver}")
log_info(f"复制内核: {kernel_source} -> {kernel_target}")
try:
shutil.copy2(kernel_source, kernel_target)
log_success(f"✓ 复制内核成功: vmlinuz-{kernel_ver}")
except Exception as e:
log_error(f"复制内核失败: {e}")
else:
log_warning("/usr/lib/modules 中没有找到内核")
except Exception as e:
log_warning(f"检查 /usr/lib/modules 失败: {e}")
# 重新检查内核文件
vmlinuz_files = []
try:
for f in os.listdir(boot_dir):
if f.startswith("vmlinuz-"):
vmlinuz_files.append(f)
except:
pass
# 方法2: 重新安装内核包如果方法1失败
if not vmlinuz_files:
log_info("尝试重新安装内核包...")
# 根据发行版选择正确的包名
kernel_packages = {
"centos": ["kernel-core", "kernel-modules"],
"rhel": ["kernel-core", "kernel-modules"],
"fedora": ["kernel-core", "kernel-modules"],
"rocky": ["kernel-core", "kernel-modules"],
"almalinux": ["kernel-core", "kernel-modules"],
"debian": ["linux-image-generic"],
"ubuntu": ["linux-image-generic"],
"arch": ["linux"],
"manjaro": ["linux"],
}
packages = kernel_packages.get(distro_type, ["kernel"])
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux"]:
success, _, stderr = run_command(
chroot_cmd_prefix + ["yum", "reinstall", "-y"] + packages,
f"重新安装内核包",
timeout=300
)
elif distro_type in ["debian", "ubuntu"]:
success, _, stderr = run_command(
chroot_cmd_prefix + ["apt-get", "install", "--reinstall", "-y"] + packages,
f"重新安装内核包",
timeout=300
)
elif distro_type in ["arch", "manjaro"]:
success, _, stderr = run_command(
chroot_cmd_prefix + ["pacman", "-S", "--noconfirm"] + packages,
f"重新安装内核包",
timeout=300
)
else:
log_warning(f"不支持的发行版 '{distro_type}',无法自动安装内核")
return False, "无法自动恢复内核文件"
if not success:
log_error("内核包安装失败")
return False, "内核包安装失败"
# 方法3: 重新生成 initramfs
log_info("重新生成 initramfs...")
# 获取当前存在的内核版本
vmlinuz_files = []
try:
for f in os.listdir(boot_dir):
if f.startswith("vmlinuz-"):
vmlinuz_files.append(f)
except:
pass
if not vmlinuz_files:
log_error("无法找到内核文件,无法生成 initramfs")
return False, "内核文件缺失"
for vmlinuz in vmlinuz_files:
kernel_ver = vmlinuz.replace("vmlinuz-", "")
log_info(f"为内核 {kernel_ver} 生成 initramfs...")
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux"]:
# CentOS/RHEL 使用 dracut
success, _, stderr = run_command(
chroot_cmd_prefix + ["dracut", "-f", f"/boot/initramfs-{kernel_ver}.img", kernel_ver],
f"生成 initramfs for {kernel_ver}",
timeout=120
)
elif distro_type in ["debian", "ubuntu"]:
# Debian/Ubuntu 使用 update-initramfs
success, _, stderr = run_command(
chroot_cmd_prefix + ["update-initramfs", "-c", "-k", kernel_ver],
f"生成 initramfs for {kernel_ver}",
timeout=120
)
elif distro_type in ["arch", "manjaro"]:
# Arch 使用 mkinitcpio
success, _, stderr = run_command(
chroot_cmd_prefix + ["mkinitcpio", "-p", kernel_ver],
f"生成 initramfs for {kernel_ver}",
timeout=120
)
else:
# 通用方法:尝试 dracut
success, _, stderr = run_command(
chroot_cmd_prefix + ["dracut", "-f", f"/boot/initramfs-{kernel_ver}.img", kernel_ver],
f"生成 initramfs for {kernel_ver}",
timeout=120
)
if success:
log_success(f"✓ initramfs 生成成功: {kernel_ver}")
else:
log_warning(f"initramfs 生成失败: {stderr}")
# 最终检查
try:
vmlinuz_count = sum(1 for f in os.listdir(boot_dir) if f.startswith("vmlinuz-"))
initramfs_count = sum(1 for f in os.listdir(boot_dir) if f.startswith("initramfs-") and f.endswith(".img"))
log_info(f"恢复完成: {vmlinuz_count} 个内核, {initramfs_count} 个 initramfs")
if vmlinuz_count > 0:
return True, ""
else:
return False, "内核恢复失败"
except Exception as e:
return False, f"检查 /boot 目录失败: {e}"
def restore_bls_entries(mount_point: str, distro_type: str) -> Tuple[bool, str]:
"""
恢复 BLS (Boot Loader Specification) 配置条目。
CentOS/RHEL/Fedora 等使用 BLS 格式,启动项在 /boot/loader/entries/*.conf
返回: (是否成功, 错误信息)
"""
# 检查是否使用 BLS
loader_dir = os.path.join(mount_point, "boot/loader/entries")
grub_cfg_path = os.path.join(mount_point, "boot/grub2/grub.cfg")
# 检查 grub.cfg 是否包含 blscfg
uses_bls = False
try:
if os.path.exists(grub_cfg_path):
with open(grub_cfg_path, 'r') as f:
content = f.read()
if 'blscfg' in content or 'BootLoaderSpec' in content:
uses_bls = True
log_info("检测到系统使用 BLS (Boot Loader Specification)")
except Exception as e:
log_debug(f"检查 BLS 失败: {e}")
if not uses_bls:
log_debug("系统不使用 BLS跳过 BLS 恢复")
return True, ""
log_step("恢复 BLS 启动条目")
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
# 方法1: 使用 kernel-install 重新生成 BLS 条目(推荐)
log_info("尝试使用 kernel-install 重新生成 BLS 条目...")
# 获取已安装的内核版本
boot_dir = os.path.join(mount_point, "boot")
kernel_versions = []
try:
for f in os.listdir(boot_dir):
if f.startswith("vmlinuz-"):
kernel_ver = f.replace("vmlinuz-", "")
kernel_versions.append(kernel_ver)
except Exception as e:
log_warning(f"读取 /boot 目录失败: {e}")
if not kernel_versions:
log_error("没有找到内核版本,无法生成 BLS 条目")
return False, "没有内核版本"
log_info(f"发现内核版本: {kernel_versions}")
# 创建 loader 目录
os.makedirs(loader_dir, exist_ok=True)
for kernel_ver in kernel_versions:
# 检查是否已有 BLS 条目
entry_file = os.path.join(loader_dir, f"{kernel_ver}.conf")
if os.path.exists(entry_file):
log_info(f"BLS 条目已存在: {entry_file}")
continue
log_info(f"为内核 {kernel_ver} 生成 BLS 条目...")
# 尝试使用 kernel-install
success, _, stderr = run_command(
chroot_cmd_prefix + ["kernel-install", "add", kernel_ver, f"/boot/vmlinuz-{kernel_ver}"],
f"生成 BLS 条目 for {kernel_ver}",
timeout=30
)
if success:
log_success(f"✓ kernel-install 成功: {kernel_ver}")
else:
log_warning(f"kernel-install 失败,尝试手动创建 BLS 条目...")
# 方法2: 手动创建 BLS 条目
machine_id = ""
try:
machine_id_path = os.path.join(mount_point, "etc/machine-id")
if os.path.exists(machine_id_path):
with open(machine_id_path, 'r') as f:
machine_id = f.read().strip()
except:
pass
if not machine_id:
machine_id = "unknown"
# 读取 /etc/os-release 获取标题
os_name = "Linux"
os_version = ""
try:
os_release_path = os.path.join(mount_point, "etc/os-release")
if os.path.exists(os_release_path):
with open(os_release_path, 'r') as f:
for line in f:
if line.startswith('NAME='):
os_name = line.split('=')[1].strip().strip('"')
elif line.startswith('VERSION_ID='):
os_version = line.split('=')[1].strip().strip('"')
except:
pass
title = f"{os_name}"
if os_version:
title += f" {os_version}"
# 获取根分区 UUID 或路径
root_device = "/dev/mapper/cl-root" # 默认,应该根据实际检测
# 尝试从 /etc/fstab 获取根分区
try:
fstab_path = os.path.join(mount_point, "etc/fstab")
if os.path.exists(fstab_path):
with open(fstab_path, 'r') as f:
for line in f:
if line.startswith('/') and ' / ' in line:
parts = line.split()
if len(parts) >= 2 and parts[1] == '/':
root_device = parts[0]
break
except:
pass
# 创建 BLS 条目文件
bls_content = f"""title {title} ({kernel_ver})
version {kernel_ver}
linux /vmlinuz-{kernel_ver}
initrd /initramfs-{kernel_ver}.img
options root={root_device} ro
"""
# 对于 CentOS/RHEL添加特定的 LVM 和 resume 参数
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux"]:
bls_content = f"""title {title} ({kernel_ver})
version {kernel_ver}
linux /vmlinuz-{kernel_ver}
initrd /initramfs-{kernel_ver}.img
options root={root_device} ro crashkernel=auto resume=/dev/mapper/cl-swap rd.lvm.lv=cl/root rd.lvm.lv=cl/swap rhgb quiet
"""
entry_filename = f"{machine_id}-{kernel_ver}.conf"
entry_path = os.path.join(loader_dir, entry_filename)
try:
with open(entry_path, 'w') as f:
f.write(bls_content)
log_success(f"✓ 手动创建 BLS 条目: {entry_path}")
except Exception as e:
log_error(f"创建 BLS 条目失败: {e}")
return False, f"创建 BLS 条目失败: {e}"
# 方法3: 重新运行 grub2-mkconfig 确保 BLS 支持正确
log_info("重新生成 grub.cfg 以确保 BLS 支持...")
# 找到 grub2-mkconfig 或 grub-mkconfig
mkconfig_cmd = None
for cmd in ["/usr/sbin/grub2-mkconfig", "/sbin/grub2-mkconfig", "/usr/bin/grub2-mkconfig"]:
if os.path.exists(os.path.join(mount_point, cmd.lstrip('/'))):
mkconfig_cmd = cmd
break
if not mkconfig_cmd:
for cmd in ["/usr/sbin/grub-mkconfig", "/sbin/grub-mkconfig", "/usr/bin/grub-mkconfig"]:
if os.path.exists(os.path.join(mount_point, cmd.lstrip('/'))):
mkconfig_cmd = cmd
break
if mkconfig_cmd:
# 找到 grub.cfg 路径
grub_cfg = "/boot/grub2/grub.cfg"
for cfg_path in ["/boot/grub2/grub.cfg", "/boot/grub/grub.cfg"]:
if os.path.exists(os.path.join(mount_point, cfg_path.lstrip('/'))):
grub_cfg = cfg_path
break
success, _, stderr = run_command(
chroot_cmd_prefix + [mkconfig_cmd, "-o", grub_cfg],
"重新生成 grub.cfg",
timeout=60
)
if success:
log_success("✓ grub.cfg 重新生成成功")
else:
log_warning(f"grub.cfg 重新生成失败: {stderr}")
# 最终检查
try:
if os.path.exists(loader_dir):
entries = [f for f in os.listdir(loader_dir) if f.endswith('.conf')]
log_info(f"BLS 条目数量: {len(entries)}")
for entry in entries:
log_info(f" - {entry}")
if len(entries) > 0:
log_success("✓ BLS 配置恢复完成")
return True, ""
else:
log_warning("没有 BLS 条目生成")
return False, "BLS 条目生成失败"
else:
log_error("loader/entries 目录不存在")
return False, "BLS 目录不存在"
except Exception as e:
return False, f"检查 BLS 条目失败: {e}"
def chroot_and_repair_grub(mount_point: str, target_disk: str,
is_uefi: bool = False, distro_type: str = "unknown",
install_hybrid: bool = False,
use_fallback: bool = True,
efi_bootloader_id: str = "GRUB",
has_separate_boot: bool = False) -> Tuple[bool, str]:
"""
Chroot到目标系统并执行GRUB修复命令。
支持多种安装模式、架构、EFI fallback、Secure Boot 等。
参数:
mount_point: 挂载点路径
target_disk: 目标磁盘BIOS模式
is_uefi: 是否 UEFI 模式
distro_type: 发行版类型
install_hybrid: 是否同时安装 BIOS 和 EFI 模式(混合启动)
use_fallback: 是否安装 EFI fallback 文件
efi_bootloader_id: EFI 启动项名称
has_separate_boot: 是否有独立的 /boot 分区
"""
global _grub_install_cmd, _grub_mkconfig_cmd
log_step("修复 GRUB", f"目标磁盘: {target_disk}, UEFI: {is_uefi}, 发行版: {distro_type}")
if not is_uefi and not validate_device_path(target_disk):
log_error(f"无效的目标磁盘路径: {target_disk}")
return False, f"无效的目标磁盘路径: {target_disk}"
# 检查 chroot 环境(这会设置 _grub_install_cmd 和 _grub_mkconfig_cmd
ok, err = check_chroot_environment(mount_point, distro_type)
if not ok:
return False, err
# 确保命令路径已设置
if not _grub_install_cmd or not _grub_mkconfig_cmd:
return False, "无法确定 GRUB 命令路径"
log_info(f"使用 grub-install: {_grub_install_cmd}")
log_info(f"使用 grub-mkconfig: {_grub_mkconfig_cmd}")
# 检查是否有独立的 /boot 分区
boot_mount_point = os.path.join(mount_point, "boot")
if os.path.ismount(boot_mount_point):
has_separate_boot = True
log_info("检测到独立的 /boot 分区")
# 检查并恢复内核文件
kernel_ok, kernel_err = check_and_restore_kernel(mount_point, distro_type, has_separate_boot)
if not kernel_ok:
log_error(f"内核恢复失败: {kernel_err}")
return False, f"内核恢复失败: {kernel_err}"
# 恢复 BLS 启动条目CentOS/RHEL/Fedora 使用 BLS
bls_ok, bls_err = restore_bls_entries(mount_point, distro_type)
if not bls_ok:
log_warning(f"BLS 恢复失败: {bls_err}")
# BLS 失败不终止,继续尝试传统菜单
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
# 检测 Live 环境
is_live_env = is_live_environment()
if is_live_env:
log_info("检测到 Live 环境(无法访问 EFI 变量)")
# 获取 EFI 参数(如果需要)
efi_params = None
if is_uefi or install_hybrid:
efi_params = get_grub_efi_parameters()
if not efi_params:
return False, "无法确定 EFI 架构参数"
efi_target, efi_grub_file, efi_boot_file = efi_params
log_info(f"EFI 参数: target={efi_target}, grub={efi_grub_file}, boot={efi_boot_file}")
# 检查并安装 UEFI GRUB 模块
efi_modules_ok, efi_modules_err = check_and_install_efi_modules(mount_point, distro_type, True)
if not efi_modules_ok:
log_error(f"UEFI 模块检查失败: {efi_modules_err}")
return False, f"UEFI 模块检查失败: {efi_modules_err}"
grub_install_success = False
install_errors = []
# ===== UEFI 安装 =====
if is_uefi or install_hybrid:
log_step("安装 UEFI GRUB")
# 检查 EFI 分区是否正确挂载
efi_check_path = os.path.join(mount_point, "boot/efi/EFI")
if os.path.exists(efi_check_path):
log_info(f"✓ EFI 目录存在: {efi_check_path}")
try:
contents = os.listdir(efi_check_path)
log_info(f" 当前 EFI 目录内容: {contents}")
except Exception as e:
log_warning(f" 无法列出 EFI 目录: {e}")
else:
log_warning(f"✗ EFI 目录不存在: {efi_check_path}")
grub_install_cmd = chroot_cmd_prefix + [
_grub_install_cmd,
f"--target={efi_target}",
"--efi-directory=/boot/efi",
f"--bootloader-id={efi_bootloader_id}",
"--force"
]
# Live 环境下优先使用 --removable
if is_live_env:
log_info("Live 环境: 优先使用 --removable 模式")
removable_cmd = chroot_cmd_prefix + [
_grub_install_cmd,
f"--target={efi_target}",
"--efi-directory=/boot/efi",
"--removable",
"--force"
]
success, stdout, stderr = run_command(
removable_cmd,
"安装 UEFI GRUB (removable 模式)",
timeout=60
)
if success:
grub_install_success = True
log_success("✓ removable 模式安装成功")
# 同时安装标准模式作为备选
log_info("尝试安装标准模式作为备选...")
run_command(grub_install_cmd + ["--no-nvram"], "安装标准模式 (no-nvram)", timeout=60)
else:
log_warning(f"removable 模式失败,尝试标准模式")
install_errors.append(f"removable 模式: {stderr}")
if not grub_install_success:
# 第一次尝试: 标准安装
log_info("尝试 1/3: 标准 UEFI 安装...")
success, stdout, stderr = run_command(
grub_install_cmd,
"安装 UEFI GRUB标准模式",
timeout=60
)
if success:
grub_install_success = True
log_success("✓ 标准安装成功")
else:
install_errors.append(f"标准模式: {stderr}")
log_warning(f"标准安装失败: {stderr}")
# 第二次尝试: removable 模式
log_info("尝试 2/3: Live环境/可移动模式(--removable...")
removable_cmd = chroot_cmd_prefix + [
_grub_install_cmd,
f"--target={efi_target}",
"--efi-directory=/boot/efi",
"--removable",
"--force"
]
success, stdout, stderr = run_command(
removable_cmd,
"安装 UEFI GRUB可移动模式",
timeout=60
)
if success:
grub_install_success = True
log_success("✓ 可移动模式安装成功")
log_info(f" GRUB 已安装到 /EFI/Boot/{efi_boot_file}")
else:
install_errors.append(f"removable 模式: {stderr}")
# 第三次尝试: no-nvram
log_info("尝试 3/3: 使用 --no-nvram 选项...")
success, stdout, stderr = run_command(
grub_install_cmd + ["--no-nvram"],
"安装 UEFI GRUB不带NVRAM",
timeout=60
)
if success:
grub_install_success = True
log_success("✓ no-nvram 模式安装成功")
else:
install_errors.append(f"no-nvram 模式: {stderr}")
# 如果所有 grub2-install 方法都失败,尝试手动复制 EFI 文件(针对 Secure Boot 问题)
if not grub_install_success and "Secure Boot" in str(install_errors):
log_info("grub2-install 不支持 Secure Boot尝试手动安装 EFI 文件...")
success = _manual_install_efi_files(mount_point, efi_target, efi_grub_file, efi_boot_file, efi_bootloader_id)
if success:
grub_install_success = True
log_success("✓ 手动 EFI 文件安装成功")
else:
log_error("手动 EFI 文件安装失败")
# 安装 EFI fallback如果启用
if grub_install_success and use_fallback:
log_info("安装 EFI fallback 文件...")
install_efi_fallback(mount_point, efi_target, efi_grub_file, efi_boot_file, efi_bootloader_id)
# ===== BIOS 安装 =====
if not is_uefi or install_hybrid:
log_step("安装 BIOS GRUB")
bios_cmd = chroot_cmd_prefix + [
_grub_install_cmd,
"--target=i386-pc",
"--recheck",
"--force"
]
# 如果有独立的 /boot 分区,需要指定 --boot-directory
# 这样 GRUB 才能在启动时正确找到配置文件
if has_separate_boot:
log_info("独立 /boot 分区: 添加 --boot-directory=/boot 参数")
bios_cmd.append("--boot-directory=/boot")
bios_cmd.append(target_disk)
success, stdout, stderr = run_command(
bios_cmd,
f"安装 BIOS GRUB 到 {target_disk}",
timeout=60
)
if success:
log_success(f"✓ BIOS GRUB 安装成功到 {target_disk}")
if not is_uefi:
grub_install_success = True
else:
log_error(f"BIOS GRUB 安装失败: {stderr}")
if not is_uefi:
install_errors.append(f"BIOS 模式: {stderr}")
# 检查安装结果
if not grub_install_success:
error_summary = "\n".join(install_errors)
log_error(f"所有 GRUB 安装尝试均失败:\n{error_summary}")
return False, f"GRUB 安装失败:\n{error_summary}"
log_success("✓ GRUB 安装阶段完成")
# 检查生成的 EFI 文件
if is_uefi or install_hybrid:
efi_paths_to_check = [
os.path.join(mount_point, "boot/efi/EFI", efi_bootloader_id),
os.path.join(mount_point, "boot/efi/EFI", efi_bootloader_id.lower()),
os.path.join(mount_point, "boot/efi/EFI/Boot"),
os.path.join(mount_point, "boot/efi/efi", efi_bootloader_id.lower()),
]
found_efi_file = False
for path in efi_paths_to_check:
if os.path.exists(path):
log_info(f" 检查目录: {path}")
try:
files = os.listdir(path)
for f in files:
if f.endswith('.efi'):
full = os.path.join(path, f)
size = os.path.getsize(full)
log_info(f" ✓ EFI文件: {f} ({size} bytes)")
found_efi_file = True
except Exception as e:
log_warning(f" 无法列出: {e}")
if not found_efi_file:
log_warning(f" 未找到任何 .efi 文件,安装可能有问题")
# ===== 更新 GRUB 配置 =====
log_step("更新 GRUB 配置文件")
# 确定配置文件路径
# 根据发行版类型确定正确的 GRUB 目录名
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux", "opensuse"]:
# RHEL 系使用 grub2
preferred_grub_dir = "grub2"
fallback_grub_dir = "grub"
else:
# Debian/Ubuntu/Arch 等使用 grub
preferred_grub_dir = "grub"
fallback_grub_dir = "grub2"
possible_config_paths = [
f"/boot/{preferred_grub_dir}/grub.cfg",
f"/boot/{fallback_grub_dir}/grub.cfg",
]
config_path = ""
for cfg_path in possible_config_paths:
full_path = os.path.join(mount_point, cfg_path.lstrip('/'))
# 优先检查目录是否存在,但也考虑发行版偏好
if os.path.exists(os.path.dirname(full_path)):
config_path = cfg_path
break
# 如果没有找到,使用发行版偏好路径
if not config_path:
config_path = f"/boot/{preferred_grub_dir}/grub.cfg"
# 确定更新命令
if distro_type in ["debian", "ubuntu"]:
# Debian/Ubuntu 使用 update-grub 包装脚本
success, _, _ = run_command(chroot_cmd_prefix + ["which", "update-grub"], "检查 update-grub", timeout=5)
if success:
grub_update_cmd = ["update-grub"]
else:
grub_update_cmd = [_grub_mkconfig_cmd, "-o", config_path]
else:
grub_update_cmd = [_grub_mkconfig_cmd, "-o", config_path]
log_info(f"使用命令: {' '.join(grub_update_cmd)}")
log_info(f"配置文件路径: {config_path}")
# ===== 确保 grubenv 文件存在 =====
# CentOS/RHEL/Fedora 等系统需要 grubenv 文件,否则 grub2-mkconfig 会失败
grubenv_dir = os.path.dirname(config_path) # /boot/grub2 或 /boot/grub
grubenv_path = os.path.join(mount_point, grubenv_dir.lstrip('/'), "grubenv")
if not os.path.exists(grubenv_path):
log_info(f"grubenv 文件不存在,尝试创建...")
# 首先确保目录存在(在 chroot 环境中创建)
grubenv_dir_in_chroot = os.path.dirname(config_path) # /boot/grub2 或 /boot/grub
success, _, _ = run_command(
chroot_cmd_prefix + ["mkdir", "-p", grubenv_dir_in_chroot],
"创建 GRUB 配置目录",
timeout=5
)
if success:
log_info(f"✓ 创建目录: {grubenv_dir_in_chroot}")
# 检查 /boot 是否已挂载(在 chroot 内)
success, stdout, _ = run_command(
chroot_cmd_prefix + ["mount", "|", "grep", "boot"],
"检查 /boot 挂载状态",
timeout=5
)
log_info(f"/boot 挂载状态: {stdout if success else '未显示或检查失败'}")
# 方法1: 使用 grub2-editenv 创建
editenv_cmd = None
for cmd_name in ["grub2-editenv", "grub-editenv"]:
for cmd_dir in ["/usr/bin", "/bin", "/usr/sbin", "/sbin"]:
test_path = os.path.join(mount_point, cmd_dir.lstrip('/'), cmd_name)
if os.path.exists(test_path):
editenv_cmd = os.path.join(cmd_dir, cmd_name)
break
if editenv_cmd:
break
if editenv_cmd:
success, _, stderr = run_command(
chroot_cmd_prefix + [editenv_cmd, grubenv_dir_in_chroot + "/grubenv", "create"],
"创建 grubenv 文件",
timeout=10
)
# 重新检查文件是否存在(可能在不同路径)
if success:
log_success(f"✓ 成功创建 grubenv 文件")
else:
log_warning(f"grub2-editenv 创建失败: {stderr}")
# 方法2: 如果 /boot 是独立分区,尝试在挂载的主机路径创建
if not os.path.exists(grubenv_path):
# 根据发行版确定正确的目录名 (grub2 vs grub)
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux", "opensuse"]:
grub_dir_name = "grub2"
else:
grub_dir_name = "grub"
# 尝试多种可能的路径(处理独立 /boot 分区的情况)
possible_grubenv_paths = [
grubenv_path, # 标准路径
os.path.join(mount_point, "boot", grub_dir_name, "grubenv"),
os.path.join(mount_point, "boot", "grub2", "grubenv"),
os.path.join(mount_point, "boot", "grub", "grubenv"),
os.path.join(mount_point, grub_dir_name, "grubenv"),
os.path.join(mount_point, "grub2", "grubenv"),
os.path.join(mount_point, "grub", "grubenv"),
]
for test_path in possible_grubenv_paths:
if os.path.exists(test_path):
log_info(f"✓ 发现 grubenv 已存在于: {test_path}")
grubenv_path = test_path
break
else:
# 没有找到现有文件,尝试创建
# 首先确保首选目录存在(通过主机直接创建)
preferred_dir = os.path.join(mount_point, "boot", grub_dir_name)
try:
os.makedirs(preferred_dir, exist_ok=True)
log_info(f"✓ 确保目录存在: {preferred_dir}")
except Exception as e:
log_warning(f"创建目录失败: {e}")
for test_path in possible_grubenv_paths:
try:
dir_path = os.path.dirname(test_path)
os.makedirs(dir_path, exist_ok=True)
# 创建一个空的 grubenv 文件(包含必要的头部)
with open(test_path, 'w') as f:
f.write("# GRUB Environment Block\n")
f.write("saved_entry=\n")
f.write("#" * 1024 + "\n") # 填充到至少1KB
log_success(f"✓ 手动创建 grubenv 文件成功: {test_path}")
grubenv_path = test_path
break
except Exception as e:
log_debug(f"尝试创建 {test_path} 失败: {e}")
continue
else:
log_warning(f"所有路径创建 grubenv 文件均失败")
# 执行配置更新
success, stdout, stderr = run_command(
chroot_cmd_prefix + grub_update_cmd,
"更新GRUB配置文件",
timeout=120
)
if not success:
log_error(f"GRUB 配置文件更新失败: {stderr}")
return False, f"GRUB配置文件更新失败: {stderr}"
log_success("✓ GRUB 配置文件更新成功")
# 检查生成的配置文件
full_config_path = os.path.join(mount_point, config_path.lstrip('/'))
if os.path.exists(full_config_path):
size = os.path.getsize(full_config_path)
log_info(f" 配置文件大小: {size} bytes")
try:
with open(full_config_path, 'r') as f:
content = f.read()
menu_entries = content.count('menuentry')
log_info(f" 发现 {menu_entries} 个启动菜单项")
except Exception as e:
log_warning(f" 无法读取配置文件: {e}")
else:
log_warning(f" 配置文件未找到: {full_config_path}")
# ===== 输出启动提示 =====
if is_uefi or install_hybrid:
log_step("UEFI 修复完成提示")
log_info("GRUB EFI 文件已安装到 EFI 分区")
# 检测安装模式并给出相应提示
efi_boot_path = os.path.join(mount_point, f"boot/efi/EFI/Boot/{efi_boot_file if efi_params else 'bootx64.efi'}")
efi_grub_path = os.path.join(mount_point, f"boot/efi/EFI/{efi_bootloader_id}/{efi_grub_file if efi_params else 'grubx64.efi'}")
if os.path.exists(efi_boot_path):
log_info("")
log_info("【重要】使用可移动模式安装 (EFI Fallback)")
log_info("启动方法:")
log_info(" 1. 在BIOS中选择 'UEFI Hard Drive''UEFI OS' 启动")
log_info(" 2. 如果有多块硬盘,选择正确的硬盘")
log_info(" 3. 确保启动模式为 UEFI不是 Legacy/CSM")
elif os.path.exists(efi_grub_path):
log_info("")
log_info(f"【重要】使用标准模式安装 (/{efi_bootloader_id}/{efi_grub_file if efi_params else 'grubx64.efi'})")
log_info("启动方法:")
log_info(f" 1. 在BIOS启动菜单中选择 '{efi_bootloader_id}''Linux' 启动项")
log_info(" 2. 如果没有该选项,需要手动添加启动项")
log_info(f" 路径: \\EFI\\{efi_bootloader_id}\\{efi_grub_file if efi_params else 'grubx64.efi'}")
log_info("")
log_info("故障排除:")
log_info(" • 如果仍无法启动,请检查:")
log_info(" - BIOS 是否设置为 UEFI 启动模式(非 Legacy/CSM")
log_info(" - 安全启动 (Secure Boot) 是否已禁用")
log_info(" - EFI 分区格式是否为 FAT32")
return True, ""
def unmount_target_system(mount_point: str) -> Tuple[bool, str]:
"""
卸载所有挂载的分区。
确保按正确顺序卸载,避免资源忙错误。
"""
log_step("卸载目标系统", f"挂载点: {mount_point}")
success = True
error_msg = ""
# 按逆序卸载绑定挂载
bind_paths = ["/run", "/sys", "/proc", "/dev/pts", "/dev"]
for path in bind_paths:
target_path = os.path.join(mount_point, path.lstrip('/'))
if os.path.ismount(target_path):
# 尝试多次卸载(有时需要重试)
for attempt in range(3):
s, _, stderr = run_command(["sudo", "umount", target_path], f"卸载绑定 {target_path}")
if s:
break
time.sleep(0.5)
if not s:
success = False
error_msg += f"卸载绑定 {target_path} 失败: {stderr}\n"
else:
log_debug(f"未挂载,跳过: {target_path}")
# 卸载 EFI 分区
efi_mount_point = os.path.join(mount_point, "boot/efi")
if os.path.ismount(efi_mount_point):
s, _, stderr = run_command(["sudo", "umount", efi_mount_point], f"卸载 EFI 分区")
if not s:
success = False
error_msg += f"卸载 EFI 分区失败: {stderr}\n"
else:
log_debug(f"EFI 分区未挂载,跳过")
# 卸载 /boot 分区
boot_mount_point = os.path.join(mount_point, "boot")
if os.path.ismount(boot_mount_point):
s, _, stderr = run_command(["sudo", "umount", boot_mount_point], f"卸载 /boot 分区")
if not s:
success = False
error_msg += f"卸载 /boot 分区失败: {stderr}\n"
else:
log_debug(f"/boot 分区未挂载,跳过")
# 卸载根分区
if os.path.ismount(mount_point):
s, _, stderr = run_command(["sudo", "umount", mount_point], f"卸载根分区")
if not s:
success = False
error_msg += f"卸载根分区失败: {stderr}\n"
else:
log_debug(f"根分区未挂载,跳过")
# 清理临时目录
if os.path.exists(mount_point) and not os.path.ismount(mount_point):
try:
shutil.rmtree(mount_point)
log_success(f"✓ 清理临时目录: {mount_point}")
except OSError as e:
log_warning(f"无法删除临时目录 {mount_point}: {e}")
error_msg += f"无法删除临时目录: {e}\n"
if success:
log_success("✓ 所有分区已卸载")
else:
log_warning("部分分区卸载失败")
return success, error_msg
# ===== 高级功能函数 =====
def detect_existing_grub_entries(mount_point: str) -> List[Dict]:
"""
检测 EFI 分区中已有的 GRUB 启动项。
返回发现的 EFI 启动项列表。
"""
entries = []
efi_path = os.path.join(mount_point, "boot/efi/EFI")
if not os.path.exists(efi_path):
return entries
try:
for item in os.listdir(efi_path):
item_path = os.path.join(efi_path, item)
if os.path.isdir(item_path):
efi_files = [f for f in os.listdir(item_path) if f.endswith('.efi')]
if efi_files:
entries.append({
"name": item,
"path": item_path,
"files": efi_files
})
except Exception as e:
log_debug(f"检测现有 GRUB 条目失败: {e}")
return entries
def install_refind(mount_point: str) -> Tuple[bool, str]:
"""
安装 rEFInd 作为备用引导管理器。
需要目标系统中已安装 rEFInd 包。
"""
log_step("安装 rEFInd")
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
# 检查 refind-install 是否存在
success, _, _ = run_command(
chroot_cmd_prefix + ["which", "refind-install"],
"检查 refind-install",
timeout=10
)
if not success:
return False, "未找到 refind-install请确保已安装 rEFInd 包"
# 运行 refind-install
success, stdout, stderr = run_command(
chroot_cmd_prefix + ["refind-install"],
"安装 rEFInd",
timeout=60
)
if success:
log_success("✓ rEFInd 安装成功")
return True, ""
else:
log_error(f"rEFInd 安装失败: {stderr}")
return False, f"rEFInd 安装失败: {stderr}"
def update_firmware_boot_order(mount_point: str, efi_bootloader_id: str = "GRUB") -> bool:
"""
使用 efibootmgr 更新固件启动顺序。
注意:在 Live 环境中可能无法工作。
"""
if is_live_environment():
log_warning("Live 环境无法更新固件启动顺序")
return False
log_step("更新固件启动顺序")
# 查找 EFI 分区
efi_partition = None
for line in open("/proc/mounts"):
if mount_point in line and "/boot/efi" in line:
parts = line.split()
efi_partition = parts[0]
break
if not efi_partition:
log_warning("无法找到 EFI 分区")
return False
# 使用 efibootmgr 创建启动项
success, _, stderr = run_command(
["efibootmgr", "-c", "-L", efi_bootloader_id,
"-l", f"\\EFI\\{efi_bootloader_id}\\grubx64.efi"],
"创建 EFI 启动项",
timeout=10
)
if success:
log_success("✓ 固件启动顺序更新成功")
return True
else:
log_warning(f"更新固件启动顺序失败: {stderr}")
return False
if __name__ == "__main__":
print("--- 运行后端测试 ---")
print("\n--- 系统信息检测 ---")
print(f"EFI 位数: {get_efi_word_size()}")
print(f"CPU 架构: {platform.machine()}")
print(f"Live 环境: {is_live_environment()}")
sb_supported, sb_enabled = check_secure_boot_status()
print(f"Secure Boot 支持: {sb_supported}, 启用: {sb_enabled}")
efi_params = get_grub_efi_parameters()
if efi_params:
print(f"GRUB EFI 参数: {efi_params}")
print("\n--- 扫描分区测试 ---")
success, disks, partitions, efi_partitions, err = scan_partitions()
if success:
print("磁盘:", [d['name'] for d in disks])
print("分区:", [p['name'] for p in partitions])
print("EFI分区:", [p['name'] for p in efi_partitions])
else:
print("扫描分区失败:", err)