工具助手添加
This commit is contained in:
@@ -0,0 +1,194 @@
|
||||
"""
|
||||
HTTP 客户端服务
|
||||
处理 HTTP 请求发送和响应处理
|
||||
|
||||
@author huazm
|
||||
"""
|
||||
import time
|
||||
import json
|
||||
import base64
|
||||
import httpx
|
||||
from typing import Dict, Any, Optional, List
|
||||
from urllib.parse import urlencode
|
||||
|
||||
|
||||
class HttpClient:
|
||||
"""HTTP 客户端,支持多种请求方法和认证方式"""
|
||||
|
||||
def __init__(self, timeout: int = 30):
|
||||
self.timeout = timeout
|
||||
|
||||
def send_request(
|
||||
self,
|
||||
method: str,
|
||||
url: str,
|
||||
headers: Optional[Dict[str, str]] = None,
|
||||
params: Optional[Dict[str, str]] = None,
|
||||
body: Optional[str] = None,
|
||||
body_type: str = 'json',
|
||||
auth_type: Optional[str] = None,
|
||||
auth_config: Optional[Dict[str, Any]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
发送 HTTP 请求
|
||||
|
||||
Args:
|
||||
method: 请求方法 (GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS)
|
||||
url: 请求 URL
|
||||
headers: 请求头
|
||||
params: URL 参数
|
||||
body: 请求体
|
||||
body_type: 请求体类型 (json, form, xml, raw)
|
||||
auth_type: 认证类型 (bearer, basic, apikey, oauth2)
|
||||
auth_config: 认证配置
|
||||
|
||||
Returns:
|
||||
包含响应信息的字典
|
||||
"""
|
||||
start_time = time.time()
|
||||
|
||||
# 初始化请求头
|
||||
request_headers = dict(headers) if headers else {}
|
||||
|
||||
# 处理认证
|
||||
self._apply_auth(request_headers, auth_type, auth_config, params)
|
||||
|
||||
# 处理请求体
|
||||
content = None
|
||||
data = None
|
||||
|
||||
if body and method.upper() not in ['GET', 'HEAD', 'OPTIONS']:
|
||||
if body_type == 'json':
|
||||
request_headers.setdefault('Content-Type', 'application/json')
|
||||
content = body
|
||||
elif body_type == 'form':
|
||||
request_headers.setdefault('Content-Type', 'application/x-www-form-urlencoded')
|
||||
try:
|
||||
data = json.loads(body)
|
||||
except:
|
||||
data = body
|
||||
elif body_type == 'xml':
|
||||
request_headers.setdefault('Content-Type', 'application/xml')
|
||||
content = body
|
||||
else:
|
||||
content = body
|
||||
|
||||
try:
|
||||
with httpx.Client(timeout=self.timeout, verify=False, follow_redirects=True) as client:
|
||||
response = client.request(
|
||||
method=method.upper(),
|
||||
url=url,
|
||||
headers=request_headers,
|
||||
params=params,
|
||||
content=content,
|
||||
data=data
|
||||
)
|
||||
|
||||
duration = (time.time() - start_time) * 1000 # 转换为毫秒
|
||||
|
||||
# 尝试解析响应体
|
||||
try:
|
||||
response_body = response.text
|
||||
except:
|
||||
response_body = str(response.content)
|
||||
|
||||
return {
|
||||
'success': True,
|
||||
'statusCode': response.status_code,
|
||||
'headers': dict(response.headers),
|
||||
'body': response_body,
|
||||
'duration': round(duration, 2),
|
||||
'size': len(response.content)
|
||||
}
|
||||
|
||||
except httpx.TimeoutException:
|
||||
return {
|
||||
'success': False,
|
||||
'error': '请求超时',
|
||||
'duration': round((time.time() - start_time) * 1000, 2)
|
||||
}
|
||||
except httpx.ConnectError as e:
|
||||
return {
|
||||
'success': False,
|
||||
'error': f'连接失败: {str(e)}',
|
||||
'duration': round((time.time() - start_time) * 1000, 2)
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
'success': False,
|
||||
'error': f'请求错误: {str(e)}',
|
||||
'duration': round((time.time() - start_time) * 1000, 2)
|
||||
}
|
||||
|
||||
def _apply_auth(
|
||||
self,
|
||||
headers: Dict[str, str],
|
||||
auth_type: Optional[str],
|
||||
auth_config: Optional[Dict[str, Any]],
|
||||
params: Optional[Dict[str, str]]
|
||||
):
|
||||
"""应用认证配置到请求头或参数"""
|
||||
if not auth_type or not auth_config:
|
||||
return
|
||||
|
||||
if auth_type == 'bearer':
|
||||
token = auth_config.get('token', '')
|
||||
headers['Authorization'] = f'Bearer {token}'
|
||||
|
||||
elif auth_type == 'basic':
|
||||
username = auth_config.get('username', '')
|
||||
password = auth_config.get('password', '')
|
||||
credentials = base64.b64encode(f'{username}:{password}'.encode()).decode()
|
||||
headers['Authorization'] = f'Basic {credentials}'
|
||||
|
||||
elif auth_type == 'apikey':
|
||||
key = auth_config.get('key', '')
|
||||
value = auth_config.get('value', '')
|
||||
location = auth_config.get('location', 'header') # header 或 query
|
||||
|
||||
if location == 'header':
|
||||
headers[key] = value
|
||||
elif location == 'query' and params is not None:
|
||||
params[key] = value
|
||||
|
||||
elif auth_type == 'oauth2':
|
||||
token = auth_config.get('accessToken', '')
|
||||
headers['Authorization'] = f'Bearer {token}'
|
||||
|
||||
def batch_request(
|
||||
self,
|
||||
requests: List[Dict[str, Any]],
|
||||
interval: int = 100
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
批量发送请求
|
||||
|
||||
Args:
|
||||
requests: 请求配置列表
|
||||
interval: 请求间隔(毫秒)
|
||||
|
||||
Returns:
|
||||
响应结果列表
|
||||
"""
|
||||
results = []
|
||||
|
||||
for i, req in enumerate(requests):
|
||||
result = self.send_request(
|
||||
method=req.get('method', 'GET'),
|
||||
url=req.get('url', ''),
|
||||
headers=req.get('headers'),
|
||||
params=req.get('params'),
|
||||
body=req.get('body'),
|
||||
body_type=req.get('bodyType', 'json'),
|
||||
auth_type=req.get('authType'),
|
||||
auth_config=req.get('authConfig')
|
||||
)
|
||||
result['index'] = i
|
||||
results.append(result)
|
||||
|
||||
# 请求间隔
|
||||
if interval > 0 and i < len(requests) - 1:
|
||||
time.sleep(interval / 1000)
|
||||
|
||||
return results
|
||||
|
||||
Reference in New Issue
Block a user