diff --git a/.python-version b/.python-version index a08ffae0..89a1ad7a 100644 --- a/.python-version +++ b/.python-version @@ -1 +1 @@ -3.8.2 +3.8.12 diff --git a/redisolar/dao/redis/base.py b/redisolar/dao/redis/base.py index 589d643f..aaaa2786 100644 --- a/redisolar/dao/redis/base.py +++ b/redisolar/dao/redis/base.py @@ -11,4 +11,4 @@ def __init__(self, self.redis = redis_client if key_schema is None: key_schema = KeySchema() - self.key_schema = key_schema + self.key_schema = key_schema \ No newline at end of file diff --git a/redisolar/dao/redis/capacity_report.py b/redisolar/dao/redis/capacity_report.py index 7ff5dbca..4d56dedd 100644 --- a/redisolar/dao/redis/capacity_report.py +++ b/redisolar/dao/redis/capacity_report.py @@ -33,8 +33,8 @@ def get_report(self, limit: int, **kwargs) -> CapacityReport: return CapacityReport(high_capacity_list, low_capacity_list) def get_rank(self, site_id: int, **kwargs) -> float: - # START Challenge #4 - # Remove the following line after you have added code to - # get the real rank. - return 0 - # END Challenge #4 + capacity_ranking_key = self.key_schema.capacity_ranking_key() + rank = self.redis.zrevrank(capacity_ranking_key, site_id) + # If the site_id is not found in the sorted set, rank will be None. + # You can choose how to handle that case. Here, we simply return -1. + return rank if rank is not None else -1 diff --git a/redisolar/dao/redis/feed.py b/redisolar/dao/redis/feed.py index c4c80479..ee2b4918 100644 --- a/redisolar/dao/redis/feed.py +++ b/redisolar/dao/redis/feed.py @@ -25,10 +25,19 @@ def insert(self, meter_reading: MeterReading, **kwargs) -> None: p.execute() def _insert(self, meter_reading: MeterReading, - pipeline: redis.client.Pipeline) -> None: - """Helper method to insert a meter reading.""" - # START Challenge #6 - # END Challenge #6 + pipeline: redis.client.Pipeline) -> None: + # Serialize the meter reading into a dictionary so it can be stored in the stream. + data = MeterReadingSchema().dump(meter_reading) + + # Get the key for the global feed. + global_key = self.key_schema.global_feed_key() + # Add the serialized reading to the global stream, trimming it to GLOBAL_MAX_FEED_LENGTH. + pipeline.xadd(global_key, data, maxlen=self.GLOBAL_MAX_FEED_LENGTH, approximate=True) + + # Get the key for the site-specific feed. + site_key = self.key_schema.feed_key(meter_reading.site_id) + # Add the serialized reading to the site-specific stream, trimming it to SITE_MAX_FEED_LENGTH. + pipeline.xadd(site_key, data, maxlen=self.SITE_MAX_FEED_LENGTH, approximate=True) def get_recent_global(self, limit: int, **kwargs) -> List[MeterReading]: return self.get_recent(self.key_schema.global_feed_key(), limit) diff --git a/redisolar/dao/redis/metric.py b/redisolar/dao/redis/metric.py index ea240f3a..a53eedc3 100644 --- a/redisolar/dao/redis/metric.py +++ b/redisolar/dao/redis/metric.py @@ -116,11 +116,20 @@ def insert(self, meter_reading: MeterReading, **kwargs) -> None: def insert_metric(self, site_id: int, value: float, unit: MetricUnit, time: datetime.datetime, pipeline: redis.client.Pipeline): """Insert a specific metric.""" - metric_key = self.key_schema.day_metric_key(site_id, unit, time) # pylint: disable=unused-variable - minute_of_day = self._get_day_minute(time) # pylint: disable=unused-variable + metric_key = self.key_schema.day_metric_key(site_id, unit, time) + minute_of_day = self._get_day_minute(time) + + # Create a MeasurementMinute object to encode measurement + minute into a single string. + mm = MeasurementMinute(value, minute_of_day) + + # Add to the sorted set: + # - member is the string representation of MeasurementMinute (e.g. "18.00:120") + # - score is the integer minute of the day (e.g. 120 for 2:00 am) + pipeline.zadd(metric_key, {str(mm): minute_of_day}) + + # Optionally set an expiration to clean up older data: + pipeline.expire(metric_key, METRIC_EXPIRATION_SECONDS) - # START Challenge #2 - # END Challenge #2 def get_recent(self, site_id: int, unit: MetricUnit, time: datetime.datetime, limit: int, **kwargs) -> Deque[Measurement]: diff --git a/redisolar/dao/redis/site.py b/redisolar/dao/redis/site.py index 252df66f..2f26935f 100644 --- a/redisolar/dao/redis/site.py +++ b/redisolar/dao/redis/site.py @@ -36,9 +36,15 @@ def find_by_id(self, site_id: int, **kwargs) -> Site: def find_all(self, **kwargs) -> Set[Site]: """Find all Sites in Redis.""" - # START Challenge #1 - # Remove this line when you've written code to build `site_hashes`. - site_hashes = [] # type: ignore - # END Challenge #1 + site_ids_key = self.key_schema.site_ids_key() + + # Get all stored site IDs from the Redis set + site_ids = self.redis.smembers(site_ids_key) + + # Retrieve and deserialize all sites + site_hashes = [ + self.redis.hgetall(self.key_schema.site_hash_key(site_id)) + for site_id in site_ids + ] return {FlatSiteSchema().load(site_hash) for site_hash in site_hashes} diff --git a/redisolar/dao/redis/site_geo.py b/redisolar/dao/redis/site_geo.py index f9aa00af..e13ac546 100644 --- a/redisolar/dao/redis/site_geo.py +++ b/redisolar/dao/redis/site_geo.py @@ -53,32 +53,37 @@ def _find_by_geo(self, query: GeoQuery, **kwargs) -> Set[Site]: return {FlatSiteSchema().load(site) for site in sites} def _find_by_geo_with_capacity(self, query: GeoQuery, **kwargs) -> Set[Site]: - # START Challenge #5 - # Your task: Get the sites matching the GEO query. - # END Challenge #5 - - p = self.redis.pipeline(transaction=False) - - # START Challenge #5 - # - # Your task: Populate a dictionary called "scores" whose keys are site - # IDs and whose values are the site's capacity. - # - # Make sure to run any Redis commands against a Pipeline object - # for better performance. - # END Challenge #5 - - # Delete the next lines after you've populated a `site_ids` - # and `scores` variable. - site_ids: List[str] = [] + # STEP 1: Use Redis GEO command to find site IDs within the specified radius. + site_ids: List[str] = self.redis.georadius( + self.key_schema.site_geo_key(), + query.coordinate.lng, + query.coordinate.lat, + query.radius, + query.radius_unit.value + ) + + # STEP 2: For each site in the geo query, retrieve its capacity. + # Site capacity is stored in a Sorted Set under the key provided by capacity_ranking_key(). + capacity_key = self.key_schema.capacity_ranking_key() + score_pipe = self.redis.pipeline(transaction=False) + for site_id in site_ids: + score_pipe.zscore(capacity_key, site_id) + score_results = score_pipe.execute() + + # Build a dictionary mapping site IDs to their capacity (default to 0.0 if missing). scores: Dict[str, float] = {} + for site_id, score in zip(site_ids, score_results): + scores[str(site_id)] = float(score) if score is not None else 0.0 + # Now, use a pipeline to fetch the full site hash for those sites that exceed the capacity threshold. + p = self.redis.pipeline(transaction=False) for site_id in site_ids: - if scores[site_id] and scores[site_id] > CAPACITY_THRESHOLD: + if scores.get(str(site_id), 0.0) > CAPACITY_THRESHOLD: p.hgetall(self.key_schema.site_hash_key(site_id)) site_hashes = p.execute() - return {FlatSiteSchema().load(site) for site in site_hashes} + # Return a set of Site objects built from the hashes. + return {FlatSiteSchema().load(site) for site in site_hashes if site} def find_by_geo(self, query: GeoQuery, **kwargs) -> Set[Site]: """Find Sites using a geographic query.""" diff --git a/redisolar/dao/redis/site_stats.py b/redisolar/dao/redis/site_stats.py index 9b950865..37a080ad 100644 --- a/redisolar/dao/redis/site_stats.py +++ b/redisolar/dao/redis/site_stats.py @@ -54,14 +54,32 @@ def _update_basic(self, key: str, reading: MeterReading) -> None: self.redis.hset(key, SiteStats.MAX_CAPACITY, reading.wh_generated) def _update_optimized(self, key: str, meter_reading: MeterReading, - pipeline: redis.client.Pipeline = None) -> None: + pipeline: redis.client.Pipeline = None) -> None: execute = False if pipeline is None: pipeline = self.redis.pipeline() execute = True - # START Challenge #3 - # END Challenge #3 + # Set the reporting time, increment count, and set the expiry. + reporting_time = datetime.datetime.utcnow().isoformat() + pipeline.hset(key, SiteStats.LAST_REPORTING_TIME, reporting_time) + pipeline.hincrby(key, SiteStats.COUNT, 1) + pipeline.expire(key, WEEK_SECONDS) + + # Use the Lua script to update MAX_WH only if the new wh_generated is greater. + self.compare_and_update_script.update_if_greater( + pipeline, key, SiteStats.MAX_WH, meter_reading.wh_generated + ) + + # Use the Lua script to update MIN_WH only if the new wh_generated is lower. + self.compare_and_update_script.update_if_less( + pipeline, key, SiteStats.MIN_WH, meter_reading.wh_generated + ) + + # Use the Lua script to update MAX_CAPACITY only if the new current_capacity is greater. + self.compare_and_update_script.update_if_greater( + pipeline, key, SiteStats.MAX_CAPACITY, meter_reading.current_capacity + ) if execute: pipeline.execute() diff --git a/redisolar/dao/redis/sliding_window_rate_limiter.py b/redisolar/dao/redis/sliding_window_rate_limiter.py index 25b0c8d9..4ac087b1 100644 --- a/redisolar/dao/redis/sliding_window_rate_limiter.py +++ b/redisolar/dao/redis/sliding_window_rate_limiter.py @@ -1,13 +1,14 @@ # Uncomment for Challenge #7 -#import datetime -#import random +import datetime +import random from redis.client import Redis - +import time from redisolar.dao.base import RateLimiterDaoBase from redisolar.dao.redis.base import RedisDaoBase from redisolar.dao.redis.key_schema import KeySchema + # Uncomment for Challenge #7 -#from redisolar.dao.base import RateLimitExceededException +from redisolar.dao.base import RateLimitExceededException class SlidingWindowRateLimiter(RateLimiterDaoBase, RedisDaoBase): @@ -24,5 +25,34 @@ def __init__(self, def hit(self, name: str): """Record a hit using the rate-limiter.""" - # START Challenge #7 - # END Challenge #7 + # Get the current timestamp in milliseconds. + now = int(time.time() * 1000) + + # Generate the key for the sliding window rate limiter. + # Key format: limiter:[name]:[window_size_ms]:[max_hits] + key = self.key_schema.fixed_rate_limiter_key(name, int(self.window_size_ms), self.max_hits) + + # Start a pipeline to perform the following operations atomically. + pipe = self.redis.pipeline() + + # Create a unique member for this hit. + # Instead of using uuid, we use a random integer to ensure uniqueness. + member = f"{now}-{random.randint(0, 1000000)}" + + # 1. Add the current hit to the sorted set with the timestamp as its score. + pipe.zadd(key, {member: now}) + + # 2. Remove all entries older than the current sliding window. + cutoff = now - self.window_size_ms + pipe.zremrangebyscore(key, 0, cutoff) + + # 3. Get the current count of entries in the sorted set. + pipe.zcard(key) + + # Execute the pipeline. + results = pipe.execute() + + # The third result is the count from ZCARD. + count = results[2] + if count > self.max_hits: + raise RateLimitExceededException(f"Rate limit exceeded: {count} hits in {self.window_size_ms}ms") \ No newline at end of file diff --git a/tests/dao/redis/test_capacity_report.py b/tests/dao/redis/test_capacity_report.py index 46ea7db3..e7534176 100644 --- a/tests/dao/redis/test_capacity_report.py +++ b/tests/dao/redis/test_capacity_report.py @@ -46,7 +46,6 @@ def test_get_report(readings, capacity_report_dao): assert lowest[4].capacity > lowest[0].capacity -@pytest.mark.skip("Remove for challenge #4") def test_get_rank(readings, capacity_report_dao): for reading in readings: capacity_report_dao.update(reading) diff --git a/tests/dao/redis/test_feed.py b/tests/dao/redis/test_feed.py index 9c41e668..fc925420 100644 --- a/tests/dao/redis/test_feed.py +++ b/tests/dao/redis/test_feed.py @@ -22,7 +22,7 @@ def generate_meter_reading(site_id: int, datetime: datetime.datetime): wh_used=0.015) -@pytest.mark.skip("Remove for Challenge #6") +# @pytest.mark.skip("Remove for Challenge #6") def test_basic_insert_returns_recent(feed_dao): now = datetime.datetime.now() reading0 = generate_meter_reading(1, now) diff --git a/tests/dao/redis/test_metric.py b/tests/dao/redis/test_metric.py index 3fb6e774..26bcbc6b 100644 --- a/tests/dao/redis/test_metric.py +++ b/tests/dao/redis/test_metric.py @@ -49,7 +49,7 @@ def _test_insert_and_retrieve(readings: List[MeterReading], # Challenge #2 -@pytest.mark.skip("Remove for challenge #2") +# @pytest.mark.skip("Remove for challenge #2") def test_small(metric_dao, readings): _test_insert_and_retrieve(readings, metric_dao, 1) diff --git a/tests/dao/redis/test_site.py b/tests/dao/redis/test_site.py index 8e0d985c..0d551def 100644 --- a/tests/dao/redis/test_site.py +++ b/tests/dao/redis/test_site.py @@ -76,7 +76,7 @@ def test_find_by_id_existing_site(site_dao): assert found_site == site -@pytest.mark.skip("Remove for challenge #1") +# @pytest.mark.skip("Remove for challenge #1") def test_find_all(site_dao): site1 = Site(id=1, capacity=10.0, diff --git a/tests/dao/redis/test_site_geo.py b/tests/dao/redis/test_site_geo.py index 43c8a646..63fa6a98 100644 --- a/tests/dao/redis/test_site_geo.py +++ b/tests/dao/redis/test_site_geo.py @@ -161,7 +161,7 @@ def test_find_sites_by_geo(site_geo_dao): assert site_geo_dao.find_by_geo(query) == {site1, site2} -@pytest.mark.skip("Remove for challenge #5") +# @pytest.mark.skip("Remove for challenge #5") def test_find_by_geo_with_excess_capacity(site_geo_dao, capacity_dao): site1 = Site(id=1, capacity=10.0, diff --git a/tests/dao/redis/test_sliding_window_rate_limiter.py b/tests/dao/redis/test_sliding_window_rate_limiter.py index aefb0771..467c1da5 100644 --- a/tests/dao/redis/test_sliding_window_rate_limiter.py +++ b/tests/dao/redis/test_sliding_window_rate_limiter.py @@ -8,7 +8,7 @@ TEN_SECONDS = 10 * 1000 # Challenge #7 -@pytest.mark.skip("Remove for challenge #7") +# @pytest.mark.skip("Remove for challenge #7") def test_within_limit_inside_window(redis, key_schema): exception_count = 0 limiter = SlidingWindowRateLimiter(TEN_SECONDS, 10, redis, key_schema=key_schema) @@ -22,7 +22,7 @@ def test_within_limit_inside_window(redis, key_schema): assert exception_count == 0 -@pytest.mark.skip("Remove for challenge #7") +# @pytest.mark.skip("Remove for challenge #7") def test_exceeds_limit_inside_window(redis, key_schema): exception_count = 0 limiter = SlidingWindowRateLimiter(TEN_SECONDS, 10, redis, key_schema=key_schema) @@ -36,7 +36,7 @@ def test_exceeds_limit_inside_window(redis, key_schema): assert exception_count == 2 -@pytest.mark.skip("Remove for challenge #7") +# @pytest.mark.skip("Remove for challenge #7") def test_exceeds_limit_outside_window(redis, key_schema): raised = False limiter = SlidingWindowRateLimiter(100, 10, redis, key_schema=key_schema)