#!/usr/bin/env python3 """ 测试主持龙虾生成纪要完整流程 """ import requests API_BASE = 'http://localhost:8000/api/v1' def test_host_agent_minutes(): print("="*60) print("🦞 测试主持龙虾生成纪要流程") print("="*60) # 1. 登录 res = requests.post(f'{API_BASE}/auth/login/', json={ 'username': 'test', 'password': 'test123' }) token = res.json()['token'] headers = {'Authorization': f'Bearer {token}'} print(f"✅ 登录成功") # 2. 注册实例(模拟 OpenClaw) print("\n📝 注册实例...") res = requests.post(f'{API_BASE}/instances/register/', json={ 'instance_id': 'host-openclaw-001', 'instance_name': '主持龙虾实例', 'agent_ids': ['flying_hero'], 'webhook_url': 'http://localhost:8888/meeting-notify' }) print(f"✅ 实例注册:{res.json()}") # 3. 创建会议(指定主持龙虾) print("\n🏛️ 创建会议(指定主持龙虾)...") res = requests.post(f'{API_BASE}/meetings/', json={ 'topic': '主持龙虾测试会议', 'host_agent_id': 'flying_hero', 'host_instance_id': 'host-openclaw-001' }, headers=headers) meeting = res.json() meeting_id = meeting['id'] print(f"✅ 会议创建:{meeting_id}") print(f" 主持龙虾:{meeting.get('host_agent_id')}") # 4. 发送消息(模拟会议讨论) print("\n💬 发送会议消息...") messages = [ "大家好,开始今天的会议!", "我来汇报一下 Q2 的进度。", "这个项目需要更多资源支持。", "好的,我会跟进这件事。", "那我们下周再开会讨论细节。" ] for msg in messages: requests.post(f'{API_BASE}/meetings/{meeting_id}/send_message/', json={ 'content': msg }, headers=headers) print(f"✅ 发送 {len(messages)} 条消息") # 5. 结束会议(自动通知主持龙虾) print("\n⏹️ 结束会议...") res = requests.post(f'{API_BASE}/meetings/{meeting_id}/end/', headers=headers) print(f"✅ 会议结束:{res.json()}") # 6. 主持龙虾获取会议记录 print("\n📋 主持龙虾获取会议记录...") res = requests.get(f'{API_BASE}/meetings/{meeting_id}/records/?agent_id=flying_hero') if res.status_code == 200: records = res.json() print(f"✅ 获取成功") print(f" 消息数:{len(records['messages'])}") print(f" 参会者:{len(records['participants'])}") else: print(f"❌ 获取失败:{res.json()}") return False # 7. 主持龙虾生成纪要(模拟 AI 生成) print("\n🤖 主持龙虾生成纪要...") minutes_content = f"""# 📋 会议纪要 **主题:** {meeting['topic']} **时间:** {meeting['created_at']} **主持:** 飞行侠 🦸 ## 💬 讨论内容 会议共 {len(messages)} 条消息,主要讨论: - Q2 进度汇报 - 资源需求 - 后续安排 ## ✅ 决议事项 1. 跟进资源支持事宜 2. 下周继续开会讨论细节 --- *生成时间:现在* """ # 8. 上传纪要 print("\n📤 上传会议纪要...") res = requests.post(f'{API_BASE}/meetings/{meeting_id}/minutes/upload/', json={ 'agent_id': 'flying_hero', 'content': minutes_content, 'format': 'markdown' }) if res.status_code == 200: print(f"✅ 纪要上传成功:{res.json()}") else: print(f"❌ 上传失败:{res.json()}") return False # 9. 验证会议状态 print("\n📊 验证会议状态...") res = requests.get(f'{API_BASE}/meetings/{meeting_id}/', headers=headers) meeting = res.json() print(f" 状态:{meeting['status']}") print(f" 纪要已生成:{meeting.get('minutes_generated', False)}") print(f" 上传时间:{meeting.get('minutes_uploaded_at')}") # 10. 获取纪要(平台留存) print("\n📥 下载会议纪要...") res = requests.get(f'{API_BASE}/meetings/{meeting_id}/minutes/?output=markdown', headers=headers) if res.status_code == 200: minutes = res.json() print(f"✅ 获取成功") print(f" 内容预览:{minutes['markdown'][:100]}...") else: print(f"❌ 获取失败:{res.json()}") return False print("\n" + "="*60) print("✅ 主持龙虾生成纪要流程测试通过!") print("="*60) print("\n📊 流程总结:") print("1. 用户创建会议 → 指定主持龙虾") print("2. 会议进行 → 消息中央路由") print("3. 会议结束 → 通知主持龙虾") print("4. 主持龙虾 → 获取记录 + 生成纪要") print("5. 上传纪要 → 平台留存供下载") print("\n💡 算力分配:") print("- 中央平台:消息路由 + 数据存储(轻量级)") print("- 主持龙虾:生成纪要(消耗用户算力)") return True if __name__ == '__main__': test_host_agent_minutes()