""" API 路由 处理 HTTP 请求、历史记录、收藏夹等 API @author huazm """ import json import re from flask import Blueprint, request, jsonify from models import db, RequestHistory, Collection, Environment from services import HttpClient api_bp = Blueprint('api', __name__, url_prefix='/api') http_client = HttpClient() @api_bp.route('/request', methods=['POST']) def send_request(): """发送 HTTP 请求""" data = request.get_json() if not data or not data.get('url'): return jsonify({'success': False, 'error': '请提供 URL'}), 400 # 解析请求参数 method = data.get('method', 'GET') url = data.get('url') headers = data.get('headers') params = data.get('params') body = data.get('body') body_type = data.get('bodyType', 'json') auth_type = data.get('authType') auth_config = data.get('authConfig') # 解析 headers 和 params(如果是字符串) if isinstance(headers, str): try: headers = json.loads(headers) except: headers = {} if isinstance(params, str): try: params = json.loads(params) except: params = {} if isinstance(auth_config, str): try: auth_config = json.loads(auth_config) except: auth_config = {} # 发送请求 result = http_client.send_request( method=method, url=url, headers=headers, params=params, body=body, body_type=body_type, auth_type=auth_type, auth_config=auth_config ) # 保存到历史记录 if data.get('saveHistory', True): history = RequestHistory( method=method, url=url, headers=json.dumps(headers) if headers else None, params=json.dumps(params) if params else None, body=body, body_type=body_type, auth_type=auth_type, auth_config=json.dumps(auth_config) if auth_config else None, response_body=result.get('body'), response_headers=json.dumps(result.get('headers')) if result.get('headers') else None, status_code=result.get('statusCode'), duration=result.get('duration') ) db.session.add(history) db.session.commit() result['historyId'] = history.id return jsonify(result) @api_bp.route('/batch', methods=['POST']) def batch_request(): """批量发送请求""" data = request.get_json() if not data or not data.get('requests'): return jsonify({'success': False, 'error': '请提供请求列表'}), 400 requests_list = data.get('requests', []) interval = data.get('interval', 100) results = http_client.batch_request(requests_list, interval) return jsonify({ 'success': True, 'results': results, 'total': len(results), 'successCount': sum(1 for r in results if r.get('success')), 'failCount': sum(1 for r in results if not r.get('success')) }) @api_bp.route('/history', methods=['GET']) def get_history(): """获取历史记录""" page = request.args.get('page', 1, type=int) size = request.args.get('size', 20, type=int) search = request.args.get('search', '') method = request.args.get('method', '') query = RequestHistory.query if search: query = query.filter(RequestHistory.url.contains(search)) if method: query = query.filter(RequestHistory.method == method.upper()) query = query.order_by(RequestHistory.created_at.desc()) pagination = query.paginate(page=page, per_page=size, error_out=False) return jsonify({ 'success': True, 'data': [h.to_dict() for h in pagination.items], 'total': pagination.total, 'page': page, 'size': size }) @api_bp.route('/history/', methods=['DELETE']) def delete_history(id): """删除历史记录""" history = RequestHistory.query.get(id) if not history: return jsonify({'success': False, 'error': '记录不存在'}), 404 db.session.delete(history) db.session.commit() return jsonify({'success': True}) @api_bp.route('/history/clear', methods=['DELETE']) def clear_history(): """清空历史记录""" RequestHistory.query.delete() db.session.commit() return jsonify({'success': True}) # ==================== 收藏夹 API ==================== @api_bp.route('/collections', methods=['GET']) def get_collections(): """获取收藏夹列表""" folder = request.args.get('folder', '') search = request.args.get('search', '') query = Collection.query if folder: query = query.filter(Collection.folder == folder) if search: query = query.filter( (Collection.name.contains(search)) | (Collection.url.contains(search)) ) collections = query.order_by(Collection.updated_at.desc()).all() return jsonify({ 'success': True, 'data': [c.to_dict() for c in collections] }) @api_bp.route('/collections', methods=['POST']) def save_collection(): """保存到收藏夹""" data = request.get_json() if not data or not data.get('name') or not data.get('url'): return jsonify({'success': False, 'error': '请提供名称和 URL'}), 400 collection = Collection( name=data.get('name'), description=data.get('description'), folder=data.get('folder'), method=data.get('method', 'GET'), url=data.get('url'), headers=json.dumps(data.get('headers')) if data.get('headers') else None, params=json.dumps(data.get('params')) if data.get('params') else None, body=data.get('body'), body_type=data.get('bodyType'), auth_type=data.get('authType'), auth_config=json.dumps(data.get('authConfig')) if data.get('authConfig') else None, tags=json.dumps(data.get('tags')) if data.get('tags') else None ) db.session.add(collection) db.session.commit() return jsonify({'success': True, 'data': collection.to_dict()}) @api_bp.route('/collections/', methods=['DELETE']) def delete_collection(id): """删除收藏""" collection = Collection.query.get(id) if not collection: return jsonify({'success': False, 'error': '收藏不存在'}), 404 db.session.delete(collection) db.session.commit() return jsonify({'success': True}) # ==================== 环境变量 API ==================== @api_bp.route('/environments', methods=['GET']) def get_environments(): """获取环境变量列表""" environments = Environment.query.all() return jsonify({ 'success': True, 'data': [e.to_dict() for e in environments] }) @api_bp.route('/environments', methods=['POST']) def save_environment(): """保存环境变量""" data = request.get_json() if not data or not data.get('name'): return jsonify({'success': False, 'error': '请提供环境名称'}), 400 env = Environment( name=data.get('name'), variables=json.dumps(data.get('variables', {})), is_active=data.get('isActive', False) ) # 如果设为激活,取消其他环境的激活状态 if env.is_active: Environment.query.update({Environment.is_active: False}) db.session.add(env) db.session.commit() return jsonify({'success': True, 'data': env.to_dict()}) @api_bp.route('/environments//activate', methods=['POST']) def activate_environment(id): """激活环境""" env = Environment.query.get(id) if not env: return jsonify({'success': False, 'error': '环境不存在'}), 404 Environment.query.update({Environment.is_active: False}) env.is_active = True db.session.commit() return jsonify({'success': True}) # ==================== 导入功能 API ==================== @api_bp.route('/import/curl', methods=['POST']) def import_curl(): """导入 cURL 命令""" data = request.get_json() curl_command = data.get('curl', '') if not curl_command: return jsonify({'success': False, 'error': '请提供 cURL 命令'}), 400 parsed = parse_curl(curl_command) return jsonify({'success': True, 'data': parsed}) def parse_curl(curl_command: str) -> dict: """解析 cURL 命令""" result = { 'method': 'GET', 'url': '', 'headers': {}, 'body': None } # 提取 URL url_match = re.search(r"curl\s+['\"]?([^'\"\s]+)['\"]?", curl_command) if url_match: result['url'] = url_match.group(1) # 提取方法 method_match = re.search(r'-X\s+(\w+)', curl_command) if method_match: result['method'] = method_match.group(1).upper() # 提取 headers header_matches = re.findall(r"-H\s+['\"]([^'\"]+)['\"]", curl_command) for header in header_matches: if ':' in header: key, value = header.split(':', 1) result['headers'][key.strip()] = value.strip() # 提取 body body_match = re.search(r"(?:-d|--data|--data-raw)\s+['\"]([^'\"]+)['\"]", curl_command) if body_match: result['body'] = body_match.group(1) if result['method'] == 'GET': result['method'] = 'POST' return result