1686 lines
62 KiB
Python
1686 lines
62 KiB
Python
# backend.py
|
||
# 参考 Calamares 引导安装模块优化
|
||
# 支持多架构、EFI fallback、Secure Boot、特殊文件系统等
|
||
|
||
import subprocess
|
||
import os
|
||
import json
|
||
import time
|
||
import re
|
||
import shutil
|
||
import glob
|
||
import platform
|
||
from typing import Tuple, List, Dict, Optional, Callable
|
||
from enum import Enum
|
||
|
||
# 常量配置
|
||
COMMAND_TIMEOUT = 300 # 命令执行超时时间(秒)
|
||
DEFAULT_GRUB_CONFIG_PATHS = [
|
||
"/boot/grub/grub.cfg",
|
||
"/boot/grub2/grub.cfg",
|
||
]
|
||
|
||
# EFI 架构参数映射 (参考 Calamares)
|
||
EFI_ARCH_PARAMETERS = {
|
||
"32": {
|
||
"target": "i386-efi",
|
||
"grub_file": "grubia32.efi",
|
||
"boot_file": "bootia32.efi",
|
||
"shim_file": "shimia32.efi"
|
||
},
|
||
"64": {
|
||
"x86_64": {
|
||
"target": "x86_64-efi",
|
||
"grub_file": "grubx64.efi",
|
||
"boot_file": "bootx64.efi",
|
||
"shim_file": "shimx64.efi"
|
||
},
|
||
"aarch64": {
|
||
"target": "arm64-efi",
|
||
"grub_file": "grubaa64.efi",
|
||
"boot_file": "bootaa64.efi",
|
||
"shim_file": "shimaa64.efi"
|
||
},
|
||
"loongarch64": {
|
||
"target": "loongarch64-efi",
|
||
"grub_file": "grubloongarch64.efi",
|
||
"boot_file": "bootloongarch64.efi",
|
||
"shim_file": "shimloongarch64.efi"
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
class LogLevel(Enum):
|
||
"""日志级别"""
|
||
DEBUG = "debug"
|
||
INFO = "info"
|
||
WARNING = "warning"
|
||
ERROR = "error"
|
||
SUCCESS = "success"
|
||
STEP = "step"
|
||
|
||
|
||
# 全局日志回调函数
|
||
_log_callback: Optional[Callable[[str, LogLevel], None]] = None
|
||
|
||
|
||
def set_log_callback(callback: Callable[[str, LogLevel], None]):
|
||
"""设置日志回调函数,用于前端显示"""
|
||
global _log_callback
|
||
_log_callback = callback
|
||
|
||
|
||
def _log(message: str, level: LogLevel = LogLevel.INFO):
|
||
"""内部日志函数"""
|
||
# 同时输出到控制台和回调
|
||
prefix = {
|
||
LogLevel.DEBUG: "[DEBUG]",
|
||
LogLevel.INFO: "[INFO]",
|
||
LogLevel.WARNING: "[WARN]",
|
||
LogLevel.ERROR: "[ERROR]",
|
||
LogLevel.SUCCESS: "[SUCCESS]",
|
||
LogLevel.STEP: "[STEP]"
|
||
}.get(level, "[INFO]")
|
||
|
||
print(f"{prefix} {message}")
|
||
|
||
if _log_callback:
|
||
try:
|
||
_log_callback(message, level)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def log_step(step_name: str, detail: str = ""):
|
||
"""输出步骤日志"""
|
||
separator = "=" * 60
|
||
_log(f"{separator}", LogLevel.STEP)
|
||
_log(f"{step_name}", LogLevel.STEP)
|
||
if detail:
|
||
_log(f"详情: {detail}", LogLevel.STEP)
|
||
_log(f"{separator}", LogLevel.STEP)
|
||
|
||
|
||
def log_info(msg: str):
|
||
"""输出信息日志"""
|
||
_log(msg, LogLevel.INFO)
|
||
|
||
|
||
def log_warning(msg: str):
|
||
"""输出警告日志"""
|
||
_log(msg, LogLevel.WARNING)
|
||
|
||
|
||
def log_error(msg: str):
|
||
"""输出错误日志"""
|
||
_log(msg, LogLevel.ERROR)
|
||
|
||
|
||
def log_debug(msg: str):
|
||
"""输出调试日志"""
|
||
_log(msg, LogLevel.DEBUG)
|
||
|
||
|
||
def log_success(msg: str):
|
||
"""输出成功日志"""
|
||
_log(msg, LogLevel.SUCCESS)
|
||
|
||
|
||
def validate_device_path(path: str) -> bool:
|
||
"""
|
||
验证设备路径格式是否合法(防止命令注入)。
|
||
支持标准分区、LVM、LUKS、RAID、ZFS等设备。
|
||
"""
|
||
if not path:
|
||
return False
|
||
patterns = [
|
||
r'^/dev/[a-zA-Z0-9_-]+$',
|
||
r'^/dev/mapper/[a-zA-Z0-9_-]+$',
|
||
r'^/dev/mapper/[a-zA-Z0-9_]+-[a-zA-Z0-9_-]+$',
|
||
r'^/dev/md[0-9]+$',
|
||
r'^/dev/nvme[0-9]+n[0-9]+(p[0-9]+)?$',
|
||
r'^/dev/mmcblk[0-9]+(p[0-9]+)?$',
|
||
r'^/dev/zd[0-9]+$',
|
||
]
|
||
return any(re.match(pattern, path) is not None for pattern in patterns)
|
||
|
||
|
||
def run_command(command: List[str], description: str = "执行命令",
|
||
cwd: Optional[str] = None, shell: bool = False,
|
||
timeout: int = COMMAND_TIMEOUT,
|
||
env: Optional[Dict[str, str]] = None) -> Tuple[bool, str, str]:
|
||
"""
|
||
运行一个系统命令,并捕获其输出和错误。
|
||
支持环境变量设置(用于ZFS等特殊文件系统)。
|
||
"""
|
||
cmd_str = ' '.join(command)
|
||
log_info(f"执行命令: {cmd_str}")
|
||
log_debug(f"工作目录: {cwd or '当前目录'}, 超时: {timeout}秒")
|
||
|
||
# 准备环境变量
|
||
run_env = None
|
||
if env:
|
||
run_env = os.environ.copy()
|
||
run_env.update(env)
|
||
log_debug(f"环境变量: {env}")
|
||
|
||
try:
|
||
result = subprocess.run(
|
||
command,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True,
|
||
cwd=cwd,
|
||
shell=shell,
|
||
timeout=timeout,
|
||
env=run_env
|
||
)
|
||
if result.stdout:
|
||
log_debug(f"stdout: {result.stdout[:500]}")
|
||
if result.stderr:
|
||
log_debug(f"stderr: {result.stderr[:500]}")
|
||
log_success(f"✓ 命令成功: {description}")
|
||
return True, result.stdout, result.stderr
|
||
except subprocess.CalledProcessError as e:
|
||
log_error(f"✗ 命令失败: {description}")
|
||
log_error(f"返回码: {e.returncode}")
|
||
log_error(f"stdout: {e.stdout}")
|
||
log_error(f"stderr: {e.stderr}")
|
||
return False, e.stdout, e.stderr
|
||
except subprocess.TimeoutExpired:
|
||
log_error(f"✗ 命令超时(超过 {timeout} 秒): {description}")
|
||
return False, "", f"命令执行超时"
|
||
except FileNotFoundError:
|
||
log_error(f"✗ 命令未找到: {command[0]}")
|
||
return False, "", f"命令未找到: {command[0]}"
|
||
except Exception as e:
|
||
log_error(f"✗ 发生未知错误: {e}")
|
||
return False, "", str(e)
|
||
|
||
|
||
def get_efi_word_size() -> str:
|
||
"""
|
||
检测 EFI 位数(32或64位)。
|
||
通过读取 /sys/firmware/efi/fw_platform_size 获取。
|
||
如果内核较旧不支持,默认假设为 64 位。
|
||
"""
|
||
# 首先检查是否是 EFI 系统
|
||
if not os.path.exists("/sys/firmware/efi"):
|
||
log_debug("非 EFI 系统,返回 64 位作为默认值")
|
||
return "64"
|
||
|
||
try:
|
||
# 尝试读取新内核的 fw_platform_size
|
||
if os.path.exists("/sys/firmware/efi/fw_platform_size"):
|
||
with open("/sys/firmware/efi/fw_platform_size", "r") as f:
|
||
efi_bitness = f.read(2).strip()
|
||
if efi_bitness in ["32", "64"]:
|
||
log_debug(f"检测到 EFI 位数: {efi_bitness} 位")
|
||
return efi_bitness
|
||
|
||
# 旧内核:检查是否存在 64 位 EFI 相关文件
|
||
# 如果存在 /sys/firmware/efi/vars 或 /sys/firmware/efi/efivars,假设为 64 位
|
||
if os.path.exists("/sys/firmware/efi/vars") or os.path.exists("/sys/firmware/efi/efivars"):
|
||
log_debug("检测到 EFI 环境,假设为 64 位(旧内核)")
|
||
return "64"
|
||
|
||
except FileNotFoundError:
|
||
log_debug("无法读取 EFI 位数文件,假设为 64 位")
|
||
except Exception as e:
|
||
log_warning(f"读取 EFI 位数时出错: {e}")
|
||
|
||
return "64"
|
||
|
||
|
||
def get_grub_efi_parameters() -> Optional[Tuple[str, str, str]]:
|
||
"""
|
||
获取 GRUB EFI 安装参数。
|
||
返回: (target_name, grub.efi_name, boot.efi_name)
|
||
参考 Calamares 的实现。
|
||
"""
|
||
efi_bitness = get_efi_word_size()
|
||
cpu_type = platform.machine()
|
||
|
||
log_debug(f"系统架构: {cpu_type}, EFI位数: {efi_bitness}")
|
||
|
||
if efi_bitness == "32":
|
||
# 假设所有 32 位都是传统 x86
|
||
params = EFI_ARCH_PARAMETERS["32"]
|
||
return params["target"], params["grub_file"], params["boot_file"]
|
||
elif efi_bitness == "64" and cpu_type == "aarch64":
|
||
params = EFI_ARCH_PARAMETERS["64"]["aarch64"]
|
||
return params["target"], params["grub_file"], params["boot_file"]
|
||
elif efi_bitness == "64" and cpu_type == "loongarch64":
|
||
params = EFI_ARCH_PARAMETERS["64"]["loongarch64"]
|
||
return params["target"], params["grub_file"], params["boot_file"]
|
||
elif efi_bitness == "64":
|
||
# 如果不是 ARM,则为 AMD64
|
||
params = EFI_ARCH_PARAMETERS["64"]["x86_64"]
|
||
return params["target"], params["grub_file"], params["boot_file"]
|
||
|
||
log_warning(f"无法确定 GRUB EFI 参数: bits={efi_bitness}, cpu={cpu_type}")
|
||
return None
|
||
|
||
|
||
def get_shim_filename() -> Optional[str]:
|
||
"""获取当前架构的 shim 文件名"""
|
||
efi_bitness = get_efi_word_size()
|
||
cpu_type = platform.machine()
|
||
|
||
if efi_bitness == "32":
|
||
return EFI_ARCH_PARAMETERS["32"]["shim_file"]
|
||
elif efi_bitness == "64" and cpu_type == "aarch64":
|
||
return EFI_ARCH_PARAMETERS["64"]["aarch64"]["shim_file"]
|
||
elif efi_bitness == "64" and cpu_type == "loongarch64":
|
||
return EFI_ARCH_PARAMETERS["64"]["loongarch64"]["shim_file"]
|
||
elif efi_bitness == "64":
|
||
return EFI_ARCH_PARAMETERS["64"]["x86_64"]["shim_file"]
|
||
|
||
return None
|
||
|
||
|
||
def is_live_environment() -> bool:
|
||
"""
|
||
检测是否在 Live 环境中运行。
|
||
Live 环境通常无法访问 EFI 变量。
|
||
"""
|
||
efivars_path = "/sys/firmware/efi/efivars"
|
||
if not os.path.exists(efivars_path):
|
||
log_debug("Live 环境检测: /sys/firmware/efi/efivars 不存在")
|
||
return True
|
||
try:
|
||
files = os.listdir(efivars_path)
|
||
if not files:
|
||
log_debug("Live 环境检测: efivars 目录为空")
|
||
return True
|
||
except PermissionError:
|
||
log_debug("Live 环境检测: 无法访问 efivars")
|
||
return True
|
||
|
||
log_debug("Live 环境检测: 看起来不是 Live 环境")
|
||
return False
|
||
|
||
|
||
def check_secure_boot_status() -> Tuple[bool, bool]:
|
||
"""
|
||
检查 Secure Boot 状态。
|
||
返回: (是否支持检测, 是否启用)
|
||
"""
|
||
try:
|
||
# 方法1: 通过 mokutil 检查
|
||
success, stdout, _ = run_command(
|
||
["mokutil", "--sb-state"],
|
||
"检查 Secure Boot 状态",
|
||
timeout=10
|
||
)
|
||
if success:
|
||
if "enabled" in stdout.lower():
|
||
log_info("Secure Boot 状态: 已启用")
|
||
return True, True
|
||
elif "disabled" in stdout.lower():
|
||
log_info("Secure Boot 状态: 已禁用")
|
||
return True, False
|
||
except Exception:
|
||
pass
|
||
|
||
# 方法2: 通过 efi 变量检查
|
||
try:
|
||
sb_var_path = "/sys/firmware/efi/efivars/SecureBoot-8be4df61-93ca-11d2-aa0d-00e098032b8c"
|
||
if os.path.exists(sb_var_path):
|
||
with open(sb_var_path, "rb") as f:
|
||
data = f.read()
|
||
if len(data) >= 5:
|
||
# 第5个字节是 Secure Boot 状态
|
||
sb_enabled = data[4] == 1
|
||
log_info(f"Secure Boot 状态: {'已启用' if sb_enabled else '已禁用'}")
|
||
return True, sb_enabled
|
||
except Exception as e:
|
||
log_debug(f"通过 efi 变量检查 Secure Boot 失败: {e}")
|
||
|
||
log_warning("无法检测 Secure Boot 状态")
|
||
return False, False
|
||
|
||
|
||
def _process_partition(block_device: Dict, all_disks: List, all_partitions: List,
|
||
all_efi_partitions: List) -> None:
|
||
"""
|
||
处理单个块设备,递归处理子分区、LVM逻辑卷、btrfs子卷等。
|
||
"""
|
||
dev_name = f"/dev/{block_device.get('name', '')}"
|
||
dev_type = block_device.get("type")
|
||
|
||
# 跳过 loop 设备和 Live 系统自身的挂载
|
||
mountpoint = block_device.get("mountpoint")
|
||
if mountpoint and (mountpoint.startswith("/run/media") or
|
||
mountpoint.startswith("/cdrom") or
|
||
mountpoint.startswith("/live") or
|
||
mountpoint.startswith("/iso")):
|
||
log_debug(f"跳过 Live 系统挂载点: {dev_name} -> {mountpoint}")
|
||
return
|
||
|
||
if dev_type == "disk":
|
||
# 跳过 loop 和 rom 设备
|
||
if block_device.get("rota") is not None or block_device.get("size", "0") == "0":
|
||
pass # 可能是有效磁盘
|
||
all_disks.append({
|
||
"name": dev_name,
|
||
"size": block_device.get("size", "unknown"),
|
||
"model": block_device.get("model", "") # 可能为空(旧版本 lsblk)
|
||
})
|
||
for child in block_device.get("children", []):
|
||
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
|
||
|
||
elif dev_type == "part":
|
||
# 跳过 Live 系统的根分区
|
||
if mountpoint and mountpoint == "/":
|
||
# 检查是否是 Live 系统的根(通过检查是否有 /live 或特定标记)
|
||
live_marker = os.path.join(mountpoint, "live")
|
||
if os.path.exists(live_marker):
|
||
log_debug(f"跳过 Live 系统根分区: {dev_name}")
|
||
for child in block_device.get("children", []):
|
||
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
|
||
return
|
||
|
||
part_info = {
|
||
"name": dev_name,
|
||
"fstype": block_device.get("fstype", "unknown"),
|
||
"size": block_device.get("size", "unknown"),
|
||
"mountpoint": mountpoint,
|
||
"uuid": block_device.get("uuid"),
|
||
"partlabel": block_device.get("partlabel"),
|
||
"label": block_device.get("label"),
|
||
"parttype": (block_device.get("parttype") or "").upper(),
|
||
"fsver": block_device.get("fsver", ""),
|
||
"is_subvolume": False
|
||
}
|
||
|
||
# 检测 btrfs 子卷
|
||
if part_info["fstype"] == "btrfs":
|
||
part_info["is_btrfs"] = True
|
||
# 尝试获取子卷信息
|
||
try:
|
||
success, stdout, _ = run_command(
|
||
["btrfs", "subvolume", "list", dev_name],
|
||
f"检测 btrfs 子卷: {dev_name}",
|
||
timeout=10
|
||
)
|
||
if success:
|
||
subvolumes = []
|
||
for line in stdout.strip().split('\n'):
|
||
if line:
|
||
# 格式: ID 256 gen 16 top level 5 path @
|
||
match = re.search(r'path\s+(\S+)', line)
|
||
if match:
|
||
subvolumes.append(match.group(1))
|
||
if subvolumes:
|
||
part_info["subvolumes"] = subvolumes
|
||
log_debug(f"{dev_name} 的 btrfs 子卷: {subvolumes}")
|
||
except Exception as e:
|
||
log_debug(f"检测 btrfs 子卷失败: {e}")
|
||
|
||
all_partitions.append(part_info)
|
||
|
||
# 检测 EFI 系统分区
|
||
is_vfat = part_info["fstype"] == "vfat"
|
||
has_efi_label = part_info["partlabel"] and "EFI" in part_info["partlabel"].upper()
|
||
has_efi_name = part_info["label"] and "EFI" in part_info["label"].upper()
|
||
# EFI 系统分区的 GPT 类型 GUID
|
||
is_efi_type = part_info["parttype"] == "C12A7328-F81F-11D2-BA4B-00A0C93EC93B"
|
||
# 或者检测标志
|
||
is_efi_fs = block_device.get("fsver") == "FAT32" and is_vfat
|
||
|
||
if is_vfat and (has_efi_label or has_efi_name or is_efi_type):
|
||
all_efi_partitions.append(part_info)
|
||
log_debug(f"检测到 EFI 分区: {dev_name}")
|
||
|
||
for child in block_device.get("children", []):
|
||
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
|
||
|
||
elif dev_type in ["lvm", "dm"]:
|
||
lv_name = block_device.get("name", "")
|
||
|
||
# 处理 LVM 命名格式 (vg-lv)
|
||
if "-" in lv_name and not lv_name.startswith("dm-"):
|
||
mapper_path = f"/dev/mapper/{lv_name}"
|
||
else:
|
||
mapper_path = dev_name
|
||
|
||
part_info = {
|
||
"name": mapper_path,
|
||
"fstype": block_device.get("fstype", "unknown"),
|
||
"size": block_device.get("size", "unknown"),
|
||
"mountpoint": mountpoint,
|
||
"uuid": block_device.get("uuid"),
|
||
"partlabel": block_device.get("partlabel"),
|
||
"label": block_device.get("label"),
|
||
"parttype": (block_device.get("parttype") or "").upper(),
|
||
"is_lvm": True,
|
||
"dm_name": lv_name
|
||
}
|
||
|
||
# 检测 LUKS 加密
|
||
if part_info["fstype"] == "crypto_LUKS":
|
||
part_info["is_luks"] = True
|
||
log_debug(f"检测到 LUKS 加密卷: {mapper_path}")
|
||
|
||
all_partitions.append(part_info)
|
||
|
||
for child in block_device.get("children", []):
|
||
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
|
||
|
||
elif dev_type == "crypt":
|
||
# 已解密的 LUKS 设备
|
||
part_info = {
|
||
"name": dev_name,
|
||
"fstype": block_device.get("fstype", "unknown"),
|
||
"size": block_device.get("size", "unknown"),
|
||
"mountpoint": mountpoint,
|
||
"uuid": block_device.get("uuid"),
|
||
"is_luks_open": True,
|
||
"parent": block_device.get("pkname", "") # 可能为空(旧版本 lsblk)
|
||
}
|
||
all_partitions.append(part_info)
|
||
|
||
for child in block_device.get("children", []):
|
||
_process_partition(child, all_disks, all_partitions, all_efi_partitions)
|
||
|
||
|
||
def scan_partitions() -> Tuple[bool, List, List, List, str]:
|
||
"""
|
||
扫描系统中的所有磁盘和分区。
|
||
支持标准分区、LVM、LUKS、btrfs 等。
|
||
"""
|
||
log_step("扫描系统分区", "使用 lsblk 获取磁盘和分区信息")
|
||
|
||
# 使用基础列名以确保兼容性(旧版本 lsblk 可能不支持 FSVER, ROTA, MODEL, PKNAME)
|
||
success, stdout, stderr = run_command(
|
||
["sudo", "lsblk", "-J", "-o", "NAME,FSTYPE,SIZE,MOUNTPOINT,UUID,PARTLABEL,LABEL,TYPE,PARTTYPE"],
|
||
"扫描分区"
|
||
)
|
||
|
||
if not success:
|
||
return False, [], [], [], stderr
|
||
|
||
try:
|
||
data = json.loads(stdout)
|
||
all_disks = []
|
||
all_partitions = []
|
||
all_efi_partitions = []
|
||
|
||
for block_device in data.get("blockdevices", []):
|
||
_process_partition(block_device, all_disks, all_partitions, all_efi_partitions)
|
||
|
||
log_info(f"扫描完成: 发现 {len(all_disks)} 个磁盘, {len(all_partitions)} 个分区, {len(all_efi_partitions)} 个EFI分区")
|
||
for d in all_disks:
|
||
log_debug(f" 磁盘: {d['name']} ({d.get('model', '')})")
|
||
for p in all_partitions:
|
||
log_debug(f" 分区: {p['name']} ({p['fstype']})")
|
||
for e in all_efi_partitions:
|
||
log_info(f" EFI分区: {e['name']}")
|
||
|
||
return True, all_disks, all_partitions, all_efi_partitions, ""
|
||
|
||
except json.JSONDecodeError as e:
|
||
log_error(f"解析 lsblk 输出失败: {e}")
|
||
return False, [], [], [], f"解析lsblk输出失败: {e}"
|
||
except Exception as e:
|
||
log_error(f"处理分区数据时发生未知错误: {e}")
|
||
return False, [], [], [], f"处理分区数据时发生未知错误: {e}"
|
||
|
||
|
||
def detect_filesystem_type(partition: str) -> str:
|
||
"""
|
||
检测指定分区的文件系统类型。
|
||
使用 blkid 获取准确的文件系统信息。
|
||
"""
|
||
try:
|
||
success, stdout, _ = run_command(
|
||
["blkid", "-s", "TYPE", "-o", "value", partition],
|
||
f"检测文件系统类型: {partition}",
|
||
timeout=10
|
||
)
|
||
if success:
|
||
fs_type = stdout.strip()
|
||
log_debug(f"{partition} 的文件系统类型: {fs_type}")
|
||
return fs_type
|
||
except Exception as e:
|
||
log_debug(f"检测文件系统类型失败: {e}")
|
||
|
||
return "unknown"
|
||
|
||
|
||
def is_btrfs_subvolume(mount_point: str) -> bool:
|
||
"""检查指定路径是否是 btrfs 子卷"""
|
||
try:
|
||
success, stdout, _ = run_command(
|
||
["btrfs", "subvolume", "get-default", mount_point],
|
||
"检查 btrfs 子卷",
|
||
timeout=10
|
||
)
|
||
return success
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def get_btrfs_root_subvolume(partition: str) -> Optional[str]:
|
||
"""
|
||
获取 btrfs 分区的根子卷名称。
|
||
常见的子卷命名: @, @root, root
|
||
"""
|
||
try:
|
||
success, stdout, _ = run_command(
|
||
["btrfs", "subvolume", "list", partition],
|
||
f"获取 btrfs 子卷: {partition}",
|
||
timeout=10
|
||
)
|
||
if success:
|
||
subvolumes = []
|
||
for line in stdout.strip().split('\n'):
|
||
match = re.search(r'path\s+(\S+)', line)
|
||
if match:
|
||
subvolumes.append(match.group(1))
|
||
|
||
# 优先查找常见的根子卷名称
|
||
for candidate in ["@", "root", "@root", "ROOT", "@ROOT"]:
|
||
if candidate in subvolumes:
|
||
log_debug(f"找到 btrfs 根子卷: {candidate}")
|
||
return candidate
|
||
|
||
# 如果都没找到,返回第一个
|
||
if subvolumes:
|
||
log_debug(f"使用第一个 btrfs 子卷: {subvolumes[0]}")
|
||
return subvolumes[0]
|
||
except Exception as e:
|
||
log_debug(f"获取 btrfs 子卷失败: {e}")
|
||
|
||
return None
|
||
|
||
|
||
def _cleanup_partial_mount(mount_point: str, mounted_boot: bool, mounted_efi: bool,
|
||
bind_paths_mounted: List[str]) -> None:
|
||
"""清理部分挂载的资源"""
|
||
log_warning(f"清理部分挂载资源: {mount_point}")
|
||
|
||
for target_path in reversed(bind_paths_mounted):
|
||
if os.path.ismount(target_path):
|
||
run_command(["sudo", "umount", target_path], f"清理卸载绑定 {target_path}")
|
||
|
||
if mounted_efi:
|
||
efi_mount_point = os.path.join(mount_point, "boot/efi")
|
||
if os.path.ismount(efi_mount_point):
|
||
run_command(["sudo", "umount", efi_mount_point], "清理卸载EFI分区")
|
||
|
||
if mounted_boot:
|
||
boot_mount_point = os.path.join(mount_point, "boot")
|
||
if os.path.ismount(boot_mount_point):
|
||
run_command(["sudo", "umount", boot_mount_point], "清理卸载/boot分区")
|
||
|
||
if os.path.ismount(mount_point):
|
||
run_command(["sudo", "umount", mount_point], "清理卸载根分区")
|
||
|
||
if os.path.exists(mount_point) and not os.path.ismount(mount_point):
|
||
try:
|
||
os.rmdir(mount_point)
|
||
except OSError:
|
||
pass
|
||
|
||
|
||
def mount_target_system(root_partition: str, boot_partition: Optional[str] = None,
|
||
efi_partition: Optional[str] = None,
|
||
btrfs_subvolume: Optional[str] = None) -> Tuple[bool, str, str]:
|
||
"""
|
||
挂载目标系统的根分区、/boot分区和EFI分区到临时目录。
|
||
支持 btrfs 子卷挂载。
|
||
"""
|
||
log_step("挂载目标系统", f"根分区: {root_partition}, /boot: {boot_partition or '无'}, EFI: {efi_partition or '无'}")
|
||
|
||
if not validate_device_path(root_partition):
|
||
log_error(f"无效的根分区路径: {root_partition}")
|
||
return False, "", f"无效的根分区路径: {root_partition}"
|
||
if boot_partition and not validate_device_path(boot_partition):
|
||
log_error(f"无效的/boot分区路径: {boot_partition}")
|
||
return False, "", f"无效的/boot分区路径: {boot_partition}"
|
||
if efi_partition and not validate_device_path(efi_partition):
|
||
log_error(f"无效的EFI分区路径: {efi_partition}")
|
||
return False, "", f"无效的EFI分区路径: {efi_partition}"
|
||
|
||
mount_point = "/mnt_grub_repair_" + str(int(time.time()))
|
||
log_info(f"创建临时挂载点: {mount_point}")
|
||
|
||
try:
|
||
os.makedirs(mount_point, exist_ok=False)
|
||
log_info(f"✓ 挂载点创建成功")
|
||
except Exception as e:
|
||
log_error(f"创建挂载点失败: {e}")
|
||
return False, "", f"创建挂载点失败: {e}"
|
||
|
||
bind_paths_mounted = []
|
||
mounted_boot = False
|
||
mounted_efi = False
|
||
|
||
# 1. 挂载根分区
|
||
log_info(f"[1/4] 挂载根分区 {root_partition} 到 {mount_point}")
|
||
|
||
# 检测文件系统类型
|
||
fs_type = detect_filesystem_type(root_partition)
|
||
|
||
# 构建挂载命令
|
||
mount_cmd = ["sudo", "mount"]
|
||
|
||
# btrfs 子卷处理
|
||
if fs_type == "btrfs" and btrfs_subvolume:
|
||
mount_cmd.extend(["-o", f"subvol={btrfs_subvolume}"])
|
||
log_info(f" 使用 btrfs 子卷: {btrfs_subvolume}")
|
||
|
||
mount_cmd.extend([root_partition, mount_point])
|
||
|
||
success, _, stderr = run_command(mount_cmd, f"挂载根分区 {root_partition}")
|
||
if not success:
|
||
log_error(f"挂载根分区失败,清理中...")
|
||
os.rmdir(mount_point)
|
||
return False, "", f"挂载根分区失败: {stderr}"
|
||
|
||
if not os.path.ismount(mount_point):
|
||
log_error(f"挂载点未激活: {mount_point}")
|
||
return False, "", "挂载根分区失败: 挂载点未激活"
|
||
log_success(f"✓ 根分区挂载成功")
|
||
|
||
# 检查关键目录
|
||
for check_dir in ["etc", "boot"]:
|
||
full_path = os.path.join(mount_point, check_dir)
|
||
if os.path.exists(full_path):
|
||
log_info(f" 发现目录: /{check_dir}")
|
||
else:
|
||
log_warning(f" 未找到目录: /{check_dir}")
|
||
|
||
# 2. 挂载 /boot 分区
|
||
if boot_partition:
|
||
log_info(f"[2/4] 挂载 /boot 分区 {boot_partition}")
|
||
boot_mount_point = os.path.join(mount_point, "boot")
|
||
if not os.path.exists(boot_mount_point):
|
||
os.makedirs(boot_mount_point)
|
||
log_debug(f"创建 /boot 挂载点")
|
||
|
||
success, _, stderr = run_command(["sudo", "mount", boot_partition, boot_mount_point],
|
||
f"挂载 /boot 分区 {boot_partition}")
|
||
if not success:
|
||
log_error(f"挂载 /boot 分区失败,开始清理...")
|
||
_cleanup_partial_mount(mount_point, False, False, [])
|
||
return False, "", f"挂载 /boot 分区失败: {stderr}"
|
||
mounted_boot = True
|
||
log_success(f"✓ /boot 分区挂载成功")
|
||
else:
|
||
log_info(f"[2/4] 无独立的 /boot 分区,跳过")
|
||
|
||
# 3. 挂载 EFI 分区
|
||
if efi_partition:
|
||
log_info(f"[3/4] 挂载 EFI 分区 {efi_partition}")
|
||
efi_mount_point = os.path.join(mount_point, "boot/efi")
|
||
if not os.path.exists(efi_mount_point):
|
||
os.makedirs(efi_mount_point)
|
||
log_debug(f"创建 /boot/efi 挂载点")
|
||
|
||
success, _, stderr = run_command(["sudo", "mount", efi_partition, efi_mount_point],
|
||
f"挂载 EFI 分区 {efi_partition}")
|
||
if not success:
|
||
log_error(f"挂载 EFI 分区失败,开始清理...")
|
||
_cleanup_partial_mount(mount_point, mounted_boot, False, [])
|
||
return False, "", f"挂载 EFI 分区失败: {stderr}"
|
||
mounted_efi = True
|
||
log_success(f"✓ EFI 分区挂载成功")
|
||
|
||
# 检查 EFI 分区内容
|
||
efi_path = os.path.join(mount_point, "boot/efi/EFI")
|
||
if os.path.exists(efi_path):
|
||
log_info(f" EFI 分区内容:")
|
||
try:
|
||
for item in os.listdir(efi_path):
|
||
log_info(f" - {item}")
|
||
except Exception as e:
|
||
log_warning(f" 无法列出 EFI 目录: {e}")
|
||
else:
|
||
log_info(f"[3/4] 无 EFI 分区,跳过")
|
||
|
||
# 4. 绑定伪文件系统
|
||
log_info(f"[4/4] 绑定伪文件系统 (/dev, /proc, /sys 等)")
|
||
bind_paths = ["/dev", "/dev/pts", "/proc", "/sys", "/run"]
|
||
for path in bind_paths:
|
||
target_path = os.path.join(mount_point, path.lstrip('/'))
|
||
if not os.path.exists(target_path):
|
||
os.makedirs(target_path)
|
||
log_debug(f"创建目录: {target_path}")
|
||
|
||
success, _, stderr = run_command(["sudo", "mount", "--bind", path, target_path],
|
||
f"绑定 {path}")
|
||
if not success:
|
||
log_error(f"绑定 {path} 失败,开始清理...")
|
||
_cleanup_partial_mount(mount_point, mounted_boot, mounted_efi, bind_paths_mounted)
|
||
return False, "", f"绑定 {path} 失败: {stderr}"
|
||
bind_paths_mounted.append(target_path)
|
||
|
||
log_success(f"✓ 所有绑定完成")
|
||
log_info(f"挂载摘要: 根分区 ✓, /boot {'✓' if mounted_boot else '✗'}, EFI {'✓' if mounted_efi else '✗'}")
|
||
|
||
return True, mount_point, ""
|
||
|
||
|
||
def detect_distro_type(mount_point: str) -> str:
|
||
"""
|
||
检测目标系统发行版类型。
|
||
支持更多发行版:Arch, Debian, Ubuntu, CentOS/RHEL/Rocky/Alma,
|
||
Fedora, openSUSE, Void, Gentoo, NixOS 等。
|
||
"""
|
||
log_step("检测发行版类型", f"挂载点: {mount_point}")
|
||
|
||
os_release_path = os.path.join(mount_point, "etc/os-release")
|
||
log_info(f"检查文件: {os_release_path}")
|
||
|
||
if not os.path.exists(os_release_path):
|
||
log_warning(f"未找到 {os_release_path},尝试 /etc/issue")
|
||
issue_path = os.path.join(mount_point, "etc/issue")
|
||
if os.path.exists(issue_path):
|
||
try:
|
||
with open(issue_path, "r") as f:
|
||
content = f.read().lower()
|
||
log_debug(f"/etc/issue 内容: {content[:200]}")
|
||
if "ubuntu" in content:
|
||
log_info(f"通过 /etc/issue 检测到: ubuntu")
|
||
return "ubuntu"
|
||
elif "debian" in content:
|
||
log_info(f"通过 /etc/issue 检测到: debian")
|
||
return "debian"
|
||
elif "centos" in content or "rhel" in content:
|
||
log_info(f"通过 /etc/issue 检测到: centos")
|
||
return "centos"
|
||
elif "fedora" in content:
|
||
log_info(f"通过 /etc/issue 检测到: fedora")
|
||
return "fedora"
|
||
elif "void" in content:
|
||
log_info(f"通过 /etc/issue 检测到: void")
|
||
return "void"
|
||
elif "gentoo" in content:
|
||
log_info(f"通过 /etc/issue 检测到: gentoo")
|
||
return "gentoo"
|
||
except Exception as e:
|
||
log_warning(f"读取 /etc/issue 失败: {e}")
|
||
|
||
# 检查其他发行版特定文件
|
||
if os.path.exists(os.path.join(mount_point, "etc/arch-release")):
|
||
log_info("通过 arch-release 检测到: arch")
|
||
return "arch"
|
||
if os.path.exists(os.path.join(mount_point, "etc/gentoo-release")):
|
||
log_info("通过 gentoo-release 检测到: gentoo")
|
||
return "gentoo"
|
||
if os.path.exists(os.path.join(mount_point, "etc/nixos/configuration.nix")):
|
||
log_info("通过 configuration.nix 检测到: nixos")
|
||
return "nixos"
|
||
|
||
return "unknown"
|
||
|
||
try:
|
||
with open(os_release_path, "r") as f:
|
||
content = f.read()
|
||
log_debug(f"/etc/os-release 内容:\n{content}")
|
||
|
||
id_match = re.search(r'^ID=(.+)$', content, re.MULTILINE)
|
||
id_like_match = re.search(r'^ID_LIKE=(.+)$', content, re.MULTILINE)
|
||
|
||
distro_id = id_match.group(1).strip('"\'') if id_match else ""
|
||
id_like = id_like_match.group(1).strip('"\'') if id_like_match else ""
|
||
|
||
log_info(f"ID={distro_id}, ID_LIKE={id_like}")
|
||
|
||
# 直接匹配
|
||
distro_map = {
|
||
"ubuntu": "ubuntu",
|
||
"debian": "debian",
|
||
"arch": "arch",
|
||
"manjaro": "arch",
|
||
"endeavouros": "arch",
|
||
"garuda": "arch",
|
||
"cachyos": "arch",
|
||
"centos": "centos",
|
||
"rhel": "centos",
|
||
"rocky": "centos",
|
||
"almalinux": "centos",
|
||
"fedora": "fedora",
|
||
"opensuse": "opensuse",
|
||
"opensuse-leap": "opensuse",
|
||
"opensuse-tumbleweed": "opensuse",
|
||
"void": "void",
|
||
"gentoo": "gentoo",
|
||
"nixos": "nixos",
|
||
}
|
||
|
||
if distro_id in distro_map:
|
||
result = distro_map[distro_id]
|
||
log_info(f"检测到发行版: {result} (ID={distro_id})")
|
||
return result
|
||
|
||
# 通过 ID_LIKE 推断
|
||
like_map = {
|
||
"ubuntu": "ubuntu",
|
||
"debian": "debian",
|
||
"arch": "arch",
|
||
"rhel": "centos",
|
||
"centos": "centos",
|
||
"fedora": "fedora",
|
||
"suse": "opensuse",
|
||
}
|
||
|
||
for key, value in like_map.items():
|
||
if key in id_like:
|
||
log_info(f"通过 ID_LIKE 推断: {value}")
|
||
return value
|
||
|
||
log_warning(f"无法识别的发行版,ID={distro_id}, ID_LIKE={id_like}")
|
||
return "unknown"
|
||
except Exception as e:
|
||
log_error(f"读取 os-release 文件失败: {e}")
|
||
return "unknown"
|
||
|
||
|
||
def _get_grub_packages(distro_type: str) -> Tuple[List[str], str]:
|
||
"""
|
||
获取安装 GRUB 包所需的包列表和包管理器命令。
|
||
返回: (包列表, 包管理器基命令)
|
||
"""
|
||
package_map = {
|
||
"centos": (["grub2-tools", "grub2-pc"], "yum install -y"),
|
||
"fedora": (["grub2-tools", "grub2-pc"], "dnf install -y"),
|
||
"rhel": (["grub2-tools", "grub2-pc"], "yum install -y"),
|
||
"rocky": (["grub2-tools", "grub2-pc"], "dnf install -y"),
|
||
"almalinux": (["grub2-tools", "grub2-pc"], "dnf install -y"),
|
||
"debian": (["grub-pc"], "apt-get install -y"),
|
||
"ubuntu": (["grub-pc"], "apt-get install -y"),
|
||
"arch": (["grub"], "pacman -S --noconfirm"),
|
||
"manjaro": (["grub"], "pacman -S --noconfirm"),
|
||
"opensuse": (["grub2"], "zypper install -y"),
|
||
"void": (["grub"], "xbps-install -y"),
|
||
"gentoo": (["sys-boot/grub"], "emerge"),
|
||
}
|
||
return package_map.get(distro_type, (["grub"], "包管理器安装"))
|
||
|
||
|
||
def _auto_install_grub(mount_point: str, distro_type: str) -> bool:
|
||
"""
|
||
自动在 chroot 环境中安装 GRUB 包。
|
||
返回是否成功。
|
||
"""
|
||
log_step("自动安装 GRUB 包", f"发行版: {distro_type}")
|
||
|
||
packages, pkg_cmd = _get_grub_packages(distro_type)
|
||
|
||
if not packages:
|
||
log_warning(f"未知的发行版 '{distro_type}',无法自动安装 GRUB")
|
||
return False
|
||
|
||
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
|
||
|
||
# 1. 首先检查网络连接(通过 ping)
|
||
log_info("检查网络连接...")
|
||
success, _, _ = run_command(
|
||
chroot_cmd_prefix + ["ping", "-c", "1", "8.8.8.8"],
|
||
"检查网络连接",
|
||
timeout=10
|
||
)
|
||
if not success:
|
||
# 尝试使用主机的网络配置
|
||
log_warning("chroot 环境无网络,尝试从 Live 环境复制 DNS 配置...")
|
||
|
||
# 复制 /etc/resolv.conf
|
||
resolv_source = "/etc/resolv.conf"
|
||
resolv_target = os.path.join(mount_point, "etc/resolv.conf")
|
||
if os.path.exists(resolv_source):
|
||
try:
|
||
shutil.copy2(resolv_source, resolv_target)
|
||
log_info("已复制 DNS 配置")
|
||
except Exception as e:
|
||
log_warning(f"复制 DNS 配置失败: {e}")
|
||
|
||
# 再次检查网络
|
||
success, _, _ = run_command(
|
||
chroot_cmd_prefix + ["ping", "-c", "1", "8.8.8.8"],
|
||
"再次检查网络连接",
|
||
timeout=10
|
||
)
|
||
if not success:
|
||
log_error("chroot 环境无法连接网络,无法自动安装 GRUB")
|
||
log_info("请手动安装 GRUB 包后再运行此工具")
|
||
return False
|
||
|
||
log_success("网络连接正常")
|
||
|
||
# 2. 更新包列表(某些发行版需要)
|
||
update_cmds = {
|
||
"debian": ["apt-get", "update"],
|
||
"ubuntu": ["apt-get", "update"],
|
||
"arch": ["pacman", "-Sy"],
|
||
"manjaro": ["pacman", "-Sy"],
|
||
}
|
||
|
||
if distro_type in update_cmds:
|
||
log_info(f"更新 {distro_type} 包列表...")
|
||
run_command(
|
||
chroot_cmd_prefix + update_cmds[distro_type],
|
||
f"更新 {distro_type} 包列表",
|
||
timeout=120
|
||
)
|
||
|
||
# 3. 安装 GRUB 包
|
||
log_info(f"安装 GRUB 包: {', '.join(packages)}")
|
||
|
||
install_cmd = chroot_cmd_prefix + pkg_cmd.split() + packages
|
||
success, stdout, stderr = run_command(
|
||
install_cmd,
|
||
f"安装 GRUB 包 ({' '.join(packages)})",
|
||
timeout=300
|
||
)
|
||
|
||
if success:
|
||
log_success(f"✓ GRUB 包安装成功")
|
||
return True
|
||
else:
|
||
log_error(f"✗ GRUB 包安装失败: {stderr}")
|
||
return False
|
||
|
||
|
||
def check_chroot_environment(mount_point: str, distro_type: str = "unknown") -> Tuple[bool, str]:
|
||
"""
|
||
检查 chroot 环境是否可用。
|
||
检测可用的 GRUB 命令及其版本。
|
||
如果缺少 GRUB 命令,尝试自动安装。
|
||
"""
|
||
log_step("检查 chroot 环境", f"挂载点: {mount_point}")
|
||
|
||
# 检查关键命令是否存在
|
||
critical_commands = ["grub-install", "grub-mkconfig", "update-grub", "grub2-install", "grub2-mkconfig"]
|
||
found_commands = []
|
||
|
||
for cmd in critical_commands:
|
||
success, _, _ = run_command(["sudo", "chroot", mount_point, "which", cmd],
|
||
f"检查命令 {cmd}", timeout=10)
|
||
if success:
|
||
found_commands.append(cmd)
|
||
log_info(f" ✓ 找到命令: {cmd}")
|
||
else:
|
||
log_debug(f" ✗ 未找到命令: {cmd}")
|
||
|
||
if not found_commands:
|
||
log_warning(f"chroot 环境中未找到关键的 GRUB 命令")
|
||
log_info(f"尝试自动安装 GRUB 包...")
|
||
|
||
# 尝试自动安装
|
||
if _auto_install_grub(mount_point, distro_type):
|
||
# 重新检查命令
|
||
log_info("重新检查 GRUB 命令...")
|
||
for cmd in critical_commands:
|
||
success, _, _ = run_command(["sudo", "chroot", mount_point, "which", cmd],
|
||
f"检查命令 {cmd}", timeout=10)
|
||
if success:
|
||
found_commands.append(cmd)
|
||
log_info(f" ✓ 找到命令: {cmd}")
|
||
|
||
if found_commands:
|
||
log_success("✓ 自动安装成功,GRUB 命令已可用")
|
||
else:
|
||
log_error("✗ 自动安装后仍未找到 GRUB 命令")
|
||
return False, "自动安装 GRUB 失败,请手动安装"
|
||
else:
|
||
log_error("=" * 60)
|
||
log_error(f"无法自动安装 GRUB 包")
|
||
log_error(f"")
|
||
log_error(f"请手动安装 GRUB 包:")
|
||
packages, pkg_cmd = _get_grub_packages(distro_type)
|
||
log_error(f" chroot {mount_point}")
|
||
log_error(f" {pkg_cmd} {' '.join(packages)}")
|
||
log_error(f" exit")
|
||
log_error(f"=" * 60)
|
||
return False, "自动安装 GRUB 失败"
|
||
|
||
# 检查 grub-install 版本(优先使用 grub2-install 如果存在)
|
||
grub_cmd = "grub2-install" if "grub2-install" in found_commands else "grub-install"
|
||
success, stdout, _ = run_command(["sudo", "chroot", mount_point, grub_cmd, "--version"],
|
||
"检查 grub-install 版本", timeout=10)
|
||
if success:
|
||
log_info(f"grub-install 版本: {stdout.strip()}")
|
||
|
||
# 检查 EFI 目录(UEFI 模式)
|
||
efi_dir = os.path.join(mount_point, "boot/efi")
|
||
if os.path.ismount(efi_dir):
|
||
log_info(f"✓ EFI 分区已挂载到 /boot/efi")
|
||
try:
|
||
stat = os.stat(efi_dir)
|
||
log_debug(f" EFI 目录权限: {oct(stat.st_mode)}")
|
||
except Exception as e:
|
||
log_warning(f" 无法获取 EFI 目录权限: {e}")
|
||
else:
|
||
log_info(f"EFI 分区未挂载(可能是 BIOS 模式)")
|
||
|
||
# 检查 Secure Boot 相关工具
|
||
sb_tools = ["mokutil", "sbsign", "sbverify"]
|
||
for tool in sb_tools:
|
||
success, _, _ = run_command(["sudo", "chroot", mount_point, "which", tool],
|
||
f"检查 Secure Boot 工具 {tool}", timeout=5)
|
||
if success:
|
||
log_info(f" ✓ 找到 Secure Boot 工具: {tool}")
|
||
|
||
return True, ""
|
||
|
||
|
||
def install_efi_fallback(mount_point: str, efi_target: str, efi_grub_file: str,
|
||
efi_boot_file: str, bootloader_id: str = "GRUB") -> bool:
|
||
"""
|
||
安装 EFI fallback 引导文件。
|
||
将 GRUB EFI 文件复制到 /EFI/Boot/bootx64.efi,这是 UEFI 的通用回退路径。
|
||
参考 Calamares 的实现。
|
||
"""
|
||
log_step("安装 EFI Fallback", f"目标文件: {efi_boot_file}")
|
||
|
||
efi_firmware_dir = os.path.join(mount_point, "boot/efi/EFI")
|
||
|
||
# 处理 VFAT 大小写问题(某些固件对大小写敏感)
|
||
# 检查 EFI 目录的实际大小写
|
||
if os.path.exists(efi_firmware_dir):
|
||
try:
|
||
entries = os.listdir(os.path.join(mount_point, "boot/efi"))
|
||
for entry in entries:
|
||
if entry.upper() == "EFI":
|
||
efi_firmware_dir = os.path.join(mount_point, "boot/efi", entry)
|
||
break
|
||
except Exception as e:
|
||
log_debug(f"检查 EFI 目录大小写失败: {e}")
|
||
|
||
# 创建 Boot 目录(处理大小写)
|
||
boot_dir = os.path.join(efi_firmware_dir, "Boot")
|
||
try:
|
||
if os.path.exists(efi_firmware_dir):
|
||
entries = os.listdir(efi_firmware_dir)
|
||
for entry in entries:
|
||
if entry.upper() == "BOOT":
|
||
boot_dir = os.path.join(efi_firmware_dir, entry)
|
||
break
|
||
except Exception:
|
||
pass
|
||
|
||
if not os.path.exists(boot_dir):
|
||
try:
|
||
os.makedirs(boot_dir)
|
||
log_info(f"创建 Boot 目录: {boot_dir}")
|
||
except Exception as e:
|
||
log_error(f"创建 Boot 目录失败: {e}")
|
||
return False
|
||
|
||
# 源文件路径
|
||
source_paths = [
|
||
os.path.join(efi_firmware_dir, bootloader_id, efi_grub_file),
|
||
os.path.join(efi_firmware_dir, bootloader_id.lower(), efi_grub_file.lower()),
|
||
]
|
||
|
||
source_file = None
|
||
for path in source_paths:
|
||
if os.path.exists(path):
|
||
source_file = path
|
||
break
|
||
|
||
if not source_file:
|
||
log_warning(f"找不到源 EFI 文件,尝试查找任何 .efi 文件")
|
||
# 尝试在 bootloader_id 目录下找到任何 .efi 文件
|
||
for subdir in [bootloader_id, bootloader_id.lower(), "grub", "GRUB"]:
|
||
grub_dir = os.path.join(efi_firmware_dir, subdir)
|
||
if os.path.exists(grub_dir):
|
||
try:
|
||
for f in os.listdir(grub_dir):
|
||
if f.endswith(".efi"):
|
||
source_file = os.path.join(grub_dir, f)
|
||
log_info(f"找到替代源文件: {source_file}")
|
||
break
|
||
except Exception:
|
||
pass
|
||
if source_file:
|
||
break
|
||
|
||
if not source_file:
|
||
log_error(f"无法找到源 EFI 文件")
|
||
return False
|
||
|
||
# 目标文件路径
|
||
target_file = os.path.join(boot_dir, efi_boot_file)
|
||
|
||
try:
|
||
shutil.copy2(source_file, target_file)
|
||
log_success(f"✓ EFI fallback 安装成功: {source_file} -> {target_file}")
|
||
return True
|
||
except Exception as e:
|
||
log_error(f"复制 EFI fallback 文件失败: {e}")
|
||
return False
|
||
|
||
|
||
def chroot_and_repair_grub(mount_point: str, target_disk: str,
|
||
is_uefi: bool = False, distro_type: str = "unknown",
|
||
install_hybrid: bool = False,
|
||
use_fallback: bool = True,
|
||
efi_bootloader_id: str = "GRUB") -> Tuple[bool, str]:
|
||
"""
|
||
Chroot到目标系统并执行GRUB修复命令。
|
||
支持多种安装模式、架构、EFI fallback、Secure Boot 等。
|
||
|
||
参数:
|
||
mount_point: 挂载点路径
|
||
target_disk: 目标磁盘(BIOS模式)
|
||
is_uefi: 是否 UEFI 模式
|
||
distro_type: 发行版类型
|
||
install_hybrid: 是否同时安装 BIOS 和 EFI 模式(混合启动)
|
||
use_fallback: 是否安装 EFI fallback 文件
|
||
efi_bootloader_id: EFI 启动项名称
|
||
"""
|
||
log_step("修复 GRUB", f"目标磁盘: {target_disk}, UEFI: {is_uefi}, 发行版: {distro_type}")
|
||
|
||
if not is_uefi and not validate_device_path(target_disk):
|
||
log_error(f"无效的目标磁盘路径: {target_disk}")
|
||
return False, f"无效的目标磁盘路径: {target_disk}"
|
||
|
||
# 检查 chroot 环境
|
||
ok, err = check_chroot_environment(mount_point, distro_type)
|
||
if not ok:
|
||
return False, err
|
||
|
||
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
|
||
|
||
# 检测 Live 环境
|
||
is_live_env = is_live_environment()
|
||
if is_live_env:
|
||
log_info("检测到 Live 环境(无法访问 EFI 变量)")
|
||
|
||
# 获取 EFI 参数(如果需要)
|
||
efi_params = None
|
||
if is_uefi or install_hybrid:
|
||
efi_params = get_grub_efi_parameters()
|
||
if not efi_params:
|
||
return False, "无法确定 EFI 架构参数"
|
||
efi_target, efi_grub_file, efi_boot_file = efi_params
|
||
log_info(f"EFI 参数: target={efi_target}, grub={efi_grub_file}, boot={efi_boot_file}")
|
||
|
||
grub_install_success = False
|
||
install_errors = []
|
||
|
||
# ===== UEFI 安装 =====
|
||
if is_uefi or install_hybrid:
|
||
log_step("安装 UEFI GRUB")
|
||
|
||
# 检查 EFI 分区是否正确挂载
|
||
efi_check_path = os.path.join(mount_point, "boot/efi/EFI")
|
||
if os.path.exists(efi_check_path):
|
||
log_info(f"✓ EFI 目录存在: {efi_check_path}")
|
||
try:
|
||
contents = os.listdir(efi_check_path)
|
||
log_info(f" 当前 EFI 目录内容: {contents}")
|
||
except Exception as e:
|
||
log_warning(f" 无法列出 EFI 目录: {e}")
|
||
else:
|
||
log_warning(f"✗ EFI 目录不存在: {efi_check_path}")
|
||
|
||
grub_install_cmd = chroot_cmd_prefix + [
|
||
"grub-install" if distro_type != "centos" else "grub2-install",
|
||
f"--target={efi_target}",
|
||
"--efi-directory=/boot/efi",
|
||
f"--bootloader-id={efi_bootloader_id}",
|
||
"--force"
|
||
]
|
||
|
||
# Live 环境下优先使用 --removable
|
||
if is_live_env:
|
||
log_info("Live 环境: 优先使用 --removable 模式")
|
||
removable_cmd = chroot_cmd_prefix + [
|
||
"grub-install" if distro_type != "centos" else "grub2-install",
|
||
f"--target={efi_target}",
|
||
"--efi-directory=/boot/efi",
|
||
"--removable",
|
||
"--force"
|
||
]
|
||
success, stdout, stderr = run_command(
|
||
removable_cmd,
|
||
"安装 UEFI GRUB (removable 模式)",
|
||
timeout=60
|
||
)
|
||
if success:
|
||
grub_install_success = True
|
||
log_success("✓ removable 模式安装成功")
|
||
# 同时安装标准模式作为备选
|
||
log_info("尝试安装标准模式作为备选...")
|
||
run_command(grub_install_cmd + ["--no-nvram"], "安装标准模式 (no-nvram)", timeout=60)
|
||
else:
|
||
log_warning(f"removable 模式失败,尝试标准模式")
|
||
|
||
install_errors.append(f"removable 模式: {stderr}")
|
||
|
||
if not grub_install_success:
|
||
# 第一次尝试: 标准安装
|
||
log_info("尝试 1/3: 标准 UEFI 安装...")
|
||
success, stdout, stderr = run_command(
|
||
grub_install_cmd,
|
||
"安装 UEFI GRUB(标准模式)",
|
||
timeout=60
|
||
)
|
||
|
||
if success:
|
||
grub_install_success = True
|
||
log_success("✓ 标准安装成功")
|
||
else:
|
||
install_errors.append(f"标准模式: {stderr}")
|
||
log_warning(f"标准安装失败: {stderr}")
|
||
|
||
# 第二次尝试: removable 模式
|
||
log_info("尝试 2/3: Live环境/可移动模式(--removable)...")
|
||
removable_cmd = chroot_cmd_prefix + [
|
||
"grub-install" if distro_type != "centos" else "grub2-install",
|
||
f"--target={efi_target}",
|
||
"--efi-directory=/boot/efi",
|
||
"--removable",
|
||
"--force"
|
||
]
|
||
success, stdout, stderr = run_command(
|
||
removable_cmd,
|
||
"安装 UEFI GRUB(可移动模式)",
|
||
timeout=60
|
||
)
|
||
|
||
if success:
|
||
grub_install_success = True
|
||
log_success("✓ 可移动模式安装成功")
|
||
log_info(f" GRUB 已安装到 /EFI/Boot/{efi_boot_file}")
|
||
else:
|
||
install_errors.append(f"removable 模式: {stderr}")
|
||
|
||
# 第三次尝试: no-nvram
|
||
log_info("尝试 3/3: 使用 --no-nvram 选项...")
|
||
success, stdout, stderr = run_command(
|
||
grub_install_cmd + ["--no-nvram"],
|
||
"安装 UEFI GRUB(不带NVRAM)",
|
||
timeout=60
|
||
)
|
||
|
||
if success:
|
||
grub_install_success = True
|
||
log_success("✓ no-nvram 模式安装成功")
|
||
else:
|
||
install_errors.append(f"no-nvram 模式: {stderr}")
|
||
|
||
# 安装 EFI fallback(如果启用)
|
||
if grub_install_success and use_fallback:
|
||
log_info("安装 EFI fallback 文件...")
|
||
install_efi_fallback(mount_point, efi_target, efi_grub_file, efi_boot_file, efi_bootloader_id)
|
||
|
||
# ===== BIOS 安装 =====
|
||
if not is_uefi or install_hybrid:
|
||
log_step("安装 BIOS GRUB")
|
||
|
||
bios_cmd = chroot_cmd_prefix + [
|
||
"grub-install" if distro_type != "centos" else "grub2-install",
|
||
"--target=i386-pc",
|
||
"--recheck",
|
||
"--force",
|
||
target_disk
|
||
]
|
||
|
||
success, stdout, stderr = run_command(
|
||
bios_cmd,
|
||
f"安装 BIOS GRUB 到 {target_disk}",
|
||
timeout=60
|
||
)
|
||
|
||
if success:
|
||
log_success(f"✓ BIOS GRUB 安装成功到 {target_disk}")
|
||
if not is_uefi:
|
||
grub_install_success = True
|
||
else:
|
||
log_error(f"BIOS GRUB 安装失败: {stderr}")
|
||
if not is_uefi:
|
||
install_errors.append(f"BIOS 模式: {stderr}")
|
||
|
||
# 检查安装结果
|
||
if not grub_install_success:
|
||
error_summary = "\n".join(install_errors)
|
||
log_error(f"所有 GRUB 安装尝试均失败:\n{error_summary}")
|
||
return False, f"GRUB 安装失败:\n{error_summary}"
|
||
|
||
log_success("✓ GRUB 安装阶段完成")
|
||
|
||
# 检查生成的 EFI 文件
|
||
if is_uefi or install_hybrid:
|
||
efi_paths_to_check = [
|
||
os.path.join(mount_point, "boot/efi/EFI", efi_bootloader_id),
|
||
os.path.join(mount_point, "boot/efi/EFI", efi_bootloader_id.lower()),
|
||
os.path.join(mount_point, "boot/efi/EFI/Boot"),
|
||
os.path.join(mount_point, "boot/efi/efi", efi_bootloader_id.lower()),
|
||
]
|
||
|
||
found_efi_file = False
|
||
for path in efi_paths_to_check:
|
||
if os.path.exists(path):
|
||
log_info(f" 检查目录: {path}")
|
||
try:
|
||
files = os.listdir(path)
|
||
for f in files:
|
||
if f.endswith('.efi'):
|
||
full = os.path.join(path, f)
|
||
size = os.path.getsize(full)
|
||
log_info(f" ✓ EFI文件: {f} ({size} bytes)")
|
||
found_efi_file = True
|
||
except Exception as e:
|
||
log_warning(f" 无法列出: {e}")
|
||
|
||
if not found_efi_file:
|
||
log_warning(f" 未找到任何 .efi 文件,安装可能有问题")
|
||
|
||
# ===== 更新 GRUB 配置 =====
|
||
log_step("更新 GRUB 配置文件")
|
||
|
||
# 确定正确的命令和路径
|
||
grub_update_cmd = []
|
||
config_path = ""
|
||
|
||
# 根据发行版确定命令
|
||
grub_install_bin = "grub-install"
|
||
grub_mkconfig_bin = "grub-mkconfig"
|
||
|
||
if distro_type == "centos":
|
||
grub_install_bin = "grub2-install"
|
||
grub_mkconfig_bin = "grub2-mkconfig"
|
||
elif distro_type == "fedora":
|
||
grub_mkconfig_bin = "grub2-mkconfig"
|
||
elif distro_type == "opensuse":
|
||
grub_mkconfig_bin = "grub2-mkconfig"
|
||
|
||
# 检测配置文件路径
|
||
possible_config_paths = [
|
||
"/boot/grub/grub.cfg",
|
||
"/boot/grub2/grub.cfg",
|
||
]
|
||
|
||
for cfg_path in possible_config_paths:
|
||
full_path = os.path.join(mount_point, cfg_path.lstrip('/'))
|
||
if os.path.exists(os.path.dirname(full_path)):
|
||
config_path = cfg_path
|
||
break
|
||
|
||
if not config_path:
|
||
config_path = "/boot/grub/grub.cfg"
|
||
|
||
# 确定更新命令
|
||
if distro_type in ["debian", "ubuntu"]:
|
||
# Debian/Ubuntu 使用 update-grub 包装脚本
|
||
success, _, _ = run_command(chroot_cmd_prefix + ["which", "update-grub"], "检查 update-grub", timeout=5)
|
||
if success:
|
||
grub_update_cmd = ["update-grub"]
|
||
else:
|
||
grub_update_cmd = [grub_mkconfig_bin, "-o", config_path]
|
||
else:
|
||
grub_update_cmd = [grub_mkconfig_bin, "-o", config_path]
|
||
|
||
log_info(f"使用命令: {' '.join(grub_update_cmd)}")
|
||
log_info(f"配置文件路径: {config_path}")
|
||
|
||
# 执行配置更新
|
||
success, stdout, stderr = run_command(
|
||
chroot_cmd_prefix + grub_update_cmd,
|
||
"更新GRUB配置文件",
|
||
timeout=120
|
||
)
|
||
|
||
if not success:
|
||
log_error(f"GRUB 配置文件更新失败: {stderr}")
|
||
return False, f"GRUB配置文件更新失败: {stderr}"
|
||
|
||
log_success("✓ GRUB 配置文件更新成功")
|
||
|
||
# 检查生成的配置文件
|
||
full_config_path = os.path.join(mount_point, config_path.lstrip('/'))
|
||
if os.path.exists(full_config_path):
|
||
size = os.path.getsize(full_config_path)
|
||
log_info(f" 配置文件大小: {size} bytes")
|
||
try:
|
||
with open(full_config_path, 'r') as f:
|
||
content = f.read()
|
||
menu_entries = content.count('menuentry')
|
||
log_info(f" 发现 {menu_entries} 个启动菜单项")
|
||
except Exception as e:
|
||
log_warning(f" 无法读取配置文件: {e}")
|
||
else:
|
||
log_warning(f" 配置文件未找到: {full_config_path}")
|
||
|
||
# ===== 输出启动提示 =====
|
||
if is_uefi or install_hybrid:
|
||
log_step("UEFI 修复完成提示")
|
||
log_info("GRUB EFI 文件已安装到 EFI 分区")
|
||
|
||
# 检测安装模式并给出相应提示
|
||
efi_boot_path = os.path.join(mount_point, f"boot/efi/EFI/Boot/{efi_boot_file if efi_params else 'bootx64.efi'}")
|
||
efi_grub_path = os.path.join(mount_point, f"boot/efi/EFI/{efi_bootloader_id}/{efi_grub_file if efi_params else 'grubx64.efi'}")
|
||
|
||
if os.path.exists(efi_boot_path):
|
||
log_info("")
|
||
log_info("【重要】使用可移动模式安装 (EFI Fallback)")
|
||
log_info("启动方法:")
|
||
log_info(" 1. 在BIOS中选择 'UEFI Hard Drive' 或 'UEFI OS' 启动")
|
||
log_info(" 2. 如果有多块硬盘,选择正确的硬盘")
|
||
log_info(" 3. 确保启动模式为 UEFI(不是 Legacy/CSM)")
|
||
elif os.path.exists(efi_grub_path):
|
||
log_info("")
|
||
log_info(f"【重要】使用标准模式安装 (/{efi_bootloader_id}/{efi_grub_file if efi_params else 'grubx64.efi'})")
|
||
log_info("启动方法:")
|
||
log_info(f" 1. 在BIOS启动菜单中选择 '{efi_bootloader_id}' 或 'Linux' 启动项")
|
||
log_info(" 2. 如果没有该选项,需要手动添加启动项")
|
||
log_info(f" 路径: \\EFI\\{efi_bootloader_id}\\{efi_grub_file if efi_params else 'grubx64.efi'}")
|
||
|
||
log_info("")
|
||
log_info("故障排除:")
|
||
log_info(" • 如果仍无法启动,请检查:")
|
||
log_info(" - BIOS 是否设置为 UEFI 启动模式(非 Legacy/CSM)")
|
||
log_info(" - 安全启动 (Secure Boot) 是否已禁用")
|
||
log_info(" - EFI 分区格式是否为 FAT32")
|
||
|
||
return True, ""
|
||
|
||
|
||
def unmount_target_system(mount_point: str) -> Tuple[bool, str]:
|
||
"""
|
||
卸载所有挂载的分区。
|
||
确保按正确顺序卸载,避免资源忙错误。
|
||
"""
|
||
log_step("卸载目标系统", f"挂载点: {mount_point}")
|
||
|
||
success = True
|
||
error_msg = ""
|
||
|
||
# 按逆序卸载绑定挂载
|
||
bind_paths = ["/run", "/sys", "/proc", "/dev/pts", "/dev"]
|
||
for path in bind_paths:
|
||
target_path = os.path.join(mount_point, path.lstrip('/'))
|
||
if os.path.ismount(target_path):
|
||
# 尝试多次卸载(有时需要重试)
|
||
for attempt in range(3):
|
||
s, _, stderr = run_command(["sudo", "umount", target_path], f"卸载绑定 {target_path}")
|
||
if s:
|
||
break
|
||
time.sleep(0.5)
|
||
if not s:
|
||
success = False
|
||
error_msg += f"卸载绑定 {target_path} 失败: {stderr}\n"
|
||
else:
|
||
log_debug(f"未挂载,跳过: {target_path}")
|
||
|
||
# 卸载 EFI 分区
|
||
efi_mount_point = os.path.join(mount_point, "boot/efi")
|
||
if os.path.ismount(efi_mount_point):
|
||
s, _, stderr = run_command(["sudo", "umount", efi_mount_point], f"卸载 EFI 分区")
|
||
if not s:
|
||
success = False
|
||
error_msg += f"卸载 EFI 分区失败: {stderr}\n"
|
||
else:
|
||
log_debug(f"EFI 分区未挂载,跳过")
|
||
|
||
# 卸载 /boot 分区
|
||
boot_mount_point = os.path.join(mount_point, "boot")
|
||
if os.path.ismount(boot_mount_point):
|
||
s, _, stderr = run_command(["sudo", "umount", boot_mount_point], f"卸载 /boot 分区")
|
||
if not s:
|
||
success = False
|
||
error_msg += f"卸载 /boot 分区失败: {stderr}\n"
|
||
else:
|
||
log_debug(f"/boot 分区未挂载,跳过")
|
||
|
||
# 卸载根分区
|
||
if os.path.ismount(mount_point):
|
||
s, _, stderr = run_command(["sudo", "umount", mount_point], f"卸载根分区")
|
||
if not s:
|
||
success = False
|
||
error_msg += f"卸载根分区失败: {stderr}\n"
|
||
else:
|
||
log_debug(f"根分区未挂载,跳过")
|
||
|
||
# 清理临时目录
|
||
if os.path.exists(mount_point) and not os.path.ismount(mount_point):
|
||
try:
|
||
shutil.rmtree(mount_point)
|
||
log_success(f"✓ 清理临时目录: {mount_point}")
|
||
except OSError as e:
|
||
log_warning(f"无法删除临时目录 {mount_point}: {e}")
|
||
error_msg += f"无法删除临时目录: {e}\n"
|
||
|
||
if success:
|
||
log_success("✓ 所有分区已卸载")
|
||
else:
|
||
log_warning("部分分区卸载失败")
|
||
|
||
return success, error_msg
|
||
|
||
|
||
# ===== 高级功能函数 =====
|
||
|
||
def detect_existing_grub_entries(mount_point: str) -> List[Dict]:
|
||
"""
|
||
检测 EFI 分区中已有的 GRUB 启动项。
|
||
返回发现的 EFI 启动项列表。
|
||
"""
|
||
entries = []
|
||
efi_path = os.path.join(mount_point, "boot/efi/EFI")
|
||
|
||
if not os.path.exists(efi_path):
|
||
return entries
|
||
|
||
try:
|
||
for item in os.listdir(efi_path):
|
||
item_path = os.path.join(efi_path, item)
|
||
if os.path.isdir(item_path):
|
||
efi_files = [f for f in os.listdir(item_path) if f.endswith('.efi')]
|
||
if efi_files:
|
||
entries.append({
|
||
"name": item,
|
||
"path": item_path,
|
||
"files": efi_files
|
||
})
|
||
except Exception as e:
|
||
log_debug(f"检测现有 GRUB 条目失败: {e}")
|
||
|
||
return entries
|
||
|
||
|
||
def install_refind(mount_point: str) -> Tuple[bool, str]:
|
||
"""
|
||
安装 rEFInd 作为备用引导管理器。
|
||
需要目标系统中已安装 rEFInd 包。
|
||
"""
|
||
log_step("安装 rEFInd")
|
||
|
||
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
|
||
|
||
# 检查 refind-install 是否存在
|
||
success, _, _ = run_command(
|
||
chroot_cmd_prefix + ["which", "refind-install"],
|
||
"检查 refind-install",
|
||
timeout=10
|
||
)
|
||
|
||
if not success:
|
||
return False, "未找到 refind-install,请确保已安装 rEFInd 包"
|
||
|
||
# 运行 refind-install
|
||
success, stdout, stderr = run_command(
|
||
chroot_cmd_prefix + ["refind-install"],
|
||
"安装 rEFInd",
|
||
timeout=60
|
||
)
|
||
|
||
if success:
|
||
log_success("✓ rEFInd 安装成功")
|
||
return True, ""
|
||
else:
|
||
log_error(f"rEFInd 安装失败: {stderr}")
|
||
return False, f"rEFInd 安装失败: {stderr}"
|
||
|
||
|
||
def update_firmware_boot_order(mount_point: str, efi_bootloader_id: str = "GRUB") -> bool:
|
||
"""
|
||
使用 efibootmgr 更新固件启动顺序。
|
||
注意:在 Live 环境中可能无法工作。
|
||
"""
|
||
if is_live_environment():
|
||
log_warning("Live 环境无法更新固件启动顺序")
|
||
return False
|
||
|
||
log_step("更新固件启动顺序")
|
||
|
||
# 查找 EFI 分区
|
||
efi_partition = None
|
||
for line in open("/proc/mounts"):
|
||
if mount_point in line and "/boot/efi" in line:
|
||
parts = line.split()
|
||
efi_partition = parts[0]
|
||
break
|
||
|
||
if not efi_partition:
|
||
log_warning("无法找到 EFI 分区")
|
||
return False
|
||
|
||
# 使用 efibootmgr 创建启动项
|
||
success, _, stderr = run_command(
|
||
["efibootmgr", "-c", "-L", efi_bootloader_id,
|
||
"-l", f"\\EFI\\{efi_bootloader_id}\\grubx64.efi"],
|
||
"创建 EFI 启动项",
|
||
timeout=10
|
||
)
|
||
|
||
if success:
|
||
log_success("✓ 固件启动顺序更新成功")
|
||
return True
|
||
else:
|
||
log_warning(f"更新固件启动顺序失败: {stderr}")
|
||
return False
|
||
|
||
|
||
if __name__ == "__main__":
|
||
print("--- 运行后端测试 ---")
|
||
|
||
print("\n--- 系统信息检测 ---")
|
||
print(f"EFI 位数: {get_efi_word_size()}")
|
||
print(f"CPU 架构: {platform.machine()}")
|
||
print(f"Live 环境: {is_live_environment()}")
|
||
|
||
sb_supported, sb_enabled = check_secure_boot_status()
|
||
print(f"Secure Boot 支持: {sb_supported}, 启用: {sb_enabled}")
|
||
|
||
efi_params = get_grub_efi_parameters()
|
||
if efi_params:
|
||
print(f"GRUB EFI 参数: {efi_params}")
|
||
|
||
print("\n--- 扫描分区测试 ---")
|
||
success, disks, partitions, efi_partitions, err = scan_partitions()
|
||
if success:
|
||
print("磁盘:", [d['name'] for d in disks])
|
||
print("分区:", [p['name'] for p in partitions])
|
||
print("EFI分区:", [p['name'] for p in efi_partitions])
|
||
else:
|
||
print("扫描分区失败:", err)
|