Skip to content

Commit 9b191f7

Browse files
Port ndi.cloud.sync.downloadNew and sync infrastructure
1 parent d9d760b commit 9b191f7

20 files changed

Lines changed: 900 additions & 0 deletions
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from ..implementation.datasets.get_dataset import GetDataset as GetDatasetImpl
2+
3+
def get_dataset(dataset_id):
4+
"""
5+
User-facing wrapper to get dataset details.
6+
7+
Args:
8+
dataset_id (str): The ID of the dataset.
9+
10+
Returns:
11+
tuple: (success, answer, response, url)
12+
"""
13+
api_call = GetDatasetImpl(dataset_id)
14+
return api_call.execute()

src/ndi/cloud/api/documents.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from .implementation.documents.delete_document import DeleteDocument as DeleteDocumentImpl
55
from .implementation.documents.list_dataset_documents import ListDatasetDocuments as ListDatasetDocumentsImpl
66
from .implementation.documents.list_dataset_documents_all import ListDatasetDocumentsAll as ListDatasetDocumentsAllImpl
7+
from .implementation.documents.get_bulk_download_url import GetBulkDownloadURL as GetBulkDownloadURLImpl
78

89
def add_document(dataset_id, document_info):
910
"""
@@ -90,3 +91,17 @@ def list_dataset_documents_all(dataset_id, page_size=20):
9091
"""
9192
api_call = ListDatasetDocumentsAllImpl(dataset_id, page_size)
9293
return api_call.execute()
94+
95+
def get_bulk_download_url(dataset_id, document_ids=None):
96+
"""
97+
Retrieves a pre-signed URL for bulk document download.
98+
99+
Args:
100+
dataset_id (str): The ID of the dataset.
101+
document_ids (list of str, optional): List of cloud document IDs to download.
102+
103+
Returns:
104+
tuple: (success, answer, response, url)
105+
"""
106+
api_call = GetBulkDownloadURLImpl(dataset_id, document_ids)
107+
return api_call.execute()

src/ndi/cloud/api/files/__init__.py

Whitespace-only changes.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from ..implementation.files.get_file_details import GetFileDetails as GetFileDetailsImpl
2+
3+
def get_file_details(dataset_id, file_uid):
4+
"""
5+
User-facing wrapper to get file details.
6+
7+
Args:
8+
dataset_id (str): The ID of the dataset.
9+
file_uid (str): The UID of the file.
10+
11+
Returns:
12+
tuple: (success, answer, response, url)
13+
"""
14+
api_call = GetFileDetailsImpl(dataset_id, file_uid)
15+
return api_call.execute()
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
from ...call import Call
2+
from ... import url
3+
from ....authenticate import authenticate
4+
import requests
5+
import json
6+
7+
class GetDataset(Call):
8+
"""
9+
Implementation class for getting dataset details.
10+
"""
11+
12+
def __init__(self, dataset_id):
13+
"""
14+
Creates a new GetDataset API call object.
15+
16+
Args:
17+
dataset_id (str): The ID of the dataset.
18+
"""
19+
self.dataset_id = dataset_id
20+
self.endpoint_name = 'get_dataset'
21+
22+
def execute(self):
23+
"""
24+
Performs the API call.
25+
"""
26+
token = authenticate()
27+
api_url = url.get_url(self.endpoint_name, dataset_id=self.dataset_id)
28+
29+
headers = {
30+
'Accept': 'application/json',
31+
'Authorization': f'Bearer {token}'
32+
}
33+
34+
response = requests.get(api_url, headers=headers)
35+
36+
if response.status_code == 200:
37+
return True, response.json(), response, api_url
38+
else:
39+
try:
40+
answer = response.json()
41+
except json.JSONDecodeError:
42+
answer = response.text
43+
return False, answer, response, api_url
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from ...call import Call
2+
from ... import url
3+
from ....authenticate import authenticate
4+
import requests
5+
import json
6+
7+
class GetBulkDownloadURL(Call):
8+
"""
9+
Implementation class for getting a bulk download URL.
10+
"""
11+
12+
def __init__(self, dataset_id, document_ids=None):
13+
"""
14+
Creates a new GetBulkDownloadURL API call object.
15+
16+
Args:
17+
dataset_id (str): The ID of the dataset.
18+
document_ids (list of str, optional): List of cloud document IDs to download.
19+
If None or empty, all documents are included.
20+
"""
21+
self.dataset_id = dataset_id
22+
self.document_ids = document_ids if document_ids is not None else []
23+
self.endpoint_name = 'bulk_download_documents'
24+
25+
def execute(self):
26+
"""
27+
Performs the API call.
28+
"""
29+
token = authenticate()
30+
api_url = url.get_url(self.endpoint_name, dataset_id=self.dataset_id)
31+
32+
headers = {
33+
'Accept': 'application/json',
34+
'Content-Type': 'application/json',
35+
'Authorization': f'Bearer {token}'
36+
}
37+
38+
# The body specifies which document IDs to include
39+
data = {'documentIds': self.document_ids}
40+
41+
response = requests.post(api_url, headers=headers, json=data)
42+
43+
if response.status_code in [200, 201]:
44+
try:
45+
answer = response.json().get('url')
46+
return True, answer, response, api_url
47+
except json.JSONDecodeError:
48+
return False, response.text, response, api_url
49+
else:
50+
try:
51+
answer = response.json()
52+
except json.JSONDecodeError:
53+
answer = response.text
54+
return False, answer, response, api_url
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from ...call import Call
2+
from ... import url
3+
from ....authenticate import authenticate
4+
import requests
5+
import json
6+
7+
class GetFileDetails(Call):
8+
"""
9+
Implementation class for getting file details.
10+
"""
11+
12+
def __init__(self, dataset_id, file_uid):
13+
"""
14+
Creates a new GetFileDetails API call object.
15+
16+
Args:
17+
dataset_id (str): The ID of the dataset.
18+
file_uid (str): The UID of the file.
19+
"""
20+
self.dataset_id = dataset_id
21+
self.file_uid = file_uid
22+
self.endpoint_name = 'get_file_details'
23+
24+
def execute(self):
25+
"""
26+
Performs the API call.
27+
"""
28+
token = authenticate()
29+
api_url = url.get_url(self.endpoint_name, dataset_id=self.dataset_id, file_uid=self.file_uid)
30+
31+
headers = {
32+
'Accept': 'application/json',
33+
'Authorization': f'Bearer {token}'
34+
}
35+
36+
response = requests.get(api_url, headers=headers)
37+
38+
if response.status_code == 200:
39+
return True, response.json(), response, api_url
40+
else:
41+
try:
42+
answer = response.json()
43+
except json.JSONDecodeError:
44+
answer = response.text
45+
return False, answer, response, api_url

src/ndi/cloud/download/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .download_utils import download_document_collection, download_dataset_files
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import os
2+
import requests
3+
import json
4+
import tempfile
5+
import zipfile
6+
import time
7+
from ..api.documents import get_bulk_download_url
8+
from ..api.datasets import get_dataset
9+
from ..api.files import get_file_details
10+
from ...document import Document
11+
12+
def download_document_collection(dataset_id, document_ids=None, timeout=20, chunk_size=2000):
13+
"""
14+
Download documents using bulk download with chunking.
15+
"""
16+
if not document_ids:
17+
# Avoid circular import by importing inside function
18+
from ...sync.internal.document_utils import list_remote_document_ids
19+
print('No document IDs provided; fetching all document IDs from the server...')
20+
id_map = list_remote_document_ids(dataset_id)
21+
document_ids = id_map['apiId']
22+
if not document_ids:
23+
return []
24+
25+
num_docs = len(document_ids)
26+
num_chunks = (num_docs + chunk_size - 1) // chunk_size
27+
document_chunks = [document_ids[i:i + chunk_size] for i in range(0, num_docs, chunk_size)]
28+
29+
all_document_structs = []
30+
print(f'Beginning download of {num_docs} documents in {num_chunks} chunk(s).')
31+
32+
for c, chunk_doc_ids in enumerate(document_chunks, 1):
33+
print(f' Processing chunk {c} of {num_chunks} ({len(chunk_doc_ids)} documents)...')
34+
35+
success, download_url, _, _ = get_bulk_download_url(dataset_id, chunk_doc_ids)
36+
if not success:
37+
raise RuntimeError(f"Failed to get bulk download URL for chunk {c}")
38+
39+
with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as temp_zip:
40+
temp_zip_path = temp_zip.name
41+
42+
try:
43+
is_finished = False
44+
start_time = time.time()
45+
46+
while not is_finished and (time.time() - start_time) < timeout:
47+
try:
48+
response = requests.get(download_url, stream=True)
49+
if response.status_code == 200:
50+
with open(temp_zip_path, 'wb') as f:
51+
for chunk in response.iter_content(chunk_size=8192):
52+
f.write(chunk)
53+
is_finished = True
54+
else:
55+
time.sleep(1)
56+
except Exception:
57+
time.sleep(1)
58+
59+
if not is_finished:
60+
raise RuntimeError(f"Download failed for chunk {c} after timeout")
61+
62+
with tempfile.TemporaryDirectory() as extract_dir:
63+
with zipfile.ZipFile(temp_zip_path, 'r') as zip_ref:
64+
zip_ref.extractall(extract_dir)
65+
66+
# Assume one JSON file per chunk as per Matlab logic (unzippedFiles{1})
67+
# But zip might contain multiple files. Matlab code: jsonFile = unzippedFiles{1}
68+
# We iterate over extracted files
69+
for filename in os.listdir(extract_dir):
70+
if filename.endswith('.json'):
71+
with open(os.path.join(extract_dir, filename), 'r') as f:
72+
# Handling potential NaN/Null is skipped for now, assuming standard JSON
73+
document_structs = json.load(f)
74+
# dropDuplicateDocsFromJsonDecode logic is skipped for now
75+
if isinstance(document_structs, list):
76+
all_document_structs.extend(document_structs)
77+
else:
78+
all_document_structs.append(document_structs)
79+
finally:
80+
if os.path.exists(temp_zip_path):
81+
os.remove(temp_zip_path)
82+
83+
print(f'Download complete. Converting {len(all_document_structs)} structs to NDI documents...')
84+
documents = [Document(d) for d in all_document_structs]
85+
print('Processing complete.')
86+
return documents
87+
88+
def download_dataset_files(cloud_dataset_id, target_folder, file_uuids=None, verbose=True, abort_on_error=True):
89+
"""
90+
Downloads dataset files from a cloud dataset.
91+
"""
92+
success, dataset_info, _, _ = get_dataset(cloud_dataset_id)
93+
if not success:
94+
raise RuntimeError(f"Failed to get dataset: {dataset_info}")
95+
96+
if 'files' not in dataset_info and file_uuids is not None:
97+
raise RuntimeError('No files found in the dataset despite files requested.')
98+
99+
if 'files' not in dataset_info:
100+
return
101+
102+
files = _filter_files_to_download(dataset_info['files'], file_uuids)
103+
num_files = len(files)
104+
105+
if verbose:
106+
print(f'Will download {num_files} files...')
107+
108+
for i, file_info in enumerate(files, 1):
109+
if verbose:
110+
_display_progress(i, num_files)
111+
112+
file_uid = file_info['uid']
113+
exists_on_cloud = file_info.get('uploaded', False)
114+
115+
if not exists_on_cloud:
116+
print(f'Warning: File with uuid "{file_uid}" does not exist on the cloud, skipping...')
117+
continue
118+
119+
target_filepath = os.path.join(target_folder, file_uid)
120+
if os.path.exists(target_filepath):
121+
if verbose:
122+
print(f'File {i} already exists locally, skipping...')
123+
continue
124+
125+
success, answer, _, _ = get_file_details(cloud_dataset_id, file_uid)
126+
if not success:
127+
print(f"Warning: Failed to get file details: {answer}")
128+
continue
129+
130+
download_url = answer.get('downloadUrl')
131+
if not download_url:
132+
print(f"Warning: No download URL for file {file_uid}")
133+
continue
134+
135+
try:
136+
response = requests.get(download_url, stream=True)
137+
response.raise_for_status()
138+
with open(target_filepath, 'wb') as f:
139+
for chunk in response.iter_content(chunk_size=8192):
140+
f.write(chunk)
141+
except Exception as e:
142+
if abort_on_error:
143+
raise e
144+
else:
145+
print(f"Warning: Download failed for file {i}: {e}")
146+
147+
if verbose:
148+
print('File download complete.')
149+
150+
def _filter_files_to_download(files, file_uuids):
151+
if file_uuids is not None:
152+
# Assuming file_uuids is a list of strings
153+
# Filter files where uid is in file_uuids
154+
filtered_files = [f for f in files if f['uid'] in file_uuids]
155+
return filtered_files
156+
return files
157+
158+
def _display_progress(current_file_number, total_file_number):
159+
percent_finished = round((current_file_number / total_file_number) * 100)
160+
print(f'Downloading file {current_file_number} of {total_file_number} ({percent_finished}% complete) ...')
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from ...query import Query
2+
3+
def get_cloud_dataset_id_for_local_dataset(ndi_dataset):
4+
"""
5+
Retrieves the cloud dataset ID for a local dataset.
6+
"""
7+
cloud_dataset_id_query = Query('', 'isa', 'dataset_remote')
8+
# Assuming database_search returns a list of documents
9+
cloud_dataset_id_documents = ndi_dataset.database_search(cloud_dataset_id_query)
10+
11+
if len(cloud_dataset_id_documents) > 1:
12+
raise RuntimeError(f"Found more than one remote cloudDatasetId for the local dataset: {ndi_dataset.path}.")
13+
elif cloud_dataset_id_documents:
14+
# Assuming document structure
15+
doc = cloud_dataset_id_documents[0]
16+
return doc.document_properties['dataset_remote']['dataset_id'], cloud_dataset_id_documents
17+
else:
18+
return '', []

0 commit comments

Comments
 (0)