138 lines
4.2 KiB
Python
138 lines
4.2 KiB
Python
# retrieve_clippings.py
|
|
# Verbatim chunk retrieval from clippings index (ChromaDB).
|
|
# Vector search + cross-encoder re-ranking, no LLM.
|
|
#
|
|
# Returns the top re-ranked chunks with their full text, file metadata, and
|
|
# scores. Includes page numbers for PDF sources when available.
|
|
#
|
|
# E.M.F. February 2026
|
|
|
|
# Environment vars must be set before importing huggingface/transformers
|
|
# libraries, because huggingface_hub.constants evaluates HF_HUB_OFFLINE
|
|
# at import time.
|
|
import os
|
|
os.environ["TOKENIZERS_PARALLELISM"] = "false"
|
|
os.environ["SENTENCE_TRANSFORMERS_HOME"] = "./models"
|
|
os.environ["HF_HUB_OFFLINE"] = "1"
|
|
|
|
import chromadb
|
|
from llama_index.core import VectorStoreIndex, Settings
|
|
from llama_index.vector_stores.chroma import ChromaVectorStore
|
|
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
|
|
from llama_index.core.postprocessor import SentenceTransformerRerank
|
|
import sys
|
|
import textwrap
|
|
|
|
#
|
|
# Globals
|
|
#
|
|
|
|
PERSIST_DIR = "./clippings_search/store_clippings"
|
|
COLLECTION_NAME = "clippings"
|
|
|
|
# Embedding model (must match build_clippings.py)
|
|
EMBED_MODEL = HuggingFaceEmbedding(
|
|
cache_folder="./models",
|
|
model_name="BAAI/bge-large-en-v1.5",
|
|
local_files_only=True,
|
|
)
|
|
|
|
# Cross-encoder model for re-ranking (cached in ./models/)
|
|
RERANK_MODEL = "cross-encoder/ms-marco-MiniLM-L-12-v2"
|
|
RERANK_TOP_N = 15
|
|
RETRIEVE_TOP_K = 30
|
|
|
|
# Output formatting
|
|
WRAP_WIDTH = 80
|
|
|
|
|
|
def main():
|
|
# No LLM needed -- set embed model only
|
|
Settings.embed_model = EMBED_MODEL
|
|
|
|
# Load ChromaDB collection
|
|
client = chromadb.PersistentClient(path=PERSIST_DIR)
|
|
collection = client.get_collection(COLLECTION_NAME)
|
|
|
|
# Build index from existing vector store
|
|
vector_store = ChromaVectorStore(chroma_collection=collection)
|
|
index = VectorStoreIndex.from_vector_store(vector_store)
|
|
|
|
# Build retriever (vector search only, no query engine / LLM)
|
|
retriever = index.as_retriever(similarity_top_k=RETRIEVE_TOP_K)
|
|
|
|
# Cross-encoder re-ranker
|
|
reranker = SentenceTransformerRerank(
|
|
model=RERANK_MODEL,
|
|
top_n=RERANK_TOP_N,
|
|
)
|
|
|
|
# Query
|
|
if len(sys.argv) < 2:
|
|
print("Usage: python retrieve_clippings.py QUERY_TEXT")
|
|
sys.exit(1)
|
|
q = " ".join(sys.argv[1:])
|
|
|
|
# Retrieve and re-rank
|
|
nodes = retriever.retrieve(q)
|
|
reranked = reranker.postprocess_nodes(nodes, query_str=q)
|
|
|
|
# Build result list with metadata
|
|
results = []
|
|
for i, node in enumerate(reranked, 1):
|
|
meta = getattr(node, "metadata", None) or node.node.metadata
|
|
score = getattr(node, "score", None)
|
|
file_name = meta.get("file_name", "unknown")
|
|
page_label = meta.get("page_label", "")
|
|
results.append((i, node, file_name, page_label, score))
|
|
|
|
# --- Summary: source files and rankings ---
|
|
print(f"\nQuery: {q}")
|
|
print(f"Retrieved {len(nodes)} chunks, re-ranked to top {len(reranked)}")
|
|
print(f"({collection.count()} total vectors in collection)\n")
|
|
|
|
# Unique source files in rank order
|
|
seen = set()
|
|
unique_sources = []
|
|
for i, node, file_name, page_label, score in results:
|
|
if file_name not in seen:
|
|
seen.add(file_name)
|
|
unique_sources.append(file_name)
|
|
|
|
print(f"Source files ({len(unique_sources)} unique):")
|
|
for j, fname in enumerate(unique_sources, 1):
|
|
print(f" {j}. {fname}")
|
|
|
|
print(f"\nRankings:")
|
|
for i, node, file_name, page_label, score in results:
|
|
line = f" [{i:2d}] {score:+7.3f} {file_name}"
|
|
if page_label:
|
|
line += f" (p. {page_label})"
|
|
print(line)
|
|
|
|
# --- Full chunk text ---
|
|
print(f"\n{'=' * WRAP_WIDTH}")
|
|
print("CHUNKS")
|
|
print("=" * WRAP_WIDTH)
|
|
|
|
for i, node, file_name, page_label, score in results:
|
|
header = f"=== [{i}] {file_name}"
|
|
if page_label:
|
|
header += f" (p. {page_label})"
|
|
header += f" (score: {score:.3f})"
|
|
|
|
print("\n" + "=" * WRAP_WIDTH)
|
|
print(header)
|
|
print("=" * WRAP_WIDTH)
|
|
|
|
text = node.get_content()
|
|
for line in text.splitlines():
|
|
if line.strip():
|
|
print(textwrap.fill(line, width=WRAP_WIDTH))
|
|
else:
|
|
print()
|
|
print()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|