search/search_keywords.py
Eric e9fc99ddc6 Initial commit: RAG pipeline for semantic search over personal journal archive
Vector search with cross-encoder re-ranking, hybrid BM25+vector retrieval,
incremental index updates, and multiple LLM backends (Ollama local, OpenAI API).
2026-02-20 06:02:28 -05:00

189 lines
5.8 KiB
Python

# search_keywords.py
# Keyword search: extract terms from a query using POS tagging, then grep
# across journal files for matches.
#
# Complements the vector search pipeline by catching exact names, places,
# and dates that embeddings can miss. No vector store or LLM needed.
#
# Term extraction uses NLTK POS tagging to keep nouns (NN*), proper nouns
# (NNP*), and adjectives (JJ*) -- skipping stopwords and function words
# automatically. Consecutive proper nouns are joined into multi-word phrases
# (e.g., "Robert Wright" stays as one search term, not "robert" + "wright").
#
# E.M.F. February 2026
import os
import sys
import re
from pathlib import Path
import nltk
#
# Globals
#
# Directory scanned for *.txt journal files (relative to the working dir).
DATA_DIR = Path("./data")
CONTEXT_LINES = 2 # lines of context around each match
MAX_MATCHES_PER_FILE = 3 # cap matches shown per file to avoid flooding
# POS tags to keep: nouns, proper nouns, adjectives (Penn Treebank tag set)
KEEP_TAGS = {"NN", "NNS", "NNP", "NNPS", "JJ", "JJS", "JJR"}
# Proper noun tags (consecutive runs are joined as phrases)
PROPER_NOUN_TAGS = {"NNP", "NNPS"}
# Minimum word/phrase length in characters (filters out short noise)
MIN_WORD_LEN = 3
def ensure_nltk_data():
    """Fetch the NLTK resources this script needs, if not already cached.

    Checks for the punkt tokenizer tables and the English perceptron POS
    tagger; downloads whichever is missing. Safe to call on every run.
    """
    required = {
        "punkt_tab": "tokenizers/punkt_tab",
        "averaged_perceptron_tagger_eng": "taggers/averaged_perceptron_tagger_eng",
    }
    for name, locator in required.items():
        try:
            nltk.data.find(locator)
        except LookupError:
            # Not present in any NLTK data path -- fetch it quietly.
            print(f"Downloading NLTK resource: {name}")
            nltk.download(name, quiet=True)
def extract_terms(query):
    """Extract key terms from a query using POS tagging.

    Tokenizes the query, runs POS tagging, and keeps nouns, proper nouns,
    and adjectives. Consecutive proper nouns (NNP/NNPS) are joined into
    multi-word phrases (e.g., "Robert Wright" -> "robert wright").

    Args:
        query: Free-text query string.

    Returns:
        A deduplicated list of lowercase terms, multi-word phrases listed
        first (more specific terms lead).
    """
    tokens = nltk.word_tokenize(query)
    tagged = nltk.pos_tag(tokens)

    phrases = []       # multi-word proper noun phrases
    single_terms = []  # individual nouns/adjectives
    proper_run = []    # accumulator for consecutive proper nouns

    def _flush_run():
        # Join an accumulated proper-noun run into one lowercase phrase.
        # (Single helper replaces the previously duplicated mid-loop and
        # end-of-loop flush code.)
        if proper_run:
            phrase = " ".join(proper_run).lower()
            if len(phrase) >= MIN_WORD_LEN:
                phrases.append(phrase)
            proper_run.clear()

    for word, tag in tagged:
        if tag in PROPER_NOUN_TAGS:
            proper_run.append(word)
            continue
        # Non-proper-noun token ends any pending proper-noun phrase.
        _flush_run()
        # Keep other nouns and adjectives as single terms.
        if tag in KEEP_TAGS and len(word) >= MIN_WORD_LEN:
            single_terms.append(word.lower())
    # The query may end mid-run; flush the final proper noun phrase too.
    _flush_run()

    # Phrases first (more specific), then single terms; dedupe, keep order.
    return list(dict.fromkeys(phrases + single_terms))
def search_files(terms, data_dir, context_lines=CONTEXT_LINES):
    """Search all .txt files in data_dir for the given terms.

    Args:
        terms: List of literal search terms (regex-escaped before matching).
        data_dir: Path to the directory holding .txt journal files.
        context_lines: Lines of context to show on each side of a hit.

    Returns:
        A list of (file_path, match_count, matches) tuples, sorted by
        match_count descending, where matches is a list of
        (line_number, context_block) tuples.
    """
    if not terms:
        return []

    # Single case-insensitive alternation matching any term as a whole word.
    alternation = "|".join(re.escape(term) for term in terms)
    pattern = re.compile(rf"\b({alternation})\b", re.IGNORECASE)

    results = []
    for fpath in sorted(data_dir.glob("*.txt")):
        try:
            lines = fpath.read_text(encoding="utf-8").splitlines()
        except (OSError, UnicodeDecodeError):
            continue  # unreadable file: skip, keep searching the rest

        matches = []
        hit_count = 0
        shown = set()  # line indices already covered by a context block
        for idx, text in enumerate(lines):
            if not pattern.search(text):
                continue
            hit_count += 1
            # A hit inside an earlier context window is counted but not
            # rendered again, so blocks never overlap.
            if idx in shown:
                continue
            lo = max(0, idx - context_lines)
            hi = min(len(lines), idx + context_lines + 1)
            block = []
            for j in range(lo, hi):
                shown.add(j)
                marker = ">>>" if j == idx else " "
                block.append(f" {marker} {j+1:4d}: {lines[j]}")
            matches.append((idx + 1, "\n".join(block)))

        if hit_count:
            results.append((fpath, hit_count, matches))

    # Files with the most hits come first.
    results.sort(key=lambda entry: entry[1], reverse=True)
    return results
def main():
    """CLI entry point: read query from argv, extract terms, print matches."""
    if len(sys.argv) < 2:
        print("Usage: python search_keywords.py QUERY_TEXT")
        sys.exit(1)

    ensure_nltk_data()
    query = " ".join(sys.argv[1:])

    # Pull searchable terms out of the free-text query.
    terms = extract_terms(query)
    if not terms:
        print(f"Query: {query}")
        print("No searchable terms extracted. Try a more specific query.")
        sys.exit(0)
    print(f"Query: {query}")
    print(f"Extracted terms: {', '.join(terms)}\n")

    results = search_files(terms, DATA_DIR)
    if not results:
        print("No matches found.")
        sys.exit(0)

    # Summary line before the per-file detail.
    total = sum(count for _, count, _ in results)
    print(f"Found {total} matches across {len(results)} files\n")

    for fpath, count, matches in results:
        banner = "=" * 60
        print(banner)
        print(f"--- {fpath.name} ({count} matches) ---")
        print(banner)
        # Cap per-file output so one noisy file doesn't flood the terminal.
        for _line_num, block in matches[:MAX_MATCHES_PER_FILE]:
            print(block)
            print()
        hidden = len(matches) - MAX_MATCHES_PER_FILE
        if hidden > 0:
            print(f" ... and {hidden} more matches\n")


if __name__ == "__main__":
    main()