Files
chengshishouce/city-manual/backend/agents/models.py
maoshen d9e09b61ee feat: 实现 AI-First 代理系统
核心功能:
- AIAgent 模型:AI 代理身份管理
- AIOperationLog: AI 操作日志记录
- AITask: AI 异步任务系统
- AIWebhook: AI webhook 订阅

API 端点:
- POST /api/agents/auth/ - AI 代理认证
- GET/POST /api/agents/ - 代理管理
- GET /api/agent-logs/ - 操作日志查询
- GET/POST /api/agent-tasks/ - 任务管理
- GET/POST /api/agent-webhooks/ - Webhook 管理
- POST /api/batch/ - 批量操作

预置 AI 代理:
- content-moderator-ai: 内容审核 AI
- content-generator-ai: 内容生成 AI
- service-curator-ai: 服务推荐 AI
- analytics-ai: 数据分析 AI
- admin-ai: 管理员 AI

文档:
- AI_AGENT.md: AI-First 设计文档
- init_agents.py: AI 代理初始化脚本

测试:
- 认证系统测试通过
- JWT token 生成正常
- 权限系统工作正常
2026-04-12 11:40:11 +00:00

246 lines
7.7 KiB
Python

from django.db import models
from django.utils import timezone
from datetime import timedelta
class AIAgent(models.Model):
"""AI 代理模型"""
agent_id = models.CharField(max_length=100, unique=True)
name = models.CharField(max_length=200)
description = models.TextField(blank=True)
secret_key = models.CharField(max_length=64)
# 权限
permissions = models.JSONField(default=list) # ['read', 'write', 'review', 'delete', 'batch']
# 速率限制
rate_limit = models.IntegerField(default=1000) # 每小时请求数
rate_limit_window = models.IntegerField(default=3600) # 秒
# 状态
is_active = models.BooleanField(default=True)
last_seen = models.DateTimeField(null=True, blank=True)
# 元数据
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'ai_agents'
ordering = ['created_at']
def __str__(self):
return f"{self.name} ({self.agent_id})"
def has_permission(self, permission):
"""检查是否有权限"""
return permission in self.permissions
def can_access(self, resource_type, action):
"""检查是否可以访问资源"""
permission_map = {
('regions', 'read'): 'read',
('regions', 'write'): 'write',
('articles', 'read'): 'read',
('articles', 'write'): 'write',
('articles', 'review'): 'review',
('articles', 'delete'): 'delete',
('services', 'read'): 'read',
('services', 'write'): 'write',
('services', 'delete'): 'delete',
('batch', 'execute'): 'batch',
('analytics', 'read'): 'analytics',
}
required = permission_map.get((resource_type, action))
return required and self.has_permission(required)
class AIOperationLog(models.Model):
"""AI 操作日志"""
STATUS_CHOICES = [
('success', 'Success'),
('failed', 'Failed'),
('partial', 'Partial Success'),
]
agent = models.ForeignKey(AIAgent, on_delete=models.CASCADE, related_name='operations')
action = models.CharField(max_length=50) # create, update, delete, review, etc.
resource_type = models.CharField(max_length=50) # article, service, region, etc.
resource_id = models.IntegerField(null=True, blank=True)
status = models.CharField(max_length=20, choices=STATUS_CHOICES)
# AI 元数据
confidence = models.FloatField(null=True, blank=True) # AI 置信度 0-1
reasoning = models.TextField(blank=True) # AI 推理过程
# 请求信息
request_data = models.JSONField(null=True, blank=True)
response_data = models.JSONField(null=True, blank=True)
error_message = models.TextField(blank=True)
# 性能
execution_time_ms = models.IntegerField(null=True, blank=True)
# 时间
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'ai_operation_logs'
ordering = ['-created_at']
indexes = [
models.Index(fields=['agent', '-created_at']),
models.Index(fields=['resource_type', '-created_at']),
]
def __str__(self):
return f"{self.agent.agent_id} - {self.action} - {self.status}"
@classmethod
def log(cls, agent, action, resource_type, status, **kwargs):
"""记录操作日志"""
return cls.objects.create(
agent=agent,
action=action,
resource_type=resource_type,
status=status,
**kwargs
)
class AITask(models.Model):
"""AI 异步任务"""
STATUS_CHOICES = [
('pending', 'Pending'),
('processing', 'Processing'),
('completed', 'Completed'),
('failed', 'Failed'),
]
task_id = models.CharField(max_length=64, unique=True)
agent = models.ForeignKey(AIAgent, on_delete=models.CASCADE, related_name='tasks')
task_type = models.CharField(max_length=50) # batch, analyze, optimize, etc.
status = models.CharField(max_length=20, choices=STATUS_CHOICES)
# 进度
progress = models.IntegerField(default=0) # 0-100
total_items = models.IntegerField(null=True, blank=True)
processed_items = models.IntegerField(default=0)
# 结果
result = models.JSONField(null=True, blank=True)
error_message = models.TextField(blank=True)
# 回调
callback_url = models.URLField(null=True, blank=True)
callback_secret = models.CharField(max_length=64, null=True, blank=True)
# 时间
created_at = models.DateTimeField(auto_now_add=True)
started_at = models.DateTimeField(null=True, blank=True)
completed_at = models.DateTimeField(null=True, blank=True)
class Meta:
db_table = 'ai_tasks'
ordering = ['-created_at']
def __str__(self):
return f"{self.task_id} - {self.status}"
def update_progress(self, processed, total=None):
"""更新任务进度"""
self.processed_items = processed
if total:
self.total_items = total
if total:
self.progress = int((processed / total) * 100)
self.save()
def complete(self, result=None):
"""标记任务完成"""
self.status = 'completed'
self.completed_at = timezone.now()
if result:
self.result = result
self.progress = 100
self.save()
def fail(self, error_message):
"""标记任务失败"""
self.status = 'failed'
self.completed_at = timezone.now()
self.error_message = error_message
self.save()
class AIWebhook(models.Model):
"""AI Webhook 订阅"""
EVENT_CHOICES = [
('article.created', 'Article Created'),
('article.approved', 'Article Approved'),
('article.rejected', 'Article Rejected'),
('service.created', 'Service Created'),
('review.pending', 'Review Pending'),
]
agent = models.ForeignKey(AIAgent, on_delete=models.CASCADE, related_name='webhooks')
event = models.CharField(max_length=50, choices=EVENT_CHOICES)
url = models.URLField()
secret = models.CharField(max_length=64)
is_active = models.BooleanField(default=True)
last_triggered = models.DateTimeField(null=True, blank=True)
failure_count = models.IntegerField(default=0)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'ai_webhooks'
ordering = ['created_at']
def __str__(self):
return f"{self.agent.agent_id} - {self.event}"
def trigger(self, payload):
"""触发 webhook"""
import requests
import hashlib
import hmac
# 生成签名
signature = hmac.new(
self.secret.encode(),
payload.encode(),
hashlib.sha256
).hexdigest()
try:
response = requests.post(
self.url,
data=payload,
headers={
'Content-Type': 'application/json',
'X-Webhook-Signature': f'sha256={signature}',
'X-Webhook-Event': self.event,
},
timeout=10
)
if response.status_code == 200:
self.last_triggered = timezone.now()
self.failure_count = 0
else:
self.failure_count += 1
self.save()
return response.status_code == 200
except Exception:
self.failure_count += 1
self.save()
return False