Phase 1 project prototype

This commit is contained in:
emfurst 2026-03-30 19:29:33 -04:00
commit 2c9ae1c312
29 changed files with 2967 additions and 22 deletions

View file

@ -0,0 +1,263 @@
"""Dashboard query layer — all DuckDB queries returning polars DataFrames."""
from typing import Any
import duckdb
import polars as pl
from admin_analytics.config import UD_UNITID
from admin_analytics.irs990.titles import normalize_title
# Shared CTE for CPI adjustment
_CPI_CTE = """
WITH annual_cpi AS (
SELECT year, AVG(value) AS avg_cpi
FROM raw_cpi_u
GROUP BY year
),
latest_cpi AS (
SELECT avg_cpi FROM annual_cpi
WHERE year = (SELECT MAX(year) FROM annual_cpi)
)
"""
def query_admin_cost_ratio(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame:
"""Admin cost ratio trend with CPI-adjusted values."""
return conn.execute(f"""
{_CPI_CTE}
SELECT
f.year,
f.institutional_support_expenses,
f.total_expenses,
ROUND(f.institutional_support_expenses * 100.0
/ NULLIF(f.total_expenses, 0), 2) AS admin_cost_pct,
ROUND(f.institutional_support_expenses
* (SELECT avg_cpi FROM latest_cpi) / ac.avg_cpi, 0)
AS inst_support_cpi_adjusted,
ROUND(f.total_expenses
* (SELECT avg_cpi FROM latest_cpi) / ac.avg_cpi, 0)
AS total_expenses_cpi_adjusted
FROM raw_ipeds_finance f
LEFT JOIN annual_cpi ac ON ac.year = f.year
WHERE f.unitid = ?
ORDER BY f.year
""", [UD_UNITID]).pl()
def query_expense_breakdown(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame:
"""Expense breakdown by function over time."""
return conn.execute("""
SELECT year,
instruction_expenses, research_expenses, public_service_expenses,
academic_support_expenses, student_services_expenses,
institutional_support_expenses, auxiliary_expenses,
hospital_expenses, other_expenses
FROM raw_ipeds_finance
WHERE unitid = ?
ORDER BY year
""", [UD_UNITID]).pl()
def query_admin_per_student(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame:
"""Admin cost per student (nominal and CPI-adjusted)."""
return conn.execute(f"""
{_CPI_CTE}
SELECT
f.year,
f.institutional_support_expenses,
e.total_enrollment,
ROUND(f.institutional_support_expenses * 1.0
/ NULLIF(e.total_enrollment, 0), 0) AS admin_per_student,
ROUND(
(f.institutional_support_expenses
* (SELECT avg_cpi FROM latest_cpi) / ac.avg_cpi)
/ NULLIF(e.total_enrollment, 0), 0
) AS admin_per_student_cpi
FROM raw_ipeds_finance f
JOIN raw_ipeds_enrollment e ON e.unitid = f.unitid AND e.year = f.year
LEFT JOIN annual_cpi ac ON ac.year = f.year
WHERE f.unitid = ?
ORDER BY f.year
""", [UD_UNITID]).pl()
def query_admin_faculty_ratio(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame:
"""Admin-to-faculty ratio over time."""
return conn.execute("""
SELECT year,
management_total,
faculty_total,
ROUND(management_total * 1.0 / NULLIF(faculty_total, 0), 3)
AS admin_faculty_ratio
FROM raw_ipeds_staff
WHERE unitid = ?
ORDER BY year
""", [UD_UNITID]).pl()
def query_top_earners(
conn: duckdb.DuckDBPyConnection, year: int | None = None
) -> pl.DataFrame:
"""Top earners from Schedule J, optionally filtered by year."""
where = "WHERE j.total_compensation > 0"
params: list[Any] = []
if year is not None:
where += " AND j.tax_year = ?"
params.append(year)
df = conn.execute(f"""
SELECT
j.tax_year,
j.person_name,
j.title,
j.base_compensation,
j.bonus_compensation,
j.other_compensation,
j.deferred_compensation,
j.nontaxable_benefits,
j.total_compensation,
f.organization_name
FROM raw_990_schedule_j j
JOIN raw_990_filing f ON f.object_id = j.object_id
{where}
ORDER BY j.tax_year DESC, j.total_compensation DESC
""", params).pl()
if df.height > 0:
df = df.with_columns(
pl.col("title").map_elements(
normalize_title, return_dtype=pl.Utf8
).alias("canonical_role")
)
return df
def query_comp_by_role(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame:
"""Compensation trends by canonical role across years."""
df = conn.execute("""
SELECT j.tax_year, j.person_name, j.title, j.total_compensation
FROM raw_990_schedule_j j
JOIN raw_990_filing f ON f.object_id = j.object_id
WHERE j.total_compensation > 0
ORDER BY j.tax_year, j.total_compensation DESC
""").pl()
if df.height == 0:
return df
df = df.with_columns(
pl.col("title").map_elements(
normalize_title, return_dtype=pl.Utf8
).alias("canonical_role")
)
# Keep highest-paid person per role per year
return (
df.sort("total_compensation", descending=True)
.group_by(["tax_year", "canonical_role"])
.first()
.sort(["tax_year", "canonical_role"])
)
def query_comp_vs_cpi(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame:
"""Compensation growth vs CPI growth, indexed to first available year = 100."""
return conn.execute("""
WITH yearly_max_comp AS (
SELECT tax_year, MAX(total_compensation) AS top_comp
FROM raw_990_schedule_j
GROUP BY tax_year
),
annual_cpi AS (
SELECT year, AVG(value) AS avg_cpi
FROM raw_cpi_u GROUP BY year
),
base AS (
SELECT c.top_comp AS base_comp, ac.avg_cpi AS base_cpi
FROM yearly_max_comp c
JOIN annual_cpi ac ON ac.year = c.tax_year
ORDER BY c.tax_year LIMIT 1
)
SELECT
c.tax_year AS year,
c.top_comp,
ac.avg_cpi,
ROUND(c.top_comp * 100.0 / NULLIF((SELECT base_comp FROM base), 0), 1)
AS comp_index,
ROUND(ac.avg_cpi * 100.0 / NULLIF((SELECT base_cpi FROM base), 0), 1)
AS cpi_index
FROM yearly_max_comp c
JOIN annual_cpi ac ON ac.year = c.tax_year
ORDER BY year
""").pl()
def query_staff_composition(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame:
"""Staff composition over time."""
return conn.execute("""
SELECT year, total_staff, faculty_total, management_total,
total_staff - COALESCE(faculty_total, 0) - COALESCE(management_total, 0)
AS other_staff
FROM raw_ipeds_staff
WHERE unitid = ?
ORDER BY year
""", [UD_UNITID]).pl()
def query_student_staff_ratios(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame:
"""Student-to-staff and student-to-faculty ratios."""
return conn.execute("""
SELECT s.year, e.total_enrollment, s.total_staff, s.faculty_total,
ROUND(e.total_enrollment * 1.0 / NULLIF(s.total_staff, 0), 1)
AS students_per_staff,
ROUND(e.total_enrollment * 1.0 / NULLIF(s.faculty_total, 0), 1)
AS students_per_faculty
FROM raw_ipeds_staff s
JOIN raw_ipeds_enrollment e ON e.unitid = s.unitid AND e.year = s.year
WHERE s.unitid = ?
ORDER BY s.year
""", [UD_UNITID]).pl()
def query_growth_index(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame:
"""Management vs enrollment growth, indexed to first year = 100."""
return conn.execute("""
WITH base AS (
SELECT s.management_total AS base_mgmt, e.total_enrollment AS base_enrl
FROM raw_ipeds_staff s
JOIN raw_ipeds_enrollment e ON e.unitid = s.unitid AND e.year = s.year
WHERE s.unitid = ?
ORDER BY s.year LIMIT 1
)
SELECT s.year,
s.management_total,
e.total_enrollment,
ROUND(s.management_total * 100.0
/ NULLIF((SELECT base_mgmt FROM base), 0), 1) AS mgmt_index,
ROUND(e.total_enrollment * 100.0
/ NULLIF((SELECT base_enrl FROM base), 0), 1) AS enrollment_index
FROM raw_ipeds_staff s
JOIN raw_ipeds_enrollment e ON e.unitid = s.unitid AND e.year = s.year
WHERE s.unitid = ?
ORDER BY s.year
""", [UD_UNITID, UD_UNITID]).pl()
def query_admin_headcount(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame:
"""All scraped admin headcount entries."""
return conn.execute("""
SELECT unit, person_name, title, category, is_overhead, scrape_date
FROM raw_admin_headcount
ORDER BY unit, category, person_name
""").pl()
def query_headcount_summary(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame:
"""Headcount summary by unit and category."""
return conn.execute("""
SELECT unit, category, is_overhead, COUNT(*) AS count
FROM raw_admin_headcount
GROUP BY unit, category, is_overhead
ORDER BY unit, count DESC
""").pl()