Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions growthbook/growthbook.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
feature flagging and A/B testing platform.
More info at https://www.growthbook.io
"""

import atexit
import sys
import json
import threading
Expand Down Expand Up @@ -151,8 +151,9 @@ def connect(self):
return

self.is_running = True
self._sse_thread = threading.Thread(target=self._run_sse_channel)
self._sse_thread = threading.Thread(target=self._run_sse_channel, daemon=True)
self._sse_thread.start()
atexit.register(self.disconnect)

def disconnect(self, timeout=10):
"""Gracefully disconnect with timeout"""
Expand Down
37 changes: 21 additions & 16 deletions growthbook/growthbook_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python

import json
from dataclasses import dataclass, field
import random
import logging
Expand Down Expand Up @@ -181,34 +181,39 @@ def remove_callback(self, callback: Callable[[Dict[str, Any]], Awaitable[None]])
The Hand off - when the event arrives (we're still on the background thread), sse_handler uses `asyncio.run_coroutine_threadsafe`
to schedule the async processing `_handle_sse_event` onto the main event loop.
"""

async def _handle_sse_event(self, event_data: Dict[str, Any]) -> None:
"""Process an event received from the SSE connection"""
try:
event_type = event_data.get("type")
if event_type == "features-updated":
response = await self.load_features_async(
self._api_host, self._client_key, self._decryption_key, self._cache_ttl
)
if response is not None:
await self._handle_feature_update(response)
elif event_type == "features":
data = event_data.get("data", "{}")
if isinstance(data, str):
data = json.loads(data)
await self._handle_feature_update(data)
except Exception:
logger.exception("Error handling SSE event")

async def _start_sse_refresh(self) -> None:
"""Start SSE-based feature refresh"""
with self._refresh_lock:
if self._refresh_task is not None: # Already running
return

# SSEClient invokes `on_event` synchronously from a background thread.
async def _handle_sse_event(event_data: Dict[str, Any]) -> None:
try:
event_type = event_data.get("type")
if event_type == "features-updated":
response = await self.load_features_async(
self._api_host, self._client_key, self._decryption_key, self._cache_ttl
)
if response is not None:
await self._handle_feature_update(response)
elif event_type == "features":
await self._handle_feature_update(event_data.get("data", {}))
except Exception:
logger.exception("Error handling SSE event")

main_loop = asyncio.get_running_loop()

# We must not pass an `async def` callback here (it would never be awaited).
def sse_handler(event_data: Dict[str, Any]) -> None:
# Schedule async processing onto the main event loop.
try:
asyncio.run_coroutine_threadsafe(_handle_sse_event(event_data), main_loop)
asyncio.run_coroutine_threadsafe(self._handle_sse_event(event_data), main_loop)
except Exception:
logger.exception("Failed to schedule SSE event handler")

Expand Down
32 changes: 16 additions & 16 deletions tests/test_growthbook_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,13 @@ async def test_sse_connection_lifecycle(mock_options, mock_features_response):
patch('growthbook.growthbook_client.EnhancedFeatureRepository.stopAutoRefresh') as mock_stop:
await client.initialize()
# Allow the SSE lifecycle task to start and invoke startAutoRefresh
await asyncio.sleep(0)
await asyncio.sleep(0.1)
assert mock_start.called

# Verify the thread created is a daemon thread (if possible without real start)
# Since we mock startAutoRefresh, we can't check the real thread here.
# But we can check that SSEClient is initialized correctly if we don't mock it all.

await client.close()
assert mock_stop.called

Expand Down Expand Up @@ -337,18 +342,14 @@ async def test_callback(features):

@pytest.mark.asyncio
async def test_sse_event_handling(mock_options):
"""Test SSE event handling and reconnection logic"""
"""Test SSE event handling including JSON parsing"""
events = [
{'type': 'features', 'data': {'features': {'feature1': {'defaultValue': 1}}}},
{'type': 'ping', 'data': {}}, # Should be ignored
{'type': 'features', 'data': {'features': {'feature1': {'defaultValue': 2}}}}
# Real SSE payload is a raw string in 'data'
{'type': 'features', 'data': json.dumps({'features': {'feature1': {'defaultValue': 1}}})},
{'type': 'ping', 'data': '{}'}, # Should be ignored
{'type': 'features', 'data': json.dumps({'features': {'feature1': {'defaultValue': 2}}})}
]

async def mock_sse_handler(event_data):
"""Mock the SSE event handler to directly update feature cache"""
if event_data['type'] == 'features':
await client._features_repository._handle_feature_update(event_data['data'])

with patch('growthbook.FeatureRepository.load_features_async',
new_callable=AsyncMock, return_value={"features": {}, "savedGroups": {}}) as mock_load:

Expand All @@ -364,16 +365,15 @@ async def mock_sse_handler(event_data):
try:
await client.initialize()

# Simulate SSE events directly
# Simulate SSE events using the actual handler method
# This now tests the json.loads parsing logic!
for event in events:
if event['type'] == 'features':
await client._features_repository._handle_feature_update(event['data'])
await client._features_repository._handle_sse_event(event)

# print(f"AFTER TEST: Current cache state: {client._features_repository._feature_cache.get_current_state()}")
# Verify feature update happened
assert client._features_repository._feature_cache.get_current_state()["features"]["feature1"]["defaultValue"] == 2
state = client._features_repository._feature_cache.get_current_state()
assert state["features"]["feature1"]["defaultValue"] == 2
finally:
# Ensure we clean up the SSE connection
await client.close()

@pytest.mark.asyncio
Expand Down