from pathlib import Path

import duckdb
import polars as pl

from admin_analytics import config
from admin_analytics.config import UD_UNITID

# HD column mappings: canonical name → list of IPEDS varnames (priority order).
# The first match found in the file's columns wins. The Carnegie column has
# many variants because IPEDS renamed it across survey years.
HD_COLUMN_VARIANTS = {
    "unitid": ["UNITID"],
    "institution_name": ["INSTNM"],
    "city": ["CITY"],
    "state": ["STABBR"],
    "sector": ["SECTOR"],
    "control": ["CONTROL"],
    "ein": ["EIN"],
    "carnegie_class": ["C21BASIC", "C18BASIC", "C15BASIC", "CCBASIC", "CARNEGIE"],
    "enrollment_total": ["EFYTOTLT"],
}

# Target schema for raw_institution: every year's frame is normalized to
# exactly these columns, in this order.
CANONICAL_COLUMNS = [
    "unitid",
    "year",
    "ein",
    "institution_name",
    "city",
    "state",
    "sector",
    "control",
    "carnegie_class",
    "enrollment_total",
]


def _find_csv(component_dir: Path) -> Path | None:
    """Find the main data CSV in an extracted IPEDS directory.

    Dictionary files (stems containing "dict") are excluded. Candidates are
    sorted so the choice is deterministic — ``glob`` order is filesystem-
    dependent, and the original first-match pick could vary between runs.
    Returns None when the directory has no matching CSV.
    """
    csvs = sorted(
        f for f in component_dir.glob("*.csv") if "dict" not in f.stem.lower()
    )
    return csvs[0] if csvs else None


def _resolve_columns(df: pl.DataFrame) -> dict[str, str]:
    """For each canonical name, find the first matching IPEDS column.

    Matching is case-insensitive (IPEDS files vary in column casing).
    Returns canonical name → actual column name in *df*; canonical names
    with no match in *df* are simply omitted from the result.
    """
    upper_cols = {c.upper(): c for c in df.columns}
    resolved: dict[str, str] = {}
    for canonical, variants in HD_COLUMN_VARIANTS.items():
        for var in variants:
            if var in upper_cols:
                resolved[canonical] = upper_cols[var]
                break
    return resolved


def _map_columns(df: pl.DataFrame, year: int) -> pl.DataFrame:
    """Rename IPEDS columns to canonical names and add a ``year`` column.

    Canonical columns missing from the source file are filled with nulls so
    every year yields an identically-shaped frame in CANONICAL_COLUMNS order.
    """
    col_map = _resolve_columns(df)
    # Select-and-rename in one pass. (The original built these expressions
    # but never used them, rebuilding the frame column-by-column instead.)
    df = df.select(
        pl.col(actual).alias(canonical) for canonical, actual in col_map.items()
    )
    df = df.with_columns(pl.lit(year).alias("year"))
    # Ensure all canonical columns exist, null-filled where absent.
    for col in CANONICAL_COLUMNS:
        if col not in df.columns:
            df = df.with_columns(pl.lit(None).alias(col))
    return df.select(CANONICAL_COLUMNS)


def load_institutions(
    conn: duckdb.DuckDBPyConnection,
    year_range: range,
    unitid_filter: int | None = UD_UNITID,
) -> int:
    """Load IPEDS HD data into raw_institution. Returns rows loaded.

    For each year: locate the extracted HD CSV, normalize its columns to
    the canonical schema, cast types, optionally filter to a single UNITID,
    then replace that year's rows in raw_institution (delete + insert, so
    reloading a year is idempotent).

    Args:
        conn: open DuckDB connection with a raw_institution table whose
            columns match CANONICAL_COLUMNS.
        year_range: survey years to load.
        unitid_filter: keep only this UNITID; None loads all institutions.
    """
    total = 0
    for year in year_range:
        data_dir = config.IPEDS_DATA_DIR / "hd" / str(year)
        csv_path = _find_csv(data_dir)
        if csv_path is None:
            print(f" No HD CSV found for {year}, skipping")
            continue

        # Read everything as string first; explicit casts below decide which
        # columns may tolerate malformed values.
        df = pl.read_csv(
            csv_path,
            infer_schema_length=0,
            encoding="utf8-lossy",
        )
        df = _map_columns(df, year)

        # unitid/year must parse (strict cast raises on garbage); the coded
        # and numeric survey fields cast leniently to null instead.
        df = df.with_columns(
            pl.col("unitid").cast(pl.Int64),
            pl.col("year").cast(pl.Int64),
            pl.col("sector").cast(pl.Int64, strict=False),
            pl.col("control").cast(pl.Int64, strict=False),
            pl.col("carnegie_class").cast(pl.Int64, strict=False),
            pl.col("enrollment_total").cast(pl.Int64, strict=False),
        )

        if unitid_filter is not None:
            df = df.filter(pl.col("unitid") == unitid_filter)
        if df.height == 0:
            continue

        conn.execute("DELETE FROM raw_institution WHERE year = ?", [year])
        conn.register("_tmp_institution", df.to_arrow())
        try:
            conn.execute("INSERT INTO raw_institution SELECT * FROM _tmp_institution")
        finally:
            # Always drop the temp view, even if the INSERT fails — the
            # original leaked the registration on error.
            conn.unregister("_tmp_institution")
        total += df.height
    return total