Embedded the clippings text in a ChromaDB vector store to enable retrieval over it. Updated requirements to include the new dependencies.
(471 lines, 15 KiB, Python)
# build_clippings.py
#
# Build or update the ChromaDB vector store from clippings in ./clippings.
#
# Default mode (incremental): loads the existing index and adds only
# new or modified files. Use --rebuild for a full rebuild from scratch.
#
# Handles PDF, TXT, webarchive, and RTF files. Skips non-extractable PDFs
# and writes them to ocr_needed.txt for later OCR processing.
#
# February 2026
# E. M. Furst

# Environment vars must be set before importing huggingface/transformers
|
|
# libraries, because huggingface_hub.constants evaluates HF_HUB_OFFLINE
|
|
# at import time.
|
|
import os
|
|
os.environ["TOKENIZERS_PARALLELISM"] = "false"
|
|
os.environ["SENTENCE_TRANSFORMERS_HOME"] = "./models"
|
|
os.environ["HF_HUB_OFFLINE"] = "1"
|
|
|
|
import chromadb
|
|
from llama_index.core import (
|
|
SimpleDirectoryReader,
|
|
StorageContext,
|
|
VectorStoreIndex,
|
|
Settings,
|
|
Document,
|
|
)
|
|
from llama_index.vector_stores.chroma import ChromaVectorStore
|
|
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
|
|
from llama_index.core.node_parser import SentenceSplitter
|
|
from pathlib import Path
|
|
import argparse
|
|
import datetime
|
|
import time
|
|
|
|
# Shared constants
|
|
DATA_DIR = Path("./clippings")
|
|
PERSIST_DIR = "./storage_clippings"
|
|
COLLECTION_NAME = "clippings"
|
|
EMBED_MODEL_NAME = "BAAI/bge-large-en-v1.5"
|
|
CHUNK_SIZE = 256
|
|
CHUNK_OVERLAP = 25
|
|
|
|
# File types handled by SimpleDirectoryReader (PDF + TXT)
|
|
READER_EXTS = {".pdf", ".txt"}
|
|
# File types handled by custom loaders
|
|
CUSTOM_EXTS = {".webarchive", ".rtf"}
|
|
# All supported extensions
|
|
SUPPORTED_EXTS = READER_EXTS | CUSTOM_EXTS
|
|
|
|
# Minimum extracted text length to consider a PDF valid (characters)
|
|
MIN_TEXT_LENGTH = 100
|
|
|
|
|
|
def get_text_splitter():
|
|
return SentenceSplitter(
|
|
chunk_size=CHUNK_SIZE,
|
|
chunk_overlap=CHUNK_OVERLAP,
|
|
paragraph_separator="\n\n",
|
|
)
|
|
|
|
|
|
def validate_pdf(file_path):
|
|
"""Check if a PDF has extractable text.
|
|
|
|
Returns (is_valid, reason) where reason explains why it was skipped.
|
|
"""
|
|
import pypdf
|
|
try:
|
|
reader = pypdf.PdfReader(str(file_path))
|
|
page_count = len(reader.pages)
|
|
total_chars = 0
|
|
printable_chars = 0
|
|
for page in reader.pages:
|
|
text = page.extract_text() or ""
|
|
total_chars += len(text)
|
|
printable_chars += sum(
|
|
1 for c in text if c.isprintable() or c in "\n\r\t"
|
|
)
|
|
|
|
if total_chars < MIN_TEXT_LENGTH:
|
|
return False, f"too little text ({total_chars} chars, {page_count} pages)"
|
|
|
|
ratio = printable_chars / total_chars if total_chars > 0 else 0
|
|
if ratio < 0.5:
|
|
return False, f"low printable ratio ({ratio:.2f}, {page_count} pages)"
|
|
|
|
return True, None
|
|
except Exception as e:
|
|
return False, str(e)
|
|
|
|
|
|
def load_webarchive(file_path):
|
|
"""Extract text from a macOS .webarchive file.
|
|
|
|
Returns a LlamaIndex Document, or None if extraction fails.
|
|
"""
|
|
import plistlib
|
|
from bs4 import BeautifulSoup
|
|
|
|
try:
|
|
with open(file_path, "rb") as f:
|
|
plist = plistlib.load(f)
|
|
|
|
resource = plist.get("WebMainResource", {})
|
|
html_bytes = resource.get("WebResourceData", b"")
|
|
if not html_bytes:
|
|
return None
|
|
|
|
html = html_bytes.decode("utf-8", errors="replace")
|
|
soup = BeautifulSoup(html, "html.parser")
|
|
text = soup.get_text(separator="\n", strip=True)
|
|
|
|
if len(text) < MIN_TEXT_LENGTH:
|
|
return None
|
|
|
|
stat = file_path.stat()
|
|
mdate = datetime.datetime.fromtimestamp(
|
|
stat.st_mtime, tz=datetime.timezone.utc
|
|
).strftime("%Y-%m-%d")
|
|
|
|
return Document(
|
|
text=text,
|
|
metadata={
|
|
"file_name": file_path.name,
|
|
"file_path": str(file_path),
|
|
"file_size": stat.st_size,
|
|
"last_modified_date": mdate,
|
|
"file_type": "webarchive",
|
|
},
|
|
)
|
|
except Exception as e:
|
|
print(f" Warning: could not read webarchive {file_path.name}: {e}")
|
|
return None
|
|
|
|
|
|
def load_rtf(file_path):
|
|
"""Extract text from an RTF file.
|
|
|
|
Returns a LlamaIndex Document, or None if extraction fails.
|
|
"""
|
|
from striprtf.striprtf import rtf_to_text
|
|
|
|
try:
|
|
with open(file_path, "r", errors="replace") as f:
|
|
rtf_content = f.read()
|
|
|
|
text = rtf_to_text(rtf_content)
|
|
|
|
if len(text) < MIN_TEXT_LENGTH:
|
|
return None
|
|
|
|
stat = file_path.stat()
|
|
mdate = datetime.datetime.fromtimestamp(
|
|
stat.st_mtime, tz=datetime.timezone.utc
|
|
).strftime("%Y-%m-%d")
|
|
|
|
return Document(
|
|
text=text,
|
|
metadata={
|
|
"file_name": file_path.name,
|
|
"file_path": str(file_path),
|
|
"file_size": stat.st_size,
|
|
"last_modified_date": mdate,
|
|
"file_type": "rtf",
|
|
},
|
|
)
|
|
except Exception as e:
|
|
print(f" Warning: could not read RTF {file_path.name}: {e}")
|
|
return None
|
|
|
|
|
|
def scan_clippings():
|
|
"""Scan the clippings directory and classify files.
|
|
|
|
Returns (reader_files, custom_docs, skipped, ocr_needed) where:
|
|
- reader_files: list of Paths for SimpleDirectoryReader (PDF + TXT)
|
|
- custom_docs: list of Document objects from custom loaders
|
|
- skipped: list of (Path, reason) tuples
|
|
- ocr_needed: list of Paths for PDFs that need OCR
|
|
"""
|
|
reader_files = []
|
|
custom_docs = []
|
|
skipped = []
|
|
ocr_needed = []
|
|
|
|
for fpath in sorted(DATA_DIR.rglob("*")):
|
|
if not fpath.is_file():
|
|
continue
|
|
if fpath.name.startswith("."):
|
|
continue
|
|
|
|
ext = fpath.suffix.lower()
|
|
|
|
if ext not in SUPPORTED_EXTS:
|
|
skipped.append((fpath, f"unsupported type: {ext}"))
|
|
continue
|
|
|
|
if ext == ".pdf":
|
|
is_valid, reason = validate_pdf(fpath)
|
|
if not is_valid:
|
|
skipped.append((fpath, f"no extractable text: {reason}"))
|
|
ocr_needed.append(fpath)
|
|
continue
|
|
reader_files.append(fpath)
|
|
|
|
elif ext == ".txt":
|
|
reader_files.append(fpath)
|
|
|
|
elif ext == ".webarchive":
|
|
doc = load_webarchive(fpath)
|
|
if doc:
|
|
custom_docs.append(doc)
|
|
else:
|
|
skipped.append((fpath, "no extractable text from webarchive"))
|
|
|
|
elif ext == ".rtf":
|
|
doc = load_rtf(fpath)
|
|
if doc:
|
|
custom_docs.append(doc)
|
|
else:
|
|
skipped.append((fpath, "no extractable text from RTF"))
|
|
|
|
return reader_files, custom_docs, skipped, ocr_needed
|
|
|
|
|
|
def write_ocr_list(ocr_needed):
|
|
"""Write the list of PDFs needing OCR to ocr_needed.txt."""
|
|
with open("ocr_needed.txt", "w") as f:
|
|
for fpath in ocr_needed:
|
|
f.write(f"{fpath}\n")
|
|
print(f"Wrote {len(ocr_needed)} file(s) to ocr_needed.txt")
|
|
|
|
|
|
def load_all_documents(reader_files, custom_docs):
|
|
"""Load documents from SimpleDirectoryReader and merge with custom docs."""
|
|
documents = []
|
|
|
|
if reader_files:
|
|
print(f"Loading {len(reader_files)} PDF/TXT files...")
|
|
reader_docs = SimpleDirectoryReader(
|
|
input_files=[str(f) for f in reader_files],
|
|
filename_as_id=True,
|
|
).load_data()
|
|
documents.extend(reader_docs)
|
|
|
|
if custom_docs:
|
|
print(f"Adding {len(custom_docs)} webarchive/RTF documents...")
|
|
documents.extend(custom_docs)
|
|
|
|
return documents
|
|
|
|
|
|
def rebuild(reader_files, custom_docs):
|
|
"""Full rebuild: delete existing collection and recreate from scratch."""
|
|
client = chromadb.PersistentClient(path=PERSIST_DIR)
|
|
# Delete existing collection if present
|
|
try:
|
|
client.delete_collection(COLLECTION_NAME)
|
|
print(f"Deleted existing collection '{COLLECTION_NAME}'")
|
|
except Exception:
|
|
pass
|
|
|
|
collection = client.get_or_create_collection(COLLECTION_NAME)
|
|
vector_store = ChromaVectorStore(chroma_collection=collection)
|
|
storage_context = StorageContext.from_defaults(vector_store=vector_store)
|
|
|
|
documents = load_all_documents(reader_files, custom_docs)
|
|
if not documents:
|
|
raise ValueError("No documents loaded")
|
|
|
|
print(f"Loaded {len(documents)} document(s) total")
|
|
print("Building vector index...")
|
|
|
|
index = VectorStoreIndex.from_documents(
|
|
documents,
|
|
storage_context=storage_context,
|
|
transformations=[get_text_splitter()],
|
|
show_progress=True,
|
|
)
|
|
|
|
print(f"Index built. Collection has {collection.count()} vectors.")
|
|
return index
|
|
|
|
|
|
def update(reader_files, custom_docs):
|
|
"""Incremental update: add new, re-index modified, remove deleted files."""
|
|
client = chromadb.PersistentClient(path=PERSIST_DIR)
|
|
collection = client.get_collection(COLLECTION_NAME)
|
|
count = collection.count()
|
|
print(f"Existing collection has {count} vectors")
|
|
|
|
# Get all stored metadata to find what's indexed
|
|
# Key on file_path (not file_name) to handle duplicate names across subdirs
|
|
indexed = {} # file_path -> {"ids": [], "file_size": ..., "last_modified_date": ...}
|
|
if count > 0:
|
|
results = collection.get(include=["metadatas"])
|
|
for i, meta in enumerate(results["metadatas"]):
|
|
fpath = meta.get("file_path", "")
|
|
if fpath not in indexed:
|
|
indexed[fpath] = {
|
|
"ids": [],
|
|
"file_size": meta.get("file_size"),
|
|
"last_modified_date": meta.get("last_modified_date"),
|
|
}
|
|
indexed[fpath]["ids"].append(results["ids"][i])
|
|
|
|
print(f"Index contains {len(indexed)} unique files")
|
|
|
|
# Build disk file lookup: file_path_str -> Path
|
|
# For reader_files, match the path format SimpleDirectoryReader would store
|
|
disk_files = {}
|
|
for f in reader_files:
|
|
disk_files[str(f)] = f
|
|
for doc in custom_docs:
|
|
disk_files[doc.metadata["file_path"]] = Path(doc.metadata["file_path"])
|
|
|
|
# Classify files
|
|
new_reader = []
|
|
new_custom = []
|
|
modified_reader = []
|
|
modified_custom = []
|
|
deleted_paths = []
|
|
unchanged = 0
|
|
|
|
for path_str, fpath in disk_files.items():
|
|
if path_str not in indexed:
|
|
# Check if it's a custom doc
|
|
if fpath.suffix.lower() in CUSTOM_EXTS:
|
|
matching = [d for d in custom_docs if d.metadata["file_path"] == path_str]
|
|
if matching:
|
|
new_custom.extend(matching)
|
|
else:
|
|
new_reader.append(fpath)
|
|
else:
|
|
info = indexed[path_str]
|
|
stat = fpath.stat()
|
|
disk_mdate = datetime.datetime.fromtimestamp(
|
|
stat.st_mtime, tz=datetime.timezone.utc
|
|
).strftime("%Y-%m-%d")
|
|
|
|
if stat.st_size != info["file_size"] or disk_mdate != info["last_modified_date"]:
|
|
if fpath.suffix.lower() in CUSTOM_EXTS:
|
|
matching = [d for d in custom_docs if d.metadata["file_path"] == path_str]
|
|
if matching:
|
|
modified_custom.extend(matching)
|
|
else:
|
|
modified_reader.append(fpath)
|
|
else:
|
|
unchanged += 1
|
|
|
|
for path_str in indexed:
|
|
if path_str not in disk_files:
|
|
deleted_paths.append(path_str)
|
|
|
|
n_new = len(new_reader) + len(new_custom)
|
|
n_modified = len(modified_reader) + len(modified_custom)
|
|
print(f"\n New: {n_new}")
|
|
print(f" Modified: {n_modified}")
|
|
print(f" Deleted: {len(deleted_paths)}")
|
|
print(f" Unchanged: {unchanged}")
|
|
|
|
if n_new == 0 and n_modified == 0 and len(deleted_paths) == 0:
|
|
print("\nNothing to do.")
|
|
return
|
|
|
|
# Delete chunks for removed and modified files
|
|
for path_str in deleted_paths:
|
|
ids = indexed[path_str]["ids"]
|
|
fname = Path(path_str).name
|
|
print(f" Removing {fname} ({len(ids)} chunks)")
|
|
collection.delete(ids=ids)
|
|
|
|
for fpath in modified_reader:
|
|
path_str = str(fpath)
|
|
ids = indexed[path_str]["ids"]
|
|
print(f" Re-indexing {fpath.name} ({len(ids)} chunks)")
|
|
collection.delete(ids=ids)
|
|
|
|
for doc in modified_custom:
|
|
path_str = doc.metadata["file_path"]
|
|
if path_str in indexed:
|
|
ids = indexed[path_str]["ids"]
|
|
print(f" Re-indexing {doc.metadata['file_name']} ({len(ids)} chunks)")
|
|
collection.delete(ids=ids)
|
|
|
|
# Add new and modified files
|
|
files_to_add = new_reader + modified_reader
|
|
docs_to_add = new_custom + modified_custom
|
|
|
|
if files_to_add or docs_to_add:
|
|
documents = load_all_documents(files_to_add, docs_to_add)
|
|
if documents:
|
|
print(f"Indexing {len(documents)} document(s)...")
|
|
vector_store = ChromaVectorStore(chroma_collection=collection)
|
|
storage_context = StorageContext.from_defaults(vector_store=vector_store)
|
|
|
|
VectorStoreIndex.from_documents(
|
|
documents,
|
|
storage_context=storage_context,
|
|
transformations=[get_text_splitter()],
|
|
show_progress=True,
|
|
)
|
|
|
|
print(f"\nIndex updated. Collection now has {collection.count()} vectors.")
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Build or update the clippings vector store (ChromaDB)."
|
|
)
|
|
parser.add_argument(
|
|
"--rebuild",
|
|
action="store_true",
|
|
help="Full rebuild from scratch (default: incremental update)",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
# Configure embedding model (offline, cached in ./models)
|
|
embed_model = HuggingFaceEmbedding(
|
|
model_name=EMBED_MODEL_NAME,
|
|
cache_folder="./models",
|
|
local_files_only=True,
|
|
)
|
|
Settings.embed_model = embed_model
|
|
|
|
if not DATA_DIR.exists():
|
|
raise FileNotFoundError(
|
|
f"Clippings directory not found: {DATA_DIR.absolute()}\n"
|
|
f"Create symlink: ln -s ../clippings ./clippings"
|
|
)
|
|
|
|
start = time.time()
|
|
|
|
# Scan and classify files
|
|
print(f"Scanning {DATA_DIR.absolute()}...")
|
|
reader_files, custom_docs, skipped, ocr_needed = scan_clippings()
|
|
|
|
n_valid = len(reader_files) + len(custom_docs)
|
|
print(f"\nFiles to index: {n_valid}")
|
|
print(f" PDF/TXT: {len(reader_files)}")
|
|
print(f" Webarchive/RTF: {len(custom_docs)}")
|
|
print(f"Files skipped: {len(skipped)}")
|
|
for fpath, reason in skipped:
|
|
print(f" SKIP: {fpath.name} -- {reason}")
|
|
|
|
if ocr_needed:
|
|
write_ocr_list(ocr_needed)
|
|
|
|
if n_valid == 0:
|
|
raise ValueError("No valid files found to index")
|
|
|
|
if args.rebuild:
|
|
print("\nMode: full rebuild")
|
|
rebuild(reader_files, custom_docs)
|
|
else:
|
|
print("\nMode: incremental update")
|
|
if not Path(PERSIST_DIR).exists():
|
|
print(f"No existing index at {PERSIST_DIR}, doing full rebuild.")
|
|
rebuild(reader_files, custom_docs)
|
|
else:
|
|
update(reader_files, custom_docs)
|
|
|
|
elapsed = time.time() - start
|
|
print(f"Done in {elapsed:.1f}s")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|