Files
meeting-room/backend/meeting_ai_sdk.py

303 lines
8.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
龙虾议事厅 - AI 操作 SDK
让 AI 可以直接操作会议系统
"""
import asyncio
import threading
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List, Dict

import requests
@dataclass
class Meeting:
    """Meeting record as returned by the backend API."""

    id: str                  # meeting identifier, used in the meeting URLs
    topic: str               # meeting topic
    host: int                # presumably the host user's ID - confirm against the API
    status: str              # status value exactly as reported by the backend
    invite_code: str         # code required to join the meeting
    created_at: datetime     # creation time parsed from the API's ISO 8601 string
    participant_count: int   # number of participants reported by the backend
@dataclass
class Message:
    """Chat message record as returned by the backend API."""

    id: int                # message identifier
    meeting: str           # value of the payload's 'meeting' field (presumably the meeting ID)
    sender_name: str       # display name of the sender
    sender_emoji: str      # emoji associated with the sender
    content: str           # message text
    created_at: datetime   # creation time parsed from the API's ISO 8601 string
class MeetingAPIError(Exception):
    """Raised when the meeting backend answers a request with an error status."""


class MeetingAIOperations:
    """
    Meeting operations interface intended for AI agents.

    All public operations are coroutines. HTTP traffic goes through a single
    shared ``requests.Session``; because ``requests`` is blocking, every call
    is executed in the event loop's default executor so that awaiting an
    operation does not stall other tasks on the loop.

    Usage example:
    ```python
    api = MeetingAIOperations('http://localhost:8000')
    # Log in
    await api.login('test', 'test123')
    # Create a meeting
    meeting = await api.create_meeting('Q2 planning discussion')
    # Send a message
    await api.send_message(meeting.id, 'Hello everyone!')
    # Fetch messages
    messages = await api.get_messages(meeting.id)
    ```
    """

    def __init__(self, api_base: str = 'http://localhost:8000',
                 timeout: Optional[float] = 10.0):
        """
        Create a client.

        Args:
            api_base: Base URL of the backend; trailing slashes are ignored.
            timeout: Per-request timeout in seconds passed to ``requests``;
                ``None`` disables the timeout.
        """
        # Strip trailing slashes so the URLs built below never contain '//'.
        self.api_base = api_base.rstrip('/')
        self.timeout = timeout
        self.token: Optional[str] = None
        self.session = requests.Session()
        # The session is driven from executor threads; serialize access so
        # concurrently awaited operations never use it at the same time.
        self._session_lock = threading.Lock()

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def _get_headers(self) -> Dict:
        """Build the request headers, adding the bearer token once logged in."""
        headers = {'Content-Type': 'application/json'}
        if self.token:
            headers['Authorization'] = f'Bearer {self.token}'
        return headers

    async def _request(self, method: str, path: str, **kwargs) -> requests.Response:
        """Send one HTTP request in the default executor and return the response."""
        url = f'{self.api_base}{path}'

        def _blocking_call() -> requests.Response:
            with self._session_lock:
                return self.session.request(
                    method, url, timeout=self.timeout, **kwargs
                )

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _blocking_call)

    @staticmethod
    def _parse_timestamp(value: str) -> datetime:
        """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
        # Rewrite 'Z' as an explicit offset: fromisoformat() only understands
        # the 'Z' suffix on newer Python versions.
        return datetime.fromisoformat(value.replace('Z', '+00:00'))

    @classmethod
    def _parse_meeting(cls, data: Dict) -> Meeting:
        """Convert one meeting payload into a Meeting."""
        return Meeting(
            id=data['id'],
            topic=data['topic'],
            host=data['host'],
            status=data['status'],
            invite_code=data['invite_code'],
            created_at=cls._parse_timestamp(data['created_at']),
            # Treated as optional for every endpoint; defaults to 0 if absent.
            participant_count=data.get('participant_count', 0),
        )

    @classmethod
    def _parse_message(cls, data: Dict) -> Message:
        """Convert one message payload into a Message."""
        return Message(
            id=data['id'],
            meeting=data['meeting'],
            sender_name=data['sender_name'],
            sender_emoji=data['sender_emoji'],
            content=data['content'],
            created_at=cls._parse_timestamp(data['created_at']),
        )

    async def login(self, username: str, password: str) -> bool:
        """
        Log in and remember the returned token for later requests.

        Args:
            username: User name.
            password: Password.

        Returns:
            bool: True if the backend answered 200 and supplied a token.
        """
        response = await self._request(
            'POST',
            '/api/v1/auth/login/',
            json={'username': username, 'password': password},
            headers={'Content-Type': 'application/json'},
        )
        if response.status_code != 200:
            return False
        token = response.json().get('token')
        if not token:
            # A 200 response without a token is still a failed login.
            return False
        self.token = token
        return True

    async def create_meeting(self, topic: str) -> Meeting:
        """
        Create a meeting.

        Args:
            topic: Meeting topic.

        Returns:
            Meeting: The created meeting.

        Raises:
            MeetingAPIError: If the backend does not answer 200 or 201.
        """
        response = await self._request(
            'POST',
            '/api/v1/meetings/',
            json={'topic': topic},
            headers=self._get_headers(),
        )
        if response.status_code in (200, 201):
            return self._parse_meeting(response.json())
        raise MeetingAPIError(f"创建会议失败:{response.text}")

    async def list_meetings(self) -> List[Meeting]:
        """
        Fetch the meeting list.

        Returns:
            List[Meeting]: One Meeting per element of the response array.

        Raises:
            MeetingAPIError: If the backend does not answer 200.
        """
        response = await self._request(
            'GET',
            '/api/v1/meetings/',
            headers=self._get_headers(),
        )
        if response.status_code == 200:
            return [self._parse_meeting(item) for item in response.json()]
        raise MeetingAPIError(f"获取会议列表失败:{response.text}")

    async def send_message(self, meeting_id: str, content: str) -> Message:
        """
        Send a message to a meeting.

        Args:
            meeting_id: Meeting ID.
            content: Message text.

        Returns:
            Message: The message as stored by the backend.

        Raises:
            MeetingAPIError: If the backend does not answer 200 or 201.
        """
        response = await self._request(
            'POST',
            f'/api/v1/meetings/{meeting_id}/send_message/',
            json={'content': content},
            headers=self._get_headers(),
        )
        if response.status_code in (200, 201):
            return self._parse_message(response.json())
        raise MeetingAPIError(f"发送消息失败:{response.text}")

    async def get_messages(self, meeting_id: str, last_id: int = 0) -> List[Message]:
        """
        Fetch messages of a meeting.

        Args:
            meeting_id: Meeting ID.
            last_id: ID of the last message already seen, sent as the
                ``last_id`` query parameter (intended for polling).

        Returns:
            List[Message]: Messages from the response's 'messages' array.

        Raises:
            MeetingAPIError: If the backend does not answer 200.
        """
        response = await self._request(
            'GET',
            f'/api/v1/meetings/{meeting_id}/messages/',
            params={'last_id': last_id},
            headers=self._get_headers(),
        )
        if response.status_code == 200:
            payload = response.json()
            return [self._parse_message(item) for item in payload['messages']]
        raise MeetingAPIError(f"获取消息失败:{response.text}")

    async def join_meeting(self, meeting_id: str, invite_code: str) -> bool:
        """
        Join a meeting.

        Args:
            meeting_id: Meeting ID.
            invite_code: Invite code of the meeting.

        Returns:
            bool: True if the backend answered 200 or 201.
        """
        response = await self._request(
            'POST',
            f'/api/v1/meetings/{meeting_id}/join/',
            json={'invite_code': invite_code},
            headers=self._get_headers(),
        )
        return response.status_code in (200, 201)

    async def get_meeting_info(self, meeting_id: str) -> Meeting:
        """
        Fetch the details of one meeting.

        Args:
            meeting_id: Meeting ID.

        Returns:
            Meeting: The meeting.

        Raises:
            MeetingAPIError: If the backend does not answer 200.
        """
        response = await self._request(
            'GET',
            f'/api/v1/meetings/{meeting_id}/',
            headers=self._get_headers(),
        )
        if response.status_code == 200:
            return self._parse_meeting(response.json())
        raise MeetingAPIError(f"获取会议详情失败:{response.text}")
# ============ 使用示例 ============
async def demo():
    """Demonstrate the AI operations interface against a local backend.

    Logs in, creates a meeting, sends a message, then lists the meeting's
    messages and all meetings, printing progress along the way. The HTTP
    session is closed on every exit path.
    """
    api = MeetingAIOperations('http://localhost:8000')
    try:
        # 1. Log in
        print("🔐 登录...")
        success = await api.login('test', 'test123')
        if not success:
            print("❌ 登录失败")
            return
        print("✅ 登录成功")

        # 2. Create a meeting
        print("\n🏛️ 创建会议...")
        meeting = await api.create_meeting('AI 自主创建的会议')
        print(f"✅ 会议已创建:{meeting.id}")
        print(f" 主题:{meeting.topic}")
        print(f" 邀请码:{meeting.invite_code}")

        # 3. Send a message
        print("\n💬 发送消息...")
        message = await api.send_message(meeting.id, '这是 AI 自动发送的消息!')
        print(f"✅ 消息已发送:{message.content}")

        # 4. Fetch the messages
        print("\n📨 获取消息...")
        messages = await api.get_messages(meeting.id)
        print(f"✅ 共 {len(messages)} 条消息")
        for msg in messages:
            print(f" {msg.sender_name}: {msg.content}")

        # 5. Fetch the meeting list
        print("\n📋 获取会议列表...")
        meetings = await api.list_meetings()
        print(f"✅ 共 {len(meetings)} 个会议")
        for m in meetings:
            print(f" {m.topic} ({m.invite_code})")
    finally:
        # Release the pooled connections even when a step fails or returns early.
        api.session.close()
if __name__ == '__main__':
    # Script entry point: run the demo coroutine to completion.
    import asyncio

    asyncio.run(demo())