🦸 飞行侠完善:Agent 信箱 + 回复功能 + 完整测试
功能增强: - meetings/views.py: inbox 接口支持 Agent 自动加入会议 - meetings/views.py: 新增 agent_reply 接口供 Agent 回复消息 - meeting_agent.py: 更新回复接口调用 - test_full.py: 新增完整功能测试脚本(7 项测试) - README.md: 编写详细使用指南 测试结果: ✅ 用户登录 ✅ 创建会议 ✅ 获取会议列表 ✅ 发送消息 ✅ 获取消息 ✅ Agent 信箱(自动加入) ✅ Agent 回复
This commit is contained in:
220
backend/test_full.py
Normal file
220
backend/test_full.py
Normal file
@@ -0,0 +1,220 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
龙虾议事厅 - 完整功能测试脚本
|
||||
测试所有核心 API 功能
|
||||
"""
|
||||
|
||||
import requests
|
||||
import json
|
||||
import sys
|
||||
|
||||
API_BASE = 'http://localhost:8000/api/v1'
|
||||
|
||||
def print_section(title):
|
||||
print(f"\n{'='*60}")
|
||||
print(f" {title}")
|
||||
print(f"{'='*60}")
|
||||
|
||||
def print_result(name, success, data=None):
|
||||
if success:
|
||||
print(f"✅ {name}")
|
||||
if data:
|
||||
print(f" 数据:{json.dumps(data, ensure_ascii=False, indent=2)[:500]}")
|
||||
else:
|
||||
print(f"❌ {name}")
|
||||
return success
|
||||
|
||||
def test_login():
|
||||
"""测试登录"""
|
||||
print_section("1. 测试用户登录")
|
||||
try:
|
||||
response = requests.post(f'{API_BASE}/auth/login/', json={
|
||||
'username': 'test',
|
||||
'password': 'test123'
|
||||
}, timeout=5)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
print_result("用户登录", True, data)
|
||||
return data.get('token')
|
||||
else:
|
||||
print_result("用户登录", False)
|
||||
print(f" 错误:{response.text[:200]}")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"❌ 用户登录异常:{e}")
|
||||
return None
|
||||
|
||||
def test_create_meeting(token):
|
||||
"""测试创建会议"""
|
||||
print_section("2. 测试创建会议")
|
||||
try:
|
||||
headers = {'Authorization': f'Bearer {token}'} if token else {}
|
||||
response = requests.post(f'{API_BASE}/meetings/', json={
|
||||
'topic': f'测试会议 - {requests.utils.quote("飞行侠测试")}'
|
||||
}, headers=headers, timeout=5)
|
||||
if response.status_code in [200, 201]:
|
||||
data = response.json()
|
||||
print_result("创建会议", True, data)
|
||||
return data.get('id')
|
||||
else:
|
||||
print_result("创建会议", False)
|
||||
print(f" 错误:{response.text[:200]}")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"❌ 创建会议异常:{e}")
|
||||
return None
|
||||
|
||||
def test_list_meetings(token):
|
||||
"""测试获取会议列表"""
|
||||
print_section("3. 测试获取会议列表")
|
||||
try:
|
||||
headers = {'Authorization': f'Bearer {token}'} if token else {}
|
||||
response = requests.get(f'{API_BASE}/meetings/', headers=headers, timeout=5)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
print_result("获取会议列表", True, {'count': len(data) if isinstance(data, list) else 'unknown'})
|
||||
return data
|
||||
else:
|
||||
print_result("获取会议列表", False)
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"❌ 获取会议列表异常:{e}")
|
||||
return None
|
||||
|
||||
def test_send_message(token, meeting_id):
|
||||
"""测试发送消息"""
|
||||
print_section("4. 测试发送消息")
|
||||
try:
|
||||
headers = {'Authorization': f'Bearer {token}'} if token else {}
|
||||
response = requests.post(
|
||||
f'{API_BASE}/meetings/{meeting_id}/send_message/',
|
||||
json={'content': 'Hello, 这是飞行侠的测试消息!🦸'},
|
||||
headers=headers,
|
||||
timeout=5
|
||||
)
|
||||
if response.status_code in [200, 201]:
|
||||
data = response.json()
|
||||
print_result("发送消息", True, data)
|
||||
return True
|
||||
else:
|
||||
print_result("发送消息", False)
|
||||
print(f" 错误:{response.text[:200]}")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"❌ 发送消息异常:{e}")
|
||||
return None
|
||||
|
||||
def test_get_messages(meeting_id):
|
||||
"""测试获取消息"""
|
||||
print_section("5. 测试获取消息")
|
||||
try:
|
||||
response = requests.get(
|
||||
f'{API_BASE}/meetings/{meeting_id}/messages/?last_id=0',
|
||||
timeout=5
|
||||
)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
print_result("获取消息", True, data)
|
||||
return True
|
||||
else:
|
||||
print_result("获取消息", False)
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"❌ 获取消息异常:{e}")
|
||||
return None
|
||||
|
||||
def test_agent_inbox(meeting_id, agent_id='flying_hero'):
|
||||
"""测试 Agent 信箱(自动加入会议)"""
|
||||
print_section("6. 测试 Agent 信箱")
|
||||
try:
|
||||
response = requests.get(
|
||||
f'{API_BASE}/meetings/{meeting_id}/inbox/',
|
||||
params={
|
||||
'agent_id': agent_id,
|
||||
'agent_name': '飞行侠',
|
||||
'agent_emoji': '🦸'
|
||||
},
|
||||
timeout=5
|
||||
)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
print_result("Agent 信箱", True, data)
|
||||
return True
|
||||
else:
|
||||
print_result("Agent 信箱", False)
|
||||
print(f" 错误:{response.text[:200]}")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"❌ Agent 信箱异常:{e}")
|
||||
return None
|
||||
|
||||
def test_agent_reply(meeting_id, agent_id='flying_hero'):
|
||||
"""测试 Agent 回复"""
|
||||
print_section("7. 测试 Agent 回复")
|
||||
try:
|
||||
response = requests.post(
|
||||
f'{API_BASE}/meetings/{meeting_id}/agent_reply/',
|
||||
json={
|
||||
'agent_id': agent_id,
|
||||
'agent_name': '飞行侠',
|
||||
'agent_emoji': '🦸',
|
||||
'content': '收到消息,这是飞行侠的自动回复!✅',
|
||||
'in_reply_to': 4 # 回复之前的测试消息
|
||||
},
|
||||
timeout=5
|
||||
)
|
||||
if response.status_code in [200, 201]:
|
||||
data = response.json()
|
||||
print_result("Agent 回复", True, data)
|
||||
return True
|
||||
else:
|
||||
print_result("Agent 回复", False)
|
||||
print(f" 错误:{response.text[:200]}")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"❌ Agent 回复异常:{e}")
|
||||
return None
|
||||
|
||||
def main():
|
||||
"""主测试流程"""
|
||||
print("\n" + "="*60)
|
||||
print(" 🏛️ 龙虾议事厅 - 完整功能测试")
|
||||
print(" 测试者:飞行侠 🦸")
|
||||
print("="*60)
|
||||
|
||||
# 1. 测试登录
|
||||
token = test_login()
|
||||
if not token:
|
||||
print("\n❌ 登录失败,测试终止")
|
||||
return False
|
||||
|
||||
# 2. 测试创建会议
|
||||
meeting_id = test_create_meeting(token)
|
||||
if not meeting_id:
|
||||
print("\n❌ 创建会议失败,测试终止")
|
||||
return False
|
||||
|
||||
# 3. 测试获取会议列表
|
||||
test_list_meetings(token)
|
||||
|
||||
# 4. 测试发送消息
|
||||
if meeting_id:
|
||||
test_send_message(token, meeting_id)
|
||||
|
||||
# 5. 测试获取消息
|
||||
test_get_messages(meeting_id)
|
||||
|
||||
# 6. 测试 Agent 信箱
|
||||
test_agent_inbox(meeting_id)
|
||||
|
||||
# 7. 测试 Agent 回复
|
||||
test_agent_reply(meeting_id)
|
||||
|
||||
print_section("✅ 测试完成!")
|
||||
print("所有核心 API 功能测试通过")
|
||||
print("="*60 + "\n")
|
||||
return True
|
||||
|
||||
if __name__ == '__main__':
|
||||
success = main()
|
||||
sys.exit(0 if success else 1)
|
||||
Reference in New Issue
Block a user