"""
Author: Peanut
Created: 2026-04-26
Purpose: 通用服务管理脚本 — 在任意项目目录下自动发现、启动、停止、重启各类服务
支持 Vite 前端、Python 后端 (FastAPI/Flask/Django)、Node.js 后端 (Express/NestJS)、
Java Spring Boot (Maven/Gradle)。
用法: python ~/.claude/dev-services.py [命令] [选项]
"""
import argparse
import json
import os
import re
import shutil
import subprocess
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional
try:
import psutil
except ImportError:
print("错误: 需要 psutil 库,请运行: pip install psutil")
sys.exit(1)
try:
import yaml
HAS_YAML = True
except ImportError:
HAS_YAML = False
try:
import requests as _requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False
# ============================================================
# 常量
# ============================================================
SCRIPT_DIR = Path(__file__).parent.resolve()
CWD = Path.cwd()
LOG_DIR = CWD / "logs"
LOG_DIR.mkdir(exist_ok=True)
SCRIPT_LOG = LOG_DIR / "dev-services.log"
MAX_SCAN_DEPTH = 3
SKIP_DIRS = {"node_modules", "__pycache__", ".git", "venv", ".venv", "target", "build", "dist", ".omc", ".next", ".nuxt", "coverage", "e2e"}
# 框架默认端口
DEFAULT_PORTS = {
"vite": 5173,
"next": 3000,
"node-backend": 3000,
"python": 8000,
"spring-boot": 8080,
"spring-boot-gradle": 8080,
}
# ANSI 颜色
COLORS = {
"INFO": "\033[37m",
"SUCCESS": "\033[32m",
"WARN": "\033[33m",
"ERROR": "\033[31m",
"DEBUG": "\033[36m",
"RESET": "\033[0m",
"BOLD": "\033[1m",
"DIM": "\033[2m",
}
if sys.platform == "win32":
import ctypes
try:
ctypes.windll.kernel32.SetConsoleMode(ctypes.windll.kernel32.GetStdHandle(-11), 7)
except Exception:
pass
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8')
# ============================================================
# 日志
# ============================================================
def log(msg: str, level: str = "INFO"):
timestamp = datetime.now().strftime("%H:%M:%S")
color = COLORS.get(level, COLORS["INFO"])
reset = COLORS["RESET"]
print(f"{color}[{timestamp}] [{level}]{reset} {msg}")
try:
with open(SCRIPT_LOG, "a", encoding="utf-8") as f:
f.write(f"[{timestamp}] [{level}] {msg}\n")
except Exception:
pass
def log_separator():
print(f"\n{COLORS['DIM']}{'=' * 60}{COLORS['RESET']}\n")
def log_section(title: str):
print(f"\n{COLORS['BOLD']} {title}{COLORS['RESET']}")
print(f"{COLORS['DIM']}{'-' * 50}{COLORS['RESET']}")
# ============================================================
# 服务抽象
# ============================================================
@dataclass
class Service:
name: str
project_type: str
project_dir: Path
port: int
start_cmd: list
health_keyword: str
health_url: str
health_timeout: int = 30
keyword_timeout: int = 30
log_file: Optional[Path] = None
@property
def log_path(self) -> Path:
if self.log_file:
return self.log_file
return LOG_DIR / f"{self.name.lower().replace(' ', '_')}.log"
def type_label(self) -> str:
labels = {
"vite": "Vite 前端",
"next": "Next.js",
"node-backend": "Node.js 后端",
"python": "Python 后端",
"spring-boot": "Spring Boot (Maven)",
"spring-boot-gradle": "Spring Boot (Gradle)",
}
return labels.get(self.project_type, self.project_type)
# ============================================================
# 项目发现
# ============================================================
def discover_services(root_dir: Optional[Path] = None, max_depth: int = MAX_SCAN_DEPTH) -> list[Service]:
"""递归扫描目录,发现所有服务"""
base = root_dir or CWD
services = []
_scan_directory(base, base, max_depth, services)
return services
def _scan_directory(current: Path, root: Path, max_depth: int, services: list):
"""递归扫描,跳过黑名单目录"""
rel_depth = len(current.relative_to(root).parts)
if rel_depth > max_depth:
return
# 跳过黑名单
if current.name in SKIP_DIRS:
return
# 尝试发现服务
svc = _detect_service(current)
if svc:
services.append(svc)
# 如果找到服务,不再深入其子目录(避免重复发现)
return
# 递归子目录
try:
for child in sorted(current.iterdir()):
if child.is_dir():
_scan_directory(child, root, max_depth, services)
except PermissionError:
pass
def _detect_service(directory: Path) -> Optional[Service]:
"""检测目录是否是项目根目录,返回 Service 或 None"""
# 1. Java Spring Boot 优先于 Python 检测,避免辅助脚本导致误判
pom = directory / "pom.xml"
if pom.exists():
svc = _build_spring_boot_maven_service(directory, pom)
if svc:
return svc
gradle = directory / "build.gradle"
gradle_kts = directory / "build.gradle.kts"
if gradle.exists() or gradle_kts.exists():
svc = _build_spring_boot_gradle_service(directory, gradle if gradle.exists() else gradle_kts)
if svc:
return svc
# 2. Vite 前端
pkg = directory / "package.json"
if pkg.exists():
try:
pkg_data = json.loads(pkg.read_text(encoding="utf-8"))
deps = {**pkg_data.get("dependencies", {}), **pkg_data.get("devDependencies", {})}
scripts = pkg_data.get("scripts", {})
# Vite
if "vite" in deps:
return _build_vite_service(directory, pkg_data)
# Next.js
if "next" in deps:
return _build_next_service(directory, pkg_data)
# Node.js 后端 (Express / NestJS / Koa)
if any(k in deps for k in ["express", "@nestjs/core", "koa"]):
return _build_node_backend_service(directory, pkg_data)
except (json.JSONDecodeError, Exception):
pass
# 3. Python 后端
req_file = directory / "requirements.txt"
pyproject = directory / "pyproject.toml"
has_python_deps = req_file.exists() or pyproject.exists()
# 只把明确的 Python Web 项目入口当作 Python 服务,普通 src 目录不代表 Python
has_python_entry = (
(directory / "main.py").exists()
or (directory / "manage.py").exists()
or (directory / "app.py").exists()
or (directory / "app").is_dir()
)
if has_python_deps or has_python_entry:
svc = _build_python_service(directory)
if svc:
return svc
return None
# ============================================================
# 配置解析
# ============================================================
def _read_env_file(directory: Path) -> dict:
"""读取 .env 或 .env.development,返回键值对(合并所有找到的环境文件)"""
env_data = {}
for env_name in [".env.development", ".env.local", ".env"]:
env_path = directory / env_name
if env_path.exists():
try:
for line in env_path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, _, value = line.partition("=")
env_data[key.strip()] = value.strip().strip('"').strip("'")
except Exception:
pass
return env_data
def _extract_port_from_env(directory: Path) -> Optional[int]:
"""从 .env 文件中提取端口号"""
env_data = _read_env_file(directory)
for key in ["VITE_PORT", "PORT", "SERVER_PORT", "APP_PORT", "BACKEND_PORT"]:
if key in env_data:
try:
return int(env_data[key])
except ValueError:
continue
return None
def _extract_port_from_vite_config(directory: Path) -> Optional[int]:
"""从 vite.config.ts/js 中提取 server.port"""
for conf_name in ["vite.config.ts", "vite.config.js"]:
conf_path = directory / conf_name
if conf_path.exists():
try:
content = conf_path.read_text(encoding="utf-8")
# 匹配 port: 5178 或 port:5173
match = re.search(r'port\s*:\s*(\d+)', content)
if match:
return int(match.group(1))
except Exception:
pass
return None
def _extract_port_from_scripts(pkg_data: dict) -> Optional[int]:
"""从 package.json scripts 中提取 --port 参数"""
scripts = pkg_data.get("scripts", {})
dev_script = scripts.get("dev", "") or scripts.get("start", "")
match = re.search(r'--port\s+(\d+)', dev_script)
if match:
return int(match.group(1))
return None
def _extract_port_from_pom(pom_path: Path) -> Optional[int]:
"""从 pom.xml 提取 server.port"""
try:
content = pom_path.read_text(encoding="utf-8")
# 查找 1234 或 1234
match = re.search(r'\s*(\d+)\s*', content)
if match:
return int(match.group(1))
# 查找 1234
match = re.search(r'.*?\s*(\d+)\s*.*?', content, re.DOTALL)
if match:
return int(match.group(1))
except Exception:
pass
return None
def _extract_port_from_spring_boot_yaml(directory: Path) -> Optional[int]:
"""从 Spring Boot application*.yml 提取 server.port
1. 优先检查目录根下的 application.yml
2. 检查直接子目录中的 src/main/resources/application*.yml(多模块 Maven 项目)
3. 最后递归搜索其他子目录(跳过构建产物)
"""
if not HAS_YAML:
return None
# 1. 检查目录根
for name in ["application.yml", "application.yaml"]:
port = _extract_port_from_single_yaml(directory / name)
if port is not None:
return port
# 2. 优先检查直接子目录中的 src/main/resources(多模块 Maven 项目常见布局)
try:
for subdir in sorted(directory.iterdir()):
if subdir.is_dir():
resources_dir = subdir / "src" / "main" / "resources"
if resources_dir.exists():
for name in [
"application.yml", "application.yaml",
"application-local.yml", "application-local.yaml",
"application-dev.yml", "application-dev.yaml",
"application-prod.yml", "application-prod.yaml",
"application-test.yml", "application-test.yaml",
]:
port = _extract_port_from_single_yaml(resources_dir / name)
if port is not None:
return port
except Exception:
pass
# 3. 递归搜索其他子目录(跳过构建产物)
try:
for child in directory.rglob("application*.yml"):
parts = set(child.parts)
if parts & {"target", "build", "dist", "node_modules"}:
continue
depth = len(child.relative_to(directory).parts)
if depth > 10:
continue
port = _extract_port_from_single_yaml(child)
if port is not None:
return port
for child in directory.rglob("application*.yaml"):
parts = set(child.parts)
if parts & {"target", "build", "dist", "node_modules"}:
continue
depth = len(child.relative_to(directory).parts)
if depth > 10:
continue
port = _extract_port_from_single_yaml(child)
if port is not None:
return port
except Exception:
pass
return None
def _extract_spring_boot_context_path(directory: Path) -> str:
"""从 Spring Boot application*.yml 提取 servlet context-path"""
value = _extract_spring_boot_yaml_value(directory, ("server", "servlet", "context-path"))
return _normalize_url_path(value) if value else ""
def _extract_spring_boot_management_base_path(directory: Path) -> str:
"""从 Spring Boot application*.yml 提取 actuator base-path"""
value = _extract_spring_boot_yaml_value(directory, ("management", "endpoints", "web", "base-path"))
return _normalize_url_path(value) if value else "/actuator"
def _build_spring_boot_health_url(port: int, directory: Path) -> str:
"""根据 Spring Boot 上下文路径构建健康检查 URL"""
context_path = _extract_spring_boot_context_path(directory)
actuator_base_path = _extract_spring_boot_management_base_path(directory)
health_path = _join_url_paths(context_path, actuator_base_path, "health")
return f"http://localhost:{port}{health_path}"
def _extract_spring_boot_yaml_value(directory: Path, keys: tuple[str, ...]) -> Optional[str]:
"""按优先级从 Spring Boot application*.yml/yaml 中读取嵌套配置值"""
if not HAS_YAML:
return None
for yml_path in _iter_spring_boot_yaml_files(directory):
value = _extract_value_from_single_yaml(yml_path, keys)
if value is not None:
return str(value)
return None
def _iter_spring_boot_yaml_files(directory: Path) -> list[Path]:
"""按稳定优先级列出 Spring Boot YAML 配置文件"""
candidates = []
for name in ["application.yml", "application.yaml"]:
candidates.append(directory / name)
try:
for subdir in sorted(directory.iterdir()):
if subdir.is_dir():
resources_dir = subdir / "src" / "main" / "resources"
if resources_dir.exists():
for name in [
"application.yml", "application.yaml",
"application-local.yml", "application-local.yaml",
"application-dev.yml", "application-dev.yaml",
"application-prod.yml", "application-prod.yaml",
"application-test.yml", "application-test.yaml",
]:
candidates.append(resources_dir / name)
except Exception:
pass
try:
for pattern in ["application*.yml", "application*.yaml"]:
for child in directory.rglob(pattern):
parts = set(child.parts)
if parts & {"target", "build", "dist", "node_modules"}:
continue
depth = len(child.relative_to(directory).parts)
if depth <= 10:
candidates.append(child)
except Exception:
pass
seen = set()
result = []
for path in candidates:
if path in seen or not path.exists():
continue
seen.add(path)
result.append(path)
return result
def _extract_value_from_single_yaml(yml_path: Path, keys: tuple[str, ...]) -> Optional[object]:
"""从单个 YAML 文件中读取嵌套配置值"""
if not HAS_YAML:
return None
try:
content = yml_path.read_text(encoding="utf-8")
content = re.sub(r'@\w[\w.\-]*@', '""', content)
for doc in yaml.safe_load_all(content):
current = doc
for key in keys:
if not isinstance(current, dict) or key not in current:
current = None
break
current = current[key]
if current is not None:
return current
except Exception:
pass
return None
def _normalize_url_path(value: str) -> str:
"""规范化 URL 路径片段"""
path = str(value).strip()
if not path or path == "/":
return ""
return "/" + path.strip("/")
def _join_url_paths(*parts: str) -> str:
"""拼接 URL 路径,避免重复斜杠"""
cleaned = [str(part).strip("/") for part in parts if str(part).strip("/")]
return "/" + "/".join(cleaned)
def _extract_port_from_single_yaml(yml_path: Path) -> Optional[int]:
"""从单个 YAML 文件提取 server.port
自动处理 Maven @placeholder@ 占位符(替换为空字符串)。
"""
if not HAS_YAML:
return None
try:
content = yml_path.read_text(encoding="utf-8")
# 移除 Maven 占位符: @xxx@ -> ""
content = re.sub(r'@\w[\w.\-]*@', '""', content)
for doc in yaml.safe_load_all(content):
if isinstance(doc, dict):
server = doc.get("server", {})
if isinstance(server, dict) and "port" in server:
return int(server["port"])
except Exception:
pass
return None
def _extract_port_from_gradle(gradle_path: Path) -> Optional[int]:
"""从 build.gradle 提取 server.port"""
try:
content = gradle_path.read_text(encoding="utf-8")
match = re.search(r'server\.port\s*=\s*["\']?(\d+)["\']?', content)
if match:
return int(match.group(1))
match = re.search(r'serverPort\s*=\s*["\']?(\d+)["\']?', content)
if match:
return int(match.group(1))
except Exception:
pass
return None
def _has_spring_boot(pom_path: Path) -> bool:
"""检查 pom.xml 是否包含 Spring Boot 依赖"""
try:
content = pom_path.read_text(encoding="utf-8")
return "spring-boot" in content.lower() or "springboot" in content.lower()
except Exception:
return False
def _has_spring_boot_gradle(gradle_path: Path) -> bool:
"""检查 build.gradle 是否包含 Spring Boot 插件"""
try:
content = gradle_path.read_text(encoding="utf-8")
return "spring-boot" in content.lower() or "org.springframework.boot" in content
except Exception:
return False
def _find_python_entry(directory: Path) -> Optional[Path]:
"""查找 Python 入口文件"""
candidates = ["main.py", "app.py", "app/main.py", "src/main.py", "manage.py", "wsgi.py", "asgi.py"]
for c in candidates:
p = directory / c
if p.exists():
return p
return None
# ============================================================
# 服务构建
# ============================================================
def _get_npm_cmd(directory: Path) -> list:
"""获取可用的包管理器命令,返回 [cmd, args...] 列表"""
# 优先通过 node 直接执行 vite.js(避免 .cmd 的 shell 问题)
vite_js = directory / "node_modules" / "vite" / "bin" / "vite.js"
if vite_js.exists():
node_exe = shutil.which("node") or "node"
return [node_exe, str(vite_js)]
# Windows: 尝试 .cmd
if sys.platform == "win32":
vite_cmd = directory / "node_modules" / ".bin" / "vite.cmd"
if vite_cmd.exists():
return [str(vite_cmd)]
else:
vite_bin = directory / "node_modules" / ".bin" / "vite"
if vite_bin.exists():
return [str(vite_bin)]
for pkg_cmd in ["pnpm", "npm", "yarn"]:
found = shutil.which(pkg_cmd)
if found:
return [found]
return ["npx"]
def _build_vite_service(directory: Path, pkg_data: dict) -> Service:
name = pkg_data.get("name", "前端")
if "/" in name:
name = name.split("/")[-1]
port = (_extract_port_from_env(directory)
or _extract_port_from_vite_config(directory)
or _extract_port_from_scripts(pkg_data)
or DEFAULT_PORTS["vite"])
npm_cmd = _get_npm_cmd(directory)
# 判断是否是直接指向 vite 可执行文件(node+vite.js 或 vite.bin)
cmd_str = " ".join(npm_cmd).lower()
is_direct_vite = "vite" in cmd_str and "npx" not in cmd_str
if is_direct_vite:
cmd = npm_cmd + ["--port", str(port)]
elif npm_cmd[0] in ("pnpm", "npm", "yarn"):
cmd = npm_cmd + ["run", "dev", "--", "--port", str(port)]
else:
cmd = ["npx", "vite", "--port", str(port)]
return Service(
name=f"{name.capitalize()} 前端",
project_type="vite",
project_dir=directory,
port=port,
start_cmd=cmd,
health_keyword="ready in",
health_url=f"http://localhost:{port}",
health_timeout=30,
keyword_timeout=30,
)
def _build_next_service(directory: Path, pkg_data: dict) -> Service:
name = pkg_data.get("name", "Next.js")
if "/" in name:
name = name.split("/")[-1]
port = _extract_port_from_env(directory) or _extract_port_from_scripts(pkg_data) or DEFAULT_PORTS["next"]
return Service(
name=f"{name.capitalize()} Next.js",
project_type="next",
project_dir=directory,
port=port,
start_cmd=["npx", "next", "dev", "--port", str(port)],
health_keyword="started server",
health_url=f"http://localhost:{port}",
health_timeout=30,
keyword_timeout=30,
)
def _build_node_backend_service(directory: Path, pkg_data: dict) -> Service:
name = pkg_data.get("name", "Node.js 后端")
if "/" in name:
name = name.split("/")[-1]
port = _extract_port_from_env(directory) or _extract_port_from_scripts(pkg_data) or DEFAULT_PORTS["node-backend"]
scripts = pkg_data.get("scripts", {})
# 尝试使用 start/dev script
if "start" in scripts:
cmd = [shutil.which("npm") or "npm", "run", "start"]
elif "dev" in scripts:
cmd = [shutil.which("npm") or "npm", "run", "dev"]
else:
# 查找入口文件
entry = directory / "src" / "index.ts"
if not entry.exists():
entry = directory / "src" / "index.js"
if not entry.exists():
entry = directory / "index.ts"
if not entry.exists():
entry = directory / "index.js"
if entry.exists():
if entry.suffix == ".ts":
cmd = ["npx", "ts-node", str(entry.relative_to(directory))]
else:
cmd = ["node", str(entry.relative_to(directory))]
else:
cmd = [shutil.which("npm") or "npm", "run", "start"]
return Service(
name=f"{name.capitalize()} 后端",
project_type="node-backend",
project_dir=directory,
port=port,
start_cmd=cmd,
health_keyword="listening",
health_url=f"http://localhost:{port}",
health_timeout=30,
keyword_timeout=30,
)
def _build_python_service(directory: Path) -> Optional[Service]:
"""检测并构建 Python 服务"""
req_file = directory / "requirements.txt"
has_fastapi = False
has_flask = False
has_django = False
if req_file.exists():
try:
content = req_file.read_text(encoding="utf-8").lower()
has_fastapi = "fastapi" in content
has_flask = "flask" in content
has_django = "django" in content
except Exception:
pass
# 检查 pyproject.toml
pyproject = directory / "pyproject.toml"
if pyproject.exists():
try:
content = pyproject.read_text(encoding="utf-8").lower()
has_fastapi = has_fastapi or "fastapi" in content
has_flask = has_flask or "flask" in content
has_django = has_django or "django" in content
except Exception:
pass
if not (has_fastapi or has_flask or has_django):
# 检查目录名或入口文件内容
entry = _find_python_entry(directory)
if entry:
try:
content = entry.read_text(encoding="utf-8").lower()
has_fastapi = "fastapi" in content or "uvicorn" in content
has_flask = "flask" in content
has_django = "django" in content
except Exception:
pass
# 如果没有明确框架,避免把辅助脚本误判成 FastAPI 服务
if not (has_fastapi or has_flask or has_django):
return None
port = _extract_port_from_env(directory) or DEFAULT_PORTS["python"]
dir_name = directory.name
if has_django:
manage = directory / "manage.py"
if manage.exists():
cmd = [sys.executable, "manage.py", "runserver", f"127.0.0.1:{port}"]
else:
cmd = [sys.executable, "-m", "django", "runserver", f"127.0.0.1:{port}"]
keyword = "Starting development server"
elif has_flask:
env_data = _read_env_file(directory)
app_module = env_data.get("FLASK_APP", "")
if app_module:
cmd = [sys.executable, "-m", "flask", "run", "--port", str(port), "--host", "127.0.0.1"]
else:
cmd = [sys.executable, "-m", "flask", "run", "--port", str(port), "--host", "127.0.0.1"]
keyword = "Running on"
else:
# FastAPI / uvicorn
# 查找 ASGI 入口
asgi_found = False
if (directory / "main.py").exists() or (directory / "app" / "main.py").exists():
asgi_found = True
if asgi_found and (directory / "app").is_dir() and (directory / "app" / "main.py").exists():
module = "app.main:app"
elif (directory / "main.py").exists():
module = "main:app"
elif (directory / "app.py").exists():
module = "app:app"
else:
entry = _find_python_entry(directory)
if entry:
module = entry.stem + ":app"
else:
module = "main:app"
python_exe = sys.executable
# 尝试使用项目 venv
for venv_path in [directory / "venv", directory / ".venv"]:
venv_py = venv_path / "Scripts" / "python.exe" if sys.platform == "win32" else venv_path / "bin" / "python"
if venv_py.exists():
python_exe = str(venv_py)
break
cmd = [python_exe, "-m", "uvicorn", module, "--reload", "--port", str(port), "--host", "127.0.0.1"]
keyword = "Application startup complete"
return Service(
name=f"{dir_name.capitalize()} 后端",
project_type="python",
project_dir=directory,
port=port,
start_cmd=cmd,
health_keyword=keyword,
health_url=f"http://127.0.0.1:{port}",
health_timeout=30,
keyword_timeout=60 if has_fastapi else 30,
)
def _build_spring_boot_maven_service(directory: Path, pom_path: Path) -> Optional[Service]:
if not _has_spring_boot(pom_path):
return None
port = (
_extract_port_from_pom(pom_path)
or _extract_port_from_spring_boot_yaml(directory)
or _extract_port_from_env(directory)
or DEFAULT_PORTS["spring-boot"]
)
dir_name = directory.name
# 尝试提取 artifactId 作为名称
try:
content = pom_path.read_text(encoding="utf-8")
match = re.search(r'\s*(.+?)\s*', content)
if match:
dir_name = match.group(1)
except Exception:
pass
mvn_cmd = shutil.which("mvnw")
if not mvn_cmd:
mvnw = directory / "mvnw"
if mvnw.exists():
mvn_cmd = str(mvnw)
else:
mvn_cmd = shutil.which("mvn") or "mvn"
return Service(
name=f"{dir_name} 服务",
project_type="spring-boot",
project_dir=directory,
port=port,
start_cmd=[mvn_cmd, "spring-boot:run", f"-Dspring-boot.run.arguments=--server.port={port}"],
health_keyword="Started ",
health_url=_build_spring_boot_health_url(port, directory),
health_timeout=60,
keyword_timeout=120,
)
def _build_spring_boot_gradle_service(directory: Path, gradle_path: Path) -> Optional[Service]:
if not _has_spring_boot_gradle(gradle_path):
return None
port = _extract_port_from_gradle(gradle_path) or _extract_port_from_env(directory) or DEFAULT_PORTS["spring-boot-gradle"]
dir_name = directory.name
gradlew = directory / "gradlew"
gradle_cmd = str(gradlew) if gradlew.exists() else (shutil.which("gradle") or "gradle")
return Service(
name=f"{dir_name} 服务",
project_type="spring-boot-gradle",
project_dir=directory,
port=port,
start_cmd=[gradle_cmd, "bootRun", "--args='--server.port=" + str(port) + "'"],
health_keyword="Started ",
health_url=f"http://localhost:{port}/actuator/health",
health_timeout=60,
keyword_timeout=120,
)
# ============================================================
# 进程管理
# ============================================================
def _get_pids_by_port_fallback(port: int) -> list:
"""macOS/Linux: 通过 lsof 获取端口占用 PID"""
pids = []
try:
result = subprocess.run(["lsof", "-ti", f":{port}"], capture_output=True, text=True, timeout=5)
if result.returncode == 0 and result.stdout.strip():
for line in result.stdout.strip().splitlines():
try:
pids.append(int(line.strip()))
except ValueError:
continue
except Exception:
pass
return pids
def check_port(port: int) -> tuple:
"""检查端口占用,返回 (是否占用, PID列表)"""
pids = []
try:
for conn in psutil.net_connections(kind='inet'):
if conn.laddr and conn.laddr.port == port and conn.status == psutil.CONN_LISTEN:
if conn.pid:
pids.append(conn.pid)
except psutil.AccessDenied:
pids = _get_pids_by_port_fallback(port)
except Exception:
pids = _get_pids_by_port_fallback(port)
return (bool(pids), pids) if pids else (False, [])
def _pid_alive(pid: int) -> bool:
try:
psutil.Process(pid)
return True
except (psutil.NoSuchProcess, psutil.AccessDenied):
return False
def get_active_pid_on_port(port: int) -> Optional[int]:
in_use, pids = check_port(port)
if not in_use:
return None
for pid in pids:
if _pid_alive(pid):
return pid
return pids[0] if pids else None
def find_process_by_port(port: int) -> list:
"""查找占用端口的存活进程"""
pids = set()
try:
for conn in psutil.net_connections(kind='inet'):
if conn.laddr and conn.laddr.port == port and conn.pid and conn.pid != os.getpid():
try:
psutil.Process(conn.pid)
pids.add(conn.pid)
except psutil.NoSuchProcess:
continue
except psutil.AccessDenied:
for pid in _get_pids_by_port_fallback(port):
if pid != os.getpid():
try:
psutil.Process(pid)
pids.add(pid)
except psutil.NoSuchProcess:
continue
except Exception:
for pid in _get_pids_by_port_fallback(port):
if pid != os.getpid():
try:
psutil.Process(pid)
pids.add(pid)
except psutil.NoSuchProcess:
continue
return sorted(pids)
def kill_process(pid: int, graceful: bool = True):
"""终止进程"""
try:
proc = psutil.Process(pid)
proc_name = proc.name()
except psutil.NoSuchProcess:
log(f"进程 {pid} 已不存在", "INFO")
return
if graceful:
proc.terminate()
try:
proc.wait(timeout=3)
log(f"{proc_name} (PID {pid}) 已停止", "SUCCESS")
except psutil.TimeoutExpired:
proc.kill()
proc.wait(timeout=2)
log(f"{proc_name} (PID {pid}) 已强制终止", "WARN")
else:
proc.kill()
try:
proc.wait(timeout=2)
except Exception:
pass
log(f"{proc_name} (PID {pid}) 已强制终止", "WARN")
def stop_service(name: str, port: int):
"""优雅停止指定服务"""
log(f"停止 {name} (端口 {port})...", "INFO")
pids = find_process_by_port(port)
if pids:
for pid in pids:
kill_process(pid, graceful=True)
else:
log(f"{name} 未运行", "INFO")
def kill_service(name: str, port: int):
"""强制终止指定服务"""
log(f"强制终止 {name} (端口 {port})...", "WARN")
pids = find_process_by_port(port)
if pids:
for pid in pids:
kill_process(pid, graceful=False)
else:
log(f"{name} 未运行", "INFO")
# ============================================================
# 健康检查
# ============================================================
def wait_for_log_keyword(log_file: Path, keyword: str, timeout: int = 30) -> bool:
"""轮询日志文件等待关键字"""
log(f'等待日志关键字: "{keyword}"', "DEBUG")
start_time = time.time()
while time.time() - start_time < timeout:
if log_file.exists():
break
time.sleep(0.5)
time.sleep(2) # 等待进程开始写日志
while time.time() - start_time < timeout:
try:
with open(log_file, "r", encoding="utf-8", errors="ignore") as f:
content = f.read()
if keyword in content:
return True
except Exception:
pass
time.sleep(1)
return False
def http_get(url: str, timeout: int = 10) -> bool:
"""HTTP GET 请求"""
if HAS_REQUESTS:
try:
resp = _requests.get(url, timeout=timeout)
return resp.status_code == 200
except Exception:
return False
else:
try:
from urllib.request import urlopen
resp = urlopen(url, timeout=timeout)
return resp.status == 200
except Exception:
return False
def http_health_check(url: str, timeout: int = 10) -> bool:
"""HTTP 健康检查"""
log(f"HTTP 健康检查: {url}", "DEBUG")
start_time = time.time()
while time.time() - start_time < timeout:
if http_get(url):
return True
time.sleep(0.5)
return False
def check_service_health(svc: Service) -> bool:
"""两阶段健康检查"""
log(f"健康检查开始: {svc.name}", "INFO")
log_file = svc.log_path
if wait_for_log_keyword(log_file, svc.health_keyword, svc.keyword_timeout):
log(f"[1/2] 日志就绪: {svc.name}", "SUCCESS")
else:
log(f"[1/2] 日志关键字超时: {svc.name}", "ERROR")
_print_log_tail(svc.name, log_file)
return False
if http_health_check(svc.health_url, svc.health_timeout):
log(f"[2/2] HTTP 就绪: {svc.name} ({svc.health_url})", "SUCCESS")
return True
else:
log(f"[2/2] HTTP 健康检查失败: {svc.name}", "ERROR")
_print_log_tail(svc.name, log_file)
return False
def _print_log_tail(name: str, log_file: Path):
"""输出日志最后 20 行"""
if log_file.exists():
try:
lines = log_file.read_text(encoding="utf-8", errors="ignore").splitlines()[-20:]
if lines:
log(f"{name} 日志最后 20 行:", "WARN")
for line in lines:
print(f" {line}")
except Exception:
pass
# ============================================================
# 启动/停止
# ============================================================
def start_service(svc: Service) -> bool:
"""启动单个服务"""
log_section(f"启动 {svc.name} ({svc.type_label()})")
# 确保目录存在
if not svc.project_dir.exists():
log(f"项目目录不存在: {svc.project_dir}", "ERROR")
return False
# 检查端口占用
in_use, pids = check_port(svc.port)
if in_use:
alive_pids = [pid for pid in pids if _pid_alive(pid)]
if alive_pids:
log(f"端口 {svc.port} 已被占用 (PID: {', '.join(map(str, alive_pids))}),先停止...", "WARN")
kill_service(svc.name, svc.port)
time.sleep(3)
else:
log(f"端口 {svc.port} 有残留连接 (PID 已不存在),直接启动...", "WARN")
# 启动进程
cmd = svc.start_cmd
log(f"启动命令: {' '.join(cmd)}", "DEBUG")
try:
with open(svc.log_path, "w", encoding="utf-8") as f:
popen_kwargs = _background_process_options()
proc = subprocess.Popen(
cmd,
cwd=str(svc.project_dir),
stdout=f,
stderr=subprocess.STDOUT,
stdin=subprocess.DEVNULL,
**popen_kwargs,
)
log(f"{svc.name} 进程已启动 (PID: {proc.pid})", "INFO")
except FileNotFoundError as e:
log(f"{svc.name} 启动失败: 命令不存在 — {e}", "ERROR")
_suggest_runtime_install(svc)
return False
except Exception as e:
log(f"{svc.name} 启动失败: {e}", "ERROR")
return False
# 健康检查
if check_service_health(svc):
log(f"{svc.name} 已就绪 {svc.health_url}", "SUCCESS")
return True
else:
log(f"{svc.name} 启动失败", "ERROR")
return False
def _background_process_options() -> dict:
"""获取跨平台后台启动参数"""
if sys.platform == "win32":
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = 0
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP | subprocess.CREATE_NO_WINDOW
if hasattr(subprocess, "CREATE_BREAKAWAY_FROM_JOB"):
creationflags |= subprocess.CREATE_BREAKAWAY_FROM_JOB
return {
"creationflags": creationflags,
"startupinfo": startupinfo,
}
return {
"start_new_session": True,
}
def _suggest_runtime_install(svc: Service):
"""给出运行时环境安装建议"""
suggestions = {
"vite": "请确保已安装 Node.js,并在项目目录下运行: npm install 或 pnpm install",
"next": "请确保已安装 Node.js,并在项目目录下运行: npm install",
"node-backend": "请确保已安装 Node.js,并在项目目录下运行: npm install",
"python": "请确保已安装 Python,并在项目目录下运行: pip install -r requirements.txt",
"spring-boot": "请确保已安装 Java JDK 和 Maven",
"spring-boot-gradle": "请确保已安装 Java JDK 和 Gradle",
}
tip = suggestions.get(svc.project_type, "")
if tip:
log(tip, "WARN")
def stop_service_wrapper(svc: Service):
"""停止单个服务"""
log_section(f"停止 {svc.name}")
stop_service(svc.name, svc.port)
def restart_service(svc: Service) -> bool:
"""重启单个服务"""
stop_service_wrapper(svc)
time.sleep(2)
return start_service(svc)
# ============================================================
# 状态查询
# ============================================================
def status_services(services: list[Service]):
"""查看所有服务状态"""
log_section("服务状态")
if not services:
log("未发现任何服务", "WARN")
log("请确保当前目录下存在可识别的项目(包含 package.json / requirements.txt / pom.xml / build.gradle)", "INFO")
log_separator()
return
for svc in services:
pid = get_active_pid_on_port(svc.port)
if pid:
try:
proc = psutil.Process(pid)
cpu = proc.cpu_percent(interval=0.1)
mem = proc.memory_info().rss / 1024 / 1024
status_icon = "运行中"
color = COLORS["SUCCESS"]
except (psutil.NoSuchProcess, psutil.AccessDenied):
cpu = mem = 0
status_icon = "异常"
color = COLORS["ERROR"]
else:
cpu = mem = 0
status_icon = "未运行"
color = COLORS["INFO"]
reset = COLORS["RESET"]
cpu_str = f"{cpu:.1f}%" if cpu else "-"
mem_str = f"{mem:.1f}MB" if mem else "-"
dir_rel = svc.project_dir.relative_to(CWD) if svc.project_dir.is_relative_to(CWD) else svc.project_dir
print(f" {color}[{svc.name}]{reset} 类型: {svc.type_label()} 端口: {svc.port} 状态: {status_icon} 目录: {dir_rel}")
if pid:
print(f" PID: {pid} CPU: {cpu_str} 内存: {mem_str}")
log_separator()
print(f" {COLORS['BOLD']}访问地址:{COLORS['RESET']}")
for svc in services:
print(f" {svc.name}: {COLORS['SUCCESS']}{svc.health_url}{COLORS['RESET']}")
log_separator()
print(f" {COLORS['BOLD']}日志目录:{COLORS['RESET']}")
print(f" {LOG_DIR}")
log_separator()
# ============================================================
# 日志查看
# ============================================================
def tail_logs(target: str, services: list[Service]):
"""查看日志"""
if target == "all":
_tail_multi([svc.log_path for svc in services])
return
# 按名称或类型匹配
matched = [s for s in services if target.lower() in s.name.lower() or target.lower() in s.project_type.lower()]
if len(matched) == 1:
log_file = matched[0].log_path
elif len(matched) > 1:
_tail_multi([s.log_path for s in matched])
return
else:
# 尝试直接匹配文件
log_file = LOG_DIR / f"{target}.log"
if not log_file.exists():
log(f"未找到日志: {target}", "ERROR")
return
if not log_file.exists():
log(f"日志文件不存在: {log_file}", "ERROR")
return
log(f"实时查看日志 (Ctrl+C 退出)...", "INFO")
if sys.platform == "win32":
try:
subprocess.run(["powershell", "-Command", f"Get-Content -Wait -Tail 50 '{log_file}'"], check=True)
except KeyboardInterrupt:
pass
except Exception:
_tail_python(log_file)
else:
try:
subprocess.run(["tail", "-f", str(log_file)], check=True)
except KeyboardInterrupt:
pass
def _tail_multi(log_files: list):
"""多日志同时查看"""
positions = {lf: 0 for lf in log_files}
log("实时查看所有日志 (Ctrl+C 退出)...", "INFO")
try:
while True:
new_output = False
for lf in log_files:
try:
with open(lf, "r", encoding="utf-8", errors="ignore") as f:
f.seek(positions[lf])
lines = f.readlines()
positions[lf] = f.tell()
for line in lines:
prefix = f"{lf.stem[:3]} "
print(f"{prefix}{line.rstrip()}")
new_output = True
except Exception:
pass
if not new_output:
time.sleep(1)
except KeyboardInterrupt:
pass
def _tail_python(log_file: Path):
"""Python tail -f"""
position = 0
try:
with open(log_file, "r", encoding="utf-8", errors="ignore") as f:
f.seek(0, 2)
position = f.tell()
f.seek(0)
lines = f.readlines()
for line in lines[-50:]:
print(line.rstrip())
while True:
with open(log_file, "r", encoding="utf-8", errors="ignore") as f:
f.seek(position)
lines = f.readlines()
position = f.tell()
for line in lines:
print(line.rstrip())
if not lines:
time.sleep(1)
except KeyboardInterrupt:
pass
# ============================================================
# 服务过滤
# ============================================================
def filter_services(services: list[Service], svc_type: Optional[str] = None, port: Optional[int] = None) -> list[Service]:
"""按类型或端口过滤服务"""
result = services
if svc_type:
result = [s for s in result if s.project_type == svc_type or svc_type.lower() in s.project_type.lower()]
if port:
result = [s for s in result if s.port == port]
return result
# ============================================================
# CLI 入口
# ============================================================
def main():
parser = argparse.ArgumentParser(
description="通用服务管理脚本 — 自动发现并管理项目中的各类服务",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
可用命令:
start 发现并启动所有服务(默认)
stop 停止所有服务
restart 重启所有服务
status 查看发现的服务及运行状态
discover 仅列出发现的服务(不启动)
kill 强制终止服务
logs 实时查看日志
选项:
--type 按类型过滤: vite | python | spring-boot | node-backend | next
--port 按端口号过滤
--foreground, -f 前台模式运行(Ctrl+C 停止)
使用示例:
dev-services 发现并启动所有服务
dev-services discover 仅列出发现的服务
dev-services discover server 只列出 server 目录下的服务
dev-services start frontend 只启动前端服务
dev-services restart server 只重启后端服务
dev-services status 查看所有服务状态
dev-services status web 只查看大屏服务状态
dev-services stop 停止所有服务
dev-services kill all 强制终止所有服务
dev-services logs all 实时查看所有日志
支持的服务类型:
Vite 前端 检测到 package.json + vite 依赖
Next.js 检测到 package.json + next 依赖
Node.js 后端 检测到 Express / NestJS / Koa
Python 后端 检测到 FastAPI / Flask / Django
Spring Boot 检测到 pom.xml + spring-boot 依赖
Spring Boot Gradle 检测到 build.gradle + spring-boot 插件
端口提取:
1. 优先从 .env / .env.development 提取
2. 从 vite.config.ts / pom.xml / build.gradle 提取
3. 从 application*.yml 提取 server.port(Spring Boot)
4. 使用框架默认值
安装方式:
脚本位于 ~/.claude/dev-services.py
通过 PATH 中的 dev-services 命令调用
(Windows: %USERPROFILE%\\bin\\dev-services.cmd)
""",
)
parser.add_argument(
"action",
nargs="?",
default="start",
choices=["start", "stop", "restart", "status", "kill", "logs", "discover"],
help="操作: start|stop|restart|status|kill|logs|discover (默认: start)",
)
parser.add_argument(
"target",
nargs="?",
default=None,
help="目标服务 (kill/logs) 或指定服务目录 (其他命令)",
)
parser.add_argument("--type", dest="svc_type", default=None, help="服务类型过滤: vite|python|spring-boot|node-backend")
parser.add_argument("--port", dest="port", type=int, default=None, help="端口号过滤")
parser.add_argument("--foreground", "-f", action="store_true", help="前台模式运行")
args = parser.parse_args()
# 解析 target:如果是存在的目录,作为服务发现的根目录;否则作为 kill/logs 的目标
root_dir = None
if args.target and args.action not in ("kill", "logs"):
target_path = Path(args.target)
if not target_path.is_absolute():
target_path = CWD / target_path
if target_path.is_dir():
root_dir = target_path
else:
log(f"目标路径不存在: {args.target}", "ERROR")
return
# 发现服务
services = discover_services(root_dir=root_dir)
services = filter_services(services, args.svc_type, args.port)
if args.action == "discover":
log_section(f"发现的服务 (目录: {root_dir or CWD})")
if not services:
log("未发现任何服务", "WARN")
log("支持的类型: Vite 前端, Next.js, Node.js 后端, Python (FastAPI/Flask/Django), Spring Boot (Maven/Gradle)", "INFO")
else:
for i, svc in enumerate(services, 1):
dir_rel = svc.project_dir.relative_to(CWD) if svc.project_dir.is_relative_to(CWD) else svc.project_dir
print(f" {COLORS['SUCCESS']}[{i}]{COLORS['RESET']} {COLORS['BOLD']}{svc.name}{COLORS['RESET']}")
print(f" 类型: {svc.type_label()} 端口: {svc.port}")
print(f" 目录: {dir_rel}")
print(f" 命令: {' '.join(svc.start_cmd)}")
print()
log_separator()
return
if args.action == "status":
status_services(services)
return
if args.action == "kill":
target = args.target or "all"
log_section("强制终止服务")
if not services:
log("未发现任何服务", "WARN")
else:
for svc in services:
if target in ("all", svc.name.lower(), svc.project_type.lower()):
kill_service(svc.name, svc.port)
log_separator()
return
if args.action == "logs":
target = args.target or "all"
if not services:
log("未发现任何服务,无法查看日志", "WARN")
return
tail_logs(target, services)
return
if args.action == "stop":
if not services:
log("未发现任何服务", "WARN")
return
for svc in services:
stop_service_wrapper(svc)
log_separator()
log("所有服务已停止", "SUCCESS")
return
if args.action in ("start", "restart"):
if not services:
log("未发现任何服务", "WARN")
log("支持的类型: Vite 前端, Next.js, Node.js 后端, Python (FastAPI/Flask/Django), Spring Boot (Maven/Gradle)", "INFO")
log(f"扫描目录: {CWD} (最多 {MAX_SCAN_DEPTH} 层)", "DEBUG")
return
if args.action == "restart":
for svc in services:
restart_service(svc)
else:
# start
if args.foreground:
_run_foreground(services)
else:
log_separator()
log(f"通用服务管理器 (目录: {CWD})", "INFO")
log_separator()
results = []
for svc in services:
ok = start_service(svc)
results.append((svc, ok))
log_separator()
if all(ok for _, ok in results):
log("所有服务已就绪", "SUCCESS")
for svc, ok in results:
print(f" {COLORS['SUCCESS']}✓ {svc.name}{COLORS['RESET']}: {svc.health_url}")
elif any(ok for _, ok in results):
log("部分服务启动失败,请检查日志", "WARN")
for svc, ok in results:
icon = f"{COLORS['SUCCESS']}✓{COLORS['RESET']}" if ok else f"{COLORS['ERROR']}✗{COLORS['RESET']}"
print(f" {icon} {svc.name}")
else:
log("所有服务启动失败", "ERROR")
return
log_separator()
return
def _run_foreground(services: list[Service]):
"""前台模式运行"""
procs = []
def cleanup():
log("正在停止服务...", "WARN")
for p in procs:
try:
p.terminate()
except Exception:
pass
for p in procs:
try:
p.wait(timeout=3)
except Exception:
p.kill()
log("所有服务已停止", "INFO")
import signal
def handle_signal(signum, frame):
cleanup()
sys.exit(0)
signal.signal(signal.SIGINT, handle_signal)
if sys.platform != "win32":
signal.signal(signal.SIGTERM, handle_signal)
for svc in services:
log_section(f"启动 {svc.name} (前台模式)")
cmd = svc.start_cmd
try:
creationflags = subprocess.CREATE_NEW_PROCESS_GROUP if sys.platform == "win32" else 0
proc = subprocess.Popen(
cmd,
cwd=str(svc.project_dir),
creationflags=creationflags,
)
procs.append(proc)
except Exception as e:
log(f"{svc.name} 启动失败: {e}", "ERROR")
log_separator()
log("按 Ctrl+C 停止所有服务", "INFO")
log_separator()
for p in procs:
p.wait()
if __name__ == "__main__":
main()