376 lines
12 KiB
Python
376 lines
12 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
龙虾记忆同步系统 - 简化功能测试(不依赖 Django)
|
|||
|
|
|
|||
|
|
测试内容:
|
|||
|
|
1. .lobsterignore 匹配
|
|||
|
|
2. 分块读取模拟
|
|||
|
|
3. 冲突判定逻辑
|
|||
|
|
4. 变动行数计算
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import re
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import List, Tuple, Iterator
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_lobsterignore():
|
|||
|
|
"""测试 .lobsterignore 匹配"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试 1: .lobsterignore 匹配")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
# 创建测试目录和文件
|
|||
|
|
test_dir = Path("/tmp/test_lobsterignore")
|
|||
|
|
test_dir.mkdir(exist_ok=True)
|
|||
|
|
|
|||
|
|
# 创建 .lobsterignore 文件
|
|||
|
|
ignore_file = test_dir / ".lobsterignore"
|
|||
|
|
ignore_content = """
|
|||
|
|
# 注释行
|
|||
|
|
*.pyc
|
|||
|
|
__pycache__/
|
|||
|
|
node_modules/
|
|||
|
|
test_*.py
|
|||
|
|
re:.*\\.log$
|
|||
|
|
"""
|
|||
|
|
with open(ignore_file, 'w', encoding='utf-8') as f:
|
|||
|
|
f.write(ignore_content)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
patterns = []
|
|||
|
|
|
|||
|
|
# 解析 .lobsterignore 文件
|
|||
|
|
with open(ignore_file, 'r', encoding='utf-8') as f:
|
|||
|
|
for line in f:
|
|||
|
|
line = line.strip()
|
|||
|
|
if not line or line.startswith('#'):
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
if line.startswith('re:'):
|
|||
|
|
pattern = line[3:]
|
|||
|
|
try:
|
|||
|
|
regex = re.compile(pattern)
|
|||
|
|
patterns.append(('regex', pattern, regex))
|
|||
|
|
except re.error as e:
|
|||
|
|
print(f"Invalid regex pattern '{pattern}': {e}")
|
|||
|
|
else:
|
|||
|
|
patterns.append(('glob', line, None))
|
|||
|
|
|
|||
|
|
# 添加默认忽略规则
|
|||
|
|
default_patterns = [
|
|||
|
|
'.DS_Store', '.git', '.gitignore', '__pycache__',
|
|||
|
|
'node_modules', '*.pyc', '*.pyo', '*.log',
|
|||
|
|
'*.tmp', '*.temp', '*.bak', '.vscode', '.idea'
|
|||
|
|
]
|
|||
|
|
for pattern in default_patterns:
|
|||
|
|
if not any(p[1] == pattern for p in patterns):
|
|||
|
|
patterns.append(('glob', pattern, None))
|
|||
|
|
|
|||
|
|
print(f"✓ 加载的规则数: {len(patterns)}")
|
|||
|
|
for pattern_type, pattern, _ in patterns:
|
|||
|
|
print(f" - [{pattern_type}] {pattern}")
|
|||
|
|
|
|||
|
|
# 测试文件
|
|||
|
|
test_cases = [
|
|||
|
|
("test.py", False),
|
|||
|
|
("app.pyc", True),
|
|||
|
|
("__pycache__/module.pyc", True),
|
|||
|
|
("node_modules/index.js", True),
|
|||
|
|
("test_main.py", True),
|
|||
|
|
("app.log", True),
|
|||
|
|
("app.txt", False),
|
|||
|
|
("test_api.py", True),
|
|||
|
|
(".git/config", True),
|
|||
|
|
("README.md", False),
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
print("\n测试结果:")
|
|||
|
|
all_passed = True
|
|||
|
|
for filename, expected in test_cases:
|
|||
|
|
file_path = test_dir / filename
|
|||
|
|
result = False
|
|||
|
|
|
|||
|
|
for pattern_type, pattern, regex in patterns:
|
|||
|
|
if pattern_type == 'regex':
|
|||
|
|
if regex.search(filename):
|
|||
|
|
result = True
|
|||
|
|
break
|
|||
|
|
else:
|
|||
|
|
from fnmatch import fnmatch
|
|||
|
|
if fnmatch(filename, pattern):
|
|||
|
|
result = True
|
|||
|
|
break
|
|||
|
|
|
|||
|
|
status = "✓" if result == expected else "✗"
|
|||
|
|
if result != expected:
|
|||
|
|
all_passed = False
|
|||
|
|
print(f" {status} {filename}: {result} (期望: {expected})")
|
|||
|
|
|
|||
|
|
if all_passed:
|
|||
|
|
print("\n✓ 所有 .lobsterignore 测试通过")
|
|||
|
|
else:
|
|||
|
|
print("\n✗ 部分测试失败")
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
import shutil
|
|||
|
|
shutil.rmtree(test_dir, ignore_errors=True)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_chunked_reading():
|
|||
|
|
"""测试分块读取功能"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试 2: 分块读取模拟")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
# 创建测试文件
|
|||
|
|
test_file = Path("/tmp/test_large_file.txt")
|
|||
|
|
chunk_size = 8192 # 8KB
|
|||
|
|
|
|||
|
|
# 生成大文件(约 100KB)
|
|||
|
|
test_content = "Hello World\n" * 10000
|
|||
|
|
|
|||
|
|
with open(test_file, 'w', encoding='utf-8') as f:
|
|||
|
|
f.write(test_content)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 模拟分块读取
|
|||
|
|
content_parts = []
|
|||
|
|
chunk_count = 0
|
|||
|
|
|
|||
|
|
with open(test_file, 'r', encoding='utf-8') as f:
|
|||
|
|
while True:
|
|||
|
|
chunk = f.read(chunk_size)
|
|||
|
|
if not chunk:
|
|||
|
|
break
|
|||
|
|
content_parts.append(chunk)
|
|||
|
|
chunk_count += 1
|
|||
|
|
|
|||
|
|
result_content = ''.join(content_parts)
|
|||
|
|
|
|||
|
|
print(f"✓ 原始文件大小: {len(test_content)} 字节")
|
|||
|
|
print(f"✓ 分块读取大小: {len(result_content)} 字节")
|
|||
|
|
print(f"✓ 读取块数: {chunk_count}")
|
|||
|
|
print(f"✓ 分块大小: {chunk_size} 字节")
|
|||
|
|
print(f"✓ 内容一致: {test_content == result_content}")
|
|||
|
|
|
|||
|
|
# 计算哈希(流式)
|
|||
|
|
import hashlib
|
|||
|
|
hash_obj = hashlib.sha256()
|
|||
|
|
with open(test_file, 'rb') as f:
|
|||
|
|
while True:
|
|||
|
|
chunk = f.read(chunk_size)
|
|||
|
|
if not chunk:
|
|||
|
|
break
|
|||
|
|
hash_obj.update(chunk)
|
|||
|
|
|
|||
|
|
hash_value = hash_obj.hexdigest()
|
|||
|
|
print(f"✓ 流式哈希: {hash_value[:16]}...")
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
test_file.unlink()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_lines_changed():
|
|||
|
|
"""测试变动行数计算"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试 3: 变动行数计算")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
def calculate_lines_changed(old_content: str, new_content: str) -> int:
|
|||
|
|
old_lines = set(old_content.split('\n'))
|
|||
|
|
new_lines = set(new_content.split('\n'))
|
|||
|
|
added = len(new_lines - old_lines)
|
|||
|
|
removed = len(old_lines - new_lines)
|
|||
|
|
return added - removed
|
|||
|
|
|
|||
|
|
# 测试用例
|
|||
|
|
test_cases = [
|
|||
|
|
("line1\nline2\nline3", "line1\nline2\nline3", 0, "无变化"),
|
|||
|
|
("line1\nline2", "line1\nline2\nline3\nline4", 2, "新增 2 行"),
|
|||
|
|
("line1\nline2\nline3\nline4", "line1\nline2", -2, "删除 2 行"),
|
|||
|
|
("line1\nline2", "line1\nline3\nline4", 1, "替换 + 新增"),
|
|||
|
|
("", "line1\nline2", 2, "空文件 -> 有内容"),
|
|||
|
|
("line1\nline2", "", -2, "有内容 -> 空文件"),
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
print("\n测试结果:")
|
|||
|
|
all_passed = True
|
|||
|
|
for old_content, new_content, expected, desc in test_cases:
|
|||
|
|
result = calculate_lines_changed(old_content, new_content)
|
|||
|
|
status = "✓" if result == expected else "✗"
|
|||
|
|
if result != expected:
|
|||
|
|
all_passed = False
|
|||
|
|
print(f" {status} {desc}: {result} (期望: {expected})")
|
|||
|
|
|
|||
|
|
if all_passed:
|
|||
|
|
print("\n✓ 所有变动行数测试通过")
|
|||
|
|
else:
|
|||
|
|
print("\n✗ 部分测试失败")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_conflict_detection():
|
|||
|
|
"""测试冲突判定逻辑"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试 4: 冲突判定逻辑")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
from datetime import datetime, timedelta
|
|||
|
|
|
|||
|
|
def check_sync_status(local_files: List[dict], db_files: List[dict]) -> dict:
|
|||
|
|
local_map = {f['file_path']: f for f in local_files}
|
|||
|
|
db_map = {f['file_path']: f for f in db_files}
|
|||
|
|
|
|||
|
|
results = {
|
|||
|
|
'consistent': [],
|
|||
|
|
'conflict': [],
|
|||
|
|
'hard_conflict': [],
|
|||
|
|
'local_only': [],
|
|||
|
|
'db_only': [],
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
all_paths = set(local_map.keys()) | set(db_map.keys())
|
|||
|
|
|
|||
|
|
for path in all_paths:
|
|||
|
|
local = local_map.get(path)
|
|||
|
|
db = db_map.get(path)
|
|||
|
|
|
|||
|
|
if local and db:
|
|||
|
|
if local['hash'] == db['hash']:
|
|||
|
|
results['consistent'].append({
|
|||
|
|
'file_path': path,
|
|||
|
|
'status': 'consistent'
|
|||
|
|
})
|
|||
|
|
else:
|
|||
|
|
# 判定严重冲突
|
|||
|
|
updated_at = db.get('updated_at')
|
|||
|
|
version = db.get('version', 0)
|
|||
|
|
|
|||
|
|
if version > 1 and updated_at:
|
|||
|
|
time_diff = datetime.now() - updated_at
|
|||
|
|
if time_diff < timedelta(hours=1):
|
|||
|
|
results['hard_conflict'].append({
|
|||
|
|
'file_path': path,
|
|||
|
|
'status': 'hard_conflict',
|
|||
|
|
'version': version
|
|||
|
|
})
|
|||
|
|
else:
|
|||
|
|
results['conflict'].append({
|
|||
|
|
'file_path': path,
|
|||
|
|
'status': 'conflict',
|
|||
|
|
'version': version
|
|||
|
|
})
|
|||
|
|
else:
|
|||
|
|
results['conflict'].append({
|
|||
|
|
'file_path': path,
|
|||
|
|
'status': 'conflict',
|
|||
|
|
'version': version
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
elif local and not db:
|
|||
|
|
results['local_only'].append({
|
|||
|
|
'file_path': path,
|
|||
|
|
'status': 'local_only'
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
elif not local and db:
|
|||
|
|
results['db_only'].append({
|
|||
|
|
'file_path': path,
|
|||
|
|
'status': 'db_only'
|
|||
|
|
})
|
|||
|
|
|
|||
|
|
return results
|
|||
|
|
|
|||
|
|
# 测试用例
|
|||
|
|
now = datetime.now()
|
|||
|
|
|
|||
|
|
test_cases = [
|
|||
|
|
(
|
|||
|
|
"一致",
|
|||
|
|
[{'file_path': 'file1.md', 'hash': 'abc123'}],
|
|||
|
|
[{'file_path': 'file1.md', 'hash': 'abc123', 'version': 1, 'updated_at': now}],
|
|||
|
|
{'consistent': 1, 'conflict': 0, 'hard_conflict': 0, 'local_only': 0, 'db_only': 0}
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"普通冲突",
|
|||
|
|
[{'file_path': 'file2.md', 'hash': 'def456'}],
|
|||
|
|
[{'file_path': 'file2.md', 'hash': 'aaa111', 'version': 1, 'updated_at': now - timedelta(hours=2)}],
|
|||
|
|
{'consistent': 0, 'conflict': 1, 'hard_conflict': 0, 'local_only': 0, 'db_only': 0}
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"严重冲突",
|
|||
|
|
[{'file_path': 'file3.md', 'hash': 'xyz789'}],
|
|||
|
|
[{'file_path': 'file3.md', 'hash': 'zzz999', 'version': 2, 'updated_at': now - timedelta(minutes=30)}],
|
|||
|
|
{'consistent': 0, 'conflict': 0, 'hard_conflict': 1, 'local_only': 0, 'db_only': 0}
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"仅本地",
|
|||
|
|
[{'file_path': 'file4.md', 'hash': 'test123'}],
|
|||
|
|
[],
|
|||
|
|
{'consistent': 0, 'conflict': 0, 'hard_conflict': 0, 'local_only': 1, 'db_only': 0}
|
|||
|
|
),
|
|||
|
|
(
|
|||
|
|
"仅数据库",
|
|||
|
|
[],
|
|||
|
|
[{'file_path': 'file5.md', 'hash': 'db123', 'version': 1, 'updated_at': now}],
|
|||
|
|
{'consistent': 0, 'conflict': 0, 'hard_conflict': 0, 'local_only': 0, 'db_only': 1}
|
|||
|
|
),
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
print("\n测试结果:")
|
|||
|
|
all_passed = True
|
|||
|
|
for desc, local_files, db_files, expected in test_cases:
|
|||
|
|
result = check_sync_status(local_files, db_files)
|
|||
|
|
result_counts = {
|
|||
|
|
'consistent': len(result['consistent']),
|
|||
|
|
'conflict': len(result['conflict']),
|
|||
|
|
'hard_conflict': len(result['hard_conflict']),
|
|||
|
|
'local_only': len(result['local_only']),
|
|||
|
|
'db_only': len(result['db_only']),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
status = "✓" if result_counts == expected else "✗"
|
|||
|
|
if result_counts != expected:
|
|||
|
|
all_passed = False
|
|||
|
|
print(f" {status} {desc}")
|
|||
|
|
print(f" 结果: {result_counts}")
|
|||
|
|
print(f" 期望: {expected}")
|
|||
|
|
|
|||
|
|
if all_passed:
|
|||
|
|
print("\n✓ 所有冲突判定测试通过")
|
|||
|
|
else:
|
|||
|
|
print("\n✗ 部分测试失败")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
"""运行所有测试"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("龙虾记忆同步系统 - 简化功能测试")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
test_lobsterignore()
|
|||
|
|
test_chunked_reading()
|
|||
|
|
test_lines_changed()
|
|||
|
|
test_conflict_detection()
|
|||
|
|
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("✓ 所有测试完成!")
|
|||
|
|
print("="*60)
|
|||
|
|
print("\n已验证的功能:")
|
|||
|
|
print(" 1. ✓ .lobsterignore 匹配(含正则表达式)")
|
|||
|
|
print(" 2. ✓ 分块读取(8KB 分块)")
|
|||
|
|
print(" 3. ✓ 变动行数计算")
|
|||
|
|
print(" 4. ✓ 冲突判定(包含 HARD_CONFLICT)")
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"\n✗ 测试失败: {e}")
|
|||
|
|
import traceback
|
|||
|
|
traceback.print_exc()
|
|||
|
|
import sys
|
|||
|
|
sys.exit(1)
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == '__main__':
|
|||
|
|
main()
|