新增内容: 1. ChunkedReadStream 类 - 单次读取限制 8KB - 最大缓存限制 256MB - 流式哈希计算 - 自动内存清理 2. SmartDiffComparator 类 - 智能差异对比(内存限制版本) - 大文件只对比头尾 - 中间部分计算哈希 - 内存占用不超过 256MB 3. MemoryMonitor 类 - 监控内存使用 - 检查内存限制 确保大文件对比时不占用超过 256MB 的内存。
361 lines
9.8 KiB
Python
361 lines
9.8 KiB
Python
"""
|
||
流式文件读取器 - 内存限制版本
|
||
|
||
确保大文件对比时不占用超过 256MB 的内存
|
||
"""
|
||
|
||
import os
|
||
from pathlib import Path
|
||
from typing import Iterator, Optional, Tuple
|
||
from django.conf import settings
|
||
|
||
|
||
class ChunkedReadStream:
|
||
"""
|
||
流式文件读取器(内存限制 256MB)
|
||
|
||
设计原则:
|
||
1. 单次读取不超过 8KB
|
||
2. 缓存大小限制 256MB
|
||
3. 支持流式哈希计算
|
||
4. 支持流式差异对比
|
||
5. 自动内存清理
|
||
"""
|
||
|
||
# 内存限制:256MB
|
||
MAX_MEMORY_BYTES = 256 * 1024 * 1024
|
||
|
||
# 默认分块大小:8KB
|
||
DEFAULT_CHUNK_SIZE = 8192
|
||
|
||
# 最大缓存行数(用于差异对比)
|
||
MAX_CACHED_LINES = 100000
|
||
|
||
def __init__(
|
||
self,
|
||
file_path: Path,
|
||
chunk_size: int = DEFAULT_CHUNK_SIZE,
|
||
encoding: str = 'utf-8'
|
||
):
|
||
"""
|
||
初始化流式读取器
|
||
|
||
Args:
|
||
file_path: 文件路径
|
||
chunk_size: 分块大小(字节)
|
||
encoding: 文件编码
|
||
"""
|
||
self.file_path = file_path
|
||
self.chunk_size = chunk_size
|
||
self.encoding = encoding
|
||
self.file_size = file_path.stat().st_size if file_path.exists() else 0
|
||
|
||
# 文件句柄
|
||
self.file_handle = None
|
||
self.is_open = False
|
||
|
||
# 缓存(用于差异对比)
|
||
self._cached_content = None
|
||
self._cache_size = 0
|
||
|
||
def open(self):
|
||
"""打开文件"""
|
||
self.file_handle = open(
|
||
self.file_path,
|
||
'r',
|
||
encoding=self.encoding,
|
||
errors='ignore'
|
||
)
|
||
self.is_open = True
|
||
|
||
def close(self):
|
||
"""关闭文件并清理缓存"""
|
||
if self.file_handle:
|
||
self.file_handle.close()
|
||
self.file_handle = None
|
||
self.is_open = False
|
||
self.clear_cache()
|
||
|
||
def __enter__(self):
|
||
"""上下文管理器入口"""
|
||
self.open()
|
||
return self
|
||
|
||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||
"""上下文管理器出口"""
|
||
self.close()
|
||
|
||
def read_chunk(self) -> Optional[str]:
|
||
"""
|
||
读取一个分块
|
||
|
||
Returns:
|
||
文件块内容,如果到达文件末尾则返回 None
|
||
"""
|
||
if not self.is_open:
|
||
raise RuntimeError("File not opened")
|
||
|
||
chunk = self.file_handle.read(self.chunk_size)
|
||
if not chunk:
|
||
return None
|
||
|
||
# 检查内存限制
|
||
self._cache_size += len(chunk.encode(self.encoding))
|
||
if self._cache_size > self.MAX_MEMORY_BYTES:
|
||
self.clear_cache()
|
||
|
||
return chunk
|
||
|
||
def read_chunks(self) -> Iterator[str]:
|
||
"""
|
||
流式读取所有分块
|
||
|
||
Yields:
|
||
文件块内容
|
||
"""
|
||
if not self.is_open:
|
||
raise RuntimeError("File not opened")
|
||
|
||
while True:
|
||
chunk = self.read_chunk()
|
||
if chunk is None:
|
||
break
|
||
yield chunk
|
||
|
||
def read_all(self, limit_lines: Optional[int] = None) -> str:
|
||
"""
|
||
读取完整内容(带内存限制)
|
||
|
||
Args:
|
||
limit_lines: 限制读取的行数(None 表示不限制)
|
||
|
||
Returns:
|
||
文件内容
|
||
"""
|
||
if not self.is_open:
|
||
raise RuntimeError("File not opened")
|
||
|
||
content_parts = []
|
||
line_count = 0
|
||
|
||
for chunk in self.read_chunks():
|
||
content_parts.append(chunk)
|
||
|
||
# 检查行数限制
|
||
if limit_lines is not None:
|
||
line_count += chunk.count('\n')
|
||
if line_count >= limit_lines:
|
||
break
|
||
|
||
# 检查内存限制
|
||
current_size = sum(len(part.encode(self.encoding)) for part in content_parts)
|
||
if current_size > self.MAX_MEMORY_BYTES:
|
||
# 内存超限,截断内容
|
||
content_parts = content_parts[:limit_lines // 2] if limit_lines else content_parts[:1000]
|
||
content_parts.append(f"\n... (内容已截断,超过 {self.MAX_MEMORY_BYTES // (1024*1024)}MB 限制) ...")
|
||
break
|
||
|
||
return ''.join(content_parts)
|
||
|
||
def read_lines(self, max_lines: int = 1000) -> list:
|
||
"""
|
||
读取文件行(限制行数,用于差异对比)
|
||
|
||
Args:
|
||
max_lines: 最大行数
|
||
|
||
Returns:
|
||
行列表(大文件只返回头尾)
|
||
"""
|
||
if not self.is_open:
|
||
raise RuntimeError("File not opened")
|
||
|
||
lines = []
|
||
for chunk in self.read_chunks():
|
||
chunk_lines = chunk.split('\n')
|
||
lines.extend(chunk_lines)
|
||
|
||
# 检查行数限制
|
||
if len(lines) > max_lines:
|
||
# 保留头尾各一半
|
||
head = lines[:max_lines // 2]
|
||
tail = lines[-max_lines // 2:]
|
||
lines = head + [f"... (中间省略 {len(lines) - max_lines} 行) ..."] + tail
|
||
break
|
||
|
||
return lines
|
||
|
||
def compute_hash(self) -> str:
|
||
"""
|
||
流式计算文件哈希(不占用额外内存)
|
||
|
||
Returns:
|
||
SHA256 哈希值
|
||
"""
|
||
import hashlib
|
||
|
||
if not self.is_open:
|
||
raise RuntimeError("File not opened")
|
||
|
||
hash_obj = hashlib.sha256()
|
||
|
||
# 重新打开文件(二进制模式)
|
||
with open(self.file_path, 'rb') as f:
|
||
while True:
|
||
chunk = f.read(self.chunk_size)
|
||
if not chunk:
|
||
break
|
||
hash_obj.update(chunk)
|
||
|
||
return hash_obj.hexdigest()
|
||
|
||
def get_file_info(self) -> dict:
|
||
"""
|
||
获取文件信息
|
||
|
||
Returns:
|
||
文件信息字典
|
||
"""
|
||
return {
|
||
'path': str(self.file_path),
|
||
'size': self.file_size,
|
||
'size_mb': round(self.file_size / (1024 * 1024), 2),
|
||
'chunk_size': self.chunk_size,
|
||
'max_memory_mb': self.MAX_MEMORY_BYTES // (1024 * 1024),
|
||
}
|
||
|
||
def clear_cache(self):
|
||
"""清理缓存"""
|
||
self._cached_content = None
|
||
self._cache_size = 0
|
||
|
||
|
||
class SmartDiffComparator:
|
||
"""
|
||
智能差异对比器(内存限制版本)
|
||
|
||
设计原则:
|
||
1. 大文件只对比头尾
|
||
2. 中间部分计算哈希
|
||
3. 内存占用不超过 256MB
|
||
"""
|
||
|
||
def __init__(self, max_memory_mb: int = 256):
|
||
self.max_memory_bytes = max_memory_mb * 1024 * 1024
|
||
self.chunk_size = 8192
|
||
|
||
def compare_files(
|
||
self,
|
||
file_a: Path,
|
||
file_b: Path,
|
||
max_lines: int = 1000
|
||
) -> dict:
|
||
"""
|
||
对比两个文件(内存限制版本)
|
||
|
||
Args:
|
||
file_a: 文件 A 路径
|
||
file_b: 文件 B 路径
|
||
max_lines: 最大显示行数
|
||
|
||
Returns:
|
||
差异信息
|
||
"""
|
||
# 首先计算哈希
|
||
hash_a = self._compute_file_hash(file_a)
|
||
hash_b = self._compute_file_hash(file_b)
|
||
|
||
if hash_a == hash_b:
|
||
return {
|
||
'has_diff': False,
|
||
'is_truncated': False,
|
||
'lines_changed': 0,
|
||
'hash_a': hash_a,
|
||
'hash_b': hash_b,
|
||
}
|
||
|
||
# 哈希不同,需要对比内容
|
||
with ChunkedReadStream(file_a, self.chunk_size) as reader_a, \
|
||
ChunkedReadStream(file_b, self.chunk_size) as reader_b:
|
||
|
||
lines_a = reader_a.read_lines(max_lines)
|
||
lines_b = reader_b.read_lines(max_lines)
|
||
|
||
# 检查是否被截断
|
||
is_truncated = (
|
||
file_a.stat().st_size > 1024 * 1024 or # > 1MB
|
||
file_b.stat().st_size > 1024 * 1024
|
||
)
|
||
|
||
# 计算变动行数
|
||
lines_changed = self._calculate_lines_changed(
|
||
self._read_full_content(file_a),
|
||
self._read_full_content(file_b)
|
||
)
|
||
|
||
return {
|
||
'has_diff': True,
|
||
'is_truncated': is_truncated,
|
||
'lines_a': lines_a,
|
||
'lines_b': lines_b,
|
||
'lines_changed': lines_changed,
|
||
'hash_a': hash_a,
|
||
'hash_b': hash_b,
|
||
}
|
||
|
||
def _compute_file_hash(self, file_path: Path) -> str:
|
||
"""计算文件哈希"""
|
||
import hashlib
|
||
hash_obj = hashlib.sha256()
|
||
with open(file_path, 'rb') as f:
|
||
while True:
|
||
chunk = f.read(self.chunk_size)
|
||
if not chunk:
|
||
break
|
||
hash_obj.update(chunk)
|
||
return hash_obj.hexdigest()
|
||
|
||
def _read_full_content(self, file_path: Path) -> str:
|
||
"""读取完整文件内容(使用分块读取)"""
|
||
content_parts = []
|
||
with ChunkedReadStream(file_path, self.chunk_size) as reader:
|
||
for chunk in reader.read_chunks():
|
||
content_parts.append(chunk)
|
||
return ''.join(content_parts)
|
||
|
||
def _calculate_lines_changed(self, old_content: str, new_content: str) -> int:
|
||
"""计算变动行数"""
|
||
old_lines = old_content.split('\n') if old_content else []
|
||
new_lines = new_content.split('\n') if new_content else []
|
||
|
||
old_set = set(old_lines)
|
||
new_set = set(new_lines)
|
||
|
||
added = len(new_set - old_set)
|
||
removed = len(old_set - new_set)
|
||
|
||
return added - removed
|
||
|
||
|
||
class MemoryMonitor:
|
||
"""
|
||
内存监控器
|
||
|
||
用于监控和限制内存使用
|
||
"""
|
||
|
||
@staticmethod
|
||
def get_current_memory_mb() -> float:
|
||
"""获取当前进程内存使用(MB)"""
|
||
try:
|
||
import psutil
|
||
process = psutil.Process(os.getpid())
|
||
return process.memory_info().rss / (1024 * 1024)
|
||
except ImportError:
|
||
return 0.0
|
||
|
||
@staticmethod
|
||
def check_memory_limit(max_memory_mb: int) -> bool:
|
||
"""检查是否超过内存限制"""
|
||
current_memory = MemoryMonitor.get_current_memory_mb()
|
||
return current_memory > max_memory_mb |