# Vulnerability Summary

## Overview

- **Vulnerability name**: Arbitrary code execution via pickle in the FAISS vector store
- **Description**: The FAISS vector store deserializes its docstore with pickle. An attacker who can supply a crafted pickle file can execute arbitrary code when the data is loaded.

## Affected Scope

- **Affected component**: `mem0/vector_stores/faiss.py`
- **Affected versions**: all unpatched versions

## Fix

- **Measures**:
  - Introduce a `SafeUnpickler` class that restricts pickle deserialization to a whitelist of safe built-in types.
  - Change the `_load` method to load pickle files through `SafeUnpickler`.
  - Add a `_validate_docstore_structure` method that verifies the loaded data has the expected structure.
  - Change the `_save` method to prefer the JSON format and avoid pickle.

### Fix Code

```python
import pickle
from typing import Any


class SafeUnpickler(pickle.Unpickler):
    """
    Restricted unpickler that only allows safe built-in types.

    This prevents arbitrary code execution via pickle deserialization by
    only allowing a whitelist of safe types (dict, list, str, int, float,
    bool, tuple, None).
    """

    # Only allow the builtins module ("__builtin__" covers legacy pickles
    # written by Python 2)
    SAFE_MODULES = frozenset({"builtins", "__builtin__"})
    # Only allow safe basic types
    SAFE_NAMES = frozenset(
        {"dict", "list", "str", "int", "float", "bool", "tuple", "set", "frozenset", "NoneType"}
    )

    def find_class(self, module, name):
        """Override find_class to only allow safe types."""
        if module in self.SAFE_MODULES and name in self.SAFE_NAMES:
            import builtins

            if hasattr(builtins, name):
                return getattr(builtins, name)
            # NoneType special case
            if name == "NoneType":
                return type(None)
        raise pickle.UnpicklingError(
            f"Unsafe pickle: attempted to load ({module}.{name}). "
            f"Only basic Python types are allowed for security reasons."
        )


def _safe_pickle_load(file_path: str) -> Any:
    """
    Safely load a pickle file using the restricted unpickler.

    Args:
        file_path: Path to the pickle file.

    Returns:
        The deserialized object (only basic Python types allowed).

    Raises:
        pickle.UnpicklingError: If the pickle contains unsafe types.
    """
    with open(file_path, "rb") as f:
        return SafeUnpickler(f).load()


def _validate_docstore_structure(data: Any) -> tuple:
    """
    Validate that loaded data has the expected structure.

    Args:
        data: The loaded data to validate.

    Returns:
        Tuple of (docstore, index_to_id) if valid.

    Raises:
        ValueError: If the data structure is invalid.
    """
    if not isinstance(data, tuple) or len(data) != 2:
        raise ValueError("Invalid docstore format: expected tuple of (docstore, index_to_id)")

    docstore, index_to_id = data

    if not isinstance(docstore, dict):
        raise ValueError("Invalid docstore format: docstore must be a dict")
    if not isinstance(index_to_id, dict):
        raise ValueError("Invalid docstore format: index_to_id must be a dict")

    # Validate docstore entries
    for key, value in docstore.items():
        if not isinstance(key, str):
            raise ValueError(f"Invalid docstore key type: {type(key)}, expected str")
        if not isinstance(value, dict):
            raise ValueError(f"Invalid docstore value type: {type(value)}, expected dict")

    # Validate index_to_id entries
    for key, value in index_to_id.items():
        if not isinstance(key, int):
            raise ValueError(f"Invalid index_to_id key type: {type(key)}, expected int")
        if not isinstance(value, str):
            raise ValueError(f"Invalid index_to_id value type: {type(value)}, expected str")

    return docstore, index_to_id
```

### Patched `_load` Method

```python
def _load(self, index_path: str, docstore_path: str):
    """
    Load the FAISS index and docstore from disk.

    Supports both JSON (preferred) and legacy pickle formats. Pickle files
    are loaded through a restricted unpickler that only allows basic
    Python types, to prevent arbitrary code execution (CVE mitigation).
    """
    try:
        self.index = faiss.read_index(index_path)

        # Determine docstore format - prefer JSON over pickle
        json_docstore_path = docstore_path.replace(".pkl", ".json")

        if os.path.exists(json_docstore_path):
            # Load from JSON (safe, preferred format)
            with open(json_docstore_path, "r", encoding="utf-8") as f:
                data = json.load(f)
            self.docstore = data.get("docstore", {})
            # JSON keys are always strings, convert back to int
            self.index_to_id = {int(k): v for k, v in data.get("index_to_id", {}).items()}
            logger.info(f"Loaded FAISS index from {index_path} with {self.index.ntotal} vectors (JSON format)")
        elif os.path.exists(docstore_path):
            # Load from legacy pickle using the safe unpickler.
            # This prevents arbitrary code execution from malicious pickle files.
            logger.warning(
                f"Loading legacy pickle docstore from {docstore_path}. "
                f"Consider migrating to JSON format for better security."
            )
            data = _safe_pickle_load(docstore_path)
            self.docstore, self.index_to_id = _validate_docstore_structure(data)
            logger.info(f"Loaded FAISS index from {index_path} with {self.index.ntotal} vectors (pickle format)")
        else:
            raise FileNotFoundError(f"No docstore found at {docstore_path} or {json_docstore_path}")
    except pickle.UnpicklingError as e:
        logger.error(f"Security error loading FAISS docstore: {e}")
        raise ValueError(f"Failed to load FAISS docstore: potentially malicious pickle file. ({e})") from e
    except Exception as e:
        logger.warning(f"Failed to load FAISS index: {e}")
        self.docstore = {}
        self.index_to_id = {}
```

### Patched `_save` Method

```python
def _save(self):
    """
    Save the FAISS index and docstore to disk using the JSON format (secure).
    """
    if not self.path or not self.index:
        return
    try:
        os.makedirs(self.path, exist_ok=True)
        index_path = f"{self.path}/{self.collection_name}