diff --git a/docker/granular/backups.py b/docker/granular/backups.py index eed77ce..e20778a 100644 --- a/docker/granular/backups.py +++ b/docker/granular/backups.py @@ -89,61 +89,55 @@ def build_namespace_path(namespace=configs.default_namespace()): return '%s/%s' % (configs.backups_storage(), namespace) -def build_database_backup_path(backup_id, database, - namespace=configs.default_namespace(), external_backup_storage=None): - if configs.get_encryption(): - return '%s/%s_enc.dump' % ( - build_backup_path(backup_id, namespace, external_backup_storage), database) - else: - return '%s/%s.dump' % ( - build_backup_path(backup_id, namespace, external_backup_storage), database) - +def build_database_backup_path(backup_id, database, + namespace=configs.default_namespace(), external_backup_storage=None, blob_path=None): + ext = '_enc.dump' if configs.get_encryption() else '.dump' + return '%s/%s%s' % ( + build_backup_path(backup_id, namespace, external_backup_storage, blob_path), database, ext) def build_roles_backup_path(backup_id, database, - namespace=configs.default_namespace(), external_backup_storage=None): - if configs.get_encryption(): - return "%s/%s.roles_enc.sql" % ( - build_backup_path(backup_id, namespace, external_backup_storage), database) - else: - return "%s/%s.roles.sql" % ( - build_backup_path(backup_id, namespace, external_backup_storage), database) - + namespace=configs.default_namespace(), external_backup_storage=None, blob_path=None): + ext = 'roles_enc.sql' if configs.get_encryption() else 'roles.sql' + return "%s/%s.%s" % ( + build_backup_path(backup_id, namespace, external_backup_storage, blob_path), database, ext) def build_database_backup_full_path(backup_id, database, storage_root, namespace=configs.default_namespace(), - ): - if configs.get_encryption(): - return '%s/%s/%s/%s_enc.dump' % ( - storage_root, namespace, backup_id, database) + blob_path=None): + ext = '_enc.dump' if configs.get_encryption() else '.dump' + if blob_path is not None: + return '%s/%s/%s%s' % (blob_path, backup_id, database, ext) else: - return '%s/%s/%s/%s.dump' % ( - storage_root, namespace, backup_id, database) + return '%s/%s/%s/%s%s' % (storage_root, namespace, backup_id, database, ext) def build_database_restore_report_path(backup_id, database, restore_tracking_id, namespace=configs.default_namespace()): return '%s/%s.%s.report' % (build_backup_path(backup_id, namespace), database, restore_tracking_id) -def build_backup_path(backup_id, namespace=configs.default_namespace(), external_backup_storage=None): - return '%s/%s/%s' % (configs.backups_storage() if external_backup_storage is None else external_backup_storage, - namespace, backup_id) +def build_backup_path(backup_id, namespace=configs.default_namespace(), external_backup_storage=None, blob_path=None): + if blob_path is not None: + return '%s/%s' % (blob_path, backup_id) + else: + return '%s/%s/%s' % (configs.backups_storage() if external_backup_storage is None else external_backup_storage, + namespace, backup_id) def build_external_backup_root(external_backup_path): return '%s/%s' % (os.getenv("EXTERNAL_STORAGE_ROOT"), external_backup_path) -def build_backup_status_file_path(backup_id, namespace=configs.default_namespace(), external_backup_storage=None): - return '%s/status.json' % build_backup_path(backup_id, namespace, external_backup_storage) +def build_backup_status_file_path(backup_id, namespace=configs.default_namespace(), external_backup_storage=None, blob_path=None): + return '%s/status.json' % build_backup_path(backup_id, namespace, external_backup_storage, blob_path) def build_restore_status_file_path(backup_id, tracking_id, namespace=configs.default_namespace(), - external_backup_storage=None): - return '%s/%s.json' % (build_backup_path(backup_id, namespace, external_backup_storage), tracking_id) + external_backup_storage=None, blob_path=None): + return '%s/%s.json' % (build_backup_path(backup_id, namespace, external_backup_storage, blob_path), tracking_id) -def get_key_name_by_backup_id(backup_id, namespace, external_backup_storage=None): - status_path = build_backup_status_file_path(backup_id, namespace, external_backup_storage) +def get_key_name_by_backup_id(backup_id, namespace, external_backup_storage=None, blob_path=None): + status_path = build_backup_status_file_path(backup_id, namespace, external_backup_storage, blob_path) with open(status_path) as f: data = json.load(f) return data.get("key_name") diff --git a/docker/granular/granular.py b/docker/granular/granular.py index 699adb2..b01cbc5 100644 --- a/docker/granular/granular.py +++ b/docker/granular/granular.py @@ -286,7 +286,8 @@ def __init__(self): 'keep', 'compressionLevel', 'externalBackupPath', - 'storageName'] + 'storageName', + 'blobPath'] def perform_granular_backup(self, backup_request): # # for gke full backup @@ -333,7 +334,7 @@ def perform_granular_backup(self, backup_request): backup_id = backups.generate_backup_id() backup_request['backupId'] = backup_id - worker = pg_backup.PostgreSQLDumpWorker(databases, backup_request) + worker = pg_backup.PostgreSQLDumpWorker(databases, backup_request, backup_request.get('blobPath')) worker.start() @@ -1063,6 +1064,7 @@ class NewBackup(flask_restful.Resource): def __init__(self): self.log = logging.getLogger("NewBackup") self.allowed_fields = ["storageName", "blobPath", "databases"] + self.s3 = storage_s3.AwsS3Vault() if os.environ.get("STORAGE_TYPE") == "s3" else None @staticmethod def get_endpoints(): @@ -1070,6 +1072,9 @@ def get_endpoints(): @auth.login_required def post(self): + if not self.s3: + return "S3 is not configured for backup daemon", http.client.FORBIDDEN + body = request.get_json(silent=True) or {} storage_name = body.get("storageName") blob_path = body.get("blobPath") @@ -1085,7 +1090,7 @@ def post(self): # Reuse old logic directly backup_request = { "databases": list(databases), - "externalBackupPath": blob_path, + "blobPath": blob_path, "storageName": storage_name } resp = GranularBackupRequestEndpoint().perform_granular_backup(backup_request) @@ -1096,6 +1101,8 @@ def post(self): elif isinstance(resp, dict): body, code = resp, http.client.ACCEPTED else: + if code == http.client.BAD_REQUEST: + return resp, http.client.NOT_FOUND if code == http.client.BAD_REQUEST: return resp, http.client.NOT_FOUND return resp @@ -1143,6 +1150,9 @@ def get_endpoints(): @auth.login_required def get(self, backup_id): + if not self.s3: + return "S3 is not configured for backup daemon", http.client.FORBIDDEN + if not backup_id: return "Backup ID is not specified.", http.client.BAD_REQUEST @@ -1151,49 +1161,38 @@ def get(self, backup_id): return "Invalid namespace name: %s." % namespace.encode("utf-8"), http.client.BAD_REQUEST blob_path = normalize_blobPath(request.args.get("blobPath")) - external_backup_path = blob_path - external_backup_root = backups.build_external_backup_root(external_backup_path) if external_backup_path else None + status_path = backups.build_backup_status_file_path(backup_id, blob_path=blob_path) - status_path = backups.build_backup_status_file_path(backup_id, namespace, external_backup_root) - - if self.s3: - try: - raw = json.loads(self.s3.read_object(status_path)) - except Exception: - return "Backup in bucket is not found.", http.client.NOT_FOUND - else: - if not os.path.isfile(status_path): - return "Backup is not found.", http.client.NOT_FOUND - with open(status_path) as f: - raw = json.load(f) + + try: + raw = json.loads(self.s3.read_object(status_path)) + except Exception: + return "Backup in bucket is not found.", http.client.NOT_FOUND - if external_backup_path and not (raw.get("blobPath") or raw.get("externalBackupPath")): - raw["blobPath"] = external_backup_path + if blob_path and not raw.get("blobPath"): + raw["blobPath"] = blob_path return backups.transform_backup_status_v1(raw), http.client.OK @auth.login_required @superuser_authorization def delete(self, backup_id): + if not self.s3: + return "S3 is not configured for backup daemon", http.client.FORBIDDEN + if not backup_id: return {"backupId": backup_id, "message": "Backup ID is not specified", "status": "Failed"}, http.client.BAD_REQUEST req_ns = request.args.get("namespace") - blob_path = normalize_blobPath(request.args.get("blobPath")) - external_backup_path = blob_path or request.args.get("externalBackupPath") - if not external_backup_path: + blob_path = request.args.get("blobPath") + if not blob_path: return {"backupId": backup_id, "message": "blobPath query parameter is required (e.g. ?blobPath=tmp/a/b/c).", "status": "Failed"}, http.client.BAD_REQUEST - external_backup_root = backups.build_external_backup_root(external_backup_path) + blob_path = normalize_blobPath(blob_path) - def _exists(p: str) -> bool: - if self.s3: - try: - return self.s3.is_file_exists(p) - except Exception: - return False - return os.path.isfile(p) + def _exists(p: str) -> bool: + return self.s3.is_file_exists(p) namespace = req_ns if not namespace: @@ -1205,14 +1204,14 @@ def _exists(p: str) -> bool: if "default" not in candidates: candidates.append("default") for cand in candidates: - status_try = backups.build_backup_status_file_path(backup_id, cand, external_backup_root) + status_try = backups.build_backup_status_file_path(backup_id, cand, blob_path=blob_path) if _exists(status_try): namespace = cand break if not namespace: namespace = configs.default_namespace() - status_path = backups.build_backup_status_file_path(backup_id, namespace, external_backup_root) + status_path = backups.build_backup_status_file_path(backup_id, namespace, blob_path=blob_path) existed_before = _exists(status_path) resp = TerminateBackupEndpoint().post(backup_id) @@ -1242,16 +1241,8 @@ def _exists(p: str) -> bool: self.log.info("Terminate response for %s: code=%s body=%s", backup_id, term_code, term_body) try: - target_dir = backups.build_backup_path(backup_id, namespace, external_backup_root) - if self.s3: - self.s3.delete_objects(target_dir) - else: - if os.path.isdir(target_dir): - shutil.rmtree(target_dir, ignore_errors=False) - if external_backup_root is None: - ns_dir = backups.build_namespace_path(namespace) - if os.path.isdir(ns_dir) and not os.listdir(ns_dir) and namespace != "default": - shutil.rmtree(ns_dir, ignore_errors=True) + target_dir = backups.build_backup_path(backup_id, blob_path=blob_path) + self.s3.delete_objects(target_dir) except Exception as e: if not existed_before and term_code == http.client.NOT_FOUND: return {"backupId": backup_id, "message": "Backup is not found.", "status": "Failed"}, http.client.NOT_FOUND @@ -1291,6 +1282,9 @@ def get_endpoints(): @auth.login_required @superuser_authorization def post(self, backup_id): + if not self.s3: + return "S3 is not configured for backup daemon", http.client.FORBIDDEN + body = request.get_json(silent=True) or {} blob_path = body.get("blobPath") pairs = body.get("databases") or [] @@ -1320,32 +1314,21 @@ def post(self, backup_id): if not backups.is_valid_namespace(namespace): return "Invalid namespace name: %s." % namespace.encode("utf-8"), http.client.BAD_REQUEST - external_backup_path = blob_path - external_backup_root = backups.build_external_backup_root(external_backup_path) if external_backup_path else None - backup_details_file = backups.build_backup_status_file_path(backup_id, namespace, external_backup_root) + backup_details_file = backups.build_backup_status_file_path(backup_id, blob_path=blob_path) - if self.s3: - try: - status = self.s3.read_object(backup_details_file) - backup_details = json.loads(status) - except Exception: - return "Backup in bucket is not found.", http.client.NOT_FOUND - else: - if not os.path.isfile(backup_details_file): - return "Backup is not found.", http.client.NOT_FOUND - with open(backup_details_file, "r") as f: - backup_details = json.load(f) + try: + status = self.s3.read_object(backup_details_file) + backup_details = json.loads(status) + except Exception: + return "Backup in bucket is not found.", http.client.NOT_FOUND backup_status = backup_details["status"] if backup_status != backups.BackupStatus.SUCCESSFUL: return "Backup status '%s' is unsuitable status for restore." % backup_status, http.client.FORBIDDEN - if self.s3: - for database in list(backup_details.get("databases", {}).keys()): - if not self.s3.is_file_exists(backups.build_database_backup_path(backup_id, database, namespace, external_backup_root)): - return "Backup in bucket is not found.", http.client.NOT_FOUND - elif not backups.backup_exists(backup_id, namespace, external_backup_root): - return "Backup is not found.", http.client.NOT_FOUND + for database in list(backup_details.get("databases", {}).keys()): + if not self.s3.is_file_exists(backups.build_database_backup_path(backup_id, database, blob_path=blob_path)): + return "Backup in bucket is not found.", http.client.NOT_FOUND ghost_databases = [] uncompleted_backups = [] @@ -1382,8 +1365,8 @@ def post(self, backup_id): if not dry_run: worker = pg_restore.PostgreSQLRestoreWorker( requested, force, - {"backupId": backup_id, "namespace": namespace, "externalBackupPath": external_backup_path, "trackingId": tracking_id}, - databases_mapping, owners_mapping, restore_roles, single_transaction, body.get("dbaasClone") + {"backupId": backup_id, "namespace": namespace, "trackingId": tracking_id}, + databases_mapping, owners_mapping, restore_roles, single_transaction, body.get("dbaasClone"), blob_path ) worker.start() @@ -1394,7 +1377,6 @@ def post(self, backup_id): created_iso = "" storage_name = body.get("storageName") or "" - blob_path = blob_path or external_backup_path or "" dbs_out = [] for prev in (requested or []): @@ -1430,18 +1412,13 @@ def post(self, backup_id): "creationTime": created_iso} for prev in (requested or []) }, "storageName": storage_name, "blobPath": blob_path, - "externalBackupPath": external_backup_path or "", "sourceBackupId": backup_id } if dry_run: return enriched, http.client.OK - try: - status_path = backups.build_restore_status_file_path(backup_id, tracking_id, namespace, - backups.build_external_backup_root(external_backup_path) if external_backup_path else None) - except TypeError: - status_path = backups.build_restore_status_file_path(backup_id, tracking_id, namespace) + status_path = backups.build_restore_status_file_path(backup_id, tracking_id, namespace) try: if hasattr(utils, "write_in_json"): @@ -1472,6 +1449,9 @@ def get_endpoints(): @auth.login_required @superuser_authorization def get(self, restore_id): + if not self.s3: + return "S3 is not configured for backup daemon", http.client.FORBIDDEN + if not restore_id: return "Restore tracking ID is not specified.", http.client.BAD_REQUEST @@ -1485,24 +1465,16 @@ def get(self, restore_id): return "Invalid namespace name: %s." % namespace.encode("utf-8"), http.client.BAD_REQUEST blob_path = normalize_blobPath(request.args.get("blobPath")) - external_backup_path = blob_path - external_backup_root = backups.build_external_backup_root(external_backup_path) if external_backup_path else None storage_name = request.args.get("storageName") or os.environ.get("STORAGE_NAME") - status_path = backups.build_restore_status_file_path(backup_id, restore_id, namespace, external_backup_root) + status_path = backups.build_restore_status_file_path(backup_id, restore_id, blob_path=blob_path) - if self.s3: - try: - raw = json.loads(self.s3.read_object(status_path)) - except Exception: - return "Backup in bucket is not found.", http.client.NOT_FOUND - else: - if not os.path.isfile(status_path): - return "Restore is not found.", http.client.NOT_FOUND - with open(status_path) as f: - raw = json.load(f) + try: + raw = json.loads(self.s3.read_object(status_path)) + except Exception: + return "Backup in bucket is not found.", http.client.NOT_FOUND - if external_backup_path and not (raw.get("blobPath") or raw.get("externalBackupPath")): - raw["blobPath"] = external_backup_path + if blob_path and not raw.get("blobPath"): + raw["blobPath"] = blob_path if storage_name and not raw.get("storageName"): raw["storageName"] = storage_name @@ -1511,6 +1483,9 @@ def get(self, restore_id): @auth.login_required @superuser_authorization def delete(self, restore_id): + if not self.s3: + return "S3 is not configured for backup daemon", http.client.FORBIDDEN + if not restore_id: return {"restoreId": restore_id, "message": "Restore ID is not specified", "status": "Failed"}, http.client.BAD_REQUEST @@ -1545,38 +1520,33 @@ def delete(self, restore_id): "termination": {"code": term_code, "body": term_body} }, http.client.OK - blob_path = normalize_blobPath(request.args.get("blobPath")) - external_backup_path = blob_path or request.args.get("externalBackupPath") - if not external_backup_path: + blob_path = request.args.get("blobPath") + if not blob_path: return { "restoreId": restore_id, "message": "blobPath query parameter is required for cleanup (e.g. ?blobPath=tmp/a/b/c).", "status": "Failed" }, http.client.BAD_REQUEST + blob_path = normalize_blobPath(blob_path) - external_backup_root = backups.build_external_backup_root(external_backup_path) - status_path = backups.build_restore_status_file_path(backup_id, restore_id, namespace, external_backup_root) - backup_base = backups.build_backup_path(backup_id, namespace, external_backup_root) + status_path = backups.build_restore_status_file_path(backup_id, restore_id, blob_path=blob_path) + backup_base = backups.build_backup_path(backup_id, blob_path=blob_path) pattern_name = f"{restore_id}" pattern_glob = os.path.join(backup_base, pattern_name + "*") def _exists_file(p: str) -> bool: - if self.s3: + try: + return self.s3.is_file_exists(p) + except Exception: + return False + + def _prefix_exists() -> bool: + if hasattr(self.s3, "is_prefix_exists"): try: - return self.s3.is_file_exists(p) + return self.s3.is_prefix_exists(os.path.join(backup_base, pattern_name)) except Exception: return False - return os.path.isfile(p) - - def _prefix_exists() -> bool: - if self.s3: - if hasattr(self.s3, "is_prefix_exists"): - try: - return self.s3.is_prefix_exists(os.path.join(backup_base, pattern_name)) - except Exception: - return False - return False - return any(glob.glob(pattern_glob)) + return False existed_status = _exists_file(status_path) existed_prefix = _prefix_exists() @@ -1616,35 +1586,19 @@ def _prefix_exists() -> bool: self.log.info("Terminate response for restore %s: code=%s body=%s", restore_id, term_code, term_body) try: - if self.s3: - prefix = os.path.join(backup_base, pattern_name).rstrip("/") - self.s3.delete_objects(prefix if prefix.endswith("/") else prefix) - else: - for p in glob.glob(pattern_glob): - if os.path.isfile(p): - os.remove(p) - if os.path.isfile(status_path): - try: - os.remove(status_path) - except FileNotFoundError: - pass + prefix = os.path.join(backup_base, pattern_name).rstrip("/") + self.s3.delete_objects(prefix if prefix.endswith("/") else prefix) except Exception as e: self.log.exception("Restore cleanup failed for %s: %s", restore_id, e) return { "restoreId": restore_id, "message": f"Termination attempted; cleanup encountered an error: {e}", - "status": "Successful", + "status": "Failed", "termination": {"code": term_code, "body": term_body} - }, http.client.OK - - if term_code == 404: - return {"restoreId": restore_id, "message": "Restore is not found.", "status": "Failed", - "termination": {"code": term_code, "body": term_body}}, http.client.NOT_FOUND + }, http.client.INTERNAL_SERVER_ERROR if term_code and 200 <= term_code < 300: msg = "Restore terminated successfully. Cleanup completed." - elif term_code == 404: - msg = "No active restore (already finished or not found). Cleanup completed." else: msg = "Termination attempted. Cleanup completed." @@ -1655,7 +1609,11 @@ def normalize_blobPath(blob_path): # Normalize blob_path by removing a single leading and trailing slash if isinstance(blob_path, str): if blob_path.startswith("/"): - blob_path = blob_path[1:] + if not os.getenv("AWS_S3_PREFIX", ""): + blob_path = blob_path[1:] + else: + if os.getenv("AWS_S3_PREFIX", ""): + blob_path = "/" + blob_path if blob_path.endswith("/"): blob_path = blob_path[:-1] return blob_path diff --git a/docker/granular/pg_backup.py b/docker/granular/pg_backup.py index 0c169b0..d5ccb79 100644 --- a/docker/granular/pg_backup.py +++ b/docker/granular/pg_backup.py @@ -32,7 +32,7 @@ class PostgreSQLDumpWorker(Thread): - def __init__(self, databases, backup_request): + def __init__(self, databases, backup_request, blob_path=None): Thread.__init__(self) self.log = logging.getLogger("PostgreSQLDumpWorker") @@ -50,6 +50,7 @@ def __init__(self, databases, backup_request): self.bin_path = configs.get_pgsql_bin_path(self.postgres_version) self.parallel_jobs = configs.get_parallel_jobs() self.databases = databases if databases else [] + self.blob_path = blob_path self.backup_dir = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root) self.create_backup_dir() self.s3 = storage_s3.AwsS3Vault() if os.environ['STORAGE_TYPE'] == "s3" else None @@ -116,7 +117,7 @@ def flush_status(self, external_backup_storage=None): if self.s3: try: # upload dumpfile - self.s3.upload_file(path) + self.s3.upload_file(path, self.blob_path, self.backup_id) except Exception as e: raise e @@ -296,9 +297,9 @@ def backup_single_database(self, database): if self.s3: try: # upload dumpfile - self.s3.upload_file(database_backup_path) + self.s3.upload_file(database_backup_path, self.blob_path, self.backup_id) # upload errorfile - self.s3.upload_file(self.stderr_file(database)) + self.s3.upload_file(self.stderr_file(database), self.blob_path, self.backup_id) except Exception as e: raise e @@ -433,7 +434,7 @@ def dump_roles_for_db(self, roles, database): if self.s3: try: logging.info("Streaming {} roles to AWS".format(database)) - self.s3.upload_file(roles_backup_path) + self.s3.upload_file(roles_backup_path, self.blob_path, self.backup_id) except Exception as e: raise e finally: @@ -446,9 +447,9 @@ def cleanup(self, database): os.remove(self.stderr_file(database)) def on_success(self, database): - database_backup_path = backups.build_database_backup_path(self.backup_id, database, self.namespace, self.external_backup_root) + database_backup_path = backups.build_database_backup_path(self.backup_id, database, self.namespace, self.external_backup_root, self.blob_path) - pg_dump_backup_path = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root) + pg_dump_backup_path = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root, self.blob_path) path_for_parallel_flag_backup = os.path.join(pg_dump_backup_path, database) if self.s3: @@ -463,7 +464,7 @@ def on_success(self, database): size_bytes = os.path.getsize(database_backup_path) self.update_status('path', backups. build_database_backup_full_path( - self.backup_id, database, self.location, self.namespace), database) + self.backup_id, database, self.location, self.namespace, blob_path=self.blob_path), database) self.update_status('sizeBytes', size_bytes, database) self.update_status('size', backups.sizeof_fmt(size_bytes), database) if self.encryption: diff --git a/docker/granular/pg_restore.py b/docker/granular/pg_restore.py index dfa2a52..9743bce 100644 --- a/docker/granular/pg_restore.py +++ b/docker/granular/pg_restore.py @@ -35,7 +35,7 @@ class PostgreSQLRestoreWorker(Thread): - def __init__(self, databases, force, restore_request, databases_mapping, owners_mapping, restore_roles=True, single_transaction=False, dbaas_clone=False): + def __init__(self, databases, force, restore_request, databases_mapping, owners_mapping, restore_roles=True, single_transaction=False, dbaas_clone=False, blobPath=None): Thread.__init__(self) self.log = logging.getLogger("PostgreSQLRestoreWorker") @@ -57,16 +57,16 @@ def __init__(self, databases, force, restore_request, databases_mapping, owners_ self.restore_roles = restore_roles self.postgres_version = utils.get_version_of_pgsql_server() self.location = configs.backups_storage(self.postgres_version) if self.is_standard_storage \ - else backups.build_external_backup_root(restore_request.get('externalBackupPath')) + else backups.build_external_backup_root(restore_request.get('externalBackupPath')) if restore_request.get('externalBackupPath') is not None else blobPath self.external_backup_root = None if self.is_standard_storage else self.location self.databases_mapping = databases_mapping self.owners_mapping = owners_mapping self.bin_path = configs.get_pgsql_bin_path(self.postgres_version) self.parallel_jobs = configs.get_parallel_jobs() self.s3 = storage_s3.AwsS3Vault() if os.environ['STORAGE_TYPE'] == "s3" else None - if self.s3: - self.backup_dir = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root) - self.create_backup_dir(self.backup_dir) + self.blob_path = blobPath + self.backup_dir = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root) + self.create_backup_dir(self.backup_dir) if configs.get_encryption(): self.encryption = True self.key_name = backups.get_key_name_by_backup_id(self.backup_id, @@ -95,12 +95,12 @@ def log_msg(self, msg): def flush_status(self, external_backup_storage=None): path = backups.build_restore_status_file_path(self.backup_id, self.tracking_id, self.namespace, - external_backup_storage) + external_backup_storage) utils.write_in_json(path, self.status) if self.s3: try: # upload status file - self.s3.upload_file(path) + self.s3.upload_file(path, self.blob_path, self.backup_id) except Exception as e: raise e @@ -171,7 +171,7 @@ def restore_single_database(self, database): self.update_status('status', backups.BackupStatus.IN_PROGRESS, database) self.update_status('source', backups.build_database_backup_full_path( self.backup_id, database, self.location, - self.namespace), database, flush=True) + self.namespace, blob_path=self.blob_path), database, flush=True) if int(self.parallel_jobs) > 1: pg_dump_backup_path = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root) @@ -190,14 +190,14 @@ def restore_single_database(self, database): stdout_path = stdout_path + '.stdout' if self.s3: try: - self.s3.download_file(dump_path) + self.s3.download_file(dump_path, self.backup_id, self.blob_path) if self.restore_roles: - self.s3.download_file(roles_backup_path) + self.s3.download_file(roles_backup_path, self.backup_id, self.blob_path) except Exception as e: raise e self.update_status('path', backups.build_database_backup_full_path( - self.backup_id, database, self.location, self.namespace), + self.backup_id, database, self.location, self.namespace, blob_path=self.blob_path), database=database, flush=True) new_bd_name = self.databases_mapping.get(database) or database @@ -523,7 +523,7 @@ def process_restore_request(self): self.update_status('started', str(datetime.datetime.fromtimestamp(start_timestamp).isoformat()), flush=True) backup_status_file = backups.build_backup_status_file_path(self.backup_id, - self.namespace, self.external_backup_root) + self.namespace, self.external_backup_root, self.blob_path) if self.s3: try: status = self.s3.read_object(backup_status_file) @@ -552,10 +552,10 @@ def process_restore_request(self): for database in self.databases: if self.s3: is_backup_exist = self.s3.is_file_exists(backups.build_database_backup_path(self.backup_id, database, - self.namespace, self.external_backup_root)) + self.namespace, self.external_backup_root, self.blob_path)) else: if int(self.parallel_jobs) > 1: - pg_dump_backup_path = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root) + pg_dump_backup_path = backups.build_backup_path(self.backup_id, self.namespace, self.external_backup_root, self.blob_path) path_for_parallel_flag_backup = os.path.join(pg_dump_backup_path, database) is_backup_exist = os.path.exists(path_for_parallel_flag_backup) else: diff --git a/docker/granular/storage_s3.py b/docker/granular/storage_s3.py index 7bee12b..e273b68 100644 --- a/docker/granular/storage_s3.py +++ b/docker/granular/storage_s3.py @@ -61,8 +61,13 @@ def get_s3_client(self): verify=(False if os.getenv("AWS_S3_UNTRUSTED_CERT", "false").lower() == "true" else None)) @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT) - def upload_file(self, file_path): - return self.get_s3_client().upload_file(file_path, self.bucket, self.aws_prefix + file_path) + def upload_file(self, file_path, blob_path=None, backup_id=None): + if blob_path: + file_name = file_path.rsplit('/',1)[1] + s3FilePath = self.aws_prefix + f'{blob_path}/{backup_id}/{file_name}' + return self.get_s3_client().upload_file(file_path, self.bucket, s3FilePath) + else: + return self.get_s3_client().upload_file(file_path, self.bucket, self.aws_prefix + file_path) @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT) def delete_file(self, filename): @@ -91,10 +96,17 @@ def get_file_size(self, file_path): self.__log.info("Requested {} file not found".format(file_path)) @retry(stop_max_attempt_number=RETRY_COUNT, wait_fixed=RETRY_WAIT) - def download_file(self, filename): - logging.info("Downloading file {}" .format(self.aws_prefix + filename)) + def download_file(self, filename, backup_id, blob_path=None): try: - self.get_s3_client().download_file(self.bucket, self.aws_prefix + filename, filename) + if blob_path: + only_file_name = filename.rsplit('/',1)[1] + file_path = f'{blob_path}/{backup_id}/{only_file_name}' + logging.info(f'Downloading file {file_path} to {filename}') + + self.get_s3_client().download_file(self.bucket, self.aws_prefix + file_path, filename) + else: + logging.info("Downloading file {}" .format(self.aws_prefix + filename)) + self.get_s3_client().download_file(self.bucket, self.aws_prefix + filename, filename) except Exception as e: raise e return