Files
meeting-room/backend/meeting_agent.py
flying-hero 7697d26682 🦸 飞行侠完善:Agent 信箱 + 回复功能 + 完整测试
功能增强:
- meetings/views.py: inbox 接口支持 Agent 自动加入会议
- meetings/views.py: 新增 agent_reply 接口供 Agent 回复消息
- meeting_agent.py: 更新回复接口调用
- test_full.py: 新增完整功能测试脚本(7 项测试)
- README.md: 编写详细使用指南

测试结果:
 用户登录
 创建会议
 获取会议列表
 发送消息
 获取消息
 Agent 信箱(自动加入)
 Agent 回复
2026-04-04 11:28:24 +08:00

140 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
龙虾会议轮询客户端
用法python meeting_agent.py --config meeting_config.json
"""
import requests
import time
import json
import argparse
from datetime import datetime
class MeetingAgent:
"""会议 Agent 轮询客户端"""
def __init__(self, config):
self.meeting_id = config['meeting_id']
self.agent_id = config['agent_id']
self.agent_name = config.get('agent_name', '飞行侠')
self.agent_emoji = config.get('agent_emoji', '🦸')
self.api_key = config['api_key']
self.api_base = config.get('api_base', 'http://localhost:8000')
self.check_interval = config.get('check_interval', 5)
self.headers = {
'Authorization': f'Bearer {self.api_key}',
'Content-Type': 'application/json'
}
def check_inbox(self):
"""查阅信箱"""
try:
response = requests.get(
f'{self.api_base}/api/v1/meetings/{self.meeting_id}/inbox/',
params={'agent_id': self.agent_id},
headers=self.headers,
timeout=10
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"[{datetime.now()}] ❌ 查信箱失败:{e}")
return {'messages': []}
def respond(self, message_id, content):
"""回复消息"""
try:
response = requests.post(
f'{self.api_base}/api/v1/meetings/{self.meeting_id}/agent_reply/',
json={
'agent_id': self.agent_id,
'agent_name': self.agent_name,
'agent_emoji': self.agent_emoji,
'in_reply_to': message_id,
'content': content,
},
headers=self.headers,
timeout=10
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"[{datetime.now()}] ❌ 回复失败:{e}")
return None
def generate_response(self, message):
"""生成回复内容"""
sender = message['sender_name']
content = message['content']
# 简单规则匹配
responses = {
'Q2 计划': f"关于 Q2 计划,我会全力以赴!",
'你好': f"你好,{sender}!👋",
'在吗': f"我在!有什么事吗?",
'说说': f"让我想想... 我觉得这个很重要。",
}
# 匹配关键词
for keyword, response in responses.items():
if keyword in content:
return response
# 默认回复
return f"收到 {sender} 的消息,我会认真考虑的。"
def run(self):
"""运行轮询"""
print(f"🦐 {self.agent_emoji} {self.agent_name} 开始会议轮询")
print(f" 会议 ID: {self.meeting_id}")
print(f" 轮询间隔:{self.check_interval}")
print(f" API: {self.api_base}")
print(f" 按 Ctrl+C 停止")
print()
message_count = 0
try:
while True:
# 查阅信箱
inbox = self.check_inbox()
# 处理消息
for message in inbox['messages']:
if not message.get('read') and message.get('requires_response'):
print(f"[{datetime.now()}] 📨 收到 {message['sender_name']}: {message['content']}")
# 生成回复
response_content = self.generate_response(message)
print(f"[{datetime.now()}] 📤 回复:{response_content}")
# 发送回复
result = self.respond(message['id'], response_content)
if result:
print(f"[{datetime.now()}] ✅ 回复成功")
message_count += 1
# 等待
time.sleep(self.check_interval)
except KeyboardInterrupt:
print(f"\n👋 停止轮询,共处理 {message_count} 条消息")
def main():
parser = argparse.ArgumentParser(description='龙虾会议轮询客户端')
parser.add_argument('--config', '-c', required=True, help='配置文件路径')
args = parser.parse_args()
with open(args.config) as f:
config = json.load(f)
agent = MeetingAgent(config)
agent.run()
if __name__ == '__main__':
main()