Makefile (4 changes: 2 additions & 2 deletions)
@@ -30,7 +30,7 @@ install: install-env-run install-env-docs install-env-test
 install-env-run:
 	@echo "👷‍♂️ $(BLUE)creating virtual environment $(PROJECT)-run$(NC)"
 	pyenv local --unset
-	-pyenv virtualenv $(PROJECT)-run > /dev/null
+	-pyenv virtualenv $(word 1,$(PYTHON_VERSIONS)) $(PROJECT)-run > /dev/null
 	pyenv local $(PROJECT)-run
 	pip install --no-user -U pip > /dev/null
 	pip install --no-user -r requirements.txt > /dev/null
@@ -39,7 +39,7 @@ install-env-run:
 install-env-docs:
 	@echo "👷‍♂️ $(BLUE)creating virtual environment $(PROJECT)-docs$(NC)"
 	pyenv local --unset
-	-pyenv virtualenv $(PROJECT)-docs > /dev/null
+	-pyenv virtualenv $(word 1,$(PYTHON_VERSIONS)) $(PROJECT)-docs > /dev/null
 	pyenv local $(PROJECT)-docs
 	pip install --no-user -U pip > /dev/null
 	pip install --no-user -r requirements.docs.txt > /dev/null
QuantileFlow/ddsketch/core.py (15 changes: 8 additions & 7 deletions)
@@ -84,16 +84,17 @@ def insert(self, value: Union[int, float]) -> None:
         Raises:
             ValueError: If value is negative and cont_neg is False.
         """
-        if value == 0:
-            self.zero_count += 1
-        elif value > 0:
+        if value > 0:
             bucket_idx = self.mapping.compute_bucket_index(value)
             self.positive_store.add(bucket_idx)
-        elif value < 0 and self.cont_neg:
-            bucket_idx = self.mapping.compute_bucket_index(-value)
-            self.negative_store.add(bucket_idx)
         elif value < 0:
-            raise ValueError("Negative values not supported when cont_neg is False")
+            if self.cont_neg:
+                bucket_idx = self.mapping.compute_bucket_index(-value)
+                self.negative_store.add(bucket_idx)
+            else:
+                raise ValueError("Negative values not supported when cont_neg is False")
+        else:
+            self.zero_count += 1
         self.count += 1
 
     def delete(self, value: Union[int, float]) -> None:
QuantileFlow/ddsketch/mapping/cubic_interpolation.py (7 changes: 2 additions & 5 deletions)
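To make the reordered control flow easier to follow outside the diff, here is a minimal standalone sketch of the new routing in insert(): positive values first, negatives gated by cont_neg, zeros last. The helper name insert_value, the Counter-based stores, and the gamma constant are illustrative assumptions, not the library's API.

```python
import math
from collections import Counter

def insert_value(value, positive, negative, zero_count, cont_neg, gamma=1.02):
    """Illustrative routing only: positive, then negative gated by cont_neg, else zero."""
    bucket = lambda v: math.ceil(math.log(v, gamma))  # stand-in for compute_bucket_index
    if value > 0:
        positive[bucket(value)] += 1
    elif value < 0:
        if cont_neg:
            negative[bucket(-value)] += 1
        else:
            raise ValueError("Negative values not supported when cont_neg is False")
    else:
        zero_count += 1
    return zero_count

pos, neg = Counter(), Counter()
zeros = 0
for v in (3.7, -3.7, 0.0):
    zeros = insert_value(v, pos, neg, zeros, cont_neg=True)
print(pos, neg, zeros)  # one positive bucket, one negative bucket, zeros == 1
```

Putting the value > 0 branch first also means the common case no longer pays for the equality check against zero on every insertion.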
@@ -59,9 +59,6 @@ def _cubic_interpolation(self, s: float) -> float:
         return s * (self.C + s * (self.B + s * self.A))
 
     def compute_bucket_index(self, value: float) -> int:
-        if value <= 0:
-            raise ValueError("Value must be positive")
-
         # Get binary exponent and normalized significand
         exponent, significand = self._extract_exponent_and_significand(value)
 
@@ -73,7 +70,7 @@ def compute_bucket_index(self, value: float) -> int:
         # where m is the optimal multiplier, e is the exponent,
         # P(s) is the cubic interpolation, and γ is (1+α)/(1-α)
         index = self.m * (exponent + interpolated) / self.log2_gamma
-        return int(math.ceil(index))
+        return math.ceil(index)
 
     def compute_value_from_index(self, index: float) -> float:
         """
@@ -84,7 +81,7 @@ def compute_value_from_index(self, index: float) -> float:
         target = (index * self.log2_gamma) / self.m
 
         # Extract integer and fractional parts
-        e = int(math.floor(target))
+        e = math.floor(target)
         f = target - e
 
         # If f is close to 0 or 1, return power of 2 directly
QuantileFlow/ddsketch/mapping/linear_interpolation.py (7 changes: 2 additions & 5 deletions)
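Two of the changes in this file simply drop int() wrappers. This is safe because in Python 3 math.ceil and math.floor already return int for finite floats; a quick standalone check (not part of the library):

```python
import math

# In Python 3, math.ceil and math.floor return int for finite floats,
# so int(math.ceil(x)) and int(math.floor(x)) are redundant wrappers.
x = 12.3
assert isinstance(math.ceil(x), int) and math.ceil(x) == 13
assert isinstance(math.floor(x), int) and math.floor(x) == 12
```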
@@ -30,9 +30,6 @@ def _extract_exponent(self, value: float) -> tuple[int, float]:
         return exponent, normalized_fraction
 
     def compute_bucket_index(self, value: float) -> int:
-        if value <= 0:
-            raise ValueError("Value must be positive")
-
         # Get binary exponent and normalized fraction
         exponent, normalized_fraction = self._extract_exponent(value)
 
@@ -42,7 +39,7 @@ def compute_bucket_index(self, value: float) -> int:
 
         # Compute final index
         log2_value = exponent + log2_fraction
-        return int(math.ceil(log2_value / self.log_gamma))
+        return math.ceil(log2_value / self.log_gamma)
 
     def compute_value_from_index(self, index: int) -> float:
         """
@@ -58,7 +55,7 @@ def compute_value_from_index(self, index: int) -> float:
         log2_value = index * self.log_gamma
 
         # Extract the integer and fractional parts of log2_value
-        exponent = int(math.floor(log2_value) + 1)
+        exponent = math.floor(log2_value) + 1
         mantissa = (log2_value - exponent + 2) / 2.0
 
         # Use ldexp to efficiently compute 2^exponent * mantissa
QuantileFlow/ddsketch/mapping/logarithmic.py (8 changes: 4 additions & 4 deletions)
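For readers unfamiliar with the approach this file implements, here is a hedged sketch of the general idea of approximating log2 with a linear interpolation of the significand obtained from math.frexp. It is illustrative only and not necessarily the exact formula used by _extract_exponent and compute_bucket_index.

```python
import math

def approx_log2_linear(value: float) -> float:
    # frexp gives value = m * 2**e with m in [0.5, 1); rescale to s in [1, 2)
    # and approximate log2(s) by the straight line s - 1 on that interval.
    m, e = math.frexp(value)
    s = m * 2.0  # significand in [1, 2); the exponent becomes e - 1
    return (e - 1) + (s - 1.0)

for v in (1.0, 1.5, 3.0, 10.0):
    print(f"{v:>5}: approx={approx_log2_linear(v):.4f}  exact={math.log2(v):.4f}")
```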
@@ -4,16 +4,16 @@
 from .base import MappingScheme
 
 class LogarithmicMapping(MappingScheme):
-    def __init__(self, relative_accuracy: float):
+    __slots__ = ('relative_accuracy', 'gamma', 'multiplier')
+
+    def __init__(self, relative_accuracy: float):
         self.relative_accuracy = relative_accuracy
         self.gamma = (1 + relative_accuracy) / (1 - relative_accuracy)
         self.multiplier = 1 / math.log(self.gamma)
 
     def compute_bucket_index(self, value: float) -> int:
-        if value <= 0:
-            raise ValueError(f"Value must be positive, got {value}")
         # ceil(log_gamma(value) = ceil(log(value) / log(gamma))
-        return int(math.ceil(math.log(value) * self.multiplier))
+        return math.ceil(math.log(value) * self.multiplier)
 
     def compute_value_from_index(self, index: int) -> float:
         # Return geometric mean of bucket boundaries
QuantileFlow/ddsketch/storage/contiguous.py (46 changes: 17 additions & 29 deletions)
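As a sanity check on the mapping this class implements, the sketch below reproduces the gamma and index formulas from the diff and verifies that a geometric-mean representative value stays within sqrt(gamma) - 1 (approximately the configured relative accuracy) of the original value. The representative function is an assumption based on the "geometric mean of bucket boundaries" comment, not code copied from the library.

```python
import math

a = 0.01                           # relative accuracy
gamma = (1 + a) / (1 - a)
multiplier = 1 / math.log(gamma)

def bucket_index(value: float) -> int:
    # ceil(log_gamma(value)) = ceil(log(value) / log(gamma))
    return math.ceil(math.log(value) * multiplier)

def representative(index: int) -> float:
    # Assumed: geometric mean of the bucket boundaries gamma**(index - 1) and gamma**index.
    return gamma ** (index - 0.5)

worst_case = math.sqrt(gamma) - 1  # ~= a for small a
for v in (0.5, 1.0, 123.456, 1e6):
    r = representative(bucket_index(v))
    assert abs(r - v) / v <= worst_case + 1e-12
    print(v, r)
```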
@@ -15,6 +15,10 @@ class ContiguousStorage(Storage):
     - If inserting below min: collapse if range too large, otherwise adjust min
     - If inserting above max: collapse lowest buckets to make room
     """
 
+    __slots__ = ('total_count', 'counts', 'min_index', 'max_index',
+                 'num_buckets', 'arr_index_of_min_bucket', 'collapse_count',
+                 'max_buckets', 'bucket_mask', 'strategy')
+
     def __init__(self, max_buckets: int = 2048):
         """
@@ -28,26 +32,13 @@ def __init__(self, max_buckets: int = 2048):
         super().__init__(max_buckets, BucketManagementStrategy.FIXED)
         self.total_count = 0
         self.counts = np.zeros(max_buckets, dtype=np.int64)
+        self.max_buckets = max_buckets
         self.min_index = None  # Minimum bucket index seen
         self.max_index = None  # Maximum bucket index seen
         self.num_buckets = 0  # Number of non-zero buckets
         self.arr_index_of_min_bucket = 0  # Array index where min bucket is stored
         self.collapse_count = 0  # Number of times buckets have been collapsed
 
-    def _get_position(self, bucket_index: int) -> int:
-        """
-        Get array position for bucket index using new mapping formula.
-
-        Args:
-            bucket_index: The bucket index to map to array position.
-
-        Returns:
-            The array position in the circular buffer.
-        """
-        if self.min_index is None:
-            return 0
-        return (bucket_index - self.min_index + self.arr_index_of_min_bucket) % len(self.counts)
-
     def add(self, bucket_index: int, count: int = 1):
         """
         Add count to bucket_index using new collapsing strategy.
@@ -56,8 +47,6 @@ def add(self, bucket_index: int, count: int = 1):
             bucket_index: The bucket index to add to.
             count: The count to add (default 1).
         """
-        if count <= 0:
-            return
 
         if self.min_index is None:
             # First insertion
@@ -70,23 +59,23 @@ def add(self, bucket_index: int, count: int = 1):
             if bucket_index < self.min_index:
                 new_range = self.max_index - bucket_index + 1
                 # Handle insertion below current minimum
-                if new_range > len(self.counts):
+                if new_range > self.max_buckets:
                     # Range too large, collapse into min bucket
-                    pos = self._get_position(self.min_index)
+                    pos = (self.arr_index_of_min_bucket) % self.max_buckets
                     self.counts[pos] += count
                     self.collapse_count += 1
                 else:
                     # Update min and place value
                     shift = self.min_index - bucket_index
                     self.min_index = bucket_index
                     self.arr_index_of_min_bucket = self.arr_index_of_min_bucket - shift
-                    pos = self._get_position(bucket_index)
+                    pos = (bucket_index - self.min_index + self.arr_index_of_min_bucket) % self.max_buckets
                     self.counts[pos] = count
                     self.num_buckets += 1
 
             elif bucket_index > self.max_index:
                 new_range = bucket_index - self.min_index + 1
-                if new_range > len(self.counts):
+                if new_range > self.max_buckets:
                     # Handle insertion above current maximum
                     buckets_to_collapse = bucket_index - self.max_index
                     # Collapse lowest buckets
@@ -103,7 +92,7 @@ def add(self, bucket_index: int, count: int = 1):
 
                     # Add collapsed values to new min bucket
                     new_min = self.min_index + buckets_to_collapse
-                    new_min_pos = self._get_position(new_min)
+                    new_min_pos = (buckets_to_collapse + self.arr_index_of_min_bucket) % self.max_buckets
                     self.counts[new_min_pos] += collapse_sum
 
                     # Update tracking variables
@@ -113,14 +102,14 @@ def add(self, bucket_index: int, count: int = 1):
 
                 # Place new value
                 self.max_index = bucket_index
-                pos = self._get_position(bucket_index)
+                pos = (bucket_index - self.min_index + self.arr_index_of_min_bucket) % self.max_buckets
                 was_zero = self.counts[pos] == 0
                 self.counts[pos] += count
                 if was_zero:
                     self.num_buckets += 1
             else:
                 # Normal insertion within current range
-                pos = self._get_position(bucket_index)
+                pos = (bucket_index - self.min_index + self.arr_index_of_min_bucket) % self.max_buckets
                 was_zero = self.counts[pos] == 0
                 self.counts[pos] += count
                 if was_zero:
@@ -143,7 +132,7 @@ def remove(self, bucket_index: int, count: int = 1) -> bool:
             return False
 
         if self.min_index <= bucket_index <= self.max_index:
-            pos = self._get_position(bucket_index)
+            pos = (bucket_index - self.min_index + self.arr_index_of_min_bucket) % self.max_buckets
            old_count = self.counts[pos]
 
             if old_count == 0:
@@ -160,15 +149,15 @@ def remove(self, bucket_index: int, count: int = 1) -> bool:
             elif bucket_index == self.min_index:
                 # Find new minimum index
                 for i in range(self.max_index - self.min_index + 1):
-                    pos = (self.arr_index_of_min_bucket + i) % len(self.counts)
+                    pos = (self.arr_index_of_min_bucket + i) % self.max_buckets
                     if self.counts[pos] > 0:
                         self.min_index += i
                         self.arr_index_of_min_bucket = pos
                         break
             elif bucket_index == self.max_index:
                 # Find new maximum index
                 for i in range(self.max_index - self.min_index + 1):
-                    pos = (self.arr_index_of_min_bucket + (self.max_index - self.min_index - i)) % len(self.counts)
+                    pos = (self.arr_index_of_min_bucket + (self.max_index - self.min_index - i)) % self.max_buckets
                     if self.counts[pos] > 0:
                         self.max_index -= i
                         break
@@ -189,9 +178,8 @@ def get_count(self, bucket_index: int) -> int:
             The count at the specified bucket index.
         """
         if self.min_index is None or bucket_index < self.min_index or bucket_index > self.max_index:
-            warnings.warn("Bucket index is out of range. Returning 0.", UserWarning)
             return 0
-        pos = self._get_position(bucket_index)
+        pos = (bucket_index - self.min_index + self.arr_index_of_min_bucket) % self.max_buckets
         return int(self.counts[pos])
 
     def merge(self, other: 'ContiguousStorage'):
@@ -206,7 +194,7 @@ def merge(self, other: 'ContiguousStorage'):
 
         # Add each non-zero bucket
         for i in range(other.max_index - other.min_index + 1):
-            pos = (other.arr_index_of_min_bucket + i) % len(other.counts)
+            pos = (other.arr_index_of_min_bucket + i) % other.max_buckets
             if other.counts[pos] > 0:
                 bucket_index = other.min_index + i
                 self.add(bucket_index, int(other.counts[pos]))
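Most of the churn in this file inlines the former _get_position helper, so the same circular-buffer expression now appears at each call site. Below is a small standalone sketch of that position arithmetic; the variable names mirror the diff, while the surrounding class and its collapsing logic are not reproduced.

```python
import numpy as np

max_buckets = 8
counts = np.zeros(max_buckets, dtype=np.int64)
min_index = 100              # smallest bucket index currently stored
arr_index_of_min_bucket = 5  # array slot where that minimum bucket lives

def position(bucket_index: int) -> int:
    # Offset from the minimum bucket, shifted by where the minimum sits,
    # wrapped around the fixed-size array.
    return (bucket_index - min_index + arr_index_of_min_bucket) % max_buckets

for b in (100, 101, 104, 107):
    counts[position(b)] += 1

print([position(b) for b in (100, 101, 104, 107)])  # [5, 6, 1, 4]; wraps past the end
```

Inlining trades one method call per insertion for a repeated formula, and using self.max_buckets instead of len(self.counts) avoids recomputing the array length on the hot path.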