"""Dashboard query layer — all DuckDB queries returning polars DataFrames.""" from typing import Any import duckdb import polars as pl from admin_analytics.config import UD_UNITID from admin_analytics.irs990.titles import normalize_title # Shared CTE for CPI adjustment _CPI_CTE = """ WITH annual_cpi AS ( SELECT year, AVG(value) AS avg_cpi FROM raw_cpi_u GROUP BY year ), latest_cpi AS ( SELECT avg_cpi FROM annual_cpi WHERE year = (SELECT MAX(year) FROM annual_cpi) ) """ def query_admin_cost_ratio(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Admin cost ratio trend with CPI-adjusted values.""" return conn.execute(f""" {_CPI_CTE} SELECT f.year, f.institutional_support_expenses, f.total_expenses, ROUND(f.institutional_support_expenses * 100.0 / NULLIF(f.total_expenses, 0), 2) AS admin_cost_pct, ROUND(f.institutional_support_expenses * (SELECT avg_cpi FROM latest_cpi) / ac.avg_cpi, 0) AS inst_support_cpi_adjusted, ROUND(f.total_expenses * (SELECT avg_cpi FROM latest_cpi) / ac.avg_cpi, 0) AS total_expenses_cpi_adjusted FROM raw_ipeds_finance f LEFT JOIN annual_cpi ac ON ac.year = f.year WHERE f.unitid = ? ORDER BY f.year """, [UD_UNITID]).pl() def query_expense_breakdown(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Expense breakdown by function over time.""" return conn.execute(""" SELECT year, instruction_expenses, research_expenses, public_service_expenses, academic_support_expenses, student_services_expenses, institutional_support_expenses, auxiliary_expenses, hospital_expenses, other_expenses FROM raw_ipeds_finance WHERE unitid = ? ORDER BY year """, [UD_UNITID]).pl() def query_admin_per_student(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Admin cost per student (nominal and CPI-adjusted).""" return conn.execute(f""" {_CPI_CTE} SELECT f.year, f.institutional_support_expenses, e.total_enrollment, ROUND(f.institutional_support_expenses * 1.0 / NULLIF(e.total_enrollment, 0), 0) AS admin_per_student, ROUND( (f.institutional_support_expenses * (SELECT avg_cpi FROM latest_cpi) / ac.avg_cpi) / NULLIF(e.total_enrollment, 0), 0 ) AS admin_per_student_cpi FROM raw_ipeds_finance f JOIN raw_ipeds_enrollment e ON e.unitid = f.unitid AND e.year = f.year LEFT JOIN annual_cpi ac ON ac.year = f.year WHERE f.unitid = ? ORDER BY f.year """, [UD_UNITID]).pl() def query_admin_faculty_ratio(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Admin-to-faculty ratio over time.""" return conn.execute(""" SELECT year, management_total, faculty_total, ROUND(management_total * 1.0 / NULLIF(faculty_total, 0), 3) AS admin_faculty_ratio FROM raw_ipeds_staff WHERE unitid = ? ORDER BY year """, [UD_UNITID]).pl() def query_aggregate_comp(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Top-10 Schedule J compensation per year — total, count, and average.""" return conn.execute(""" WITH ranked AS ( SELECT j.tax_year, j.total_compensation, j.base_compensation, j.bonus_compensation, j.deferred_compensation, j.nontaxable_benefits, j.other_compensation, ROW_NUMBER() OVER (PARTITION BY j.tax_year ORDER BY j.total_compensation DESC) AS rn FROM raw_990_schedule_j j WHERE j.total_compensation > 0 ) SELECT tax_year, COUNT(*) AS headcount, SUM(total_compensation) AS total_comp, ROUND(AVG(total_compensation), 0) AS avg_comp, SUM(base_compensation) AS total_base, SUM(bonus_compensation) AS total_bonus, SUM(deferred_compensation) AS total_deferred, SUM(nontaxable_benefits) AS total_benefits, SUM(other_compensation) AS total_other FROM ranked WHERE rn <= 10 GROUP BY tax_year ORDER BY tax_year """).pl() def query_aggregate_comp_cagr(conn: duckdb.DuckDBPyConnection) -> dict | None: """CAGR of aggregate Schedule J compensation over the last 5 years of data.""" df = query_aggregate_comp(conn) if df.height < 2: return None # Use last 5 years of available data df = df.tail(min(5, df.height)) start_year = df["tax_year"][0] end_year = df["tax_year"][-1] start_comp = float(df["total_comp"][0]) end_comp = float(df["total_comp"][-1]) n_years = end_year - start_year if n_years <= 0 or start_comp <= 0: return None cagr = ((end_comp / start_comp) ** (1.0 / n_years) - 1) * 100 return { "cagr_pct": round(cagr, 1), "start_year": start_year, "end_year": end_year, "start_comp": int(end_comp), "end_comp": int(end_comp), } def query_comp_cagr(conn: duckdb.DuckDBPyConnection) -> dict | None: """Annualized growth rate (CAGR) of President compensation. Tracks the President role specifically using title normalization. Returns dict with cagr_pct, start_year, end_year, start_comp, end_comp, or None if insufficient data. """ raw = conn.execute(""" SELECT j.tax_year, j.title, j.total_compensation FROM raw_990_schedule_j j WHERE j.total_compensation > 0 ORDER BY j.tax_year """).pl() if raw.height == 0: return None raw = raw.with_columns( pl.col("title").map_elements( normalize_title, return_dtype=pl.Utf8 ).alias("role") ) df = ( raw.filter(pl.col("role") == "PRESIDENT") .group_by("tax_year") .agg(pl.col("total_compensation").max().alias("top_comp")) .sort("tax_year") ) if df.height < 2: return None start_year = df["tax_year"][0] end_year = df["tax_year"][-1] start_comp = df["top_comp"][0] end_comp = df["top_comp"][-1] n_years = end_year - start_year if n_years <= 0 or start_comp <= 0: return None cagr = ((end_comp / start_comp) ** (1.0 / n_years) - 1) * 100 return { "cagr_pct": round(cagr, 1), "start_year": start_year, "end_year": end_year, "start_comp": start_comp, "end_comp": end_comp, } def query_top_earners( conn: duckdb.DuckDBPyConnection, year: int | None = None ) -> pl.DataFrame: """Top earners from Schedule J, optionally filtered by year.""" where = "WHERE j.total_compensation > 0" params: list[Any] = [] if year is not None: where += " AND j.tax_year = ?" params.append(year) df = conn.execute(f""" SELECT j.tax_year, j.person_name, j.title, j.base_compensation, j.bonus_compensation, j.other_compensation, j.deferred_compensation, j.nontaxable_benefits, j.total_compensation, f.organization_name FROM raw_990_schedule_j j JOIN raw_990_filing f ON f.object_id = j.object_id {where} ORDER BY j.tax_year DESC, j.total_compensation DESC """, params).pl() if df.height > 0: df = df.with_columns( pl.col("title").map_elements( normalize_title, return_dtype=pl.Utf8 ).alias("canonical_role") ) return df def query_comp_by_role(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Compensation trends by canonical role across years.""" df = conn.execute(""" SELECT j.tax_year, j.person_name, j.title, j.total_compensation FROM raw_990_schedule_j j JOIN raw_990_filing f ON f.object_id = j.object_id WHERE j.total_compensation > 0 ORDER BY j.tax_year, j.total_compensation DESC """).pl() if df.height == 0: return df df = df.with_columns( pl.col("title").map_elements( normalize_title, return_dtype=pl.Utf8 ).alias("canonical_role") ) # Keep highest-paid person per role per year return ( df.sort("total_compensation", descending=True) .group_by(["tax_year", "canonical_role"]) .first() .sort(["tax_year", "canonical_role"]) ) def query_comp_vs_cpi(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Compensation growth vs CPI growth, indexed to first available year = 100. Includes top earner, top-10 aggregate, and CPI-U. """ return conn.execute(""" WITH ranked AS ( SELECT tax_year, total_compensation, ROW_NUMBER() OVER (PARTITION BY tax_year ORDER BY total_compensation DESC) AS rn FROM raw_990_schedule_j WHERE total_compensation > 0 ), yearly_comp AS ( SELECT tax_year, MAX(total_compensation) AS top_comp, SUM(CASE WHEN rn <= 10 THEN total_compensation END) AS agg_comp FROM ranked GROUP BY tax_year ), annual_cpi AS ( SELECT year, AVG(value) AS avg_cpi FROM raw_cpi_u GROUP BY year ), base AS ( SELECT c.top_comp AS base_top, c.agg_comp AS base_agg, ac.avg_cpi AS base_cpi FROM yearly_comp c JOIN annual_cpi ac ON ac.year = c.tax_year ORDER BY c.tax_year LIMIT 1 ) SELECT c.tax_year AS year, c.top_comp, c.agg_comp, ac.avg_cpi, ROUND(c.top_comp * 100.0 / NULLIF((SELECT base_top FROM base), 0), 1) AS comp_index, ROUND(c.agg_comp * 100.0 / NULLIF((SELECT base_agg FROM base), 0), 1) AS agg_index, ROUND(ac.avg_cpi * 100.0 / NULLIF((SELECT base_cpi FROM base), 0), 1) AS cpi_index FROM yearly_comp c JOIN annual_cpi ac ON ac.year = c.tax_year ORDER BY year """).pl() def query_staff_composition(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Staff composition over time.""" return conn.execute(""" SELECT year, total_staff, faculty_total, management_total, total_staff - COALESCE(faculty_total, 0) - COALESCE(management_total, 0) AS other_staff FROM raw_ipeds_staff WHERE unitid = ? ORDER BY year """, [UD_UNITID]).pl() def query_student_staff_ratios(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Student-to-staff and student-to-faculty ratios.""" return conn.execute(""" SELECT s.year, e.total_enrollment, s.total_staff, s.faculty_total, ROUND(e.total_enrollment * 1.0 / NULLIF(s.total_staff, 0), 1) AS students_per_staff, ROUND(e.total_enrollment * 1.0 / NULLIF(s.faculty_total, 0), 1) AS students_per_faculty FROM raw_ipeds_staff s JOIN raw_ipeds_enrollment e ON e.unitid = s.unitid AND e.year = s.year WHERE s.unitid = ? ORDER BY s.year """, [UD_UNITID]).pl() def query_growth_index(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Management, faculty, and enrollment growth, indexed to first year = 100.""" return conn.execute(""" WITH base AS ( SELECT s.management_total AS base_mgmt, s.faculty_total AS base_fac, e.total_enrollment AS base_enrl FROM raw_ipeds_staff s JOIN raw_ipeds_enrollment e ON e.unitid = s.unitid AND e.year = s.year WHERE s.unitid = ? ORDER BY s.year LIMIT 1 ) SELECT s.year, s.management_total, s.faculty_total, e.total_enrollment, ROUND(s.management_total * 100.0 / NULLIF((SELECT base_mgmt FROM base), 0), 1) AS mgmt_index, ROUND(s.faculty_total * 100.0 / NULLIF((SELECT base_fac FROM base), 0), 1) AS faculty_index, ROUND(e.total_enrollment * 100.0 / NULLIF((SELECT base_enrl FROM base), 0), 1) AS enrollment_index FROM raw_ipeds_staff s JOIN raw_ipeds_enrollment e ON e.unitid = s.unitid AND e.year = s.year WHERE s.unitid = ? ORDER BY s.year """, [UD_UNITID, UD_UNITID]).pl() def query_endowment(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Endowment performance over time.""" return conn.execute(""" SELECT year, endowment_boy, endowment_eoy, new_gifts, net_investment_return, other_changes, long_term_investments FROM raw_ipeds_endowment WHERE unitid = ? ORDER BY year """, [UD_UNITID]).pl() def query_endowment_per_student(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Endowment value per student over time.""" return conn.execute(""" SELECT e.year, e.endowment_eoy, en.total_enrollment, ROUND(e.endowment_eoy * 1.0 / NULLIF(en.total_enrollment, 0), 0) AS endowment_per_student FROM raw_ipeds_endowment e JOIN raw_ipeds_enrollment en ON en.unitid = e.unitid AND en.year = e.year WHERE e.unitid = ? ORDER BY e.year """, [UD_UNITID]).pl() def query_cio_vs_endowment(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Chief Investment Officer compensation vs endowment growth, indexed.""" raw = conn.execute(""" SELECT j.tax_year, j.title, j.total_compensation FROM raw_990_schedule_j j WHERE j.total_compensation > 0 """).pl() if raw.height == 0: return pl.DataFrame() raw = raw.with_columns( pl.col("title").map_elements( normalize_title, return_dtype=pl.Utf8 ).alias("role") ) cio = ( raw.filter(pl.col("role") == "CHIEF_INVESTMENT_OFFICER") .group_by("tax_year") .agg(pl.col("total_compensation").max().alias("cio_comp")) .sort("tax_year") ) if cio.height == 0: return pl.DataFrame() endow = conn.execute(""" SELECT year, endowment_eoy FROM raw_ipeds_endowment WHERE unitid = ? ORDER BY year """, [UD_UNITID]).pl() merged = ( cio.join(endow, left_on="tax_year", right_on="year", how="inner") .drop_nulls(subset=["cio_comp", "endowment_eoy"]) .sort("tax_year") ) if merged.height < 2: return merged base_comp = float(merged["cio_comp"][0]) base_endow = float(merged["endowment_eoy"][0]) merged = merged.with_columns( (pl.col("cio_comp").cast(pl.Float64) * 100.0 / base_comp).round(1).alias("cio_index"), (pl.col("endowment_eoy").cast(pl.Float64) * 100.0 / base_endow).round(1).alias("endowment_index"), ) return merged def query_philanthropy(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Philanthropic giving over time — IPEDS private gifts + 990 revenue.""" return conn.execute(f""" {_CPI_CTE} SELECT e.year, e.total_private_gifts, e.new_gifts AS endowment_gifts, ROUND(e.total_private_gifts * (SELECT avg_cpi FROM latest_cpi) / ac.avg_cpi, 0) AS gifts_cpi_adjusted FROM raw_ipeds_endowment e LEFT JOIN annual_cpi ac ON ac.year = e.year WHERE e.unitid = ? ORDER BY e.year """, [UD_UNITID]).pl() def query_comp_vs_philanthropy(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """VP Advancement and President comp vs philanthropic gifts, indexed.""" raw = conn.execute(""" SELECT j.tax_year, j.title, j.total_compensation FROM raw_990_schedule_j j WHERE j.total_compensation > 0 """).pl() if raw.height == 0: return pl.DataFrame() raw = raw.with_columns( pl.col("title").map_elements( normalize_title, return_dtype=pl.Utf8 ).alias("role") ) # Get max comp per role per year for President and VP Advancement roles = raw.filter(pl.col("role").is_in(["PRESIDENT", "VP_ADVANCEMENT"])) if roles.height == 0: return pl.DataFrame() pivoted = ( roles.group_by(["tax_year", "role"]) .agg(pl.col("total_compensation").max().alias("comp")) .sort("tax_year") ) pres = ( pivoted.filter(pl.col("role") == "PRESIDENT") .select(pl.col("tax_year"), pl.col("comp").alias("president_comp")) ) vp = ( pivoted.filter(pl.col("role") == "VP_ADVANCEMENT") .select(pl.col("tax_year"), pl.col("comp").alias("vp_adv_comp")) ) gifts = conn.execute(""" SELECT year, total_private_gifts FROM raw_ipeds_endowment WHERE unitid = ? ORDER BY year """, [UD_UNITID]).pl() # Join all three on year merged = ( pres.join(vp, on="tax_year", how="outer_coalesce") .join(gifts, left_on="tax_year", right_on="year", how="inner") .drop_nulls(subset=["total_private_gifts"]) .sort("tax_year") ) if merged.height < 2: return merged base_pres = float(merged.drop_nulls("president_comp")["president_comp"][0]) base_vp = float(merged.drop_nulls("vp_adv_comp")["vp_adv_comp"][0]) base_gifts = float(merged["total_private_gifts"][0]) merged = merged.with_columns( (pl.col("president_comp").cast(pl.Float64) * 100.0 / base_pres).round(1).alias("president_index"), (pl.col("vp_adv_comp").cast(pl.Float64) * 100.0 / base_vp).round(1).alias("vp_adv_index"), (pl.col("total_private_gifts").cast(pl.Float64) * 100.0 / base_gifts).round(1).alias("gifts_index"), ) return merged def query_admin_headcount(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """All scraped admin headcount entries.""" return conn.execute(""" SELECT unit, person_name, title, category, is_overhead, scrape_date FROM raw_admin_headcount ORDER BY unit, category, person_name """).pl() def query_headcount_summary(conn: duckdb.DuckDBPyConnection) -> pl.DataFrame: """Headcount summary by unit and category.""" return conn.execute(""" SELECT unit, category, is_overhead, COUNT(*) AS count FROM raw_admin_headcount GROUP BY unit, category, is_overhead ORDER BY unit, count DESC """).pl()