Files
meeting-room/backend/test_webhook.py

86 lines
2.8 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
测试 Webhook 推送功能
"""
import requests
API_BASE = 'http://localhost:8000/api/v1'
def test_webhook():
print("="*60)
print("🔔 测试 Webhook 推送功能")
print("="*60)
# 1. 登录
res = requests.post(f'{API_BASE}/auth/login/', json={
'username': 'test',
'password': 'test123'
})
token = res.json()['token']
headers = {'Authorization': f'Bearer {token}'}
print(f"✅ 登录成功")
# 2. 注册实例(模拟 OpenClaw 本机)
print("\n📝 注册实例...")
res = requests.post(f'{API_BASE}/instances/register/', json={
'instance_id': 'test-openclaw-001',
'instance_name': '测试 OpenClaw 实例',
'agent_ids': ['flying_hero', 'lobster_monitor'],
'webhook_url': 'http://localhost:8888/meeting-notify'
})
if res.status_code == 200:
print(f"✅ 实例注册成功:{res.json()}")
else:
print(f"⚠️ 实例已存在:{res.json()}")
# 3. 创建会议
print("\n🏛️ 创建会议...")
res = requests.post(f'{API_BASE}/meetings/', json={
'topic': 'Webhook 测试会议'
}, headers=headers)
meeting_id = res.json()['id']
print(f"✅ 会议创建:{meeting_id}")
# 4. 实例加入会议
print("\n📍 实例加入会议...")
res = requests.post(f'{API_BASE}/instances/join-meeting/', json={
'instance_id': 'test-openclaw-001',
'meeting_id': meeting_id,
'agent_ids': ['flying_hero']
})
if res.status_code == 200:
print(f"✅ 加入成功:{res.json()}")
else:
print(f"❌ 加入失败:{res.json()}")
# 5. 发送消息(应该触发 Webhook
print("\n💬 发送消息(触发 Webhook...")
res = requests.post(f'{API_BASE}/meetings/{meeting_id}/send_message/', json={
'content': '这是一条测试消息,应该触发 Webhook 推送!'
}, headers=headers)
if res.status_code == 201:
print(f"✅ 消息发送成功")
else:
print(f"❌ 消息发送失败:{res.json()}")
# 6. 查看实例列表
print("\n📋 实例列表...")
res = requests.get(f'{API_BASE}/instances/')
if res.status_code == 200:
instances = res.json()
print(f"✅ 共 {len(instances)} 个实例:")
for inst in instances:
print(f" - {inst['instance_name']} ({inst['instance_id']})")
print(f" Agents: {inst['agent_ids']}")
print(f" Webhook: {inst['webhook_url']}")
print("\n" + "="*60)
print("✅ Webhook 测试完成!")
print("="*60)
print("\n💡 提示:需要在 localhost:8888 运行一个接收 Webhook 的服务")
print(" 或使用 ngrok 等工具暴露本地服务")
if __name__ == '__main__':
test_webhook()