2689 lines
103 KiB
Python
2689 lines
103 KiB
Python
# backend.py
|
||
# 参考 Calamares 引导安装模块优化
|
||
# 支持多架构、EFI fallback、Secure Boot、特殊文件系统等
|
||
|
||
import subprocess
|
||
import os
|
||
import json
|
||
import time
|
||
import re
|
||
import shutil
|
||
import glob
|
||
import platform
|
||
from typing import Tuple, List, Dict, Optional, Callable
|
||
from enum import Enum
|
||
|
||
# 常量配置
|
||
COMMAND_TIMEOUT = 300 # 命令执行超时时间(秒)
|
||
DEFAULT_GRUB_CONFIG_PATHS = [
|
||
"/boot/grub/grub.cfg",
|
||
"/boot/grub2/grub.cfg",
|
||
]
|
||
|
||
# EFI 架构参数映射 (参考 Calamares)
|
||
EFI_ARCH_PARAMETERS = {
|
||
"32": {
|
||
"target": "i386-efi",
|
||
"grub_file": "grubia32.efi",
|
||
"boot_file": "bootia32.efi",
|
||
"shim_file": "shimia32.efi"
|
||
},
|
||
"64": {
|
||
"x86_64": {
|
||
"target": "x86_64-efi",
|
||
"grub_file": "grubx64.efi",
|
||
"boot_file": "bootx64.efi",
|
||
"shim_file": "shimx64.efi"
|
||
},
|
||
"aarch64": {
|
||
"target": "arm64-efi",
|
||
"grub_file": "grubaa64.efi",
|
||
"boot_file": "bootaa64.efi",
|
||
"shim_file": "shimaa64.efi"
|
||
},
|
||
"loongarch64": {
|
||
"target": "loongarch64-efi",
|
||
"grub_file": "grubloongarch64.efi",
|
||
"boot_file": "bootloongarch64.efi",
|
||
"shim_file": "shimloongarch64.efi"
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
class LogLevel(Enum):
|
||
"""日志级别"""
|
||
DEBUG = "debug"
|
||
INFO = "info"
|
||
WARNING = "warning"
|
||
ERROR = "error"
|
||
SUCCESS = "success"
|
||
STEP = "step"
|
||
|
||
|
||
# 全局日志回调函数
|
||
_log_callback: Optional[Callable[[str, LogLevel], None]] = None
|
||
|
||
|
||
def set_log_callback(callback: Callable[[str, LogLevel], None]):
|
||
"""设置日志回调函数,用于前端显示"""
|
||
global _log_callback
|
||
_log_callback = callback
|
||
|
||
|
||
def _log(message: str, level: LogLevel = LogLevel.INFO):
|
||
"""内部日志函数"""
|
||
# 同时输出到控制台和回调
|
||
prefix = {
|
||
LogLevel.DEBUG: "[DEBUG]",
|
||
LogLevel.INFO: "[INFO]",
|
||
LogLevel.WARNING: "[WARN]",
|
||
LogLevel.ERROR: "[ERROR]",
|
||
LogLevel.SUCCESS: "[SUCCESS]",
|
||
LogLevel.STEP: "[STEP]"
|
||
}.get(level, "[INFO]")
|
||
|
||
print(f"{prefix} {message}")
|
||
|
||
if _log_callback:
|
||
try:
|
||
_log_callback(message, level)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def log_step(step_name: str, detail: str = ""):
|
||
"""输出步骤日志"""
|
||
separator = "=" * 60
|
||
_log(f"{separator}", LogLevel.STEP)
|
||
_log(f"{step_name}", LogLevel.STEP)
|
||
if detail:
|
||
_log(f"详情: {detail}", LogLevel.STEP)
|
||
_log(f"{separator}", LogLevel.STEP)
|
||
|
||
|
||
def log_info(msg: str):
|
||
"""输出信息日志"""
|
||
_log(msg, LogLevel.INFO)
|
||
|
||
|
||
def log_warning(msg: str):
|
||
"""输出警告日志"""
|
||
_log(msg, LogLevel.WARNING)
|
||
|
||
|
||
def log_error(msg: str):
|
||
"""输出错误日志"""
|
||
_log(msg, LogLevel.ERROR)
|
||
|
||
|
||
def log_debug(msg: str):
|
||
"""输出调试日志"""
|
||
_log(msg, LogLevel.DEBUG)
|
||
|
||
|
||
def log_success(msg: str):
|
||
"""输出成功日志"""
|
||
_log(msg, LogLevel.SUCCESS)
|
||
|
||
|
||
def validate_device_path(path: str) -> bool:
|
||
"""
|
||
验证设备路径格式是否合法(防止命令注入)。
|
||
支持标准分区、LVM、LUKS、RAID、ZFS等设备。
|
||
"""
|
||
if not path:
|
||
return False
|
||
patterns = [
|
||
r'^/dev/[a-zA-Z0-9_-]+$',
|
||
r'^/dev/mapper/[a-zA-Z0-9_-]+$',
|
||
r'^/dev/mapper/[a-zA-Z0-9_]+-[a-zA-Z0-9_-]+$',
|
||
r'^/dev/md[0-9]+$',
|
||
r'^/dev/nvme[0-9]+n[0-9]+(p[0-9]+)?$',
|
||
r'^/dev/mmcblk[0-9]+(p[0-9]+)?$',
|
||
r'^/dev/zd[0-9]+$',
|
||
]
|
||
return any(re.match(pattern, path) is not None for pattern in patterns)
|
||
|
||
|
||
def run_command(command: List[str], description: str = "执行命令",
|
||
cwd: Optional[str] = None, shell: bool = False,
|
||
timeout: int = COMMAND_TIMEOUT,
|
||
env: Optional[Dict[str, str]] = None) -> Tuple[bool, str, str]:
|
||
"""
|
||
运行一个系统命令,并捕获其输出和错误。
|
||
支持环境变量设置(用于ZFS等特殊文件系统)。
|
||
"""
|
||
cmd_str = ' '.join(command)
|
||
log_info(f"执行命令: {cmd_str}")
|
||
log_debug(f"工作目录: {cwd or '当前目录'}, 超时: {timeout}秒")
|
||
|
||
# 准备环境变量
|
||
run_env = None
|
||
if env:
|
||
run_env = os.environ.copy()
|
||
run_env.update(env)
|
||
log_debug(f"环境变量: {env}")
|
||
|
||
try:
|
||
result = subprocess.run(
|
||
command,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True,
|
||
cwd=cwd,
|
||
shell=shell,
|
||
timeout=timeout,
|
||
env=run_env
|
||
)
|
||
if result.stdout:
|
||
log_debug(f"stdout: {result.stdout[:500]}")
|
||
if result.stderr:
|
||
log_debug(f"stderr: {result.stderr[:500]}")
|
||
log_success(f"✓ 命令成功: {description}")
|
||
return True, result.stdout, result.stderr
|
||
except subprocess.CalledProcessError as e:
|
||
log_error(f"✗ 命令失败: {description}")
|
||
log_error(f"返回码: {e.returncode}")
|
||
log_error(f"stdout: {e.stdout}")
|
||
log_error(f"stderr: {e.stderr}")
|
||
return False, e.stdout, e.stderr
|
||
except subprocess.TimeoutExpired:
|
||
log_error(f"✗ 命令超时(超过 {timeout} 秒): {description}")
|
||
return False, "", f"命令执行超时"
|
||
except FileNotFoundError:
|
||
log_error(f"✗ 命令未找到: {command[0]}")
|
||
return False, "", f"命令未找到: {command[0]}"
|
||
except Exception as e:
|
||
log_error(f"✗ 发生未知错误: {e}")
|
||
return False, "", str(e)
|
||
|
||
|
||
def get_efi_word_size() -> str:
|
||
"""
|
||
检测 EFI 位数(32或64位)。
|
||
通过读取 /sys/firmware/efi/fw_platform_size 获取。
|
||
如果内核较旧不支持,默认假设为 64 位。
|
||
"""
|
||
# 首先检查是否是 EFI 系统
|
||
if not os.path.exists("/sys/firmware/efi"):
|
||
log_debug("非 EFI 系统,返回 64 位作为默认值")
|
||
return "64"
|
||
|
||
try:
|
||
# 尝试读取新内核的 fw_platform_size
|
||
if os.path.exists("/sys/firmware/efi/fw_platform_size"):
|
||
with open("/sys/firmware/efi/fw_platform_size", "r") as f:
|
||
efi_bitness = f.read(2).strip()
|
||
if efi_bitness in ["32", "64"]:
|
||
log_debug(f"检测到 EFI 位数: {efi_bitness} 位")
|
||
return efi_bitness
|
||
|
||
# 旧内核:检查是否存在 64 位 EFI 相关文件
|
||
# 如果存在 /sys/firmware/efi/vars 或 /sys/firmware/efi/efivars,假设为 64 位
|
||
if os.path.exists("/sys/firmware/efi/vars") or os.path.exists("/sys/firmware/efi/efivars"):
|
||
log_debug("检测到 EFI 环境,假设为 64 位(旧内核)")
|
||
return "64"
|
||
|
||
except FileNotFoundError:
|
||
log_debug("无法读取 EFI 位数文件,假设为 64 位")
|
||
except Exception as e:
|
||
log_warning(f"读取 EFI 位数时出错: {e}")
|
||
|
||
return "64"
|
||
|
||
|
||
def get_grub_efi_parameters() -> Optional[Tuple[str, str, str]]:
|
||
"""
|
||
获取 GRUB EFI 安装参数。
|
||
返回: (target_name, grub.efi_name, boot.efi_name)
|
||
参考 Calamares 的实现。
|
||
"""
|
||
efi_bitness = get_efi_word_size()
|
||
cpu_type = platform.machine()
|
||
|
||
log_debug(f"系统架构: {cpu_type}, EFI位数: {efi_bitness}")
|
||
|
||
if efi_bitness == "32":
|
||
# 假设所有 32 位都是传统 x86
|
||
params = EFI_ARCH_PARAMETERS["32"]
|
||
return params["target"], params["grub_file"], params["boot_file"]
|
||
elif efi_bitness == "64" and cpu_type == "aarch64":
|
||
params = EFI_ARCH_PARAMETERS["64"]["aarch64"]
|
||
return params["target"], params["grub_file"], params["boot_file"]
|
||
elif efi_bitness == "64" and cpu_type == "loongarch64":
|
||
params = EFI_ARCH_PARAMETERS["64"]["loongarch64"]
|
||
return params["target"], params["grub_file"], params["boot_file"]
|
||
elif efi_bitness == "64":
|
||
# 如果不是 ARM,则为 AMD64
|
||
params = EFI_ARCH_PARAMETERS["64"]["x86_64"]
|
||
return params["target"], params["grub_file"], params["boot_file"]
|
||
|
||
log_warning(f"无法确定 GRUB EFI 参数: bits={efi_bitness}, cpu={cpu_type}")
|
||
return None
|
||
|
||
|
||
def get_shim_filename() -> Optional[str]:
|
||
"""获取当前架构的 shim 文件名"""
|
||
efi_bitness = get_efi_word_size()
|
||
cpu_type = platform.machine()
|
||
|
||
if efi_bitness == "32":
|
||
return EFI_ARCH_PARAMETERS["32"]["shim_file"]
|
||
elif efi_bitness == "64" and cpu_type == "aarch64":
|
||
return EFI_ARCH_PARAMETERS["64"]["aarch64"]["shim_file"]
|
||
elif efi_bitness == "64" and cpu_type == "loongarch64":
|
||
return EFI_ARCH_PARAMETERS["64"]["loongarch64"]["shim_file"]
|
||
elif efi_bitness == "64":
|
||
return EFI_ARCH_PARAMETERS["64"]["x86_64"]["shim_file"]
|
||
|
||
return None
|
||
|
||
|
||
def is_live_environment() -> bool:
|
||
"""
|
||
检测是否在 Live 环境中运行。
|
||
Live 环境通常无法访问 EFI 变量。
|
||
"""
|
||
efivars_path = "/sys/firmware/efi/efivars"
|
||
if not os.path.exists(efivars_path):
|
||
log_debug("Live 环境检测: /sys/firmware/efi/efivars 不存在")
|
||
return True
|
||
try:
|
||
files = os.listdir(efivars_path)
|
||
if not files:
|
||
log_debug("Live 环境检测: efivars 目录为空")
|
||
return True
|
||
except PermissionError:
|
||
log_debug("Live 环境检测: 无法访问 efivars")
|
||
return True
|
||
|
||
log_debug("Live 环境检测: 看起来不是 Live 环境")
|
||
return False
|
||
|
||
|
||
def check_secure_boot_status() -> Tuple[bool, bool]:
|
||
"""
|
||
检查 Secure Boot 状态。
|
||
返回: (是否支持检测, 是否启用)
|
||
"""
|
||
try:
|
||
# 方法1: 通过 mokutil 检查
|
||
success, stdout, _ = run_command(
|
||
["mokutil", "--sb-state"],
|
||
"检查 Secure Boot 状态",
|
||
timeout=10
|
||
)
|
||
if success:
|
||
if "enabled" in stdout.lower():
|
||
log_info("Secure Boot 状态: 已启用")
|
||
return True, True
|
||
elif "disabled" in stdout.lower():
|
||
log_info("Secure Boot 状态: 已禁用")
|
||
return True, False
|
||
except Exception:
|
||
pass
|
||
|
||
# 方法2: 通过 efi 变量检查
|
||
try:
|
||
sb_var_path = "/sys/firmware/efi/efivars/SecureBoot-8be4df61-93ca-11d2-aa0d-00e098032b8c"
|
||
if os.path.exists(sb_var_path):
|
||
with open(sb_var_path, "rb") as f:
|
||
data = f.read()
|
||
if len(data) >= 5:
|
||
# 第5个字节是 Secure Boot 状态
|
||
sb_enabled = data[4] == 1
|
||
log_info(f"Secure Boot 状态: {'已启用' if sb_enabled else '已禁用'}")
|
||
return True, sb_enabled
|
||
except Exception as e:
|
||
log_debug(f"通过 efi 变量检查 Secure Boot 失败: {e}")
|
||
|
||
log_warning("无法检测 Secure Boot 状态")
|
||
return False, False
|
||
|
||
|
||
def _process_partition(block_device: Dict, all_disks: List, all_partitions: List,
|
||
all_efi_partitions: List) -> None:
|
||
"""
|
||
处理单个块设备,递归处理子分区、LVM逻辑卷、btrfs子卷等。
|
||
"""
|
||
dev_name = f"/dev/{block_device.get('name', '')}"
|
||
dev_type = block_device.get("type")
|
||
|
||
# 跳过 loop 设备和 Live 系统自身的挂载
|
||
mountpoint = block_device.get("mountpoint")
|
||
if mountpoint and (mountpoint.startswith("/run/media") or
|
||
mountpoint.startswith("/cdrom") or
|
||
mountpoint.startswith("/live") or
|
||
mountpoint.startswith("/iso")):
|
||
log_debug(f"跳过 Live 系统挂载点: {dev_name} -> {mountpoint}")
|
||
return
|
||
|
||
if dev_type == "disk":
|
||
# 跳过 loop 和 rom 设备
|
||
if block_device.get("rota") is not None or block_device.get("size", "0") == "0":
|
||
pass # 可能是有效磁盘
|
||
all_disks.append({
|
||
"name": dev_name,
|
||
"size": block_device.get("size", "unknown"),
|
||
"model": block_device.get("model", "") # 可能为空(旧版本 lsblk)
|
||
})
|
||
for child in block_device.get("children", []):
|
||
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
|
||
|
||
elif dev_type == "part":
|
||
# 跳过 Live 系统的根分区
|
||
if mountpoint and mountpoint == "/":
|
||
# 检查是否是 Live 系统的根(通过检查是否有 /live 或特定标记)
|
||
live_marker = os.path.join(mountpoint, "live")
|
||
if os.path.exists(live_marker):
|
||
log_debug(f"跳过 Live 系统根分区: {dev_name}")
|
||
for child in block_device.get("children", []):
|
||
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
|
||
return
|
||
|
||
part_info = {
|
||
"name": dev_name,
|
||
"fstype": block_device.get("fstype", "unknown"),
|
||
"size": block_device.get("size", "unknown"),
|
||
"mountpoint": mountpoint,
|
||
"uuid": block_device.get("uuid"),
|
||
"partlabel": block_device.get("partlabel"),
|
||
"label": block_device.get("label"),
|
||
"parttype": (block_device.get("parttype") or "").upper(),
|
||
"fsver": block_device.get("fsver", ""),
|
||
"is_subvolume": False
|
||
}
|
||
|
||
# 检测 btrfs 子卷
|
||
if part_info["fstype"] == "btrfs":
|
||
part_info["is_btrfs"] = True
|
||
# 尝试获取子卷信息
|
||
try:
|
||
success, stdout, _ = run_command(
|
||
["btrfs", "subvolume", "list", dev_name],
|
||
f"检测 btrfs 子卷: {dev_name}",
|
||
timeout=10
|
||
)
|
||
if success:
|
||
subvolumes = []
|
||
for line in stdout.strip().split('\n'):
|
||
if line:
|
||
# 格式: ID 256 gen 16 top level 5 path @
|
||
match = re.search(r'path\s+(\S+)', line)
|
||
if match:
|
||
subvolumes.append(match.group(1))
|
||
if subvolumes:
|
||
part_info["subvolumes"] = subvolumes
|
||
log_debug(f"{dev_name} 的 btrfs 子卷: {subvolumes}")
|
||
except Exception as e:
|
||
log_debug(f"检测 btrfs 子卷失败: {e}")
|
||
|
||
all_partitions.append(part_info)
|
||
|
||
# 检测 EFI 系统分区
|
||
is_vfat = part_info["fstype"] == "vfat"
|
||
has_efi_label = part_info["partlabel"] and "EFI" in part_info["partlabel"].upper()
|
||
has_efi_name = part_info["label"] and "EFI" in part_info["label"].upper()
|
||
# EFI 系统分区的 GPT 类型 GUID
|
||
is_efi_type = part_info["parttype"] == "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"
|
||
# 或者检测标志
|
||
is_efi_fs = block_device.get("fsver") == "FAT32" and is_vfat
|
||
|
||
if is_vfat and (has_efi_label or has_efi_name or is_efi_type):
|
||
all_efi_partitions.append(part_info)
|
||
log_debug(f"检测到 EFI 分区: {dev_name}")
|
||
|
||
for child in block_device.get("children", []):
|
||
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
|
||
|
||
elif dev_type in ["lvm", "dm"]:
|
||
lv_name = block_device.get("name", "")
|
||
|
||
# 处理 LVM 命名格式 (vg-lv)
|
||
if "-" in lv_name and not lv_name.startswith("dm-"):
|
||
mapper_path = f"/dev/mapper/{lv_name}"
|
||
else:
|
||
mapper_path = dev_name
|
||
|
||
part_info = {
|
||
"name": mapper_path,
|
||
"fstype": block_device.get("fstype", "unknown"),
|
||
"size": block_device.get("size", "unknown"),
|
||
"mountpoint": mountpoint,
|
||
"uuid": block_device.get("uuid"),
|
||
"partlabel": block_device.get("partlabel"),
|
||
"label": block_device.get("label"),
|
||
"parttype": (block_device.get("parttype") or "").upper(),
|
||
"is_lvm": True,
|
||
"dm_name": lv_name
|
||
}
|
||
|
||
# 检测 LUKS 加密
|
||
if part_info["fstype"] == "crypto_LUKS":
|
||
part_info["is_luks"] = True
|
||
log_debug(f"检测到 LUKS 加密卷: {mapper_path}")
|
||
|
||
all_partitions.append(part_info)
|
||
|
||
for child in block_device.get("children", []):
|
||
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
|
||
|
||
elif dev_type == "crypt":
|
||
# 已解密的 LUKS 设备
|
||
part_info = {
|
||
"name": dev_name,
|
||
"fstype": block_device.get("fstype", "unknown"),
|
||
"size": block_device.get("size", "unknown"),
|
||
"mountpoint": mountpoint,
|
||
"uuid": block_device.get("uuid"),
|
||
"is_luks_open": True,
|
||
"parent": block_device.get("pkname", "") # 可能为空(旧版本 lsblk)
|
||
}
|
||
all_partitions.append(part_info)
|
||
|
||
for child in block_device.get("children", []):
|
||
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
|
||
|
||
|
||
def scan_partitions() -> Tuple[bool, List, List, List, str]:
|
||
"""
|
||
扫描系统中的所有磁盘和分区。
|
||
支持标准分区、LVM、LUKS、btrfs 等。
|
||
"""
|
||
log_step("扫描系统分区", "使用 lsblk 获取磁盘和分区信息")
|
||
|
||
# 使用基础列名以确保兼容性(旧版本 lsblk 可能不支持 FSVER, ROTA, MODEL, PKNAME)
|
||
success, stdout, stderr = run_command(
|
||
["sudo", "lsblk", "-J", "-o", "NAME,FSTYPE,SIZE,MOUNTPOINT,UUID,PARTLABEL,LABEL,TYPE,PARTTYPE"],
|
||
"扫描分区"
|
||
)
|
||
|
||
if not success:
|
||
return False, [], [], [], stderr
|
||
|
||
try:
|
||
data = json.loads(stdout)
|
||
all_disks = []
|
||
all_partitions = []
|
||
all_efi_partitions = []
|
||
|
||
for block_device in data.get("blockdevices", []):
|
||
_process_partition(block_device, all_disks, all_partitions, all_efi_partitions)
|
||
|
||
log_info(f"扫描完成: 发现 {len(all_disks)} 个磁盘, {len(all_partitions)} 个分区, {len(all_efi_partitions)} 个EFI分区")
|
||
for d in all_disks:
|
||
log_debug(f" 磁盘: {d['name']} ({d.get('model', '')})")
|
||
for p in all_partitions:
|
||
log_debug(f" 分区: {p['name']} ({p['fstype']})")
|
||
for e in all_efi_partitions:
|
||
log_info(f" EFI分区: {e['name']}")
|
||
|
||
return True, all_disks, all_partitions, all_efi_partitions, ""
|
||
|
||
except json.JSONDecodeError as e:
|
||
log_error(f"解析 lsblk 输出失败: {e}")
|
||
return False, [], [], [], f"解析lsblk输出失败: {e}"
|
||
except Exception as e:
|
||
log_error(f"处理分区数据时发生未知错误: {e}")
|
||
return False, [], [], [], f"处理分区数据时发生未知错误: {e}"
|
||
|
||
|
||
def detect_filesystem_type(partition: str) -> str:
|
||
"""
|
||
检测指定分区的文件系统类型。
|
||
使用 blkid 获取准确的文件系统信息。
|
||
"""
|
||
try:
|
||
success, stdout, _ = run_command(
|
||
["blkid", "-s", "TYPE", "-o", "value", partition],
|
||
f"检测文件系统类型: {partition}",
|
||
timeout=10
|
||
)
|
||
if success:
|
||
fs_type = stdout.strip()
|
||
log_debug(f"{partition} 的文件系统类型: {fs_type}")
|
||
return fs_type
|
||
except Exception as e:
|
||
log_debug(f"检测文件系统类型失败: {e}")
|
||
|
||
return "unknown"
|
||
|
||
|
||
def is_btrfs_subvolume(mount_point: str) -> bool:
|
||
"""检查指定路径是否是 btrfs 子卷"""
|
||
try:
|
||
success, stdout, _ = run_command(
|
||
["btrfs", "subvolume", "get-default", mount_point],
|
||
"检查 btrfs 子卷",
|
||
timeout=10
|
||
)
|
||
return success
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def get_btrfs_root_subvolume(partition: str) -> Optional[str]:
|
||
"""
|
||
获取 btrfs 分区的根子卷名称。
|
||
常见的子卷命名: @, @root, root
|
||
"""
|
||
try:
|
||
success, stdout, _ = run_command(
|
||
["btrfs", "subvolume", "list", partition],
|
||
f"获取 btrfs 子卷: {partition}",
|
||
timeout=10
|
||
)
|
||
if success:
|
||
subvolumes = []
|
||
for line in stdout.strip().split('\n'):
|
||
match = re.search(r'path\s+(\S+)', line)
|
||
if match:
|
||
subvolumes.append(match.group(1))
|
||
|
||
# 优先查找常见的根子卷名称
|
||
for candidate in ["@", "root", "@root", "ROOT", "@ROOT"]:
|
||
if candidate in subvolumes:
|
||
log_debug(f"找到 btrfs 根子卷: {candidate}")
|
||
return candidate
|
||
|
||
# 如果都没找到,返回第一个
|
||
if subvolumes:
|
||
log_debug(f"使用第一个 btrfs 子卷: {subvolumes[0]}")
|
||
return subvolumes[0]
|
||
except Exception as e:
|
||
log_debug(f"获取 btrfs 子卷失败: {e}")
|
||
|
||
return None
|
||
|
||
|
||
def _cleanup_partial_mount(mount_point: str, mounted_boot: bool, mounted_efi: bool,
|
||
bind_paths_mounted: List[str]) -> None:
|
||
"""清理部分挂载的资源"""
|
||
log_warning(f"清理部分挂载资源: {mount_point}")
|
||
|
||
for target_path in reversed(bind_paths_mounted):
|
||
if os.path.ismount(target_path):
|
||
run_command(["sudo", "umount", target_path], f"清理卸载绑定 {target_path}")
|
||
|
||
if mounted_efi:
|
||
efi_mount_point = os.path.join(mount_point, "boot/efi")
|
||
if os.path.ismount(efi_mount_point):
|
||
run_command(["sudo", "umount", efi_mount_point], "清理卸载EFI分区")
|
||
|
||
if mounted_boot:
|
||
boot_mount_point = os.path.join(mount_point, "boot")
|
||
if os.path.ismount(boot_mount_point):
|
||
run_command(["sudo", "umount", boot_mount_point], "清理卸载/boot分区")
|
||
|
||
if os.path.ismount(mount_point):
|
||
run_command(["sudo", "umount", mount_point], "清理卸载根分区")
|
||
|
||
if os.path.exists(mount_point) and not os.path.ismount(mount_point):
|
||
try:
|
||
os.rmdir(mount_point)
|
||
except OSError:
|
||
pass
|
||
|
||
|
||
def mount_target_system(root_partition: str, boot_partition: Optional[str] = None,
|
||
efi_partition: Optional[str] = None,
|
||
btrfs_subvolume: Optional[str] = None) -> Tuple[bool, str, str]:
|
||
"""
|
||
挂载目标系统的根分区、/boot分区和EFI分区到临时目录。
|
||
支持 btrfs 子卷挂载。
|
||
"""
|
||
log_step("挂载目标系统", f"根分区: {root_partition}, /boot: {boot_partition or '无'}, EFI: {efi_partition or '无'}")
|
||
|
||
if not validate_device_path(root_partition):
|
||
log_error(f"无效的根分区路径: {root_partition}")
|
||
return False, "", f"无效的根分区路径: {root_partition}"
|
||
if boot_partition and not validate_device_path(boot_partition):
|
||
log_error(f"无效的/boot分区路径: {boot_partition}")
|
||
return False, "", f"无效的/boot分区路径: {boot_partition}"
|
||
if efi_partition and not validate_device_path(efi_partition):
|
||
log_error(f"无效的EFI分区路径: {efi_partition}")
|
||
return False, "", f"无效的EFI分区路径: {efi_partition}"
|
||
|
||
mount_point = "/mnt_grub_repair_" + str(int(time.time()))
|
||
log_info(f"创建临时挂载点: {mount_point}")
|
||
|
||
try:
|
||
os.makedirs(mount_point, exist_ok=False)
|
||
log_info(f"✓ 挂载点创建成功")
|
||
except Exception as e:
|
||
log_error(f"创建挂载点失败: {e}")
|
||
return False, "", f"创建挂载点失败: {e}"
|
||
|
||
bind_paths_mounted = []
|
||
mounted_boot = False
|
||
mounted_efi = False
|
||
|
||
# 1. 挂载根分区
|
||
log_info(f"[1/4] 挂载根分区 {root_partition} 到 {mount_point}")
|
||
|
||
# 检测文件系统类型
|
||
fs_type = detect_filesystem_type(root_partition)
|
||
|
||
# 构建挂载命令
|
||
mount_cmd = ["sudo", "mount"]
|
||
|
||
# btrfs 子卷处理
|
||
if fs_type == "btrfs" and btrfs_subvolume:
|
||
mount_cmd.extend(["-o", f"subvol={btrfs_subvolume}"])
|
||
log_info(f" 使用 btrfs 子卷: {btrfs_subvolume}")
|
||
|
||
mount_cmd.extend([root_partition, mount_point])
|
||
|
||
success, _, stderr = run_command(mount_cmd, f"挂载根分区 {root_partition}")
|
||
if not success:
|
||
log_error(f"挂载根分区失败,清理中...")
|
||
os.rmdir(mount_point)
|
||
return False, "", f"挂载根分区失败: {stderr}"
|
||
|
||
if not os.path.ismount(mount_point):
|
||
log_error(f"挂载点未激活: {mount_point}")
|
||
return False, "", "挂载根分区失败: 挂载点未激活"
|
||
log_success(f"✓ 根分区挂载成功")
|
||
|
||
# 检查关键目录
|
||
for check_dir in ["etc", "boot"]:
|
||
full_path = os.path.join(mount_point, check_dir)
|
||
if os.path.exists(full_path):
|
||
log_info(f" 发现目录: /{check_dir}")
|
||
else:
|
||
log_warning(f" 未找到目录: /{check_dir}")
|
||
|
||
# 2. 挂载 /boot 分区
|
||
if boot_partition:
|
||
log_info(f"[2/4] 挂载 /boot 分区 {boot_partition}")
|
||
boot_mount_point = os.path.join(mount_point, "boot")
|
||
if not os.path.exists(boot_mount_point):
|
||
os.makedirs(boot_mount_point)
|
||
log_debug(f"创建 /boot 挂载点")
|
||
|
||
success, _, stderr = run_command(["sudo", "mount", boot_partition, boot_mount_point],
|
||
f"挂载 /boot 分区 {boot_partition}")
|
||
if not success:
|
||
log_error(f"挂载 /boot 分区失败,开始清理...")
|
||
_cleanup_partial_mount(mount_point, False, False, [])
|
||
return False, "", f"挂载 /boot 分区失败: {stderr}"
|
||
mounted_boot = True
|
||
log_success(f"✓ /boot 分区挂载成功")
|
||
else:
|
||
log_info(f"[2/4] 无独立的 /boot 分区,跳过")
|
||
|
||
# 3. 挂载 EFI 分区
|
||
if efi_partition:
|
||
log_info(f"[3/4] 挂载 EFI 分区 {efi_partition}")
|
||
efi_mount_point = os.path.join(mount_point, "boot/efi")
|
||
if not os.path.exists(efi_mount_point):
|
||
os.makedirs(efi_mount_point)
|
||
log_debug(f"创建 /boot/efi 挂载点")
|
||
|
||
success, _, stderr = run_command(["sudo", "mount", efi_partition, efi_mount_point],
|
||
f"挂载 EFI 分区 {efi_partition}")
|
||
if not success:
|
||
log_error(f"挂载 EFI 分区失败,开始清理...")
|
||
_cleanup_partial_mount(mount_point, mounted_boot, False, [])
|
||
return False, "", f"挂载 EFI 分区失败: {stderr}"
|
||
mounted_efi = True
|
||
log_success(f"✓ EFI 分区挂载成功")
|
||
|
||
# 检查 EFI 分区内容
|
||
efi_path = os.path.join(mount_point, "boot/efi/EFI")
|
||
if os.path.exists(efi_path):
|
||
log_info(f" EFI 分区内容:")
|
||
try:
|
||
for item in os.listdir(efi_path):
|
||
log_info(f" - {item}")
|
||
except Exception as e:
|
||
log_warning(f" 无法列出 EFI 目录: {e}")
|
||
else:
|
||
log_info(f"[3/4] 无 EFI 分区,跳过")
|
||
|
||
# 4. 绑定伪文件系统
|
||
log_info(f"[4/4] 绑定伪文件系统 (/dev, /proc, /sys 等)")
|
||
bind_paths = ["/dev", "/dev/pts", "/proc", "/sys", "/run"]
|
||
for path in bind_paths:
|
||
target_path = os.path.join(mount_point, path.lstrip('/'))
|
||
if not os.path.exists(target_path):
|
||
os.makedirs(target_path)
|
||
log_debug(f"创建目录: {target_path}")
|
||
|
||
success, _, stderr = run_command(["sudo", "mount", "--bind", path, target_path],
|
||
f"绑定 {path}")
|
||
if not success:
|
||
log_error(f"绑定 {path} 失败,开始清理...")
|
||
_cleanup_partial_mount(mount_point, mounted_boot, mounted_efi, bind_paths_mounted)
|
||
return False, "", f"绑定 {path} 失败: {stderr}"
|
||
bind_paths_mounted.append(target_path)
|
||
|
||
log_success(f"✓ 所有绑定完成")
|
||
log_info(f"挂载摘要: 根分区 ✓, /boot {'✓' if mounted_boot else '✗'}, EFI {'✓' if mounted_efi else '✗'}")
|
||
|
||
return True, mount_point, ""
|
||
|
||
|
||
def detect_distro_type(mount_point: str) -> str:
|
||
"""
|
||
检测目标系统发行版类型。
|
||
支持更多发行版:Arch, Debian, Ubuntu, CentOS/RHEL/Rocky/Alma,
|
||
Fedora, openSUSE, Void, Gentoo, NixOS 等。
|
||
"""
|
||
log_step("检测发行版类型", f"挂载点: {mount_point}")
|
||
|
||
os_release_path = os.path.join(mount_point, "etc/os-release")
|
||
log_info(f"检查文件: {os_release_path}")
|
||
|
||
if not os.path.exists(os_release_path):
|
||
log_warning(f"未找到 {os_release_path},尝试 /etc/issue")
|
||
issue_path = os.path.join(mount_point, "etc/issue")
|
||
if os.path.exists(issue_path):
|
||
try:
|
||
with open(issue_path, "r") as f:
|
||
content = f.read().lower()
|
||
log_debug(f"/etc/issue 内容: {content[:200]}")
|
||
if "ubuntu" in content:
|
||
log_info(f"通过 /etc/issue 检测到: ubuntu")
|
||
return "ubuntu"
|
||
elif "debian" in content:
|
||
log_info(f"通过 /etc/issue 检测到: debian")
|
||
return "debian"
|
||
elif "centos" in content or "rhel" in content:
|
||
log_info(f"通过 /etc/issue 检测到: centos")
|
||
return "centos"
|
||
elif "fedora" in content:
|
||
log_info(f"通过 /etc/issue 检测到: fedora")
|
||
return "fedora"
|
||
elif "void" in content:
|
||
log_info(f"通过 /etc/issue 检测到: void")
|
||
return "void"
|
||
elif "gentoo" in content:
|
||
log_info(f"通过 /etc/issue 检测到: gentoo")
|
||
return "gentoo"
|
||
except Exception as e:
|
||
log_warning(f"读取 /etc/issue 失败: {e}")
|
||
|
||
# 检查其他发行版特定文件
|
||
if os.path.exists(os.path.join(mount_point, "etc/arch-release")):
|
||
log_info("通过 arch-release 检测到: arch")
|
||
return "arch"
|
||
if os.path.exists(os.path.join(mount_point, "etc/gentoo-release")):
|
||
log_info("通过 gentoo-release 检测到: gentoo")
|
||
return "gentoo"
|
||
if os.path.exists(os.path.join(mount_point, "etc/nixos/configuration.nix")):
|
||
log_info("通过 configuration.nix 检测到: nixos")
|
||
return "nixos"
|
||
|
||
return "unknown"
|
||
|
||
try:
|
||
with open(os_release_path, "r") as f:
|
||
content = f.read()
|
||
log_debug(f"/etc/os-release 内容:\n{content}")
|
||
|
||
id_match = re.search(r'^ID=(.+)$', content, re.MULTILINE)
|
||
id_like_match = re.search(r'^ID_LIKE=(.+)$', content, re.MULTILINE)
|
||
|
||
distro_id = id_match.group(1).strip('"\'') if id_match else ""
|
||
id_like = id_like_match.group(1).strip('"\'') if id_like_match else ""
|
||
|
||
log_info(f"ID={distro_id}, ID_LIKE={id_like}")
|
||
|
||
# 直接匹配
|
||
distro_map = {
|
||
"ubuntu": "ubuntu",
|
||
"debian": "debian",
|
||
"arch": "arch",
|
||
"manjaro": "arch",
|
||
"endeavouros": "arch",
|
||
"garuda": "arch",
|
||
"cachyos": "arch",
|
||
"centos": "centos",
|
||
"rhel": "centos",
|
||
"rocky": "centos",
|
||
"almalinux": "centos",
|
||
"fedora": "fedora",
|
||
"opensuse": "opensuse",
|
||
"opensuse-leap": "opensuse",
|
||
"opensuse-tumbleweed": "opensuse",
|
||
"void": "void",
|
||
"gentoo": "gentoo",
|
||
"nixos": "nixos",
|
||
}
|
||
|
||
if distro_id in distro_map:
|
||
result = distro_map[distro_id]
|
||
log_info(f"检测到发行版: {result} (ID={distro_id})")
|
||
return result
|
||
|
||
# 通过 ID_LIKE 推断
|
||
like_map = {
|
||
"ubuntu": "ubuntu",
|
||
"debian": "debian",
|
||
"arch": "arch",
|
||
"rhel": "centos",
|
||
"centos": "centos",
|
||
"fedora": "fedora",
|
||
"suse": "opensuse",
|
||
}
|
||
|
||
for key, value in like_map.items():
|
||
if key in id_like:
|
||
log_info(f"通过 ID_LIKE 推断: {value}")
|
||
return value
|
||
|
||
log_warning(f"无法识别的发行版,ID={distro_id}, ID_LIKE={id_like}")
|
||
return "unknown"
|
||
except Exception as e:
|
||
log_error(f"读取 os-release 文件失败: {e}")
|
||
return "unknown"
|
||
|
||
|
||
def _get_grub_packages(distro_type: str) -> Tuple[List[str], str]:
|
||
"""
|
||
获取安装 GRUB 包所需的包列表和包管理器命令。
|
||
返回: (包列表, 包管理器基命令)
|
||
"""
|
||
package_map = {
|
||
"centos": (["grub2-tools", "grub2-pc"], "yum install -y"),
|
||
"fedora": (["grub2-tools", "grub2-pc"], "dnf install -y"),
|
||
"rhel": (["grub2-tools", "grub2-pc"], "yum install -y"),
|
||
"rocky": (["grub2-tools", "grub2-pc"], "dnf install -y"),
|
||
"almalinux": (["grub2-tools", "grub2-pc"], "dnf install -y"),
|
||
"debian": (["grub-pc"], "apt-get install -y"),
|
||
"ubuntu": (["grub-pc"], "apt-get install -y"),
|
||
"arch": (["grub"], "pacman -S --noconfirm"),
|
||
"manjaro": (["grub"], "pacman -S --noconfirm"),
|
||
"opensuse": (["grub2"], "zypper install -y"),
|
||
"void": (["grub"], "xbps-install -y"),
|
||
"gentoo": (["sys-boot/grub"], "emerge"),
|
||
}
|
||
return package_map.get(distro_type, (["grub"], "包管理器安装"))
|
||
|
||
|
||
def _auto_install_grub(mount_point: str, distro_type: str) -> bool:
|
||
"""
|
||
自动在 chroot 环境中安装 GRUB 包。
|
||
返回是否成功。
|
||
"""
|
||
log_step("自动安装 GRUB 包", f"发行版: {distro_type}")
|
||
|
||
packages, pkg_cmd = _get_grub_packages(distro_type)
|
||
|
||
if not packages:
|
||
log_warning(f"未知的发行版 '{distro_type}',无法自动安装 GRUB")
|
||
return False
|
||
|
||
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
|
||
|
||
# 1. 首先检查网络连接(通过 ping)
|
||
log_info("检查网络连接...")
|
||
success, _, _ = run_command(
|
||
chroot_cmd_prefix + ["ping", "-c", "1", "8.8.8.8"],
|
||
"检查网络连接",
|
||
timeout=10
|
||
)
|
||
if not success:
|
||
# 尝试使用主机的网络配置
|
||
log_warning("chroot 环境无网络,尝试从 Live 环境复制 DNS 配置...")
|
||
|
||
# 复制 /etc/resolv.conf
|
||
resolv_source = "/etc/resolv.conf"
|
||
resolv_target = os.path.join(mount_point, "etc/resolv.conf")
|
||
if os.path.exists(resolv_source):
|
||
try:
|
||
shutil.copy2(resolv_source, resolv_target)
|
||
log_info("已复制 DNS 配置")
|
||
except Exception as e:
|
||
log_warning(f"复制 DNS 配置失败: {e}")
|
||
|
||
# 再次检查网络
|
||
success, _, _ = run_command(
|
||
chroot_cmd_prefix + ["ping", "-c", "1", "8.8.8.8"],
|
||
"再次检查网络连接",
|
||
timeout=10
|
||
)
|
||
if not success:
|
||
log_error("chroot 环境无法连接网络,无法自动安装 GRUB")
|
||
log_info("请手动安装 GRUB 包后再运行此工具")
|
||
return False
|
||
|
||
log_success("网络连接正常")
|
||
|
||
# 2. 更新包列表(某些发行版需要)
|
||
update_cmds = {
|
||
"debian": ["apt-get", "update"],
|
||
"ubuntu": ["apt-get", "update"],
|
||
"arch": ["pacman", "-Sy"],
|
||
"manjaro": ["pacman", "-Sy"],
|
||
}
|
||
|
||
if distro_type in update_cmds:
|
||
log_info(f"更新 {distro_type} 包列表...")
|
||
run_command(
|
||
chroot_cmd_prefix + update_cmds[distro_type],
|
||
f"更新 {distro_type} 包列表",
|
||
timeout=120
|
||
)
|
||
|
||
# 3. 安装 GRUB 包
|
||
log_info(f"安装 GRUB 包: {', '.join(packages)}")
|
||
|
||
install_cmd = chroot_cmd_prefix + pkg_cmd.split() + packages
|
||
success, stdout, stderr = run_command(
|
||
install_cmd,
|
||
f"安装 GRUB 包 ({' '.join(packages)})",
|
||
timeout=300
|
||
)
|
||
|
||
# 即使 yum 报告"已安装",也要检查实际命令是否存在
|
||
# CentOS/RHEL 的命令可能在 /usr/sbin/ 下,需要确保 PATH 包含该路径
|
||
if success:
|
||
log_info("包安装命令执行完成,检查命令是否可用...")
|
||
|
||
# 尝试查找 grub 相关命令的实际路径
|
||
grub_paths = [
|
||
"/usr/sbin/grub2-install",
|
||
"/usr/sbin/grub2-mkconfig",
|
||
"/usr/sbin/grub-install",
|
||
"/usr/sbin/grub-mkconfig",
|
||
"/sbin/grub2-install",
|
||
"/sbin/grub2-mkconfig",
|
||
]
|
||
|
||
found_any = False
|
||
for grub_path in grub_paths:
|
||
full_path = mount_point + grub_path
|
||
if os.path.exists(full_path):
|
||
log_info(f" 找到 GRUB 命令: {grub_path}")
|
||
found_any = True
|
||
|
||
if found_any:
|
||
log_success(f"✓ GRUB 包已安装")
|
||
return True
|
||
else:
|
||
log_warning("包管理器报告成功,但未找到 GRUB 命令")
|
||
log_info("尝试重新安装...")
|
||
|
||
# 强制重新安装
|
||
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux"]:
|
||
reinstall_cmd = chroot_cmd_prefix + ["yum", "reinstall", "-y"] + packages
|
||
run_command(reinstall_cmd, "强制重新安装 GRUB 包", timeout=300)
|
||
|
||
# 再次检查
|
||
for grub_path in grub_paths:
|
||
full_path = mount_point + grub_path
|
||
if os.path.exists(full_path):
|
||
log_info(f" 找到 GRUB 命令: {grub_path}")
|
||
return True
|
||
|
||
log_error("仍无法找到 GRUB 命令")
|
||
return False
|
||
else:
|
||
log_error(f"✗ GRUB 包安装失败: {stderr}")
|
||
return False
|
||
|
||
|
||
def _get_grub_cmd_path(mount_point: str, distro_type: str) -> Tuple[Optional[str], Optional[str]]:
|
||
"""
|
||
查找 GRUB 命令的完整路径。
|
||
返回: (grub_install_path, grub_mkconfig_path) 或 (None, None)
|
||
"""
|
||
# 可能的命令名
|
||
install_names = ["grub2-install", "grub-install"]
|
||
mkconfig_names = ["grub2-mkconfig", "grub-mkconfig"]
|
||
|
||
# 可能的目录
|
||
dirs = ["/usr/sbin", "/sbin", "/usr/bin", "/bin"]
|
||
|
||
install_path = None
|
||
mkconfig_path = None
|
||
|
||
for name in install_names:
|
||
for dir_path in dirs:
|
||
full_path = os.path.join(mount_point, dir_path.lstrip('/'), name)
|
||
if os.path.exists(full_path):
|
||
install_path = os.path.join(dir_path, name)
|
||
break
|
||
if install_path:
|
||
break
|
||
|
||
for name in mkconfig_names:
|
||
for dir_path in dirs:
|
||
full_path = os.path.join(mount_point, dir_path.lstrip('/'), name)
|
||
if os.path.exists(full_path):
|
||
mkconfig_path = os.path.join(dir_path, name)
|
||
break
|
||
if mkconfig_path:
|
||
break
|
||
|
||
return install_path, mkconfig_path
|
||
|
||
|
||
# 全局变量存储找到的命令路径
|
||
_grub_install_cmd: Optional[str] = None
|
||
_grub_mkconfig_cmd: Optional[str] = None
|
||
|
||
|
||
def check_chroot_environment(mount_point: str, distro_type: str = "unknown") -> Tuple[bool, str]:
|
||
"""
|
||
检查 chroot 环境是否可用。
|
||
检测可用的 GRUB 命令及其版本。
|
||
如果缺少 GRUB 命令,尝试自动安装。
|
||
"""
|
||
global _grub_install_cmd, _grub_mkconfig_cmd
|
||
|
||
log_step("检查 chroot 环境", f"挂载点: {mount_point}")
|
||
|
||
# 首先尝试找到命令路径
|
||
install_path, mkconfig_path = _get_grub_cmd_path(mount_point, distro_type)
|
||
|
||
if install_path and mkconfig_path:
|
||
log_info(f" ✓ 找到 grub-install: {install_path}")
|
||
log_info(f" ✓ 找到 grub-mkconfig: {mkconfig_path}")
|
||
_grub_install_cmd = install_path
|
||
_grub_mkconfig_cmd = mkconfig_path
|
||
else:
|
||
log_warning("未找到 GRUB 命令,尝试自动安装...")
|
||
|
||
# 尝试自动安装
|
||
if _auto_install_grub(mount_point, distro_type):
|
||
# 重新查找
|
||
install_path, mkconfig_path = _get_grub_cmd_path(mount_point, distro_type)
|
||
if install_path and mkconfig_path:
|
||
log_success("✓ 自动安装成功,GRUB 命令已可用")
|
||
_grub_install_cmd = install_path
|
||
_grub_mkconfig_cmd = mkconfig_path
|
||
else:
|
||
log_error("✗ 自动安装后仍未找到 GRUB 命令")
|
||
return False, "自动安装 GRUB 失败,请手动安装"
|
||
else:
|
||
log_error("=" * 60)
|
||
log_error(f"无法自动安装 GRUB 包")
|
||
log_error(f"")
|
||
log_error(f"请手动安装 GRUB 包:")
|
||
packages, pkg_cmd = _get_grub_packages(distro_type)
|
||
log_error(f" chroot {mount_point}")
|
||
log_error(f" {pkg_cmd} {' '.join(packages)}")
|
||
log_error(f" exit")
|
||
log_error(f"=" * 60)
|
||
return False, "自动安装 GRUB 失败"
|
||
|
||
# 检查 grub-install 版本
|
||
success, stdout, _ = run_command(["sudo", "chroot", mount_point, _grub_install_cmd, "--version"],
|
||
"检查 grub-install 版本", timeout=10)
|
||
if success:
|
||
log_info(f"grub-install 版本: {stdout.strip()}")
|
||
|
||
# 检查 EFI 目录(UEFI 模式)
|
||
efi_dir = os.path.join(mount_point, "boot/efi")
|
||
if os.path.ismount(efi_dir):
|
||
log_info(f"✓ EFI 分区已挂载到 /boot/efi")
|
||
try:
|
||
stat = os.stat(efi_dir)
|
||
log_debug(f" EFI 目录权限: {oct(stat.st_mode)}")
|
||
except Exception as e:
|
||
log_warning(f" 无法获取 EFI 目录权限: {e}")
|
||
else:
|
||
log_info(f"EFI 分区未挂载(可能是 BIOS 模式)")
|
||
|
||
# 检查 Secure Boot 相关工具
|
||
sb_tools = ["mokutil", "sbsign", "sbverify"]
|
||
for tool in sb_tools:
|
||
success, _, _ = run_command(["sudo", "chroot", mount_point, "which", tool],
|
||
f"检查 Secure Boot 工具 {tool}", timeout=5)
|
||
if success:
|
||
log_info(f" ✓ 找到 Secure Boot 工具: {tool}")
|
||
|
||
return True, ""
|
||
|
||
|
||
def check_and_install_efi_modules(mount_point: str, distro_type: str, is_uefi: bool) -> Tuple[bool, str]:
|
||
"""
|
||
检查并安装 UEFI GRUB 模块。
|
||
对于 UEFI 模式,需要检查 /usr/lib/grub/x86_64-efi/ 目录是否存在。
|
||
如果不存在,自动安装 grub2-efi 包。
|
||
|
||
返回: (是否成功, 错误信息)
|
||
"""
|
||
if not is_uefi:
|
||
return True, ""
|
||
|
||
log_step("检查 UEFI GRUB 模块")
|
||
|
||
# 检测架构
|
||
efi_params = get_grub_efi_parameters()
|
||
if not efi_params:
|
||
return False, "无法确定 EFI 架构"
|
||
|
||
efi_target, _, _ = efi_params
|
||
# efi_target 格式: x86_64-efi, i386-efi, arm64-efi
|
||
arch_dir = efi_target # 如: x86_64-efi
|
||
|
||
# 检查模块目录
|
||
module_dir = os.path.join(mount_point, f"usr/lib/grub/{arch_dir}")
|
||
modinfo_file = os.path.join(module_dir, "modinfo.sh")
|
||
|
||
if os.path.exists(modinfo_file):
|
||
log_success(f"✓ UEFI GRUB 模块已存在: {module_dir}")
|
||
return True, ""
|
||
|
||
log_warning(f"UEFI GRUB 模块不存在: {module_dir}")
|
||
log_info("尝试安装 UEFI GRUB 模块包...")
|
||
|
||
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
|
||
|
||
# 根据发行版选择正确的 EFI 包
|
||
# 注意:CentOS/RHEL 8+ 需要 shim 包来支持 UEFI Secure Boot
|
||
efi_packages_map = {
|
||
"centos": ["grub2-efi-x64", "grub2-efi-x64-modules", "shim-x64"],
|
||
"rhel": ["grub2-efi-x64", "grub2-efi-x64-modules", "shim-x64"],
|
||
"fedora": ["grub2-efi-x64", "grub2-efi-x64-modules", "shim-x64"],
|
||
"rocky": ["grub2-efi-x64", "grub2-efi-x64-modules", "shim-x64"],
|
||
"almalinux": ["grub2-efi-x64", "grub2-efi-x64-modules", "shim-x64"],
|
||
"debian": ["grub-efi-amd64", "shim-signed"],
|
||
"ubuntu": ["grub-efi-amd64", "shim-signed"],
|
||
"arch": ["grub", "efibootmgr"], # Arch 的 grub 包包含 EFI 支持
|
||
"manjaro": ["grub", "efibootmgr"],
|
||
}
|
||
|
||
efi_packages = efi_packages_map.get(distro_type, [])
|
||
|
||
if not efi_packages:
|
||
log_warning(f"未知的发行版 '{distro_type}',无法自动安装 EFI 模块")
|
||
return False, f"无法确定 {distro_type} 的 EFI 包名"
|
||
|
||
log_info(f"将安装以下 EFI 包: {', '.join(efi_packages)}")
|
||
|
||
# 安装 EFI 包前,先确保网络连接(复制 DNS 配置)
|
||
resolv_source = "/etc/resolv.conf"
|
||
resolv_target = os.path.join(mount_point, "etc/resolv.conf")
|
||
if os.path.exists(resolv_source):
|
||
try:
|
||
shutil.copy2(resolv_source, resolv_target)
|
||
log_info("已复制 DNS 配置到 chroot 环境")
|
||
except Exception as e:
|
||
log_debug(f"复制 DNS 配置失败: {e}")
|
||
|
||
# 安装 EFI 包
|
||
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux"]:
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["yum", "install", "-y"] + efi_packages,
|
||
f"安装 UEFI GRUB 包",
|
||
timeout=300
|
||
)
|
||
elif distro_type in ["debian", "ubuntu"]:
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["apt-get", "update"],
|
||
f"更新包列表",
|
||
timeout=120
|
||
)
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["apt-get", "install", "-y"] + efi_packages,
|
||
f"安装 UEFI GRUB 包",
|
||
timeout=300
|
||
)
|
||
elif distro_type in ["arch", "manjaro"]:
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["pacman", "-Sy", "--noconfirm"] + efi_packages,
|
||
f"安装 UEFI GRUB 包",
|
||
timeout=300
|
||
)
|
||
else:
|
||
return False, f"不支持的发行版: {distro_type}"
|
||
|
||
if not success:
|
||
log_error(f"UEFI GRUB 包安装失败")
|
||
return False, f"无法安装 UEFI GRUB 包: {stderr}"
|
||
|
||
# 重新检查模块
|
||
if os.path.exists(modinfo_file):
|
||
log_success(f"✓ UEFI GRUB 模块安装成功")
|
||
return True, ""
|
||
else:
|
||
log_error("UEFI GRUB 模块安装后仍不存在")
|
||
return False, "UEFI 模块安装失败"
|
||
|
||
|
||
def _manual_install_efi_files(mount_point: str, efi_target: str, efi_grub_file: str,
|
||
efi_boot_file: str, bootloader_id: str = "GRUB") -> bool:
|
||
"""
|
||
手动安装 EFI 文件(当 grub2-install 因为 Secure Boot 不支持时使用)。
|
||
直接从 /boot/efi/EFI/<distro>/ 复制已有的 EFI 文件,或使用 shim 链式加载。
|
||
|
||
返回: 是否成功
|
||
"""
|
||
log_step("手动安装 EFI 文件", f"目标: {efi_grub_file}")
|
||
|
||
efi_firmware_dir = os.path.join(mount_point, "boot/efi/EFI")
|
||
|
||
# 创建目标目录
|
||
target_dir = os.path.join(efi_firmware_dir, bootloader_id)
|
||
try:
|
||
os.makedirs(target_dir, exist_ok=True)
|
||
log_info(f"创建/确认 EFI 目录: {target_dir}")
|
||
except Exception as e:
|
||
log_error(f"创建 EFI 目录失败: {e}")
|
||
return False
|
||
|
||
# 查找源 EFI 文件
|
||
# 可能的源路径:
|
||
source_paths = []
|
||
|
||
# 1. 从 /boot/efi/EFI/<distro> 复制
|
||
for subdir in ["centos", "redhat", "fedora", "rocky", "almalinux", "BOOT", "boot"]:
|
||
source_paths.append(os.path.join(efi_firmware_dir, subdir, efi_grub_file))
|
||
source_paths.append(os.path.join(efi_firmware_dir, subdir, "shimx64.efi"))
|
||
source_paths.append(os.path.join(efi_firmware_dir, subdir, "shim.efi"))
|
||
source_paths.append(os.path.join(efi_firmware_dir, subdir, "grubx64.efi"))
|
||
|
||
# 2. 从 /boot/efi/ 直接查找
|
||
source_paths.append(os.path.join(mount_point, "boot/efi", efi_grub_file))
|
||
source_paths.append(os.path.join(mount_point, "boot/efi", "shimx64.efi"))
|
||
|
||
# 3. 从 /usr/lib/grub/x86_64-efi/ 复制 grubx64.efi
|
||
grub_modules_dir = os.path.join(mount_point, f"usr/lib/grub/{efi_target}")
|
||
source_paths.append(os.path.join(grub_modules_dir, "grubx64.efi"))
|
||
|
||
# 4. CentOS/RHEL 特定的路径
|
||
source_paths.append(os.path.join(mount_point, "boot/efi/EFI/centos/grubx64.efi"))
|
||
source_paths.append(os.path.join(mount_point, "boot/efi/EFI/centos/shimx64.efi"))
|
||
source_paths.append(os.path.join(mount_point, "usr/share/grub2/grubx64.efi"))
|
||
|
||
# 5. 尝试查找任何 .efi 文件
|
||
try:
|
||
if os.path.exists(efi_firmware_dir):
|
||
for root, dirs, files in os.walk(efi_firmware_dir):
|
||
for f in files:
|
||
if f.endswith('.efi'):
|
||
source_paths.append(os.path.join(root, f))
|
||
except:
|
||
pass
|
||
|
||
# 查找 shim 和 grub
|
||
shim_source = None
|
||
grub_source = None
|
||
|
||
for path in source_paths:
|
||
if os.path.exists(path):
|
||
log_info(f"找到 EFI 文件: {path}")
|
||
if "shim" in path.lower():
|
||
shim_source = path
|
||
elif "grub" in path.lower():
|
||
grub_source = path
|
||
|
||
# 复制文件
|
||
try:
|
||
if grub_source:
|
||
grub_target = os.path.join(target_dir, efi_grub_file)
|
||
shutil.copy2(grub_source, grub_target)
|
||
log_success(f"✓ 复制 GRUB: {grub_source} -> {grub_target}")
|
||
|
||
if shim_source:
|
||
# 如果使用 shim,复制 shim 并重命名为 grubx64.efi
|
||
# 或者保留 shim 文件名并在 NVRAM 中注册
|
||
shim_target = os.path.join(target_dir, "shimx64.efi")
|
||
shutil.copy2(shim_source, shim_target)
|
||
log_success(f"✓ 复制 Shim: {shim_source} -> {shim_target}")
|
||
|
||
# 同时复制为 bootloader 名称,便于启动
|
||
if efi_grub_file != "shimx64.efi":
|
||
shutil.copy2(shim_source, os.path.join(target_dir, efi_grub_file))
|
||
|
||
# 如果没有找到任何文件,尝试从 grub 模块生成
|
||
if not grub_source and not shim_source:
|
||
log_warning("未找到现有 EFI 文件,尝试使用 grub-mkimage 生成...")
|
||
|
||
# 获取模块列表(必需模块)
|
||
modules = "part_gpt part_msdos fat ext2 xfs btrfs normal boot linux configfile search search_fs_uuid search_fs_file"
|
||
|
||
# 构建 chroot 内的输出路径
|
||
chroot_output_path = f"/boot/efi/EFI/{bootloader_id}/{efi_grub_file}"
|
||
|
||
# 尝试使用 grub-mkimage 生成 EFI 文件
|
||
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
|
||
mkimage_cmd = chroot_cmd_prefix + [
|
||
"grub2-mkimage",
|
||
"-o", chroot_output_path,
|
||
"-O", efi_target,
|
||
"-p", "/boot/grub2",
|
||
] + modules.split()
|
||
|
||
success, _, stderr = run_command(
|
||
mkimage_cmd,
|
||
"使用 grub-mkimage 生成 EFI 文件",
|
||
timeout=60
|
||
)
|
||
|
||
# 检查主机路径上的文件是否生成成功
|
||
host_output_path = os.path.join(mount_point, chroot_output_path.lstrip('/'))
|
||
if success and os.path.exists(host_output_path):
|
||
log_success(f"✓ 成功生成 EFI 文件: {host_output_path}")
|
||
else:
|
||
log_error(f"生成 EFI 文件失败: {stderr}")
|
||
return False
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
log_error(f"手动安装 EFI 文件失败: {e}")
|
||
return False
|
||
|
||
|
||
def install_efi_fallback(mount_point: str, efi_target: str, efi_grub_file: str,
|
||
efi_boot_file: str, bootloader_id: str = "GRUB") -> bool:
|
||
"""
|
||
安装 EFI fallback 引导文件。
|
||
将 GRUB EFI 文件复制到 /EFI/Boot/bootx64.efi,这是 UEFI 的通用回退路径。
|
||
参考 Calamares 的实现。
|
||
"""
|
||
log_step("安装 EFI Fallback", f"目标文件: {efi_boot_file}")
|
||
|
||
efi_firmware_dir = os.path.join(mount_point, "boot/efi/EFI")
|
||
|
||
# 处理 VFAT 大小写问题(某些固件对大小写敏感)
|
||
# 检查 EFI 目录的实际大小写
|
||
if os.path.exists(efi_firmware_dir):
|
||
try:
|
||
entries = os.listdir(os.path.join(mount_point, "boot/efi"))
|
||
for entry in entries:
|
||
if entry.upper() == "EFI":
|
||
efi_firmware_dir = os.path.join(mount_point, "boot/efi", entry)
|
||
break
|
||
except Exception as e:
|
||
log_debug(f"检查 EFI 目录大小写失败: {e}")
|
||
|
||
# 创建 Boot 目录(处理大小写)
|
||
boot_dir = os.path.join(efi_firmware_dir, "Boot")
|
||
try:
|
||
if os.path.exists(efi_firmware_dir):
|
||
entries = os.listdir(efi_firmware_dir)
|
||
for entry in entries:
|
||
if entry.upper() == "BOOT":
|
||
boot_dir = os.path.join(efi_firmware_dir, entry)
|
||
break
|
||
except Exception:
|
||
pass
|
||
|
||
if not os.path.exists(boot_dir):
|
||
try:
|
||
os.makedirs(boot_dir)
|
||
log_info(f"创建 Boot 目录: {boot_dir}")
|
||
except Exception as e:
|
||
log_error(f"创建 Boot 目录失败: {e}")
|
||
return False
|
||
|
||
# 源文件路径
|
||
source_paths = [
|
||
os.path.join(efi_firmware_dir, bootloader_id, efi_grub_file),
|
||
os.path.join(efi_firmware_dir, bootloader_id.lower(), efi_grub_file.lower()),
|
||
]
|
||
|
||
source_file = None
|
||
for path in source_paths:
|
||
if os.path.exists(path):
|
||
source_file = path
|
||
break
|
||
|
||
if not source_file:
|
||
log_warning(f"找不到源 EFI 文件,尝试查找任何 .efi 文件")
|
||
# 尝试在 bootloader_id 目录下找到任何 .efi 文件
|
||
for subdir in [bootloader_id, bootloader_id.lower(), "grub", "GRUB"]:
|
||
grub_dir = os.path.join(efi_firmware_dir, subdir)
|
||
if os.path.exists(grub_dir):
|
||
try:
|
||
for f in os.listdir(grub_dir):
|
||
if f.endswith(".efi"):
|
||
source_file = os.path.join(grub_dir, f)
|
||
log_info(f"找到替代源文件: {source_file}")
|
||
break
|
||
except Exception:
|
||
pass
|
||
if source_file:
|
||
break
|
||
|
||
if not source_file:
|
||
log_error(f"无法找到源 EFI 文件")
|
||
return False
|
||
|
||
# 目标文件路径
|
||
target_file = os.path.join(boot_dir, efi_boot_file)
|
||
|
||
try:
|
||
shutil.copy2(source_file, target_file)
|
||
log_success(f"✓ EFI fallback 安装成功: {source_file} -> {target_file}")
|
||
return True
|
||
except Exception as e:
|
||
log_error(f"复制 EFI fallback 文件失败: {e}")
|
||
return False
|
||
|
||
|
||
def check_and_restore_kernel(mount_point: str, distro_type: str, has_separate_boot: bool) -> Tuple[bool, str]:
|
||
"""
|
||
检查并恢复 /boot 目录下的内核文件。
|
||
如果内核文件缺失,尝试从 /usr/lib/modules 复制或重新安装内核包。
|
||
|
||
返回: (是否成功, 错误信息)
|
||
"""
|
||
log_step("检查内核文件", f"独立 /boot 分区: {has_separate_boot}")
|
||
|
||
boot_dir = os.path.join(mount_point, "boot")
|
||
|
||
# 检查现有的内核文件
|
||
vmlinuz_files = []
|
||
initramfs_files = []
|
||
|
||
try:
|
||
if os.path.exists(boot_dir):
|
||
for f in os.listdir(boot_dir):
|
||
if f.startswith("vmlinuz-"):
|
||
vmlinuz_files.append(f)
|
||
elif f.startswith("initramfs-") and f.endswith(".img"):
|
||
initramfs_files.append(f)
|
||
except Exception as e:
|
||
log_warning(f"检查 /boot 目录失败: {e}")
|
||
|
||
log_info(f"发现 {len(vmlinuz_files)} 个内核文件, {len(initramfs_files)} 个 initramfs 文件")
|
||
|
||
# 如果内核文件存在,检查是否完整
|
||
if vmlinuz_files and initramfs_files:
|
||
log_success("✓ 内核文件已存在")
|
||
return True, ""
|
||
|
||
log_warning("内核文件缺失,尝试恢复...")
|
||
|
||
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
|
||
|
||
# 方法1: 从 /usr/lib/modules 复制内核(如果存在)
|
||
usr_lib_modules = os.path.join(mount_point, "usr/lib/modules")
|
||
if os.path.exists(usr_lib_modules):
|
||
log_info("检查 /usr/lib/modules 中的内核...")
|
||
try:
|
||
kernel_versions = []
|
||
for d in os.listdir(usr_lib_modules):
|
||
# 跳过 rescue 和普通目录,查找版本号格式的目录
|
||
if os.path.isdir(os.path.join(usr_lib_modules, d)) and not d.endswith("kdump"):
|
||
kernel_versions.append(d)
|
||
|
||
if kernel_versions:
|
||
log_info(f"找到内核版本: {kernel_versions}")
|
||
|
||
for kernel_ver in kernel_versions:
|
||
# 检查该版本的内核文件是否存在
|
||
kernel_source = os.path.join(usr_lib_modules, kernel_ver, "vmlinuz")
|
||
if not os.path.exists(kernel_source):
|
||
# 某些系统使用不同的路径
|
||
kernel_source = os.path.join(mount_point, f"usr/lib/modules/{kernel_ver}/vmlinuz")
|
||
|
||
if os.path.exists(kernel_source):
|
||
kernel_target = os.path.join(boot_dir, f"vmlinuz-{kernel_ver}")
|
||
log_info(f"复制内核: {kernel_source} -> {kernel_target}")
|
||
try:
|
||
shutil.copy2(kernel_source, kernel_target)
|
||
log_success(f"✓ 复制内核成功: vmlinuz-{kernel_ver}")
|
||
except Exception as e:
|
||
log_error(f"复制内核失败: {e}")
|
||
else:
|
||
log_warning("/usr/lib/modules 中没有找到内核")
|
||
except Exception as e:
|
||
log_warning(f"检查 /usr/lib/modules 失败: {e}")
|
||
|
||
# 重新检查内核文件
|
||
vmlinuz_files = []
|
||
try:
|
||
for f in os.listdir(boot_dir):
|
||
if f.startswith("vmlinuz-"):
|
||
vmlinuz_files.append(f)
|
||
except:
|
||
pass
|
||
|
||
# 方法2: 重新安装内核包(如果方法1失败)
|
||
if not vmlinuz_files:
|
||
log_info("尝试重新安装内核包...")
|
||
|
||
# 根据发行版选择正确的包名
|
||
kernel_packages = {
|
||
"centos": ["kernel-core", "kernel-modules"],
|
||
"rhel": ["kernel-core", "kernel-modules"],
|
||
"fedora": ["kernel-core", "kernel-modules"],
|
||
"rocky": ["kernel-core", "kernel-modules"],
|
||
"almalinux": ["kernel-core", "kernel-modules"],
|
||
"debian": ["linux-image-generic"],
|
||
"ubuntu": ["linux-image-generic"],
|
||
"arch": ["linux"],
|
||
"manjaro": ["linux"],
|
||
}
|
||
|
||
packages = kernel_packages.get(distro_type, ["kernel"])
|
||
|
||
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux"]:
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["yum", "reinstall", "-y"] + packages,
|
||
f"重新安装内核包",
|
||
timeout=300
|
||
)
|
||
elif distro_type in ["debian", "ubuntu"]:
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["apt-get", "install", "--reinstall", "-y"] + packages,
|
||
f"重新安装内核包",
|
||
timeout=300
|
||
)
|
||
elif distro_type in ["arch", "manjaro"]:
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["pacman", "-S", "--noconfirm"] + packages,
|
||
f"重新安装内核包",
|
||
timeout=300
|
||
)
|
||
else:
|
||
log_warning(f"不支持的发行版 '{distro_type}',无法自动安装内核")
|
||
return False, "无法自动恢复内核文件"
|
||
|
||
if not success:
|
||
log_error("内核包安装失败")
|
||
return False, "内核包安装失败"
|
||
|
||
# 方法3: 重新生成 initramfs
|
||
log_info("重新生成 initramfs...")
|
||
|
||
# 获取当前存在的内核版本
|
||
vmlinuz_files = []
|
||
try:
|
||
for f in os.listdir(boot_dir):
|
||
if f.startswith("vmlinuz-"):
|
||
vmlinuz_files.append(f)
|
||
except:
|
||
pass
|
||
|
||
if not vmlinuz_files:
|
||
log_error("无法找到内核文件,无法生成 initramfs")
|
||
return False, "内核文件缺失"
|
||
|
||
for vmlinuz in vmlinuz_files:
|
||
kernel_ver = vmlinuz.replace("vmlinuz-", "")
|
||
log_info(f"为内核 {kernel_ver} 生成 initramfs...")
|
||
|
||
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux"]:
|
||
# CentOS/RHEL 使用 dracut
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["dracut", "-f", f"/boot/initramfs-{kernel_ver}.img", kernel_ver],
|
||
f"生成 initramfs for {kernel_ver}",
|
||
timeout=120
|
||
)
|
||
elif distro_type in ["debian", "ubuntu"]:
|
||
# Debian/Ubuntu 使用 update-initramfs
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["update-initramfs", "-c", "-k", kernel_ver],
|
||
f"生成 initramfs for {kernel_ver}",
|
||
timeout=120
|
||
)
|
||
elif distro_type in ["arch", "manjaro"]:
|
||
# Arch 使用 mkinitcpio
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["mkinitcpio", "-p", kernel_ver],
|
||
f"生成 initramfs for {kernel_ver}",
|
||
timeout=120
|
||
)
|
||
else:
|
||
# 通用方法:尝试 dracut
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["dracut", "-f", f"/boot/initramfs-{kernel_ver}.img", kernel_ver],
|
||
f"生成 initramfs for {kernel_ver}",
|
||
timeout=120
|
||
|
||
)
|
||
|
||
if success:
|
||
log_success(f"✓ initramfs 生成成功: {kernel_ver}")
|
||
else:
|
||
log_warning(f"initramfs 生成失败: {stderr}")
|
||
|
||
# 最终检查
|
||
try:
|
||
vmlinuz_count = sum(1 for f in os.listdir(boot_dir) if f.startswith("vmlinuz-"))
|
||
initramfs_count = sum(1 for f in os.listdir(boot_dir) if f.startswith("initramfs-") and f.endswith(".img"))
|
||
|
||
log_info(f"恢复完成: {vmlinuz_count} 个内核, {initramfs_count} 个 initramfs")
|
||
|
||
if vmlinuz_count > 0:
|
||
return True, ""
|
||
else:
|
||
return False, "内核恢复失败"
|
||
except Exception as e:
|
||
return False, f"检查 /boot 目录失败: {e}"
|
||
|
||
|
||
def restore_bls_entries(mount_point: str, distro_type: str) -> Tuple[bool, str]:
|
||
"""
|
||
恢复 BLS (Boot Loader Specification) 配置条目。
|
||
CentOS/RHEL/Fedora 等使用 BLS 格式,启动项在 /boot/loader/entries/*.conf
|
||
|
||
返回: (是否成功, 错误信息)
|
||
"""
|
||
# 检查是否使用 BLS
|
||
loader_dir = os.path.join(mount_point, "boot/loader/entries")
|
||
grub_cfg_path = os.path.join(mount_point, "boot/grub2/grub.cfg")
|
||
|
||
# 检查 grub.cfg 是否包含 blscfg
|
||
uses_bls = False
|
||
try:
|
||
if os.path.exists(grub_cfg_path):
|
||
with open(grub_cfg_path, 'r') as f:
|
||
content = f.read()
|
||
if 'blscfg' in content or 'BootLoaderSpec' in content:
|
||
uses_bls = True
|
||
log_info("检测到系统使用 BLS (Boot Loader Specification)")
|
||
except Exception as e:
|
||
log_debug(f"检查 BLS 失败: {e}")
|
||
|
||
if not uses_bls:
|
||
log_debug("系统不使用 BLS,跳过 BLS 恢复")
|
||
return True, ""
|
||
|
||
log_step("恢复 BLS 启动条目")
|
||
|
||
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
|
||
|
||
# 方法1: 使用 kernel-install 重新生成 BLS 条目(推荐)
|
||
log_info("尝试使用 kernel-install 重新生成 BLS 条目...")
|
||
|
||
# 获取已安装的内核版本
|
||
boot_dir = os.path.join(mount_point, "boot")
|
||
kernel_versions = []
|
||
|
||
try:
|
||
for f in os.listdir(boot_dir):
|
||
if f.startswith("vmlinuz-"):
|
||
kernel_ver = f.replace("vmlinuz-", "")
|
||
kernel_versions.append(kernel_ver)
|
||
except Exception as e:
|
||
log_warning(f"读取 /boot 目录失败: {e}")
|
||
|
||
if not kernel_versions:
|
||
log_error("没有找到内核版本,无法生成 BLS 条目")
|
||
return False, "没有内核版本"
|
||
|
||
log_info(f"发现内核版本: {kernel_versions}")
|
||
|
||
# 创建 loader 目录
|
||
os.makedirs(loader_dir, exist_ok=True)
|
||
|
||
for kernel_ver in kernel_versions:
|
||
# 检查是否已有 BLS 条目
|
||
entry_file = os.path.join(loader_dir, f"{kernel_ver}.conf")
|
||
if os.path.exists(entry_file):
|
||
log_info(f"BLS 条目已存在: {entry_file}")
|
||
continue
|
||
|
||
log_info(f"为内核 {kernel_ver} 生成 BLS 条目...")
|
||
|
||
# 尝试使用 kernel-install
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["kernel-install", "add", kernel_ver, f"/boot/vmlinuz-{kernel_ver}"],
|
||
f"生成 BLS 条目 for {kernel_ver}",
|
||
timeout=30
|
||
)
|
||
|
||
if success:
|
||
log_success(f"✓ kernel-install 成功: {kernel_ver}")
|
||
else:
|
||
log_warning(f"kernel-install 失败,尝试手动创建 BLS 条目...")
|
||
|
||
# 方法2: 手动创建 BLS 条目
|
||
machine_id = ""
|
||
try:
|
||
machine_id_path = os.path.join(mount_point, "etc/machine-id")
|
||
if os.path.exists(machine_id_path):
|
||
with open(machine_id_path, 'r') as f:
|
||
machine_id = f.read().strip()
|
||
except:
|
||
pass
|
||
|
||
if not machine_id:
|
||
machine_id = "unknown"
|
||
|
||
# 读取 /etc/os-release 获取标题
|
||
os_name = "Linux"
|
||
os_version = ""
|
||
try:
|
||
os_release_path = os.path.join(mount_point, "etc/os-release")
|
||
if os.path.exists(os_release_path):
|
||
with open(os_release_path, 'r') as f:
|
||
for line in f:
|
||
if line.startswith('NAME='):
|
||
os_name = line.split('=')[1].strip().strip('"')
|
||
elif line.startswith('VERSION_ID='):
|
||
os_version = line.split('=')[1].strip().strip('"')
|
||
except:
|
||
pass
|
||
|
||
title = f"{os_name}"
|
||
if os_version:
|
||
title += f" {os_version}"
|
||
|
||
# 获取根分区 UUID 或路径
|
||
root_device = "/dev/mapper/cl-root" # 默认,应该根据实际检测
|
||
|
||
# 尝试从 /etc/fstab 获取根分区
|
||
try:
|
||
fstab_path = os.path.join(mount_point, "etc/fstab")
|
||
if os.path.exists(fstab_path):
|
||
with open(fstab_path, 'r') as f:
|
||
for line in f:
|
||
if line.startswith('/') and ' / ' in line:
|
||
parts = line.split()
|
||
if len(parts) >= 2 and parts[1] == '/':
|
||
root_device = parts[0]
|
||
break
|
||
except:
|
||
pass
|
||
|
||
# 创建 BLS 条目文件
|
||
bls_content = f"""title {title} ({kernel_ver})
|
||
version {kernel_ver}
|
||
linux /vmlinuz-{kernel_ver}
|
||
initrd /initramfs-{kernel_ver}.img
|
||
options root={root_device} ro
|
||
"""
|
||
|
||
# 对于 CentOS/RHEL,添加特定的 LVM 和 resume 参数
|
||
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux"]:
|
||
bls_content = f"""title {title} ({kernel_ver})
|
||
version {kernel_ver}
|
||
linux /vmlinuz-{kernel_ver}
|
||
initrd /initramfs-{kernel_ver}.img
|
||
options root={root_device} ro crashkernel=auto resume=/dev/mapper/cl-swap rd.lvm.lv=cl/root rd.lvm.lv=cl/swap rhgb quiet
|
||
"""
|
||
|
||
entry_filename = f"{machine_id}-{kernel_ver}.conf"
|
||
entry_path = os.path.join(loader_dir, entry_filename)
|
||
|
||
try:
|
||
with open(entry_path, 'w') as f:
|
||
f.write(bls_content)
|
||
log_success(f"✓ 手动创建 BLS 条目: {entry_path}")
|
||
except Exception as e:
|
||
log_error(f"创建 BLS 条目失败: {e}")
|
||
return False, f"创建 BLS 条目失败: {e}"
|
||
|
||
# 方法3: 重新运行 grub2-mkconfig 确保 BLS 支持正确
|
||
log_info("重新生成 grub.cfg 以确保 BLS 支持...")
|
||
|
||
# 找到 grub2-mkconfig 或 grub-mkconfig
|
||
mkconfig_cmd = None
|
||
for cmd in ["/usr/sbin/grub2-mkconfig", "/sbin/grub2-mkconfig", "/usr/bin/grub2-mkconfig"]:
|
||
if os.path.exists(os.path.join(mount_point, cmd.lstrip('/'))):
|
||
mkconfig_cmd = cmd
|
||
break
|
||
|
||
if not mkconfig_cmd:
|
||
for cmd in ["/usr/sbin/grub-mkconfig", "/sbin/grub-mkconfig", "/usr/bin/grub-mkconfig"]:
|
||
if os.path.exists(os.path.join(mount_point, cmd.lstrip('/'))):
|
||
mkconfig_cmd = cmd
|
||
break
|
||
|
||
if mkconfig_cmd:
|
||
# 找到 grub.cfg 路径
|
||
grub_cfg = "/boot/grub2/grub.cfg"
|
||
for cfg_path in ["/boot/grub2/grub.cfg", "/boot/grub/grub.cfg"]:
|
||
if os.path.exists(os.path.join(mount_point, cfg_path.lstrip('/'))):
|
||
grub_cfg = cfg_path
|
||
break
|
||
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + [mkconfig_cmd, "-o", grub_cfg],
|
||
"重新生成 grub.cfg",
|
||
timeout=60
|
||
)
|
||
|
||
if success:
|
||
log_success("✓ grub.cfg 重新生成成功")
|
||
else:
|
||
log_warning(f"grub.cfg 重新生成失败: {stderr}")
|
||
|
||
# 最终检查
|
||
try:
|
||
if os.path.exists(loader_dir):
|
||
entries = [f for f in os.listdir(loader_dir) if f.endswith('.conf')]
|
||
log_info(f"BLS 条目数量: {len(entries)}")
|
||
for entry in entries:
|
||
log_info(f" - {entry}")
|
||
|
||
if len(entries) > 0:
|
||
log_success("✓ BLS 配置恢复完成")
|
||
return True, ""
|
||
else:
|
||
log_warning("没有 BLS 条目生成")
|
||
return False, "BLS 条目生成失败"
|
||
else:
|
||
log_error("loader/entries 目录不存在")
|
||
return False, "BLS 目录不存在"
|
||
except Exception as e:
|
||
return False, f"检查 BLS 条目失败: {e}"
|
||
|
||
|
||
def chroot_and_repair_grub(mount_point: str, target_disk: str,
|
||
is_uefi: bool = False, distro_type: str = "unknown",
|
||
install_hybrid: bool = False,
|
||
use_fallback: bool = True,
|
||
efi_bootloader_id: str = "GRUB",
|
||
has_separate_boot: bool = False) -> Tuple[bool, str]:
|
||
"""
|
||
Chroot到目标系统并执行GRUB修复命令。
|
||
支持多种安装模式、架构、EFI fallback、Secure Boot 等。
|
||
|
||
参数:
|
||
mount_point: 挂载点路径
|
||
target_disk: 目标磁盘(BIOS模式)
|
||
is_uefi: 是否 UEFI 模式
|
||
distro_type: 发行版类型
|
||
install_hybrid: 是否同时安装 BIOS 和 EFI 模式(混合启动)
|
||
use_fallback: 是否安装 EFI fallback 文件
|
||
efi_bootloader_id: EFI 启动项名称
|
||
has_separate_boot: 是否有独立的 /boot 分区
|
||
"""
|
||
global _grub_install_cmd, _grub_mkconfig_cmd
|
||
|
||
log_step("修复 GRUB", f"目标磁盘: {target_disk}, UEFI: {is_uefi}, 发行版: {distro_type}")
|
||
|
||
if not is_uefi and not validate_device_path(target_disk):
|
||
log_error(f"无效的目标磁盘路径: {target_disk}")
|
||
return False, f"无效的目标磁盘路径: {target_disk}"
|
||
|
||
# 检查 chroot 环境(这会设置 _grub_install_cmd 和 _grub_mkconfig_cmd)
|
||
ok, err = check_chroot_environment(mount_point, distro_type)
|
||
if not ok:
|
||
return False, err
|
||
|
||
# 确保命令路径已设置
|
||
if not _grub_install_cmd or not _grub_mkconfig_cmd:
|
||
return False, "无法确定 GRUB 命令路径"
|
||
|
||
log_info(f"使用 grub-install: {_grub_install_cmd}")
|
||
log_info(f"使用 grub-mkconfig: {_grub_mkconfig_cmd}")
|
||
|
||
# 检查是否有独立的 /boot 分区
|
||
boot_mount_point = os.path.join(mount_point, "boot")
|
||
if os.path.ismount(boot_mount_point):
|
||
has_separate_boot = True
|
||
log_info("检测到独立的 /boot 分区")
|
||
|
||
# 检查并恢复内核文件
|
||
kernel_ok, kernel_err = check_and_restore_kernel(mount_point, distro_type, has_separate_boot)
|
||
if not kernel_ok:
|
||
log_error(f"内核恢复失败: {kernel_err}")
|
||
return False, f"内核恢复失败: {kernel_err}"
|
||
|
||
# 恢复 BLS 启动条目(CentOS/RHEL/Fedora 使用 BLS)
|
||
bls_ok, bls_err = restore_bls_entries(mount_point, distro_type)
|
||
if not bls_ok:
|
||
log_warning(f"BLS 恢复失败: {bls_err}")
|
||
# BLS 失败不终止,继续尝试传统菜单
|
||
|
||
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
|
||
|
||
# 检测 Live 环境
|
||
is_live_env = is_live_environment()
|
||
if is_live_env:
|
||
log_info("检测到 Live 环境(无法访问 EFI 变量)")
|
||
|
||
# 获取 EFI 参数(如果需要)
|
||
efi_params = None
|
||
if is_uefi or install_hybrid:
|
||
efi_params = get_grub_efi_parameters()
|
||
if not efi_params:
|
||
return False, "无法确定 EFI 架构参数"
|
||
efi_target, efi_grub_file, efi_boot_file = efi_params
|
||
log_info(f"EFI 参数: target={efi_target}, grub={efi_grub_file}, boot={efi_boot_file}")
|
||
|
||
# 检查并安装 UEFI GRUB 模块
|
||
efi_modules_ok, efi_modules_err = check_and_install_efi_modules(mount_point, distro_type, True)
|
||
if not efi_modules_ok:
|
||
log_error(f"UEFI 模块检查失败: {efi_modules_err}")
|
||
return False, f"UEFI 模块检查失败: {efi_modules_err}"
|
||
|
||
grub_install_success = False
|
||
install_errors = []
|
||
|
||
# ===== UEFI 安装 =====
|
||
if is_uefi or install_hybrid:
|
||
log_step("安装 UEFI GRUB")
|
||
|
||
# 检查 EFI 分区是否正确挂载
|
||
efi_check_path = os.path.join(mount_point, "boot/efi/EFI")
|
||
if os.path.exists(efi_check_path):
|
||
log_info(f"✓ EFI 目录存在: {efi_check_path}")
|
||
try:
|
||
contents = os.listdir(efi_check_path)
|
||
log_info(f" 当前 EFI 目录内容: {contents}")
|
||
except Exception as e:
|
||
log_warning(f" 无法列出 EFI 目录: {e}")
|
||
else:
|
||
log_warning(f"✗ EFI 目录不存在: {efi_check_path}")
|
||
|
||
# ===== 清空并重建 GRUB 环境(针对严重损坏的系统)=====
|
||
log_step("清空并重建 GRUB 环境")
|
||
|
||
# 确定正确的 GRUB 目录名
|
||
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux", "opensuse"]:
|
||
grub_dir_name = "grub2"
|
||
else:
|
||
grub_dir_name = "grub"
|
||
|
||
# 1. 清空 /boot/grub* 目录
|
||
boot_grub_paths = [
|
||
os.path.join(mount_point, "boot", "grub"),
|
||
os.path.join(mount_point, "boot", "grub2"),
|
||
]
|
||
for grub_path in boot_grub_paths:
|
||
if os.path.exists(grub_path):
|
||
log_info(f"清空目录: {grub_path}")
|
||
try:
|
||
# 删除目录内所有内容但保留目录本身
|
||
for item in os.listdir(grub_path):
|
||
item_path = os.path.join(grub_path, item)
|
||
if os.path.isfile(item_path) or os.path.islink(item_path):
|
||
os.unlink(item_path)
|
||
log_debug(f" 删除文件: {item}")
|
||
elif os.path.isdir(item_path):
|
||
import shutil
|
||
shutil.rmtree(item_path)
|
||
log_debug(f" 删除目录: {item}")
|
||
log_success(f"✓ 已清空: {grub_path}")
|
||
except Exception as e:
|
||
log_warning(f"清空失败: {e}")
|
||
|
||
# 2. 清空 EFI 分区中的 GRUB 相关目录
|
||
efi_cleanup_dirs = ["GRUB", "grub", "Boot", "boot", "centos", "redhat", "fedora"]
|
||
for efi_dir in efi_cleanup_dirs:
|
||
efi_dir_path = os.path.join(mount_point, "boot/efi/EFI", efi_dir)
|
||
if os.path.exists(efi_dir_path):
|
||
log_info(f"清空 EFI 目录: {efi_dir_path}")
|
||
try:
|
||
import shutil
|
||
shutil.rmtree(efi_dir_path)
|
||
log_success(f"✓ 已删除: {efi_dir_path}")
|
||
except Exception as e:
|
||
log_warning(f"删除失败: {e}")
|
||
|
||
# 3. 重建目录结构
|
||
log_info("重建 GRUB 目录结构...")
|
||
dirs_to_create = [
|
||
os.path.join(mount_point, "boot", grub_dir_name),
|
||
os.path.join(mount_point, "boot/efi/EFI", efi_bootloader_id),
|
||
os.path.join(mount_point, "boot/efi/EFI", "Boot"),
|
||
]
|
||
for dir_path in dirs_to_create:
|
||
try:
|
||
os.makedirs(dir_path, exist_ok=True)
|
||
log_info(f"✓ 创建目录: {dir_path}")
|
||
except Exception as e:
|
||
log_warning(f"创建目录失败: {e}")
|
||
|
||
# 4. 创建 grubenv 文件(普通文件,不是符号链接)
|
||
grubenv_path = os.path.join(mount_point, "boot", grub_dir_name, "grubenv")
|
||
try:
|
||
with open(grubenv_path, 'w') as f:
|
||
f.write("# GRUB Environment Block\n")
|
||
f.write("saved_entry=\n")
|
||
f.write("#" * 1024 + "\n")
|
||
log_success(f"✓ 创建 grubenv: {grubenv_path}")
|
||
except Exception as e:
|
||
log_warning(f"创建 grubenv 失败: {e}")
|
||
|
||
log_success("✓ GRUB 环境重建完成")
|
||
|
||
grub_install_cmd = chroot_cmd_prefix + [
|
||
_grub_install_cmd,
|
||
f"--target={efi_target}",
|
||
"--efi-directory=/boot/efi",
|
||
f"--bootloader-id={efi_bootloader_id}",
|
||
"--force"
|
||
]
|
||
|
||
# Live 环境下优先使用 --removable
|
||
if is_live_env:
|
||
log_info("Live 环境: 优先使用 --removable 模式")
|
||
removable_cmd = chroot_cmd_prefix + [
|
||
_grub_install_cmd,
|
||
f"--target={efi_target}",
|
||
"--efi-directory=/boot/efi",
|
||
"--removable",
|
||
"--force"
|
||
]
|
||
success, stdout, stderr = run_command(
|
||
removable_cmd,
|
||
"安装 UEFI GRUB (removable 模式)",
|
||
timeout=60
|
||
)
|
||
if success:
|
||
grub_install_success = True
|
||
log_success("✓ removable 模式安装成功")
|
||
# 同时安装标准模式作为备选
|
||
log_info("尝试安装标准模式作为备选...")
|
||
run_command(grub_install_cmd + ["--no-nvram"], "安装标准模式 (no-nvram)", timeout=60)
|
||
else:
|
||
log_warning(f"removable 模式失败,尝试标准模式")
|
||
|
||
|
||
install_errors.append(f"removable 模式: {stderr}")
|
||
|
||
if not grub_install_success:
|
||
# 第一次尝试: 标准安装
|
||
log_info("尝试 1/3: 标准 UEFI 安装...")
|
||
success, stdout, stderr = run_command(
|
||
grub_install_cmd,
|
||
"安装 UEFI GRUB(标准模式)",
|
||
timeout=60
|
||
)
|
||
|
||
if success:
|
||
grub_install_success = True
|
||
log_success("✓ 标准安装成功")
|
||
else:
|
||
install_errors.append(f"标准模式: {stderr}")
|
||
log_warning(f"标准安装失败: {stderr}")
|
||
|
||
# 第二次尝试: removable 模式
|
||
log_info("尝试 2/3: Live环境/可移动模式(--removable)...")
|
||
removable_cmd = chroot_cmd_prefix + [
|
||
_grub_install_cmd,
|
||
f"--target={efi_target}",
|
||
"--efi-directory=/boot/efi",
|
||
"--removable",
|
||
"--force"
|
||
]
|
||
success, stdout, stderr = run_command(
|
||
removable_cmd,
|
||
"安装 UEFI GRUB(可移动模式)",
|
||
timeout=60
|
||
)
|
||
|
||
if success:
|
||
grub_install_success = True
|
||
log_success("✓ 可移动模式安装成功")
|
||
log_info(f" GRUB 已安装到 /EFI/Boot/{efi_boot_file}")
|
||
else:
|
||
install_errors.append(f"removable 模式: {stderr}")
|
||
|
||
# 第三次尝试: no-nvram
|
||
log_info("尝试 3/3: 使用 --no-nvram 选项...")
|
||
success, stdout, stderr = run_command(
|
||
grub_install_cmd + ["--no-nvram"],
|
||
"安装 UEFI GRUB(不带NVRAM)",
|
||
timeout=60
|
||
)
|
||
|
||
if success:
|
||
grub_install_success = True
|
||
log_success("✓ no-nvram 模式安装成功")
|
||
else:
|
||
install_errors.append(f"no-nvram 模式: {stderr}")
|
||
|
||
# 如果所有 grub2-install 方法都失败,尝试手动复制 EFI 文件(针对 Secure Boot 问题)
|
||
if not grub_install_success and "Secure Boot" in str(install_errors):
|
||
log_info("grub2-install 不支持 Secure Boot,尝试手动安装 EFI 文件...")
|
||
success = _manual_install_efi_files(mount_point, efi_target, efi_grub_file, efi_boot_file, efi_bootloader_id)
|
||
if success:
|
||
grub_install_success = True
|
||
log_success("✓ 手动 EFI 文件安装成功")
|
||
else:
|
||
log_error("手动 EFI 文件安装失败")
|
||
|
||
# 安装 EFI fallback(如果启用)
|
||
if grub_install_success and use_fallback:
|
||
log_info("安装 EFI fallback 文件...")
|
||
install_efi_fallback(mount_point, efi_target, efi_grub_file, efi_boot_file, efi_bootloader_id)
|
||
|
||
# ===== BIOS 安装 =====
|
||
if not is_uefi or install_hybrid:
|
||
log_step("安装 BIOS GRUB")
|
||
|
||
bios_cmd = chroot_cmd_prefix + [
|
||
_grub_install_cmd,
|
||
"--target=i386-pc",
|
||
"--recheck",
|
||
"--force"
|
||
]
|
||
|
||
# 如果有独立的 /boot 分区,需要指定 --boot-directory
|
||
# 这样 GRUB 才能在启动时正确找到配置文件
|
||
if has_separate_boot:
|
||
log_info("独立 /boot 分区: 添加 --boot-directory=/boot 参数")
|
||
bios_cmd.append("--boot-directory=/boot")
|
||
|
||
bios_cmd.append(target_disk)
|
||
|
||
success, stdout, stderr = run_command(
|
||
bios_cmd,
|
||
f"安装 BIOS GRUB 到 {target_disk}",
|
||
timeout=60
|
||
)
|
||
|
||
if success:
|
||
log_success(f"✓ BIOS GRUB 安装成功到 {target_disk}")
|
||
if not is_uefi:
|
||
grub_install_success = True
|
||
else:
|
||
log_error(f"BIOS GRUB 安装失败: {stderr}")
|
||
if not is_uefi:
|
||
install_errors.append(f"BIOS 模式: {stderr}")
|
||
|
||
# 检查安装结果
|
||
if not grub_install_success:
|
||
error_summary = "\n".join(install_errors)
|
||
log_error(f"所有 GRUB 安装尝试均失败:\n{error_summary}")
|
||
return False, f"GRUB 安装失败:\n{error_summary}"
|
||
|
||
log_success("✓ GRUB 安装阶段完成")
|
||
|
||
# 检查生成的 EFI 文件
|
||
if is_uefi or install_hybrid:
|
||
efi_paths_to_check = [
|
||
os.path.join(mount_point, "boot/efi/EFI", efi_bootloader_id),
|
||
os.path.join(mount_point, "boot/efi/EFI", efi_bootloader_id.lower()),
|
||
os.path.join(mount_point, "boot/efi/EFI/Boot"),
|
||
os.path.join(mount_point, "boot/efi/efi", efi_bootloader_id.lower()),
|
||
]
|
||
|
||
found_efi_file = False
|
||
for path in efi_paths_to_check:
|
||
if os.path.exists(path):
|
||
log_info(f" 检查目录: {path}")
|
||
try:
|
||
files = os.listdir(path)
|
||
for f in files:
|
||
if f.endswith('.efi'):
|
||
full = os.path.join(path, f)
|
||
size = os.path.getsize(full)
|
||
log_info(f" ✓ EFI文件: {f} ({size} bytes)")
|
||
found_efi_file = True
|
||
except Exception as e:
|
||
log_warning(f" 无法列出: {e}")
|
||
|
||
if not found_efi_file:
|
||
log_warning(f" 未找到任何 .efi 文件,安装可能有问题")
|
||
|
||
# ===== 更新 GRUB 配置 =====
|
||
log_step("更新 GRUB 配置文件")
|
||
|
||
# 确定配置文件路径
|
||
# 根据发行版类型确定正确的 GRUB 目录名
|
||
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux", "opensuse"]:
|
||
# RHEL 系使用 grub2
|
||
preferred_grub_dir = "grub2"
|
||
fallback_grub_dir = "grub"
|
||
else:
|
||
# Debian/Ubuntu/Arch 等使用 grub
|
||
preferred_grub_dir = "grub"
|
||
fallback_grub_dir = "grub2"
|
||
|
||
possible_config_paths = [
|
||
f"/boot/{preferred_grub_dir}/grub.cfg",
|
||
f"/boot/{fallback_grub_dir}/grub.cfg",
|
||
]
|
||
|
||
config_path = ""
|
||
for cfg_path in possible_config_paths:
|
||
full_path = os.path.join(mount_point, cfg_path.lstrip('/'))
|
||
# 优先检查目录是否存在,但也考虑发行版偏好
|
||
if os.path.exists(os.path.dirname(full_path)):
|
||
config_path = cfg_path
|
||
break
|
||
|
||
# 如果没有找到,使用发行版偏好路径
|
||
if not config_path:
|
||
config_path = f"/boot/{preferred_grub_dir}/grub.cfg"
|
||
|
||
# 确定更新命令
|
||
if distro_type in ["debian", "ubuntu"]:
|
||
# Debian/Ubuntu 使用 update-grub 包装脚本
|
||
success, _, _ = run_command(chroot_cmd_prefix + ["which", "update-grub"], "检查 update-grub", timeout=5)
|
||
if success:
|
||
grub_update_cmd = ["update-grub"]
|
||
else:
|
||
grub_update_cmd = [_grub_mkconfig_cmd, "-o", config_path]
|
||
else:
|
||
grub_update_cmd = [_grub_mkconfig_cmd, "-o", config_path]
|
||
|
||
log_info(f"使用命令: {' '.join(grub_update_cmd)}")
|
||
log_info(f"配置文件路径: {config_path}")
|
||
|
||
# ===== 确保 grubenv 文件存在 =====
|
||
# CentOS/RHEL/Fedora 等系统需要 grubenv 文件,否则 grub2-mkconfig 会失败
|
||
grubenv_dir = os.path.dirname(config_path) # /boot/grub2 或 /boot/grub
|
||
grubenv_path = os.path.join(mount_point, grubenv_dir.lstrip('/'), "grubenv")
|
||
|
||
if not os.path.exists(grubenv_path):
|
||
log_info(f"grubenv 文件不存在,尝试创建...")
|
||
|
||
# 首先确保目录存在(在 chroot 环境中创建)
|
||
grubenv_dir_in_chroot = os.path.dirname(config_path) # /boot/grub2 或 /boot/grub
|
||
success, _, _ = run_command(
|
||
chroot_cmd_prefix + ["mkdir", "-p", grubenv_dir_in_chroot],
|
||
"创建 GRUB 配置目录",
|
||
timeout=5
|
||
)
|
||
if success:
|
||
log_info(f"✓ 创建目录: {grubenv_dir_in_chroot}")
|
||
|
||
# 检查 /boot 是否已挂载(在 chroot 内)
|
||
success, stdout, _ = run_command(
|
||
chroot_cmd_prefix + ["mount"],
|
||
"检查挂载状态",
|
||
timeout=5
|
||
)
|
||
if success:
|
||
boot_mounts = [line for line in stdout.split('\n') if 'boot' in line.lower()]
|
||
if boot_mounts:
|
||
log_info(f"/boot 挂载状态: {', '.join(boot_mounts)}")
|
||
else:
|
||
log_info("/boot 挂载状态: 未找到相关挂载")
|
||
else:
|
||
log_info("/boot 挂载状态: 检查失败")
|
||
|
||
# 方法1: 使用 grub2-editenv 创建
|
||
editenv_cmd = None
|
||
for cmd_name in ["grub2-editenv", "grub-editenv"]:
|
||
for cmd_dir in ["/usr/bin", "/bin", "/usr/sbin", "/sbin"]:
|
||
test_path = os.path.join(mount_point, cmd_dir.lstrip('/'), cmd_name)
|
||
if os.path.exists(test_path):
|
||
editenv_cmd = os.path.join(cmd_dir, cmd_name)
|
||
break
|
||
if editenv_cmd:
|
||
break
|
||
|
||
if editenv_cmd:
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + [editenv_cmd, grubenv_dir_in_chroot + "/grubenv", "create"],
|
||
"创建 grubenv 文件",
|
||
timeout=10
|
||
)
|
||
# 重新检查文件是否存在(可能在不同路径)
|
||
if success:
|
||
log_success(f"✓ 成功创建 grubenv 文件")
|
||
else:
|
||
log_warning(f"grub2-editenv 创建失败: {stderr}")
|
||
|
||
# 方法2: 如果 /boot 是独立分区,尝试在挂载的主机路径创建
|
||
if not os.path.exists(grubenv_path):
|
||
# 根据发行版确定正确的目录名 (grub2 vs grub)
|
||
if distro_type in ["centos", "rhel", "fedora", "rocky", "almalinux", "opensuse"]:
|
||
grub_dir_name = "grub2"
|
||
else:
|
||
grub_dir_name = "grub"
|
||
|
||
# 首先确保首选目录存在(使用 shell 命令,因为 /boot 可能是独立分区)
|
||
preferred_dir = os.path.dirname(grubenv_path) # 正确路径的目录
|
||
|
||
# 使用 mkdir 命令创建目录(在 chroot 内,确保在正确的文件系统中)
|
||
grubenv_dir_in_chroot = os.path.dirname(config_path) # /boot/grub2 或 /boot/grub
|
||
success, _, _ = run_command(
|
||
chroot_cmd_prefix + ["mkdir", "-p", grubenv_dir_in_chroot],
|
||
f"确保 {grubenv_dir_in_chroot} 目录存在",
|
||
timeout=5
|
||
)
|
||
if success:
|
||
log_info(f"✓ 确保目录存在: {grubenv_dir_in_chroot}")
|
||
|
||
# 再次检查 grubenv_path(主机路径)
|
||
log_debug(f"检查 grubenv 路径: {grubenv_path}, 存在: {os.path.exists(grubenv_path)}")
|
||
log_debug(f"检查目录存在: {preferred_dir}, 存在: {os.path.exists(preferred_dir)}")
|
||
|
||
# 尝试多种可能的路径,检查是否已有 grubenv 在错误位置
|
||
possible_wrong_paths = [
|
||
os.path.join(mount_point, "boot", "grub" if grub_dir_name == "grub2" else "grub2", "grubenv"),
|
||
os.path.join(mount_point, "grub" if grub_dir_name == "grub2" else "grub2", "grubenv"),
|
||
]
|
||
|
||
wrong_path_found = None
|
||
for wrong_path in possible_wrong_paths:
|
||
if os.path.exists(wrong_path):
|
||
wrong_path_found = wrong_path
|
||
log_info(f"发现 grubenv 在错误位置: {wrong_path}")
|
||
break
|
||
|
||
# 如果找到错误位置的 grubenv,复制到正确位置
|
||
if wrong_path_found and not os.path.exists(grubenv_path):
|
||
try:
|
||
import shutil
|
||
# 确保目标目录存在(在主机上)
|
||
os.makedirs(os.path.dirname(grubenv_path), exist_ok=True)
|
||
shutil.copy2(wrong_path_found, grubenv_path)
|
||
log_success(f"✓ 复制 grubenv 到正确位置: {grubenv_path}")
|
||
except Exception as e:
|
||
log_warning(f"复制失败: {e}")
|
||
|
||
# 检查是否是符号链接(CentOS 8 等系统的 grubenv 可能是指向 EFI 分区的符号链接)
|
||
if not os.path.exists(grubenv_path) and os.path.islink(grubenv_path):
|
||
log_info(f"检测到 grubenv 是符号链接,尝试创建链接目标...")
|
||
try:
|
||
link_target = os.readlink(grubenv_path)
|
||
log_info(f" 链接目标: {link_target}")
|
||
|
||
# 解析链接目标为绝对路径
|
||
if link_target.startswith('/'):
|
||
# 绝对路径
|
||
target_path = os.path.join(mount_point, link_target.lstrip('/'))
|
||
else:
|
||
# 相对路径(如 ../efi/EFI/centos/grubenv)
|
||
link_dir = os.path.dirname(grubenv_path)
|
||
target_path = os.path.normpath(os.path.join(link_dir, link_target))
|
||
|
||
log_info(f" 解析后的目标路径: {target_path}")
|
||
|
||
# 创建目标目录
|
||
target_dir = os.path.dirname(target_path)
|
||
if not os.path.exists(target_dir):
|
||
os.makedirs(target_dir, exist_ok=True)
|
||
log_info(f"✓ 创建目录: {target_dir}")
|
||
|
||
# 创建目标文件(1024 字节的 GRUB 环境块)
|
||
grubenv_content = "# GRUB Environment Block\nsaved_entry=\n" + "#" * 1024 + "\n"
|
||
with open(target_path, 'w') as f:
|
||
f.write(grubenv_content)
|
||
log_success(f"✓ 创建符号链接目标文件: {target_path}")
|
||
|
||
except Exception as e:
|
||
log_warning(f"创建符号链接目标失败: {e}")
|
||
# 如果创建符号链接目标失败,删除损坏的符号链接并创建普通文件
|
||
try:
|
||
os.unlink(grubenv_path)
|
||
log_info(f" 已删除损坏的符号链接: {grubenv_path}")
|
||
except Exception as e2:
|
||
log_warning(f" 删除符号链接失败: {e2}")
|
||
|
||
# 如果仍未创建成功,使用 shell 命令在 chroot 内创建
|
||
if not os.path.exists(grubenv_path):
|
||
log_info(f"尝试使用 shell 命令创建 grubenv 文件...")
|
||
|
||
# 方法: 使用 echo 和 tee 在 chroot 内创建文件
|
||
grubenv_chroot_path = grubenv_dir_in_chroot + "/grubenv"
|
||
grubenv_content = "# GRUB Environment Block\nsaved_entry=\n" + "#" * 1024 + "\n"
|
||
|
||
# 使用 printf 创建文件(更可靠)
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["sh", "-c", f'printf "%s" "{grubenv_content}" > "{grubenv_chroot_path}"'],
|
||
"使用 shell 创建 grubenv 文件",
|
||
timeout=5
|
||
)
|
||
|
||
if success or os.path.exists(grubenv_path):
|
||
log_success(f"✓ 手动创建 grubenv 文件成功: {grubenv_path}")
|
||
else:
|
||
log_warning(f"Shell 创建失败: {stderr}")
|
||
|
||
# 最后尝试: 使用 touch 创建空文件
|
||
success, _, _ = run_command(
|
||
chroot_cmd_prefix + ["touch", grubenv_chroot_path],
|
||
"创建空 grubenv 文件",
|
||
timeout=5
|
||
)
|
||
if success or os.path.exists(grubenv_path):
|
||
log_success(f"✓ 创建空 grubenv 文件成功: {grubenv_path}")
|
||
else:
|
||
log_warning(f"所有方法创建 grubenv 文件均失败")
|
||
|
||
# 执行配置更新
|
||
success, stdout, stderr = run_command(
|
||
chroot_cmd_prefix + grub_update_cmd,
|
||
"更新GRUB配置文件",
|
||
timeout=120
|
||
)
|
||
|
||
if not success:
|
||
log_error(f"GRUB 配置文件更新失败: {stderr}")
|
||
return False, f"GRUB配置文件更新失败: {stderr}"
|
||
|
||
log_success("✓ GRUB 配置文件更新成功")
|
||
|
||
# 检查生成的配置文件
|
||
full_config_path = os.path.join(mount_point, config_path.lstrip('/'))
|
||
if os.path.exists(full_config_path):
|
||
size = os.path.getsize(full_config_path)
|
||
log_info(f" 配置文件大小: {size} bytes")
|
||
try:
|
||
with open(full_config_path, 'r') as f:
|
||
content = f.read()
|
||
menu_entries = content.count('menuentry')
|
||
log_info(f" 发现 {menu_entries} 个启动菜单项")
|
||
except Exception as e:
|
||
log_warning(f" 无法读取配置文件: {e}")
|
||
else:
|
||
log_warning(f" 配置文件未找到: {full_config_path}")
|
||
|
||
# ===== 输出启动提示 =====
|
||
if is_uefi or install_hybrid:
|
||
log_step("UEFI 修复完成提示")
|
||
log_info("GRUB EFI 文件已安装到 EFI 分区")
|
||
|
||
# 检测安装模式并给出相应提示
|
||
efi_boot_path = os.path.join(mount_point, f"boot/efi/EFI/Boot/{efi_boot_file if efi_params else 'bootx64.efi'}")
|
||
efi_grub_path = os.path.join(mount_point, f"boot/efi/EFI/{efi_bootloader_id}/{efi_grub_file if efi_params else 'grubx64.efi'}")
|
||
|
||
if os.path.exists(efi_boot_path):
|
||
log_info("")
|
||
log_info("【重要】使用可移动模式安装 (EFI Fallback)")
|
||
log_info("启动方法:")
|
||
log_info(" 1. 在BIOS中选择 'UEFI Hard Drive' 或 'UEFI OS' 启动")
|
||
log_info(" 2. 如果有多块硬盘,选择正确的硬盘")
|
||
log_info(" 3. 确保启动模式为 UEFI(不是 Legacy/CSM)")
|
||
elif os.path.exists(efi_grub_path):
|
||
log_info("")
|
||
log_info(f"【重要】使用标准模式安装 (/{efi_bootloader_id}/{efi_grub_file if efi_params else 'grubx64.efi'})")
|
||
log_info("启动方法:")
|
||
log_info(f" 1. 在BIOS启动菜单中选择 '{efi_bootloader_id}' 或 'Linux' 启动项")
|
||
log_info(" 2. 如果没有该选项,需要手动添加启动项")
|
||
log_info(f" 路径: \\EFI\\{efi_bootloader_id}\\{efi_grub_file if efi_params else 'grubx64.efi'}")
|
||
|
||
log_info("")
|
||
log_info("故障排除:")
|
||
log_info(" • 如果仍无法启动,请检查:")
|
||
log_info(" - BIOS 是否设置为 UEFI 启动模式(非 Legacy/CSM)")
|
||
log_info(" - 安全启动 (Secure Boot) 是否已禁用")
|
||
log_info(" - EFI 分区格式是否为 FAT32")
|
||
|
||
return True, ""
|
||
|
||
|
||
def unmount_target_system(mount_point: str) -> Tuple[bool, str]:
|
||
"""
|
||
卸载所有挂载的分区。
|
||
确保按正确顺序卸载,避免资源忙错误。
|
||
"""
|
||
log_step("卸载目标系统", f"挂载点: {mount_point}")
|
||
|
||
success = True
|
||
error_msg = ""
|
||
|
||
# 按逆序卸载绑定挂载
|
||
bind_paths = ["/run", "/sys", "/proc", "/dev/pts", "/dev"]
|
||
for path in bind_paths:
|
||
target_path = os.path.join(mount_point, path.lstrip('/'))
|
||
if os.path.ismount(target_path):
|
||
# 尝试多次卸载(有时需要重试)
|
||
for attempt in range(3):
|
||
s, _, stderr = run_command(["sudo", "umount", target_path], f"卸载绑定 {target_path}")
|
||
if s:
|
||
break
|
||
time.sleep(0.5)
|
||
if not s:
|
||
success = False
|
||
error_msg += f"卸载绑定 {target_path} 失败: {stderr}\n"
|
||
else:
|
||
log_debug(f"未挂载,跳过: {target_path}")
|
||
|
||
# 卸载 EFI 分区
|
||
efi_mount_point = os.path.join(mount_point, "boot/efi")
|
||
if os.path.ismount(efi_mount_point):
|
||
s, _, stderr = run_command(["sudo", "umount", efi_mount_point], f"卸载 EFI 分区")
|
||
if not s:
|
||
success = False
|
||
error_msg += f"卸载 EFI 分区失败: {stderr}\n"
|
||
else:
|
||
log_debug(f"EFI 分区未挂载,跳过")
|
||
|
||
# 卸载 /boot 分区
|
||
boot_mount_point = os.path.join(mount_point, "boot")
|
||
if os.path.ismount(boot_mount_point):
|
||
s, _, stderr = run_command(["sudo", "umount", boot_mount_point], f"卸载 /boot 分区")
|
||
if not s:
|
||
success = False
|
||
error_msg += f"卸载 /boot 分区失败: {stderr}\n"
|
||
else:
|
||
log_debug(f"/boot 分区未挂载,跳过")
|
||
|
||
# 卸载根分区
|
||
if os.path.ismount(mount_point):
|
||
s, _, stderr = run_command(["sudo", "umount", mount_point], f"卸载根分区")
|
||
if not s:
|
||
success = False
|
||
error_msg += f"卸载根分区失败: {stderr}\n"
|
||
else:
|
||
log_debug(f"根分区未挂载,跳过")
|
||
|
||
# 清理临时目录
|
||
if os.path.exists(mount_point) and not os.path.ismount(mount_point):
|
||
try:
|
||
shutil.rmtree(mount_point)
|
||
log_success(f"✓ 清理临时目录: {mount_point}")
|
||
except OSError as e:
|
||
log_warning(f"无法删除临时目录 {mount_point}: {e}")
|
||
error_msg += f"无法删除临时目录: {e}\n"
|
||
|
||
if success:
|
||
log_success("✓ 所有分区已卸载")
|
||
else:
|
||
log_warning("部分分区卸载失败")
|
||
|
||
return success, error_msg
|
||
|
||
|
||
# ===== 高级功能函数 =====
|
||
|
||
def detect_existing_grub_entries(mount_point: str) -> List[Dict]:
|
||
"""
|
||
检测 EFI 分区中已有的 GRUB 启动项。
|
||
返回发现的 EFI 启动项列表。
|
||
"""
|
||
entries = []
|
||
efi_path = os.path.join(mount_point, "boot/efi/EFI")
|
||
|
||
if not os.path.exists(efi_path):
|
||
return entries
|
||
|
||
try:
|
||
for item in os.listdir(efi_path):
|
||
item_path = os.path.join(efi_path, item)
|
||
if os.path.isdir(item_path):
|
||
efi_files = [f for f in os.listdir(item_path) if f.endswith('.efi')]
|
||
if efi_files:
|
||
entries.append({
|
||
"name": item,
|
||
"path": item_path,
|
||
"files": efi_files
|
||
})
|
||
except Exception as e:
|
||
log_debug(f"检测现有 GRUB 条目失败: {e}")
|
||
|
||
return entries
|
||
|
||
|
||
def install_refind(mount_point: str) -> Tuple[bool, str]:
|
||
"""
|
||
安装 rEFInd 作为备用引导管理器。
|
||
需要目标系统中已安装 rEFInd 包。
|
||
"""
|
||
log_step("安装 rEFInd")
|
||
|
||
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
|
||
|
||
# 检查 refind-install 是否存在
|
||
success, _, _ = run_command(
|
||
chroot_cmd_prefix + ["which", "refind-install"],
|
||
"检查 refind-install",
|
||
timeout=10
|
||
)
|
||
|
||
if not success:
|
||
return False, "未找到 refind-install,请确保已安装 rEFInd 包"
|
||
|
||
# 运行 refind-install
|
||
success, stdout, stderr = run_command(
|
||
chroot_cmd_prefix + ["refind-install"],
|
||
"安装 rEFInd",
|
||
timeout=60
|
||
)
|
||
|
||
if success:
|
||
log_success("✓ rEFInd 安装成功")
|
||
return True, ""
|
||
else:
|
||
log_error(f"rEFInd 安装失败: {stderr}")
|
||
return False, f"rEFInd 安装失败: {stderr}"
|
||
|
||
|
||
def update_firmware_boot_order(mount_point: str, efi_bootloader_id: str = "GRUB") -> bool:
|
||
"""
|
||
使用 efibootmgr 更新固件启动顺序。
|
||
注意:在 Live 环境中可能无法工作。
|
||
"""
|
||
if is_live_environment():
|
||
log_warning("Live 环境无法更新固件启动顺序")
|
||
return False
|
||
|
||
log_step("更新固件启动顺序")
|
||
|
||
# 查找 EFI 分区
|
||
efi_partition = None
|
||
for line in open("/proc/mounts"):
|
||
if mount_point in line and "/boot/efi" in line:
|
||
parts = line.split()
|
||
efi_partition = parts[0]
|
||
break
|
||
|
||
if not efi_partition:
|
||
log_warning("无法找到 EFI 分区")
|
||
return False
|
||
|
||
# 使用 efibootmgr 创建启动项
|
||
success, _, stderr = run_command(
|
||
["efibootmgr", "-c", "-L", efi_bootloader_id,
|
||
"-l", f"\\EFI\\{efi_bootloader_id}\\grubx64.efi"],
|
||
"创建 EFI 启动项",
|
||
timeout=10
|
||
)
|
||
|
||
if success:
|
||
log_success("✓ 固件启动顺序更新成功")
|
||
return True
|
||
else:
|
||
log_warning(f"更新固件启动顺序失败: {stderr}")
|
||
return False
|
||
|
||
|
||
if __name__ == "__main__":
|
||
print("--- 运行后端测试 ---")
|
||
|
||
print("\n--- 系统信息检测 ---")
|
||
print(f"EFI 位数: {get_efi_word_size()}")
|
||
print(f"CPU 架构: {platform.machine()}")
|
||
print(f"Live 环境: {is_live_environment()}")
|
||
|
||
sb_supported, sb_enabled = check_secure_boot_status()
|
||
print(f"Secure Boot 支持: {sb_supported}, 启用: {sb_enabled}")
|
||
|
||
efi_params = get_grub_efi_parameters()
|
||
if efi_params:
|
||
print(f"GRUB EFI 参数: {efi_params}")
|
||
|
||
print("\n--- 扫描分区测试 ---")
|
||
success, disks, partitions, efi_partitions, err = scan_partitions()
|
||
if success:
|
||
print("磁盘:", [d['name'] for d in disks])
|
||
print("分区:", [p['name'] for p in partitions])
|
||
print("EFI分区:", [p['name'] for p in efi_partitions])
|
||
else:
|
||
print("扫描分区失败:", err)
|