Files
2025-12-25 18:04:10 +08:00

195 lines
6.2 KiB
Python

"""
HTTP 客户端服务
处理 HTTP 请求发送和响应处理
@author huazm
"""
import time
import json
import base64
import httpx
from typing import Dict, Any, Optional, List
from urllib.parse import urlencode
class HttpClient:
    """HTTP client supporting several request methods, body types and auth schemes.

    Every request is sent through a short-lived ``httpx.Client`` and the
    outcome is reported as a plain dictionary (never as a raised transport
    exception), so the result can be serialized directly.
    """

    # Methods for which a supplied request body is ignored.
    _BODYLESS_METHODS = frozenset({'GET', 'HEAD', 'OPTIONS'})

    # Content-Type applied for each known body type when the caller set none.
    _DEFAULT_CONTENT_TYPES = {
        'json': 'application/json',
        'form': 'application/x-www-form-urlencoded',
        'xml': 'application/xml',
    }

    def __init__(self, timeout: int = 30, verify: bool = False):
        """Create a client.

        Args:
            timeout: Timeout handed to ``httpx.Client``.
            verify: Value handed to ``httpx.Client(verify=...)``. Defaults to
                ``False`` to keep the previous behaviour of this class.
                NOTE(security): ``False`` disables TLS certificate
                verification; pass ``True`` when talking to untrusted hosts.
        """
        self.timeout = timeout
        self.verify = verify

    def send_request(
        self,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        params: Optional[Dict[str, str]] = None,
        body: Optional[str] = None,
        body_type: str = 'json',
        auth_type: Optional[str] = None,
        auth_config: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Send one HTTP request and describe the outcome.

        Args:
            method: Request method (GET, POST, PUT, DELETE, PATCH, HEAD,
                OPTIONS); upper-cased before sending.
            url: Request URL.
            headers: Request headers. Copied, never mutated.
            params: URL query parameters. Copied, never mutated.
            body: Request body; ignored for GET, HEAD and OPTIONS.
            body_type: Body type (json, form, xml, raw).
            auth_type: Auth scheme (bearer, basic, apikey, oauth2).
            auth_config: Settings for the chosen auth scheme.

        Returns:
            On success a dict with ``success`` (True), ``statusCode``,
            ``headers``, ``body``, ``duration`` (milliseconds, 2 decimals)
            and ``size`` (length of the response content in bytes).
            On failure a dict with ``success`` (False), ``error`` and
            ``duration``.
        """
        # perf_counter is monotonic, so the duration cannot be skewed by
        # system clock adjustments while the request is in flight.
        start_time = time.perf_counter()
        request_kwargs = self._build_request_kwargs(
            method, url, headers, params, body, body_type, auth_type, auth_config
        )
        try:
            with httpx.Client(
                timeout=self.timeout,
                verify=self.verify,
                follow_redirects=True
            ) as client:
                response = client.request(**request_kwargs)
                duration = self._elapsed_ms(start_time)
                # Best effort: fall back to the repr of the raw bytes if the
                # body cannot be decoded as text.
                try:
                    response_body = response.text
                except Exception:
                    response_body = str(response.content)
                return {
                    'success': True,
                    'statusCode': response.status_code,
                    'headers': dict(response.headers),
                    'body': response_body,
                    'duration': duration,
                    'size': len(response.content)
                }
        except httpx.TimeoutException:
            return self._error_result('请求超时', start_time)
        except httpx.ConnectError as e:
            return self._error_result(f'连接失败: {e}', start_time)
        except Exception as e:
            # Boundary handler: any other failure is reported to the caller
            # as an error result instead of being raised.
            return self._error_result(f'请求错误: {e}', start_time)

    def _build_request_kwargs(
        self,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]],
        params: Optional[Dict[str, str]],
        body: Optional[str],
        body_type: str,
        auth_type: Optional[str],
        auth_config: Optional[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Assemble the keyword arguments for ``httpx.Client.request``."""
        http_method = method.upper()
        # Work on copies: auth injection and Content-Type defaults must not
        # leak into the caller's dictionaries. Always handing a real dict to
        # _apply_auth also lets a query-string API key be added when the
        # caller supplied no params at all.
        request_headers = dict(headers) if headers else {}
        request_params = dict(params) if params else {}
        self._apply_auth(request_headers, auth_type, auth_config, request_params)

        payload = {'content': None, 'data': None}
        if body and http_method not in self._BODYLESS_METHODS:
            payload = self._encode_body(body, body_type, request_headers)

        return {
            'method': http_method,
            'url': url,
            'headers': request_headers,
            # An empty mapping is sent as None so that "no params" looks the
            # same to httpx whether the caller passed None or {}.
            'params': request_params or None,
            'content': payload['content'],
            'data': payload['data'],
        }

    def _encode_body(
        self,
        body: Any,
        body_type: str,
        headers: Dict[str, str]
    ) -> Dict[str, Any]:
        """Pick ``content``/``data`` for the body and default the Content-Type."""
        default_type = self._DEFAULT_CONTENT_TYPES.get(body_type)
        # Header names are case-insensitive; only add the default when the
        # caller has not provided a Content-Type under any spelling.
        if default_type is not None and not any(
            name.lower() == 'content-type' for name in headers
        ):
            headers['Content-Type'] = default_type

        if body_type == 'form':
            try:
                parsed = json.loads(body)
            except (TypeError, ValueError):
                # Not JSON text (or not text at all): keep the value as given.
                parsed = body
            if isinstance(parsed, dict):
                # A mapping is sent as form fields.
                return {'content': None, 'data': parsed}
            # Anything else (pre-encoded "a=1&b=2", a JSON list, ...) is sent
            # verbatim as raw content rather than as non-mapping form data.

        return {'content': body, 'data': None}

    @staticmethod
    def _elapsed_ms(start_time: float) -> float:
        """Return milliseconds elapsed since ``start_time``, rounded to 2 decimals."""
        return round((time.perf_counter() - start_time) * 1000, 2)

    def _error_result(self, message: str, start_time: float) -> Dict[str, Any]:
        """Build the failure dictionary returned by ``send_request``."""
        return {
            'success': False,
            'error': message,
            'duration': self._elapsed_ms(start_time)
        }

    def _apply_auth(
        self,
        headers: Dict[str, str],
        auth_type: Optional[str],
        auth_config: Optional[Dict[str, Any]],
        params: Optional[Dict[str, str]]
    ) -> None:
        """Write the credentials for ``auth_type`` into ``headers`` or ``params``.

        Both dictionaries are modified in place. Nothing happens when either
        ``auth_type`` or ``auth_config`` is missing/empty, or when the auth
        type is not one of bearer, basic, apikey, oauth2.

        Args:
            headers: Header dict to update.
            auth_type: Auth scheme name.
            auth_config: Scheme settings: ``token`` (bearer);
                ``username``/``password`` (basic); ``key``/``value``/
                ``location`` (apikey, location is ``header`` or ``query``);
                ``accessToken`` (oauth2).
            params: Query-parameter dict to update for a query-located API
                key; when ``None`` such a key cannot be stored and is skipped.
        """
        if not auth_type or not auth_config:
            return

        if auth_type == 'bearer':
            token = auth_config.get('token', '')
            headers['Authorization'] = f'Bearer {token}'
        elif auth_type == 'basic':
            username = auth_config.get('username', '')
            password = auth_config.get('password', '')
            credentials = base64.b64encode(f'{username}:{password}'.encode()).decode()
            headers['Authorization'] = f'Basic {credentials}'
        elif auth_type == 'apikey':
            key = auth_config.get('key', '')
            value = auth_config.get('value', '')
            location = auth_config.get('location', 'header')  # 'header' or 'query'
            if location == 'header':
                headers[key] = value
            elif location == 'query' and params is not None:
                params[key] = value
        elif auth_type == 'oauth2':
            token = auth_config.get('accessToken', '')
            headers['Authorization'] = f'Bearer {token}'

    def batch_request(
        self,
        requests: List[Dict[str, Any]],
        interval: int = 100
    ) -> List[Dict[str, Any]]:
        """Send several requests one after another.

        Args:
            requests: Request configs. Keys read from each entry: ``method``
                (default ``'GET'``), ``url`` (default ``''``), ``headers``,
                ``params``, ``body``, ``bodyType`` (default ``'json'``),
                ``authType`` and ``authConfig``.
            interval: Pause between consecutive requests, in milliseconds.
                No pause follows the last request.

        Returns:
            The ``send_request`` result for every entry, in input order, each
            extended with an ``index`` key holding its position in
            ``requests``.
        """
        results = []
        last_index = len(requests) - 1
        for i, req in enumerate(requests):
            result = self.send_request(
                method=req.get('method', 'GET'),
                url=req.get('url', ''),
                headers=req.get('headers'),
                params=req.get('params'),
                body=req.get('body'),
                body_type=req.get('bodyType', 'json'),
                auth_type=req.get('authType'),
                auth_config=req.get('authConfig')
            )
            result['index'] = i
            results.append(result)
            # Throttle between requests, but not after the final one.
            if interval > 0 and i < last_index:
                time.sleep(interval / 1000)
        return results