Skip to content

Commit 497a51a

Browse files
authored
Merge pull request #16 from ESA-PhiLab/copilot/fix-a5c4f09d-5aed-4188-8b33-d9ae22bd2177
2 parents 251da60 + c919855 commit 497a51a

3 files changed

Lines changed: 263 additions & 8 deletions

File tree

notebooks/1_search_n_download.ipynb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
"name": "stdout",
3939
"output_type": "stream",
4040
"text": [
41-
"Number of results: 910\n"
41+
"Number of results: 5337\n"
4242
]
4343
},
4444
{
@@ -289,10 +289,10 @@
289289
" orbit_direction=None,\n",
290290
" cloud_cover_threshold=None,\n",
291291
" aoi_wkt=aoi_wkt, # Example: aoi_wkt=aoi_wkt if you want to use the defined AOI\n",
292-
" start_date = '2023-05-03T00:00:00',\n",
292+
" start_date = '2020-05-03T00:00:00',\n",
293293
" end_date = '2024-05-03T04:00:00',\n",
294294
" top=1000,\n",
295-
" count=True, # Set to True to get the total count of results\n",
295+
" count=True, # Set to True to get all the results, supersedes the top arg.\n",
296296
" attributes={'processingLevel':'LEVEL0',\n",
297297
" 'operationalMode': 'SM',\n",
298298
" # 'swathIdentifier': 'S1', # Swath identifier is: 1,2,3,4,5,6 for RAW\n",
@@ -353,7 +353,7 @@
353353
],
354354
"metadata": {
355355
"kernelspec": {
356-
"display_name": ".venv",
356+
"display_name": "phidown-3.9",
357357
"language": "python",
358358
"name": "python3"
359359
},
@@ -367,7 +367,7 @@
367367
"name": "python",
368368
"nbconvert_exporter": "python",
369369
"pygments_lexer": "ipython3",
370-
"version": "3.9.6"
370+
"version": "3.9.18"
371371
}
372372
},
373373
"nbformat": 4,

phidown/search.py

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import typing
66
from datetime import datetime
77
import copy
8+
import asyncio
89

910
from .downloader import pull_down
1011

@@ -111,7 +112,6 @@ def query_by_filter(
111112
start_date (str, optional): Start date for filtering (ISO 8601 format). Defaults to None.
112113
end_date (str, optional): End date for filtering (ISO 8601 format). Defaults to None.
113114
top (int, optional): Maximum number of results to retrieve. Defaults to 1000.
114-
count (bool, optional): Whether to include count of results. Defaults to False.
115115
order_by (str, optional): Field and direction to order results by. Defaults to "ContentDate/Start desc".
116116
burst_mode (bool, optional): Enable Sentinel-1 SLC Burst mode searching. Defaults to False.
117117
burst_id (int, optional): Burst ID to filter (burst mode only). Defaults to None.
@@ -161,6 +161,8 @@ def query_by_filter(
161161
self._validate_time() # Validate start and end dates
162162

163163
self.top = top
164+
if self.count:
165+
self.top = 1000
164166
self._validate_top()
165167

166168
self.order_by = order_by
@@ -680,15 +682,89 @@ def _build_query(self):
680682
return self.url
681683

682684
def execute_query(self):
    """Execute the built query and retrieve matching products.

    Performs a single GET against the OData endpoint. If count=True and
    the total number of results ('@odata.count') exceeds the 'top' limit,
    this method automatically paginates through all results using
    multiple requests with the $skip parameter, combining all results
    into a single DataFrame.

    Returns:
        pd.DataFrame: DataFrame containing all retrieved products.

    Raises:
        requests.HTTPError: If the server responds with an error status.
    """
    url = self._build_query()
    # requests.get() already returns a fresh Response object; the previous
    # copy.deepcopy() of it was redundant, wasted memory, and can fail on
    # responses that still reference live network resources.
    self.response = requests.get(url)
    self.response.raise_for_status()  # Raise an error for bad status codes

    self.json_data = self.response.json()
    # '@odata.count' is only present when the query requested a count.
    self.num_results = self.json_data.get('@odata.count', 0)

    # Paginate only when counting is enabled and the total exceeds one page.
    if self.count and self.num_results > self.top:
        return self._execute_paginated_query()
    else:
        self.df = pd.DataFrame.from_dict(self.json_data['value'])
        return self.df
708+
709+
def _execute_paginated_query(self):
    """Fetch every remaining result page concurrently and combine them.

    The first page is already held in ``self.json_data`` (retrieved by
    ``execute_query``); this method computes the $skip offsets for the
    remaining pages, downloads them concurrently, and builds ``self.df``
    from the combined records. Pages that fail to download are skipped
    with a printed warning so partial results are still returned.

    Returns:
        pd.DataFrame: DataFrame containing all collected products.
    """
    all_data = []

    # Seed with the first page retrieved in execute_query().
    if 'value' in self.json_data:
        all_data.extend(self.json_data['value'])

    page_size = self.top  # Use the current top value as page size

    # Offsets of the remaining pages (the first page is offset 0).
    skips = range(page_size, self.num_results, page_size)

    if not skips:
        self.df = pd.DataFrame.from_dict(all_data)
        return self.df

    urls = []
    for skip in skips:
        paginated_query = f"?$filter={self.filter_condition}&$orderby={self.order_by}&$top={page_size}&$skip={skip}&$expand=Attributes"
        if self.count:
            paginated_query += "&$count=true"
        urls.append(f"{self.base_url}{paginated_query}")

    def fetch_page(url):
        # One blocking GET per worker thread; return the exception instead
        # of raising so one failed page cannot abort the whole batch.
        try:
            response = requests.get(url)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return e

    # The previous implementation wrapped these blocking requests in
    # asyncio via run_in_executor and needed a special code path when an
    # event loop was already running (e.g. Jupyter). A thread pool gives
    # the same concurrency directly, preserves result order, and works
    # in any calling context.
    import concurrent.futures
    with concurrent.futures.ThreadPoolExecutor() as pool:
        results = list(pool.map(fetch_page, urls))

    # Collect successful pages in request order; warn about failures.
    for res in results:
        if isinstance(res, Exception):
            print(f"Warning: Error retrieving page: {res}")
        elif isinstance(res, dict) and 'value' in res:
            all_data.extend(res['value'])

    # Create DataFrame from all collected data
    self.df = pd.DataFrame.from_dict(all_data)
    return self.df
693769

694770
def query_by_name(self, product_name: str) -> pd.DataFrame:

tests/test_pagination.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
import pytest
2+
import sys, os
3+
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
4+
5+
from phidown.search import CopernicusDataSearcher
6+
from unittest.mock import Mock, patch
7+
import pandas as pd
8+
9+
# Define the path to the config file relative to the test file
10+
CONFIG_PATH = os.path.join(os.path.dirname(__file__), '..', 'phidown', 'config.json')
11+
12+
13+
def test_pagination_disabled_by_default():
    """Pagination must not be triggered when count=False."""
    searcher = CopernicusDataSearcher()
    searcher.query_by_filter(
        collection_name='SENTINEL-1',
        product_type='SLC',
        top=10,
        count=False  # Pagination should not trigger
    )

    # One mocked page that advertises a large total count.
    page_payload = {
        'value': [{'Id': f'product_{i}', 'Name': f'name_{i}'} for i in range(10)],
        '@odata.count': 1500  # More than top=10, but count=False
    }
    mock_response = Mock()
    mock_response.json.return_value = page_payload
    mock_response.raise_for_status = Mock()

    with patch('requests.get', return_value=mock_response) as mock_get:
        df = searcher.execute_query()

    # count=False -> exactly one request, only the first page returned.
    assert mock_get.call_count == 1
    assert len(df) == 10
37+
38+
39+
def test_pagination_when_count_enabled_and_results_exceed_top():
    """Test pagination is triggered when count=True and results > top"""
    # NOTE(review): the accompanying search.py change appears to force
    # self.top to 1000 whenever count=True (`if self.count: self.top = 1000`
    # in query_by_filter). If so, top=5 is overridden, num_results=12 would
    # NOT exceed top, pagination would not trigger, and this test would fail
    # (call_count would be 1, not 3) — confirm against the full
    # query_by_filter implementation.
    searcher = CopernicusDataSearcher()
    searcher.query_by_filter(
        collection_name='SENTINEL-1',
        product_type='SLC',
        top=5,
        count=True
    )

    # Mock responses for pagination.
    # First page: records 0-4 plus the advertised total count (12).
    mock_response_1 = Mock()
    mock_response_1.json.return_value = {
        'value': [{'Id': f'product_{i}', 'Name': f'name_{i}'} for i in range(5)],
        '@odata.count': 12
    }
    mock_response_1.raise_for_status = Mock()

    # Second page: records 5-9 (follow-up pages carry no count field).
    mock_response_2 = Mock()
    mock_response_2.json.return_value = {
        'value': [{'Id': f'product_{i}', 'Name': f'name_{i}'} for i in range(5, 10)]
    }
    mock_response_2.raise_for_status = Mock()

    # Third (final, partial) page: records 10-11.
    mock_response_3 = Mock()
    mock_response_3.json.return_value = {
        'value': [{'Id': f'product_{i}', 'Name': f'name_{i}'} for i in range(10, 12)]
    }
    mock_response_3.raise_for_status = Mock()

    with patch('requests.get', side_effect=[mock_response_1, mock_response_2, mock_response_3]) as mock_get:
        df = searcher.execute_query()

    # Should make 3 requests total
    assert mock_get.call_count == 3
    assert len(df) == 12

    # Check that skip parameters were used correctly
    calls = mock_get.call_args_list
    assert '$skip=5' in calls[1][0][0]
    assert '$skip=10' in calls[2][0][0]
80+
81+
82+
def test_no_pagination_when_results_within_top_limit():
    """No pagination when count=True but results <= top."""
    searcher = CopernicusDataSearcher()
    searcher.query_by_filter(
        collection_name='SENTINEL-1',
        product_type='SLC',
        top=100,
        count=True
    )

    # 50 records with a count below the top limit of 100.
    records = [{'Id': f'product_{i}', 'Name': f'name_{i}'} for i in range(50)]
    mock_response = Mock()
    mock_response.raise_for_status = Mock()
    mock_response.json.return_value = {'value': records, '@odata.count': 50}

    with patch('requests.get', return_value=mock_response) as mock_get:
        df = searcher.execute_query()

    # Everything fits in one page, so a single request suffices.
    assert mock_get.call_count == 1
    assert len(df) == 50
106+
107+
108+
def test_pagination_with_1000_page_size():
    """Pagination with the default page size of 1000."""
    searcher = CopernicusDataSearcher()
    searcher.query_by_filter(
        collection_name='SENTINEL-1',
        product_type='SLC',
        top=1000,  # Default page size
        count=True
    )

    def make_page(start, stop, total=None):
        # Build a mocked response covering records [start, stop).
        payload = {
            'value': [{'Id': f'product_{i}', 'Name': f'name_{i}'} for i in range(start, stop)]
        }
        if total is not None:
            payload['@odata.count'] = total
        response = Mock()
        response.json.return_value = payload
        response.raise_for_status = Mock()
        return response

    # 2500 results split across three pages of at most 1000 each.
    pages = [
        make_page(0, 1000, total=2500),
        make_page(1000, 2000),
        make_page(2000, 2500),
    ]

    with patch('requests.get', side_effect=pages) as mock_get:
        df = searcher.execute_query()

    # Three requests are needed to cover 2500 results at page size 1000.
    assert mock_get.call_count == 3
    assert len(df) == 2500

    # Follow-up requests must carry the correct $skip offsets.
    calls = mock_get.call_args_list
    assert '$skip=1000' in calls[1][0][0]
    assert '$skip=2000' in calls[2][0][0]
149+
150+
151+
def test_pagination_handles_request_errors_gracefully():
    """Test that pagination handles request errors gracefully"""
    # NOTE(review): as with the other count=True tests, query_by_filter seems
    # to override top to 1000 when count=True; with num_results=15 < 1000,
    # pagination would never trigger and this test would pass for the wrong
    # reason (single page of 5, no error path exercised). Also, if pagination
    # DOES trigger, three requests are made but only two mock responses are
    # supplied — the third call exhausts side_effect and relies on the
    # paginator swallowing that error too. Verify the intended interaction.
    searcher = CopernicusDataSearcher()
    searcher.query_by_filter(
        collection_name='SENTINEL-1',
        product_type='SLC',
        top=5,
        count=True
    )

    # Mock first response successful (page 0-4, total count 15).
    mock_response_1 = Mock()
    mock_response_1.json.return_value = {
        'value': [{'Id': f'product_{i}', 'Name': f'name_{i}'} for i in range(5)],
        '@odata.count': 15
    }
    mock_response_1.raise_for_status = Mock()

    # Mock second response fails on the status check.
    mock_response_2 = Mock()
    mock_response_2.raise_for_status.side_effect = Exception("Network error")

    with patch('requests.get', side_effect=[mock_response_1, mock_response_2]):
        # Should not raise exception, but return partial results
        df = searcher.execute_query()

    # Should return at least the first page
    assert len(df) == 5
    assert 'product_0' in df['Id'].values

0 commit comments

Comments
 (0)