|
import requests
import random
import string
import time
import hashlib
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
URL = "http://111.231.15.49:30395/api/user/register"
# 并发线程数 (模拟的用户数)
THREADS = 20
# 每个线程发送的请求数
REQUESTS_PER_THREAD = 20
# --- 统计 ---
success_count = 0
failure_count = 0
lock = threading.Lock()
def get_random_string(length=8):
"""生成指定长度的随机字母和数字组合的字符串"""
letters_and_digits = string.ascii_lowercase + string.digits
return ''.join(random.choice(letters_and_digits) for i in range(length))
def generate_sign(params):
"""
通常的逻辑是:
1. 移除 'sign' 参数本身。
2. 将所有参数按 key 的字母顺序排序。
3. 将排序后的参数拼接成 'key=value&key=value' 的形式。
4. 在拼接后的字符串末尾加上一个秘密密钥 (secret key/salt)。
5. 对最终的字符串进行 MD5 或其他哈希算法加密。
由于我们不知道秘密密钥,这里的实现是一个示例,很可能无法直接使用。
您需要通过分析您 App 的源代码来找到正确的加密逻辑和密钥。
"""
# 示例密钥,您必须替换成真实的密钥
secret_key = "secret_key"
# 1. & 2. 按 Key 排序
sorted_params = sorted(params.items())
# 3. 拼接字符串
query_string = '&'.join([f"{k}={v}" for k, v in sorted_params if v])
# 4. 加上密钥
string_to_sign = f"{query_string}&key={secret_key}"
# 5. MD5 加密并返回小写形式
# 如果您的 App 使用其他算法(如 SHA256 ),请修改 hashlib.md5
return hashlib.md5(string_to_sign.encode('utf-8')).hexdigest()
def register_user(request_num):
"""
模拟单个用户注册请求
"""
global success_count, failure_count
try:
# 准备随机数据
account = get_random_string(10)
password = get_random_string(12)
nickname = "user_" + get_random_string(6)
device_no = uuid.uuid4().hex
token = str(uuid.uuid4())
# 准备请求体 (Payload)
payload = {
'account': account,
'password': password,
'nickname': nickname,
'deviceNo': device_no,
'token': token,
'timestamp': int(time.time() * 1000), # 毫秒时间戳
'ip': f"{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}",
'question': '',
'os': 'android',
'invitecode': '',
'mobilecode': '',
'answer': '',
'v': '1.0.28',
}
# 生成签名
sign = generate_sign(payload)
payload['sign'] = sign
# 准备请求头
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': 'okhttp/3.3.1',
'Connection': 'Keep-Alive',
}
# 发送 POST 请求
response = requests.post(URL, data=payload, headers=headers, timeout=2)
# 检查响应
if response.status_code == 200:
print(f"[请求 {request_num:03d}] 成功: Status={response.status_code}, Account={account}")
with lock:
success_count += 1
else:
print(f"[请求 {request_num:03d}] 失败: Status={response.status_code}, Account={account}, Response={response.text[:100]}")
with lock:
failure_count += 1
except requests.exceptions.RequestException as e:
print(f"[请求 {request_num:03d}] 异常: Account={account}, Error={e}")
with lock:
failure_count += 1
def main():
"""
主函数,启动并发测试
"""
total_requests = THREADS * REQUESTS_PER_THREAD
print(f"🚀 开始负载测试...")
print(f"URL: {URL}")
print(f"总请求数: {total_requests} ({THREADS} 个线程 * {REQUESTS_PER_THREAD} 次/线程)")
start_time = time.time()
with ThreadPoolExecutor(max_workers=THREADS) as executor:
# 提交所有任务
futures = [executor.submit(register_user, i + 1) for i in range(total_requests)]
# 等待所有任务完成 (as_completed 可以提供进度)
for future in as_completed(futures):
try:
future.result() # 获取任务结果,如有异常会在这里抛出
except Exception as e:
print(f"一个线程任务执行出错: {e}")
end_time = time.time()
duration = end_time - start_time
print("\n" + "="*50)
print("📊 测试完成")
print(f"总耗时: {duration:.2f} 秒")
print(f"成功请求: {success_count}")
print(f"失败请求: {failure_count}")
if duration > 0:
print(f"平均 QPS (每秒请求数): {total_requests / duration:.2f}")
print("="*50)
if __name__ == "__main__":
main() # 拿 gemini 生成了一个代码
@HiHuan |