Files
chengshishouce/city-manual/backend/agents/views.py
maoshen d9e09b61ee feat: 实现 AI-First 代理系统
核心功能:
- AIAgent 模型:AI 代理身份管理
- AIOperationLog: AI 操作日志记录
- AITask: AI 异步任务系统
- AIWebhook: AI webhook 订阅

API 端点:
- POST /api/agents/auth/ - AI 代理认证
- GET/POST /api/agents/ - 代理管理
- GET /api/agent-logs/ - 操作日志查询
- GET/POST /api/agent-tasks/ - 任务管理
- GET/POST /api/agent-webhooks/ - Webhook 管理
- POST /api/batch/ - 批量操作

预置 AI 代理:
- content-moderator-ai: 内容审核 AI
- content-generator-ai: 内容生成 AI
- service-curator-ai: 服务推荐 AI
- analytics-ai: 数据分析 AI
- admin-ai: 管理员 AI

文档:
- AI_AGENT.md: AI-First 设计文档
- init_agents.py: AI 代理初始化脚本

测试:
- 认证系统测试通过
- JWT token 生成正常
- 权限系统工作正常
2026-04-12 11:40:11 +00:00

366 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import secrets
import time
import uuid
from datetime import timedelta

from django.db.models import Avg, Count, Q
from django.db.models.functions import TruncDate
from django.http import JsonResponse
from django.utils import timezone
from rest_framework import status, viewsets
from rest_framework.decorators import action, api_view, permission_classes
from rest_framework.exceptions import NotAuthenticated, NotFound
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework_simplejwt.tokens import AccessToken, RefreshToken

from .models import AIAgent, AIOperationLog, AITask, AIWebhook
from .serializers import (
    AIAgentAuthSerializer,
    AIAgentSerializer,
    AIOperationLogSerializer,
    AITaskCreateSerializer,
    AITaskSerializer,
    AIWebhookSerializer,
    BatchOperationSerializer,
)
class AIAgentViewSet(viewsets.ModelViewSet):
    """AI agent management: CRUD, authentication, statistics, secret rotation.

    Agents are addressed in URLs by their ``agent_id`` field (``lookup_field``).
    """

    queryset = AIAgent.objects.all()
    serializer_class = AIAgentSerializer
    lookup_field = 'agent_id'

    # Lifetime of issued access tokens; the ``expires_in`` value returned by
    # ``auth`` is derived from this so the two can never disagree.
    ACCESS_TOKEN_LIFETIME = timedelta(hours=1)

    def get_permissions(self):
        """Return ``AllowAny`` for the ``auth`` action, ``IsAuthenticated`` otherwise."""
        if self.action == 'auth':
            return [AllowAny()]
        return [IsAuthenticated()]

    @action(detail=False, methods=['post'], permission_classes=[AllowAny])
    def auth(self, request):
        """Authenticate an AI agent and issue an access/refresh JWT pair.

        The request body is validated by ``AIAgentAuthSerializer``; invalid
        input raises a DRF validation error. On success the call is recorded
        through ``AIOperationLog.log`` and the response carries both tokens,
        the access-token lifetime in seconds, and basic agent info.
        """
        serializer = AIAgentAuthSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        # NOTE(review): relies on AIAgentAuthSerializer populating ``instance``
        # with the authenticated agent during validation -- confirm against the
        # serializer, since DRF leaves ``instance`` as None by default.
        agent = serializer.instance

        # Tokens are built by hand instead of ``for_user`` because an AI agent
        # is not a Django User.
        access = AccessToken()
        access.set_exp(lifetime=self.ACCESS_TOKEN_LIFETIME)
        refresh = RefreshToken()
        for token in (access, refresh):
            token['agent_id'] = agent.agent_id
            token['permissions'] = agent.permissions
            token['type'] = 'agent'

        # Record the login.
        AIOperationLog.log(
            agent=agent,
            action='auth',
            resource_type='agent',
            status='success',
            confidence=1.0,
            reasoning='Agent authentication successful',
        )

        return Response({
            'access_token': str(access),
            'refresh_token': str(refresh),
            'expires_in': int(self.ACCESS_TOKEN_LIFETIME.total_seconds()),
            'agent_info': {
                'id': agent.agent_id,
                'name': agent.name,
                'permissions': agent.permissions,
                'rate_limit': agent.rate_limit,
            },
        })

    @action(detail=True, methods=['get'])
    def stats(self, request, agent_id=None):
        """Return operation, task and 7-day trend statistics for one agent.

        Response keys: ``total_operations``, ``success_operations``,
        ``failed_operations``, ``avg_confidence`` (``None`` when no operation
        has a confidence value), ``tasks`` (per-status counts) and
        ``daily_operations`` (``[{'date': ..., 'count': ...}, ...]``).
        """
        agent = self.get_object()

        # Operation statistics, gathered in a single aggregate query.
        # ``Avg`` skips NULL confidences, so no explicit isnull filter is needed.
        operations = AIOperationLog.objects.filter(agent=agent)
        op_totals = operations.aggregate(
            total=Count('id'),
            success=Count('id', filter=Q(status='success')),
            failed=Count('id', filter=Q(status='failed')),
            avg=Avg('confidence'),
        )
        stats = {
            'total_operations': op_totals['total'],
            'success_operations': op_totals['success'],
            'failed_operations': op_totals['failed'],
            'avg_confidence': op_totals['avg'],
        }

        # Task statistics.
        stats['tasks'] = AITask.objects.filter(agent=agent).aggregate(
            total=Count('id'),
            completed=Count('id', filter=Q(status='completed')),
            failed=Count('id', filter=Q(status='failed')),
            processing=Count('id', filter=Q(status='processing')),
        )

        # Operation trend over the last 7 days, one row per calendar day.
        seven_days_ago = timezone.now() - timedelta(days=7)
        daily_ops = (
            operations.filter(created_at__gte=seven_days_ago)
            .annotate(date=TruncDate('created_at'))
            .values('date')
            .annotate(count=Count('id'))
            .order_by('date')
        )
        stats['daily_operations'] = list(daily_ops)

        return Response(stats)

    @action(detail=True, methods=['post'])
    def rotate_secret(self, request, agent_id=None):
        """Replace the agent's secret key and return the new value.

        The new secret is 32 hex characters (same length as the previous
        ``uuid4().hex`` format) drawn from the ``secrets`` module. The rotation
        is recorded through ``AIOperationLog.log``.
        """
        agent = self.get_object()
        agent.secret_key = secrets.token_hex(16)
        agent.save()

        AIOperationLog.log(
            agent=agent,
            action='rotate_secret',
            resource_type='agent',
            status='success',
        )

        return Response({'message': 'Secret key rotated', 'new_secret': agent.secret_key})
class AIOperationLogViewSet(viewsets.ReadOnlyModelViewSet):
    """Read-only queries over AI operation logs."""

    queryset = AIOperationLog.objects.all()
    serializer_class = AIOperationLogSerializer

    def get_queryset(self):
        """Return the logs, narrowed by any non-empty query-string filters.

        Recognised parameters: ``agent_id``, ``action``, ``resource_type``
        and ``status``. Missing or empty parameters are ignored.
        """
        logs = super().get_queryset()
        params = self.request.query_params

        # (query parameter, ORM lookup) pairs, applied in this order.
        filter_table = (
            ('agent_id', 'agent__agent_id'),
            ('action', 'action'),
            ('resource_type', 'resource_type'),
            ('status', 'status'),
        )
        for param_name, lookup in filter_table:
            wanted = params.get(param_name)
            if wanted:
                logs = logs.filter(**{lookup: wanted})

        return logs

    @action(detail=False, methods=['get'])
    def summary(self, request):
        """Summarise all operation logs.

        Response keys: ``by_agent`` (total/success/failed per agent, busiest
        first), ``by_action`` and ``by_resource`` (ten most frequent values of
        each field) and ``total`` (overall log count).
        """

        def top_ten(field_name):
            # Ten most frequent values of ``field_name`` with their counts.
            grouped = AIOperationLog.objects.values(field_name)
            ranked = grouped.annotate(count=Count('id')).order_by('-count')
            return list(ranked[:10])

        # Per-agent totals.
        per_agent = AIOperationLog.objects.values('agent__agent_id', 'agent__name')
        per_agent = per_agent.annotate(
            total=Count('id'),
            success=Count('id', filter=Q(status='success')),
            failed=Count('id', filter=Q(status='failed')),
        )
        per_agent = per_agent.order_by('-total')

        payload = {}
        payload['by_agent'] = list(per_agent)
        payload['by_action'] = top_ten('action')
        payload['by_resource'] = top_ten('resource_type')
        payload['total'] = AIOperationLog.objects.count()
        return Response(payload)
class AITaskViewSet(viewsets.ModelViewSet):
    """AI task management. Tasks are addressed in URLs by ``task_id``."""

    queryset = AITask.objects.all()
    serializer_class = AITaskSerializer
    lookup_field = 'task_id'

    def get_serializer_class(self):
        """Use ``AITaskCreateSerializer`` for ``create``, ``AITaskSerializer`` otherwise."""
        if self.action == 'create':
            return AITaskCreateSerializer
        return AITaskSerializer

    def create(self, request, *args, **kwargs):
        """Create a task owned by the agent identified in the request token.

        Returns 201 with the serialized task, 401 when the token carries no
        ``agent_id`` claim, or 404 when that agent does not exist. Invalid
        input raises a DRF validation error before the agent is resolved.
        """
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        # Resolve the agent from the token. ``request.auth`` always exists on a
        # DRF request but may be None (or lack ``payload``), so read it safely.
        token_payload = getattr(request.auth, 'payload', None) or {}
        agent_id = token_payload.get('agent_id')
        if not agent_id:
            return Response(
                {'error': 'Agent authentication required'},
                status=status.HTTP_401_UNAUTHORIZED,
            )

        try:
            agent = AIAgent.objects.get(agent_id=agent_id)
        except AIAgent.DoesNotExist:
            return Response({'error': 'Agent not found'}, status=status.HTTP_404_NOT_FOUND)

        # Create the task.
        task = AITask.objects.create(
            task_id=uuid.uuid4().hex,
            agent=agent,
            task_type=serializer.validated_data['task_type'],
            status='pending',
            callback_url=serializer.validated_data.get('callback_url'),
        )

        # TODO: enqueue the task for asynchronous processing.
        # Simplified for now: a batch task is only marked as processing here;
        # the asynchronous worker would be triggered at this point.
        if task.task_type == 'batch':
            operations = serializer.validated_data.get('operations', [])
            task.status = 'processing'
            task.started_at = timezone.now()
            task.total_items = len(operations)
            task.save()

        return Response(AITaskSerializer(task).data, status=status.HTTP_201_CREATED)

    @action(detail=True, methods=['post'])
    def cancel(self, request, task_id=None):
        """Cancel a task unless it has already completed or failed.

        Returns 400 for a completed/failed task; otherwise sets the status to
        ``cancelled``, stamps ``completed_at`` and returns a confirmation.
        """
        task = self.get_object()

        if task.status in ['completed', 'failed']:
            return Response(
                {'error': 'Task already completed'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        task.status = 'cancelled'
        task.completed_at = timezone.now()
        task.save()

        return Response({'message': 'Task cancelled'})
@api_view(['POST'])
def batch_execute(request):
    """Execute a batch of operations on behalf of the authenticated agent.

    The body is validated by ``BatchOperationSerializer``. Returns 401 when the
    token carries no ``agent_id`` claim, 404 when the agent does not exist and
    403 when the agent lacks the ``batch`` permission. Otherwise responds with
    a generated ``task_id``, the elapsed time in milliseconds, one result entry
    per operation and a success/failed summary.
    """
    serializer = BatchOperationSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    operations = serializer.validated_data['operations']

    # Resolve the agent from the token. ``request.auth`` always exists on a DRF
    # request but may be None (or lack ``payload``), so read it safely.
    token_payload = getattr(request.auth, 'payload', None) or {}
    agent_id = token_payload.get('agent_id')
    if not agent_id:
        return Response(
            {'error': 'Agent authentication required'},
            status=status.HTTP_401_UNAUTHORIZED,
        )

    try:
        agent = AIAgent.objects.get(agent_id=agent_id)
    except AIAgent.DoesNotExist:
        return Response({'error': 'Agent not found'}, status=status.HTTP_404_NOT_FOUND)

    # Check the batch permission.
    if not agent.has_permission('batch'):
        return Response({'error': 'No batch permission'}, status=status.HTTP_403_FORBIDDEN)

    start_time = time.time()
    results = []

    # Run the operations (simplified; this should really be asynchronous).
    for index, op in enumerate(operations):
        try:
            # TODO: actually issue the HTTP request to the target API.
            # For now each operation is only logged; the logged time is the
            # time elapsed since the batch started.
            AIOperationLog.log(
                agent=agent,
                action=f"batch_{op['method'].lower()}",
                resource_type='batch',
                status='success',
                request_data=op,
                execution_time_ms=int((time.time() - start_time) * 1000),
            )
            results.append({
                'index': index,
                'status': 'success',
                'method': op['method'],
                'path': op['path'],
            })
        except Exception as exc:
            # Deliberate best-effort: one bad operation must not abort the
            # batch, so the failure is reported in that operation's result.
            results.append({
                'index': index,
                'status': 'failed',
                'error': str(exc),
            })

    execution_time = int((time.time() - start_time) * 1000)
    failed_count = sum(1 for result in results if result['status'] == 'failed')
    success_count = len(results) - failed_count

    # Log the batch as a whole; mark it failed if any operation failed so the
    # audit trail does not report a clean run that was not one.
    AIOperationLog.log(
        agent=agent,
        action='batch_execute',
        resource_type='batch',
        status='failed' if failed_count else 'success',
        confidence=1.0,
        reasoning=f'Executed {len(operations)} operations',
        request_data={'operations_count': len(operations)},
        execution_time_ms=execution_time,
    )

    return Response({
        'task_id': uuid.uuid4().hex,
        'status': 'completed',
        'execution_time_ms': execution_time,
        'results': results,
        'summary': {
            'total': len(operations),
            'success': success_count,
            'failed': failed_count,
        },
    })
class AIWebhookViewSet(viewsets.ModelViewSet):
    """AI webhook subscription management."""

    queryset = AIWebhook.objects.all()
    serializer_class = AIWebhookSerializer

    def _current_agent_id(self):
        """Return the ``agent_id`` claim of the request token, or ``None``.

        ``request.auth`` always exists on a DRF request but may be None (or
        lack ``payload``), so it is read defensively.
        """
        token_payload = getattr(self.request.auth, 'payload', None) or {}
        return token_payload.get('agent_id')

    def get_queryset(self):
        """Return the webhooks, restricted to the calling agent when one is identified."""
        queryset = super().get_queryset()

        agent_id = self._current_agent_id()
        if agent_id:
            queryset = queryset.filter(agent__agent_id=agent_id)
        # NOTE(review): callers without an ``agent_id`` claim still see every
        # agent's webhooks (original behaviour kept) -- confirm this is
        # intended, e.g. for human administrators.
        return queryset

    def perform_create(self, serializer):
        """Save the new webhook attached to the calling agent.

        Raises:
            NotAuthenticated: the request token carries no ``agent_id`` claim.
            NotFound: no agent with that ``agent_id`` exists.
        """
        agent_id = self._current_agent_id()
        if not agent_id:
            raise NotAuthenticated('Agent authentication required')

        try:
            agent = AIAgent.objects.get(agent_id=agent_id)
        except AIAgent.DoesNotExist as exc:
            raise NotFound('Agent not found') from exc

        serializer.save(agent=agent)