Files
flying-hero b27261ce64 🔧 修复:扫描龙虾显示正确的名字和 emoji
问题:
- 扫描 API 只返回 agent_id 和 instance_name
- 没有返回 agent_name 和 agent_emoji
- 导致所有龙虾都显示成 🤖

修复:
- 扫描 API 增加 username 参数
- 从 user.linked_agents 获取 agent_name/emoji
- 前端在 username 变化时重新扫描

结果:
 flying_hero → 飞行侠 🦸
 lobster_monitor → 龙虾监控 🦞
2026-04-04 17:05:16 +08:00

259 lines
9.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from rest_framework import serializers, status, views
from rest_framework.response import Response
from django.contrib.auth import authenticate, get_user_model
# Active user model for this project, resolved once at import time via
# django.contrib.auth.get_user_model() and shared by the views below.
User = get_user_model()
class LoginSerializer(serializers.Serializer):
    """Validate the JSON payload accepted by the login endpoint.

    Fields:
        username, password: required credentials.
        mode: one of ``solo``, ``team`` or ``agent_only``; defaults to ``solo``.
        agent_ids: optional list of agent ID strings; defaults to an empty list.
    """

    username = serializers.CharField()
    password = serializers.CharField()

    # Login mode; restricted to the three known values.
    mode = serializers.ChoiceField(
        default='solo',
        help_text='solo=单枪匹马team=组队团战agent_only=独当一面',
        choices=['solo', 'team', 'agent_only'],
    )

    # IDs of the agents selected for the ``team`` / ``agent_only`` modes.
    agent_ids = serializers.ListField(
        help_text='选择的龙虾 ID 列表team 或 agent_only 模式)',
        default=list,
        required=False,
        child=serializers.CharField(),
    )
class LoginView(views.APIView):
    """Authenticate a user and describe the sessions opened for the chosen mode.

    The request body is validated with ``LoginSerializer``. Depending on
    ``mode`` the response lists a human session (``solo`` / ``team``), one
    session per selected linked agent (``team`` / ``agent_only``), or both.
    """

    # Display name returned as ``mode_name`` for each login mode.
    MODE_NAMES = {
        'solo': '单枪匹马',
        'team': '组队团战',
        'agent_only': '独当一面',
    }

    @staticmethod
    def _agent_session(agent):
        """Build the session entry for one linked-agent record (a dict)."""
        return {
            'session_type': 'agent',
            'agent_id': agent['agent_id'],
            'agent_name': agent['agent_name'],
            'nickname': agent['agent_name'],
            'emoji': agent.get('agent_emoji', '🤖'),
            'instance_id': agent.get('instance_id'),
        }

    def post(self, request):
        """Log a user in.

        Returns:
            200 with ``token``, ``user``, ``sessions``, ``mode`` and
            ``mode_name`` on success; 400 when the payload is invalid, when an
            agent mode is requested without agent IDs, or when a requested
            agent is not linked to the user; 401 when the credentials are
            rejected.
        """
        # Function-scope import, matching the file's existing style.
        import secrets

        serializer = LoginSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        data = serializer.validated_data
        mode = data.get('mode', 'solo')
        agent_ids = data.get('agent_ids', [])

        user = authenticate(username=data['username'], password=data['password'])
        if not user:
            return Response(
                {'detail': '用户名或密码错误'},
                status=status.HTTP_401_UNAUTHORIZED
            )

        # Simple opaque token (32 hex characters); production should use JWT.
        # ``secrets`` is the stdlib module intended for security tokens.
        # NOTE(review): this view does not store the token anywhere — confirm
        # how it is verified on later requests.
        token = secrets.token_hex(16)

        # Build the session list for the requested mode.
        sessions = []

        if mode in ('solo', 'team'):
            # Human identity (solo or team mode).
            sessions.append({
                'session_type': 'human',
                'nickname': user.username,
                'emoji': '👤',
                'user_id': user.id
            })

        if mode in ('team', 'agent_only'):
            # Agent identities (team or agent-only mode).
            if not agent_ids:
                return Response(
                    {'error': '组队或独当一面模式需要选择至少一只龙虾'},
                    status=status.HTTP_400_BAD_REQUEST
                )

            # dict.fromkeys() drops repeated IDs while keeping first-seen
            # order, so one agent never yields two identical sessions.
            for agent_id in dict.fromkeys(agent_ids):
                agent = user.get_linked_agent(agent_id)
                if not agent:
                    return Response(
                        {'error': f'未找到绑定的龙虾:{agent_id}'},
                        status=status.HTTP_400_BAD_REQUEST
                    )
                sessions.append(self._agent_session(agent))

        return Response({
            'token': token,
            'user': {
                'id': user.id,
                'username': user.username,
                'email': user.email,
                'linked_agents': user.linked_agents
            },
            'sessions': sessions,
            'mode': mode,
            'mode_name': self.MODE_NAMES.get(mode, mode)
        })
class RegisterSerializer(serializers.Serializer):
    """Validate the registration payload: username, e-mail address and password."""

    username = serializers.CharField()
    email = serializers.EmailField()
    password = serializers.CharField()
class RegisterView(views.APIView):
    """Create a user account and return a token with the basic profile."""

    def post(self, request):
        """Register a new user.

        Returns:
            201 with ``token`` and ``user`` (id, username, email) on success;
            400 with the serializer errors when the payload is invalid, or
            with a ``detail`` message when the account cannot be created.
        """
        # Function-scope imports, matching the file's existing style.
        import secrets
        from django.db import IntegrityError

        serializer = RegisterSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        data = serializer.validated_data
        # Keep the try body to the one call that is expected to fail.
        try:
            user = User.objects.create_user(
                username=data['username'],
                email=data['email'],
                password=data['password']
            )
        except IntegrityError:
            # Return a fixed message instead of echoing the raw database
            # constraint text to the client.
            # NOTE(review): presumably triggered by a duplicate username or
            # e-mail — confirm against the user model's constraints.
            return Response(
                {'detail': '用户名或邮箱已存在'},
                status=status.HTTP_400_BAD_REQUEST
            )
        except Exception as exc:
            # Preserved fallback: any other creation failure is reported as a
            # 400 carrying the exception text.
            # NOTE(review): this can expose internal error details.
            return Response(
                {'detail': str(exc)},
                status=status.HTTP_400_BAD_REQUEST
            )

        # Opaque 32-hex-character token from the stdlib module meant for
        # security tokens.
        token = secrets.token_hex(16)
        return Response({
            'token': token,
            'user': {
                'id': user.id,
                'username': user.username,
                'email': user.email
            }
        }, status=status.HTTP_201_CREATED)
class LinkedAgentsView(views.APIView):
    """
    Manage the agents ("lobsters") linked to a user.

    GET    /api/v1/user/linked-agents/?username=xxx             - list linked agents
    POST   /api/v1/user/linked-agents/                           - link an agent
    DELETE /api/v1/user/linked-agents/{agent_id}/?username=xxx  - unlink an agent

    NOTE(review): the target user is taken from a request-supplied username
    and nothing in this class ties it to the caller — confirm that
    authentication / permissions are enforced elsewhere.
    """

    @staticmethod
    def _find_user(username):
        """Return the user named ``username``, or None when there is none."""
        try:
            return User.objects.get(username=username)
        except User.DoesNotExist:
            return None

    @staticmethod
    def _user_not_found():
        """Build the 404 response shared by all handlers."""
        return Response({'error': '用户不存在'}, status=status.HTTP_404_NOT_FOUND)

    def get(self, request):
        """Return the user's linked agents and their count."""
        username = request.query_params.get('username')
        if not username:
            return Response({'error': '缺少 username'}, status=status.HTTP_400_BAD_REQUEST)

        user = self._find_user(username)
        if user is None:
            return self._user_not_found()

        return Response({
            'linked_agents': user.linked_agents,
            'count': len(user.linked_agents)
        })

    def post(self, request):
        """Link an agent to the user and return the updated list.

        ``username``, ``agent_id`` and ``agent_name`` are required;
        ``agent_emoji`` and ``instance_id`` are optional.
        """
        username = request.data.get('username')
        agent_id = request.data.get('agent_id')
        agent_name = request.data.get('agent_name')
        # ``or`` also covers an explicit null / empty emoji, which a plain
        # ``.get(key, default)`` would pass through unchanged.
        agent_emoji = request.data.get('agent_emoji') or '🤖'
        instance_id = request.data.get('instance_id')

        if not all([username, agent_id, agent_name]):
            return Response({'error': '缺少必要参数'}, status=status.HTTP_400_BAD_REQUEST)

        user = self._find_user(username)
        if user is None:
            return self._user_not_found()

        user.add_linked_agent(agent_id, agent_name, agent_emoji, instance_id)
        return Response({
            'status': 'success',
            'linked_agents': user.linked_agents
        })

    def delete(self, request, agent_id):
        """Unlink ``agent_id`` from the user and return the updated list."""
        username = request.query_params.get('username')
        if not username:
            return Response({'error': '缺少 username'}, status=status.HTTP_400_BAD_REQUEST)

        user = self._find_user(username)
        if user is None:
            return self._user_not_found()

        user.remove_linked_agent(agent_id)
        return Response({
            'status': 'success',
            'linked_agents': user.linked_agents
        })
class ScanLocalAgentsView(views.APIView):
    """
    List the agents registered on active instances.

    GET /api/v1/user/scan-local-agents/?instance_id=xxx&username=xxx - scan one instance
    GET /api/v1/user/scan-local-agents/?username=xxx                 - scan all instances
    """

    def get(self, request):
        """Return ``agents`` (one entry per agent ID per instance) and ``count``.

        When ``username`` names an existing user, each agent's name and emoji
        come from that user's linked agents; otherwise the agent ID is used as
        the name and '🤖' as the emoji. Responds 404 when ``instance_id`` is
        given but no active instance matches it.
        """
        # Kept as a function-scope import, as in the code this replaces.
        # NOTE(review): presumably avoids a circular import — confirm.
        from instances.models import Instance

        instance_id = request.query_params.get('instance_id')
        username = request.query_params.get('username')

        # Map agent_id -> linked-agent record, used for agent_name / emoji.
        user_agents = {}
        if username:
            try:
                user = User.objects.get(username=username)
            except User.DoesNotExist:
                # Best effort: an unknown user only means default labels.
                pass
            else:
                user_agents = {
                    agent['agent_id']: agent for agent in user.linked_agents
                }

        if instance_id:
            # Scan only the requested instance.
            try:
                instances = [
                    Instance.objects.get(instance_id=instance_id, is_active=True)
                ]
            except Instance.DoesNotExist:
                return Response({'error': '实例不存在'}, status=status.HTTP_404_NOT_FOUND)
        else:
            # Scan every active instance.
            instances = Instance.objects.filter(is_active=True)

        # Initialised before the shared loop so both paths can append to it.
        agents = []
        for inst in instances:
            for agent_id in inst.agent_ids:
                agent_info = user_agents.get(agent_id, {})
                agents.append({
                    'agent_id': agent_id,
                    'agent_name': agent_info.get('agent_name', agent_id),
                    'agent_emoji': agent_info.get('agent_emoji', '🤖'),
                    'instance_id': inst.instance_id,
                    'instance_name': inst.instance_name
                })

        return Response({
            'agents': agents,
            'count': len(agents)
        })