diff --git a/growthbook/growthbook.py b/growthbook/growthbook.py index 36b1599..d5e15c6 100644 --- a/growthbook/growthbook.py +++ b/growthbook/growthbook.py @@ -4,7 +4,7 @@ feature flagging and A/B testing platform. More info at https://www.growthbook.io """ - +import atexit import sys import json import threading @@ -151,8 +151,9 @@ def connect(self): return self.is_running = True - self._sse_thread = threading.Thread(target=self._run_sse_channel) + self._sse_thread = threading.Thread(target=self._run_sse_channel, daemon=True) self._sse_thread.start() + atexit.register(self.disconnect) def disconnect(self, timeout=10): """Gracefully disconnect with timeout""" diff --git a/growthbook/growthbook_client.py b/growthbook/growthbook_client.py index 3a8ef1a..70f9224 100644 --- a/growthbook/growthbook_client.py +++ b/growthbook/growthbook_client.py @@ -1,5 +1,5 @@ #!/usr/bin/env python - +import json from dataclasses import dataclass, field import random import logging @@ -181,6 +181,25 @@ def remove_callback(self, callback: Callable[[Dict[str, Any]], Awaitable[None]]) The Hand off - when the event arrives (we're still on the background thread), sse_handler uses `asyncio.run_coroutine_threadsafe` to schedule the async processing `_handle_sse_event` onto the main event loop. """ + + async def _handle_sse_event(self, event_data: Dict[str, Any]) -> None: + """Process an event received from the SSE connection""" + try: + event_type = event_data.get("type") + if event_type == "features-updated": + response = await self.load_features_async( + self._api_host, self._client_key, self._decryption_key, self._cache_ttl + ) + if response is not None: + await self._handle_feature_update(response) + elif event_type == "features": + data = event_data.get("data", "{}") + if isinstance(data, str): + data = json.loads(data) + await self._handle_feature_update(data) + except Exception: + logger.exception("Error handling SSE event") + async def _start_sse_refresh(self) -> None: """Start SSE-based feature refresh""" with self._refresh_lock: @@ -188,27 +207,13 @@ async def _start_sse_refresh(self) -> None: return # SSEClient invokes `on_event` synchronously from a background thread. - async def _handle_sse_event(event_data: Dict[str, Any]) -> None: - try: - event_type = event_data.get("type") - if event_type == "features-updated": - response = await self.load_features_async( - self._api_host, self._client_key, self._decryption_key, self._cache_ttl - ) - if response is not None: - await self._handle_feature_update(response) - elif event_type == "features": - await self._handle_feature_update(event_data.get("data", {})) - except Exception: - logger.exception("Error handling SSE event") - main_loop = asyncio.get_running_loop() # We must not pass an `async def` callback here (it would never be awaited). def sse_handler(event_data: Dict[str, Any]) -> None: # Schedule async processing onto the main event loop. try: - asyncio.run_coroutine_threadsafe(_handle_sse_event(event_data), main_loop) + asyncio.run_coroutine_threadsafe(self._handle_sse_event(event_data), main_loop) except Exception: logger.exception("Failed to schedule SSE event handler") diff --git a/tests/test_growthbook_client.py b/tests/test_growthbook_client.py index 4fe3816..c88b024 100644 --- a/tests/test_growthbook_client.py +++ b/tests/test_growthbook_client.py @@ -95,8 +95,13 @@ async def test_sse_connection_lifecycle(mock_options, mock_features_response): patch('growthbook.growthbook_client.EnhancedFeatureRepository.stopAutoRefresh') as mock_stop: await client.initialize() # Allow the SSE lifecycle task to start and invoke startAutoRefresh - await asyncio.sleep(0) + await asyncio.sleep(0.1) assert mock_start.called + + # Verify the thread created is a daemon thread (if possible without real start) + # Since we mock startAutoRefresh, we can't check the real thread here. + # But we can check that SSEClient is initialized correctly if we don't mock it all. + await client.close() assert mock_stop.called @@ -337,18 +342,14 @@ async def test_callback(features): @pytest.mark.asyncio async def test_sse_event_handling(mock_options): - """Test SSE event handling and reconnection logic""" + """Test SSE event handling including JSON parsing""" events = [ - {'type': 'features', 'data': {'features': {'feature1': {'defaultValue': 1}}}}, - {'type': 'ping', 'data': {}}, # Should be ignored - {'type': 'features', 'data': {'features': {'feature1': {'defaultValue': 2}}}} + # Real SSE payload is a raw string in 'data' + {'type': 'features', 'data': json.dumps({'features': {'feature1': {'defaultValue': 1}}})}, + {'type': 'ping', 'data': '{}'}, # Should be ignored + {'type': 'features', 'data': json.dumps({'features': {'feature1': {'defaultValue': 2}}})} ] - async def mock_sse_handler(event_data): - """Mock the SSE event handler to directly update feature cache""" - if event_data['type'] == 'features': - await client._features_repository._handle_feature_update(event_data['data']) - with patch('growthbook.FeatureRepository.load_features_async', new_callable=AsyncMock, return_value={"features": {}, "savedGroups": {}}) as mock_load: @@ -364,16 +365,15 @@ async def mock_sse_handler(event_data): try: await client.initialize() - # Simulate SSE events directly + # Simulate SSE events using the actual handler method + # This now tests the json.loads parsing logic! for event in events: - if event['type'] == 'features': - await client._features_repository._handle_feature_update(event['data']) + await client._features_repository._handle_sse_event(event) - # print(f"AFTER TEST: Current cache state: {client._features_repository._feature_cache.get_current_state()}") # Verify feature update happened - assert client._features_repository._feature_cache.get_current_state()["features"]["feature1"]["defaultValue"] == 2 + state = client._features_repository._feature_cache.get_current_state() + assert state["features"]["feature1"]["defaultValue"] == 2 finally: - # Ensure we clean up the SSE connection await client.close() @pytest.mark.asyncio