#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Life-Script deployment script.

Builds the local project, uploads the dist files to the server, points the
live symlink at the new release, keeps the most recent releases, and
supports rollback.
"""

import io
import os
import shlex
import shutil
import subprocess
import sys
import threading
from datetime import datetime
from pathlib import Path


def _force_utf8(stream):
    """Return *stream* configured for UTF-8 output with line buffering.

    The stream is reconfigured in place when it supports it.  Replacing the
    stream with a second ``TextIOWrapper`` over the same ``.buffer`` is only a
    fallback: when such a wrapper is later discarded it closes the shared
    buffer, which breaks hosts that swap ``sys.stdout`` back (test runners,
    IDE consoles).
    """
    reconfigure = getattr(stream, "reconfigure", None)
    if reconfigure is not None:
        try:
            reconfigure(encoding="utf-8", errors="replace", line_buffering=True)
            return stream
        except (ValueError, OSError):
            # Stream refuses reconfiguration; try the wrapper fallback below.
            pass
    if hasattr(stream, "buffer"):
        return io.TextIOWrapper(
            stream.buffer,
            encoding="utf-8",
            errors="replace",
            line_buffering=True,
        )
    return stream


# Log messages contain non-ASCII text; make sure both streams can emit it.
sys.stdout = _force_utf8(sys.stdout)
sys.stderr = _force_utf8(sys.stderr)

SERVER_IP = "101.200.208.45"
USERNAME = "root"
APP_NAME = "life-script"
DEPLOY_BASE = "/data/www/course-web-deploy"
RELEASES_DIR = f"{DEPLOY_BASE}/releases"
LINK_PATH = "/data/www/course-of-life"
MAX_RELEASES = 5

SCRIPT_DIR = Path(__file__).parent.absolute()
DIST_DIR = SCRIPT_DIR / "dist"

# NOTE(review): StrictHostKeyChecking=no disables host-key verification and
# allows man-in-the-middle attacks; consider "accept-new" or a pinned
# known_hosts entry.  Left unchanged so existing setups keep working.
SSH_BASE = [
    "ssh",
    "-T",
    "-o", "BatchMode=yes",
    "-o", "ConnectTimeout=10",
    "-o", "StrictHostKeyChecking=no",
    f"{USERNAME}@{SERVER_IP}",
]
SCP_BASE = [
    "scp",
    "-o", "BatchMode=yes",
    "-o", "ConnectTimeout=10",
    "-o", "StrictHostKeyChecking=no",
]


class Colors:
    """ANSI escape sequences used to colour log prefixes."""

    GREEN = "\033[32m"
    RED = "\033[31m"
    YELLOW = "\033[33m"
    RESET = "\033[0m"


def log_info(msg):
    """Print *msg* with a green ``[INFO]`` prefix."""
    print(f"{Colors.GREEN}[INFO]{Colors.RESET} {msg}", flush=True)


def log_error(msg):
    """Print *msg* with a red ``[ERROR]`` prefix."""
    print(f"{Colors.RED}[ERROR]{Colors.RESET} {msg}", flush=True)


def log_warn(msg):
    """Print *msg* with a yellow ``[WARN]`` prefix."""
    print(f"{Colors.YELLOW}[WARN]{Colors.RESET} {msg}", flush=True)


def _reader_thread(stream, sink, prefix=""):
    """Echo every line of *stream* to stdout and collect it in *sink*.

    Runs until the stream reaches EOF; the stream is closed on the way out,
    even if printing fails.
    """
    with stream:
        for line in iter(stream.readline, ""):
            sink.append(line)
            print(f"{prefix}{line}", end="", flush=True)


def run_stream(args, cwd=None, timeout=None):
    """Run a command and stream stdout/stderr in real time.

    Args:
        args: Argument list passed to ``subprocess.Popen`` (no shell).
        cwd: Working directory for the child process.
        timeout: Seconds to wait before the child is killed; ``None`` waits
            indefinitely.

    Returns:
        ``(ok, stdout, stderr)`` where *ok* is True only for exit status 0.
        Failures to start the process are reported through the tuple rather
        than raised.
    """
    stdout_lines = []
    stderr_lines = []
    try:
        process = subprocess.Popen(
            args,
            cwd=cwd,
            shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            encoding="utf-8",
            errors="replace",
        )
        # Daemon threads: after a timeout only the direct child is killed, so
        # a grandchild may keep the pipes open.  Non-daemon readers would then
        # block interpreter shutdown forever.
        readers = [
            threading.Thread(
                target=_reader_thread,
                args=(process.stdout, stdout_lines),
                daemon=True,
            ),
            threading.Thread(
                target=_reader_thread,
                args=(process.stderr, stderr_lines),
                daemon=True,
            ),
        ]
        for reader in readers:
            reader.start()

        timed_out = False
        try:
            returncode = process.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            timed_out = True
            process.kill()
            returncode = process.wait()
        finally:
            # Bounded join: give the readers a moment to drain the pipes.
            for reader in readers:
                reader.join(timeout=2)

        if timed_out:
            log_error(f"命令执行超时 ({timeout}s): {' '.join(map(str, args))}")
            return False, "".join(stdout_lines), "".join(stderr_lines) or "命令执行超时"
        return returncode == 0, "".join(stdout_lines), "".join(stderr_lines)
    except Exception as exc:  # command boundary: report, never raise
        return False, "".join(stdout_lines), str(exc)


def run_capture(args, cwd=None, timeout=None):
    """Run a command and capture output for decision making.

    Returns:
        ``(ok, stdout, stderr)`` with both outputs stripped.  Timeouts and
        start-up failures are reported through the tuple rather than raised.
    """
    try:
        result = subprocess.run(
            args,
            cwd=cwd,
            shell=False,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return False, "", f"命令执行超时 ({timeout}s): {' '.join(map(str, args))}"
    except Exception as exc:  # command boundary: report, never raise
        return False, "", str(exc)
    return result.returncode == 0, result.stdout.strip(), result.stderr.strip()


def run_command(cmd, cwd=None, capture=True, timeout=None):
    """Run a local command.

    Args:
        cmd: Either an argument list (executed as is) or a command string,
            which is run through ``cmd /c`` on Windows and ``bash -lc``
            elsewhere.
        cwd: Working directory for the command.
        capture: Capture the output silently when True, stream it otherwise.
        timeout: Seconds before the command is aborted.

    Returns:
        ``(ok, stdout, stderr)`` as produced by ``run_capture``/``run_stream``.
    """
    if isinstance(cmd, list):
        args = cmd
    elif os.name == "nt":
        args = ["cmd", "/c", cmd]
    else:
        args = ["bash", "-lc", cmd]

    runner = run_capture if capture else run_stream
    return runner(args, cwd=cwd, timeout=timeout)


def ssh_args(cmd):
    """Build the ssh argument list that runs *cmd* in a remote login bash.

    ssh joins its trailing arguments with spaces and hands them to the remote
    shell, so the script is quoted once to survive that extra parsing step.
    """
    return SSH_BASE + ["bash", "-lc", shlex.quote(cmd)]


def exec_ssh_cmd(cmd, timeout=120, capture=False):
    """Run a remote command through SSH. Streams output by default.

    Returns:
        ``(ok, stdout, stderr)`` with both outputs stripped.
    """
    args = ssh_args(cmd)
    runner = run_capture if capture else run_stream
    ok, stdout, stderr = runner(args, cwd=SCRIPT_DIR, timeout=timeout)
    if not ok:
        log_error(f"SSH命令失败: {cmd}")
        # In streaming mode stderr was already echoed live and callers log it
        # once more, so only repeat it here when it was captured silently.
        if capture and stderr:
            log_error(stderr.strip())
    return ok, stdout.strip(), stderr.strip()


def scp_upload(local_path, remote_path, recursive=False):
    """Upload a file or directory through SCP with streamed logs.

    Returns:
        True when scp exits with status 0.
    """
    args = list(SCP_BASE)
    if recursive:
        args.append("-r")
    args += [str(local_path), f"{USERNAME}@{SERVER_IP}:{remote_path}"]
    ok, _, stderr = run_stream(args, cwd=SCRIPT_DIR, timeout=300)
    if not ok:
        log_error(f"SCP上传失败: {stderr.strip()}")
    return ok


def check_env():
    """Verify that ``npm`` is available locally; exit with status 1 if not."""
    log_info("检查本地环境...")
    success, _, err = run_command("npm --version", timeout=30)
    if not success:
        log_error("未找到 npm 命令,请先安装 Node.js")
        if err:
            log_error(err)
        sys.exit(1)
    log_info("环境检查通过")


def build_project():
    """Rebuild ``dist`` from scratch with ``npm run build``.

    Changes the working directory to the script directory and exits with
    status 1 when the build fails or produces no ``dist`` directory.
    """
    log_info("开始构建项目...")
    os.chdir(SCRIPT_DIR)
    if DIST_DIR.exists():
        shutil.rmtree(DIST_DIR)
        log_info("已清理旧的 dist 目录")

    log_info("执行: npm run build")
    success, _, err = run_command(
        "npm run build", cwd=SCRIPT_DIR, capture=False, timeout=600
    )
    if not success:
        log_error("项目构建失败")
        if err:
            log_error(err)
        sys.exit(1)
    if not DIST_DIR.exists():
        log_error("dist 目录不存在")
        sys.exit(1)
    log_info("项目构建成功")


def deploy():
    """Upload ``dist`` as a new timestamped release and make it live.

    Exits with status 1 on any failure.  Old releases are pruned afterwards.
    """
    # Validate the build output before touching the server: an empty upload
    # would otherwise be switched live as an empty site.
    if not DIST_DIR.is_dir():
        log_error("dist 目录不存在")
        sys.exit(1)
    items = sorted(DIST_DIR.iterdir())
    if not items:
        log_error("dist 目录为空,已取消部署")
        sys.exit(1)

    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    release_path = f"{RELEASES_DIR}/{timestamp}"
    log_info(f"准备部署版本: {timestamp}")

    log_info("创建远程目录...")
    success, _, err = exec_ssh_cmd(
        f"mkdir -p {shlex.quote(release_path)}", timeout=60
    )
    if not success:
        log_error(f"创建远程目录失败: {err}")
        sys.exit(1)

    log_info("上传文件到服务器...")
    for item in items:
        log_info(f"  上传: {item.name}")
        ok = scp_upload(item, f"{release_path}/", recursive=not item.is_file())
        if not ok:
            log_error("文件上传失败")
            sys.exit(1)
    log_info("文件上传成功")

    log_info("设置文件权限...")
    success, _, err = exec_ssh_cmd(
        f"chmod -R 755 {shlex.quote(release_path)}", timeout=120
    )
    if not success:
        log_error(f"设置文件权限失败: {err}")
        sys.exit(1)

    log_info("切换服务版本...")
    # A pre-existing plain directory at the link path is moved aside first.
    # NOTE(review): `ln -sfn` removes and recreates the link, so the switch
    # is quick but not strictly atomic.
    switch_script = f"""
set -e
release_path={shlex.quote(release_path)}
link_path={shlex.quote(LINK_PATH)}
if [ -d "$link_path" ] && [ ! -L "$link_path" ]; then
    backup_path="${{link_path}}_backup_$(date +%s)"
    echo "当前路径是普通目录,备份到: $backup_path"
    mv "$link_path" "$backup_path"
fi
ln -sfn "$release_path" "$link_path"
echo "当前版本指向: $(readlink "$link_path")"
"""
    success, _, err = exec_ssh_cmd(switch_script, timeout=120)
    if not success:
        log_error(f"切换版本失败: {err}")
        sys.exit(1)

    log_info(f"部署完成,当前版本指向: {release_path}")
    clean_old_releases()


def clean_old_releases():
    """Delete all but the ``MAX_RELEASES`` most recently modified releases.

    Best effort: a failure is logged as a warning and does not abort.
    """
    log_info(f"清理旧版本(保留最近 {MAX_RELEASES} 个)...")
    clean_script = f"""
set -e
mkdir -p {shlex.quote(RELEASES_DIR)}
cd {shlex.quote(RELEASES_DIR)}
old_releases=$(ls -1dt */ 2>/dev/null | tail -n +{MAX_RELEASES + 1} || true)
if [ -n "$old_releases" ]; then
    echo "$old_releases" | xargs -r rm -rf
    echo "$old_releases" | sed 's/^/已删除旧版本: /'
else
    echo "没有需要清理的旧版本"
fi
"""
    success, _, err = exec_ssh_cmd(clean_script, timeout=120)
    if not success:
        log_warn(f"清理旧版本失败: {err}")


def rollback():
    """Point the live symlink at the most recent release that is not current.

    Exits with status 1 when the current target cannot be read, no other
    release exists, or the link cannot be changed.
    """
    log_info("开始回滚操作...")
    success, current_link, err = exec_ssh_cmd(
        f"readlink {shlex.quote(LINK_PATH)}", timeout=60, capture=True
    )
    if not success:
        log_error(f"读取当前版本失败: {err}")
        sys.exit(1)
    log_info(f"当前版本: {current_link}")

    # -F: match the current path literally instead of as a regular expression.
    find_prev_script = f"""
set -e
current_link={shlex.quote(current_link)}
ls -dt {shlex.quote(RELEASES_DIR)}/* 2>/dev/null | grep -vF -- "$current_link" | head -n 1
"""
    success, prev_version, err = exec_ssh_cmd(
        find_prev_script, timeout=60, capture=True
    )
    if not success or not prev_version:
        log_error("没有找到可回滚的历史版本")
        if err:
            log_error(err)
        sys.exit(1)
    log_info(f"回滚目标版本: {prev_version}")

    success, _, err = exec_ssh_cmd(
        f"ln -sfn {shlex.quote(prev_version)} {shlex.quote(LINK_PATH)}"
        f" && readlink {shlex.quote(LINK_PATH)}",
        timeout=60,
    )
    if not success:
        log_error(f"回滚失败: {err}")
        sys.exit(1)
    log_info("回滚成功")


def print_usage():
    """Print the command-line usage text."""
    print(
        """
用法: python deploy.py [命令]

命令:
    deploy   - 构建并部署到服务器(默认)
    rollback - 回滚到上一个版本

示例:
    python deploy.py          # 部署
    python deploy.py rollback # 回滚
"""
    )


def main(argv=None):
    """Dispatch the requested command.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]``.

    Exits with status 0 after printing help, and with status 1 for an
    unknown command.
    """
    args = sys.argv[1:] if argv is None else list(argv)
    command = args[0] if args else "deploy"

    if command in ("-h", "--help", "help"):
        # Asking for help is not an error.
        print_usage()
        sys.exit(0)

    if command == "rollback":
        rollback()
    elif command == "deploy":
        check_env()
        build_project()
        deploy()
    else:
        print_usage()
        sys.exit(1)


if __name__ == "__main__":
    main()