# build_clippings.py
#
# Build or update the ChromaDB vector store from clippings in ./clippings.
#
# Default mode (incremental): loads the existing index and adds only
# new or modified files. Use --rebuild for a full rebuild from scratch.
#
# Handles PDFs, TXT, webarchive, and RTF files. Skips non-extractable PDFs
# and writes them to ocr_needed.txt for later OCR processing.
#
# February 2026
# E. M. Furst

# Environment vars must be set before importing huggingface/transformers
# libraries, because huggingface_hub.constants evaluates HF_HUB_OFFLINE
# at import time.
import os

os.environ["TOKENIZERS_PARALLELISM"] = "false"
os.environ["SENTENCE_TRANSFORMERS_HOME"] = "./models"
os.environ["HF_HUB_OFFLINE"] = "1"

import chromadb
from llama_index.core import (
    SimpleDirectoryReader,
    StorageContext,
    VectorStoreIndex,
    Settings,
    Document,
)
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SentenceSplitter

from pathlib import Path
import argparse
import datetime
import time

# Shared constants
DATA_DIR = Path("./clippings")
PERSIST_DIR = "./clippings_search/store_clippings"
COLLECTION_NAME = "clippings"
EMBED_MODEL_NAME = "BAAI/bge-large-en-v1.5"
CHUNK_SIZE = 256
CHUNK_OVERLAP = 25

# File types handled by SimpleDirectoryReader (PDF + TXT)
READER_EXTS = {".pdf", ".txt"}
# File types handled by custom loaders
CUSTOM_EXTS = {".webarchive", ".rtf"}
# All supported extensions
SUPPORTED_EXTS = READER_EXTS | CUSTOM_EXTS
# Minimum extracted text length to consider a PDF valid (characters)
MIN_TEXT_LENGTH = 100


def get_text_splitter():
    """Return the SentenceSplitter used for chunking in both build modes.

    Centralized so rebuild() and update() always chunk identically;
    mixing chunkers in one collection would make results inconsistent.
    """
    return SentenceSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        paragraph_separator="\n\n",
    )


def validate_pdf(file_path):
    """Check if a PDF has extractable text.

    A PDF is rejected when it yields fewer than MIN_TEXT_LENGTH characters
    total, or when less than half of the extracted characters are printable
    (a symptom of broken/garbage text extraction).

    Args:
        file_path: Path to the PDF file.

    Returns:
        (is_valid, reason) where reason explains why it was skipped
        (None when the PDF is valid).
    """
    import pypdf

    try:
        reader = pypdf.PdfReader(str(file_path))
        page_count = len(reader.pages)
        total_chars = 0
        printable_chars = 0
        for page in reader.pages:
            # extract_text() may return None for empty/image-only pages.
            text = page.extract_text() or ""
            total_chars += len(text)
            printable_chars += sum(
                1 for c in text if c.isprintable() or c in "\n\r\t"
            )
        if total_chars < MIN_TEXT_LENGTH:
            return False, f"too little text ({total_chars} chars, {page_count} pages)"
        ratio = printable_chars / total_chars if total_chars > 0 else 0
        if ratio < 0.5:
            return False, f"low printable ratio ({ratio:.2f}, {page_count} pages)"
        return True, None
    except Exception as e:
        # Any parse failure (encrypted, corrupt, truncated) means "needs OCR";
        # the caller records the reason string for the skip report.
        return False, str(e)


def load_webarchive(file_path):
    """Extract text from a macOS .webarchive file.

    Parses the binary plist, pulls WebMainResource's HTML payload, and
    strips markup with BeautifulSoup.

    Args:
        file_path: Path to the .webarchive file.

    Returns:
        A LlamaIndex Document, or None if extraction fails or yields
        less than MIN_TEXT_LENGTH characters.
    """
    import plistlib
    from bs4 import BeautifulSoup

    try:
        with open(file_path, "rb") as f:
            plist = plistlib.load(f)
        resource = plist.get("WebMainResource", {})
        html_bytes = resource.get("WebResourceData", b"")
        if not html_bytes:
            return None
        html = html_bytes.decode("utf-8", errors="replace")
        soup = BeautifulSoup(html, "html.parser")
        text = soup.get_text(separator="\n", strip=True)
        if len(text) < MIN_TEXT_LENGTH:
            return None
        stat = file_path.stat()
        # Metadata mirrors what SimpleDirectoryReader stores so update()
        # can compare both kinds of documents uniformly.
        # NOTE(review): this formats mtime in UTC; if SimpleDirectoryReader
        # stores last_modified_date in local time, files near a date
        # boundary could be flagged as modified spuriously — confirm.
        mdate = datetime.datetime.fromtimestamp(
            stat.st_mtime, tz=datetime.timezone.utc
        ).strftime("%Y-%m-%d")
        return Document(
            text=text,
            metadata={
                "file_name": file_path.name,
                "file_path": str(file_path),
                "file_size": stat.st_size,
                "last_modified_date": mdate,
                "file_type": "webarchive",
            },
        )
    except Exception as e:
        # Best-effort loader: report and let the caller record the skip.
        print(f"  Warning: could not read webarchive {file_path.name}: {e}")
        return None


def load_rtf(file_path):
    """Extract text from an RTF file.

    Args:
        file_path: Path to the .rtf file.

    Returns:
        A LlamaIndex Document, or None if extraction fails or yields
        less than MIN_TEXT_LENGTH characters.
    """
    from striprtf.striprtf import rtf_to_text

    try:
        # Explicit encoding so extraction does not depend on the locale;
        # errors="replace" keeps malformed bytes from aborting the load.
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            rtf_content = f.read()
        text = rtf_to_text(rtf_content)
        if len(text) < MIN_TEXT_LENGTH:
            return None
        stat = file_path.stat()
        # Same metadata shape as load_webarchive / SimpleDirectoryReader.
        mdate = datetime.datetime.fromtimestamp(
            stat.st_mtime, tz=datetime.timezone.utc
        ).strftime("%Y-%m-%d")
        return Document(
            text=text,
            metadata={
                "file_name": file_path.name,
                "file_path": str(file_path),
                "file_size": stat.st_size,
                "last_modified_date": mdate,
                "file_type": "rtf",
            },
        )
    except Exception as e:
        print(f"  Warning: could not read RTF {file_path.name}: {e}")
        return None


def scan_clippings():
    """Scan the clippings directory and classify files.

    Walks DATA_DIR recursively, skipping dotfiles and unsupported
    extensions. PDFs are validated up front; invalid ones are both
    skipped and queued for OCR.

    Returns:
        (reader_files, custom_docs, skipped, ocr_needed) where:
        - reader_files: list of Paths for SimpleDirectoryReader (PDF + TXT)
        - custom_docs: list of Document objects from custom loaders
        - skipped: list of (Path, reason) tuples
        - ocr_needed: list of Paths for PDFs that need OCR
    """
    reader_files = []
    custom_docs = []
    skipped = []
    ocr_needed = []

    # sorted() gives deterministic ordering across runs.
    for fpath in sorted(DATA_DIR.rglob("*")):
        if not fpath.is_file():
            continue
        if fpath.name.startswith("."):
            continue
        ext = fpath.suffix.lower()
        if ext not in SUPPORTED_EXTS:
            skipped.append((fpath, f"unsupported type: {ext}"))
            continue
        if ext == ".pdf":
            is_valid, reason = validate_pdf(fpath)
            if not is_valid:
                skipped.append((fpath, f"no extractable text: {reason}"))
                ocr_needed.append(fpath)
                continue
            reader_files.append(fpath)
        elif ext == ".txt":
            reader_files.append(fpath)
        elif ext == ".webarchive":
            doc = load_webarchive(fpath)
            if doc:
                custom_docs.append(doc)
            else:
                skipped.append((fpath, "no extractable text from webarchive"))
        elif ext == ".rtf":
            doc = load_rtf(fpath)
            if doc:
                custom_docs.append(doc)
            else:
                skipped.append((fpath, "no extractable text from RTF"))

    return reader_files, custom_docs, skipped, ocr_needed


def write_ocr_list(ocr_needed):
    """Write the list of PDFs needing OCR to ocr_needed.txt.

    Args:
        ocr_needed: list of Paths, one per line in the output file.
    """
    # Explicit encoding: paths may contain non-ASCII characters.
    with open("ocr_needed.txt", "w", encoding="utf-8") as f:
        for fpath in ocr_needed:
            f.write(f"{fpath}\n")
    print(f"Wrote {len(ocr_needed)} file(s) to ocr_needed.txt")


def load_all_documents(reader_files, custom_docs):
    """Load documents from SimpleDirectoryReader and merge with custom docs.

    Args:
        reader_files: Paths for SimpleDirectoryReader (PDF + TXT).
        custom_docs: pre-built Documents from the webarchive/RTF loaders.

    Returns:
        Combined list of Documents (may be empty).
    """
    documents = []
    if reader_files:
        print(f"Loading {len(reader_files)} PDF/TXT files...")
        # filename_as_id gives stable doc ids so re-indexing is predictable.
        reader_docs = SimpleDirectoryReader(
            input_files=[str(f) for f in reader_files],
            filename_as_id=True,
        ).load_data()
        documents.extend(reader_docs)
    if custom_docs:
        print(f"Adding {len(custom_docs)} webarchive/RTF documents...")
        documents.extend(custom_docs)
    return documents


def rebuild(reader_files, custom_docs):
    """Full rebuild: delete existing collection and recreate from scratch.

    Args:
        reader_files: Paths for SimpleDirectoryReader (PDF + TXT).
        custom_docs: pre-built Documents from the custom loaders.

    Returns:
        The newly built VectorStoreIndex.

    Raises:
        ValueError: if no documents could be loaded.
    """
    client = chromadb.PersistentClient(path=PERSIST_DIR)

    # Delete existing collection if present; a missing collection raises,
    # which is fine on a first-time build.
    try:
        client.delete_collection(COLLECTION_NAME)
        print(f"Deleted existing collection '{COLLECTION_NAME}'")
    except Exception:
        pass

    collection = client.get_or_create_collection(COLLECTION_NAME)
    vector_store = ChromaVectorStore(chroma_collection=collection)
    storage_context = StorageContext.from_defaults(vector_store=vector_store)

    documents = load_all_documents(reader_files, custom_docs)
    if not documents:
        raise ValueError("No documents loaded")
    print(f"Loaded {len(documents)} document(s) total")

    print("Building vector index...")
    index = VectorStoreIndex.from_documents(
        documents,
        storage_context=storage_context,
        transformations=[get_text_splitter()],
        show_progress=True,
    )
    print(f"Index built. Collection has {collection.count()} vectors.")
    return index


def update(reader_files, custom_docs):
    """Incremental update: add new, re-index modified, remove deleted files.

    Compares files on disk against the metadata stored in the collection
    (file_size + last_modified_date) and only touches chunks belonging to
    files that changed.

    Args:
        reader_files: Paths for SimpleDirectoryReader (PDF + TXT).
        custom_docs: pre-built Documents from the custom loaders.
    """
    client = chromadb.PersistentClient(path=PERSIST_DIR)
    collection = client.get_collection(COLLECTION_NAME)
    count = collection.count()
    print(f"Existing collection has {count} vectors")

    # Get all stored metadata to find what's indexed.
    # Key on file_path (not file_name) to handle duplicate names across subdirs.
    indexed = {}  # file_path -> {"ids": [], "file_size": ..., "last_modified_date": ...}
    if count > 0:
        results = collection.get(include=["metadatas"])
        for i, meta in enumerate(results["metadatas"]):
            fpath = meta.get("file_path", "")
            if fpath not in indexed:
                indexed[fpath] = {
                    "ids": [],
                    "file_size": meta.get("file_size"),
                    "last_modified_date": meta.get("last_modified_date"),
                }
            indexed[fpath]["ids"].append(results["ids"][i])
    print(f"Index contains {len(indexed)} unique files")

    # Build disk file lookup: file_path_str -> Path.
    # For reader_files, match the path format SimpleDirectoryReader would store.
    disk_files = {}
    for f in reader_files:
        disk_files[str(f)] = f
    for doc in custom_docs:
        disk_files[doc.metadata["file_path"]] = Path(doc.metadata["file_path"])

    # Classify files into new / modified / deleted / unchanged.
    new_reader = []
    new_custom = []
    modified_reader = []
    modified_custom = []
    deleted_paths = []
    unchanged = 0

    for path_str, fpath in disk_files.items():
        if path_str not in indexed:
            # New file: custom-ext files already have a loaded Document.
            if fpath.suffix.lower() in CUSTOM_EXTS:
                matching = [d for d in custom_docs if d.metadata["file_path"] == path_str]
                if matching:
                    new_custom.extend(matching)
            else:
                new_reader.append(fpath)
        else:
            info = indexed[path_str]
            stat = fpath.stat()
            # NOTE(review): this UTC date must match the tz convention of the
            # stored last_modified_date, or near-midnight edits may be
            # re-flagged as modified — confirm against the loaders/reader.
            disk_mdate = datetime.datetime.fromtimestamp(
                stat.st_mtime, tz=datetime.timezone.utc
            ).strftime("%Y-%m-%d")
            if stat.st_size != info["file_size"] or disk_mdate != info["last_modified_date"]:
                if fpath.suffix.lower() in CUSTOM_EXTS:
                    matching = [d for d in custom_docs if d.metadata["file_path"] == path_str]
                    if matching:
                        modified_custom.extend(matching)
                else:
                    modified_reader.append(fpath)
            else:
                unchanged += 1

    for path_str in indexed:
        if path_str not in disk_files:
            deleted_paths.append(path_str)

    n_new = len(new_reader) + len(new_custom)
    n_modified = len(modified_reader) + len(modified_custom)
    print(f"\n  New:       {n_new}")
    print(f"  Modified:  {n_modified}")
    print(f"  Deleted:   {len(deleted_paths)}")
    print(f"  Unchanged: {unchanged}")

    if n_new == 0 and n_modified == 0 and len(deleted_paths) == 0:
        print("\nNothing to do.")
        return

    # Delete chunks for removed and modified files (modified files get
    # their old chunks dropped, then are re-indexed below).
    for path_str in deleted_paths:
        ids = indexed[path_str]["ids"]
        fname = Path(path_str).name
        print(f"  Removing {fname} ({len(ids)} chunks)")
        collection.delete(ids=ids)

    for fpath in modified_reader:
        path_str = str(fpath)
        ids = indexed[path_str]["ids"]
        print(f"  Re-indexing {fpath.name} ({len(ids)} chunks)")
        collection.delete(ids=ids)

    for doc in modified_custom:
        path_str = doc.metadata["file_path"]
        if path_str in indexed:
            ids = indexed[path_str]["ids"]
            print(f"  Re-indexing {doc.metadata['file_name']} ({len(ids)} chunks)")
            collection.delete(ids=ids)

    # Add new and modified files.
    files_to_add = new_reader + modified_reader
    docs_to_add = new_custom + modified_custom
    if files_to_add or docs_to_add:
        documents = load_all_documents(files_to_add, docs_to_add)
        if documents:
            print(f"Indexing {len(documents)} document(s)...")
            vector_store = ChromaVectorStore(chroma_collection=collection)
            storage_context = StorageContext.from_defaults(vector_store=vector_store)
            VectorStoreIndex.from_documents(
                documents,
                storage_context=storage_context,
                transformations=[get_text_splitter()],
                show_progress=True,
            )

    print(f"\nIndex updated. Collection now has {collection.count()} vectors.")


def main():
    """Entry point: parse args, configure the embed model, scan, and build.

    Raises:
        FileNotFoundError: if the clippings directory is missing.
        ValueError: if no valid files are found to index.
    """
    parser = argparse.ArgumentParser(
        description="Build or update the clippings vector store (ChromaDB)."
    )
    parser.add_argument(
        "--rebuild",
        action="store_true",
        help="Full rebuild from scratch (default: incremental update)",
    )
    args = parser.parse_args()

    # Configure embedding model (offline, cached in ./models)
    embed_model = HuggingFaceEmbedding(
        model_name=EMBED_MODEL_NAME,
        cache_folder="./models",
        local_files_only=True,
    )
    Settings.embed_model = embed_model

    if not DATA_DIR.exists():
        raise FileNotFoundError(
            f"Clippings directory not found: {DATA_DIR.absolute()}\n"
            f"Create symlink: ln -s ../clippings ./clippings"
        )

    start = time.time()

    # Scan and classify files
    print(f"Scanning {DATA_DIR.absolute()}...")
    reader_files, custom_docs, skipped, ocr_needed = scan_clippings()

    n_valid = len(reader_files) + len(custom_docs)
    print(f"\nFiles to index: {n_valid}")
    print(f"  PDF/TXT: {len(reader_files)}")
    print(f"  Webarchive/RTF: {len(custom_docs)}")
    print(f"Files skipped: {len(skipped)}")
    for fpath, reason in skipped:
        print(f"  SKIP: {fpath.name} -- {reason}")

    if ocr_needed:
        write_ocr_list(ocr_needed)

    if n_valid == 0:
        raise ValueError("No valid files found to index")

    if args.rebuild:
        print("\nMode: full rebuild")
        rebuild(reader_files, custom_docs)
    else:
        print("\nMode: incremental update")
        if not Path(PERSIST_DIR).exists():
            print(f"No existing index at {PERSIST_DIR}, doing full rebuild.")
            rebuild(reader_files, custom_docs)
        else:
            update(reader_files, custom_docs)

    elapsed = time.time() - start
    print(f"Done in {elapsed:.1f}s")


if __name__ == "__main__":
    main()