Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions ixa-bench/criterion/sample_entity_scaling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ define_entity!(Mosquito);
define_property!(struct Species(u8), Mosquito);
define_property!(struct Region(u8), Mosquito);
define_multi_property!((Species, Region), Mosquito);
define_property!(struct Unindexed10(u8), Mosquito);

const POPULATION_SIZES: [usize; 3] = [1_000, 10_000, 100_000];

Expand All @@ -31,6 +32,7 @@ fn setup_context(population_size: usize) -> Context {
.add_entity((
Species(context.sample_range(SampleScalingRng, 0..10)),
Region(context.sample_range(SampleScalingRng, 0..10)),
Unindexed10(context.sample_range(SampleScalingRng, 0..10)),
))
.unwrap();
}
Expand Down Expand Up @@ -156,12 +158,37 @@ pub fn bench_sample_entity_multi_property_indexed(c: &mut Criterion, results: Re
group.finish();
}

// Benchmarks sampling a single entity with a query on an unindexed property, once per
// population size in `POPULATION_SIZES`. The source iterator is a PropertyVecIter, which
// must scan the property's value vector. The nanoseconds-per-sample figure for each size is
// stored in `results` under the key `(benchmark name, population size)`.
pub fn bench_sample_entity_single_property_unindexed(c: &mut Criterion, results: Results) {
    const BENCH_NAME: &str = "sample_entity_single_property_unindexed";
    let mut group = c.benchmark_group(BENCH_NAME);

    for population in POPULATION_SIZES.iter().copied() {
        let context = setup_context(population);
        let benchmark_id = BenchmarkId::from_parameter(population);

        group.bench_with_input(benchmark_id, &population, |bencher, _| {
            let ns_per_sample = bench_ns_per_sample(bencher, || {
                let _ = context.sample_entity(SampleScalingRng, (Unindexed10(5),));
            });

            // Record the measurement so the scaling summary can report it later.
            let mut recorded = results.lock().unwrap();
            recorded.insert((BENCH_NAME.to_string(), population), ns_per_sample);
        });
    }

    group.finish();
}

fn sample_entity_scaling(c: &mut Criterion) {
let results: Results = Arc::new(Mutex::new(BTreeMap::new()));

bench_sample_entity_whole_population(c, results.clone());
bench_sample_entity_single_property_indexed(c, results.clone());
bench_sample_entity_multi_property_indexed(c, results.clone());
bench_sample_entity_single_property_unindexed(c, results.clone());

// Prints a scaling summary at the end like:
// === Scaling summary: sample_entity_whole_population ===
Expand All @@ -174,6 +201,7 @@ fn sample_entity_scaling(c: &mut Criterion) {
print_scaling_summary(&results, "sample_entity_whole_population");
print_scaling_summary(&results, "sample_entity_single_property_indexed");
print_scaling_summary(&results, "sample_entity_multi_property_indexed");
print_scaling_summary(&results, "sample_entity_single_property_unindexed");
}

criterion_group!(benches, sample_entity_scaling);
Expand Down
32 changes: 32 additions & 0 deletions ixa-bench/criterion/sample_people.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const SEED: u64 = 42;
define_entity!(Person);
define_property!(struct Property10(u8), Person);
define_property!(struct Property100(u8), Person);
define_property!(struct Unindexed10(u8), Person);

fn setup() -> (Context, Vec<u8>) {
let mut rng = StdRng::seed_from_u64(SEED);
Expand All @@ -36,6 +37,7 @@ fn setup() -> (Context, Vec<u8>) {
.add_entity((
Property10(context.sample_range(SampleBenchRng, 0..10)),
Property100(context.sample_range(SampleBenchRng, 0..100)),
Unindexed10(context.sample_range(SampleBenchRng, 0..10)),
))
.unwrap();
}
Expand Down Expand Up @@ -106,6 +108,36 @@ pub fn criterion_benchmark(criterion: &mut Criterion) {
});
});

// Sampling one entity when the query is on an unindexed property. The source iterator is a
// PropertyVecIter, which must scan the property's value vector.
criterion.bench_function("sampling_single_unindexed_entities", |bencher| {
bencher.iter(|| {
let counts = black_box(&counts);

for value in counts {
let _selected = black_box(
context.sample_entity(SampleBenchRng, black_box((Unindexed10(*value % 10),))),
);
}
});
});

// Sampling several entities when the query is on an unindexed property. The source iterator is a
// PropertyVecIter, which must scan the property's value vector.
criterion.bench_function("sampling_multiple_unindexed_entities", |bencher| {
bencher.iter(|| {
let counts = black_box(&counts);

for value in counts {
let _selected = black_box(context.sample_entities(
SampleBenchRng,
black_box((Unindexed10(*value % 10),)),
*black_box(value) as usize,
));
}
});
});

criterion.finish()
}

Expand Down
15 changes: 13 additions & 2 deletions src/entity/entity_set/entity_set_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use rand::Rng;
use crate::entity::entity_set::source_set::{SourceIterator, SourceSet};
use crate::entity::{Entity, EntityId, PopulationIterator};
use crate::hashing::IndexSet;
use crate::random::{sample_multiple_l_reservoir, sample_single_l_reservoir};
use crate::random::{
sample_multiple_from_known_length, sample_multiple_l_reservoir, sample_single_l_reservoir,
};

/// An iterator over the IDs in an entity set, producing `EntityId<E>`s until exhausted.
pub struct EntitySetIterator<'c, E: Entity> {
Expand Down Expand Up @@ -101,7 +103,16 @@ impl<'c, E: Entity> EntitySetIterator<'c, E> {
where
R: Rng,
{
sample_multiple_l_reservoir(rng, self, requested)
match self.size_hint() {
(lower, Some(upper)) if lower == upper => {
if lower == 0 {
warn!("Requested a sample of entities from an empty population");
return vec![];
}
sample_multiple_from_known_length(rng, self, requested)
}
_ => sample_multiple_l_reservoir(rng, self, requested),
}
}
}

Expand Down
143 changes: 83 additions & 60 deletions src/random/sampling_algorithms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,30 @@
use crate::rand::seq::index::sample as choose_range;
use crate::rand::Rng;

/// Sample a random element uniformly from a container of known length.
/// Samples one element uniformly at random from an iterator whose length is known at runtime.
///
/// We do not assume the container is randomly indexable, only that it can be iterated over.
/// This algorithm is used when the property is indexed, and thus we know the length of the result set.
/// The caller must ensure that `(len, Some(len)) == iter.size_hint()`, i.e. the iterator
/// reports its exact length via `size_hint`. We do not require `ExactSizeIterator`
/// because that is a compile-time guarantee, whereas our requirement is a runtime condition.
///
/// The implementation selects a random index and uses `Iterator::nth`. For iterators
/// with O(1) `nth` (e.g., randomly indexable structures), this is very efficient.
/// The selected value is cloned.
///
/// The iterator need only support iteration; random indexing is not required.
/// This function is intended for use when the result set is indexed and its length is known.
pub fn sample_single_from_known_length<I, R, T>(rng: &mut R, mut iter: I) -> Option<T>
where
R: Rng,
I: Iterator<Item = T> + ExactSizeIterator<Item = T>,
I: Iterator<Item = T>,
{
let len = iter.len();
if len == 0 {
// It is the caller's responsibility to ensure that `(len, Some(len)) == iter.size_hint()`.
let (length, _) = iter.size_hint();
if length == 0 {
return None;
}
// This little trick with `u32` makes this function 30% faster.
let index = rng.random_range(0..len as u32) as usize;
let index = rng.random_range(0..length as u32) as usize;
// The set need not be randomly indexable, so we have to use the `nth` method.
iter.nth(index)
}
Expand All @@ -33,61 +42,72 @@ where
///
/// This algorithm is significantly slower than the "known length" algorithm (factor
/// of 10^4). The reservoir algorithm from [`rand`](crate::rand) reduces to the "known length"
/// algorithm when the iterator is an [`ExactSizeIterator`](std::iter::ExactSizeIterator), or more precisely,
/// when `iterator.size_hint()` returns `(k, Some(k))` for some `k`. Otherwise,
/// algorithm when `iterator.size_hint()` returns `(k, Some(k))` for some `k`. Otherwise,
/// this algorithm is much faster than the [`rand`](crate::rand) implementation (factor of 100).
pub fn sample_single_l_reservoir<I, R, T>(rng: &mut R, iterable: I) -> Option<T>
where
R: Rng,
I: IntoIterator<Item = T>,
{
let mut chosen_item: Option<T> = None; // the currently selected element
let mut iter = iterable.into_iter();
let mut weight: f64 = rng.random_range(0.0..1.0); // controls skip distance distribution
let mut position: usize = 0; // current index in data
let mut next_pick_position: usize = 1; // index of the next item to pick

iterable.into_iter().for_each(|item| {
position += 1;
if position == next_pick_position {
chosen_item = Some(item);
next_pick_position +=
(f64::ln(rng.random_range(0.0..1.0)) / f64::ln(1.0 - weight)).floor() as usize + 1;
weight *= rng.random_range(0.0..1.0);
let mut chosen_item: T = iter.next()?; // the currently selected element

// Number of elements to skip before the next candidate to consider for the reservoir.
// `iter.nth(skip)` skips `skip` elements and returns the next one.
let mut skip = (f64::ln(rng.random_range(0.0..1.0)) / f64::ln(1.0 - weight)).floor() as usize;
weight *= rng.random_range(0.0..1.0);

loop {
match iter.nth(skip) {
Some(item) => {
chosen_item = item;
skip =
(f64::ln(rng.random_range(0.0..1.0)) / f64::ln(1.0 - weight)).floor() as usize;
weight *= rng.random_range(0.0..1.0);
}
None => return Some(chosen_item),
}
});

chosen_item
}
}

/// Sample multiple random elements uniformly without replacement from a container of known length.
/// This function assumes `set.len() >= requested`.
/// Samples `requested` elements uniformly at random without replacement from an iterator
/// whose length is known at runtime. Requires `len >= requested`.
///
/// The caller must ensure that `(len, Some(len)) == iter.size_hint()`, i.e. the iterator
/// reports its exact length via `size_hint`. We do not require `ExactSizeIterator`
/// because that is a compile-time guarantee, whereas our requirement is a runtime condition.
///
/// We do not assume the container is randomly indexable, only that it can be iterated over. The values are cloned.
/// The implementation selects random indices and uses `Iterator::nth`. For iterators
/// with O(1) `nth` (e.g., randomly indexable structures), this is very efficient.
/// Selected values are cloned.
///
/// This algorithm can be used when the property is indexed, and thus we know the length of the result set.
/// For very small `requested` values (<=5), this algorithm is faster than reservoir because it doesn't
/// iterate over the entire set.
/// This strategy is particularly effective for small `requested` (≤ 5), since it
/// avoids iterating over the entire set and is typically faster than reservoir sampling.
pub fn sample_multiple_from_known_length<I, R, T>(rng: &mut R, iter: I, requested: usize) -> Vec<T>
where
R: Rng,
I: IntoIterator<Item = T> + ExactSizeIterator<Item = T>,
I: IntoIterator<Item = T>,
{
let mut iter = iter.into_iter();
// It is the caller's responsibility to ensure that `(length, Some(length)) == iter.size_hint()`.
let (length, _) = iter.size_hint();

let mut indexes = Vec::with_capacity(requested);
indexes.extend(choose_range(rng, iter.len(), requested));
indexes.extend(choose_range(rng, length, requested));
indexes.sort_unstable();
let mut index_iterator = indexes.into_iter();
let mut next_idx = index_iterator.next().unwrap();

let mut selected = Vec::with_capacity(requested);
let mut consumed: usize = 0; // number of elements consumed from the iterator so far

for (idx, item) in iter.enumerate() {
if idx == next_idx {
// `iter.nth(n)` skips `n` elements and returns the next one, so to reach
// index `idx` we skip `idx - consumed` where `consumed` tracks how many
// elements have already been consumed.
for idx in indexes {
if let Some(item) = iter.nth(idx - consumed) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused by this change. Why don't you break early anymore?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we iterated over the elements of `iter` (the source set), checked whether we had reached the next index, and, if so, updated the next index from the list of precomputed indexes. We broke out of the loop once there were no more indexes.

In the new version, we iterate over the precomputed indexes. There's no need to break, because termination is implicit in `for idx in indexes`. We advance through the `iter` iterator by calling `iter.nth` on it.

selected.push(item);
if let Some(i) = index_iterator.next() {
next_idx = i;
} else {
break;
}
}
consumed = idx + 1;
}

selected
Expand All @@ -96,7 +116,8 @@ where
/// Sample multiple random elements uniformly without replacement from a container of unknown length. If
/// more samples are requested than are in the set, the function returns as many items as it can.
///
/// We do not assume the container is randomly indexable, only that it can be iterated over. The values are cloned.
/// The implementation uses `Iterator::nth`. Randomly indexable structures will have an O(1) `nth`
/// implementation and will be very efficient. The values are cloned.
///
/// This function implements "Algorithm L" from KIM-HUNG LI
/// Reservoir-Sampling Algorithms of Time Complexity O(n(1 + log(N/n)))
Expand All @@ -115,32 +136,34 @@ where

let mut weight: f64 = rng.random_range(0.0..1.0); // controls skip distance distribution
weight = weight.powf(1.0 / requested as f64);
let mut position: usize = 0; // current index in data
let mut next_pick_position: usize = 1; // index of the next item to pick
let mut reservoir = Vec::with_capacity(requested); // the sample reservoir

iter.into_iter().for_each(|item| {
position += 1;
if position == next_pick_position {
if reservoir.len() == requested {
let mut iter = iter.into_iter();
let mut reservoir: Vec<T> = iter.by_ref().take(requested).collect(); // the sample reservoir

if reservoir.len() < requested {
return reservoir;
}

// Number of elements to skip before the next candidate to consider for the reservoir.
// `iter.nth(skip)` skips `skip` elements and returns the next one.
let mut skip = (f64::ln(rng.random_range(0.0..1.0)) / f64::ln(1.0 - weight)).floor() as usize;
let uniform_random: f64 = rng.random_range(0.0..1.0);
weight *= uniform_random.powf(1.0 / requested as f64);

loop {
match iter.nth(skip) {
Some(item) => {
let to_remove = rng.random_range(0..reservoir.len());
reservoir.swap_remove(to_remove);
}
reservoir.push(item);
reservoir.push(item);

if reservoir.len() == requested {
next_pick_position += (f64::ln(rng.random_range(0.0..1.0)) / f64::ln(1.0 - weight))
.floor() as usize
+ 1;
skip =
(f64::ln(rng.random_range(0.0..1.0)) / f64::ln(1.0 - weight)).floor() as usize;
let uniform_random: f64 = rng.random_range(0.0..1.0);
weight *= uniform_random.powf(1.0 / requested as f64);
} else {
next_pick_position += 1;
}
None => return reservoir,
}
});

reservoir
}
}

#[cfg(test)]
Expand Down