-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathwrite-aggregate-state-demographic-data.handler.ts
More file actions
93 lines (88 loc) · 3.66 KB
/
write-aggregate-state-demographic-data.handler.ts
File metadata and controls
93 lines (88 loc) · 3.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import { RDSDataService } from 'aws-sdk';
import { BatchExecuteStatementRequest, SqlParametersList } from 'aws-sdk/clients/rdsdataservice';
import { geoJsonHandlerFactory } from './handler-factory';
import { AggregateUsDemographicTableRowBuilder, GeoType } from '../model/aggregate_us_demographic_table';
// Location of the state-level demographics GeoJSON that is handed to
// geoJsonHandlerFactory below.
// As of 2022-08-05, this should have 51 unique rows.
// NOTE(review): the Bucket value carries a '/'-separated suffix ("/demographics");
// confirm the S3 client used by the factory accepts a bucket in this form.
const s3Params = {
Bucket: 'opendataplatformapistaticdata/demographics',
Key: 'state_demographics.geojson',
};
// Database schema passed as `schema` on the batch execute request.
const SCHEMA = 'public';
/**
 * Writes a batch of rows into the aggregate_us_demographics table.
 *
 * Every parameter set is executed against one INSERT ... SELECT statement. The
 * `name` column is not taken from the parameters; it is read from the `states`
 * table by matching `usps`, so a parameter set whose `usps` matches no state
 * inserts nothing. Rows whose `census_geo_id` already exists are skipped
 * (ON CONFLICT ... DO NOTHING).
 *
 * Connection settings come from the DATABASE_NAME, RESOURCE_ARN and
 * CREDENTIALS_SECRET environment variables.
 *
 * @param rdsService RDS Data Service client used to run the statement.
 * @param rows One SQL parameter set per row to write.
 * @returns The response of the batch execute call.
 */
async function insertBatch(
  rdsService: RDSDataService,
  rows: SqlParametersList[],
): Promise<RDSDataService.BatchExecuteStatementResponse> {
  const { DATABASE_NAME, RESOURCE_ARN, CREDENTIALS_SECRET } = process.env;

  // The statement text is kept exactly as-is; only the parameter values vary per row.
  const insertSql = `INSERT INTO aggregate_us_demographics (census_geo_id,
geo_type,
name,
median_year_built,
median_income,
home_age_index,
income_index,
weighted_national_adi,
weighted_state_adi,
population_count,
geom)
SELECT :census_geo_id,
:geo_type,
s.name,
:median_year_built,
:median_income,
:home_age_index,
:income_index,
:weighted_national_adi,
:weighted_state_adi,
:population_count,
ST_AsText(ST_GeomFromGeoJSON(:geom))
FROM states s
WHERE s.usps like :usps ON CONFLICT (census_geo_id) DO NOTHING`;

  const request: BatchExecuteStatementRequest = {
    database: DATABASE_NAME ?? 'postgres',
    parameterSets: rows,
    resourceArn: RESOURCE_ARN ?? '',
    schema: SCHEMA,
    secretArn: CREDENTIALS_SECRET ?? '',
    sql: insertSql,
  };

  return rdsService.batchExecuteStatement(request).promise();
}
/**
 * Maps one parsed data row to the SQL parameter set used to write it to the db.
 *
 * Reads the feature from `row.value`: scalar columns come from
 * `value.properties`, the geometry from `value.geometry`. Missing properties
 * fall back to a default ('' for strings, 0 for numbers) rather than throwing.
 *
 * @param row Row with all data needed to build a [AggregateUsDemographicTableRow].
 * @returns The parameter list produced by the table row builder.
 */
function getTableRowFromRow(row: any): SqlParametersList {
  const value = row.value;
  const properties = value.properties;
  return (
    new AggregateUsDemographicTableRowBuilder()
      // Substring of AFFGEOID which is just state census geo ID.
      .censusGeoId(properties.AFFGEOID?.substring(9, 11))
      .geoType(GeoType.State)
      .usps(properties.STUSPS ?? '')
      // Optional chaining is required here: calling toString() on a missing
      // value throws before the `??` fallback can ever apply.
      .medianYearBuilt(properties.median_yearbuilt?.toString() ?? '')
      .medianIncome(properties.median_income ?? 0)
      .homeAgeIndex(properties.homeage_index ?? 0)
      .incomeIndex(properties.income_index ?? 0)
      .weightedNationalAdi(properties.weighted_nat_adi ?? 0)
      .weightedStateAdi(properties.weighted_state_adi ?? 0)
      .populationCount(properties.RaceTotal ?? 0)
      // Keep JSON formatting. Post-GIS helpers depend on this.
      .geom(JSON.stringify(value.geometry))
      .build()
  );
}
/**
 * Handler built by geoJsonHandlerFactory for the S3 object described by
 * `s3Params` ('state_demographics.geojson').
 *
 * The callback converts every row it receives into a SQL parameter set and
 * writes them all to aggregate_us_demographics in a single batch insert.
 */
export const handler = geoJsonHandlerFactory(
  s3Params,
  async (rows: any[], rdsDataService: RDSDataService) => {
    const parameterSets = rows.map(getTableRowFromRow);
    await insertBatch(rdsDataService, parameterSets);
  },
);