297 lines
8.3 KiB
Python
297 lines
8.3 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
龙虾记忆同步系统 - 功能测试脚本
|
|||
|
|
|
|||
|
|
测试内容:
|
|||
|
|
1. 分块读取功能
|
|||
|
|
2. .lobsterignore 匹配
|
|||
|
|
3. 审计日志记录
|
|||
|
|
4. 语义摘要生成
|
|||
|
|
5. 冲突判定逻辑
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
import django
|
|||
|
|
|
|||
|
|
# 添加项目路径
|
|||
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'backend'))
|
|||
|
|
|
|||
|
|
# 设置 Django 环境
|
|||
|
|
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'memory_sync.settings')
|
|||
|
|
|
|||
|
|
# 配置数据库(测试用临时 SQLite)
|
|||
|
|
os.environ['DB_HOST'] = 'localhost'
|
|||
|
|
os.environ['DB_NAME'] = 'test_lobster_memory'
|
|||
|
|
os.environ['DB_USER'] = 'postgres'
|
|||
|
|
os.environ['DB_PASSWORD'] = 'postgres'
|
|||
|
|
os.environ['DB_PORT'] = '5432'
|
|||
|
|
|
|||
|
|
django.setup()
|
|||
|
|
|
|||
|
|
from pathlib import Path
|
|||
|
|
from memory_app.services import (
|
|||
|
|
FileScanner, IgnorePattern, DiffChecker, AuditLogger,
|
|||
|
|
SemanticSummaryGenerator
|
|||
|
|
)
|
|||
|
|
from memory_app.models import LobsterMemory, SyncHistory
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_chunked_reading():
|
|||
|
|
"""测试分块读取功能"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试 1: 分块读取功能")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
# 创建测试文件
|
|||
|
|
test_file = Path("/tmp/test_large_file.txt")
|
|||
|
|
test_content = "Hello World\n" * 10000 # ~110KB
|
|||
|
|
|
|||
|
|
with open(test_file, 'w', encoding='utf-8') as f:
|
|||
|
|
f.write(test_content)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
scanner = FileScanner()
|
|||
|
|
scanner.base_dir = Path("/tmp")
|
|||
|
|
|
|||
|
|
# 使用分块读取
|
|||
|
|
content, hash_value = scanner.get_file_content("test_large_file.txt", chunked=True)
|
|||
|
|
|
|||
|
|
print(f"✓ 文件大小: {len(test_content)} 字节")
|
|||
|
|
print(f"✓ 分块读取成功: {len(content)} 字节")
|
|||
|
|
print(f"✓ 哈希值: {hash_value[:16]}...")
|
|||
|
|
print(f"✓ 分块大小: {scanner.chunk_size} 字节")
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
test_file.unlink()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_lobsterignore():
|
|||
|
|
"""测试 .lobsterignore 匹配"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试 2: .lobsterignore 匹配")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
# 创建测试目录和文件
|
|||
|
|
test_dir = Path("/tmp/test_lobsterignore")
|
|||
|
|
test_dir.mkdir(exist_ok=True)
|
|||
|
|
|
|||
|
|
# 创建 .lobsterignore 文件
|
|||
|
|
ignore_file = test_dir / ".lobsterignore"
|
|||
|
|
ignore_content = """
|
|||
|
|
# 注释行
|
|||
|
|
*.pyc
|
|||
|
|
__pycache__/
|
|||
|
|
node_modules/
|
|||
|
|
test_*.py
|
|||
|
|
re:.*\\.log$
|
|||
|
|
"""
|
|||
|
|
with open(ignore_file, 'w', encoding='utf-8') as f:
|
|||
|
|
f.write(ignore_content)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
ignore = IgnorePattern(test_dir)
|
|||
|
|
|
|||
|
|
# 测试文件
|
|||
|
|
test_cases = [
|
|||
|
|
("test.py", False),
|
|||
|
|
("app.pyc", True),
|
|||
|
|
("__pycache__/module.pyc", True),
|
|||
|
|
("node_modules/index.js", True),
|
|||
|
|
("test_main.py", True),
|
|||
|
|
("app.log", True),
|
|||
|
|
("app.txt", False),
|
|||
|
|
("test_api.py", True),
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
for filename, expected in test_cases:
|
|||
|
|
file_path = test_dir / filename
|
|||
|
|
result = ignore.is_ignored(file_path)
|
|||
|
|
status = "✓" if result == expected else "✗"
|
|||
|
|
print(f"{status} {filename}: {result} (期望: {expected})")
|
|||
|
|
|
|||
|
|
print(f"\n✓ 加载的规则数: {len(ignore.patterns)}")
|
|||
|
|
for pattern_type, pattern, _ in ignore.patterns:
|
|||
|
|
print(f" - [{pattern_type}] {pattern}")
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
import shutil
|
|||
|
|
shutil.rmtree(test_dir, ignore_errors=True)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_audit_log():
|
|||
|
|
"""测试审计日志"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试 3: 审计日志")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
# 检查数据库连接
|
|||
|
|
try:
|
|||
|
|
from django.db import connection
|
|||
|
|
with connection.cursor() as cursor:
|
|||
|
|
cursor.execute("SELECT 1")
|
|||
|
|
print("✓ 数据库连接成功")
|
|||
|
|
|
|||
|
|
# 创建测试记录
|
|||
|
|
audit_logger = AuditLogger()
|
|||
|
|
audit_logger.log_sync_action(
|
|||
|
|
lobster_id="test_lobster",
|
|||
|
|
file_path="test.md",
|
|||
|
|
action="sync_to_db",
|
|||
|
|
old_version=1,
|
|||
|
|
new_version=2,
|
|||
|
|
old_hash="abc123",
|
|||
|
|
new_hash="def456",
|
|||
|
|
file_size=1024,
|
|||
|
|
lines_changed=10,
|
|||
|
|
source="local",
|
|||
|
|
operator="test_user",
|
|||
|
|
status="success",
|
|||
|
|
execution_time=0.123
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 查询历史
|
|||
|
|
history = audit_logger.get_history(lobster_id="test_lobster", limit=1)
|
|||
|
|
|
|||
|
|
if history:
|
|||
|
|
print(f"✓ 日志记录成功")
|
|||
|
|
print(f" - 操作: {history[0]['action']}")
|
|||
|
|
print(f" - 操作者: {history[0]['operator']}")
|
|||
|
|
print(f" - 变动行数: {history[0]['lines_changed']}")
|
|||
|
|
print(f" - 数据源: {history[0]['source']}")
|
|||
|
|
else:
|
|||
|
|
print("✗ 未查询到日志")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"⚠ 数据库测试跳过(需要配置数据库): {e}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_semantic_summary():
|
|||
|
|
"""测试语义摘要"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试 4: 语义摘要")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
generator = SemanticSummaryGenerator()
|
|||
|
|
|
|||
|
|
# 测试短文本
|
|||
|
|
short_text = "这是一个简短的测试文本。"
|
|||
|
|
summary = generator.generate_summary(short_text)
|
|||
|
|
print(f"✓ 短文本摘要: {summary}")
|
|||
|
|
|
|||
|
|
# 测试长文本
|
|||
|
|
long_text = "\n".join([f"这是第 {i} 行的测试内容。" for i in range(100)])
|
|||
|
|
summary = generator.generate_summary(long_text)
|
|||
|
|
print(f"✓ 长文本摘要: {summary[:50]}...")
|
|||
|
|
print(f"✓ 摘要长度: {len(summary)} 字符")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_conflict_detection():
|
|||
|
|
"""测试冲突判定"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试 5: 冲突判定")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
checker = DiffChecker()
|
|||
|
|
|
|||
|
|
# 模拟本地文件和数据库文件
|
|||
|
|
local_files = [
|
|||
|
|
{'file_path': 'file1.md', 'hash': 'abc123', 'updated_at': None},
|
|||
|
|
{'file_path': 'file2.md', 'hash': 'def456', 'updated_at': None},
|
|||
|
|
{'file_path': 'file3.md', 'hash': 'xyz789', 'updated_at': None},
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
from datetime import datetime, timedelta
|
|||
|
|
db_files = [
|
|||
|
|
{'file_path': 'file1.md', 'hash': 'abc123', 'version': 1, 'updated_at': datetime.now()},
|
|||
|
|
{'file_path': 'file2.md', 'hash': 'aaa111', 'version': 1, 'updated_at': datetime.now() - timedelta(hours=2)},
|
|||
|
|
{'file_path': 'file4.md', 'hash': 'bbb222', 'version': 1, 'updated_at': datetime.now()},
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
# 测试严重冲突判定
|
|||
|
|
db_files_hard_conflict = [
|
|||
|
|
{'file_path': 'file3.md', 'hash': 'zzz999', 'version': 2, 'updated_at': datetime.now() - timedelta(minutes=30)},
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
status = checker.check_sync_status(local_files, db_files)
|
|||
|
|
|
|||
|
|
print(f"✓ 一致: {len(status['consistent'])} 个")
|
|||
|
|
print(f"✓ 冲突: {len(status['conflict'])} 个")
|
|||
|
|
print(f"✓ 仅本地: {len(status['local_only'])} 个")
|
|||
|
|
print(f"✓ 仅数据库: {len(status['db_only'])} 个")
|
|||
|
|
|
|||
|
|
# 测试严重冲突
|
|||
|
|
status_hard = checker.check_sync_status(local_files, db_files_hard_conflict)
|
|||
|
|
print(f"✓ 严重冲突: {len(status_hard['hard_conflict'])} 个")
|
|||
|
|
if status_hard['hard_conflict']:
|
|||
|
|
conflict = status_hard['hard_conflict'][0]
|
|||
|
|
print(f" - 文件: {conflict['file_path']}")
|
|||
|
|
print(f" - 版本: {conflict['version']}")
|
|||
|
|
print(f" - 状态: {conflict['status']}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_lines_changed():
|
|||
|
|
"""测试变动行数计算"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试 6: 变动行数计算")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
checker = DiffChecker()
|
|||
|
|
|
|||
|
|
# 测试用例
|
|||
|
|
test_cases = [
|
|||
|
|
(
|
|||
|
|
"line1\nline2\nline3",
|
|||
|
|
"line1\nline2\nline3",
|
|||
|
|
0
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"line1\nline2",
|
|||
|
|
"line1\nline2\nline3\nline4",
|
|||
|
|
2
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"line1\nline2\nline3\nline4",
|
|||
|
|
"line1\nline2",
|
|||
|
|
-2
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"line1\nline2",
|
|||
|
|
"line1\nline3\nline4",
|
|||
|
|
1
|
|||
|
|
),
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
for old_content, new_content, expected in test_cases:
|
|||
|
|
result = checker.calculate_lines_changed(old_content, new_content)
|
|||
|
|
status = "✓" if result == expected else "✗"
|
|||
|
|
print(f"{status} 变动行数: {result} (期望: {expected})")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
"""运行所有测试"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("龙虾记忆同步系统 - 功能测试")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
test_chunked_reading()
|
|||
|
|
test_lobsterignore()
|
|||
|
|
test_audit_log()
|
|||
|
|
test_semantic_summary()
|
|||
|
|
test_conflict_detection()
|
|||
|
|
test_lines_changed()
|
|||
|
|
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("✓ 所有测试完成!")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"\n✗ 测试失败: {e}")
|
|||
|
|
import traceback
|
|||
|
|
traceback.print_exc()
|
|||
|
|
sys.exit(1)
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == '__main__':
|
|||
|
|
main()
|