Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ boto3>=1.4.0
python-keystoneclient>=1.8.1
python-swiftclient
six
tqdm
1 change: 0 additions & 1 deletion stor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from six import PY3

from stor import utils
import six


class TreeWalkWarning(Warning):
Expand Down
1 change: 1 addition & 0 deletions stor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
"""
import argparse
import copy
from collections import OrderedDict

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK this import is unused

from functools import partial
import locale
import logging
Expand Down
71 changes: 18 additions & 53 deletions stor/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,63 +113,19 @@ def _get_s3_transfer(config=None):


class S3DownloadLogger(utils.BaseProgressLogger):
def __init__(self, total_download_objects):
super(S3DownloadLogger, self).__init__(progress_logger)
self.total_download_objects = total_download_objects
self.downloaded_bytes = 0

def update_progress(self, result):
"""Tracks number of bytes downloaded."""
self.downloaded_bytes += (os.path.getsize(result['dest'])
if not utils.has_trailing_slash(result['source']) else 0)

def get_start_message(self):
return 'starting download of %s objects' % self.total_download_objects

def get_finish_message(self):
return 'download complete - %s' % self.get_progress_message()

def get_progress_message(self):
elapsed_time = self.get_elapsed_time()
formatted_elapsed_time = self.format_time(elapsed_time)
mb = self.downloaded_bytes / (1024 * 1024.0)
mb_s = mb / elapsed_time.total_seconds() if elapsed_time else 0.0
return (
'%s/%s\t'
'%s\t'
'%0.2f MB\t'
'%0.2f MB/s'
) % (self.num_results, self.total_download_objects, formatted_elapsed_time, mb, mb_s)
logger = progress_logger

def add_result(self, result):
self.update_progress(num_objects=1, addl_bytes=os.path.getszie(result['dest']))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a typo? getszie

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm... maybe I didn't let the s3 version bubble up long enough..



class S3UploadLogger(utils.BaseProgressLogger):
def __init__(self, total_upload_objects):
super(S3UploadLogger, self).__init__(progress_logger)
self.total_upload_objects = total_upload_objects
self.uploaded_bytes = 0
logger = progress_logger

def update_progress(self, result):
def add_result(self, result):
"""Keep track of total uploaded bytes by referencing the object sizes"""
self.uploaded_bytes += (os.path.getsize(result['source'])
if not utils.has_trailing_slash(result['dest']) else 0)

def get_start_message(self):
return 'starting upload of %s objects' % self.total_upload_objects

def get_finish_message(self):
return 'upload complete - %s' % self.get_progress_message()

def get_progress_message(self):
elapsed_time = self.get_elapsed_time()
formatted_elapsed_time = self.format_time(elapsed_time)
mb = self.uploaded_bytes / (1024 * 1024.0)
mb_s = mb / elapsed_time.total_seconds() if elapsed_time else 0.0
return (
'%s/%s\t'
'%s\t'
'%0.2f MB\t'
'%0.2f MB/s'
) % (self.num_results, self.total_upload_objects, formatted_elapsed_time, mb, mb_s)
self.update_progress(1, (os.path.getsize(result['source'])
if not utils.has_trailing_slash(result['dest']) else 0))


class S3Path(OBSPath):
Expand Down Expand Up @@ -627,7 +583,16 @@ def download(self, dest, condition=None, use_manifest=False, **kwargs):
return downloaded

def _upload_object(self, upload_obj, config=None):
"""Upload a single object given an OBSUploadObject."""
"""Upload a single object given an OBSUploadObject.

Returns:
dict: source, dest (str), success (bool), and optionally an error key"""
result = {
'source': upload_obj.source,
'dest': str(upload_obj.object_name),
'success': True
}

if utils.has_trailing_slash(upload_obj.object_name):
# Handle empty directories separately
ul_kwargs = {
Expand Down
150 changes: 65 additions & 85 deletions stor/swift.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,81 +338,45 @@ def _validate_manifest_download(expected_objs, download_results):
return set(expected_objs).issubset(downloaded_objs)


class SwiftDownloadLogger(utils.BaseProgressLogger):
def __init__(self):
super(SwiftDownloadLogger, self).__init__(progress_logger)
self.downloaded_bytes = 0
class SwiftDeleteLogger(utils.BaseProgressLogger):
progress_logger = progress_logger
show_bytes_bar = False

def update_progress(self, result):
"""Tracks number of bytes downloaded.
def add_result(self, result):
action = result.get('action', None)
if action == 'delete_object':
self.update_progress(1, 0)
elif action == 'bulk_delete':
deleted = result.get('result', {}).get('Number Deleted') or 0
self.update_progress(deleted, 0)
elif action == 'delete_container':
logger.debug('deleted container: %s', result['container'])

The ``read_length`` property in swift download results contains the
total size of the object
"""
self.downloaded_bytes += result.get('read_length', 0)

class SwiftDownloadLogger(utils.BaseProgressLogger):
def add_result(self, result):
"""Only add results to progress if they are ``download_object`` actions.

The ``download_object`` action encompasses creating an empty directory
marker
"""
if result.get('action', None) == 'download_object':
super(SwiftDownloadLogger, self).add_result(result)

def get_start_message(self):
return 'starting download'

def get_finish_message(self):
return 'download complete - %s' % self.get_progress_message()

def get_progress_message(self):
elapsed_time = self.get_elapsed_time()
formatted_elapsed_time = self.format_time(elapsed_time)
mb = self.downloaded_bytes / (1024 * 1024.0)
mb_s = mb / elapsed_time.total_seconds() if elapsed_time else 0
return (
'%s\t'
'%s\t'
'%0.2f MB\t'
'%0.2f MB/s'
) % (self.num_results, formatted_elapsed_time, mb, mb_s)
self.update_progress(1, result.get('read_length', 0))


class SwiftUploadLogger(utils.BaseProgressLogger):
def __init__(self, total_upload_objects, upload_object_sizes):
super(SwiftUploadLogger, self).__init__(progress_logger)
self.total_upload_objects = total_upload_objects
self.upload_object_sizes = upload_object_sizes
self.uploaded_bytes = 0
def __init__(self, total_objects, size_map, **tqdm_args):
"""Helper that uses size map to initialize the number of bytes for transfer.

def update_progress(self, result):
"""Keep track of total uploaded bytes by referencing the object sizes"""
self.uploaded_bytes += self.upload_object_sizes.get(result['path'], 0)
Args:
total_objects (int): num objects to transfer
size_map (Dict[str, int]): maps path => size bytes
"""
self.size_map = size_map
super(SwiftUploadLogger, self).__init__(
total_objects,
total_bytes=sum(self.size_map.values()),
**tqdm_args
)

def add_result(self, result):
"""Only add results if they are ``upload_object`` and ``create_dir_marker``
actions"""
if result.get('action', None) in ('upload_object', 'create_dir_marker'):
super(SwiftUploadLogger, self).add_result(result)

def get_start_message(self):
return 'starting upload of %s objects' % self.total_upload_objects

def get_finish_message(self):
return 'upload complete - %s' % self.get_progress_message()

def get_progress_message(self):
elapsed_time = self.get_elapsed_time()
formatted_elapsed_time = self.format_time(elapsed_time)
mb = self.uploaded_bytes / (1024 * 1024.0)
mb_s = mb / elapsed_time.total_seconds() if elapsed_time else 0
return (
'%s/%s\t'
'%s\t'
'%0.2f MB\t'
'%0.2f MB/s'
) % (self.num_results, self.total_upload_objects, formatted_elapsed_time, mb, mb_s)
self.update_progress(1, self.size_map.get(result['path'], 0))


class SwiftPath(OBSPath):
Expand Down Expand Up @@ -572,7 +536,12 @@ def _swift_service_call(self, method_name, *args, **kwargs):
raise r['error']
results.append(r)
if service_progress_logger:
service_progress_logger.add_result(r)
try:
service_progress_logger.add_result(r)
except Exception as e: # pragma: no cover
logger.error('{logger_type} failed ({exc_class}: %s)'.format(
logger_type=type(service_progress_logger).__name__,
exc_class=type(e).__name__), e)

return results

Expand Down Expand Up @@ -1047,6 +1016,7 @@ def download(self,
raise ValueError('cannot call download on tenant with no container')
utils.validate_condition(condition)

total_objects = 0
if use_manifest:
# Do a full list with the manifest before the download. This will retry until
# all results in the manifest can be listed, which helps ensure the download
Expand All @@ -1056,6 +1026,12 @@ def download(self,
manifest_cond = partial(_validate_manifest_download, object_names)
condition = (utils.join_conditions(condition, manifest_cond)
if condition else manifest_cond)
total_objects = len(object_names)
object_sizes = None
if not self.resource:
stat = self.stat()
object_sizes = int(stat['Bytes'])
total_objects = int(stat['Objects'])

options = settings.get()['swift:download']
service_options = {
Expand All @@ -1069,7 +1045,7 @@ def download(self,
'skip_identical': options['skip_identical'],
'shuffle': options['shuffle']
}
with SwiftDownloadLogger() as dl:
with SwiftDownloadLogger(total_objects, object_sizes) as dl:
results = self._swift_service_call('download',
self.container,
options=download_options,
Expand Down Expand Up @@ -1297,26 +1273,30 @@ def wrapper(*args, **kwargs):
return []
return wrapper

if not to_delete.resource:
results = _ignore_not_found(self._swift_service_call)('delete',
to_delete.container,
_service_options=service_options)
# Try to delete a segment container since swift client does not
# do this automatically
if not deleting_segments:
segment_containers = ('%s_segments' % to_delete.container,
'.segments_%s' % to_delete.container,
'%s+segments' % to_delete.container)
for segment_container in segment_containers:
_ignore_not_found(self._swift_service_call)('delete',
segment_container,
_service_options=service_options)
else:
objs_to_delete = [p.resource for p in to_delete.list()]
results = _ignore_not_found(self._swift_service_call)('delete',
self.container,
objs_to_delete,
_service_options=service_options)
with SwiftDeleteLogger(None, {}, desc=str(to_delete)) as dl:
if not to_delete.resource:
results = _ignore_not_found(self._swift_service_call)('delete',
to_delete.container,
_progress_logger=dl,
_service_options=service_options)
# Try to delete a segment container since swift client does not
# do this automatically
if not deleting_segments:
segment_containers = ('%s_segments' % to_delete.container,
'.segments_%s' % to_delete.container,
'%s+segments' % to_delete.container)
for segment_container in segment_containers:
_ignore_not_found(self._swift_service_call)('delete',
segment_container,
_progress_logger=dl,
_service_options=service_options)
else:
objs_to_delete = [p.resource for p in to_delete.list()]
results = _ignore_not_found(self._swift_service_call)('delete',
self.container,
objs_to_delete,
_progress_logger=dl,
_service_options=service_options)

# Verify that all objects have been deleted before returning. Otherwise try deleting again
with settings.use({'swift': {'num_retries': 0}}):
Expand Down
Loading