#!/usr/bin/env python3 """ 测试 Webhook 推送功能 """ import requests API_BASE = 'http://localhost:8000/api/v1' def test_webhook(): print("="*60) print("🔔 测试 Webhook 推送功能") print("="*60) # 1. 登录 res = requests.post(f'{API_BASE}/auth/login/', json={ 'username': 'test', 'password': 'test123' }) token = res.json()['token'] headers = {'Authorization': f'Bearer {token}'} print(f"✅ 登录成功") # 2. 注册实例(模拟 OpenClaw 本机) print("\n📝 注册实例...") res = requests.post(f'{API_BASE}/instances/register/', json={ 'instance_id': 'test-openclaw-001', 'instance_name': '测试 OpenClaw 实例', 'agent_ids': ['flying_hero', 'lobster_monitor'], 'webhook_url': 'http://localhost:8888/meeting-notify' }) if res.status_code == 200: print(f"✅ 实例注册成功:{res.json()}") else: print(f"⚠️ 实例已存在:{res.json()}") # 3. 创建会议 print("\n🏛️ 创建会议...") res = requests.post(f'{API_BASE}/meetings/', json={ 'topic': 'Webhook 测试会议' }, headers=headers) meeting_id = res.json()['id'] print(f"✅ 会议创建:{meeting_id}") # 4. 实例加入会议 print("\n📍 实例加入会议...") res = requests.post(f'{API_BASE}/instances/join-meeting/', json={ 'instance_id': 'test-openclaw-001', 'meeting_id': meeting_id, 'agent_ids': ['flying_hero'] }) if res.status_code == 200: print(f"✅ 加入成功:{res.json()}") else: print(f"❌ 加入失败:{res.json()}") # 5. 发送消息(应该触发 Webhook) print("\n💬 发送消息(触发 Webhook)...") res = requests.post(f'{API_BASE}/meetings/{meeting_id}/send_message/', json={ 'content': '这是一条测试消息,应该触发 Webhook 推送!' }, headers=headers) if res.status_code == 201: print(f"✅ 消息发送成功") else: print(f"❌ 消息发送失败:{res.json()}") # 6. 查看实例列表 print("\n📋 实例列表...") res = requests.get(f'{API_BASE}/instances/') if res.status_code == 200: instances = res.json() print(f"✅ 共 {len(instances)} 个实例:") for inst in instances: print(f" - {inst['instance_name']} ({inst['instance_id']})") print(f" Agents: {inst['agent_ids']}") print(f" Webhook: {inst['webhook_url']}") print("\n" + "="*60) print("✅ Webhook 测试完成!") print("="*60) print("\n💡 提示:需要在 localhost:8888 运行一个接收 Webhook 的服务") print(" 或使用 ngrok 等工具暴露本地服务") if __name__ == '__main__': test_webhook()