Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.8.2
3.8.12
2 changes: 1 addition & 1 deletion redisolar/dao/redis/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ def __init__(self,
self.redis = redis_client
if key_schema is None:
key_schema = KeySchema()
self.key_schema = key_schema
self.key_schema = key_schema
10 changes: 5 additions & 5 deletions redisolar/dao/redis/capacity_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def get_report(self, limit: int, **kwargs) -> CapacityReport:
return CapacityReport(high_capacity_list, low_capacity_list)

def get_rank(self, site_id: int, **kwargs) -> float:
# START Challenge #4
# Remove the following line after you have added code to
# get the real rank.
return 0
# END Challenge #4
capacity_ranking_key = self.key_schema.capacity_ranking_key()
rank = self.redis.zrevrank(capacity_ranking_key, site_id)
# If the site_id is not found in the sorted set, rank will be None.
# You can choose how to handle that case. Here, we simply return -1.
return rank if rank is not None else -1
17 changes: 13 additions & 4 deletions redisolar/dao/redis/feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,19 @@ def insert(self, meter_reading: MeterReading, **kwargs) -> None:
p.execute()

def _insert(self, meter_reading: MeterReading,
pipeline: redis.client.Pipeline) -> None:
"""Helper method to insert a meter reading."""
# START Challenge #6
# END Challenge #6
pipeline: redis.client.Pipeline) -> None:
# Serialize the meter reading into a dictionary so it can be stored in the stream.
data = MeterReadingSchema().dump(meter_reading)

# Get the key for the global feed.
global_key = self.key_schema.global_feed_key()
# Add the serialized reading to the global stream, trimming it to GLOBAL_MAX_FEED_LENGTH.
pipeline.xadd(global_key, data, maxlen=self.GLOBAL_MAX_FEED_LENGTH, approximate=True)

# Get the key for the site-specific feed.
site_key = self.key_schema.feed_key(meter_reading.site_id)
# Add the serialized reading to the site-specific stream, trimming it to SITE_MAX_FEED_LENGTH.
pipeline.xadd(site_key, data, maxlen=self.SITE_MAX_FEED_LENGTH, approximate=True)

def get_recent_global(self, limit: int, **kwargs) -> List[MeterReading]:
return self.get_recent(self.key_schema.global_feed_key(), limit)
Expand Down
17 changes: 13 additions & 4 deletions redisolar/dao/redis/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,20 @@ def insert(self, meter_reading: MeterReading, **kwargs) -> None:
def insert_metric(self, site_id: int, value: float, unit: MetricUnit,
time: datetime.datetime, pipeline: redis.client.Pipeline):
"""Insert a specific metric."""
metric_key = self.key_schema.day_metric_key(site_id, unit, time) # pylint: disable=unused-variable
minute_of_day = self._get_day_minute(time) # pylint: disable=unused-variable
metric_key = self.key_schema.day_metric_key(site_id, unit, time)
minute_of_day = self._get_day_minute(time)

# Create a MeasurementMinute object to encode measurement + minute into a single string.
mm = MeasurementMinute(value, minute_of_day)

# Add to the sorted set:
# - member is the string representation of MeasurementMinute (e.g. "18.00:120")
# - score is the integer minute of the day (e.g. 120 for 2:00 am)
pipeline.zadd(metric_key, {str(mm): minute_of_day})

# Optionally set an expiration to clean up older data:
pipeline.expire(metric_key, METRIC_EXPIRATION_SECONDS)

# START Challenge #2
# END Challenge #2

def get_recent(self, site_id: int, unit: MetricUnit, time: datetime.datetime,
limit: int, **kwargs) -> Deque[Measurement]:
Expand Down
14 changes: 10 additions & 4 deletions redisolar/dao/redis/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@ def find_by_id(self, site_id: int, **kwargs) -> Site:

def find_all(self, **kwargs) -> Set[Site]:
"""Find all Sites in Redis."""
# START Challenge #1
# Remove this line when you've written code to build `site_hashes`.
site_hashes = [] # type: ignore
# END Challenge #1
site_ids_key = self.key_schema.site_ids_key()

# Get all stored site IDs from the Redis set
site_ids = self.redis.smembers(site_ids_key)

# Retrieve and deserialize all sites
site_hashes = [
self.redis.hgetall(self.key_schema.site_hash_key(site_id))
for site_id in site_ids
]

return {FlatSiteSchema().load(site_hash) for site_hash in site_hashes}
45 changes: 25 additions & 20 deletions redisolar/dao/redis/site_geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,37 @@ def _find_by_geo(self, query: GeoQuery, **kwargs) -> Set[Site]:
return {FlatSiteSchema().load(site) for site in sites}

def _find_by_geo_with_capacity(self, query: GeoQuery, **kwargs) -> Set[Site]:
# START Challenge #5
# Your task: Get the sites matching the GEO query.
# END Challenge #5

p = self.redis.pipeline(transaction=False)

# START Challenge #5
#
# Your task: Populate a dictionary called "scores" whose keys are site
# IDs and whose values are the site's capacity.
#
# Make sure to run any Redis commands against a Pipeline object
# for better performance.
# END Challenge #5

# Delete the next lines after you've populated a `site_ids`
# and `scores` variable.
site_ids: List[str] = []
# STEP 1: Use Redis GEO command to find site IDs within the specified radius.
site_ids: List[str] = self.redis.georadius(
self.key_schema.site_geo_key(),
query.coordinate.lng,
query.coordinate.lat,
query.radius,
query.radius_unit.value
)

# STEP 2: For each site in the geo query, retrieve its capacity.
# Site capacity is stored in a Sorted Set under the key provided by capacity_ranking_key().
capacity_key = self.key_schema.capacity_ranking_key()
score_pipe = self.redis.pipeline(transaction=False)
for site_id in site_ids:
score_pipe.zscore(capacity_key, site_id)
score_results = score_pipe.execute()

# Build a dictionary mapping site IDs to their capacity (default to 0.0 if missing).
scores: Dict[str, float] = {}
for site_id, score in zip(site_ids, score_results):
scores[str(site_id)] = float(score) if score is not None else 0.0

# Now, use a pipeline to fetch the full site hash for those sites that exceed the capacity threshold.
p = self.redis.pipeline(transaction=False)
for site_id in site_ids:
if scores[site_id] and scores[site_id] > CAPACITY_THRESHOLD:
if scores.get(str(site_id), 0.0) > CAPACITY_THRESHOLD:
p.hgetall(self.key_schema.site_hash_key(site_id))
site_hashes = p.execute()

return {FlatSiteSchema().load(site) for site in site_hashes}
# Return a set of Site objects built from the hashes.
return {FlatSiteSchema().load(site) for site in site_hashes if site}

def find_by_geo(self, query: GeoQuery, **kwargs) -> Set[Site]:
"""Find Sites using a geographic query."""
Expand Down
24 changes: 21 additions & 3 deletions redisolar/dao/redis/site_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,32 @@ def _update_basic(self, key: str, reading: MeterReading) -> None:
self.redis.hset(key, SiteStats.MAX_CAPACITY, reading.wh_generated)

def _update_optimized(self, key: str, meter_reading: MeterReading,
pipeline: redis.client.Pipeline = None) -> None:
pipeline: redis.client.Pipeline = None) -> None:
execute = False
if pipeline is None:
pipeline = self.redis.pipeline()
execute = True

# START Challenge #3
# END Challenge #3
# Set the reporting time, increment count, and set the expiry.
reporting_time = datetime.datetime.utcnow().isoformat()
pipeline.hset(key, SiteStats.LAST_REPORTING_TIME, reporting_time)
pipeline.hincrby(key, SiteStats.COUNT, 1)
pipeline.expire(key, WEEK_SECONDS)

# Use the Lua script to update MAX_WH only if the new wh_generated is greater.
self.compare_and_update_script.update_if_greater(
pipeline, key, SiteStats.MAX_WH, meter_reading.wh_generated
)

# Use the Lua script to update MIN_WH only if the new wh_generated is lower.
self.compare_and_update_script.update_if_less(
pipeline, key, SiteStats.MIN_WH, meter_reading.wh_generated
)

# Use the Lua script to update MAX_CAPACITY only if the new current_capacity is greater.
self.compare_and_update_script.update_if_greater(
pipeline, key, SiteStats.MAX_CAPACITY, meter_reading.current_capacity
)

if execute:
pipeline.execute()
Expand Down
42 changes: 36 additions & 6 deletions redisolar/dao/redis/sliding_window_rate_limiter.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# Uncomment for Challenge #7
#import datetime
#import random
import datetime
import random
from redis.client import Redis

import time
from redisolar.dao.base import RateLimiterDaoBase
from redisolar.dao.redis.base import RedisDaoBase
from redisolar.dao.redis.key_schema import KeySchema

# Uncomment for Challenge #7
#from redisolar.dao.base import RateLimitExceededException
from redisolar.dao.base import RateLimitExceededException


class SlidingWindowRateLimiter(RateLimiterDaoBase, RedisDaoBase):
Expand All @@ -24,5 +25,34 @@ def __init__(self,

def hit(self, name: str):
"""Record a hit using the rate-limiter."""
# START Challenge #7
# END Challenge #7
# Get the current timestamp in milliseconds.
now = int(time.time() * 1000)

# Generate the key for the sliding window rate limiter.
# Key format: limiter:[name]:[window_size_ms]:[max_hits]
key = self.key_schema.fixed_rate_limiter_key(name, int(self.window_size_ms), self.max_hits)

# Start a pipeline to perform the following operations atomically.
pipe = self.redis.pipeline()

# Create a unique member for this hit.
# Instead of using uuid, we use a random integer to ensure uniqueness.
member = f"{now}-{random.randint(0, 1000000)}"

# 1. Add the current hit to the sorted set with the timestamp as its score.
pipe.zadd(key, {member: now})

# 2. Remove all entries older than the current sliding window.
cutoff = now - self.window_size_ms
pipe.zremrangebyscore(key, 0, cutoff)

# 3. Get the current count of entries in the sorted set.
pipe.zcard(key)

# Execute the pipeline.
results = pipe.execute()

# The third result is the count from ZCARD.
count = results[2]
if count > self.max_hits:
raise RateLimitExceededException(f"Rate limit exceeded: {count} hits in {self.window_size_ms}ms")
1 change: 0 additions & 1 deletion tests/dao/redis/test_capacity_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def test_get_report(readings, capacity_report_dao):
assert lowest[4].capacity > lowest[0].capacity


@pytest.mark.skip("Remove for challenge #4")
def test_get_rank(readings, capacity_report_dao):
for reading in readings:
capacity_report_dao.update(reading)
Expand Down
2 changes: 1 addition & 1 deletion tests/dao/redis/test_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def generate_meter_reading(site_id: int, datetime: datetime.datetime):
wh_used=0.015)


@pytest.mark.skip("Remove for Challenge #6")
# @pytest.mark.skip("Remove for Challenge #6")
def test_basic_insert_returns_recent(feed_dao):
now = datetime.datetime.now()
reading0 = generate_meter_reading(1, now)
Expand Down
2 changes: 1 addition & 1 deletion tests/dao/redis/test_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _test_insert_and_retrieve(readings: List[MeterReading],


# Challenge #2
@pytest.mark.skip("Remove for challenge #2")
# @pytest.mark.skip("Remove for challenge #2")
def test_small(metric_dao, readings):
_test_insert_and_retrieve(readings, metric_dao, 1)

Expand Down
2 changes: 1 addition & 1 deletion tests/dao/redis/test_site.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def test_find_by_id_existing_site(site_dao):
assert found_site == site


@pytest.mark.skip("Remove for challenge #1")
# @pytest.mark.skip("Remove for challenge #1")
def test_find_all(site_dao):
site1 = Site(id=1,
capacity=10.0,
Expand Down
2 changes: 1 addition & 1 deletion tests/dao/redis/test_site_geo.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def test_find_sites_by_geo(site_geo_dao):
assert site_geo_dao.find_by_geo(query) == {site1, site2}


@pytest.mark.skip("Remove for challenge #5")
# @pytest.mark.skip("Remove for challenge #5")
def test_find_by_geo_with_excess_capacity(site_geo_dao, capacity_dao):
site1 = Site(id=1,
capacity=10.0,
Expand Down
6 changes: 3 additions & 3 deletions tests/dao/redis/test_sliding_window_rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
TEN_SECONDS = 10 * 1000

# Challenge #7
@pytest.mark.skip("Remove for challenge #7")
# @pytest.mark.skip("Remove for challenge #7")
def test_within_limit_inside_window(redis, key_schema):
exception_count = 0
limiter = SlidingWindowRateLimiter(TEN_SECONDS, 10, redis, key_schema=key_schema)
Expand All @@ -22,7 +22,7 @@ def test_within_limit_inside_window(redis, key_schema):
assert exception_count == 0


@pytest.mark.skip("Remove for challenge #7")
# @pytest.mark.skip("Remove for challenge #7")
def test_exceeds_limit_inside_window(redis, key_schema):
exception_count = 0
limiter = SlidingWindowRateLimiter(TEN_SECONDS, 10, redis, key_schema=key_schema)
Expand All @@ -36,7 +36,7 @@ def test_exceeds_limit_inside_window(redis, key_schema):
assert exception_count == 2


@pytest.mark.skip("Remove for challenge #7")
# @pytest.mark.skip("Remove for challenge #7")
def test_exceeds_limit_outside_window(redis, key_schema):
raised = False
limiter = SlidingWindowRateLimiter(100, 10, redis, key_schema=key_schema)
Expand Down