#!/usr/bin/env python3 """ 龙虾记忆同步系统 - 功能测试脚本 测试内容: 1. 分块读取功能 2. .lobsterignore 匹配 3. 审计日志记录 4. 语义摘要生成 5. 冲突判定逻辑 """ import os import sys import django # 添加项目路径 sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'backend')) # 设置 Django 环境 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'memory_sync.settings') # 配置数据库(测试用临时 SQLite) os.environ['DB_HOST'] = 'localhost' os.environ['DB_NAME'] = 'test_lobster_memory' os.environ['DB_USER'] = 'postgres' os.environ['DB_PASSWORD'] = 'postgres' os.environ['DB_PORT'] = '5432' django.setup() from pathlib import Path from memory_app.services import ( FileScanner, IgnorePattern, DiffChecker, AuditLogger, SemanticSummaryGenerator ) from memory_app.models import LobsterMemory, SyncHistory def test_chunked_reading(): """测试分块读取功能""" print("\n" + "="*60) print("测试 1: 分块读取功能") print("="*60) # 创建测试文件 test_file = Path("/tmp/test_large_file.txt") test_content = "Hello World\n" * 10000 # ~110KB with open(test_file, 'w', encoding='utf-8') as f: f.write(test_content) try: scanner = FileScanner() scanner.base_dir = Path("/tmp") # 使用分块读取 content, hash_value = scanner.get_file_content("test_large_file.txt", chunked=True) print(f"✓ 文件大小: {len(test_content)} 字节") print(f"✓ 分块读取成功: {len(content)} 字节") print(f"✓ 哈希值: {hash_value[:16]}...") print(f"✓ 分块大小: {scanner.chunk_size} 字节") finally: test_file.unlink() def test_lobsterignore(): """测试 .lobsterignore 匹配""" print("\n" + "="*60) print("测试 2: .lobsterignore 匹配") print("="*60) # 创建测试目录和文件 test_dir = Path("/tmp/test_lobsterignore") test_dir.mkdir(exist_ok=True) # 创建 .lobsterignore 文件 ignore_file = test_dir / ".lobsterignore" ignore_content = """ # 注释行 *.pyc __pycache__/ node_modules/ test_*.py re:.*\\.log$ """ with open(ignore_file, 'w', encoding='utf-8') as f: f.write(ignore_content) try: ignore = IgnorePattern(test_dir) # 测试文件 test_cases = [ ("test.py", False), ("app.pyc", True), ("__pycache__/module.pyc", True), ("node_modules/index.js", True), ("test_main.py", True), ("app.log", True), ("app.txt", False), ("test_api.py", True), ] for filename, expected in test_cases: file_path = test_dir / filename result = ignore.is_ignored(file_path) status = "✓" if result == expected else "✗" print(f"{status} {filename}: {result} (期望: {expected})") print(f"\n✓ 加载的规则数: {len(ignore.patterns)}") for pattern_type, pattern, _ in ignore.patterns: print(f" - [{pattern_type}] {pattern}") finally: import shutil shutil.rmtree(test_dir, ignore_errors=True) def test_audit_log(): """测试审计日志""" print("\n" + "="*60) print("测试 3: 审计日志") print("="*60) # 检查数据库连接 try: from django.db import connection with connection.cursor() as cursor: cursor.execute("SELECT 1") print("✓ 数据库连接成功") # 创建测试记录 audit_logger = AuditLogger() audit_logger.log_sync_action( lobster_id="test_lobster", file_path="test.md", action="sync_to_db", old_version=1, new_version=2, old_hash="abc123", new_hash="def456", file_size=1024, lines_changed=10, source="local", operator="test_user", status="success", execution_time=0.123 ) # 查询历史 history = audit_logger.get_history(lobster_id="test_lobster", limit=1) if history: print(f"✓ 日志记录成功") print(f" - 操作: {history[0]['action']}") print(f" - 操作者: {history[0]['operator']}") print(f" - 变动行数: {history[0]['lines_changed']}") print(f" - 数据源: {history[0]['source']}") else: print("✗ 未查询到日志") except Exception as e: print(f"⚠ 数据库测试跳过(需要配置数据库): {e}") def test_semantic_summary(): """测试语义摘要""" print("\n" + "="*60) print("测试 4: 语义摘要") print("="*60) generator = SemanticSummaryGenerator() # 测试短文本 short_text = "这是一个简短的测试文本。" summary = generator.generate_summary(short_text) print(f"✓ 短文本摘要: {summary}") # 测试长文本 long_text = "\n".join([f"这是第 {i} 行的测试内容。" for i in range(100)]) summary = generator.generate_summary(long_text) print(f"✓ 长文本摘要: {summary[:50]}...") print(f"✓ 摘要长度: {len(summary)} 字符") def test_conflict_detection(): """测试冲突判定""" print("\n" + "="*60) print("测试 5: 冲突判定") print("="*60) checker = DiffChecker() # 模拟本地文件和数据库文件 local_files = [ {'file_path': 'file1.md', 'hash': 'abc123', 'updated_at': None}, {'file_path': 'file2.md', 'hash': 'def456', 'updated_at': None}, {'file_path': 'file3.md', 'hash': 'xyz789', 'updated_at': None}, ] from datetime import datetime, timedelta db_files = [ {'file_path': 'file1.md', 'hash': 'abc123', 'version': 1, 'updated_at': datetime.now()}, {'file_path': 'file2.md', 'hash': 'aaa111', 'version': 1, 'updated_at': datetime.now() - timedelta(hours=2)}, {'file_path': 'file4.md', 'hash': 'bbb222', 'version': 1, 'updated_at': datetime.now()}, ] # 测试严重冲突判定 db_files_hard_conflict = [ {'file_path': 'file3.md', 'hash': 'zzz999', 'version': 2, 'updated_at': datetime.now() - timedelta(minutes=30)}, ] status = checker.check_sync_status(local_files, db_files) print(f"✓ 一致: {len(status['consistent'])} 个") print(f"✓ 冲突: {len(status['conflict'])} 个") print(f"✓ 仅本地: {len(status['local_only'])} 个") print(f"✓ 仅数据库: {len(status['db_only'])} 个") # 测试严重冲突 status_hard = checker.check_sync_status(local_files, db_files_hard_conflict) print(f"✓ 严重冲突: {len(status_hard['hard_conflict'])} 个") if status_hard['hard_conflict']: conflict = status_hard['hard_conflict'][0] print(f" - 文件: {conflict['file_path']}") print(f" - 版本: {conflict['version']}") print(f" - 状态: {conflict['status']}") def test_lines_changed(): """测试变动行数计算""" print("\n" + "="*60) print("测试 6: 变动行数计算") print("="*60) checker = DiffChecker() # 测试用例 test_cases = [ ( "line1\nline2\nline3", "line1\nline2\nline3", 0 ), ( "line1\nline2", "line1\nline2\nline3\nline4", 2 ), ( "line1\nline2\nline3\nline4", "line1\nline2", -2 ), ( "line1\nline2", "line1\nline3\nline4", 1 ), ] for old_content, new_content, expected in test_cases: result = checker.calculate_lines_changed(old_content, new_content) status = "✓" if result == expected else "✗" print(f"{status} 变动行数: {result} (期望: {expected})") def main(): """运行所有测试""" print("\n" + "="*60) print("龙虾记忆同步系统 - 功能测试") print("="*60) try: test_chunked_reading() test_lobsterignore() test_audit_log() test_semantic_summary() test_conflict_detection() test_lines_changed() print("\n" + "="*60) print("✓ 所有测试完成!") print("="*60) except Exception as e: print(f"\n✗ 测试失败: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == '__main__': main()