🎭 飞行侠实现:多身份登录系统

核心功能:
- 用户模型扩展:linked_agents 字段存储绑定龙虾
- 登录 API 支持 3 种模式:human_only / agent_only / both
- 龙虾管理 API:绑定/解绑/列表
- 扫描本机龙虾 API:从注册实例获取

API 端点:
- POST /api/v1/auth/login/ - 支持 login_mode 和 selected_agent_id
- GET  /api/v1/user/linked-agents/ - 获取绑定龙虾
- POST /api/v1/user/linked-agents/ - 添加绑定龙虾
- DELETE /api/v1/user/linked-agents/{id}/ - 移除龙虾
- GET  /api/v1/user/scan-local-agents/ - 扫描本机龙虾

登录模式:
1. human_only - 纯人类身份(1 个座位)
2. agent_only - 纯龙虾身份(1 个座位)
3. both - 双重身份(2 个座位)

测试:
- test_multi_identity.py: 完整测试通过

使用场景:
- 普通用户参会 → human_only
- 龙虾独立参会 → agent_only
- 用户带龙虾助理 → both
This commit is contained in:
2026-04-04 12:53:02 +08:00
parent 9a30cc3945
commit 97da46b219
6 changed files with 471 additions and 6 deletions

View File

@@ -133,6 +133,9 @@ STATIC_URL = "static/"
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField" DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
# 自定义用户模型
AUTH_USER_MODEL = 'users.User'
# REST Framework 配置 # REST Framework 配置
REST_FRAMEWORK = { REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [], 'DEFAULT_AUTHENTICATION_CLASSES': [],

View File

@@ -3,7 +3,7 @@ from django.urls import path, include, re_path
from django.views.generic import TemplateView from django.views.generic import TemplateView
from rest_framework.routers import DefaultRouter from rest_framework.routers import DefaultRouter
from meetings.views import MeetingViewSet, ParticipantViewSet from meetings.views import MeetingViewSet, ParticipantViewSet
from users.views import LoginView, RegisterView from users.views import LoginView, RegisterView, LinkedAgentsView, ScanLocalAgentsView
from instances.views import InstanceRegisterView, MeetingJoinView, InstanceListView, WebhookNotifyView from instances.views import InstanceRegisterView, MeetingJoinView, InstanceListView, WebhookNotifyView
from meetings.minutes_api import MeetingRecordsView, MinutesUploadView, MeetingEndNotifyView from meetings.minutes_api import MeetingRecordsView, MinutesUploadView, MeetingEndNotifyView
@@ -16,6 +16,11 @@ urlpatterns = [
path("", TemplateView.as_view(template_name="meeting_room.html"), name="home"), path("", TemplateView.as_view(template_name="meeting_room.html"), name="home"),
path("api/v1/auth/login/", LoginView.as_view()), path("api/v1/auth/login/", LoginView.as_view()),
path("api/v1/auth/register/", RegisterView.as_view()), path("api/v1/auth/register/", RegisterView.as_view()),
# 用户龙虾管理
path("api/v1/user/linked-agents/", LinkedAgentsView.as_view()),
path("api/v1/user/linked-agents/<str:agent_id>/", LinkedAgentsView.as_view()),
path("api/v1/user/scan-local-agents/", ScanLocalAgentsView.as_view()),
# 实例管理
path("api/v1/instances/register/", InstanceRegisterView.as_view()), path("api/v1/instances/register/", InstanceRegisterView.as_view()),
path("api/v1/instances/join-meeting/", MeetingJoinView.as_view()), path("api/v1/instances/join-meeting/", MeetingJoinView.as_view()),
path("api/v1/instances/", InstanceListView.as_view()), path("api/v1/instances/", InstanceListView.as_view()),

View File

@@ -0,0 +1,139 @@
#!/usr/bin/env python3
"""
测试多身份登录系统
"""
import requests
API_BASE = 'http://localhost:8000/api/v1'
def test_multi_identity_login():
print("="*60)
print("🎭 测试多身份登录系统")
print("="*60)
# 1. 创建测试用户
print("\n📝 创建测试用户...")
res = requests.post(f'{API_BASE}/auth/register/', json={
'username': 'polaris',
'email': 'polaris@example.com',
'password': 'password123'
})
if res.status_code == 201:
print(f"✅ 用户创建成功polaris")
else:
print(f"⚠️ 用户可能已存在:{res.json()}")
# 2. 注册实例(模拟 OpenClaw
print("\n🔧 注册实例...")
res = requests.post(f'{API_BASE}/instances/register/', json={
'instance_id': 'phospher-openclaw',
'instance_name': '飞行侠的 OpenClaw',
'agent_ids': ['flying_hero', 'lobster_monitor'],
'webhook_url': 'http://localhost:8888/meeting-notify'
})
print(f"✅ 实例注册:{res.json()}")
# 3. 绑定龙虾到用户
print("\n🔗 绑定龙虾到用户...")
res = requests.post(f'{API_BASE}/user/linked-agents/', json={
'username': 'polaris',
'agent_id': 'flying_hero',
'agent_name': '飞行侠',
'agent_emoji': '🦸',
'instance_id': 'phospher-openclaw'
})
if res.status_code == 200:
print(f"✅ 龙虾绑定成功:{res.json()['linked_agents']}")
else:
print(f"⚠️ 可能已绑定:{res.json()}")
# 4. 扫描本机龙虾
print("\n📡 扫描本机龙虾...")
res = requests.get(f'{API_BASE}/user/scan-local-agents/?instance_id=phospher-openclaw')
if res.status_code == 200:
agents = res.json()['agents']
print(f"✅ 扫描到 {len(agents)} 只龙虾:")
for a in agents:
print(f" - {a['agent_id']} ({a['instance_name']})")
else:
print(f"❌ 扫描失败:{res.json()}")
# 5. 测试纯人类登录
print("\n👤 测试纯人类登录...")
res = requests.post(f'{API_BASE}/auth/login/', json={
'username': 'polaris',
'password': 'password123',
'login_mode': 'human_only'
})
if res.status_code == 200:
data = res.json()
print(f"✅ 登录成功")
print(f" 模式:{data['login_mode']}")
print(f" 会话数:{len(data['sessions'])}")
for s in data['sessions']:
print(f" - {s['session_type']}: {s['nickname']} ({s['emoji']})")
else:
print(f"❌ 登录失败:{res.json()}")
# 6. 测试纯龙虾登录
print("\n🦞 测试纯龙虾登录...")
res = requests.post(f'{API_BASE}/auth/login/', json={
'username': 'polaris',
'password': 'password123',
'login_mode': 'agent_only',
'selected_agent_id': 'flying_hero'
})
if res.status_code == 200:
data = res.json()
print(f"✅ 登录成功")
print(f" 模式:{data['login_mode']}")
print(f" 会话数:{len(data['sessions'])}")
for s in data['sessions']:
print(f" - {s['session_type']}: {s['nickname']} ({s['emoji']})")
else:
print(f"❌ 登录失败:{res.json()}")
# 7. 测试双重身份登录
print("\n👤+🦞 测试双重身份登录...")
res = requests.post(f'{API_BASE}/auth/login/', json={
'username': 'polaris',
'password': 'password123',
'login_mode': 'both',
'selected_agent_id': 'flying_hero'
})
if res.status_code == 200:
data = res.json()
print(f"✅ 登录成功")
print(f" 模式:{data['login_mode']}")
print(f" 会话数:{len(data['sessions'])}")
for s in data['sessions']:
print(f" - {s['session_type']}: {s['nickname']} ({s['emoji']})")
else:
print(f"❌ 登录失败:{res.json()}")
# 8. 获取用户绑定的龙虾列表
print("\n📋 获取用户绑定的龙虾列表...")
res = requests.get(f'{API_BASE}/user/linked-agents/?username=polaris')
if res.status_code == 200:
print(f"✅ 获取成功")
print(f" 龙虾数:{res.json()['count']}")
for a in res.json()['linked_agents']:
print(f" - {a['agent_name']} ({a['agent_id']}) {a['agent_emoji']}")
else:
print(f"❌ 获取失败:{res.json()}")
print("\n" + "="*60)
print("✅ 多身份登录系统测试完成!")
print("="*60)
print("\n📊 登录模式总结:")
print("1. human_only - 纯人类身份1 个座位)")
print("2. agent_only - 纯龙虾身份1 个座位)")
print("3. both - 双重身份2 个座位)")
print("\n💡 使用场景:")
print("- 普通用户参会 → human_only")
print("- 龙虾独立参会 → agent_only")
print("- 用户带龙虾助理 → both")
if __name__ == '__main__':
test_multi_identity_login()

View File

@@ -0,0 +1,134 @@
# Generated by Django 6.0.3 on 2026-04-04 04:52
import django.contrib.auth.models
import django.contrib.auth.validators
import django.utils.timezone
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("auth", "0012_alter_user_first_name_max_length"),
]
operations = [
migrations.CreateModel(
name="User",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("password", models.CharField(max_length=128, verbose_name="password")),
(
"last_login",
models.DateTimeField(
blank=True, null=True, verbose_name="last login"
),
),
(
"is_superuser",
models.BooleanField(
default=False,
help_text="Designates that this user has all permissions without explicitly assigning them.",
verbose_name="superuser status",
),
),
(
"username",
models.CharField(
error_messages={
"unique": "A user with that username already exists."
},
help_text="Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.",
max_length=150,
unique=True,
validators=[
django.contrib.auth.validators.UnicodeUsernameValidator()
],
verbose_name="username",
),
),
(
"first_name",
models.CharField(
blank=True, max_length=150, verbose_name="first name"
),
),
(
"last_name",
models.CharField(
blank=True, max_length=150, verbose_name="last name"
),
),
(
"is_staff",
models.BooleanField(
default=False,
help_text="Designates whether the user can log into this admin site.",
verbose_name="staff status",
),
),
(
"is_active",
models.BooleanField(
default=True,
help_text="Designates whether this user should be treated as active. Unselect this instead of deleting accounts.",
verbose_name="active",
),
),
(
"date_joined",
models.DateTimeField(
default=django.utils.timezone.now, verbose_name="date joined"
),
),
("email", models.EmailField(max_length=254, unique=True)),
("created_at", models.DateTimeField(auto_now_add=True)),
(
"linked_agents",
models.JSONField(
blank=True, default=list, verbose_name="绑定的龙虾"
),
),
(
"groups",
models.ManyToManyField(
blank=True,
help_text="The groups this user belongs to. A user will get all permissions granted to each of their groups.",
related_name="user_set",
related_query_name="user",
to="auth.group",
verbose_name="groups",
),
),
(
"user_permissions",
models.ManyToManyField(
blank=True,
help_text="Specific permissions for this user.",
related_name="user_set",
related_query_name="user",
to="auth.permission",
verbose_name="user permissions",
),
),
],
options={
"verbose_name": "用户",
"verbose_name_plural": "用户",
"db_table": "users",
},
managers=[
("objects", django.contrib.auth.models.UserManager()),
],
),
]

View File

@@ -1,3 +1,50 @@
from django.contrib.auth.models import AbstractUser
from django.db import models from django.db import models
# Create your models here.
class User(AbstractUser):
"""扩展用户模型"""
email = models.EmailField(unique=True)
created_at = models.DateTimeField(auto_now_add=True)
# 绑定的龙虾列表JSON 存储)
# 示例:[{"agent_id": "flying_hero", "agent_name": "飞行侠", "agent_emoji": "🦸", "instance_id": "phospher-openclaw"}]
linked_agents = models.JSONField(default=list, verbose_name='绑定的龙虾', blank=True)
class Meta:
db_table = 'users'
verbose_name = '用户'
verbose_name_plural = '用户'
# 避免与 auth.User 冲突
app_label = 'users'
def __str__(self):
return self.username
def add_linked_agent(self, agent_id: str, agent_name: str, agent_emoji: str = '🤖', instance_id: str = None):
"""添加绑定的龙虾"""
agent = {
'agent_id': agent_id,
'agent_name': agent_name,
'agent_emoji': agent_emoji,
'instance_id': instance_id
}
# 避免重复
for i, a in enumerate(self.linked_agents):
if a.get('agent_id') == agent_id:
self.linked_agents[i] = agent
return
self.linked_agents.append(agent)
self.save()
def remove_linked_agent(self, agent_id: str):
"""移除绑定的龙虾"""
self.linked_agents = [a for a in self.linked_agents if a.get('agent_id') != agent_id]
self.save()
def get_linked_agent(self, agent_id: str):
"""获取指定的龙虾信息"""
for agent in self.linked_agents:
if agent.get('agent_id') == agent_id:
return agent
return None

View File

@@ -8,6 +8,11 @@ User = get_user_model()
class LoginSerializer(serializers.Serializer): class LoginSerializer(serializers.Serializer):
username = serializers.CharField() username = serializers.CharField()
password = serializers.CharField() password = serializers.CharField()
login_mode = serializers.ChoiceField(
choices=['human_only', 'agent_only', 'both'],
default='human_only'
)
selected_agent_id = serializers.CharField(required=False, allow_blank=True)
class LoginView(views.APIView): class LoginView(views.APIView):
@@ -18,6 +23,8 @@ class LoginView(views.APIView):
username = serializer.validated_data['username'] username = serializer.validated_data['username']
password = serializer.validated_data['password'] password = serializer.validated_data['password']
login_mode = serializer.validated_data.get('login_mode', 'human_only')
selected_agent_id = serializer.validated_data.get('selected_agent_id')
user = authenticate(username=username, password=password) user = authenticate(username=username, password=password)
if not user: if not user:
@@ -30,16 +37,47 @@ class LoginView(views.APIView):
import uuid import uuid
token = uuid.uuid4().hex token = uuid.uuid4().hex
# 实际项目中应该存储 token 到数据库/缓存 # 构建会话信息
# 这里简化处理 sessions = []
if login_mode in ['human_only', 'both']:
# 人类身份
sessions.append({
'session_type': 'human',
'nickname': user.username,
'emoji': '👤',
'user_id': user.id
})
if login_mode in ['agent_only', 'both']:
# 龙虾身份
if selected_agent_id:
agent = user.get_linked_agent(selected_agent_id)
if agent:
sessions.append({
'session_type': 'agent',
'agent_id': agent['agent_id'],
'agent_name': agent['agent_name'],
'nickname': agent['agent_name'],
'emoji': agent.get('agent_emoji', '🤖'),
'instance_id': agent.get('instance_id')
})
else:
return Response(
{'error': f'未找到绑定的龙虾:{selected_agent_id}'},
status=status.HTTP_400_BAD_REQUEST
)
return Response({ return Response({
'token': token, 'token': token,
'user': { 'user': {
'id': user.id, 'id': user.id,
'username': user.username, 'username': user.username,
'email': user.email 'email': user.email,
} 'linked_agents': user.linked_agents
},
'sessions': sessions,
'login_mode': login_mode
}) })
@@ -78,3 +116,102 @@ class RegisterView(views.APIView):
{'detail': str(e)}, {'detail': str(e)},
status=status.HTTP_400_BAD_REQUEST status=status.HTTP_400_BAD_REQUEST
) )
class LinkedAgentsView(views.APIView):
"""
用户绑定的龙虾管理
GET /api/v1/user/linked-agents/?username=xxx - 获取龙虾列表
POST /api/v1/user/linked-agents/ - 添加龙虾
DELETE /api/v1/user/linked-agents/{agent_id}/?username=xxx - 移除龙虾
"""
def get(self, request):
username = request.query_params.get('username')
if not username:
return Response({'error': '缺少 username'}, status=status.HTTP_400_BAD_REQUEST)
try:
user = User.objects.get(username=username)
return Response({
'linked_agents': user.linked_agents,
'count': len(user.linked_agents)
})
except User.DoesNotExist:
return Response({'error': '用户不存在'}, status=status.HTTP_404_NOT_FOUND)
def post(self, request):
username = request.data.get('username')
agent_id = request.data.get('agent_id')
agent_name = request.data.get('agent_name')
agent_emoji = request.data.get('agent_emoji', '🤖')
instance_id = request.data.get('instance_id')
if not all([username, agent_id, agent_name]):
return Response({'error': '缺少必要参数'}, status=status.HTTP_400_BAD_REQUEST)
try:
user = User.objects.get(username=username)
user.add_linked_agent(agent_id, agent_name, agent_emoji, instance_id)
return Response({
'status': 'success',
'linked_agents': user.linked_agents
})
except User.DoesNotExist:
return Response({'error': '用户不存在'}, status=status.HTTP_404_NOT_FOUND)
def delete(self, request, agent_id):
username = request.query_params.get('username')
if not username:
return Response({'error': '缺少 username'}, status=status.HTTP_400_BAD_REQUEST)
try:
user = User.objects.get(username=username)
user.remove_linked_agent(agent_id)
return Response({
'status': 'success',
'linked_agents': user.linked_agents
})
except User.DoesNotExist:
return Response({'error': '用户不存在'}, status=status.HTTP_404_NOT_FOUND)
class ScanLocalAgentsView(views.APIView):
"""
扫描本机龙虾列表
GET /api/v1/user/scan-local-agents/?instance_id=xxx - 扫描指定实例
GET /api/v1/user/scan-local-agents/ - 扫描所有实例
"""
def get(self, request):
instance_id = request.query_params.get('instance_id')
if not instance_id:
# 返回所有已注册实例的 Agent
from instances.models import Instance
instances = Instance.objects.filter(is_active=True)
agents = []
for inst in instances:
for agent_id in inst.agent_ids:
agents.append({
'agent_id': agent_id,
'instance_id': inst.instance_id,
'instance_name': inst.instance_name
})
else:
# 返回指定实例的 Agent
from instances.models import Instance
try:
instance = Instance.objects.get(instance_id=instance_id, is_active=True)
agents = [{
'agent_id': agent_id,
'instance_id': instance.instance_id,
'instance_name': instance.instance_name
} for agent_id in instance.agent_ids]
except Instance.DoesNotExist:
return Response({'error': '实例不存在'}, status=status.HTTP_404_NOT_FOUND)
return Response({
'agents': agents,
'count': len(agents)
})