feat: AI 场景路由、ASR 服务及前后端全链路同步

- 新增 AI 场景路由控制器和管理接口
- 新增 ASR 语音识别服务及前后端集成
- 同步 AI Runtime 客户端到 Web/小程序/Life-Script
- 完善 AI 配置测试修复和管理后台路由配置
- 新增数据库迁移脚本

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-23 13:25:21 +08:00
parent d77090aa5e
commit 89fc42819d
72 changed files with 4584 additions and 383 deletions
+218 -137
View File
@@ -1,239 +1,320 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
life-script 部署脚本
功能:项目构建、文件传输、原子切换、历史版本管理、回滚支持
使用系统自带的ssh/scp命令,无需额外依赖
Life-Script deployment script.
Builds the local project, uploads dist files to the server, switches the live
symlink atomically, keeps recent releases, and supports rollback.
"""
import io
import os
import sys
import subprocess
import shlex
import shutil
import subprocess
import sys
import threading
from datetime import datetime
from pathlib import Path
# 服务器配置
if hasattr(sys.stdout, "buffer"):
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace", line_buffering=True)
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace", line_buffering=True)
SERVER_IP = "101.200.208.45"
USERNAME = "root"
# 部署配置
APP_NAME = "life-script"
DEPLOY_BASE = "/data/www/course-web-deploy"
RELEASES_DIR = f"{DEPLOY_BASE}/releases"
LINK_PATH = "/data/www/course-of-life"
MAX_RELEASES = 5
# 本地路径
SCRIPT_DIR = Path(__file__).parent.absolute()
DIST_DIR = SCRIPT_DIR / "dist"
SSH_BASE = [
"ssh",
"-T",
"-o",
"BatchMode=yes",
"-o",
"ConnectTimeout=10",
"-o",
"StrictHostKeyChecking=no",
f"{USERNAME}@{SERVER_IP}",
]
SCP_BASE = [
"scp",
"-o",
"BatchMode=yes",
"-o",
"ConnectTimeout=10",
"-o",
"StrictHostKeyChecking=no",
]
class Colors:
"""终端颜色"""
GREEN = '\033[32m'
RED = '\033[31m'
YELLOW = '\033[33m'
RESET = '\033[0m'
GREEN = "\033[32m"
RED = "\033[31m"
YELLOW = "\033[33m"
RESET = "\033[0m"
def log_info(msg):
"""打印信息日志"""
print(f"{Colors.GREEN}[INFO]{Colors.RESET} {msg}")
print(f"{Colors.GREEN}[INFO]{Colors.RESET} {msg}", flush=True)
def log_error(msg):
"""打印错误日志"""
print(f"{Colors.RED}[ERROR]{Colors.RESET} {msg}")
print(f"{Colors.RED}[ERROR]{Colors.RESET} {msg}", flush=True)
def log_warn(msg):
"""打印警告日志"""
print(f"{Colors.YELLOW}[WARN]{Colors.RESET} {msg}")
print(f"{Colors.YELLOW}[WARN]{Colors.RESET} {msg}", flush=True)
def run_command(cmd, cwd=None, shell=True, capture=True):
"""执行本地命令"""
def _reader_thread(stream, sink, prefix=""):
for line in iter(stream.readline, ""):
sink.append(line)
print(f"{prefix}{line}", end="", flush=True)
stream.close()
def run_stream(args, cwd=None, timeout=None):
"""Run a command and stream stdout/stderr in real time."""
stdout_lines = []
stderr_lines = []
try:
if capture:
result = subprocess.run(
cmd,
cwd=cwd,
shell=shell,
capture_output=True,
text=True
)
return result.returncode == 0, result.stdout, result.stderr
else:
result = subprocess.run(cmd, cwd=cwd, shell=shell)
return result.returncode == 0, "", ""
except Exception as e:
return False, "", str(e)
process = subprocess.Popen(
args,
cwd=cwd,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
errors="replace",
)
stdout_thread = threading.Thread(target=_reader_thread, args=(process.stdout, stdout_lines))
stderr_thread = threading.Thread(target=_reader_thread, args=(process.stderr, stderr_lines))
stdout_thread.start()
stderr_thread.start()
try:
returncode = process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
process.kill()
returncode = process.wait()
log_error(f"命令执行超时 ({timeout}s): {' '.join(map(str, args))}")
return False, "".join(stdout_lines), "".join(stderr_lines) or "命令执行超时"
finally:
stdout_thread.join(timeout=2)
stderr_thread.join(timeout=2)
return returncode == 0, "".join(stdout_lines), "".join(stderr_lines)
except Exception as exc:
return False, "".join(stdout_lines), str(exc)
def exec_ssh_cmd(cmd, timeout=30):
"""通过SSH执行远程命令"""
ssh_cmd = f'ssh -o ConnectTimeout=10 -o BatchMode=yes {USERNAME}@{SERVER_IP} "{cmd}"'
def run_capture(args, cwd=None, timeout=None):
"""Run a command and capture output for decision making."""
try:
result = subprocess.run(
ssh_cmd,
shell=True,
args,
cwd=cwd,
shell=False,
capture_output=True,
text=True,
timeout=timeout
encoding="utf-8",
errors="replace",
timeout=timeout,
)
return result.returncode == 0, result.stdout.strip(), result.stderr.strip()
except subprocess.TimeoutExpired:
log_error(f"SSH命令超时: {cmd}")
return False, "", "命令执行超时"
except Exception as e:
return False, "", str(e)
return False, "", f"命令执行超时 ({timeout}s): {' '.join(map(str, args))}"
except Exception as exc:
return False, "", str(exc)
def run_command(cmd, cwd=None, capture=True, timeout=None):
"""Run a local shell command."""
args = cmd if isinstance(cmd, list) else ["cmd", "/c", cmd] if os.name == "nt" else ["bash", "-lc", cmd]
if capture:
return run_capture(args, cwd=cwd, timeout=timeout)
return run_stream(args, cwd=cwd, timeout=timeout)
def ssh_args(cmd):
return SSH_BASE + ["bash", "-lc", shlex.quote(cmd)]
def exec_ssh_cmd(cmd, timeout=120, capture=False):
"""Run a remote command through SSH. Streams output by default."""
args = ssh_args(cmd)
if capture:
ok, stdout, stderr = run_capture(args, cwd=SCRIPT_DIR, timeout=timeout)
else:
ok, stdout, stderr = run_stream(args, cwd=SCRIPT_DIR, timeout=timeout)
if not ok:
log_error(f"SSH命令失败: {cmd}")
if stderr:
log_error(stderr.strip())
return ok, stdout.strip(), stderr.strip()
def scp_upload(local_path, remote_path, recursive=False):
"""通过SCP上传文件或目录"""
r_flag = "-r" if recursive else ""
scp_cmd = f'scp -o ConnectTimeout=10 -o BatchMode=yes {r_flag} "{local_path}" {USERNAME}@{SERVER_IP}:{remote_path}'
try:
result = subprocess.run(
scp_cmd,
shell=True,
capture_output=True,
text=True,
timeout=120 # 上传文件给更长时间
)
if result.returncode != 0:
log_error(f"SCP上传失败: {result.stderr}")
return False
return True
except subprocess.TimeoutExpired:
log_error("SCP上传超时")
return False
except Exception as e:
log_error(f"SCP上传异常: {e}")
return False
"""Upload a file or directory through SCP with streamed logs."""
args = SCP_BASE.copy()
if recursive:
args.append("-r")
args.extend([str(local_path), f"{USERNAME}@{SERVER_IP}:{remote_path}"])
ok, _, stderr = run_stream(args, cwd=SCRIPT_DIR, timeout=300)
if not ok:
log_error(f"SCP上传失败: {stderr.strip()}")
return ok
def check_env():
"""检查本地环境"""
log_info("检查本地环境...")
# 检查npm
success, _, _ = run_command("npm --version")
success, _, err = run_command("npm --version", timeout=30)
if not success:
log_error("未找到npm命令,请先安装Node.js")
log_error("未找到 npm 命令,请先安装 Node.js")
if err:
log_error(err)
sys.exit(1)
log_info("环境检查通过")
def build_project():
"""构建项目"""
log_info("开始构建项目...")
# 切换到项目目录
os.chdir(SCRIPT_DIR)
# 清理旧构建
if DIST_DIR.exists():
shutil.rmtree(DIST_DIR)
log_info("已清理旧的dist目录")
# 执行构建(不捕获输出,直接显示)
log_info("已清理旧的 dist 目录")
log_info("执行: npm run build")
success, _, _ = run_command("npm run build", capture=False)
success, _, err = run_command("npm run build", cwd=SCRIPT_DIR, capture=False, timeout=600)
if not success:
log_error("项目构建失败")
if err:
log_error(err)
sys.exit(1)
log_info("项目构建成功")
# 检查dist目录
if not DIST_DIR.exists():
log_error("dist目录不存在")
log_error("dist 目录不存在")
sys.exit(1)
log_info("项目构建成功")
def deploy():
"""部署到服务器"""
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
release_path = f"{RELEASES_DIR}/{timestamp}"
log_info(f"准备部署版本: {timestamp}")
# 1. 创建远程目录
log_info("创建远程目录...")
success, _, err = exec_ssh_cmd(f"mkdir -p {release_path}")
success, _, err = exec_ssh_cmd(f"mkdir -p {shlex.quote(release_path)}", timeout=60)
if not success:
log_error(f"创建远程目录失败: {err}")
sys.exit(1)
# 2. 上传文件
log_info("上传文件到服务器...")
for item in DIST_DIR.iterdir():
log_info(f" 上传: {item.name}")
if item.is_file():
if not scp_upload(item, f"{release_path}/"):
log_error("文件上传失败")
sys.exit(1)
ok = scp_upload(item, f"{release_path}/")
else:
if not scp_upload(item, f"{release_path}/", recursive=True):
log_error("目录上传失败")
sys.exit(1)
ok = scp_upload(item, f"{release_path}/", recursive=True)
if not ok:
log_error("文件上传失败")
sys.exit(1)
log_info("文件上传成功")
# 3. 设置权限
log_info("设置文件权限...")
exec_ssh_cmd(f"chmod -R 755 {release_path}")
# 4. 原子切换软链接
success, _, err = exec_ssh_cmd(f"chmod -R 755 {shlex.quote(release_path)}", timeout=120)
if not success:
log_error(f"设置文件权限失败: {err}")
sys.exit(1)
log_info("切换服务版本...")
# 检查目标路径是否为普通目录
exec_ssh_cmd(f"if [ -d '{LINK_PATH}' ] && [ ! -L '{LINK_PATH}' ]; then mv '{LINK_PATH}' '{LINK_PATH}_backup_$(date +%s)'; fi")
# 创建软链接
success, _, err = exec_ssh_cmd(f"ln -snf '{release_path}' '{LINK_PATH}'")
switch_script = f"""
set -e
release_path={shlex.quote(release_path)}
link_path={shlex.quote(LINK_PATH)}
if [ -d "$link_path" ] && [ ! -L "$link_path" ]; then
backup_path="${{link_path}}_backup_$(date +%s)"
echo "当前路径是普通目录,备份到: $backup_path"
mv "$link_path" "$backup_path"
fi
ln -sfn "$release_path" "$link_path"
echo "当前版本指向: $(readlink "$link_path")"
"""
success, _, err = exec_ssh_cmd(switch_script, timeout=120)
if not success:
log_error(f"切换版本失败: {err}")
sys.exit(1)
log_info(f"部署完成当前版本指向: {release_path}")
# 5. 清理旧版本
log_info(f"部署完成当前版本指向: {release_path}")
clean_old_releases()
def clean_old_releases():
"""清理旧版本,只保留最近的N个"""
log_info(f"清理旧版本(保留最近{MAX_RELEASES}个)...")
clean_cmd = f"cd {RELEASES_DIR} && ls -t | tail -n +{MAX_RELEASES + 1} | xargs -I {{}} rm -rf {{}}"
exec_ssh_cmd(clean_cmd)
log_info(f"清理旧版本保留最近 {MAX_RELEASES} 个)...")
clean_script = f"""
set -e
mkdir -p {shlex.quote(RELEASES_DIR)}
cd {shlex.quote(RELEASES_DIR)}
old_releases=$(ls -1dt */ 2>/dev/null | tail -n +{MAX_RELEASES + 1} || true)
if [ -n "$old_releases" ]; then
echo "$old_releases" | xargs -r rm -rf
echo "$old_releases" | sed 's/^/已删除旧版本: /'
else
echo "没有需要清理的旧版本"
fi
"""
success, _, err = exec_ssh_cmd(clean_script, timeout=120)
if not success:
log_warn(f"清理旧版本失败: {err}")
def rollback():
"""回滚到上一个版本"""
log_info("开始回滚操作...")
# 获取当前指向的版本
success, current_link, _ = exec_ssh_cmd(f"readlink {LINK_PATH}")
log_info(f"当前版本: {current_link}")
# 获取上一个版本目录
success, prev_version, _ = exec_ssh_cmd(
f"ls -dt {RELEASES_DIR}/* | grep -v '{current_link}' | head -n 1"
)
if not prev_version:
log_error("没有找到可回滚的历史版本")
success, current_link, err = exec_ssh_cmd(f"readlink {shlex.quote(LINK_PATH)}", timeout=60, capture=True)
if not success:
log_error(f"读取当前版本失败: {err}")
sys.exit(1)
log_info(f"当前版本: {current_link}")
find_prev_script = f"""
set -e
current_link={shlex.quote(current_link)}
ls -dt {shlex.quote(RELEASES_DIR)}/* 2>/dev/null | grep -v "$current_link" | head -n 1
"""
success, prev_version, err = exec_ssh_cmd(find_prev_script, timeout=60, capture=True)
if not success or not prev_version:
log_error("没有找到可回滚的历史版本")
if err:
log_error(err)
sys.exit(1)
log_info(f"回滚目标版本: {prev_version}")
exec_ssh_cmd(f"ln -snf {prev_version} {LINK_PATH}")
log_info("回滚成功!")
success, _, err = exec_ssh_cmd(
f"ln -sfn {shlex.quote(prev_version)} {shlex.quote(LINK_PATH)} && readlink {shlex.quote(LINK_PATH)}",
timeout=60,
)
if not success:
log_error(f"回滚失败: {err}")
sys.exit(1)
log_info("回滚成功")
def print_usage():
"""打印使用说明"""
print("""
print(
"""
用法: python deploy.py [命令]
命令:
@@ -243,13 +324,13 @@ def print_usage():
示例:
python deploy.py # 部署
python deploy.py rollback # 回滚
""")
"""
)
def main():
"""主函数"""
command = sys.argv[1] if len(sys.argv) > 1 else "deploy"
if command == "rollback":
rollback()
elif command == "deploy":