diff --git a/simplyblock_core/controllers/lvol_controller.py b/simplyblock_core/controllers/lvol_controller.py index be8c4fc55..f43ccaea3 100644 --- a/simplyblock_core/controllers/lvol_controller.py +++ b/simplyblock_core/controllers/lvol_controller.py @@ -866,6 +866,10 @@ def delete_lvol_from_node(lvol_id, node_id, clear_data=True, del_async=False): except KeyError: return True + if lvol.frozen: + logger.warning(f"lvol in migration. cannot delete lvol {lvol.uuid}") + return False + logger.info(f"Deleting LVol:{lvol.get_id()} from node:{snode.get_id()}") rpc_client = RPCClient(snode.mgmt_ip, snode.rpc_port, snode.rpc_username, snode.rpc_password, timeout=5, retry=2) diff --git a/simplyblock_core/controllers/lvol_migration_controller.py b/simplyblock_core/controllers/lvol_migration_controller.py new file mode 100644 index 000000000..cc0c100d3 --- /dev/null +++ b/simplyblock_core/controllers/lvol_migration_controller.py @@ -0,0 +1,638 @@ +import logging +from logging import exception +from time import sleep + +from jc.parsers.asn1crypto.core import Boolean + +from ..cluster_ops import db_controller +from ..models.lvol_migration import * +from dataclasses import dataclass +from typing import Optional, Dict +from simplyblock_core.storage_node_ops import * +from simplyblock_core.db_controller import * +from simplyblock_core.models.lvol_model import LVol +from simplyblock_core.models.storage_node import StorageNode +from simplyblock_core.models.lvol_migration import Snapshot +from simplyblock_core.models.snapshot import SnapShot +from datetime import datetime + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) +import uuid +import copy + + + +#TODOS: Integrate in Task Mgmt +# Asynchronous delete of objects must check results before sync delete and cleanup is ready +# must reconnect rpc clients after node restart +# double-check all object states +# we must convert with previous + +# --------------------------------------------------------------------------- +# Migration Service +# --------------------------------------------------------------------------- + +def generate_nqn(): + random_uuid = str(uuid.uuid4()) + nqn = f"nqn.2024-01.io.simplyblock:tmp:{random_uuid}" + return nqn + +class MigrationQueueObjectType: + SNAPSHOT = "snapshot" + CLONE = "clone" + LVOL = "lvol" + + +@dataclass +class MigrationQueueObject: + obj: object + type: str + status: ObjectMigrationState = ObjectMigrationState.NEW + retries: int = 0 + last_offset: Optional[int] = None # For snapshot continuation + + +class MigrationQueue: + """Queue holding migration objects for a single LVOL.""" + + def __init__(self): + self.objects: List[MigrationQueueObject] = [] + + def add(self, obj, obj_type, status=ObjectMigrationState.NEW): + mqo = MigrationQueueObject(obj=obj, type=obj_type, status=status) + self.objects.append(mqo) + return mqo + + def reset(self): + self.objects.clear() + + + +class MigrationService: + """Service containing core migration logic.""" + + MAX_RETRIES = 3 + RETRY_DELAY = 5 # seconds, can be increased exponentially + + + +# --------------------------------------------------------------------------- +# Migration Controller +# --------------------------------------------------------------------------- + +class MigrationController: + """Controller orchestrates LVOL migrations.""" + + + def __init__(self): + self._stop_event = threading.Event() + self.lock = threading.Lock() + self.m: MigrationObject = MigrationObject() + self.db_controller = DBController() + self.prev_time = 
datetime.now() + + #connect clients for both primary (source) and secondary (target) nodes + def connect_clients(self): + try: + self.m.rpc_client1 = self.connect_client(self.m.node_pri) + self.m.rpc_client2 = self.connect_client(self.m.node_sec) + self.m.rpc_client3 = self.connect_client(self.m.target_node_pri) + self.m.rpc_client4 = self.connect_client(self.m.target_node_sec) + except: + raise f"migration {self.m.uuid}: cannot create rpc client for all nodes. all nodes online?" + return + + def get_rpc_client(self, node: StorageNode): + if node.uuid == self.m.node_pri.uuid: + client = self.m.rpc_client1 + elif node.uuid == self.m.target_node_pri.uuid: + client = self.m.rpc_client3 + elif node.uuid == self.m.node_sec.uuid: + client = self.m.rpc_client2 + elif node.uuid == self.m.target_node_sec.uuid: + client = self.m.rpc_client4 + else: + raise RuntimeError(f"migration {self.m.uuid}: invalid node {node.uuid}, stopping. ") + if not client or node.status != StorageNode.STATUS_ONLINE: + raise RuntimeError(f"migration {self.m.uuid}: node {node.uuid} not online, stopping. ") + return client + + def snap_assign(self, lvol: LogicalVolumeRef, snap: SnapShot): + s = Snapshot() + s.lvol=lvol + s.snap=snap + return s + + def lvol_assign(self, lvol: LVol): + m = LogicalVolumeRef() + m.lvol = lvol + return m + + def check_nodes_online(self): + if self.m.node_pri.status == StorageNode.STATUS_ONLINE and self.m.node_sec.status == StorageNode.STATUS_ONLINE and self.m.target_node_pri.status == StorageNode.STATUS_ONLINE and self.m.target_node_sec.status == StorageNode.STATUS_ONLINE: + return True + return False + + def raise_exception_on_error(self, ret: dict, err_str: str): + error="object not found" + if not ret or "error" in ret: + if ret: + error = f"{ret['error']['message']}:{ret['error']['code']}" + raise RuntimeError( + f"migration {self.m.uuid}:" + err_str + f": {error}") + return True + + def get_transfer_state(self, node: StorageNode, counter: int): + client = self.get_rpc_client(node) + for m in self.m.completion_poll_queue: + if m.status==ObjectMigrationState.TRANSFER: + try: + name=m.snap.lvol.lvs_name+"/"+m.snap.snap_bdev + ret = client.bdev_lvol_transfer_stat(name) + self.raise_exception_on_error(ret, f"could not get transfer state for lvol: {name}") + if ret["transfer_state"]=="Done": + m.status=ObjectMigrationState.TRANSFERRED + self.m.write_to_db(db_controller.kv_store) + self.m.completion_poll_queue.remove(m) + return True, 0 + else: + return False, ret["offset"] + except: + logger.error(f"could not get transfer state for lvol") + return False, 0 + return False, 0 + + def create_snapshot(self, node: StorageNode, index: int): + client = self.get_rpc_client(node) + ret=client.lvol_exists(node.lvstore,"mig_snap_"+str(index)+"_"+self.m.vol.lvol.lvol_name) + if not ret or "error" in ret: + ret=client.lvol_create_snapshot(self.m.vol.lvol.lvol_uuid, "mig_snap_"+str(index)+"_"+self.m.vol.lvol.lvol_name) + self.raise_exception_on_error(ret, f"could not create snapshot for lvol: {self.m.vol.lvol.uuid}") + for sn in self.m.snapshots: + if sn.snap.uuid==ret["result"]: + return True + s=self.snap_assign(self.m.vol,ret["result"]) + self.m.snapshots.append(s) + return True + + def migrations_list(self): + self.db_controller = DBController() + migrations = db_controller.get_migrations() + data = [] + for m in migrations: + logger.debug(m) + data.append({ + "UUID": m.uuid, + "Lvol UUID": m.vol.lvol.uuid, + "Primary (source):": m.node_pri, + "Primary (target):": m.target_node_pri, + "DateTime:": 
m.create_dt, + "Status": m.status, + }) + return utils.print_table(data) + + @staticmethod + def connect_client(node:StorageNode): + return RPCClient(node.mgmt_ip, node.rpc_port, node.rpc_username, node.rpc_password, timeout=3, retry=1) + + def unfreeze_objects(self): + self.db_controller = DBController() + l = db_controller.get_lvol_by_id(self.m.vol.lvol.uuid) + l.frozen = False + l.write_to_db(db_controller.kv_store) + snaps = db_controller.get_snapshots_by_node_id(self.m.node_pri) + for s in snaps: + s.frozen = False + s.write_to_db(db_controller.kv_store) + snaps = db_controller.get_snapshots_by_node_id(self.m.node_sec) + for s in snaps: + s.frozen = False + s.write_to_db(db_controller.kv_store) + return True + + def complete_snapshot_migration(self): + tr=db_controller.kv_store.create_transaction() + #snapshot objects are always create new, while lvols are really migrated + for s in self.m.snapshots: + if s.status==ObjectMigrationState.DONE: + snapshot = copy.copy(s.snap) + snapshot.uuid = str(uuid.uuid4()) + snapshot.snap_uuid = s.target_uuid + snapshot.node_id=self.m.node_pri.get_id() + snapshot.write_to_db(db_controller.kv_store,tr) + + lvol = copy.copy(self.m.vol.lvol) + lvol.node_id=self.m.node_pri.get_id() + lvol.lvol_bdev=self.m.vol.lvol.lvol_bdev + lvol.blobid=self.m.vol.lvol.blobid + lvol.lvol_uuid=self.m.vol.lvol.lvol_uuid + lvol.lvs_name=self.m.vol.lvol.lvs_name + lvol.write_to_db(db_controller.kv_store,tr) + try: + tr.commit.wait() + except: + raise RuntimeError(f"migration {self.m.uuid}: error updating snapshots and volumes in db.") + return True + + def create_lvol(self, node: StorageNode, snap: Snapshot): + client = self.get_rpc_client(node) + name = node.lvstore + "/" + snap.snap.snap_bdev + snap_uuid = client.lvol_exists(node.lvstore,node.lvstore+"/"+snap.snap.snap_bdev) + if not snap_uuid or "error" in snap_uuid: + snap_uuid = client.create_lvol(name, snap.snap.size, self.m.target_node_pri.lvstore, + self.m.vol.lvol.lvol_priority_class, + self.m.vol.lvol.ndcs, + self.m.vol.lvol.npcs) + self.raise_exception_on_error(snap_uuid,f"could not create lvol on target: {snap.snap.uuid}") + snap.target_uuid = snap_uuid["result"] + return True + + def set_mig_status(self, node: StorageNode, snap: Snapshot): + client = self.get_rpc_client(node) + name = self.m.target_node_pri.lvstore + "/" + snap.snap.snap_bdev + ret=client.bdev_lvol_set_migration_flag(name) + self.raise_exception_on_error(ret, f"issue creating an target object during migration of snapshot {snap.uuid}") + snap.status = ObjectMigrationState.MIG_FLAG_SET + self.m.write_to_db(self.db_controller.kv_store) + return True + + def export_lvol(self, node: StorageNode, nqn: str, s: Snapshot, anaState: str, namespaces: int, serial: str, model: str): + client = self.get_rpc_client(node) + #check if subsystem exists, namespace is added and listener exists + #nqn=generate_nqn() + ss,listener,ns=client.find_subsystem_by_nqn(nqn) + if not ss: + ret=client.subsystem_create(nqn,serial, model, 1, namespaces) + self.raise_exception_on_error(ret, f"could not list subsystem for lvol: {s.snap.uuid}") + if not ns: + ret=client.nvmf_subsystem_add_ns(s.temporary_nqn,s.snap.lvol.lvs_name+"/"+s.snap.snap_bdev) + self.raise_exception_on_error(ret,f"could not list subsystem for lvol: {s.snap.uuid} ") + if not listener: + if self.m.target_node_pri.active_rdma: + fabric="RDMA" + else: + fabric="TCP" + ret=client.nvmf_subsystem_add_listener(s.temporary_nqn, fabric,self.m.target_node_pri.nvmf_port, + self.m.target_node_pri.hostname, anaState) + 
self.raise_exception_on_error(ret, f"could not list subsystem for lvol: {s.snap.uuid}") + return True + + #delete subystem only, if there is only zero or one namespaces left; + #if one namespace is left, it must match the volume + def delete_subsystem(self, node: StorageNode, nqn:str, lvol: LVol): + client=self.get_rpc_client(node) + data=client.subsystem_list(nqn) + if not data: + return False + ret = None + for subsystem in data['result']: + # Check if the subsystem has namespaces + namespaces = subsystem.get('namespaces', None) + if not namespaces or len(namespaces<2): + ret=client.subsystem_delete(nqn) + self.raise_exception_on_error(data, f"could not delete subsystem: {nqn} for lvol: {lvol.uuid}") + elif len(namespaces>1): + client.nvmf_subsystem_remove_ns(nqn,lvol.namespace) + return True + + def connect_lvol(self, node: StorageNode, s: Snapshot): + client = self.get_rpc_client(node) + if node.active_rdma: + transport="RDMA" + else: + transport="TCP" + ret=client.nvmf_get_subsystems() + subsystem = None + if ret and not "error" in ret: + subsystem = next((s for s in ret["result"] if s["nqn"] == s.temporary_nqn), None) + attach=True + if subsystem: + attach=False + first_namespace_name = subsystem.get("namespaces", [{}])[0].get("name") + if first_namespace_name == None: + client.bdev_nvme_detach_controller(s.snap.snap_bdev) + self.raise_exception_on_error(ret, f"could not remove remote controller: {s.snap.uuid}") + attach=True + if attach: + ret = client.bdev_nvme_attach_controller(s.snap.snap_bdev,s.temporary_nqn,node.hostname,node.nvmf_port,transport) + self.raise_exception_on_error(ret, f"could not connect lvol: {s.snap.uuid}") + s.controller = ret[0] + return True + + def delete_lvol_from_node(self, node: StorageNode, oid: str, deleteType: bool): + client=self.get_rpc_client(node) + lvol=db_controller.get_lvol_by_id(oid) + if lvol: + ret=client.delete_lvol(lvol.lvs_name+"/"+lvol.lvol_name, deleteType) + else: + snap=db_controller.get_snapshot_by_id(oid) + ret=client.delete_lvol(snap.lvol.lvs_name + "/" + snap.lvol.lvol_name, deleteType) + self.raise_exception_on_error(ret, f"could not delete snapshot/lvol: {oid} ") + return + + def transfer_data(self, node: StorageNode, snap: Snapshot, offset: int): + try: + client = self.get_rpc_client(node) + ret=client.bdev_lvol_transfer(snap.snap.lvol.lvs_name+"/"+snap.snap.snap_bdev,offset,4,snap.controller, "migrate") + self.raise_exception_on_error(ret, f"could not transfer data: {snap.snap.uuid} ") + except Exception as e: + logger.error(e) + return False + return True + + def convert_lvol(self, s: Snapshot): + client=self.get_rpc_client(self.m.target_node_pri) + ret=client.bdev_lvol_convert(s.snap.lvol.lvs_name+"/"+s.snap.snap_bdev) + if ret and "exists" in ret: + return True + self.raise_exception_on_error(ret, f"could not convert lvol to snapshot: {s.snap.uuid} to remote subsystem:") + return True + + def time_difference(self): + return (datetime.now()-self.prev_time).total_seconds() + + def create_target_lvol(self, s: Snapshot): + client = self.get_rpc_client(self.m.target_node_pri) + ret=client.create_lvol(s.snap.snap_bdev,s.snap.size,self.m.target_node_pri.lvstore,self.m.vol.lvol.lvol_priority_class,self.m.vol.lvol.ndcs,self.m.vol.lvol.npcs) + self.raise_exception_on_error(ret, f"could not create target lvol for snapshot:{s.snap.uuid}") + return True + + def create_target_lvol2(self, node: StorageNode, l: LogicalVolumeRef): + client = self.get_rpc_client(node) + if l.lvol.crypto_bdev != "": + 
client.lvol_crypto_create(l.lvol.crypto_bdev,l.lvol.lvol_bdev,l.lvol.crypto_key_name) + ret = client.create_lvol(l.lvol.lvol_bdev, l.lvol.size, node.lvstore, l.lvol.lvol_priority_class, l.lvol.ndcs, l.lvol.npcs) + ret=client.create_lvol(l.lvol.lvol_bdev,l.lvol.size,node.lvstore,l.lvol.lvol_priority_class,l.lvol.ndcs,l.lvol.npcs) + self.raise_exception_on_error(ret, f"could not create target lvol for main lvol:{l.lvol.uuid}") + return True + + def connect_hublvol(self, node: StorageNode): + client = self.get_rpc_client(node) + if node.active_rdma: + fabric="RDMA" + else: + fabric="TCP" + + ret=client.bdev_nvme_controller_list("migratelvol") + if not ret: + ret=client.bdev_nvme_attach_controller("migratelvol",node.hublvol,node.hostname,node.nvmf_port,fabric) + self.raise_exception_on_error(ret, f"could not attach controller for {self.m.vol.lvol.uuid} for hublvol") + + return True + + def transfer_data_final(self): + client1 = self.get_rpc_client(self.m.node_pri) + client2 = self.get_rpc_client(self.m.target_node_sec) + client3 = self.get_rpc_client(self.m.target_node_pri) + uuid, map_id = client3.lvol_exists(self.m.target_node_pri,self.m.vol) + if not uuid: + self.create_target_lvol2(self.m.target_node_pri,self.m.vol) + uuid1, _ = client2.lvol_exists(self.m.target_node_sec, self.m.vol) + if not uuid1: + ret=client2.bdev_lvol_register(self.m.vol.lvol.lvol_bdev,self.m.target_node_sec.lvstore, self.m.vol.lvol.blobid, self.m.vol.lvol.lvol_uuid) + self.raise_exception_on_error(ret, f"could not register on secondary {self.m.vol.lvol.uuid}") + + self.connect_hublvol(self.m.node_pri) + + uuid, map_id = client3.lvol_exists(self.m.target_node_pri.lvstore,self.m.vol.lvol.lvol_bdev) + if not uuid or not map_id: + raise RuntimeError( + f"migration {self.m.uuid}: could not get mapid of volume: {self.m.vol.lvol.uuid}") + last_snap_uuid = (self.m.snapshots)[-1].snap.snap_uuid + ret = client1.bdev_lvol_final_migration(self.m.vol.lvol.lvol_bdev,map_id, + last_snap_uuid,4,self.m.target_node_pri.hublvol.nqn) + self.raise_exception_on_error(ret, f"could not initiate final lvol migration: {self.m.vol.lvol.uuid}") + return True + + def delete_hublvol_controller(self): + return + + def reconnect_subsystems(self): + + #if "error" in ret: + # raise f"migration {self.m.uuid}: could not convert lvol to snapshot: {s.uuid} to remote subsystem: {ret["error"]["message"]}:{ret["error"]["code"]}" + return + + def cleanup_migration(self, status: bool): + db_controller = DBController() + real_snapshots = db_controller.get_snapshots() + self.unfreeze_objects() + #Migration was not successful + try: + if self.m.status >= MigrationState.HUBLVOL_CONNECTED: + self.delete_hublvol_controller() + if not status: + pri_node=self.m.node_pri + sec_node=self.m.node_sec + else: + pri_node = self.m.target_node_pri + sec_node = self.m.target_node_sec + + if (self.m.status >= MigrationState.TARGET_LVOL_CREATED and not status) or self.m.status == MigrationState.DONE: + self.delete_subsystem(pri_node, self.m.vol.lvol.nqn, self.m.vol.lvol) + self.delete_subsystem(sec_node, self.m.vol.lvol.uuid, ) + self.delete_lvol_from_node(pri_node, self.m.vol.lvol.uuid, True) + self.(sec_node, self.m.vol.lvol.uuid) + + snaps = self.m.snapshots + snaps.reverse() + for sn in snaps: + if sn.snap.uuid: + rsn = db_controller.get_snapshot_by_id(sn.snap.uuid) + if len(rsn.successor)==1: + + + + self.delete_lvol_from_node(pri_node, sn.snap.uuid, True) + self.delete_subsystem(pri_node,sn.snap.uuid) + self.delete_lvol_from_node(sec_node, sn.snap.uuid) + else: + break + 
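+            # Snapshots are walked newest-first; cleanup stops at the first snapshot that
+            # still has more than one successor reference (shared with a clone). Any failure
+            # above falls through to the except below and cleanup is retried later by the
+            # background checker.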
except: + raise f"cleanup of migration not successful, will try later {self.m.uuid}" + return True + + def migrate_final_lvol(self): + try: + if self.m.status==MigrationState.SNAPS_MIGRATED: + self.transfer_data_final() + elif self.m.status==MigrationState.TARGET_LVOL_CREATED: + self.connect_hublvol() + elif self.m.status==MigrationState.HUBLVOL_CONNECTED: + self.transfer_data_final() + elif self.m.status==MigrationState.TRANSFERRED_TO_TARGET: + self.reconnect_subsystems() + elif self.m.status == MigrationState.RECONNECT_DONE: + self.cleanup_migration(True) + except: + raise f"cannot transfer to target: {self.m.vol.lvol.uuid}" + return True + + def migrate_snaps(self): + if self.m.status==MigrationState.RUNNING: + try: + all_snaps_done = True + p="" + for s in self.m.snapshots: + if s.status is not ObjectMigrationState.DONE: + all_snaps_done = False + if s.status in ObjectMigrationState.NEW: + self.create_target_lvol(s) + elif s.status in ObjectMigrationState.LVOL_CREATED: + self.set_mig_status(self.m.target_node_pri,s) + elif s.status in ObjectMigrationState.MIG_FLAG_SET: + self.export_lvol(s) + elif s.status in ObjectMigrationState.LVOL_EXPORTED: + self.connect_lvol(s) + elif s.status in ObjectMigrationState.LVOL_CONNECTED: + self.transfer_data(s, 0) + elif s.status==ObjectMigrationState.TRANSFERRED: + self.convert_lvol(s,p) + elif s.status == ObjectMigrationState.CONVERTED: + self.delete_subsystem(self.m.target_node_pri,s.snap.uuid) + elif s.status == ObjectMigrationState.CLEANING: + self.delete_lvol_from_node(self.m.target_node_sec, s.snap.uuid) + p=s + if self.m.rerun < 3 or self.time_difference()>5: + ret, snap_uuid=self.create_snapshot(self.m.vol) + sn=self.snap_assign(self.m.vol,snap_uuid) + self.m.snapshots.append(sn) + self.prev_time=datetime.now() + self.migrate_snaps() + elif all_snaps_done: + self.m.status = MigrationState.SNAPS_MIGRATED + self.m.write_to_db(self.db_controller.kv_store) + self.migrate_final_lvol() + except: + self.m.pre_status = self.m.status + self.m.status = MigrationState.FAILED + self.cleanup_migration(False) + return True + + def lvol_migrate(self, lvol: LVol, target_node: StorageNode, m: MigrationObject=None): + """Migrate a logical volume and its snapshots/clones.""" + + # if this Migration Object does not exist (first call to lvol_migrate): + if not m: + try: + self.m = MigrationObject() + self.m.uuid = str(uuid.uuid4()) + self.m.create_dt = str(datetime.datetime) + self.m.status = MigrationState.NEW + self.m.write_to_db(self.db_controller.kv_store) + except: + return False #not even in database, lvol_migrate call must be repeated + else: + self.m = m + + # freeze lvols and snapshots during migration + try: + lvol.frozen = True + lvol.write_to_db(self.db_controller.kv_store) + + # copy now all data from the lvol to the migration lvol (temporary object for lvol during migration) + + self.m.node_pri = StorageNode(self.db_controller.get_storage_node_by_id(lvol.node_id)) + self.m.node_sec = self.db_controller.get_storage_node_by_id(self.m.node_pri.secondary_node_id) + self.m.target_node_pri = target_node + self.m.target_node_sec = self.db_controller.get_storage_node_by_id(self.m.target_node_pri.secondary_node_id) + + self.m.vol = self.lvol_assign(lvol) + + # get all 4 storage node objects: primary, secondary source and target + + # create rpc clients for both primaries: + self.connect_clients() + + # now we create a chain of snapshots from all snapshots taken from this lvol + snapshots = self.db_controller.get_snapshots() + snapshots.sort(key=lambda s: 
s.created_at) + self.m.snapshots = [] + sr = None + for s in snapshots: + if s.lvol.uuid == self.m.vol.lvol.uuid: + s.frozen = True + # need to reset that one on node restart + s.write_to_db(self.db_controller.kv_store) + sr = self.snap_assign(self.m.vol, s) + self.m.snapshots.append(sr) + except: + return True + self.m.status=MigrationState.RUNNING + self.m.write_to_db(self.db_controller.kv_store) + self.migrate_snaps() + return True + + if self.check_nodes_online(): + self.m.status = MigrationState.RUNNING + self.m.write_to_db(self.db_controller.kv_store) + self.migrate_snaps() + return True + else: + logger.warning(f"Not all nodes online. Suspending lvol life migration {lvol.uuid}") + self.m.write_to_db(self.db_controller.kv_store) + return -1 + + def check_status_migration(self, on_restart: bool): + while True: + sleep(10) + try: + migrations=self.db_controller.get_migrations() + for m in migrations: + if m.status!=MigrationState.DONE and m.status!=MigrationState.FAILED: + if self.check_nodes_online(): + if m.status==MigrationState.NEW: + self.lvol_migrate(m.vol.lvol,m.node_pri,m) + elif m.status==MigrationState.RUNNING: + for q in m.completion_poll_queue: + m.completion_poll_queue.remove(q) + if q.status==ObjectMigrationState.TRANSFER: + result, offset = self.get_transfer_state(self.m.node_pri,q.retry) + if not result: + if q.retry > 5: + raise f"could not transfer snapshot. max retries. name: {q.snap.lvol.lvs_name + "/" + q.snap.snap_bdev}. uuid: {q.snap.uuid}" + q.retry += 1 + self.transfer_data(self.m.node_pri,q,offset) + m.completion_poll_queue.append(q) + self.migrate_snaps() + else: + self.migrate_final_lvol() + except: + logger.error(f"migration controller exception. Migration failed: {self.m.uuid} ") + self.m.status=MigrationState.FAILED + self.cleanup_migration(False) + return False + return True + + migrate_lock = threading.Lock() + + def add_new_migration(self, lvol, target_node: StorageNode): + with self.migrate_lock: + try: + migrations = self.db_controller.get_migrations() + for m in migrations: + if lvol.node_id==m.vol.lvol.node_id and (m.status!=MigrationState.DONE or m.status!=MigrationState.FAILED_AND_CLEANED): + raise exception("cannot add migration - ongoing migration") + self.lvol_migrate(lvol, target_node) + except: + logger.error(f"could not add lvol {lvol.uuid} for migration as another migration is currently running.") + return False + return self.lvol_migrate(lvol,target_node) + + def start_service(self, on_restart=False): + """ + Starts the migration checker in a background thread. + """ + self._thread = threading.Thread( + target=self.check_status_migration, args=(on_restart,), daemon=True + ) + self._thread.start() + + def stop_service(self): + """ + Stops the background service gracefully. 
+ """ + self._stop_event.set() + self._thread.join() + diff --git a/simplyblock_core/controllers/snapshot_controller.py b/simplyblock_core/controllers/snapshot_controller.py index d3eca0e00..2b5880e38 100644 --- a/simplyblock_core/controllers/snapshot_controller.py +++ b/simplyblock_core/controllers/snapshot_controller.py @@ -8,11 +8,25 @@ from simplyblock_core import utils, constants from simplyblock_core.db_controller import DBController from simplyblock_core.models.pool import Pool -from simplyblock_core.models.snapshot import SnapShot +from simplyblock_core.models.snapshot import SnapShot, SnapshotRef from simplyblock_core.models.lvol_model import LVol from simplyblock_core.models.storage_node import StorageNode from simplyblock_core.rpc_client import RPCClient +import threading +from collections import defaultdict + +from simplyblock_core.services.lvol_stat_collector import rpc_client + +# A dictionary to hold locks per node +node_locks = defaultdict(threading.Lock) +node_locks_global_lock = threading.Lock() # protects the node_locks dict + +def get_node_lock(node_id): + # Ensure thread-safe creation of locks + with node_locks_global_lock: + return node_locks[node_id] + logger = lg.getLogger() @@ -20,19 +34,28 @@ def add(lvol_id, snapshot_name): - try: + + + try: lvol = db_controller.get_lvol_by_id(lvol_id) - except KeyError as e: + except KeyError as e: logger.error(e) return False, str(e) - pool = db_controller.get_pool_by_id(lvol.pool_uuid) - if pool.status == Pool.STATUS_INACTIVE: + if lvol.frozen: + logger.warning(f"Lvol in migration: Cannot create snapshot from lvol {lvol.uuid} ") + return False + + node_lock = get_node_lock(lvol.node_id) + with node_lock: + try: + pool = db_controller.get_pool_by_id(lvol.pool_uuid) + if pool.status == Pool.STATUS_INACTIVE: msg = "Pool is disabled" logger.error(msg) return False, msg - if lvol.cloned_from_snap: + if lvol.cloned_from_snap: snap = db_controller.get_snapshot_by_id(lvol.cloned_from_snap) ref_count = snap.ref_count if snap.snap_ref_id: @@ -44,54 +67,57 @@ def add(lvol_id, snapshot_name): logger.error(msg) return False, msg - for sn in db_controller.get_snapshots(): + for sn in db_controller.get_snapshots(): if sn.cluster_id == pool.cluster_id: if sn.snap_name == snapshot_name: return False, f"Snapshot name must be unique: {snapshot_name}" - logger.info(f"Creating snapshot: {snapshot_name} from LVol: {lvol.get_id()}") - snode = db_controller.get_storage_node_by_id(lvol.node_id) - - rec = db_controller.get_lvol_stats(lvol, 1) - if rec: + logger.info(f"Creating snapshot: {snapshot_name} from LVol: {lvol.get_id()}") + snode = db_controller.get_storage_node_by_id(lvol.node_id) + rec = db_controller.get_lvol_stats(lvol, 1) + if rec: size = rec[0].size_used - else: + else: size = lvol.size - if 0 < pool.lvol_max_size < size: + if 0 < pool.lvol_max_size < size: msg = f"Pool Max LVol size is: {utils.humanbytes(pool.lvol_max_size)}, LVol size: {utils.humanbytes(size)} must be below this limit" logger.error(msg) return False, msg - if pool.pool_max_size > 0: + if pool.pool_max_size > 0: total = pool_controller.get_pool_total_capacity(pool.get_id()) if total + size > pool.pool_max_size: msg = f"Invalid LVol size: {utils.humanbytes(size)}. 
pool max size has reached {utils.humanbytes(total+size)} of {utils.humanbytes(pool.pool_max_size)}" logger.error(msg) return False, msg - if pool.pool_max_size > 0: + if pool.pool_max_size > 0: total = pool_controller.get_pool_total_capacity(pool.get_id()) if total + lvol.size > pool.pool_max_size: msg = f"Pool max size has reached {utils.humanbytes(total)} of {utils.humanbytes(pool.pool_max_size)}" logger.error(msg) return False, msg - cluster = db_controller.get_cluster_by_id(pool.cluster_id) - if cluster.status not in [cluster.STATUS_ACTIVE, cluster.STATUS_DEGRADED]: + cluster = db_controller.get_cluster_by_id(pool.cluster_id) + if cluster.status not in [cluster.STATUS_ACTIVE, cluster.STATUS_DEGRADED]: return False, f"Cluster is not active, status: {cluster.status}" - snap_vuid = utils.get_random_snapshot_vuid() - snap_bdev_name = f"SNAP_{snap_vuid}" - size = lvol.size - blobid = 0 - snap_uuid = "" - used_size = 0 + snap_vuid = utils.get_random_snapshot_vuid() + snap_bdev_name = f"SNAP_{snap_vuid}" + size = lvol.size + blobid = 0 + snap_uuid = "" + used_size = 0 + node_id = lvol.node_id + - if lvol.ha_type == "single": + + if lvol.ha_type == "single": if snode.status == StorageNode.STATUS_ONLINE: rpc_client = RPCClient(snode.mgmt_ip, snode.rpc_port, snode.rpc_username, snode.rpc_password) logger.info("Creating Snapshot bdev") + ret = rpc_client.lvol_create_snapshot(f"{lvol.lvs_name}/{lvol.lvol_bdev}", snap_bdev_name) if not ret: return False, f"Failed to create snapshot on node: {snode.get_id()}" @@ -108,7 +134,7 @@ def add(lvol_id, snapshot_name): logger.error(msg) return False, msg - if lvol.ha_type == "ha": + if lvol.ha_type == "ha": primary_node = None secondary_node = None host_node = db_controller.get_storage_node_by_id(snode.get_id()) @@ -188,40 +214,72 @@ def add(lvol_id, snapshot_name): logger.error(f"Failed to delete snap from node: {snode.get_id()}") return False, msg - snap = SnapShot() - snap.uuid = str(uuid.uuid4()) - snap.snap_uuid = snap_uuid - snap.size = size - snap.used_size = used_size - snap.blobid = blobid - snap.pool_uuid = pool.get_id() - snap.cluster_id = pool.cluster_id - snap.snap_name = snapshot_name - snap.snap_bdev = f"{lvol.lvs_name}/{snap_bdev_name}" - snap.created_at = int(time.time()) - snap.lvol = lvol - snap.fabric = lvol.fabric - snap.vuid = snap_vuid - snap.status = SnapShot.STATUS_ONLINE - - snap.write_to_db(db_controller.kv_store) - - if lvol.cloned_from_snap: + snap = SnapShot() + snap.uuid = str(uuid.uuid4()) + snap.snap_uuid = snap_uuid + snap.size = size + snap.used_size = used_size + snap.blobid = blobid + snap.pool_uuid = pool.get_id() + snap.cluster_id = pool.cluster_id + snap.snap_name = snapshot_name + snap.snap_bdev = f"{lvol.lvs_name}/{snap_bdev_name}" + snap.created_at = int(time.time()) + snap.lvol = lvol + snap.fabric = lvol.fabric + snap.vuid = snap_vuid + snap.node_id = node_id + snap.status = SnapShot.STATUS_ONLINE + snap.predecessor = lvol.last_snapshot_uuid + snap.successor.append(SnapshotRef.TYPE_LVOL,lvol.uuid) + tr = db_controller.kv_store.create_transaction() + snap.write_to_db(db_controller.kv_store,tr) + pred = db_controller.get_snapshot_by_id(lvol.last_snapshot_uuid) + for p in pred.successor: + if p.next == lvol.uuid: + p.next = snap.uuid + + pred.write_to_db(db_controller.kv_store,tr) + + lvol.last_snapshot_uuid = snap.uuid + lvol.write_to_db(db_controller.kv_store,tr) + + if lvol.cloned_from_snap: original_snap = db_controller.get_snapshot_by_id(lvol.cloned_from_snap) if original_snap: if original_snap.snap_ref_id: 
original_snap = db_controller.get_snapshot_by_id(original_snap.snap_ref_id) original_snap.ref_count += 1 - original_snap.write_to_db(db_controller.kv_store) + original_snap.write_to_db(db_controller.kv_store,tr) snap.snap_ref_id = original_snap.get_id() - snap.write_to_db(db_controller.kv_store) + snap.write_to_db(db_controller.kv_store,tr) + + tr.commit().wait() + + #still should move to asynchronous delete just in case, if fdb fails AND + #at the same time primary goes down we could have inconsistency + #also, if the webapi container dies exactly during execution, this can be a problem + + except Exception as e: + try: + rpc_client = RPCClient( + primary_node.mgmt_ip, primary_node.rpc_port, primary_node.rpc_username, primary_node.rpc_password) + rpc_client.delete_lvol(f"{lvol.lvs_name}/{snap_bdev_name}") + except: + raise logger.error(f"exception creating creating snapshot: {snap.uuid}") + try: + sec_rpc_client = RPCClient( + secondary_node.mgmt_ip, secondary_node.rpc_port, secondary_node.rpc_username, secondary_node.rpc_password) + sec_rpc_client.delete_lvol(f"{lvol.lvs_name}/{snap_bdev_name}") + except: + raise logger.error(f"exception creating creating snapshot: {snap.uuid}") + raise logger.error(f"exception creating creating snapshot: {snap.uuid}") logger.info("Done") snapshot_events.snapshot_create(snap) return snap.uuid, False - def list(all=False): snaps = db_controller.get_snapshots() data = [] @@ -250,6 +308,10 @@ def delete(snapshot_uuid, force_delete=False): logger.error(f"Snapshot not found {snapshot_uuid}") return False + if snap.frozen: + logger.warning(f"lvol in migration. cannot delete snapshot {snap.uuid}") + return False + try: snode = db_controller.get_storage_node_by_id(snap.lvol.node_id) except KeyError: @@ -356,12 +418,21 @@ def delete(snapshot_uuid, force_delete=False): def clone(snapshot_id, clone_name, new_size=0, pvc_name=None, pvc_namespace=None): - try: + + + + try: snap = db_controller.get_snapshot_by_id(snapshot_id) - except KeyError as e: + except KeyError as e: logger.error(e) return False, str(e) + if snap.frozen: + logger.warning(f"lvol in migration. 
cannot create clone {snap.uuid}") + return False + + node_lock = get_node_lock(snap.lvol.node_id) + with node_lock: try: pool = db_controller.get_pool_by_id(snap.lvol.pool_uuid) except KeyError: @@ -375,90 +446,97 @@ def clone(snapshot_id, clone_name, new_size=0, pvc_name=None, pvc_namespace=None return False, msg try: - snode = db_controller.get_storage_node_by_id(snap.lvol.node_id) + snode = db_controller.get_storage_node_by_id(snap.lvol.node_id) except KeyError: msg = 'Storage node not found' logger.exception(msg) return False, msg - cluster = db_controller.get_cluster_by_id(pool.cluster_id) - if cluster.status not in [cluster.STATUS_ACTIVE, cluster.STATUS_DEGRADED]: + try: + cluster = db_controller.get_cluster_by_id(pool.cluster_id) + if cluster.status not in [cluster.STATUS_ACTIVE, cluster.STATUS_DEGRADED]: return False, f"Cluster is not active, status: {cluster.status}" - ref_count = snap.ref_count - if snap.snap_ref_id: + ref_count = snap.ref_count + if snap.snap_ref_id: ref_snap = db_controller.get_snapshot_by_id(snap.snap_ref_id) ref_count = ref_snap.ref_count - if ref_count >= constants.MAX_SNAP_COUNT: + if ref_count >= constants.MAX_SNAP_COUNT: msg = f"Can not create more than {constants.MAX_SNAP_COUNT} clones from this snapshot" logger.error(msg) return False, msg - for lvol in db_controller.get_lvols(): + for lvol in db_controller.get_lvols(): if lvol.pool_uuid == pool.get_id(): if lvol.lvol_name == clone_name: msg=f"LVol name must be unique: {clone_name}" logger.error(msg) return False, msg - size = snap.size - if 0 < pool.lvol_max_size < size: + size = snap.size + if 0 < pool.lvol_max_size < size: msg = f"Pool Max LVol size is: {utils.humanbytes(pool.lvol_max_size)}, LVol size: {utils.humanbytes(size)} must be below this limit" logger.error(msg) return False, msg - if pool.pool_max_size > 0: + if pool.pool_max_size > 0: total = pool_controller.get_pool_total_capacity(pool.get_id()) if total + size > pool.pool_max_size: msg = f"Invalid LVol size: {utils.humanbytes(size)}. 
Pool max size has reached {utils.humanbytes(total+size)} of {utils.humanbytes(pool.pool_max_size)}" logger.error(msg) return False, msg - lvol_count = len(db_controller.get_lvols_by_node_id(snode.get_id())) - if lvol_count >= snode.max_lvol: + lvol_count = len(db_controller.get_lvols_by_node_id(snode.get_id())) + if lvol_count >= snode.max_lvol: error = f"Too many lvols on node: {snode.get_id()}, max lvols reached: {lvol_count}" logger.error(error) return False, error - if pool.pool_max_size > 0: + if pool.pool_max_size > 0: total = pool_controller.get_pool_total_capacity(pool.get_id()) if total + snap.lvol.size > pool.pool_max_size: msg = f"Pool max size has reached {utils.humanbytes(total)} of {utils.humanbytes(pool.pool_max_size)}" logger.error(msg) return False, msg - lvol = LVol() - lvol.uuid = str(uuid.uuid4()) - lvol.lvol_name = clone_name - lvol.size = snap.lvol.size - lvol.max_size = snap.lvol.max_size - lvol.base_bdev = snap.lvol.base_bdev - lvol.lvol_bdev = f"CLN_{utils.get_random_vuid()}" - lvol.lvs_name = snap.lvol.lvs_name - lvol.top_bdev = f"{lvol.lvs_name}/{lvol.lvol_bdev}" - lvol.hostname = snode.hostname - lvol.node_id = snode.get_id() - lvol.nodes = snap.lvol.nodes - lvol.mode = 'read-write' - lvol.cloned_from_snap = snapshot_id - lvol.nqn = cluster.nqn + ":lvol:" + lvol.uuid - lvol.pool_uuid = pool.get_id() - lvol.ha_type = snap.lvol.ha_type - lvol.lvol_type = 'lvol' - lvol.guid = utils.generate_hex_string(16) - lvol.vuid = snap.lvol.vuid - lvol.snapshot_name = snap.snap_bdev - lvol.subsys_port = snap.lvol.subsys_port - lvol.fabric = snap.fabric - - if pvc_name: + lvol = LVol() + lvol.uuid = str(uuid.uuid4()) + lvol.lvol_name = clone_name + lvol.size = snap.lvol.size + lvol.max_size = snap.lvol.max_size + lvol.base_bdev = snap.lvol.base_bdev + lvol.lvol_bdev = f"CLN_{utils.get_random_vuid()}" + lvol.lvs_name = snap.lvol.lvs_name + lvol.top_bdev = f"{lvol.lvs_name}/{lvol.lvol_bdev}" + lvol.hostname = snode.hostname + lvol.node_id = snode.get_id() + lvol.nodes = snap.lvol.nodes + lvol.mode = 'read-write' + lvol.cloned_from_snap = snapshot_id + lvol.nqn = cluster.nqn + ":lvol:" + lvol.uuid + lvol.pool_uuid = pool.get_id() + lvol.ha_type = snap.lvol.ha_type + lvol.lvol_type = 'lvol' + lvol.guid = utils.generate_hex_string(16) + lvol.vuid = snap.lvol.vuid + lvol.snapshot_name = snap.snap_bdev + lvol.subsys_port = snap.lvol.subsys_port + lvol.fabric = snap.fabric + lvol.last_snapshot_uuid=snap.snap_uuid + + su = SnapshotRef() + su.type = SnapshotRef.TYPE_CLONE + su.next = lvol.uuid + snap.successor.append(su) + + if pvc_name: lvol.pvc_name = pvc_name - if pvc_namespace: + if pvc_namespace: lvol.namespace = pvc_namespace - lvol.status = LVol.STATUS_IN_CREATION - lvol.bdev_stack = [ + lvol.status = LVol.STATUS_IN_CREATION + lvol.bdev_stack = [ { "type": "bdev_lvol_clone", "name": lvol.top_bdev, @@ -467,9 +545,9 @@ def clone(snapshot_id, clone_name, new_size=0, pvc_name=None, pvc_namespace=None "clone_name": lvol.lvol_bdev } } - ] + ] - if snap.lvol.crypto_bdev: + if snap.lvol.crypto_bdev: lvol.crypto_bdev = f"crypto_{lvol.lvol_bdev}" lvol.bdev_stack.append({ "type": "crypto", @@ -486,7 +564,7 @@ def clone(snapshot_id, clone_name, new_size=0, pvc_name=None, pvc_namespace=None lvol.crypto_key1 = snap.lvol.crypto_key1 lvol.crypto_key2 = snap.lvol.crypto_key2 - if new_size: + if new_size: if snap.lvol.size >= new_size: msg = f"New size {new_size} must be higher than the original size {snap.lvol.size}" logger.error(msg) @@ -498,17 +576,21 @@ def clone(snapshot_id, clone_name, 
new_size=0, pvc_name=None, pvc_namespace=None return False, msg lvol.size = new_size - lvol.write_to_db(db_controller.kv_store) + tr = db_controller.kv_store.create_transaction() + lvol.write_to_db(db_controller.kv_store,tr) + snap.write_to_db(db_controller.kv_store,tr) - if lvol.ha_type == "single": + if lvol.ha_type == "single": lvol_bdev, error = lvol_controller.add_lvol_on_node(lvol, snode) if error: + msg = f"Could not add lvol on node {lvol.uuid}" + logger.error(msg) return False, error lvol.nodes = [snode.get_id()] lvol.lvol_uuid = lvol_bdev['uuid'] lvol.blobid = lvol_bdev['driver_specific']['lvol']['blobid'] - if lvol.ha_type == "ha": + if lvol.ha_type == "ha": host_node = snode lvol.nodes = [host_node.get_id(), host_node.secondary_node_id] primary_node = None @@ -521,7 +603,6 @@ def clone(snapshot_id, clone_name, new_size=0, pvc_name=None, pvc_namespace=None if sec_node.status == StorageNode.STATUS_DOWN: msg = "Secondary node is in down status, can not clone snapshot" logger.error(msg) - lvol.remove(db_controller.kv_store) return False, msg if sec_node.status == StorageNode.STATUS_ONLINE: @@ -557,8 +638,8 @@ def clone(snapshot_id, clone_name, new_size=0, pvc_name=None, pvc_namespace=None if primary_node: lvol_bdev, error = lvol_controller.add_lvol_on_node(lvol, primary_node) if error: - logger.error(error) - lvol.remove(db_controller.kv_store) + msg = f"Cannot add lvol to node: {lvol.uuid}" + logger.error(msg) return False, error lvol.lvol_uuid = lvol_bdev['uuid'] @@ -567,21 +648,25 @@ def clone(snapshot_id, clone_name, new_size=0, pvc_name=None, pvc_namespace=None if secondary_node: lvol_bdev, error = lvol_controller.add_lvol_on_node(lvol, secondary_node, is_primary=False) if error: + msg = f"Cannot add lvol to secondary node: {lvol.uuid}" logger.error(error) - lvol.remove(db_controller.kv_store) return False, error - lvol.status = LVol.STATUS_ONLINE - lvol.write_to_db(db_controller.kv_store) + lvol.status = LVol.STATUS_ONLINE + lvol.write_to_db(db_controller.kv_store) - if snap.snap_ref_id: + if snap.snap_ref_id: ref_snap = db_controller.get_snapshot_by_id(snap.snap_ref_id) ref_snap.ref_count += 1 ref_snap.write_to_db(db_controller.kv_store) - else: + else: snap.ref_count += 1 snap.write_to_db(db_controller.kv_store) + tr.commit.wait() + except: + raise f"could not create clone: {lvol.uuid}" + logger.info("Done") snapshot_events.snapshot_clone(snap, lvol) if new_size: diff --git a/simplyblock_core/db_controller.py b/simplyblock_core/db_controller.py index 277d1b68a..5cf48e976 100644 --- a/simplyblock_core/db_controller.py +++ b/simplyblock_core/db_controller.py @@ -8,6 +8,7 @@ from simplyblock_core.models.cluster import Cluster from simplyblock_core.models.events import EventObj from simplyblock_core.models.job_schedule import JobSchedule +from simplyblock_core.models.lvol_migration import MigrationObject from simplyblock_core.models.lvol_model import LVol from simplyblock_core.models.mgmt_node import MgmtNode from simplyblock_core.models.nvme_device import NVMeDevice, JMDevice @@ -19,8 +20,6 @@ PoolStatObject, CachedLVolStatObject from simplyblock_core.models.storage_node import StorageNode - - class Singleton(type): _instances = {} # type: ignore def __call__(cls, *args, **kwargs): @@ -32,8 +31,6 @@ def __call__(cls, *args, **kwargs): cls._instances[cls] = ins return ins - - class DBController(metaclass=Singleton): kv_store=None @@ -270,7 +267,7 @@ def get_snapshots_by_node_id(self, node_id) -> List[SnapShot]: ret = [] snaps = SnapShot().read_from_db(self.kv_store) for snap in 
snaps: - if snap.lvol.node_id == node_id: + if snap.node_id == node_id: ret.append(snap) return ret @@ -309,3 +306,10 @@ def get_qos(self, cluster_id=None) -> List[QOSClass]: else: classes = QOSClass().read_from_db(self.kv_store) return sorted(classes, key=lambda x: x.class_id) + + def get_migrations(self) -> List[MigrationObject]: + ret = MigrationObject().read_from_db(self.kv_store) + migrations = [] + for m in ret: + migrations.append(m) + return sorted(migrations, key=lambda x: x.create_dt) diff --git a/simplyblock_core/models/base_model.py b/simplyblock_core/models/base_model.py index 23640c816..b5c9b171d 100644 --- a/simplyblock_core/models/base_model.py +++ b/simplyblock_core/models/base_model.py @@ -143,18 +143,36 @@ def get_last(self, kv_store): return objects[0] return None - def write_to_db(self, kv_store=None): - if not kv_store: + def write_to_db(self, kv_store=None, transaction=None): + """ + Write this object to FDB. Can either: + - use an existing transaction (transaction parameter), or + - use kv_store for a standalone write. + + Note: If you pass a transaction, commit must be done outside this function. + """ + if not kv_store and not transaction: from simplyblock_core.db_controller import DBController kv_store = DBController().kv_store + try: prefix = self.get_db_id() st = json.dumps(self.to_dict()) - kv_store.set(prefix.encode(), st.encode()) + + if transaction: + # Use the existing transaction, don't commit here + transaction.set(prefix.encode(), st.encode()) + else: + # Create a new transaction for standalone write + tr = kv_store.create_transaction() + tr.set(prefix.encode(), st.encode()) + tr.commit().wait() + return True + except Exception as e: print(f"Error Writing to FDB! {e}") - exit(1) + raise # Better than exit, let caller handle def remove(self, kv_store): prefix = self.get_db_id() diff --git a/simplyblock_core/models/lvol_migration.py b/simplyblock_core/models/lvol_migration.py new file mode 100644 index 000000000..f8cfe456a --- /dev/null +++ b/simplyblock_core/models/lvol_migration.py @@ -0,0 +1,95 @@ +from __future__ import annotations +from dataclasses import dataclass +from enum import Enum +from typing import List +import storage_node +from base_model import * +from simplyblock_core.models.lvol_model import LVol +from simplyblock_core.models.snapshot import SnapShot +from simplyblock_core.rpc_client import RPCClient + + +# --------------------------------------------------------------------------- +# ENUMS +# --------------------------------------------------------------------------- + +class MigrationState(str, Enum): + NEW = 0 + PREPARING = 1 + RUNNING = 2 + SNAPS_MIGRATED = 3 + TARGET_LVOL_CREATED = 4 + HUBLVOL_CONNECTED = 5 + TRANSFERRED_TO_TARGET = 6 + RECONNECT_DONE = 6 + CLEANUP = 7 + FAILED = 8 + FAILED_AND_CLEANED = 9 + DONE = 10 + +class ObjectMigrationState(str, Enum): + NEW = 1 + LVOL_CREATED = 2 + MIG_FLAG_SET = 3 + NAMESPACE_CREATED = 4 + NQN_CREATED = 5 + LVOL_CONNECTED = 6 + LVOL_EXPORTED = 7 + TRANSFER = 8 + RETRANSFER = 9 + TRANSFERRED = 10 + CONVERTED = 11 + CLEANING = 12 + FAILED = 13 + DONE = 14 + +# --------------------------------------------------------------------------- +# DATA MODELS +# --------------------------------------------------------------------------- + +@dataclass +class LogicalVolumeRef: + """Reference to a logical volume participating in a migration.""" + lvol: LVol = None + target_uuid: str = "" + mapid: int = 0 + state : ObjectMigrationState = ObjectMigrationState.NEW + retry : int = 0 + +@dataclass +class 
Snapshot: + snap: SnapShot = None + lvol: LogicalVolumeRef = None + controller: str = "" + target_uuid: str = "" + retry : int = 0 + # Migration metadata + temporary_nqn: str = "" + status: ObjectMigrationState = ObjectMigrationState.NEW + +@dataclass +class MigrationObject(BaseModel): + """ + Full migration object, containing multiple streams and logical volumes. + Snapshots exist independently and are referenced by streams. + """ + status: MigrationState = MigrationState.NEW + pre_status: MigrationState = MigrationState.NEW + + vol : LogicalVolumeRef = None + node_pri : storage_node.StorageNode = None + node_sec: storage_node.StorageNode = None + target_node_pri: storage_node.StorageNode = None + target_node_sec: storage_node.StorageNode = None + rpc_client1: RPCClient = None + rpc_client2: RPCClient = None + rpc_client3: RPCClient = None + rpc_client4: RPCClient = None + + # Global snapshot objects (shared across streams) + snapshots: List[Snapshot] = None + # Async queue for polling migration completion (set externally) + completion_poll_queue: List[Snapshot] = None + rerun : int = 0 + + diff --git a/simplyblock_core/models/lvol_model.py b/simplyblock_core/models/lvol_model.py index f84091473..b8b06bc37 100644 --- a/simplyblock_core/models/lvol_model.py +++ b/simplyblock_core/models/lvol_model.py @@ -5,8 +5,48 @@ from simplyblock_core.models.base_model import BaseModel from simplyblock_core.models.nvme_device import NVMeDevice +import copy class LVol(BaseModel): + # your class-level constants and annotations... + + def __copy__(self): + """ + Shallow copy: + - BaseModel fields copied + - Mutable fields (list, dict, set) shallow-copied so references do NOT leak + """ + new = type(self)() + + for attr in self.get_attrs_map(): + value = getattr(self, attr) + + # Copy containers to avoid unintended shared references + if isinstance(value, (dict, list, set)): + value = value.copy() + + setattr(new, attr, value) + + return new + + def __deepcopy__(self, memo): + """ + Deep copy: + - Recursively duplicates everything + - Uses memo to avoid infinite recursion + """ + if id(self) in memo: + return memo[id(self)] + + new = type(self)() + memo[id(self)] = new + + for attr in self.get_attrs_map(): + value = getattr(self, attr) + setattr(new, attr, copy.deepcopy(value, memo)) + + return new + STATUS_IN_CREATION = 'in_creation' STATUS_ONLINE = 'online' @@ -20,7 +60,6 @@ class LVol(BaseModel): STATUS_IN_CREATION: 4, } - base_bdev: str = "" bdev_stack: List = [] blobid: int = 0 cloned_from_snap: str = "" @@ -66,6 +105,8 @@ class LVol(BaseModel): fabric: str = "tcp" ndcs: int = 0 npcs: int = 0 + frozen: bool = False + last_snapshot_uuid: str = "" def has_qos(self): return (self.rw_ios_per_sec > 0 or self.rw_mbytes_per_sec > 0 or self.r_mbytes_per_sec > 0 or self.w_mbytes_per_sec > 0) diff --git a/simplyblock_core/models/snapshot.py b/simplyblock_core/models/snapshot.py index 1da571ec8..378d13b89 100644 --- a/simplyblock_core/models/snapshot.py +++ b/simplyblock_core/models/snapshot.py @@ -2,6 +2,16 @@ from simplyblock_core.models.base_model import BaseModel from simplyblock_core.models.lvol_model import LVol +from typing import List +import copy + +class SnapshotRef(): + + TYPE_LVOL = "lvol" + TYPE_CLONE = "clone" + TYPE_SNAP = "snap" + type: str + next: str class SnapShot(BaseModel): @@ -21,11 +31,42 @@ class SnapShot(BaseModel): ref_count: int = 0 size: int = 0 used_size: int = 0 - snap_bdev: str = "" - snap_name: str = "" + snap_bdev: str = "" #snapshot relative name (part without lvstore) + 
snap_name: str = "" #snapshot full name snap_ref_id: str = "" snap_uuid: str = "" vuid: int = 0 deletion_status: str = "" status: str = "" fabric: str = "tcp" + frozen: bool = False + node_id : str = "" + successor : List[SnapshotRef] = [] + predecessor: str = "" + + def __copy__(self): + # Shallow copy + # 1. Copy base class attributes + base_copy = super().__copy__() if hasattr(super(), '__copy__') else type(self)() + + # 2. Copy derived attributes + new = type(self)() + for attr in self.get_attrs_map(): + value = getattr(self, attr) + # Shallow copy for mutable types + if isinstance(value, (dict, list, set)): + value = value.copy() + setattr(new, attr, value) + return new + + def __deepcopy__(self, memo): + # 1. Deep copy base attributes + base_copy = super().__deepcopy__(memo) if hasattr(super(), '__deepcopy__') else type(self)() + + # 2. Deep copy derived attributes + new = type(self)() + memo[id(self)] = new + for attr in self.get_attrs_map(): + value = getattr(self, attr) + setattr(new, attr, copy.deepcopy(value, memo)) + return new diff --git a/simplyblock_core/rpc_client.py b/simplyblock_core/rpc_client.py index ce48e1796..e1269d2fc 100644 --- a/simplyblock_core/rpc_client.py +++ b/simplyblock_core/rpc_client.py @@ -1229,3 +1229,97 @@ def nvmf_port_unblock_rdma(self, port): def nvmf_get_blocked_ports_rdma(self): return self._request("nvmf_get_blocked_ports") + + def bdev_lvol_final_migration( + self, + lvol_name: str, + lvol_id: str, + snapshot_name: str, + batch: int, + nqn: str + ): + params = { + "lvol_name": lvol_name, + "lvol_id": lvol_id, + "snapshot_name": snapshot_name, + "cluster_batch": batch, + "gateway": nqn + } + return self._request("bdev_lvol_final_migration", params) + + def bdev_lvol_set_migration_flag(self, lvol_name: str): + params = { + "lvol_name": lvol_name + } + return self._request("bdev_lvol_set_migration_flag", params) + + def bdev_lvol_convert(self, lvol_name: str): + params = { + "lvol_name": lvol_name + } + return self._request("bdev_lvol_convert", params) + + def bdev_lvol_add_clone(self, clone_name: str, source_lvol_name: str): + params = { + "clone_name": clone_name, + "child_name": source_lvol_name + } + return self._request("bdev_lvol_add_clone", params) + + + def bdev_lvol_transfer(self, lvolname: str,o: int,batch: int,gw: str, op: str): + params = { + "lvol_name" : lvolname, + "offset" : o, + "cluster_batch" : batch, + "gateway" :gw, + "operation" : op, + } + return self._request("bdev_lvol_transfer", params) + + def bdev_lvol_get_lvols( + self, + lvs: str): + params = { + "lvs_uuid": lvs, + } + return self._request("bdev_lvol_get_lvols", params) + + def bdev_lvol_transfer_stat( + self, + name: str): + params = { + "lvol_name": name, + } + return self._request("bdev_lvol_transfer_stat", params) + + def lvol_exists(self, lvs_name, name): + params = { + "lvs_name": lvs_name, + } + ret = self._request("bdev_lvol_get_lvols", params) + if not ret or "error" in ret: + raise RuntimeError(ret["error"]) + for lvol in ret["result"]: + if lvol.get("name") == name: + if lvol["map_id"]: + return lvol["uuid"],lvol["map_id"] + else: + return lvol["uuid"],None + return None, None + + def nvmf_get_subsystems( + self): + params = { + } + return self._request("bdev_lvol_transfer_stat", params) + + def find_subsystem_by_nqn(self, nqn_to_find): + subsystems = self.nvmf_get_subsystems() + for subsystem in subsystems: + if subsystem.get("nqn") == nqn_to_find: + listener = subsystem.get("listen_addresses", [None])[0] if subsystem.get("listen_addresses") else None + 
+            nsid = subsystem.get("namespaces", [None])[0].get("nsid") if subsystem.get("namespaces") else None
+            return subsystem, listener, nsid
+        return None, None, None
+
diff --git a/simplyblock_core/storage_node_ops.py b/simplyblock_core/storage_node_ops.py
index a6d89b74d..ea9186960 100644
--- a/simplyblock_core/storage_node_ops.py
+++ b/simplyblock_core/storage_node_ops.py
@@ -22,10 +22,12 @@
 from simplyblock_core.constants import LINUX_DRV_MASS_STORAGE_NVME_TYPE_ID, LINUX_DRV_MASS_STORAGE_ID
 from simplyblock_core.controllers import lvol_controller, storage_events, snapshot_controller, device_events, \
     device_controller, tasks_controller, health_controller, tcp_ports_events, qos_controller
+from simplyblock_core.controllers.lvol_migration_controller import MigrationController
 from simplyblock_core.db_controller import DBController
 from simplyblock_core.fw_api_client import FirewallClient
 from simplyblock_core.models.iface import IFace
 from simplyblock_core.models.job_schedule import JobSchedule
+from simplyblock_core.models.lvol_migration import MigrationState
 from simplyblock_core.models.lvol_model import LVol
 from simplyblock_core.models.nvme_device import NVMeDevice, JMDevice
 from simplyblock_core.models.snapshot import SnapShot
@@ -126,6 +128,15 @@ def connect_device(name: str, device: NVMeDevice, node: StorageNode, bdev_names:
     return bdev_name
 
+# If a node was rebooted during an ongoing migration, re-schedule the migration task.
+def restart_migration(node: StorageNode):
+    db_controller = DBController()
+    migs = db_controller.get_migrations()
+    for m in migs:
+        if m.node_pri == node.uuid:
+            if m.status != MigrationState.DONE:
+                add_task()
+    return
 
 def get_next_cluster_device_order(db_controller, cluster_id):
     max_order = 0
@@ -2045,6 +2056,7 @@ def restart_storage_node(
             online_devices_list.append(dev.get_id())
         if online_devices_list:
             tasks_controller.add_device_mig_task(online_devices_list, snode.cluster_id)
+            restart_migration(snode)
     return True
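Note: add_task() in restart_migration() is left unresolved by this diff. Below is a minimal sketch, not part of the change set, of how the restart hook could hand unfinished migrations back to the background checker, assuming the MigrationController service added in this PR is the intended consumer; only names introduced elsewhere in the diff are used.

# Sketch only: wire restart_migration() to the MigrationController service from this PR.
# Assumes one controller instance per process; check_status_migration() then picks up
# every MigrationObject that is not DONE and resumes or cleans it up.
from simplyblock_core.controllers.lvol_migration_controller import MigrationController
from simplyblock_core.models.lvol_migration import MigrationState

def restart_migration(node):
    controller = MigrationController()
    # mirror the pending-migration check used in storage_node_ops above
    pending = [m for m in controller.db_controller.get_migrations()
               if m.node_pri == node.uuid and m.status != MigrationState.DONE]
    if pending:
        # start_service() launches check_status_migration() in a daemon thread,
        # which re-drives RUNNING migrations and re-polls TRANSFER states.
        controller.start_service(on_restart=True)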