Skip to content

Commit dde01f8

Browse files
committed
Add automatic site chunking
1 parent 2fde691 commit dde01f8

11 files changed

Lines changed: 189 additions & 261 deletions

NEWS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
dataRetrieval 2.7.24
22
===================
3+
* dataRetrieval now automatically splits requests with many monitoring_location_id values into smaller chunks.
34

45
dataRetrieval 2.7.23
56
===================

R/get_ogc_data.R

Lines changed: 73 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,68 +3,89 @@
33
#' @param args arguments from individual functions
44
#' @param output_id Name of id column to return
55
#' @param service Endpoint name.
6+
#' @param split_into Maximum number of monitoring_location_ids sent in a single request; longer vectors are split into chunks of this size.
67
#'
78
#' @noRd
89
#' @return data.frame with attributes
910
get_ogc_data <- function(args,
1011
output_id,
11-
service){
12+
service,
13+
split_into = 250){
1214

13-
args <- switch_arg_id(args,
14-
id_name = output_id,
15-
service = service)
16-
17-
args <- check_limits(args)
18-
19-
properties <- args[["properties"]]
20-
args[["properties"]] <- switch_properties_id(properties,
21-
id = output_id)
22-
convertType <- args[["convertType"]]
23-
args[["convertType"]] <- NULL
24-
args[["service"]] <- service
25-
26-
req <- do.call(construct_api_requests, args)
2715

28-
no_paging <- grepl("f=csv", req$url)
29-
30-
message("Requesting:\n", req$url)
31-
32-
if(no_paging){
33-
return_list <- get_csv(req, limit = args[["limit"]])
34-
} else {
35-
return_list <- walk_pages(req)
36-
}
37-
38-
if(is.na(args[["skipGeometry"]])){
39-
skipGeometry <- FALSE
16+
if(length(args[["monitoring_location_id"]]) > split_into){
17+
18+
ml_splits <- split(args[["monitoring_location_id"]],
19+
ceiling(seq_along(args[["monitoring_location_id"]])/split_into))
20+
21+
rl <- lapply(ml_splits, function(x) {
22+
args[["monitoring_location_id"]] <- x
23+
get_ogc_data(args = args,
24+
output_id = output_id,
25+
service = service)})
26+
rl_filtered <- rl[sapply(rl, function(x) dim(x)[1]) > 0]
27+
28+
return_list <- do.call(rbind, rl_filtered)
29+
4030
} else {
41-
skipGeometry <- args[["skipGeometry"]]
42-
}
43-
44-
return_list <- deal_with_empty(return_list, properties, service,
45-
skipGeometry, convertType, no_paging)
46-
47-
return_list <- rejigger_cols(return_list, properties, output_id)
31+
args[["chunk_sites_by"]] <- NULL
32+
33+
args <- switch_arg_id(args,
34+
id_name = output_id,
35+
service = service)
36+
37+
args <- check_limits(args)
38+
39+
properties <- args[["properties"]]
40+
args[["properties"]] <- switch_properties_id(properties,
41+
id = output_id)
42+
convertType <- args[["convertType"]]
43+
args[["convertType"]] <- NULL
44+
args[["service"]] <- service
45+
46+
req <- do.call(construct_api_requests, args)
4847

49-
if(convertType){
50-
return_list <- cleanup_cols(return_list, service)
51-
return_list <- order_results(return_list)
52-
53-
# Mostly drop the id column except ts-meta, monitoring location:
54-
if(!service %in% c("monitoring-locations",
55-
"time-series-metadata",
56-
"field-measurements-metadata",
57-
"combined-metadata",
58-
"parameter-codes")){
59-
return_list <- return_list[, names(return_list)[names(return_list)!= output_id]]
48+
no_paging <- grepl("f=csv", req$url)
49+
50+
message("Requesting:\n", req$url)
51+
52+
if(no_paging){
53+
return_list <- get_csv(req, limit = args[["limit"]])
54+
} else {
55+
return_list <- walk_pages(req)
56+
}
57+
58+
if(is.na(args[["skipGeometry"]])){
59+
skipGeometry <- FALSE
60+
} else {
61+
skipGeometry <- args[["skipGeometry"]]
62+
}
63+
64+
return_list <- deal_with_empty(return_list, properties, service,
65+
skipGeometry, convertType, no_paging)
66+
67+
return_list <- rejigger_cols(return_list, properties, output_id)
68+
69+
if(convertType){
70+
return_list <- cleanup_cols(return_list, service)
71+
return_list <- order_results(return_list)
72+
73+
# Mostly drop the id column except ts-meta, monitoring location:
74+
if(!service %in% c("monitoring-locations",
75+
"time-series-metadata",
76+
"field-measurements-metadata",
77+
"combined-metadata",
78+
"parameter-codes")){
79+
return_list <- return_list[, names(return_list)[names(return_list)!= output_id]]
80+
}
81+
# Move other id columns:
82+
return_list <- move_id_col(return_list,
83+
output_id)
84+
}
85+
86+
if(getOption("dataRetrieval.attach_request")){
87+
attr(return_list, "request") <- req
6088
}
61-
# Move other id columns:
62-
return_list <- move_id_col(return_list,
63-
output_id)
64-
}
65-
66-
if(getOption("dataRetrieval.attach_request")){
67-
attr(return_list, "request") <- req
6889
}
6990

7091
attr(return_list, "queryTime") <- Sys.time()

R/read_waterdata_combined_meta.R

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@
135135
#' hydrologic_unit_code = c("11010008", "11010009"),
136136
#' site_type = c("Stream", "Spring")
137137
#' )
138+
#'
139+
#' site_list <- read_waterdata_combined_meta(
140+
#' monitoring_location_id = hucs$monitoring_location_id
141+
#' )
142+
#'
143+
#'
138144
#' }
139145
read_waterdata_combined_meta <- function(monitoring_location_id = NA_character_,
140146
parameter_code = NA_character_,

R/read_waterdata_monitoring_location.R

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,14 @@
8080
#' properties = c("monitoring_location_id",
8181
#' "state_name",
8282
#' "county_name",
83-
#' "country_name"),
83+
#' "country_name",
84+
#' "site_type"),
85+
#' site_type = "Well",
8486
#' skipGeometry = TRUE)
8587
#'
86-
#' site_info_no_sf <- read_waterdata_monitoring_location(monitoring_location_id = site,
87-
#' skipGeometry = TRUE)
88+
#' site_info_no_sf <- read_waterdata_monitoring_location(
89+
#' monitoring_location_id = site_slim_no_sf_slim$monitoring_location_id[1:1000],
90+
#' skipGeometry = TRUE)
8891
#'
8992
#' bbox_vals = c(-94.00, 35.0, -93.5, 35.5)
9093
#' multi_site <- read_waterdata_monitoring_location(bbox = bbox_vals)
@@ -139,8 +142,9 @@ read_waterdata_monitoring_location <- function(monitoring_location_id = NA_chara
139142

140143
args <- mget(names(formals()))
141144
args[["convertType"]] <- FALSE
145+
142146
return_list <- get_ogc_data(args,
143-
output_id,
147+
output_id,
144148
service)
145149

146150
return(return_list)

R/walk_pages.R

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ walk_pages <- function(req){
3030
get_resp_data <- function(resp) {
3131

3232
body <- httr2::resp_body_json(resp)
33-
33+
use_sf <- !grepl("skipGeometry=true", resp$url, ignore.case = TRUE)
34+
3435
if(isTRUE(body[["numberReturned"]] == 0)){
3536
return(data.frame())
3637
}
3738

38-
use_sf <- !grepl("skipGeometry=true", resp$url, ignore.case = TRUE)
3939
return_df <- sf::read_sf(httr2::resp_body_string(resp))
4040

4141
included_num_cols <- names(return_df)[names(return_df) %in% num_cols]
@@ -46,7 +46,10 @@ get_resp_data <- function(resp) {
4646

4747
if("qualifier" %in% names(return_df)){
4848
return_df$qualifier <- as.character(vapply(X = return_df$qualifier,
49-
FUN = function(x) paste(x, collapse = ", "),
49+
FUN = function(x) {
50+
x[is.na(x)] <- ""
51+
paste(x, collapse = ", ")
52+
},
5053
FUN.VALUE = c(NA_character_)))
5154
}
5255

man/read_waterdata_channel.Rd

Lines changed: 4 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/read_waterdata_combined_meta.Rd

Lines changed: 8 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/read_waterdata_continuous.Rd

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/read_waterdata_monitoring_location.Rd

Lines changed: 7 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/testthat/tests_userFriendly_fxns.R

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -148,19 +148,17 @@ test_that("read_waterdata_daily", {
148148

149149
expect_true(length(unique(raw_waterdata_MultiSites$monitoring_location_id)) == 2)
150150

151-
site <- "05212700"
152-
153-
notActiveUSGS <- read_waterdata_daily(monitoring_location_id = paste0("USGS-", site),
154-
parameter_code = "00060",
155-
time = c("2014-01-01", "2014-01-07"))
156-
expect_true(nrow(notActiveUSGS) == 0)
157-
expect_type(notActiveUSGS$value, "double")
158-
notActiveUSGS2 <- read_waterdata_daily(monitoring_location_id = paste0("USGS-", site),
159-
parameter_code = "00060",
160-
convertType = FALSE,
161-
time = c("2014-01-01", "2014-01-07"))
162-
expect_type(notActiveUSGS2$value, "character")
163-
151+
mixed_qualifiers <- read_waterdata_continuous(monitoring_location_id = "USGS-01648010",
152+
parameter_code =c("00010","00095","00400","00300","63680"),
153+
time ="2026-01-01T00:00:00Z/2026-03-13T00:00:00Z")
154+
expect_all_true(unique(mixed_qualifiers$qualifier) %in% c("", "EQUIP", "LESSTHAN"))
155+
156+
# Unclear whether these qualifiers will persist over time:
157+
# ts_id <- "fa44702f2bd64d9fa493608de9081cbe"
158+
# multi_q <- read_waterdata_continuous(time_series_id = ts_id)
159+
# expect_all_true(unique(multi_q$qualifier) %in% c("REGULATED, UNKNOWNREGULATION",
160+
# "ESTIMATED, REGULATED, UNKNOWNREGULATION",
161+
# "ICE, REGULATED, UNKNOWNREGULATION"))
164162
})
165163

166164
test_that("WQP qw tests", {

0 commit comments

Comments
 (0)