ssearch/sandbox.ipynb
Eric e9fc99ddc6 Initial commit: RAG pipeline for semantic search over personal journal archive
Vector search with cross-encoder re-ranking, hybrid BM25+vector retrieval,
incremental index updates, and multiple LLM backends (Ollama local, OpenAI API).
2026-02-20 06:02:28 -05:00

973 lines
44 KiB
Text

{
"cells": [
{
"cell_type": "markdown",
"id": "11d5ae50",
"metadata": {},
"source": [
"# LlamaIndex sandbox\n",
"\n",
"Using this to explore LlamaIndex\\\n",
"August 2025"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "813f8b1a",
"metadata": {},
"outputs": [],
"source": [
"import llama_index.core"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "656faffb",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['BaseCallbackHandler', 'BasePromptTemplate', 'Callable', 'ChatPromptTemplate', 'ComposableGraph', 'Document', 'DocumentSummaryIndex', 'GPTDocumentSummaryIndex', 'GPTKeywordTableIndex', 'GPTListIndex', 'GPTRAKEKeywordTableIndex', 'GPTSimpleKeywordTableIndex', 'GPTTreeIndex', 'GPTVectorStoreIndex', 'IndexStructType', 'KeywordTableIndex', 'KnowledgeGraphIndex', 'ListIndex', 'MockEmbedding', 'NullHandler', 'Optional', 'Prompt', 'PromptHelper', 'PromptTemplate', 'PropertyGraphIndex', 'QueryBundle', 'RAKEKeywordTableIndex', 'Response', 'SQLContextBuilder', 'SQLDatabase', 'SQLDocumentContextBuilder', 'SelectorPromptTemplate', 'ServiceContext', 'Settings', 'SimpleDirectoryReader', 'SimpleKeywordTableIndex', 'StorageContext', 'SummaryIndex', 'TreeIndex', 'VectorStoreIndex', '__all__', '__annotations__', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__path__', '__spec__', '__version__', 'async_utils', 'base', 'bridge', 'callbacks', 'chat_engine', 'constants', 'data_structs', 'download', 'download_loader', 'embeddings', 'evaluation', 'get_response_synthesizer', 'get_tokenizer', 'global_handler', 'global_tokenizer', 'graph_stores', 'image_retriever', 'indices', 'ingestion', 'instrumentation', 'llama_dataset', 'llms', 'load_graph_from_storage', 'load_index_from_storage', 'load_indices_from_storage', 'logging', 'memory', 'multi_modal_llms', 'node_parser', 'objects', 'output_parsers', 'postprocessor', 'prompts', 'query_engine', 'question_gen', 'readers', 'response', 'response_synthesizers', 'schema', 'selectors', 'service_context', 'set_global_handler', 'set_global_service_context', 'set_global_tokenizer', 'settings', 'storage', 'tools', 'types', 'utilities', 'utils', 'vector_stores', 'workflow']\n"
]
}
],
"source": [
"# List available objects\n",
"print(dir(llama_index.core))"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "bea0759d",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"BaseCallbackHandler\n",
"BasePromptTemplate\n",
"Callable\n",
"ChatPromptTemplate\n",
"ComposableGraph\n",
"Document\n",
"DocumentSummaryIndex\n",
"GPTDocumentSummaryIndex\n",
"GPTKeywordTableIndex\n",
"GPTListIndex\n",
"GPTRAKEKeywordTableIndex\n",
"GPTSimpleKeywordTableIndex\n",
"GPTTreeIndex\n",
"GPTVectorStoreIndex\n",
"IndexStructType\n",
"KeywordTableIndex\n",
"KnowledgeGraphIndex\n",
"ListIndex\n",
"MockEmbedding\n",
"NullHandler\n",
"Optional\n",
"Prompt\n",
"PromptHelper\n",
"PromptTemplate\n",
"PropertyGraphIndex\n",
"QueryBundle\n",
"RAKEKeywordTableIndex\n",
"Response\n",
"SQLContextBuilder\n",
"SQLDatabase\n",
"SQLDocumentContextBuilder\n",
"SelectorPromptTemplate\n",
"ServiceContext\n",
"Settings\n",
"SimpleDirectoryReader\n",
"SimpleKeywordTableIndex\n",
"StorageContext\n",
"SummaryIndex\n",
"TreeIndex\n",
"VectorStoreIndex\n",
"__all__\n",
"__annotations__\n",
"__builtins__\n",
"__cached__\n",
"__doc__\n",
"__file__\n",
"__loader__\n",
"__name__\n",
"__package__\n",
"__path__\n",
"__spec__\n",
"__version__\n",
"async_utils\n",
"base\n",
"bridge\n",
"callbacks\n",
"chat_engine\n",
"constants\n",
"data_structs\n",
"download\n",
"download_loader\n",
"embeddings\n",
"evaluation\n",
"get_response_synthesizer\n",
"get_tokenizer\n",
"global_handler\n",
"global_tokenizer\n",
"graph_stores\n",
"image_retriever\n",
"indices\n",
"ingestion\n",
"instrumentation\n",
"llama_dataset\n",
"llms\n",
"load_graph_from_storage\n",
"load_index_from_storage\n",
"load_indices_from_storage\n",
"logging\n",
"memory\n",
"multi_modal_llms\n",
"node_parser\n",
"objects\n",
"output_parsers\n",
"postprocessor\n",
"prompts\n",
"query_engine\n",
"question_gen\n",
"readers\n",
"response\n",
"response_synthesizers\n",
"schema\n",
"selectors\n",
"service_context\n",
"set_global_handler\n",
"set_global_service_context\n",
"set_global_tokenizer\n",
"settings\n",
"storage\n",
"tools\n",
"types\n",
"utilities\n",
"utils\n",
"vector_stores\n",
"workflow\n"
]
}
],
"source": [
"# Better formatted output for list of available objects\n",
"objects = dir(llama_index.core)\n",
"for obj in objects:\n",
" print(obj)"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "3886a5f0",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"list"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# dir returns a list\n",
"type(objects)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "272cb0c9",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"104"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# In the case of llamaindex.core, it contains 104 objects\n",
"\n",
"len(objects)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bfffc03f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Help on class VectorStoreIndex in module llama_index.core.indices.vector_store.base:\n",
"\n",
"class VectorStoreIndex(llama_index.core.indices.base.BaseIndex)\n",
" | VectorStoreIndex(nodes: Optional[Sequence[llama_index.core.schema.BaseNode]] = None, use_async: bool = False, store_nodes_override: bool = False, embed_model: Union[llama_index.core.base.embeddings.base.BaseEmbedding, ForwardRef('LCEmbeddings'), str, NoneType] = None, insert_batch_size: int = 2048, objects: Optional[Sequence[llama_index.core.schema.IndexNode]] = None, index_struct: Optional[llama_index.core.data_structs.data_structs.IndexDict] = None, storage_context: Optional[llama_index.core.storage.storage_context.StorageContext] = None, callback_manager: Optional[llama_index.core.callbacks.base.CallbackManager] = None, transformations: Optional[List[llama_index.core.schema.TransformComponent]] = None, show_progress: bool = False, **kwargs: Any) -> None\n",
" |\n",
" | Vector Store Index.\n",
" |\n",
" | Args:\n",
" | use_async (bool): Whether to use asynchronous calls. Defaults to False.\n",
" | show_progress (bool): Whether to show tqdm progress bars. Defaults to False.\n",
" | store_nodes_override (bool): set to True to always store Node objects in index\n",
" | store and document store even if vector store keeps text. Defaults to False\n",
" |\n",
" | Method resolution order:\n",
" | VectorStoreIndex\n",
" | llama_index.core.indices.base.BaseIndex\n",
" | typing.Generic\n",
" | abc.ABC\n",
" | builtins.object\n",
" |\n",
" | Methods defined here:\n",
" |\n",
" | __init__(self, nodes: Optional[Sequence[llama_index.core.schema.BaseNode]] = None, use_async: bool = False, store_nodes_override: bool = False, embed_model: Union[llama_index.core.base.embeddings.base.BaseEmbedding, ForwardRef('LCEmbeddings'), str, NoneType] = None, insert_batch_size: int = 2048, objects: Optional[Sequence[llama_index.core.schema.IndexNode]] = None, index_struct: Optional[llama_index.core.data_structs.data_structs.IndexDict] = None, storage_context: Optional[llama_index.core.storage.storage_context.StorageContext] = None, callback_manager: Optional[llama_index.core.callbacks.base.CallbackManager] = None, transformations: Optional[List[llama_index.core.schema.TransformComponent]] = None, show_progress: bool = False, **kwargs: Any) -> None\n",
" | Initialize params.\n",
" |\n",
" | async adelete_nodes(self, node_ids: List[str], delete_from_docstore: bool = False, **delete_kwargs: Any) -> None\n",
" | Delete a list of nodes from the index.\n",
" |\n",
" | Args:\n",
" | node_ids (List[str]): A list of node_ids from the nodes to delete\n",
" |\n",
" | async adelete_ref_doc(self, ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any) -> None\n",
" | Delete a document and it's nodes by using ref_doc_id.\n",
" |\n",
" | async ainsert_nodes(self, nodes: Sequence[llama_index.core.schema.BaseNode], **insert_kwargs: Any) -> None\n",
" | Insert nodes.\n",
" |\n",
" | NOTE: overrides BaseIndex.ainsert_nodes.\n",
" | VectorStoreIndex only stores nodes in document store\n",
" | if vector store does not store text\n",
" |\n",
" | as_retriever(self, **kwargs: Any) -> llama_index.core.base.base_retriever.BaseRetriever\n",
" |\n",
" | build_index_from_nodes(self, nodes: Sequence[llama_index.core.schema.BaseNode], **insert_kwargs: Any) -> llama_index.core.data_structs.data_structs.IndexDict\n",
" | Build the index from nodes.\n",
" |\n",
" | NOTE: Overrides BaseIndex.build_index_from_nodes.\n",
" | VectorStoreIndex only stores nodes in document store\n",
" | if vector store does not store text\n",
" |\n",
" | delete_nodes(self, node_ids: List[str], delete_from_docstore: bool = False, **delete_kwargs: Any) -> None\n",
" | Delete a list of nodes from the index.\n",
" |\n",
" | Args:\n",
" | node_ids (List[str]): A list of node_ids from the nodes to delete\n",
" |\n",
" | delete_ref_doc(self, ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any) -> None\n",
" | Delete a document and it's nodes by using ref_doc_id.\n",
" |\n",
" | insert_nodes(self, nodes: Sequence[llama_index.core.schema.BaseNode], **insert_kwargs: Any) -> None\n",
" | Insert nodes.\n",
" |\n",
" | NOTE: overrides BaseIndex.insert_nodes.\n",
" | VectorStoreIndex only stores nodes in document store\n",
" | if vector store does not store text\n",
" |\n",
" | ----------------------------------------------------------------------\n",
" | Class methods defined here:\n",
" |\n",
" | from_vector_store(vector_store: llama_index.core.vector_stores.types.BasePydanticVectorStore, embed_model: Union[llama_index.core.base.embeddings.base.BaseEmbedding, ForwardRef('LCEmbeddings'), str, NoneType] = None, **kwargs: Any) -> 'VectorStoreIndex'\n",
" |\n",
" | ----------------------------------------------------------------------\n",
" | Readonly properties defined here:\n",
" |\n",
" | ref_doc_info\n",
" | Retrieve a dict mapping of ingested documents and their nodes+metadata.\n",
" |\n",
" | vector_store\n",
" |\n",
" | ----------------------------------------------------------------------\n",
" | Data and other attributes defined here:\n",
" |\n",
" | __abstractmethods__ = frozenset()\n",
" |\n",
" | __annotations__ = {}\n",
" |\n",
" | __orig_bases__ = (llama_index.core.indices.base.BaseIndex[llama_index....\n",
" |\n",
" | __parameters__ = ()\n",
" |\n",
" | index_struct_cls = <class 'llama_index.core.data_structs.data_structs....\n",
" | A simple dictionary of documents.\n",
" |\n",
" |\n",
" | ----------------------------------------------------------------------\n",
" | Methods inherited from llama_index.core.indices.base.BaseIndex:\n",
" |\n",
" | async ainsert(self, document: llama_index.core.schema.Document, **insert_kwargs: Any) -> None\n",
" | Asynchronously insert a document.\n",
" |\n",
" | async arefresh_ref_docs(self, documents: Sequence[llama_index.core.schema.Document], **update_kwargs: Any) -> List[bool]\n",
" | Asynchronously refresh an index with documents that have changed.\n",
" |\n",
" | This allows users to save LLM and Embedding model calls, while only\n",
" | updating documents that have any changes in text or metadata. It\n",
" | will also insert any documents that previously were not stored.\n",
" |\n",
" | as_chat_engine(self, chat_mode: llama_index.core.chat_engine.types.ChatMode = <ChatMode.BEST: 'best'>, llm: Union[str, llama_index.core.llms.llm.LLM, ForwardRef('BaseLanguageModel'), NoneType] = None, **kwargs: Any) -> llama_index.core.chat_engine.types.BaseChatEngine\n",
" | Convert the index to a chat engine.\n",
" |\n",
" | Calls `index.as_query_engine(llm=llm, **kwargs)` to get the query engine and then\n",
" | wraps it in a chat engine based on the chat mode.\n",
" |\n",
" | Chat modes:\n",
" | - `ChatMode.BEST` (default): Chat engine that uses an agent (react or openai) with a query engine tool\n",
" | - `ChatMode.CONTEXT`: Chat engine that uses a retriever to get context\n",
" | - `ChatMode.CONDENSE_QUESTION`: Chat engine that condenses questions\n",
" | - `ChatMode.CONDENSE_PLUS_CONTEXT`: Chat engine that condenses questions and uses a retriever to get context\n",
" | - `ChatMode.SIMPLE`: Simple chat engine that uses the LLM directly\n",
" | - `ChatMode.REACT`: Chat engine that uses a react agent with a query engine tool\n",
" | - `ChatMode.OPENAI`: Chat engine that uses an openai agent with a query engine tool\n",
" |\n",
" | as_query_engine(self, llm: Union[str, llama_index.core.llms.llm.LLM, ForwardRef('BaseLanguageModel'), NoneType] = None, **kwargs: Any) -> llama_index.core.base.base_query_engine.BaseQueryEngine\n",
" | Convert the index to a query engine.\n",
" |\n",
" | Calls `index.as_retriever(**kwargs)` to get the retriever and then wraps it in a\n",
" | `RetrieverQueryEngine.from_args(retriever, **kwrags)` call.\n",
" |\n",
" | async aupdate_ref_doc(self, document: llama_index.core.schema.Document, **update_kwargs: Any) -> None\n",
" | Asynchronously update a document and it's corresponding nodes.\n",
" |\n",
" | This is equivalent to deleting the document and then inserting it again.\n",
" |\n",
" | Args:\n",
" | document (Union[BaseDocument, BaseIndex]): document to update\n",
" | insert_kwargs (Dict): kwargs to pass to insert\n",
" | delete_kwargs (Dict): kwargs to pass to delete\n",
" |\n",
" | delete(self, doc_id: str, **delete_kwargs: Any) -> None\n",
" | Delete a document from the index.\n",
" | All nodes in the index related to the index will be deleted.\n",
" |\n",
" | Args:\n",
" | doc_id (str): A doc_id of the ingested document\n",
" |\n",
" | insert(self, document: llama_index.core.schema.Document, **insert_kwargs: Any) -> None\n",
" | Insert a document.\n",
" |\n",
" | refresh(self, documents: Sequence[llama_index.core.schema.Document], **update_kwargs: Any) -> List[bool]\n",
" | Refresh an index with documents that have changed.\n",
" |\n",
" | This allows users to save LLM and Embedding model calls, while only\n",
" | updating documents that have any changes in text or metadata. It\n",
" | will also insert any documents that previously were not stored.\n",
" |\n",
" | refresh_ref_docs(self, documents: Sequence[llama_index.core.schema.Document], **update_kwargs: Any) -> List[bool]\n",
" | Refresh an index with documents that have changed.\n",
" |\n",
" | This allows users to save LLM and Embedding model calls, while only\n",
" | updating documents that have any changes in text or metadata. It\n",
" | will also insert any documents that previously were not stored.\n",
" |\n",
" | set_index_id(self, index_id: str) -> None\n",
" | Set the index id.\n",
" |\n",
" | NOTE: if you decide to set the index_id on the index_struct manually,\n",
" | you will need to explicitly call `add_index_struct` on the `index_store`\n",
" | to update the index store.\n",
" |\n",
" | Args:\n",
" | index_id (str): Index id to set.\n",
" |\n",
" | update(self, document: llama_index.core.schema.Document, **update_kwargs: Any) -> None\n",
" | Update a document and it's corresponding nodes.\n",
" |\n",
" | This is equivalent to deleting the document and then inserting it again.\n",
" |\n",
" | Args:\n",
" | document (Union[BaseDocument, BaseIndex]): document to update\n",
" | insert_kwargs (Dict): kwargs to pass to insert\n",
" | delete_kwargs (Dict): kwargs to pass to delete\n",
" |\n",
" | update_ref_doc(self, document: llama_index.core.schema.Document, **update_kwargs: Any) -> None\n",
" | Update a document and it's corresponding nodes.\n",
" |\n",
" | This is equivalent to deleting the document and then inserting it again.\n",
" |\n",
" | Args:\n",
" | document (Union[BaseDocument, BaseIndex]): document to update\n",
" | insert_kwargs (Dict): kwargs to pass to insert\n",
" | delete_kwargs (Dict): kwargs to pass to delete\n",
" |\n",
" | ----------------------------------------------------------------------\n",
" | Class methods inherited from llama_index.core.indices.base.BaseIndex:\n",
" |\n",
" | from_documents(documents: Sequence[llama_index.core.schema.Document], storage_context: Optional[llama_index.core.storage.storage_context.StorageContext] = None, show_progress: bool = False, callback_manager: Optional[llama_index.core.callbacks.base.CallbackManager] = None, transformations: Optional[List[llama_index.core.schema.TransformComponent]] = None, **kwargs: Any) -> ~IndexType\n",
" | Create index from documents.\n",
" |\n",
" | Args:\n",
" | documents (Sequence[Document]]): List of documents to\n",
" | build the index from.\n",
" |\n",
" | ----------------------------------------------------------------------\n",
" | Readonly properties inherited from llama_index.core.indices.base.BaseIndex:\n",
" |\n",
" | docstore\n",
" | Get the docstore corresponding to the index.\n",
" |\n",
" | index_id\n",
" | Get the index struct.\n",
" |\n",
" | index_struct\n",
" | Get the index struct.\n",
" |\n",
" | storage_context\n",
" |\n",
" | ----------------------------------------------------------------------\n",
" | Data descriptors inherited from llama_index.core.indices.base.BaseIndex:\n",
" |\n",
" | __dict__\n",
" | dictionary for instance variables\n",
" |\n",
" | __weakref__\n",
" | list of weak references to the object\n",
" |\n",
" | summary\n",
" |\n",
" | ----------------------------------------------------------------------\n",
" | Class methods inherited from typing.Generic:\n",
" |\n",
" | __class_getitem__(...)\n",
" | Parameterizes a generic class.\n",
" |\n",
" | At least, parameterizing a generic class is the *main* thing this\n",
" | method does. For example, for some generic class `Foo`, this is called\n",
" | when we do `Foo[int]` - there, with `cls=Foo` and `params=int`.\n",
" |\n",
" | However, note that this method is also called when defining generic\n",
" | classes in the first place with `class Foo[T]: ...`.\n",
" |\n",
" | __init_subclass__(...)\n",
" | Function to initialize subclasses.\n",
"\n"
]
}
],
"source": [
"# Get help on a specific object\n",
"help(llama_index.core.VectorStoreIndex)\n"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "3eb5f1b7",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"class VectorStoreIndex(BaseIndex[IndexDict]):\n",
" \"\"\"\n",
" Vector Store Index.\n",
"\n",
" Args:\n",
" use_async (bool): Whether to use asynchronous calls. Defaults to False.\n",
" show_progress (bool): Whether to show tqdm progress bars. Defaults to False.\n",
" store_nodes_override (bool): set to True to always store Node objects in index\n",
" store and document store even if vector store keeps text. Defaults to False\n",
"\n",
" \"\"\"\n",
"\n",
" index_struct_cls = IndexDict\n",
"\n",
" def __init__(\n",
" self,\n",
" nodes: Optional[Sequence[BaseNode]] = None,\n",
" # vector store index params\n",
" use_async: bool = False,\n",
" store_nodes_override: bool = False,\n",
" embed_model: Optional[EmbedType] = None,\n",
" insert_batch_size: int = 2048,\n",
" # parent class params\n",
" objects: Optional[Sequence[IndexNode]] = None,\n",
" index_struct: Optional[IndexDict] = None,\n",
" storage_context: Optional[StorageContext] = None,\n",
" callback_manager: Optional[CallbackManager] = None,\n",
" transformations: Optional[List[TransformComponent]] = None,\n",
" show_progress: bool = False,\n",
" **kwargs: Any,\n",
" ) -> None:\n",
" \"\"\"Initialize params.\"\"\"\n",
" self._use_async = use_async\n",
" self._store_nodes_override = store_nodes_override\n",
" self._embed_model = resolve_embed_model(\n",
" embed_model or Settings.embed_model, callback_manager=callback_manager\n",
" )\n",
"\n",
" self._insert_batch_size = insert_batch_size\n",
" super().__init__(\n",
" nodes=nodes,\n",
" index_struct=index_struct,\n",
" storage_context=storage_context,\n",
" show_progress=show_progress,\n",
" objects=objects,\n",
" callback_manager=callback_manager,\n",
" transformations=transformations,\n",
" **kwargs,\n",
" )\n",
"\n",
" @classmethod\n",
" def from_vector_store(\n",
" cls,\n",
" vector_store: BasePydanticVectorStore,\n",
" embed_model: Optional[EmbedType] = None,\n",
" **kwargs: Any,\n",
" ) -> \"VectorStoreIndex\":\n",
" if not vector_store.stores_text:\n",
" raise ValueError(\n",
" \"Cannot initialize from a vector store that does not store text.\"\n",
" )\n",
"\n",
" kwargs.pop(\"storage_context\", None)\n",
" storage_context = StorageContext.from_defaults(vector_store=vector_store)\n",
"\n",
" return cls(\n",
" nodes=[],\n",
" embed_model=embed_model,\n",
" storage_context=storage_context,\n",
" **kwargs,\n",
" )\n",
"\n",
" @property\n",
" def vector_store(self) -> BasePydanticVectorStore:\n",
" return self._vector_store\n",
"\n",
" def as_retriever(self, **kwargs: Any) -> BaseRetriever:\n",
" # NOTE: lazy import\n",
" from llama_index.core.indices.vector_store.retrievers import (\n",
" VectorIndexRetriever,\n",
" )\n",
"\n",
" return VectorIndexRetriever(\n",
" self,\n",
" node_ids=list(self.index_struct.nodes_dict.values()),\n",
" callback_manager=self._callback_manager,\n",
" object_map=self._object_map,\n",
" **kwargs,\n",
" )\n",
"\n",
" def _get_node_with_embedding(\n",
" self,\n",
" nodes: Sequence[BaseNode],\n",
" show_progress: bool = False,\n",
" ) -> List[BaseNode]:\n",
" \"\"\"\n",
" Get tuples of id, node, and embedding.\n",
"\n",
" Allows us to store these nodes in a vector store.\n",
" Embeddings are called in batches.\n",
"\n",
" \"\"\"\n",
" id_to_embed_map = embed_nodes(\n",
" nodes, self._embed_model, show_progress=show_progress\n",
" )\n",
"\n",
" results = []\n",
" for node in nodes:\n",
" embedding = id_to_embed_map[node.node_id]\n",
" result = node.model_copy()\n",
" result.embedding = embedding\n",
" results.append(result)\n",
" return results\n",
"\n",
" async def _aget_node_with_embedding(\n",
" self,\n",
" nodes: Sequence[BaseNode],\n",
" show_progress: bool = False,\n",
" ) -> List[BaseNode]:\n",
" \"\"\"\n",
" Asynchronously get tuples of id, node, and embedding.\n",
"\n",
" Allows us to store these nodes in a vector store.\n",
" Embeddings are called in batches.\n",
"\n",
" \"\"\"\n",
" id_to_embed_map = await async_embed_nodes(\n",
" nodes=nodes,\n",
" embed_model=self._embed_model,\n",
" show_progress=show_progress,\n",
" )\n",
"\n",
" results = []\n",
" for node in nodes:\n",
" embedding = id_to_embed_map[node.node_id]\n",
" result = node.model_copy()\n",
" result.embedding = embedding\n",
" results.append(result)\n",
" return results\n",
"\n",
" async def _async_add_nodes_to_index(\n",
" self,\n",
" index_struct: IndexDict,\n",
" nodes: Sequence[BaseNode],\n",
" show_progress: bool = False,\n",
" **insert_kwargs: Any,\n",
" ) -> None:\n",
" \"\"\"Asynchronously add nodes to index.\"\"\"\n",
" if not nodes:\n",
" return\n",
"\n",
" for nodes_batch in iter_batch(nodes, self._insert_batch_size):\n",
" nodes_batch = await self._aget_node_with_embedding(\n",
" nodes_batch, show_progress\n",
" )\n",
" new_ids = await self._vector_store.async_add(nodes_batch, **insert_kwargs)\n",
"\n",
" # if the vector store doesn't store text, we need to add the nodes to the\n",
" # index struct and document store\n",
" if not self._vector_store.stores_text or self._store_nodes_override:\n",
" for node, new_id in zip(nodes_batch, new_ids):\n",
" # NOTE: remove embedding from node to avoid duplication\n",
" node_without_embedding = node.model_copy()\n",
" node_without_embedding.embedding = None\n",
"\n",
" index_struct.add_node(node_without_embedding, text_id=new_id)\n",
" await self._docstore.async_add_documents(\n",
" [node_without_embedding], allow_update=True\n",
" )\n",
" else:\n",
" # NOTE: if the vector store keeps text,\n",
" # we only need to add image and index nodes\n",
" for node, new_id in zip(nodes_batch, new_ids):\n",
" if isinstance(node, (ImageNode, IndexNode)):\n",
" # NOTE: remove embedding from node to avoid duplication\n",
" node_without_embedding = node.model_copy()\n",
" node_without_embedding.embedding = None\n",
"\n",
" index_struct.add_node(node_without_embedding, text_id=new_id)\n",
" await self._docstore.async_add_documents(\n",
" [node_without_embedding], allow_update=True\n",
" )\n",
"\n",
" def _add_nodes_to_index(\n",
" self,\n",
" index_struct: IndexDict,\n",
" nodes: Sequence[BaseNode],\n",
" show_progress: bool = False,\n",
" **insert_kwargs: Any,\n",
" ) -> None:\n",
" \"\"\"Add document to index.\"\"\"\n",
" if not nodes:\n",
" return\n",
"\n",
" for nodes_batch in iter_batch(nodes, self._insert_batch_size):\n",
" nodes_batch = self._get_node_with_embedding(nodes_batch, show_progress)\n",
" new_ids = self._vector_store.add(nodes_batch, **insert_kwargs)\n",
"\n",
" if not self._vector_store.stores_text or self._store_nodes_override:\n",
" # NOTE: if the vector store doesn't store text,\n",
" # we need to add the nodes to the index struct and document store\n",
" for node, new_id in zip(nodes_batch, new_ids):\n",
" # NOTE: remove embedding from node to avoid duplication\n",
" node_without_embedding = node.model_copy()\n",
" node_without_embedding.embedding = None\n",
"\n",
" index_struct.add_node(node_without_embedding, text_id=new_id)\n",
" self._docstore.add_documents(\n",
" [node_without_embedding], allow_update=True\n",
" )\n",
" else:\n",
" # NOTE: if the vector store keeps text,\n",
" # we only need to add image and index nodes\n",
" for node, new_id in zip(nodes_batch, new_ids):\n",
" if isinstance(node, (ImageNode, IndexNode)):\n",
" # NOTE: remove embedding from node to avoid duplication\n",
" node_without_embedding = node.model_copy()\n",
" node_without_embedding.embedding = None\n",
"\n",
" index_struct.add_node(node_without_embedding, text_id=new_id)\n",
" self._docstore.add_documents(\n",
" [node_without_embedding], allow_update=True\n",
" )\n",
"\n",
" def _build_index_from_nodes(\n",
" self,\n",
" nodes: Sequence[BaseNode],\n",
" **insert_kwargs: Any,\n",
" ) -> IndexDict:\n",
" \"\"\"Build index from nodes.\"\"\"\n",
" index_struct = self.index_struct_cls()\n",
" if self._use_async:\n",
" tasks = [\n",
" self._async_add_nodes_to_index(\n",
" index_struct,\n",
" nodes,\n",
" show_progress=self._show_progress,\n",
" **insert_kwargs,\n",
" )\n",
" ]\n",
" run_async_tasks(tasks)\n",
" else:\n",
" self._add_nodes_to_index(\n",
" index_struct,\n",
" nodes,\n",
" show_progress=self._show_progress,\n",
" **insert_kwargs,\n",
" )\n",
" return index_struct\n",
"\n",
" def build_index_from_nodes(\n",
" self,\n",
" nodes: Sequence[BaseNode],\n",
" **insert_kwargs: Any,\n",
" ) -> IndexDict:\n",
" \"\"\"\n",
" Build the index from nodes.\n",
"\n",
" NOTE: Overrides BaseIndex.build_index_from_nodes.\n",
" VectorStoreIndex only stores nodes in document store\n",
" if vector store does not store text\n",
" \"\"\"\n",
" # Filter out the nodes that don't have content\n",
" content_nodes = [\n",
" node\n",
" for node in nodes\n",
" if node.get_content(metadata_mode=MetadataMode.EMBED) != \"\"\n",
" ]\n",
"\n",
" # Report if some nodes are missing content\n",
" if len(content_nodes) != len(nodes):\n",
" print(\"Some nodes are missing content, skipping them...\")\n",
"\n",
" return self._build_index_from_nodes(content_nodes, **insert_kwargs)\n",
"\n",
" def _insert(self, nodes: Sequence[BaseNode], **insert_kwargs: Any) -> None:\n",
" \"\"\"Insert a document.\"\"\"\n",
" self._add_nodes_to_index(self._index_struct, nodes, **insert_kwargs)\n",
"\n",
" def _validate_serializable(self, nodes: Sequence[BaseNode]) -> None:\n",
" \"\"\"Validate that the nodes are serializable.\"\"\"\n",
" for node in nodes:\n",
" if isinstance(node, IndexNode):\n",
" try:\n",
" node.dict()\n",
" except ValueError:\n",
" self._object_map[node.index_id] = node.obj\n",
" node.obj = None\n",
"\n",
" async def ainsert_nodes(\n",
" self, nodes: Sequence[BaseNode], **insert_kwargs: Any\n",
" ) -> None:\n",
" \"\"\"\n",
" Insert nodes.\n",
"\n",
" NOTE: overrides BaseIndex.ainsert_nodes.\n",
" VectorStoreIndex only stores nodes in document store\n",
" if vector store does not store text\n",
" \"\"\"\n",
" self._validate_serializable(nodes)\n",
"\n",
" with self._callback_manager.as_trace(\"insert_nodes\"):\n",
" await self._async_add_nodes_to_index(\n",
" self._index_struct, nodes, **insert_kwargs\n",
" )\n",
" self._storage_context.index_store.add_index_struct(self._index_struct)\n",
"\n",
" def insert_nodes(self, nodes: Sequence[BaseNode], **insert_kwargs: Any) -> None:\n",
" \"\"\"\n",
" Insert nodes.\n",
"\n",
" NOTE: overrides BaseIndex.insert_nodes.\n",
" VectorStoreIndex only stores nodes in document store\n",
" if vector store does not store text\n",
" \"\"\"\n",
" self._validate_serializable(nodes)\n",
"\n",
" with self._callback_manager.as_trace(\"insert_nodes\"):\n",
" self._insert(nodes, **insert_kwargs)\n",
" self._storage_context.index_store.add_index_struct(self._index_struct)\n",
"\n",
" def _delete_node(self, node_id: str, **delete_kwargs: Any) -> None:\n",
" pass\n",
"\n",
" async def adelete_nodes(\n",
" self,\n",
" node_ids: List[str],\n",
" delete_from_docstore: bool = False,\n",
" **delete_kwargs: Any,\n",
" ) -> None:\n",
" \"\"\"\n",
" Delete a list of nodes from the index.\n",
"\n",
" Args:\n",
" node_ids (List[str]): A list of node_ids from the nodes to delete\n",
"\n",
" \"\"\"\n",
" # delete nodes from vector store\n",
" await self._vector_store.adelete_nodes(node_ids, **delete_kwargs)\n",
"\n",
" # delete from docstore only if needed\n",
" if (\n",
" not self._vector_store.stores_text or self._store_nodes_override\n",
" ) and delete_from_docstore:\n",
" for node_id in node_ids:\n",
" self._index_struct.delete(node_id)\n",
" await self._docstore.adelete_document(node_id, raise_error=False)\n",
" self._storage_context.index_store.add_index_struct(self._index_struct)\n",
"\n",
" def delete_nodes(\n",
" self,\n",
" node_ids: List[str],\n",
" delete_from_docstore: bool = False,\n",
" **delete_kwargs: Any,\n",
" ) -> None:\n",
" \"\"\"\n",
" Delete a list of nodes from the index.\n",
"\n",
" Args:\n",
" node_ids (List[str]): A list of node_ids from the nodes to delete\n",
"\n",
" \"\"\"\n",
" # delete nodes from vector store\n",
" self._vector_store.delete_nodes(node_ids, **delete_kwargs)\n",
"\n",
" # delete from docstore only if needed\n",
" if (\n",
" not self._vector_store.stores_text or self._store_nodes_override\n",
" ) and delete_from_docstore:\n",
" for node_id in node_ids:\n",
" self._index_struct.delete(node_id)\n",
" self._docstore.delete_document(node_id, raise_error=False)\n",
" self._storage_context.index_store.add_index_struct(self._index_struct)\n",
"\n",
" def _delete_from_index_struct(self, ref_doc_id: str) -> None:\n",
" # delete from index_struct only if needed\n",
" if not self._vector_store.stores_text or self._store_nodes_override:\n",
" ref_doc_info = self._docstore.get_ref_doc_info(ref_doc_id)\n",
" if ref_doc_info is not None:\n",
" for node_id in ref_doc_info.node_ids:\n",
" self._index_struct.delete(node_id)\n",
" self._vector_store.delete(node_id)\n",
"\n",
" def _delete_from_docstore(self, ref_doc_id: str) -> None:\n",
" # delete from docstore only if needed\n",
" if not self._vector_store.stores_text or self._store_nodes_override:\n",
" self._docstore.delete_ref_doc(ref_doc_id, raise_error=False)\n",
"\n",
" def delete_ref_doc(\n",
" self, ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any\n",
" ) -> None:\n",
" \"\"\"Delete a document and it's nodes by using ref_doc_id.\"\"\"\n",
" self._vector_store.delete(ref_doc_id, **delete_kwargs)\n",
" self._delete_from_index_struct(ref_doc_id)\n",
" if delete_from_docstore:\n",
" self._delete_from_docstore(ref_doc_id)\n",
" self._storage_context.index_store.add_index_struct(self._index_struct)\n",
"\n",
" async def _adelete_from_index_struct(self, ref_doc_id: str) -> None:\n",
" \"\"\"Delete from index_struct only if needed.\"\"\"\n",
" if not self._vector_store.stores_text or self._store_nodes_override:\n",
" ref_doc_info = await self._docstore.aget_ref_doc_info(ref_doc_id)\n",
" if ref_doc_info is not None:\n",
" for node_id in ref_doc_info.node_ids:\n",
" self._index_struct.delete(node_id)\n",
" self._vector_store.delete(node_id)\n",
"\n",
" async def _adelete_from_docstore(self, ref_doc_id: str) -> None:\n",
" \"\"\"Delete from docstore only if needed.\"\"\"\n",
" if not self._vector_store.stores_text or self._store_nodes_override:\n",
" await self._docstore.adelete_ref_doc(ref_doc_id, raise_error=False)\n",
"\n",
" async def adelete_ref_doc(\n",
" self, ref_doc_id: str, delete_from_docstore: bool = False, **delete_kwargs: Any\n",
" ) -> None:\n",
" \"\"\"Delete a document and it's nodes by using ref_doc_id.\"\"\"\n",
" tasks = [\n",
" self._vector_store.adelete(ref_doc_id, **delete_kwargs),\n",
" self._adelete_from_index_struct(ref_doc_id),\n",
" ]\n",
" if delete_from_docstore:\n",
" tasks.append(self._adelete_from_docstore(ref_doc_id))\n",
"\n",
" await asyncio.gather(*tasks)\n",
"\n",
" self._storage_context.index_store.add_index_struct(self._index_struct)\n",
"\n",
" @property\n",
" def ref_doc_info(self) -> Dict[str, RefDocInfo]:\n",
" \"\"\"Retrieve a dict mapping of ingested documents and their nodes+metadata.\"\"\"\n",
" if not self._vector_store.stores_text or self._store_nodes_override:\n",
" node_doc_ids = list(self.index_struct.nodes_dict.values())\n",
" nodes = self.docstore.get_nodes(node_doc_ids)\n",
"\n",
" all_ref_doc_info = {}\n",
" for node in nodes:\n",
" ref_node = node.source_node\n",
" if not ref_node:\n",
" continue\n",
"\n",
" ref_doc_info = self.docstore.get_ref_doc_info(ref_node.node_id)\n",
" if not ref_doc_info:\n",
" continue\n",
"\n",
" all_ref_doc_info[ref_node.node_id] = ref_doc_info\n",
" return all_ref_doc_info\n",
" else:\n",
" raise NotImplementedError(\n",
" \"Vector store integrations that store text in the vector store are \"\n",
" \"not supported by ref_doc_info yet.\"\n",
" )\n",
"\n"
]
}
],
"source": [
"# NOTE(review): `inspect` is imported mid-notebook; for Restart-&-Run-All hygiene,\n",
"# move this import to the top imports cell.\n",
"import inspect\n",
"print(inspect.getsource(llama_index.core.VectorStoreIndex))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8125e2de",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}