From 3a943f3dcad7b6936613006d3019f3096199d736 Mon Sep 17 00:00:00 2001 From: amarin <17020181+asiripanich@users.noreply.github.com> Date: Sat, 4 Apr 2020 08:07:23 +1100 Subject: [PATCH] feat: `sim_parallel()` + world now has a new field to store functional sequences --- NAMESPACE | 1 + NEWS.md | 2 +- R/World.R | 17 ++++- R/sim.R | 146 ++++++++++++++++++++++++++++++++++---- R/utils.R | 15 ++++ man/sim.Rd | 46 +++++++++++- tests/testthat/test-sim.R | 75 ++++++++++++++++++++ 7 files changed, 285 insertions(+), 17 deletions(-) diff --git a/NAMESPACE b/NAMESPACE index a809ae62..0227b746 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -108,6 +108,7 @@ export(register) export(sample_choice) export(set_active_scenario) export(sim) +export(sim_parallel) export(simulate_choice) export(test_entity) export(test_entity_ids) diff --git a/NEWS.md b/NEWS.md index 75f1ea5f..7b7ec3ab 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,6 @@ # dymiumCore (development version) -- Added `sim()` for compiling and executing a microsimulation pipeline. +- Added `sim()` and `sim_parallel()` for compiling and executing a microsimulation pipeline. 
# dymiumCore 0.1.7 diff --git a/R/World.R b/R/World.R index 566939e8..380bbb8b 100644 --- a/R/World.R +++ b/R/World.R @@ -128,6 +128,7 @@ World <- R6::R6Class( checkmate::check_r6(x, classes = c("Entity", "Generic"), null.ok = FALSE), checkmate::check_r6(x, classes = c("Container", "Generic"), null.ok = FALSE), checkmate::check_r6(x, classes = c("Model", "Generic"), null.ok = FALSE), + checkmate::check_function(x, nargs = 1), checkmate::check_subset(class(x)[[1]], choices = dymiumCore::SupportedTransitionModels(), empty.ok = FALSE), @@ -172,6 +173,11 @@ World <- R6::R6Class( .listname <- ".models" } + if (inherits(x, "fseq")) { + lg$info("Adding a function object '{name}' to the `fseqs` field.") + .listname <- ".fseqs" + } + # make sure there is only one of each Entity sub class stored in entities .listnames <- names(get(.listname, envir = private)) if (name %in% .listnames) { @@ -200,6 +206,10 @@ World <- R6::R6Class( private$.models[[.pos]] <- self$get(.last_pos) names(private$.models)[.pos] <- name }, + ".fseqs" = { + private$.fseqs[[.pos]] <- self$get(.last_pos) + names(private$.fseqs)[.pos] <- name + }, stop("Something is not right please report this issue to the maintainer.")) invisible() @@ -387,6 +397,10 @@ World <- R6::R6Class( models = function() { get(".models", envir = private) }, + # @field containers a list of all [fseq] stored in World. + fseqs = function() { + get(".fseqs", envir = private) + }, scale = function() { options("dymium.simulation_scale")[[1]] } @@ -395,6 +409,7 @@ World <- R6::R6Class( private = list( .containers = list(), .entities = list(), - .models = list() + .models = list(), + .fseqs = list() ) ) diff --git a/R/sim.R b/R/sim.R index 82b68e27..54a6f53d 100644 --- a/R/sim.R +++ b/R/sim.R @@ -3,17 +3,39 @@ #' @description #' This function compiles and executes a microsimulation pipeline. #' +#' `sim` mutates the given [World] object. 
In contrast, `sim_parallel` clones the +#' given [World] object and returns `n_repeats` mutated [World] objects. +#' +#' Note that, for `sim_parallel`, all objects used in the pipeline must be stored +#' inside the world object and not in your global environment. You can store filter +#' functions as well, if you need them. +#' #' @param world (`World`)\cr #' A [World] object. #' @param pipeline (`function()`)\cr #' A functional sequence (`fseq`) object. #' @param n_iters a number of iterations. (`integer(1)`)\cr -#' Number of times the microsimulation pipeline should be repeated. +#' Number of times the microsimulation pipeline, `pipeline`, should be repeated +#' on the `world` object. #' @param write.error.dump.folder (`character(1)`)\cr #' path: Saves the dump of the workspace in a specific folder instead of the #' working directory +#' @param n_repeats (`integer(1)`)\cr +#' Number of times the entire simulation should be repeated. This is not the same as +#' `n_iters`. +#' @param n_workers (`integer(1)`)\cr +#' Number of parallel workers. This requires the `parallel` package to be installed. +#' @param DTthreads (`integer(1)`|`missing`)\cr +#' Number of cores data.table is allowed to use in an R session. If missing, +#' it will use the default number of cores set by [data.table::data.table], which +#' is usually half of the available number of cores. +#' @param .future_options [furrr::future_options()]\cr +#' By default this is set to `furrr::future_options(scheduling = FALSE, packages = "dymiumCore")`. +#' See [furrr::future_options()] for more details. +#' +#' @return `sim` returns nothing visible and mutates the given [World] object, whereas +#' `sim_parallel()` returns a list of `n_repeats` mutated [World] objects.
#' -#' @return `NULL` #' @export #' #' @examples @@ -74,18 +96,17 @@ #' #' # complie and execute a simulation pipeline #' sim(world = world, pipeline = microsimulation_pipeline, n_iters = 10) -sim <- function(world, pipeline, n_iters, write.error.dump.folder) { +sim <- + function(world, + pipeline, + n_iters = 1, + write.error.dump.folder = get_active_scenario()$output_dir, + write.error.dump.file = FALSE) { checkmate::assert_r6(world, classes = "World") checkmate::assert_function(pipeline, nargs = 1) checkmate::assert_count(n_iters, positive = TRUE) - - if (!missing(write.error.dump.folder)) { - checkmate::assert_directory_exists(write.error.dump.folder, access = "rwx") - output_dir <- write.error.dump.folder - } else { - output_dir <- get_active_scenario()$output_dir - } + checkmate::assert_directory_exists(write.error.dump.folder, access = "rwx") tryCatchLog::tryCatchLog({ for (i in 1:n_iters) { @@ -93,12 +114,113 @@ sim <- function(world, pipeline, n_iters, write.error.dump.folder) { pipeline(.) } }, - write.error.dump.file = TRUE, - write.error.dump.folder = get_active_scenario()$output_dir) + write.error.dump.file = write.error.dump.file, + write.error.dump.folder = write.error.dump.folder) invisible() } +#' @rdname sim +#' @export +sim_parallel <- + function(world, + pipeline, + n_iters = 1L, + n_repeats = 1L, + n_workers = 1L, + DTthreads = data.table::getDTthreads(), + .future_options, + write.error.dump.file = FALSE, + write.error.dump.folder = get_active_scenario()$output_dir) { + + check_package("furrr") # furrr requires future, future requires parallel. 
+ checkmate::assert_r6(world, classes = "World") + checkmate::assert_function(pipeline, nargs = 1) + checkmate::assert_count(n_iters, positive = TRUE) + checkmate::assert_count(n_repeats, positive = TRUE) + checkmate::assert_count(n_workers, positive = TRUE) + checkmate::assert_count(DTthreads, positive = TRUE) + checkmate::assert_flag(write.error.dump.file, null.ok = FALSE, na.ok = FALSE) + checkmate::assert_directory_exists(write.error.dump.folder, access = "rwx") + + if (DTthreads > parallel::detectCores()) { + stop(sprintf("DTthreads cannot be set more than the number of cores available (%s).", + parallel::detectCores())) + } + + if (n_workers > parallel::detectCores()) { + stop(sprintf("n_workers cannot be set more than the number of cores available (%s).", + parallel::detectCores())) + } + if (missing(.future_options)) { + .future_options <- furrr::future_options(scheduling = FALSE, packages = "dymiumCore") + } else { + checkmate::assert_class(.future_options, classes = "future_options") + } + + # save world to a folder + world_saved_path <- fs::path(get_active_scenario()$input_dir, "world_tmp.rds") + message("Saving `world` at ", world_saved_path) + saveRDS(world, world_saved_path) + + # create a cluster + message("Creating a cluster of ", n_workers, " workers.") + cl <- future::makeClusterPSOCK(workers = n_workers) + future::plan(future::cluster, workers = cl) + # future::plan(future::sequential) + + # browser() + + message("Sending simualtion tasks to workers") + simulated_worlds <- + tryCatchLog::tryCatchLog({ + # assign simulation tasks to workers + furrr::future_map( + .x = seq_len(n_repeats), + seed = FALSE, + .options = .future_options, + .progress = TRUE, + .f = ~ { + data.table::setDTthreads(threads = DTthreads) + .world <- readRDS(world_saved_path) + # run simulation + for (i in 1:n_iters) { + .world$start_iter(time_step = .world$get_time() + 1L) %>% + pipeline(.) 
+ } + return(.world) + }) + }, finally = { + if (exists("cl")) { + parallel::stopCluster(cl) + } + }, + write.error.dump.file = write.error.dump.file, + write.error.dump.folder = write.error.dump.folder, + silent.messages = TRUE, + silent.warnings = TRUE, + include.full.call.stack = FALSE, + include.compact.call.stack = FALSE) + + message("Done. ;)") + return(simulated_worlds) +} + + +fsim <- function(x, n_reps, my_func) { + cl <- future::makeClusterPSOCK(workers = 2) + future::plan(future::cluster, workers = cl) + + res <- furrr::future_map(seq_len(n_reps), + ~ { + x %>% + my_func + }) + + parallel::stopCluster(cl) + + return(res) +} diff --git a/R/utils.R b/R/utils.R index 6c9a641b..7a1e31ee 100644 --- a/R/utils.R +++ b/R/utils.R @@ -406,3 +406,18 @@ get_current_git_branch <- function() { gsub("\\*|\\ ", "", .) return(current_branch) } + + +check_package <- function(pkgname, repos = "https://cloud.r-project.org") { + if (!requireNamespace(pkgname, quietly = TRUE)) { + .choice <- + utils::menu(choices = c("Yes", "No"), + title = glue::glue("The `{pkgname}` package is required. 
\\ + Would you like to download and install the package now?")) + if (.choice == 1) { + install.packages(pkgname, repos = repos) + } else { + stop(sprintf("The `%s` is required but missing.", pkgname)) + } + } +} diff --git a/man/sim.Rd b/man/sim.Rd index 1fde99f2..1e21668b 100644 --- a/man/sim.Rd +++ b/man/sim.Rd @@ -2,9 +2,28 @@ % Please edit documentation in R/sim.R \name{sim} \alias{sim} +\alias{sim_parallel} \title{Compile and execute a microsimulation pipeline} \usage{ -sim(world, pipeline, n_iters, write.error.dump.folder) +sim( + world, + pipeline, + n_iters = 1, + write.error.dump.folder = get_active_scenario()$output_dir, + write.error.dump.file = FALSE +) + +sim_parallel( + world, + pipeline, + n_iters = 1L, + n_repeats = 1L, + n_workers = 1L, + DTthreads = data.table::getDTthreads(), + .future_options, + write.error.dump.file = FALSE, + write.error.dump.folder = get_active_scenario()$output_dir +) } \arguments{ \item{world}{(\code{World})\cr @@ -14,17 +33,38 @@ A \link{World} object.} A functional sequence (\code{fseq}) object.} \item{n_iters}{a number of iterations. (\code{integer(1)})\cr -Number of times the microsimulation pipeline should be repeated.} +Number of times the microsimulation pipeline, \code{pipeline}, should be repeated +on the \code{world} object.} \item{write.error.dump.folder}{(\code{character(1)})\cr path: Saves the dump of the workspace in a specific folder instead of the working directory} + +\item{n_repeats}{(\code{integer(1)})\cr +Number of times the entire simulation should be repeated. This is not the same as +\code{n_iters}.} + +\item{n_workers}{(\code{integer(1)})\cr +Number of parallel workers. This requires the \code{parallel} package to be installed.} + +\item{DTthreads}{(\code{integer(1)}|\code{missing})\cr +Number of cores data.table is allowed to used in an R session. 
If missing, +it will use the default number of cores set by \link[data.table:data.table]{data.table::data.table}, which +is usually half of the available number of cores.} + +\item{.options}{\code{\link[furrr:future_options]{furrr::future_options()}}\cr +By default this is set to \code{furrr::future_options(scheduling = FALSE)}. +See \code{\link[furrr:future_options]{furrr::future_options()}} for more details.} } \value{ -\code{NULL} +\code{sim} doesn't return anything but mutates the given \link{World} object. While, +\code{sim_paralell()} returns \code{n_tasks} number of mutated \link{World} objects. } \description{ This function compiles and executes a microsimulation pipeline. + +\code{sim} mutates the given \link{World} object. While, \code{sim_parallel} clones the +given \link{World} object and returns \code{n_repeats} of mutated \link{Worlds}. } \examples{ diff --git a/tests/testthat/test-sim.R b/tests/testthat/test-sim.R index f1fdd8e0..2cd12e29 100644 --- a/tests/testthat/test-sim.R +++ b/tests/testthat/test-sim.R @@ -65,3 +65,78 @@ test_that("sim works", { "Assertion on 'pipeline' failed: Must be a function") }) + +test_that("sim_parallel works", { + + testthat::skip("still experimental") + + # create simple models + birth_model <- list(yes = 0.1, no = 0.9) + death_model <- list(yes = 0.1, no = 0.9) + + # prepare population data + ind_data <- + data.table::copy(toy_individuals) %>% + .[, .give_birth := "no"] + + # create filters, this is a method for creating functions using `magrittr` and + # data.table's syntax + filter_alive <- + . %>% + .[age != -1] + + filter_eligible_females <- + . 
%>% + .[sex == "female" & age %in% c(18:50)] %>% + filter_alive + + ?magrittr::use_series() + + # create a World object, a container for all entities and models for simulation + world <- World$new() + world$add(x = Individual$new(.data = ind_data, id_col = "pid")) + world$add(death_model, name = "death_model") + world$add(birth_model, name = "birth_model") + world$add(filter_alive, name = "filter_alive") + world$add(filter_eligible_females, name = "filter_eligible_female") + + microsimulation_pipeline <- + . %>% + # ageing + mutate_entity(entity = "Individual", + age := age + 1L, + subset = age != -1L) %>% + # simulate birth decision + transition(entity = "Individual", + model = .$models$birth_model, + attr = ".give_birth", + preprocessing_fn = .$fseqs$filter_eligible_female) %>% + # add newborns + add_entity(entity = "Individual", + newdata = toy_individuals[age == 0, ], + target = .$entities$Individual$get_data()[.give_birth == "yes", .N]) %>% + # reset the birth decision variable + mutate_entity(entity = "Individual", + .give_birth := "no", + subset = age != -1L) %>% + # simulate deaths + transition(entity = "Individual", + model = .$models$death_model, + attr = "age", + values = c(yes = -1L), + preprocessing_fn = .$fseqs$filter_alive) %>% + # log the total number of alive individuals at the end of the iteration + add_log(desc = "count:Individual", + value = .$entities$Individual$get_data()[age != -1L, .N]) + + # complie and execute a simulation pipeline + # sim(world = world, pipeline = microsimulation_pipeline, n_iters = 10) + + # parallel sim + simulated_worlds <- sim_parallel(world = world, + pipeline = microsimulation_pipeline, + n_iters = 5, + n_repeats = 2, + n_workers = 2) + +})