""" 情绪博物馆 - 服务管理器核心模块 Emotion Museum - Service Manager Core Module 提供跨平台服务管理功能:启动、停止、重启、状态检查、日志查看、健康检查 """ import os import sys import time import signal import socket import shutil import subprocess import platform from pathlib import Path from typing import Dict, List, Optional, Any import yaml import psutil import requests from colorama import init, Fore, Style # 初始化 colorama(跨平台彩色输出) init(autoreset=True) class Colors: """终端颜色常量""" INFO = f"{Fore.GREEN}[INFO]{Style.RESET_ALL}" WARN = f"{Fore.YELLOW}[WARN]{Style.RESET_ALL}" ERROR = f"{Fore.RED}[ERROR]{Style.RESET_ALL}" SECTION = f"{Fore.BLUE}[SECTION]{Style.RESET_ALL}" DEBUG = f"{Fore.CYAN}[DEBUG]{Style.RESET_ALL}" class ServiceConfig: """单个服务的配置信息""" def __init__(self, name: str, config: Dict[str, Any]): self.key = name self.name = config.get("name", name) self.dir = config["dir"] self.type = config["type"] self.port = config["port"] self.health_check_path = config.get("health_check_path", "/") self.start_cmd = config["start_cmd"] self.build_cmd = config.get("build_cmd") self.build_check = config.get("build_check") self.install_cmd = config.get("install_cmd") self.install_check = config.get("install_check") self.lock_file = config.get("lock_file") self.pid_file = config.get("pid_file", ".pid") self.log_file = config.get("log_file") self.depends_on = config.get("depends_on", []) self.env = config.get("env", {}) @property def pid_path(self) -> Path: """PID 文件路径(在服务目录内)""" return PROJECT_ROOT / self.dir / self.pid_file @property def work_dir(self) -> Path: """服务工作目录""" return PROJECT_ROOT / self.dir @property def manage_log_path(self) -> Path: """管理服务日志路径""" log_dir = self.work_dir / "logs" return log_dir / f"manage-{self.key}.log" class ConfigLoader: """配置文件加载与校验""" REQUIRED_FIELDS = ["name", "dir", "type", "port", "start_cmd"] VALID_TYPES = ["java", "node"] @classmethod def load(cls, config_path: Path) -> Dict[str, ServiceConfig]: """加载并校验配置文件""" if not config_path.exists(): print(f"{Colors.ERROR} 配置文件不存在: {config_path}") sys.exit(1) with open(config_path, "r", encoding="utf-8") as f: data = yaml.safe_load(f) if not data or "services" not in data: print(f"{Colors.ERROR} 配置文件格式错误: 缺少 'services' 字段") sys.exit(1) services = {} for key, config in data["services"].items(): cls._validate(key, config) services[key] = ServiceConfig(key, config) return services @classmethod def _validate(cls, key: str, config: Dict[str, Any]): """校验单个服务配置""" # 检查必填字段 for field in cls.REQUIRED_FIELDS: if field not in config: print(f"{Colors.ERROR} 配置校验失败: '{key}' 缺少必填字段 '{field}'") sys.exit(1) # 校验 type 字段 if config["type"] not in cls.VALID_TYPES: print( f"{Colors.ERROR} 配置校验失败: '{key}' 的 type 必须是 {cls.VALID_TYPES} 之一" ) sys.exit(1) # 校验 port 字段 if not isinstance(config["port"], int) or not (1 <= config["port"] <= 65535): print(f"{Colors.ERROR} 配置校验失败: '{key}' 的 port 必须是 1-65535 的整数") sys.exit(1) class HealthChecker: """服务健康检查""" @staticmethod def check_port(port: int, timeout: float = 1.0) -> bool: """检查端口是否被监听(支持 IPv4 和 IPv6)""" # 先尝试 IPv4 try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(timeout) result = s.connect_ex(("127.0.0.1", port)) if result == 0: return True except Exception: pass # 再尝试 IPv6 try: with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as s: s.settimeout(timeout) result = s.connect_ex(("::1", port)) if result == 0: return True except Exception: pass return False @staticmethod def check_http( url: str, service_config: ServiceConfig, timeout: float = 2.0 ) -> bool: """HTTP 健康检查""" try: response = requests.get(url, timeout=timeout, allow_redirects=True) if service_config.type == "java": # Java 服务检查 /actuator/health 返回 status: UP data = response.json() return data.get("status") == "UP" else: # Node 前端服务检查 HTTP 状态码是否为 2xx return 200 <= response.status_code < 300 except Exception: return False @classmethod def wait_for_ready( cls, config: ServiceConfig, timeout: int = 60, interval: int = 2 ) -> bool: """等待服务就绪,返回是否成功""" print(f"{Colors.INFO} 等待服务就绪...", end="", flush=True) start_time = time.time() last_displayed = -1 # Track last displayed percentage to reduce flicker while time.time() - start_time < timeout: # 先检查端口 if cls.check_port(config.port): # 端口通了,再进行 HTTP 检查 url = f"http://127.0.0.1:{config.port}{config.health_check_path}" if cls.check_http(url, config): print(f"\r{Colors.INFO} 等待服务就绪... [OK]") return True # 显示进度(仅在百分比变化 >= 10% 时刷新) elapsed = time.time() - start_time progress = min(int((elapsed / timeout) * 100), 99) if progress - last_displayed >= 10: last_displayed = progress bar_length = 10 filled = int(bar_length * progress / 100) bar = "=" * filled + " " * (bar_length - filled) print( f"\r{Colors.INFO} 等待服务就绪... [{bar}] {progress}%", end="", flush=True, ) time.sleep(interval) print(f"\r{Colors.ERROR} 等待服务就绪... [TIMEOUT] 超时 ({timeout}秒)") return False class ExecutableFinder: """动态查找可执行文件路径""" # Windows 常见安装路径(按优先级) NODE_PATHS = [ r"C:\Program Files\nodejs", r"C:\Program Files (x86)\nodejs", os.path.expanduser(r"~\AppData\Roaming\npm"), ] MAVEN_PATHS = [ r"C:\Program Files\apache-maven\bin", r"C:\Program Files (x86)\apache-maven\bin", ] @classmethod def find_node(cls) -> Optional[str]: """查找 node 可执行文件路径""" path = cls._find("node.exe", cls.NODE_PATHS) if path: return path # Linux/macOS fallback return cls._find("node", []) @classmethod def find_npm(cls) -> Optional[str]: """查找 npm 可执行文件路径""" if platform.system() == "Windows": for name in ["npm.cmd", "npm.bat", "npm.exe"]: path = cls._find(name, cls.NODE_PATHS) if path: return path return cls._find("npm", cls.NODE_PATHS) @classmethod def find_maven(cls) -> Optional[str]: """查找 maven 可执行文件路径""" if platform.system() == "Windows": for name in ["mvn.cmd", "mvn.bat", "mvn.exe"]: path = cls._find(name, cls.MAVEN_PATHS) if path: return path return cls._find("mvn", cls.MAVEN_PATHS) @staticmethod def _find(name: str, search_paths: List[str]) -> Optional[str]: """按顺序查找可执行文件""" # 1. 尝试系统 PATH path = shutil.which(name) if path: return path # 2. 尝试常见安装路径 for base in search_paths: candidate = os.path.join(base, name) if os.path.isfile(candidate): return candidate return None class DependencyManager: """依赖检测与安装""" @staticmethod def needs_update(config: ServiceConfig) -> bool: """检查是否需要更新依赖""" if not config.lock_file: return False lock_path = config.work_dir / config.lock_file if config.type == "java": # Java: 比较 pom.xml 和构建产物 if not config.build_check: return False build_path = config.work_dir / config.build_check if not build_path.exists(): return True return lock_path.stat().st_mtime > build_path.stat().st_mtime elif config.type == "node": # Node: 比较 package-lock.json 和 node_modules install_path = config.work_dir / (config.install_check or "node_modules") if not install_path.exists(): return True return lock_path.stat().st_mtime > install_path.stat().st_mtime return False @classmethod def ensure_ready(cls, config: ServiceConfig) -> bool: """确保依赖已安装/构建""" # 检查是否需要安装/构建 needs_install = False needs_build = False if config.type == "node": install_path = config.work_dir / (config.install_check or "node_modules") if not install_path.exists(): needs_install = True elif cls.needs_update(config): needs_install = True elif config.type == "java": if config.build_check: build_path = config.work_dir / config.build_check if not build_path.exists(): needs_build = True elif cls.needs_update(config): needs_build = True # 执行安装 if needs_install and config.install_cmd: print(f"{Colors.INFO} 安装依赖: {config.install_cmd}") if not cls._run_command(config.install_cmd, config.work_dir): print(f"{Colors.ERROR} 依赖安装失败") return False print(f"{Colors.INFO} 依赖安装完成") # 执行构建 if needs_build and config.build_cmd: print(f"{Colors.INFO} 构建项目: {config.build_cmd}") if not cls._run_command(config.build_cmd, config.work_dir): print(f"{Colors.ERROR} 项目构建失败") return False print(f"{Colors.INFO} 项目构建完成") return True @staticmethod def _run_command(cmd: str, work_dir: Path) -> bool: """执行命令并返回是否成功""" try: result = subprocess.run( cmd, shell=True, cwd=work_dir, capture_output=True, text=True ) return result.returncode == 0 except Exception as e: print(f"{Colors.ERROR} 命令执行失败: {e}") return False class ProcessManager: """跨平台进程管理""" @staticmethod def is_running(config: ServiceConfig) -> bool: """检查服务是否正在运行""" pid = ProcessManager._read_pid(config) if pid is None: return False if not psutil.pid_exists(pid): return False return ProcessManager._is_service_process(pid, config) @staticmethod def get_pid(config: ServiceConfig) -> Optional[int]: """获取服务 PID""" pid = ProcessManager._read_pid(config) if pid and ProcessManager._is_service_process(pid, config): return pid return None @classmethod def start(cls, config: ServiceConfig) -> Optional[int]: """启动服务(后台运行)""" work_dir = config.work_dir log_path = config.manage_log_path # 确保日志目录存在 log_path.parent.mkdir(parents=True, exist_ok=True) # 准备环境变量 env = os.environ.copy() if config.env: env.update(config.env) # 动态查找可执行文件路径(Windows 兼容) if platform.system() == "Windows": node_exe = ExecutableFinder.find_node() npm_cmd = ExecutableFinder.find_npm() mvn_cmd = ExecutableFinder.find_maven() # 将找到的路径添加到 PATH extra_paths = [] if node_exe: extra_paths.append(os.path.dirname(node_exe)) if npm_cmd: npm_dir = os.path.dirname(npm_cmd) if npm_dir not in extra_paths: extra_paths.append(npm_dir) if mvn_cmd: mvn_dir = os.path.dirname(mvn_cmd) if mvn_dir not in extra_paths: extra_paths.append(mvn_dir) current_path = env.get("PATH", "") for p in extra_paths: if p not in current_path: current_path = p + os.pathsep + current_path env["PATH"] = current_path if npm_cmd: env["npm_execpath"] = npm_cmd # 构建启动命令 cmd = config.start_cmd if config.type == "node" and node_exe: vite_js = work_dir / "node_modules" / "vite" / "bin" / "vite.js" if vite_js.exists(): # 替换 npm run dev 为直接调用 node vite if cmd in ("npm run dev", "npm run dev:h5"): cmd = f'"{node_exe}" "{vite_js}"' elif config.type == "java" and mvn_cmd: if cmd.startswith("mvn "): cmd = cmd.replace("mvn ", f'"{mvn_cmd}" ', 1) else: cmd = config.start_cmd # 打开日志文件 log_file = open(log_path, "a", encoding="utf-8") # 根据平台创建进程组 if platform.system() == "Windows": process = subprocess.Popen( cmd, shell=True, cwd=work_dir, env=env, stdout=log_file, stderr=subprocess.STDOUT, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP, ) else: process = subprocess.Popen( cmd, shell=True, cwd=work_dir, env=env, stdout=log_file, stderr=subprocess.STDOUT, start_new_session=True, ) # 保存 PID cls._write_pid(config, process.pid) log_file.close() return process.pid @classmethod def stop(cls, config: ServiceConfig, timeout: int = 10) -> bool: """优雅停止服务""" pid = cls._read_pid(config) if pid is None: print(f"{Colors.WARN} 未找到 {config.name} 的 PID 文件") return cls._force_cleanup(config) if not psutil.pid_exists(pid): print(f"{Colors.WARN} {config.name} 进程不存在 (PID: {pid})") cls._write_pid(config, None) return True print(f"{Colors.INFO} 停止 {config.name} (PID: {pid})...") try: process = psutil.Process(pid) # 发送终止信号 if platform.system() == "Windows": subprocess.run(["taskkill", "/PID", str(pid)], capture_output=True) else: process.terminate() # 等待进程退出 for _ in range(timeout * 2): if not psutil.pid_exists(pid): print(f"{Colors.INFO} {config.name} 已停止") cls._write_pid(config, None) return True time.sleep(0.5) # 超时,强制杀死 print(f"{Colors.WARN} {config.name} 未响应,强制终止...") if platform.system() == "Windows": subprocess.run( ["taskkill", "/F", "/PID", str(pid)], capture_output=True ) else: # Windows 上 signal 没有 SIGKILL,用 SIGTERM 替代 process.kill() time.sleep(1) cls._write_pid(config, None) return True except psutil.NoSuchProcess: cls._write_pid(config, None) return True except Exception as e: print(f"{Colors.ERROR} 停止服务失败: {e}") return False @staticmethod def _is_service_process(pid: int, config: ServiceConfig) -> bool: """验证 PID 是否属于该服务进程(检查 cwd 匹配)""" try: proc = psutil.Process(pid) return str(config.work_dir).lower() in (proc.cwd() or "").lower() except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): return False @staticmethod def _read_pid(config: ServiceConfig) -> Optional[int]: """读取 PID 文件""" pid_path = config.pid_path if not pid_path.exists(): return None try: return int(pid_path.read_text().strip()) except (ValueError, OSError): return None @staticmethod def _write_pid(config: ServiceConfig, pid: Optional[int]): """写入 PID 文件""" pid_path = config.pid_path if pid is None: if pid_path.exists(): pid_path.unlink() else: pid_path.write_text(str(pid)) @staticmethod def _force_cleanup(config: ServiceConfig) -> bool: """强制清理(通过端口查找进程)""" try: for conn in psutil.net_connections(kind="inet"): laddr = conn.laddr if isinstance(laddr, tuple) and len(laddr) >= 2: port = laddr[1] # type: ignore elif hasattr(laddr, "port"): port = laddr.port # type: ignore else: continue if port == config.port and conn.status == "LISTEN": pid = conn.pid if pid: print( f"{Colors.WARN} 发现占用端口 {config.port} 的进程 (PID: {pid}),强制终止..." ) if platform.system() == "Windows": subprocess.run( ["taskkill", "/F", "/PID", str(pid)], capture_output=True, ) else: os.kill(pid, 9) # SIGKILL = 9 return True except (psutil.AccessDenied, Exception): pass return True class DependencyGraph: """服务依赖图(拓扑排序)""" def __init__(self, services: Dict[str, ServiceConfig]): self.services = services self.graph = self._build_graph() def _build_graph(self) -> Dict[str, List[str]]: """构建依赖图""" graph = {} for key, config in self.services.items(): graph[key] = config.depends_on.copy() return graph def get_start_order(self) -> List[str]: """获取启动顺序(拓扑排序)""" return self._topological_sort() def get_stop_order(self) -> List[str]: """获取停止顺序(反向拓扑排序)""" return list(reversed(self.get_start_order())) def _topological_sort(self) -> List[str]: """拓扑排序(Kahn 算法) 依赖关系: web -> backend 表示 web 依赖于 backend 启动顺序: backend 先启动,web 后启动 """ # 计算每个节点的入度(有多少服务依赖它) in_degree = {key: 0 for key in self.graph} # 邻接表:key -> 依赖 key 的服务列表 dependents = {key: [] for key in self.graph} for key in self.graph: for dep in self.graph[key]: if dep in in_degree: # key 依赖 dep,所以 key 的入度 +1 in_degree[key] += 1 # dep 被 key 依赖 dependents[dep].append(key) # 初始入度为 0 的节点(不依赖任何服务) queue = [k for k, v in in_degree.items() if v == 0] result = [] while queue: node = queue.pop(0) result.append(node) # 处理所有依赖 node 的服务 for dependent in dependents[node]: in_degree[dependent] -= 1 if in_degree[dependent] == 0: queue.append(dependent) if len(result) != len(self.graph): print(f"{Colors.ERROR} 依赖图中存在循环依赖") sys.exit(1) return result class LogManager: """日志管理""" @staticmethod def show_logs(config: ServiceConfig, lines: int = 50, follow: bool = False): """查看服务日志""" log_paths = [] # 优先显示服务自己的日志 if config.log_file: log_path = config.work_dir / config.log_file if log_path.exists(): log_paths.append(log_path) # 同时显示管理服务日志 manage_log = config.manage_log_path if manage_log.exists(): log_paths.append(manage_log) if not log_paths: print(f"{Colors.WARN} 未找到 {config.name} 的日志文件") return for log_path in log_paths: print(f"\n{Colors.SECTION} 日志文件: {log_path}") if follow: LogManager._follow_log(log_path) else: LogManager._tail_log(log_path, lines) @staticmethod def _tail_log(log_path: Path, lines: int): """显示最后 N 行日志""" try: with open(log_path, "r", encoding="utf-8", errors="ignore") as f: all_lines = f.readlines() for line in all_lines[-lines:]: print(line, end="") except Exception as e: print(f"{Colors.ERROR} 读取日志失败: {e}") @staticmethod def _follow_log(log_path: Path): """实时跟踪日志(跨平台实现)""" print(f"{Colors.INFO} 实时跟踪日志 (Ctrl+C 退出)...") try: with open(log_path, "r", encoding="utf-8", errors="ignore") as f: # 移动到文件末尾 f.seek(0, 2) while True: line = f.readline() if line: print(line, end="", flush=True) else: time.sleep(0.5) except KeyboardInterrupt: print(f"\n{Colors.INFO} 退出日志跟踪") except Exception as e: print(f"{Colors.ERROR} 日志跟踪失败: {e}") class ServiceManager: """服务管理器(门面类)""" def __init__(self, config_path: Path): self.config_path = config_path self.services = ConfigLoader.load(config_path) self.dep_graph = DependencyGraph(self.services) def start(self, service_key: Optional[str] = None, skip_deps: bool = False): """启动服务""" if service_key and service_key != "all": self._start_single(service_key, skip_deps=skip_deps) else: self._start_all(skip_deps=skip_deps) def stop(self, service_key: Optional[str] = None): """停止服务""" if service_key and service_key != "all": self._stop_single(service_key) else: self._stop_all() def restart(self, service_key: Optional[str] = None): """重启服务""" if service_key and service_key != "all": self.stop(service_key) time.sleep(2) self.start(service_key) else: self._stop_all() time.sleep(2) self._start_all() def status(self): """显示所有服务状态""" print(f"\n{Fore.BLUE}{'=' * 60}") print(f" 情绪博物馆 - 服务状态") print(f"{'=' * 60}{Style.RESET_ALL}\n") for key in self.dep_graph.get_start_order(): config = self.services[key] pid = ProcessManager.get_pid(config) is_running = pid is not None status_icon = ( f"{Fore.GREEN}● 运行中{Style.RESET_ALL}" if is_running else f"{Fore.RED}○ 已停止{Style.RESET_ALL}" ) pid_info = f"PID: {pid}" if pid else "PID: -" port_info = f"端口: {config.port}" print(f" {status_icon} {config.name} ({key})") print(f" {pid_info} | {port_info}") print() def info(self): """显示服务访问地址""" print(f"\n{Fore.BLUE}{'=' * 60}") print(f" 情绪博物馆 - 服务地址") print(f"{'=' * 60}{Style.RESET_ALL}\n") for key, config in self.services.items(): pid = ProcessManager.get_pid(config) is_running = pid is not None status = ( f"{Fore.GREEN}运行中{Style.RESET_ALL}" if is_running else f"{Fore.RED}已停止{Style.RESET_ALL}" ) print(f" {config.name} ({key}) - {status}") print(f" 本地地址: http://localhost:{config.port}") # 显示额外地址 if key == "backend": print(f" API 地址: http://localhost:{config.port}/api") print(f" WebSocket: ws://localhost:{config.port}/ws") elif key == "web-admin": print(f" 管理后台: http://localhost:{config.port}/") print() def logs(self, service_key: str, lines: int = 50, follow: bool = False): """查看日志""" if service_key not in self.services: print(f"{Colors.ERROR} 未知服务: {service_key}") return config = self.services[service_key] LogManager.show_logs(config, lines=lines, follow=follow) def clean(self, service_key: str): """清理构建产物""" if service_key not in self.services: print(f"{Colors.ERROR} 未知服务: {service_key}") return config = self.services[service_key] print(f"{Colors.INFO} 清理 {config.name}...") if config.type == "java": # 优先使用 mvn clean,如果找不到 mvn 则直接删除 target mvn_cmd = ExecutableFinder.find_maven() if mvn_cmd: self._run_in_workdir(f'"{mvn_cmd}" clean', config.work_dir) else: target_dir = config.work_dir / "target" if target_dir.exists(): import shutil shutil.rmtree(target_dir) print(f"{Colors.INFO} 已删除 target") elif config.type == "node": import shutil for dir_name in ["node_modules", "dist", ".vite"]: target = config.work_dir / dir_name if target.exists(): shutil.rmtree(target) print(f"{Colors.INFO} 已删除 {dir_name}") def setup(self, service_key: str): """安装依赖""" if service_key not in self.services: print(f"{Colors.ERROR} 未知服务: {service_key}") return config = self.services[service_key] DependencyManager.ensure_ready(config) def _start_single(self, key: str, check_deps: bool = True, skip_deps: bool = False): """启动单个服务""" if key not in self.services: print(f"{Colors.ERROR} 未知服务: {key}") return config = self.services[key] # 检查是否已运行 if ProcessManager.is_running(config): pid = ProcessManager.get_pid(config) print(f"{Colors.WARN} {config.name} 已在运行 (PID: {pid})") return print(f"\n{Colors.INFO} 启动服务: {config.name} ({key})") # 检查依赖 if not skip_deps and check_deps: print(f"{Colors.INFO} 检查依赖...") if not DependencyManager.ensure_ready(config): print(f"{Colors.ERROR} 依赖检查失败,无法启动 {config.name}") return print(f"{Colors.INFO} 依赖检查通过") # 检查端口占用 if HealthChecker.check_port(config.port): print(f"{Colors.WARN} 端口 {config.port} 已被占用") return # 启动进程 print(f"{Colors.INFO} 启动进程: {config.start_cmd}") pid = ProcessManager.start(config) if pid is None: print(f"{Colors.ERROR} 启动失败") return # 等待服务就绪 if not HealthChecker.wait_for_ready(config): print( f"{Colors.WARN} 服务可能启动失败,请查看日志: {config.manage_log_path}" ) return print( f"{Colors.INFO} [OK] {config.name} 已启动 (PID: {pid}, 端口: {config.port})" ) def _start_all(self, skip_deps: bool = False): """启动所有服务(按依赖顺序)""" print(f"\n{Fore.BLUE}╔{'═' * 58}╗") print(f"║ 情绪博物馆 - 启动所有服务") print(f"╚{'═' * 58}╝{Style.RESET_ALL}\n") order = self.dep_graph.get_start_order() for key in order: config = self.services[key] # 启动前先等待依赖服务就绪 for dep_key in config.depends_on: dep_config = self.services[dep_key] if not ProcessManager.is_running(dep_config): print(f"{Colors.INFO} 等待依赖服务 {dep_config.name}...") if not HealthChecker.wait_for_ready(dep_config, timeout=30): print(f"{Colors.ERROR} 依赖服务 {dep_config.name} 未就绪,终止启动") return self._start_single(key, check_deps=True, skip_deps=skip_deps) # 服务之间等待 1 秒 if key != order[-1]: time.sleep(1) # 显示访问地址 self.info() def _stop_single(self, key: str): """停止单个服务""" if key not in self.services: print(f"{Colors.ERROR} 未知服务: {key}") return config = self.services[key] ProcessManager.stop(config) def _stop_all(self): """停止所有服务(逆依赖顺序)""" print(f"\n{Fore.BLUE}╔{'═' * 58}╗") print(f"║ 情绪博物馆 - 停止所有服务") print(f"╚{'═' * 58}╝{Style.RESET_ALL}\n") order = self.dep_graph.get_stop_order() for key in order: config = self.services[key] ProcessManager.stop(config) time.sleep(1) # 等待端口释放 print(f"\n{Colors.INFO} 所有服务已停止") @staticmethod def _run_in_workdir(cmd: str, work_dir: Path): """在工作目录执行命令""" subprocess.run(cmd, shell=True, cwd=work_dir) # 项目根目录(manage.py 所在目录) PROJECT_ROOT = Path(__file__).parent.parent