323 lines
13 KiB
Python
323 lines
13 KiB
Python
# backend.py
|
||
|
||
import subprocess
|
||
import os
|
||
import json
|
||
import time # 用于模拟耗时操作,实际可移除
|
||
|
||
def run_command(command, description="执行命令", cwd=None, shell=False):
|
||
"""
|
||
运行一个系统命令,并捕获其输出和错误。
|
||
:param command: 要执行的命令列表 (例如 ["sudo", "lsblk"])
|
||
:param description: 命令的描述,用于日志
|
||
:param cwd: 更改工作目录
|
||
:param shell: 是否使用shell执行命令 (通常不推荐,除非命令需要shell特性)
|
||
:return: (True/False, stdout, stderr) 表示成功、标准输出、标准错误
|
||
"""
|
||
try:
|
||
print(f"[Backend] {description}: {' '.join(command)}")
|
||
result = subprocess.run(
|
||
command,
|
||
capture_output=True,
|
||
text=True,
|
||
check=True, # 如果命令返回非零退出码,将抛出CalledProcessError
|
||
cwd=cwd,
|
||
shell=shell
|
||
)
|
||
print(f"[Backend] 命令成功: {description}")
|
||
return True, result.stdout, result.stderr
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"[Backend] 命令失败: {description}")
|
||
print(f"[Backend] 错误输出: {e.stderr}")
|
||
return False, e.stdout, e.stderr
|
||
except FileNotFoundError:
|
||
print(f"[Backend] 命令未找到: {' '.join(command)}")
|
||
return False, "", f"命令未找到: {command[0]}"
|
||
except Exception as e:
|
||
print(f"[Backend] 发生未知错误: {e}")
|
||
return False, "", str(e)
|
||
|
||
def scan_partitions():
|
||
"""
|
||
扫描系统中的所有磁盘和分区,并返回结构化的信息。
|
||
:return: (success, disks, partitions, efi_partitions, error_message)
|
||
disks: list of {"name": "/dev/sda"}
|
||
partitions: list of {"name": "/dev/sda1", "fstype": "ext4", "size": "100G", "mountpoint": "/"}
|
||
efi_partitions: list of {"name": "/dev/sda1", "fstype": "vfat", "size": "512M"}
|
||
"""
|
||
success, stdout, stderr = run_command(
|
||
["sudo", "lsblk", "-J", "-o", "NAME,FSTYPE,SIZE,MOUNTPOINT,UUID,PARTLABEL,LABEL,TYPE"],
|
||
"扫描分区"
|
||
)
|
||
|
||
if not success:
|
||
return False, [], [], [], stderr
|
||
|
||
try:
|
||
data = json.loads(stdout)
|
||
all_disks = []
|
||
all_partitions = []
|
||
all_efi_partitions = []
|
||
|
||
for block_device in data.get("blockdevices", []):
|
||
dev_name = f"/dev/{block_device['name']}"
|
||
dev_type = block_device.get("type")
|
||
|
||
if dev_type == "disk":
|
||
all_disks.append({"name": dev_name})
|
||
elif dev_type == "part":
|
||
# 过滤掉Live系统自身的分区(基于常见的Live环境挂载点)
|
||
mountpoint = block_device.get("mountpoint")
|
||
if mountpoint and (mountpoint == "/" or mountpoint.startswith("/run/media") or mountpoint.startswith("/cdrom")):
|
||
continue
|
||
|
||
part_info = {
|
||
"name": dev_name,
|
||
"fstype": block_device.get("fstype", "unknown"),
|
||
"size": block_device.get("size", "unknown"),
|
||
"mountpoint": mountpoint,
|
||
"uuid": block_device.get("uuid"),
|
||
"partlabel": block_device.get("partlabel"),
|
||
"label": block_device.get("label")
|
||
}
|
||
all_partitions.append(part_info)
|
||
|
||
# 识别EFI系统分区 (FAT32文件系统, 且通常有特定标签或名称)
|
||
if part_info["fstype"] == "vfat" and (part_info["partlabel"] == "EFI System Partition" or part_info["label"] == "EFI"):
|
||
all_efi_partitions.append(part_info)
|
||
|
||
return True, all_disks, all_partitions, all_efi_partitions, ""
|
||
|
||
except json.JSONDecodeError as e:
|
||
return False, [], [], [], f"解析lsblk输出失败: {e}"
|
||
except Exception as e:
|
||
return False, [], [], [], f"处理分区数据时发生未知错误: {e}"
|
||
|
||
def mount_target_system(root_partition, boot_partition=None, efi_partition=None):
|
||
"""
|
||
挂载目标系统的根分区、/boot分区和EFI分区到临时目录。
|
||
:param root_partition: 目标系统的根分区设备路径 (例如 /dev/sda1)
|
||
:param boot_partition: 目标系统的独立 /boot 分区设备路径 (可选)
|
||
:param efi_partition: 目标系统的EFI系统分区设备路径 (可选,仅UEFI)
|
||
:return: (True/False, mount_point, error_message)
|
||
"""
|
||
mount_point = "/mnt_grub_repair_" + str(int(time.time())) # 确保唯一性
|
||
if not os.path.exists(mount_point):
|
||
os.makedirs(mount_point)
|
||
|
||
# 1. 挂载根分区
|
||
success, _, stderr = run_command(["sudo", "mount", root_partition, mount_point], f"挂载根分区 {root_partition}")
|
||
if not success:
|
||
return False, "", f"挂载根分区失败: {stderr}"
|
||
|
||
# 2. 如果有独立 /boot 分区
|
||
if boot_partition:
|
||
boot_mount_point = os.path.join(mount_point, "boot")
|
||
if not os.path.exists(boot_mount_point):
|
||
os.makedirs(boot_mount_point)
|
||
success, _, stderr = run_command(["sudo", "mount", boot_partition, boot_mount_point], f"挂载 /boot 分区 {boot_partition}")
|
||
if not success:
|
||
unmount_target_system(mount_point) # 挂载失败,尝试清理
|
||
return False, "", f"挂载 /boot 分区失败: {stderr}"
|
||
|
||
# 3. 如果有EFI分区 (UEFI系统)
|
||
if efi_partition:
|
||
efi_mount_point = os.path.join(mount_point, "boot/efi")
|
||
if not os.path.exists(efi_mount_point):
|
||
os.makedirs(efi_mount_point)
|
||
success, _, stderr = run_command(["sudo", "mount", efi_partition, efi_mount_point], f"挂载 EFI 分区 {efi_partition}")
|
||
if not success:
|
||
unmount_target_system(mount_point) # 挂载失败,尝试清理
|
||
return False, "", f"挂载 EFI 分区失败: {stderr}"
|
||
|
||
# 4. 绑定必要的伪文件系统
|
||
bind_paths = ["/dev", "/dev/pts", "/proc", "/sys", "/run"]
|
||
for path in bind_paths:
|
||
target_path = os.path.join(mount_point, path.lstrip('/'))
|
||
if not os.path.exists(target_path):
|
||
os.makedirs(target_path)
|
||
success, _, stderr = run_command(["sudo", "mount", "--bind", path, target_path], f"绑定 {path} 到 {target_path}")
|
||
if not success:
|
||
unmount_target_system(mount_point) # 绑定失败,尝试清理
|
||
return False, "", f"绑定 {path} 失败: {stderr}"
|
||
|
||
return True, mount_point, ""
|
||
|
||
def detect_distro_type(mount_point):
|
||
"""
|
||
尝试检测目标系统发行版类型。
|
||
:param mount_point: 目标系统根分区的挂载点
|
||
:return: "arch", "centos", "debian", "ubuntu", "unknown"
|
||
"""
|
||
os_release_path = os.path.join(mount_point, "etc/os-release")
|
||
if not os.path.exists(os_release_path):
|
||
return "unknown"
|
||
|
||
try:
|
||
with open(os_release_path, "r") as f:
|
||
content = f.read()
|
||
if "ID=arch" in content:
|
||
return "arch"
|
||
elif "ID=centos" in content or "ID=rhel" in content:
|
||
return "centos"
|
||
elif "ID=debian" in content:
|
||
return "debian"
|
||
elif "ID=ubuntu" in content:
|
||
return "ubuntu"
|
||
else:
|
||
return "unknown"
|
||
except Exception as e:
|
||
print(f"[Backend] 无法读取os-release文件: {e}")
|
||
return "unknown"
|
||
|
||
def chroot_and_repair_grub(mount_point, target_disk, is_uefi=False, distro_type="unknown"):
|
||
"""
|
||
Chroot到目标系统并执行GRUB修复命令。
|
||
:param mount_point: 目标系统根分区的挂载点
|
||
:param target_disk: GRUB要安装到的物理磁盘 (例如 /dev/sda)
|
||
:param is_uefi: 目标系统是否使用UEFI启动
|
||
:param distro_type: 目标系统发行版类型 (用于选择正确的GRUB命令)
|
||
:return: (True/False, error_message)
|
||
"""
|
||
chroot_cmd_prefix = ["sudo", "chroot", mount_point]
|
||
|
||
# 1. 安装GRUB到目标磁盘
|
||
if is_uefi:
|
||
# UEFI系统,需要确保EFI分区已挂载到 /boot/efi
|
||
# grub-install --target=x86_64-efi --efi-directory=/boot/efi --bootloader-id=GRUB
|
||
# 注意:某些系统可能需要指定 --removable 选项,以便在某些固件上更容易启动
|
||
# 统一使用 --bootloader-id=GRUB,如果已存在会更新
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["grub-install", "--target=x86_64-efi", "--efi-directory=/boot/efi", "--bootloader-id=GRUB"],
|
||
"安装UEFI GRUB"
|
||
)
|
||
else:
|
||
# BIOS系统,安装到MBR
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + ["grub-install", target_disk],
|
||
"安装BIOS GRUB"
|
||
)
|
||
if not success:
|
||
return False, f"GRUB安装失败: {stderr}"
|
||
|
||
# 2. 更新GRUB配置文件
|
||
grub_update_cmd = []
|
||
if distro_type in ["debian", "ubuntu"]:
|
||
grub_update_cmd = ["update-grub"] # 这是一个脚本,实际是调用grub-mkconfig
|
||
elif distro_type == "arch":
|
||
grub_update_cmd = ["grub-mkconfig", "-o", "/boot/grub/grub.cfg"]
|
||
elif distro_type == "centos":
|
||
grub_update_cmd = ["grub2-mkconfig", "-o", "/boot/grub2/grub.cfg"] # CentOS 8/9
|
||
else:
|
||
# 尝试通用命令
|
||
grub_update_cmd = ["grub-mkconfig", "-o", "/boot/grub/grub.cfg"]
|
||
|
||
success, _, stderr = run_command(
|
||
chroot_cmd_prefix + grub_update_cmd,
|
||
"更新GRUB配置文件"
|
||
)
|
||
if not success:
|
||
return False, f"GRUB配置文件更新失败: {stderr}"
|
||
|
||
return True, ""
|
||
|
||
def unmount_target_system(mount_point):
|
||
"""
|
||
卸载所有挂载的分区。
|
||
:param mount_point: 目标系统根分区的挂载点
|
||
:return: (True/False, error_message)
|
||
"""
|
||
print(f"[Backend] 正在卸载分区 from {mount_point}...")
|
||
success = True
|
||
error_msg = ""
|
||
|
||
# 逆序卸载绑定
|
||
bind_paths = ["/run", "/sys", "/proc", "/dev/pts", "/dev"]
|
||
for path in reversed(bind_paths): # 逆序
|
||
target_path = os.path.join(mount_point, path.lstrip('/'))
|
||
if os.path.ismount(target_path): # 检查是否真的挂载了
|
||
s, _, stderr = run_command(["sudo", "umount", target_path], f"卸载绑定 {target_path}")
|
||
if not s:
|
||
success = False
|
||
error_msg += f"卸载绑定 {target_path} 失败: {stderr}\n"
|
||
|
||
# 卸载 /boot/efi (如果存在)
|
||
efi_mount_point = os.path.join(mount_point, "boot/efi")
|
||
if os.path.ismount(efi_mount_point):
|
||
s, _, stderr = run_command(["sudo", "umount", efi_mount_point], f"卸载 {efi_mount_point}")
|
||
if not s:
|
||
success = False
|
||
error_msg += f"卸载 {efi_mount_point} 失败: {stderr}\n"
|
||
|
||
# 卸载 /boot (如果存在)
|
||
boot_mount_point = os.path.join(mount_point, "boot")
|
||
if os.path.ismount(boot_mount_point):
|
||
s, _, stderr = run_command(["sudo", "umount", boot_mount_point], f"卸载 {boot_mount_point}")
|
||
if not s:
|
||
success = False
|
||
error_msg += f"卸载 {boot_mount_point} 失败: {stderr}\n"
|
||
|
||
# 卸载根分区
|
||
if os.path.ismount(mount_point):
|
||
s, _, stderr = run_command(["sudo", "umount", mount_point], f"卸载根分区 {mount_point}")
|
||
if not s:
|
||
success = False
|
||
error_msg += f"卸载根分区 {mount_point} 失败: {stderr}\n"
|
||
|
||
# 清理临时挂载点目录
|
||
if os.path.exists(mount_point) and not os.path.ismount(mount_point):
|
||
try:
|
||
os.rmdir(mount_point)
|
||
print(f"[Backend] 清理临时目录 {mount_point}")
|
||
except OSError as e:
|
||
print(f"[Backend] 无法删除临时目录 {mount_point}: {e}")
|
||
error_msg += f"无法删除临时目录 {mount_point}: {e}\n"
|
||
|
||
|
||
return success, error_msg
|
||
|
||
# 示例:如果直接运行backend.py,可以进行一些测试
|
||
if __name__ == "__main__":
|
||
print("--- 运行后端测试 ---")
|
||
|
||
print("\n--- 扫描分区测试 ---")
|
||
success, disks, partitions, efi_partitions, err = scan_partitions()
|
||
if success:
|
||
print("磁盘:", [d['name'] for d in disks])
|
||
print("分区:", [p['name'] for p in partitions])
|
||
print("EFI分区:", [p['name'] for p in efi_partitions])
|
||
else:
|
||
print("扫描分区失败:", err)
|
||
|
||
# 假设有一些分区可供测试,实际运行时需要用户输入
|
||
# 例如:
|
||
# test_root_partition = "/dev/sdaX"
|
||
# test_target_disk = "/dev/sda"
|
||
# test_efi_partition = "/dev/sdY"
|
||
#
|
||
# if test_root_partition and test_target_disk:
|
||
# print(f"\n--- 挂载系统测试 (模拟 {test_root_partition}) ---")
|
||
# mount_ok, mnt_pt, mount_err = mount_target_system(test_root_partition, efi_partition=test_efi_partition)
|
||
# if mount_ok:
|
||
# print(f"系统挂载成功到: {mnt_pt}")
|
||
# distro = detect_distro_type(mnt_pt)
|
||
# print(f"检测到发行版: {distro}")
|
||
#
|
||
# print("\n--- 修复GRUB测试 ---")
|
||
# repair_ok, repair_err = chroot_and_repair_grub(mnt_pt, test_target_disk, is_uefi=True, distro_type=distro)
|
||
# if repair_ok:
|
||
# print("GRUB修复成功!")
|
||
# else:
|
||
# print("GRUB修复失败:", repair_err)
|
||
#
|
||
# print("\n--- 卸载系统测试 ---")
|
||
# unmount_ok, unmount_err = unmount_target_system(mnt_pt)
|
||
# if unmount_ok:
|
||
# print("系统卸载成功!")
|
||
# else:
|
||
# print("系统卸载失败:", unmount_err)
|
||
# else:
|
||
# print("系统挂载失败:", mount_err)
|
||
# else:
|
||
# print("\n请在代码中设置 test_root_partition 和 test_target_disk 进行完整测试。")
|