195 lines
6.2 KiB
Python
195 lines
6.2 KiB
Python
"""
HTTP client service.

Handles sending HTTP requests and processing their responses.

@author huazm
"""
|
|
import time
|
|
import json
|
|
import base64
|
|
import httpx
|
|
from typing import Dict, Any, Optional, List
|
|
from urllib.parse import urlencode
|
|
|
|
|
|
class HttpClient:
    """HTTP client supporting several request methods and auth schemes.

    Each call opens a short-lived ``httpx.Client``, sends one request and
    returns a plain dict describing either the response or the failure.

    Attributes:
        timeout: Timeout value handed to ``httpx.Client``.
        verify: TLS verification setting handed to ``httpx.Client``.
    """

    # Methods for which a supplied body is ignored rather than sent.
    _BODYLESS_METHODS = frozenset({'GET', 'HEAD', 'OPTIONS'})

    def __init__(self, timeout: int = 30, verify: bool = False):
        """Create a client.

        Args:
            timeout: Timeout passed to ``httpx.Client`` for every request.
            verify: Whether TLS certificates are verified. Defaults to
                ``False`` to keep the historical behaviour of this class.
        """
        self.timeout = timeout
        # NOTE(security): certificate verification is off by default for
        # backward compatibility; pass verify=True when talking to hosts
        # whose identity matters.
        self.verify = verify

    def send_request(
        self,
        method: str,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        params: Optional[Dict[str, str]] = None,
        body: Optional[str] = None,
        body_type: str = 'json',
        auth_type: Optional[str] = None,
        auth_config: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Send a single HTTP request.

        Args:
            method: Request method (GET, POST, PUT, DELETE, PATCH, HEAD,
                OPTIONS); upper-cased before use.
            url: Request URL.
            headers: Request headers. Copied, never mutated.
            params: URL query parameters. Copied, never mutated.
            body: Request body. Ignored for GET, HEAD and OPTIONS.
            body_type: Body kind (json, form, xml, raw).
            auth_type: Auth scheme (bearer, basic, apikey, oauth2).
            auth_config: Settings for the chosen auth scheme.

        Returns:
            On success a dict with ``success`` (True), ``statusCode``,
            ``headers``, ``body``, ``duration`` (milliseconds, 2 decimals)
            and ``size`` (bytes). On failure a dict with ``success``
            (False), ``error`` and ``duration``. Any HTTP status code
            counts as success; only exceptions produce a failure dict.
        """
        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted.
        started = time.perf_counter()

        def elapsed_ms() -> float:
            return round((time.perf_counter() - started) * 1000, 2)

        http_method = method.upper()
        request_headers = dict(headers) if headers else {}

        # Work on a copy so that an API key injected into the query string
        # never leaks into the caller's dict (e.g. a reused batch config).
        request_params = dict(params) if isinstance(params, dict) else params
        request_params = self._apply_auth(
            request_headers, auth_type, auth_config, request_params
        )

        try:
            payload = self._prepare_body(
                http_method, body, body_type, request_headers
            )

            with httpx.Client(
                timeout=self.timeout,
                verify=self.verify,
                follow_redirects=True
            ) as client:
                response = client.request(
                    method=http_method,
                    url=url,
                    headers=request_headers,
                    params=request_params,
                    content=payload['content'],
                    data=payload['data']
                )
                duration = elapsed_ms()

            # Prefer decoded text; fall back to the repr of the raw bytes
            # if decoding fails.
            try:
                response_body = response.text
            except Exception:
                response_body = str(response.content)

            return {
                'success': True,
                'statusCode': response.status_code,
                'headers': dict(response.headers),
                'body': response_body,
                'duration': duration,
                'size': len(response.content)
            }

        except httpx.TimeoutException:
            return {
                'success': False,
                'error': '请求超时',
                'duration': elapsed_ms()
            }
        except httpx.ConnectError as e:
            return {
                'success': False,
                'error': f'连接失败: {e}',
                'duration': elapsed_ms()
            }
        except Exception as e:
            return {
                'success': False,
                'error': f'请求错误: {e}',
                'duration': elapsed_ms()
            }

    def _prepare_body(
        self,
        method: str,
        body: Optional[Any],
        body_type: str,
        headers: Dict[str, str]
    ) -> Dict[str, Any]:
        """Translate ``body``/``body_type`` into ``content``/``data`` values.

        A default ``Content-Type`` is added to ``headers`` (in place) for
        json, form and xml bodies unless the caller already supplied one
        under any capitalisation.

        Args:
            method: Request method; compared case-insensitively.
            body: Request body as given by the caller.
            body_type: Body kind (json, form, xml, anything else is raw).
            headers: Mutable header dict for the outgoing request.

        Returns:
            A dict with the keys ``content`` and ``data``; at most one of
            them is not None.

        Raises:
            TypeError, ValueError: If a non-string json body cannot be
                serialised by ``json.dumps``.
        """
        payload: Dict[str, Any] = {'content': None, 'data': None}
        if not body or method.upper() in self._BODYLESS_METHODS:
            return payload

        def default_content_type(value: str) -> None:
            # Header names are case-insensitive, so respect e.g. a
            # caller-supplied 'content-type' instead of adding a duplicate.
            if not any(name.lower() == 'content-type' for name in headers):
                headers['Content-Type'] = value

        if body_type == 'json':
            default_content_type('application/json')
            # Structured bodies (dict/list) are serialised so they can be
            # sent; strings and bytes are passed through untouched.
            if isinstance(body, (str, bytes)):
                payload['content'] = body
            else:
                payload['content'] = json.dumps(body)
        elif body_type == 'form':
            default_content_type('application/x-www-form-urlencoded')
            # A JSON string is decoded into form fields; anything that is
            # not valid JSON (or not a string at all) is sent as given.
            try:
                payload['data'] = json.loads(body)
            except (TypeError, ValueError):
                payload['data'] = body
        elif body_type == 'xml':
            default_content_type('application/xml')
            payload['content'] = body
        else:
            payload['content'] = body

        return payload

    def _apply_auth(
        self,
        headers: Dict[str, str],
        auth_type: Optional[str],
        auth_config: Optional[Dict[str, Any]],
        params: Optional[Dict[str, str]]
    ) -> Optional[Dict[str, str]]:
        """Apply the auth settings to the request headers or query params.

        ``headers`` is updated in place. ``params`` is updated in place when
        it is a dict; when it is None and an API key has to go into the
        query string, a new dict is created so the key is not lost.

        Args:
            headers: Mutable header dict for the outgoing request.
            auth_type: One of bearer, basic, apikey, oauth2. Other values
                are ignored.
            auth_config: Scheme settings: ``token`` (bearer);
                ``username``/``password`` (basic); ``key``/``value``/
                ``location`` (apikey, location is header or query);
                ``accessToken`` (oauth2).
            params: Query parameters of the outgoing request, or None.

        Returns:
            The query parameters to send: ``params`` itself, or a new dict
            if one had to be created.
        """
        if not auth_type or not auth_config:
            return params

        if auth_type == 'bearer':
            token = auth_config.get('token', '')
            headers['Authorization'] = f'Bearer {token}'

        elif auth_type == 'basic':
            username = auth_config.get('username', '')
            password = auth_config.get('password', '')
            credentials = base64.b64encode(
                f'{username}:{password}'.encode()
            ).decode()
            headers['Authorization'] = f'Basic {credentials}'

        elif auth_type == 'apikey':
            key = auth_config.get('key', '')
            value = auth_config.get('value', '')
            location = auth_config.get('location', 'header')  # header or query

            if location == 'header':
                headers[key] = value
            elif location == 'query':
                if params is None:
                    params = {}
                params[key] = value

        elif auth_type == 'oauth2':
            token = auth_config.get('accessToken', '')
            headers['Authorization'] = f'Bearer {token}'

        return params

    def batch_request(
        self,
        requests: List[Dict[str, Any]],
        interval: int = 100
    ) -> List[Dict[str, Any]]:
        """Send several requests one after another.

        Args:
            requests: Request configs. Recognised keys: ``method``, ``url``,
                ``headers``, ``params``, ``body``, ``bodyType``,
                ``authType``, ``authConfig``.
            interval: Pause between consecutive requests, in milliseconds.
                No pause follows the last request.

        Returns:
            One result dict per request, in input order, each extended with
            an ``index`` key holding its position in ``requests``.
        """
        results = []
        last_index = len(requests) - 1

        for i, req in enumerate(requests):
            result = self.send_request(
                # A missing, None or empty method falls back to GET.
                method=req.get('method') or 'GET',
                url=req.get('url', ''),
                headers=req.get('headers'),
                params=req.get('params'),
                body=req.get('body'),
                body_type=req.get('bodyType', 'json'),
                auth_type=req.get('authType'),
                auth_config=req.get('authConfig')
            )
            result['index'] = i
            results.append(result)

            # Pause between requests
            if interval > 0 and i < last_index:
                time.sleep(interval / 1000)

        return results
|
|
|