"""
HTTP client service.

Sends HTTP requests and converts the responses into plain dictionaries.

@author huazm
"""

import base64
import json
import time
from typing import Dict, Any, Optional, List
from urllib.parse import urlencode

import httpx


class HttpClient:
    """HTTP client supporting several request methods and authentication schemes."""

    # Request methods for which a supplied body is ignored.
    _BODYLESS_METHODS = frozenset({'GET', 'HEAD', 'OPTIONS'})

    def __init__(self, timeout: int = 30, verify: bool = False):
        """
        Create a client.

        Args:
            timeout: Timeout handed to ``httpx.Client`` for every request.
            verify: Whether TLS certificates are verified. Defaults to
                ``False`` to keep the historical behaviour of this class;
                pass ``True`` when talking to untrusted networks.
        """
        self.timeout = timeout
        self.verify = verify

    def send_request(
        self,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        params: Optional[Dict[str, str]] = None,
        body: Optional[str] = None,
        body_type: str = 'json',
        auth_type: Optional[str] = None,
        auth_config: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """
        Send a single HTTP request.

        The ``headers`` and ``params`` mappings passed in are never modified;
        authentication data is applied to private copies.

        Args:
            method: Request method (GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS).
            url: Request URL.
            headers: Request headers.
            params: URL query parameters.
            body: Request body. Ignored for GET, HEAD and OPTIONS.
            body_type: Body kind (json, form, xml, raw). ``json`` and ``xml``
                only set a default ``Content-Type``; ``form`` bodies that are
                a JSON object are sent as form fields, anything else is sent
                verbatim.
            auth_type: Authentication type (bearer, basic, apikey, oauth2).
            auth_config: Settings for the chosen authentication type.

        Returns:
            On success: ``success`` (True), ``statusCode``, ``headers``,
            ``body``, ``duration`` (milliseconds) and ``size`` (bytes).
            On failure: ``success`` (False), ``error`` and ``duration``.
        """
        # perf_counter is monotonic, so the duration cannot be skewed by
        # wall-clock adjustments.
        started = time.perf_counter()
        http_method = method.upper()

        # Work on copies so the caller's dictionaries are left untouched.
        request_headers = dict(headers) if headers else {}
        request_params = dict(params) if params else {}

        # Always hand _apply_auth a real dict so a query-string API key is
        # not lost when the caller supplied no params.
        self._apply_auth(request_headers, auth_type, auth_config, request_params)

        # Keep passing None when the caller gave no params and auth added none.
        query = request_params if (request_params or params is not None) else None

        content = None
        data = None
        if body and http_method not in self._BODYLESS_METHODS:
            if body_type == 'json':
                request_headers.setdefault('Content-Type', 'application/json')
                content = body
            elif body_type == 'form':
                request_headers.setdefault(
                    'Content-Type', 'application/x-www-form-urlencoded'
                )
                form_fields = self._parse_form_fields(body)
                if form_fields is None:
                    # Not a JSON object: send the text verbatim as the body.
                    content = body
                else:
                    data = form_fields
            elif body_type == 'xml':
                request_headers.setdefault('Content-Type', 'application/xml')
                content = body
            else:
                content = body

        try:
            with httpx.Client(
                timeout=self.timeout,
                verify=self.verify,
                follow_redirects=True
            ) as client:
                response = client.request(
                    method=http_method,
                    url=url,
                    headers=request_headers,
                    params=query,
                    content=content,
                    data=data
                )

                duration = self._elapsed_ms(started)

                # Best effort: fall back to the repr of the raw bytes if the
                # body cannot be decoded as text.
                try:
                    response_body = response.text
                except Exception:
                    response_body = str(response.content)

                return {
                    'success': True,
                    'statusCode': response.status_code,
                    'headers': dict(response.headers),
                    'body': response_body,
                    'duration': duration,
                    'size': len(response.content)
                }

        except httpx.TimeoutException:
            return self._failure('请求超时', started)
        except httpx.ConnectError as e:
            return self._failure(f'连接失败: {str(e)}', started)
        except Exception as e:
            # Boundary handler: any other failure is reported to the caller
            # as an error result instead of being raised.
            return self._failure(f'请求错误: {str(e)}', started)

    @staticmethod
    def _parse_form_fields(body: Any) -> Optional[Dict[str, Any]]:
        """Return ``body`` decoded as a JSON object, or None if it is not one."""
        try:
            parsed = json.loads(body)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError subclass.
            return None
        return parsed if isinstance(parsed, dict) else None

    @staticmethod
    def _elapsed_ms(started: float) -> float:
        """Return the milliseconds elapsed since ``started``, rounded to 2 places."""
        return round((time.perf_counter() - started) * 1000, 2)

    def _failure(self, message: str, started: float) -> Dict[str, Any]:
        """Build the failure result returned by ``send_request``."""
        return {
            'success': False,
            'error': message,
            'duration': self._elapsed_ms(started)
        }

    def _apply_auth(
        self,
        headers: Dict[str, str],
        auth_type: Optional[str],
        auth_config: Optional[Dict[str, Any]],
        params: Optional[Dict[str, str]]
    ):
        """
        Apply authentication settings to the request headers or query params.

        ``headers`` (and ``params`` for query-string API keys) are modified in
        place. Nothing happens when ``auth_type`` or ``auth_config`` is empty
        or when ``auth_type`` is not one of the known values.

        Args:
            headers: Header mapping to update.
            auth_type: One of bearer, basic, apikey, oauth2.
            auth_config: Keys read per type: ``token`` (bearer);
                ``username``/``password`` (basic); ``key``/``value``/
                ``location`` (apikey, location is ``header`` or ``query``);
                ``accessToken`` (oauth2).
            params: Query mapping to update; a query-string API key is
                skipped when this is None.
        """
        if not auth_type or not auth_config:
            return

        if auth_type == 'bearer':
            token = auth_config.get('token', '')
            headers['Authorization'] = f'Bearer {token}'

        elif auth_type == 'basic':
            username = auth_config.get('username', '')
            password = auth_config.get('password', '')
            credentials = base64.b64encode(
                f'{username}:{password}'.encode()
            ).decode()
            headers['Authorization'] = f'Basic {credentials}'

        elif auth_type == 'apikey':
            key = auth_config.get('key', '')
            value = auth_config.get('value', '')
            location = auth_config.get('location', 'header')  # header or query

            if location == 'header':
                headers[key] = value
            elif location == 'query' and params is not None:
                params[key] = value

        elif auth_type == 'oauth2':
            token = auth_config.get('accessToken', '')
            headers['Authorization'] = f'Bearer {token}'

    def batch_request(
        self,
        requests: List[Dict[str, Any]],
        interval: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Send several requests one after another.

        Args:
            requests: Request configurations. Keys read from each entry:
                ``method``, ``url``, ``headers``, ``params``, ``body``,
                ``bodyType``, ``authType``, ``authConfig``.
            interval: Pause between consecutive requests, in milliseconds.
                No pause is made after the last request.

        Returns:
            One ``send_request`` result per entry, in input order, each with
            an added ``index`` key holding its position in ``requests``.
        """
        results = []
        last_index = len(requests) - 1

        for i, req in enumerate(requests):
            # ``or`` also covers keys that are present but set to None.
            result = self.send_request(
                method=req.get('method') or 'GET',
                url=req.get('url') or '',
                headers=req.get('headers'),
                params=req.get('params'),
                body=req.get('body'),
                body_type=req.get('bodyType') or 'json',
                auth_type=req.get('authType'),
                auth_config=req.get('authConfig')
            )
            result['index'] = i
            results.append(result)

            if interval > 0 and i < last_index:
                time.sleep(interval / 1000)

        return results