""" 龙虾记忆同步系统 - 核心服务模块 功能说明: 1. 分块与流式处理:所有文件读取使用 8KB 分块,避免大文件内存问题 2. .lobsterignore 支持:正则表达式匹配,过滤不需要同步的文件 3. 审计日志:记录所有同步操作,包括变动行数 4. 语义摘要:调用本地模型生成文件内容摘要 5. 冲突判定:完善的状态检查,识别 HARD_CONFLICT 状态 """ import os import re import hashlib import time from pathlib import Path from typing import List, Dict, Tuple, Iterator, Optional from django.conf import settings from django.utils import timezone class IgnorePattern: """ .lobsterignore 模式匹配器(支持正则表达式) 支持的匹配规则: 1. 通配符:*.pyc, node_modules/ 2. 目录:__pycache__/ 3. 正则表达式:re:.*\.log$ 4. 注释:# 开头的行为注释 """ def __init__(self, base_dir: Path): self.base_dir = base_dir self.patterns = [] # (pattern_type, pattern, compiled_regex) self.load_patterns() def load_patterns(self): """ 加载 .lobsterignore 文件 默认忽略规则: - .git, .gitignore - node_modules - .pyc, __pycache__ """ ignore_file = self.base_dir / '.lobsterignore' if ignore_file.exists(): with open(ignore_file, 'r', encoding='utf-8') as f: for line in f: line = line.strip() # 跳过空行和注释 if not line or line.startswith('#'): continue # 解析模式类型 if line.startswith('re:'): # 正则表达式模式 pattern = line[3:] try: regex = re.compile(pattern) self.patterns.append(('regex', pattern, regex)) except re.error as e: print(f"Invalid regex pattern '{pattern}': {e}") else: # 通配符模式 self.patterns.append(('glob', line, None)) # 添加默认忽略规则 default_patterns = [ '.DS_Store', '.git', '.gitignore', '__pycache__', 'node_modules', '*.pyc', '*.pyo', '*.log', '*.tmp', '*.temp', '*.bak', '.vscode', '.idea', '.pytest_cache', '.mypy_cache', '*.egg-info' ] for pattern in default_patterns: # 检查是否已存在 if not any(p[1] == pattern for p in self.patterns): self.patterns.append(('glob', pattern, None)) def is_ignored(self, file_path: Path) -> bool: """ 判断文件是否被忽略 Args: file_path: 文件路径(绝对路径) Returns: True 表示忽略,False 表示不忽略 """ # 获取相对路径 try: relative_path = file_path.relative_to(self.base_dir) relative_str = str(relative_path) filename = file_path.name except ValueError: # 文件不在基础目录下 return False for pattern_type, pattern, regex in self.patterns: if pattern_type == 'regex': # 正则表达式匹配 if regex.search(relative_str) or regex.search(filename): return True else: # 通配符匹配 from fnmatch import fnmatch # 匹配文件名 if fnmatch(filename, pattern): return True # 匹配相对路径 if fnmatch(relative_str, pattern): return True # 匹配目录 if pattern.endswith('/') and fnmatch(str(relative_path.parent), pattern.rstrip('/')): return True # 递归匹配子目录 if pattern.startswith('*/'): parts = relative_str.split(os.sep) for part in parts: if fnmatch(part, pattern[2:]): return True return False class FileScanner: """ 文件扫描器(支持 .lobsterignore 和分块读取) 所有文件读取操作都使用 8KB 分块,避免大文件内存问题 """ def __init__(self): self.base_dir = Path(settings.LOBSTER_MEMORY_BASE) self.supported_extensions = settings.SUPPORTED_EXTENSIONS self.ignore = IgnorePattern(self.base_dir) self.chunk_size = 8192 # 8KB 分块读取 def scan_directory(self, lobster_id: str = None) -> List[Dict]: """ 扫描目录,返回所有文件信息 Args: lobster_id: 龙虾ID(可选) Returns: 文件信息列表 """ if not self.base_dir.exists(): return [] files = [] for file_path in self.base_dir.rglob('*'): if not file_path.is_file(): continue # 检查文件扩展名 if file_path.suffix not in self.supported_extensions: continue # 检查是否被 .lobsterignore 忽略 if self.ignore.is_ignored(file_path): continue try: relative_path = file_path.relative_to(self.base_dir) # 使用流式计算哈希(避免大文件内存问题) file_hash = self.compute_hash_stream(file_path) files.append({ 'file_path': str(relative_path), 'full_path': str(file_path), 'hash': file_hash, 'size': file_path.stat().st_size, 'lobster_id': lobster_id or 'unknown', }) except Exception as e: print(f"Error reading {file_path}: {e}") return files def get_file_content(self, file_path: str, chunked: bool = True) -> Tuple[str, str]: """ 获取文件内容和哈希(使用分块读取) Args: file_path: 相对路径 chunked: 是否使用分块读取(默认 True) Returns: (content, hash) """ full_path = self.base_dir / file_path if not full_path.exists(): raise FileNotFoundError(f"File not found: {file_path}") # 默认使用分块读取 if chunked: content = self.read_file_chunked(full_path) else: content = full_path.read_text(encoding='utf-8', errors='ignore') file_hash = self.compute_hash(content) return content, file_hash def read_file_chunked(self, file_path: Path) -> str: """ 分块读取文件(8KB 分块) Args: file_path: 文件路径 Returns: 文件内容 """ content_parts = [] with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: while True: chunk = f.read(self.chunk_size) if not chunk: break content_parts.append(chunk) return ''.join(content_parts) def read_file_stream(self, file_path: str) -> Iterator[str]: """ 流式读取文件(用于大文件传输) Args: file_path: 相对路径 Yields: 8KB 文件块 """ full_path = self.base_dir / file_path if not full_path.exists(): raise FileNotFoundError(f"File not found: {file_path}") with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: while True: chunk = f.read(self.chunk_size) if not chunk: break yield chunk def write_file(self, file_path: str, content: str): """ 写入文件 Args: file_path: 相对路径 content: 文件内容 """ full_path = self.base_dir / file_path # 确保目录存在 full_path.parent.mkdir(parents=True, exist_ok=True) # 写入文件 full_path.write_text(content, encoding='utf-8') def compute_hash(self, content: str) -> str: """ 计算 SHA256 哈希 Args: content: 文件内容 Returns: 哈希值 """ return hashlib.sha256(content.encode('utf-8')).hexdigest() def compute_hash_stream(self, file_path: Path) -> str: """ 流式计算文件哈希(避免大文件内存问题) Args: file_path: 文件路径 Returns: 哈希值 """ hash_obj = hashlib.sha256() with open(file_path, 'rb') as f: while True: chunk = f.read(self.chunk_size) if not chunk: break hash_obj.update(chunk) return hash_obj.hexdigest() def get_file_tree(self, lobster_id: str = None) -> Dict: """ 获取文件树结构 Args: lobster_id: 龙虾ID Returns: 文件树字典 """ files = self.scan_directory(lobster_id) tree = {} for file_info in files: parts = Path(file_info['file_path']).parts current = tree for part in parts[:-1]: if part not in current: current[part] = {} current = current[part] filename = parts[-1] current[filename] = file_info return tree class SemanticSummaryGenerator: """ 语义摘要生成器 调用本地模型生成文件内容摘要 """ def __init__(self): self.enabled = getattr(settings, 'SEMANTIC_SUMMARY_ENABLED', False) self.model_path = getattr(settings, 'SEMANTIC_MODEL_PATH', None) def generate_summary(self, content: str, max_length: int = 200) -> Optional[str]: """ 生成文件内容摘要 Args: content: 文件内容 max_length: 摘要最大长度 Returns: 摘要文本(如果启用) """ if not self.enabled or not content: return None # 如果内容较短,直接返回截断版本 if len(content) < 500: return content[:max_length] # TODO: 调用本地模型生成摘要 # 这里可以集成 OpenClaw 的本地模型 # 暂时返回简单的摘要 lines = content.split('\n') summary_lines = [] # 提取前 5 行和后 5 行 for i, line in enumerate(lines): if i < 5 or i >= len(lines) - 5: if line.strip(): summary_lines.append(line.strip()) summary = ' '.join(summary_lines) return summary[:max_length] if len(summary) > max_length else summary class DiffChecker: """ 差异检查器(支持大文件优化和冲突判定) 冲突判定逻辑: - consistent: 哈希相同,内容一致 - local_newer: 只有本地存在 - db_newer: 只有数据库存在 - conflict: 两边都存在但哈希不同 - hard_conflict: 两边都存在,哈希不同,且数据库有多个版本变化 """ def __init__(self): self.scanner = FileScanner() def check_sync_status(self, local_files: List[Dict], db_files: List[Dict]) -> Dict: """ 检查同步状态(完善冲突判定逻辑) Args: local_files: 本地文件列表 db_files: 数据库文件列表 Returns: 同步状态字典 """ local_map = {f['file_path']: f for f in local_files} db_map = {f['file_path']: f for f in db_files} results = { 'consistent': [], 'local_newer': [], 'db_newer': [], 'conflict': [], 'hard_conflict': [], 'local_only': [], 'db_only': [], } all_paths = set(local_map.keys()) | set(db_map.keys()) for path in all_paths: local = local_map.get(path) db = db_map.get(path) if local and db: # 两边都存在 if local['hash'] == db['hash']: # 哈希相同,内容一致 results['consistent'].append({ 'file_path': path, 'status': 'consistent', 'hash': local['hash'] }) else: # 哈希不同,检查是否为严重冲突 updated_at = db.get('updated_at') version = db.get('version', 0) # 判定严重冲突的条件: # 1. 哈希不同 # 2. 版本号 > 1(说明已经有多次变更) # 3. 数据库更新时间较近(1小时内) if version > 1 and updated_at: from datetime import datetime, timedelta if isinstance(updated_at, str): updated_at = datetime.fromisoformat(updated_at) time_diff = datetime.now() - updated_at if time_diff < timedelta(hours=1): results['hard_conflict'].append({ 'file_path': path, 'status': 'hard_conflict', 'local_hash': local['hash'], 'db_hash': db['hash'], 'version': version, 'updated_at': str(updated_at) }) else: results['conflict'].append({ 'file_path': path, 'status': 'conflict', 'local_hash': local['hash'], 'db_hash': db['hash'], 'version': version }) else: results['conflict'].append({ 'file_path': path, 'status': 'conflict', 'local_hash': local['hash'], 'db_hash': db['hash'], 'version': version }) elif local and not db: # 只有本地存在 results['local_only'].append({ 'file_path': path, 'status': 'local_only', 'hash': local['hash'] }) elif not local and db: # 只有数据库存在 results['db_only'].append({ 'file_path': path, 'status': 'db_only', 'hash': db['hash'] }) return results def calculate_lines_changed(self, old_content: str, new_content: str) -> int: """ 计算变动行数 Args: old_content: 旧内容 new_content: 新内容 Returns: 变动行数(+新增 -删除) """ old_lines = set(old_content.split('\n')) new_lines = set(new_content.split('\n')) added = len(new_lines - old_lines) removed = len(old_lines - new_lines) return added - removed def get_file_diff(self, local_content: str, db_content: str, max_lines: int = 1000) -> Dict: """ 获取文件差异(支持大文件限制) Args: local_content: 本地内容 db_content: 数据库内容 max_lines: 最大显示行数(防止大文件差异过大) Returns: 差异信息 """ local_lines = local_content.split('\n') db_lines = db_content.split('\n') # 限制行数(大文件只显示头尾) truncated = False if len(local_lines) > max_lines: local_head = local_lines[:max_lines//2] local_tail = local_lines[-max_lines//2:] local_lines = local_head + [f'... (中间省略 {len(local_lines) - max_lines} 行) ...'] + local_tail truncated = True if len(db_lines) > max_lines: db_head = db_lines[:max_lines//2] db_tail = db_lines[-max_lines//2:] db_lines = db_head + [f'... (中间省略 {len(db_lines) - max_lines} 行) ...'] + db_tail truncated = True # 计算变动行数 lines_changed = self.calculate_lines_changed(local_content, db_content) return { 'local_lines': local_lines, 'db_lines': db_lines, 'has_diff': local_content != db_content, 'is_truncated': truncated, 'lines_changed': lines_changed } class AuditLogger: """ 操作日志记录器 记录所有同步操作,包括: - 操作人、操作时间 - 数据源(local/database/manual) - 变动行数 - 执行时间 """ def __init__(self): self.model = None # 延迟导入模型(避免循环导入) from .models import SyncHistory self.model = SyncHistory def log_sync_action( self, lobster_id: str, file_path: str, action: str, old_version: int = None, new_version: int = None, old_hash: str = None, new_hash: str = None, file_size: int = 0, lines_changed: int = 0, source: str = 'local', operator: str = 'system', status: str = 'success', error_message: str = None, execution_time: float = 0 ): """ 记录同步操作 Args: lobster_id: 龙虾ID file_path: 文件路径 action: 操作类型 old_version: 操作前版本 new_version: 操作后版本 old_hash: 操作前哈希 new_hash: 操作后哈希 file_size: 文件大小 lines_changed: 变动行数 source: 数据源 operator: 操作者 status: 操作状态 error_message: 错误信息 execution_time: 执行时间 """ self.model.objects.create( lobster_id=lobster_id, file_path=file_path, action=action, old_version=old_version, new_version=new_version, old_hash=old_hash, new_hash=new_hash, file_size=file_size, lines_changed=lines_changed, source=source, operator=operator, status=status, error_message=error_message, execution_time=execution_time, created_at=timezone.now() ) def get_history( self, lobster_id: str = None, file_path: str = None, action: str = None, limit: int = 100 ) -> List[Dict]: """ 获取操作历史 Args: lobster_id: 龙虾ID(可选) file_path: 文件路径(可选) action: 操作类型(可选) limit: 返回数量限制 Returns: 操作历史列表 """ queryset = self.model.objects.all() if lobster_id: queryset = queryset.filter(lobster_id=lobster_id) if file_path: queryset = queryset.filter(file_path=file_path) if action: queryset = queryset.filter(action=action) records = queryset.order_by('-created_at')[:limit] return [ { 'id': r.id, 'lobster_id': r.lobster_id, 'file_path': r.file_path, 'action': r.action, 'status': r.status, 'source': r.source, 'old_version': r.old_version, 'new_version': r.new_version, 'old_hash': r.old_hash, 'new_hash': r.new_hash, 'file_size': r.file_size, 'lines_changed': r.lines_changed, 'operator': r.operator, 'error_message': r.error_message, 'execution_time': r.execution_time, 'created_at': r.created_at.isoformat(), } for r in records ]