AdminAnalytics/src/admin_analytics/ipeds/finance.py
emfurst a766f6ff0d Add endowment spending distribution, move planning docs to private
- Add IPEDS F2H03C (spending distribution for current use) to endowment schema, loader, queries, and dashboard
- Endowment tab now shows spend rate alongside investment return rate
- Move planning docs to planning/ directory (gitignored)
- Update data dictionary

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 07:27:57 -04:00

208 lines
6.9 KiB
Python

from pathlib import Path
import duckdb
import polars as pl
from admin_analytics import config
from admin_analytics.config import UD_UNITID
# F1A (GASB/public) column mappings — first match wins per canonical column.
F1A_COLUMN_VARIANTS = {
"unitid": ["UNITID"],
"total_expenses": ["F1C191", "F1D02"],
"instruction_expenses": ["F1C011"],
"research_expenses": ["F1C021"],
"public_service_expenses": ["F1C031"],
"academic_support_expenses": ["F1C051"],
"student_services_expenses": ["F1C061"],
"institutional_support_expenses": ["F1C071"],
"auxiliary_expenses": ["F1C111"],
"hospital_expenses": ["F1C121"],
"other_expenses": ["F1C141"],
"salaries_wages": ["F1C192"],
"benefits": ["F1C193"],
}
# F2 (FASB/private-style) column mappings — UD reports here despite being public.
F2_COLUMN_VARIANTS = {
"unitid": ["UNITID"],
"total_expenses": ["F2E131"],
"instruction_expenses": ["F2E011"],
"research_expenses": ["F2E021"],
"public_service_expenses": ["F2E031"],
"academic_support_expenses": ["F2E041"],
"student_services_expenses": ["F2E051"],
"institutional_support_expenses": ["F2E061"],
"auxiliary_expenses": ["F2E071"],
"hospital_expenses": ["F2E081"],
"other_expenses": ["F2E121"],
"salaries_wages": ["F2E132"],
"benefits": ["F2E133"],
}
# F2 endowment / philanthropy fields
F2_ENDOWMENT_VARIANTS = {
"unitid": ["UNITID"],
"endowment_boy": ["F2H01"],
"endowment_eoy": ["F2H02"],
"new_gifts": ["F2H03A"],
"net_investment_return": ["F2H03B"],
"spending_distribution": ["F2H03C"],
"other_changes": ["F2H03D"],
"total_private_gifts": ["F2D08"],
"total_investment_return": ["F2D10"],
"long_term_investments": ["F2A01"],
}
ENDOWMENT_COLUMNS = [
"unitid", "year", "endowment_boy", "endowment_eoy", "new_gifts",
"net_investment_return", "spending_distribution", "other_changes", "total_private_gifts",
"total_investment_return", "long_term_investments",
]
CANONICAL_COLUMNS = [
"unitid", "year", "reporting_standard", "total_expenses",
"instruction_expenses", "research_expenses", "public_service_expenses",
"academic_support_expenses", "student_services_expenses",
"institutional_support_expenses", "auxiliary_expenses", "hospital_expenses",
"other_expenses", "salaries_wages", "benefits",
]
def _find_csv(component_dir: Path) -> Path | None:
csvs = [f for f in component_dir.glob("*.csv") if "dict" not in f.stem.lower()]
return csvs[0] if csvs else None
def _resolve_columns(df: pl.DataFrame, variants: dict) -> dict[str, str]:
"""For each canonical name, find the first matching column."""
upper_cols = {c.strip().upper(): c for c in df.columns}
resolved = {}
for canonical, candidates in variants.items():
for var in candidates:
if var in upper_cols:
resolved[canonical] = upper_cols[var]
break
return resolved
def _load_file(
csv_path: Path,
year: int,
variants: dict,
reporting_standard: str,
conn: duckdb.DuckDBPyConnection,
unitid_filter: int | None,
) -> int:
"""Load a single finance CSV into raw_ipeds_finance."""
df = pl.read_csv(csv_path, infer_schema_length=0, encoding="utf8-lossy")
col_map = _resolve_columns(df, variants)
if "unitid" not in col_map:
return 0
# Build dataframe with canonical columns
result = pl.DataFrame({
canonical: df[actual] for canonical, actual in col_map.items()
})
result = result.with_columns(
pl.lit(year).alias("year"),
pl.lit(reporting_standard).alias("reporting_standard"),
)
# Cast numeric columns
for col in CANONICAL_COLUMNS:
if col not in result.columns:
result = result.with_columns(pl.lit(None).alias(col))
elif col not in ("reporting_standard",):
result = result.with_columns(pl.col(col).cast(pl.Int64, strict=False))
if unitid_filter is not None:
result = result.filter(pl.col("unitid") == unitid_filter)
if result.height == 0:
return 0
result = result.select(CANONICAL_COLUMNS)
conn.register("_tmp_finance", result.to_arrow())
conn.execute("INSERT INTO raw_ipeds_finance SELECT * FROM _tmp_finance")
conn.unregister("_tmp_finance")
return result.height
def load_finance(
conn: duckdb.DuckDBPyConnection,
year_range: range,
unitid_filter: int | None = UD_UNITID,
) -> int:
"""Load IPEDS finance data into raw_ipeds_finance.
Tries both F1A (GASB) and F2 (FASB) files since some public institutions
like UD report under FASB.
"""
total = 0
for year in year_range:
conn.execute("DELETE FROM raw_ipeds_finance WHERE year = ?", [year])
# Try F1A (GASB)
f1a_dir = config.IPEDS_DATA_DIR / "finance" / str(year)
csv_path = _find_csv(f1a_dir)
if csv_path:
total += _load_file(csv_path, year, F1A_COLUMN_VARIANTS, "GASB", conn, unitid_filter)
# Try F2 (FASB) — needed for institutions like UD
f2_dir = config.IPEDS_DATA_DIR / "finance_f2" / str(year)
csv_path = _find_csv(f2_dir)
if csv_path:
total += _load_file(csv_path, year, F2_COLUMN_VARIANTS, "FASB", conn, unitid_filter)
if total == 0 and not _find_csv(f1a_dir) and not _find_csv(f2_dir):
print(f" No finance CSV found for {year}, skipping")
return total
def load_endowment(
conn: duckdb.DuckDBPyConnection,
year_range: range,
unitid_filter: int | None = UD_UNITID,
) -> int:
"""Load IPEDS F2 endowment and philanthropy data into raw_ipeds_endowment."""
total = 0
for year in year_range:
f2_dir = config.IPEDS_DATA_DIR / "finance_f2" / str(year)
csv_path = _find_csv(f2_dir)
if csv_path is None:
continue
df = pl.read_csv(csv_path, infer_schema_length=0, encoding="utf8-lossy")
col_map = _resolve_columns(df, F2_ENDOWMENT_VARIANTS)
if "unitid" not in col_map:
continue
result = pl.DataFrame({
canonical: df[actual] for canonical, actual in col_map.items()
})
result = result.with_columns(pl.lit(year).alias("year"))
for col in ENDOWMENT_COLUMNS:
if col not in result.columns:
result = result.with_columns(pl.lit(None).alias(col))
elif col not in ("year",):
result = result.with_columns(pl.col(col).cast(pl.Int64, strict=False))
if unitid_filter is not None:
result = result.filter(pl.col("unitid") == unitid_filter)
if result.height == 0:
continue
result = result.select(ENDOWMENT_COLUMNS)
conn.execute("DELETE FROM raw_ipeds_endowment WHERE year = ?", [year])
conn.register("_tmp_endow", result.to_arrow())
conn.execute("INSERT INTO raw_ipeds_endowment SELECT * FROM _tmp_endow")
conn.unregister("_tmp_endow")
total += result.height
return total