diff --git a/etls/loadHalloween/mapping.ipynb b/etls/loadHalloween/mapping.ipynb new file mode 100644 index 00000000..ca314a76 --- /dev/null +++ b/etls/loadHalloween/mapping.ipynb @@ -0,0 +1,717 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "a126be52", + "metadata": {}, + "source": [ + "# Halloween CSV to API db Initial Load POC" + ] + }, + { + "cell_type": "markdown", + "id": "bfebd40d", + "metadata": {}, + "source": [ + "## Initial Set-up" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "0a26a1bc", + "metadata": {}, + "outputs": [], + "source": [ + "import pandas as pd\n", + "import datetime\n", + "import os\n", + "from utils import createEngine\n", + "import uuid\n", + "import numpy as np\n", + "import warnings\n", + "from pangres import upsert\n", + "from urllib.parse import urljoin\n", + "import requests\n", + "import zipfile\n", + "import io\n", + "\n", + "warnings.simplefilter(action='ignore', category=pd.errors.DtypeWarning)\n", + "warnings.simplefilter(\n", + " action='ignore', category=pd.errors.SettingWithCopyWarning)\n" + ] + }, + { + "cell_type": "markdown", + "id": "9b17ed3b", + "metadata": {}, + "source": [ + "### Define helper functions" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "357ba103", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "def convertBool(val):\n", + " \"\"\"\n", + " The Halloween CSVs use 1 and 0 to represent True and False. This function maps the 1s to True and the 0s to False.\n", + " \"\"\"\n", + " primary_to_bool = {1: True, 0: False}\n", + " if val in primary_to_bool.keys():\n", + " return primary_to_bool[val]\n", + " else:\n", + " return False\n", + "\n", + "def show_or_load(df, table_name, schema_name, engine, load=True):\n", + " \"\"\"\n", + " This function allows you to decide whether you do or don't proceed with loading data (so you can focus on preparing it/ debugging).\n", + " It also prints out the name of the table that is being loaded, so you can tell what has been loaded and what is in progress.\n", + " \"\"\"\n", + " if load:\n", + " print(f'Loading {table_name}')\n", + " df.to_sql(table_name, schema=schema_name, con=engine,\n", + " if_exists='append', index=False)\n", + " else:\n", + " print(f'Showing {table_name}')\n", + " print(df.head())\n", + "\n", + "def gen_uuid_id(base_url, id):\n", + " url = urljoin(base_url, id)\n", + " return uuid.uuid5(uuid.NAMESPACE_URL, url)" + ] + }, + { + "cell_type": "markdown", + "id": "6cc95d55", + "metadata": {}, + "source": [ + "### Create a database engine from the settings specified in your .env file" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "09f9d8b4", + "metadata": {}, + "outputs": [], + "source": [ + "engine = createEngine()" + ] + }, + { + "cell_type": "markdown", + "id": "eb1d8d3e", + "metadata": {}, + "source": [ + "### Define the scratch workspace where the Halloween CSVs are located and where the NPPES Main File will be downloaded" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "3e78049b", + "metadata": {}, + "outputs": [], + "source": [ + "scratch_dir = os.path.join('..','scratch')" + ] + }, + { + "cell_type": "markdown", + "id": "08e6c3c0", + "metadata": {}, + "source": [ + "## Get data to fill gaps in Halloween CSVs" + ] + }, + { + "cell_type": "markdown", + "id": "ce42c70e", + "metadata": {}, + "source": [ + "### Get FIPS State Reference Data\n", + "We need to load the FIPS state reference data from the target db, to serve as a lookup table between state abbreviations and 
state codes, because the Halloween CSVs only contain state abbreviations but the db utilizes state codes" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "6d13c3ce", + "metadata": {}, + "outputs": [], + "source": [ + "fips_state_df = pd.read_sql('select * from npd.fips_state', con = engine)\n", + "fips_state_df.set_index('abbreviation', inplace=True)" + ] + }, + { + "cell_type": "markdown", + "id": "c3d737e6", + "metadata": {}, + "source": [ + "### Get NPI Data\n", + "Since the Halloween CSV files do not contain sufficient attributes for NPIs, we need to download the latest NPPES main file and get the additional NPI fields that the target db is expecting" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "18201595", + "metadata": {}, + "outputs": [], + "source": [ + "def generateNPPESVersion(version = 'Monthly', days_ago = 0):\n", + " current_date = datetime.datetime.now() - datetime.timedelta(days = days_ago)\n", + " if version == 'Monthly':\n", + " current_month = current_date.strftime(\"%B\")\n", + " current_year = current_date.year\n", + " csv_version = f'{current_month}_{current_year}_V2'\n", + " else:\n", + " current_week_start = current_date - datetime.timedelta(days=current_date.weekday()-7).strftime(\"%\")\n", + " current_week_end = current_date + datetime.timedelta(days = 6)\n", + "\n", + " return csv_version" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "c9cff0ac", + "metadata": {}, + "outputs": [ + { + "ename": "TypeError", + "evalue": "strftime() missing required argument 'format' (pos 1)", + "output_type": "error", + "traceback": [ + "\u001b[31m---------------------------------------------------------------------------\u001b[39m", + "\u001b[31mTypeError\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[8]\u001b[39m\u001b[32m, line 3\u001b[39m\n\u001b[32m 1\u001b[39m current_date = datetime.datetime.now()\n\u001b[32m 2\u001b[39m current_month = current_date.strftime(\u001b[33m\"\u001b[39m\u001b[33m%\u001b[39m\u001b[33mB\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m----> \u001b[39m\u001b[32m3\u001b[39m prior_month = \u001b[43m(\u001b[49m\u001b[43mcurrent_date\u001b[49m\u001b[43m \u001b[49m\u001b[43m-\u001b[49m\u001b[43m \u001b[49m\u001b[43mdatetime\u001b[49m\u001b[43m.\u001b[49m\u001b[43mtimedelta\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdays\u001b[49m\u001b[43m \u001b[49m\u001b[43m=\u001b[49m\u001b[43m \u001b[49m\u001b[32;43m30\u001b[39;49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\u001b[43m.\u001b[49m\u001b[43mstrftime\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 4\u001b[39m current_year = current_date.year\n\u001b[32m 5\u001b[39m csv_version = \u001b[33mf\u001b[39m\u001b[33m'\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mcurrent_month\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m_\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mcurrent_year\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m_V2\u001b[39m\u001b[33m'\u001b[39m\n", + "\u001b[31mTypeError\u001b[39m: strftime() missing required argument 'format' (pos 1)" + ] + } + ], + "source": [ + "current_date = datetime.datetime.now()\n", + "current_month = current_date.strftime(\"%B\")\n", + "prior_month = (current_date - datetime.timedelta(days = 30)).strftime()\n", + "current_year = current_date.year\n", + "csv_version = f'{current_month}_{current_year}_V2'\n", + "nppes_dir = os.path.join(scratch_dir,'nppes')\n", + "\n", + "# Download and unzip the NPPES CSV files\n", + "#zipData = 
requests.get(f'https://download.cms.gov/nppes/NPPES_Data_Dissemination_{csv_version}.zip').content" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "eff5e9d8", + "metadata": {}, + "outputs": [ + { + "ename": "NameError", + "evalue": "name 'csv_version' is not defined", + "output_type": "error", + "traceback": [ + "\u001b[31m---------------------------------------------------------------------------\u001b[39m", + "\u001b[31mNameError\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[9]\u001b[39m\u001b[32m, line 1\u001b[39m\n\u001b[32m----> \u001b[39m\u001b[32m1\u001b[39m \u001b[33mf\u001b[39m\u001b[33m'\u001b[39m\u001b[33mhttps://download.cms.gov/nppes/NPPES_Data_Dissemination_\u001b[39m\u001b[38;5;132;01m{\u001b[39;00m\u001b[43mcsv_version\u001b[49m\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m.zip\u001b[39m\u001b[33m'\u001b[39m\n", + "\u001b[31mNameError\u001b[39m: name 'csv_version' is not defined" + ] + } + ], + "source": [ + "f'https://download.cms.gov/nppes/NPPES_Data_Dissemination_{csv_version}.zip'" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "eec38e7c", + "metadata": {}, + "outputs": [], + "source": [ + "current_date = datetime.datetime.now()\n", + "current_month = current_date.strftime(\"%B\")\n", + "current_year = current_date.year\n", + "csv_version = f'{current_month}_{current_year}_V2'\n", + "nppes_dir = os.path.join(scratch_dir,'nppes')\n", + "\n", + "# Download and unzip the NPPES CSV files\n", + "#zipData = requests.get(f'https://download.cms.gov/nppes/NPPES_Data_Dissemination_{csv_version}.zip').content\n", + "#with zipfile.ZipFile(io.BytesIO(zipData), 'r') as zip_file:\n", + "# zip_file.extractall(nppes_dir)\n", + "main_files = [f for f in os.listdir(nppes_dir) if 'npidata_pfile' in f and '_fileheader' not in f]\n", + "main_files.sort()\n", + "latest_main_file = main_files[-1]\n", + "\n", + "npi_df = pd.read_csv(os.path.join(nppes_dir, latest_main_file), usecols = ['Provider Last Name (Legal Name)', 'NPI', 'Entity Type Code', 'Replacement NPI', 'Provider Enumeration Date', 'Last Update Date',\n", + " 'NPI Deactivation Reason Code', 'NPI Deactivation Date', 'NPI Reactivation Date', 'Certification Date'])\n", + "npi_df_renamed = npi_df.rename(columns={\n", + " 'NPI': 'npi',\n", + " 'Entity Type Code': 'entity_type_code',\n", + " 'Replacement NPI': 'replacement_npi',\n", + " 'Provider Enumeration Date': 'enumeration_date',\n", + " 'Last Update Date': 'last_update_date',\n", + " 'NPI Deactivation Reason Code': 'deactivation_reason_code',\n", + " 'NPI Deactivation Date': 'deactivation_date',\n", + " 'NPI Reactivation Date': 'reactivation_date',\n", + " 'Certification Date': 'certification_date'\n", + " })" + ] + }, + { + "cell_type": "markdown", + "id": "78d95fd5", + "metadata": {}, + "source": [ + "### Populate NPI Fields\n", + "The NPPES main file strips certain field values for records with deactivated NPIs, so we populate those as needed for the target db.\n", + "1. Deactivated NPIs show up without entity_type_code values, but those are required in the db. We use the Provider Last Name (Legal Name) field to intuit whether a provider is an individual (if there is a last name listed, the provider has entity_type_code 1) or an organization (if there is not a last name listed, the provider has entity_type_code 2)\n", + "2. Deactivated NPIs show up without enumeration_date and last_update_date values. We populate placeholder dates of 1/1/1900."
+ ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "ae525963", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "deactivated_npi1_condition = (npi_df_renamed['entity_type_code'].isnull())&~(npi_df_renamed['Provider Last Name (Legal Name)'].isnull())\n", + "deactivated_npi2_condition = (npi_df_renamed['entity_type_code'].isnull())&(npi_df_renamed['Provider Last Name (Legal Name)'].isnull())\n", + "npi_df_renamed.loc[deactivated_npi1_condition, ['entity_type_code', 'enumeration_date', 'last_update_date']] = [1, '1900-01-01', '1900-01-01']\n", + "npi_df_renamed.loc[deactivated_npi2_condition, ['entity_type_code', 'enumeration_date', 'last_update_date']] = [2, '1900-01-01', '1900-01-01']\n", + "del npi_df_renamed['Provider Last Name (Legal Name)']" + ] + }, + { + "cell_type": "markdown", + "id": "9ff46de7", + "metadata": {}, + "source": [ + "## Process Halloween Data" + ] + }, + { + "cell_type": "markdown", + "id": "501c6898", + "metadata": {}, + "source": [ + "### Read in the Halloween CSVs \n", + "We loop through the halloween_data folder (downloaded from the [Halloween Release Google Drive Folder](https://drive.google.com/drive/folders/1zvneyQi7xNReIfeKkdpTgxX1BziaeKkT)) and store each in a dictionary. Optionally, we can load them to the raw_csv schema within the database, to facilitate querying and inspection." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "ed4a504c", + "metadata": {}, + "outputs": [], + "source": [ + "load_raw_csvs = False\n", + "df_dict={}\n", + "for f in os.listdir(os.path.join(scratch_dir, 'halloween_data')):\n", + " if '.csv' in f:\n", + " tablename = f.split('.csv')[0]\n", + " df = pd.read_csv(os.path.join(scratch_dir,'halloween_data',f), na_values=[''], keep_default_na=False)\n", + " df_dict[f]=df\n", + " if load_raw_csvs:\n", + " df.to_sql(tablename, index=False, schema = 'raw_csv', con = engine, if_exists='replace')" + ] + }, + { + "cell_type": "markdown", + "id": "22276745", + "metadata": {}, + "source": [ + "### Structure Practitioner Data\n", + "Using the practitioner csv and the practitionerrole csv, we transform the data into the formats necessary for the individual provider tables:\n", + "* Assign a uuid id to each practitioner record\n", + "* Map the following csv fields to db fields:\n", + " * gender_code to sex, because that is how the field is captured in NPPES (in the ETL process it is renamed to gender_code, but here it is renamed back)\n", + " * name_prefix to prefix\n", + " * name_suffix to suffix\n", + "* Assign a name_use_id of 1 for \"usual,\" which is the FHIR code assigned to the primary name. 
Since we don't have any other provider names listed, we assume that the name provided here is the primary name\n", + "* Join practitioner data with taxonomy data, so we can associate our new uuids with the taxonomy records\n", + "* Filter out the practitioner with the NPI 1770923773, because that NPI does not exist\n", + "* Filter out the taxonomy records with a state code of 'ZZ,' because that state code does not exist\n", + "* Separate out taxonomy information from license information" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "7e520af4", + "metadata": {}, + "outputs": [], + "source": [ + "practitioner_df = df_dict['practitioner.csv']\n", + "#note: we can do this because each practitioner only appears once in this table\n", + "practitioner_df['id'] = [gen_uuid_id('https://api/individual/', str(i)) for i in practitioner_df['id']]\n", + "practitioner_df_renamed = practitioner_df.rename(columns = {'gender_code': 'sex', 'name_prefix': 'prefix', 'name_suffix': 'suffix'})\n", + "practitioner_df_renamed['name_use_id'] = 1\n", + "practitioner_taxonomy_df = df_dict['practitionerrole.csv']\n", + "filtered_practitioner_taxonomy_df = practitioner_taxonomy_df.loc[practitioner_taxonomy_df['practitioner_id']!=1770923773]\n", + "merged_taxonomy_df = filtered_practitioner_taxonomy_df.merge(practitioner_df_renamed, left_on = 'practitioner_id', right_on = 'npi', suffixes = ('tax', 'individual')) \n", + "merged_taxonomy_df = merged_taxonomy_df.loc[merged_taxonomy_df['state_code']!='ZZ']\n", + "merged_taxonomy_df['state_code'] = [fips_state_df.loc[i]['id'] if i in fips_state_df.index else np.nan for i in merged_taxonomy_df['state_code']]\n", + "merged_taxonomy_df_renamed = merged_taxonomy_df.rename(columns={'idindividual': 'individual_id', 'taxonomy_code':'nucc_code'})\n", + "provider_to_taxonomy_df = merged_taxonomy_df_renamed[['npi', 'nucc_code', 'is_primary']]\n", + "provider_to_taxonomy_df['is_primary'] = provider_to_taxonomy_df['is_primary'].apply(lambda x: convertBool(x))\n", + "dedup_taxonomy_df = provider_to_taxonomy_df.sort_values(by='is_primary', ascending=False)[\n", + " ['npi', 'nucc_code', 'is_primary']].drop_duplicates(subset=['nucc_code', 'npi'])\n", + "dedup_taxonomy_df['id'] = [uuid.uuid4() for i in dedup_taxonomy_df.index]\n", + "license_df = dedup_taxonomy_df.merge(merged_taxonomy_df_renamed, on = ['npi', 'nucc_code'], suffixes = ('tax', 'cred'))\n", + "license_df_renamed = license_df.rename(columns={'id': 'provider_to_taxonomy_id'})" + ] + }, + { + "cell_type": "markdown", + "id": "2762d9ce", + "metadata": {}, + "source": [ + "### Structure Organization Data\n", + "Using the organization csv and the organization_npi csv, we attempt to discern a hierarchical organization structure and transform the data into the formats necessary for the organization tables:\n", + "* Since we only have one name per organization, we assume this is the primary name and set the is_primary field (which will later be loaded into the organization_to_name table) to True\n", + "* We associate a uuid id with each organization \n", + "* We back calculate the organization hierarchy by backpopulating the uuids into the old id and parent id fields\n", + "* We also ensure that each NPI is associated with its own organization and that the hierarchy is maintained when doing so\n" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "74c78543", + "metadata": {}, + "outputs": [], + "source": [ + "organization_df = df_dict['organization.csv']\n", + "organization_df['is_primary'] = 
True\n", + "organization_df_renamed = organization_df.rename(columns={'id':'old_org_id', 'parent_id':'old_parent_id', 'organization_name':'name'})\n", + "organization_df_renamed.set_index(['old_org_id'], inplace=True)\n", + "organization_df_renamed['org_id'] = [uuid.uuid4() for i in organization_df_renamed.index]\n", + "organization_df_renamed['org_parent_id'] = [organization_df_renamed.loc[i]['org_id'] if i in organization_df_renamed.index else np.nan for i in organization_df_renamed['old_parent_id']]\n", + "organization_npi_df = df_dict['organization_npi.csv']\n", + "organization_npi_df_renamed = organization_npi_df.rename(columns={'organization_id':'old_org_id'})\n", + "organization_npi_df_renamed['id'] = [gen_uuid_id('https://api/organization/', str(i)) for i in organization_npi_df_renamed['npi']]\n", + "clinical_organization_df = organization_npi_df_renamed.merge(organization_df_renamed, on='old_org_id', how='outer')\n", + "clinical_organization_df_renamed = clinical_organization_df.rename(columns={'org_id':'parent_id'})\n", + "other_organization_df = organization_df_renamed.rename(columns = {'org_id':'id', 'org_parent_id': 'parent_id'})\n" + ] + }, + { + "cell_type": "markdown", + "id": "7cb2b701", + "metadata": {}, + "source": [ + "### Structure Endpoint Data\n", + "Using the endpoint csv, we transform the data into the necessary structure for the endpoint tables:\n", + "* Rename `fhir_url` to `address`\n", + "* Create a table of unique ehr vendors and assign a uuid to each\n", + "* Join the vendor uuids back to the endpoint data\n", + "* Populate fields that are not present in the dataset (environment_type_id and endpoint_connection_type_id) with hardcoded values\n", + "* Assign a uuid to each endpoint record" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "39a319ec", + "metadata": {}, + "outputs": [], + "source": [ + "endpoint_df = df_dict['endpoint.csv']\n", + "endpoint_df_renamed = endpoint_df.rename(columns={'id':'endpoint_id','fhir_url':'address'})\n", + "ehr_vendor_df = endpoint_df.drop_duplicates(subset='vendor_name')\n", + "ehr_vendor_df['id'] = [gen_uuid_id('https://api/ehr_vendor/', str(i)) for i in ehr_vendor_df['vendor_name']]\n", + "ehr_vendor_df_renamed = ehr_vendor_df.rename(columns={'vendor_name':'name'})\n", + "ehr_vendor_df_renamed.set_index('name', inplace=True, drop=False)\n", + "endpoint_df_renamed['ehr_vendor_id'] = endpoint_df_renamed['vendor_name'].apply(lambda x: ehr_vendor_df_renamed.loc[x]['id'])\n", + "endpoint_df_renamed['environment_type_id'] = 'prod'\n", + "endpoint_df_renamed['endpoint_connection_type_id'] = 'hl7-fhir-rest'\n", + "endpoint_df_renamed['id'] = [gen_uuid_id('https://api/endpoint_instance/', str(i)) for i in endpoint_df_renamed['endpoint_id']]" + ] + }, + { + "cell_type": "markdown", + "id": "885e032a", + "metadata": {}, + "source": [ + "### Structure Organization to Endpoint Data\n", + "Using the organization_endpoint csv, we transform the data into the necessary structure for the organization to endpoint relationship:\n", + "* Join the endpoint data to the organization_to_endpoint data, so we can associate the endpoint uuids with the org_to_endpoint records\n", + "* Join the organization data to the organization_to_endpoint data, so we can associate the organization uuids with the org_to_endpoint records" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "55f6e7f2", + "metadata": {}, + "outputs": [], + "source": [ + "org_to_endpoint_df = df_dict['organization_endpoint.csv']\n", + "merged_org_to_endpoint_df 
= org_to_endpoint_df.merge(endpoint_df_renamed, on = 'endpoint_id', how='outer').merge(clinical_organization_df_renamed, left_on = 'organization_npi', right_on = 'npi', suffixes = ('endpoint', 'organization'), how='outer')\n", + "merged_org_to_endpoint_df= merged_org_to_endpoint_df[['idendpoint', 'idorganization']].rename(columns = {'idendpoint': 'endpoint_instance_id', 'idorganization':'organization_id'}).dropna()" + ] + }, + { + "cell_type": "markdown", + "id": "98b6e7f0", + "metadata": {}, + "source": [ + "### Structure Address Data\n", + "Using the location csv and the npi_location csv, transform the data into the necessary structure for the address and location tables:\n", + "* Rename the columns to align with the naming in the database:\n", + " * `line` to `delivery_line_1` (note: ideally we would have multiple fields, one for each line)\n", + " * `postalcode` to `zipcode`\n", + " * `city` to `city_name`\n", + "* Assign a uuid to each address record\n", + "* Filter out the states that do not exist (FM, ~, UK, MH) (note: FM and MH are US territories, but the addresses with those values listed as states do not correspond to those territories)\n", + "* Populate the fips state codes based on state abbreviation\n", + "* Join the address data to npi_location in order to populate the address uuids\n", + "* Join the practitioner and organization data to npi_location in order to populate practitioner uuid and organization uuid\n", + "* Populate the address_use_id with a hard coded value of 2 (for work address), since there is no reference to the address type\n", + "* Assign a location uuid to each location (address associated with an organization)\n", + "* Associate the endpoints with the appropriate locations, based on their organization affiliations (we assume that each organization uses each endpoint at all their locations)" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "56004b90", + "metadata": {}, + "outputs": [], + "source": [ + "address_df = df_dict['location.csv']\n", + "address_df_renamed = address_df.rename(columns={'id':'address_us_id', 'line':'delivery_line_1', 'postalcode':'zipcode', 'city':'city_name'})\n", + "address_df_renamed['id']= [uuid.uuid4() for i in address_df_renamed.index]\n", + "address_df_renamed = address_df_renamed.loc[(address_df_renamed['state'] != 'FM') & (address_df_renamed['state'] != '~') & (address_df_renamed['state'] != 'UK') & (address_df['state'] != 'MH')]\n", + "address_df_renamed['state_code'] = address_df_renamed['state'].apply(lambda x: fips_state_df.loc[x]['id'])\n", + "location_npi_df = df_dict['npi_location.csv']\n", + "merged_df_1 = location_npi_df.merge(address_df_renamed, left_on='location_id', right_on = 'address_us_id', how='outer')\n", + "merged_df_2 = merged_df_1.merge(npi_df_renamed, on = 'npi', suffixes=('address','npi'), how='outer')\n", + "merged_df_3 = merged_df_2.merge(practitioner_df_renamed, on = 'npi', suffixes = ('address', 'individual'), how='outer')\n", + "merged_location_df = merged_df_3.merge(clinical_organization_df_renamed, on = 'npi', suffixes = ('address', 'organization'), how='outer')\n", + "merged_location_df_renamed = merged_location_df.rename(columns={'idaddress':'address_id', 'idindividual':'individual_id', 'id':'organization_id', 'nameaddress':'name'})\n", + "merged_location_df_renamed['address_use_id'] = 2\n", + "individual_to_address_df = merged_location_df_renamed[['address_id','individual_id', 'address_use_id']].dropna(how='any')\n", + "location_df = 
merged_location_df_renamed[['address_id','organization_id','name', 'address_use_id']].dropna(how='any')\n", + "location_df['id'] = [uuid.uuid4() for i in location_df.index]\n", + "location_to_endpoint_df = location_df.merge(merged_org_to_endpoint_df, on = 'organization_id', how='outer')[['id', 'endpoint_instance_id']].dropna(how = 'any').rename(columns = {'id':'location_id'})\n" + ] + }, + { + "cell_type": "markdown", + "id": "7ece7d77", + "metadata": {}, + "source": [ + "### Structure Provider to Organization Data\n", + "Using the personal_npi_to_organizational_npi csv, transform the data into the necessary structure for the provider to organization and provider to location relationships:\n", + "* Join provider and organization data to associate the provider and organization uuids with the NPIs listed\n", + "* Assign a uuid to each provider to organization relationship\n", + "* Assign a relationship_type_id value of 2 for each relationship where the affiliation_source is 'PECOS Assignment Relationships'\n", + "* Join location information based on organization NPI (we assume each provider works at every location owned by the organization that they have a relationship with)\n", + "* Assign a uuid to each provider to location relationship" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "2595c9c5", + "metadata": {}, + "outputs": [], + "source": [ + "provider_to_organization_df = df_dict['personal_npi_to_organizational_npi.csv']\n", + "merged_provider_to_org_df = provider_to_organization_df.merge(practitioner_df_renamed, left_on = 'personal_npi', right_on = 'npi', how='inner').merge(clinical_organization_df_renamed, left_on = 'organizational_npi', right_on = 'npi', suffixes = ('individual', 'organization'), how='inner')\n", + "provider_to_org_df_renamed = merged_provider_to_org_df.rename(columns = {'idindividual':'individual_id', 'idorganization':'organization_id'})\n", + "provider_to_org_df_renamed['id'] = [uuid.uuid4() for i in provider_to_org_df_renamed.index]\n", + "provider_to_org_df_renamed['relationship_type_id'] = [2 if val=='PECOS Assignment Relationships' else val for val in provider_to_org_df_renamed['affiliation_source']]\n", + "provider_to_location_df = provider_to_org_df_renamed.merge(location_df, on='organization_id', how='inner', suffixes=('porg','location'))\n", + "provider_to_location_df['id'] = [uuid.uuid4() for i in provider_to_location_df.index]\n", + "provider_to_location_df_renamed = provider_to_location_df.rename(columns={'idlocation':'location_id', 'idporg':'provider_to_organization_id'})" + ] + }, + { + "cell_type": "markdown", + "id": "c13ccbcd", + "metadata": {}, + "source": [ + "## Load the Data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "be8bb0b8", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Loading npi\n", + "Loading individual\n", + "Loading individual_to_name\n", + "Loading provider\n", + "Loading organization\n", + "adding parent_id to organization\n", + "Loading organization\n", + "adding parent_id to clinical_organization organizations\n", + "Loading organization_to_name\n", + "Loading organization_to_name\n", + "Loading clinical_organization\n", + "Loading ehr_vendor\n", + "Loading endpoint_instance\n", + "Loading address_us\n", + "Loading address\n", + "Loading individual_to_address\n" + ] + }, + { + "ename": "PendingRollbackError", + "evalue": "Can't reconnect until invalid transaction is rolled back. 
Please rollback() fully before proceeding (Background on this error at: https://sqlalche.me/e/20/8s2b)", + "output_type": "error", + "traceback": [ + "\u001b[31m---------------------------------------------------------------------------\u001b[39m", + "\u001b[31mPendingRollbackError\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[19]\u001b[39m\u001b[32m, line 53\u001b[39m\n\u001b[32m 50\u001b[39m show_or_load(address_df_renamed[[\u001b[33m'\u001b[39m\u001b[33mid\u001b[39m\u001b[33m'\u001b[39m, \u001b[33m'\u001b[39m\u001b[33maddress_us_id\u001b[39m\u001b[33m'\u001b[39m]], \u001b[33m'\u001b[39m\u001b[33maddress\u001b[39m\u001b[33m'\u001b[39m, schema_name, engine, load)\n\u001b[32m 52\u001b[39m \u001b[38;5;66;03m# load individual_to_address\u001b[39;00m\n\u001b[32m---> \u001b[39m\u001b[32m53\u001b[39m \u001b[43mshow_or_load\u001b[49m\u001b[43m(\u001b[49m\u001b[43mindividual_to_address_df\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[33;43m'\u001b[39;49m\u001b[33;43mindividual_to_address\u001b[39;49m\u001b[33;43m'\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mschema_name\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mengine\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mload\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 55\u001b[39m \u001b[38;5;66;03m# load organization_to_address\u001b[39;00m\n\u001b[32m 56\u001b[39m show_or_load(location_df[[\u001b[33m'\u001b[39m\u001b[33maddress_id\u001b[39m\u001b[33m'\u001b[39m,\u001b[33m'\u001b[39m\u001b[33morganization_id\u001b[39m\u001b[33m'\u001b[39m, \u001b[33m'\u001b[39m\u001b[33maddress_use_id\u001b[39m\u001b[33m'\u001b[39m]], \u001b[33m'\u001b[39m\u001b[33morganization_to_address\u001b[39m\u001b[33m'\u001b[39m, schema_name, engine, load)\n", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[2]\u001b[39m\u001b[32m, line 18\u001b[39m, in \u001b[36mshow_or_load\u001b[39m\u001b[34m(df, table_name, schema_name, engine, load)\u001b[39m\n\u001b[32m 16\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m load:\n\u001b[32m 17\u001b[39m \u001b[38;5;28mprint\u001b[39m(\u001b[33mf\u001b[39m\u001b[33m'\u001b[39m\u001b[33mLoading \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mtable_name\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m'\u001b[39m)\n\u001b[32m---> \u001b[39m\u001b[32m18\u001b[39m \u001b[43mdf\u001b[49m\u001b[43m.\u001b[49m\u001b[43mto_sql\u001b[49m\u001b[43m(\u001b[49m\u001b[43mtable_name\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mschema\u001b[49m\u001b[43m=\u001b[49m\u001b[43mschema_name\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mcon\u001b[49m\u001b[43m=\u001b[49m\u001b[43mengine\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 19\u001b[39m \u001b[43m \u001b[49m\u001b[43mif_exists\u001b[49m\u001b[43m=\u001b[49m\u001b[33;43m'\u001b[39;49m\u001b[33;43mappend\u001b[39;49m\u001b[33;43m'\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mindex\u001b[49m\u001b[43m=\u001b[49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m)\u001b[49m\n\u001b[32m 20\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[32m 21\u001b[39m \u001b[38;5;28mprint\u001b[39m(\u001b[33mf\u001b[39m\u001b[33m'\u001b[39m\u001b[33mShowing \u001b[39m\u001b[38;5;132;01m{\u001b[39;00mtable_name\u001b[38;5;132;01m}\u001b[39;00m\u001b[33m'\u001b[39m)\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/pandas/util/_decorators.py:333\u001b[39m, in 
\u001b[36mdeprecate_nonkeyword_arguments..decorate..wrapper\u001b[39m\u001b[34m(*args, **kwargs)\u001b[39m\n\u001b[32m 327\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(args) > num_allow_args:\n\u001b[32m 328\u001b[39m warnings.warn(\n\u001b[32m 329\u001b[39m msg.format(arguments=_format_argument_list(allow_args)),\n\u001b[32m 330\u001b[39m \u001b[38;5;167;01mFutureWarning\u001b[39;00m,\n\u001b[32m 331\u001b[39m stacklevel=find_stack_level(),\n\u001b[32m 332\u001b[39m )\n\u001b[32m--> \u001b[39m\u001b[32m333\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mfunc\u001b[49m\u001b[43m(\u001b[49m\u001b[43m*\u001b[49m\u001b[43margs\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m*\u001b[49m\u001b[43m*\u001b[49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/pandas/core/generic.py:3106\u001b[39m, in \u001b[36mNDFrame.to_sql\u001b[39m\u001b[34m(self, name, con, schema, if_exists, index, index_label, chunksize, dtype, method)\u001b[39m\n\u001b[32m 2908\u001b[39m \u001b[38;5;250m\u001b[39m\u001b[33;03m\"\"\"\u001b[39;00m\n\u001b[32m 2909\u001b[39m \u001b[33;03mWrite records stored in a DataFrame to a SQL database.\u001b[39;00m\n\u001b[32m 2910\u001b[39m \n\u001b[32m (...)\u001b[39m\u001b[32m 3102\u001b[39m \u001b[33;03m[(1,), (None,), (2,)]\u001b[39;00m\n\u001b[32m 3103\u001b[39m \u001b[33;03m\"\"\"\u001b[39;00m \u001b[38;5;66;03m# noqa: E501\u001b[39;00m\n\u001b[32m 3104\u001b[39m \u001b[38;5;28;01mfrom\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01mpandas\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01mio\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mimport\u001b[39;00m sql\n\u001b[32m-> \u001b[39m\u001b[32m3106\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43msql\u001b[49m\u001b[43m.\u001b[49m\u001b[43mto_sql\u001b[49m\u001b[43m(\u001b[49m\n\u001b[32m 3107\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m,\u001b[49m\n\u001b[32m 3108\u001b[39m \u001b[43m \u001b[49m\u001b[43mname\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 3109\u001b[39m \u001b[43m \u001b[49m\u001b[43mcon\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 3110\u001b[39m \u001b[43m \u001b[49m\u001b[43mschema\u001b[49m\u001b[43m=\u001b[49m\u001b[43mschema\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 3111\u001b[39m \u001b[43m \u001b[49m\u001b[43mif_exists\u001b[49m\u001b[43m=\u001b[49m\u001b[43mif_exists\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 3112\u001b[39m \u001b[43m \u001b[49m\u001b[43mindex\u001b[49m\u001b[43m=\u001b[49m\u001b[43mindex\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 3113\u001b[39m \u001b[43m \u001b[49m\u001b[43mindex_label\u001b[49m\u001b[43m=\u001b[49m\u001b[43mindex_label\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 3114\u001b[39m \u001b[43m \u001b[49m\u001b[43mchunksize\u001b[49m\u001b[43m=\u001b[49m\u001b[43mchunksize\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 3115\u001b[39m \u001b[43m \u001b[49m\u001b[43mdtype\u001b[49m\u001b[43m=\u001b[49m\u001b[43mdtype\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 3116\u001b[39m \u001b[43m \u001b[49m\u001b[43mmethod\u001b[49m\u001b[43m=\u001b[49m\u001b[43mmethod\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 3117\u001b[39m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/pandas/io/sql.py:843\u001b[39m, in \u001b[36mto_sql\u001b[39m\u001b[34m(frame, name, con, 
schema, if_exists, index, index_label, chunksize, dtype, method, engine, **engine_kwargs)\u001b[39m\n\u001b[32m 838\u001b[39m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(frame, DataFrame):\n\u001b[32m 839\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mNotImplementedError\u001b[39;00m(\n\u001b[32m 840\u001b[39m \u001b[33m\"\u001b[39m\u001b[33m'\u001b[39m\u001b[33mframe\u001b[39m\u001b[33m'\u001b[39m\u001b[33m argument should be either a Series or a DataFrame\u001b[39m\u001b[33m\"\u001b[39m\n\u001b[32m 841\u001b[39m )\n\u001b[32m--> \u001b[39m\u001b[32m843\u001b[39m \u001b[43m\u001b[49m\u001b[38;5;28;43;01mwith\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mpandasSQL_builder\u001b[49m\u001b[43m(\u001b[49m\u001b[43mcon\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mschema\u001b[49m\u001b[43m=\u001b[49m\u001b[43mschema\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mneed_transaction\u001b[49m\u001b[43m=\u001b[49m\u001b[38;5;28;43;01mTrue\u001b[39;49;00m\u001b[43m)\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mas\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mpandas_sql\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 844\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mreturn\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mpandas_sql\u001b[49m\u001b[43m.\u001b[49m\u001b[43mto_sql\u001b[49m\u001b[43m(\u001b[49m\n\u001b[32m 845\u001b[39m \u001b[43m \u001b[49m\u001b[43mframe\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 846\u001b[39m \u001b[43m \u001b[49m\u001b[43mname\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m (...)\u001b[39m\u001b[32m 855\u001b[39m \u001b[43m \u001b[49m\u001b[43m*\u001b[49m\u001b[43m*\u001b[49m\u001b[43mengine_kwargs\u001b[49m\u001b[43m,\u001b[49m\n\u001b[32m 856\u001b[39m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/pandas/io/sql.py:1657\u001b[39m, in \u001b[36mSQLDatabase.__exit__\u001b[39m\u001b[34m(self, *args)\u001b[39m\n\u001b[32m 1655\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34m__exit__\u001b[39m(\u001b[38;5;28mself\u001b[39m, *args) -> \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[32m 1656\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28mself\u001b[39m.returns_generator:\n\u001b[32m-> \u001b[39m\u001b[32m1657\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mexit_stack\u001b[49m\u001b[43m.\u001b[49m\u001b[43mclose\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py:594\u001b[39m, in \u001b[36mExitStack.close\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 592\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34mclose\u001b[39m(\u001b[38;5;28mself\u001b[39m):\n\u001b[32m 593\u001b[39m \u001b[38;5;250m \u001b[39m\u001b[33;03m\"\"\"Immediately unwind the context stack.\"\"\"\u001b[39;00m\n\u001b[32m--> \u001b[39m\u001b[32m594\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[34;43m__exit__\u001b[39;49m\u001b[43m(\u001b[49m\u001b[38;5;28;43;01mNone\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mNone\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mNone\u001b[39;49;00m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile 
\u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py:586\u001b[39m, in \u001b[36mExitStack.__exit__\u001b[39m\u001b[34m(self, *exc_details)\u001b[39m\n\u001b[32m 582\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m 583\u001b[39m \u001b[38;5;66;03m# bare \"raise exc_details[1]\" replaces our carefully\u001b[39;00m\n\u001b[32m 584\u001b[39m \u001b[38;5;66;03m# set-up context\u001b[39;00m\n\u001b[32m 585\u001b[39m fixed_ctx = exc_details[\u001b[32m1\u001b[39m].__context__\n\u001b[32m--> \u001b[39m\u001b[32m586\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m exc_details[\u001b[32m1\u001b[39m]\n\u001b[32m 587\u001b[39m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mBaseException\u001b[39;00m:\n\u001b[32m 588\u001b[39m exc_details[\u001b[32m1\u001b[39m].__context__ = fixed_ctx\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/contextlib.py:571\u001b[39m, in \u001b[36mExitStack.__exit__\u001b[39m\u001b[34m(self, *exc_details)\u001b[39m\n\u001b[32m 569\u001b[39m \u001b[38;5;28;01massert\u001b[39;00m is_sync\n\u001b[32m 570\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m571\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[43mcb\u001b[49m\u001b[43m(\u001b[49m\u001b[43m*\u001b[49m\u001b[43mexc_details\u001b[49m\u001b[43m)\u001b[49m:\n\u001b[32m 572\u001b[39m suppressed_exc = \u001b[38;5;28;01mTrue\u001b[39;00m\n\u001b[32m 573\u001b[39m pending_raise = \u001b[38;5;28;01mFalse\u001b[39;00m\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/sqlalchemy/engine/util.py:147\u001b[39m, in \u001b[36mTransactionalContext.__exit__\u001b[39m\u001b[34m(self, type_, value, traceback)\u001b[39m\n\u001b[32m 145\u001b[39m \u001b[38;5;28mself\u001b[39m.commit()\n\u001b[32m 146\u001b[39m \u001b[38;5;28;01mexcept\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m147\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mwith\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mutil\u001b[49m\u001b[43m.\u001b[49m\u001b[43msafe_reraise\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 148\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mif\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_rollback_can_be_called\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 149\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mrollback\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py:224\u001b[39m, in \u001b[36msafe_reraise.__exit__\u001b[39m\u001b[34m(self, type_, value, traceback)\u001b[39m\n\u001b[32m 222\u001b[39m \u001b[38;5;28;01massert\u001b[39;00m exc_value \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[32m 223\u001b[39m \u001b[38;5;28mself\u001b[39m._exc_info = \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;66;03m# remove potential circular references\u001b[39;00m\n\u001b[32m--> \u001b[39m\u001b[32m224\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m exc_value.with_traceback(exc_tb)\n\u001b[32m 225\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[32m 226\u001b[39m \u001b[38;5;28mself\u001b[39m._exc_info = 
\u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;66;03m# remove potential circular references\u001b[39;00m\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/sqlalchemy/engine/util.py:145\u001b[39m, in \u001b[36mTransactionalContext.__exit__\u001b[39m\u001b[34m(self, type_, value, traceback)\u001b[39m\n\u001b[32m 143\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m type_ \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;129;01mand\u001b[39;00m \u001b[38;5;28mself\u001b[39m._transaction_is_active():\n\u001b[32m 144\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m145\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mcommit\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 146\u001b[39m \u001b[38;5;28;01mexcept\u001b[39;00m:\n\u001b[32m 147\u001b[39m \u001b[38;5;28;01mwith\u001b[39;00m util.safe_reraise():\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py:2628\u001b[39m, in \u001b[36mTransaction.commit\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 2612\u001b[39m \u001b[38;5;250m\u001b[39m\u001b[33;03m\"\"\"Commit this :class:`.Transaction`.\u001b[39;00m\n\u001b[32m 2613\u001b[39m \n\u001b[32m 2614\u001b[39m \u001b[33;03mThe implementation of this may vary based on the type of transaction in\u001b[39;00m\n\u001b[32m (...)\u001b[39m\u001b[32m 2625\u001b[39m \n\u001b[32m 2626\u001b[39m \u001b[33;03m\"\"\"\u001b[39;00m\n\u001b[32m 2627\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m2628\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_do_commit\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 2629\u001b[39m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[32m 2630\u001b[39m \u001b[38;5;28;01massert\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28mself\u001b[39m.is_active\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py:2733\u001b[39m, in \u001b[36mRootTransaction._do_commit\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 2730\u001b[39m \u001b[38;5;28;01massert\u001b[39;00m \u001b[38;5;28mself\u001b[39m.connection._transaction \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28mself\u001b[39m\n\u001b[32m 2732\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m2733\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_connection_commit_impl\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 2734\u001b[39m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[32m 2735\u001b[39m \u001b[38;5;66;03m# whether or not commit succeeds, cancel any\u001b[39;00m\n\u001b[32m 2736\u001b[39m \u001b[38;5;66;03m# nested transactions, make this transaction \"inactive\"\u001b[39;00m\n\u001b[32m 2737\u001b[39m \u001b[38;5;66;03m# and remove it as a reset agent\u001b[39;00m\n\u001b[32m 2738\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m.connection._nested_transaction:\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py:2704\u001b[39m, in \u001b[36mRootTransaction._connection_commit_impl\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 2703\u001b[39m 
\u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34m_connection_commit_impl\u001b[39m(\u001b[38;5;28mself\u001b[39m) -> \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m2704\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mconnection\u001b[49m\u001b[43m.\u001b[49m\u001b[43m_commit_impl\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py:1143\u001b[39m, in \u001b[36mConnection._commit_impl\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 1141\u001b[39m \u001b[38;5;28mself\u001b[39m.engine.dialect.do_commit(\u001b[38;5;28mself\u001b[39m.connection)\n\u001b[32m 1142\u001b[39m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mBaseException\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[32m-> \u001b[39m\u001b[32m1143\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_handle_dbapi_exception\u001b[49m\u001b[43m(\u001b[49m\u001b[43me\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mNone\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mNone\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mNone\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mNone\u001b[39;49;00m\u001b[43m)\u001b[49m\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py:2354\u001b[39m, in \u001b[36mConnection._handle_dbapi_exception\u001b[39m\u001b[34m(self, e, statement, parameters, cursor, context, is_sub_exec)\u001b[39m\n\u001b[32m 2352\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[32m 2353\u001b[39m \u001b[38;5;28;01massert\u001b[39;00m exc_info[\u001b[32m1\u001b[39m] \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[32m-> \u001b[39m\u001b[32m2354\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m exc_info[\u001b[32m1\u001b[39m].with_traceback(exc_info[\u001b[32m2\u001b[39m])\n\u001b[32m 2355\u001b[39m \u001b[38;5;28;01mfinally\u001b[39;00m:\n\u001b[32m 2356\u001b[39m \u001b[38;5;28;01mdel\u001b[39;00m \u001b[38;5;28mself\u001b[39m._reentrant_error\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py:1141\u001b[39m, in \u001b[36mConnection._commit_impl\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 1139\u001b[39m \u001b[38;5;28mself\u001b[39m._log_info(\u001b[33m\"\u001b[39m\u001b[33mCOMMIT\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m 1140\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m1141\u001b[39m \u001b[38;5;28mself\u001b[39m.engine.dialect.do_commit(\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mconnection\u001b[49m)\n\u001b[32m 1142\u001b[39m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mBaseException\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[32m 1143\u001b[39m \u001b[38;5;28mself\u001b[39m._handle_dbapi_exception(e, \u001b[38;5;28;01mNone\u001b[39;00m, \u001b[38;5;28;01mNone\u001b[39;00m, \u001b[38;5;28;01mNone\u001b[39;00m, \u001b[38;5;28;01mNone\u001b[39;00m)\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py:583\u001b[39m, in 
\u001b[36mConnection.connection\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 581\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m._dbapi_connection \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[32m 582\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m583\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_revalidate_connection\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 584\u001b[39m \u001b[38;5;28;01mexcept\u001b[39;00m (exc.PendingRollbackError, exc.ResourceClosedError):\n\u001b[32m 585\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py:675\u001b[39m, in \u001b[36mConnection._revalidate_connection\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 673\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m.__can_reconnect \u001b[38;5;129;01mand\u001b[39;00m \u001b[38;5;28mself\u001b[39m.invalidated:\n\u001b[32m 674\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m._transaction \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m675\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_invalid_transaction\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 676\u001b[39m \u001b[38;5;28mself\u001b[39m._dbapi_connection = \u001b[38;5;28mself\u001b[39m.engine.raw_connection()\n\u001b[32m 677\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mself\u001b[39m._dbapi_connection\n", + "\u001b[36mFile \u001b[39m\u001b[32m/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/site-packages/sqlalchemy/engine/base.py:665\u001b[39m, in \u001b[36mConnection._invalid_transaction\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 664\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34m_invalid_transaction\u001b[39m(\u001b[38;5;28mself\u001b[39m) -> NoReturn:\n\u001b[32m--> \u001b[39m\u001b[32m665\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m exc.PendingRollbackError(\n\u001b[32m 666\u001b[39m \u001b[33m\"\u001b[39m\u001b[33mCan\u001b[39m\u001b[33m'\u001b[39m\u001b[33mt reconnect until invalid \u001b[39m\u001b[38;5;132;01m%s\u001b[39;00m\u001b[33mtransaction is rolled \u001b[39m\u001b[33m\"\u001b[39m\n\u001b[32m 667\u001b[39m \u001b[33m\"\u001b[39m\u001b[33mback. Please rollback() fully before proceeding\u001b[39m\u001b[33m\"\u001b[39m\n\u001b[32m 668\u001b[39m % (\u001b[33m\"\u001b[39m\u001b[33msavepoint \u001b[39m\u001b[33m\"\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m._nested_transaction \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;28;01melse\u001b[39;00m \u001b[33m\"\u001b[39m\u001b[33m\"\u001b[39m),\n\u001b[32m 669\u001b[39m code=\u001b[33m\"\u001b[39m\u001b[33m8s2b\u001b[39m\u001b[33m\"\u001b[39m,\n\u001b[32m 670\u001b[39m )\n", + "\u001b[31mPendingRollbackError\u001b[39m: Can't reconnect until invalid transaction is rolled back. 
Please rollback() fully before proceeding (Background on this error at: https://sqlalche.me/e/20/8s2b)" + ] + } + ], + "source": [ + "schema_name = 'npd'\n", + "load = True\n", + "\n", + "# load npi\n", + "#show_or_load(npi_df_renamed, 'npi', schema_name, engine, load)\n", + "\n", + "# load individual\n", + "#show_or_load(practitioner_df_renamed[['id', 'sex']], 'individual', schema_name, engine, load)\n", + "practitioner_df_renamed_renamed = practitioner_df_renamed.rename(columns={'id':'individual_id'})\n", + "\n", + "# load individual_to_name\n", + "#show_or_load(practitioner_df_renamed_renamed[['individual_id', 'first_name', 'middle_name', 'last_name', 'prefix', 'suffix', 'name_use_id']], 'individual_to_name', schema_name, engine, load)\n", + "\n", + "# load provider\n", + "#show_or_load(practitioner_df_renamed_renamed.merge(npi_df_renamed, on = 'npi', how='inner')[['npi', 'individual_id']], 'provider', schema_name, engine, load)\n", + "\n", + "# load organization\n", + "#show_or_load(other_organization_df[['id']], 'organization', schema_name, engine, load)\n", + "other_organization_df.set_index('id', drop=False, inplace=True)\n", + "if load:\n", + " print('adding parent_id to organization')\n", + " #upsert(df = other_organization_df[['parent_id']], con = engine, schema = schema_name, if_row_exists='update', table_name = 'organization')\n", + "#show_or_load(clinical_organization_df_renamed[['id']], 'organization', schema_name, engine, load)\n", + "clinical_organization_df_renamed.set_index('id', drop=False, inplace=True)\n", + "if load:\n", + " print('adding parent_id to clinical_organization organizations')\n", + " #upsert(df = clinical_organization_df_renamed[['parent_id']], con = engine, schema = schema_name, if_row_exists='update', table_name = 'organization')\n", + "\n", + "other_organization_df_renamed = other_organization_df.rename(columns={'id':'organization_id', 'organization_name':'name'})\n", + "clinical_organization_df_renamed_renamed = clinical_organization_df_renamed.rename(columns={'id':'organization_id'})\n", + "\n", + "# load organization_to_name\n", + "\n", + "#show_or_load(other_organization_df_renamed[['organization_id', 'name', 'is_primary']], 'organization_to_name', schema_name, engine, load)\n", + "#show_or_load(clinical_organization_df_renamed_renamed[['organization_id', 'name', 'is_primary']], 'organization_to_name', schema_name, engine, load)\n", + "\n", + "# load clinical_organization\n", + "#show_or_load(clinical_organization_df_renamed_renamed[['organization_id', 'npi']], 'clinical_organization', schema_name, engine, load)\n", + "\n", + "# load ehr_vendor\n", + "#show_or_load(ehr_vendor_df_renamed[['id', 'name']], 'ehr_vendor', schema_name, engine, load)\n", + "\n", + "# load endpoint_instance\n", + "#show_or_load(endpoint_df_renamed[['id', 'ehr_vendor_id', 'address', 'endpoint_connection_type_id', 'environment_type_id']], 'endpoint_instance', schema_name, engine, load)\n", + "\n", + "# load address_us\n", + "#show_or_load(address_df_renamed[['address_us_id', 'delivery_line_1','city_name','state_code','zipcode']].rename(columns={'address_us_id':'id'}), 'address_us', schema_name, engine, load)\n", + "\n", + "# load address\n", + "#show_or_load(address_df_renamed[['id', 'address_us_id']], 'address', schema_name, engine, load)\n", + "\n", + "# load individual_to_address\n", + "show_or_load(individual_to_address_df, 'individual_to_address', schema_name, engine, load)\n", + "\n", + "# load organization_to_address\n", + 
"show_or_load(location_df[['address_id','organization_id', 'address_use_id']], 'organization_to_address', schema_name, engine, load)\n", + "\n", + "# load location\n", + "show_or_load(location_df[['id','address_id','organization_id']], 'location', schema_name, engine, load)\n", + "\n", + "# load location_to_endpoint_instance\n", + "show_or_load(location_to_endpoint_df, 'location_to_endpoint_instance', schema_name, engine, load)\n", + "\n", + "# load provider_to_organization\n", + "show_or_load(provider_to_org_df_renamed[['individual_id', 'organization_id', 'relationship_type_id','id']], 'provider_to_organization', schema_name, engine, load)\n", + "\n", + "# load provider_to_location\n", + "show_or_load(provider_to_location_df_renamed[['location_id', 'provider_to_organization_id', 'id']], 'provider_to_location', schema_name, engine, load)\n", + "\n", + "# load provider_to_taxonomy\n", + "show_or_load(dedup_taxonomy_df, 'provider_to_taxonomy', schema_name, engine, load)\n", + "\n", + "# load provider_to_credential\n", + "###show_or_load(credential_df_renamed[['license_number', 'state_code', 'provider_to_taxonomy_id']], 'provider_to_credential', schema_name, engine, load)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "09bb20d6", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.0" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/etls/loadHalloween/utils.py b/etls/loadHalloween/utils.py new file mode 100644 index 00000000..e51848cf --- /dev/null +++ b/etls/loadHalloween/utils.py @@ -0,0 +1,17 @@ +import os +from dotenv import load_dotenv +from sqlalchemy import create_engine + + +def createEngine(): + # Get database details and create engine + load_dotenv() + username = os.getenv("SOURCE_DB_USER") + password = os.getenv("SOURCE_DB_PASSWORD") + instance = os.getenv("SOURCE_DB_HOST") + db = os.getenv("SOURCE_DB_NAME") + port = os.getenv("SOURCE_DB_PORT") + engine = create_engine( + f"postgresql+psycopg2://{username}:{password}@{instance}:{port}/{db}" + ) + return engine diff --git a/flyway/docker-compose.yml b/flyway/docker-compose.yml new file mode 100644 index 00000000..128bb0a7 --- /dev/null +++ b/flyway/docker-compose.yml @@ -0,0 +1,16 @@ +name: halloween +services: + db-migrations: + image: 'flyway/flyway:10' + env_file: + - path: .env + required: false + environment: + FLYWAY_URL: jdbc:postgresql://${NPD_DB_HOST:-db}:5432/${NPD_DB_NAME:-npd_development} + FLYWAY_USER: ${NPD_DB_USER:-postgres} + FLYWAY_PASSWORD: ${NPD_DB_PASSWORD:-postgres} + FLYWAY_PLACEHOLDERS_apiSchema: ${NPD_DB_SCHEMA:-npd} + FLYWAY_PLACEHOLDERS_superuserDefaultPassword: "" + volumes: + - '../flyway/sql:/flyway/sql' + command: migrate -environment=development -outputType=json