""" AI助手Web客户端 - Flask应用 对接im-api后端聊天服务 @author huazm """ import os import sys import json import logging import requests from flask import Flask, render_template, request, jsonify, Response, stream_with_context from flask_cors import CORS # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 创建Flask应用 app = Flask(__name__) app.config['SECRET_KEY'] = 'ai-assistant-web-client-secret-key' CORS(app) # 启用CORS # API后端地址配置(api模块提供HTTP接口) API_BASE_URL = os.environ.get('API_BASE_URL', 'http://localhost:18090') # api模块的接口路径前缀是/api/ai-assistant API_PREFIX = '/api/ai-assistant' # 认证token(实际使用时应该从登录获取) # 这里暂时使用环境变量或固定值 AUTH_TOKEN = os.environ.get('AUTH_TOKEN', '') @app.route('/') def index(): """主页面""" return render_template('index.html') def get_auth_headers(): """ 获取认证请求头 从前端请求中获取Authorization头并传递给后端 """ headers = { 'Content-Type': 'application/json' } # 优先使用前端传递的Authorization头 auth_header = request.headers.get('Authorization') if auth_header: headers['Authorization'] = auth_header elif AUTH_TOKEN: # 如果前端没有传递,使用环境变量中的token headers['Authorization'] = f'Bearer {AUTH_TOKEN}' return headers @app.route('/api/ai-assistant/chatapp', methods=['GET']) def get_applications(): """ 代理获取应用列表请求到api后端 """ try: url = f'{API_BASE_URL}{API_PREFIX}/chatapp' logger.info(f"代理请求: GET {url}") response = requests.get( url, headers=get_auth_headers(), timeout=30 ) logger.info(f"响应状态码: {response.status_code}") # 直接返回后端响应 return Response( response.content, status=response.status_code, content_type=response.headers.get('Content-Type', 'application/json') ) except requests.exceptions.RequestException as e: logger.error(f"代理请求失败: {e}") return jsonify({ 'code': 500, 'message': f'代理请求失败: {str(e)}', 'data': None }), 500 except Exception as e: logger.error(f"获取应用列表失败: {e}") return jsonify({ 'code': 500, 'message': str(e), 'data': None }), 500 @app.route('/api/ai-assistant/chatapp//getRecommendQuestion', methods=['GET']) def get_recommend_questions(app_id): """ 代理获取推荐问题请求到api后端 """ try: # 获取查询参数 page_size = request.args.get('pageSize', '10') current = request.args.get('current', '1') url = f'{API_BASE_URL}{API_PREFIX}/chatapp/{app_id}/getRecommendQuestion' params = { 'pageSize': page_size, 'current': current } logger.info(f"代理请求: GET {url} with params {params}") response = requests.get( url, params=params, headers=get_auth_headers(), timeout=30 ) return Response( response.content, status=response.status_code, content_type=response.headers.get('Content-Type', 'application/json') ) except Exception as e: logger.error(f"获取推荐问题失败: {e}") return jsonify({ 'code': 500, 'message': str(e), 'data': None }), 500 @app.route('/api/ai-assistant/chat/completions/message', methods=['POST']) def send_message(): """ 代理发送消息请求到api后端(SSE流式响应) """ try: data = request.get_json() url = f'{API_BASE_URL}{API_PREFIX}/chat/completions/message' logger.info(f"代理SSE请求: POST {url}") logger.info(f"请求体: {json.dumps(data, ensure_ascii=False)}") # 发送POST请求并流式读取响应 response = requests.post( url, json=data, headers=get_auth_headers(), stream=True, timeout=300 # 5分钟超时 ) # 流式转发SSE响应 def generate(): try: for line in response.iter_lines(): if line: decoded_line = line.decode('utf-8') logger.debug(f"SSE数据: {decoded_line}") yield f"{decoded_line}\n" except Exception as e: logger.error(f"流式响应错误: {e}") yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n" return Response( stream_with_context(generate()), mimetype='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'X-Accel-Buffering': 'no' } ) except Exception as e: logger.error(f"发送消息失败: {e}") return jsonify({ 'code': 500, 'message': str(e), 'data': None }), 500 @app.route('/api/health', methods=['GET']) def health_check(): """ 健康检查 """ # 检查api后端是否可用 api_available = False try: response = requests.get( f'{API_BASE_URL}/actuator/health', timeout=5 ) api_available = response.status_code == 200 except: pass return jsonify({ 'code': 200, 'message': 'OK', 'data': { 'status': 'healthy', 'api_available': api_available, 'api_url': API_BASE_URL } }) if __name__ == '__main__': port = int(os.environ.get('PORT', 15000)) debug = os.environ.get('DEBUG', 'True').lower() == 'true' logger.info(f"启动AI助手Web客户端,端口: {port}, 调试模式: {debug}") app.run(host='0.0.0.0', port=port, debug=debug)