import fnmatch
import hashlib
import logging
import os
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple

from django.conf import settings
from django.utils import timezone

logger = logging.getLogger(__name__)


class IgnorePattern:
    """Pattern matcher for .lobsterignore files."""

    def __init__(self, base_dir: Path):
        self.base_dir = base_dir
        self.patterns = []
        self.load_patterns()

    def load_patterns(self):
        """Load patterns from the .lobsterignore file."""
        ignore_file = self.base_dir / '.lobsterignore'
        if ignore_file.exists():
            with open(ignore_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    # Skip blank lines and comments
                    if line and not line.startswith('#'):
                        self.patterns.append(line)

        # Append the default ignore rules
        default_patterns = [
            '.DS_Store', '.git', '.gitignore', '__pycache__', 'node_modules',
            '*.pyc', '*.pyo', '*.log', '*.tmp', '*.temp', '*.bak',
            '.vscode', '.idea',
        ]
        for pattern in default_patterns:
            if pattern not in self.patterns:
                self.patterns.append(pattern)

    def is_ignored(self, file_path: Path) -> bool:
        """
        Check whether a file is ignored.

        Args:
            file_path: File path (absolute).

        Returns:
            True if the file matches an ignore pattern.
        """
        relative_path = file_path.relative_to(self.base_dir)

        for pattern in self.patterns:
            # Match against the bare file name
            if fnmatch.fnmatch(file_path.name, pattern):
                return True

            # Match against the relative path
            if fnmatch.fnmatch(str(relative_path), pattern):
                return True

            # Match directory patterns (trailing slash)
            if pattern.endswith('/') and fnmatch.fnmatch(
                    str(relative_path.parent), pattern.rstrip('/')):
                return True

            # Match '*/name' patterns against every path component
            if pattern.startswith('*/'):
                for part in str(relative_path).split(os.sep):
                    if fnmatch.fnmatch(part, pattern[2:]):
                        return True

        return False


class FileScanner:
    """File scanner (supports .lobsterignore and chunked reads)."""

    def __init__(self):
        self.base_dir = Path(settings.LOBSTER_MEMORY_BASE)
        self.supported_extensions = settings.SUPPORTED_EXTENSIONS
        self.ignore = IgnorePattern(self.base_dir)
        self.chunk_size = 8192  # Read in 8 KB chunks

    def scan_directory(self, lobster_id: Optional[str] = None) -> List[Dict]:
        """
        Scan the base directory and return info for every file.

        Args:
            lobster_id: Lobster ID (optional).

        Returns:
            List of file-info dicts.
        """
        if not self.base_dir.exists():
            return []

        files = []
        for file_path in self.base_dir.rglob('*'):
            if not file_path.is_file():
                continue

            # Filter by supported extension
            if file_path.suffix not in self.supported_extensions:
                continue

            # Skip files matched by .lobsterignore
            if self.ignore.is_ignored(file_path):
                continue

            try:
                relative_path = file_path.relative_to(self.base_dir)
                # Hash via a streaming read to avoid memory issues with large files
                file_hash = self.compute_hash_stream(file_path)
                files.append({
                    'file_path': str(relative_path),
                    'full_path': str(file_path),
                    'hash': file_hash,
                    'size': file_path.stat().st_size,
                    'lobster_id': lobster_id or 'unknown',
                })
            except Exception as e:
                logger.warning("Error reading %s: %s", file_path, e)

        return files

    def get_file_content(self, file_path: str, chunked: bool = False) -> Tuple[str, str]:
        """
        Return a file's content and hash.

        Args:
            file_path: Relative path.
            chunked: Whether to use chunked reading.

        Returns:
            (content, hash)
        """
        full_path = self.base_dir / file_path

        if not full_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        # Use chunked reading for large files (> 50 MB)
        file_size = full_path.stat().st_size
        if chunked and file_size > 50 * 1024 * 1024:
            content = self.read_file_chunked(full_path)
        else:
            content = full_path.read_text(encoding='utf-8', errors='ignore')

        file_hash = self.compute_hash(content)
        return content, file_hash

    def read_file_chunked(self, file_path: Path) -> str:
        """
        Read a file in chunks.

        Args:
            file_path: File path.

        Returns:
            File content.
        """
        content_parts = []
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            while True:
                chunk = f.read(self.chunk_size)
                if not chunk:
                    break
                content_parts.append(chunk)
        return ''.join(content_parts)

    def read_file_stream(self, file_path: str) -> Iterator[str]:
        """
        Stream a file's content (for large-file transfers).

        Args:
            file_path: Relative path.

        Yields:
            File chunks.
        """
        full_path = self.base_dir / file_path

        if not full_path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
            while True:
                chunk = f.read(self.chunk_size)
                if not chunk:
                    break
                yield chunk

    def write_file(self, file_path: str, content: str):
        """
        Write a file.

        Args:
            file_path: Relative path.
            content: File content.
        """
        full_path = self.base_dir / file_path

        # Make sure the parent directory exists
        full_path.parent.mkdir(parents=True, exist_ok=True)

        full_path.write_text(content, encoding='utf-8')

    def compute_hash(self, content: str) -> str:
        """
        Compute the SHA-256 hash of a string.

        Args:
            content: File content.

        Returns:
            Hex digest.
        """
        return hashlib.sha256(content.encode('utf-8')).hexdigest()

    def compute_hash_stream(self, file_path: Path) -> str:
        """
        Compute a file's hash by streaming (avoids large-file memory issues).

        Args:
            file_path: File path.

        Returns:
            Hex digest.
        """
        hash_obj = hashlib.sha256()
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(self.chunk_size)
                if not chunk:
                    break
                hash_obj.update(chunk)
        return hash_obj.hexdigest()

    def get_file_tree(self, lobster_id: Optional[str] = None) -> Dict:
        """
        Build a nested file-tree structure.

        Args:
            lobster_id: Lobster ID.

        Returns:
            File-tree dict.
        """
        files = self.scan_directory(lobster_id)
        tree = {}

        for file_info in files:
            parts = Path(file_info['file_path']).parts
            current = tree
            for part in parts[:-1]:
                if part not in current:
                    current[part] = {}
                current = current[part]
            filename = parts[-1]
            current[filename] = file_info

        return tree
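# A minimal usage sketch for FileScanner, assuming LOBSTER_MEMORY_BASE and
# SUPPORTED_EXTENSIONS are configured in Django settings (the values below are
# illustrative placeholders, not part of this module):
#
#     # settings.py
#     LOBSTER_MEMORY_BASE = '/var/lib/lobster/memory'
#     SUPPORTED_EXTENSIONS = {'.md', '.txt', '.json'}
#
#     scanner = FileScanner()
#     for info in scanner.scan_directory(lobster_id='lobster-01'):
#         print(info['file_path'], info['hash'][:8], info['size'])
#
#     # Streaming keeps memory flat for large files:
#     for chunk in scanner.read_file_stream('notes/session.md'):
#         handle_chunk(chunk)  # hypothetical consumer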
class DiffChecker:
    """Diff checker (optimized for large files)."""

    def __init__(self):
        self.scanner = FileScanner()

    def check_sync_status(self, local_files: List[Dict], db_files: List[Dict]) -> Dict:
        """
        Check sync status between local files and database records.

        Args:
            local_files: Local file list.
            db_files: Database file list.

        Returns:
            Sync-status dict.
        """
        local_map = {f['file_path']: f for f in local_files}
        db_map = {f['file_path']: f for f in db_files}

        results = {
            'consistent': [],
            'local_newer': [],
            'db_newer': [],
            'conflict': [],
            'local_only': [],
            'db_only': [],
        }

        all_paths = set(local_map.keys()) | set(db_map.keys())

        for path in all_paths:
            local = local_map.get(path)
            db = db_map.get(path)

            if local and db:
                # Present on both sides
                if local['hash'] == db['hash']:
                    results['consistent'].append({
                        'file_path': path,
                        'status': 'consistent',
                    })
                else:
                    # Hashes differ. Without reliable modification times on both
                    # sides we cannot tell which copy is newer, so mark it as a
                    # conflict. ('local_newer' / 'db_newer' are reserved for when
                    # timestamps become available for comparison.)
                    results['conflict'].append({
                        'file_path': path,
                        'status': 'conflict',
                        'local_hash': local['hash'],
                        'db_hash': db['hash'],
                    })
            elif local:
                # Local only
                results['local_only'].append({
                    'file_path': path,
                    'status': 'local_only',
                })
            else:
                # Database only
                results['db_only'].append({
                    'file_path': path,
                    'status': 'db_only',
                })

        return results

    def get_file_diff(self, local_content: str, db_content: str, max_lines: int = 1000) -> Dict:
        """
        Build a file diff (with a line cap for large files).

        Args:
            local_content: Local content.
            db_content: Database content.
            max_lines: Maximum number of lines to show (guards against huge diffs).

        Returns:
            Diff info.
        """
        local_lines = local_content.split('\n')
        db_lines = db_content.split('\n')

        # Record truncation before rewriting the lists, so the flag does not
        # depend on the truncated lengths.
        local_truncated = len(local_lines) > max_lines
        db_truncated = len(db_lines) > max_lines

        # Cap line counts (large files show only head and tail)
        if local_truncated:
            omitted = len(local_lines) - max_lines
            local_lines = (
                local_lines[:max_lines // 2]
                + [f'... ({omitted} lines omitted) ...']
                + local_lines[-(max_lines // 2):]
            )
        if db_truncated:
            omitted = len(db_lines) - max_lines
            db_lines = (
                db_lines[:max_lines // 2]
                + [f'... ({omitted} lines omitted) ...']
                + db_lines[-(max_lines // 2):]
            )

        return {
            'local_lines': local_lines,
            'db_lines': db_lines,
            'has_diff': local_content != db_content,
            'is_truncated': local_truncated or db_truncated,
        }
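# A minimal sketch of driving check_sync_status(), assuming the database side
# supplies dicts with at least 'file_path' and 'hash' keys (the load_db_files()
# helper below is hypothetical):
#
#     checker = DiffChecker()
#     local_files = checker.scanner.scan_directory(lobster_id='lobster-01')
#     db_files = load_db_files('lobster-01')  # hypothetical DB accessor
#     status = checker.check_sync_status(local_files, db_files)
#     for item in status['conflict']:
#         print('conflict:', item['file_path'])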
class AuditLogger:
    """Audit logger for sync operations."""

    def __init__(self):
        # Import the model lazily to avoid a circular import
        from .models import SyncHistory
        self.model = SyncHistory

    def log_sync_action(
        self,
        lobster_id: str,
        file_path: str,
        action: str,
        old_version: Optional[int] = None,
        new_version: Optional[int] = None,
        old_hash: Optional[str] = None,
        new_hash: Optional[str] = None,
        file_size: int = 0,
        operator: str = 'system',
        status: str = 'success',
        error_message: Optional[str] = None,
        execution_time: float = 0,
    ):
        """
        Record a sync operation.

        Args:
            lobster_id: Lobster ID.
            file_path: File path.
            action: Action type.
            old_version: Version before the operation.
            new_version: Version after the operation.
            old_hash: Hash before the operation.
            new_hash: Hash after the operation.
            file_size: File size.
            operator: Operator.
            status: Operation status.
            error_message: Error message.
            execution_time: Execution time.
        """
        self.model.objects.create(
            lobster_id=lobster_id,
            file_path=file_path,
            action=action,
            old_version=old_version,
            new_version=new_version,
            old_hash=old_hash,
            new_hash=new_hash,
            file_size=file_size,
            operator=operator,
            status=status,
            error_message=error_message,
            execution_time=execution_time,
            created_at=timezone.now(),
        )

    def get_history(
        self,
        lobster_id: Optional[str] = None,
        file_path: Optional[str] = None,
        action: Optional[str] = None,
        limit: int = 100,
    ) -> List[Dict]:
        """
        Fetch operation history.

        Args:
            lobster_id: Lobster ID (optional).
            file_path: File path (optional).
            action: Action type (optional).
            limit: Maximum number of records to return.

        Returns:
            List of history records.
        """
        queryset = self.model.objects.all()

        if lobster_id:
            queryset = queryset.filter(lobster_id=lobster_id)
        if file_path:
            queryset = queryset.filter(file_path=file_path)
        if action:
            queryset = queryset.filter(action=action)

        records = queryset.order_by('-created_at')[:limit]

        return [
            {
                'id': r.id,
                'lobster_id': r.lobster_id,
                'file_path': r.file_path,
                'action': r.action,
                'status': r.status,
                'old_version': r.old_version,
                'new_version': r.new_version,
                'old_hash': r.old_hash,
                'new_hash': r.new_hash,
                'file_size': r.file_size,
                'operator': r.operator,
                'error_message': r.error_message,
                'execution_time': r.execution_time,
                'created_at': r.created_at.isoformat(),
            }
            for r in records
        ]
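# A minimal logging sketch, assuming the SyncHistory model in .models has the
# fields used above (values below are illustrative placeholders):
#
#     audit = AuditLogger()
#     audit.log_sync_action(
#         lobster_id='lobster-01',
#         file_path='notes/session.md',
#         action='upload',
#         new_version=2,
#         new_hash='ab12...',  # placeholder digest
#         file_size=1024,
#         operator='admin',
#         execution_time=0.03,
#     )
#     recent = audit.get_history(lobster_id='lobster-01', limit=20)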