Skip to content

Quick Start

Manu Murugesan edited this page Mar 14, 2026 · 3 revisions

Quick Start

This page walks through the basic workflow: setting up a Dask cluster, loading and cleaning claims, applying risk adjustment, extracting a patient cohort, and flagging claims by diagnosis or procedure.

1. Set Up a Dask Cluster

from dask.distributed import Client, LocalCluster

# Start a local cluster with 8 single-threaded workers; memory_limit applies per worker.
cluster = LocalCluster(
    n_workers=8,
    threads_per_worker=1,    # 1 thread per worker avoids GIL contention with pandas
    memory_limit="8GB",
)
# Connect a client to the cluster.
client = Client(cluster)
print(client.dashboard_link)  # Prints the Dask dashboard URL; open it in a browser to monitor the cluster

For HPC/SLURM environments, see Scaling with Dask.

2. Load and Clean Claims

MAX Format (Pre-2016)

from medicaid_utils.preprocessing import max_ip, max_ot, max_ps

# NOTE: `ip` and `ot` created here are reused by the MAX examples in steps 3 and 5.

# Load and preprocess inpatient claims (cleaning + variable construction)
ip = max_ip.MAXIP(year=2012, state="WY", data_root="/path/to/data")

# Access the cleaned Dask DataFrame
df_ip = ip.df

# Load outpatient claims with IP overlap flagging
ot = max_ot.MAXOT(year=2012, state="WY", data_root="/path/to/data")
# The inpatient DataFrame is passed in so outpatient claims can be flagged against it.
ot.flag_ip_overlaps_and_ed(df_ip)

# Load person summary with rural classification
ps = max_ps.MAXPS(year=2012, state="WY", data_root="/path/to/data")

TAF Format (2016+)

# NOTE: taf_ot is imported but not used in this snippet.
from medicaid_utils.preprocessing import taf_ip, taf_ot, taf_ps

# NOTE: these assignments rebind `ip` and `ps`. If you run this after the MAX
# example, re-create the MAX objects (or use different names) before running the
# MAX examples in later steps, which read `ip.df`.
ip = taf_ip.TAFIP(year=2019, state="AL", data_root="/path/to/data")
ps = taf_ps.TAFPS(year=2019, state="AL", data_root="/path/to/data")

# TAF data is in dct_files (keyed by subtype: "base", "line", "occurrence_code", "base_diag_codes", "line_ndc_codes")
df_ip_base = ip.dct_files["base"]

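Key difference: MAX objects expose their cleaned claims as a single DataFrame (ip.df), while TAF objects expose a dictionary of DataFrames keyed by subtype (for example, ip.dct_files["base"]). See MAX vs TAF for details.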

3. Apply Risk Adjustment

import pandas as pd

from medicaid_utils.adapted_algorithms.py_elixhauser.elixhauser_comorbidity import score

# MAX — first construct LST_DIAG_CD from individual diagnosis columns.
# `ip` must be the MAXIP object from step 2 (the TAF example rebinds `ip`).
diag_cols = [c for c in ip.df.columns if c.startswith("DIAG_CD_")]


def _join_diag_codes(row):
    """Return the row's non-missing, non-blank diagnosis codes as one comma-separated string."""
    # pd.notna() is checked first: NaN is truthy and pd.NA raises when truth-tested,
    # so a bare `if v` would either hand a float to str.join or fail outright.
    return ",".join(str(v) for v in row if pd.notna(v) and str(v).strip())


# Add LST_DIAG_CD partition by partition so the work stays lazy and distributed.
ip.df = ip.df.map_partitions(
    lambda pdf: pdf.assign(LST_DIAG_CD=pdf[diag_cols].apply(_join_diag_codes, axis=1))
)
df_scored = score(ip.df, lst_diag_col_name="LST_DIAG_CD", cms_format="MAX")

# TAF — gather diagnosis codes (creates LST_DIAG_CD on dct_files["base_diag_codes"])
# ip.gather_bene_level_diag_ndc_codes()
# df_scored = score(ip.dct_files["base_diag_codes"], lst_diag_col_name="LST_DIAG_CD", cms_format="TAF")

4. Extract a Patient Cohort

from medicaid_utils.filters.patients.cohort_extraction import extract_cohort

# Define ICD-9 and ICD-10 diagnosis codes for Type 2 diabetes
# The inner keys 9 and 10 hold the ICD-9 and ICD-10 code lists respectively.
# NOTE(review): ICD-9 category 250 spans both type 1 and type 2 diabetes (the
# fifth digit distinguishes them). If codes are matched as prefixes, "250" will
# also capture type 1 claims — confirm this is intended.
dct_codes = {
    "diag_codes": {"diabetes_t2": {"incl": {9: ["250"], 10: ["E11"]}}},
    "proc_codes": {},
}

# Define filters and paths
# NOTE(review): presumably {"missing_dob": 0} keeps only inpatient claims whose
# date of birth is present — verify against the filter documentation.
dct_filters = {"cohort": {"ip": {"missing_dob": 0}}, "export": {}}
dct_paths = {"source_root": "/path/to/data", "export_folder": "/output/cohort/"}

# Extract and export cohort claim files
extract_cohort(
    state="WY", lst_year=[2012],
    dct_diag_proc_codes=dct_codes,
    dct_filters=dct_filters,
    lst_types_to_export=["ip", "ot", "ps"],
    dct_data_paths=dct_paths,
    cms_format="MAX",
)

5. Flag Claims by Diagnosis or Procedure

from medicaid_utils.filters.claims import dx_and_proc

# Flag claims matching ICD-9 diagnosis codes
# `ot` is the MAXOT object created in the MAX example of step 2.
# NOTE(review): "49390" begins with "4939"; if codes are matched as prefixes the
# second entry is redundant — confirm the matching semantics.
df_flagged = dx_and_proc.flag_diagnoses_and_procedures(
    dct_diag_codes={"asthma": {"incl": {9: ["4939", "49390"]}}},
    dct_proc_codes={},
    df_claims=ot.df,
    cms_format="MAX",
)

Next Steps

Clone this wiki locally