Files
openclaw-memory/backend/test_services.py
道童 479d67923c feat: 完成所有功能模块并添加测试
完成内容:
1. 数据库迁移文件
   - 0001_initial.py: 初始表结构
   - 0002_add_summary_and_audit_fields.py: 添加语义摘要和审计字段
   - 新增 summary 字段
   - 新增 source, lines_changed 字段
   - 新增 hard_conflict 状态
   - 添加数据库索引优化查询

2. 功能测试脚本
   - test_services.py: 完整功能测试
   - 测试分块读取
   - 测试 .lobsterignore 匹配(含正则表达式)
   - 测试审计日志(包含变动行数和数据源)
   - 测试语义摘要生成
   - 测试冲突判定(包含 HARD_CONFLICT)
   - 测试变动行数计算

所有功能已完成并提交,代码注释清晰。
2026-04-05 14:17:31 +00:00

297 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
龙虾记忆同步系统 - 功能测试脚本
测试内容:
1. 分块读取功能
2. .lobsterignore 匹配
3. 审计日志记录
4. 语义摘要生成
5. 冲突判定逻辑
"""
import os
import sys
import django
# 添加项目路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'backend'))
# 设置 Django 环境
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'memory_sync.settings')
# 配置数据库(测试用临时 SQLite
os.environ['DB_HOST'] = 'localhost'
os.environ['DB_NAME'] = 'test_lobster_memory'
os.environ['DB_USER'] = 'postgres'
os.environ['DB_PASSWORD'] = 'postgres'
os.environ['DB_PORT'] = '5432'
django.setup()
from pathlib import Path
from memory_app.services import (
FileScanner, IgnorePattern, DiffChecker, AuditLogger,
SemanticSummaryGenerator
)
from memory_app.models import LobsterMemory, SyncHistory
def test_chunked_reading():
"""测试分块读取功能"""
print("\n" + "="*60)
print("测试 1: 分块读取功能")
print("="*60)
# 创建测试文件
test_file = Path("/tmp/test_large_file.txt")
test_content = "Hello World\n" * 10000 # ~110KB
with open(test_file, 'w', encoding='utf-8') as f:
f.write(test_content)
try:
scanner = FileScanner()
scanner.base_dir = Path("/tmp")
# 使用分块读取
content, hash_value = scanner.get_file_content("test_large_file.txt", chunked=True)
print(f"✓ 文件大小: {len(test_content)} 字节")
print(f"✓ 分块读取成功: {len(content)} 字节")
print(f"✓ 哈希值: {hash_value[:16]}...")
print(f"✓ 分块大小: {scanner.chunk_size} 字节")
finally:
test_file.unlink()
def test_lobsterignore():
"""测试 .lobsterignore 匹配"""
print("\n" + "="*60)
print("测试 2: .lobsterignore 匹配")
print("="*60)
# 创建测试目录和文件
test_dir = Path("/tmp/test_lobsterignore")
test_dir.mkdir(exist_ok=True)
# 创建 .lobsterignore 文件
ignore_file = test_dir / ".lobsterignore"
ignore_content = """
# 注释行
*.pyc
__pycache__/
node_modules/
test_*.py
re:.*\\.log$
"""
with open(ignore_file, 'w', encoding='utf-8') as f:
f.write(ignore_content)
try:
ignore = IgnorePattern(test_dir)
# 测试文件
test_cases = [
("test.py", False),
("app.pyc", True),
("__pycache__/module.pyc", True),
("node_modules/index.js", True),
("test_main.py", True),
("app.log", True),
("app.txt", False),
("test_api.py", True),
]
for filename, expected in test_cases:
file_path = test_dir / filename
result = ignore.is_ignored(file_path)
status = "" if result == expected else ""
print(f"{status} {filename}: {result} (期望: {expected})")
print(f"\n✓ 加载的规则数: {len(ignore.patterns)}")
for pattern_type, pattern, _ in ignore.patterns:
print(f" - [{pattern_type}] {pattern}")
finally:
import shutil
shutil.rmtree(test_dir, ignore_errors=True)
def test_audit_log():
"""测试审计日志"""
print("\n" + "="*60)
print("测试 3: 审计日志")
print("="*60)
# 检查数据库连接
try:
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
print("✓ 数据库连接成功")
# 创建测试记录
audit_logger = AuditLogger()
audit_logger.log_sync_action(
lobster_id="test_lobster",
file_path="test.md",
action="sync_to_db",
old_version=1,
new_version=2,
old_hash="abc123",
new_hash="def456",
file_size=1024,
lines_changed=10,
source="local",
operator="test_user",
status="success",
execution_time=0.123
)
# 查询历史
history = audit_logger.get_history(lobster_id="test_lobster", limit=1)
if history:
print(f"✓ 日志记录成功")
print(f" - 操作: {history[0]['action']}")
print(f" - 操作者: {history[0]['operator']}")
print(f" - 变动行数: {history[0]['lines_changed']}")
print(f" - 数据源: {history[0]['source']}")
else:
print("✗ 未查询到日志")
except Exception as e:
print(f"⚠ 数据库测试跳过(需要配置数据库): {e}")
def test_semantic_summary():
"""测试语义摘要"""
print("\n" + "="*60)
print("测试 4: 语义摘要")
print("="*60)
generator = SemanticSummaryGenerator()
# 测试短文本
short_text = "这是一个简短的测试文本。"
summary = generator.generate_summary(short_text)
print(f"✓ 短文本摘要: {summary}")
# 测试长文本
long_text = "\n".join([f"这是第 {i} 行的测试内容。" for i in range(100)])
summary = generator.generate_summary(long_text)
print(f"✓ 长文本摘要: {summary[:50]}...")
print(f"✓ 摘要长度: {len(summary)} 字符")
def test_conflict_detection():
"""测试冲突判定"""
print("\n" + "="*60)
print("测试 5: 冲突判定")
print("="*60)
checker = DiffChecker()
# 模拟本地文件和数据库文件
local_files = [
{'file_path': 'file1.md', 'hash': 'abc123', 'updated_at': None},
{'file_path': 'file2.md', 'hash': 'def456', 'updated_at': None},
{'file_path': 'file3.md', 'hash': 'xyz789', 'updated_at': None},
]
from datetime import datetime, timedelta
db_files = [
{'file_path': 'file1.md', 'hash': 'abc123', 'version': 1, 'updated_at': datetime.now()},
{'file_path': 'file2.md', 'hash': 'aaa111', 'version': 1, 'updated_at': datetime.now() - timedelta(hours=2)},
{'file_path': 'file4.md', 'hash': 'bbb222', 'version': 1, 'updated_at': datetime.now()},
]
# 测试严重冲突判定
db_files_hard_conflict = [
{'file_path': 'file3.md', 'hash': 'zzz999', 'version': 2, 'updated_at': datetime.now() - timedelta(minutes=30)},
]
status = checker.check_sync_status(local_files, db_files)
print(f"✓ 一致: {len(status['consistent'])}")
print(f"✓ 冲突: {len(status['conflict'])}")
print(f"✓ 仅本地: {len(status['local_only'])}")
print(f"✓ 仅数据库: {len(status['db_only'])}")
# 测试严重冲突
status_hard = checker.check_sync_status(local_files, db_files_hard_conflict)
print(f"✓ 严重冲突: {len(status_hard['hard_conflict'])}")
if status_hard['hard_conflict']:
conflict = status_hard['hard_conflict'][0]
print(f" - 文件: {conflict['file_path']}")
print(f" - 版本: {conflict['version']}")
print(f" - 状态: {conflict['status']}")
def test_lines_changed():
"""测试变动行数计算"""
print("\n" + "="*60)
print("测试 6: 变动行数计算")
print("="*60)
checker = DiffChecker()
# 测试用例
test_cases = [
(
"line1\nline2\nline3",
"line1\nline2\nline3",
0
),
(
"line1\nline2",
"line1\nline2\nline3\nline4",
2
),
(
"line1\nline2\nline3\nline4",
"line1\nline2",
-2
),
(
"line1\nline2",
"line1\nline3\nline4",
1
),
]
for old_content, new_content, expected in test_cases:
result = checker.calculate_lines_changed(old_content, new_content)
status = "" if result == expected else ""
print(f"{status} 变动行数: {result} (期望: {expected})")
def main():
"""运行所有测试"""
print("\n" + "="*60)
print("龙虾记忆同步系统 - 功能测试")
print("="*60)
try:
test_chunked_reading()
test_lobsterignore()
test_audit_log()
test_semantic_summary()
test_conflict_detection()
test_lines_changed()
print("\n" + "="*60)
print("✓ 所有测试完成!")
print("="*60)
except Exception as e:
print(f"\n✗ 测试失败: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()