import os
import hashlib
import logging
from pathlib import Path
from typing import List, Dict, Optional, Tuple

from django.conf import settings

logger = logging.getLogger(__name__)


class FileScanner:
    """Scan, read and write text files under the configured base directory.

    The base directory is taken from ``settings.LOBSTER_MEMORY_BASE`` and the
    file suffixes considered by scans from ``settings.SUPPORTED_EXTENSIONS``.
    """

    def __init__(self):
        self.base_dir = Path(settings.LOBSTER_MEMORY_BASE)
        self.supported_extensions = settings.SUPPORTED_EXTENSIONS

    def _path_within_base(self, file_path: str) -> Optional[Path]:
        """Return the absolute path for *file_path*, or None if it escapes base_dir.

        The check is purely lexical (``..`` segments and absolute paths are
        normalized, symlinks are NOT resolved), so files reachable through a
        symlink that lives inside the base directory remain accessible.
        """
        base = os.path.abspath(self.base_dir)
        candidate = os.path.abspath(os.path.join(base, file_path))
        try:
            inside = os.path.commonpath([base, candidate]) == base
        except ValueError:
            # Raised e.g. on Windows when the two paths are on different drives.
            inside = False
        return Path(candidate) if inside else None

    def scan_directory(self, lobster_id: Optional[str] = None) -> List[Dict]:
        """Recursively scan the base directory and describe every supported file.

        Args:
            lobster_id: Optional lobster ID. It is only copied into each
                returned entry (``'unknown'`` when omitted or empty); it does
                not restrict which files are scanned.

        Returns:
            A list of dicts with the keys ``file_path`` (relative to the base
            directory), ``full_path``, ``content``, ``hash``, ``size`` and
            ``lobster_id``. Empty when the base directory does not exist.
            Files that cannot be read are logged and skipped.
        """
        if not self.base_dir.exists():
            return []

        label = lobster_id or 'unknown'
        files = []
        for file_path in self.base_dir.rglob('*'):
            if not (file_path.is_file()
                    and file_path.suffix in self.supported_extensions):
                continue
            try:
                relative_path = file_path.relative_to(self.base_dir)
                # Undecodable bytes are dropped rather than failing the scan.
                content = file_path.read_text(encoding='utf-8', errors='ignore')
                files.append({
                    'file_path': str(relative_path),
                    'full_path': str(file_path),
                    'content': content,
                    'hash': self.compute_hash(content),
                    'size': file_path.stat().st_size,
                    'lobster_id': label,
                })
            except (OSError, ValueError) as exc:
                # Best effort: one unreadable file must not abort the scan.
                logger.warning("Error reading %s: %s", file_path, exc)

        return files

    def get_file_content(self, file_path: str) -> Tuple[str, str]:
        """Read one file and return its text together with its hash.

        Args:
            file_path: Path relative to the base directory.

        Returns:
            A ``(content, hash)`` tuple.

        Raises:
            FileNotFoundError: If the path is not a regular file, or if it
                points outside the base directory.
        """
        full_path = self._path_within_base(file_path)
        if full_path is None or not full_path.is_file():
            raise FileNotFoundError(f"File not found: {file_path}")

        content = full_path.read_text(encoding='utf-8', errors='ignore')
        return content, self.compute_hash(content)

    def write_file(self, file_path: str, content: str):
        """Write *content* to a file, creating parent directories as needed.

        Args:
            file_path: Path relative to the base directory.
            content: Text to write (encoded as UTF-8).

        Raises:
            PermissionError: If the path points outside the base directory.
        """
        full_path = self._path_within_base(file_path)
        if full_path is None:
            raise PermissionError(
                f"Refusing to write outside the base directory: {file_path}"
            )

        # Make sure the target directory exists before writing.
        full_path.parent.mkdir(parents=True, exist_ok=True)
        full_path.write_text(content, encoding='utf-8')

    def compute_hash(self, content: str) -> str:
        """Return the SHA-256 hex digest of *content* encoded as UTF-8.

        Args:
            content: File content.

        Returns:
            The hexadecimal digest string.
        """
        return hashlib.sha256(content.encode('utf-8')).hexdigest()

    def get_file_tree(self, lobster_id: Optional[str] = None) -> Dict:
        """Build a nested dict mirroring the directory layout of scanned files.

        Args:
            lobster_id: Optional lobster ID, forwarded to ``scan_directory``.

        Returns:
            A dict in which each directory name maps to a nested dict and each
            file name maps to that file's info dict from ``scan_directory``.
        """
        tree: Dict = {}
        for file_info in self.scan_directory(lobster_id):
            *directories, filename = Path(file_info['file_path']).parts
            node = tree
            for directory in directories:
                node = node.setdefault(directory, {})
            node[filename] = file_info

        return tree


class DiffChecker:
    """Compare local files against database records."""

    def __init__(self):
        self.scanner = FileScanner()

    def check_sync_status(self, local_files: List[Dict], db_files: List[Dict]) -> Dict:
        """Classify every known path by comparing local and database hashes.

        Args:
            local_files: Local file entries; each needs ``file_path`` and,
                when the path also exists in the database, ``hash``.
            db_files: Database file entries with the same keys.

        Returns:
            A dict with the list-valued keys ``consistent``, ``local_newer``,
            ``db_newer``, ``conflict``, ``local_only`` and ``db_only``.
            Entries are ordered by path. ``local_newer`` and ``db_newer`` are
            kept for interface compatibility but are never populated: no
            timestamps are compared, so any hash mismatch is reported as a
            conflict.
        """
        local_map = {f['file_path']: f for f in local_files}
        db_map = {f['file_path']: f for f in db_files}

        results = {
            'consistent': [],
            'local_newer': [],
            'db_newer': [],
            'conflict': [],
            'local_only': [],
            'db_only': [],
        }

        # Sort so the output order does not depend on set iteration order.
        for path in sorted(local_map.keys() | db_map.keys(), key=str):
            local = local_map.get(path)
            db = db_map.get(path)

            if db is None:
                # Present locally only.
                results['local_only'].append({
                    'file_path': path,
                    'status': 'local_only'
                })
            elif local is None:
                # Present in the database only.
                results['db_only'].append({
                    'file_path': path,
                    'status': 'db_only'
                })
            elif local['hash'] == db['hash']:
                results['consistent'].append({
                    'file_path': path,
                    'status': 'consistent'
                })
            else:
                # Contents differ and there is no way to tell which side is
                # newer, so report a conflict.
                results['conflict'].append({
                    'file_path': path,
                    'status': 'conflict',
                    'local_hash': local['hash'],
                    'db_hash': db['hash']
                })

        return results

    def get_file_diff(self, local_content: str, db_content: str) -> Dict:
        """Return both versions split into lines plus a changed flag (simple version).

        Args:
            local_content: Content of the local file.
            db_content: Content stored in the database.

        Returns:
            A dict with ``local_lines`` and ``db_lines`` (each content split
            on ``'\\n'``) and ``has_diff`` (True when the contents differ).
        """
        # No line-level diff is computed here; difflib or another diff
        # library could be used, or the front end can render the diff
        # (e.g. with react-diff-viewer).
        return {
            'local_lines': local_content.split('\n'),
            'db_lines': db_content.split('\n'),
            'has_diff': local_content != db_content
        }