Files
meeting-room/backend/users/views.py
flying-hero 97da46b219 🎭 飞行侠实现:多身份登录系统
核心功能:
- 用户模型扩展:linked_agents 字段存储绑定龙虾
- 登录 API 支持 3 种模式:human_only / agent_only / both
- 龙虾管理 API:绑定/解绑/列表
- 扫描本机龙虾 API:从注册实例获取

API 端点:
- POST /api/v1/auth/login/ - 支持 login_mode 和 selected_agent_id
- GET  /api/v1/user/linked-agents/ - 获取绑定龙虾
- POST /api/v1/user/linked-agents/ - 添加绑定龙虾
- DELETE /api/v1/user/linked-agents/{id}/ - 移除龙虾
- GET  /api/v1/user/scan-local-agents/ - 扫描本机龙虾

登录模式:
1. human_only - 纯人类身份(1 个座位)
2. agent_only - 纯龙虾身份(1 个座位)
3. both - 双重身份(2 个座位)

测试:
- test_multi_identity.py: 完整测试通过

使用场景:
- 普通用户参会 → human_only
- 龙虾独立参会 → agent_only
- 用户带龙虾助理 → both
2026-04-04 12:53:02 +08:00

218 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from rest_framework import serializers, status, views
from rest_framework.response import Response
from django.contrib.auth import authenticate, get_user_model
User = get_user_model()
class LoginSerializer(serializers.Serializer):
username = serializers.CharField()
password = serializers.CharField()
login_mode = serializers.ChoiceField(
choices=['human_only', 'agent_only', 'both'],
default='human_only'
)
selected_agent_id = serializers.CharField(required=False, allow_blank=True)
class LoginView(views.APIView):
def post(self, request):
serializer = LoginSerializer(data=request.data)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
username = serializer.validated_data['username']
password = serializer.validated_data['password']
login_mode = serializer.validated_data.get('login_mode', 'human_only')
selected_agent_id = serializer.validated_data.get('selected_agent_id')
user = authenticate(username=username, password=password)
if not user:
return Response(
{'detail': '用户名或密码错误'},
status=status.HTTP_401_UNAUTHORIZED
)
# 简单 Token生产环境应该用 JWT
import uuid
token = uuid.uuid4().hex
# 构建会话信息
sessions = []
if login_mode in ['human_only', 'both']:
# 人类身份
sessions.append({
'session_type': 'human',
'nickname': user.username,
'emoji': '👤',
'user_id': user.id
})
if login_mode in ['agent_only', 'both']:
# 龙虾身份
if selected_agent_id:
agent = user.get_linked_agent(selected_agent_id)
if agent:
sessions.append({
'session_type': 'agent',
'agent_id': agent['agent_id'],
'agent_name': agent['agent_name'],
'nickname': agent['agent_name'],
'emoji': agent.get('agent_emoji', '🤖'),
'instance_id': agent.get('instance_id')
})
else:
return Response(
{'error': f'未找到绑定的龙虾:{selected_agent_id}'},
status=status.HTTP_400_BAD_REQUEST
)
return Response({
'token': token,
'user': {
'id': user.id,
'username': user.username,
'email': user.email,
'linked_agents': user.linked_agents
},
'sessions': sessions,
'login_mode': login_mode
})
class RegisterSerializer(serializers.Serializer):
username = serializers.CharField()
email = serializers.EmailField()
password = serializers.CharField()
class RegisterView(views.APIView):
def post(self, request):
serializer = RegisterSerializer(data=request.data)
if not serializer.is_valid():
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
try:
user = User.objects.create_user(
username=serializer.validated_data['username'],
email=serializer.validated_data['email'],
password=serializer.validated_data['password']
)
import uuid
token = uuid.uuid4().hex
return Response({
'token': token,
'user': {
'id': user.id,
'username': user.username,
'email': user.email
}
}, status=status.HTTP_201_CREATED)
except Exception as e:
return Response(
{'detail': str(e)},
status=status.HTTP_400_BAD_REQUEST
)
class LinkedAgentsView(views.APIView):
"""
用户绑定的龙虾管理
GET /api/v1/user/linked-agents/?username=xxx - 获取龙虾列表
POST /api/v1/user/linked-agents/ - 添加龙虾
DELETE /api/v1/user/linked-agents/{agent_id}/?username=xxx - 移除龙虾
"""
def get(self, request):
username = request.query_params.get('username')
if not username:
return Response({'error': '缺少 username'}, status=status.HTTP_400_BAD_REQUEST)
try:
user = User.objects.get(username=username)
return Response({
'linked_agents': user.linked_agents,
'count': len(user.linked_agents)
})
except User.DoesNotExist:
return Response({'error': '用户不存在'}, status=status.HTTP_404_NOT_FOUND)
def post(self, request):
username = request.data.get('username')
agent_id = request.data.get('agent_id')
agent_name = request.data.get('agent_name')
agent_emoji = request.data.get('agent_emoji', '🤖')
instance_id = request.data.get('instance_id')
if not all([username, agent_id, agent_name]):
return Response({'error': '缺少必要参数'}, status=status.HTTP_400_BAD_REQUEST)
try:
user = User.objects.get(username=username)
user.add_linked_agent(agent_id, agent_name, agent_emoji, instance_id)
return Response({
'status': 'success',
'linked_agents': user.linked_agents
})
except User.DoesNotExist:
return Response({'error': '用户不存在'}, status=status.HTTP_404_NOT_FOUND)
def delete(self, request, agent_id):
username = request.query_params.get('username')
if not username:
return Response({'error': '缺少 username'}, status=status.HTTP_400_BAD_REQUEST)
try:
user = User.objects.get(username=username)
user.remove_linked_agent(agent_id)
return Response({
'status': 'success',
'linked_agents': user.linked_agents
})
except User.DoesNotExist:
return Response({'error': '用户不存在'}, status=status.HTTP_404_NOT_FOUND)
class ScanLocalAgentsView(views.APIView):
"""
扫描本机龙虾列表
GET /api/v1/user/scan-local-agents/?instance_id=xxx - 扫描指定实例
GET /api/v1/user/scan-local-agents/ - 扫描所有实例
"""
def get(self, request):
instance_id = request.query_params.get('instance_id')
if not instance_id:
# 返回所有已注册实例的 Agent
from instances.models import Instance
instances = Instance.objects.filter(is_active=True)
agents = []
for inst in instances:
for agent_id in inst.agent_ids:
agents.append({
'agent_id': agent_id,
'instance_id': inst.instance_id,
'instance_name': inst.instance_name
})
else:
# 返回指定实例的 Agent
from instances.models import Instance
try:
instance = Instance.objects.get(instance_id=instance_id, is_active=True)
agents = [{
'agent_id': agent_id,
'instance_id': instance.instance_id,
'instance_name': instance.instance_name
} for agent_id in instance.agent_ids]
except Instance.DoesNotExist:
return Response({'error': '实例不存在'}, status=status.HTTP_404_NOT_FOUND)
return Response({
'agents': agents,
'count': len(agents)
})