道童 4374379d3f feat: complete version of the Lobster memory sync system

Features:
- File tree display
- Diff comparison
- Two-way sync (local <-> database)
- Version history tracking
- Statistics display

Core patches:
1. Chunked reads and streaming transfer (prevents memory spikes on large files)
2. .lobsterignore mechanism (excludes temporary files)
3. Operation audit trail (Audit Log recording sync history)

Tech stack:
- Backend: Django + DRF + PostgreSQL
- Frontend: React + Ant Design
- Deployment: Docker + Docker Compose

The project is fully deployed and can be started directly with `docker-compose up -d`
2026-04-05 12:43:24 +00:00

# 🎯 Changelog: Three "Patches"
## Update date
2026-04-05
## Summary
Following 逍遥子's suggestions, three important feature patches were added to the Lobster memory sync system to improve its performance, usability, and safety.
---
## 📦 Patch 1: Chunked Reads and Streaming Transfer
### Problem
- A Lobster memory file (e.g. a log or vector snapshot) can exceed 50 MB
- A one-shot GET /api/diff then spikes the backend's memory
### Solution
- **Streamed reads**: read large files in 8 KB chunks instead of loading them into memory at once
- **Streamed hashing**: compute the hash directly from the file stream, without the full content
- **Diff limits**: for large files, show only the first and last 500 lines and elide the middle
### Implementation
```python
# services.py
import difflib
import hashlib
from pathlib import Path
from typing import Dict, Iterator


class FileScanner:
    chunk_size = 8192  # read in 8 KB chunks

    def read_file_chunked(self, file_path: Path) -> str:
        """Read a file in chunks."""
        content_parts = []
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            while True:
                chunk = f.read(self.chunk_size)
                if not chunk:
                    break
                content_parts.append(chunk)
        return ''.join(content_parts)

    def read_file_stream(self, file_path: Path) -> Iterator[str]:
        """Stream a file chunk by chunk (for large-file transfers)."""
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            while True:
                chunk = f.read(self.chunk_size)
                if not chunk:
                    break
                yield chunk

    def compute_hash_stream(self, file_path: Path) -> str:
        """Hash a file from its stream (avoids large-file memory problems)."""
        hash_obj = hashlib.sha256()
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(self.chunk_size)
                if not chunk:
                    break
                hash_obj.update(chunk)
        return hash_obj.hexdigest()


class DiffChecker:
    def get_file_diff(self, local_content: str, db_content: str, max_lines: int = 1000) -> Dict:
        """Compute a file diff (with a line cap for large files)."""
        local_lines = local_content.split('\n')
        db_lines = db_content.split('\n')
        # Cap the line count: large files keep only head and tail
        if len(local_lines) > max_lines:
            local_head = local_lines[:max_lines // 2]
            local_tail = local_lines[-(max_lines // 2):]
            local_lines = (local_head
                           + ['... ({} lines omitted) ...'.format(len(local_lines) - max_lines)]
                           + local_tail)
        # The original excerpt ends at the truncation above; a plausible
        # completion caps db_lines the same way and diffs the two sides.
        if len(db_lines) > max_lines:
            db_lines = (db_lines[:max_lines // 2]
                        + ['... ({} lines omitted) ...'.format(len(db_lines) - max_lines)]
                        + db_lines[-(max_lines // 2):])
        diff = list(difflib.unified_diff(db_lines, local_lines, lineterm=''))
        return {'changed': bool(diff), 'diff': diff}
```
### API update
```http
# Request a chunked diff for a large file
GET /api/diff/?lobster_id=daotong&file_path=large-file.log&chunked=true
```
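On the transfer side, `read_file_stream` pairs naturally with Django's `StreamingHttpResponse`. A minimal sketch, assuming a hypothetical `stream_file` view and storage root (neither is named in the patch):

```python
# views.py — illustrative sketch only; view name and paths are assumptions
from pathlib import Path

from django.http import StreamingHttpResponse

from .services import FileScanner


def stream_file(request):
    """Stream a (potentially large) memory file without buffering it fully."""
    base_dir = Path('/data/lobster-memory')  # assumed storage root
    # A real view must validate file_path to prevent directory traversal
    file_path = base_dir / request.GET['file_path']
    scanner = FileScanner()
    # StreamingHttpResponse consumes the generator chunk by chunk, so peak
    # memory stays near chunk_size instead of the full file size
    return StreamingHttpResponse(
        scanner.read_file_stream(file_path),
        content_type='text/plain; charset=utf-8',
    )
```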
---
## 📦 Patch 2: The .lobsterignore Mechanism
### Problem
- Temporary files (e.g. .DS_Store, log caches) don't belong in the database
- Maintaining an explicit exclusion list by hand keeps things cleaner
### Solution
- Add a `.lobsterignore` file (similar to `.gitignore`)
- Skip matching files automatically during scans
- Ship a set of default ignore rules
### Implementation
```python
# services.py
import fnmatch
import os
from pathlib import Path


class IgnorePattern:
    """Pattern matcher for .lobsterignore."""

    def __init__(self, base_dir: Path):
        self.base_dir = base_dir
        self.patterns = []
        self.load_patterns()

    def load_patterns(self):
        """Load the .lobsterignore file."""
        ignore_file = self.base_dir / '.lobsterignore'
        if ignore_file.exists():
            with open(ignore_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    # Skip blank lines and comments
                    if line and not line.startswith('#'):
                        self.patterns.append(line)
        # Add the default ignore rules
        default_patterns = [
            '.DS_Store', '.git', '.gitignore', '__pycache__',
            'node_modules', '*.pyc', '*.pyo', '*.log',
            '*.tmp', '*.temp', '*.bak', '.vscode', '.idea',
        ]
        for pattern in default_patterns:
            if pattern not in self.patterns:
                self.patterns.append(pattern)

    def is_ignored(self, file_path: Path) -> bool:
        """Return True if the file matches an ignore pattern."""
        relative_path = file_path.relative_to(self.base_dir)
        for pattern in self.patterns:
            # Match the bare file name
            if fnmatch.fnmatch(file_path.name, pattern):
                return True
            # Match the relative path
            if fnmatch.fnmatch(str(relative_path), pattern):
                return True
            # Match directory patterns (trailing '/')
            if pattern.endswith('/') and fnmatch.fnmatch(str(relative_path.parent), pattern.rstrip('/')):
                return True
            # Match '*/'-prefixed patterns against each path segment
            if pattern.startswith('*/'):
                parts = str(relative_path).split(os.sep)
                for part in parts:
                    if fnmatch.fnmatch(part, pattern[2:]):
                        return True
        return False
```
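A quick usage sketch (the directory layout and the surrounding scan loop are illustrative assumptions, not part of the patch):

```python
# Illustrative usage of IgnorePattern in a scan loop
from pathlib import Path

base_dir = Path('/data/lobster-memory/daotong')  # assumed memory root
ignore = IgnorePattern(base_dir)

for path in base_dir.rglob('*'):
    if path.is_file() and not ignore.is_ignored(path):
        print('would sync:', path.relative_to(base_dir))
# e.g. MEMORY.md is kept, while .DS_Store and debug.log are skipped
```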
### Example file
```bash
# .lobsterignore
# System files
.DS_Store
.Thumbs.db
# IDEs and editors
.vscode/
.idea/
*.swp
# Python
__pycache__/
*.pyc
*.log
# Node.js
node_modules/
# Temporary files
*.tmp
*.bak
```
### API update
```http
# List the active ignore patterns
GET /api/ignore/patterns/
# Reload .lobsterignore from disk
POST /api/ignore/reload/
```
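A minimal sketch of what the patterns endpoint could look like as a DRF view (the view name, wiring, and storage root are assumptions; the response follows the `success`/`data` shape used by the history API below):

```python
# views.py — illustrative sketch; actual wiring may differ
from pathlib import Path

from rest_framework.decorators import api_view
from rest_framework.response import Response

from .services import IgnorePattern


@api_view(['GET'])
def ignore_patterns(request):
    """Return the currently active ignore patterns."""
    base_dir = Path('/data/lobster-memory')  # assumed memory root
    ignore = IgnorePattern(base_dir)
    return Response({'success': True, 'data': ignore.patterns})
```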
---
## 📦 Patch 3: Operation Audit Trail (Audit Log)
### Problem
- If a wrong click someday breaks something, there is no way to find which operation caused it
- Operation history needs to be recorded so problems can be traced back
### Solution
- Add a `SyncHistory` model
- Record the details of every sync operation
- Provide a history query API
### Implementation
```python
# models.py
from django.db import models


class SyncHistory(models.Model):
    """History of sync operations."""

    ACTION_CHOICES = [
        ('sync_to_db', 'Sync to database'),
        ('sync_to_local', 'Sync to local'),
        ('auto_sync', 'Auto sync'),
        ('manual_merge', 'Manual merge'),
    ]
    STATUS_CHOICES = [
        ('success', 'Success'),
        ('failed', 'Failed'),
        ('partial', 'Partially succeeded'),
    ]

    lobster_id = models.CharField(max_length=50, help_text='Lobster ID')
    file_path = models.CharField(max_length=500, help_text='Relative file path')
    action = models.CharField(max_length=20, choices=ACTION_CHOICES, help_text='Operation type')
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, help_text='Operation status')
    old_version = models.IntegerField(null=True, blank=True, help_text='Version before the operation')
    new_version = models.IntegerField(null=True, blank=True, help_text='Version after the operation')
    old_hash = models.CharField(max_length=64, null=True, blank=True, help_text='Hash before the operation')
    new_hash = models.CharField(max_length=64, null=True, blank=True, help_text='Hash after the operation')
    file_size = models.IntegerField(default=0, help_text='File size in bytes')
    operator = models.CharField(max_length=50, default='system', help_text='Operator')
    error_message = models.TextField(null=True, blank=True, help_text='Error message')
    execution_time = models.FloatField(default=0, help_text='Execution time in seconds')
    created_at = models.DateTimeField(auto_now_add=True, help_text='Operation timestamp')


# services.py
from typing import Dict, List

from .models import SyncHistory


class AuditLogger:
    """Audit-log writer for sync operations."""

    model = SyncHistory

    def log_sync_action(
        self,
        lobster_id: str,
        file_path: str,
        action: str,
        old_version: int = None,
        new_version: int = None,
        old_hash: str = None,
        new_hash: str = None,
        file_size: int = 0,
        operator: str = 'system',
        status: str = 'success',
        error_message: str = None,
        execution_time: float = 0,
    ):
        """Record one sync operation."""
        self.model.objects.create(...)

    def get_history(
        self,
        lobster_id: str = None,
        file_path: str = None,
        action: str = None,
        limit: int = 100,
    ) -> List[Dict]:
        """Fetch operation history."""
        queryset = self.model.objects.all()
        # filter and order...
```
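Both elided bodies are routine ORM calls; a minimal sketch of how they might be completed (illustrative, not the committed code):

```python
# Plausible completion of the elided AuditLogger bodies (illustrative sketch)
from .models import SyncHistory


class AuditLogger:
    model = SyncHistory

    def log_sync_action(self, lobster_id, file_path, action, **fields):
        """Persist one audit entry; **fields carries the optional columns."""
        self.model.objects.create(
            lobster_id=lobster_id, file_path=file_path, action=action, **fields,
        )

    def get_history(self, lobster_id=None, file_path=None, action=None, limit=100):
        """Apply the optional filters, newest first, and return plain dicts."""
        queryset = self.model.objects.all()
        if lobster_id:
            queryset = queryset.filter(lobster_id=lobster_id)
        if file_path:
            queryset = queryset.filter(file_path=file_path)
        if action:
            queryset = queryset.filter(action=action)
        return list(queryset.order_by('-created_at')[:limit].values())
```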
### Usage example
```python
# views.py
import time

from rest_framework.decorators import api_view


@api_view(['POST'])
def sync_to_db(request):
    """Sync to the database (with audit logging)."""
    audit_logger = AuditLogger()
    start_time = time.time()
    try:
        # Perform the sync...
        execution_time = time.time() - start_time
        # Log success
        audit_logger.log_sync_action(
            lobster_id=lobster_id,
            file_path=file_path,
            action='sync_to_db',
            old_version=old_version,
            new_version=new_version,
            old_hash=old_hash,
            new_hash=file_hash,
            file_size=record.size,
            operator=operator,
            status='success',
            execution_time=execution_time,
        )
    except Exception as e:
        # Log failure; recompute the elapsed time here, since the try
        # block may raise before execution_time is assigned
        audit_logger.log_sync_action(
            lobster_id=lobster_id,
            file_path=file_path,
            action='sync_to_db',
            operator=operator,
            status='failed',
            error_message=str(e),
            execution_time=time.time() - start_time,
        )
```
### API update
```http
# Query the sync history for one file
GET /api/history/?lobster_id=daotong&file_path=MEMORY.md&limit=50
```
### Example history record
```json
{
  "success": true,
  "data": [
    {
      "id": 1,
      "lobster_id": "daotong",
      "file_path": "MEMORY.md",
      "action": "sync_to_db",
      "action_display": "Sync to database",
      "status": "success",
      "status_display": "Success",
      "old_version": 1,
      "new_version": 2,
      "old_hash": "abc123...",
      "new_hash": "def456...",
      "file_size": 1234,
      "operator": "逍遥子",
      "error_message": null,
      "execution_time": 0.123,
      "created_at": "2026-04-05T12:00:00Z"
    }
  ]
}
```
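The same records can be inspected directly from a Django shell; a quick sketch using the model defined above (the `memory_app` import path is assumed from the app name used in the migration commands below):

```python
# Inspect the audit trail from `python manage.py shell` (illustrative)
from memory_app.models import SyncHistory

# Last five operations on MEMORY.md for lobster "daotong", newest first
entries = (SyncHistory.objects
           .filter(lobster_id='daotong', file_path='MEMORY.md')
           .order_by('-created_at')[:5])
for entry in entries:
    # get_action_display() resolves the human-readable choice label
    print(entry.created_at, entry.get_action_display(), entry.status, entry.operator)
```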
---
## 📋 Database migration
A database migration is needed to create the `SyncHistory` table:
```bash
# Enter the backend container
docker exec -it lobster-backend bash
# Create and apply the migration
python manage.py makemigrations memory_app
python manage.py migrate
```
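For reference, `makemigrations` should generate roughly the following (an illustrative sketch; the actual file name, dependency, and field order will differ):

```python
# memory_app/migrations/000X_synchistory.py (hypothetical file name)
from django.db import migrations, models


class Migration(migrations.Migration):

    dependencies = [
        ('memory_app', '0001_initial'),  # assumed previous migration
    ]

    operations = [
        migrations.CreateModel(
            name='SyncHistory',
            fields=[
                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                ('lobster_id', models.CharField(help_text='Lobster ID', max_length=50)),
                ('file_path', models.CharField(help_text='Relative file path', max_length=500)),
                # ... remaining fields mirror the SyncHistory model above ...
                ('created_at', models.DateTimeField(auto_now_add=True, help_text='Operation timestamp')),
            ],
        ),
    ]
```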
---
## ✅ Checklist
- [x] Chunked reads and streaming transfer (services.py)
- [x] .lobsterignore mechanism (services.py + .lobsterignore.example)
- [x] Operation audit trail (models.py + services.py + views.py + serializers.py)
- [x] New API endpoints (urls.py)
- [x] Documentation updated (CHANGELOG.md)
---
## 🚀 Next steps
1. Run the database migration
2. Push the code to the remote repository
3. Update the frontend (add history view and ignore-rule management)
---
**Thanks to 逍遥子 for the valuable suggestions!** 🙏