Files
openclaw-memory/backend/test_services.py

297 lines
8.3 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
龙虾记忆同步系统 - 功能测试脚本
测试内容
1. 分块读取功能
2. .lobsterignore 匹配
3. 审计日志记录
4. 语义摘要生成
5. 冲突判定逻辑
"""
import os
import sys
import django
# 添加项目路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'backend'))
# 设置 Django 环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'memory_sync.settings')
# 配置数据库(测试用临时 SQLite
os.environ['DB_HOST'] = 'localhost'
os.environ['DB_NAME'] = 'test_lobster_memory'
os.environ['DB_USER'] = 'postgres'
os.environ['DB_PASSWORD'] = 'postgres'
os.environ['DB_PORT'] = '5432'
django.setup()
from pathlib import Path
from memory_app.services import (
FileScanner, IgnorePattern, DiffChecker, AuditLogger,
SemanticSummaryGenerator
)
from memory_app.models import LobsterMemory, SyncHistory
def test_chunked_reading():
"""测试分块读取功能"""
print("\n" + "="*60)
print("测试 1: 分块读取功能")
print("="*60)
# 创建测试文件
test_file = Path("/tmp/test_large_file.txt")
test_content = "Hello World\n" * 10000 # ~110KB
with open(test_file, 'w', encoding='utf-8') as f:
f.write(test_content)
try:
scanner = FileScanner()
scanner.base_dir = Path("/tmp")
# 使用分块读取
content, hash_value = scanner.get_file_content("test_large_file.txt", chunked=True)
print(f"✓ 文件大小: {len(test_content)} 字节")
print(f"✓ 分块读取成功: {len(content)} 字节")
print(f"✓ 哈希值: {hash_value[:16]}...")
print(f"✓ 分块大小: {scanner.chunk_size} 字节")
finally:
test_file.unlink()
def test_lobsterignore():
"""测试 .lobsterignore 匹配"""
print("\n" + "="*60)
print("测试 2: .lobsterignore 匹配")
print("="*60)
# 创建测试目录和文件
test_dir = Path("/tmp/test_lobsterignore")
test_dir.mkdir(exist_ok=True)
# 创建 .lobsterignore 文件
ignore_file = test_dir / ".lobsterignore"
ignore_content = """
# 注释行
*.pyc
__pycache__/
node_modules/
test_*.py
re:.*\\.log$
"""
with open(ignore_file, 'w', encoding='utf-8') as f:
f.write(ignore_content)
try:
ignore = IgnorePattern(test_dir)
# 测试文件
test_cases = [
("test.py", False),
("app.pyc", True),
("__pycache__/module.pyc", True),
("node_modules/index.js", True),
("test_main.py", True),
("app.log", True),
("app.txt", False),
("test_api.py", True),
]
for filename, expected in test_cases:
file_path = test_dir / filename
result = ignore.is_ignored(file_path)
status = "" if result == expected else ""
print(f"{status} {filename}: {result} (期望: {expected})")
print(f"\n✓ 加载的规则数: {len(ignore.patterns)}")
for pattern_type, pattern, _ in ignore.patterns:
print(f" - [{pattern_type}] {pattern}")
finally:
import shutil
shutil.rmtree(test_dir, ignore_errors=True)
def test_audit_log():
"""测试审计日志"""
print("\n" + "="*60)
print("测试 3: 审计日志")
print("="*60)
# 检查数据库连接
try:
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
print("✓ 数据库连接成功")
# 创建测试记录
audit_logger = AuditLogger()
audit_logger.log_sync_action(
lobster_id="test_lobster",
file_path="test.md",
action="sync_to_db",
old_version=1,
new_version=2,
old_hash="abc123",
new_hash="def456",
file_size=1024,
lines_changed=10,
source="local",
operator="test_user",
status="success",
execution_time=0.123
)
# 查询历史
history = audit_logger.get_history(lobster_id="test_lobster", limit=1)
if history:
print(f"✓ 日志记录成功")
print(f" - 操作: {history[0]['action']}")
print(f" - 操作者: {history[0]['operator']}")
print(f" - 变动行数: {history[0]['lines_changed']}")
print(f" - 数据源: {history[0]['source']}")
else:
print("✗ 未查询到日志")
except Exception as e:
print(f"⚠ 数据库测试跳过(需要配置数据库): {e}")
def test_semantic_summary():
"""测试语义摘要"""
print("\n" + "="*60)
print("测试 4: 语义摘要")
print("="*60)
generator = SemanticSummaryGenerator()
# 测试短文本
short_text = "这是一个简短的测试文本。"
summary = generator.generate_summary(short_text)
print(f"✓ 短文本摘要: {summary}")
# 测试长文本
long_text = "\n".join([f"这是第 {i} 行的测试内容。" for i in range(100)])
summary = generator.generate_summary(long_text)
print(f"✓ 长文本摘要: {summary[:50]}...")
print(f"✓ 摘要长度: {len(summary)} 字符")
def test_conflict_detection():
"""测试冲突判定"""
print("\n" + "="*60)
print("测试 5: 冲突判定")
print("="*60)
checker = DiffChecker()
# 模拟本地文件和数据库文件
local_files = [
{'file_path': 'file1.md', 'hash': 'abc123', 'updated_at': None},
{'file_path': 'file2.md', 'hash': 'def456', 'updated_at': None},
{'file_path': 'file3.md', 'hash': 'xyz789', 'updated_at': None},
]
from datetime import datetime, timedelta
db_files = [
{'file_path': 'file1.md', 'hash': 'abc123', 'version': 1, 'updated_at': datetime.now()},
{'file_path': 'file2.md', 'hash': 'aaa111', 'version': 1, 'updated_at': datetime.now() - timedelta(hours=2)},
{'file_path': 'file4.md', 'hash': 'bbb222', 'version': 1, 'updated_at': datetime.now()},
]
# 测试严重冲突判定
db_files_hard_conflict = [
{'file_path': 'file3.md', 'hash': 'zzz999', 'version': 2, 'updated_at': datetime.now() - timedelta(minutes=30)},
]
status = checker.check_sync_status(local_files, db_files)
print(f"✓ 一致: {len(status['consistent'])}")
print(f"✓ 冲突: {len(status['conflict'])}")
print(f"✓ 仅本地: {len(status['local_only'])}")
print(f"✓ 仅数据库: {len(status['db_only'])}")
# 测试严重冲突
status_hard = checker.check_sync_status(local_files, db_files_hard_conflict)
print(f"✓ 严重冲突: {len(status_hard['hard_conflict'])}")
if status_hard['hard_conflict']:
conflict = status_hard['hard_conflict'][0]
print(f" - 文件: {conflict['file_path']}")
print(f" - 版本: {conflict['version']}")
print(f" - 状态: {conflict['status']}")
def test_lines_changed():
"""测试变动行数计算"""
print("\n" + "="*60)
print("测试 6: 变动行数计算")
print("="*60)
checker = DiffChecker()
# 测试用例
test_cases = [
(
"line1\nline2\nline3",
"line1\nline2\nline3",
0
),
(
"line1\nline2",
"line1\nline2\nline3\nline4",
2
),
(
"line1\nline2\nline3\nline4",
"line1\nline2",
-2
),
(
"line1\nline2",
"line1\nline3\nline4",
1
),
]
for old_content, new_content, expected in test_cases:
result = checker.calculate_lines_changed(old_content, new_content)
status = "" if result == expected else ""
print(f"{status} 变动行数: {result} (期望: {expected})")
def main():
"""运行所有测试"""
print("\n" + "="*60)
print("龙虾记忆同步系统 - 功能测试")
print("="*60)
try:
test_chunked_reading()
test_lobsterignore()
test_audit_log()
test_semantic_summary()
test_conflict_detection()
test_lines_changed()
print("\n" + "="*60)
print("✓ 所有测试完成!")
print("="*60)
except Exception as e:
print(f"\n✗ 测试失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()