diff --git a/cs3api4lab/api/cs3_file_api.py b/cs3api4lab/api/cs3_file_api.py index cba271f7..fac74bd5 100644 --- a/cs3api4lab/api/cs3_file_api.py +++ b/cs3api4lab/api/cs3_file_api.py @@ -14,6 +14,7 @@ import cs3.gateway.v1beta1.gateway_api_pb2_grpc as cs3gw_grpc import cs3.rpc.v1beta1.code_pb2 as cs3code import cs3.storage.provider.v1beta1.provider_api_pb2 as cs3sp + from cs3api4lab.exception.exceptions import ResourceNotFoundError from cs3api4lab.utils.file_utils import FileUtils @@ -23,6 +24,7 @@ from cs3api4lab.auth.channel_connector import ChannelConnector from cs3api4lab.config.config_manager import Cs3ConfigManager from cs3api4lab.api.lock_manager import LockManager +from cs3api4lab.utils.sqlquerycache import SqlQueryCache class Cs3FileApi: @@ -31,6 +33,7 @@ class Cs3FileApi: auth = None config = None lock_manager = None + sql_cache = None def __init__(self, log): self.log = log @@ -41,9 +44,8 @@ def __init__(self, log): intercept_channel = grpc.intercept_channel(channel, auth_interceptor) self.cs3_api = cs3gw_grpc.GatewayAPIStub(intercept_channel) self.storage_api = StorageApi(log) - self.lock_manager = LockManager(log) - + self.sql_cache = SqlQueryCache(config=self.config) return def mount_point(self): @@ -64,27 +66,53 @@ def stat_info(self, file_path, endpoint='/'): """ time_start = time.time() stat = self.storage_api.stat(file_path, endpoint) - if stat.status.code == cs3code.CODE_OK: - time_end = time.time() - self.log.info('msg="Invoked stat" fileid="%s" elapsedTimems="%.1f"' % (file_path, (time_end - time_start) * 1000)) - return { - 'inode': {'storage_id': stat.info.id.storage_id, - 'opaque_id': stat.info.id.opaque_id}, - 'filepath': stat.info.path, - 'userid': stat.info.owner.opaque_id, - 'size': stat.info.size, - 'mtime': stat.info.mtime.seconds, - 'type': stat.info.type, - 'mime_type': stat.info.mime_type, - 'idp': stat.info.owner.idp, - 'permissions': stat.info.permission_set - } - elif stat.status.code == cs3code.CODE_NOT_FOUND: + time_end = time.time() + self.log.info( + 'msg="Invoked stat" fileid="%s" elapsedTimems="%.1f"' % (file_path, (time_end - time_start) * 1000)) + + if stat.status.code == cs3code.CODE_NOT_FOUND: self.log.info('msg="Failed stat" fileid="%s" reason="%s"' % (file_path, stat.status.message)) raise FileNotFoundError(stat.status.message + ", file " + file_path) else: - self._handle_error(stat) - + self.sql_cache.save_item( + storage_id=stat.info.id.storage_id, + opaque_id=stat.info.id.storage_id, + stored_value=stat + ) + return self._stat_output(stat) + + def stat_info_by_resource(self, opaque_id, storage_id): + """ + Stat a file and returns (size, mtime) as well as other extended info using the given userid as access token. + Note that endpoint here means the storage id. Note that fileid can be either a path (which MUST begin with /) + or an id (which MUST NOT start with a /). + """ + time_start = time.time() + # + if self.sql_cache.item_exists(storage_id=storage_id, opaque_id=opaque_id): + stat = self.sql_cache.get_stored_value( + storage_id=storage_id, + opaque_id=opaque_id, + message=cs3sp.StatResponse() + ) + else: + stat = self.storage_api.stat_by_resource(opaque_id, storage_id) + if stat.status.code == cs3code.CODE_NOT_FOUND: + self.log.info( + 'msg="Failed stat" fileid="%s" storageid="%s" reason="%s"' % + (opaque_id, storage_id, stat.status.message) + ) + raise FileNotFoundError(stat.status.message + ", file " + stat.info.path) + else: + self.sql_cache.save_item(storage_id=storage_id, opaque_id=opaque_id, stored_value=stat) + + time_end = time.time() + self.log.info( + 'msg="Invoked stat" fileid="%s" storageid="%s" elapsedTimems="%.1f"' % ( + opaque_id, storage_id, (time_end - time_start) * 1000)) + + return self._stat_output(stat) + def read_file(self, file_path, endpoint=None): """ Read a file using the given userid as access token. @@ -254,6 +282,23 @@ def create_directory(self, path, endpoint=None): def get_home_dir(self): return self.config.home_dir if self.config.home_dir else "" + def _stat_output(self, stat): + if stat.status.code == cs3code.CODE_OK: + return { + 'inode': {'storage_id': stat.info.id.storage_id, + 'opaque_id': stat.info.id.opaque_id}, + 'filepath': stat.info.path, + 'userid': stat.info.owner.opaque_id, + 'size': stat.info.size, + 'mtime': stat.info.mtime.seconds, + 'type': stat.info.type, + 'mime_type': stat.info.mime_type, + 'idp': stat.info.owner.idp, + 'permissions': stat.info.permission_set + } + else: + self._handle_error(stat) + def _handle_error(self, response): self.log.error(response) raise Exception("Incorrect server response: " + diff --git a/cs3api4lab/api/share_api_facade.py b/cs3api4lab/api/share_api_facade.py index 75fd0850..55c22b5e 100644 --- a/cs3api4lab/api/share_api_facade.py +++ b/cs3api4lab/api/share_api_facade.py @@ -1,4 +1,5 @@ import urllib.parse +import time import cs3.ocm.provider.v1beta1.provider_api_pb2_grpc as ocm_provider_api_grpc import cs3.storage.provider.v1beta1.resources_pb2 as Resources @@ -19,6 +20,7 @@ from cs3api4lab.api.storage_api import StorageApi from cs3api4lab.exception.exceptions import OCMDisabledError + class ShareAPIFacade: def __init__(self, log): self.log = log @@ -34,6 +36,7 @@ def __init__(self, log): self.ocm_share_api = Cs3OcmShareApi(log) self.storage_api = StorageApi(log) + return def create(self, endpoint, file_path, opaque_id, idp, role=Role.EDITOR, grantee_type=Grantee.USER, reshare=True): @@ -102,12 +105,18 @@ def list_shares(self): :return: created shares and OCM shares combined and mapped to Jupyter model :rtype: dict """ + time_start = time.time() share_list = self.share_api.list() if self.config.enable_ocm: ocm_share_list = self.ocm_share_api.list() else: ocm_share_list = None - return self.map_shares(share_list, ocm_share_list) + + mapped_shares = self.map_shares(share_list, ocm_share_list) + time_end = time.time() + print('shares times:', time_end - time_start) + + return mapped_shares def list_received(self, status=None): """ @@ -205,9 +214,9 @@ def map_shares_to_model(self, list_response, received=False): share = share.share try: user = self.user_api.get_user_info(share.owner.idp, share.owner.opaque_id) - stat = self.file_api.stat_info(urllib.parse.unquote(share.resource_id.opaque_id), - share.resource_id.storage_id) # todo remove this and use storage_logic - # stat = self.storage_logic.stat_info(urllib.parse.unquote(share.resource_id.opaque_id), share.resource_id.storage_id) + # if not self.stat_cache.item_exists(share.resource_id.storage_id, share.resource_id.opaque_id): + stat = self.file_api.stat_info_by_resource(urllib.parse.unquote(share.resource_id.opaque_id), + share.resource_id.storage_id) if stat['type'] == Resources.RESOURCE_TYPE_FILE: if hasattr(share.permissions.permissions, @@ -226,6 +235,7 @@ def map_shares_to_model(self, list_response, received=False): model['writable'] = True if ShareUtils.map_permissions_to_role( share.permissions.permissions) == 'editor' else False except Exception as e: + print(e) self.log.error("Unable to map share " + share.resource_id.opaque_id + ", " + e.__str__()) continue diff --git a/cs3api4lab/api/storage_api.py b/cs3api4lab/api/storage_api.py index ea780959..6eff03b1 100644 --- a/cs3api4lab/api/storage_api.py +++ b/cs3api4lab/api/storage_api.py @@ -46,6 +46,10 @@ def stat(self, file_path, endpoint='/'): ref = FileUtils.get_reference(file_path, endpoint) return self._stat_internal(ref) + def stat_by_resource(self, opaque_id, storage_id): + ref = FileUtils.get_reference_by_resource(opaque_id=opaque_id, storage_id=storage_id) + return self._stat_internal(ref) + def _stat_internal(self, ref): return self.cs3_api.Stat(request=cs3sp.StatRequest(ref=ref), metadata=[('x-access-token', self.auth.authenticate())]) diff --git a/cs3api4lab/config/config_manager.py b/cs3api4lab/config/config_manager.py index 7f6ea7cf..a4f1f201 100644 --- a/cs3api4lab/config/config_manager.py +++ b/cs3api4lab/config/config_manager.py @@ -80,6 +80,15 @@ class Config(LoggingConfigurable): oauth_token = Unicode( config=True, allow_none=True, help="""OAuth token""" ) + stat_cache_enabled = Bool( + config=True, default_value=False, help="""Stat caching is enabled""" + ) + stat_cache_file = Unicode( + config=True, default_value="./tmp_cache_file.db", allow_none=True, help="""Path to db file""" + ) + stat_cache_time = CInt( + config=True, default_value=180, allow_none=True, help="""Cache ttl in seconds""" + ) @default("reva_host") def _reva_host_default(self): @@ -180,6 +189,18 @@ def _oauth_file_default(self): def _oauth_token_default(self): return self._get_config_value("oauth_token") + @default("stat_cache_enabled") + def _stat_cache_enabled_default(self): + return self._get_config_value("stat_cache_enabled") in ["true", True] + + @default("stat_cache_file") + def _stat_cache_file_default(self): + return self._get_config_value("stat_cache_file") + + @default("stat_cache_time") + def _stat_cache_time_default(self): + return self._get_config_value("stat_cache_time") + def _get_config_value(self, key): env = os.getenv("CS3_" + key.upper()) if env: @@ -235,7 +256,8 @@ def _file_config(self, key): "eos_file": None, "eos_token": None, "oauth_file": None, - "oauth_token": None + "oauth_token": None, + "stat_cache_enabled": False, } diff --git a/cs3api4lab/tests/extensions.py b/cs3api4lab/tests/extensions.py index 16bb4aa4..35af4312 100644 --- a/cs3api4lab/tests/extensions.py +++ b/cs3api4lab/tests/extensions.py @@ -22,6 +22,8 @@ import cs3.sharing.ocm.v1beta1.ocm_api_pb2_grpc as ocm_api_grpc import cs3.gateway.v1beta1.gateway_api_pb2_grpc as grpc_gateway +from cs3api4lab.utils.sqlquerycache import SqlQueryCache + class ExtStorageApi(StorageApi): def __init__(self, log, config): @@ -76,6 +78,7 @@ def __init__(self, log, config) -> None: self.cs3_api = cs3gw_grpc.GatewayAPIStub(intercept_channel) self.lock_manager = ExtLockManager(log, config) self.storage_api = ExtStorageApi(log, config) + self.sql_cache = SqlQueryCache(config=self.config) class ExtCs3ShareApi(Cs3ShareApi): diff --git a/cs3api4lab/tests/jupyter-config/jupyter_cs3_config.json b/cs3api4lab/tests/jupyter-config/jupyter_cs3_config.json index 5ca9e15b..528d5c19 100644 --- a/cs3api4lab/tests/jupyter-config/jupyter_cs3_config.json +++ b/cs3api4lab/tests/jupyter-config/jupyter_cs3_config.json @@ -16,6 +16,7 @@ "locks_expiration_time": 10, "tus_enabled": false, "enable_ocm": false, - "shared_folder": "MyShares" + "shared_folder": "MyShares", + "stat_cache_enabled": false } } diff --git a/cs3api4lab/tests/share_test_base.py b/cs3api4lab/tests/share_test_base.py index 45d19b77..4f4cfa21 100644 --- a/cs3api4lab/tests/share_test_base.py +++ b/cs3api4lab/tests/share_test_base.py @@ -5,6 +5,7 @@ import cs3.rpc.v1beta1.code_pb2 as cs3code from collections import namedtuple + class ShareTestBase: storage_id = '123e4567-e89b-12d3-a456-426655440000' receiver_role = 'editor' @@ -34,10 +35,11 @@ def setUp(self): "authenticator_class": "cs3api4lab.auth.RevaPassword", "client_id": "marie", "client_secret": "radioactivity", - "locks_expiration_time": 10, - "tus_enabled": True, - "enable_ocm": False - } + "locks_expiration_time": 10, + "tus_enabled": True, + "enable_ocm": False, + "stat_cache_enabled": False + } marie_ext_config = namedtuple('MarieConfig', marie_ext_config)(**marie_ext_config) richard_local_config = { @@ -54,9 +56,10 @@ def setUp(self): "authenticator_class": "cs3api4lab.auth.RevaPassword", "client_id": "richard", "client_secret": "superfluidity", - "locks_expiration_time": 10, - "tus_enabled": True, - "enable_ocm": False + "locks_expiration_time": 10, + "tus_enabled": True, + "enable_ocm": False, + "stat_cache_enabled": False, } richard_local_config = namedtuple('richardConfig', richard_local_config)(**richard_local_config) @@ -152,7 +155,6 @@ def clear_locks_on_file(self, file, endpoint='/'): for lock in list(metadata.keys()): self.storage_api.set_metadata({lock: "{}"}, file, endpoint) - def remove_test_share(self, user, share_id): if user == 'einstein': self.share_api.remove(share_id) @@ -228,9 +230,9 @@ def remove_share_and_file_by_path(self, user, file_path): stat = storage.stat(file_path) if stat.status.code == cs3code.CODE_NOT_FOUND or stat.status.code == cs3code.CODE_INTERNAL: self.create_test_file(user, file_path) - #todo the code above won't be necessary after https://github.com/cs3org/reva/issues/2847 is fixed + # todo the code above won't be necessary after https://github.com/cs3org/reva/issues/2847 is fixed - shares = share_api.list_shares_for_filepath(file_path) #todo this won't work on CERNBOX + shares = share_api.list_shares_for_filepath(file_path) # todo this won't work on CERNBOX if shares: for share in shares: share_api.remove(share['opaque_id']) diff --git a/cs3api4lab/utils/file_utils.py b/cs3api4lab/utils/file_utils.py index 2bf6d716..aba114ce 100644 --- a/cs3api4lab/utils/file_utils.py +++ b/cs3api4lab/utils/file_utils.py @@ -18,6 +18,12 @@ def get_reference(file_id, endpoint=None): # assume we have an opaque fileid return storage_provider.Reference(resource_id=storage_provider.ResourceId(storage_id=endpoint, opaque_id=file_id)) + @staticmethod + def get_reference_by_resource(opaque_id, storage_id): + return storage_provider.Reference( + resource_id=storage_provider.ResourceId(storage_id=storage_id, opaque_id=opaque_id) + ) + @staticmethod def check_and_transform_file_path(file_id): config = Cs3ConfigManager().get_config() #note: can cause problems in tests because of the config, it should be passed as an argument diff --git a/cs3api4lab/utils/sqlquerycache.py b/cs3api4lab/utils/sqlquerycache.py new file mode 100644 index 00000000..df2dcfb1 --- /dev/null +++ b/cs3api4lab/utils/sqlquerycache.py @@ -0,0 +1,100 @@ +from google.protobuf.json_format import MessageToJson, Parse +import time +from datetime import datetime, timedelta +import sqlite3 + + +class SqlQueryCache: + _cursor = None + _connection = None + config = None + + def __init__(self, config): + self.config = config + + @property + def connection(self): + if self._connection is None: + self._connection = sqlite3.connect( + self.config.stat_cache_file, + check_same_thread=False, + isolation_level=None # Set isolation level to None to autocommit all changes to the database. + ) + self._connection.row_factory = sqlite3.Row + return self._connection + + @property + def cursor(self): + if not self.config.stat_cache_enabled: + return None + + """Start a cursor and create a database called 'session'""" + if self._cursor is None: + self._cursor = self.connection.cursor() + self._cursor.execute( + """CREATE TABLE IF NOT EXISTS cached_stat + (storage_id, opaque_id, stored_value, ctime)""" + ) + + return self._cursor + + def close(self): + """Close the sqlite connection""" + if self._cursor is not None: + self._cursor.close() + self._cursor = None + + def item_exists(self, storage_id, opaque_id): + if not self.config.stat_cache_enabled: + return False + + """Check to see if the session of a given name exists""" + self.cursor.execute("SELECT * FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) + row = self.cursor.fetchone() + exists = False + print('-----------------') + + if row is not None: + expiration_time = datetime.fromtimestamp(row['ctime']) + timedelta(seconds=self.config.stat_cache_time) + exists = expiration_time > datetime.fromtimestamp(time.time()) + if not exists: + self.cursor.execute("DELETE FROM cached_stat WHERE storage_id=? AND opaque_id=?", + (storage_id, opaque_id)) + + print('item exists:', exists) + return exists + + def save_item(self, storage_id=None, opaque_id=None, stored_value=None): + if not self.config.stat_cache_enabled: + return False + + ctime = datetime.timestamp(datetime.now()) + stored_value = MessageToJson(stored_value) + if not self.item_exists(storage_id, opaque_id): + self.cursor.execute( + "INSERT INTO cached_stat VALUES (?,?,?,?)", (storage_id, opaque_id, stored_value, ctime) + ) + + def clean_old_records(self): + if not self.config.stat_cache_enabled: + return False + + older_than = datetime.now() + timedelta(seconds=-self.config.stat_cache_time) + older_than = datetime.timestamp(older_than) + self.cursor.execute("DELETE FROM cached_stat WHERE ctime < ?", (older_than,)) + return + + def get_stored_value(self, storage_id, opaque_id, message): + if not self.config.stat_cache_enabled: + return None + + self.cursor.execute("SELECT * FROM cached_stat WHERE storage_id=? AND opaque_id=?", (storage_id, opaque_id)) + row = self.cursor.fetchone() + if row is not None: + return Parse(row['stored_value'], message) + else: + return None + + def __del__(self): + self.clean_old_records() + self.close() diff --git a/jupyter-config/jupyter_cs3_config.json b/jupyter-config/jupyter_cs3_config.json index 7172bc24..d6aa0d19 100644 --- a/jupyter-config/jupyter_cs3_config.json +++ b/jupyter-config/jupyter_cs3_config.json @@ -16,6 +16,9 @@ "locks_expiration_time": 150, "tus_enabled": false, "enable_ocm": false, - "shared_folder": "MyShares" + "shared_folder": "MyShares", + "stat_cache_enabled": true, + "stat_cache_file": "./test.db", + "stat_cache_time": 180 } }