# Vulnerability Summary

## Vulnerability Overview

- **Vulnerability Name**: Arbitrary Code Execution via Pickle in FAISS Vector Store
- **Vulnerability Description**: The FAISS vector store loads persisted data with `pickle`, whose deserialization can execute arbitrary code. An attacker who can supply a crafted pickle file can therefore run arbitrary code when the store is loaded.

## Scope of Impact

- **Affected Component**: `mem0/vector_stores/faiss.py`
- **Affected Versions**: All unpatched versions

## Remediation Plan

- **Remediation Measures**:
  - Introduce a `SafeUnpickler` class that restricts pickle deserialization to a whitelist of safe built-in types.
  - Modify the `_load` method to use `SafeUnpickler` when loading pickle files.
  - Add a `_validate_docstore_structure` helper to verify that the loaded data matches the expected structure.
  - Modify the `_save` method to write data in JSON format by default, avoiding pickle entirely.

### Patched Code

```python
class SafeUnpickler(pickle.Unpickler):
    """
    Restricted unpickler that only allows safe built-in types.

    This prevents arbitrary code execution via pickle deserialization
    by only allowing a whitelist of safe types (dict, list, str, int,
    float, bool, tuple, None).
    """

    # Only allow the builtins module (including its legacy Python 2
    # name, which older pickles may reference)
    SAFE_MODULES = frozenset({"builtins", "__builtin__"})

    # Only allow safe basic types
    SAFE_NAMES = frozenset(
        {"dict", "list", "str", "int", "float", "bool", "tuple", "set", "frozenset", "NoneType"}
    )

    def find_class(self, module, name):
        """Override find_class to only allow safe types."""
        if module in self.SAFE_MODULES and name in self.SAFE_NAMES:
            import builtins

            if hasattr(builtins, name):
                return getattr(builtins, name)
            # NoneType special case
            if name == "NoneType":
                return type(None)
        raise pickle.UnpicklingError(
            f"Unsafe pickle: attempted to load ({module}.{name}). "
            f"Only basic Python types are allowed for security reasons."
        )


def _safe_pickle_load(file_path: str) -> Any:
    """
    Safely load a pickle file using the restricted unpickler.
    Args:
        file_path: Path to the pickle file.

    Returns:
        The deserialized object (only basic Python types allowed).

    Raises:
        pickle.UnpicklingError: If the pickle contains unsafe types.
    """
    with open(file_path, "rb") as f:
        return SafeUnpickler(f).load()


def _validate_docstore_structure(data: Any) -> tuple:
    """
    Validate that loaded data has the expected structure.

    Args:
        data: The loaded data to validate.

    Returns:
        Tuple of (docstore, index_to_id) if valid.

    Raises:
        ValueError: If the data structure is invalid.
    """
    if not isinstance(data, tuple) or len(data) != 2:
        raise ValueError("Invalid docstore format: expected tuple of (docstore, index_to_id)")

    docstore, index_to_id = data

    if not isinstance(docstore, dict):
        raise ValueError("Invalid docstore format: docstore must be a dict")
    if not isinstance(index_to_id, dict):
        raise ValueError("Invalid docstore format: index_to_id must be a dict")

    # Validate docstore entries
    for key, value in docstore.items():
        if not isinstance(key, str):
            raise ValueError(f"Invalid docstore key type: {type(key)}, expected str")
        if not isinstance(value, dict):
            raise ValueError(f"Invalid docstore value type: {type(value)}, expected dict")

    # Validate index_to_id entries
    for key, value in index_to_id.items():
        if not isinstance(key, int):
            raise ValueError(f"Invalid index_to_id key type: {type(key)}, expected int")
        if not isinstance(value, str):
            raise ValueError(f"Invalid index_to_id value type: {type(value)}, expected str")

    return docstore, index_to_id
```

### Patched `_load` Method

```python
def _load(self, index_path: str, docstore_path: str):
    """
    Load FAISS index and docstore from disk.

    Supports both JSON (preferred) and legacy pickle formats. Pickle
    files are loaded using a restricted unpickler that only allows
    basic Python types to prevent arbitrary code execution (CVE
    mitigation).
""" try: self.index = faiss.read_index(index_path) # Determine docstore format - prefer JSON over pickle json_docstore_path = docstore_path.replace(".pkl", ".json") if os.path.exists(json_docstore_path): # Load from JSON (safe, preferred format) with open(json_docstore_path, "r", encoding="utf-8") as f: data = json.load(f) # JSON keys are always strings, convert back to int self.index_to_id = {int(k): v for k, v in data.get("index_to_id", {}).items()} logger.info(f"Loaded FAISS index from {index_path} with {self.index.ntotal} vectors (JSON format)") elif os.path.exists(docstore_path): # Load from legacy pickle using safe unpickler # This prevents arbitrary code execution from malicious pickle files logger.warning( f"Loading legacy pickle docstore from {docstore_path}. " f"Consider migrating to JSON format for better security." ) data = _safe_pickle_load(docstore_path) self.docstore, self.index_to_id = _validate_docstore_structure(data) logger.info(f"Loaded FAISS index from {index_path} with {self.index.ntotal} vectors (pickle format)") else: raise FileNotFoundError(f"No docstore found at {docstore_path} or {json_docstore_path}") except pickle.UnpicklingError as e: lo