# 🎯 三个"补丁"更新日志 ## 更新时间 2026-04-05 ## 更新说明 根据逍遥子的建议,为龙虾记忆同步系统添加了三个重要功能补丁,提升系统性能、可用性和安全性。 --- ## 📦 补丁 1: 分块读取与流式传输 ### 问题 - 如果龙虾的记忆文件(比如某些 Log 或向量快照)超过 50MB - 一次性 GET /api/diff 会让后端内存瞬间飙升 ### 解决方案 - **流式读取**:使用 8KB 分块读取大文件,避免一次性加载到内存 - **流式哈希计算**:直接从文件流计算哈希,无需加载完整内容 - **差异对比限制**:大文件只显示头尾各 500 行,中间省略 ### 实现细节 ```python # services.py class FileScanner: chunk_size = 8192 # 8KB 分块读取 def read_file_chunked(self, file_path: Path) -> str: """分块读取文件""" content_parts = [] with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: while True: chunk = f.read(self.chunk_size) if not chunk: break content_parts.append(chunk) return ''.join(content_parts) def read_file_stream(self, file_path: str) -> Iterator[str]: """流式读取文件(用于大文件传输)""" with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: while True: chunk = f.read(self.chunk_size) if not chunk: break yield chunk def compute_hash_stream(self, file_path: Path) -> str: """流式计算文件哈希(避免大文件内存问题)""" hash_obj = hashlib.sha256() with open(file_path, 'rb') as f: while True: chunk = f.read(self.chunk_size) if not chunk: break hash_obj.update(chunk) return hash_obj.hexdigest() class DiffChecker: def get_file_diff(self, local_content: str, db_content: str, max_lines: int = 1000) -> Dict: """获取文件差异(支持大文件限制)""" local_lines = local_content.split('\n') db_lines = db_content.split('\n') # 限制行数(大文件只显示头尾) if len(local_lines) > max_lines: local_head = local_lines[:max_lines//2] local_tail = local_lines[-max_lines//2:] local_lines = local_head + ['... 
(中间省略 {} 行) ...'.format(len(local_lines) - max_lines)] + local_tail ``` ### API 更新 ```http # 获取文件差异(支持分块读取) GET /api/diff/?lobster_id=daotong&file_path=large-file.log&chunked=true ``` --- ## 📦 补丁 2: .lobsterignore 机制 ### 问题 - 临时文件(如 .DS_Store、日志缓存)不需要进数据库 - 手动维护一个排除列表会更清爽 ### 解决方案 - 创建 `.lobsterignore` 文件(类似 `.gitignore`) - 扫描时自动跳过匹配的文件 - 提供默认忽略规则 ### 实现细节 ```python # services.py class IgnorePattern: """.lobsterignore 模式匹配器""" def __init__(self, base_dir: Path): self.base_dir = base_dir self.patterns = [] self.load_patterns() def load_patterns(self): """加载 .lobsterignore 文件""" ignore_file = self.base_dir / '.lobsterignore' if ignore_file.exists(): with open(ignore_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() # 跳过空行和注释 if line and not line.startswith('#'): self.patterns.append(line) # 添加默认忽略规则 default_patterns = [ '.DS_Store', '.git', '.gitignore', '__pycache__', 'node_modules', '*.pyc', '*.pyo', '*.log', '*.tmp', '*.temp', '*.bak', '.vscode', '.idea' ] for pattern in default_patterns: if pattern not in self.patterns: self.patterns.append(pattern) def is_ignored(self, file_path: Path) -> bool: """判断文件是否被忽略""" relative_path = file_path.relative_to(self.base_dir) for pattern in self.patterns: # 匹配文件名 if fnmatch.fnmatch(file_path.name, pattern): return True # 匹配相对路径 if fnmatch.fnmatch(str(relative_path), pattern): return True # 匹配目录 if pattern.endswith('/') and fnmatch.fnmatch(str(relative_path.parent), pattern.rstrip('/')): return True # 递归匹配子目录 if pattern.startswith('*/'): parts = str(relative_path).split(os.sep) for i, part in enumerate(parts): if fnmatch.fnmatch(part, pattern[2:]): return True return False ``` ### 示例文件 ```bash # .lobsterignore # 系统文件 .DS_Store .Thumbs.db # IDE 和编辑器 .vscode/ .idea/ *.swp # Python __pycache__/ *.pyc *.log # Node.js node_modules/ # 临时文件 *.tmp *.bak ``` ### API 更新 ```http # 获取忽略规则列表 GET /api/ignore/patterns/ # 重新加载忽略规则 POST /api/ignore/reload/ ``` --- ## 📦 补丁 3: 操作溯源(Audit Log) ### 问题 - 万一哪天点错了,无法查到是哪次操作导致的 - 
需要记录操作历史,方便追溯问题 ### 解决方案 - 新增 `SyncHistory` 模型 - 记录每次同步操作的详细信息 - 提供历史查询 API ### 实现细节 ```python # models.py class SyncHistory(models.Model): """同步操作历史记录""" ACTION_CHOICES = [ ('sync_to_db', '同步到数据库'), ('sync_to_local', '同步到本地'), ('auto_sync', '自动同步'), ('manual_merge', '手动合并'), ] STATUS_CHOICES = [ ('success', '成功'), ('failed', '失败'), ('partial', '部分成功'), ] lobster_id = models.CharField(max_length=50, help_text='龙虾ID') file_path = models.CharField(max_length=500, help_text='文件相对路径') action = models.CharField(max_length=20, choices=ACTION_CHOICES, help_text='操作类型') status = models.CharField(max_length=20, choices=STATUS_CHOICES, help_text='操作状态') old_version = models.IntegerField(null=True, blank=True, help_text='操作前版本') new_version = models.IntegerField(null=True, blank=True, help_text='操作后版本') old_hash = models.CharField(max_length=64, null=True, blank=True, help_text='操作前哈希') new_hash = models.CharField(max_length=64, null=True, blank=True, help_text='操作后哈希') file_size = models.IntegerField(default=0, help_text='文件大小(字节)') operator = models.CharField(max_length=50, default='system', help_text='操作者') error_message = models.TextField(null=True, blank=True, help_text='错误信息') execution_time = models.FloatField(default=0, help_text='执行时间(秒)') created_at = models.DateTimeField(auto_now_add=True, help_text='操作时间') # services.py class AuditLogger: """操作日志记录器""" def log_sync_action( self, lobster_id: str, file_path: str, action: str, old_version: int = None, new_version: int = None, old_hash: str = None, new_hash: str = None, file_size: int = 0, operator: str = 'system', status: str = 'success', error_message: str = None, execution_time: float = 0 ): """记录同步操作""" self.model.objects.create(...) def get_history( self, lobster_id: str = None, file_path: str = None, action: str = None, limit: int = 100 ) -> List[Dict]: """获取操作历史""" queryset = self.model.objects.all() # 过滤和排序... 
``` ### 使用示例 ```python # views.py @api_view(['POST']) def sync_to_db(request): """同步到数据库(带操作日志)""" audit_logger = AuditLogger() start_time = time.time() try: # 执行同步操作... execution_time = time.time() - start_time # 记录成功日志 audit_logger.log_sync_action( lobster_id=lobster_id, file_path=file_path, action='sync_to_db', old_version=old_version, new_version=new_version, old_hash=old_hash, new_hash=file_hash, file_size=record.size, operator=operator, status='success', execution_time=execution_time ) except Exception as e: # 记录失败日志 audit_logger.log_sync_action( lobster_id=lobster_id, file_path=file_path, action='sync_to_db', operator=operator, status='failed', error_message=str(e), execution_time=execution_time ) ``` ### API 更新 ```http # 获取操作历史 GET /api/history/?lobster_id=daotong&file_path=MEMORY.md&limit=50 ``` ### 历史记录示例 ```json { "success": true, "data": [ { "id": 1, "lobster_id": "daotong", "file_path": "MEMORY.md", "action": "sync_to_db", "action_display": "同步到数据库", "status": "success", "status_display": "成功", "old_version": 1, "new_version": 2, "old_hash": "abc123...", "new_hash": "def456...", "file_size": 1234, "operator": "逍遥子", "error_message": null, "execution_time": 0.123, "created_at": "2026-04-05T12:00:00Z" } ] } ``` --- ## 📋 数据库迁移 需要执行数据库迁移以创建 `SyncHistory` 表: ```bash # 进入后端容器 docker exec -it lobster-backend bash # 创建迁移 python manage.py makemigrations memory_app python manage.py migrate ``` --- ## ✅ 完成检查清单 - [x] 分块读取与流式传输(services.py) - [x] .lobsterignore 机制(services.py + .lobsterignore.example) - [x] 操作溯源(models.py + services.py + views.py + serializers.py) - [x] 新增 API 接口(urls.py) - [x] 更新文档(CHANGELOG.md) --- ## 🚀 下一步 1. 执行数据库迁移 2. 推送代码到远程仓库 3. 更新前端界面(添加历史记录和忽略规则管理) --- **感谢逍遥子的宝贵建议!** 🙏