"""
Streaming file utilities with a hard memory budget.

Supports comparing potentially huge files while keeping peak memory below
``ChunkedReadStream.MAX_MEMORY_BYTES`` (256 MB):

* ``ChunkedReadStream``   -- chunked text reading and streaming SHA-256 hashing.
* ``SmartDiffComparator`` -- diff summary; large files show head/tail only.
* ``MemoryMonitor``       -- best-effort process memory monitoring (psutil optional).
"""

import hashlib
import os
from collections import deque
from pathlib import Path
from typing import Iterator, Optional


class ChunkedReadStream:
    """
    Chunked text-file reader with a 256 MB memory cap.

    Design principles:
    1. A single read never exceeds ``chunk_size`` (default 8 KB).
    2. Accumulated content is bounded by ``MAX_MEMORY_BYTES``.
    3. Hashing is streamed and needs only O(chunk_size) memory.
    4. Usable as a context manager; ``close()`` releases the handle and cache.
    """

    # Hard memory cap: 256 MB.
    MAX_MEMORY_BYTES = 256 * 1024 * 1024

    # Default chunk size: 8 KB.
    DEFAULT_CHUNK_SIZE = 8192

    # Maximum number of cached lines (reserved for diffing callers).
    MAX_CACHED_LINES = 100000

    def __init__(
        self,
        file_path: Path,
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        encoding: str = 'utf-8'
    ):
        """
        Initialize the streaming reader.

        Args:
            file_path: Path of the file to read.
            chunk_size: Chunk size in bytes (characters for text reads).
            encoding: Text encoding used when decoding the file.
        """
        self.file_path = file_path
        self.chunk_size = chunk_size
        self.encoding = encoding
        # Size is captured eagerly; 0 for a missing file.
        self.file_size = file_path.stat().st_size if file_path.exists() else 0

        # Lazily opened text handle.
        self.file_handle = None
        self.is_open = False

        # Bookkeeping for bytes handed out so far (reset when over budget).
        self._cached_content = None
        self._cache_size = 0

    def open(self):
        """Open the file in text mode (undecodable bytes are ignored)."""
        self.file_handle = open(
            self.file_path,
            'r',
            encoding=self.encoding,
            errors='ignore'
        )
        self.is_open = True

    def close(self):
        """Close the file and drop any cached state."""
        if self.file_handle:
            self.file_handle.close()
            self.file_handle = None
            self.is_open = False
        self.clear_cache()

    def __enter__(self):
        """Context-manager entry: open the file."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context-manager exit: always close the file."""
        self.close()

    def read_chunk(self) -> Optional[str]:
        """
        Read one chunk of text.

        Returns:
            The chunk, or ``None`` at end of file.

        Raises:
            RuntimeError: If the stream has not been opened.
        """
        if not self.is_open:
            raise RuntimeError("File not opened")

        chunk = self.file_handle.read(self.chunk_size)
        if not chunk:
            return None

        # Track bytes handed out; reset the counter once past the budget
        # (chunks themselves are not retained here).
        self._cache_size += len(chunk.encode(self.encoding))
        if self._cache_size > self.MAX_MEMORY_BYTES:
            self.clear_cache()

        return chunk

    def read_chunks(self) -> Iterator[str]:
        """
        Yield successive chunks until end of file.

        Raises:
            RuntimeError: If the stream has not been opened.
        """
        if not self.is_open:
            raise RuntimeError("File not opened")

        while True:
            chunk = self.read_chunk()
            if chunk is None:
                break
            yield chunk

    def read_all(self, limit_lines: Optional[int] = None) -> str:
        """
        Read the whole file (memory-capped).

        Args:
            limit_lines: Stop once at least this many newlines have been
                read (``None`` means no line limit). Granularity is one
                chunk, so slightly more may be returned.

        Returns:
            The accumulated content. If the memory budget is exceeded the
            content is cut off and a truncation marker is appended.

        Raises:
            RuntimeError: If the stream has not been opened.
        """
        if not self.is_open:
            raise RuntimeError("File not opened")

        content_parts = []
        line_count = 0
        total_bytes = 0  # running total; avoids re-encoding everything per chunk

        for chunk in self.read_chunks():
            content_parts.append(chunk)
            total_bytes += len(chunk.encode(self.encoding))

            if limit_lines is not None:
                line_count += chunk.count('\n')
                if line_count >= limit_lines:
                    break

            if total_bytes > self.MAX_MEMORY_BYTES:
                # Over budget: stop reading and flag the truncation.
                content_parts.append(f"\n... (内容已截断,超过 {self.MAX_MEMORY_BYTES // (1024*1024)}MB 限制) ...")
                break

        return ''.join(content_parts)

    def read_lines(self, max_lines: int = 1000) -> list:
        """
        Read the file's lines, keeping at most ~``max_lines`` of them.

        For large files only the head and tail (``max_lines // 2`` each) are
        kept, with an omission marker in between; memory stays bounded by
        the kept lines regardless of file size.

        Args:
            max_lines: Maximum number of content lines to retain.

        Returns:
            List of lines without trailing newlines (plus one marker line
            when the middle was omitted).

        Raises:
            RuntimeError: If the stream has not been opened.
        """
        if not self.is_open:
            raise RuntimeError("File not opened")

        half = max(1, max_lines // 2)
        head = []
        tail = deque(maxlen=half)  # bounded: always holds the latest lines
        total = 0

        # Iterate the buffered text handle line by line; this is immune to
        # the chunk-boundary splitting problem of chunk.split('\n').
        for raw in self.file_handle:
            total += 1
            line = raw.rstrip('\n')
            if len(head) < half:
                head.append(line)
            else:
                tail.append(line)

        omitted = total - len(head) - len(tail)
        if omitted <= 0:
            return head + list(tail)
        return head + [f"... (中间省略 {omitted} 行) ..."] + list(tail)

    def compute_hash(self) -> str:
        """
        Compute the file's SHA-256 by streaming (O(chunk_size) memory).

        The file is re-read in binary mode, so the stream does not need to
        be open for this call.

        Returns:
            Hex-encoded SHA-256 digest.
        """
        hash_obj = hashlib.sha256()
        with open(self.file_path, 'rb') as f:
            while True:
                chunk = f.read(self.chunk_size)
                if not chunk:
                    break
                hash_obj.update(chunk)
        return hash_obj.hexdigest()

    def get_file_info(self) -> dict:
        """
        Return basic metadata about the file and this reader.

        Returns:
            Dict with path, size (bytes and MB), chunk size and memory cap.
        """
        return {
            'path': str(self.file_path),
            'size': self.file_size,
            'size_mb': round(self.file_size / (1024 * 1024), 2),
            'chunk_size': self.chunk_size,
            'max_memory_mb': self.MAX_MEMORY_BYTES // (1024 * 1024),
        }

    def clear_cache(self):
        """Drop cached content and reset the byte counter."""
        self._cached_content = None
        self._cache_size = 0


class SmartDiffComparator:
    """
    Memory-conscious file comparator.

    Design principles:
    1. Large files are displayed head/tail only.
    2. Equality is decided by streamed SHA-256 hashes first.
    3. Memory usage is intended to stay under ``max_memory_mb``.
    """

    def __init__(self, max_memory_mb: int = 256):
        """
        Args:
            max_memory_mb: Soft memory budget for a comparison, in MB.
        """
        self.max_memory_bytes = max_memory_mb * 1024 * 1024
        self.chunk_size = 8192

    def compare_files(
        self,
        file_a: Path,
        file_b: Path,
        max_lines: int = 1000
    ) -> dict:
        """
        Compare two files.

        Args:
            file_a: Path of file A.
            file_b: Path of file B.
            max_lines: Maximum number of lines to include per file.

        Returns:
            Dict with ``has_diff``, ``is_truncated``, ``lines_changed``,
            both hashes, and (when different) the truncated line lists.
        """
        # Cheap first pass: identical hashes mean identical content.
        hash_a = self._compute_file_hash(file_a)
        hash_b = self._compute_file_hash(file_b)

        if hash_a == hash_b:
            return {
                'has_diff': False,
                'is_truncated': False,
                'lines_changed': 0,
                'hash_a': hash_a,
                'hash_b': hash_b,
            }

        # Hashes differ: collect (possibly truncated) line views.
        with ChunkedReadStream(file_a, self.chunk_size) as reader_a, \
                ChunkedReadStream(file_b, self.chunk_size) as reader_b:

            lines_a = reader_a.read_lines(max_lines)
            lines_b = reader_b.read_lines(max_lines)

        # Files above 1 MB are shown truncated.
        is_truncated = (
            file_a.stat().st_size > 1024 * 1024
            or file_b.stat().st_size > 1024 * 1024
        )

        # NOTE(review): this reads both files fully into memory; for files
        # near/over the budget the change count may be expensive — confirm
        # acceptable for the expected file sizes.
        lines_changed = self._calculate_lines_changed(
            self._read_full_content(file_a),
            self._read_full_content(file_b)
        )

        return {
            'has_diff': True,
            'is_truncated': is_truncated,
            'lines_a': lines_a,
            'lines_b': lines_b,
            'lines_changed': lines_changed,
            'hash_a': hash_a,
            'hash_b': hash_b,
        }

    def _compute_file_hash(self, file_path: Path) -> str:
        """Streamed SHA-256 of a file (hex digest)."""
        hash_obj = hashlib.sha256()
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(self.chunk_size)
                if not chunk:
                    break
                hash_obj.update(chunk)
        return hash_obj.hexdigest()

    def _read_full_content(self, file_path: Path) -> str:
        """Read an entire file via chunked reads and join the chunks."""
        content_parts = []
        with ChunkedReadStream(file_path, self.chunk_size) as reader:
            for chunk in reader.read_chunks():
                content_parts.append(chunk)
        return ''.join(content_parts)

    def _calculate_lines_changed(self, old_content: str, new_content: str) -> int:
        """
        Count changed lines between two contents.

        A line counts as changed when it appears in exactly one side
        (set-based, so duplicates and ordering are ignored).

        Returns:
            Number of added lines plus number of removed lines (>= 0).
        """
        old_lines = old_content.split('\n') if old_content else []
        new_lines = new_content.split('\n') if new_content else []

        old_set = set(old_lines)
        new_set = set(new_lines)

        added = len(new_set - old_set)
        removed = len(old_set - new_set)

        # Bug fix: the original returned `added - removed`, which can be
        # negative and under-reports churn; a change count is the sum.
        return added + removed


class MemoryMonitor:
    """
    Process memory monitor (best effort).

    Requires ``psutil``; without it, usage reads as 0 MB and the limit
    check never trips.
    """

    @staticmethod
    def get_current_memory_mb() -> float:
        """Return current process RSS in MB, or 0.0 if psutil is missing."""
        try:
            import psutil
            process = psutil.Process(os.getpid())
            return process.memory_info().rss / (1024 * 1024)
        except ImportError:
            return 0.0

    @staticmethod
    def check_memory_limit(max_memory_mb: int) -> bool:
        """Return True when current memory usage EXCEEDS the given limit."""
        current_memory = MemoryMonitor.get_current_memory_mb()
        return current_memory > max_memory_mb